Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
26 changes: 26 additions & 0 deletions src/cmcp_runtime/audit/chain.py
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@

if TYPE_CHECKING:
from cmcp_runtime.audit.store import SqliteAuditStore
from cmcp_runtime.tee.base import AttestationReport

logger = logging.getLogger(__name__)

Expand Down Expand Up @@ -107,6 +108,12 @@ def __init__(self, session_id: str, store: SqliteAuditStore | None = None) -> No
self._store = store
# AUDIT-002: TEE-anchored chain root. None until set_tee_anchor() is called.
self._tee_anchor: str | None = None
# AUDIT-006: per-session attestation report whose report_data commits the
# chain root (report_data[32:64] == SHA-256(chain_root)). None until
# set_session_report() is called. When present, the claim is built from
# this report instead of the shared startup report, so the hardware-signed
# report_data carries this session's chain-root commitment.
self._session_report: AttestationReport | None = None
self._append_session_start()

def _append_session_start(self) -> None:
Expand Down Expand Up @@ -151,6 +158,25 @@ def tee_anchor(self) -> str | None:
"""The TEE-committed chain root, or None if not yet anchored (dev/Level-0 mode)."""
return self._tee_anchor

def set_session_report(self, report: AttestationReport) -> None:
"""
AUDIT-006: record the per-session attestation report whose report_data
commits this chain's root.

The caller produces this report at session start by submitting a nonce of
the form jwk_thumbprint(key) || SHA-256(chain_root) to the TEE (see
tee.base.make_audit_bound_nonce). close_session() then builds the claim
from this report so the hardware-signed report_data surfaced in the claim
binds the chain root. If never set (TEE call failed / dev fallback), the
claim falls back to the shared startup report and a warning is emitted.
"""
self._session_report = report

@property
def session_report(self) -> AttestationReport | None:
"""The per-session chain-root-committing attestation report, or None."""
return self._session_report

