Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0
- Peer-call enforcement decision core (Tier 2): `ca2a_runtime.policy.LocalPolicy` and `ca2a_runtime.peer` (`effective_scope`, `enforce_peer_call`). Effective permission is the delegated leaf scope intersected with the callee's local policy; a granted call emits a linked provenance record. New error `SCOPE_NOT_PERMITTED`. Claim C3 (scope-policy intersection) is now a validated experiment. Cedar-engine binding of the local policy and live A2A transport wiring remain open.
- Sealed peer channel (Tier 2): `ca2a_runtime.channel` (`SealedChannel`, `generate_channel_keypair`, `open_sealed`). HPKE-style X25519 -> HKDF-SHA256 -> ChaCha20-Poly1305 sealing a payload to the peer's attested key; only the peer's private key opens it, and a wrong key or tampered ciphertext fails closed. Claim C4 (sealed-payload confidentiality) is now a validated experiment at the cryptographic layer. The enclave-binding of the private key (a hardware property) and live-path wiring remain open.
- Cross-operator attestation (Claim C6) validated in software: a two-operator harness composing the SEV-SNP verifier, measurement pinning, and the sealed channel demonstrates independent keys, mutual attestation, confidential cross-operator delegation, and binary-swap detection. Synthetic report vectors (a genuine report needs SEV-SNP hardware); real hardware end to end remains open. **All six claims (C1-C6) are now validated experiments.**
- Transport-agnostic inbound peer request handler: `ca2a_runtime.peer.handle_peer_request` with `PeerRequest` / `PeerResult`. Composes the full pipeline (verify chain, intersect scope and enforce, open a sealed payload with the enclave key, emit a linked provenance record) fail-closed. A transport parses its wire format into a `PeerRequest`; cA2A does not define the transport (profile, not protocol).
- RFC 8785 (JSON Canonicalization Scheme) canonicalization: `ca2a_runtime.canonical.canonicalize`. Credential and provenance bodies are now signed over the JCS encoding (UTF-16 key ordering, JCS string escaping, literal non-ASCII, shortest-decimal integers), so cA2A signatures are cross-verifiable with agent-manifest. ASCII credentials are byte-identical to the previous encoding, so existing signatures still verify.
- Repository scaffold: governance, CI/CD, docs framework, and packaging at parity with the agentrust-io house standard

Expand Down
5 changes: 3 additions & 2 deletions docs/spec/call-graph.md
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@

When a cA2A peer receives an inbound A2A task, it runs a fixed sequence of checks before it acts on the task and after it acts. The order is not arbitrary: cheap, offline, deterministic checks run first, and each step fails closed so a later step never runs against unverified input. This page states the full intended enforcement order and marks, for each step, what the code does today versus what is design.

Steps 1 (chain verification), 3 (scope intersection), and 5 (provenance emission) are implemented as a decision core in `ca2a_runtime.peer.enforce_peer_call`. Step 2 has a SEV-SNP verifier (`ca2a_verify.sev_snp`) but is not yet wired into the call path and needs real hardware to produce a report; step 4 (sealing) is not implemented. What remains for a live deployment is wiring the decision core to an actual inbound A2A transport, plus the attestation and sealing steps. See [LIMITATIONS.md](../../LIMITATIONS.md) and [ROADMAP.md](../../ROADMAP.md).
Steps 1 (chain verification), 3 (scope intersection), 4 (opening a sealed payload), and 5 (provenance emission) are composed into one transport-agnostic handler, `ca2a_runtime.peer.handle_peer_request`, which takes a parsed `PeerRequest` and runs the pipeline fail-closed. Step 2 has a SEV-SNP verifier (`ca2a_verify.sev_snp`), used counterparty-side to seal to a peer before sending (see the cross-operator flow), but is not part of the callee handler and needs real hardware to produce a report. What remains for a live deployment is a transport that parses actual A2A wire messages into a `PeerRequest`; cA2A leaves that to implementers by design (profile, not protocol). See [LIMITATIONS.md](../../LIMITATIONS.md) and [ROADMAP.md](../../ROADMAP.md).

## Decision flow

