diff --git a/VERSION b/VERSION index 42ec10d5..a5f784d9 100644 --- a/VERSION +++ b/VERSION @@ -1 +1 @@ -0.1.0+2026.03.06T16.07.48.997Z.0bfff3a0.berickson.20260303.copy.paste.reducer +0.1.0+2026.03.11T18.56.10.577Z.2f873b0a.berickson.20260209.merkle.store diff --git a/docs/concepts/merkle_dag_storage.md b/docs/concepts/merkle_dag_storage.md new file mode 100644 index 00000000..44e65017 --- /dev/null +++ b/docs/concepts/merkle_dag_storage.md @@ -0,0 +1,524 @@ +Merkle DAG Log Storage System + +A cryptographically verifiable, append-only log storage system for education event data. Every event is chained into a Merkle DAG so that tampering is detectable, data-subject requests are tractable, and analytical provenance is auditable. + +--- + +## Table of Contents + +1. [Motivation & Design Goals](#motivation--design-goals) +2. [Core Concepts](#core-concepts) + - [Events, Sessions, and Streams](#events-sessions-and-streams) + - [The Merkle Chain](#the-merkle-chain) + - [Parent Streams and Categories](#parent-streams-and-categories) +3. [Quickstart](#quickstart) +4. [Streaming Pipeline Integration](#streaming-pipeline-integration) +5. [API Reference](#api-reference) + - [Merkle](#merkle) + - [Storage Backends](#storage-backends) +6. [Verification & Audit](#verification--audit) + - [What Exactly Is Verified?](#what-exactly-is-verified) + - [Running Verification](#running-verification) + - [Why This Is Safe](#why-this-is-safe) + - [Publishing Root Hashes](#publishing-root-hashes) +7. [Deletion & Tombstones](#deletion--tombstones) +8. [Visualization](#visualization) + - [Graphviz (pydot)](#graphviz-pydot) + - [NetworkX](#networkx) + - [Reading the Graph](#reading-the-graph) +9. [Storage Backends In Depth](#storage-backends-in-depth) +10. [Configuration](#configuration) +11. [Security Model & Threat Analysis](#security-model--threat-analysis) +12. [Prototype Limitations & Production Roadmap](#prototype-limitations--production-roadmap) + +--- + +## Motivation & Design Goals + +We need a log storage layer that satisfies five sometimes-competing requirements: + +| # | Goal | How the Merkle DAG helps | +|---|------|--------------------------| +| 1 | **Scale** to millions of users × millions of events | The only write primitive is "append an item keyed by its hash." This maps to Kafka topics, S3 objects, or any distributed append-only store. | +| 2 | **Data portability** — give a user *all* their data on request | Parent streams index every session a user participated in. Export = walk the parent stream and collect referenced session streams. | +| 3 | **Erasure** — remove or correct data on request | `delete_stream_with_tombstone` removes content but leaves a hash skeleton. Parent references remain structurally valid. | +| 4 | **Archival audit** — prove retained data is unmodified | The final hash of every stream is a commitment to every item. Anyone who recorded that hash can re-derive it and detect changes. | +| 5 | **Computation provenance** (future) | Computation logs will chain into the same DAG, producing a lab-notebook-grade record of every analytical step. | + +--- + +## Core Concepts + +### Events, Sessions, and Streams + +An **event** is an arbitrary JSON-serializable dict — a click, a keystroke, a page view, a server-side derivation, anything. + +A **session** is a dict of category → value(s) that describes the context in which events occur: + +```python +session = { + "student": ["Alice"], + "tool": ["editor"], +} +``` + +While a session is open, events are appended to a **stream** keyed by the canonical JSON serialization of the session dict. When the session is closed, the stream is renamed to the SHA-256 hash of its final item — making the stream **content-addressed**. + +### The Merkle Chain + +Each item in a stream looks like this: + +``` +┌───────────────────────────────────────────────┐ +│ hash: SHA-256(sorted(children) ‖ ts) │ +│ children: [event_hash, prev_hash, ...] │ +│ timestamp: 2025-01-15T08:30:00.123456 │ +│ event: { ... original payload ... } │ +│ label: (optional, for visualization) │ +└───────────────────────────────────────────────┘ +``` + +The `children` list always includes: + +1. **The hash of the event payload** — ties the item to its content. +2. **The hash of the previous item** (if any) — ties the item to the entire preceding chain. +3. **Any extra children** passed by the caller — cross-references to other streams (e.g., continuation links, child-session references). + +Because `hash` is computed *over* the children, changing any ancestor forces every descendant hash to change, which is immediately detectable. + +### Parent Streams and Categories + +The system recognizes a set of **categories** (e.g., `student`, `teacher`, `tool`). When a session is closed, the system automatically appends a `child_session_finished` event to the long-lived parent stream for each category value in the session. + +``` +Session: {"student": ["Alice"], "tool": ["editor"]} + ↓ close_session +Parent stream {"student": "Alice"} ← child_session_finished(hash=abc...) +Parent stream {"tool": "editor"} ← child_session_finished(hash=abc...) +``` + +This creates a **two-level index**: from a category value you can enumerate every session that involved it, and from each session you can retrieve its complete event stream. + +--- + +## Quickstart + +### Installation + +The core module has no dependencies beyond the Python standard library. For visualization, install optional packages: + +```bash +pip install networkx pydot +``` + +### Minimal Example + +```python +import asyncio +from merkle_store import InMemoryStorage, Merkle, CATEGORIES + + +async def main(): + storage = InMemoryStorage() + merkle = Merkle(storage, CATEGORIES) + + session = {"student": ["Alice"], "tool": ["editor"]} + + # 1. Open + await merkle.start(session, metadata={"client_version": "1.2.0"}) + + # 2. Append events + await merkle.event_to_session({"type": "keystroke", "key": "a"}, session) + await merkle.event_to_session({"type": "keystroke", "key": "b"}, session) + await merkle.event_to_session({"type": "submit"}, session) + + # 3. Close — returns the final content hash + final_hash = await merkle.close_session(session) + print(f"Session hash: {final_hash}") + + # 4. Verify + assert await merkle.verify_chain(final_hash) + print("Chain integrity verified ✓") + + +asyncio.run(main()) +``` + +### Using Filesystem Storage + +```python +from merkle_store import FSStorage, Merkle, CATEGORIES + +storage = FSStorage(path="/var/data/merkle_streams") +merkle = Merkle(storage, CATEGORIES) +# ... same async API as above (`await merkle.start(...)`, etc.) ... +``` + +Each stream becomes a JSONL file under `/var/data/merkle_streams/`. The filename is the SHA-256 of the stream key. + +### Streaming Events via `scripts/stream_writing.py` + +The repository includes `scripts/stream_writing.py`, which connects to an event source and pipes events through the Merkle pipeline: + +```bash +python scripts/stream_writing.py \\ + --store fs \\ + --store-path /var/data/merkle_streams \\ + --student alice \\ + --tool editor +``` + +This opens a session, streams every incoming event through `event_to_session`, and closes the session when the source terminates (or on `SIGINT`). + +--- + +## Streaming Pipeline Integration + +In production, the Merkle logger runs inside an async generator pipeline that sits between the event source and downstream consumers (reducers, dashboards, etc.): + +```python +from merkle_store import ( + InMemoryStorage, FSStorage, Merkle, + CATEGORIES, STORES, +) + +# --- bootstrap from config --- +storage_cls = STORES[config["store"]] # "inmemory" or "fs" +storage = storage_cls(**config.get("params", {})) +merkle = Merkle(storage, CATEGORIES) + +session = {"student": [request.student], "tool": [request.tool]} + +# --- async generator that logs and forwards events --- +async def decode_and_log(events): + await merkle.start(session, metadata=metadata) + async for msg in events: + event = msg if isinstance(msg, dict) else json.loads(msg.data) + await merkle.event_to_session(event, session) + yield event # forward downstream + await merkle.close_session(session) +``` + +The pipeline is transparent: downstream consumers receive the same events they would without the Merkle layer. The only side effect is that every event is also appended to the Merkle chain. + +--- + +## API Reference + +### `Merkle` + +| Method | Description | +|--------|-------------| +| `await start(session, metadata=None, continuation_hash=None)` | Open a new session stream. If `continuation_hash` is provided, records it as a `continue` event that links to a prior segment. | +| `await event_to_session(event, session, children=None, label=None)` | Append an event to the running stream. Returns the persisted item dict. | +| `await close_session(session, logical_break=False)` | Append a `close` event, content-address the stream, propagate to parents. Returns the final hash. | +| `await break_session(session)` | Close the current segment and immediately start a continuation. Returns the closed segment's hash. | +| `await verify_chain(stream_key)` | Walk the stream and verify all three invariants (event inclusion, chain linkage, hash correctness). Returns `True` or raises `ValueError`. | +| `await delete_stream_with_tombstone(stream_key, reason)` | Remove event data; leave a tombstone with hash skeleton and reason. Returns the tombstone dict. | + +### Storage Backends + +| Backend | Key | Description | +|---------|-----|-------------| +| `InMemoryStorage` | `inmemory` | Dict-of-lists. Fast, ephemeral. Good for tests. | +| `FSStorage(path)` | `fs` | One JSONL file per stream under `path`. Persistent across restarts. | + +Both implement the `StreamStorage` interface and are registered in the `STORES` dict. + +--- + +## Verification & Audit + +### What Exactly Is Verified? + +`verify_chain` checks three invariants for every item in a stream: + +``` +For item[i]: + 1. SHA-256(canonical_json(item[i].event)) ∈ item[i].children + 2. item[i-1].hash ∈ item[i].children (if i > 0) + 3. item[i].hash == SHA-256(sorted(item[i].children) ‖ item[i].timestamp) +``` + +Together these guarantee: + +| Property | Ensured by | +|----------|------------| +| **Content integrity** — no event payload was modified | Invariant 1 | +| **Completeness** — no item was removed or reordered | Invariant 2 | +| **Hash correctness** — the item's self-reported hash is honest | Invariant 3 | +| **Tamper evidence** — any change to any item propagates to the final hash | Invariants 1 + 2 + 3 together | + +### Running Verification + +```python +try: + await merkle.verify_chain(final_hash) + print("Integrity OK") +except ValueError as e: + print(f"INTEGRITY VIOLATION: {e}") +``` + +### Why This Is Safe + +The security argument rests on **collision resistance** of SHA-256. + +1. **You cannot forge a stream with a different final hash.** The final hash is a commitment to every preceding item. To produce a stream with different content but the same final hash, an attacker must find a SHA-256 collision — computationally infeasible. + +2. **You cannot insert, remove, or reorder items.** Each item's hash includes the previous item's hash. Changing any item forces all subsequent hashes to change, which changes the final hash. A verifier who recorded the original final hash will detect the discrepancy. + +3. **You cannot modify an event payload.** Each item's hash includes the hash of its event. Modifying the event changes its hash, which changes the item's children list, which changes the item's hash, which propagates forward. + +4. **You cannot replay old events at a new time.** The timestamp is an input to the item hash. Same content + different timestamp = different hash. + +5. **Cross-stream references are tamper-evident.** When a session is closed, its final hash is recorded as a child in parent streams. Modifying the session stream changes its final hash, which invalidates the parent's child reference. + +### Publishing Root Hashes + +For third-party auditability, the system is designed to periodically publish **root hashes** — e.g., a daily digest of all parent-stream tip hashes. Once published (to a transparency log, a blockchain, a newspaper, etc.), any party can request the underlying data and verify it against the published root. The Merkle structure means: + +- Verification is efficient: you only need the chain of hashes from the item in question up to the published root. +- Publication is compact: a single 256-bit hash covers an arbitrary volume of data. + +> **Note:** Root hash publication is not yet implemented in the prototype. The per-stream `verify_chain` provides the building block. + +--- + +## Deletion & Tombstones + +### Motivation + +Data-subject erasure requests (GDPR Article 17, CCPA, FERPA, etc.) require removing personal data. Naïvely deleting a stream would break the Merkle DAG — parent streams would reference a hash that no longer resolves. + +### How Tombstones Work + +```python +tombstone = await merkle.delete_stream_with_tombstone( + stream_key=final_hash, + reason="GDPR Article 17 erasure request from guardian", +) +``` + +This: + +1. Reads the stream and records the **ordered list of per-item hashes** and the **final hash**. +2. Deletes the stream data (all event payloads are gone). +3. Writes a **tombstone** to `__tombstone__` containing: + - `deleted_stream` — the stream key. + - `final_hash` — the hash that parent streams reference. + - `item_hashes` — the ordered list of all item hashes (no payloads). + - `item_count` — the number of deleted items. + - `reason` — why the data was deleted. + - `timestamp` — when the deletion occurred. + - `tombstone_hash` — SHA-256 of the above, for the tombstone's own integrity. + +### What Remains After Deletion + +| Retained | Removed | +|----------|---------| +| Tombstone record | All event payloads | +| Per-item hashes (ordered) | All event metadata within items | +| Final stream hash | Timestamps of individual items | +| Deletion reason & timestamp | Session descriptor within events | +| Parent-stream references | Labels | + +An auditor can confirm: +- *That* data existed (the tombstone is present). +- *How much* data existed (`item_count`). +- *When* it was deleted and *why*. +- *That the parent reference is consistent* (parent's `child_hash` matches tombstone's `final_hash`). + +An auditor **cannot**: +- Recover the deleted event content. +- Determine what the events contained. + +--- + +## Visualization + +Visual inspection of the DAG is invaluable for debugging and for explaining the system to stakeholders. Two export formats are supported. + +### Prerequisites + +```bash +pip install networkx pydot +# Graphviz must also be installed at the system level: +# macOS: brew install graphviz +# Ubuntu: sudo apt-get install graphviz +``` + +### Graphviz (pydot) + +```python +dot = storage.to_graphviz() + +# Render to file +dot.write_png("merkle_dag.png") +dot.write_svg("merkle_dag.svg") +dot.write_pdf("merkle_dag.pdf") + +# Get raw DOT source +print(dot.to_string()) +``` + +### NetworkX + +```python +import matplotlib.pyplot as plt + +G = storage.to_networkx() + +labels = {n: d.get("label", n[:8]) for n, d in G.nodes(data=True)} +pos = networkx.spring_layout(G) +networkx.draw(G, pos, labels=labels, with_labels=True, + node_size=1500, font_size=8, arrows=True) +plt.savefig("merkle_dag_nx.png", dpi=150) +plt.show() +``` + +### Reading the Graph + +The exported graph has the following structure: + +``` + ┌──────────┐ ┌──────────┐ ┌──────────┐ + │ start │───▶│ event A │───▶│ event B │───▶ ... + └──────────┘ └──────────┘ └──────────┘ + │ │ + ▼ ▼ + (event hash) (event hash) +``` + +- **Rectangles** are items (nodes). The label shows either the explicit label, a `category:value` summary, or the first 8 hex chars of the hash. +- **Arrows** point from an item to each of its children (the hashes it depends on). This includes: + - The event-content hash (a "leaf" node with no outgoing edges of its own, unless the hash happens to collide with another item — which is astronomically unlikely). + - The previous item in the chain. + - Any cross-stream references (continuation links, child-session links). + +**Parent streams** appear as separate chains. When a session is closed, the parent stream gains a node whose children include the closed session's final hash — visually, an arrow crosses from the parent chain into the session chain. + +**Tombstones** appear as isolated nodes (the original stream's nodes are gone). This makes deleted data visually obvious. + +--- + +## Storage Backends In Depth + +### InMemoryStorage + +- **Structure:** `dict[str, list[dict]]` +- **Thread safety:** single `threading.Lock` +- **Use case:** tests, demos, single-request pipelines +- **Persistence:** none — data lost on process exit + +### FSStorage + +- **Structure:** one JSONL file per stream under a configurable directory +- **Filename mapping:** `SHA-256(stream_key)` → avoids path traversal and special characters +- **Thread safety:** single `threading.Lock` (coarse — sufficient for prototype) +- **Use case:** persistent prototyping, small-scale deployments +- **Persistence:** survives process restarts; no crash-safety guarantees (no fsync) + +### Implementing a Custom Backend + +Subclass `StreamStorage` and implement six methods: + +```python +class KafkaStorage(StreamStorage): + def _append_to_stream(self, stream: str, item: dict): + ... + def _rename_or_alias_stream(self, stream: str, alias: str): + ... + def _get_stream_data(self, stream: str) -> Optional[List[dict]]: + ... + def _delete_stream(self, stream: str): + ... + def _most_recent_item(self, stream: str) -> Optional[dict]: + ... + def _walk(self) -> Iterator[dict]: + ... +``` + +Register it so configuration-driven code can find it: + +```python +from merkle_store import STORES +STORES["kafka"] = KafkaStorage +``` + +--- + +## Configuration + +When used inside the async pipeline, the system is bootstrapped from a feature-flag config dict: + +```json +{ + "store": "fs", + "params": { + "path": "/var/data/merkle_streams" + } +} +``` + +| Key | Type | Description | +|-----|------|-------------| +| `store` | `str` | Backend name: `"inmemory"` or `"fs"` (or any key in `STORES`). | +| `params` | `dict` | Keyword arguments forwarded to the backend constructor. For `FSStorage` this is `{"path": "..."}`. For `InMemoryStorage` it should be `{}` or omitted. | + +The config is read from `settings.feature_flag('merkle')`. If the flag is absent or falsy, the Merkle pipeline is not attached. + +--- + +## Security Model & Threat Analysis + +### Trust Model + +The Merkle DAG provides **tamper evidence**, not **tamper prevention**. The system operator has write access to storage and *could* rewrite data. However: + +| Threat | Mitigation | +|--------|------------| +| **Modify an event after the fact** | Changes the item hash → changes all downstream hashes → changes the final hash. Detectable by anyone who recorded the original final hash. | +| **Delete an item silently** | Breaks chain linkage (invariant 2). `verify_chain` will raise `ValueError`. | +| **Insert a fake item** | Changes all subsequent hashes. Detectable by final-hash comparison. | +| **Reorder items** | Breaks chain linkage. Detectable. | +| **Replace entire stream with fabricated data** | Produces a different final hash. Detectable if the original hash was published or recorded externally. | +| **Delete a stream without tombstone** | Parent streams still reference the old final hash. Attempting to resolve it fails. Absence is detectable. | +| **Forge a tombstone** | The tombstone hash covers the item-hash list. A forged tombstone would need to predict the per-item hashes of the original data, which requires knowledge of every event payload and timestamp — i.e., the data itself. | + +### What You Must Do + +For the security guarantees to hold, at least one of the following must be true: + +1. **Publish root hashes externally** — to a transparency log, blockchain, append-only public ledger, or even a newspaper. This commits the operator to the current state of the DAG. +2. **Share final hashes with data subjects** — so they can independently verify their own data later. +3. **Use an append-only backend** — e.g., a Kafka topic with immutable retention, or S3 with Object Lock. + +Without external commitment, the operator can silently rewrite everything. The Merkle structure makes rewriting *hard* (you must recompute the entire downstream chain), but not *impossible* if no one recorded the original hashes. + +### Hash Algorithm + +SHA-256 is used throughout. At current (2025) understanding: + +- **Collision resistance:** ~2¹²⁸ operations — far beyond feasible computation. +- **Preimage resistance:** ~2²⁵⁶ operations. +- **No known practical attacks.** + +The `HASH_TRUNCATE` setting exists **only** for debugging readability. In production, it must be `None` (full 64-character hex digest). + +--- + +## Prototype Limitations & Production Roadmap + +| Limitation | Production Fix | +|------------|----------------| +| In-memory and single-file-per-stream backends don't scale | Kafka, S3, or database backend | +| `FSStorage` uses `run_in_executor` around blocking file calls | Native async filesystem I/O backend (or move to Kafka/S3/database backends) | +| `FSStorage._most_recent_item` reads the entire file | Maintain an in-memory index or use a database | +| No periodic root-hash publication | Scheduled job that computes and publishes a Merkle root over all parent-stream tips | +| No stream chunking | Break long-lived streams on time or size boundaries using `break_session` | +| No encryption at rest | Encrypt PII-bearing streams; store decryption keys separately | +| No formal schema for events | Define and validate event schemas (JSON Schema, Avro, etc.) | +| Session key is raw canonical JSON | Compute a deterministic session ID via HMAC or structured hashing | +| Single-writer assumption per session | Enforce via distributed locking or partition assignment | +| Tombstones do not propagate to parent streams | Add a `child_deleted` event type to parent streams | +| No integration tests or property-based tests | Hypothesis-based chain-integrity tests, crash-recovery tests | diff --git a/learning_observer/VERSION b/learning_observer/VERSION index 20ad2997..7cff5221 100644 --- a/learning_observer/VERSION +++ b/learning_observer/VERSION @@ -1 +1 @@ -0.1.0+2026.02.27T21.25.37.849Z.3207a114.berickson.20260220.dami.portfolio.pr +0.1.0+2026.03.11T17.20.38.773Z.4715ad81.berickson.20260209.merkle.store diff --git a/learning_observer/learning_observer/incoming_student_event.py b/learning_observer/learning_observer/incoming_student_event.py index 9cad037a..354a732f 100644 --- a/learning_observer/learning_observer/incoming_student_event.py +++ b/learning_observer/learning_observer/incoming_student_event.py @@ -13,7 +13,9 @@ import datetime import inspect import json +import logging import os +import pmss import time import traceback import uuid @@ -39,9 +41,36 @@ import learning_observer.adapters.adapter import learning_observer.blacklist import learning_observer.blob_storage +import learning_observer.merkle_store as merkle_store import learning_observer.constants as constants +logger = logging.getLogger(__name__) + + +def _parse_max_websocket_message_bytes(value): + if isinstance(value, bool): + raise ValueError('incoming_max_websocket_message_bytes must be an integer value.') + try: + parsed = int(value) + except (TypeError, ValueError) as exc: + raise ValueError('incoming_max_websocket_message_bytes must be an integer value.') from exc + if parsed <= 0: + raise ValueError('incoming_max_websocket_message_bytes must be greater than zero.') + return parsed + + +pmss.parser( + 'incoming_max_websocket_message_bytes', + transform=_parse_max_websocket_message_bytes, +) +pmss.register_field( + name='incoming_max_websocket_message_bytes', + type='incoming_max_websocket_message_bytes', + description='Maximum decompressed websocket message size accepted by the incoming event endpoint.', + default=16 * 1024 * 1024, +) + def compile_server_data(request): ''' @@ -235,72 +264,148 @@ def event_decoder_and_logger( request, headers=None, metadata=None, - session={} ): ''' - This is the main event decoder. It is called by the - websocket handler to log events. + Main event decoder / logger factory. - Parameters: - request: The request object. - headers: The header events, which e.g. contain auth - metadata: Metadata about the request, such as IP. This is - extracted from the request, which will go away soon. + Returns an async generator coroutine that: + 1. Immediately begins decoding and yielding events + 2. Buffers decoded events until the Merkle session is initialized + 3. Flushes the buffer once ``initialize_session(student, tool)`` is called + 4. Streams directly into the Merkle chain from that point on + 5. Closes the session cleanly on disconnect / exhaustion - Returns: - A coroutine that decodes and logs events. + Also exposes: + - ``.close`` - force-close the log/session + - ``.initialize_session`` - provide identity once known - We call this after the header events, with the header events in the - `headers` parameter. This is because we want to log the header events - before the body events, so they can be dropped from the Merkle tree - for privacy. Although in most cases, students can be reidentified - from the body events, the header events contain explicit identification - tokens. It is helpful to be able to analyze data with these dropped, - obfuscated, or otherwise anonymized. + If the ``merkle`` feature flag is not set, falls back to the legacy + flat-file logger (which needs no identity). + ''' + merkle = merkle_store.get_merkle_engine() + if merkle is not None: + # ---- Merkle path ------------------------------------------------ - At present, many body events contain auth as well. We'll want to minimize - this and tag those events appropriately. + # --- Deferred session state --- + session = None + session_started = False + session_closed = False + pre_session_buffer = [] - HACK: We would like clean log files for the first classroom pilot. + async def initialize_session(student, tool): + ''' + Called once downstream stages have resolved identity. + Opens the Merkle session and flushes every event that + was buffered before identity was known. - This puts events in per-session files. + Idempotent: subsequent calls are no-ops. + ''' + nonlocal session, session_started + if session_started: + logger.debug( + 'Merkle session already initialized; ignoring duplicate call' + ) + return - The feature flag has the non-hack implementation. - ''' - if merkle_config := settings.feature_flag("merkle"): - import merkle_store - - storage_class = merkle_store.STORES[merkle_config['store']] - params = merkle_config.get("params", {}) - if not isinstance(params, dict): - raise ValueError("Merkle tree params must be a dict (even an empty one)") - storage = storage_class(**params) - merkle_store.Merkle(storage) - session = { - "student": request.student, - "tool": request.tool - } - merkle_store.start(session) + session = { + 'student': [student], + 'tool': [tool], + } + + # Filter sensitive fields out of metadata before persisting + safe_metadata = { + k: v for k, v in (metadata or {}).items() + if k not in ('auth', 'password', 'token') + } + await merkle.start(session, metadata=safe_metadata) + + if headers: + await merkle.event_to_session( + {'type': 'header', 'headers': headers}, + session, + label='headers', + ) - def decode_and_log_event(msg): + # Replay everything we buffered before identity was known + buffer_count = len(pre_session_buffer) + for buffered_event in pre_session_buffer: + # Skip auth-protocol events — they were consumed by the + # auth system and shouldn't be persisted to the chain + if buffered_event.get('_consumed_by_auth'): + continue + await merkle.event_to_session(buffered_event, session) + pre_session_buffer.clear() + + session_started = True + logger.debug( + 'Merkle session initialized for student=%s tool=%s; ' + 'flushed %d buffered events', + student, tool, buffer_count, + ) + + async def close_session(): + '''Close the Merkle session. + + Idempotent: safe to call multiple times (e.g. from both the + generator's ``finally`` block and an explicit ``terminate`` + event). ''' - Decode and store the event in the Merkle tree - ''' - event = json.loads(msg) - merkle_store.event_to_session(event) - return event + nonlocal session_closed + if session_closed: + return + session_closed = True + + if session_started: + try: + await merkle.close_session(session) + except Exception: + logger.exception('Failed to close merkle session') + elif pre_session_buffer: + # Connection died before we ever learned who the student + # was. The events are not lost (they're in the buffer), + # but they never made it into a Merkle chain. + logger.warning( + 'Merkle session closed before initialization; ' + '%d event(s) buffered but never persisted to a session.', + len(pre_session_buffer), + ) + + async def decode_and_log_event(events): + '''Async generator: decode every message, persist to Merkle + (or buffer), yield downstream.''' + try: + async for msg in events: + json_event = ( + msg if isinstance(msg, dict) + else json.loads(msg.data) + ) + + if session_started: + await merkle.event_to_session(json_event, session) + else: + # Identity not yet known — buffer for later flush + pre_session_buffer.append(json_event) + + yield json_event + except Exception: + logger.exception('Error in merkle event pipeline') + raise + finally: + await close_session() + + decode_and_log_event.close = close_session + decode_and_log_event.initialize_session = initialize_session + return decode_and_log_event + + # ---- Legacy flat-file path (unchanged) -------------------------------- global COUNT - # Count + PID should guarantee uniqueness. - # With multi-server installations, we might want to add - # `socket.gethostname()`, but hopefully we'll have our - # Merkle tree logger by then, and this will be obsolete. - filename = "{timestamp}-{ip:-<15}-{hip:-<15}-{session_count:0>10}-{pid}".format( + filename = '{timestamp}-{ip:-<15}-{hip:-<15}-{session_count:0>10}-{pid}'.format( ip=request.remote, hip=request.headers.get('X-Real-IP', ''), timestamp=datetime.datetime.utcnow().isoformat(), session_count=COUNT, - pid=os.getpid() + pid=os.getpid(), ) COUNT += 1 @@ -313,10 +418,6 @@ def close_decoder_logfile(): decoder_log_closed = True async def decode_and_log_event(events): - ''' - Take an aiohttp web sockets message, log it, and return - a clean event. - ''' try: async for msg in events: if isinstance(msg, dict): @@ -326,10 +427,13 @@ async def decode_and_log_event(events): log_event.log_event(json_event, filename=filename) yield json_event finally: - # done processing events, can close logfile now close_decoder_logfile() decode_and_log_event.close = close_decoder_logfile + # No-op so callers don't need to branch on which path is active + async def _noop_init(*args, **kwargs): + pass + decode_and_log_event.initialize_session = _noop_init return decode_and_log_event @@ -351,7 +455,10 @@ async def incoming_websocket_handler(request): we start processing each event in our queue through the reducers. ''' debug_log("Incoming web socket connected") - ws = aiohttp.web.WebSocketResponse() + ws_max_message_bytes = settings.pmss_settings.incoming_max_websocket_message_bytes( + types=['incoming_events'] + ) + ws = aiohttp.web.WebSocketResponse(max_msg_size=ws_max_message_bytes) await ws.prepare(request) lock_fields = {} authenticated = False @@ -378,7 +485,7 @@ async def process_message_from_ws(): async def update_event_handler(event): '''We need source and auth ready before we can set up the `event_handler` and be ready to process - events + events. ''' if not authenticated: return False @@ -390,6 +497,14 @@ async def update_event_handler(event): else: metadata = event metadata['auth'] = authenticated + + # ---- Initialize the Merkle session now that we know identity ---- + init_session = getattr(decoder_and_logger, 'initialize_session', None) + if init_session: + student = authenticated.get(constants.USER_ID, '') + tool = metadata.get('source', 'unknown') + await init_session(student, tool) + event_handler = await handle_incoming_client_event(metadata=metadata) reducers_last_updated = learning_observer.stream_analytics.LAST_UPDATED return True @@ -427,21 +542,31 @@ async def handle_auth_events(events): del event['auth'] if not authenticated: - authenticated = await learning_observer.auth.events.authenticate( + auth_result = await learning_observer.auth.events.authenticate( request=request, event=event, source='' ) - if authenticated: + if auth_result: + authenticated = auth_result await ws.send_json({ 'status': 'auth', constants.USER_ID: authenticated[constants.USER_ID] }) + # This specific event was the one that provided auth. + # Tag it so we can skip it during backlog flush. + event['_consumed_by_auth'] = True + await update_event_handler(event) backlog.append(event) else: while backlog: prior_event = backlog.pop(0) + # Skip events that were consumed by the auth system. + # Content events that just happened to arrive before + # auth completed are forwarded normally. + if prior_event.get('_consumed_by_auth'): + continue prior_event.update({'auth': authenticated}) yield prior_event event.update({'auth': authenticated}) @@ -463,13 +588,22 @@ async def handle_terminate_events(events): '''Stop processing when a terminate event is received.''' async for event in events: if event.get('event') == 'terminate': - debug_log('Terminate event received; shutting down connection and cleaning up logs.') + debug_log( + 'Terminate event received; shutting down connection ' + 'and cleaning up logs.' + ) handler_close = getattr(event_handler, 'close', None) if callable(handler_close): + # handler_close is a sync function — call directly handler_close() + decoder_close = getattr(decoder_and_logger, 'close', None) if callable(decoder_close): - decoder_close() + # May be async (Merkle) or sync (legacy) — handle both + result = decoder_close() + if asyncio.iscoroutine(result): + await result + await ws.close() return yield event diff --git a/learning_observer/learning_observer/merkle_store.py b/learning_observer/learning_observer/merkle_store.py index 1f5fff60..a5917f91 100644 --- a/learning_observer/learning_observer/merkle_store.py +++ b/learning_observer/learning_observer/merkle_store.py @@ -1,650 +1,2256 @@ ''' -This is a prototype for our log storage system. - -1. We'd like the logical design to scale to millions of users, each generating - millions of events. - - Merkle trees are nice, since the only logical operation is writing a - key/value pair under its hash - - However, we don't quite use this as a back-end representation, since we'd - like to be able to get at streams of events efficiently. That's why we - don't quite use a key-value store -- walking a linked list in a KVS is - slow. -2. We'd like to be able to provide users with their complete data (e.g. it's - not in a million different places). - - "Users" can mean students, schools, etc. - - Requests may come in for complete data, or for a subset of data. -3. We'd like to be able to have users remove or correct their data - - "Users" can have multiple definitions, as per above - - "Data" can mean for a particular document, all data, etc. - - Such a removal should leave a trace that data was removed, but remove - data completely. -4. We'd like to have an archival record of everything that happened, except - for data lost to such removals - - This should be auditable -- e.g. we can't fake data - - The cryptographic properties of the hash tree allow us to audit all - data that was retained. -5. In the future, we'd like an archival record of all processing on top of - data - - Families should be able to audit how their data was processed - - Researchers should be able to review a modern-day equivalent to a - lab notebook - - This should be auditable -- e.g. we can't do a p-value hunt without - leaving a record - -There is a lot of nuance -- which we may not have gotten right yet -- around: - -- What level to expose how much PII at. Removal requests ought to remove - PII, but maintain hashes pointing to the removed data -- Whether and how to break up the data into chunks. Right now, each stream - is a single chunk. We might want to e.g. break up on hourly, daily, or - other boundaries. This doesn't change the logical Merkle tree, but it - change the way we map it to storage. -- Whether and what kind of metadata we want to include in the tree. We can - include events which are in the streams, but not in the Merkle tree itself - (e.g. headers, etc.) -- How to handle logs of computation on data. -- How often to compute a top-level hash to expose to the world. We'd like to - publish a daily hash. From there, anyone who requests data should receive - a chain of hashes which allows them to verify that data is correct, - complete, and not modified. - -Note that this is *not* designed to serve data directly to dashboards. However, -we do want to be able to use the same reducers to do batched processing of -this data for research as we do for dashboards (which process streams in -realtime, and only maintain features). - -It is very much a prototype. To make this not a prototype, we would need to: - -- Make it work with Kafka -- Make it work with asyncio -- Make the file system operations not slow -- Use full-length hashes -- Confirm it's robust -- Escape the file names properly or compute interrim session IDs more - intelligently -- Etc. +Merkle DAG Log Storage System +============================== + +A prototype append-only log storage system that organizes event streams into +a content-addressed Merkle DAG (Directed Acyclic Graph). Every event is +cryptographically chained to its predecessors, making tampering detectable +and providing a verifiable audit trail. + +Overview +-------- +Events arrive in the context of a **session** -- a dict of category→value +pairs (e.g. ``{"student": ["John"], "tool": ["editor"]}``). Each session +is an append-only stream of **items**, where every item contains: + +- The original event payload. +- A list of **children** hashes (the hash of the event content, the hash of + the previous item in the stream, and any additional references). +- A **timestamp**. +- A **node hash** computed over the sorted children list and the timestamp. + +Because each item's hash depends on its predecessor, the final item's hash +is a commitment to the *entire* stream -- the defining property of a Merkle +chain. Modifying, inserting, or removing any item changes the final hash, +which makes tampering evident to anyone who recorded it. + +Session Lifecycle +----------------- +1. **start** -- creates the first item in a new stream. +2. **event_to_session** -- appends an event to the running stream. +3. **close_session** -- appends a ``close`` event, renames the stream to + its final hash (content-addressing), and propagates a + ``child_session_finished`` event to every *parent* category stream. +4. **break_session** (optional) -- closes the current segment and + immediately starts a continuation segment that references the old one. + Useful for bounding segment size or inserting periodic checkpoints. + +Parent Streams / Categories +---------------------------- +When a session is closed, the system automatically notifies *parent* +streams. For example, closing a session ``{"student": ["John"], +"tool": ["editor"]}`` appends a ``child_session_finished`` event to both +the ``{"student": "John"}`` stream and the ``{"tool": "editor"}`` stream. +This lets you walk from any category value down to every session that +involved it -- critical for data-subject access requests (GDPR Article 15) +and deletion requests (Article 17). + +Deletion & Tombstones +--------------------- +``delete_stream_with_tombstone`` removes all event data for a stream but +leaves behind a **tombstone** record that preserves: + +- The stream key and final hash (so references from parent streams still + make structural sense). +- The list of per-item hashes (so an auditor can confirm *which* data was + deleted, even though the data itself is gone). +- A reason string and timestamp. + +This satisfies the "right to erasure" while preserving the Merkle tree's +structural integrity for the data that *was* retained. + +Verification +------------ +``verify_chain`` walks a stream item-by-item and checks: + +1. The event payload's hash appears in the item's children list. +2. The previous item's hash appears in the item's children list (except + for the first item). +3. The node hash matches ``SHA-256(sorted(children) || timestamp)``. + +If any check fails, a ``ValueError`` is raised with a diagnostic message. + +Storage Backends +---------------- +Two backends are provided: + +- **InMemoryStorage** -- dict-of-lists; useful for tests and short-lived + pipelines. +- **FSStorage** -- one file per stream (JSONL format); stream names are + mapped to filenames via SHA-256 to avoid path-traversal issues. + +Both expose an async interface. :class:`InMemoryStorage` methods are +trivially async (no real I/O), while :class:`FSStorage` offloads blocking +file operations to the default executor via +:meth:`asyncio.loop.run_in_executor`. The :class:`StreamStorage` base +class defines the async interface so that Kafka, S3, or database backends +can be added without changing the Merkle logic. + +Visualization +------------- +If ``networkx`` and ``pydot`` are installed, any storage backend can be +exported to a NetworkX ``DiGraph`` or a Graphviz ``pydot.Dot`` object for +visual inspection of the DAG structure. Visualizations include: + +- **Color-coded nodes** by event type (start, close, continue, parent + propagation, normal events, tombstones, deleted placeholders). +- **Typed edges** distinguishing chain links, content-hash references, + cross-stream references, and tombstone→deleted relationships. +- **Stream clustering** via Graphviz subgraphs so that items belonging + to the same stream are visually grouped. +- **Tombstone rendering** with placeholder nodes for deleted items and + octagonal tombstone nodes showing deletion metadata. +- **Legend** explaining the visual encoding. + +Design Goals +------------ +1. **Scalability** -- the only primitive is "append an item whose identity + is its hash", which maps naturally to distributed stores (Kafka topics, + S3 objects, etc.). +2. **Data portability** -- every stream is self-contained; a data-subject + access request can be fulfilled by exporting a bounded set of streams. +3. **Erasure** -- tombstones remove PII while preserving the hash skeleton. +4. **Auditability** -- the cryptographic chain lets any party verify that + retained data has not been modified. Publishing a daily top-level hash + extends this guarantee to third parties. +5. **Reproducibility** -- in the future, computation logs will be chained + into the same DAG so that every analytical result can be traced back to + the data and code that produced it. + +Prototype Caveats +----------------- +This is a working prototype. Production hardening would include: + +- Kafka or equivalent backend for durable, distributed streaming. +- Batched / periodic Merkle root publication. +- Proper stream-name escaping or deterministic session-ID generation. +- Chunk boundaries (hourly, daily, size-based) within long-lived streams. +- Encryption at rest for PII-bearing streams. +- Comprehensive property-based and integration tests. ''' +import asyncio import hashlib import json import datetime -from modulefinder import STORE_GLOBAL +import logging import os -from pickle import STOP +import threading +from typing import Any, Dict, List, Optional, Set, Iterator, Tuple, Union +from dataclasses import dataclass, field +from concurrent.futures import Executor -# These should be abstracted out into a visualization library. -import matplotlib -import networkx -from learning_observer.incoming_student_event import COUNT -import pydot +import learning_observer.settings as settings -from confluent_kafka import Producer, Consumer +try: + import pydot + import networkx + HAS_VIZ = True +except ImportError: + HAS_VIZ = False +logger = logging.getLogger(__name__) -def json_dump(obj): - """ - Dump an object to JSON. +_merkle_engine = None +_merkle_storage = None - Args: - obj (object): The object to dump. - Returns: - str: The JSON dump. +def get_merkle_engine(): + """Return the application-scoped Merkle engine singleton. - This is shorthand so that we have a consistent way to dump objects - each time. We use JSON dumps to index into dictionaries and such. - """ - return json.dumps(obj, sort_keys=True) + Lazily initializes the storage backend and ``Merkle`` engine on + first call. Subsequent calls return the same instance. + The storage class is determined by the ``store`` key in the + ``merkle`` feature-flag config. If the key is absent or the + feature flag is a bare ``true``, falls back to + :data:`DEFAULT_STORE` (in-memory). -def json_load(string): + Returns + ------- + Merkle or None + ``None`` if the ``merkle`` feature flag is not enabled. """ - Load a JSON string. + global _merkle_engine, _merkle_storage - Args: - string (str): The JSON string. + if _merkle_engine is not None: + return _merkle_engine - Returns: - object: The JSON object. + merkle_config = settings.feature_flag('merkle') + if not merkle_config: + return None - We don't really need this, put for symmetry with json_dump and consistency. - """ - return json.loads(string) + # A bare `merkle: true` gives us a bool, not a dict + if not isinstance(merkle_config, dict): + merkle_config = {} + store_name = merkle_config.get('store', DEFAULT_STORE) + if store_name not in STORES: + logger.warning( + "Unknown merkle store %r; falling back to %r", + store_name, + DEFAULT_STORE, + ) + store_name = DEFAULT_STORE + + storage_cls = STORES[store_name] + params = merkle_config.get('params', {}) + if not isinstance(params, dict): + logger.warning( + "Merkle store params should be a dict, got %s; ignoring", + type(params).__name__, + ) + params = {} -COUNT = 0 + _merkle_storage = storage_cls(**params) + _merkle_engine = Merkle(_merkle_storage, CATEGORIES) + logger.info( + "Merkle engine initialized with %s backend (params: %s)", + store_name, + params or '(none)', + ) + return _merkle_engine -def session_key(session): - """ - Return an ID associated with a session. - Such an ID is used before we have a finished session which we can - place into the Merkle DAG +# --------------------------------------------------------------------------- +# Serialization helpers +# --------------------------------------------------------------------------- - Args: - session (dict): The session. +def json_dump(obj: Any) -> str: + '''Canonical JSON serialization (sorted keys, no extra whitespace). - Returns: - str: The session ID. This is not guaranteed to be a string in the - future. + Sorting keys is essential: two dicts with the same content must always + produce the same string so that their hashes agree. - The session ID is currently a JSON dump, with some extra info to prevent - collisions. - """ - global COUNT - COUNT += 1 + Parameters + ---------- + obj : Any + A JSON-serializable Python object. - base = { - 'timestamp': timestamp(), - 'count': COUNT - } - return json_dump(session) + Returns + ------- + str + A compact JSON string with keys in sorted order. + ''' + return json.dumps(obj, sort_keys=True) -# This might turn into a class in the future. For now, we just use the -# session_key -Session = session_key +def json_load(string: str) -> Any: + '''Deserialize a JSON string. + Parameters + ---------- + string : str + A valid JSON string. -def timestamp(): - """ - Return a timestamp string in ISO 8601 format + Returns + ------- + Any + The deserialized Python object. + ''' + return json.loads(string) - Returns: - str: The timestamp string. - The timestamp is in UTC. - """ +# --------------------------------------------------------------------------- +# Thread-safe counter +# --------------------------------------------------------------------------- + +class _AtomicCounter: + '''Monotonically increasing counter safe for concurrent access. + + Used internally to generate unique sequence numbers when needed. + Each call to :meth:`next` returns a value strictly greater than + every previous call, regardless of which thread invokes it. + ''' + + def __init__(self): + self._count = 0 + self._lock = threading.Lock() + + def next(self) -> int: + '''Return the next integer in the sequence.''' + with self._lock: + self._count += 1 + return self._count + + +_counter = _AtomicCounter() + + +# --------------------------------------------------------------------------- +# Hashing +# --------------------------------------------------------------------------- + +# In production this should be ``None`` (no truncation). Set to a small +# integer for human-readable debugging output only. +HASH_TRUNCATE: Optional[int] = None + + +def merkle_hash(*strings: str) -> str: + '''Compute a SHA-256 hex digest of tab-joined input strings. + + The inputs are joined with a tab character (``\\t``) before hashing. + Tabs are forbidden *inside* any input string to guarantee that the + concatenation is unambiguous (i.e. ``hash("a", "b")`` can never + collide with ``hash("a\\tb")``). + + Parameters + ---------- + *strings : str + One or more strings, none of which may contain a tab. + + Returns + ------- + str + A lowercase hex SHA-256 digest, optionally truncated to + :data:`HASH_TRUNCATE` characters for debugging. + + Raises + ------ + ValueError + If any input string contains a tab character. + + Examples + -------- + >>> merkle_hash('hello', 'world') # doctest: +SKIP + 'b0a43a0640...' + ''' + for s in strings: + if '\t' in s: + raise ValueError(f'Input to merkle_hash must not contain tabs: {s!r}') + digest = hashlib.sha256('\t'.join(strings).encode('utf-8')).hexdigest() + if HASH_TRUNCATE is not None: + return digest[:HASH_TRUNCATE] + return digest + + +# --------------------------------------------------------------------------- +# Timestamps +# --------------------------------------------------------------------------- + +def timestamp() -> str: + '''Return the current UTC time as an ISO-8601 string. + + Used as the ``timestamp`` field in every Merkle item. Including + the timestamp in the hash input means that replaying the same event + at a different time produces a different hash, preventing replay + attacks. + + Returns + ------- + str + e.g. ``"2025-01-15T08:30:00.123456"`` + ''' return datetime.datetime.utcnow().isoformat() -def hash(*strings): - """ - Return a hash of the given strings. +# --------------------------------------------------------------------------- +# Session key +# --------------------------------------------------------------------------- - Args: - strings (str): The strings to hash. +def session_key(session: dict) -> str: + '''Compute a deterministic string key for a *live* session dict. - Returns: - str: The hash of the given strings. + The key is the canonical JSON serialization of ``session``. This + guarantees that two calls with the same logical session always map + to the same underlying stream, while different sessions never + collide (assuming no hash truncation). - The strings should not contain tabs. - """ - return hashlib.sha1('\t'.join(strings).encode('utf-8')).hexdigest()[:8] + Once a session is **closed**, its stream is renamed from this key to + the stream's final content hash. + Parameters + ---------- + session : dict + The session descriptor, e.g. + ``{"student": ["Alice"], "tool": ["editor"]}``. -class Merkle: - def __init__(self, storage, categories): - ''' - Initialize the merkle DAG. + Returns + ------- + str + The canonical JSON string of *session*. + ''' + return json_dump(session) - `categories` is a list of categories by which we might - want to index into events - ''' + +# --------------------------------------------------------------------------- +# Visualization helpers +# --------------------------------------------------------------------------- + +def _item_display_label(item: dict, stream_name: str = '') -> str: + '''Build a multi-line label for a Merkle item node. + + Includes the event type (if present), a truncated hash, the + timestamp, and the stream name (truncated). + + Parameters + ---------- + item : dict + A standard Merkle item with ``hash``, ``children``, ``event``, + and ``timestamp`` fields. + stream_name : str, optional + The stream key this item belongs to. + + Returns + ------- + str + A newline-separated label string for Graphviz rendering. + ''' + lines = [] + + # Event type / label + event = item.get('event', {}) + if isinstance(event, dict): + etype = event.get('type', '') + if etype: + lines.append(etype.upper()) + if item.get('label') and (not lines or lines[0].lower() != item['label'].lower()): + lines.append(item['label']) + + # Truncated hash + h = item.get('hash', '?') + lines.append(f'hash: {h[:12]}...') + + # Timestamp (just time portion if available) + ts = item.get('timestamp', '') + if 'T' in ts: + lines.append(f'ts: {ts.split("T")[1][:12]}') + elif ts: + lines.append(f'ts: {ts[:16]}') + + # Stream context + if stream_name: + display_stream = stream_name if len(stream_name) <= 30 else stream_name[:27] + '...' + lines.append(f'stream: {display_stream}') + + # Child count + n_children = len(item.get('children', [])) + lines.append(f'children: {n_children}') + + return '\\n'.join(lines) + + +def _tombstone_display_label(tombstone: dict) -> str: + '''Build a multi-line label for a tombstone node. + + Parameters + ---------- + tombstone : dict + A tombstone record with ``deleted_stream``, ``final_hash``, + ``item_count``, ``reason``, ``timestamp``, and + ``tombstone_hash`` fields. + + Returns + ------- + str + A newline-separated label string for Graphviz rendering. + ''' + lines = [ + 'TOMBSTONE', + f'stream: {tombstone.get("deleted_stream", "?")[:20]}...', + f'final: {tombstone.get("final_hash", "?")[:12]}...', + f'items deleted: {tombstone.get("item_count", "?")}', + f'reason: {tombstone.get("reason", "?")}', + ] + ts = tombstone.get('timestamp', '') + if ts: + lines.append(f'deleted: {ts[:19]}') + th = tombstone.get('tombstone_hash', '?') + lines.append(f'tombstone_hash: {th[:12]}...') + return '\\n'.join(lines) + + +# Node style constants for Graphviz +_STYLE_NORMAL = { + 'shape': 'box', + 'style': 'filled', + 'fillcolor': '#E8F4FD', + 'fontname': 'Courier', + 'fontsize': '9', +} +_STYLE_START = { + 'shape': 'box', + 'style': 'filled,bold', + 'fillcolor': '#C8E6C9', + 'fontname': 'Courier', + 'fontsize': '9', +} +_STYLE_CLOSE = { + 'shape': 'box', + 'style': 'filled,bold', + 'fillcolor': '#FFCDD2', + 'fontname': 'Courier', + 'fontsize': '9', +} +_STYLE_CONTINUE = { + 'shape': 'box', + 'style': 'filled,dashed', + 'fillcolor': '#FFF9C4', + 'fontname': 'Courier', + 'fontsize': '9', +} +_STYLE_PARENT_EVENT = { + 'shape': 'box', + 'style': 'filled', + 'fillcolor': '#E1BEE7', + 'fontname': 'Courier', + 'fontsize': '9', +} +_STYLE_TOMBSTONE = { + 'shape': 'octagon', + 'style': 'filled,bold', + 'fillcolor': '#FFAB91', + 'fontname': 'Courier', + 'fontsize': '9', + 'penwidth': '2', +} +_STYLE_DELETED_PLACEHOLDER = { + 'shape': 'box', + 'style': 'dashed', + 'fillcolor': '#F5F5F5', + 'fontname': 'Courier', + 'fontsize': '8', + 'fontcolor': '#999999', + 'color': '#CCCCCC', +} +_EDGE_CHAIN = { + 'color': '#1565C0', + 'penwidth': '1.5', +} +_EDGE_CONTENT = { + 'color': '#999999', + 'style': 'dotted', + 'penwidth': '0.8', +} +_EDGE_CROSS_REF = { + 'color': '#E65100', + 'style': 'dashed', + 'penwidth': '1.2', +} +_EDGE_TOMBSTONE = { + 'color': '#D32F2F', + 'style': 'bold', + 'penwidth': '2', +} + + +def _classify_item(item: dict) -> str: + '''Return a style classification string for an item. + + Parameters + ---------- + item : dict + A standard Merkle item. + + Returns + ------- + str + One of ``"start"``, ``"close"``, ``"continue"``, + ``"parent_event"``, ``"normal"``. + ''' + event = item.get('event', {}) + if not isinstance(event, dict): + return 'normal' + etype = event.get('type', '') + if etype == 'start': + return 'start' + if etype == 'continue': + return 'continue' + if etype == 'close': + return 'close' + if etype == 'child_session_finished': + return 'parent_event' + return 'normal' + + +def _style_for_classification(classification: str) -> dict: + '''Return the Graphviz attribute dict for a classification. + + Parameters + ---------- + classification : str + One of the strings returned by :func:`_classify_item`. + + Returns + ------- + dict + Graphviz node attributes. + ''' + return { + 'start': _STYLE_START, + 'close': _STYLE_CLOSE, + 'continue': _STYLE_CONTINUE, + 'parent_event': _STYLE_PARENT_EVENT, + 'normal': _STYLE_NORMAL, + }.get(classification, _STYLE_NORMAL) + + +# --------------------------------------------------------------------------- +# Merkle DAG +# --------------------------------------------------------------------------- + +class Merkle: + '''Core async Merkle DAG engine. + + Manages append-only event streams, session lifecycle, chain + verification, and tombstone deletion. All public methods are + coroutines so they integrate naturally with an ``asyncio`` event + loop. + + Parameters + ---------- + storage : StreamStorage + The backend that persists streams. All storage methods are + async. + categories : set of str + The set of category keys (e.g. ``{"student", "teacher"}``) that + trigger parent-stream propagation when a session is closed. + + Attributes + ---------- + storage : StreamStorage + categories : set of str + + Notes + ----- + Each session should be owned by a single writer. If two + coroutines concurrently append to the *same* session, the chain + linkage may be inconsistent. + ''' + + def __init__(self, storage: 'StreamStorage', categories: Set[str]): self.storage = storage self.categories = categories - # These are generic to interact with the Merkle DAG - def event_to_session(self, event, session, children=None, label=None): - ''' - Append an event to the merkle tree. - - There are two possibilities here: - - 1. We have a closure and we're updating the SHA hash with each event. - 2. We don't have a closure and we're placing the individual events into the Merkle DAG. - - We went with the second option. This makes events into the leaf, - nodes, whereas the first option makes sessions into the leaf - nodes. - - This uses a little bit more space, but it's easier to reason about, - and potentially minimizes some ad-hoc decisions, such as where to - put boundaries between long-running sessions. - - We might still want a closure, so we don't need to read back the - last item in the stream. Or perhaps we want both (with one calling - the other), using a closure for rapid events and a call like this - one for rare ones. - - Args: - event (dict): The event to append. - session (dict): The session to append to. This should specify - a set of categories, and map those to lists of associated - IDs. For example, "teacher": ["teacher1", "teacher2"] - children (list): Additional children of this event, beyond the - current item and the past event. - label (str): An optional human-friendly label for this event. This - should NOT be relied on programmatically, or to be unique. It's - just for human consumption, e.g. when making visualizations. - - Returns: - dict: The event envelope, with the session updated, and the - hash computed. + # ---- core append --------------------------------------------------- + + async def event_to_session( + self, + event: dict, + session: dict, + children: Optional[List[str]] = None, + label: Optional[str] = None, + ) -> dict: + '''Append *event* to the Merkle chain for *session*. + + Constructs a new **item** whose ``hash`` field is a commitment + to: + + - The SHA-256 of the event payload (ensures content integrity). + - The hash of the previous item in the stream (ensures ordering + and completeness -- no item can be removed or reordered + without changing downstream hashes). + - Any additional child hashes passed by the caller (e.g. + cross-references to other streams). + - The UTC timestamp (ensures temporal ordering and prevents + replay). + + The item is appended to the stream identified by + ``session_key(session)``. + + Parameters + ---------- + event : dict + Arbitrary JSON-serializable event payload. + session : dict + The session descriptor that identifies the target stream. + children : list of str, optional + Extra child hashes to include (e.g. references to other + streams). The event hash and previous-item hash are added + automatically. + label : str, optional + A human-readable label stored on the item for visualization + purposes. Does **not** affect the hash. + + Returns + ------- + dict + The persisted item envelope with keys ``hash``, + ``children``, ``timestamp``, ``event``, and optionally + ``label``. ''' - # reverse() so we add children from the parameters to the end of the list - # of children. This isn't strictly necessary, but it is a little bit - # nicer to look at manually. We could remove the pair of reverse() calls, - # since this is an unordered list, if this ever becomes a problem. if children is None: - children = list() - children.reverse() + children = [] + else: + children = list(children) # don't mutate caller's list + storage = self.storage - session_id = session_key(session) + sid = session_key(session) ts = timestamp() - event_hash = hash(json_dump(event)) - node_hash = hash(*children, ts) - - last_hash = None - last_item = storage._most_recent_item(session_id) + # 1. Hash the event payload itself + event_hash = merkle_hash(json_dump(event)) + children.append(event_hash) + # 2. Link to the previous item in this stream (if any) + last_item = await storage.most_recent_item(sid) if last_item is not None: - last_hash = last_item['hash'] - children.append(last_hash) + children.append(last_item['hash']) - children.append(event_hash) - - children.reverse() - print(children) + # 3. Compute node hash AFTER children are fully assembled + node_hash = merkle_hash(*sorted(children), ts) item = { - 'children': children, # Points to the full chain / children - 'hash': node_hash, # Current node - 'timestamp': ts, # Timestamp - 'event': event + 'children': children, + 'hash': node_hash, + 'timestamp': ts, + 'event': event, } if label is not None: item['label'] = label - storage._append_to_stream(session_id, item) - print(item['hash']) - return item - - def start(self, session, metadata=None, continue_session=False): - ''' - Start a new session. - Args: - session (dict): The session to start. - metadata (dict): Optional metadata to attach to the session. + await storage.append_to_stream(sid, item) + return item - Returns: - dict: The session envelope + # ---- session lifecycle --------------------------------------------- + + async def start( + self, + session: dict, + metadata: Optional[dict] = None, + continuation_hash: Optional[str] = None, + ) -> dict: + '''Open a new session stream (or continue one after a break). + + Creates a ``start`` (or ``continue``) event as the first item + in a new stream. If *continuation_hash* is provided, the new + stream includes it as a child, creating a cross-segment link + in the DAG. + + Parameters + ---------- + session : dict + Session descriptor. + metadata : dict, optional + Arbitrary metadata to include in the start event (e.g. + client version, IP, request headers). + continuation_hash : str, optional + The final hash of a preceding segment. When set, the + event type is ``continue`` instead of ``start``, and the + hash is recorded in the ``continues`` field and as a child. + + Returns + ------- + dict + The persisted start/continue item. ''' - if not continue_session: - event = { - 'type': 'start', - 'session': session - # Perhaps we want to add a category here? E.g. 'session_event_stream' for the raw streams - # and something else to indicate parents? - } - else: - raise NotImplementedError('Continuing sessions not implemented') + event: Dict[str, Any] = {'type': 'start', 'session': session} if metadata is not None: event['metadata'] = metadata - return self.event_to_session( - event, - session, - label='start' - ) - - def close_session(self, session, logical_break=False): - ''' - Close the session. We update up-stream nodes with the session's - merkle leaf. and if necessary, we update the session's key / - topic / alias with the hash of the full chain. + if continuation_hash is not None: + event['type'] = 'continue' + event['continues'] = continuation_hash + + extra_children = [continuation_hash] if continuation_hash else [] + return await self.event_to_session(event, session, + children=extra_children, label='start') + + async def close_session( + self, + session: dict, + logical_break: bool = False, + ) -> str: + '''Close *session* and finalize its stream. + + Appends a ``close`` event, renames the stream from its + session-key to its **final content hash** (content-addressing), + and -- unless *logical_break* is True -- propagates a + ``child_session_finished`` event to every parent category + stream. + + Parent propagation means that closing + ``{"student": ["Alice"], "tool": ["editor"]}`` will append an + event to both the ``{"student": "Alice"}`` and + ``{"tool": "editor"}`` long-lived streams, recording the + child session's hash. + + Parameters + ---------- + session : dict + The session descriptor to close. Must match the descriptor + used in :meth:`start`. + logical_break : bool, optional + If True, rename the stream but **do not** notify parent + streams. Used internally by :meth:`break_session`. + + Returns + ------- + str + The final hash of the closed session stream. This is the + stream's new key in storage. ''' - final_item = self.event_to_session( + final_item = await self.event_to_session( {'type': 'close', 'session': session}, session, - label='close' + label='close', ) session_hash = final_item['hash'] - self.storage._rename_or_alias_stream(session_key(session), session_hash) - if len(session) < 1: - raise Exception('Session is empty') - if len(session) == 1: - print("Parent session") - print("These sessions shouldn't be closed") - return - - # We need to update the parents to point to this session. - # We don't do this if we're only introducing a logical break, - # since the session continues on. - if not logical_break: - for key in session: - if key not in self.categories: - print("Something is wrong. Session has unexpected key: {}".format(key)) - for item in session[key]: - parent_session = {key: item} - self.event_to_session( - { - 'type': 'child_session_finished', - 'session': session_hash # This should go into children, maybe?, - }, - parent_session, - children=[session_hash], - label=f'{key}' - ) + await self.storage.rename_or_alias_stream(session_key(session), session_hash) + + if logical_break: + return session_hash + + # Propagate to parent (single-category) streams + for key in session: + if key not in self.categories: + continue + values = session[key] + if not isinstance(values, list): + values = [values] + for value in values: + parent_session = {key: value} + await self.event_to_session( + { + 'type': 'child_session_finished', + 'child_hash': session_hash, + 'child_session': session, + }, + parent_session, + children=[session_hash], + label=f'{key}:{value}', + ) return session_hash - def break_session(self, session): - ''' - Split a session into two parts. This has no logical effect on the data structure, - but creates a split so that a portion of the data can be accessed under it's own - key. Logically, keys can either be part of the event envelope, or they can be the - key / topic / filename of the session. + async def break_session(self, session: dict) -> str: + '''Insert a logical break in *session*. - It may make sense to do this e.g. daily to break up long-running sessions. This - stub is a proof of concept. + Closes the current segment (renaming it to its final hash) and + immediately starts a new continuation segment that references + the old one. Useful for: - Note that we do not create ANY new keys here, since we should be able to break - a session into multiple parts, or recombine them, without breaking the logical - structure. - ''' - session_hash = self.close_session(session, logical_break=True) - self.start(session, continue_session={ - 'type': 'continue', - 'session': session_hash - }) - return session_hash + - Bounding segment size for long-running sessions. + - Creating periodic (e.g. hourly) checkpoints. + - Enabling partial verification without downloading the full + stream. + Parameters + ---------- + session : dict + The session descriptor. -class StreamStorage: - def _append_to_stream(self, stream, item): + Returns + ------- + str + The hash of the closed segment. + ''' + segment_hash = await self.close_session(session, logical_break=True) + await self.start(session, continuation_hash=segment_hash) + return segment_hash + + # ---- verification -------------------------------------------------- + + async def verify_chain(self, stream_key: str) -> bool: + '''Verify the integrity of every item in a stream. + + Walks the stream front-to-back and checks three invariants for + each item: + + 1. **Event inclusion** -- the SHA-256 of the item's ``event`` + payload appears in its ``children`` list. + 2. **Chain linkage** -- the previous item's ``hash`` appears in + the current item's ``children`` list (skipped for the first + item). + 3. **Hash correctness** -- the item's ``hash`` equals + ``SHA-256(sorted(children) || timestamp)``. + + Any violation raises ``ValueError`` with a diagnostic message. + + Parameters + ---------- + stream_key : str + The key (typically the final hash) of the stream to verify. + + Returns + ------- + bool + ``True`` if the entire chain is valid. + + Raises + ------ + ValueError + If the stream is not found, is empty, or any item fails + verification. ''' - Append an item to a stream. + data = await self.storage.get_stream_data(stream_key) + if not data: + raise ValueError(f'Stream {stream_key!r} not found or empty') + + prev_hash: Optional[str] = None + for i, item in enumerate(data): + event_hash = merkle_hash(json_dump(item['event'])) + if event_hash not in item['children']: + raise ValueError( + f'Item {i}: event hash {event_hash} not in children' + ) + if prev_hash is not None and prev_hash not in item['children']: + raise ValueError( + f'Item {i}: previous hash {prev_hash} not in children' + ) + expected = merkle_hash(*sorted(item['children']), item['timestamp']) + if item['hash'] != expected: + raise ValueError( + f'Item {i}: hash mismatch (expected {expected}, got {item["hash"]})' + ) + prev_hash = item['hash'] + return True + + # ---- deletion with tombstone --------------------------------------- + + async def delete_stream_with_tombstone(self, stream_key: str, reason: str) -> dict: + '''Delete a stream's data and leave a cryptographic tombstone. + + The tombstone preserves the stream's structural metadata -- + its key, final hash, the ordered list of per-item hashes, and + the item count -- so that: + + - Parent streams still have a valid ``child_hash`` reference + (it now points to a tombstone instead of data). + - Auditors can confirm *what* was deleted and *when*, without + being able to recover the deleted content. + + The tombstone itself is hashed and stored under the key + ``__tombstone__``. + + Parameters + ---------- + stream_key : str + The key of the stream to delete (typically its final hash). + reason : str + A human-readable reason for deletion, e.g. + ``"GDPR Article 17 erasure request"``. + + Returns + ------- + dict + The tombstone record, including its own + ``tombstone_hash``. + + Raises + ------ + ValueError + If the stream does not exist or is already empty. ''' + data = await self.storage.get_stream_data(stream_key) + if not data: + raise ValueError(f'Stream {stream_key!r} not found or empty') + + final_hash = data[-1]['hash'] + all_hashes = [item['hash'] for item in data] + + tombstone = { + 'type': 'tombstone', + 'deleted_stream': stream_key, + 'final_hash': final_hash, + 'item_hashes': all_hashes, + 'item_count': len(data), + 'reason': reason, + 'timestamp': timestamp(), + } + tombstone['tombstone_hash'] = merkle_hash(json_dump(tombstone)) + + await self.storage.delete_stream(stream_key) + await self.storage.append_to_stream( + f'__tombstone__{stream_key}', tombstone + ) + return tombstone + + +# --------------------------------------------------------------------------- +# Storage backends +# --------------------------------------------------------------------------- + +class StreamStorage: + '''Abstract base class for async stream storage backends. + + A **stream** is an ordered list of JSON-serializable dicts + (items) identified by a string key. Backends must implement the + following async operations: + + - ``append_to_stream(stream, item)`` -- append one item. + - ``rename_or_alias_stream(stream, alias)`` -- rename the stream + key (used when a session is closed and the stream is + content-addressed). + - ``get_stream_data(stream)`` -- return the full list of items, + or ``None`` if the stream does not exist. + - ``delete_stream(stream)`` -- remove the stream entirely. + - ``most_recent_item(stream)`` -- return the last appended item, + or ``None``. + - ``walk()`` -- async-iterate over every item in every stream (used + for visualization and bulk export). + - ``walk_streams()`` -- async-iterate over ``(stream_key, items_list)`` + pairs (used for stream-aware visualization). + + All mutating operations must be **safe for concurrent awaits**. + ''' + + async def append_to_stream(self, stream: str, item: dict): + '''Append *item* to the end of *stream*, creating it if needed.''' raise NotImplementedError - def _rename_or_alias_stream(self, stream, alias): - ''' - Rename a stream. - ''' + async def rename_or_alias_stream(self, stream: str, alias: str): + '''Rename *stream* to *alias*. If they are equal, no-op.''' raise NotImplementedError - def _get_stream_data(self, stream): - ''' - Get the stream. + async def get_stream_data(self, stream: str) -> Optional[List[dict]]: + '''Return all items in *stream*, or ``None`` if it does not exist. + + An existing but empty stream should return ``[]``. ''' raise NotImplementedError - def _delete_stream(self, sha_key): - ''' - Delete a stream. + async def delete_stream(self, stream: str): + '''Remove *stream* and all its items. No-op if absent.''' + raise NotImplementedError - Mostly for right-to-be-forgotten requests - ''' + async def most_recent_item(self, stream: str) -> Optional[dict]: + '''Return the last item in *stream*, or ``None`` if empty/absent.''' raise NotImplementedError - def _most_recent_item(self, stream): - ''' - Get the most recent item in a stream. + async def walk(self) -> List[dict]: + '''Return every item in every stream (arbitrary order). + + Returns a list rather than an async iterator for simplicity. ''' raise NotImplementedError - def _walk(self): - ''' - Walk the DAG. This is used for debugging. + async def walk_streams(self) -> List[Tuple[str, List[dict]]]: + '''Return ``(stream_key, items_list)`` for every stream. + + Returns a list rather than an async iterator for simplicity. + + Returns + ------- + list of (str, list of dict) + Each tuple contains the stream key and the full list of + items (or tombstone records) in that stream. ''' raise NotImplementedError - def _make_label(self, item): - ''' - Make a label for an item. + # ---- internal helpers for visualization ---------------------------- + + def _is_tombstone(self, item: dict) -> bool: + '''Check whether *item* is a tombstone record (not a standard Merkle item). + + Tombstones have ``type == "tombstone"`` at the top level, whereas + standard Merkle items have their ``type`` nested inside the ``event`` + dict. + + Parameters + ---------- + item : dict + An item or tombstone from a stream. - This is cosmetic, when rendering the graph. + Returns + ------- + bool ''' - if 'label' in item and item['label'] is not None: - return item['label'] - print(item) - if 'session' in item and len(item['session']) == 1: - return "-".join(item['session'].items()[0]) - return item['hash'][:4] + return item.get('type') == 'tombstone' + + async def _collect_all_items(self) -> Tuple[Dict[str, List[dict]], Dict[str, List[dict]]]: + '''Collect all items and tombstones, grouped by stream. - def to_networkx(self): + Returns + ------- + tuple of (items_by_stream, tombstones_by_stream) + ``items_by_stream`` maps stream_key -> list of normal Merkle items. + ``tombstones_by_stream`` maps stream_key -> list of tombstone dicts. ''' - Convert the DAG to a network. + items_by_stream: Dict[str, List[dict]] = {} + tombstones_by_stream: Dict[str, List[dict]] = {} + + for stream_key, items in await self.walk_streams(): + for item in items: + if self._is_tombstone(item): + tombstones_by_stream.setdefault(stream_key, []).append(item) + else: + items_by_stream.setdefault(stream_key, []).append(item) + + return items_by_stream, tombstones_by_stream + + # ---- convenience / visualization ----------------------------------- + + def _make_label(self, item: dict, stream_name: str = '') -> str: + '''Derive a short human-readable label for *item*. + + Priority: - This is used for testing, experimentation, and demonstration. It - would never scale with real data. + 1. The explicit ``label`` field, if present. + 2. A ``category:value`` string if the event contains a + single-key session dict. + 3. The first 8 hex characters of the item hash. + + Parameters + ---------- + item : dict + A standard Merkle item. + stream_name : str, optional + The stream key (unused in the short label but accepted for + API consistency with :func:`_item_display_label`). + + Returns + ------- + str + ''' + if item.get('label'): + return item['label'] + event = item.get('event', {}) + if isinstance(event, dict) and 'session' in event and isinstance(event['session'], dict): + keys = list(event['session'].keys()) + if len(keys) == 1: + k = keys[0] + return f'{k}:{event["session"][k]}' + return item.get('hash', '?')[:8] + + async def to_networkx(self): + '''Export the entire DAG as a :class:`networkx.DiGraph`. + + Each Merkle item becomes a node keyed by its hash, with + attributes for ``label``, ``short_label``, ``stream``, + ``event_type``, ``classification``, ``timestamp``, and + ``tombstone`` (bool). + + Tombstones are included as nodes keyed by their + ``tombstone_hash``, with ``tombstone=True`` and additional + attributes ``deleted_stream``, ``reason``, and ``item_count``. + + Edges carry an ``edge_type`` attribute: + + - ``"chain"`` -- link to the previous item in the same stream. + - ``"content"`` -- link to the event-payload hash (a virtual + node representing the raw content). + - ``"cross_ref"`` -- any other child reference (e.g. to a + closed session from a parent stream). + - ``"tombstone"`` -- from a tombstone to each of the deleted + item hashes it records. + + Returns + ------- + networkx.DiGraph + + Raises + ------ + ImportError + If ``networkx`` or ``pydot`` are not installed. ''' + if not HAS_VIZ: + raise ImportError('networkx/pydot not installed') + G = networkx.DiGraph() - for item in self._walk(): - print(item) - G.add_node(item['hash'], label=self._make_label(item)) - if 'children' in item: - for child in item['children']: - G.add_edge(item['hash'], child) + items_by_stream, tombstones_by_stream = await self._collect_all_items() + + # All known item hashes (so we can distinguish chain vs cross-ref edges) + all_item_hashes: Set[str] = set() + for items in items_by_stream.values(): + for item in items: + all_item_hashes.add(item['hash']) + + # Add normal item nodes and edges + for stream_key, items in items_by_stream.items(): + prev_hash = None + for item in items: + classification = _classify_item(item) + event = item.get('event', {}) + event_type = event.get('type', '') if isinstance(event, dict) else '' + + G.add_node( + item['hash'], + label=_item_display_label(item, stream_key), + short_label=self._make_label(item, stream_key), + stream=stream_key, + event_type=event_type, + classification=classification, + timestamp=item.get('timestamp', ''), + tombstone=False, + ) + + # Classify edges + event_hash = merkle_hash(json_dump(item['event'])) + for child in item.get('children', []): + if child == prev_hash: + G.add_edge(item['hash'], child, edge_type='chain') + elif child == event_hash: + # Content hash -- may not correspond to a real item node + if child not in G: + G.add_node( + child, + label=f'content\\n{child[:12]}...', + short_label=child[:8], + classification='content_hash', + tombstone=False, + ) + G.add_edge(item['hash'], child, edge_type='content') + else: + G.add_edge(item['hash'], child, edge_type='cross_ref') + + prev_hash = item['hash'] + + # Add tombstone nodes and edges + for stream_key, tombstones in tombstones_by_stream.items(): + for tombstone in tombstones: + t_hash = tombstone.get('tombstone_hash', f'tombstone_{id(tombstone)}') + G.add_node( + t_hash, + label=_tombstone_display_label(tombstone), + short_label=f'tombstone {t_hash[:8]}', + stream=stream_key, + classification='tombstone', + tombstone=True, + deleted_stream=tombstone.get('deleted_stream', ''), + reason=tombstone.get('reason', ''), + item_count=tombstone.get('item_count', 0), + ) + + # Edges to the hashes that were deleted + for deleted_hash in tombstone.get('item_hashes', []): + if deleted_hash not in G: + # Add placeholder for the deleted node + G.add_node( + deleted_hash, + label=f'[deleted]\\n{deleted_hash[:12]}...', + short_label=f'del {deleted_hash[:8]}', + classification='deleted', + tombstone=False, + ) + G.add_edge(t_hash, deleted_hash, edge_type='tombstone') + + # Edge from tombstone to the final hash if not already a node + final_hash = tombstone.get('final_hash', '') + if final_hash and final_hash not in G: + G.add_node( + final_hash, + label=f'[deleted final]\\n{final_hash[:12]}...', + short_label=f'del {final_hash[:8]}', + classification='deleted', + tombstone=False, + ) + return G - def to_graphviz(self): - ''' - Convert the DAG to a graphviz graph. + async def to_graphviz(self): + '''Export the entire DAG as a styled :class:`pydot.Dot` Graphviz graph. - This is used for testing, experimentation, and demonstration. It - would never scale with real data. - ''' - G = pydot.Dot(graph_type='digraph') - for item in self._walk(): - node = pydot.Node(item['hash'], label=self._make_label(item)) - G.add_node(node) - for item in self._walk(): - if 'children' in item: - for child in item['children']: - edge = pydot.Edge(item['hash'], child) - G.add_edge(edge) - return G + Nodes are color-coded by type: + - **Green** -- ``start`` / ``continue`` events. + - **Red (light)** -- ``close`` events. + - **Yellow (light)** -- ``continue`` events. + - **Purple** -- ``child_session_finished`` (parent propagation). + - **Blue (light)** -- normal events. + - **Orange** (octagon) -- tombstones. + - **Grey** (dashed) -- deleted-item placeholders. -class KafkaStorage(StreamStorage): - """ - A Merkle DAG implementation that uses Kafka as a backing store. + Edges are styled by relationship: - Very little of this is built. - """ - def __init__(self): - super().__init__() - raise NotImplementedError - self.producer = Producer() - self.consumer = Consumer() + - **Blue solid** -- chain link (prev -> next within a stream). + - **Grey dotted** -- content hash reference. + - **Orange dashed** -- cross-stream reference. + - **Red bold** -- tombstone -> deleted item. - def _append_to_stream(self, stream, item): - raise NotImplementedError - self.producer.produce(stream, json_dump(item)) + Items belonging to the same stream are grouped into Graphviz + ``cluster`` subgraphs with distinct background colors. - def _rename_or_alias_stream(self, stream, alias): - ''' - Rename a stream. We can't do this directly, so we create a new stream under the name `alias` - and then delete the old stream. - ''' - raise NotImplementedError - for item in self._get_stream_data(stream): - self._append_to_stream(alias, item) - self._delete_stream(stream) + A legend cluster is included to explain the visual encoding. - def _get_stream_data(self, stream): - raise NotImplementedError + Can be rendered to PNG, SVG, PDF, etc. via:: - def _delete_stream(self, sha_key): - ''' - Delete the Kafka topic for the stream. + dot = await storage.to_graphviz() + dot.write_png('merkle_dag.png') + + Returns + ------- + pydot.Dot + + Raises + ------ + ImportError + If ``pydot`` is not installed. ''' - self.producer.delete_topic(sha_key) + if not HAS_VIZ: + raise ImportError('pydot not installed') + + G = pydot.Dot( + graph_type='digraph', + rankdir='TB', + bgcolor='white', + fontname='Courier', + label='Merkle DAG Log', + labelloc='t', + fontsize='14', + ) + G.set_node_defaults(fontname='Courier', fontsize='9') + + items_by_stream, tombstones_by_stream = await self._collect_all_items() + + # Track all item hashes for edge classification + all_item_hashes: Set[str] = set() + for items in items_by_stream.values(): + for item in items: + all_item_hashes.add(item['hash']) + + # Group items into subgraphs by stream for visual clustering + stream_colors = [ + '#E3F2FD', '#F3E5F5', '#E8F5E9', '#FFF8E1', + '#FCE4EC', '#E0F7FA', '#FBE9E7', '#F1F8E9', + ] + color_idx = 0 + + for stream_key, items in items_by_stream.items(): + # Create a subgraph cluster for this stream + cluster_name = f'cluster_{hashlib.sha256(stream_key.encode()).hexdigest()[:12]}' + display_key = stream_key if len(stream_key) <= 40 else stream_key[:37] + '...' + subgraph = pydot.Cluster( + cluster_name, + label=display_key, + style='rounded,filled', + fillcolor=stream_colors[color_idx % len(stream_colors)], + color='#BBBBBB', + fontname='Courier', + fontsize='8', + ) + color_idx += 1 + + prev_hash = None + for item in items: + classification = _classify_item(item) + style_attrs = _style_for_classification(classification) + label = _item_display_label(item, stream_key) + + node = pydot.Node(item['hash'], label=label, **style_attrs) + subgraph.add_node(node) + + # Classify and add edges + event_hash = merkle_hash(json_dump(item['event'])) + for child in item.get('children', []): + if child == prev_hash: + G.add_edge(pydot.Edge( + item['hash'], child, **_EDGE_CHAIN + )) + elif child == event_hash: + # Only add content-hash nodes if they aren't already + # a real item (avoids duplicate nodes) + if child not in all_item_hashes: + content_node = pydot.Node( + child, + label=f'content\\n{child[:12]}...', + shape='ellipse', + style='dashed', + fillcolor='#FAFAFA', + fontname='Courier', + fontsize='7', + fontcolor='#999999', + color='#CCCCCC', + ) + G.add_node(content_node) + G.add_edge(pydot.Edge( + item['hash'], child, **_EDGE_CONTENT + )) + else: + G.add_edge(pydot.Edge( + item['hash'], child, **_EDGE_CROSS_REF + )) + + prev_hash = item['hash'] + + G.add_subgraph(subgraph) + + # Add tombstone nodes (outside stream clusters since the stream is deleted) + for stream_key, tombstones in tombstones_by_stream.items(): + for tombstone in tombstones: + t_hash = tombstone.get('tombstone_hash', f'tombstone_{id(tombstone)}') + label = _tombstone_display_label(tombstone) + + tomb_node = pydot.Node(t_hash, label=label, **_STYLE_TOMBSTONE) + G.add_node(tomb_node) + + # Collect all item_hashes from this tombstone for dedup + tombstone_item_hashes = set(tombstone.get('item_hashes', [])) + + # Add placeholder nodes for each deleted item hash + for deleted_hash in tombstone.get('item_hashes', []): + if deleted_hash not in all_item_hashes: + placeholder = pydot.Node( + deleted_hash, + label=f'[deleted]\\n{deleted_hash[:12]}...', + **_STYLE_DELETED_PLACEHOLDER, + ) + G.add_node(placeholder) + G.add_edge(pydot.Edge( + t_hash, deleted_hash, **_EDGE_TOMBSTONE + )) + + # Also link to final hash if it's not already a node + final_hash = tombstone.get('final_hash', '') + if final_hash and final_hash not in all_item_hashes: + if final_hash not in tombstone_item_hashes: + placeholder = pydot.Node( + final_hash, + label=f'[deleted final]\\n{final_hash[:12]}...', + **_STYLE_DELETED_PLACEHOLDER, + ) + G.add_node(placeholder) + + # Add a legend + legend = pydot.Cluster( + 'legend', + label='Legend', + style='rounded', + color='#CCCCCC', + fontname='Courier', + fontsize='9', + ) + legend_items = [ + ('legend_start', 'START', _STYLE_START), + ('legend_close', 'CLOSE', _STYLE_CLOSE), + ('legend_continue', 'CONTINUE', _STYLE_CONTINUE), + ('legend_normal', 'EVENT', _STYLE_NORMAL), + ('legend_parent', 'PARENT UPDATE', _STYLE_PARENT_EVENT), + ('legend_tombstone', 'TOMBSTONE', _STYLE_TOMBSTONE), + ('legend_deleted', '[DELETED]', _STYLE_DELETED_PLACEHOLDER), + ] + for node_id, label, style in legend_items: + legend.add_node(pydot.Node(node_id, label=label, **style)) + G.add_subgraph(legend) - def _most_recent_item(self, stream): - raise NotImplementedError + return G - def _walk(self): - raise NotImplementedError + +class InMemoryStorage(StreamStorage): + '''In-memory storage backend backed by a ``dict[str, list[dict]]``. + + Suitable for tests, short-lived pipelines, and demonstrations. + All data is lost when the process exits. + + Since all operations are in-memory and non-blocking, the async + methods are simple coroutines that return immediately. An + ``asyncio.Lock`` is used to serialize concurrent access within the + same event loop. + ''' + + def __init__(self): + super().__init__() + self._store: Dict[str, List[dict]] = {} + self._lock = asyncio.Lock() + + async def append_to_stream(self, stream, item): + async with self._lock: + self._store.setdefault(stream, []).append(item) + + async def rename_or_alias_stream(self, stream, alias): + async with self._lock: + if alias == stream: + return + self._store[alias] = self._store.pop(stream) + + async def get_stream_data(self, stream): + async with self._lock: + if stream not in self._store: + return None + return list(self._store[stream]) + + async def delete_stream(self, stream): + async with self._lock: + self._store.pop(stream, None) + + async def most_recent_item(self, stream): + async with self._lock: + items = self._store.get(stream) + if not items: + return None + return items[-1] + + async def walk(self): + async with self._lock: + snapshot = {k: list(v) for k, v in self._store.items()} + result = [] + for items in snapshot.values(): + result.extend(items) + return result + + async def walk_streams(self): + '''Return ``(stream_key, items_list)`` for every stream.''' + async with self._lock: + snapshot = {k: list(v) for k, v in self._store.items()} + return list(snapshot.items()) class FSStorage(StreamStorage): - """ - A Merkle DAG implementation that uses a file system as a backing store. - """ - def __init__(self, path): + '''Filesystem-backed async storage (one JSONL file per stream). + + Each stream is stored as a file where every line is a single + JSON-serialized item. Filenames are the SHA-256 of the stream + key to avoid path-traversal and encoding issues. + + Blocking file I/O is offloaded to the default thread-pool executor + via :meth:`asyncio.loop.run_in_executor`. + + Parameters + ---------- + path : str + Directory in which stream files are created. Will be created + (including parents) if it does not exist. + executor : concurrent.futures.Executor, optional + The executor to use for blocking I/O. ``None`` (the default) + uses the loop's default executor. + + Notes + ----- + This backend is adequate for prototyping but has several + performance limitations: + + - ``most_recent_item`` reads the entire file to return the last + line. + - ``rename_or_alias_stream`` is not atomic across crashes. + - No write-ahead log or fsync guarantees. + ''' + + def __init__(self, path: str, executor: Optional[Executor] = None): super().__init__() self.path = path - - def _fn(self, stream): + self._executor = executor + os.makedirs(path, exist_ok=True) + # Async lock to serialize operations within the event loop + self._lock = asyncio.Lock() + # Maintain a reverse mapping: filename_hash -> stream_key + # so walk_streams can report meaningful keys + self._key_map: Dict[str, str] = {} + + def _fn(self, stream: str) -> str: + '''Map a stream name to a filesystem path. + + Uses SHA-256 of the stream name to produce a safe, fixed-length + filename that avoids path-traversal and special-character + issues. ''' - Get the filename for a stream. + safe = hashlib.sha256(stream.encode('utf-8')).hexdigest() + self._key_map[safe] = stream + return os.path.join(self.path, safe) - This is prototype code. We should escape the stream name robustly to avoid - security issues and collisions. - ''' - safer_filename = "".join(c for c in stream if c.isalnum() or c in '-_') - return os.path.join(self.path, safer_filename) + async def _run_in_executor(self, fn, *args): + '''Run a blocking callable in the thread-pool executor.''' + loop = asyncio.get_running_loop() + return await loop.run_in_executor(self._executor, fn, *args) - def _append_to_stream(self, stream, item): - ''' - Append an item to a stream. - ''' - with open(self._fn(stream), 'a') as f: - f.write(json_dump(item)) - f.write('\n') + # -- sync helpers called inside the executor ------------------------- - def _rename_or_alias_stream(self, stream, alias): - ''' - Rename a stream. - ''' - os.rename(self._fn(stream), self._fn(alias)) + @staticmethod + def _sync_append(path: str, data: str): + with open(path, 'a') as f: + f.write(data + '\n') - def _get_stream_data(self, stream): - ''' - Get the stream. - ''' - if not os.path.exists(self._fn(stream)): + @staticmethod + def _sync_rename(src: str, dst: str): + os.rename(src, dst) + + @staticmethod + def _sync_read(path: str) -> Optional[List[dict]]: + if not os.path.exists(path): + return None + with open(path, 'r') as f: + return [json_load(line) for line in f if line.strip()] + + @staticmethod + def _sync_delete(path: str): + if os.path.exists(path): + os.remove(path) + + @staticmethod + def _sync_listdir(directory: str) -> List[str]: + return os.listdir(directory) + + @staticmethod + def _sync_read_file(filepath: str) -> List[dict]: + items = [] + if os.path.isfile(filepath): + with open(filepath, 'r') as f: + for line in f: + line = line.strip() + if line: + items.append(json_load(line)) + return items + + # -- async interface ------------------------------------------------- + + async def append_to_stream(self, stream, item): + async with self._lock: + path = self._fn(stream) + data = json_dump(item) + await self._run_in_executor(self._sync_append, path, data) + + async def rename_or_alias_stream(self, stream, alias): + async with self._lock: + src, dst = self._fn(stream), self._fn(alias) + if src == dst: + return + await self._run_in_executor(self._sync_rename, src, dst) + + async def get_stream_data(self, stream): + path = self._fn(stream) + return await self._run_in_executor(self._sync_read, path) + + async def delete_stream(self, stream): + path = self._fn(stream) + await self._run_in_executor(self._sync_delete, path) + + async def most_recent_item(self, stream): + data = await self.get_stream_data(stream) + if not data: return None - with open(self._fn(stream), 'r') as f: - return [json_load(line) for line in f.readlines()] + return data[-1] - def _delete_stream(self, sha_key): - ''' - Delete a stream. - ''' - os.remove(self._fn(sha_key)) + async def walk(self): + filenames = await self._run_in_executor(self._sync_listdir, self.path) + result = [] + for filename in filenames: + filepath = os.path.join(self.path, filename) + items = await self._run_in_executor(self._sync_read_file, filepath) + result.extend(items) + return result + + async def walk_streams(self): + '''Return ``(stream_key, items_list)`` for every stream.''' + filenames = await self._run_in_executor(self._sync_listdir, self.path) + result = [] + for filename in filenames: + filepath = os.path.join(self.path, filename) + stream_key = self._key_map.get(filename, filename) + items = await self._run_in_executor(self._sync_read_file, filepath) + result.append((stream_key, items)) + return result + +class KVSStorage(StreamStorage): + '''Storage backend that delegates to an existing ``_KVS`` instance. + + Each stream is stored as a single KVS entry whose value is a JSON + list of items. Every append does a read-modify-write cycle. + + Parameters + ---------- + kvs : _KVS + An instantiated KVS backend (``InMemoryKVS``, ``PersistentRedisKVS``, + ``EphemeralRedisKVS``, ``FilesystemKVS``, etc.). + prefix : str, optional + A key prefix applied to every stream key before storage. + Defaults to ``"merkle:"``. + ''' + + def __init__(self, kvs: '_KVS', prefix: str = 'merkle:'): + super().__init__() + self._kvs = kvs + self._prefix = prefix + self._lock = asyncio.Lock() + # Track keys ourselves because KVS.keys() may return + # all keys in the backend (including non-merkle ones), + # and some backends (Redis) decode keys differently. + self._known_keys: set = set() + + def _key(self, stream: str) -> str: + '''Prefix a stream name to produce the KVS key.''' + return f'{self._prefix}{stream}' + + async def append_to_stream(self, stream: str, item: dict): + async with self._lock: + key = self._key(stream) + data = await self._kvs[key] + if data is None: + data = [] + if not isinstance(data, list): + data = [] + data.append(item) + await self._kvs.set(key, data) + self._known_keys.add(key) + + async def rename_or_alias_stream(self, stream: str, alias: str): + async with self._lock: + if alias == stream: + return + src_key = self._key(stream) + dst_key = self._key(alias) + data = await self._kvs[src_key] + if data is None: + return + # Write to new key first + await self._kvs.set(dst_key, data) + self._known_keys.add(dst_key) + # Remove old key + await self._remove_key(src_key) + self._known_keys.discard(src_key) + + async def _remove_key(self, key: str): + '''Remove a key from the underlying KVS, trying available methods.''' + if hasattr(self._kvs, 'remove'): + await self._kvs.remove(key) + elif hasattr(self._kvs, '__delitem__'): + await self._kvs.__delitem__(key) + else: + # Last resort: overwrite with None + await self._kvs.set(key, None) - def _most_recent_item(self, stream): - ''' - Get the most recent item in a stream. - ''' - data = self._get_stream_data(stream) + async def get_stream_data(self, stream: str) -> Optional[List[dict]]: + key = self._key(stream) + data = await self._kvs[key] if data is None: return None - if len(data) == 0: + if not isinstance(data, list): + return None + return list(data) + + async def delete_stream(self, stream: str): + key = self._key(stream) + await self._remove_key(key) + self._known_keys.discard(key) + + async def most_recent_item(self, stream: str) -> Optional[dict]: + key = self._key(stream) + data = await self._kvs[key] + if not data or not isinstance(data, list): return None return data[-1] - def _walk(self): + async def walk(self) -> List[dict]: + result: List[dict] = [] + for key in list(self._known_keys): + data = await self._kvs[key] + if isinstance(data, list): + result.extend(data) + return result + + async def walk_streams(self) -> List[Tuple[str, List[dict]]]: + result: List[Tuple[str, List[dict]]] = [] + prefix_len = len(self._prefix) + for key in list(self._known_keys): + data = await self._kvs[key] + if isinstance(data, list): + stream_name = key[prefix_len:] if key.startswith(self._prefix) else key + result.append((stream_name, data)) + return result + + async def debug_dump(self): + '''Print diagnostic info about what is actually stored. + + Useful for debugging Redis integration issues. ''' - Walk the DAG. This is used for debugging. - ''' - for filename in os.listdir(self.path): - with open(os.path.join(self.path, filename), 'r') as f: - for line in f.readlines(): - yield json_load(line) + print(f'[KVSStorage debug] prefix={self._prefix!r}') + print(f'[KVSStorage debug] tracked keys ({len(self._known_keys)}):') + for key in sorted(self._known_keys): + data = await self._kvs[key] + item_count = len(data) if isinstance(data, list) else '(not a list)' + data_type = type(data).__name__ + print(f' {key!r} -> type={data_type}, items={item_count}') + + # Also check what the KVS backend itself reports + try: + all_backend_keys = await self._kvs.keys() + merkle_keys = [k for k in all_backend_keys if k.startswith(self._prefix)] + print(f'[KVSStorage debug] backend keys with our prefix ({len(merkle_keys)}):') + for key in sorted(merkle_keys): + data = await self._kvs[key] + item_count = len(data) if isinstance(data, list) else '(not a list)' + print(f' {key!r} -> items={item_count}') + except Exception as e: + print(f'[KVSStorage debug] could not enumerate backend keys: {e}') + + +# --------------------------------------------------------------------------- +# Registry +# --------------------------------------------------------------------------- + +CATEGORIES: Set[str] = { + 'teacher', 'student', 'school', 'classroom', 'course', 'assignment', 'tool', +} +''' +The set of category keys recognized for parent-stream propagation. +When a session containing one of these keys is closed, a +``child_session_finished`` event is appended to the corresponding +single-category parent stream. +''' -class InMemoryStorage(StreamStorage): - """ - A Merkle DAG implementation that uses in-memory storage. - """ - def __init__(self): - super().__init__() - self.store = {} +STORES = { + 'fs': FSStorage, + 'inmemory': InMemoryStorage, + 'kvs': KVSStorage, +} +DEFAULT_STORE = 'inmemory' +''' +Registry of available storage backends, keyed by short name. - def _append_to_stream(self, stream, item): - if stream not in self.store: - self.store[stream] = [] - self.store[stream].append(item) +Used by configuration-driven code to instantiate a backend from a +string identifier. +''' - def _rename_or_alias_stream(self, stream, alias): - if alias == stream: - return - self.store[alias] = self.store[stream] - del self.store[stream] - def _get_stream_data(self, stream): - return self.store[stream] +# --------------------------------------------------------------------------- +# Smoke tests +# --------------------------------------------------------------------------- +async def test_case_inmemory(): + '''Original smoke test using InMemoryStorage.''' + storage = InMemoryStorage() + merkle = Merkle(storage, CATEGORIES) - def _delete_stream(self, stream): - del self.store[stream] + session = { + 'teacher': ['Mr. A'], + 'student': ['John'], + } - def _most_recent_item(self, stream): - if stream not in self.store: - return None - if len(self.store[stream]) == 0: - return None - return self.store[stream][-1] - - def _walk(self): - for stream in self.store: - for item in self.store[stream]: - yield item - - -CATEGORIES = set( - [ - "teacher", - "student", - "school", - "classroom", - "course", - "assignment" - ] -) + await merkle.start(session) + await merkle.event_to_session({'type': 'event', 'payload': 'A'}, session, label='A') + await merkle.event_to_session({'type': 'event', 'payload': 'B'}, session, label='B') + await merkle.event_to_session({'type': 'event', 'payload': 'C'}, session, label='C') + final_hash = await merkle.close_session(session) + assert await merkle.verify_chain(final_hash) + print(f'[inmemory] Chain verified: {final_hash}') -STORES = { - "kafka": KafkaStorage, - "fs": FSStorage, - "inmemory": InMemoryStorage -} + for parent_key in [json_dump({'student': 'John'}), json_dump({'teacher': 'Mr. A'})]: + data = await storage.get_stream_data(parent_key) + if data: + print(f'[inmemory] Parent stream {parent_key[:40]}... has {len(data)} item(s)') + tombstone = await merkle.delete_stream_with_tombstone(final_hash, reason='GDPR request') + print(f'[inmemory] Tombstone: {tombstone["tombstone_hash"]}') + assert await storage.get_stream_data(final_hash) is None + print('[inmemory] All checks passed.') -def test_case(): - """ - A test case, mostly used to demo the Merkle DAG. It doesn't check for - correctness yet, but does show a simple visualization of the DAG. - """ - big_session = { - "teacher": ["Ms. Q", "Mr. R"], - "student": ["John"], - "school": ["Washington Elementary"], - "classroom": ["4A"], - "course": ["Math"] - } - small_session = { - "teacher": ["Mr. A"], - "student": ["John"] + +async def test_case_kvs_basic(): + '''Basic start -> events -> close -> verify cycle on KVSStorage.''' + from learning_observer.kvs import InMemoryKVS as KVSInMemory + + kvs_backend = KVSInMemory() + storage = KVSStorage(kvs_backend) + merkle = Merkle(storage, CATEGORIES) + + session = { + 'teacher': ['Ms. B'], + 'student': ['Alice'], } - session = small_session - STORAGE = 'FS' + await merkle.start(session) + await merkle.event_to_session({'type': 'event', 'payload': 'X'}, session, label='X') + await merkle.event_to_session({'type': 'event', 'payload': 'Y'}, session, label='Y') + final_hash = await merkle.close_session(session) + + assert await merkle.verify_chain(final_hash) + print(f'[kvs-basic] Chain verified: {final_hash}') + + # Confirm the stream was renamed (old session key should be gone) + old_key = session_key(session) + assert await storage.get_stream_data(old_key) is None, \ + 'Session key should be removed after close' - if STORAGE == 'MEMORY': - storage = InMemoryStorage() - elif STORAGE == 'FS': - if not os.path.exists('/tmp/merkle_dag'): - os.mkdir('/tmp/merkle_dag') - storage = FSStorage('/tmp/merkle_dag') - else: - raise NotImplementedError(STORAGE) + # Confirm the stream is retrievable by final hash + data = await storage.get_stream_data(final_hash) + assert data is not None and len(data) == 4, \ + f'Expected 4 items (start + 2 events + close), got {len(data) if data else 0}' + print(f'[kvs-basic] Stream has {len(data)} items as expected.') + # Confirm parent streams were propagated + for parent_key in [json_dump({'student': 'Alice'}), json_dump({'teacher': 'Ms. B'})]: + parent_data = await storage.get_stream_data(parent_key) + assert parent_data is not None and len(parent_data) >= 1, \ + f'Parent stream {parent_key[:30]}... should have at least 1 item' + print(f'[kvs-basic] Parent stream {parent_key[:30]}... has {len(parent_data)} item(s)') + + print('[kvs-basic] All checks passed.') + + +async def test_case_kvs_tombstone(): + '''Tombstone deletion on KVSStorage.''' + from learning_observer.kvs import InMemoryKVS as KVSInMemory + + kvs_backend = KVSInMemory() + storage = KVSStorage(kvs_backend) merkle = Merkle(storage, CATEGORIES) - merkle.start(session) - merkle.event_to_session({"type": "event", "event": "A", "name": "1st"}, session, label="A") - merkle.event_to_session({"type": "event", "event": {"B": "c"}, "name": "2nd"}, session, label="B") - merkle.event_to_session({"type": "event", "event": {"B": "c"}}, session, label="C") - merkle.close_session(session) - G = storage.to_graphviz() - import PIL.Image as Image - import io - Image.open(io.BytesIO(G.create_png())).show() - - -if __name__ == "__main__": - test_case() + + session = {'student': ['Bob']} + + await merkle.start(session) + await merkle.event_to_session({'type': 'event', 'payload': 'secret'}, session, label='secret') + final_hash = await merkle.close_session(session) + + assert await merkle.verify_chain(final_hash) + print(f'[kvs-tombstone] Chain verified before deletion: {final_hash}') + + # Delete with tombstone + tombstone = await merkle.delete_stream_with_tombstone(final_hash, reason='GDPR Art. 17') + print(f'[kvs-tombstone] Tombstone hash: {tombstone["tombstone_hash"]}') + + # Original stream should be gone + assert await storage.get_stream_data(final_hash) is None, \ + 'Deleted stream should return None' + + # Tombstone should be stored + tombstone_key = f'__tombstone__{final_hash}' + tombstone_data = await storage.get_stream_data(tombstone_key) + assert tombstone_data is not None and len(tombstone_data) == 1, \ + 'Tombstone stream should have exactly 1 record' + assert tombstone_data[0]['type'] == 'tombstone' + assert tombstone_data[0]['reason'] == 'GDPR Art. 17' + assert tombstone_data[0]['item_count'] == 3 # start + event + close + print(f'[kvs-tombstone] Tombstone record verified: {tombstone_data[0]["item_count"]} items recorded.') + + # Attempting to delete again should raise + try: + await merkle.delete_stream_with_tombstone(final_hash, reason='duplicate') + assert False, 'Should have raised ValueError' + except ValueError: + print('[kvs-tombstone] Double-delete correctly rejected.') + + print('[kvs-tombstone] All checks passed.') + + +async def test_case_kvs_break_session(): + '''Session break (segmentation) on KVSStorage.''' + from learning_observer.kvs import InMemoryKVS as KVSInMemory + + kvs_backend = KVSInMemory() + storage = KVSStorage(kvs_backend) + merkle = Merkle(storage, CATEGORIES) + + session = {'student': ['Carol'], 'tool': ['notebook']} + + await merkle.start(session) + await merkle.event_to_session({'type': 'event', 'payload': 'part1'}, session, label='part1') + + # Break the session — closes segment 1, starts segment 2 + segment1_hash = await merkle.break_session(session) + print(f'[kvs-break] Segment 1 hash: {segment1_hash}') + + # Verify segment 1 + assert await merkle.verify_chain(segment1_hash) + print('[kvs-break] Segment 1 verified.') + + # Continue in segment 2 + await merkle.event_to_session({'type': 'event', 'payload': 'part2'}, session, label='part2') + final_hash = await merkle.close_session(session) + print(f'[kvs-break] Segment 2 (final) hash: {final_hash}') + + # Verify segment 2 + assert await merkle.verify_chain(final_hash) + print('[kvs-break] Segment 2 verified.') + + # Segment 2 should reference segment 1 via the continuation link + seg2_data = await storage.get_stream_data(final_hash) + assert seg2_data is not None + first_item = seg2_data[0] + assert first_item['event']['type'] == 'continue' + assert first_item['event']['continues'] == segment1_hash + assert segment1_hash in first_item['children'], \ + 'Continuation item should include segment 1 hash in children' + print('[kvs-break] Cross-segment linkage confirmed.') + + print('[kvs-break] All checks passed.') + + +async def test_case_kvs_walk(): + '''walk() and walk_streams() on KVSStorage.''' + from learning_observer.kvs import InMemoryKVS as KVSInMemory + + kvs_backend = KVSInMemory() + storage = KVSStorage(kvs_backend) + merkle = Merkle(storage, CATEGORIES) + + # Create two separate sessions + session_a = {'student': ['Dave']} + session_b = {'student': ['Eve']} + + await merkle.start(session_a) + await merkle.event_to_session({'type': 'event', 'payload': 'a1'}, session_a) + hash_a = await merkle.close_session(session_a) + + await merkle.start(session_b) + await merkle.event_to_session({'type': 'event', 'payload': 'b1'}, session_b) + await merkle.event_to_session({'type': 'event', 'payload': 'b2'}, session_b) + hash_b = await merkle.close_session(session_b) + + # walk() should return all items across all streams + all_items = await storage.walk() + print(f'[kvs-walk] Total items across all streams: {len(all_items)}') + assert len(all_items) > 0 + + # walk_streams() should return identifiable stream groups + streams = await storage.walk_streams() + stream_keys = [s[0] for s in streams] + print(f'[kvs-walk] Streams found: {len(streams)}') + for key, items in streams: + display = key if len(key) <= 40 else key[:37] + '...' + print(f' [{display}] -> {len(items)} item(s)') + + # The closed session streams should appear under their final hashes + assert any(k == hash_a for k in stream_keys), \ + f'Stream {hash_a[:16]}... not found in walk_streams' + assert any(k == hash_b for k in stream_keys), \ + f'Stream {hash_b[:16]}... not found in walk_streams' + + print('[kvs-walk] All checks passed.') + + +async def test_case_kvs_prefix_isolation(): + '''Two KVSStorage instances with different prefixes sharing the same KVS backend.''' + from learning_observer.kvs import InMemoryKVS as KVSInMemory + + kvs_backend = KVSInMemory() + storage_a = KVSStorage(kvs_backend, prefix='merkle_a:') + storage_b = KVSStorage(kvs_backend, prefix='merkle_b:') + merkle_a = Merkle(storage_a, CATEGORIES) + merkle_b = Merkle(storage_b, CATEGORIES) + + session = {'student': ['Frank']} + + # Write to storage_a + await merkle_a.start(session) + await merkle_a.event_to_session({'type': 'event', 'payload': 'from_a'}, session) + hash_a = await merkle_a.close_session(session) + + # Write to storage_b with the same session descriptor + await merkle_b.start(session) + await merkle_b.event_to_session({'type': 'event', 'payload': 'from_b'}, session) + hash_b = await merkle_b.close_session(session) + + # Both should verify independently + assert await merkle_a.verify_chain(hash_a) + assert await merkle_b.verify_chain(hash_b) + print(f'[kvs-prefix] Chain A verified: {hash_a[:16]}...') + print(f'[kvs-prefix] Chain B verified: {hash_b[:16]}...') + + # walk_streams should only see items from their own prefix + streams_a = await storage_a.walk_streams() + streams_b = await storage_b.walk_streams() + + items_a = await storage_a.walk() + items_b = await storage_b.walk() + + # They should not overlap (different payloads, different timestamps → different hashes) + hashes_a = {item['hash'] for item in items_a if 'hash' in item} + hashes_b = {item['hash'] for item in items_b if 'hash' in item} + overlap = hashes_a & hashes_b + # Parent streams for the same student may not overlap because timestamps differ, + # but the closed-session hashes definitely shouldn't match + assert hash_a not in hashes_b, 'Hash A leaked into storage B' + assert hash_b not in hashes_a, 'Hash B leaked into storage A' + print(f'[kvs-prefix] Storage A: {len(streams_a)} streams, {len(items_a)} items') + print(f'[kvs-prefix] Storage B: {len(streams_b)} streams, {len(items_b)} items') + + print('[kvs-prefix] All checks passed.') + + +async def test_case_kvs_verify_tamper_detection(): + '''Verify that tampering with a stored item is detected.''' + from learning_observer.kvs import InMemoryKVS as KVSInMemory + + kvs_backend = KVSInMemory() + storage = KVSStorage(kvs_backend) + merkle = Merkle(storage, CATEGORIES) + + session = {'student': ['Grace']} + + await merkle.start(session) + await merkle.event_to_session({'type': 'event', 'payload': 'original'}, session) + final_hash = await merkle.close_session(session) + + # Verify clean chain + assert await merkle.verify_chain(final_hash) + print('[kvs-tamper] Clean chain verified.') + + # Now tamper: modify the event payload of the middle item + kvs_key = f'{storage._prefix}{final_hash}' + data = await kvs_backend[kvs_key] + assert data is not None and len(data) == 3 # start + event + close + + # Corrupt the second item's event + data[1]['event']['payload'] = 'TAMPERED' + await kvs_backend.set(kvs_key, data) + + # Verification should now fail + try: + await merkle.verify_chain(final_hash) + assert False, 'Tampered chain should have failed verification' + except ValueError as e: + print(f'[kvs-tamper] Tamper correctly detected: {e}') + + print('[kvs-tamper] All checks passed.') + + +async def test_case_kvs_visualization(): + '''Visualization export on KVSStorage (only runs if networkx/pydot available).''' + if not HAS_VIZ: + print('[kvs-viz] Skipping (networkx/pydot not installed)') + return + + from learning_observer.kvs import InMemoryKVS as KVSInMemory + + kvs_backend = KVSInMemory() + storage = KVSStorage(kvs_backend) + merkle = Merkle(storage, CATEGORIES) + + session = {'student': ['Heidi'], 'tool': ['canvas']} + + await merkle.start(session) + await merkle.event_to_session({'type': 'event', 'payload': 'draw'}, session, label='draw') + await merkle.event_to_session({'type': 'event', 'payload': 'erase'}, session, label='erase') + final_hash = await merkle.close_session(session) + + # Delete the session to get a tombstone in the graph + tombstone = await merkle.delete_stream_with_tombstone(final_hash, reason='test cleanup') + + # NetworkX export + G = await storage.to_networkx() + tombstone_nodes = [n for n, d in G.nodes(data=True) if d.get('tombstone')] + deleted_nodes = [n for n, d in G.nodes(data=True) if d.get('classification') == 'deleted'] + edge_types = {} + for u, v, d in G.edges(data=True): + et = d.get('edge_type', 'unknown') + edge_types[et] = edge_types.get(et, 0) + 1 + + print(f'[kvs-viz] NetworkX: {G.number_of_nodes()} nodes, {G.number_of_edges()} edges') + print(f'[kvs-viz] Tombstone nodes: {len(tombstone_nodes)}') + print(f'[kvs-viz] Deleted placeholders: {len(deleted_nodes)}') + print(f'[kvs-viz] Edge types: {edge_types}') + + assert len(tombstone_nodes) >= 1, 'Should have at least one tombstone node' + assert len(deleted_nodes) >= 1, 'Should have at least one deleted placeholder' + + # Graphviz export + dot = await storage.to_graphviz() + print(f'[kvs-viz] Graphviz: {len(dot.get_node_list())} nodes, {len(dot.get_edge_list())} edges') + + print('[kvs-viz] All checks passed.') + + +async def test_case_kvs_redis(): + '''Full lifecycle on KVSStorage backed by a real Redis instance. + + This test bootstraps the Learning Observer settings system via + ``learning_observer.offline.init()`` so that the Redis connection + can resolve its host/port/password from the standard config. If + the settings system or Redis is unavailable, the test is skipped + gracefully. + ''' + # --- bootstrap the settings system -------------------------------- + try: + import learning_observer.offline + learning_observer.offline.init() + except Exception as e: + raise e + print(f'[kvs-redis] Skipping — could not initialize settings: {e}') + return + + # --- get the KVS that init() wired up ----------------------------- + try: + from learning_observer.kvs import KVS + kvs_backend = KVS() + await kvs_backend.set('__merkle_redis_ping__', 'pong') + pong = await kvs_backend['__merkle_redis_ping__'] + assert pong == 'pong', f'Redis ping failed: got {pong!r}' + print('[kvs-redis] Redis connection confirmed.') + except Exception as e: + print(f'[kvs-redis] Skipping — could not connect to Redis: {e}') + return + + import time + prefix = f'merkle_test_{int(time.time())}:' + storage = KVSStorage(kvs_backend, prefix=prefix) + merkle = Merkle(storage, CATEGORIES) + + session = { + 'teacher': ['Dr. Redis'], + 'student': ['Ivy'], + 'tool': ['terminal'], + } + + # --- lifecycle ---------------------------------------------------- + await merkle.start(session, metadata={'test': True}) + await merkle.event_to_session( + {'type': 'event', 'payload': 'command_1'}, session, label='cmd1', + ) + await merkle.event_to_session( + {'type': 'event', 'payload': 'command_2'}, session, label='cmd2', + ) + final_hash = await merkle.close_session(session) + print(f'[kvs-redis] Session closed: {final_hash}') + + # --- debug: show what's actually in Redis ------------------------- + await storage.debug_dump() + + # --- verify ------------------------------------------------------- + assert await merkle.verify_chain(final_hash) + print('[kvs-redis] Chain verified.') + + data = await storage.get_stream_data(final_hash) + assert data is not None, 'Stream data is None — rename may have failed' + assert len(data) == 4, f'Expected 4 items (start + 2 events + close), got {len(data)}' + print(f'[kvs-redis] Stream has {len(data)} items.') + + # --- parents ------------------------------------------------------ + for parent_key in [json_dump({'student': 'Ivy'}), json_dump({'teacher': 'Dr. Redis'}), + json_dump({'tool': 'terminal'})]: + parent_data = await storage.get_stream_data(parent_key) + assert parent_data is not None and len(parent_data) >= 1 + print(f'[kvs-redis] Parent {parent_key[:30]}... OK ({len(parent_data)} items)') + + # --- tombstone ---------------------------------------------------- + tombstone = await merkle.delete_stream_with_tombstone(final_hash, reason='GDPR test') + assert await storage.get_stream_data(final_hash) is None + tombstone_data = await storage.get_stream_data(f'__tombstone__{final_hash}') + assert tombstone_data is not None and len(tombstone_data) == 1 + assert tombstone_data[0]['item_count'] == 4 + print(f'[kvs-redis] Tombstone verified.') + + # --- final debug dump --------------------------------------------- + await storage.debug_dump() + + # --- cleanup ------------------------------------------------------ + try: + all_keys = await kvs_backend.keys() + test_keys = [k for k in all_keys if k.startswith(prefix)] + if hasattr(kvs_backend, 'remove'): + for k in test_keys: + await kvs_backend.remove(k) + print(f'[kvs-redis] Cleaned up {len(test_keys)} test keys.') + except Exception as e: + print(f'[kvs-redis] Cleanup warning: {e}') + + print('[kvs-redis] All checks passed.') + + +async def test_all(): + '''Run all test cases.''' + tests = [ + ('InMemoryStorage', test_case_inmemory), + ('KVS Basic', test_case_kvs_basic), + ('KVS Tombstone', test_case_kvs_tombstone), + ('KVS Break Session', test_case_kvs_break_session), + ('KVS Walk', test_case_kvs_walk), + ('KVS Prefix Isolation', test_case_kvs_prefix_isolation), + ('KVS Tamper Detection', test_case_kvs_verify_tamper_detection), + ('KVS Visualization', test_case_kvs_visualization), + ('KVS Redis Integration', test_case_kvs_redis), + ] + + passed = 0 + failed = 0 + skipped = 0 + for name, test_fn in tests: + print(f'\n{"="*60}') + print(f'Running: {name}') + print(f'{"="*60}') + try: + await test_fn() + passed += 1 + except Exception as e: + failed += 1 + print(f'FAILED: {name}') + import traceback + traceback.print_exc() + + print(f'\n{"="*60}') + print(f'Results: {passed} passed, {failed} failed, {passed + failed} total') + print(f'{"="*60}') + + if failed > 0: + raise SystemExit(1) + + +if __name__ == '__main__': + asyncio.run(test_all())