diff --git a/src/cmcp_verify/tdx.py b/src/cmcp_verify/tdx.py index 2bd5bac..8b7ebdb 100644 --- a/src/cmcp_verify/tdx.py +++ b/src/cmcp_verify/tdx.py @@ -6,6 +6,12 @@ import urllib.request from dataclasses import dataclass, field +from cryptography import x509 +from cryptography.exceptions import InvalidSignature +from cryptography.hazmat.primitives.asymmetric import ec +from cryptography.hazmat.primitives.asymmetric.utils import encode_dss_signature +from cryptography.hazmat.primitives.hashes import SHA256 + class _TdReport(ctypes.LittleEndianStructure): """Named-field representation of the raw TDREPORT buffer returned by the @@ -68,6 +74,8 @@ def verify_tdx_measurement( measurement: str, raw_evidence: bytes | None, report_data_hex: str | None = None, + raw_quote: bytes | None = None, + trusted_intel_root_pem: bytes | None = None, ) -> TDXVerificationResult: """ Verify an Intel TDX attestation measurement. @@ -155,10 +163,23 @@ def verify_tdx_measurement( result.details["dcap_chain"] = "requires_intel_dcap_service" return result - # Step 3: DCAP collateral -- network check + # Step 3: DCAP quote verification (offline, fail-closed) when a full quote and a + # pinned Intel SGX root are supplied. The PCK chain travels inside the quote, so + # no network is needed at verify time. Otherwise fall back to a reachability note + # and leave the quote signature unverified. + if raw_quote is not None and trusted_intel_root_pem is not None: + q = verify_tdx_quote(raw_quote, trusted_intel_root_pem, report_data_hex) + result.verified_fields.extend(q.verified_fields) + result.unverified_fields.extend(q.unverified_fields) + result.details.update(q.details) + if not q.verified: + result.verified = False + result.failure_reason = result.failure_reason or q.failure_reason + return result + if _check_dcap_reachable(): result.details["dcap_qe_identity"] = "reachable" - # Full Quote verification requires dcap-provider library -- mark unverified + # Full Quote verification requires a quote + pinned root -- mark unverified result.unverified_fields.append("dcap_quote_signature") result.details["dcap_chain"] = "dcap_service_reachable_full_verification_not_implemented" else: @@ -167,3 +188,181 @@ def verify_tdx_measurement( result.details["dcap_endpoint"] = _DCAP_QE_IDENTITY_URL return result + + +# --- DCAP TD quote v4 verification (issue #370, TDX portion) ------------------ +# +# Offline verification of an Intel TDX ECDSA (att_key_type=2) quote. No network at +# verify time: the PCK cert chain travels inside the quote's certification data and +# the Intel SGX Provisioning Certification Root is pinned by the caller. +# +# TD Quote v4 layout (Intel DCAP): 48-byte header, 584-byte TD report body, a +# uint32 signature-data length, then the signature data (quote signature 64B, +# ECDSA attestation key 64B, QE report 384B, QE report signature 64B, QE auth data, +# and certification data carrying the PCK chain). The offsets below are per the Intel +# TDX DCAP spec and MUST be confirmed against a real Azure TDX quote fixture +# (capture-tdx-azure.sh); the skipped hardware test asserts them, which also settles +# the report_data offset in issue #371. +_QUOTE_HEADER_LEN = 48 +_TD_REPORT_BODY_LEN = 584 +_SIGNED_REGION_LEN = _QUOTE_HEADER_LEN + _TD_REPORT_BODY_LEN # 632 +_TD_BODY_REPORT_DATA_OFF = 520 # report_data sits after the RTMRs in the TD body +_QE_REPORT_LEN = 384 +_QE_REPORT_DATA_OFF = 320 # report_data offset within the SGX QE report +_ATT_KEY_TYPE_ECDSA_P256 = 2 + + +def _raw_p256_sig_to_der(sig64: bytes) -> bytes: + if len(sig64) != 64: + raise ValueError(f"expected 64-byte raw ECDSA sig, got {len(sig64)}") + r = int.from_bytes(sig64[:32], "big") + s = int.from_bytes(sig64[32:], "big") + return encode_dss_signature(r, s) + + +def _p256_pubkey_from_raw(xy: bytes) -> ec.EllipticCurvePublicKey: + if len(xy) != 64: + raise ValueError(f"expected 64-byte raw P-256 pubkey, got {len(xy)}") + return ec.EllipticCurvePublicKey.from_encoded_point(ec.SECP256R1(), b"\x04" + xy) + + +def _verify_p256(pub: ec.EllipticCurvePublicKey, sig64: bytes, data: bytes) -> bool: + try: + pub.verify(_raw_p256_sig_to_der(sig64), data, ec.ECDSA(SHA256())) + return True + except (InvalidSignature, ValueError): + return False + + +def _cert_signed_by(child: x509.Certificate, issuer: x509.Certificate) -> bool: + # PCK chain certs are ECDSA P-256. + pub = issuer.public_key() + if not isinstance(pub, ec.EllipticCurvePublicKey): + return False + try: + pub.verify(child.signature, child.tbs_certificate_bytes, + ec.ECDSA(child.signature_hash_algorithm)) + return True + except (InvalidSignature, ValueError): + return False + + +@dataclass +class _ParsedQuote: + signed_region: bytes + report_data: bytes + quote_sig: bytes + att_pubkey_raw: bytes + qe_report: bytes + qe_report_sig: bytes + qe_auth_data: bytes + pck_chain_pem: bytes + + +def parse_td_quote(quote: bytes) -> _ParsedQuote: + """Parse an Intel TDX ECDSA v4 quote. Raises ValueError on malformed input. + + Offsets are per the Intel TDX DCAP spec; confirm against a real fixture. + """ + if len(quote) < _SIGNED_REGION_LEN + 4: + raise ValueError("quote too short for header + TD report body + sig length") + att_key_type = int.from_bytes(quote[2:4], "little") + if att_key_type != _ATT_KEY_TYPE_ECDSA_P256: + raise ValueError(f"unsupported att_key_type {att_key_type} (expected ECDSA-P256)") + signed_region = quote[:_SIGNED_REGION_LEN] + body = quote[_QUOTE_HEADER_LEN:_SIGNED_REGION_LEN] + report_data = body[_TD_BODY_REPORT_DATA_OFF:_TD_BODY_REPORT_DATA_OFF + 64] + off = _SIGNED_REGION_LEN + sig_len = int.from_bytes(quote[off:off + 4], "little") + off += 4 + sig_data = quote[off:off + sig_len] + if len(sig_data) < 64 + 64 + _QE_REPORT_LEN + 64 + 2: + raise ValueError("signature data truncated") + p = 0 + quote_sig = sig_data[p:p + 64]; p += 64 + att_pubkey_raw = sig_data[p:p + 64]; p += 64 + qe_report = sig_data[p:p + _QE_REPORT_LEN]; p += _QE_REPORT_LEN + qe_report_sig = sig_data[p:p + 64]; p += 64 + qe_auth_len = int.from_bytes(sig_data[p:p + 2], "little"); p += 2 + qe_auth_data = sig_data[p:p + qe_auth_len]; p += qe_auth_len + p += 2 # cert_data_type + cert_size = int.from_bytes(sig_data[p:p + 4], "little"); p += 4 + pck_chain_pem = sig_data[p:p + cert_size] + return _ParsedQuote(signed_region, report_data, quote_sig, att_pubkey_raw, + qe_report, qe_report_sig, qe_auth_data, pck_chain_pem) + + +def verify_tdx_quote( + quote: bytes, + trusted_intel_root_pem: bytes, + expected_report_data_hex: str | None = None, +) -> TDXVerificationResult: + """Offline DCAP verification of a TDX ECDSA quote, fail-closed. + + Verifies: the quote signature over header+body by the attestation key; the + attestation key is bound into the QE report_data; the QE report is signed by the + PCK leaf; and the PCK chain verifies to the pinned Intel root. TCB status and QE + identity need Intel PCS collateral by FMSPC and are NOT checked here; they stay + in unverified_fields (do not treat this result as full TCB appraisal). + """ + def fail(reason: str) -> TDXVerificationResult: + return TDXVerificationResult( + verified=False, failure_reason=reason, + unverified_fields=["dcap_quote_signature", "tcb_status"]) + + try: + pq = parse_td_quote(quote) + except ValueError as exc: + return fail(f"quote_parse_error: {exc}") + + # 1) quote signature over header+body by the attestation key + try: + att_pub = _p256_pubkey_from_raw(pq.att_pubkey_raw) + except ValueError as exc: + return fail(f"attestation_key_invalid: {exc}") + if not _verify_p256(att_pub, pq.quote_sig, pq.signed_region): + return fail("quote_signature_invalid") + + # 2) attestation key bound into QE report_data[:32] = SHA256(att_key || qe_auth) + qe_rd = pq.qe_report[_QE_REPORT_DATA_OFF:_QE_REPORT_DATA_OFF + 64] + if qe_rd[:32] != hashlib.sha256(pq.att_pubkey_raw + pq.qe_auth_data).digest(): + return fail("attestation_key_not_bound_to_qe") + + # 3) QE report signed by the PCK leaf; 4) PCK chain to the pinned Intel root + try: + certs = x509.load_pem_x509_certificates(pq.pck_chain_pem) + except ValueError as exc: + return fail(f"pck_chain_parse_error: {exc}") + if not certs: + return fail("pck_chain_empty") + pck_pub = certs[0].public_key() + if not isinstance(pck_pub, ec.EllipticCurvePublicKey) or \ + not _verify_p256(pck_pub, pq.qe_report_sig, pq.qe_report): + return fail("qe_report_signature_invalid") + try: + root = x509.load_pem_x509_certificate(trusted_intel_root_pem) + except ValueError as exc: + return fail(f"intel_root_parse_error: {exc}") + if not _cert_signed_by(root, root): + return fail("intel_root_not_self_signed") + for child, issuer in zip([*certs, root], [*certs[1:], root]): + if not _cert_signed_by(child, issuer): + return fail("pck_chain_invalid") + + result = TDXVerificationResult(verified=True) + result.verified_fields.extend(["dcap_quote_signature", "pck_chain"]) + + # optional: confirm report_data binds our expected value (nonce / cnf) + if expected_report_data_hex is not None: + exp = bytes.fromhex(expected_report_data_hex[:128]).ljust(64, b"\x00") + if pq.report_data == exp: + result.verified_fields.append("report_data") + else: + result.verified = False + result.failure_reason = "report_data_mismatch" + return result + + # TCB status / QE identity need Intel PCS collateral by FMSPC (not done offline). + result.unverified_fields.append("tcb_status") + result.details["tcb_status"] = "requires_intel_pcs_collateral_by_fmspc" + return result diff --git a/tests/unit/test_tdx_quote_verify.py b/tests/unit/test_tdx_quote_verify.py new file mode 100644 index 0000000..544a186 --- /dev/null +++ b/tests/unit/test_tdx_quote_verify.py @@ -0,0 +1,151 @@ +"""TDX DCAP quote-verification tests (issue #370, TDX portion). + +These exercise the verification LOGIC against a locally generated, synthetic TDX +ECDSA v4 quote and a synthetic PCK chain (leaf -> intermediate -> root), so parsing +and all four checks (quote signature, attestation-key binding, QE report signature, +PCK chain to a pinned root) run end to end. The real-hardware test is marked skipped +below and unblocks when an Azure TDX quote fixture lands (also settles the real +report_data offset for #371). +""" +from __future__ import annotations + +import hashlib +import os +from datetime import UTC, datetime, timedelta + +import pytest +from cryptography import x509 +from cryptography.hazmat.primitives import hashes +from cryptography.hazmat.primitives.asymmetric import ec +from cryptography.hazmat.primitives.asymmetric.utils import decode_dss_signature +from cryptography.hazmat.primitives.serialization import Encoding +from cryptography.x509.oid import NameOID + +from cmcp_verify.tdx import ( + _QE_REPORT_DATA_OFF, + _QUOTE_HEADER_LEN, + _TD_BODY_REPORT_DATA_OFF, + _TD_REPORT_BODY_LEN, + verify_tdx_quote, +) + +_RD = b"cmcp-tdx-fixture-v1".ljust(64, b"\0") # known report_data (matches capture script) + + +def _name(cn: str) -> x509.Name: + return x509.Name([x509.NameAttribute(NameOID.COMMON_NAME, cn)]) + + +def _cert(subject: str, issuer: str, sub_pub, iss_priv) -> x509.Certificate: + now = datetime.now(UTC) + return ( + x509.CertificateBuilder() + .subject_name(_name(subject)) + .issuer_name(_name(issuer)) + .public_key(sub_pub) + .serial_number(x509.random_serial_number()) + .not_valid_before(now - timedelta(days=1)) + .not_valid_after(now + timedelta(days=3650)) + .sign(iss_priv, hashes.SHA256()) + ) + + +def _raw_sig(priv, data: bytes) -> bytes: + r, s = decode_dss_signature(priv.sign(data, ec.ECDSA(hashes.SHA256()))) + return r.to_bytes(32, "big") + s.to_bytes(32, "big") + + +def _raw_pub(pub) -> bytes: + n = pub.public_numbers() + return n.x.to_bytes(32, "big") + n.y.to_bytes(32, "big") + + +def _pck_chain(): + root_k = ec.generate_private_key(ec.SECP256R1()) + inter_k = ec.generate_private_key(ec.SECP256R1()) + leaf_k = ec.generate_private_key(ec.SECP256R1()) + root = _cert("Intel Root", "Intel Root", root_k.public_key(), root_k) # self-signed + inter = _cert("Intel PCK Intermediate", "Intel Root", inter_k.public_key(), root_k) + leaf = _cert("Intel PCK Leaf", "Intel PCK Intermediate", leaf_k.public_key(), inter_k) + chain_pem = leaf.public_bytes(Encoding.PEM) + inter.public_bytes(Encoding.PEM) + return chain_pem, root.public_bytes(Encoding.PEM), leaf_k, root_k + + +def _build_quote(*, report_data: bytes = _RD, qe_auth: bytes = b"") -> tuple[bytes, bytes]: + """Return (quote_bytes, trusted_intel_root_pem) for a well-formed synthetic quote.""" + att_k = ec.generate_private_key(ec.SECP256R1()) + att_pub_raw = _raw_pub(att_k.public_key()) + + header = bytearray(_QUOTE_HEADER_LEN) + header[2:4] = (2).to_bytes(2, "little") # att_key_type = ECDSA-P256 + body = bytearray(_TD_REPORT_BODY_LEN) + body[_TD_BODY_REPORT_DATA_OFF:_TD_BODY_REPORT_DATA_OFF + 64] = report_data + signed_region = bytes(header) + bytes(body) + quote_sig = _raw_sig(att_k, signed_region) + + chain_pem, root_pem, leaf_k, _root_k = _pck_chain() + qe_report = bytearray(384) + bind = hashlib.sha256(att_pub_raw + qe_auth).digest() + qe_report[_QE_REPORT_DATA_OFF:_QE_REPORT_DATA_OFF + 32] = bind + qe_report_sig = _raw_sig(leaf_k, bytes(qe_report)) + + sig = bytearray() + sig += quote_sig + sig += att_pub_raw + sig += bytes(qe_report) + sig += qe_report_sig + sig += len(qe_auth).to_bytes(2, "little") + qe_auth + sig += (5).to_bytes(2, "little") # cert_data_type (PCK chain) + sig += len(chain_pem).to_bytes(4, "little") + chain_pem + + quote = signed_region + len(sig).to_bytes(4, "little") + bytes(sig) + return quote, root_pem + + +def test_valid_quote_verifies() -> None: + quote, root = _build_quote() + r = verify_tdx_quote(quote, root, _RD.hex()) + assert r.verified, r.failure_reason + assert "dcap_quote_signature" in r.verified_fields + assert "pck_chain" in r.verified_fields + assert "report_data" in r.verified_fields + assert "tcb_status" in r.unverified_fields # honest: not appraised offline + + +def test_tampered_quote_signature_fails_closed() -> None: + quote, root = _build_quote() + corrupted = bytearray(quote) + corrupted[0] ^= 0xFF # flip a byte inside the signed region + r = verify_tdx_quote(bytes(corrupted), root) + assert not r.verified + assert r.failure_reason == "quote_signature_invalid" + + +def test_wrong_pinned_root_fails_closed() -> None: + quote, _good_root = _build_quote() + _q2, other_root = _build_quote() # a different, untrusted root + r = verify_tdx_quote(quote, other_root) + assert not r.verified + assert r.failure_reason == "pck_chain_invalid" + + +def test_report_data_mismatch_fails() -> None: + quote, root = _build_quote(report_data=b"something-else".ljust(64, b"\0")) + r = verify_tdx_quote(quote, root, _RD.hex()) + assert not r.verified + assert r.failure_reason == "report_data_mismatch" + + +@pytest.mark.skipif( + not os.environ.get("CMCP_TDX_FIXTURE_DIR"), + reason="needs a real Azure TDX quote fixture (capture-tdx-azure.sh); set CMCP_TDX_FIXTURE_DIR", +) +def test_real_azure_tdx_quote() -> None: + d = os.environ["CMCP_TDX_FIXTURE_DIR"] + quote = open(os.path.join(d, "tdx_quote.bin"), "rb").read() + root = open(os.path.join(d, "collateral", "intel_root_ca.pem"), "rb").read() + expected_rd = open(os.path.join(d, "report_data.hex")).read().strip() + r = verify_tdx_quote(quote, root, expected_rd) + assert r.verified, r.failure_reason + # confirms the report_data offset used by parse_td_quote is correct (issue #371) + assert "report_data" in r.verified_fields