def append(
self,
entry_type: EntryType,
Expand Down
71 changes: 48 additions & 23 deletions src/cmcp_runtime/session/manager.py
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@
from cmcp_runtime.session.call_log import CallLog, SessionCallLog
from cmcp_runtime.session.state import SessionState
from cmcp_runtime.startup import RuntimeContext
from cmcp_runtime.tee.base import AttestationReport, make_audit_bound_nonce

logger = logging.getLogger(__name__)

Expand Down Expand Up @@ -61,13 +62,18 @@ def create_session(self) -> tuple[SessionState, AuditChain]:
"""
Create a new session. Returns (state, chain).

AUDIT-002: after constructing the chain, request a per-session TEE
attestation report whose nonce encodes the chain root. This commits
the root into hardware-attested evidence so an attacker cannot silently
swap out the chain and pass verify_chain(). In dev / Level-0 mode
(software-only provider) the anchor is still set so that verify_chain()
performs the root comparison - the security guarantee is limited to what
a software TEE provides, and a warning is emitted.
AUDIT-002 / AUDIT-006: after constructing the chain, request a per-session
TEE attestation report whose nonce commits the chain root. The nonce is
jwk_thumbprint(key) || SHA-256(chain_root) so the hardware-signed
report_data binds BOTH the gateway key (report_data[:32]) and this chain's
root (report_data[32:64]). This report is stored on the chain and used to
build the claim in close_session(), so the chain root reaches the verifier
inside the attested report_data - not just as an unbound advisory field.
A rogue operator who rebuilds a fresh, internally-consistent chain gets a
different root that no longer matches report_data[32:64], so verification
fails. In dev / Level-0 mode (software-only provider) the anchor is still
set and the report is still stored so the binding is exercised - the
security guarantee is limited to what a software TEE provides.
"""
# Kill switch: reject sessions for blocked agent identities before allocating resources.
binding = getattr(self._ctx, "agent_manifest", None)
Expand All @@ -82,27 +88,41 @@ def create_session(self) -> tuple[SessionState, AuditChain]:
state = SessionState(session_id=session_id)
chain = AuditChain(session_id=session_id, store=self._ctx.audit_store)

# AUDIT-002: derive a per-session nonce that encodes the chain root so
# the TEE report binds this specific chain to the attestation evidence.
# nonce = SHA-256(chain_root_bytes || session_id_bytes)
# AUDIT-006: derive a per-session nonce that commits the chain root into
# report_data, then KEEP the resulting report so close_session() builds the
# claim from it. nonce = jwk_thumbprint(key) || SHA-256(chain_root):
# report_data[:32] -> gateway key (unchanged key binding, CRYPTO-001)
# report_data[32:64] -> SHA-256(chain_root) (new chain-root commitment)
chain_root = chain.chain_root
nonce = hashlib.sha256(
bytes.fromhex(chain_root) + session_id.encode()
).digest()

try:
self._ctx.tee_provider.get_attestation_report(nonce)
# The report itself is not stored here - the startup-time report in
# ctx.attestation_report already covers the gateway instance. What
# matters is that the nonce (containing chain_root) was submitted to
# the TEE, making chain_root part of the attested evidence.
nonce = make_audit_bound_nonce(
self._ctx.signing_key.public_key_bytes, chain_root
)
report = self._ctx.tee_provider.get_attestation_report(nonce)
# AUDIT-006: store the report so its report_data (committing chain_root)
# is the one surfaced in the claim, not the shared startup report.
# Guard on the concrete type so a provider that returns something
# malformed cannot displace the well-formed startup report.
if isinstance(report, AttestationReport):
chain.set_session_report(report)
else:
logger.warning(
"AUDIT-006: per-session TEE provider returned a %s, not an "
"AttestationReport - chain root is not hardware-bound into "
"report_data. session_id=%s",
type(report).__name__,
session_id,
)
except Exception as exc:
# Non-fatal: log and continue. The anchor is still set so that
# internal chain-substitution detection works. In production,
# callers should validate that the TEE provider is not software-only.
# internal chain-substitution detection works. The claim falls back
# to the shared startup report (no chain-root commitment) and the
# verifier will flag the missing binding. In production, callers
# should validate that the TEE provider is not software-only.
logger.warning(
"AUDIT-002: per-session TEE attestation call failed - "
"chain root is not hardware-anchored. session_id=%s error=%s",
"AUDIT-006: per-session TEE attestation call failed - "
"chain root is not hardware-bound into report_data. session_id=%s error=%s",
session_id,
exc,
)
Expand Down Expand Up @@ -134,7 +154,12 @@ def close_session(
)

ctx = self._ctx
report = ctx.attestation_report
# AUDIT-006: prefer the per-session report whose report_data commits this
# chain's root. Fall back to the shared startup report only if the
# per-session TEE call failed at create_session() time (a warning was
# already emitted there); in that case the chain root is not bound into
# report_data and a strict verifier will reject the claim.
report = chain.session_report or ctx.attestation_report

# Convert AttestationReport (datetime) to AttestationReportInfo (str).
generated_at_str = report.attestation_generated_at.isoformat()
Expand Down
29 changes: 29 additions & 0 deletions src/cmcp_runtime/tee/base.py
Original file line number Diff line number Diff line change
Expand Up @@ -92,6 +92,35 @@ def make_nonce(tee_public_key: bytes, salt: bytes) -> bytes:
return jwk_thumbprint(tee_public_key) + salt


def audit_root_commitment(chain_root_hex: str) -> bytes:
"""SHA-256 over the audit-chain root bytes: the 32-byte commitment bound into
the attestation report's second nonce half (AUDIT-006).

chain_root_hex is the hex-encoded chain root (the hash of the session_start
entry, immutable for the session's lifetime). A verifier re-derives this from
gateway.audit_chain.root and compares it against report_data[32:64].
"""
return hashlib.sha256(bytes.fromhex(chain_root_hex)).digest()


def make_audit_bound_nonce(tee_public_key: bytes, chain_root_hex: str) -> bytes:
"""Compute the per-session attestation nonce that commits the audit-chain root.

Layout (64 bytes, AUDIT-006):

jwk_thumbprint(pubkey) (32) || SHA-256(chain_root_bytes) (32)

The first 32 bytes keep the existing key binding intact (report_data[:32] is
the RFC 7638 thumbprint, re-derivable from cnf.jwk.x). The second 32 bytes
commit the audit-chain root into the hardware-signed report_data so a rogue
operator who rebuilds a fresh, internally-consistent chain produces a
different root that no longer matches report_data[32:64]. Freshness is
preserved because the chain root is unique per session (it hashes the
session_id and the session_start timestamp).
"""
return jwk_thumbprint(tee_public_key) + audit_root_commitment(chain_root_hex)


class SoftwareOnlyProvider(TEEProvider):
"""
Software-only attestation stub for CI and local development.
Expand Down
115 changes: 115 additions & 0 deletions src/cmcp_verify/verify.py
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@

import base64
import hashlib
import hmac
import json
import logging
import re
Expand Down Expand Up @@ -91,6 +92,7 @@ class VerificationError(StrEnum):
CATALOG_HASH_MISMATCH = "CATALOG_HASH_MISMATCH"
ATTESTATION_STALE = "ATTESTATION_STALE"
CHAIN_BROKEN = "CHAIN_BROKEN"
CHAIN_ROOT_NOT_BOUND = "CHAIN_ROOT_NOT_BOUND"
CLAIM_MALFORMED = "CLAIM_MALFORMED"
HARDWARE_ATTESTATION_FAILED = "HARDWARE_ATTESTATION_FAILED"
AGENT_MANIFEST_MISMATCH = "AGENT_MANIFEST_MISMATCH"
Expand Down Expand Up @@ -268,6 +270,95 @@ def _check_audit_chain(claim: dict[str, Any]) -> tuple[bool, str | None]:
return True, None


def _check_audit_chain_binding(
claim: dict[str, Any],
*,
is_sw_only: bool,
) -> tuple[bool | None, str | None]:
"""
AUDIT-006: verify that the audit-chain root is committed to the hardware-signed
report_data, not merely carried as an unauthenticated advisory field.

The gateway submits a per-session attestation nonce of the form

jwk_thumbprint(key) (32) || SHA-256(chain_root_bytes) (32)

so the TEE commits SHA-256(chain_root) into report_data[32:64]. The nonce is
surfaced as trace.runtime.nonce (base64url of the full 64-byte value).

The verifier re-derives SHA-256(SHA-256-hex-decode(gateway.audit_chain.root))
and compares it constant-time against nonce[32:64]. A mismatch means the chain
root in the claim is NOT the one attested by the hardware: a rogue operator who
rebuilt a fresh, internally-consistent chain (different root) re-signed the claim
with the in-enclave key but could not forge the TEE-committed report_data. This
is FATAL.

Returns:
(True, None) -- chain root commitment matches report_data[32:64]
(False, reason) -- mismatch / missing commitment; reject (fail closed)
(None, warning_msg) -- software-only / Level-0 mode; not hardware-backed
"""
root = claim.get("gateway", {}).get("audit_chain", {}).get("root", "")
if not root:
# _check_audit_chain already reports the empty-root failure; nothing to bind.
return False, "gateway.audit_chain.root is empty -- cannot verify chain-root binding"

# The chain root may carry a "sha256:" prefix in some serializations; the bytes
# committed to the TEE are those of the bare hex digest (the entry_hash).
root_hex = root.removeprefix("sha256:").removeprefix("sha384:")
try:
root_bytes = bytes.fromhex(root_hex)
except ValueError as exc:
return False, f"gateway.audit_chain.root is not valid hex: {exc}"
expected_commitment = hashlib.sha256(root_bytes).digest()

nonce_b64 = claim.get("trace", {}).get("runtime", {}).get("nonce", "")
if not nonce_b64:
if is_sw_only:
return None, "software-only mode -- chain-root binding not applicable"
return False, (
"trace.runtime.nonce is absent -- attestation report_data does not "
"commit the audit-chain root"
)

try:
padding = 4 - (len(nonce_b64) % 4)
padded = nonce_b64 + ("=" * padding if padding != 4 else "")
nonce_bytes = base64.urlsafe_b64decode(padded)
except Exception as exc:
return False, f"cannot decode trace.runtime.nonce: {exc}"

if len(nonce_bytes) < 64:
if is_sw_only:
return None, (
"software-only mode -- report_data does not carry a chain-root "
"commitment in bytes [32:64]"
)
return False, (
f"trace.runtime.nonce is too short ({len(nonce_bytes)} bytes); "
"expected 64 bytes (key fingerprint || chain-root commitment)"
)

actual_commitment = nonce_bytes[32:64]
if not hmac.compare_digest(actual_commitment, expected_commitment):
# A mismatch is always fatal, including in software-only mode: the chain
# root presented in the claim is not the one bound into report_data.
return False, (
"gateway.audit_chain.root does not match report_data[32:64] -- the "
"audit-chain root was not committed to this attestation report; the "
"chain may have been substituted after attestation"
)

if is_sw_only:
logger.warning(
"AUDIT-006: software-only (dev) mode -- chain-root commitment matches "
"but provides no hardware provenance guarantee"
)
return None, "software-only mode -- chain-root binding not hardware-backed"

return True, None


def _validate_schema(claim: dict[str, Any]) -> tuple[bool, str | None]:
"""Validate claim structure using the RuntimeClaim Pydantic model."""
try:
Expand Down Expand Up @@ -472,6 +563,7 @@ def verify_trace_claim(
issuer keys are provided.
6. Attestation freshness check
7. Audit chain consistency check
7b. AUDIT-006: audit-chain root binding -- report_data[32:64] commits SHA-256(chain_root)
8. Platform-specific attestation verification (dispatched per-platform)

Returns VerificationResult with status and details.
Expand Down Expand Up @@ -646,6 +738,29 @@ def verify_trace_claim(
if chain_err:
details["chain_error"] = chain_err

# Step 7b: AUDIT-006 -- audit-chain root binding into report_data.
# The chain root must be committed to the hardware-signed report_data
# (report_data[32:64] == SHA-256(chain_root)), not merely asserted in the
# advisory gateway.audit_chain.root field. A mismatch is FATAL: it means a
# rogue operator rebuilt the chain and re-signed the claim but could not forge
# the TEE-committed commitment. Only attempted when the chain is well-formed.
if chain_ok:
root_binding, root_binding_msg = _check_audit_chain_binding(
claim_json, is_sw_only=_is_sw_only
)
if root_binding is True:
verified.append("audit_chain_binding")
elif root_binding is False:
unverified.append("audit_chain_binding")
failure = failure or VerificationError.CHAIN_ROOT_NOT_BOUND
details["audit_chain_binding"] = (
root_binding_msg or "audit-chain root binding verification failed"
)
else:
# software-only / Level-0 mode: not hardware-backed, no penalty/credit.
if root_binding_msg:
details["audit_chain_binding"] = root_binding_msg

# Step 8: Platform-specific attestation
platform = _runtime.get("platform", "")

Expand Down
31 changes: 19 additions & 12 deletions tests/unit/test_audit_chain_anchor.py
Original file line number Diff line number Diff line change
Expand Up @@ -143,35 +143,42 @@ def test_create_session_sets_tee_anchor():


def test_create_session_calls_tee_provider_with_chain_root_nonce():
"""create_session must pass a nonce derived from the chain root to the TEE provider."""
"""create_session must pass the AUDIT-006 audit-bound nonce to the TEE provider.

nonce = jwk_thumbprint(key) (32) || SHA-256(chain_root) (32) = 64 bytes.
"""
ctx = _make_ctx_with_tee()
mgr = SessionManager(ctx)
_, chain = mgr.create_session()

# The nonce is SHA-256(chain_root_bytes || session_id_bytes).
# We can verify the TEE provider was called (exactly once) with a 32-byte nonce.
ctx.tee_provider.get_attestation_report.assert_called_once()
call_args = ctx.tee_provider.get_attestation_report.call_args
nonce_arg = call_args[0][0]
assert isinstance(nonce_arg, bytes)
assert len(nonce_arg) == 32
assert len(nonce_arg) == 64


def test_create_session_anchor_nonce_encodes_chain_root():
"""Verify that the nonce passed to TEE matches SHA-256(chain_root_bytes || session_id_bytes)."""
"""AUDIT-006: report_data[32:64] must equal SHA-256(chain_root_bytes).

The first 32 bytes are the RFC 7638 JWK thumbprint of the gateway key (key
binding, unchanged); the second 32 bytes commit the chain root.
"""
from cmcp_runtime.tee.base import jwk_thumbprint

ctx = _make_ctx_with_tee()
mgr = SessionManager(ctx)
state, chain = mgr.create_session()
_, chain = mgr.create_session()

# Re-derive the expected nonce.
chain_root = chain.chain_root
session_id = state.session_id
expected_nonce = hashlib.sha256(
bytes.fromhex(chain_root) + session_id.encode()
).digest()
expected_nonce = (
jwk_thumbprint(ctx.signing_key.public_key_bytes)
+ hashlib.sha256(bytes.fromhex(chain_root)).digest()
)

actual_nonce = ctx.tee_provider.get_attestation_report.call_args[0][0]
assert actual_nonce == expected_nonce
assert actual_nonce[32:64] == hashlib.sha256(bytes.fromhex(chain_root)).digest()


def test_verify_chain_passes_after_create_session():
Expand Down Expand Up @@ -215,4 +222,4 @@ def test_tee_provider_failure_still_sets_anchor(caplog):
_, chain = mgr.create_session()

assert chain.tee_anchor is not None
assert "chain root is not hardware-anchored" in caplog.text
assert "chain root is not hardware-bound into report_data" in caplog.text
Loading