Expand Down Expand Up @@ -39,7 +39,8 @@ If any step raises, the call is denied. Absence of evidence is denial, not a war
| 3. Scope intersection | Delegated scope intersected with local policy | `ca2a_runtime.peer.effective_scope`, `enforce_peer_call` | Implemented (decision core); Cedar engine binding pending (#10) |
| 4. Payload sealing | Payload sealed to the peer's attested key | `SealedChannel.seal`, `open_sealed` | Implemented (crypto); binding to a verified report on the live path pending |
| 5. Provenance record | A `DelegationRecord` emitted and linked to its parent | `enforce_peer_call`, `record_for`, `verify_dag` | Implemented (emitted by the decision core) |
| Live A2A transport wiring | The decision core runs off an actual inbound A2A request | (design) | Pending, Tier 2 |
| Inbound pipeline handler | Verify, enforce, open sealed payload, emit record off a parsed request | `handle_peer_request`, `PeerRequest` | Implemented (transport-agnostic) |
| A2A wire parsing into a `PeerRequest` | Parse actual A2A extension fields into the handler's input | (implementer/transport) | Left to implementers by design |

## Step 1: verify the delegation chain (implemented)

Expand Down
215 changes: 142 additions & 73 deletions src/ca2a_runtime/peer.py
Original file line number Diff line number Diff line change
@@ -1,73 +1,142 @@
"""Inbound peer-call enforcement: the decision the callee makes before it acts.

When a peer presents a delegation chain and requests a capability, the callee:

1. verifies the chain (signature, continuity, attenuation, depth, replay);
2. computes the effective scope as the leaf's delegated scope intersected with
the callee's local policy;
3. enforces: the requested capability must be in the effective scope;
4. emits a provenance record for the accepted hop, linked to its parent.

This module is the enforcement decision core. Wiring it to a live A2A transport
(accepting the credential off an actual inbound request) is tracked separately;
attestation of the peer and sealing of the payload are Tier 2/3 and are not part
of this decision.
"""

from __future__ import annotations

from dataclasses import dataclass

from ca2a_runtime.delegation.credential import DelegationCredential, verify_chain
from ca2a_runtime.errors import ScopeNotPermitted
from ca2a_runtime.policy import LocalPolicy
from ca2a_runtime.provenance import DelegationRecord, record_for


def effective_scope(
chain: list[DelegationCredential], policy: LocalPolicy, *, max_depth: int = 8
) -> frozenset[str]:
"""Verify the chain and return the effective scope (delegated ∩ local policy).

Raises the relevant CA2AError if the chain does not verify.
"""
verify_chain(chain, max_depth=max_depth)
return policy.intersect(chain[-1].scope)


@dataclass(frozen=True)
class PeerDecision:
"""The result of an accepted peer call."""

effective_scope: frozenset[str]
granted_capability: str
record: DelegationRecord


def enforce_peer_call(
chain: list[DelegationCredential],
requested_capability: str,
*,
policy: LocalPolicy,
record_id: str,
parent_record_hash: str | None = None,
max_depth: int = 8,
) -> PeerDecision:
"""Verify, intersect with local policy, enforce, and emit a provenance record.

Raises ScopeNotPermitted if the requested capability is not in the effective
scope, and the underlying CA2AError if the chain does not verify. On accept,
returns a PeerDecision carrying the linked provenance record.
"""
effective = effective_scope(chain, policy, max_depth=max_depth)
if requested_capability not in effective:
raise ScopeNotPermitted(
f"capability {requested_capability!r} is not in the effective scope",
detail=f"effective={sorted(effective)}",
)
record = record_for(chain[-1], record_id=record_id, parent_record_hash=parent_record_hash)
return PeerDecision(
effective_scope=effective,
granted_capability=requested_capability,
record=record,
)
"""Inbound peer-call enforcement: the decision the callee makes before it acts.

When a peer presents a delegation chain and requests a capability, the callee:

1. verifies the chain (signature, continuity, attenuation, depth, replay);
2. computes the effective scope as the leaf's delegated scope intersected with
the callee's local policy;
3. enforces: the requested capability must be in the effective scope;
4. emits a provenance record for the accepted hop, linked to its parent.

`enforce_peer_call` is the enforcement decision core. `handle_peer_request`
composes it into the full transport-agnostic inbound pipeline: verify, enforce,
open any sealed payload with the enclave key, and emit a provenance record. A
transport (an A2A server) parses its wire format into a `PeerRequest` and calls
this; cA2A does not define the transport itself, only what the peer does with a
parsed request.
"""

from __future__ import annotations

from dataclasses import dataclass

from cryptography.hazmat.primitives.asymmetric.x25519 import X25519PrivateKey

from ca2a_runtime.channel import open_sealed
from ca2a_runtime.delegation.credential import DelegationCredential, verify_chain
from ca2a_runtime.errors import ScopeNotPermitted, SealedChannelError
from ca2a_runtime.policy import LocalPolicy
from ca2a_runtime.provenance import DelegationRecord, record_for


def effective_scope(
chain: list[DelegationCredential], policy: LocalPolicy, *, max_depth: int = 8
) -> frozenset[str]:
"""Verify the chain and return the effective scope (delegated ∩ local policy).

Raises the relevant CA2AError if the chain does not verify.
"""
verify_chain(chain, max_depth=max_depth)
return policy.intersect(chain[-1].scope)


@dataclass(frozen=True)
class PeerDecision:
"""The result of an accepted peer call."""

effective_scope: frozenset[str]
granted_capability: str
record: DelegationRecord


def enforce_peer_call(
chain: list[DelegationCredential],
requested_capability: str,
*,
policy: LocalPolicy,
record_id: str,
parent_record_hash: str | None = None,
max_depth: int = 8,
) -> PeerDecision:
"""Verify, intersect with local policy, enforce, and emit a provenance record.

Raises ScopeNotPermitted if the requested capability is not in the effective
scope, and the underlying CA2AError if the chain does not verify. On accept,
returns a PeerDecision carrying the linked provenance record.
"""
effective = effective_scope(chain, policy, max_depth=max_depth)
if requested_capability not in effective:
raise ScopeNotPermitted(
f"capability {requested_capability!r} is not in the effective scope",
detail=f"effective={sorted(effective)}",
)
record = record_for(chain[-1], record_id=record_id, parent_record_hash=parent_record_hash)
return PeerDecision(
effective_scope=effective,
granted_capability=requested_capability,
record=record,
)


@dataclass(frozen=True)
class PeerRequest:
"""A transport-agnostic inbound peer request.

A transport (an A2A server) parses its wire format into this shape and hands
it to ``handle_peer_request``. cA2A does not define the transport; it defines
what a peer does with the request once parsed.
"""

chain: list[DelegationCredential]
requested_capability: str
record_id: str
sealed_payload: bytes | None = None
parent_record_hash: str | None = None


@dataclass(frozen=True)
class PeerResult:
"""The outcome of handling an accepted peer request."""

effective_scope: frozenset[str]
granted_capability: str
record: DelegationRecord
payload: bytes | None


def handle_peer_request(
request: PeerRequest,
*,
policy: LocalPolicy,
enclave_private_key: X25519PrivateKey | None = None,
max_depth: int = 8,
) -> PeerResult:
"""Run the full inbound pipeline for a parsed peer request.

Verifies the delegation chain, intersects the delegated scope with the local
policy and enforces the requested capability, opens any sealed payload with
the enclave-bound key, and emits a linked provenance record. Fails closed:
any verification or authorization failure raises the relevant CA2AError and
no payload is returned.
"""
decision = enforce_peer_call(
request.chain,
request.requested_capability,
policy=policy,
record_id=request.record_id,
parent_record_hash=request.parent_record_hash,
max_depth=max_depth,
)

payload: bytes | None = None
if request.sealed_payload is not None:
if enclave_private_key is None:
raise SealedChannelError("a sealed payload was sent but no enclave key is available")
payload = open_sealed(request.sealed_payload, enclave_private_key)

return PeerResult(
effective_scope=decision.effective_scope,
granted_capability=decision.granted_capability,
record=decision.record,
payload=payload,
)
66 changes: 66 additions & 0 deletions tests/unit/test_peer_request.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,66 @@
"""Tests for the transport-agnostic inbound peer request handler."""

from __future__ import annotations

import pytest

from ca2a_runtime.channel import SealedChannel, generate_channel_keypair
from ca2a_runtime.errors import ScopeEscalation, ScopeNotPermitted, SealedChannelError
from ca2a_runtime.peer import PeerRequest, PeerResult, handle_peer_request
from ca2a_runtime.policy import LocalPolicy
from ca2a_runtime.provenance import verify_dag
from tests.unit.conftest import build_chain


def _chain():
return build_chain([frozenset({"read", "write", "admin"}), frozenset({"read", "write"})])


def test_handles_request_without_payload() -> None:
req = PeerRequest(chain=_chain(), requested_capability="read", record_id="rec-0")
result = handle_peer_request(req, policy=LocalPolicy.of(["read", "audit"]))
assert isinstance(result, PeerResult)
assert result.granted_capability == "read"
assert result.effective_scope == frozenset({"read"})
assert result.payload is None
assert verify_dag([result.record]) == [result.record]


def test_handles_request_with_sealed_payload() -> None:
priv, pub = generate_channel_keypair()
payload = b"do the thing"
req = PeerRequest(
chain=_chain(), requested_capability="read", record_id="rec-0",
sealed_payload=SealedChannel(pub).seal(payload),
)
result = handle_peer_request(
req, policy=LocalPolicy.of(["read"]), enclave_private_key=priv
)
assert result.payload == payload


def test_denied_capability_raises_before_payload() -> None:
priv, pub = generate_channel_keypair()
req = PeerRequest(
chain=_chain(), requested_capability="admin", record_id="rec-0",
sealed_payload=SealedChannel(pub).seal(b"secret"),
)
with pytest.raises(ScopeNotPermitted):
handle_peer_request(req, policy=LocalPolicy.of(["read"]), enclave_private_key=priv)


def test_sealed_payload_without_key_fails_closed() -> None:
_, pub = generate_channel_keypair()
req = PeerRequest(
chain=_chain(), requested_capability="read", record_id="rec-0",
sealed_payload=SealedChannel(pub).seal(b"secret"),
)
with pytest.raises(SealedChannelError):
handle_peer_request(req, policy=LocalPolicy.of(["read"])) # no enclave key


def test_invalid_chain_rejected() -> None:
bad = build_chain([frozenset({"read"}), frozenset({"read", "write"})]) # escalation
req = PeerRequest(chain=bad, requested_capability="read", record_id="rec-0")
with pytest.raises(ScopeEscalation):
handle_peer_request(req, policy=LocalPolicy.of(["read", "write"]))
Loading