Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions pyproject.toml
Original file line number Diff line number Diff line change
Expand Up @@ -141,6 +141,7 @@ known-first-party = ["openai", "tests"]
"scripts/**.py" = ["T201", "T203"]
"tests/**.py" = ["T201", "T203"]
"tests/instrument/**.py" = ["T201", "T203", "ARG"]
"tests/attestation/**.py" = ["T201", "T203", "ARG"]
"examples/**.py" = ["T201", "T203"]
"src/layerlens/cli/**" = ["T201", "T203"]
"src/layerlens/instrument/adapters/frameworks/langchain.py" = ["ARG002"]
Expand Down
29 changes: 29 additions & 0 deletions src/layerlens/attestation/__init__.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,29 @@
from __future__ import annotations

from ._hash import compute_hash
from ._chain import HashChain
from ._verify import (
TamperingResult,
ChainVerification,
TrialVerification,
verify_chain,
verify_trial,
detect_tampering,
)
from ._signing import hmac_sign, hmac_verify
from ._envelope import HashScope, AttestationEnvelope

__all__ = [
"AttestationEnvelope",
"ChainVerification",
"HashChain",
"HashScope",
"TamperingResult",
"TrialVerification",
"compute_hash",
"detect_tampering",
"hmac_sign",
"hmac_verify",
"verify_chain",
"verify_trial",
]
88 changes: 88 additions & 0 deletions src/layerlens/attestation/_chain.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,88 @@
from __future__ import annotations

from copy import copy
from typing import Any, Dict, List, Optional

from ._hash import compute_hash
from ._envelope import HashScope, AttestationEnvelope


class HashChain:
"""Builds a linear hash chain over a sequence of events.

Each event is hashed and linked to the previous hash, forming
a tamper-evident chain. If any event is modified after the fact,
the chain breaks at that point.

Signing is handled server-side at trace ingestion. The SDK builds
the hash chain for integrity; the backend signs for authenticity.
"""

def __init__(self) -> None:
self._chain: List[AttestationEnvelope] = []
self._last_hash: Optional[str] = None
self._terminated: bool = False
self._terminate_reason: Optional[str] = None

@property
def envelopes(self) -> List[AttestationEnvelope]:
return [copy(e) for e in self._chain]

@property
def is_terminated(self) -> bool:
return self._terminated

def _check_active(self) -> None:
if self._terminated:
raise RuntimeError(f"Hash chain terminated: {self._terminate_reason}. No further events can be added.")

def add_event(self, data: Dict[str, Any]) -> AttestationEnvelope:
"""Hash an event and append it to the chain."""
self._check_active()
# Include previous_hash in the hashed payload for chaining
payload = {**data, "_previous_hash": self._last_hash}
event_hash = compute_hash(payload)
envelope = AttestationEnvelope(
hash=event_hash,
scope=HashScope.EVENT,
previous_hash=self._last_hash,
)
self._chain.append(envelope)
self._last_hash = event_hash
return envelope

def terminate(self, reason: str) -> None:
"""Permanently stop the chain. No further events or finalization allowed."""
self._terminated = True
self._terminate_reason = reason

def finalize(self) -> AttestationEnvelope:
"""Compute a trial-level root hash over all event hashes and seal the chain."""
if self._terminated:
raise RuntimeError(
f"Cannot finalize terminated hash chain. Trial is non-attestable due to: {self._terminate_reason}"
)
if not self._chain:
raise RuntimeError("Cannot finalize empty hash chain.")
event_hashes = [e.hash for e in self._chain]
root_hash = compute_hash({"event_hashes": event_hashes})
trial_envelope = AttestationEnvelope(
hash=root_hash,
scope=HashScope.TRIAL,
previous_hash=self._last_hash,
)
# Seal — no more events after finalization
self._terminated = True
self._terminate_reason = "chain finalized"
return trial_envelope

