Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0
- cA2A-compatible conformance suite: `tests/conformance/` with a normative README (stable MUST/SHOULD test IDs across delegation, scope-policy, attestation, sealed channel, provenance, and the inbound pipeline) and runnable checks that exercise every MUST-level requirement. Wired into CI and documented at `docs/spec/conformance.md`; ties to the CHARTER trademark language.
- TPM 2.0 attestation backend: `ca2a_runtime.tee.tpm` (TPMS_ATTEST parsing, `TpmProvider`) and `ca2a_verify.tpm.verify_tpm_quote` (AK chain to a caller-supplied vendor root, AK signature over the attest blob (ECDSA or RSA), magic/type checks, and qualifying-data/PCR-digest binding), all fail-closed. Synthetic-vector validated; TPM AK roots are per-vendor so the caller supplies its trusted roots. Quote generation requires a real TPM.
- Intel TDX attestation backend: `ca2a_runtime.tee.tdx` (DCAP Quote v4 parsing, `TdxProvider`) and `ca2a_verify.tdx.verify_tdx_quote` (PCK chain to a trusted Intel root, QE report signature, attestation-key binding, quote signature, and MRTD/report-data binding), all fail-closed. Chain path validated against the genuine Intel SGX Root CA; multi-level signature path validated with a synthetic self-consistent quote. Quote generation requires a real TDX guest.
- Real Cedar policy engine binding: `ca2a_runtime.cedar.CedarPolicy` (backed by `cedarpy`, the engine cMCP runs) evaluates each capability as a Cedar authorization request. A new `ca2a_runtime.policy.Policy` protocol makes `LocalPolicy` (allow set) and `CedarPolicy` interchangeable in the peer path. Adds the `cedarpy` dependency.
- Transport-agnostic inbound peer request handler: `ca2a_runtime.peer.handle_peer_request` with `PeerRequest` / `PeerResult`. Composes the full pipeline (verify chain, intersect scope and enforce, open a sealed payload with the enclave key, emit a linked provenance record) fail-closed. A transport parses its wire format into a `PeerRequest`; cA2A does not define the transport (profile, not protocol).
- RFC 8785 (JSON Canonicalization Scheme) canonicalization: `ca2a_runtime.canonical.canonicalize`. Credential and provenance bodies are now signed over the JCS encoding (UTF-16 key ordering, JCS string escaping, literal non-ASCII, shortest-decimal integers), so cA2A signatures are cross-verifiable with agent-manifest. ASCII credentials are byte-identical to the previous encoding, so existing signatures still verify.
- Repository scaffold: governance, CI/CD, docs framework, and packaging at parity with the agentrust-io house standard
Expand Down
2 changes: 1 addition & 1 deletion ROADMAP.md
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,7 @@ Already implemented and tested elsewhere; cA2A depends on it rather than reimple

## v0.2: Runtime enforcement and sealed channel

