diff --git a/.gitignore b/.gitignore index 6faa62cf..37cebf1f 100644 --- a/.gitignore +++ b/.gitignore @@ -5,4 +5,5 @@ node_modules/ coverage/ CLAUDE.md TASKS-DO-NOT-CHECK-IN.md +EDITORS-REPORT.md git-stunts-git-warp-*.tgz diff --git a/BACKLOG/OG-010-public-api-design-thinking.md b/BACKLOG/OG-010-public-api-design-thinking.md index af9f2efa..adbde7fa 100644 --- a/BACKLOG/OG-010-public-api-design-thinking.md +++ b/BACKLOG/OG-010-public-api-design-thinking.md @@ -1,6 +1,6 @@ # OG-010 — IBM Design Thinking Pass Over Public APIs And README -Status: ACTIVE +Status: DONE ## Problem diff --git a/BACKLOG/OG-014-streaming-content-attachments.md b/BACKLOG/OG-014-streaming-content-attachments.md index 29c100eb..c8f25bd7 100644 --- a/BACKLOG/OG-014-streaming-content-attachments.md +++ b/BACKLOG/OG-014-streaming-content-attachments.md @@ -1,102 +1,114 @@ -# OG-014 — Stream content attachments through git-cas +# OG-014 — Mandatory CAS blob storage with streaming I/O -Status: QUEUED +Status: DONE Legend: Observer Geometry -## Problem - -`getContent()` and `getEdgeContent()` currently return full `Uint8Array` -buffers. That means attachment reads materialize the entire payload in memory -before user code can process it. - -This is fine for small text blobs, but it is the wrong default shape for large -attachments: +Design doc: `docs/design/streaming-cas-blob-storage.md` -- the attachment may not fit comfortably in memory -- the caller cannot decide between buffered read and stream processing -- builder-facing docs risk teaching attachment reads as eager byte loads -- the current blob-storage abstraction still forces `retrieve()` to return a - full buffer rather than a stream-capable interface +## Problem -`git-warp` already contains a `CasBlobAdapter` that stores attachments in -`git-cas` with CDC chunking, but the public attachment path still terminates in -buffered reads. That leaves the most scalable backend present but not fully -expressed through the public API. 
+Content blob attachments in `git-warp` have two structural problems: -## Why this matters +### 1. CAS blob storage is opt-in -WARP graphs can legitimately carry attached documents, artifacts, and other -payloads that are larger than normal graph properties. +`attachContent()` and `attachEdgeContent()` accept an optional `blobStorage` +injection. When callers do not provide it, blobs fall through to raw +`persistence.writeBlob()` — a single unchunked Git object with no CDC +deduplication, no encryption support, and no streaming restore path. -The API should make the memory tradeoff explicit: +This means the substrate's chunking, deduplication, and encryption capabilities +are present but silently bypassed by default. There is no good reason for a +content blob to skip CAS. Every blob should be chunked. -- buffered reads when you actually want all bytes in memory -- streaming reads when you want to process incrementally +### 2. Neither write nor read paths support streaming -That decision should belong to the caller, not be forced by the default -attachment API shape. +**Write path**: `attachContent(nodeId, content)` accepts `Uint8Array | string`. +The caller must buffer the entire payload in memory before handing it to the +patch builder. `CasBlobAdapter.store()` then wraps that buffer in +`Readable.from([buf])` — a synthetic stream from an already-buffered payload. -## Current state +**Read path**: `getContent(nodeId)` returns `Promise`. The +full blob is materialized into memory before the caller can process it. +`CasBlobAdapter.retrieve()` calls `cas.restore()` which buffers internally. 
-Today the attachment read path is eager: +`git-cas` already supports streaming on both sides: +- `cas.store({ source })` accepts any readable/iterable source +- `cas.restoreStream()` returns `AsyncIterable` -- `getContent()` -> `Promise` -- `getEdgeContent()` -> `Promise` -- `BlobStoragePort.retrieve()` -> `Promise` -- default Git blob reads go through `readBlob()` and collect the full blob -- `CasBlobAdapter` can already store attachment content in `git-cas`, but it - still restores into one full buffer via `retrieve()` -- `git-cas` streaming restore is already used in `CasSeekCacheAdapter`, but not - yet exposed through attachment reads +The streaming substrate is there. It is not expressed through the public API. -## Desired outcome +## Why this matters -Make `git-cas` the first-class streaming attachment path without breaking the -simple buffered paths. +WARP graphs can carry attached documents, media, model weights, and other +payloads that are legitimately large. The API should not force full in-memory +buffering on either side of the I/O boundary. -Likely shape: +- Callers writing large content should be able to pipe a stream in +- Callers reading large content should be able to consume it incrementally +- Every blob should get CDC chunking and deduplication as a substrate guarantee +- The decision between buffered and streaming I/O should belong to the caller -- `getContentStream(nodeId)` -- `getEdgeContentStream(from, to, label)` -- `BlobStoragePort.retrieveStream(oid)` -- `CasBlobAdapter.retrieveStream(oid)` backed by `git-cas restoreStream()` -- a clear default/recommended way to wire `CasBlobAdapter` into `WarpApp.open()` - / `WarpCore.open()` for attachment storage +## Current state -Buffered helpers should remain available for convenience, but they should be -clearly layered on top of the stream-capable substrate. 
+As of `v15.0.1`: + +- `BlobStoragePort`: `store(content, options) → Promise`, + `retrieve(oid) → Promise` — both buffered +- `CasBlobAdapter`: fully implemented CAS adapter with CDC chunking, optional + encryption, backward-compat fallback to raw Git blobs — but only buffered I/O +- `CasBlobAdapter` is internal (not exported from `index.js`) +- `PatchBuilderV2.attachContent()`: accepts `Uint8Array | string`, uses + `blobStorage.store()` if injected, else raw `persistence.writeBlob()` +- `getContent()` / `getEdgeContent()`: returns `Promise`, + uses `blobStorage.retrieve()` if injected, else raw `persistence.readBlob()` +- `WarpApp` and `WarpCore` do not expose content read methods at all +- `git-cas` streaming (`restoreStream()`) is already used in + `CasSeekCacheAdapter` but not in blob reads +- `InMemoryGraphAdapter` has `writeBlob()`/`readBlob()` for browser/test path -Longer-term, if attachment storage standardizes on `git-cas`, the builder story -gets cleaner too: +## Desired outcome -- large attachments become chunked CAS assets -- reads can stream incrementally -- dedupe happens below the API surface -- legacy raw Git blob attachments can remain readable for compatibility +1. CAS blob storage is mandatory — no fallback to raw `writeBlob()` for content +2. Write path accepts streaming input and pipes through without buffering +3. Read path returns a stream the caller can consume incrementally +4. Buffered convenience methods remain available, layered on top of streams +5. Browser and in-memory paths still work via a conforming adapter +6. Legacy raw Git blob attachments remain readable for backward compatibility ## Acceptance criteria -1. `git-warp` exposes explicit streaming APIs for node and edge attachments. -2. Callers can choose stream vs buffered read intentionally. -3. `BlobStoragePort` grows a stream-capable retrieval contract. -4. `CasBlobAdapter` supports streaming retrieval via `git-cas`. -5. 
`git-cas` becomes the recommended path for large attachment storage. -6. Legacy raw Git blob attachments remain readable for compatibility. -7. Builder docs explain when to use buffered reads vs streams. -8. Large attachment reads no longer require full in-memory buffering by - default in the stream path. +1. Every content blob written through `attachContent()` / `attachEdgeContent()` + goes through `BlobStoragePort` — no raw `persistence.writeBlob()` fallback. +2. `attachContent()` / `attachEdgeContent()` accept streaming input + (`AsyncIterable`, `ReadableStream`, `Uint8Array`, `string`). +3. New `getContentStream()` / `getEdgeContentStream()` return + `AsyncIterable` for incremental consumption. +4. Existing `getContent()` / `getEdgeContent()` remain as buffered convenience, + implemented on top of the stream primitive. +5. `BlobStoragePort` grows `storeStream()` and `retrieveStream()` methods. +6. `CasBlobAdapter` implements streaming via `git-cas` natively. +7. An `InMemoryBlobStorageAdapter` implements the port contract for browser and + test paths. +8. Legacy raw Git blob attachments remain readable through backward-compat + fallback in `CasBlobAdapter.retrieveStream()`. +9. Content stream methods are exposed on `WarpApp` and `WarpCore`. ## Non-goals -- no automatic conversion of all existing attachment reads to streams -- no silent breaking change to `getContent()` / `getEdgeContent()` -- no attempt to solve whole-state out-of-core replay here +- No automatic migration of existing raw Git blobs to CAS format +- No silent breaking change to existing `getContent()` / `getEdgeContent()` + return types +- No attempt to solve whole-state out-of-core replay (that is OG-013) +- No encryption-by-default (encryption remains an opt-in CAS capability) ## Notes -This item is related to, but narrower than, -`OG-013-out-of-core-materialization-and-streaming-reads.md`. -`OG-013` is about whole-state and replay architecture. 
-This item is specifically about attachment payload I/O and making -`git-cas` the streaming/chunked attachment path. +This item supersedes the original OG-014 scope, which covered only streaming +reads. The expanded scope now includes mandatory CAS and streaming writes. + +Related items: +- `OG-013`: out-of-core materialization and streaming reads (broader, separate) +- `B160`: blob attachments via CAS (done, but opt-in — this item makes it + mandatory) +- `B163`: streaming restore for seek cache (done, pattern to follow for blobs) diff --git a/BACKLOG/OG-015-jsr-documentation-quality.md b/BACKLOG/OG-015-jsr-documentation-quality.md index ebe39108..45a6fbbe 100644 --- a/BACKLOG/OG-015-jsr-documentation-quality.md +++ b/BACKLOG/OG-015-jsr-documentation-quality.md @@ -1,6 +1,6 @@ # OG-015 — Raise JSR documentation quality score -Status: QUEUED +Status: DONE Legend: Observer Geometry diff --git a/BACKLOG/README.md b/BACKLOG/README.md index 1d472ec9..af4ed6db 100644 --- a/BACKLOG/README.md +++ b/BACKLOG/README.md @@ -1,6 +1,6 @@ # BACKLOG — Observer Geometry -Last updated: 2026-03-28 +Last updated: 2026-03-29 This directory holds promotable pre-design items for the current Observer Geometry tranche. 
@@ -26,9 +26,9 @@ Workflow: | DONE | OG-007 | Expand hash-stability coverage across snapshot flavors | [OG-007-hash-stability-coverage.md](OG-007-hash-stability-coverage.md) | | DONE | OG-008 | Make retargeting compatibility a hard major-version cut | [OG-008-retargeting-compatibility.md](OG-008-retargeting-compatibility.md) | | QUEUED | OG-009 | Align playback-head and TTD consumers after read nouns stabilize | [OG-009-playback-head-alignment.md](OG-009-playback-head-alignment.md) | -| ACTIVE | OG-010 | IBM Design Thinking pass over public APIs and README | [OG-010-public-api-design-thinking.md](OG-010-public-api-design-thinking.md) | +| DONE | OG-010 | IBM Design Thinking pass over public APIs and README | [OG-010-public-api-design-thinking.md](OG-010-public-api-design-thinking.md) | | QUEUED | OG-011 | Publish a public API catalog and browser documentation playground | [OG-011-public-api-catalog-and-playground.md](OG-011-public-api-catalog-and-playground.md) | | DONE | OG-012 | Audit and reconcile the documentation corpus before v15 | [OG-012-documentation-corpus-audit.md](OG-012-documentation-corpus-audit.md) | | QUEUED | OG-013 | Design out-of-core materialization and streaming reads | [OG-013-out-of-core-materialization-and-streaming-reads.md](OG-013-out-of-core-materialization-and-streaming-reads.md) | -| QUEUED | OG-014 | Stream content attachments through `git-cas` | [OG-014-streaming-content-attachments.md](OG-014-streaming-content-attachments.md) | -| QUEUED | OG-015 | Raise JSR documentation quality score | [OG-015-jsr-documentation-quality.md](OG-015-jsr-documentation-quality.md) | +| DONE | OG-014 | Mandatory CAS blob storage with streaming I/O | [OG-014-streaming-content-attachments.md](OG-014-streaming-content-attachments.md) | +| DONE | OG-015 | Raise JSR documentation quality score | [OG-015-jsr-documentation-quality.md](OG-015-jsr-documentation-quality.md) | diff --git a/CHANGELOG.md b/CHANGELOG.md index d93a3222..27562275 100644 --- 
a/CHANGELOG.md +++ b/CHANGELOG.md @@ -5,6 +5,25 @@ All notable changes to this project will be documented in this file. The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.0.0/), and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0.html). +## [16.0.0] — 2026-03-29 + +### Added + +- **Streaming content attachment I/O (OG-014)** — `getContentStream()` and `getEdgeContentStream()` return `AsyncIterable` for incremental consumption of large content blobs. `attachContent()` and `attachEdgeContent()` now accept `AsyncIterable`, `ReadableStream`, `Uint8Array`, or `string` — streaming inputs are piped directly to blob storage without intermediate buffering. +- **`InMemoryBlobStorageAdapter`** — new domain-local adapter implementing `BlobStoragePort` with content-addressed `Map`-based storage for browser and test paths. Exported from the package surface. +- **`BlobStoragePort.storeStream()` / `retrieveStream()`** — streaming variants of the blob storage port contract. `storeStream()` accepts `AsyncIterable`, `retrieveStream()` returns `AsyncIterable`. +- **Content methods on `WarpApp` and `WarpCore`** — `getContent()`, `getContentStream()`, `getContentOid()`, `getContentMeta()` and their edge equivalents are now exposed on both public API surfaces (previously only on `WarpRuntime`). + +### Changed + +- **CAS blob storage is now mandatory for content attachments** — `attachContent()` and `attachEdgeContent()` always route through `BlobStoragePort`. The raw `persistence.writeBlob()` fallback has been removed. `WarpRuntime.open()` auto-constructs `CasBlobAdapter` for Git-backed persistence (when `plumbing` is available) or `InMemoryBlobStorageAdapter` otherwise. +- **Content blob tree entries use tree mode** — patch commit trees and checkpoint trees now reference content blobs as `040000 tree` entries (matching CAS tree OIDs) instead of `100644 blob` entries. 
+
+### Removed
+
+- **`TraversalService`** — deprecated alias removed. Use `CommitDagTraversalService` directly.
+- **`createWriter()`** — deprecated method removed from `WarpApp` and `WarpCore`. Use `writer()` or `writer(id)` instead.
+
 ## [15.0.0] — 2026-03-28
 ## [15.0.1] — 2026-03-28