def to_dict(self) -> Dict[str, Any]:
"""Serialize the chain for inclusion in trace uploads."""
result: Dict[str, Any] = {
"events": [e.to_dict() for e in self._chain],
}
# Only include termination details when the chain was stopped
# due to a policy violation (not normal finalization).
if self._terminated and self._terminate_reason != "chain finalized":
result["terminated_reason"] = self._terminate_reason
return result
38 changes: 38 additions & 0 deletions src/layerlens/attestation/_envelope.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,38 @@
from __future__ import annotations

from enum import Enum
from typing import Any, Dict, Optional
from datetime import datetime, timezone
from dataclasses import field, dataclass


class HashScope(Enum):
"""Level at which a hash was computed."""

EVENT = "event"
TRIAL = "trial"


@dataclass
class AttestationEnvelope:
"""Single entry in a hash chain."""

hash: str
scope: HashScope
previous_hash: Optional[str] = None
timestamp: datetime = field(default_factory=lambda: datetime.now(timezone.utc))
signature: Optional[str] = None
signing_key_id: Optional[str] = None

def to_dict(self) -> Dict[str, Any]:
d: Dict[str, Any] = {
"hash": self.hash,
"scope": self.scope.value,
"previous_hash": self.previous_hash,
"timestamp": self.timestamp.isoformat(),
}
if self.signature is not None:
d["signature"] = self.signature
if self.signing_key_id is not None:
d["signing_key_id"] = self.signing_key_id
return d
39 changes: 39 additions & 0 deletions src/layerlens/attestation/_hash.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,39 @@
from __future__ import annotations

import json
import hashlib
from enum import Enum
from typing import Any
from datetime import datetime
from dataclasses import asdict


def _json_default(obj: Any) -> Any:
"""Handle non-standard types for canonical JSON serialization."""
if isinstance(obj, datetime):
return obj.isoformat()
if isinstance(obj, Enum):
return obj.value
if hasattr(obj, "to_dict"):
return obj.to_dict()
if hasattr(obj, "__dataclass_fields__"):
return asdict(obj)
raise TypeError(f"Object of type {type(obj).__name__} is not JSON serializable")


def canonical_json(data: Any) -> str:
"""Serialize data to canonical JSON: sorted keys, compact, deterministic."""
return json.dumps(
data,
sort_keys=True,
separators=(",", ":"),
ensure_ascii=True,
default=_json_default,
)


def compute_hash(data: Any) -> str:
"""Compute SHA-256 hash of canonicalized data. Returns 'sha256:<64 hex chars>'."""
raw = canonical_json(data)
digest = hashlib.sha256(raw.encode("utf-8")).hexdigest()
return f"sha256:{digest}"
19 changes: 19 additions & 0 deletions src/layerlens/attestation/_signing.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,19 @@
"""HMAC-SHA256 signing for attestation envelopes."""

from __future__ import annotations

import hmac as hmac_mod
import base64
import hashlib


def hmac_sign(secret: bytes, data: bytes) -> str:
"""Sign data with HMAC-SHA256, returning a base64-encoded signature."""
sig = hmac_mod.new(secret, data, hashlib.sha256).digest()
return base64.b64encode(sig).decode("ascii")


def hmac_verify(secret: bytes, data: bytes, signature: str) -> bool:
"""Verify a base64-encoded HMAC-SHA256 signature. Timing-safe."""
expected = hmac_sign(secret, data)
return hmac_mod.compare_digest(signature, expected)
157 changes: 157 additions & 0 deletions src/layerlens/attestation/_verify.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,157 @@
from __future__ import annotations

from typing import Any, Dict, List, Optional
from dataclasses import field, dataclass

from ._hash import compute_hash
from ._signing import hmac_verify
from ._envelope import HashScope, AttestationEnvelope


@dataclass
class ChainVerification:
"""Result of verifying a hash chain's integrity."""

valid: bool
break_index: Optional[int] = None
error: Optional[str] = None


@dataclass
class TrialVerification:
"""Result of verifying a full trial: chain + root hash + signatures."""

valid: bool
chain_valid: bool = True
trial_hash_valid: bool = True
signatures_valid: bool = True
errors: List[str] = field(default_factory=list)