- Runtime peer-delegation enforcement: **decision core landed** (`ca2a_runtime.peer.enforce_peer_call`: verify chain, intersect delegated scope with local policy, enforce, emit provenance record; claim C3 validated). Remaining: bind a Cedar policy engine as the local policy, and wire the decision core to a live A2A transport (Tier 2)
- Runtime peer-delegation enforcement: **decision core landed** (`ca2a_runtime.peer.enforce_peer_call`: verify chain, intersect delegated scope with local policy, enforce, emit provenance record; claim C3 validated), now with a **real Cedar policy engine** option (`ca2a_runtime.cedar.CedarPolicy`) alongside the allow-set `LocalPolicy`. Remaining: wire the decision core to a live A2A transport (Tier 2)
- Sealed peer channel: **landed** (`ca2a_runtime.channel`: HPKE-style X25519 -> HKDF-SHA256 -> ChaCha20-Poly1305 sealing to the peer's attested key; claim C4 validated). Remaining: bind the seal to a verified attestation report on a live call, and rely on the enclave to hold the private key (hardware property)
- Linked runtime evidence: each hop's TRACE record references the parent record hash and delegation credential id, producing a verifiable delegation DAG (Tier 2)

Expand Down
14 changes: 13 additions & 1 deletion docs/spec/cedar-policy.md
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,19 @@ When a peer accepts a delegated task, two independent trust decisions meet. The
effective = delegated_scope ∩ local_policy_allow
```

**Status.** The intersection semantics are **implemented** as an enforcement decision core in `ca2a_runtime.peer` (`effective_scope`, `enforce_peer_call`) against a `ca2a_runtime.policy.LocalPolicy` capability allow set, and validated by experiment C3. Two pieces remain: binding a full **Cedar policy engine** as the local policy (this page's title; the semantics are policy-language-agnostic and the allow-set model stands in today, tracked as #10), and wiring the decision core to a **live A2A transport** rather than a direct call. See [call-graph.md](call-graph.md), [ROADMAP.md](../../ROADMAP.md), and [LIMITATIONS.md](../../LIMITATIONS.md).
**Status.** The intersection semantics are **implemented** as an enforcement decision core in `ca2a_runtime.peer` (`effective_scope`, `enforce_peer_call`), and the local policy can be either a capability allow set (`ca2a_runtime.policy.LocalPolicy`) or a **real Cedar policy engine** (`ca2a_runtime.cedar.CedarPolicy`, backed by `cedarpy`, the same engine cMCP runs). Both satisfy the `ca2a_runtime.policy.Policy` protocol, so they are interchangeable in the peer path. Validated by experiment C3 and the Cedar unit tests. What remains is wiring the decision core to a **live A2A transport** rather than a direct call. See [call-graph.md](call-graph.md) and [ROADMAP.md](../../ROADMAP.md).

## Cedar policy

`CedarPolicy` evaluates each capability as a Cedar authorization request whose action id is the capability name; a capability is permitted iff Cedar returns `Allow`. The effective scope is the delegated leaf scope intersected with the capabilities Cedar permits.

```python
from ca2a_runtime.cedar import CedarPolicy
from ca2a_runtime.peer import effective_scope

policy = CedarPolicy('permit(principal, action == Action::"read", resource);')
effective_scope(chain, policy) # delegated leaf scope AND what Cedar allows
```

## Why an intersection

Expand Down
3 changes: 2 additions & 1 deletion pyproject.toml
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@ requires-python = ">=3.11"
dependencies = [
"cryptography>=42.0",
"pyyaml>=6.0",
"cedarpy>=4.8",
]

[project.optional-dependencies]
Expand Down Expand Up @@ -91,5 +92,5 @@ warn_return_any = false
files = ["src/ca2a_runtime", "src/ca2a_verify"]

[[tool.mypy.overrides]]
module = ["ca2a_runtime.tee.sev_snp", "ca2a_runtime.tee.tdx", "ca2a_runtime.tee.tpm", "ca2a_verify.tdx", "ca2a_verify.tpm", "ca2a_runtime.channel.sealed"]
module = ["ca2a_runtime.tee.sev_snp", "ca2a_runtime.tee.tdx", "ca2a_runtime.tee.tpm", "ca2a_runtime.cedar", "ca2a_verify.tdx", "ca2a_verify.tpm", "ca2a_runtime.channel.sealed"]
warn_unused_ignores = false
46 changes: 46 additions & 0 deletions src/ca2a_runtime/cedar.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,46 @@
"""A local policy backed by a real Cedar policy engine.

`CedarPolicy` evaluates the callee's Cedar policy to decide which capabilities a
peer may exercise. It satisfies the `ca2a_runtime.policy.Policy` protocol, so it
is a drop-in for `LocalPolicy` in the peer path: the effective scope on an
inbound call is the delegated leaf scope intersected with what Cedar permits.

Each capability is evaluated as a Cedar authorization request whose action id is
the capability name; a capability is permitted iff Cedar returns Allow. This
reuses the same policy engine cMCP runs (see docs/spec/cedar-policy.md).
"""

from __future__ import annotations

from dataclasses import dataclass
from typing import Any

from cedarpy import Decision, is_authorized


@dataclass(frozen=True)
class CedarPolicy:
"""A local policy backed by a Cedar policy set."""

policies: str
principal_type: str = "Agent"
principal_id: str = "peer"
resource_type: str = "Task"
resource_id: str = "task"

def _request(self, capability: str) -> dict[str, Any]:
return {
"principal": {"type": self.principal_type, "id": self.principal_id},
"action": {"type": "Action", "id": capability},
"resource": {"type": self.resource_type, "id": self.resource_id},
"context": {},
}

def permits(self, capability: str) -> bool:
"""Return True iff Cedar authorizes an action of this capability's name."""
result = is_authorized(self._request(capability), self.policies, [])
return bool(result.decision == Decision.Allow)

def intersect(self, delegated: frozenset[str]) -> frozenset[str]:
"""Return the effective scope: delegated capabilities Cedar permits."""
return frozenset(cap for cap in delegated if self.permits(cap))
284 changes: 142 additions & 142 deletions src/ca2a_runtime/peer.py
Original file line number Diff line number Diff line change
@@ -1,142 +1,142 @@
"""Inbound peer-call enforcement: the decision the callee makes before it acts.
When a peer presents a delegation chain and requests a capability, the callee:
1. verifies the chain (signature, continuity, attenuation, depth, replay);
2. computes the effective scope as the leaf's delegated scope intersected with
the callee's local policy;
3. enforces: the requested capability must be in the effective scope;
4. emits a provenance record for the accepted hop, linked to its parent.
`enforce_peer_call` is the enforcement decision core. `handle_peer_request`
composes it into the full transport-agnostic inbound pipeline: verify, enforce,
open any sealed payload with the enclave key, and emit a provenance record. A
transport (an A2A server) parses its wire format into a `PeerRequest` and calls
this; cA2A does not define the transport itself, only what the peer does with a
parsed request.
"""
from __future__ import annotations
from dataclasses import dataclass
from cryptography.hazmat.primitives.asymmetric.x25519 import X25519PrivateKey
from ca2a_runtime.channel import open_sealed
from ca2a_runtime.delegation.credential import DelegationCredential, verify_chain
from ca2a_runtime.errors import ScopeNotPermitted, SealedChannelError
from ca2a_runtime.policy import LocalPolicy
from ca2a_runtime.provenance import DelegationRecord, record_for
def effective_scope(
chain: list[DelegationCredential], policy: LocalPolicy, *, max_depth: int = 8
) -> frozenset[str]:
"""Verify the chain and return the effective scope (delegated ∩ local policy).
Raises the relevant CA2AError if the chain does not verify.
"""
verify_chain(chain, max_depth=max_depth)
return policy.intersect(chain[-1].scope)
@dataclass(frozen=True)
class PeerDecision:
"""The result of an accepted peer call."""
effective_scope: frozenset[str]
granted_capability: str
record: DelegationRecord
def enforce_peer_call(
chain: list[DelegationCredential],
requested_capability: str,
*,
policy: LocalPolicy,
record_id: str,
parent_record_hash: str | None = None,
max_depth: int = 8,
) -> PeerDecision:
"""Verify, intersect with local policy, enforce, and emit a provenance record.
Raises ScopeNotPermitted if the requested capability is not in the effective
scope, and the underlying CA2AError if the chain does not verify. On accept,
returns a PeerDecision carrying the linked provenance record.
"""
effective = effective_scope(chain, policy, max_depth=max_depth)
if requested_capability not in effective:
raise ScopeNotPermitted(
f"capability {requested_capability!r} is not in the effective scope",
detail=f"effective={sorted(effective)}",
)
record = record_for(chain[-1], record_id=record_id, parent_record_hash=parent_record_hash)
return PeerDecision(
effective_scope=effective,
granted_capability=requested_capability,
record=record,
)
@dataclass(frozen=True)
class PeerRequest:
"""A transport-agnostic inbound peer request.
A transport (an A2A server) parses its wire format into this shape and hands
it to ``handle_peer_request``. cA2A does not define the transport; it defines
what a peer does with the request once parsed.
"""
chain: list[DelegationCredential]
requested_capability: str
record_id: str
sealed_payload: bytes | None = None
parent_record_hash: str | None = None
@dataclass(frozen=True)
class PeerResult:
"""The outcome of handling an accepted peer request."""
effective_scope: frozenset[str]
granted_capability: str
record: DelegationRecord
payload: bytes | None
def handle_peer_request(
request: PeerRequest,
*,
policy: LocalPolicy,
enclave_private_key: X25519PrivateKey | None = None,
max_depth: int = 8,
) -> PeerResult:
"""Run the full inbound pipeline for a parsed peer request.
Verifies the delegation chain, intersects the delegated scope with the local
policy and enforces the requested capability, opens any sealed payload with
the enclave-bound key, and emits a linked provenance record. Fails closed:
any verification or authorization failure raises the relevant CA2AError and
no payload is returned.
"""
decision = enforce_peer_call(
request.chain,
request.requested_capability,
policy=policy,
record_id=request.record_id,
parent_record_hash=request.parent_record_hash,
max_depth=max_depth,
)
payload: bytes | None = None
if request.sealed_payload is not None:
if enclave_private_key is None:
raise SealedChannelError("a sealed payload was sent but no enclave key is available")
payload = open_sealed(request.sealed_payload, enclave_private_key)
return PeerResult(
effective_scope=decision.effective_scope,
granted_capability=decision.granted_capability,
record=decision.record,
payload=payload,
)
"""Inbound peer-call enforcement: the decision the callee makes before it acts.

When a peer presents a delegation chain and requests a capability, the callee:

1. verifies the chain (signature, continuity, attenuation, depth, replay);
2. computes the effective scope as the leaf's delegated scope intersected with
the callee's local policy;
3. enforces: the requested capability must be in the effective scope;
4. emits a provenance record for the accepted hop, linked to its parent.

`enforce_peer_call` is the enforcement decision core. `handle_peer_request`
composes it into the full transport-agnostic inbound pipeline: verify, enforce,
open any sealed payload with the enclave key, and emit a provenance record. A
transport (an A2A server) parses its wire format into a `PeerRequest` and calls
this; cA2A does not define the transport itself, only what the peer does with a
parsed request.
"""

from __future__ import annotations

from dataclasses import dataclass

from cryptography.hazmat.primitives.asymmetric.x25519 import X25519PrivateKey

from ca2a_runtime.channel import open_sealed
from ca2a_runtime.delegation.credential import DelegationCredential, verify_chain
from ca2a_runtime.errors import ScopeNotPermitted, SealedChannelError
from ca2a_runtime.policy import Policy
from ca2a_runtime.provenance import DelegationRecord, record_for


def effective_scope(
chain: list[DelegationCredential], policy: Policy, *, max_depth: int = 8
) -> frozenset[str]:
"""Verify the chain and return the effective scope (delegated ∩ local policy).

Raises the relevant CA2AError if the chain does not verify.
"""
verify_chain(chain, max_depth=max_depth)
return policy.intersect(chain[-1].scope)


@dataclass(frozen=True)
class PeerDecision:
"""The result of an accepted peer call."""

effective_scope: frozenset[str]
granted_capability: str
record: DelegationRecord


def enforce_peer_call(
chain: list[DelegationCredential],
requested_capability: str,
*,
policy: Policy,
record_id: str,
parent_record_hash: str | None = None,
max_depth: int = 8,
) -> PeerDecision:
"""Verify, intersect with local policy, enforce, and emit a provenance record.

Raises ScopeNotPermitted if the requested capability is not in the effective
scope, and the underlying CA2AError if the chain does not verify. On accept,
returns a PeerDecision carrying the linked provenance record.
"""
effective = effective_scope(chain, policy, max_depth=max_depth)
if requested_capability not in effective:
raise ScopeNotPermitted(
f"capability {requested_capability!r} is not in the effective scope",
detail=f"effective={sorted(effective)}",
)
record = record_for(chain[-1], record_id=record_id, parent_record_hash=parent_record_hash)
return PeerDecision(
effective_scope=effective,
granted_capability=requested_capability,
record=record,
)


@dataclass(frozen=True)
class PeerRequest:
"""A transport-agnostic inbound peer request.

A transport (an A2A server) parses its wire format into this shape and hands
it to ``handle_peer_request``. cA2A does not define the transport; it defines
what a peer does with the request once parsed.
"""

chain: list[DelegationCredential]
requested_capability: str
record_id: str
sealed_payload: bytes | None = None
parent_record_hash: str | None = None


@dataclass(frozen=True)
class PeerResult:
"""The outcome of handling an accepted peer request."""

effective_scope: frozenset[str]
granted_capability: str
record: DelegationRecord
payload: bytes | None


def handle_peer_request(
request: PeerRequest,
*,
policy: Policy,
enclave_private_key: X25519PrivateKey | None = None,
max_depth: int = 8,
) -> PeerResult:
"""Run the full inbound pipeline for a parsed peer request.

Verifies the delegation chain, intersects the delegated scope with the local
policy and enforces the requested capability, opens any sealed payload with
the enclave-bound key, and emits a linked provenance record. Fails closed:
any verification or authorization failure raises the relevant CA2AError and
no payload is returned.
"""
decision = enforce_peer_call(
request.chain,
request.requested_capability,
policy=policy,
record_id=request.record_id,
parent_record_hash=request.parent_record_hash,
max_depth=max_depth,
)

payload: bytes | None = None
if request.sealed_payload is not None:
if enclave_private_key is None:
raise SealedChannelError("a sealed payload was sent but no enclave key is available")
payload = open_sealed(request.sealed_payload, enclave_private_key)

return PeerResult(
effective_scope=decision.effective_scope,
granted_capability=decision.granted_capability,
record=decision.record,
payload=payload,
)
Loading
Loading