diff --git a/MIGRATING.md b/MIGRATING.md
new file mode 100644
index 00000000..b15de6ce
--- /dev/null
+++ b/MIGRATING.md
@@ -0,0 +1,108 @@
+# Migrating to v16
+
+This guide covers the breaking changes in v16.0.0 and how to update your code.
+
+## Content attachments now require blob storage (OG-014)
+
+**What changed:** `attachContent()` and `attachEdgeContent()` no longer fall
+back to raw `persistence.writeBlob()`. They always route through
+`BlobStoragePort`. Without blob storage, they throw `E_NO_BLOB_STORAGE`.
+
+**Who is affected:** Only consumers who construct `PatchBuilderV2` directly
+(bypassing `WarpRuntime.open()`) without passing `blobStorage`. If you use
+`WarpApp.open()`, `WarpCore.open()`, or `WarpRuntime.open()`, blob storage
+is auto-constructed — no code changes needed.
+
+**How to migrate:**
+
+```javascript
+import InMemoryBlobStorageAdapter from '@git-stunts/git-warp/defaultBlobStorage';
+
+// If you construct PatchBuilderV2 directly, add blobStorage:
+const builder = new PatchBuilderV2({
+  persistence,
+  writerId: 'alice',
+  blobStorage: new InMemoryBlobStorageAdapter(), // or CasBlobAdapter for Git
+  // ...other options
+});
+```
+
+## Streaming content I/O
+
+**What changed:** `attachContent()` and `attachEdgeContent()` now accept
+streaming input (`AsyncIterable`, `ReadableStream`)
+in addition to `Uint8Array` and `string`. New `getContentStream()` and
+`getEdgeContentStream()` methods return `AsyncIterable`.
+
+**Who is affected:** No one — this is additive. Existing code continues to
+work. New streaming APIs are opt-in.
+ +**How to use:** + +```javascript +// Streaming write — pipe a file directly +import { createReadStream } from 'node:fs'; +const patch = await app.createPatch(); +patch.addNode('doc:1'); +await patch.attachContent('doc:1', createReadStream('large-file.bin'), { + size: fileStat.size, + mime: 'application/octet-stream', +}); +await patch.commit(); + +// Streaming read — consume incrementally +const stream = await app.getContentStream('doc:1'); +if (stream) { + for await (const chunk of stream) { + process.stdout.write(chunk); + } +} + +// Buffered read — unchanged +const buf = await app.getContent('doc:1'); +``` + +## Content blob tree entries use tree mode + +**What changed:** Patch commit trees and checkpoint trees now reference +content blobs as `040000 tree` entries (CAS tree OIDs) instead of +`100644 blob` entries. + +**Who is affected:** Consumers who parse raw Git commit trees and expect +content anchor entries to use blob mode. This does not affect any public +API — it is an internal storage format change. + +**How to migrate:** If you parse `_content_` entries from commit trees, +update your parser to accept `040000 tree` mode. + +## `TraversalService` removed + +**What changed:** The `TraversalService` export was a deprecated alias for +`CommitDagTraversalService`. It has been removed. + +**How to migrate:** + +```javascript +// Before +import { TraversalService } from '@git-stunts/git-warp'; + +// After +import { CommitDagTraversalService } from '@git-stunts/git-warp'; +``` + +## `createWriter()` removed + +**What changed:** The `createWriter()` method on `WarpApp` was deprecated in +v15 and has been removed. Use `writer()` instead. 
+ +**How to migrate:** + +```javascript +// Before +const w = await app.createWriter(); +const w2 = await app.createWriter({ persist: 'config', alias: 'secondary' }); + +// After +const w = await app.writer(); // resolves from git config or generates +const w2 = await app.writer('secondary'); // explicit ID +``` diff --git a/contracts/type-surface.m8.json b/contracts/type-surface.m8.json index 2c2001bb..eac565be 100644 --- a/contracts/type-surface.m8.json +++ b/contracts/type-surface.m8.json @@ -15,6 +15,9 @@ "BlobStoragePort": { "kind": "class" }, + "InMemoryBlobStorageAdapter": { + "kind": "class" + }, "BunHttpAdapter": { "kind": "class" }, @@ -633,11 +636,6 @@ "TraversalError": { "kind": "class" }, - "TraversalService": { - "kind": "class", - "deprecated": true, - "alias": "CommitDagTraversalService" - }, "WarpApp": { "kind": "class", "default": true @@ -1109,18 +1107,6 @@ ], "returns": "Promise" }, - "createWriter": { - "async": true, - "deprecated": true, - "params": [ - { - "name": "opts", - "type": "{ persist?: 'config' | 'none'; alias?: string }", - "optional": true - } - ], - "returns": "Promise" - }, "syncCoverage": { "async": true, "params": [], diff --git a/docs/ROADMAP.md b/docs/ROADMAP.md index 8f885391..8d5e30ed 100644 --- a/docs/ROADMAP.md +++ b/docs/ROADMAP.md @@ -1,8 +1,8 @@ # ROADMAP — @git-stunts/git-warp -> **Current release on `main`:** v15.0.0 -> **Next intended release:** v15.0.1 -> **Last reconciled:** 2026-03-28 (after publishing `v15.0.0` and then cleaning the npm/JSR artifact surface, packaged README links, and repo topology on `main`. `OG-010` is complete; `v15.0.1` is the release-correction patch.) +> **Current release on `main`:** v16.0.0 +> **Next intended release:** v16.0.1 +> **Last reconciled:** 2026-03-29 (v16.0.0 release. OG-014 streaming CAS blob storage, OG-015 JSR docs, deprecated TraversalService and createWriter removed.) 
> **Completed milestones:** [docs/ROADMAP/COMPLETED.md](ROADMAP/COMPLETED.md) --- diff --git a/docs/design/streaming-cas-blob-storage.md b/docs/design/streaming-cas-blob-storage.md new file mode 100644 index 00000000..dfc2a10d --- /dev/null +++ b/docs/design/streaming-cas-blob-storage.md @@ -0,0 +1,302 @@ +# Streaming CAS Blob Storage + +Status: IMPLEMENTED + +Legend: Observer Geometry + +Cycle: OG-014 + +## Why This Note Exists + +Content blob attachments in `git-warp` have two structural problems that +compound each other: + +1. CAS blob storage is opt-in. Callers who do not inject a `BlobStoragePort` get + raw unchunked Git blobs with no deduplication, no encryption path, and no + streaming restore. The substrate's chunking capability is present but silently + bypassed by default. + +2. Neither the write path nor the read path supports streaming. Writers must + buffer the entire payload before handing it to `attachContent()`. Readers get + the entire blob materialized into memory before they can process it. + +`git-cas` already supports streaming on both sides (`store({ source })` accepts +any readable, `restoreStream()` returns `AsyncIterable`). The streaming +substrate exists. It is not expressed through the public attachment API. + +## IBM Design Thinking Framing + +### Sponsor Human + +An application developer attaching documents, media, or artifacts to a WARP +graph. + +This person needs to: + +- attach large files without buffering the whole payload in process memory +- read attached content incrementally (pipe to disk, HTTP response, etc.) +- trust that the substrate handles chunking and deduplication without explicit + opt-in +- understand the cost difference between buffered and streaming reads + +### Sponsor Agent + +A coding agent managing content-bearing graph nodes on behalf of a user or +pipeline. 
+ +This agent needs to: + +- discover that `attachContent()` accepts a stream, not just a buffer +- choose between `getContent()` (buffered) and `getContentStream()` (streaming) + based on payload size +- avoid accidentally buffering large payloads by following the simplest API path + +### Sponsor Tooling + +A debugger, sync pipeline, or transfer planner that needs to inspect or move +content blobs between coordinates. + +This sponsor needs to: + +- stream content through transfer operations without double-buffering +- rely on CAS chunking for efficient delta transfers +- trust that content OIDs are CAS tree OIDs with stable chunk structure + +If the API serves one sponsor while silently degrading for the others, the +design has failed. + +## Hills + +### Hill 1 — Every blob is chunked + +As an application developer, every content blob I attach to a WARP graph is +CDC-chunked through git-cas, regardless of how I opened the graph or whether I +explicitly configured blob storage. There is no path where my blobs silently +bypass chunking. + +### Hill 2 — Writes stream in + +As an application developer, I can pipe a readable source (file stream, HTTP +body, async generator) directly into `attachContent()` without buffering the +entire payload in memory first. + +### Hill 3 — Reads stream out + +As an application developer, I can consume attached content incrementally via +`getContentStream()`, deciding myself whether to buffer, pipe, or process +chunk-by-chunk. The buffered `getContent()` remains available as convenience but +is no longer the only option. + +## Invariants + +1. **CAS is mandatory for content blobs.** `attachContent()` and + `attachEdgeContent()` always go through `BlobStoragePort`. The raw + `persistence.writeBlob()` fallback for content is removed. + +2. **`AsyncIterable` is the domain stream type.** It is universal + across Node, Bun, and Deno. No `node:stream` dependency in domain code. 
+ Infrastructure adapters convert to/from runtime-specific stream types at the + port boundary. + +3. **Backward compatibility for legacy blobs.** Content written as raw Git blobs + before CAS migration remains readable. `CasBlobAdapter.retrieveStream()` + falls back to yielding a single chunk from `persistence.readBlob()` when the + OID is not a CAS manifest. + +4. **`getContent()` / `getEdgeContent()` return types do not change.** They + remain `Promise` — buffered convenience methods + implemented on top of the stream primitive. + +5. **Port contract is the boundary.** `BlobStoragePort` defines the abstract + streaming contract. `CasBlobAdapter` implements it for Git-backed graphs. + `InMemoryBlobStorageAdapter` implements it for browser and test paths. No + domain code knows which adapter is active. + +## Non-Goals + +- **No automatic migration.** Existing raw Git blobs are not rewritten to CAS. + They are read through the backward-compat fallback path. +- **No encryption by default.** Encryption remains an opt-in CAS capability + configured at adapter construction time. This design makes CAS mandatory, not + encryption. +- **No whole-state streaming.** Out-of-core materialization and streaming + enumeration of graph state is OG-013. This item is specifically about content + attachment I/O. +- **No patch blob streaming.** `patchBlobStorage` (for encrypted patch CBOR) is + a separate concern. This design covers content attachments only. + +## Key Design Decisions + +### D1 — `BlobStoragePort` grows streaming methods + +Current contract: + +```text +store(content, options) → Promise +retrieve(oid) → Promise +``` + +New contract: + +```text +store(source, options) → Promise +retrieve(oid) → Promise +storeStream(source, options) → Promise +retrieveStream(oid) → AsyncIterable +``` + +`store()` keeps its current signature for backward compat with simple callers. +`storeStream()` accepts `AsyncIterable` as `source`. +`retrieveStream()` returns `AsyncIterable`. 
+`retrieve()` becomes sugar: collect `retrieveStream()` into a single buffer. + +### D2 — Write input normalization + +`attachContent()` and `attachEdgeContent()` accept a union type: + +```text +AsyncIterable | ReadableStream | Uint8Array | string +``` + +A domain utility (`normalizeToAsyncIterable()`) converts all input shapes to +`AsyncIterable` before calling `storeStream()`. This keeps the port +boundary clean while the public API remains ergonomic. + +For `Uint8Array` and `string` inputs, the size and mime metadata can still be +inferred synchronously before streaming. For true streaming inputs, callers +should provide `size` in the metadata options if they know it. + +### D3 — `InMemoryBlobStorageAdapter` + +A new infrastructure adapter implementing `BlobStoragePort` with `Map`-based +storage. No CDC chunking — it stores and retrieves content directly. Its purpose +is port conformance, not chunking behavior. + +This adapter is used by: + +- `InMemoryGraphAdapter`-based graphs (browser, tests) +- Any context where `@git-stunts/plumbing` is not available + +### D4 — Auto-construction of blob storage + +When `WarpRuntime.open()` receives a `persistence` adapter that has `plumbing` +(i.e., a `GitGraphAdapter`), it auto-constructs a `CasBlobAdapter` internally +if no explicit `blobStorage` was provided. This makes CAS the default without +requiring callers to manually wire it. + +When `persistence` does not have `plumbing` (i.e., `InMemoryGraphAdapter`), and +no `blobStorage` was provided, it auto-constructs an +`InMemoryBlobStorageAdapter`. + +The `blobStorage` parameter remains available for callers who want explicit +control (e.g., to provide an encryption key). + +### D5 — Public surface placement + +Content stream methods land on both `WarpApp` and `WarpCore`: + +- `getContentStream(nodeId)` → `AsyncIterable | null` +- `getEdgeContentStream(from, to, label)` → `AsyncIterable | null` + +These delegate to `WarpRuntime`, which delegates to `query.methods.js`. 
+ +`getContent()` / `getEdgeContent()` also get exposed on `WarpApp` and +`WarpCore` (currently missing — a pre-existing gap). + +### D6 — Content metadata on streams + +`getContentMeta(nodeId)` returns `{ oid, mime, size }` without reading the +blob. Callers who want to decide between buffered and streaming reads based on +size can check metadata first: + +```javascript +const meta = await app.getContentMeta(nodeId); +if (meta && meta.size > THRESHOLD) { + for await (const chunk of app.getContentStream(nodeId)) { /* ... */ } +} else { + const buf = await app.getContent(nodeId); +} +``` + +This pattern is already supported — `getContentMeta()` exists on `WarpRuntime`. +It just needs to be surfaced on `WarpApp` / `WarpCore`. + +## Checkpoint Gates + +### Checkpoint 1 — Doctrine + +- [ ] This design doc reviewed and accepted +- [ ] OG-014 backlog item updated and marked ACTIVE +- [ ] No conflicts with OG-013 (out-of-core) or existing CAS work (B158–B164) + +### Checkpoint 2 — Spec + +- [ ] `BlobStoragePort` streaming contract tests written (red) +- [ ] `InMemoryBlobStorageAdapter` contract tests written (red) +- [ ] `CasBlobAdapter` streaming tests written (red) +- [ ] `attachContent()` streaming input tests written (red) +- [ ] `getContentStream()` / `getEdgeContentStream()` tests written (red) +- [ ] Auto-construction tests written (red) +- [ ] Legacy raw blob backward-compat tests written (red) + +### Checkpoint 3 — Semantic + +- [ ] `BlobStoragePort` updated with `storeStream()` / `retrieveStream()` +- [ ] `CasBlobAdapter` implements streaming via `git-cas` +- [ ] `InMemoryBlobStorageAdapter` created and wired +- [ ] `PatchBuilderV2` accepts streaming input +- [ ] `query.methods.js` implements `getContentStream()` / + `getEdgeContentStream()` +- [ ] `WarpRuntime.open()` auto-constructs blob storage +- [ ] Raw `writeBlob()` fallback removed from content write path +- [ ] All spec tests green +- [ ] Full test suite green (4000+ tests) +- [ ] 
`WarpGraph.noCoordination.test.js` passes + +### Checkpoint 4 — Surface + +- [ ] `WarpApp` and `WarpCore` expose content methods +- [ ] `index.d.ts` updated with streaming types +- [ ] `index.js` exports `InMemoryBlobStorageAdapter` +- [ ] CHANGELOG updated +- [ ] ROADMAP reconciled +- [ ] README updated if content examples exist + +## Affected Files + +### Ports + +- `src/ports/BlobStoragePort.js` — add `storeStream()`, `retrieveStream()` + +### Infrastructure + +- `src/infrastructure/adapters/CasBlobAdapter.js` — implement streaming methods +- `src/infrastructure/adapters/InMemoryBlobStorageAdapter.js` — new adapter +- `src/infrastructure/adapters/lazyCasInit.js` — no change expected + +### Domain + +- `src/domain/services/PatchBuilderV2.js` — streaming write input, + `normalizeToAsyncIterable()` call +- `src/domain/warp/query.methods.js` — `getContentStream()`, + `getEdgeContentStream()` +- `src/domain/warp/patch.methods.js` — forward blob storage (already done) +- `src/domain/WarpRuntime.js` — auto-construct blob storage in `open()` +- `src/domain/WarpApp.js` — expose content methods +- `src/domain/WarpCore.js` — expose content methods +- `src/domain/utils/streamUtils.js` — new: `normalizeToAsyncIterable()`, + `collectAsyncIterable()` + +### Tests + +- `test/unit/ports/BlobStoragePort.test.js` — streaming contract tests +- `test/unit/infrastructure/adapters/CasBlobAdapter.test.js` — streaming tests +- `test/unit/infrastructure/adapters/InMemoryBlobStorageAdapter.test.js` — new +- `test/unit/domain/services/PatchBuilderV2.content.test.js` — streaming input +- `test/unit/domain/WarpGraph.content.test.js` — streaming read tests +- `test/integration/api/content-attachment.test.js` — streaming round-trips + +### Public Surface + +- `index.js` — export `InMemoryBlobStorageAdapter` +- `index.d.ts` — streaming type declarations diff --git a/docs/design/streaming-cas-blob-storage.retro.md b/docs/design/streaming-cas-blob-storage.retro.md new file mode 100644 index 
00000000..1fd142e3 --- /dev/null +++ b/docs/design/streaming-cas-blob-storage.retro.md @@ -0,0 +1,160 @@ +# OG-014 Retrospective — Streaming CAS Blob Storage + +Slice: OG-014 +Design doc: `docs/design/streaming-cas-blob-storage.md` +Backlog item: `BACKLOG/OG-014-streaming-content-attachments.md` +Landed: 2026-03-29 on `main` (5 commits, 30 files, +1595 / -100 lines) + +## Governing Documents + +- Design doc: `docs/design/streaming-cas-blob-storage.md` (6 design decisions, + 4 checkpoint gates, 3 hills) +- Original backlog item: `BACKLOG/OG-014-streaming-content-attachments.md` + (expanded from streaming-reads-only to mandatory CAS + streaming writes + + streaming reads) + +## What Actually Landed + +### Commits + +1. `5f0cebd` — doctrine: design doc + backlog promotion +2. `5dba666` — spec: 35 red-phase tests across 6 files +3. `af80714` — fix: test alignment + size/mime propagation bug +4. `ea0df5f` — semantic: full implementation, 5203 tests green +5. `05bdb3a` — surface: WarpApp/WarpCore methods, index.js/d.ts, CHANGELOG + +### New Production Code + +| File | What | +|---|---| +| `src/ports/BlobStoragePort.js` | `storeStream()`, `retrieveStream()` abstract methods | +| `src/domain/utils/defaultBlobStorage.js` | `InMemoryBlobStorageAdapter` — content-addressed Map storage | +| `src/domain/utils/streamUtils.js` | `normalizeToAsyncIterable()`, `isStreamingInput()` | +| `src/infrastructure/adapters/CasBlobAdapter.js` | `storeStream()`, `retrieveStream()` via git-cas | +| `src/domain/services/PatchBuilderV2.js` | Streaming input acceptance, mandatory CAS, no raw fallback | +| `src/domain/warp/query.methods.js` | `getContentStream()`, `getEdgeContentStream()` | +| `src/domain/WarpRuntime.js` | Auto-construction of blob storage in `open()` | +| `src/domain/WarpApp.js` | 8 content read methods on product surface | +| `src/domain/WarpCore.js` | 8 content read methods on plumbing surface | +| `src/domain/services/CheckpointService.js` | Tree mode change: `040000 tree` for 
content anchors | + +### New Test Coverage + +- 35 new spec tests (BlobStoragePort, InMemoryBlobStorageAdapter, CasBlobAdapter + streaming, PatchBuilderV2 streaming input, getContentStream, + getEdgeContentStream, auto-construction, no-raw-fallback) +- Updated ~20 existing tests for mandatory blobStorage +- All 5203 tests green + +## Design Alignment Audit + +### Hill 1 — Every blob is chunked +**Aligned.** `attachContent()` and `attachEdgeContent()` now throw +`E_NO_BLOB_STORAGE` without blob storage. `WarpRuntime.open()` +auto-constructs `CasBlobAdapter` (Git) or `InMemoryBlobStorageAdapter` +(browser/test). No path bypasses blob storage. + +### Hill 2 — Writes stream in +**Aligned.** `attachContent()` accepts `AsyncIterable | +ReadableStream | Uint8Array | string`. Streaming inputs route +to `storeStream()` without intermediate buffering. + +### Hill 3 — Reads stream out +**Aligned.** `getContentStream()` and `getEdgeContentStream()` return +`AsyncIterable`. Buffered `getContent()` / `getEdgeContent()` +remain as convenience. + +### D1 — BlobStoragePort grows streaming methods +**Aligned.** `storeStream()` and `retrieveStream()` added. `retrieve()` +remains as-is (not implemented as sugar over `retrieveStream()` — kept +independent for simplicity). + +### D2 — Write input normalization +**Aligned.** `normalizeToAsyncIterable()` in `streamUtils.js` handles all +four input types. `isStreamingInput()` detects async iterables and +ReadableStreams. + +### D3 — InMemoryBlobStorageAdapter +**Partially aligned.** Adapter created and functional, but lives in +`src/domain/utils/defaultBlobStorage.js` instead of +`src/infrastructure/adapters/` as originally planned. This was a deliberate +architectural decision: the adapter has zero infrastructure dependencies +(only `Map`, `TextEncoder`, `crypto.subtle`), so placing it in domain +follows the `defaultCodec.js` / `defaultClock.js` pattern and avoids a +domain→infrastructure import in `WarpRuntime.js`. 
+ +### D4 — Auto-construction of blob storage +**Partially aligned.** Design said auto-construct based on +`persistence.plumbing`. Implementation does this, but uses a dynamic +`import()` for `CasBlobAdapter` — the only domain→infrastructure bridge +in the codebase. Accepted as a pragmatic trade-off: the alternative +(requiring all callers to wire CAS explicitly) contradicts the "every blob +gets chunked" invariant. + +### D5 — Public surface placement +**Aligned.** Content methods on both `WarpApp` and `WarpCore`. All 8 +content read methods (buffered + streaming, node + edge, for content, +OID, and metadata) exposed. + +### D6 — Content metadata on streams +**Aligned.** `getContentMeta()` / `getEdgeContentMeta()` now exposed on +`WarpApp` and `WarpCore` (previously only on `WarpRuntime`). Callers can +check size before deciding buffered vs streaming. + +## Observed Drift + +### 1. InMemoryBlobStorageAdapter placement +**Design:** `src/infrastructure/adapters/InMemoryBlobStorageAdapter.js` +**Actual:** `src/domain/utils/defaultBlobStorage.js` +**Reason:** Deliberate architectural decision — zero infrastructure deps. +**Resolution:** Accepted. The file name and module path differ from the +design doc but the behavior is identical. + +### 2. Content anchor tree mode +**Design:** Not explicitly addressed — design focused on I/O streaming. +**Actual:** Changed `100644 blob` → `040000 tree` in both +`PatchBuilderV2.commit()` and `CheckpointService.createV5()`. +**Reason:** CAS stores content as tree objects, not blobs. Git `mktree` +rejects blob-mode entries pointing at tree OIDs. +**Resolution:** Accepted as a necessary breaking change. Follow-on: legacy +graphs with raw blob content may need a migration helper if they create +new checkpoints after upgrading. Add to ROADMAP if reports surface. + +### 3. `retrieve()` not implemented as sugar over `retrieveStream()` +**Design:** D1 said "retrieve() becomes sugar: collect retrieveStream() +into a single buffer." 
+**Actual:** `retrieve()` remains independent — both `CasBlobAdapter` and +`InMemoryBlobStorageAdapter` implement it directly. +**Reason:** Implementation shortcut — the independent path is simpler and +avoids an extra async iteration overhead for the common buffered case. +**Resolution:** Accepted. No functional difference for callers. + +### 4. Dynamic import for CasBlobAdapter auto-construction +**Design:** D4 said "auto-constructs CasBlobAdapter internally when +persistence has plumbing." +**Actual:** Uses `await import('../infrastructure/adapters/CasBlobAdapter.js')` +inside an async function — the only domain→infrastructure dynamic import. +**Reason:** No way to statically import from infrastructure in domain +without violating hexagonal architecture. Dynamic import in an async +factory is the least-bad option. +**Resolution:** Accepted. The bridge is isolated in one function +(`autoConstructBlobStorage`), documented, and only fires for Git-backed +persistence. + +## What Went Well + +- The dev loop (doctrine → spec → semantic → surface) worked cleanly. + Red-phase tests caught real bugs during implementation (size/mime + propagation was broken for non-streaming inputs). +- The design doc's 6 decisions mapped directly to implementation steps. +- 5203 tests passing throughout — no regressions. +- The `InMemoryBlobStorageAdapter` placement decision avoided a hexagonal + architecture violation that would have been the first in the codebase. + +## What Could Improve + +- The content anchor tree mode change (`100644 blob` → `040000 tree`) was + not anticipated in the design doc. It should have been a design decision + (D7) since it's a breaking change affecting commit tree structure. +- The `retrieve()` vs `retrieveStream()` sugar decision should have been + called out explicitly in the design rather than silently diverged from. 
diff --git a/eslint.config.js b/eslint.config.js index da825de0..c3d89ebb 100644 --- a/eslint.config.js +++ b/eslint.config.js @@ -196,7 +196,6 @@ export default tseslint.config( // ── Relaxed complexity for algorithm-heavy modules ───────────────────────── { files: [ - "src/domain/services/TraversalService.js", "src/domain/services/IndexRebuildService.js", ], rules: { diff --git a/index.d.ts b/index.d.ts index e0d343aa..9c17ae36 100644 --- a/index.d.ts +++ b/index.d.ts @@ -244,12 +244,16 @@ export interface TraverseFacadeOptions { labelFilter?: string | string[]; } +/** Edge weight function for weighted traversal algorithms. */ export type EdgeWeightFn = (from: string, to: string, label: string) => number | Promise; +/** Node weight function for weighted traversal algorithms. */ export type NodeWeightFn = (nodeId: string) => number | Promise; +/** Selector for weighted cost traversal mode — supply either an edge or node weight function, not both. */ export type WeightedCostSelector = | { weightFn?: EdgeWeightFn; nodeWeightFn?: never } | { nodeWeightFn?: NodeWeightFn; weightFn?: never }; +/** @deprecated Traversal facade that delegates to GraphTraversal. Use GraphTraversal directly. */ export interface LogicalTraversal { bfs(start: string, options?: TraverseFacadeOptions): Promise; dfs(start: string, options?: TraverseFacadeOptions): Promise; @@ -434,6 +438,7 @@ export interface NodeInfo { parents: string[]; } +/** Abstract port for Git persistence operations (commits, refs, blobs, trees). */ export abstract class GraphPersistencePort { /** The empty tree SHA */ abstract get emptyTree(): string; @@ -482,6 +487,7 @@ export const LogLevel: { readonly SILENT: 4; }; +/** Numeric log level value (0=debug, 1=info, 2=warn, 3=error, 4=silent). */ export type LogLevelValue = 0 | 1 | 2 | 3 | 4; /** @@ -554,9 +560,24 @@ export abstract class SeekCachePort { */ export abstract class BlobStoragePort { /** Stores content and returns a storage identifier (e.g. CAS tree OID). 
*/ - abstract store(content: Uint8Array | string, options?: { slug?: string }): Promise; + abstract store(content: Uint8Array | string, options?: { slug?: string; mime?: string | null; size?: number | null }): Promise; /** Retrieves content by its storage identifier. */ abstract retrieve(oid: string): Promise; + /** Stores content from a streaming source and returns a storage identifier. */ + abstract storeStream(source: AsyncIterable, options?: { slug?: string; mime?: string | null; size?: number | null }): Promise; + /** Retrieves content as an async iterable of chunks. */ + abstract retrieveStream(oid: string): AsyncIterable; +} + +/** + * In-memory blob storage adapter for browser and test paths. + * Content-addressed Map-based storage implementing BlobStoragePort. + */ +export class InMemoryBlobStorageAdapter extends BlobStoragePort { + store(content: Uint8Array | string, options?: { slug?: string; mime?: string | null; size?: number | null }): Promise; + retrieve(oid: string): Promise; + storeStream(source: AsyncIterable, options?: { slug?: string; mime?: string | null; size?: number | null }): Promise; + retrieveStream(oid: string): AsyncIterable; } /** @@ -1082,10 +1103,6 @@ export class CommitDagTraversalService { }): Promise<{ path: string[]; totalCost: number; nodesExplored: number }>; } -/** - * @deprecated Use CommitDagTraversalService instead. - */ -export { CommitDagTraversalService as TraversalService }; /** * Binary search over WARP graph history. @@ -1357,29 +1374,35 @@ export interface Lens { */ export type ObserverConfig = Lens; +/** Observer source pinned to the live materialized frontier. */ export interface LiveObserverSource { kind: 'live'; ceiling?: number | null; } +/** Observer source pinned to an explicit coordinate (writer-tip frontier + ceiling). */ export interface CoordinateObserverSource { kind: 'coordinate'; frontier: Map | Record; ceiling?: number | null; } +/** Observer source pinned to a single strand's visible patch universe. 
*/ export interface StrandObserverSource { kind: 'strand'; strandId: string; ceiling?: number | null; } +/** Union of observer source types for worldline creation. */ export type WorldlineSource = LiveObserverSource | CoordinateObserverSource | StrandObserverSource; +/** Options for creating a worldline handle. */ export interface WorldlineOptions { source?: WorldlineSource; } +/** Options for creating an observer. */ export interface ObserverOptions { source?: WorldlineSource; } @@ -1592,11 +1615,13 @@ export interface TemporalQuery { ): Promise; } +/** Options for content attachment metadata (mime type, size hint). */ export interface ContentAttachmentOptions { mime?: string | null; size?: number | null; } +/** Structured content metadata returned by getContentMeta/getEdgeContentMeta. */ export interface ContentMeta { oid: string; mime: string | null; @@ -1648,11 +1673,11 @@ export class PatchBuilderV2 { /** Sets a property on an edge. */ setEdgeProperty(from: string, to: string, label: string, key: string, value: unknown): PatchBuilderV2; /** Attaches content to a node (writes blob + sets _content property). */ - attachContent(nodeId: string, content: Uint8Array | string, metadata?: ContentAttachmentOptions): Promise; + attachContent(nodeId: string, content: AsyncIterable | ReadableStream | Uint8Array | string, metadata?: ContentAttachmentOptions): Promise; /** Clears content from a node (sets _content metadata registers to null). */ clearContent(nodeId: string): PatchBuilderV2; /** Attaches content to an edge (writes blob + sets _content edge property). */ - attachEdgeContent(from: string, to: string, label: string, content: Uint8Array | string, metadata?: ContentAttachmentOptions): Promise; + attachEdgeContent(from: string, to: string, label: string, content: AsyncIterable | ReadableStream | Uint8Array | string, metadata?: ContentAttachmentOptions): Promise; /** Clears content from an edge (sets _content metadata registers to null). 
*/ clearEdgeContent(from: string, to: string, label: string): PatchBuilderV2; /** Builds the PatchV2 object without committing. */ @@ -1686,11 +1711,11 @@ export class PatchSession { /** Sets a property on an edge. */ setEdgeProperty(from: string, to: string, label: string, key: string, value: unknown): this; /** Attaches content to a node (writes blob + sets _content property). */ - attachContent(nodeId: string, content: Uint8Array | string, metadata?: ContentAttachmentOptions): Promise; + attachContent(nodeId: string, content: AsyncIterable | ReadableStream | Uint8Array | string, metadata?: ContentAttachmentOptions): Promise; /** Clears content from a node (sets _content metadata registers to null). */ clearContent(nodeId: string): this; /** Attaches content to an edge (writes blob + sets _content edge property). */ - attachEdgeContent(from: string, to: string, label: string, content: Uint8Array | string, metadata?: ContentAttachmentOptions): Promise; + attachEdgeContent(from: string, to: string, label: string, content: AsyncIterable | ReadableStream | Uint8Array | string, metadata?: ContentAttachmentOptions): Promise; /** Clears content from an edge (sets _content metadata registers to null). */ clearEdgeContent(from: string, to: string, label: string): this; /** Builds the PatchV2 object without committing. */ @@ -2044,6 +2069,18 @@ declare class WarpCoreBase { */ getEdgeContent(from: string, to: string, label: string): Promise; + /** + * Gets the content blob for a node as a stream, or null if none is attached. + * Returns an async iterable of Uint8Array chunks for incremental consumption. + */ + getContentStream(nodeId: string): Promise | null>; + + /** + * Gets the content blob for an edge as a stream, or null if none is attached. + * Returns an async iterable of Uint8Array chunks for incremental consumption. + */ + getEdgeContentStream(from: string, to: string, label: string): Promise | null>; + /** * Checks if a node exists in the materialized state. 
*/ @@ -2414,13 +2451,4 @@ declare class WarpCoreBase { - /** - * Creates a new Writer with a fresh canonical ID. - * @deprecated Use writer() or writer(id) instead. - */ - createWriter(opts?: { - persist?: 'config' | 'none'; - alias?: string; - }): Promise; - /** Checks GC thresholds and runs GC if needed. */ maybeRunGC(): MaybeGCResult; @@ -2475,12 +2505,6 @@ export declare class WarpApp { /** Gets or creates a Writer, optionally resolving from git config. */ writer(writerId?: Parameters[0]): ReturnType; - /** - * Creates a new Writer with a fresh canonical ID. - * @deprecated Use writer() or writer(id) instead. - */ - createWriter(opts?: Parameters[0]): ReturnType; - /** Creates a new PatchBuilderV2 for adding operations. */ createPatch(): ReturnType; @@ -2524,6 +2548,25 @@ export declare class WarpApp { options: Parameters[1], ): ReturnType; + // ── Content attachment reads ────────────────────────────────────── + /** Gets the content blob for a node, or null if none is attached. */ + getContent(nodeId: string): Promise; + /** Gets the content blob for a node as a stream, or null if none is attached. */ + getContentStream(nodeId: string): Promise | null>; + /** Gets the content blob OID for a node, or null if none is attached. */ + getContentOid(nodeId: string): Promise; + /** Gets structured content metadata for a node attachment, or null if none is attached. */ + getContentMeta(nodeId: string): Promise; + /** Gets the content blob for an edge, or null if none is attached. */ + getEdgeContent(from: string, to: string, label: string): Promise; + /** Gets the content blob for an edge as a stream, or null if none is attached. */ + getEdgeContentStream(from: string, to: string, label: string): Promise | null>; + /** Gets the content blob OID for an edge, or null if none is attached. */ + getEdgeContentOid(from: string, to: string, label: string): Promise; + /** Gets structured content metadata for an edge attachment, or null if none is attached. 
*/ + getEdgeContentMeta(from: string, to: string, label: string): Promise; + /** Creates a durable strand descriptor. */ createStrand(options?: Parameters[0]): ReturnType; @@ -2755,10 +2798,14 @@ export const TICK_RECEIPT_RESULT_TYPES: readonly TickReceiptResult[]; // Conflict Analyzer // ============================================================================ +/** Kind of conflict detected between concurrent patches. */ export type ConflictKind = 'supersession' | 'eventual_override' | 'redundancy'; +/** Level of evidence detail in conflict analysis results. */ export type ConflictEvidenceLevel = 'summary' | 'standard' | 'full'; +/** Causal relationship between conflicting patches. */ export type ConflictCausalRelation = 'concurrent' | 'ordered' | 'replay_equivalent' | 'reducer_collapsed'; +/** Selector identifying the entity targeted by conflict analysis. */ export interface ConflictTargetSelector { targetKind: 'node' | 'edge' | 'node_property' | 'edge_property'; entityId?: string; @@ -2768,6 +2815,7 @@ export interface ConflictTargetSelector { label?: string; } +/** Anchor point (commit SHA + Lamport ceiling) for conflict analysis. */ export interface ConflictAnchor { patchSha: string; writerId: string; @@ -2778,6 +2826,7 @@ export interface ConflictAnchor { receiptOpIndex?: number; } +/** Resolved target entity with node/edge identity for conflict analysis. */ export interface ConflictTarget { targetKind: 'node' | 'edge' | 'node_property' | 'edge_property'; targetDigest: string; @@ -2789,6 +2838,7 @@ export interface ConflictTarget { edgeKey?: string; } +/** A writer that participated in a conflict with its contributing patch. */ export interface ConflictParticipant { anchor: ConflictAnchor; effectDigest: string; @@ -2798,6 +2848,7 @@ export interface ConflictParticipant { notes?: string[]; } +/** How a conflict was resolved by the CRDT reducer. 
*/ export interface ConflictResolution { reducerId: string; basis: { code: string; reason?: string }; @@ -2809,6 +2860,7 @@ export interface ConflictResolution { }; } +/** Single conflict trace: two participants, their causal relation, and resolution. */ export interface ConflictTrace { conflictId: string; kind: ConflictKind; @@ -2828,6 +2880,7 @@ export interface ConflictTrace { }; } +/** Diagnostic summary for a single entity's conflict history. */ export interface ConflictDiagnostic { code: string; severity: 'warning' | 'error'; @@ -2835,6 +2888,7 @@ export interface ConflictDiagnostic { data?: Record; } +/** Full conflict analysis result for a materialized coordinate. */ export interface ConflictAnalysis { analysisVersion: string; resolvedCoordinate: { @@ -2862,6 +2916,7 @@ export interface ConflictAnalysis { conflicts: ConflictTrace[]; } +/** Options for creating a new strand descriptor. */ export interface StrandCreateOptions { strandId?: string; lamportCeiling?: number | null; @@ -2870,11 +2925,13 @@ export interface StrandCreateOptions { leaseExpiresAt?: string | null; } +/** Options for braiding read-only overlays onto a strand. */ export interface StrandBraidOptions { braidedStrandIds?: string[]; writable?: boolean | null; } +/** Descriptor for a braided read-only overlay on a strand. */ export interface StrandReadOverlayDescriptor { strandId: string; overlayId: string; @@ -2883,6 +2940,7 @@ export interface StrandReadOverlayDescriptor { patchCount: number; } +/** Descriptor for a queued intent on a strand. */ export interface StrandIntentDescriptor { intentId: string; enqueuedAt: string; @@ -2892,6 +2950,7 @@ export interface StrandIntentDescriptor { contentBlobOids: string[]; } +/** Counterfactual produced by ticking a strand (rejected patches). 
*/ export interface StrandTickCounterfactual { intentId: string; reason: string; @@ -2900,6 +2959,7 @@ export interface StrandTickCounterfactual { writes: string[]; } +/** Record of a strand tick: accepted patches and counterfactuals. */ export interface StrandTickRecord { tickId: string; strandId: string; @@ -2913,6 +2973,7 @@ export interface StrandTickRecord { overlayPatchShas: string[]; } +/** Durable descriptor for a speculative strand with base observation and overlay. */ export interface StrandDescriptor { schemaVersion: number; strandId: string; @@ -3077,18 +3138,21 @@ export interface WarpStateV5 { edgeBirthEvent: Map; } +/** Compact projection of materialized state: node IDs, edge tuples, and properties. */ export interface VisibleStateProjectionV5 { nodes: string[]; edges: Array<{ from: string; to: string; label: string }>; props: Array<{ node: string; key: string; value: unknown }>; } +/** Neighbor entry from visible state: target node, edge label, and direction. */ export interface VisibleStateNeighborV5 { nodeId: string; label: string; direction: 'outgoing' | 'incoming'; } +/** Edge-local view from visible state: endpoints, label, and properties. */ export interface VisibleEdgeViewV5 { from: string; to: string; @@ -3096,6 +3160,7 @@ export interface VisibleEdgeViewV5 { props: Record; } +/** Node-local view from visible state: properties, neighbors, and content metadata. */ export interface VisibleNodeViewV5 { nodeId: string; props: Record; @@ -3104,6 +3169,7 @@ export interface VisibleNodeViewV5 { content: ContentMeta | null; } +/** Read-only accessor over materialized V5 state with entity-local inspection. */ export interface VisibleStateReaderV5 { project(): VisibleStateProjectionV5; hasNode(nodeId: string): boolean; @@ -3121,6 +3187,7 @@ export interface VisibleStateReaderV5 { inspectNode(nodeId: string): VisibleNodeViewV5 | null; } +/** Compact summary of visible state: entity and property counts. 
*/ export interface VisibleStateSummaryV5 { nodeCount: number; edgeCount: number; @@ -3128,12 +3195,14 @@ export interface VisibleStateSummaryV5 { edgePropertyCount: number; } +/** Single node property value in a visible state comparison. */ export interface VisibleStateNodePropertyValueV5 { node: string; key: string; value: unknown; } +/** Node property change between two visible states. */ export interface VisibleStateNodePropertyChangeV5 { node: string; key: string; @@ -3141,6 +3210,7 @@ export interface VisibleStateNodePropertyChangeV5 { rightValue: unknown; } +/** Single edge property value in a visible state comparison. */ export interface VisibleStateEdgePropertyValueV5 { from: string; to: string; @@ -3149,6 +3219,7 @@ export interface VisibleStateEdgePropertyValueV5 { value: unknown; } +/** Edge property change between two visible states. */ export interface VisibleStateEdgePropertyChangeV5 { from: string; to: string; @@ -3158,6 +3229,7 @@ export interface VisibleStateEdgePropertyChangeV5 { rightValue: unknown; } +/** Per-node detail in a visible state comparison: property and neighbor deltas. */ export interface VisibleStateComparisonTargetV5 { targetId: string | null; leftExists: boolean; @@ -3181,6 +3253,7 @@ export interface VisibleStateComparisonTargetV5 { contentChanged: boolean; } +/** Full visible state comparison between two materialized states. */ export interface VisibleStateComparisonV5 { comparisonVersion: string; changed: boolean; @@ -3213,23 +3286,28 @@ export interface VisibleStateComparisonV5 { target?: VisibleStateComparisonTargetV5; } +/** Prefix-based filter for scoping visible state to node ID families. */ export interface VisibleStateScopePrefixFilterV1 { include?: string[]; exclude?: string[]; } +/** Scope configuration for filtering visible state comparison or transfer. */ export interface VisibleStateScopeV1 { nodeIdPrefixes?: VisibleStateScopePrefixFilterV1; } +/** Selector identifying source or target coordinate for comparison. 
*/ export type CoordinateComparisonSelectorV1 = | { kind: 'live'; ceiling?: number | null } | { kind: 'strand'; strandId: string; ceiling?: number | null } | { kind: 'strand_base'; strandId: string; ceiling?: number | null } | { kind: 'coordinate'; frontier: Map | Record; ceiling?: number | null }; +/** Selector for coordinate transfer planning (same shape as comparison selector). */ export type CoordinateTransferPlanSelectorV1 = CoordinateComparisonSelectorV1; +/** Resolved side of a coordinate comparison with frontier, patches, and state. */ export interface CoordinateComparisonResolvedSideV1 { coordinateKind: 'frontier' | 'strand' | 'strand_base'; patchFrontier: Record; @@ -3253,6 +3331,7 @@ export interface CoordinateComparisonResolvedSideV1 { }; } +/** Patch-level divergence between two coordinates: shared vs side-only counts. */ export interface CoordinateComparisonPatchDivergenceV1 { sharedCount: number; leftOnlyCount: number; @@ -3271,11 +3350,13 @@ export interface CoordinateComparisonPatchDivergenceV1 { }; } +/** Unresolved-to-resolved side pair for a coordinate comparison. */ export interface CoordinateComparisonSideV1 { requested: Record; resolved: CoordinateComparisonResolvedSideV1; } +/** Full coordinate comparison result with side digests and visible state diff. */ export interface CoordinateComparisonV1 { comparisonVersion: string; comparisonDigest: string; @@ -3286,6 +3367,7 @@ export interface CoordinateComparisonV1 { visibleState: VisibleStateComparisonV5; } +/** Canonical fact payload for a coordinate comparison. */ export interface CoordinateComparisonFactV1 { comparisonVersion: string; scope?: VisibleStateScopeV1; @@ -3295,6 +3377,7 @@ export interface CoordinateComparisonFactV1 { visibleState: VisibleStateComparisonV5; } +/** Exported coordinate comparison fact with canonical JSON and digest. 
*/ export interface CoordinateComparisonFactExportV1 { exportVersion: string; factKind: 'coordinate-comparison'; @@ -3303,6 +3386,7 @@ export interface CoordinateComparisonFactExportV1 { fact: CoordinateComparisonFactV1; } +/** Summary of candidate transfer operations between two visible states. */ export interface VisibleStateTransferPlanSummaryV1 { opCount: number; addNodeCount: number; @@ -3319,6 +3403,7 @@ export interface VisibleStateTransferPlanSummaryV1 { clearEdgeContentCount: number; } +/** Single candidate transfer operation (add/remove/set/attach/clear). */ export type VisibleStateTransferOperationV1 = | { op: 'add_node'; nodeId: string } | { op: 'remove_node'; nodeId: string } @@ -3331,6 +3416,7 @@ export type VisibleStateTransferOperationV1 = | { op: 'attach_edge_content'; from: string; to: string; label: string; content: Uint8Array; contentOid: string; mime?: string | null; size?: number | null } | { op: 'clear_edge_content'; from: string; to: string; label: string }; +/** Canonical fact form of a transfer operation (without inline content bytes). */ export type VisibleStateTransferOperationFactV1 = | { op: 'add_node'; nodeId: string } | { op: 'remove_node'; nodeId: string } @@ -3343,8 +3429,10 @@ export type VisibleStateTransferOperationFactV1 = | { op: 'attach_edge_content'; from: string; to: string; label: string; contentOid: string; mime?: string | null; size?: number | null } | { op: 'clear_edge_content'; from: string; to: string; label: string }; +/** Side label for a transfer plan (same shape as comparison side). */ export type CoordinateTransferPlanSideV1 = CoordinateComparisonSideV1; +/** Full coordinate transfer plan with candidate operations and digests. */ export interface CoordinateTransferPlanV1 { transferVersion: string; transferDigest: string; @@ -3357,6 +3445,7 @@ export interface CoordinateTransferPlanV1 { ops: VisibleStateTransferOperationV1[]; } +/** Canonical fact payload for a coordinate transfer plan. 
*/ export interface CoordinateTransferPlanFactV1 { transferVersion: string; comparisonDigest: string; @@ -3368,6 +3457,7 @@ export interface CoordinateTransferPlanFactV1 { ops: VisibleStateTransferOperationFactV1[]; } +/** Exported coordinate transfer plan fact with canonical JSON and digest. */ export interface CoordinateTransferPlanFactExportV1 { exportVersion: string; factKind: 'coordinate-transfer-plan'; @@ -3683,4 +3773,5 @@ export function deserializeWormhole(json: { payload: PatchEntry[]; }): WormholeEdge; +/** Default package export — the curated product-facing WARP surface. */ export default WarpApp; diff --git a/index.js b/index.js index ba277693..b59b7d63 100644 --- a/index.js +++ b/index.js @@ -67,6 +67,7 @@ import { } from './src/domain/errors/index.js'; import WriterError from './src/domain/errors/WriterError.js'; import BlobStoragePort from './src/ports/BlobStoragePort.js'; +import InMemoryBlobStorageAdapter from './src/domain/utils/defaultBlobStorage.js'; import CryptoPort from './src/ports/CryptoPort.js'; import HttpServerPort from './src/ports/HttpServerPort.js'; import NodeCryptoAdapter from './src/infrastructure/adapters/NodeCryptoAdapter.js'; @@ -145,8 +146,6 @@ import { exportCoordinateTransferPlanFact, } from './src/domain/services/CoordinateFactExport.js'; -const TraversalService = CommitDagTraversalService; - export { GitGraphAdapter, InMemoryGraphAdapter, @@ -157,7 +156,6 @@ export { HealthCheckService, HealthStatus, CommitDagTraversalService, - TraversalService, BisectService, GraphPersistencePort, IndexStoragePort, @@ -177,6 +175,7 @@ export { // Port contracts BlobStoragePort, + InMemoryBlobStorageAdapter, CryptoPort, HttpServerPort, diff --git a/jsr.json b/jsr.json index 159437ed..c123e4b1 100644 --- a/jsr.json +++ b/jsr.json @@ -1,6 +1,6 @@ { "name": "@git-stunts/git-warp", - "version": "15.0.1", + "version": "16.0.0", "imports": { "roaring": "npm:roaring@^2.7.0" }, diff --git a/package.json b/package.json index 44a425e5..496d3c93 
100644 --- a/package.json +++ b/package.json @@ -1,6 +1,6 @@ { "name": "@git-stunts/git-warp", - "version": "15.0.1", + "version": "16.0.0", "description": "Deterministic WARP graph over Git: graph-native storage, traversal, and tooling.", "type": "module", "license": "Apache-2.0", diff --git a/src/domain/WarpApp.js b/src/domain/WarpApp.js index 0c2da585..4c485c7b 100644 --- a/src/domain/WarpApp.js +++ b/src/domain/WarpApp.js @@ -1,4 +1,14 @@ import WarpCore from './WarpCore.js'; +import { + getContent as _getContent, + getContentStream as _getContentStream, + getContentOid as _getContentOid, + getContentMeta as _getContentMeta, + getEdgeContent as _getEdgeContent, + getEdgeContentStream as _getEdgeContentStream, + getEdgeContentOid as _getEdgeContentOid, + getEdgeContentMeta as _getEdgeContentMeta, +} from './warp/query.methods.js'; /** * Curated product-facing WARP surface. @@ -67,14 +77,6 @@ export default class WarpApp { return await this._runtime().writer(writerId); } - /** - * @param {{ persist?: 'config' | 'none', alias?: string }} [opts] - * @returns {Promise} - */ - async createWriter(opts) { - return await this._runtime().createWriter(opts); - } - /** * @returns {Promise} */ @@ -167,6 +169,51 @@ export default class WarpApp { return this._runtime().watch(pattern, options); } + // ── Content attachment reads ────────────────────────────────────────── + // Imported from query.methods.js and called with the runtime as this binding. 
+ + /** @param {string} nodeId @returns {Promise} */ + async getContent(nodeId) { + return await _getContent.call(this._runtime(), nodeId); + } + + /** @param {string} nodeId @returns {Promise|null>} */ + async getContentStream(nodeId) { + return await _getContentStream.call(this._runtime(), nodeId); + } + + /** @param {string} nodeId @returns {Promise} */ + async getContentOid(nodeId) { + return await _getContentOid.call(this._runtime(), nodeId); + } + + /** @param {string} nodeId @returns {Promise<{ oid: string, mime: string|null, size: number|null }|null>} */ + async getContentMeta(nodeId) { + return await _getContentMeta.call(this._runtime(), nodeId); + } + + /** @param {string} from @param {string} to @param {string} label @returns {Promise} */ + async getEdgeContent(from, to, label) { + return await _getEdgeContent.call(this._runtime(), from, to, label); + } + + /** @param {string} from @param {string} to @param {string} label @returns {Promise|null>} */ + async getEdgeContentStream(from, to, label) { + return await _getEdgeContentStream.call(this._runtime(), from, to, label); + } + + /** @param {string} from @param {string} to @param {string} label @returns {Promise} */ + async getEdgeContentOid(from, to, label) { + return await _getEdgeContentOid.call(this._runtime(), from, to, label); + } + + /** @param {string} from @param {string} to @param {string} label @returns {Promise<{ oid: string, mime: string|null, size: number|null }|null>} */ + async getEdgeContentMeta(from, to, label) { + return await _getEdgeContentMeta.call(this._runtime(), from, to, label); + } + + // ── Strands ───────────────────────────────────────────────────────── + /** * @param {Parameters[0]} [options] * @returns {ReturnType} diff --git a/src/domain/WarpCore.js b/src/domain/WarpCore.js index 384bb203..d3020578 100644 --- a/src/domain/WarpCore.js +++ b/src/domain/WarpCore.js @@ -1,4 +1,14 @@ import WarpRuntime from './WarpRuntime.js'; +import { + getContent as _getContent, + 
getContentStream as _getContentStream, + getContentOid as _getContentOid, + getContentMeta as _getContentMeta, + getEdgeContent as _getEdgeContent, + getEdgeContentStream as _getEdgeContentStream, + getEdgeContentOid as _getEdgeContentOid, + getEdgeContentMeta as _getEdgeContentMeta, +} from './warp/query.methods.js'; import { toInternalStrandShape, toPublicStrandShape } from './utils/strandPublicShape.js'; import { buildCoordinateComparisonFact, @@ -85,6 +95,57 @@ export default class WarpCore { return /** @type {WarpCore} */ (/** @type {unknown} */ (runtime)); } + // ── Content attachment reads ────────────────────────────────────────── + // Imported from query.methods.js and called with WarpRuntime-typed this. + // WarpCore is a WarpRuntime at runtime (via Object.setPrototypeOf in _adopt). + + /** @private @returns {WarpRuntime} */ + _asRuntime() { + return /** @type {WarpRuntime} */ (/** @type {unknown} */ (this)); + } + + /** @param {string} nodeId @returns {Promise} */ + async getContent(nodeId) { + return await _getContent.call(this._asRuntime(), nodeId); + } + + /** @param {string} nodeId @returns {Promise|null>} */ + async getContentStream(nodeId) { + return await _getContentStream.call(this._asRuntime(), nodeId); + } + + /** @param {string} nodeId @returns {Promise} */ + async getContentOid(nodeId) { + return await _getContentOid.call(this._asRuntime(), nodeId); + } + + /** @param {string} nodeId @returns {Promise<{ oid: string, mime: string|null, size: number|null }|null>} */ + async getContentMeta(nodeId) { + return await _getContentMeta.call(this._asRuntime(), nodeId); + } + + /** @param {string} from @param {string} to @param {string} label @returns {Promise} */ + async getEdgeContent(from, to, label) { + return await _getEdgeContent.call(this._asRuntime(), from, to, label); + } + + /** @param {string} from @param {string} to @param {string} label @returns {Promise|null>} */ + async getEdgeContentStream(from, to, label) { + return await 
_getEdgeContentStream.call(this._asRuntime(), from, to, label); + } + + /** @param {string} from @param {string} to @param {string} label @returns {Promise} */ + async getEdgeContentOid(from, to, label) { + return await _getEdgeContentOid.call(this._asRuntime(), from, to, label); + } + + /** @param {string} from @param {string} to @param {string} label @returns {Promise<{ oid: string, mime: string|null, size: number|null }|null>} */ + async getEdgeContentMeta(from, to, label) { + return await _getEdgeContentMeta.call(this._asRuntime(), from, to, label); + } + + // ── Strands ───────────────────────────────────────────────────────── + /** * Creates a durable strand descriptor pinned to the current frontier. * diff --git a/src/domain/WarpRuntime.js b/src/domain/WarpRuntime.js index 7922013e..5a5fff85 100644 --- a/src/domain/WarpRuntime.js +++ b/src/domain/WarpRuntime.js @@ -22,6 +22,7 @@ import SyncController from './services/SyncController.js'; import SyncTrustGate from './services/SyncTrustGate.js'; import { AuditVerifierService } from './services/AuditVerifierService.js'; import MaterializedViewService from './services/MaterializedViewService.js'; +import InMemoryBlobStorageAdapter from './utils/defaultBlobStorage.js'; import { wireWarpMethods } from './warp/_wire.js'; import * as queryMethods from './warp/query.methods.js'; import * as subscribeMethods from './warp/subscribe.methods.js'; @@ -39,6 +40,27 @@ import * as comparisonMethods from './warp/comparison.methods.js'; const DEFAULT_ADJACENCY_CACHE_SIZE = 3; +/** + * Auto-constructs a BlobStoragePort when none is explicitly provided. + * + * When persistence has `plumbing` (Git-backed), constructs a CasBlobAdapter + * for CDC chunking and Git-native GC reachability. Otherwise uses + * InMemoryBlobStorageAdapter for browser/test paths. 
+ * + * @param {unknown} persistence + * @returns {Promise} + */ +async function autoConstructBlobStorage(persistence) { + const p = /** @type {{ plumbing?: unknown }} */ (persistence); + if (p.plumbing) { + const { default: CasBlobAdapter } = await import( + /* webpackIgnore: true */ '../infrastructure/adapters/CasBlobAdapter.js' + ); + return new CasBlobAdapter({ plumbing: p.plumbing, persistence }); + } + return new InMemoryBlobStorageAdapter(); +} + /** * @param {{ mode?: 'off'|'log-only'|'enforce', pin?: string|null }|undefined|null} trust * @returns {{ mode: 'off'|'log-only'|'enforce', pin: string|null }} @@ -390,7 +412,10 @@ export default class WarpRuntime { } } - const graph = new WarpRuntime({ persistence, graphName, writerId, gcPolicy, adjacencyCacheSize, checkpointPolicy, autoMaterialize, onDeleteWithData, logger, clock, crypto, codec, seekCache, audit, blobStorage, patchBlobStorage, trust }); + // Auto-construct blob storage when none provided (OG-014: CAS is mandatory) + const resolvedBlobStorage = blobStorage || await autoConstructBlobStorage(persistence); + + const graph = new WarpRuntime({ persistence, graphName, writerId, gcPolicy, adjacencyCacheSize, checkpointPolicy, autoMaterialize, onDeleteWithData, logger, clock, crypto, codec, seekCache, audit, blobStorage: resolvedBlobStorage, patchBlobStorage, trust }); // Validate migration boundary await graph._validateMigrationBoundary(); diff --git a/src/domain/errors/WriterError.js b/src/domain/errors/WriterError.js index 4b2a1635..90fddba0 100644 --- a/src/domain/errors/WriterError.js +++ b/src/domain/errors/WriterError.js @@ -15,6 +15,7 @@ import WarpError from './WarpError.js'; * | `WRITER_REF_ADVANCED` | Writer ref moved since beginPatch() | * | `WRITER_CAS_CONFLICT` | Compare-and-swap failure during commit | * | `PERSIST_WRITE_FAILED` | Git persistence operation failed | + * | `NO_BLOB_STORAGE` | Content attachment attempted without blob storage | * | `WRITER_ERROR` | Generic/default writer error 
| * * @class WriterError diff --git a/src/domain/services/CheckpointService.js b/src/domain/services/CheckpointService.js index 259f58da..e2090e41 100644 --- a/src/domain/services/CheckpointService.js +++ b/src/domain/services/CheckpointService.js @@ -194,7 +194,7 @@ function collectContentAnchorEntries(propMap) { for (let i = 0; i < sortedOids.length; i++) { const oid = sortedOids[i]; - sortedOids[i] = `100644 blob ${oid}\t_content_${oid}`; + sortedOids[i] = `040000 tree ${oid}\t_content_${oid}`; } return sortedOids; diff --git a/src/domain/services/PatchBuilderV2.js b/src/domain/services/PatchBuilderV2.js index ae57211a..5be43b0c 100644 --- a/src/domain/services/PatchBuilderV2.js +++ b/src/domain/services/PatchBuilderV2.js @@ -36,6 +36,7 @@ import { lowerCanonicalOp } from './OpNormalizer.js'; import { encodePatchMessage, decodePatchMessage, detectMessageKind } from './WarpMessageCodec.js'; import { buildWriterRef } from '../utils/RefLayout.js'; import WriterError from '../errors/WriterError.js'; +import { isStreamingInput, normalizeToAsyncIterable } from '../utils/streamUtils.js'; /** * Inspects materialized state for edges and properties attached to a node. @@ -596,24 +597,45 @@ export class PatchBuilderV2 { * this patch) for `getContent()` to find it later. `attachContent()` * only sets the `_content` property — it does not create the node. * + * Accepts streaming input (AsyncIterable, ReadableStream) as well as + * buffered input (Uint8Array, string). Streaming inputs are piped + * directly to blob storage without intermediate buffering. 
+ * * @param {string} nodeId - The node ID to attach content to - * @param {Uint8Array|string} content - The content to attach + * @param {AsyncIterable|ReadableStream|Uint8Array|string} content - The content to attach * @param {{ mime?: string|null, size?: number|null }} [metadata] - Optional metadata hint * @returns {Promise} This builder instance for method chaining */ async attachContent(nodeId, content, metadata = undefined) { this._assertNotCommitted(); - // Validate identifiers before writing blob to avoid orphaned blobs _assertNoReservedBytes(nodeId, 'nodeId'); _assertNoReservedBytes(CONTENT_PROPERTY_KEY, 'key'); this._assertNodeExistsForContent(nodeId); - const normalizedMeta = normalizeContentMetadata(content, metadata); - const oid = this._blobStorage - ? await this._blobStorage.store(content, { slug: `${this._graphName}/${nodeId}`, mime: normalizedMeta.mime, size: normalizedMeta.size }) - : await this._persistence.writeBlob(content); + if (!this._blobStorage) { + throw new WriterError( + 'NO_BLOB_STORAGE', + 'Cannot attach content without blob storage — inject blobStorage via open() or use InMemoryBlobStorageAdapter', + ); + } + const slug = `${this._graphName}/${nodeId}`; + let oid; + let mime; + let size; + if (isStreamingInput(content)) { + mime = metadata?.mime ?? null; + size = metadata?.size ?? 
null; + const source = normalizeToAsyncIterable(content); + oid = await this._blobStorage.storeStream(source, { slug, mime, size }); + } else { + const buffered = /** @type {Uint8Array|string} */ (content); + const normalizedMeta = normalizeContentMetadata(buffered, metadata); + mime = normalizedMeta.mime; + size = normalizedMeta.size; + oid = await this._blobStorage.store(buffered, { slug, mime, size }); + } this.setProperty(nodeId, CONTENT_PROPERTY_KEY, oid); - this.setProperty(nodeId, CONTENT_SIZE_PROPERTY_KEY, normalizedMeta.size); - this.setProperty(nodeId, CONTENT_MIME_PROPERTY_KEY, normalizedMeta.mime); + this.setProperty(nodeId, CONTENT_SIZE_PROPERTY_KEY, size); + this.setProperty(nodeId, CONTENT_MIME_PROPERTY_KEY, mime); this._contentBlobs.push(oid); return this; } @@ -641,31 +663,51 @@ export class PatchBuilderV2 { } /** - * Attaches content to an edge by writing the blob to the Git object store + * Attaches content to an edge by writing the blob via blob storage * and storing the blob OID as the `_content` edge property. * + * Accepts streaming input (AsyncIterable, ReadableStream) as well as + * buffered input (Uint8Array, string). 
+ * * @param {string} from - Source node ID * @param {string} to - Target node ID * @param {string} label - Edge label - * @param {Uint8Array|string} content - The content to attach + * @param {AsyncIterable|ReadableStream|Uint8Array|string} content - The content to attach * @param {{ mime?: string|null, size?: number|null }} [metadata] - Optional metadata hint * @returns {Promise} This builder instance for method chaining */ async attachEdgeContent(from, to, label, content, metadata = undefined) { this._assertNotCommitted(); - // Validate identifiers before writing blob to avoid orphaned blobs _assertNoReservedBytes(from, 'from'); _assertNoReservedBytes(to, 'to'); _assertNoReservedBytes(label, 'label'); _assertNoReservedBytes(CONTENT_PROPERTY_KEY, 'key'); this._assertEdgeExists(from, to, label); - const normalizedMeta = normalizeContentMetadata(content, metadata); - const oid = this._blobStorage - ? await this._blobStorage.store(content, { slug: `${this._graphName}/${from}/${to}/${label}`, mime: normalizedMeta.mime, size: normalizedMeta.size }) - : await this._persistence.writeBlob(content); + if (!this._blobStorage) { + throw new WriterError( + 'NO_BLOB_STORAGE', + 'Cannot attach content without blob storage — inject blobStorage via open() or use InMemoryBlobStorageAdapter', + ); + } + const slug = `${this._graphName}/${from}/${to}/${label}`; + let oid; + let mime; + let size; + if (isStreamingInput(content)) { + mime = metadata?.mime ?? null; + size = metadata?.size ?? 
null; + const source = normalizeToAsyncIterable(content); + oid = await this._blobStorage.storeStream(source, { slug, mime, size }); + } else { + const buffered = /** @type {Uint8Array|string} */ (content); + const normalizedMeta = normalizeContentMetadata(buffered, metadata); + mime = normalizedMeta.mime; + size = normalizedMeta.size; + oid = await this._blobStorage.store(buffered, { slug, mime, size }); + } this.setEdgeProperty(from, to, label, CONTENT_PROPERTY_KEY, oid); - this.setEdgeProperty(from, to, label, CONTENT_SIZE_PROPERTY_KEY, normalizedMeta.size); - this.setEdgeProperty(from, to, label, CONTENT_MIME_PROPERTY_KEY, normalizedMeta.mime); + this.setEdgeProperty(from, to, label, CONTENT_SIZE_PROPERTY_KEY, size); + this.setEdgeProperty(from, to, label, CONTENT_MIME_PROPERTY_KEY, mime); this._contentBlobs.push(oid); return this; } @@ -890,10 +932,11 @@ export class PatchBuilderV2 { // 7. Create tree with the patch blob + any content blobs (deduplicated) // Format for mktree: "mode type oid\tpath" + // Content is always stored via BlobStoragePort (CAS), producing tree OIDs. const treeEntries = [`100644 blob ${patchBlobOid}\tpatch.cbor`]; const uniqueBlobs = [...new Set(this._contentBlobs)]; for (const blobOid of uniqueBlobs) { - treeEntries.push(`100644 blob ${blobOid}\t_content_${blobOid}`); + treeEntries.push(`040000 tree ${blobOid}\t_content_${blobOid}`); } const treeOid = await this._persistence.writeTree(treeEntries); diff --git a/src/domain/services/TraversalService.js b/src/domain/services/TraversalService.js deleted file mode 100644 index 7b867c70..00000000 --- a/src/domain/services/TraversalService.js +++ /dev/null @@ -1,8 +0,0 @@ -/** - * @deprecated Use CommitDagTraversalService instead. - * This alias will be removed after one minor version. 
- */ - -import CommitDagTraversalService from './CommitDagTraversalService.js'; - -export default CommitDagTraversalService; diff --git a/src/domain/utils/defaultBlobStorage.js b/src/domain/utils/defaultBlobStorage.js new file mode 100644 index 00000000..10b6a82b --- /dev/null +++ b/src/domain/utils/defaultBlobStorage.js @@ -0,0 +1,118 @@ +/** + * In-memory blob storage adapter for browser and test paths. + * + * Implements BlobStoragePort with Map-based storage. Content-addressed: + * identical content always produces the same OID. No CDC chunking — + * its purpose is port conformance, not chunking behavior. + * + * @module domain/utils/defaultBlobStorage + */ + +import BlobStoragePort from '../../ports/BlobStoragePort.js'; +import { hexEncode } from './bytes.js'; +import { collectAsyncIterable } from './streamUtils.js'; + +const _encoder = new TextEncoder(); + +/** + * Simple content-addressed hash using Web Crypto SHA-256. + * Falls back to a synchronous FNV-1a for environments without + * crypto.subtle (plain HTTP in Docker, etc.). + * + * @param {Uint8Array} bytes + * @returns {Promise} hex digest + */ +async function contentHash(bytes) { + if (typeof globalThis.crypto !== 'undefined' && globalThis.crypto.subtle) { + const buf = /** @type {ArrayBuffer} */ (bytes.buffer).slice(bytes.byteOffset, bytes.byteOffset + bytes.byteLength); + const digest = await globalThis.crypto.subtle.digest('SHA-256', buf); + return hexEncode(new Uint8Array(digest)); + } + // FNV-1a 64-bit (as two 32-bit halves) — not cryptographic, just deterministic. + // Produces 16-char hex OIDs (shorter than SHA). Acceptable because + // InMemoryBlobStorageAdapter OIDs never leave the process boundary. 
+ let h1 = 0x811c9dc5; + let h2 = 0xcbf29ce4; + for (let i = 0; i < bytes.length; i++) { + h1 ^= bytes[i]; + h1 = Math.imul(h1, 0x01000193); + h2 ^= bytes[i]; + h2 = Math.imul(h2, 0x01000193); + } + return (h1 >>> 0).toString(16).padStart(8, '0') + + (h2 >>> 0).toString(16).padStart(8, '0'); +} + +export default class InMemoryBlobStorageAdapter extends BlobStoragePort { + constructor() { + super(); + /** @type {Map} */ + this._store = new Map(); + } + + /** + * @override + * @param {Uint8Array|string} content + * @param {{ slug?: string, mime?: string|null, size?: number|null }} [_options] + * @returns {Promise} + */ + async store(content, _options) { + const bytes = typeof content === 'string' + ? _encoder.encode(content) + : content; + const oid = await contentHash(bytes); + this._store.set(oid, bytes); + return oid; + } + + /** + * @override + * @param {string} oid + * @returns {Promise} + */ + retrieve(oid) { + const bytes = this._store.get(oid); + if (!bytes) { + return Promise.reject(new Error(`InMemoryBlobStorageAdapter: unknown OID '${oid}'`)); + } + return Promise.resolve(bytes); + } + + /** + * @override + * @param {AsyncIterable} source + * @param {{ slug?: string, mime?: string|null, size?: number|null }} [_options] + * @returns {Promise} + */ + async storeStream(source, _options) { + const bytes = await collectAsyncIterable(source); + return await this.store(bytes); + } + + /** + * @override + * @param {string} oid + * @returns {AsyncIterable} + */ + retrieveStream(oid) { + const bytes = this._store.get(oid); + if (!bytes) { + throw new Error(`InMemoryBlobStorageAdapter: unknown OID '${oid}'`); + } + const chunk = bytes; + return /** @type {AsyncIterable} */ ({ + [Symbol.asyncIterator]() { + let done = false; + return { + next() { + if (done) { + return Promise.resolve({ value: undefined, done: true }); + } + done = true; + return Promise.resolve({ value: chunk, done: false }); + }, + }; + }, + }); + } +} diff --git a/src/domain/utils/streamUtils.js 
b/src/domain/utils/streamUtils.js new file mode 100644 index 00000000..0a325391 --- /dev/null +++ b/src/domain/utils/streamUtils.js @@ -0,0 +1,157 @@ +/** + * Stream normalization utilities for content attachment I/O. + * + * Domain-only — no Node.js stream imports. Uses AsyncIterable + * as the universal stream type across Node, Bun, and Deno. + * + * @module domain/utils/streamUtils + */ + +const _encoder = new TextEncoder(); + +/** + * Returns true when the value is an async iterable (has Symbol.asyncIterator). + * + * @param {unknown} value + * @returns {value is AsyncIterable} + */ +function isAsyncIterable(value) { + return value !== null + && typeof value === 'object' + && Symbol.asyncIterator in /** @type {object} */ (value); +} + +/** + * Returns true when the value is a ReadableStream (Web Streams API). + * + * @param {unknown} value + * @returns {value is ReadableStream} + */ +function isReadableStream(value) { + return typeof ReadableStream !== 'undefined' + && value instanceof ReadableStream; +} + +/** + * Returns true when the content is a streaming input type + * (AsyncIterable or ReadableStream) rather than a buffered value. + * + * @param {unknown} content + * @returns {boolean} + */ +export function isStreamingInput(content) { + // Buffered types are never streaming, even if a polyfill adds Symbol.asyncIterator + if (content instanceof Uint8Array || typeof content === 'string') { + return false; + } + return isAsyncIterable(content) || isReadableStream(content); +} + +/** + * Normalizes any supported content input to AsyncIterable. 
+ * + * Accepted input types: + * - `AsyncIterable` — passed through + * - `ReadableStream` — adapted via async iteration protocol + * - `Uint8Array` — wrapped as single-element async iterable + * - `string` — encoded to UTF-8, wrapped as single-element async iterable + * + * @param {AsyncIterable | ReadableStream | Uint8Array | string} content + * @returns {AsyncIterable} + */ +export function normalizeToAsyncIterable(content) { + if (isAsyncIterable(content)) { + return content; + } + + if (isReadableStream(content)) { + // ReadableStream implements Symbol.asyncIterator in modern runtimes. + // For those that don't, use getReader() manually. + if (Symbol.asyncIterator in content) { + return /** @type {AsyncIterable} */ (content); + } + return readableStreamToAsyncIterable(content); + } + + const bytes = typeof content === 'string' + ? _encoder.encode(content) + : content; + + return singleValueAsyncIterable(bytes); +} + +/** + * Wraps a single Uint8Array as an async iterable yielding one chunk. + * + * @param {Uint8Array} value + * @returns {AsyncIterable} + */ +function singleValueAsyncIterable(value) { + return { + [Symbol.asyncIterator]() { + let done = false; + return { + next() { + if (done) { + return Promise.resolve({ value: undefined, done: true }); + } + done = true; + return Promise.resolve({ value, done: false }); + }, + }; + }, + }; +} + +/** + * Adapts a ReadableStream to an async iterable via getReader(). 
+ * + * @param {ReadableStream} stream + * @returns {AsyncIterable} + */ +function readableStreamToAsyncIterable(stream) { + return { + [Symbol.asyncIterator]() { + const reader = stream.getReader(); + return { + async next() { + const { value, done } = await reader.read(); + if (done) { + reader.releaseLock(); + return { value: undefined, done: true }; + } + return { value, done: false }; + }, + return() { + reader.releaseLock(); + return Promise.resolve({ value: undefined, done: true }); + }, + }; + }, + }; +} + +/** + * Collects an async iterable into a single Uint8Array. + * + * @param {AsyncIterable} source + * @returns {Promise} + */ +export async function collectAsyncIterable(source) { + const chunks = []; + let totalLength = 0; + for await (const chunk of source) { + chunks.push(chunk); + totalLength += chunk.byteLength; + } + if (chunks.length === 1) { + return chunks[0]; + } + const result = new Uint8Array(totalLength); + let offset = 0; + for (const chunk of chunks) { + result.set(chunk, offset); + offset += chunk.byteLength; + } + return result; +} diff --git a/src/domain/warp/_wiredMethods.d.ts b/src/domain/warp/_wiredMethods.d.ts index d501c5a7..582184e7 100644 --- a/src/domain/warp/_wiredMethods.d.ts +++ b/src/domain/warp/_wiredMethods.d.ts @@ -639,7 +639,6 @@ declare module '../WarpRuntime.js' { getWriterPatches(writerId: string, stopAtSha?: string | null): Promise>; _onPatchCommitted(writerId: string, opts?: { patch?: PatchV2; sha?: string }): Promise; writer(writerId?: string): Promise; - createWriter(opts?: { persist?: 'config' | 'none'; alias?: string }): Promise; _ensureFreshState(): Promise; discoverWriters(): Promise; discoverTicks(): Promise<{ ticks: number[]; maxTick: number; perWriter: Map }> }>; diff --git a/src/domain/warp/patch.methods.js b/src/domain/warp/patch.methods.js index 4d23107f..1a485f28 100644 --- a/src/domain/warp/patch.methods.js +++ b/src/domain/warp/patch.methods.js @@ -16,7 +16,7 @@ import { vvIncrement, vvClone } from 
'../crdt/VersionVector.js'; import { buildWriterRef, buildWritersPrefix, parseWriterIdFromRef } from '../utils/RefLayout.js'; import { decodePatchMessage, detectMessageKind } from '../services/WarpMessageCodec.js'; import { Writer } from './Writer.js'; -import { generateWriterId, resolveWriterId } from '../utils/WriterId.js'; +import { resolveWriterId } from '../utils/WriterId.js'; import EncryptionError from '../errors/EncryptionError.js'; import PersistenceError from '../errors/PersistenceError.js'; @@ -345,65 +345,6 @@ export async function writer(writerId) { }); } -/** - * Creates a new Writer with a fresh canonical ID. - * - * This always generates a new unique writer ID, regardless of any - * existing configuration. Use this when you need a guaranteed fresh - * identity (e.g., spawning a new writer process). - * - * @deprecated Use `writer()` to resolve a stable ID from git config, or `writer(id)` with an explicit ID. - * @this {import('../WarpRuntime.js').default} - * @param {{ persist?: 'config'|'none', alias?: string }} [opts] - * @returns {Promise} A Writer instance with new canonical ID - * @throws {Error} If config operations fail (when persist:'config') - * - * @example - * // Create ephemeral writer (not persisted) - * const writer = await graph.createWriter(); - * - * @example - * // Create and persist to git config - * const writer = await graph.createWriter({ persist: 'config' }); - */ -export async function createWriter(opts = {}) { - if (this._logger) { - this._logger.warn('[warp] createWriter() is deprecated. Use writer() or writer(id) instead.'); - } else { - // eslint-disable-next-line no-console - console.warn('[warp] createWriter() is deprecated. Use writer() or writer(id) instead.'); - } - - const { persist = 'none', alias } = opts; - - // Generate new canonical writerId - const freshWriterId = generateWriterId(); - - // Optionally persist to git config - if (persist === 'config') { - const configKey = alias - ? 
`warp.writerId.${alias}` - : `warp.writerId.${this._graphName}`; - await /** @type {import('../../ports/ConfigPort.js').default} */ (/** @type {unknown} */ (this._persistence)).configSet(configKey, freshWriterId); - } - - /** @type {CorePersistence} */ - const writerPersistence = this._persistence; - return new Writer({ - persistence: writerPersistence, - graphName: this._graphName, - writerId: freshWriterId, - versionVector: this._versionVector, - getCurrentState: () => this._cachedState, - onDeleteWithData: this._onDeleteWithData, - onCommitSuccess: /** @type {(result: {patch: import('../types/WarpTypesV2.js').PatchV2, sha: string}) => void} */ ((/** @type {{patch?: import('../types/WarpTypesV2.js').PatchV2, sha?: string}} */ commitOpts) => this._onPatchCommitted(freshWriterId, commitOpts)), - codec: this._codec, - logger: this._logger || undefined, - blobStorage: this._blobStorage || undefined, - patchBlobStorage: this._patchBlobStorage || undefined, - }); -} - /** * Ensures cached state is fresh. When autoMaterialize is enabled, * materializes if state is null or dirty. Otherwise throws. diff --git a/src/domain/warp/query.methods.js b/src/domain/warp/query.methods.js index fb2cbc6f..4e589bc1 100644 --- a/src/domain/warp/query.methods.js +++ b/src/domain/warp/query.methods.js @@ -813,3 +813,79 @@ export async function getEdgeContent(from, to, label) { } return await this._persistence.readBlob(oid); } + +/** + * Gets the content blob for a node as a stream, or null if none is attached. + * + * Returns an async iterable of Uint8Array chunks for incremental + * consumption. Use `getContent()` when you want the full buffer. 
+ * + * @this {import('../WarpRuntime.js').default} + * @param {string} nodeId - The node ID to get content for + * @returns {Promise|null>} Async iterable of content chunks, or null + */ +export async function getContentStream(nodeId) { + await this._ensureFreshState(); + const s = /** @type {import('../services/JoinReducer.js').WarpStateV5} */ (this._cachedState); + const registers = getNodeContentRegisters(s, nodeId); + if (!registers) { + return null; + } + const { value: oid } = registers.contentRegister; + if (this._blobStorage && typeof this._blobStorage.retrieveStream === 'function') { + return this._blobStorage.retrieveStream(oid); + } + // Fallback: wrap buffered read as single-chunk async iterable + const buf = await this._persistence.readBlob(oid); + return singleChunkAsyncIterable(buf); +} + +/** + * Gets the content blob for an edge as a stream, or null if none is attached. + * + * Returns an async iterable of Uint8Array chunks for incremental + * consumption. Use `getEdgeContent()` when you want the full buffer. + * + * @this {import('../WarpRuntime.js').default} + * @param {string} from - Source node ID + * @param {string} to - Target node ID + * @param {string} label - Edge label + * @returns {Promise|null>} Async iterable of content chunks, or null + */ +export async function getEdgeContentStream(from, to, label) { + await this._ensureFreshState(); + const s = /** @type {import('../services/JoinReducer.js').WarpStateV5} */ (this._cachedState); + const registers = getEdgeContentRegisters(s, from, to, label); + if (!registers) { + return null; + } + const { value: oid } = registers.contentRegister; + if (this._blobStorage && typeof this._blobStorage.retrieveStream === 'function') { + return this._blobStorage.retrieveStream(oid); + } + const buf = await this._persistence.readBlob(oid); + return singleChunkAsyncIterable(buf); +} + +/** + * Wraps a single buffer as an async iterable yielding one chunk. 
+ * + * @param {Uint8Array} buf + * @returns {AsyncIterable} + */ +function singleChunkAsyncIterable(buf) { + return { + [Symbol.asyncIterator]() { + let done = false; + return { + next() { + if (done) { + return Promise.resolve({ value: undefined, done: true }); + } + done = true; + return Promise.resolve({ value: buf, done: false }); + }, + }; + }, + }; +} diff --git a/src/infrastructure/adapters/CasBlobAdapter.js b/src/infrastructure/adapters/CasBlobAdapter.js index 53b6d230..169ddfdc 100644 --- a/src/infrastructure/adapters/CasBlobAdapter.js +++ b/src/infrastructure/adapters/CasBlobAdapter.js @@ -18,7 +18,7 @@ import { createLazyCas } from './lazyCasInit.js'; import LoggerObservabilityBridge from './LoggerObservabilityBridge.js'; import { Readable } from 'node:stream'; -/** @typedef {{ readManifest: Function, restore: Function, store: Function, createTree: Function }} CasStore */ +/** @typedef {{ readManifest: Function, restore: Function, restoreStream?: Function, store: Function, createTree: Function }} CasStore */ /** * Error codes from `@git-stunts/git-cas` that indicate the OID is not @@ -143,7 +143,10 @@ export default class CasBlobAdapter extends BlobStoragePort { restoreOpts.encryptionKey = this._encryptionKey; } const { buffer } = await cas.restore(restoreOpts); - return buffer; + // Normalize Buffer to Uint8Array for domain boundary compliance + return buffer instanceof Uint8Array && buffer.constructor !== Uint8Array + ? new Uint8Array(buffer.buffer, buffer.byteOffset, buffer.byteLength) + : buffer; } catch (err) { // Fallback: OID may be a raw Git blob (pre-CAS content). // Only fall through for "not a manifest" errors (missing tree, bad format). @@ -162,4 +165,133 @@ export default class CasBlobAdapter extends BlobStoragePort { return blob; } } + + /** + * Stores content from a streaming source via git-cas. + * + * The source async iterable is piped directly to CAS without + * intermediate buffering. 
+ * + * @override + * @param {AsyncIterable} source + * @param {{ slug?: string, mime?: string|null, size?: number|null }} [options] + * @returns {Promise} + */ + async storeStream(source, options) { + const cas = await this._getCas(); + const readable = Readable.from(source); + + /** @type {{ source: *, slug: string, filename: string, encryptionKey?: Uint8Array }} */ + const storeOpts = { + source: readable, + slug: options?.slug || `blob-${Date.now().toString(36)}`, + filename: 'content', + }; + if (this._encryptionKey) { + storeOpts.encryptionKey = this._encryptionKey; + } + + const manifest = await cas.store(storeOpts); + return await cas.createTree({ manifest }); + } + + /** + * Retrieves content as an async iterable of chunks. Uses + * `cas.restoreStream()` when available, falling back to + * buffered `cas.restore()` wrapped as a single-chunk yield. + * + * Falls back to a single-chunk yield from `persistence.readBlob()` + * for legacy raw Git blobs written before CAS migration. + * + * @override + * @param {string} oid + * @returns {AsyncIterable} + */ + retrieveStream(oid) { + const self = this; + return /** @type {AsyncIterable} */ ({ + [Symbol.asyncIterator]() { + /** @type {AsyncIterator|null} */ + let inner = null; + let initialized = false; + return { + async next() { + if (!initialized) { + initialized = true; + inner = await self._resolveStreamIterator(oid); + } + return await /** @type {AsyncIterator} */ (inner).next(); + }, + return() { + return Promise.resolve({ value: undefined, done: true }); + }, + }; + }, + }); + } + + /** + * Resolves the inner async iterator for retrieveStream(). 
+ * + * @private + * @param {string} oid + * @returns {Promise>} + */ + async _resolveStreamIterator(oid) { + const cas = await this._getCas(); + + try { + const manifest = await cas.readManifest({ treeOid: oid }); + /** @type {{ manifest: *, encryptionKey?: Uint8Array }} */ + const restoreOpts = { manifest }; + if (this._encryptionKey) { + restoreOpts.encryptionKey = this._encryptionKey; + } + + if (typeof cas.restoreStream === 'function') { + const stream = cas.restoreStream(restoreOpts); + return stream[Symbol.asyncIterator](); + } + + // Fallback: buffered restore as single chunk + const { buffer } = await cas.restore(restoreOpts); + return singleChunkIterator(buffer); + } catch (err) { + if (!isLegacyBlobError(err)) { + throw err; + } + const blob = await this._persistence.readBlob(oid); + if (blob === null || blob === undefined) { + throw new PersistenceError( + `Missing Git object: ${oid}`, + PersistenceError.E_MISSING_OBJECT, + { context: { oid } }, + ); + } + return singleChunkIterator(blob); + } + } +} + +/** + * Creates a single-element async iterator from a buffer. + * + * @param {Uint8Array} buf + * @returns {AsyncIterator} + */ +function singleChunkIterator(buf) { + let done = false; + return { + next() { + if (done) { + return Promise.resolve({ value: undefined, done: true }); + } + done = true; + return Promise.resolve({ value: buf, done: false }); + }, + return() { + done = true; + return Promise.resolve({ value: undefined, done: true }); + }, + }; } diff --git a/src/ports/BlobStoragePort.js b/src/ports/BlobStoragePort.js index cce9ca60..25be9862 100644 --- a/src/ports/BlobStoragePort.js +++ b/src/ports/BlobStoragePort.js @@ -30,4 +30,27 @@ export default class BlobStoragePort { async retrieve(_oid) { throw new Error('BlobStoragePort.retrieve() not implemented'); } + + /** + * Stores content from a streaming source and returns a storage identifier. 
+ * + * @param {AsyncIterable<Uint8Array>} _source - Async iterable of content chunks + * @param {{ slug?: string, mime?: string|null, size?: number|null }} [_options] - Optional storage metadata + * @returns {Promise<string>} Storage identifier for retrieval + * @abstract + */ + async storeStream(_source, _options) { + throw new Error('BlobStoragePort.storeStream() not implemented'); + } + + /** + * Retrieves content as an async iterable of chunks. + * + * @param {string} _oid - Storage identifier returned by store() or storeStream() + * @returns {AsyncIterable<Uint8Array>} Async iterable of content chunks + * @abstract + */ + retrieveStream(_oid) { + throw new Error('BlobStoragePort.retrieveStream() not implemented'); + } } diff --git a/test/integration/api/helpers/setup.js b/test/integration/api/helpers/setup.js index bfec0bd2..2c0ed429 100644 --- a/test/integration/api/helpers/setup.js +++ b/test/integration/api/helpers/setup.js @@ -10,6 +10,7 @@ import { tmpdir } from 'node:os'; // @ts-expect-error - no declaration file for @git-stunts/plumbing import Plumbing from '@git-stunts/plumbing'; import GitGraphAdapter from '../../../../src/infrastructure/adapters/GitGraphAdapter.js'; +import CasBlobAdapter from '../../../../src/infrastructure/adapters/CasBlobAdapter.js'; import WarpRuntime from '../../../../src/domain/WarpRuntime.js'; import WebCryptoAdapter from '../../../../src/infrastructure/adapters/WebCryptoAdapter.js'; @@ -37,8 +38,16 @@ export async function createTestRepo(label = 'api-test') { * @param {Object} [opts={}] - Additional options forwarded to WarpRuntime.open * @returns {Promise<WarpRuntime>} Opened WarpRuntime instance */ + const blobStorage = new CasBlobAdapter({ plumbing, persistence }); + + /** + * @param {string} graphName + * @param {string} writerId + * @param {Object} [opts] + */ async function openGraph(graphName, writerId, opts = {}) { return WarpRuntime.open({ + blobStorage, ...opts, persistence, graphName, diff --git a/test/type-check/consumer.ts b/test/type-check/consumer.ts 
index bedd6b89..d1ba7a03 100644 --- a/test/type-check/consumer.ts +++ b/test/type-check/consumer.ts @@ -416,10 +416,6 @@ const wPs: PatchSession = await w.beginPatch(); const wWriterId: string = w.writerId; const wGraphName: string = w.graphName; -// ---- createWriter (deprecated) ---- -const cw: Writer = await graph.createWriter(); -const cwWithOpts: Writer = await graph.createWriter({ persist: 'config', alias: 'test' }); - // ---- PatchSession methods ---- const ps: PatchSession = await w.beginPatch(); const ps2: PatchSession = ps.addNode('x').removeNode('y').addEdge('a', 'b', 'c').removeEdge('a', 'b', 'c'); diff --git a/test/unit/domain/WarpGraph.content.test.js b/test/unit/domain/WarpGraph.content.test.js index 84352b0c..1c4dfb32 100644 --- a/test/unit/domain/WarpGraph.content.test.js +++ b/test/unit/domain/WarpGraph.content.test.js @@ -166,7 +166,13 @@ describe('WarpRuntime content attachment (query methods)', () => { describe('getContent()', () => { it('reads and returns the blob buffer', async () => { const buf = new TextEncoder().encode('# ADR 001\n\nSome content'); - mockPersistence.readBlob.mockResolvedValue(buf); + const blobStorage = { + store: vi.fn(), + retrieve: vi.fn().mockResolvedValue(buf), + storeStream: vi.fn(), + retrieveStream: vi.fn(), + }; + /** @type {any} */ (graph)._blobStorage = blobStorage; setupGraphState(graph, (/** @type {any} */ state) => { addNode(state, 'doc:1', 1); @@ -176,7 +182,7 @@ describe('WarpRuntime content attachment (query methods)', () => { const content = await graph.getContent('doc:1'); expect(content).toEqual(buf); - expect(mockPersistence.readBlob).toHaveBeenCalledWith('abc123'); + expect(blobStorage.retrieve).toHaveBeenCalledWith('abc123'); }); it('returns null when no content attached', async () => { @@ -219,9 +225,18 @@ describe('WarpRuntime content attachment (query methods)', () => { expect(mockPersistence.readBlob).not.toHaveBeenCalled(); }); - it('falls back to persistence.readBlob() when blobStorage is not 
provided', async () => { + it('uses auto-constructed blobStorage when none explicitly provided', async () => { + // OG-014: blob storage is always present (auto-constructed) + // The auto-constructed InMemoryBlobStorageAdapter won't have this OID, + // so we inject a mock to verify the read path goes through blobStorage const rawBuf = new TextEncoder().encode('raw blob'); - mockPersistence.readBlob.mockResolvedValue(rawBuf); + const blobStorage = { + store: vi.fn(), + retrieve: vi.fn().mockResolvedValue(rawBuf), + storeStream: vi.fn(), + retrieveStream: vi.fn(), + }; + /** @type {any} */ (graph)._blobStorage = blobStorage; setupGraphState(graph, (/** @type {any} */ state) => { addNode(state, 'doc:1', 1); @@ -232,7 +247,7 @@ describe('WarpRuntime content attachment (query methods)', () => { const content = await graph.getContent('doc:1'); expect(content).toEqual(rawBuf); - expect(mockPersistence.readBlob).toHaveBeenCalledWith('raw-oid'); + expect(blobStorage.retrieve).toHaveBeenCalledWith('raw-oid'); }); it('preserves E_MISSING_OBJECT from blobStorage.retrieve()', async () => { @@ -416,7 +431,13 @@ describe('WarpRuntime content attachment (query methods)', () => { describe('getEdgeContent()', () => { it('reads and returns the blob buffer', async () => { const buf = new TextEncoder().encode('edge content'); - mockPersistence.readBlob.mockResolvedValue(buf); + const blobStorage = { + store: vi.fn(), + retrieve: vi.fn().mockResolvedValue(buf), + storeStream: vi.fn(), + retrieveStream: vi.fn(), + }; + /** @type {any} */ (graph)._blobStorage = blobStorage; setupGraphState(graph, (/** @type {any} */ state) => { addNode(state, 'a', 1); @@ -428,7 +449,7 @@ describe('WarpRuntime content attachment (query methods)', () => { const content = await graph.getEdgeContent('a', 'b', 'rel'); expect(content).toEqual(buf); - expect(mockPersistence.readBlob).toHaveBeenCalledWith('def456'); + expect(blobStorage.retrieve).toHaveBeenCalledWith('def456'); }); it('returns null when no 
content attached', async () => { @@ -442,4 +463,105 @@ describe('WarpRuntime content attachment (query methods)', () => { expect(content).toBeNull(); }); }); + + describe('getContentStream()', () => { + it('returns an async iterable of content chunks', async () => { + const chunk1 = new TextEncoder().encode('hello '); + const chunk2 = new TextEncoder().encode('world'); + const blobStorage = { + store: vi.fn(), + retrieve: vi.fn(), + storeStream: vi.fn(), + retrieveStream: vi.fn().mockReturnValue((async function* () { + yield chunk1; + yield chunk2; + })()), + }; + /** @type {any} */ (graph)._blobStorage = blobStorage; + + setupGraphState(graph, (/** @type {any} */ state) => { + addNode(state, 'doc:1', 1); + const propKey = encodePropKey('doc:1', '_content'); + state.prop.set(propKey, { eventId: null, value: 'cas-tree-oid' }); + }); + + const stream = await graph.getContentStream('doc:1'); + expect(stream).not.toBeNull(); + + const chunks = []; + for await (const chunk of /** @type {AsyncIterable} */ (stream)) { + chunks.push(chunk); + } + + expect(chunks).toHaveLength(2); + expect(chunks[0]).toBe(chunk1); + expect(chunks[1]).toBe(chunk2); + expect(blobStorage.retrieveStream).toHaveBeenCalledWith('cas-tree-oid'); + }); + + it('returns null when no content is attached', async () => { + setupGraphState(graph, (/** @type {any} */ state) => { + addNode(state, 'doc:1', 1); + }); + + const stream = await graph.getContentStream('doc:1'); + expect(stream).toBeNull(); + }); + + it('returns null for nonexistent node', async () => { + setupGraphState(graph, () => {}); + + const stream = await graph.getContentStream('nonexistent'); + expect(stream).toBeNull(); + }); + }); + + describe('getEdgeContentStream()', () => { + it('returns an async iterable of edge content chunks', async () => { + const chunk = new TextEncoder().encode('edge stream data'); + const blobStorage = { + store: vi.fn(), + retrieve: vi.fn(), + storeStream: vi.fn(), + retrieveStream: 
vi.fn().mockReturnValue((async function* () { + yield chunk; + })()), + }; + /** @type {any} */ (graph)._blobStorage = blobStorage; + + setupGraphState(graph, (/** @type {any} */ state) => { + addNode(state, 'a', 1); + addNode(state, 'b', 2); + addEdge(state, 'a', 'b', 'rel', 3); + const propKey = encodeEdgePropKey('a', 'b', 'rel', '_content'); + state.prop.set(propKey, { + eventId: { lamport: 2, writerId: 'w1', patchSha: 'aabbccdd', opIndex: 0 }, + value: 'cas-edge-tree-oid', + }); + }); + + const stream = await graph.getEdgeContentStream('a', 'b', 'rel'); + expect(stream).not.toBeNull(); + + const chunks = []; + for await (const c of /** @type {AsyncIterable} */ (stream)) { + chunks.push(c); + } + + expect(chunks).toHaveLength(1); + expect(chunks[0]).toBe(chunk); + expect(blobStorage.retrieveStream).toHaveBeenCalledWith('cas-edge-tree-oid'); + }); + + it('returns null when no edge content is attached', async () => { + setupGraphState(graph, (/** @type {any} */ state) => { + addNode(state, 'a', 1); + addNode(state, 'b', 2); + addEdge(state, 'a', 'b', 'rel', 3); + }); + + const stream = await graph.getEdgeContentStream('a', 'b', 'rel'); + expect(stream).toBeNull(); + }); + }); }); diff --git a/test/unit/domain/WarpGraph.prototypeWiring.test.js b/test/unit/domain/WarpGraph.prototypeWiring.test.js index e64019ef..a677b5b6 100644 --- a/test/unit/domain/WarpGraph.prototypeWiring.test.js +++ b/test/unit/domain/WarpGraph.prototypeWiring.test.js @@ -99,7 +99,7 @@ describe('WarpRuntime prototype completeness', () => { 'applySyncResponse', 'syncNeeded', 'createCheckpoint', 'maybeRunGC', 'runGC', 'getGCMetrics', 'getFrontier', 'hasFrontierChanged', 'status', - 'writer', 'createWriter', + 'writer', 'query', 'observer', 'translationCost', 'discoverWriters', 'discoverTicks', 'join', 'syncCoverage', diff --git a/test/unit/domain/WarpGraph.test.js b/test/unit/domain/WarpGraph.test.js index 06202f3f..725ea4ae 100644 --- a/test/unit/domain/WarpGraph.test.js +++ 
b/test/unit/domain/WarpGraph.test.js @@ -1971,125 +1971,6 @@ eg-schema: 2`; }); }); - describe('createWriter()', () => { - /** @type {ReturnType} */ - let consoleWarnSpy; - - beforeEach(() => { - consoleWarnSpy = vi.spyOn(console, 'warn').mockImplementation(() => {}); - }); - - afterEach(() => { - consoleWarnSpy.mockRestore(); - }); - - it('generates fresh canonical ID', async () => { - const persistence = createMockPersistence(); - persistence.configGet = vi.fn(); - persistence.configSet = vi.fn(); - - const graph = await WarpRuntime.open({ - persistence, - graphName: 'events', - writerId: 'node-1', - }); - - const writer = await graph.createWriter(); - - expect(writer.writerId).toMatch(/^w_[0-9a-hjkmnp-tv-z]{26}$/); - // By default, should not persist - expect(persistence.configSet).not.toHaveBeenCalled(); - }); - - it('generates unique IDs on each call', async () => { - const persistence = createMockPersistence(); - persistence.configGet = vi.fn(); - persistence.configSet = vi.fn(); - - const graph = await WarpRuntime.open({ - persistence, - graphName: 'events', - writerId: 'node-1', - }); - - const writer1 = await graph.createWriter(); - const writer2 = await graph.createWriter(); - - expect(writer1.writerId).not.toBe(writer2.writerId); - expect(writer1.writerId).toMatch(/^w_[0-9a-hjkmnp-tv-z]{26}$/); - expect(writer2.writerId).toMatch(/^w_[0-9a-hjkmnp-tv-z]{26}$/); - }); - - it('persists to git config when persist: "config"', async () => { - const persistence = createMockPersistence(); - persistence.configGet = vi.fn(); - persistence.configSet = vi.fn(); - - const graph = await WarpRuntime.open({ - persistence, - graphName: 'my-graph', - writerId: 'node-1', - }); - - const writer = await graph.createWriter({ persist: 'config' }); - - expect(persistence.configSet).toHaveBeenCalledWith( - 'warp.writerId.my-graph', - writer.writerId - ); - }); - - it('uses alias for config key when provided', async () => { - const persistence = createMockPersistence(); - 
persistence.configGet = vi.fn(); - persistence.configSet = vi.fn(); - - const graph = await WarpRuntime.open({ - persistence, - graphName: 'events', - writerId: 'node-1', - }); - - const writer = await graph.createWriter({ persist: 'config', alias: 'secondary' }); - - expect(persistence.configSet).toHaveBeenCalledWith( - 'warp.writerId.secondary', - writer.writerId - ); - }); - - it('does not persist when persist: "none" (default)', async () => { - const persistence = createMockPersistence(); - persistence.configGet = vi.fn(); - persistence.configSet = vi.fn(); - - const graph = await WarpRuntime.open({ - persistence, - graphName: 'events', - writerId: 'node-1', - }); - - await graph.createWriter({ persist: 'none' }); - - expect(persistence.configSet).not.toHaveBeenCalled(); - }); - - it('returns Writer instance with correct graphName', async () => { - const persistence = createMockPersistence(); - persistence.configGet = vi.fn(); - persistence.configSet = vi.fn(); - - const graph = await WarpRuntime.open({ - persistence, - graphName: 'my-events', - writerId: 'node-1', - }); - - const writer = await graph.createWriter(); - - expect(writer.graphName).toBe('my-events'); - }); - }); }); // =========================================================================== diff --git a/test/unit/domain/WarpGraph.writerApi.test.js b/test/unit/domain/WarpGraph.writerApi.test.js index 19a92695..4fa765ac 100644 --- a/test/unit/domain/WarpGraph.writerApi.test.js +++ b/test/unit/domain/WarpGraph.writerApi.test.js @@ -48,57 +48,4 @@ describe('WarpRuntime writer API', () => { expect(w.writerId).toBe('alice'); }); - // ============================================================================ - // createWriter() — deprecated, still functional - // ============================================================================ - - it('createWriter() logs deprecation warning', async () => { - const consoleWarnSpy = vi.spyOn(console, 'warn').mockImplementation(() => {}); - try { - await 
graph.createWriter(); - expect(consoleWarnSpy).toHaveBeenCalledWith( - '[warp] createWriter() is deprecated. Use writer() or writer(id) instead.', - ); - } finally { - consoleWarnSpy.mockRestore(); - } - }); - - it('createWriter() logs via logger when present', async () => { - /** @type {any} */ - const mockLogger = { info: vi.fn(), warn: vi.fn(), error: vi.fn() }; - const graphWithLogger = await WarpRuntime.open({ - persistence: mockPersistence, - graphName: 'test', - writerId: 'writer-1', - logger: mockLogger, - }); - - const consoleWarnSpy = vi.spyOn(console, 'warn').mockImplementation(() => {}); - try { - await graphWithLogger.createWriter(); - expect(mockLogger.warn).toHaveBeenCalledWith( - '[warp] createWriter() is deprecated. Use writer() or writer(id) instead.', - ); - } finally { - consoleWarnSpy.mockRestore(); - } - }); - - it('createWriter() returns a Writer with a generated unique ID', async () => { - const consoleWarnSpy = vi.spyOn(console, 'warn').mockImplementation(() => {}); - try { - const w1 = await graph.createWriter(); - const w2 = await graph.createWriter(); - - // Both should have canonical writer IDs (w_ prefix, 28 chars) - expect(w1.writerId).toMatch(/^w_[0-9a-hjkmnp-tv-z]{26}$/); - expect(w2.writerId).toMatch(/^w_[0-9a-hjkmnp-tv-z]{26}$/); - - // Each call generates a distinct ID - expect(w1.writerId).not.toBe(w2.writerId); - } finally { - consoleWarnSpy.mockRestore(); - } - }); }); diff --git a/test/unit/domain/WarpGraph.writerInvalidation.test.js b/test/unit/domain/WarpGraph.writerInvalidation.test.js index eb0cbe21..c9d2426b 100644 --- a/test/unit/domain/WarpGraph.writerInvalidation.test.js +++ b/test/unit/domain/WarpGraph.writerInvalidation.test.js @@ -9,8 +9,8 @@ import { createMockPersistence } from '../../helpers/warpGraphTestUtils.js'; * * The Writer and PatchSession are higher-level APIs that delegate to * PatchBuilderV2. 
The onCommitSuccess callback wired in WarpRuntime.writer() - * and WarpRuntime.createWriter() must trigger eager state update so that - * queries after a writer commit reflect the new state immediately. + * must trigger eager state update so that queries after a writer commit + * reflect the new state immediately. */ const FAKE_BLOB_OID = 'a'.repeat(40); @@ -163,19 +163,13 @@ describe('WarpRuntime Writer invalidation (AP/INVAL/3)', () => { expect(/** @type {any} */ (graph)._stateDirty).toBe(true); }); - // ── createWriter() path ────────────────────────────────────────── + // ── writer(id) path ────────────────────────────────────────── - it('createWriter() path also triggers eager invalidation', async () => { + it('writer(id) path also triggers eager invalidation', async () => { await graph.materialize(); mockWriterFirstCommit(persistence); - const consoleWarnSpy = vi.spyOn(console, 'warn').mockImplementation(() => {}); - let writer; - try { - writer = await graph.createWriter(); - } finally { - consoleWarnSpy.mockRestore(); - } + const writer = await graph.writer('fresh-writer'); await writer.commitPatch((/** @type {any} */ p) => p.addNode('test:node')); diff --git a/test/unit/domain/WarpRuntime.blobAutoConstruct.test.js b/test/unit/domain/WarpRuntime.blobAutoConstruct.test.js new file mode 100644 index 00000000..78d6dfd8 --- /dev/null +++ b/test/unit/domain/WarpRuntime.blobAutoConstruct.test.js @@ -0,0 +1,91 @@ +import { describe, it, expect, vi } from 'vitest'; +import WarpRuntime from '../../../src/domain/WarpRuntime.js'; + +/** + * Spec tests for OG-014: auto-construction of blob storage. + * + * When no explicit `blobStorage` is provided to `WarpRuntime.open()`, + * the runtime should auto-construct the appropriate adapter: + * - `CasBlobAdapter` when persistence has plumbing (Git-backed) + * - `InMemoryBlobStorageAdapter` when persistence lacks plumbing (in-memory) + * + * These tests should all FAIL against the current code (red phase). 
+ */ + +/** @returns {any} */ +function makeMockPersistence({ hasPlumbing = false } = {}) { + const persistence = { + readRef: vi.fn().mockResolvedValue(null), + listRefs: vi.fn().mockResolvedValue([]), + updateRef: vi.fn().mockResolvedValue(undefined), + configGet: vi.fn().mockResolvedValue(null), + configSet: vi.fn().mockResolvedValue(undefined), + readBlob: vi.fn().mockResolvedValue(new Uint8Array()), + writeBlob: vi.fn().mockResolvedValue('a'.repeat(40)), + }; + if (hasPlumbing) { + /** @type {any} */ (persistence).plumbing = {}; + } + return persistence; +} + +describe('WarpRuntime blob storage auto-construction (OG-014)', () => { + it('auto-constructs blob storage when none is provided', async () => { + const graph = await WarpRuntime.open({ + persistence: makeMockPersistence(), + graphName: 'test', + writerId: 'w1', + }); + + // _blobStorage should not be null — it should be auto-constructed + expect(/** @type {any} */ (graph)._blobStorage).not.toBeNull(); + }); + + it('auto-constructs InMemoryBlobStorageAdapter when persistence lacks plumbing', async () => { + const graph = await WarpRuntime.open({ + persistence: makeMockPersistence({ hasPlumbing: false }), + graphName: 'test', + writerId: 'w1', + }); + + const bs = /** @type {any} */ (graph)._blobStorage; + expect(bs).not.toBeNull(); + // Should have the streaming methods + expect(typeof bs.storeStream).toBe('function'); + expect(typeof bs.retrieveStream).toBe('function'); + }); + + it('preserves explicitly provided blobStorage', async () => { + const customStorage = { + store: vi.fn(), + retrieve: vi.fn(), + storeStream: vi.fn(), + retrieveStream: vi.fn(), + }; + const graph = await WarpRuntime.open({ + persistence: makeMockPersistence(), + graphName: 'test', + writerId: 'w1', + blobStorage: /** @type {any} */ (customStorage), + }); + + expect(/** @type {any} */ (graph)._blobStorage).toBe(customStorage); + }); + + it('attachContent uses blob storage even when caller did not provide one', async () => { + 
const graph = await WarpRuntime.open({ + persistence: makeMockPersistence(), + graphName: 'test', + writerId: 'w1', + }); + + const patch = await graph.createPatch(); + patch.addNode('n1'); + + // This should NOT call persistence.writeBlob — it should use auto-constructed blob storage + await patch.attachContent('n1', 'hello'); + + const persistence = /** @type {any} */ (graph)._persistence; + expect(persistence.writeBlob).not.toHaveBeenCalled(); + }); +}); diff --git a/test/unit/domain/__snapshots__/WarpRuntime.apiSurface.test.js.snap b/test/unit/domain/__snapshots__/WarpRuntime.apiSurface.test.js.snap index a3cbd174..3c0cf293 100644 --- a/test/unit/domain/__snapshots__/WarpRuntime.apiSurface.test.js.snap +++ b/test/unit/domain/__snapshots__/WarpRuntime.apiSurface.test.js.snap @@ -212,11 +212,6 @@ exports[`WarpRuntime API surface > all prototype methods have correct property d "enumerable": false, "type": "method", }, - "createWriter": { - "configurable": true, - "enumerable": false, - "type": "method", - }, "discoverTicks": { "configurable": true, "enumerable": false, @@ -257,6 +252,11 @@ exports[`WarpRuntime API surface > all prototype methods have correct property d "enumerable": false, "type": "method", }, + "getContentStream": { + "configurable": true, + "enumerable": false, + "type": "method", + }, "getEdgeContent": { "configurable": true, "enumerable": false, @@ -272,6 +272,11 @@ exports[`WarpRuntime API surface > all prototype methods have correct property d "enumerable": false, "type": "method", }, + "getEdgeContentStream": { + "configurable": true, + "enumerable": false, + "type": "method", + }, "getEdgeProps": { "configurable": true, "enumerable": false, @@ -560,7 +565,7 @@ exports[`WarpRuntime API surface > all prototype methods have correct property d } `; -exports[`WarpRuntime API surface > prototype method count matches snapshot 1`] = `111`; +exports[`WarpRuntime API surface > prototype method count matches snapshot 1`] = `112`; exports[`WarpRuntime 
API surface > prototype methods match snapshot 1`] = ` [ @@ -606,7 +611,6 @@ exports[`WarpRuntime API surface > prototype methods match snapshot 1`] = ` "createStrandPatch", "createSyncRequest", "createWormhole", - "createWriter", "discoverTicks", "discoverWriters", "dropStrand", @@ -615,9 +619,11 @@ exports[`WarpRuntime API surface > prototype methods match snapshot 1`] = ` "getContent", "getContentMeta", "getContentOid", + "getContentStream", "getEdgeContent", "getEdgeContentMeta", "getEdgeContentOid", + "getEdgeContentStream", "getEdgeProps", "getEdges", "getFrontier", diff --git a/test/unit/domain/index.exports.test.js b/test/unit/domain/index.exports.test.js index 826e8135..da66040d 100644 --- a/test/unit/domain/index.exports.test.js +++ b/test/unit/domain/index.exports.test.js @@ -20,7 +20,6 @@ import WarpAppDefault, { HealthCheckService, HealthStatus, CommitDagTraversalService, - TraversalService, GraphPersistencePort, IndexStoragePort, @@ -166,11 +165,6 @@ describe('index.js exports', () => { expect(typeof CommitDagTraversalService).toBe('function'); }); - it('exports TraversalService', () => { - expect(TraversalService).toBeDefined(); - expect(typeof TraversalService).toBe('function'); - expect(TraversalService).toBe(CommitDagTraversalService); - }); }); describe('port interfaces', () => { diff --git a/test/unit/domain/services/CheckpointService.test.js b/test/unit/domain/services/CheckpointService.test.js index e558ab6b..9f8f87ec 100644 --- a/test/unit/domain/services/CheckpointService.test.js +++ b/test/unit/domain/services/CheckpointService.test.js @@ -718,8 +718,8 @@ describe('CheckpointService', () => { const treeEntries = mockPersistence.writeTree.mock.calls[0][0]; expect(treeEntries).toEqual([ - `100644 blob ${sharedOid}\t_content_${sharedOid}`, - `100644 blob ${edgeOid}\t_content_${edgeOid}`, + `040000 tree ${sharedOid}\t_content_${sharedOid}`, + `040000 tree ${edgeOid}\t_content_${edgeOid}`, expect.stringContaining('\tappliedVV.cbor'), 
expect.stringContaining('\tfrontier.cbor'), expect.stringContaining('\tstate.cbor'), @@ -771,8 +771,8 @@ describe('CheckpointService', () => { const treeEntries = mockPersistence.writeTree.mock.calls[0][0]; const contentEntries = treeEntries.filter((/** @type {string} */ entry) => entry.includes('\t_content_')); expect(contentEntries).toHaveLength(300); - expect(contentEntries[0]).toBe(`100644 blob ${makeSequentialOid(0)}\t_content_${makeSequentialOid(0)}`); - expect(contentEntries[299]).toBe(`100644 blob ${makeSequentialOid(299)}\t_content_${makeSequentialOid(299)}`); + expect(contentEntries[0]).toBe(`040000 tree ${makeSequentialOid(0)}\t_content_${makeSequentialOid(0)}`); + expect(contentEntries[299]).toBe(`040000 tree ${makeSequentialOid(299)}\t_content_${makeSequentialOid(299)}`); }); }); diff --git a/test/unit/domain/services/PatchBuilderV2.content.test.js b/test/unit/domain/services/PatchBuilderV2.content.test.js index 2fd9bad8..3e07de59 100644 --- a/test/unit/domain/services/PatchBuilderV2.content.test.js +++ b/test/unit/domain/services/PatchBuilderV2.content.test.js @@ -5,6 +5,21 @@ import { createORSet, orsetAdd } from '../../../../src/domain/crdt/ORSet.js'; import { createDot } from '../../../../src/domain/crdt/Dot.js'; import { encodeEdgeKey } from '../../../../src/domain/services/KeyCodec.js'; +/** + * Creates a mock blob storage with configurable OID return. + * @param {{ storeOid?: string }} [opts] + * @returns {any} + */ +function createMockBlobStorage(opts = {}) { + const oid = opts.storeOid || 'd'.repeat(40); + return { + store: vi.fn().mockResolvedValue(oid), + retrieve: vi.fn(), + storeStream: vi.fn().mockResolvedValue(oid), + retrieveStream: vi.fn(), + }; +} + /** * Creates a mock persistence adapter for testing. 
* @param {Object} [overrides] @@ -40,20 +55,21 @@ describe('PatchBuilderV2 content attachment', () => { it('writes blob and sets content reference metadata properties', async () => { const state = createMockState(); orsetAdd(state.nodeAlive, 'node:1', createDot('w1', 1)); - const persistence = createMockPersistence({ - writeBlob: vi.fn().mockResolvedValue('abc123'), - }); + const persistence = createMockPersistence(); + const blobStorage = createMockBlobStorage({ storeOid: 'abc123' }); const builder = new PatchBuilderV2(/** @type {any} */ ({ persistence, + graphName: 'g', writerId: 'w1', lamport: 1, versionVector: createVersionVector(), getCurrentState: () => state, + blobStorage, })); await builder.attachContent('node:1', 'hello world'); - expect(persistence.writeBlob).toHaveBeenCalledWith('hello world'); + expect(blobStorage.store).toHaveBeenCalledWith('hello world', { slug: 'g/node:1', mime: null, size: 11 }); const patch = builder.build(); expect(patch.ops).toHaveLength(3); expect(patch.ops).toContainEqual(expect.objectContaining({ @@ -79,15 +95,16 @@ describe('PatchBuilderV2 content attachment', () => { it('accepts optional content metadata and persists it alongside the blob oid', async () => { const state = createMockState(); orsetAdd(state.nodeAlive, 'node:1', createDot('w1', 1)); - const persistence = createMockPersistence({ - writeBlob: vi.fn().mockResolvedValue('abc123'), - }); + const persistence = createMockPersistence(); + const blobStorage = createMockBlobStorage({ storeOid: 'abc123' }); const builder = new PatchBuilderV2(/** @type {any} */ ({ persistence, + graphName: 'g', writerId: 'w1', lamport: 1, versionVector: createVersionVector(), getCurrentState: () => state, + blobStorage, })); await builder.attachContent('node:1', 'hello world', { @@ -109,15 +126,16 @@ describe('PatchBuilderV2 content attachment', () => { it('tracks blob OID in _contentBlobs', async () => { const state = createMockState(); orsetAdd(state.nodeAlive, 'node:1', createDot('w1', 
1)); - const persistence = createMockPersistence({ - writeBlob: vi.fn().mockResolvedValue('abc123'), - }); + const persistence = createMockPersistence(); + const blobStorage = createMockBlobStorage({ storeOid: 'abc123' }); const builder = new PatchBuilderV2(/** @type {any} */ ({ persistence, + graphName: 'g', writerId: 'w1', lamport: 1, versionVector: createVersionVector(), getCurrentState: () => state, + blobStorage, })); await builder.attachContent('node:1', 'content'); @@ -129,30 +147,35 @@ describe('PatchBuilderV2 content attachment', () => { const state = createMockState(); orsetAdd(state.nodeAlive, 'node:1', createDot('w1', 1)); const persistence = createMockPersistence(); + const blobStorage = createMockBlobStorage(); const builder = new PatchBuilderV2(/** @type {any} */ ({ persistence, + graphName: 'g', writerId: 'w1', lamport: 1, versionVector: createVersionVector(), getCurrentState: () => state, + blobStorage, })); const result = await builder.attachContent('node:1', 'content'); expect(result).toBe(builder); }); - it('propagates writeBlob errors', async () => { + it('propagates blob storage errors', async () => { const state = createMockState(); orsetAdd(state.nodeAlive, 'node:1', createDot('w1', 1)); - const persistence = createMockPersistence({ - writeBlob: vi.fn().mockRejectedValue(new Error('disk full')), - }); + const persistence = createMockPersistence(); + const blobStorage = createMockBlobStorage(); + blobStorage.store = vi.fn().mockRejectedValue(new Error('disk full')); const builder = new PatchBuilderV2(/** @type {any} */ ({ persistence, + graphName: 'g', writerId: 'w1', lamport: 1, versionVector: createVersionVector(), getCurrentState: () => state, + blobStorage, })); await expect(builder.attachContent('node:1', 'x')).rejects.toThrow('disk full'); @@ -178,17 +201,20 @@ describe('PatchBuilderV2 content attachment', () => { const state = createMockState(); orsetAdd(state.nodeAlive, 'node:1', createDot('w1', 1)); const persistence = 
createMockPersistence(); + const blobStorage = createMockBlobStorage(); const builder = new PatchBuilderV2(/** @type {any} */ ({ persistence, + graphName: 'g', writerId: 'w1', lamport: 1, versionVector: createVersionVector(), getCurrentState: () => state, + blobStorage, })); await expect(builder.attachContent('node:1', 'hello', { size: 9 })) .rejects.toThrow('content metadata size 9 does not match actual byte size 5'); - expect(persistence.writeBlob).not.toHaveBeenCalled(); + expect(blobStorage.store).not.toHaveBeenCalled(); expect(builder._contentBlobs).toEqual([]); }); }); @@ -256,20 +282,21 @@ describe('PatchBuilderV2 content attachment', () => { const edgeKey = encodeEdgeKey('a', 'b', 'rel'); orsetAdd(state.edgeAlive, edgeKey, createDot('w1', 1)); - const persistence = createMockPersistence({ - writeBlob: vi.fn().mockResolvedValue('def456'), - }); + const persistence = createMockPersistence(); + const blobStorage = createMockBlobStorage({ storeOid: 'def456' }); const builder = new PatchBuilderV2(/** @type {any} */ ({ persistence, + graphName: 'g', writerId: 'w1', lamport: 1, versionVector: createVersionVector(), getCurrentState: () => state, + blobStorage, })); await builder.attachEdgeContent('a', 'b', 'rel', Buffer.from('binary')); - expect(persistence.writeBlob).toHaveBeenCalledWith(Buffer.from('binary')); + expect(blobStorage.store).toHaveBeenCalledWith(Buffer.from('binary'), { slug: 'g/a/b/rel', mime: null, size: 6 }); const patch = builder.build(); expect(patch.ops).toHaveLength(3); expect(patch.ops).toContainEqual(expect.objectContaining({ @@ -295,15 +322,16 @@ describe('PatchBuilderV2 content attachment', () => { const state = createMockState(); orsetAdd(state.edgeAlive, encodeEdgeKey('a', 'b', 'rel'), createDot('w1', 1)); - const persistence = createMockPersistence({ - writeBlob: vi.fn().mockResolvedValue('def456'), - }); + const persistence = createMockPersistence(); + const blobStorage = createMockBlobStorage({ storeOid: 'def456' }); const builder = 
new PatchBuilderV2(/** @type {any} */ ({ persistence, + graphName: 'g', writerId: 'w1', lamport: 1, versionVector: createVersionVector(), getCurrentState: () => state, + blobStorage, })); await builder.attachEdgeContent('a', 'b', 'rel', 'content'); @@ -316,12 +344,15 @@ describe('PatchBuilderV2 content attachment', () => { orsetAdd(state.edgeAlive, encodeEdgeKey('a', 'b', 'rel'), createDot('w1', 1)); const persistence = createMockPersistence(); + const blobStorage = createMockBlobStorage(); const builder = new PatchBuilderV2(/** @type {any} */ ({ persistence, + graphName: 'g', writerId: 'w1', lamport: 1, versionVector: createVersionVector(), getCurrentState: () => state, + blobStorage, })); const result = await builder.attachEdgeContent('a', 'b', 'rel', 'x'); @@ -409,25 +440,130 @@ describe('PatchBuilderV2 content attachment', () => { orsetAdd(state.nodeAlive, 'node:1', createDot('w1', 1)); orsetAdd(state.nodeAlive, 'node:2', createDot('w1', 2)); let callCount = 0; - const persistence = createMockPersistence({ - writeBlob: vi.fn().mockImplementation(() => { - callCount++; - return Promise.resolve(`blob${callCount}`); - }), + const blobStorage = createMockBlobStorage(); + blobStorage.store = vi.fn().mockImplementation(() => { + callCount++; + return Promise.resolve(`blob${callCount}`); }); + const persistence = createMockPersistence(); const builder = new PatchBuilderV2(/** @type {any} */ ({ persistence, + graphName: 'g', writerId: 'w1', lamport: 1, versionVector: createVersionVector(), getCurrentState: () => state, + blobStorage, })); await builder.attachContent('node:1', 'first'); await builder.attachContent('node:2', 'second'); expect(builder._contentBlobs).toEqual(['blob1', 'blob2']); - expect(persistence.writeBlob).toHaveBeenCalledTimes(2); + expect(blobStorage.store).toHaveBeenCalledTimes(2); + }); + }); + + describe('attachContent() with streaming input', () => { + it('accepts an AsyncIterable as content', async () => { + const state = createMockState(); + 
orsetAdd(state.nodeAlive, 'node:1', createDot('w1', 1)); + const blobStorage = { + store: vi.fn().mockResolvedValue('cas-oid-1'), + retrieve: vi.fn(), + storeStream: vi.fn().mockResolvedValue('cas-stream-oid'), + retrieveStream: vi.fn(), + }; + const builder = new PatchBuilderV2(/** @type {any} */ ({ + persistence: createMockPersistence(), + graphName: 'g', + writerId: 'w1', + lamport: 1, + versionVector: createVersionVector(), + getCurrentState: () => state, + blobStorage, + })); + + async function* source() { + yield new TextEncoder().encode('hello '); + yield new TextEncoder().encode('world'); + } + + await builder.attachContent('node:1', source(), { size: 11 }); + + // Should call storeStream, not store + expect(blobStorage.storeStream).toHaveBeenCalledOnce(); + expect(blobStorage.store).not.toHaveBeenCalled(); + const patch = builder.build(); + expect(patch.ops).toContainEqual(expect.objectContaining({ + type: 'PropSet', + node: 'node:1', + key: '_content', + value: 'cas-stream-oid', + })); + }); + + it('accepts a ReadableStream as content', async () => { + const state = createMockState(); + orsetAdd(state.nodeAlive, 'node:1', createDot('w1', 1)); + const blobStorage = { + store: vi.fn().mockResolvedValue('cas-oid-1'), + retrieve: vi.fn(), + storeStream: vi.fn().mockResolvedValue('cas-rs-oid'), + retrieveStream: vi.fn(), + }; + const builder = new PatchBuilderV2(/** @type {any} */ ({ + persistence: createMockPersistence(), + graphName: 'g', + writerId: 'w1', + lamport: 1, + versionVector: createVersionVector(), + getCurrentState: () => state, + blobStorage, + })); + + const stream = new ReadableStream({ + start(controller) { + controller.enqueue(new TextEncoder().encode('rs content')); + controller.close(); + }, + }); + + await builder.attachContent('node:1', stream, { size: 10 }); + + expect(blobStorage.storeStream).toHaveBeenCalledOnce(); + }); + }); + + describe('attachEdgeContent() with streaming input', () => { + it('accepts an AsyncIterable as content', 
async () => { + const state = createMockState(); + orsetAdd(state.edgeAlive, encodeEdgeKey('a', 'b', 'rel'), createDot('w1', 1)); + const blobStorage = { + store: vi.fn().mockResolvedValue('cas-oid-1'), + retrieve: vi.fn(), + storeStream: vi.fn().mockResolvedValue('cas-edge-stream-oid'), + retrieveStream: vi.fn(), + }; + const builder = new PatchBuilderV2(/** @type {any} */ ({ + persistence: createMockPersistence(), + graphName: 'g', + writerId: 'w1', + lamport: 1, + versionVector: createVersionVector(), + getCurrentState: () => state, + blobStorage, + })); + + async function* source() { + yield new TextEncoder().encode('edge '); + yield new TextEncoder().encode('data'); + } + + await builder.attachEdgeContent('a', 'b', 'rel', source(), { size: 9 }); + + expect(blobStorage.storeStream).toHaveBeenCalledOnce(); + expect(blobStorage.store).not.toHaveBeenCalled(); }); }); @@ -467,12 +603,10 @@ describe('PatchBuilderV2 content attachment', () => { })); }); - it('falls back to persistence.writeBlob() when blobStorage is not provided', async () => { + it('throws NO_BLOB_STORAGE when blobStorage is not provided', async () => { const state = createMockState(); orsetAdd(state.nodeAlive, 'node:1', createDot('w1', 1)); - const persistence = createMockPersistence({ - writeBlob: vi.fn().mockResolvedValue('raw-blob-oid'), - }); + const persistence = createMockPersistence(); const builder = new PatchBuilderV2(/** @type {any} */ ({ persistence, writerId: 'w1', @@ -481,9 +615,33 @@ describe('PatchBuilderV2 content attachment', () => { getCurrentState: () => state, })); - await builder.attachContent('node:1', 'hello'); + await expect(builder.attachContent('node:1', 'hello')) + .rejects.toThrow('Cannot attach content without blob storage'); + expect(persistence.writeBlob).not.toHaveBeenCalled(); + }); + }); - expect(persistence.writeBlob).toHaveBeenCalledWith('hello'); + describe('no raw writeBlob fallback (OG-014)', () => { + it('throws when blobStorage is absent and content is 
attached', async () => { + // OG-014 mandates that CAS is mandatory. Once implemented, + // attachContent without blobStorage should throw, not silently + // fall back to persistence.writeBlob(). + const state = createMockState(); + orsetAdd(state.nodeAlive, 'node:1', createDot('w1', 1)); + const persistence = createMockPersistence(); + const builder = new PatchBuilderV2(/** @type {any} */ ({ + persistence, + writerId: 'w1', + lamport: 1, + versionVector: createVersionVector(), + getCurrentState: () => state, + // blobStorage intentionally omitted + })); + + // Should throw because there is no blob storage to handle content + await expect(builder.attachContent('node:1', 'hello')) + .rejects.toThrow(); + expect(persistence.writeBlob).not.toHaveBeenCalled(); }); }); @@ -522,10 +680,9 @@ describe('PatchBuilderV2 content attachment', () => { it('includes _content_ entries in tree when content blobs exist', async () => { const contentOid = 'a'.repeat(40); const patchBlobOid = 'b'.repeat(40); + const blobStorage = createMockBlobStorage({ storeOid: contentOid }); const persistence = createMockPersistence({ - writeBlob: vi.fn() - .mockResolvedValueOnce(contentOid) // attachContent writeBlob - .mockResolvedValueOnce(patchBlobOid), // commit() CBOR blob + writeBlob: vi.fn().mockResolvedValue(patchBlobOid), // commit() CBOR blob only writeTree: vi.fn().mockResolvedValue('c'.repeat(40)), }); const builder = new PatchBuilderV2(/** @type {any} */ ({ @@ -536,6 +693,7 @@ describe('PatchBuilderV2 content attachment', () => { versionVector: createVersionVector(), getCurrentState: () => null, expectedParentSha: null, + blobStorage, })); builder.addNode('n1'); @@ -546,7 +704,7 @@ describe('PatchBuilderV2 content attachment', () => { const treeEntries = persistence.writeTree.mock.calls[0][0]; expect(treeEntries).toHaveLength(2); expect(treeEntries[0]).toBe(`100644 blob ${patchBlobOid}\tpatch.cbor`); - expect(treeEntries[1]).toBe(`100644 blob ${contentOid}\t_content_${contentOid}`); + 
expect(treeEntries[1]).toBe(`040000 tree ${contentOid}\t_content_${contentOid}`); }); it('creates single-entry tree when no content blobs', async () => { @@ -570,14 +728,16 @@ describe('PatchBuilderV2 content attachment', () => { }); it('includes multiple _content_ entries for multiple attachments', async () => { - let blobIdx = 0; const contentA = '1'.repeat(40); const contentB = '2'.repeat(40); const patchBlob = '3'.repeat(40); - const blobOids = [contentA, contentB, patchBlob]; + let storeIdx = 0; + const storeOids = [contentA, contentB]; + const blobStorage = createMockBlobStorage(); + blobStorage.store = vi.fn().mockImplementation(() => + Promise.resolve(storeOids[storeIdx++])); const persistence = createMockPersistence({ - writeBlob: vi.fn().mockImplementation(() => - Promise.resolve(blobOids[blobIdx++])), + writeBlob: vi.fn().mockResolvedValue(patchBlob), // CBOR blob only writeTree: vi.fn().mockResolvedValue('4'.repeat(40)), }); const builder = new PatchBuilderV2(/** @type {any} */ ({ @@ -588,6 +748,7 @@ describe('PatchBuilderV2 content attachment', () => { versionVector: createVersionVector(), getCurrentState: () => null, expectedParentSha: null, + blobStorage, })); builder.addNode('n1').addNode('n2'); @@ -598,18 +759,16 @@ describe('PatchBuilderV2 content attachment', () => { const treeEntries = persistence.writeTree.mock.calls[0][0]; expect(treeEntries).toHaveLength(3); expect(treeEntries[0]).toContain('patch.cbor'); - expect(treeEntries[1]).toBe(`100644 blob ${contentA}\t_content_${contentA}`); - expect(treeEntries[2]).toBe(`100644 blob ${contentB}\t_content_${contentB}`); + expect(treeEntries[1]).toBe(`040000 tree ${contentA}\t_content_${contentA}`); + expect(treeEntries[2]).toBe(`040000 tree ${contentB}\t_content_${contentB}`); }); it('deduplicates tree entries when same content is attached to multiple nodes', async () => { const sharedOid = 'a'.repeat(40); const patchBlob = 'b'.repeat(40); - let callCount = 0; - const blobOids = [sharedOid, 
sharedOid, patchBlob]; + const blobStorage = createMockBlobStorage({ storeOid: sharedOid }); const persistence = createMockPersistence({ - writeBlob: vi.fn().mockImplementation(() => - Promise.resolve(blobOids[callCount++])), + writeBlob: vi.fn().mockResolvedValue(patchBlob), // CBOR blob only writeTree: vi.fn().mockResolvedValue('c'.repeat(40)), }); const builder = new PatchBuilderV2(/** @type {any} */ ({ @@ -620,6 +779,7 @@ describe('PatchBuilderV2 content attachment', () => { versionVector: createVersionVector(), getCurrentState: () => null, expectedParentSha: null, + blobStorage, })); builder.addNode('n1').addNode('n2'); @@ -630,7 +790,7 @@ describe('PatchBuilderV2 content attachment', () => { const treeEntries = persistence.writeTree.mock.calls[0][0]; expect(treeEntries).toHaveLength(2); expect(treeEntries[0]).toContain('patch.cbor'); - expect(treeEntries[1]).toBe(`100644 blob ${sharedOid}\t_content_${sharedOid}`); + expect(treeEntries[1]).toBe(`040000 tree ${sharedOid}\t_content_${sharedOid}`); }); }); }); diff --git a/test/unit/domain/services/TraversalService.test.js b/test/unit/domain/services/TraversalService.test.js deleted file mode 100644 index e4a9eba2..00000000 --- a/test/unit/domain/services/TraversalService.test.js +++ /dev/null @@ -1,9 +0,0 @@ -import { describe, it, expect } from 'vitest'; -import TraversalService from '../../../../src/domain/services/TraversalService.js'; -import CommitDagTraversalService from '../../../../src/domain/services/CommitDagTraversalService.js'; - -describe('TraversalService (deprecation alias)', () => { - it('default export is CommitDagTraversalService', () => { - expect(TraversalService).toBe(CommitDagTraversalService); - }); -}); diff --git a/test/unit/domain/services/TreeConstruction.determinism.test.js b/test/unit/domain/services/TreeConstruction.determinism.test.js index 913eb311..e9cae469 100644 --- a/test/unit/domain/services/TreeConstruction.determinism.test.js +++ 
b/test/unit/domain/services/TreeConstruction.determinism.test.js @@ -10,6 +10,7 @@ import { orsetAdd } from '../../../../src/domain/crdt/ORSet.js'; import { createDot } from '../../../../src/domain/crdt/Dot.js'; import { CONTENT_PROPERTY_KEY, encodeEdgePropKey } from '../../../../src/domain/services/KeyCodec.js'; import InMemoryGraphAdapter from '../../../../src/infrastructure/adapters/InMemoryGraphAdapter.js'; +import InMemoryBlobStorageAdapter from '../../../../src/domain/utils/defaultBlobStorage.js'; import NodeCryptoAdapter from '../../../../src/infrastructure/adapters/NodeCryptoAdapter.js'; const PROPERTY_TEST_SEED = 4242; @@ -49,6 +50,7 @@ async function createPatchTreeOid(contentIds, shuffleSeed) { versionVector: createVersionVector(), getCurrentState: () => null, expectedParentSha: null, + blobStorage: new InMemoryBlobStorageAdapter(), })); for (let i = 0; i < contentIds.length; i++) { diff --git a/test/unit/infrastructure/adapters/CasBlobAdapter.test.js b/test/unit/infrastructure/adapters/CasBlobAdapter.test.js index 6c987c71..4b23f914 100644 --- a/test/unit/infrastructure/adapters/CasBlobAdapter.test.js +++ b/test/unit/infrastructure/adapters/CasBlobAdapter.test.js @@ -3,6 +3,7 @@ import { describe, it, expect, vi, beforeEach } from 'vitest'; // Mock @git-stunts/git-cas (dynamic import used by _initCas) const mockReadManifest = vi.fn(); const mockRestore = vi.fn(); +const mockRestoreStream = vi.fn(); const mockStore = vi.fn(); const mockCreateTree = vi.fn(); @@ -14,6 +15,7 @@ class MockContentAddressableStore { lastConstructorArgs = opts; this.readManifest = mockReadManifest; this.restore = mockRestore; + this.restoreStream = mockRestoreStream; this.store = mockStore; this.createTree = mockCreateTree; } @@ -321,6 +323,152 @@ describe('CasBlobAdapter', () => { }); }); + describe('storeStream()', () => { + it('stores content from an async iterable via CAS and returns tree OID', async () => { + const manifest = { chunks: ['chunk1', 'chunk2'] }; + 
mockStore.mockResolvedValue(manifest); + mockCreateTree.mockResolvedValue('tree-oid-stream'); + + const adapter = new CasBlobAdapter({ + plumbing: makePlumbing(), + persistence: makePersistence(), + }); + + async function* source() { + yield new TextEncoder().encode('hello '); + yield new TextEncoder().encode('world'); + } + + const oid = await adapter.storeStream(source(), { slug: 'test/streamed' }); + + expect(oid).toBe('tree-oid-stream'); + expect(mockStore).toHaveBeenCalledOnce(); + expect(mockCreateTree).toHaveBeenCalledWith({ manifest }); + // The source passed to CAS store should be the async iterable (or wrapped) + const storeCall = mockStore.mock.calls[0][0]; + expect(storeCall.slug).toBe('test/streamed'); + }); + + it('passes encryptionKey to CAS store when configured', async () => { + mockStore.mockResolvedValue({}); + mockCreateTree.mockResolvedValue('tree-oid'); + + const encKey = new Uint8Array(32); + const adapter = new CasBlobAdapter({ + plumbing: makePlumbing(), + persistence: makePersistence(), + encryptionKey: encKey, + }); + + async function* source() { + yield new Uint8Array([1]); + } + + await adapter.storeStream(source()); + + const storeCall = mockStore.mock.calls[0][0]; + expect(storeCall.encryptionKey).toBe(encKey); + }); + }); + + describe('retrieveStream()', () => { + it('retrieves content as an async iterable via CAS restoreStream', async () => { + const manifest = { chunks: ['chunk1'] }; + const chunk1 = new TextEncoder().encode('hello '); + const chunk2 = new TextEncoder().encode('world'); + + mockReadManifest.mockResolvedValue(manifest); + mockRestoreStream.mockReturnValue((async function* () { + yield chunk1; + yield chunk2; + })()); + + const adapter = new CasBlobAdapter({ + plumbing: makePlumbing(), + persistence: makePersistence(), + }); + + const stream = adapter.retrieveStream('tree-oid-abc'); + const chunks = []; + for await (const chunk of stream) { + chunks.push(chunk); + } + + expect(chunks).toHaveLength(2); + const result 
= new Uint8Array(chunks.reduce((n, c) => n + c.byteLength, 0)); + let offset = 0; + for (const c of chunks) { + result.set(c, offset); + offset += c.byteLength; + } + expect(new TextDecoder().decode(result)).toBe('hello world'); + }); + + it('falls back to single-chunk yield for legacy raw Git blobs', async () => { + const rawBuf = new TextEncoder().encode('legacy blob content'); + const persistence = makePersistence(); + persistence.readBlob.mockResolvedValue(rawBuf); + const casErr = Object.assign(new Error('No manifest entry'), { code: 'MANIFEST_NOT_FOUND' }); + mockReadManifest.mockRejectedValue(casErr); + + const adapter = new CasBlobAdapter({ + plumbing: makePlumbing(), + persistence, + }); + + const stream = adapter.retrieveStream('raw-blob-oid'); + const chunks = []; + for await (const chunk of stream) { + chunks.push(chunk); + } + + expect(chunks).toHaveLength(1); + expect(chunks[0]).toBe(rawBuf); + expect(persistence.readBlob).toHaveBeenCalledWith('raw-blob-oid'); + }); + + it('passes encryptionKey to CAS restoreStream when configured', async () => { + const manifest = { chunks: ['chunk1'] }; + mockReadManifest.mockResolvedValue(manifest); + mockRestoreStream.mockReturnValue((async function* () { + yield new Uint8Array([1]); + })()); + + const encKey = new Uint8Array(32); + const adapter = new CasBlobAdapter({ + plumbing: makePlumbing(), + persistence: makePersistence(), + encryptionKey: encKey, + }); + + const stream = adapter.retrieveStream('tree-oid'); + for await (const _ of stream) { /* drain */ } + + expect(mockRestoreStream).toHaveBeenCalledWith( + expect.objectContaining({ manifest, encryptionKey: encKey }), + ); + }); + + it('throws E_MISSING_OBJECT when legacy fallback readBlob returns null', async () => { + const persistence = makePersistence(); + persistence.readBlob.mockResolvedValue(null); + const casErr = Object.assign(new Error('No manifest entry'), { code: 'MANIFEST_NOT_FOUND' }); + mockReadManifest.mockRejectedValue(casErr); + + const 
adapter = new CasBlobAdapter({ + plumbing: makePlumbing(), + persistence, + }); + + const stream = adapter.retrieveStream('ghost-oid'); + await expect(async () => { + for await (const _ of stream) { /* drain */ } + }).rejects.toMatchObject({ + code: PersistenceError.E_MISSING_OBJECT, + }); + }); + }); + describe('CAS initialization', () => { it('lazily initializes CAS on first store() call', async () => { mockStore.mockResolvedValue({}); diff --git a/test/unit/infrastructure/adapters/InMemoryBlobStorageAdapter.test.js b/test/unit/infrastructure/adapters/InMemoryBlobStorageAdapter.test.js new file mode 100644 index 00000000..30ad6d90 --- /dev/null +++ b/test/unit/infrastructure/adapters/InMemoryBlobStorageAdapter.test.js @@ -0,0 +1,146 @@ +import { describe, it, expect } from 'vitest'; + +import InMemoryBlobStorageAdapter from '../../../../src/domain/utils/defaultBlobStorage.js'; +import BlobStoragePort from '../../../../src/ports/BlobStoragePort.js'; + +// --------------------------------------------------------------------------- +// Helpers +// --------------------------------------------------------------------------- + +/** Collects an async iterable into a single Uint8Array. 
*/ +async function collect(/** @type {AsyncIterable} */ stream) { + const chunks = []; + for await (const chunk of stream) { + chunks.push(chunk); + } + const totalLength = chunks.reduce((sum, c) => sum + c.byteLength, 0); + const result = new Uint8Array(totalLength); + let offset = 0; + for (const chunk of chunks) { + result.set(chunk, offset); + offset += chunk.byteLength; + } + return result; +} + +// --------------------------------------------------------------------------- +// Tests +// --------------------------------------------------------------------------- + +describe('InMemoryBlobStorageAdapter', () => { + it('extends BlobStoragePort', () => { + const adapter = new InMemoryBlobStorageAdapter(); + expect(adapter).toBeInstanceOf(BlobStoragePort); + }); + + describe('store() + retrieve() round-trip', () => { + it('stores and retrieves Uint8Array content', async () => { + const adapter = new InMemoryBlobStorageAdapter(); + const content = new Uint8Array([10, 20, 30, 40]); + const oid = await adapter.store(content); + + expect(typeof oid).toBe('string'); + expect(oid.length).toBeGreaterThan(0); + + const result = await adapter.retrieve(oid); + expect(result).toEqual(content); + }); + + it('stores and retrieves string content', async () => { + const adapter = new InMemoryBlobStorageAdapter(); + const oid = await adapter.store('hello world'); + + const result = await adapter.retrieve(oid); + expect(new TextDecoder().decode(result)).toBe('hello world'); + }); + + it('returns distinct OIDs for distinct content', async () => { + const adapter = new InMemoryBlobStorageAdapter(); + const oid1 = await adapter.store('aaa'); + const oid2 = await adapter.store('bbb'); + expect(oid1).not.toBe(oid2); + }); + + it('returns the same OID for identical content (content-addressed)', async () => { + const adapter = new InMemoryBlobStorageAdapter(); + const oid1 = await adapter.store('same'); + const oid2 = await adapter.store('same'); + expect(oid1).toBe(oid2); + }); + }); + + 
describe('storeStream() + retrieveStream() round-trip', () => { + it('stores from an async iterable and retrieves as an async iterable', async () => { + const adapter = new InMemoryBlobStorageAdapter(); + const data = new TextEncoder().encode('streamed content'); + + async function* source() { + // Yield in two chunks + yield data.slice(0, 8); + yield data.slice(8); + } + + const oid = await adapter.storeStream(source()); + + expect(typeof oid).toBe('string'); + expect(oid.length).toBeGreaterThan(0); + + const stream = adapter.retrieveStream(oid); + const result = await collect(stream); + expect(new TextDecoder().decode(result)).toBe('streamed content'); + }); + + it('stores single-chunk streams', async () => { + const adapter = new InMemoryBlobStorageAdapter(); + const data = new Uint8Array([1, 2, 3]); + + async function* source() { + yield data; + } + + const oid = await adapter.storeStream(source()); + const result = await collect(adapter.retrieveStream(oid)); + expect(result).toEqual(data); + }); + }); + + describe('cross-method compatibility', () => { + it('content stored via store() is retrievable via retrieveStream()', async () => { + const adapter = new InMemoryBlobStorageAdapter(); + const oid = await adapter.store('buffered write'); + + const result = await collect(adapter.retrieveStream(oid)); + expect(new TextDecoder().decode(result)).toBe('buffered write'); + }); + + it('content stored via storeStream() is retrievable via retrieve()', async () => { + const adapter = new InMemoryBlobStorageAdapter(); + + async function* source() { + yield new TextEncoder().encode('stream write'); + } + + const oid = await adapter.storeStream(source()); + const result = await adapter.retrieve(oid); + expect(new TextDecoder().decode(result)).toBe('stream write'); + }); + }); + + describe('error cases', () => { + it('retrieve() throws for unknown OID', async () => { + const adapter = new InMemoryBlobStorageAdapter(); + await 
expect(adapter.retrieve('nonexistent')).rejects.toThrow(); + }); + + it('retrieveStream() throws for unknown OID', () => { + const adapter = new InMemoryBlobStorageAdapter(); + // retrieveStream may throw synchronously or yield an error on first iteration + expect(() => { + const stream = adapter.retrieveStream('nonexistent'); + // Force iteration if it returns a lazy iterable + const iter = stream[Symbol.asyncIterator](); + return iter.next(); + }).toThrow(); + }); + }); +}); diff --git a/test/unit/ports/BlobStoragePort.test.js b/test/unit/ports/BlobStoragePort.test.js index 64b20e0e..7174aaa7 100644 --- a/test/unit/ports/BlobStoragePort.test.js +++ b/test/unit/ports/BlobStoragePort.test.js @@ -11,4 +11,17 @@ describe('BlobStoragePort', () => { const port = new BlobStoragePort(); await expect(port.retrieve('oid')).rejects.toThrow('not implemented'); }); + + it('storeStream() throws not implemented', async () => { + const port = new BlobStoragePort(); + async function* source() { + yield new Uint8Array([1, 2, 3]); + } + await expect(port.storeStream(source())).rejects.toThrow('not implemented'); + }); + + it('retrieveStream() throws not implemented', () => { + const port = new BlobStoragePort(); + expect(() => port.retrieveStream('oid')).toThrow('not implemented'); + }); }); diff --git a/test/unit/scripts/release-policy-shape.test.js b/test/unit/scripts/release-policy-shape.test.js index d2890fc8..22ddf6ae 100644 --- a/test/unit/scripts/release-policy-shape.test.js +++ b/test/unit/scripts/release-policy-shape.test.js @@ -30,7 +30,7 @@ const roadmap = readFileSync( describe('release policy shape', () => { it('keeps package and jsr versions aligned on the release branch', () => { expect(packageJson.version).toBe(jsrJson.version); - expect(packageJson.version).toBe('15.0.1'); + expect(packageJson.version).toBe('16.0.0'); }); it('does not require a README release feed anymore', () => { @@ -46,9 +46,9 @@ describe('release policy shape', () => { }); it('keeps the roadmap 
header honest about the current release and correction patch', () => { - expect(roadmap).toContain('**Current release on `main`:** v15.0.0'); - expect(roadmap).toContain('**Next intended release:** v15.0.1'); - expect(roadmap).toContain('`OG-010` is complete'); + expect(roadmap).toContain('**Current release on `main`:** v16.0.0'); + expect(roadmap).toContain('**Next intended release:** v16.0.1'); + expect(roadmap).toContain('v16.0.0 release'); }); it('keeps publish artifacts slim instead of shipping the full repo corpus', () => {