Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
203 changes: 201 additions & 2 deletions src/cmcp_verify/tdx.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,12 @@
import urllib.request
from dataclasses import dataclass, field

from cryptography import x509
from cryptography.exceptions import InvalidSignature
from cryptography.hazmat.primitives.asymmetric import ec
from cryptography.hazmat.primitives.asymmetric.utils import encode_dss_signature
from cryptography.hazmat.primitives.hashes import SHA256


class _TdReport(ctypes.LittleEndianStructure):
"""Named-field representation of the raw TDREPORT buffer returned by the
Expand Down Expand Up @@ -68,6 +74,8 @@ def verify_tdx_measurement(
measurement: str,
raw_evidence: bytes | None,
report_data_hex: str | None = None,
raw_quote: bytes | None = None,
trusted_intel_root_pem: bytes | None = None,
) -> TDXVerificationResult:
"""
Verify an Intel TDX attestation measurement.
Expand Down Expand Up @@ -155,10 +163,23 @@ def verify_tdx_measurement(
result.details["dcap_chain"] = "requires_intel_dcap_service"
return result

# Step 3: DCAP collateral -- network check
# Step 3: DCAP quote verification (offline, fail-closed) when a full quote and a
# pinned Intel SGX root are supplied. The PCK chain travels inside the quote, so
# no network is needed at verify time. Otherwise fall back to a reachability note
# and leave the quote signature unverified.
if raw_quote is not None and trusted_intel_root_pem is not None:
q = verify_tdx_quote(raw_quote, trusted_intel_root_pem, report_data_hex)
result.verified_fields.extend(q.verified_fields)
result.unverified_fields.extend(q.unverified_fields)
result.details.update(q.details)
if not q.verified:
result.verified = False
result.failure_reason = result.failure_reason or q.failure_reason
return result

if _check_dcap_reachable():
result.details["dcap_qe_identity"] = "reachable"
# Full Quote verification requires dcap-provider library -- mark unverified
# Full Quote verification requires a quote + pinned root -- mark unverified
result.unverified_fields.append("dcap_quote_signature")
result.details["dcap_chain"] = "dcap_service_reachable_full_verification_not_implemented"
else:
Expand All @@ -167,3 +188,181 @@ def verify_tdx_measurement(
result.details["dcap_endpoint"] = _DCAP_QE_IDENTITY_URL

return result


# --- DCAP TD quote v4 verification (issue #370, TDX portion) ------------------
#
# Offline verification of an Intel TDX ECDSA (att_key_type=2) quote. No network at
# verify time: the PCK cert chain travels inside the quote's certification data and
# the Intel SGX Provisioning Certification Root is pinned by the caller.
#
# TD Quote v4 layout (Intel DCAP): 48-byte header, 584-byte TD report body, a
# uint32 signature-data length, then the signature data (quote signature 64B,
# ECDSA attestation key 64B, QE report 384B, QE report signature 64B, QE auth data,
# and certification data carrying the PCK chain). The offsets below are per the Intel
# TDX DCAP spec and MUST be confirmed against a real Azure TDX quote fixture
# (capture-tdx-azure.sh); the skipped hardware test asserts them, which also settles
# the report_data offset in issue #371.
_QUOTE_HEADER_LEN = 48
_TD_REPORT_BODY_LEN = 584
_SIGNED_REGION_LEN = _QUOTE_HEADER_LEN + _TD_REPORT_BODY_LEN # 632
_TD_BODY_REPORT_DATA_OFF = 520 # report_data sits after the RTMRs in the TD body
_QE_REPORT_LEN = 384
_QE_REPORT_DATA_OFF = 320 # report_data offset within the SGX QE report
_ATT_KEY_TYPE_ECDSA_P256 = 2


def _raw_p256_sig_to_der(sig64: bytes) -> bytes:
if len(sig64) != 64:
raise ValueError(f"expected 64-byte raw ECDSA sig, got {len(sig64)}")
r = int.from_bytes(sig64[:32], "big")
s = int.from_bytes(sig64[32:], "big")
return encode_dss_signature(r, s)


def _p256_pubkey_from_raw(xy: bytes) -> ec.EllipticCurvePublicKey:
if len(xy) != 64:
raise ValueError(f"expected 64-byte raw P-256 pubkey, got {len(xy)}")
return ec.EllipticCurvePublicKey.from_encoded_point(ec.SECP256R1(), b"\x04" + xy)


def _verify_p256(pub: ec.EllipticCurvePublicKey, sig64: bytes, data: bytes) -> bool:
try:
pub.verify(_raw_p256_sig_to_der(sig64), data, ec.ECDSA(SHA256()))
return True
except (InvalidSignature, ValueError):
return False


def _cert_signed_by(child: x509.Certificate, issuer: x509.Certificate) -> bool:
# PCK chain certs are ECDSA P-256.
pub = issuer.public_key()
if not isinstance(pub, ec.EllipticCurvePublicKey):
return False
try:
pub.verify(child.signature, child.tbs_certificate_bytes,
ec.ECDSA(child.signature_hash_algorithm))
return True
except (InvalidSignature, ValueError):
return False


@dataclass
class _ParsedQuote:
signed_region: bytes
report_data: bytes
quote_sig: bytes
att_pubkey_raw: bytes
qe_report: bytes
qe_report_sig: bytes
qe_auth_data: bytes
pck_chain_pem: bytes


def parse_td_quote(quote: bytes) -> _ParsedQuote:
"""Parse an Intel TDX ECDSA v4 quote. Raises ValueError on malformed input.

Offsets are per the Intel TDX DCAP spec; confirm against a real fixture.
"""
if len(quote) < _SIGNED_REGION_LEN + 4:
raise ValueError("quote too short for header + TD report body + sig length")
att_key_type = int.from_bytes(quote[2:4], "little")
if att_key_type != _ATT_KEY_TYPE_ECDSA_P256:
raise ValueError(f"unsupported att_key_type {att_key_type} (expected ECDSA-P256)")
signed_region = quote[:_SIGNED_REGION_LEN]
body = quote[_QUOTE_HEADER_LEN:_SIGNED_REGION_LEN]
report_data = body[_TD_BODY_REPORT_DATA_OFF:_TD_BODY_REPORT_DATA_OFF + 64]
off = _SIGNED_REGION_LEN
sig_len = int.from_bytes(quote[off:off + 4], "little")
off += 4
sig_data = quote[off:off + sig_len]
if len(sig_data) < 64 + 64 + _QE_REPORT_LEN + 64 + 2:
raise ValueError("signature data truncated")
p = 0
quote_sig = sig_data[p:p + 64]; p += 64
att_pubkey_raw = sig_data[p:p + 64]; p += 64
qe_report = sig_data[p:p + _QE_REPORT_LEN]; p += _QE_REPORT_LEN
qe_report_sig = sig_data[p:p + 64]; p += 64
qe_auth_len = int.from_bytes(sig_data[p:p + 2], "little"); p += 2
qe_auth_data = sig_data[p:p + qe_auth_len]; p += qe_auth_len
p += 2 # cert_data_type
cert_size = int.from_bytes(sig_data[p:p + 4], "little"); p += 4
pck_chain_pem = sig_data[p:p + cert_size]
return _ParsedQuote(signed_region, report_data, quote_sig, att_pubkey_raw,
qe_report, qe_report_sig, qe_auth_data, pck_chain_pem)


def verify_tdx_quote(
quote: bytes,
trusted_intel_root_pem: bytes,
expected_report_data_hex: str | None = None,
) -> TDXVerificationResult:
"""Offline DCAP verification of a TDX ECDSA quote, fail-closed.

Verifies: the quote signature over header+body by the attestation key; the
attestation key is bound into the QE report_data; the QE report is signed by the
PCK leaf; and the PCK chain verifies to the pinned Intel root. TCB status and QE
identity need Intel PCS collateral by FMSPC and are NOT checked here; they stay
in unverified_fields (do not treat this result as full TCB appraisal).
"""
def fail(reason: str) -> TDXVerificationResult:
return TDXVerificationResult(
verified=False, failure_reason=reason,
unverified_fields=["dcap_quote_signature", "tcb_status"])

try:
pq = parse_td_quote(quote)
except ValueError as exc:
return fail(f"quote_parse_error: {exc}")

# 1) quote signature over header+body by the attestation key
try:
att_pub = _p256_pubkey_from_raw(pq.att_pubkey_raw)
except ValueError as exc:
return fail(f"attestation_key_invalid: {exc}")
if not _verify_p256(att_pub, pq.quote_sig, pq.signed_region):
return fail("quote_signature_invalid")

# 2) attestation key bound into QE report_data[:32] = SHA256(att_key || qe_auth)
qe_rd = pq.qe_report[_QE_REPORT_DATA_OFF:_QE_REPORT_DATA_OFF + 64]
if qe_rd[:32] != hashlib.sha256(pq.att_pubkey_raw + pq.qe_auth_data).digest():
return fail("attestation_key_not_bound_to_qe")

# 3) QE report signed by the PCK leaf; 4) PCK chain to the pinned Intel root
try:
certs = x509.load_pem_x509_certificates(pq.pck_chain_pem)
except ValueError as exc:
return fail(f"pck_chain_parse_error: {exc}")
if not certs:
return fail("pck_chain_empty")
pck_pub = certs[0].public_key()
if not isinstance(pck_pub, ec.EllipticCurvePublicKey) or \
not _verify_p256(pck_pub, pq.qe_report_sig, pq.qe_report):
return fail("qe_report_signature_invalid")
try:
root = x509.load_pem_x509_certificate(trusted_intel_root_pem)
except ValueError as exc:
return fail(f"intel_root_parse_error: {exc}")
if not _cert_signed_by(root, root):
return fail("intel_root_not_self_signed")
for child, issuer in zip([*certs, root], [*certs[1:], root]):
if not _cert_signed_by(child, issuer):
return fail("pck_chain_invalid")

result = TDXVerificationResult(verified=True)
result.verified_fields.extend(["dcap_quote_signature", "pck_chain"])

# optional: confirm report_data binds our expected value (nonce / cnf)
if expected_report_data_hex is not None:
exp = bytes.fromhex(expected_report_data_hex[:128]).ljust(64, b"\x00")
if pq.report_data == exp:
result.verified_fields.append("report_data")
else:
result.verified = False
result.failure_reason = "report_data_mismatch"
return result

# TCB status / QE identity need Intel PCS collateral by FMSPC (not done offline).
result.unverified_fields.append("tcb_status")
result.details["tcb_status"] = "requires_intel_pcs_collateral_by_fmspc"
return result
151 changes: 151 additions & 0 deletions tests/unit/test_tdx_quote_verify.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,151 @@
"""TDX DCAP quote-verification tests (issue #370, TDX portion).

These exercise the verification LOGIC against a locally generated, synthetic TDX
ECDSA v4 quote and a synthetic PCK chain (leaf -> intermediate -> root), so parsing
and all four checks (quote signature, attestation-key binding, QE report signature,
PCK chain to a pinned root) run end to end. The real-hardware test is marked skipped
below and unblocks when an Azure TDX quote fixture lands (also settles the real
report_data offset for #371).
"""
from __future__ import annotations

import hashlib
import os
from datetime import UTC, datetime, timedelta

import pytest
from cryptography import x509
from cryptography.hazmat.primitives import hashes
from cryptography.hazmat.primitives.asymmetric import ec
from cryptography.hazmat.primitives.asymmetric.utils import decode_dss_signature
from cryptography.hazmat.primitives.serialization import Encoding
from cryptography.x509.oid import NameOID

from cmcp_verify.tdx import (
_QE_REPORT_DATA_OFF,
_QUOTE_HEADER_LEN,
_TD_BODY_REPORT_DATA_OFF,
_TD_REPORT_BODY_LEN,
verify_tdx_quote,
)

_RD = b"cmcp-tdx-fixture-v1".ljust(64, b"\0") # known report_data (matches capture script)


def _name(cn: str) -> x509.Name:
return x509.Name([x509.NameAttribute(NameOID.COMMON_NAME, cn)])


def _cert(subject: str, issuer: str, sub_pub, iss_priv) -> x509.Certificate:
now = datetime.now(UTC)
return (
x509.CertificateBuilder()
.subject_name(_name(subject))
.issuer_name(_name(issuer))
.public_key(sub_pub)
.serial_number(x509.random_serial_number())
.not_valid_before(now - timedelta(days=1))
.not_valid_after(now + timedelta(days=3650))
.sign(iss_priv, hashes.SHA256())
)


def _raw_sig(priv, data: bytes) -> bytes:
r, s = decode_dss_signature(priv.sign(data, ec.ECDSA(hashes.SHA256())))
return r.to_bytes(32, "big") + s.to_bytes(32, "big")


def _raw_pub(pub) -> bytes:
n = pub.public_numbers()
return n.x.to_bytes(32, "big") + n.y.to_bytes(32, "big")


def _pck_chain():
root_k = ec.generate_private_key(ec.SECP256R1())
inter_k = ec.generate_private_key(ec.SECP256R1())
leaf_k = ec.generate_private_key(ec.SECP256R1())
root = _cert("Intel Root", "Intel Root", root_k.public_key(), root_k) # self-signed
inter = _cert("Intel PCK Intermediate", "Intel Root", inter_k.public_key(), root_k)
leaf = _cert("Intel PCK Leaf", "Intel PCK Intermediate", leaf_k.public_key(), inter_k)
chain_pem = leaf.public_bytes(Encoding.PEM) + inter.public_bytes(Encoding.PEM)
return chain_pem, root.public_bytes(Encoding.PEM), leaf_k, root_k


def _build_quote(*, report_data: bytes = _RD, qe_auth: bytes = b"") -> tuple[bytes, bytes]:
"""Return (quote_bytes, trusted_intel_root_pem) for a well-formed synthetic quote."""
att_k = ec.generate_private_key(ec.SECP256R1())
att_pub_raw = _raw_pub(att_k.public_key())

header = bytearray(_QUOTE_HEADER_LEN)
header[2:4] = (2).to_bytes(2, "little") # att_key_type = ECDSA-P256
body = bytearray(_TD_REPORT_BODY_LEN)
body[_TD_BODY_REPORT_DATA_OFF:_TD_BODY_REPORT_DATA_OFF + 64] = report_data
signed_region = bytes(header) + bytes(body)
quote_sig = _raw_sig(att_k, signed_region)

chain_pem, root_pem, leaf_k, _root_k = _pck_chain()
qe_report = bytearray(384)
bind = hashlib.sha256(att_pub_raw + qe_auth).digest()
qe_report[_QE_REPORT_DATA_OFF:_QE_REPORT_DATA_OFF + 32] = bind
qe_report_sig = _raw_sig(leaf_k, bytes(qe_report))

sig = bytearray()
sig += quote_sig
sig += att_pub_raw
sig += bytes(qe_report)
sig += qe_report_sig
sig += len(qe_auth).to_bytes(2, "little") + qe_auth
sig += (5).to_bytes(2, "little") # cert_data_type (PCK chain)
sig += len(chain_pem).to_bytes(4, "little") + chain_pem

quote = signed_region + len(sig).to_bytes(4, "little") + bytes(sig)
return quote, root_pem


def test_valid_quote_verifies() -> None:
quote, root = _build_quote()
r = verify_tdx_quote(quote, root, _RD.hex())
assert r.verified, r.failure_reason
assert "dcap_quote_signature" in r.verified_fields
assert "pck_chain" in r.verified_fields
assert "report_data" in r.verified_fields
assert "tcb_status" in r.unverified_fields # honest: not appraised offline


def test_tampered_quote_signature_fails_closed() -> None:
quote, root = _build_quote()
corrupted = bytearray(quote)
corrupted[0] ^= 0xFF # flip a byte inside the signed region
r = verify_tdx_quote(bytes(corrupted), root)
assert not r.verified
assert r.failure_reason == "quote_signature_invalid"


def test_wrong_pinned_root_fails_closed() -> None:
quote, _good_root = _build_quote()
_q2, other_root = _build_quote() # a different, untrusted root
r = verify_tdx_quote(quote, other_root)
assert not r.verified
assert r.failure_reason == "pck_chain_invalid"


def test_report_data_mismatch_fails() -> None:
quote, root = _build_quote(report_data=b"something-else".ljust(64, b"\0"))
r = verify_tdx_quote(quote, root, _RD.hex())
assert not r.verified
assert r.failure_reason == "report_data_mismatch"


@pytest.mark.skipif(
not os.environ.get("CMCP_TDX_FIXTURE_DIR"),
reason="needs a real Azure TDX quote fixture (capture-tdx-azure.sh); set CMCP_TDX_FIXTURE_DIR",
)
def test_real_azure_tdx_quote() -> None:
d = os.environ["CMCP_TDX_FIXTURE_DIR"]
quote = open(os.path.join(d, "tdx_quote.bin"), "rb").read()
root = open(os.path.join(d, "collateral", "intel_root_ca.pem"), "rb").read()
expected_rd = open(os.path.join(d, "report_data.hex")).read().strip()
r = verify_tdx_quote(quote, root, expected_rd)
assert r.verified, r.failure_reason
# confirms the report_data offset used by parse_td_quote is correct (issue #371)
assert "report_data" in r.verified_fields
Loading