@dataclass
class TamperingResult:
"""Result of checking whether trace data was modified after hashing."""

tampered: bool
modified_indices: List[int] = field(default_factory=list)
chain_broken: bool = False


def verify_chain(envelopes: List[AttestationEnvelope]) -> ChainVerification:
"""Verify that a hash chain is continuous and unbroken.

Checks:
- First envelope has previous_hash=None
- Each subsequent envelope's previous_hash matches the prior envelope's hash
"""
if not envelopes:
return ChainVerification(valid=True)

if envelopes[0].previous_hash is not None:
return ChainVerification(
valid=False,
break_index=0,
error="First envelope must have previous_hash=None",
)

for i in range(1, len(envelopes)):
if envelopes[i].previous_hash != envelopes[i - 1].hash:
return ChainVerification(
valid=False,
break_index=i,
error=f"Chain broken at index {i}: "
f"expected previous_hash={envelopes[i - 1].hash!r}, "
f"got {envelopes[i].previous_hash!r}",
)

return ChainVerification(valid=True)


def verify_trial(
envelopes: List[AttestationEnvelope],
trial_envelope: AttestationEnvelope,
signing_secret: Optional[bytes] = None,
) -> TrialVerification:
"""Verify a trial envelope against its event chain.

Checks chain integrity, trial hash correctness, and (optionally) signatures.
Pass ``signing_secret`` to verify HMAC-SHA256 signatures.
"""
errors: List[str] = []

# 1. Chain continuity
chain_result = verify_chain(envelopes)
chain_valid = chain_result.valid
if not chain_valid:
errors.append(f"Chain integrity failed: {chain_result.error}")

# 2. Trial scope + hash
trial_hash_valid = True
if trial_envelope.scope != HashScope.TRIAL:
trial_hash_valid = False
errors.append(f"Trial envelope has wrong scope: {trial_envelope.scope}")
else:
event_hashes = [e.hash for e in envelopes]
expected_hash = compute_hash({"event_hashes": event_hashes})
if trial_envelope.hash != expected_hash:
trial_hash_valid = False
errors.append("Trial hash does not match event hashes")

# 3. Signatures (only if a signing secret is provided)
signatures_valid = True
if signing_secret is not None:
for i, envelope in enumerate(envelopes):
if not envelope.signature:
signatures_valid = False
errors.append(f"Missing signature on event {i}")
else:
if not hmac_verify(signing_secret, envelope.hash.encode("utf-8"), envelope.signature):
signatures_valid = False
errors.append(f"Invalid signature on event {i}")

if not trial_envelope.signature:
signatures_valid = False
errors.append("Missing signature on trial envelope")
else:
if not hmac_verify(signing_secret, trial_envelope.hash.encode("utf-8"), trial_envelope.signature):
signatures_valid = False
errors.append("Invalid signature on trial envelope")

valid = chain_valid and trial_hash_valid and signatures_valid
return TrialVerification(
valid=valid,
chain_valid=chain_valid,
trial_hash_valid=trial_hash_valid,
signatures_valid=signatures_valid,
errors=errors,
)


def detect_tampering(
envelopes: List[AttestationEnvelope],
original_data: List[Dict[str, Any]],
) -> TamperingResult:
"""Detect which events were modified after being hashed.

Recomputes the hash for each event (using its stored previous_hash
for chain linkage) and compares against the stored hash.
"""
if len(envelopes) != len(original_data):
return TamperingResult(
tampered=True,
chain_broken=True,
)

modified: List[int] = []
for i, (envelope, data) in enumerate(zip(envelopes, original_data)):
payload = {**data, "_previous_hash": envelope.previous_hash}
recomputed = compute_hash(payload)
if recomputed != envelope.hash:
modified.append(i)

chain_result = verify_chain(envelopes)
return TamperingResult(
tampered=len(modified) > 0 or not chain_result.valid,
modified_indices=modified,
chain_broken=not chain_result.valid,
)
Loading
Loading