diff --git a/.github/workflows/ci.yml b/.github/workflows/ci.yml index 7f726cc..a00b03a 100644 --- a/.github/workflows/ci.yml +++ b/.github/workflows/ci.yml @@ -48,6 +48,8 @@ jobs: python examples/tutorial.py python examples/readme_quickstart.py python examples/trace_export_demo.py + python examples/ocsf_export_demo.py + python examples/trace_replay_demo.py conformance_stub: name: "Weaver Spec Conformance Stub (v0.1.0)" diff --git a/CHANGELOG.md b/CHANGELOG.md index a495022..5bde971 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -7,6 +7,48 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0 ## [Unreleased] +### Added +- **Handle expansions and policy denials are now first-class audit records + (#175).** `ActionTrace` gained an additive `event_type` + (`invoke`/`expand`/`deny`) and `reason_code`. A successful `Kernel.expand()` + records an `expand` event (and expansion Frames now carry a non-empty + `Provenance.principal_id`); a `PolicyDenied` grant records a `deny` event with + the stable reason code before the exception propagates, so `explain()` and the + trace listing answer "who was refused what, when, and why" (I-02). +- **TraceStore query API (#177).** New `TraceQuery` dataclass and pure + `query_traces()` filter by principal, capability, event type, outcome, reason + code, and time window (since-inclusive / until-exclusive) with deterministic + `(invoked_at, action_id)` ordering and pagination. Exposed as + `TraceStore.query()` / `Kernel.query_traces()` and added to + `TraceStoreProtocol`, so the SQLite and JSONL backends share the contract. +- **Programmatic kernel metrics counters (#179).** `Kernel.stats` returns an + immutable `StatsSnapshot` (grants, denials by reason code, invocations, + invocation failures, fallback activations, redaction events, budget downgrades, + handle stores, expansions); `Kernel.reset_stats()` zeroes them. Dependency-free + and lock-guarded — telemetry without exporting the full trail or installing the + `otel` extra. +- **OCSF / OWASP-AOS SIEM export (#176).** `trace_to_ocsf()` / `traces_to_ocsf()` + map any `ActionTrace` (invoke/expand/deny) to OCSF API Activity (class 6003) + events, AOS-enriched, as a pure dependency-free dict construction. See the SIEM + section in `docs/integrations.md` for the field-mapping table and a runnable + recipe (`examples/ocsf_export_demo.py`). +- **Policy-replay regression harness (#213).** `DecisionRecord`, + `record_decision()`, and `replay(records, engine) -> DecisionDiff` re-evaluate a + recorded decision corpus against a candidate policy and report allow→deny, + deny→allow, and reason-code flips deterministically. Rate-limit-dependent flips + are surfaced separately (`DecisionDiff.rate_limited`). + Companion: `examples/trace_replay_demo.py`. + +### Changed +- **Bounded memory for in-memory audit and revocation state (#182).** + `TraceStore` now caps at `max_entries` (default 10 000) with oldest-first + eviction, a one-time warning, and an observable `evicted_count`. The revocation + store tracks each token's expiry and sweeps state for already-expired tokens + (lazily on an interval and via `HMACTokenProvider.sweep_revocations()`), never + un-revoking a live token. `RevocationStoreProtocol.track()` now takes an + `expires_at` argument and the protocol gained `sweep_expired()` (breaking for + custom revocation backends). + ## [0.11.0] - 2026-06-19 ### Fixed diff --git a/Makefile b/Makefile index d92f3a4..4c93992 100644 --- a/Makefile +++ b/Makefile @@ -27,5 +27,7 @@ example: python examples/evaluation_artifact_policy.py python examples/trace_export_demo.py python examples/persistent_audit_demo.py + python examples/ocsf_export_demo.py + python examples/trace_replay_demo.py ci: fmt-check lint type test example diff --git a/docs/architecture.md b/docs/architecture.md index 8140c54..35668fe 100644 --- a/docs/architecture.md +++ b/docs/architecture.md @@ -158,6 +158,51 @@ Records every `ActionTrace`. `explain(action_id)` returns the full audit record. `export_action_trace` / `export_action_traces` serialise traces into a stable, versioned, JSON-serialisable shape for downstream analysis tools (distinct from the OpenTelemetry observability export); `Kernel.list_traces()` is the public accessor that feeds them the audit trail. See [trace_export.md](trace_export.md). +#### Audited event types (#175) + +`ActionTrace.event_type` distinguishes three kinds of audited event, so the +audit trail covers authorization decisions and data-access events, not only +successful invocations (I-02): + +| `event_type` | Recorded when | Notable fields | +|--------------|---------------|----------------| +| `invoke` (default) | A capability invocation runs | `driver_id`, `result_summary`, `handle_id` | +| `expand` | `Kernel.expand()` serves more rows of a handle | `handle_id`, `result_summary`; expansion Frames carry `Provenance.principal_id` | +| `deny` | A `grant_capability()` call is rejected by policy | `reason_code` (stable `DenialReason`), redacted `error`; no token is issued | + +`reason_code` is populated for `deny` events. All three fields are additive with +defaults, so a directly-constructed trace keeps the original `invoke` meaning. + +#### Querying the audit trail (#177) + +`Kernel.query_traces(TraceQuery(...))` (and `TraceStore.query(...)` on any +backend) filters records by `principal_id`, `capability_id`, `event_type`, +`outcome` (`succeeded`/`failed`), `reason_code`, and a `since`/`until` window +(`since` inclusive, `until` exclusive), with `limit`/`offset` pagination. Results +are ordered deterministically by `(invoked_at, action_id)`, so successive pages +over an unchanged store are disjoint and complete. The pure `query_traces()` +function applies the same semantics to any iterable of traces. + +#### Bounded memory (#182) + +The in-memory `TraceStore` caps itself at `max_entries` (default 10 000) and +evicts oldest-first when exceeded; eviction is *loud* (first eviction logs a +warning) and observable via `TraceStore.evicted_count`. Re-recording an existing +`action_id` overwrites in place and never evicts. Deployments needing unbounded +retention should use a durable backend. Revocation state is bounded similarly — +see [Persistence & durable stores](#persistence--durable-stores) and +[security.md](security.md). + +#### Kernel metrics counters (#179) + +`Kernel.stats` returns an immutable `StatsSnapshot` of aggregate counters +(grants, denials by reason code, invocations, invocation failures, fallback +activations, redaction events, budget downgrades, handle stores, expansions); +`Kernel.reset_stats()` zeroes them. The counters are dependency-free and +lock-guarded — cheap health-check telemetry that needs neither a trace export nor +the `otel` extra. They are *aggregates*; the `TraceStore` remains the record of +individual events. + ## Persistence & durable stores The stateful stores are protocol-based seams (`weaver_kernel.stores`), mirroring @@ -189,6 +234,14 @@ an append-only log that is easy to ship to a collector. Use or apply across workers sharing a database file. All durable backends use only the standard library (`sqlite3`, `json`) — no new runtime dependency. +**Bounded revocation state (#182).** Every revocation backend tracks each +token's `expires_at` and can `sweep_expired(now)` to drop bookkeeping for tokens +that have already expired — they fail the verifier's expiry check regardless, so +a sweep never un-revokes a *live* token. The in-memory store sweeps lazily on an +interval; `HMACTokenProvider.sweep_revocations()` triggers it explicitly (call it +on a schedule for durable backends). `RevocationStoreProtocol.track()` therefore +takes an `expires_at` argument and the protocol includes `sweep_expired()`. + **Verifiable audit chain.** Persisted traces are hash-chained (`prev_hash`/`record_hash`, HMAC-SHA256 keyed by `WEAVER_KERNEL_SECRET`). `verify_chain()` detects mutation, insertion, deletion, and reordering; diff --git a/docs/capabilities.md b/docs/capabilities.md index ed38bc4..56b7278 100644 --- a/docs/capabilities.md +++ b/docs/capabilities.md @@ -222,3 +222,42 @@ Both built-in engines support `explain()`. If you bring a custom policy engine that implements only `PolicyEngine.evaluate`, `explain_denial` raises `AgentKernelError` with guidance — implement the `ExplainingPolicyEngine` protocol to enable structured explanations. + +## Validating a policy change with replay (#213) + +A policy edit is the highest-blast-radius change in the system: one rule can +silently widen access or break every agent. The replay harness re-evaluates a +corpus of recorded decisions against a *candidate* policy and reports the +decision diff, so you get a deterministic "what would have changed" answer before +deploying. + +```python +from weaver_kernel import DefaultPolicyEngine, record_decision, replay + +baseline = DefaultPolicyEngine() +# Build a corpus (a real one would come from historical traffic). +records = [ + record_decision(baseline, request, capability, principal, justification="..."), + # ... +] + +diff = replay(records, candidate_engine) +assert diff.empty # replaying against the same engine → no flips +for flip in diff.flips: # allow_to_deny | deny_to_allow | reason_code_change + print(flip.record.capability.capability_id, flip.kind, + flip.baseline_reason_code, "->", flip.candidate_reason_code) +``` + +Determinism and fidelity: + +- Output order is the input record order; replaying records against the engine + that produced them yields `diff.empty`. +- Rate-limit decisions are replay-order-sensitive (the default engine's limiter is + stateful), so flips involving `DenialReason.RATE_LIMITED` are surfaced in + `diff.rate_limited` rather than `diff.flips`. +- Replay validates **policy structure** (role/justification/constraint rules), not + argument-dependent rules whose inputs the audit trail redacts. + +Runnable recipe: [`examples/trace_replay_demo.py`](../examples/trace_replay_demo.py). +This complements shadow mode (live-traffic comparison) and the fixture-based +policy testing framework with real-traffic, pre-deployment coverage. diff --git a/docs/integrations.md b/docs/integrations.md index d02b4c9..d82a857 100644 --- a/docs/integrations.md +++ b/docs/integrations.md @@ -363,6 +363,45 @@ instrument_kernel(kernel) no-op. Use `weaver_kernel.otel.reset_instrumentation(kernel)` in tests to re-instrument with a different provider. +## SIEM export (OCSF / OWASP AOS) + +OpenTelemetry feeds the *observability* pipeline; SIEMs speak **OCSF** (the Open +Cybersecurity Schema Framework), the *security-operations* pipeline. The audit +trail maps to OCSF **API Activity** events (class `6003`), enriched per the OWASP +Agent Observability Standard (AOS), with no new dependency — the mapping is a pure +dict construction. + +```python +from weaver_kernel import traces_to_ocsf + +events = traces_to_ocsf(kernel.list_traces()) # list[dict], OCSF-shaped +# ship `events` to your SIEM (one JSON object per event) +``` + +`trace_to_ocsf(trace)` maps a single record. Runnable recipe: +[`examples/ocsf_export_demo.py`](../examples/ocsf_export_demo.py). + +Field mapping (kernel `ActionTrace` → OCSF API Activity 6003): + +| OCSF field | Source | +|------------|--------| +| `class_uid` / `class_name` | constant `6003` / `"API Activity"` | +| `category_uid` / `category_name` | constant `6` / `"Application Activity"` | +| `activity_id` / `activity_name` | `event_type`: invoke→Other(99), expand→Read(2), deny→Other(99) | +| `type_uid` | `class_uid * 100 + activity_id` | +| `status_id` / `status` | `2`/`Failure` when `error` is set, else `1`/`Success` | +| `status_detail` | `error` (already redacted at record time) | +| `severity_id` / `severity` | deny→Medium(3), else Informational(1) | +| `time` | `invoked_at` as epoch milliseconds (UTC) | +| `actor.user.uid` | `principal_id` | +| `api.operation` / `api.service.name` | `capability_id` / `driver_id` (or `"weaver-kernel"`) | +| `metadata` | product + OCSF version (`OCSF_VERSION`) + AOS extension marker | +| `unmapped` | kernel specifics: `action_id`, `token_id`, `event_type`, `response_mode`, `sensitivity`, `reason_code`, `handle_id`, `result_summary` | + +The mapping is built only from already-redaction-safe trace fields, so exporting +cannot widen the I-01 boundary. AOS is young, so the mapping is versioned and +isolated in `weaver_kernel.ocsf`; output is validated structurally in the tests. + ## Ecosystem integration patterns These reference flows show how agent-kernel composes with neighboring Weaver diff --git a/docs/security.md b/docs/security.md index ea0cb16..3e136f1 100644 --- a/docs/security.md +++ b/docs/security.md @@ -120,6 +120,38 @@ in-memory trace did not already hold and cannot widen the I-01 boundary. The CLI exposes verification to operators: `weaver-kernel audit verify --store audit.db` exits non-zero on any divergence (see [cli.md](cli.md)). +## What the audit trail captures (#175) + +Auditability (I-02) covers authorization decisions and data-access events, not +only successful invocations. Every recorded `ActionTrace` carries an `event_type`: + +- `invoke` — a capability invocation (success or driver failure). +- `expand` — a `Kernel.expand()` data-access event (more rows of a stored + handle). Expansion Frames carry the expanding principal in + `Provenance.principal_id`. +- `deny` — a `grant_capability()` rejected by policy, recorded with the stable + `reason_code` (a `DenialReason`) and a redacted reason message *before* the + `PolicyDenied` exception propagates. + +So `explain()` and `query_traces()` can answer "who was refused what, when, and +why" and "which rows were expanded by whom". Expansion query arguments and denial +messages pass through the same firewall redactor as invocation args, so these new +records never make the trace store a sensitive-data sink. + +## Retention bounding (#182) + +Long-lived processes accumulate one trace per invocation and one revocation entry +per revoked token. Both in-memory structures are bounded: + +- The in-memory `TraceStore` caps at `max_entries` (default 10 000), evicting + oldest-first. Eviction discards audit data, so it is deliberately loud (a + warning on first eviction) and counted (`evicted_count`). For unbounded + retention, use a durable backend. +- Revocation state records each token's expiry and is swept for already-expired + tokens (lazily, and via `HMACTokenProvider.sweep_revocations()`). A sweep never + un-revokes a live token — only entries for tokens that already fail the expiry + check are removed. + ## Security disclaimers > **v0.1 is not production-hardened for real authentication.** diff --git a/docs/trace_export.md b/docs/trace_export.md index 77a2d90..214a522 100644 --- a/docs/trace_export.md +++ b/docs/trace_export.md @@ -134,6 +134,10 @@ envelope = export_action_traces( } ``` +The envelope also carries `event_type` (`invoke`/`expand`/`deny`) and, for +denials, a stable `reason_code` (#175), so an exported trail distinguishes +invocations, handle expansions, and policy denials. + ## Stability `TRACE_EXPORT_VERSION` is bumped only on a **breaking** change to the field @@ -141,3 +145,13 @@ shape. New optional fields may be added without a bump, so consumers should ignore unknown keys. Assert on `status`, `sensitivity`, and the presence of `error` rather than on human-readable strings (the `error` text itself may evolve). + +## Related + +- **Querying the trail:** `Kernel.query_traces(TraceQuery(...))` filters by + principal, capability, event type, outcome, reason code, and time window — see + [architecture.md](architecture.md#querying-the-audit-trail-177). +- **SIEM export:** `traces_to_ocsf()` renders the trail as OCSF/AOS events for a + security pipeline — see + [integrations.md](integrations.md#siem-export-ocsf--owasp-aos). The OTel + observability export is distinct from both (live spans/metrics). diff --git a/examples/ocsf_export_demo.py b/examples/ocsf_export_demo.py new file mode 100644 index 0000000..18abe9e --- /dev/null +++ b/examples/ocsf_export_demo.py @@ -0,0 +1,121 @@ +"""ocsf_export_demo.py — export the audit trail as OCSF/AOS SIEM events (#176). + +Security teams consume agent activity through SIEMs, and OCSF is the schema +those pipelines speak. This script records the three audited event kinds — an +invocation, a policy **denial**, and a handle **expansion** — then maps the +whole audit trail to OCSF API Activity (class 6003) events enriched per the +OWASP Agent Observability Standard, ready to ship to a SIEM. + +It also prints the in-process :class:`KernelStats` snapshot (#179), the cheap +counters that answer "how many grants/denials/expansions happened?" without +exporting anything. + +Everything is offline and deterministic. Run with: +``python examples/ocsf_export_demo.py`` +""" + +from __future__ import annotations + +import asyncio +import json + +from weaver_kernel import ( + Capability, + CapabilityRegistry, + HMACTokenProvider, + Kernel, + PolicyDenied, + Principal, + SafetyClass, + SensitivityTag, + StaticRouter, + traces_to_ocsf, +) +from weaver_kernel.drivers.memory import InMemoryDriver +from weaver_kernel.models import CapabilityRequest, ImplementationRef + +_SECRET = "example-secret-do-not-use-in-prod" + + +def _build_kernel() -> Kernel: + registry = CapabilityRegistry() + registry.register_many( + [ + Capability( + capability_id="billing.list_invoices", + name="List Invoices", + description="List invoices for a customer", + safety_class=SafetyClass.READ, + sensitivity=SensitivityTag.PII, + impl=ImplementationRef(driver_id="billing", operation="list_invoices"), + ), + Capability( + capability_id="billing.delete_invoice", + name="Delete Invoice", + description="Permanently delete an invoice", + safety_class=SafetyClass.DESTRUCTIVE, + impl=ImplementationRef(driver_id="billing", operation="delete_invoice"), + ), + ] + ) + driver = InMemoryDriver(driver_id="billing") + driver.register_handler( + "list_invoices", + lambda _ctx: [{"id": i, "amount": i * 10.0, "status": "paid"} for i in range(5)], + ) + kernel = Kernel( + registry=registry, + token_provider=HMACTokenProvider(secret=_SECRET), + router=StaticRouter( + routes={"billing.list_invoices": ["billing"], "billing.delete_invoice": ["billing"]} + ), + ) + kernel.register_driver(driver) + return kernel + + +async def main() -> None: + kernel = _build_kernel() + reader = Principal(principal_id="agent-007", roles=["reader"], attributes={"tenant": "acme"}) + + # 1. A successful invocation (event_type="invoke"). + req = CapabilityRequest(capability_id="billing.list_invoices", goal="list invoices") + token = kernel.get_token(req, reader, justification="monthly review") + frame = await kernel.invoke(token, principal=reader, args={"operation": "list_invoices"}) + + # 2. A handle expansion (event_type="expand"). + assert frame.handle is not None + kernel.expand(frame.handle, query={"limit": 2}, principal=reader) + + # 3. A policy denial (event_type="deny") — a reader cannot delete. + deny_req = CapabilityRequest(capability_id="billing.delete_invoice", goal="delete invoice") + try: + kernel.grant_capability(deny_req, reader, justification="cleanup") + except PolicyDenied as exc: + print(f"denied as expected: reason_code={exc.reason_code}") + + # Map the whole audit trail to OCSF/AOS events for a SIEM. + events = traces_to_ocsf(kernel.list_traces()) + print(f"\n{len(events)} OCSF API Activity events:") + print(json.dumps(events, indent=2)) + + # In-process counters (no export required). + stats = kernel.stats + print("\nKernelStats snapshot:") + print( + json.dumps( + { + "grants": stats.grants, + "denials": stats.denials, + "invocations": stats.invocations, + "expansions": stats.expansions, + "handle_stores": stats.handle_stores, + "denials_by_reason": dict(stats.denials_by_reason), + }, + indent=2, + ) + ) + + +if __name__ == "__main__": + asyncio.run(main()) diff --git a/examples/trace_replay_demo.py b/examples/trace_replay_demo.py new file mode 100644 index 0000000..93e077b --- /dev/null +++ b/examples/trace_replay_demo.py @@ -0,0 +1,108 @@ +"""trace_replay_demo.py — validate a policy change against recorded decisions (#213). + +Policy edits are the highest-blast-radius change in the kernel: one rule can +silently widen access or break every agent. This script records a corpus of +grant decisions under a *baseline* policy, then replays it against a *candidate* +policy and prints the decision diff — the deterministic "what would have +changed" answer a policy author wants before shipping. + +The candidate here disables ``billing.list_invoices`` while leaving every other +rule intact, so the diff shows exactly one ``allow_to_deny`` flip. Everything is +offline and deterministic. Run with: ``python examples/trace_replay_demo.py`` +""" + +from __future__ import annotations + +from typing import Any + +from weaver_kernel import ( + Capability, + DecisionRecord, + DefaultPolicyEngine, + Principal, + SafetyClass, + record_decision, + replay, +) +from weaver_kernel.errors import PolicyDenied +from weaver_kernel.models import CapabilityRequest +from weaver_kernel.policy_reasons import DenialReason + +_LIST = Capability( + capability_id="billing.list_invoices", + name="List Invoices", + description="List invoices", + safety_class=SafetyClass.READ, +) +_DELETE = Capability( + capability_id="billing.delete_invoice", + name="Delete Invoice", + description="Delete an invoice", + safety_class=SafetyClass.DESTRUCTIVE, +) + +_READER = Principal(principal_id="agent-007", roles=["reader"], attributes={"tenant": "acme"}) + + +class _CandidatePolicy: + """Baseline policy plus a new rule disabling ``billing.list_invoices``.""" + + def __init__(self, base: DefaultPolicyEngine) -> None: + self._base = base + + def evaluate( + self, + request: CapabilityRequest, + capability: Capability, + principal: Principal, + *, + justification: str, + ) -> Any: + if capability.capability_id == "billing.list_invoices": + raise PolicyDenied( + "list_invoices retired by 2026-Q3 policy", + reason_code=DenialReason.EXPLICIT_DENY_RULE.value, + ) + return self._base.evaluate(request, capability, principal, justification=justification) + + +def main() -> None: + baseline = DefaultPolicyEngine() + + # Record the baseline decisions (a real corpus would come from history). + records: list[DecisionRecord] = [ + record_decision( + baseline, + CapabilityRequest(capability_id="billing.list_invoices", goal="list"), + _LIST, + _READER, + justification="monthly review", + ), + record_decision( + baseline, + CapabilityRequest(capability_id="billing.delete_invoice", goal="delete"), + _DELETE, + _READER, + justification="cleanup", + ), + ] + print("Baseline decisions:") + for record in records: + verdict = "allow" if record.baseline_allowed else f"deny({record.baseline_reason_code})" + print(f" {record.capability.capability_id:<24} -> {verdict}") + + diff = replay(records, _CandidatePolicy(baseline)) + + print(f"\nReplayed {diff.evaluated} decisions against the candidate policy.") + print(f"Structural flips: {len(diff.flips)}") + for flip in diff.flips: + print( + f" {flip.record.capability.capability_id:<24} {flip.kind}: " + f"{flip.baseline_reason_code or 'allow'} -> {flip.candidate_reason_code or 'allow'}" + ) + if diff.rate_limited: + print(f"(rate-limit-dependent flips, surfaced separately: {len(diff.rate_limited)})") + + +if __name__ == "__main__": + main() diff --git a/src/weaver_kernel/__init__.py b/src/weaver_kernel/__init__.py index 48f8ed8..f51c38d 100644 --- a/src/weaver_kernel/__init__.py +++ b/src/weaver_kernel/__init__.py @@ -27,6 +27,13 @@ from weaver_kernel import HandleStore, TraceStore from weaver_kernel import export_action_trace, export_action_traces + from weaver_kernel import TraceQuery, query_traces + +Observability & SIEM export:: + + from weaver_kernel import KernelStats, StatsSnapshot + from weaver_kernel import trace_to_ocsf, traces_to_ocsf + from weaver_kernel import replay, record_decision, DecisionRecord, DecisionDiff Durable persistence & verifiable audit log:: @@ -141,14 +148,24 @@ ResponseMode, RoutePlan, ToolHints, + TraceEventType, TrustLevel, ) +from .ocsf import AOS_EXTENSION, OCSF_VERSION, trace_to_ocsf, traces_to_ocsf from .otel import OTEL_AVAILABLE, instrument_kernel from .policy import DefaultPolicyEngine, ExplainingPolicyEngine, PolicyEngine from .policy_dsl import DeclarativePolicyEngine, PolicyMatch, PolicyRule from .policy_reasons import AllowReason, DenialReason from .registry import CapabilityRegistry +from .replay import ( + DecisionDiff, + DecisionFlip, + DecisionRecord, + record_decision, + replay, +) from .router import StaticRouter +from .stats import KernelStats, StatsSnapshot from .stores import ( ChainVerificationResult, HandleStoreProtocol, @@ -169,6 +186,7 @@ export_action_trace, export_action_traces, ) +from .trace_query import TraceQuery, query_traces # Single source of truth: read the version from the installed distribution # metadata (the PyPI dist name is ``weaver-kernel``, distinct from the import @@ -208,6 +226,7 @@ "ResponseMode", "RoutePlan", "ActionTrace", + "TraceEventType", "ToolHints", "TrustLevel", # enums @@ -294,6 +313,23 @@ "TRACE_EXPORT_VERSION", "export_action_trace", "export_action_traces", + # trace query (issue #177) + "TraceQuery", + "query_traces", + # kernel metrics counters (issue #179) + "KernelStats", + "StatsSnapshot", + # OCSF / AOS SIEM export (issue #176) + "AOS_EXTENSION", + "OCSF_VERSION", + "trace_to_ocsf", + "traces_to_ocsf", + # policy-replay harness (issue #213) + "DecisionDiff", + "DecisionFlip", + "DecisionRecord", + "record_decision", + "replay", # adapters "AnthropicMiddleware", "OpenAIMiddleware", diff --git a/src/weaver_kernel/handles.py b/src/weaver_kernel/handles.py index 852ff6c..9049e8d 100644 --- a/src/weaver_kernel/handles.py +++ b/src/weaver_kernel/handles.py @@ -349,7 +349,7 @@ def expand( handle=handle, provenance=Provenance( capability_id=handle.capability_id, - principal_id="", + principal_id=principal_id, invoked_at=datetime.datetime.now(tz=datetime.timezone.utc), action_id=action_id, ), diff --git a/src/weaver_kernel/kernel/__init__.py b/src/weaver_kernel/kernel/__init__.py index 08ed5bd..2a20899 100644 --- a/src/weaver_kernel/kernel/__init__.py +++ b/src/weaver_kernel/kernel/__init__.py @@ -15,7 +15,7 @@ from typing import Any, Literal, overload from ..drivers.base import Driver, StreamingDriver -from ..errors import AgentKernelError +from ..errors import AgentKernelError, PolicyDenied from ..federation import TrustPolicy from ..firewall.budget_manager import BudgetManager from ..firewall.transform import Firewall @@ -37,9 +37,12 @@ from ..policy import DefaultPolicyEngine, PolicyEngine from ..registry import CapabilityRegistry from ..router import Router, StaticRouter +from ..stats import KernelStats, StatsSnapshot from ..stores import TraceStoreProtocol from ..tokens import CapabilityToken, HMACTokenProvider, TokenProvider from ..trace import TraceStore +from ..trace_query import TraceQuery +from ._audit import record_denial_trace, record_expansion_trace from ._dry_run import build_dry_run_result from ._federation import ( perform_advertise, @@ -94,6 +97,7 @@ def __init__( self._budget_manager = budget_manager self._drivers: dict[str, Driver] = {} self._kernel_id = kernel_id + self._stats = KernelStats() @property def kernel_id(self) -> str: @@ -134,11 +138,44 @@ def grant_capability( *, justification: str, ) -> CapabilityGrant: - """Evaluate the policy and, if approved, issue a signed token.""" + """Evaluate the policy and, if approved, issue a signed token. + + On a :class:`~weaver_kernel.PolicyDenied` rejection, a ``"deny"`` audit + record (carrying the stable reason code) is written to the trace store + (best-effort) before the exception propagates, so the audit trail answers + "who was refused what, and why" (#175). A trace-store write failure is + logged but never masks the denial. Denials are also counted in + :attr:`stats`. + """ capability = self._registry.get(request.capability_id) - decision = self._policy.evaluate( - request, capability, principal, justification=justification - ) + try: + decision = self._policy.evaluate( + request, capability, principal, justification=justification + ) + except PolicyDenied as exc: + self._stats.on_denial(exc.reason_code) + # The denial is authoritative and already fails closed (no token is + # issued). Recording its audit trace is best-effort: a trace-store + # write failure must never mask the PolicyDenied the caller expects. + try: + record_denial_trace( + capability_id=request.capability_id, + principal_id=principal.principal_id, + reason_code=exc.reason_code, + message=str(exc), + trace_store=self._trace_store, + ) + except Exception: + logger.warning( + "deny_trace_record_failed", + extra={ + "capability_id": request.capability_id, + "principal_id": principal.principal_id, + "reason_code": exc.reason_code, + }, + exc_info=True, + ) + raise audit_id = str(uuid.uuid4()) token = self._token_provider.issue( capability.capability_id, @@ -156,6 +193,7 @@ def grant_capability( "token_id": token.token_id, }, ) + self._stats.on_grant() return CapabilityGrant( request=request, principal=principal, @@ -312,20 +350,40 @@ def expand( grant's persisted constraints (``max_rows``, ``allowed_fields``, ``scope``) or is requested by a different principal. """ + principal_id = principal.principal_id if principal else "" + action_id = str(uuid.uuid4()) logger.info( "expand", extra={ "handle_id": handle.handle_id, "capability_id": handle.capability_id, - "principal_id": principal.principal_id if principal else "", + "principal_id": principal_id, + "action_id": action_id, }, ) - return self._handle_store.expand( + frame = self._handle_store.expand( handle, query=query, - principal_id=principal.principal_id if principal else "", + action_id=action_id, + principal_id=principal_id, max_depth=self._firewall.budgets.max_depth, ) + # A successful expansion is an authorized data-access event — record it + # in the audit trail (I-02) and count it (#175, #179). Unlike the + # best-effort denial trace (a denial is already authoritative and fails + # closed), this record is *not* wrapped: an audit-write failure here + # propagates so a served expansion is never left unaudited (I-02). + record_expansion_trace( + action_id=action_id, + capability_id=handle.capability_id, + principal_id=principal_id, + handle_id=handle.handle_id, + query=query, + frame=frame, + trace_store=self._trace_store, + ) + self._stats.on_expansion() + return frame def explain(self, action_id: str) -> ActionTrace: """Retrieve the audit trace for a past invocation.""" @@ -342,6 +400,30 @@ def list_traces(self) -> list[ActionTrace]: """ return self._trace_store.list_all() + def query_traces(self, query: TraceQuery) -> list[ActionTrace]: + """Return audit records matching *query*, ordered and paginated (#177). + + Operator-facing entry point over the configured trace store's + :meth:`~weaver_kernel.stores.TraceStoreProtocol.query`. Answers + questions like "what did principal X do in the last hour?" or "which + capabilities were denied today?" without iterating store internals. + """ + return self._trace_store.query(query) + + @property + def stats(self) -> StatsSnapshot: + """An immutable snapshot of the kernel's aggregate counters (#179). + + Cheap operational telemetry (grants, denials by reason, invocations, + fallback activations, redaction events, budget downgrades, handle + stores/expansions) that needs no trace export and no optional extra. + """ + return self._stats.snapshot() + + def reset_stats(self) -> None: + """Zero every counter returned by :attr:`stats`.""" + self._stats.reset() + def explain_denial( self, request: CapabilityRequest, diff --git a/src/weaver_kernel/kernel/_audit.py b/src/weaver_kernel/kernel/_audit.py new file mode 100644 index 0000000..c1932c8 --- /dev/null +++ b/src/weaver_kernel/kernel/_audit.py @@ -0,0 +1,99 @@ +"""Audit-record builders for non-invoke events (#175). + +Auditability extends beyond successful invocations to other authorized +data-access events and grant-time authorization decisions, which were previously +only logged. These helpers record two of them as first-class +:class:`~weaver_kernel.models.ActionTrace` entries: a successful handle +*expansion* (``event_type="expand"``, a data-access event) and a grant-time +policy *denial* (``event_type="deny"``, raised by +:meth:`PolicyEngine.evaluate`). So :meth:`Kernel.explain`, the trace query API, +and the audit CLI can answer "who was refused a grant, when, and why" and "which +rows were expanded". + +Scope note: only grant-time denials are recorded here. Expansion-time access +failures (principal/constraint violations raised by ``HandleStore.expand``) +still surface as exceptions and logs, not ``"deny"`` traces. + +The redaction helpers are shared with :mod:`._invoke` so an expansion's query +arguments and a denial's message pass through the same firewall scrub used for +invocation traces — the audit store never becomes a sensitive-data sink. +""" + +from __future__ import annotations + +import datetime +import uuid + +from ..models import ActionTrace, Frame +from ..stores import TraceStoreProtocol +from ._invoke import _frame_result_summary, _redact_args_for_trace, _redact_trace_text + + +def _now() -> datetime.datetime: + return datetime.datetime.now(tz=datetime.timezone.utc) + + +def record_denial_trace( + *, + capability_id: str, + principal_id: str, + reason_code: str | None, + message: str, + trace_store: TraceStoreProtocol, +) -> None: + """Record a ``"deny"`` audit event for a refused grant. + + No token is issued on a denial, so ``token_id`` and ``driver_id`` are empty; + the stable ``reason_code`` and the (redacted) denial ``message`` capture why. + """ + trace_store.record( + ActionTrace( + action_id=str(uuid.uuid4()), + capability_id=capability_id, + principal_id=principal_id, + token_id="", + invoked_at=_now(), + args={}, + response_mode="summary", + driver_id="", + event_type="deny", + reason_code=reason_code, + error=_redact_trace_text(message), + ) + ) + + +def record_expansion_trace( + *, + action_id: str, + capability_id: str, + principal_id: str, + handle_id: str, + query: dict[str, object], + frame: Frame, + trace_store: TraceStoreProtocol, +) -> None: + """Record an ``"expand"`` audit event for a served handle expansion. + + Args mirror the expansion query (redacted like invocation args) and the + redaction-safe result summary is taken from the already-firewalled + expansion :class:`Frame`, so no raw row data enters the trail. + """ + trace_store.record( + ActionTrace( + action_id=action_id, + capability_id=capability_id, + principal_id=principal_id, + token_id="", + invoked_at=_now(), + args=_redact_args_for_trace(capability_id, dict(query)), + response_mode=frame.response_mode, + driver_id="", + handle_id=handle_id, + event_type="expand", + result_summary=_frame_result_summary(frame), + ) + ) + + +__all__ = ["record_denial_trace", "record_expansion_trace"] diff --git a/src/weaver_kernel/kernel/_invoke.py b/src/weaver_kernel/kernel/_invoke.py index 3edee5f..151a3da 100644 --- a/src/weaver_kernel/kernel/_invoke.py +++ b/src/weaver_kernel/kernel/_invoke.py @@ -136,14 +136,20 @@ async def execute_with_fallback( *, ctx: ExecutionContext, log_ctx: dict[str, str], -) -> tuple[RawResult | None, str, Exception | None]: +) -> tuple[RawResult | None, str, Exception | None, bool]: """Iterate the route plan's drivers until one succeeds. Returns: - ``(raw_result, driver_id, last_error)``. ``raw_result`` is - ``None`` if every driver failed. + ``(raw_result, driver_id, last_error, fell_back)``. ``raw_result`` is + ``None`` if every driver failed; ``fell_back`` is ``True`` when at least + one earlier driver raised before the one that ultimately ran (or before + all-failed), so callers can count fallback activations. Only a + ``DriverError`` counts as a failed attempt: a route entry whose driver is + unregistered (``drivers.get(driver_id) is None``) is skipped silently and + does **not** set ``fell_back``. """ last_error: Exception | None = None + failed_attempts = 0 for driver_id in plan.driver_ids: driver = drivers.get(driver_id) if driver is None: @@ -151,15 +157,16 @@ async def execute_with_fallback( try: raw_result = await driver.execute(ctx) logger.debug("driver_success", extra={**log_ctx, "driver_id": driver_id}) - return raw_result, driver_id, None + return raw_result, driver_id, None, failed_attempts > 0 except DriverError as exc: logger.warning( "driver_failure", extra={**log_ctx, "driver_id": driver_id, "error": str(exc)}, ) last_error = exc + failed_attempts += 1 continue - return None, "", last_error + return None, "", last_error, failed_attempts > 0 def record_failure_trace( @@ -284,7 +291,8 @@ async def perform_invoke( constraints=token.constraints, action_id=action_id, ) - raw_result, used_driver_id, last_error = await execute_with_fallback( + downgraded = effective_mode != response_mode + raw_result, used_driver_id, last_error, fell_back = await execute_with_fallback( kernel._driver_map, plan, ctx=ctx, log_ctx=log_ctx ) @@ -304,6 +312,9 @@ async def perform_invoke( trace_store=kernel._traces, sensitivity=capability.sensitivity, ) + kernel._stats.on_invocation( + failed=True, fallback=fell_back, redacted=False, downgraded=downgraded + ) raise DriverError( f"All drivers failed for capability '{token.capability_id}'. Last error: {err_msg}" ) @@ -316,6 +327,7 @@ async def perform_invoke( principal_id=principal.principal_id, constraints=token.constraints, ) + kernel._stats.on_handle_store() reservation_consumed = False try: @@ -349,6 +361,12 @@ async def perform_invoke( trace_store=kernel._traces, sensitivity=capability.sensitivity, ) + kernel._stats.on_invocation( + failed=False, + fallback=fell_back, + redacted=bool(frame.warnings), + downgraded=frame.response_mode != response_mode, + ) logger.info( "invoke_success", extra={ diff --git a/src/weaver_kernel/kernel/_stream.py b/src/weaver_kernel/kernel/_stream.py index 94c3242..e6835c9 100644 --- a/src/weaver_kernel/kernel/_stream.py +++ b/src/weaver_kernel/kernel/_stream.py @@ -96,6 +96,7 @@ async def invoke_stream_impl( fallback_driver_id = driver_id # last non-streaming candidate yielded_any = False + redacted_any = False handle: Handle | None = None last_frame: Frame | None = None try: @@ -110,6 +111,7 @@ async def invoke_stream_impl( action_id=action_id, ): yielded_any = True + redacted_any = redacted_any or bool(frame.warnings) last_frame = frame yield frame else: @@ -125,6 +127,7 @@ async def invoke_stream_impl( principal_id=principal.principal_id, constraints=token.constraints, ) + kernel._stats.on_handle_store() frame = kernel._fw.transform( raw, action_id=action_id, @@ -136,6 +139,7 @@ async def invoke_stream_impl( ) frame = replace(frame, is_final=True) yielded_any = True + redacted_any = redacted_any or bool(frame.warnings) last_frame = frame yield frame finally: @@ -155,6 +159,14 @@ async def invoke_stream_impl( error=None if yielded_any else "stream produced no chunks", ) ) + kernel._stats.on_invocation( + failed=not yielded_any, + fallback=False, + # apply_stream attaches redaction warnings per chunk, so any frame + # (not just the last) may have carried one — count if *any* did. + redacted=redacted_any, + downgraded=initial_mode != response_mode, + ) logger.info( "invoke_stream_end", extra={ diff --git a/src/weaver_kernel/models.py b/src/weaver_kernel/models.py index 453ba7d..e96ed8e 100644 --- a/src/weaver_kernel/models.py +++ b/src/weaver_kernel/models.py @@ -21,6 +21,14 @@ ResponseMode = Literal["summary", "table", "handle_only", "raw"] +TraceEventType = Literal["invoke", "expand", "deny"] +"""Kind of audited event an :class:`ActionTrace` records. + +``"invoke"`` is a capability invocation (the original and default shape), +``"expand"`` is a handle-expansion data-access event, and ``"deny"`` is a +policy denial at grant time. A single homogeneous record type keeps the trace +query, OCSF export, and replay paths uniform across all three (#175).""" + # ── Capability ──────────────────────────────────────────────────────────────── @@ -435,8 +443,25 @@ class ActionTrace: registry lookup. Defaults to :attr:`SensitivityTag.NONE` for traces constructed directly (e.g. in tests) or for non-sensitive capabilities. - Declared last so adding it does not shift the positional ``__init__`` order - of the pre-existing fields (``ActionTrace`` is part of the public API). + Declared (with the fields below) after the original positional arguments so + adding it does not shift their ``__init__`` order (``ActionTrace`` is part + of the public API). + """ + + event_type: TraceEventType = "invoke" + """Which kind of audited event this record describes (#175). + + Defaults to ``"invoke"`` so every pre-existing trace and any record + constructed directly keeps its original meaning. Handle expansions record + ``"expand"`` and policy denials record ``"deny"``. + """ + + reason_code: str | None = None + """Stable :class:`~weaver_kernel.policy_reasons.DenialReason` value for a + ``"deny"`` event (``None`` for invoke/expand events). + + Lets audit consumers branch on *why* a request was refused without parsing + the human-readable :attr:`error` message. """ diff --git a/src/weaver_kernel/ocsf.py b/src/weaver_kernel/ocsf.py new file mode 100644 index 0000000..0d852b8 --- /dev/null +++ b/src/weaver_kernel/ocsf.py @@ -0,0 +1,128 @@ +"""Map :class:`ActionTrace` records to OCSF API Activity events (#176). + +Security teams consume agent activity through SIEMs, and the Open Cybersecurity +Schema Framework (OCSF) is the cross-vendor schema those pipelines speak. This +module renders each audit record as an **OCSF API Activity** event (``class_uid`` +6003), enriched per the OWASP Agent Observability Standard (AOS), which extends +OCSF for agent traces. It complements — does not replace — the OpenTelemetry +export (:mod:`weaver_kernel.otel`, #125): OTel feeds the *observability* +pipeline, OCSF feeds the *security-operations* pipeline. + +The mapping is a **pure, dependency-free** dict construction — no new runtime +dependency, deterministic for identical input. Output is built only from +already-redaction-safe :class:`ActionTrace` fields, so exporting cannot widen the +I-01 firewall boundary. + +Field mapping (kernel → OCSF API Activity 6003) +----------------------------------------------- + +========================== ===================================================== +OCSF field Source +========================== ===================================================== +``class_uid`` / ``class_name`` Constant ``6003`` / ``"API Activity"`` +``category_uid`` / name Constant ``6`` / ``"Application Activity"`` +``activity_id`` / name ``event_type`` maps invoke→Other(99), + expand→Read(2), deny→Other(99) +``type_uid`` ``class_uid * 100 + activity_id`` +``status_id`` / ``status`` ``2``/``Failure`` when ``error`` is set else ``1``/``Success`` +``status_detail`` ``error`` (already redacted) +``severity_id`` / name deny maps to Medium(3) else Informational(1) +``time`` ``invoked_at`` as epoch milliseconds (UTC) +``actor.user.uid`` ``principal_id`` +``api.operation`` ``capability_id`` +``api.service.name`` ``driver_id`` (or ``"weaver-kernel"`` when empty) +``metadata`` product + OCSF version + AOS extension marker +``unmapped`` kernel-specific enrichment (see below) +========================== ===================================================== + +Kernel specifics that have no native OCSF home — ``action_id``, ``token_id``, +``event_type``, ``response_mode``, ``sensitivity``, ``reason_code``, +``handle_id``, ``result_summary`` — are carried under ``unmapped`` so no audit +detail is lost while the top-level shape stays OCSF-valid. +""" + +from __future__ import annotations + +from collections.abc import Iterable + +from .models import ActionTrace + +OCSF_VERSION = "1.3.0" +"""OCSF schema version this mapping targets (carried in ``metadata.version``).""" + +AOS_EXTENSION = "aos" +"""OWASP Agent Observability Standard extension marker (``metadata.extensions``).""" + +_CLASS_UID = 6003 +_CATEGORY_UID = 6 + +# event_type → (activity_id, activity_name) per the table in the module docstring. +_ACTIVITY: dict[str, tuple[int, str]] = { + "invoke": (99, "Other"), + "expand": (2, "Read"), + "deny": (99, "Other"), +} + + +def trace_to_ocsf(trace: ActionTrace) -> dict[str, object]: + """Render *trace* as a single OCSF API Activity (6003) event. + + Args: + trace: A recorded :class:`ActionTrace` (any ``event_type``). + + Returns: + A JSON-serialisable dict in OCSF API Activity shape, AOS-enriched. The + result is deterministic: identical traces produce identical output. + """ + activity_id, activity_name = _ACTIVITY.get(trace.event_type, (99, "Other")) + failed = trace.error is not None + epoch_ms = int(trace.invoked_at.timestamp() * 1000) + severity_id, severity = (3, "Medium") if trace.event_type == "deny" else (1, "Informational") + + return { + "activity_id": activity_id, + "activity_name": activity_name, + "category_uid": _CATEGORY_UID, + "category_name": "Application Activity", + "class_uid": _CLASS_UID, + "class_name": "API Activity", + "type_uid": _CLASS_UID * 100 + activity_id, + "severity_id": severity_id, + "severity": severity, + "status_id": 2 if failed else 1, + "status": "Failure" if failed else "Success", + "status_detail": trace.error, + "time": epoch_ms, + "metadata": { + "version": OCSF_VERSION, + "product": {"name": "weaver-kernel", "vendor_name": "Weaver"}, + "extensions": [{"name": AOS_EXTENSION, "uid": "aos", "version": "draft"}], + }, + "actor": {"user": {"uid": trace.principal_id}}, + "api": { + "operation": trace.capability_id, + "service": {"name": trace.driver_id or "weaver-kernel"}, + }, + "unmapped": { + "action_id": trace.action_id, + "token_id": trace.token_id, + "event_type": trace.event_type, + "response_mode": trace.response_mode, + "sensitivity": trace.sensitivity.value, + "reason_code": trace.reason_code, + "handle_id": trace.handle_id, + "result_summary": trace.result_summary, + }, + } + + +def traces_to_ocsf(traces: Iterable[ActionTrace]) -> list[dict[str, object]]: + """Map an iterable of traces to a list of OCSF API Activity events. + + Order is preserved from *traces* (typically ``Kernel.list_traces()`` / + ``Kernel.query_traces(...)``), so the caller controls ordering. + """ + return [trace_to_ocsf(trace) for trace in traces] + + +__all__ = ["AOS_EXTENSION", "OCSF_VERSION", "trace_to_ocsf", "traces_to_ocsf"] diff --git a/src/weaver_kernel/replay.py b/src/weaver_kernel/replay.py new file mode 100644 index 0000000..f67c4f8 --- /dev/null +++ b/src/weaver_kernel/replay.py @@ -0,0 +1,203 @@ +"""Replay recorded policy decisions against a candidate policy (#213). + +Policy changes are the highest-blast-radius configuration change in the kernel: +one edited rule can silently widen access or break every agent. Shadow mode +compares policies on *live* traffic; this harness covers the *pre-deployment* +gap by replaying a recorded set of grant decisions against a candidate engine +and reporting the **decision diffs** (allow→deny, deny→allow, reason-code +changes), so a policy author gets a deterministic "what would have changed" +answer before shipping. + +A :class:`DecisionRecord` captures the inputs to one +:meth:`~weaver_kernel.PolicyEngine.evaluate` call plus the *baseline* outcome. +:func:`replay` re-evaluates every record against a candidate engine and diffs +against the baseline; replaying records against the engine that produced them +yields an empty diff. + +Fidelity note: the diff validates **policy structure** (role/justification/ +constraint rules), not argument-dependent rules whose inputs the audit trail +redacts. Rate-limit decisions are replay-order-sensitive (the default engine's +limiter is stateful), so flips involving :attr:`DenialReason.RATE_LIMITED` are +surfaced separately in :attr:`DecisionDiff.rate_limited` rather than mixed into +:attr:`DecisionDiff.flips`. +""" + +from __future__ import annotations + +from collections.abc import Iterable +from dataclasses import dataclass, field +from typing import Literal + +from .errors import PolicyDenied +from .models import Capability, CapabilityRequest, Principal +from .policy import PolicyEngine +from .policy_reasons import DenialReason + +FlipKind = Literal["allow_to_deny", "deny_to_allow", "reason_code_change"] + + +@dataclass(frozen=True, slots=True) +class DecisionRecord: + """One replayable policy decision: evaluate inputs plus the baseline outcome. + + Build these with :func:`record_decision` (which captures the baseline by + evaluating against a known-good engine) or construct them directly from + historical data. + """ + + request: CapabilityRequest + capability: Capability + principal: Principal + justification: str = "" + baseline_allowed: bool = True + baseline_reason_code: str | None = None + + +@dataclass(frozen=True, slots=True) +class DecisionFlip: + """A single decision that changed between baseline and candidate policy.""" + + record: DecisionRecord + kind: FlipKind + baseline_allowed: bool + candidate_allowed: bool + baseline_reason_code: str | None + candidate_reason_code: str | None + + +@dataclass(frozen=True, slots=True) +class DecisionDiff: + """The outcome of replaying records against a candidate policy. + + Attributes: + evaluated: Number of records replayed. + flips: Structural decision changes (allow↔deny, reason-code changes) + excluding rate-limit noise, in input order. + rate_limited: Flips where either side was a rate-limit denial, surfaced + separately because they depend on replay order, not policy structure. + """ + + evaluated: int + flips: list[DecisionFlip] = field(default_factory=list) + rate_limited: list[DecisionFlip] = field(default_factory=list) + + @property + def empty(self) -> bool: + """``True`` when no structural flips were found (rate-limit flips ignored).""" + return not self.flips + + +def _is_rate_limited(code: str | None) -> bool: + return code is not None and code == DenialReason.RATE_LIMITED.value + + +def _evaluate(engine: PolicyEngine, record: DecisionRecord) -> tuple[bool, str | None]: + """Return ``(allowed, reason_code)`` for *record* under *engine*.""" + try: + engine.evaluate( + record.request, + record.capability, + record.principal, + justification=record.justification, + ) + return True, None + except PolicyDenied as exc: + return False, exc.reason_code + + +def record_decision( + engine: PolicyEngine, + request: CapabilityRequest, + capability: Capability, + principal: Principal, + *, + justification: str = "", +) -> DecisionRecord: + """Evaluate *engine* once and capture the result as a baseline record. + + Convenience for building a replay corpus from a known-good engine, so a + later :func:`replay` against the same engine yields an empty diff. + """ + allowed, reason_code = _evaluate( + engine, + DecisionRecord( + request=request, + capability=capability, + principal=principal, + justification=justification, + ), + ) + return DecisionRecord( + request=request, + capability=capability, + principal=principal, + justification=justification, + baseline_allowed=allowed, + baseline_reason_code=reason_code, + ) + + +def _classify( + record: DecisionRecord, candidate_allowed: bool, candidate_reason_code: str | None +) -> DecisionFlip | None: + """Return a :class:`DecisionFlip` if the candidate differs, else ``None``.""" + if record.baseline_allowed and not candidate_allowed: + kind: FlipKind = "allow_to_deny" + elif not record.baseline_allowed and candidate_allowed: + kind = "deny_to_allow" + elif ( + not record.baseline_allowed + and not candidate_allowed + and record.baseline_reason_code != candidate_reason_code + ): + kind = "reason_code_change" + else: + return None + return DecisionFlip( + record=record, + kind=kind, + baseline_allowed=record.baseline_allowed, + candidate_allowed=candidate_allowed, + baseline_reason_code=record.baseline_reason_code, + candidate_reason_code=candidate_reason_code, + ) + + +def replay(records: Iterable[DecisionRecord], engine: PolicyEngine) -> DecisionDiff: + """Replay *records* against *engine* and report decision diffs. + + Args: + records: Baseline decisions to replay, in evaluation order. + engine: The candidate policy engine to evaluate against. + + Returns: + A :class:`DecisionDiff`. Ordering is deterministic (input order). Flips + involving a rate-limit denial on either side are placed in + :attr:`DecisionDiff.rate_limited`, never :attr:`DecisionDiff.flips`. + """ + flips: list[DecisionFlip] = [] + rate_limited: list[DecisionFlip] = [] + evaluated = 0 + for record in records: + evaluated += 1 + candidate_allowed, candidate_reason_code = _evaluate(engine, record) + flip = _classify(record, candidate_allowed, candidate_reason_code) + if flip is None: + continue + if _is_rate_limited(record.baseline_reason_code) or _is_rate_limited( + candidate_reason_code + ): + rate_limited.append(flip) + else: + flips.append(flip) + return DecisionDiff(evaluated=evaluated, flips=flips, rate_limited=rate_limited) + + +__all__ = [ + "DecisionDiff", + "DecisionFlip", + "DecisionRecord", + "FlipKind", + "record_decision", + "replay", +] diff --git a/src/weaver_kernel/stats.py b/src/weaver_kernel/stats.py new file mode 100644 index 0000000..29cb8e8 --- /dev/null +++ b/src/weaver_kernel/stats.py @@ -0,0 +1,183 @@ +"""In-process kernel metrics counters (#179). + +A dependency-free, lock-guarded set of aggregate counters that answers cheap +operational questions — "is the firewall actually redacting anything?", "how +often are budgets downgrading responses?", "what's my denial rate by reason?" — +without exporting the full audit trail. This is the "counters first, exporters +second" layering: the counters work everywhere, and the OpenTelemetry exporter +(:mod:`weaver_kernel.otel`, #125) can read them as gauges where that extra is +installed. + +Counters are *aggregates*, never a substitute for :class:`ActionTrace` records: +they hold no principal/capability detail, only totals. Streaming invocations are +counted once per stream; dry runs are not counted (they execute no driver and +record no trace). + +Usage:: + + snapshot = kernel.stats # immutable StatsSnapshot + print(snapshot.redaction_events, snapshot.denials_by_reason) + kernel.reset_stats() # zero the live counters +""" + +from __future__ import annotations + +import threading +from collections.abc import Mapping +from dataclasses import dataclass, field +from types import MappingProxyType + + +@dataclass(frozen=True, slots=True) +class StatsSnapshot: + """Immutable point-in-time copy of the kernel's counters. + + Returned by :meth:`KernelStats.snapshot` (and by ``Kernel.stats``). Integer + fields are plain counts; :attr:`denials_by_reason` is a read-only mapping of + stable :class:`~weaver_kernel.policy_reasons.DenialReason` value → count. + """ + + grants: int = 0 + """Successful capability grants (a signed token was issued).""" + + denials: int = 0 + """Grant attempts rejected by the policy engine (``PolicyDenied``).""" + + invocations: int = 0 + """Capability invocations that produced a Frame (one per stream).""" + + invocation_failures: int = 0 + """Invocations where every routed driver failed (a failure trace was recorded).""" + + fallback_activations: int = 0 + """Invocations where the first routed driver failed and a later one was tried.""" + + redaction_events: int = 0 + """Invocations whose firewalled Frame carried at least one redaction warning.""" + + budget_downgrades: int = 0 + """Invocations whose response mode was downgraded (admin gate or budget pressure).""" + + handle_stores: int = 0 + """Full-result handles created (one per non-``raw`` invocation that stored data).""" + + expansions: int = 0 + """Handle-expansion data-access events served.""" + + denials_by_reason: Mapping[str, int] = field(default_factory=dict) + """Denial counts keyed by stable reason code (``None`` codes counted as ``"unknown"``).""" + + +class KernelStats: + """Live, thread-safe collector behind ``Kernel.stats``. + + Increment methods are called at the kernel's natural choke points (grant, + invoke, fallback, firewall, handle store/expand). Each increment is guarded + by a single lock; contention is negligible at counter granularity. + :meth:`snapshot` returns an immutable :class:`StatsSnapshot`; :meth:`reset` + zeroes every counter. + """ + + __slots__ = ( + "_lock", + "_grants", + "_denials", + "_invocations", + "_invocation_failures", + "_fallback_activations", + "_redaction_events", + "_budget_downgrades", + "_handle_stores", + "_expansions", + "_denials_by_reason", + ) + + def __init__(self) -> None: + self._lock = threading.Lock() + self._grants = 0 + self._denials = 0 + self._invocations = 0 + self._invocation_failures = 0 + self._fallback_activations = 0 + self._redaction_events = 0 + self._budget_downgrades = 0 + self._handle_stores = 0 + self._expansions = 0 + self._denials_by_reason: dict[str, int] = {} + + def on_grant(self) -> None: + """Count a successful grant.""" + with self._lock: + self._grants += 1 + + def on_denial(self, reason_code: str | None) -> None: + """Count a policy denial, bucketed by stable reason code.""" + key = reason_code or "unknown" + with self._lock: + self._denials += 1 + self._denials_by_reason[key] = self._denials_by_reason.get(key, 0) + 1 + + def on_invocation( + self, *, failed: bool, fallback: bool, redacted: bool, downgraded: bool + ) -> None: + """Count one invocation outcome and its firewall/routing side effects. + + Args: + failed: Every routed driver failed (a failure trace was recorded). + fallback: A driver other than the first in the route plan served it. + redacted: The resulting Frame carried at least one redaction warning. + downgraded: The effective response mode differed from the requested one. + """ + with self._lock: + self._invocations += 1 + if failed: + self._invocation_failures += 1 + if fallback: + self._fallback_activations += 1 + if redacted: + self._redaction_events += 1 + if downgraded: + self._budget_downgrades += 1 + + def on_handle_store(self) -> None: + """Count a full-result handle creation.""" + with self._lock: + self._handle_stores += 1 + + def on_expansion(self) -> None: + """Count a handle-expansion data-access event.""" + with self._lock: + self._expansions += 1 + + def snapshot(self) -> StatsSnapshot: + """Return an immutable copy of the current counters.""" + with self._lock: + return StatsSnapshot( + grants=self._grants, + denials=self._denials, + invocations=self._invocations, + invocation_failures=self._invocation_failures, + fallback_activations=self._fallback_activations, + redaction_events=self._redaction_events, + budget_downgrades=self._budget_downgrades, + handle_stores=self._handle_stores, + expansions=self._expansions, + denials_by_reason=MappingProxyType(dict(self._denials_by_reason)), + ) + + def reset(self) -> None: + """Zero every counter.""" + with self._lock: + self._grants = 0 + self._denials = 0 + self._invocations = 0 + self._invocation_failures = 0 + self._fallback_activations = 0 + self._redaction_events = 0 + self._budget_downgrades = 0 + self._handle_stores = 0 + self._expansions = 0 + self._denials_by_reason.clear() + + +__all__ = ["KernelStats", "StatsSnapshot"] diff --git a/src/weaver_kernel/stores/_protocols.py b/src/weaver_kernel/stores/_protocols.py index 56bc344..2533cab 100644 --- a/src/weaver_kernel/stores/_protocols.py +++ b/src/weaver_kernel/stores/_protocols.py @@ -10,9 +10,11 @@ from __future__ import annotations +import datetime from typing import Protocol, runtime_checkable from ..models import ActionTrace +from ..trace_query import TraceQuery @runtime_checkable @@ -36,6 +38,15 @@ def list_all(self) -> list[ActionTrace]: """Return all recorded traces in insertion order.""" ... + def query(self, query: TraceQuery) -> list[ActionTrace]: + """Return traces matching *query*, ordered and paginated (#177). + + Ordering and pagination follow + :func:`~weaver_kernel.trace_query.query_traces` so every backend exposes + the same filter semantics. + """ + ... + @runtime_checkable class RevocationStoreProtocol(Protocol): @@ -54,8 +65,16 @@ def revoke(self, token_id: str) -> None: """Revoke a single token. Idempotent.""" ... - def track(self, principal_id: str, token_id: str) -> None: - """Record that *token_id* was issued to *principal_id* (for ``revoke_all``).""" + def track(self, principal_id: str, token_id: str, expires_at: datetime.datetime) -> None: + """Record that *token_id* was issued to *principal_id* (for ``revoke_all``). + + Args: + principal_id: The principal the token was issued to. + token_id: The issued token's id. + expires_at: The token's expiry. Lets :meth:`sweep_expired` drop + bookkeeping for tokens that can no longer verify, bounding memory + growth in long-lived processes (#182). + """ ... def revoke_principal(self, principal_id: str) -> int: @@ -67,6 +86,20 @@ def revoke_principal(self, principal_id: str) -> int: """ ... + def sweep_expired(self, now: datetime.datetime) -> int: + """Drop revocation/tracking state for tokens already expired at *now*. + + An expired token fails verification on its own (expiry is checked before + the revocation list is even consulted), so retaining its revocation entry + no longer changes any decision — but it would otherwise accumulate + forever. Removing it bounds memory without ever un-revoking a *live* + token (#182). + + Returns: + The number of tracked tokens whose state was removed. + """ + ... + @runtime_checkable class HandleStoreProtocol(Protocol): diff --git a/src/weaver_kernel/stores/_trace_codec.py b/src/weaver_kernel/stores/_trace_codec.py index 20d975b..34ac098 100644 --- a/src/weaver_kernel/stores/_trace_codec.py +++ b/src/weaver_kernel/stores/_trace_codec.py @@ -45,6 +45,8 @@ def decode_trace(payload: dict[str, Any]) -> ActionTrace: error=payload.get("error"), result_summary=payload.get("result_summary"), sensitivity=SensitivityTag(payload.get("sensitivity", "NONE")), + event_type=payload.get("event_type", "invoke"), + reason_code=payload.get("reason_code"), ) except KeyError as exc: raise AgentKernelError( diff --git a/src/weaver_kernel/stores/jsonl.py b/src/weaver_kernel/stores/jsonl.py index 38bc189..b60e77a 100644 --- a/src/weaver_kernel/stores/jsonl.py +++ b/src/weaver_kernel/stores/jsonl.py @@ -16,6 +16,7 @@ from .._secrets import resolve_hmac_secret from ..errors import AgentKernelError from ..models import ActionTrace +from ..trace_query import TraceQuery, query_traces from ._trace_codec import decode_trace, encode_trace from .audit_chain import ( GENESIS_HASH, @@ -104,6 +105,15 @@ def list_all(self) -> list[ActionTrace]: """Return all traces in append order.""" return [decode_trace(record.trace) for record in self._iter_records()] + def query(self, query: TraceQuery) -> list[ActionTrace]: + """Return traces matching *query* (#177). + + Filters the decoded append-only log via the shared + :func:`~weaver_kernel.trace_query.query_traces` so semantics match the + in-memory and SQLite backends. + """ + return query_traces(self.list_all(), query) + # ── Audit chain (issue #127) ────────────────────────────────────────────── def list_records(self) -> list[TraceRecord]: diff --git a/src/weaver_kernel/stores/memory.py b/src/weaver_kernel/stores/memory.py index c469b5e..bbaa361 100644 --- a/src/weaver_kernel/stores/memory.py +++ b/src/weaver_kernel/stores/memory.py @@ -3,13 +3,25 @@ This is the behaviour previously inlined in ``HMACTokenProvider`` (a revoked-id set plus a principal→token-ids index, guarded by a lock), extracted behind :class:`~weaver_kernel.stores.RevocationStoreProtocol` so a durable backend can -be swapped in. Semantics are unchanged. +be swapped in. + +Memory is bounded (#182): every tracked token carries its ``expires_at``, and +state for already-expired tokens is swept — lazily on an interval (mirroring the +``HandleStore`` eviction pattern) and explicitly via :meth:`sweep_expired`. A +sweep never un-revokes a *live* token: only entries whose token has already +expired (and therefore fails verification on the expiry check regardless) are +removed. """ from __future__ import annotations +import datetime import threading +# Run a lazy expiry sweep once every this many ``track`` calls, amortising the +# cost across issuance the way ``HandleStore`` amortises handle eviction. +_SWEEP_INTERVAL = 256 + class InMemoryRevocationStore: """Process-local revocation list. State is lost on restart. @@ -21,6 +33,8 @@ class InMemoryRevocationStore: def __init__(self) -> None: self._revoked: set[str] = set() self._principal_tokens: dict[str, set[str]] = {} + self._expiry: dict[str, datetime.datetime] = {} + self._track_count = 0 self._lock = threading.Lock() def is_revoked(self, token_id: str) -> bool: @@ -33,10 +47,21 @@ def revoke(self, token_id: str) -> None: with self._lock: self._revoked.add(token_id) - def track(self, principal_id: str, token_id: str) -> None: - """Record that *token_id* was issued to *principal_id*.""" + def track(self, principal_id: str, token_id: str, expires_at: datetime.datetime) -> None: + """Record that *token_id* was issued to *principal_id*. + + A naive *expires_at* is treated as UTC (consistent with + :class:`~weaver_kernel.stores.SQLiteRevocationStore`) so the sweep never + compares naive and aware datetimes. + """ + if expires_at.tzinfo is None: + expires_at = expires_at.replace(tzinfo=datetime.timezone.utc) with self._lock: self._principal_tokens.setdefault(principal_id, set()).add(token_id) + self._expiry[token_id] = expires_at + self._track_count += 1 + if self._track_count % _SWEEP_INTERVAL == 0: + self._sweep_locked(datetime.datetime.now(tz=datetime.timezone.utc)) def revoke_principal(self, principal_id: str) -> int: """Revoke every tracked token for *principal_id*. @@ -51,3 +76,34 @@ def revoke_principal(self, principal_id: str) -> int: # Drop the index entry; new tokens for this principal start fresh. self._principal_tokens.pop(principal_id, None) return len(newly_revoked) + + def sweep_expired(self, now: datetime.datetime) -> int: + """Drop revocation/tracking state for tokens expired at *now*. + + A naive *now* is treated as UTC, matching ``track`` and the durable + backends, so the comparison never mixes naive and aware datetimes. + + Returns: + The number of tracked tokens whose state was removed. + """ + if now.tzinfo is None: + now = now.replace(tzinfo=datetime.timezone.utc) + with self._lock: + return self._sweep_locked(now) + + def _sweep_locked(self, now: datetime.datetime) -> int: + """Remove expired tokens' state. Caller must hold ``self._lock``. + + Iterates in sorted ``token_id`` order so the sweep is deterministic. + """ + expired = sorted(tid for tid, exp in self._expiry.items() if exp <= now) + for token_id in expired: + self._expiry.pop(token_id, None) + self._revoked.discard(token_id) + if expired: + expired_set = set(expired) + for principal_id in list(self._principal_tokens): + self._principal_tokens[principal_id] -= expired_set + if not self._principal_tokens[principal_id]: + del self._principal_tokens[principal_id] + return len(expired) diff --git a/src/weaver_kernel/stores/sqlite.py b/src/weaver_kernel/stores/sqlite.py index 798498d..c8a7b51 100644 --- a/src/weaver_kernel/stores/sqlite.py +++ b/src/weaver_kernel/stores/sqlite.py @@ -22,6 +22,7 @@ from .._secrets import resolve_hmac_secret from ..errors import AgentKernelError from ..models import ActionTrace +from ..trace_query import TraceQuery, query_traces from ._trace_codec import decode_trace, encode_trace from .audit_chain import ( GENESIS_HASH, @@ -165,6 +166,16 @@ def list_all(self) -> list[ActionTrace]: for r in rows ] + def query(self, query: TraceQuery) -> list[ActionTrace]: + """Return traces matching *query* (#177). + + Decodes the chain and filters in Python via the shared + :func:`~weaver_kernel.trace_query.query_traces`, so every backend shares + identical semantics. O(n); acceptable until a pushed-down SQL filter is + warranted by volume. + """ + return query_traces(self.list_all(), query) + # ── Audit chain (issue #127) ────────────────────────────────────────────── def list_records(self) -> list[TraceRecord]: @@ -240,6 +251,12 @@ def __init__(self, path: str | Path) -> None: "principal_id TEXT NOT NULL, token_id TEXT NOT NULL, " "PRIMARY KEY (principal_id, token_id))" ) + # Token expiry, so sweep_expired can bound growth without un-revoking a + # live token (#182). Separate table keeps the existing schema intact. + self._conn.execute( + "CREATE TABLE IF NOT EXISTS token_expiry (" + "token_id TEXT PRIMARY KEY, expires_at TEXT NOT NULL)" + ) self._conn.commit() def is_revoked(self, token_id: str) -> bool: @@ -255,13 +272,20 @@ def revoke(self, token_id: str) -> None: self._conn.execute("INSERT OR IGNORE INTO revoked (token_id) VALUES (?)", (token_id,)) self._conn.commit() - def track(self, principal_id: str, token_id: str) -> None: - """Record that *token_id* was issued to *principal_id*.""" + def track(self, principal_id: str, token_id: str, expires_at: datetime.datetime) -> None: + """Record that *token_id* was issued to *principal_id*, with its expiry.""" + if expires_at.tzinfo is None: + expires_at = expires_at.replace(tzinfo=datetime.timezone.utc) + expiry_iso = expires_at.astimezone(datetime.timezone.utc).isoformat() with self._lock: self._conn.execute( "INSERT OR IGNORE INTO principal_tokens (principal_id, token_id) VALUES (?, ?)", (principal_id, token_id), ) + self._conn.execute( + "INSERT OR REPLACE INTO token_expiry (token_id, expires_at) VALUES (?, ?)", + (token_id, expiry_iso), + ) self._conn.commit() def revoke_principal(self, principal_id: str) -> int: @@ -294,6 +318,33 @@ def revoke_principal(self, principal_id: str) -> int: self._conn.commit() return len(newly) + def sweep_expired(self, now: datetime.datetime) -> int: + """Drop revocation/tracking state for tokens expired at *now* (#182). + + Removes only entries whose token has already expired (and therefore + fails the verifier's expiry check regardless), so a revoked but unexpired + token is never un-revoked. + + Returns: + The number of tracked tokens whose state was removed. + """ + if now.tzinfo is None: + now = now.replace(tzinfo=datetime.timezone.utc) + cutoff = now.astimezone(datetime.timezone.utc).isoformat() + with self._lock: + rows = self._conn.execute( + "SELECT token_id FROM token_expiry WHERE expires_at <= ?", (cutoff,) + ).fetchall() + expired = [str(r[0]) for r in rows] + if not expired: + return 0 + params = [(tid,) for tid in expired] + self._conn.executemany("DELETE FROM revoked WHERE token_id = ?", params) + self._conn.executemany("DELETE FROM principal_tokens WHERE token_id = ?", params) + self._conn.executemany("DELETE FROM token_expiry WHERE token_id = ?", params) + self._conn.commit() + return len(expired) + def close(self) -> None: """Close the underlying database connection.""" self._conn.close() diff --git a/src/weaver_kernel/tokens.py b/src/weaver_kernel/tokens.py index f5cfa55..8b3a450 100644 --- a/src/weaver_kernel/tokens.py +++ b/src/weaver_kernel/tokens.py @@ -222,7 +222,7 @@ def issue( audit_id=audit_id, ) token.signature = self._sign(token._signable_payload()) - self._revocation.track(principal_id, token.token_id) + self._revocation.track(principal_id, token.token_id, token.expires_at) logger.debug( "token_issued", extra={ @@ -257,6 +257,24 @@ def revoke_all(self, principal_id: str) -> int: """ return self._revocation.revoke_principal(principal_id) + def sweep_revocations(self, now: datetime.datetime | None = None) -> int: + """Drop revocation bookkeeping for tokens that have already expired. + + Bounds revocation-state growth in long-lived processes (#182). Safe to + call at any time: an expired token fails the verifier's expiry check + regardless, so sweeping its entry never un-revokes a live token. The + in-memory store also sweeps itself lazily; durable backends expose this + for an operator to call on a schedule. + + Args: + now: Reference time; defaults to the current UTC time. + + Returns: + The number of tracked tokens whose state was removed. + """ + when = now or datetime.datetime.now(tz=datetime.timezone.utc) + return self._revocation.sweep_expired(when) + def verify( self, token: CapabilityToken, diff --git a/src/weaver_kernel/trace.py b/src/weaver_kernel/trace.py index 1650c40..390ce08 100644 --- a/src/weaver_kernel/trace.py +++ b/src/weaver_kernel/trace.py @@ -20,11 +20,15 @@ from __future__ import annotations +import logging from collections.abc import Iterable from typing import Any from .errors import AgentKernelError from .models import ActionTrace +from .trace_query import TraceQuery, query_traces + +logger = logging.getLogger("weaver_kernel.trace") # ── Export contract ───────────────────────────────────────────────────────── @@ -57,11 +61,13 @@ def export_action_trace( Returns: A dict with the stable export fields. ``status`` is ``"failed"`` when - the invocation recorded an ``error`` and ``"succeeded"`` otherwise. - (A *denied* request never produces an :class:`ActionTrace` — policy - gates before invocation, per I-02 — so the export only ever describes - authorised invocations; denials surface via - :class:`~weaver_kernel.PolicyDenied` / ``explain_denial``.) + the trace recorded an ``error`` and ``"succeeded"`` otherwise. + ``event_type`` distinguishes ``"invoke"`` (a capability invocation), + ``"expand"`` (a handle-expansion data-access event), and ``"deny"`` (a + policy denial at grant time, carrying a stable ``reason_code``); see + #175. A denial therefore *does* appear in the export as a ``"deny"`` + event with ``status == "failed"`` — its structured form is still + available via :class:`~weaver_kernel.PolicyDenied` / ``explain_denial``. """ return { "action_id": trace.action_id, @@ -74,6 +80,8 @@ def export_action_trace( "handle_id": trace.handle_id, "sensitivity": trace.sensitivity.value, "status": "failed" if trace.error is not None else "succeeded", + "event_type": trace.event_type, + "reason_code": trace.reason_code, "error": trace.error, "args": trace.args, "result_summary": trace.result_summary, @@ -109,23 +117,71 @@ def export_action_traces( } +_DEFAULT_MAX_ENTRIES = 10_000 + + class TraceStore: """Stores :class:`ActionTrace` records indexed by ``action_id``. All invocations recorded by the :class:`~weaver_kernel.kernel.Kernel` are retrievable here for audit and explainability purposes. + + Memory is bounded (#182): a long-lived agent process records one trace per + invocation, so the store caps itself at ``max_entries`` and evicts the + oldest record (insertion order, FIFO) when the cap is exceeded. Eviction + discards audit data, so it is *loud* — the first eviction logs a warning and + :attr:`evicted_count` records how many records were dropped. Deployments + that need unbounded retention should use a durable backend + (:class:`~weaver_kernel.stores.SQLiteTraceStore` / + :class:`~weaver_kernel.stores.JsonlTraceStore`). """ - def __init__(self) -> None: + def __init__(self, *, max_entries: int = _DEFAULT_MAX_ENTRIES) -> None: + """Initialise the store. + + Args: + max_entries: Maximum number of records retained. Must be positive; + defaults high enough (10 000) that typical sessions never evict. + + Raises: + AgentKernelError: If ``max_entries`` is not positive. + """ + if max_entries <= 0: + raise AgentKernelError(f"TraceStore max_entries must be positive, got {max_entries}.") self._traces: dict[str, ActionTrace] = {} + self._max_entries = max_entries + self.evicted_count = 0 + self._eviction_warned = False def record(self, trace: ActionTrace) -> None: - """Store an action trace. + """Store an action trace, evicting the oldest if the cap is exceeded. + + Re-recording an existing ``action_id`` overwrites in place and never + evicts (the record count is unchanged). A genuinely new record beyond + :attr:`max_entries` drops the oldest record first. Args: trace: The :class:`ActionTrace` to record. """ self._traces[trace.action_id] = trace + while len(self._traces) > self._max_entries: + oldest_id = next(iter(self._traces)) + del self._traces[oldest_id] + self.evicted_count += 1 + if not self._eviction_warned: + logger.warning( + "trace_store_eviction", + extra={"max_entries": self._max_entries, "evicted_action_id": oldest_id}, + ) + self._eviction_warned = True + + def query(self, query: TraceQuery) -> list[ActionTrace]: + """Return recorded traces matching *query*, ordered and paginated. + + See :func:`~weaver_kernel.trace_query.query_traces` for ordering and + pagination semantics. + """ + return query_traces(self._traces.values(), query) def get(self, action_id: str) -> ActionTrace: """Retrieve an action trace by its ID. diff --git a/src/weaver_kernel/trace_query.py b/src/weaver_kernel/trace_query.py new file mode 100644 index 0000000..9930749 --- /dev/null +++ b/src/weaver_kernel/trace_query.py @@ -0,0 +1,119 @@ +"""Filtering and pagination over :class:`~weaver_kernel.models.ActionTrace` records. + +The audit trail is the product's flagship artifact, yet per-``action_id`` lookup +is the only access today. :class:`TraceQuery` plus the pure +:func:`query_traces` give operators a small, stable filter surface ("what did +principal X do in the last hour?", "which capabilities failed today?") that +every :class:`~weaver_kernel.stores.TraceStoreProtocol` backend shares (#177). + +The function is pure and deterministic: identical inputs yield identical output, +ordered by ``(invoked_at, action_id)`` so pagination slices are disjoint and +complete (AGENTS.md forbids non-deterministic ordering). +""" + +from __future__ import annotations + +from collections.abc import Iterable +from dataclasses import dataclass +from datetime import datetime, timezone +from typing import Literal + +from .errors import AgentKernelError +from .models import ActionTrace, TraceEventType + +Outcome = Literal["succeeded", "failed"] +"""Coarse invocation outcome: ``"succeeded"`` when no ``error`` was recorded, +``"failed"`` otherwise (a denial records its reason as an ``error``).""" + + +@dataclass(slots=True) +class TraceQuery: + """Filter criteria for :func:`query_traces` / ``TraceStore.query``. + + Every field is optional; unset fields impose no constraint, so an empty + :class:`TraceQuery` matches all records. Filters combine with logical AND. + + Attributes: + principal_id: Exact principal match. + capability_id: Exact capability match. + event_type: Restrict to ``"invoke"`` / ``"expand"`` / ``"deny"`` events. + outcome: ``"succeeded"`` or ``"failed"`` (by presence of ``error``). + reason_code: Exact denial reason-code match (matches ``"deny"`` events). + since: Lower bound on ``invoked_at``, **inclusive**. + until: Upper bound on ``invoked_at``, **exclusive**. + limit: Maximum number of records to return after ordering. ``None`` + means no limit; ``0`` returns an empty list. + offset: Number of leading records to skip after ordering (for pagination). + """ + + principal_id: str | None = None + capability_id: str | None = None + event_type: TraceEventType | None = None + outcome: Outcome | None = None + reason_code: str | None = None + since: datetime | None = None + until: datetime | None = None + limit: int | None = None + offset: int = 0 + + +def _as_utc(value: datetime) -> datetime: + """Treat a naive datetime as UTC (matching the rest of the codebase). + + ``ActionTrace.invoked_at`` is always timezone-aware, so comparing a naive + bound (common when parsing user input) against it would raise ``TypeError``; + normalising the bound to UTC avoids that surprising runtime failure. + """ + return value if value.tzinfo is not None else value.replace(tzinfo=timezone.utc) + + +def _matches(trace: ActionTrace, query: TraceQuery) -> bool: + """Return whether *trace* satisfies every set filter on *query*.""" + if query.principal_id is not None and trace.principal_id != query.principal_id: + return False + if query.capability_id is not None and trace.capability_id != query.capability_id: + return False + if query.event_type is not None and trace.event_type != query.event_type: + return False + if query.reason_code is not None and trace.reason_code != query.reason_code: + return False + if query.outcome is not None: + outcome: Outcome = "failed" if trace.error is not None else "succeeded" + if outcome != query.outcome: + return False + if query.since is not None and trace.invoked_at < _as_utc(query.since): + return False + return not (query.until is not None and trace.invoked_at >= _as_utc(query.until)) + + +def query_traces(traces: Iterable[ActionTrace], query: TraceQuery) -> list[ActionTrace]: + """Filter, order, and paginate *traces* per *query*. + + Args: + traces: Records to filter (e.g. ``TraceStore.list_all()``). + query: The filter criteria. + + Returns: + Matching traces ordered by ``(invoked_at, action_id)``, then sliced by + ``offset``/``limit``. The ordering is deterministic, so successive + pages with the same query over an unchanged store are disjoint and + jointly complete. + + Raises: + AgentKernelError: If ``offset`` or ``limit`` is negative. + """ + if query.offset < 0: + raise AgentKernelError(f"TraceQuery.offset must be >= 0, got {query.offset}.") + if query.limit is not None and query.limit < 0: + raise AgentKernelError(f"TraceQuery.limit must be >= 0 or None, got {query.limit}.") + + matched = [trace for trace in traces if _matches(trace, query)] + matched.sort(key=lambda trace: (trace.invoked_at, trace.action_id)) + + sliced = matched[query.offset :] + if query.limit is not None: + sliced = sliced[: query.limit] + return sliced + + +__all__ = ["Outcome", "TraceQuery", "query_traces"] diff --git a/tests/test_firewall_stream.py b/tests/test_firewall_stream.py index bb32722..e0e593b 100644 --- a/tests/test_firewall_stream.py +++ b/tests/test_firewall_stream.py @@ -196,3 +196,37 @@ async def test_kernel_invoke_stream_emits_trace_event() -> None: assert trace.principal_id == "streamer" assert trace.driver_id == "stream-test" assert trace.error is None + + +@pytest.mark.asyncio +async def test_stream_redaction_event_counts_any_frame_not_just_last() -> None: + """Streaming redaction is counted if *any* frame warned, not only the last. + + `apply_stream` holds back a trailing overlap window, so a secret early in a + long stream commits (and warns) in an early frame while the final frame is + clean. The stats increment must reflect that earlier warning (#179 fix). + """ + # Chunk 1: a contiguous email then >256 clean chars, so the email commits + # (and warns) in the first emitted frame. Chunk 2 (final) is clean. + driver = _FakeStreamingDriver( + chunks=[ + {"text": "leaked@example.com " + ("x" * 300)}, + {"text": "all clear", "__is_final__": True}, + ] + ) + kernel, principal = _build_streaming_kernel(driver) + req = CapabilityRequest(capability_id="stream.read", goal="t") + token = kernel.get_token(req, principal, justification="") + + frames: list[Any] = [] + async for frame in kernel.invoke_stream( + token, principal=principal, args={}, response_mode="summary" + ): + frames.append(frame) + + # The secret warned on an earlier frame, and the final frame is clean. + assert frames[0].warnings + assert not frames[-1].warnings + # The secret never leaks, and the redaction is counted exactly once. + assert "leaked@example.com" not in repr([f.facts for f in frames]) + assert kernel.stats.redaction_events == 1 diff --git a/tests/test_handles.py b/tests/test_handles.py index 7526e9a..66fedbd 100644 --- a/tests/test_handles.py +++ b/tests/test_handles.py @@ -336,3 +336,11 @@ def test_negative_offset_rejected(store: HandleStore) -> None: with pytest.raises(HandleConstraintViolation) as exc: store.expand(handle, query={"offset": -1}) assert exc.value.reason_code == DenialReason.INVALID_CONSTRAINT + + +def test_expand_fills_provenance_principal(store: HandleStore) -> None: + """Expansion Frames carry the expanding principal in provenance (#175).""" + handle = _make_handle(store) + frame = store.expand(handle, query={}, action_id="act-9", principal_id="agent-1") + assert frame.provenance.principal_id == "agent-1" + assert frame.provenance.action_id == "act-9" diff --git a/tests/test_kernel.py b/tests/test_kernel.py index 5e303e3..a5b90e1 100644 --- a/tests/test_kernel.py +++ b/tests/test_kernel.py @@ -938,3 +938,51 @@ def _fake_session_factory() -> object: # pragma: no cover - never called assert result.operation == "mcp.echo" assert result.capability_id == "mcp.echo" assert result.policy_decision.allowed is True + + +# ── Audit recording for denials and expansions (issue #175) ──────────────────── + + +def test_denied_grant_records_deny_trace(kernel: Kernel, reader_principal: Principal) -> None: + req = CapabilityRequest(capability_id="billing.delete_invoice", goal="delete") + with pytest.raises(PolicyDenied): + kernel.grant_capability(req, reader_principal, justification="nope") + deny_traces = [t for t in kernel.list_traces() if t.event_type == "deny"] + assert len(deny_traces) == 1 + assert deny_traces[0].capability_id == "billing.delete_invoice" + assert deny_traces[0].principal_id == reader_principal.principal_id + assert deny_traces[0].reason_code is not None + # Retrievable via explain(). + assert kernel.explain(deny_traces[0].action_id).event_type == "deny" + + +@pytest.mark.asyncio +async def test_expand_records_expand_trace(kernel: Kernel, reader_principal: Principal) -> None: + req = CapabilityRequest(capability_id="billing.list_invoices", goal="list") + token = kernel.get_token(req, reader_principal, justification="") + frame = await kernel.invoke( + token, principal=reader_principal, args={"operation": "billing.list_invoices"} + ) + assert frame.handle is not None + expanded = kernel.expand(frame.handle, query={"limit": 1}, principal=reader_principal) + assert expanded.provenance.principal_id == reader_principal.principal_id + + expand_traces = [t for t in kernel.list_traces() if t.event_type == "expand"] + assert len(expand_traces) == 1 + assert expand_traces[0].principal_id == reader_principal.principal_id + assert expand_traces[0].handle_id == frame.handle.handle_id + assert expand_traces[0].capability_id == "billing.list_invoices" + + +def test_kernel_query_traces(kernel: Kernel, reader_principal: Principal) -> None: + from weaver_kernel import TraceQuery + + req = CapabilityRequest(capability_id="billing.delete_invoice", goal="delete") + with pytest.raises(PolicyDenied): + kernel.grant_capability(req, reader_principal, justification="nope") + + denied = kernel.query_traces(TraceQuery(event_type="deny")) + assert len(denied) == 1 + assert denied[0].capability_id == "billing.delete_invoice" + # Filtering by a principal who did nothing yields nothing. + assert kernel.query_traces(TraceQuery(principal_id="ghost")) == [] diff --git a/tests/test_ocsf.py b/tests/test_ocsf.py new file mode 100644 index 0000000..9ac568f --- /dev/null +++ b/tests/test_ocsf.py @@ -0,0 +1,157 @@ +"""Tests for the OCSF / AOS SIEM export mapping (issue #176).""" + +from __future__ import annotations + +import datetime +import json + +from weaver_kernel import OCSF_VERSION, SensitivityTag, trace_to_ocsf, traces_to_ocsf +from weaver_kernel.models import ActionTrace + +_T = datetime.datetime(2026, 1, 2, 3, 4, 5, tzinfo=datetime.timezone.utc) + + +def _trace( + *, + action_id: str = "act-1", + event_type: str = "invoke", + error: str | None = None, + reason_code: str | None = None, + driver_id: str = "billing", +) -> ActionTrace: + return ActionTrace( + action_id=action_id, + capability_id="billing.list_invoices", + principal_id="alice", + token_id="tok-1", + invoked_at=_T, + args={"operation": "billing.list_invoices"}, + response_mode="summary", + driver_id=driver_id, + sensitivity=SensitivityTag.PII, + event_type=event_type, # type: ignore[arg-type] + error=error, + reason_code=reason_code, + ) + + +def _assert_ocsf_valid(event: dict[str, object]) -> None: + """Structural OCSF API Activity (class 6003) validation.""" + assert event["class_uid"] == 6003 + assert event["class_name"] == "API Activity" + assert event["category_uid"] == 6 + assert isinstance(event["activity_id"], int) + assert isinstance(event["activity_name"], str) + assert event["type_uid"] == 6003 * 100 + event["activity_id"] + assert event["status_id"] in (1, 2) + assert event["status"] in ("Success", "Failure") + assert isinstance(event["severity_id"], int) + assert isinstance(event["time"], int) + metadata = event["metadata"] + assert isinstance(metadata, dict) + assert metadata["version"] == OCSF_VERSION + assert metadata["product"]["name"] == "weaver-kernel" + assert any(ext["name"] == "aos" for ext in metadata["extensions"]) + assert isinstance(event["actor"]["user"]["uid"], str) + assert isinstance(event["api"]["operation"], str) + json.dumps(event) # must be JSON-serialisable + + +def test_invoke_success_maps_to_valid_ocsf() -> None: + event = trace_to_ocsf(_trace()) + _assert_ocsf_valid(event) + assert event["status"] == "Success" + assert event["activity_id"] == 99 # invoke → Other + assert event["actor"]["user"]["uid"] == "alice" + assert event["api"]["operation"] == "billing.list_invoices" + + +def test_invoke_failure_maps_to_failure_status() -> None: + event = trace_to_ocsf(_trace(error="All drivers failed", driver_id="")) + _assert_ocsf_valid(event) + assert event["status"] == "Failure" + assert event["status_id"] == 2 + assert event["status_detail"] == "All drivers failed" + assert event["api"]["service"]["name"] == "weaver-kernel" # empty driver_id falls back + + +def test_deny_event_is_medium_severity_failure() -> None: + event = trace_to_ocsf( + _trace( + event_type="deny", + error="denied: missing role", + reason_code="missing_role", + driver_id="", + ) + ) + _assert_ocsf_valid(event) + assert event["severity_id"] == 3 # Medium + assert event["status"] == "Failure" + assert event["unmapped"]["reason_code"] == "missing_role" + assert event["unmapped"]["event_type"] == "deny" + + +def test_expand_event_maps_to_read_activity() -> None: + event = trace_to_ocsf(_trace(event_type="expand", driver_id="")) + _assert_ocsf_valid(event) + assert event["activity_id"] == 2 # Read + assert event["activity_name"] == "Read" + + +def test_time_is_epoch_millis() -> None: + event = trace_to_ocsf(_trace()) + assert event["time"] == int(_T.timestamp() * 1000) + + +def test_golden_mapping_is_deterministic() -> None: + event = trace_to_ocsf(_trace()) + assert event == { + "activity_id": 99, + "activity_name": "Other", + "category_uid": 6, + "category_name": "Application Activity", + "class_uid": 6003, + "class_name": "API Activity", + "type_uid": 600399, + "severity_id": 1, + "severity": "Informational", + "status_id": 1, + "status": "Success", + "status_detail": None, + "time": int(_T.timestamp() * 1000), + "metadata": { + "version": OCSF_VERSION, + "product": {"name": "weaver-kernel", "vendor_name": "Weaver"}, + "extensions": [{"name": "aos", "uid": "aos", "version": "draft"}], + }, + "actor": {"user": {"uid": "alice"}}, + "api": { + "operation": "billing.list_invoices", + "service": {"name": "billing"}, + }, + "unmapped": { + "action_id": "act-1", + "token_id": "tok-1", + "event_type": "invoke", + "response_mode": "summary", + "sensitivity": "PII", + "reason_code": None, + "handle_id": None, + "result_summary": None, + }, + } + + +def test_traces_to_ocsf_preserves_order() -> None: + traces = [_trace(action_id="a"), _trace(action_id="b"), _trace(action_id="c")] + events = traces_to_ocsf(traces) + assert [e["unmapped"]["action_id"] for e in events] == ["a", "b", "c"] + + +def test_no_secret_leaks_through_ocsf() -> None: + # error text is already redaction-safe at record time; the mapping adds no + # raw payload, so a canary placed only in args never reaches the event. + canary = "ZZZ-CANARY-OCSF-SECRET" + trace = _trace() + trace.args["secret"] = canary + assert canary not in json.dumps(trace_to_ocsf(trace)) diff --git a/tests/test_replay.py b/tests/test_replay.py new file mode 100644 index 0000000..be94446 --- /dev/null +++ b/tests/test_replay.py @@ -0,0 +1,113 @@ +"""Tests for the trace-replay regression harness (issue #213).""" + +from __future__ import annotations + +from typing import Any + +from weaver_kernel import ( + Capability, + DecisionRecord, + DefaultPolicyEngine, + Principal, + SafetyClass, + record_decision, + replay, +) +from weaver_kernel.errors import PolicyDenied +from weaver_kernel.models import CapabilityRequest +from weaver_kernel.policy_reasons import DenialReason + + +class _StubEngine: + """Minimal policy engine: always allow, or always deny with a fixed code.""" + + def __init__(self, *, allow: bool, reason_code: str | None = None) -> None: + self._allow = allow + self._reason_code = reason_code + + def evaluate( + self, + request: CapabilityRequest, + capability: Capability, + principal: Principal, + *, + justification: str, + ) -> Any: + if self._allow: + return None + raise PolicyDenied("denied", reason_code=self._reason_code) + + +_READ_CAP = Capability( + capability_id="cap.read", + name="read", + description="read something", + safety_class=SafetyClass.READ, +) +_REQ = CapabilityRequest(capability_id="cap.read", goal="read") +_PRINCIPAL = Principal(principal_id="p1", roles=["reader"], attributes={"tenant": "acme"}) + + +def _records(engine: Any, count: int = 3) -> list[DecisionRecord]: + return [ + record_decision(engine, _REQ, _READ_CAP, _PRINCIPAL, justification="audit") + for _ in range(count) + ] + + +def test_same_engine_yields_empty_diff() -> None: + engine = DefaultPolicyEngine() + records = [ + record_decision(engine, _REQ, _READ_CAP, _PRINCIPAL, justification="audit"), + ] + diff = replay(records, engine) + assert diff.empty + assert diff.flips == [] + assert diff.evaluated == 1 + + +def test_allow_to_deny_flips() -> None: + records = _records(_StubEngine(allow=True)) + diff = replay(records, _StubEngine(allow=False, reason_code="missing_role")) + assert diff.evaluated == 3 + assert len(diff.flips) == 3 + assert all(flip.kind == "allow_to_deny" for flip in diff.flips) + assert diff.flips[0].candidate_reason_code == "missing_role" + assert not diff.empty + + +def test_deny_to_allow_flips() -> None: + records = _records(_StubEngine(allow=False, reason_code="missing_role")) + diff = replay(records, _StubEngine(allow=True)) + assert len(diff.flips) == 3 + assert all(flip.kind == "deny_to_allow" for flip in diff.flips) + + +def test_reason_code_change_flip() -> None: + records = _records(_StubEngine(allow=False, reason_code="missing_role")) + diff = replay(records, _StubEngine(allow=False, reason_code="insufficient_justification")) + assert len(diff.flips) == 3 + assert all(flip.kind == "reason_code_change" for flip in diff.flips) + + +def test_same_reason_code_is_not_a_flip() -> None: + records = _records(_StubEngine(allow=False, reason_code="missing_role")) + diff = replay(records, _StubEngine(allow=False, reason_code="missing_role")) + assert diff.empty + assert diff.flips == [] + + +def test_rate_limited_flips_are_separated() -> None: + records = _records(_StubEngine(allow=True)) + diff = replay(records, _StubEngine(allow=False, reason_code=DenialReason.RATE_LIMITED.value)) + # Rate-limit flips are replay-order-sensitive, so they are surfaced apart + # from structural flips and do not make the structural diff non-empty. + assert diff.flips == [] + assert diff.empty + assert len(diff.rate_limited) == 3 + + +def test_ordering_is_deterministic() -> None: + records = _records(_StubEngine(allow=True), count=5) + diff = replay(records, _StubEngine(allow=False, reason_code="missing_role")) + assert [id(flip.record) for flip in diff.flips] == [id(r) for r in records] diff --git a/tests/test_stats.py b/tests/test_stats.py new file mode 100644 index 0000000..35cffb0 --- /dev/null +++ b/tests/test_stats.py @@ -0,0 +1,172 @@ +"""Tests for KernelStats counters and kernel integration (issue #179).""" + +from __future__ import annotations + +import asyncio +import threading + +import pytest + +from weaver_kernel import ( + Kernel, + KernelStats, + PolicyDenied, + Principal, + StatsSnapshot, +) +from weaver_kernel.models import CapabilityRequest + +# ── Unit: the collector ──────────────────────────────────────────────────────── + + +def test_snapshot_is_immutable_copy() -> None: + stats = KernelStats() + stats.on_grant() + snap = stats.snapshot() + assert isinstance(snap, StatsSnapshot) + stats.on_grant() # mutating the live collector does not change the snapshot + assert snap.grants == 1 + assert stats.snapshot().grants == 2 + + +def test_denials_bucketed_by_reason_code() -> None: + stats = KernelStats() + stats.on_denial("missing_role") + stats.on_denial("missing_role") + stats.on_denial("rate_limited") + stats.on_denial(None) # unknown bucket + snap = stats.snapshot() + assert snap.denials == 4 + assert snap.denials_by_reason["missing_role"] == 2 + assert snap.denials_by_reason["rate_limited"] == 1 + assert snap.denials_by_reason["unknown"] == 1 + + +def test_denials_by_reason_snapshot_is_read_only() -> None: + stats = KernelStats() + stats.on_denial("missing_role") + snap = stats.snapshot() + with pytest.raises(TypeError): + snap.denials_by_reason["missing_role"] = 99 # type: ignore[index] + + +def test_on_invocation_flags() -> None: + stats = KernelStats() + stats.on_invocation(failed=False, fallback=True, redacted=True, downgraded=False) + stats.on_invocation(failed=True, fallback=False, redacted=False, downgraded=True) + snap = stats.snapshot() + assert snap.invocations == 2 + assert snap.invocation_failures == 1 + assert snap.fallback_activations == 1 + assert snap.redaction_events == 1 + assert snap.budget_downgrades == 1 + + +def test_reset_zeroes_all_counters() -> None: + stats = KernelStats() + stats.on_grant() + stats.on_denial("missing_role") + stats.on_expansion() + stats.reset() + snap = stats.snapshot() + assert snap.grants == 0 + assert snap.denials == 0 + assert snap.expansions == 0 + assert dict(snap.denials_by_reason) == {} + + +def test_snapshot_safe_under_concurrent_increments() -> None: + stats = KernelStats() + workers = 8 + per_worker = 1000 + + def hammer() -> None: + for _ in range(per_worker): + stats.on_grant() + + threads = [threading.Thread(target=hammer) for _ in range(workers)] + for thread in threads: + thread.start() + for thread in threads: + thread.join() + assert stats.snapshot().grants == workers * per_worker + + +# ── Kernel integration ────────────────────────────────────────────────────────── + + +@pytest.mark.asyncio +async def test_grant_invoke_expand_counts(kernel: Kernel, reader_principal: Principal) -> None: + req = CapabilityRequest(capability_id="billing.list_invoices", goal="list") + grant = kernel.grant_capability(req, reader_principal, justification="audit") + frame = await kernel.invoke( + grant.token, principal=reader_principal, args={"operation": "billing.list_invoices"} + ) + assert frame.handle is not None + kernel.expand(frame.handle, query={"limit": 1}, principal=reader_principal) + + snap = kernel.stats + assert snap.grants == 1 + assert snap.invocations == 1 + assert snap.handle_stores == 1 + assert snap.expansions == 1 + assert snap.denials == 0 + + +def test_denied_grant_is_counted(kernel: Kernel, reader_principal: Principal) -> None: + # A reader cannot be granted a DESTRUCTIVE capability. + req = CapabilityRequest(capability_id="billing.delete_invoice", goal="delete") + with pytest.raises(PolicyDenied): + kernel.grant_capability(req, reader_principal, justification="nope") + + snap = kernel.stats + assert snap.grants == 0 + assert snap.denials == 1 + assert sum(snap.denials_by_reason.values()) == 1 + + +def test_reset_stats_via_kernel(kernel: Kernel, reader_principal: Principal) -> None: + req = CapabilityRequest(capability_id="billing.list_invoices", goal="list") + kernel.grant_capability(req, reader_principal, justification="audit") + assert kernel.stats.grants == 1 + kernel.reset_stats() + assert kernel.stats.grants == 0 + + +def test_redaction_event_counted_when_frame_warns() -> None: + """An invocation whose Frame carries a redaction warning bumps the counter.""" + from weaver_kernel import ( + Capability, + CapabilityRegistry, + HMACTokenProvider, + InMemoryDriver, + SafetyClass, + StaticRouter, + ) + + registry = CapabilityRegistry() + registry.register( + Capability( + capability_id="svc.lookup", + name="lookup", + description="lookup", + safety_class=SafetyClass.READ, + ) + ) + driver = InMemoryDriver(driver_id="svc") + # An inline email triggers the firewall's redaction warning. + driver.register_handler("svc.lookup", lambda _ctx: [{"note": "reach me at a@b.invalid"}]) + kernel = Kernel( + registry=registry, + token_provider=HMACTokenProvider(secret="test-secret-do-not-use-in-prod"), + router=StaticRouter(routes={"svc.lookup": ["svc"]}), + ) + kernel.register_driver(driver) + principal = Principal(principal_id="u1", roles=["reader"]) + req = CapabilityRequest(capability_id="svc.lookup", goal="lookup") + grant = kernel.grant_capability(req, principal, justification="audit") + frame = asyncio.run( + kernel.invoke(grant.token, principal=principal, args={"operation": "svc.lookup"}) + ) + assert frame.warnings # redaction happened + assert kernel.stats.redaction_events == 1 diff --git a/tests/test_stores_sqlite.py b/tests/test_stores_sqlite.py index 72f15c7..cada2a5 100644 --- a/tests/test_stores_sqlite.py +++ b/tests/test_stores_sqlite.py @@ -143,8 +143,9 @@ def test_revocation_basic(tmp_path: Path) -> None: def test_revoke_principal_counts_only_newly_revoked(tmp_path: Path) -> None: store = SQLiteRevocationStore(tmp_path / "r.db") - store.track("p1", "t1") - store.track("p1", "t2") + future = datetime.datetime.now(tz=datetime.timezone.utc) + datetime.timedelta(hours=1) + store.track("p1", "t1", future) + store.track("p1", "t2", future) store.revoke("t1") assert store.revoke_principal("p1") == 1 # only t2 newly revoked assert store.is_revoked("t2") @@ -219,3 +220,16 @@ def test_prune_accepts_naive_datetime_as_utc(tmp_path: Path) -> None: pruned = store.prune(before=datetime.datetime(2026, 1, 2, 12)) assert pruned == 2 assert store.verify_chain().ok + + +def test_sqlite_sweep_expired_removes_only_expired(tmp_path: Path) -> None: + store = SQLiteRevocationStore(tmp_path / "r.db") + now = datetime.datetime(2026, 1, 1, tzinfo=datetime.timezone.utc) + store.track("p1", "live", now + datetime.timedelta(hours=1)) + store.track("p1", "dead", now - datetime.timedelta(hours=1)) + store.revoke("live") + store.revoke("dead") + removed = store.sweep_expired(now) + assert removed == 1 + assert store.is_revoked("live") # unexpired, revoked → still revoked + assert not store.is_revoked("dead") # expired → swept (fails on expiry anyway) diff --git a/tests/test_tokens.py b/tests/test_tokens.py index 304973e..d02600e 100644 --- a/tests/test_tokens.py +++ b/tests/test_tokens.py @@ -2,6 +2,8 @@ from __future__ import annotations +import datetime + import pytest from weaver_kernel import ( @@ -11,6 +13,9 @@ TokenRevoked, TokenScopeError, ) +from weaver_kernel.stores import InMemoryRevocationStore + +_UTC = datetime.timezone.utc @pytest.fixture() @@ -174,3 +179,65 @@ def test_revoked_checked_before_signature(provider: HMACTokenProvider) -> None: tampered = replace(token, signature="invalid-signature") with pytest.raises(TokenRevoked): provider.verify(tampered, expected_principal_id="user-1", expected_capability_id="cap.x") + + +# ── Revocation-state bounding / expiry sweep (issue #182) ──────────────────── + + +def test_sweep_keeps_revoked_but_unexpired_entry() -> None: + store = InMemoryRevocationStore() + now = datetime.datetime(2026, 1, 1, tzinfo=_UTC) + store.track("p1", "t1", now + datetime.timedelta(hours=1)) + store.revoke("t1") + removed = store.sweep_expired(now) + assert removed == 0 + assert store.is_revoked("t1") # a live, revoked token is never un-revoked + + +def test_sweep_removes_expired_entry() -> None: + store = InMemoryRevocationStore() + now = datetime.datetime(2026, 1, 1, tzinfo=_UTC) + store.track("p1", "t1", now - datetime.timedelta(hours=1)) + store.revoke("t1") + removed = store.sweep_expired(now) + assert removed == 1 + assert not store.is_revoked("t1") # expired anyway — verify() fails on expiry + + +def test_provider_sweep_preserves_revoked_unexpired_token(provider: HMACTokenProvider) -> None: + token = provider.issue("cap.x", "user-1", ttl_seconds=3600) + provider.revoke(token.token_id) + # now precedes expiry, so the entry is kept and the token still fails closed. + provider.sweep_revocations(datetime.datetime.now(tz=_UTC)) + with pytest.raises(TokenRevoked): + provider.verify(token, expected_principal_id="user-1", expected_capability_id="cap.x") + + +def test_revocation_state_bounded_under_grant_revoke_loop() -> None: + store = InMemoryRevocationStore() + # Tokens are live (future expiry) when revoked — the normal lifecycle. + expiry = datetime.datetime(2099, 1, 1, tzinfo=_UTC) + for i in range(500): + store.track("p1", f"t{i}", expiry) + store.revoke(f"t{i}") + # Before expiry, a sweep removes nothing (every token is still live). + assert store.sweep_expired(datetime.datetime(2026, 1, 1, tzinfo=_UTC)) == 0 + assert len(store._revoked) == 500 + # Once expired, a single sweep drops all bookkeeping → growth is bounded. + store.sweep_expired(expiry + datetime.timedelta(seconds=1)) + assert store._expiry == {} + assert store._revoked == set() + assert store._principal_tokens == {} + + +def test_track_and_sweep_accept_naive_datetimes() -> None: + """Naive expiry/now are treated as UTC — no naive-vs-aware TypeError.""" + store = InMemoryRevocationStore() + store.track("p1", "t1", datetime.datetime(2099, 1, 1)) # naive expiry + store.revoke("t1") + # Naive 'now' before expiry keeps the (live) revoked token. + assert store.sweep_expired(datetime.datetime(2026, 1, 1)) == 0 + assert store.is_revoked("t1") + # Naive 'now' after expiry removes it. + assert store.sweep_expired(datetime.datetime(2099, 1, 2)) == 1 + assert not store.is_revoked("t1") diff --git a/tests/test_trace.py b/tests/test_trace.py index 5c02ad1..98d2c35 100644 --- a/tests/test_trace.py +++ b/tests/test_trace.py @@ -61,6 +61,40 @@ def test_list_all() -> None: assert [t.action_id for t in all_traces] == ["act-0", "act-1", "act-2"] +# ── Bounded memory / eviction (issue #182) ────────────────────────────────── + + +def test_store_evicts_oldest_when_capped() -> None: + store = TraceStore(max_entries=2) + for i in range(3): + store.record(_trace(f"act-{i}")) + # Oldest (act-0) was evicted; newest two retained in insertion order. + assert [t.action_id for t in store.list_all()] == ["act-1", "act-2"] + assert store.evicted_count == 1 + + +def test_rerecording_existing_action_id_does_not_evict() -> None: + store = TraceStore(max_entries=2) + store.record(_trace("act-0")) + store.record(_trace("act-1")) + store.record(_trace("act-0")) # overwrite in place — count unchanged + assert [t.action_id for t in store.list_all()] == ["act-0", "act-1"] + assert store.evicted_count == 0 + + +def test_eviction_is_counted_across_many_records() -> None: + store = TraceStore(max_entries=10) + for i in range(25): + store.record(_trace(f"act-{i}")) + assert len(store.list_all()) == 10 + assert store.evicted_count == 15 + + +def test_max_entries_must_be_positive() -> None: + with pytest.raises(AgentKernelError, match="max_entries must be positive"): + TraceStore(max_entries=0) + + def test_explain_returns_consistent_data() -> None: store = TraceStore() t = _trace("act-explain") @@ -111,6 +145,29 @@ def test_export_action_trace_success_shape() -> None: assert exported["correction"] is None +def test_export_includes_event_type_and_reason_code() -> None: + trace = ActionTrace( + action_id="act-deny", + capability_id="billing.delete_invoice", + principal_id="u1", + token_id="", + invoked_at=datetime.datetime.now(tz=datetime.timezone.utc), + args={}, + response_mode="summary", + driver_id="", + event_type="deny", + reason_code="missing_role", + error="denied: missing role", + ) + exported = export_action_trace(trace) + assert exported["event_type"] == "deny" + assert exported["reason_code"] == "missing_role" + # Default invoke trace carries the defaults. + plain = export_action_trace(_trace("act-plain")) + assert plain["event_type"] == "invoke" + assert plain["reason_code"] is None + + def test_export_action_trace_failure_status() -> None: trace = ActionTrace( action_id="act-fail", diff --git a/tests/test_trace_query.py b/tests/test_trace_query.py new file mode 100644 index 0000000..8d3ea2a --- /dev/null +++ b/tests/test_trace_query.py @@ -0,0 +1,165 @@ +"""Tests for the TraceStore query API (issue #177).""" + +from __future__ import annotations + +import datetime + +import pytest + +from weaver_kernel import TraceQuery, TraceStore, query_traces +from weaver_kernel.errors import AgentKernelError +from weaver_kernel.models import ActionTrace + +_BASE = datetime.datetime(2026, 1, 1, 12, 0, 0, tzinfo=datetime.timezone.utc) + + +def _trace( + action_id: str, + *, + principal_id: str = "p1", + capability_id: str = "cap.a", + minute: int = 0, + event_type: str = "invoke", + reason_code: str | None = None, + error: str | None = None, +) -> ActionTrace: + return ActionTrace( + action_id=action_id, + capability_id=capability_id, + principal_id=principal_id, + token_id="tok", + invoked_at=_BASE + datetime.timedelta(minutes=minute), + args={}, + response_mode="summary", + driver_id="d", + event_type=event_type, # type: ignore[arg-type] + reason_code=reason_code, + error=error, + ) + + +def _corpus() -> list[ActionTrace]: + return [ + _trace("a1", principal_id="alice", capability_id="cap.read", minute=0), + _trace("a2", principal_id="bob", capability_id="cap.read", minute=1), + _trace("a3", principal_id="alice", capability_id="cap.write", minute=2, error="boom"), + _trace( + "a4", + principal_id="alice", + capability_id="cap.write", + minute=3, + event_type="deny", + reason_code="missing_role", + error="denied: missing role", + ), + _trace( + "a5", principal_id="alice", capability_id="cap.read", minute=4, event_type="expand" + ), + ] + + +def test_empty_query_matches_all() -> None: + result = query_traces(_corpus(), TraceQuery()) + assert [t.action_id for t in result] == ["a1", "a2", "a3", "a4", "a5"] + + +def test_filter_by_principal() -> None: + result = query_traces(_corpus(), TraceQuery(principal_id="alice")) + assert [t.action_id for t in result] == ["a1", "a3", "a4", "a5"] + + +def test_filter_by_capability() -> None: + result = query_traces(_corpus(), TraceQuery(capability_id="cap.write")) + assert {t.action_id for t in result} == {"a3", "a4"} + + +def test_filter_by_event_type() -> None: + assert [t.action_id for t in query_traces(_corpus(), TraceQuery(event_type="deny"))] == ["a4"] + assert [t.action_id for t in query_traces(_corpus(), TraceQuery(event_type="expand"))] == [ + "a5" + ] + + +def test_filter_by_outcome() -> None: + succeeded = query_traces(_corpus(), TraceQuery(outcome="succeeded")) + failed = query_traces(_corpus(), TraceQuery(outcome="failed")) + assert {t.action_id for t in succeeded} == {"a1", "a2", "a5"} + assert {t.action_id for t in failed} == {"a3", "a4"} + + +def test_filter_by_reason_code() -> None: + result = query_traces(_corpus(), TraceQuery(reason_code="missing_role")) + assert [t.action_id for t in result] == ["a4"] + + +def test_filter_by_time_window_since_inclusive_until_exclusive() -> None: + # since is inclusive (minute 1), until is exclusive (minute 3) → a2, a3 only. + result = query_traces( + _corpus(), + TraceQuery( + since=_BASE + datetime.timedelta(minutes=1), + until=_BASE + datetime.timedelta(minutes=3), + ), + ) + assert [t.action_id for t in result] == ["a2", "a3"] + + +def test_combination_filters_are_anded() -> None: + result = query_traces( + _corpus(), TraceQuery(principal_id="alice", capability_id="cap.read", outcome="succeeded") + ) + assert [t.action_id for t in result] == ["a1", "a5"] + + +def test_ordering_is_deterministic_by_time_then_action_id() -> None: + # Two traces at the same instant must order by action_id. + same = [ + _trace("zzz", minute=5), + _trace("aaa", minute=5), + ] + result = query_traces(same, TraceQuery()) + assert [t.action_id for t in result] == ["aaa", "zzz"] + + +def test_pagination_slices_are_disjoint_and_complete() -> None: + corpus = _corpus() + page1 = query_traces(corpus, TraceQuery(limit=2, offset=0)) + page2 = query_traces(corpus, TraceQuery(limit=2, offset=2)) + page3 = query_traces(corpus, TraceQuery(limit=2, offset=4)) + ids = [t.action_id for t in page1 + page2 + page3] + assert ids == ["a1", "a2", "a3", "a4", "a5"] # disjoint + complete, in order + + +def test_limit_zero_returns_empty() -> None: + assert query_traces(_corpus(), TraceQuery(limit=0)) == [] + + +def test_negative_offset_raises() -> None: + with pytest.raises(AgentKernelError, match="offset must be >= 0"): + query_traces(_corpus(), TraceQuery(offset=-1)) + + +def test_negative_limit_raises() -> None: + with pytest.raises(AgentKernelError, match="limit must be >= 0"): + query_traces(_corpus(), TraceQuery(limit=-5)) + + +def test_empty_store_yields_empty() -> None: + assert query_traces([], TraceQuery(principal_id="alice")) == [] + + +def test_trace_store_query_integration() -> None: + store = TraceStore() + for trace in _corpus(): + store.record(trace) + result = store.query(TraceQuery(principal_id="alice", event_type="invoke")) + assert [t.action_id for t in result] == ["a1", "a3"] + + +def test_naive_time_bounds_are_treated_as_utc() -> None: + # Naive since/until (e.g. from parsing user input) must not raise against + # the always-aware invoked_at; they are interpreted as UTC. + naive_since = datetime.datetime(2026, 1, 1, 12, 1, 0) + naive_until = datetime.datetime(2026, 1, 1, 12, 3, 0) + result = query_traces(_corpus(), TraceQuery(since=naive_since, until=naive_until)) + assert [t.action_id for t in result] == ["a2", "a3"]