diff --git a/packages/uri-graph/.gitignore b/packages/uri-graph/.gitignore new file mode 100644 index 0000000..06e6038 --- /dev/null +++ b/packages/uri-graph/.gitignore @@ -0,0 +1,3 @@ +node_modules +dist +*.tsbuildinfo diff --git a/packages/uri-graph/README.md b/packages/uri-graph/README.md new file mode 100644 index 0000000..cee606e --- /dev/null +++ b/packages/uri-graph/README.md @@ -0,0 +1,148 @@ +# @statewalker/uri-graph + +## What it is + +A minimal persistent dependency graph for URI-shaped work. Every observable thing — a file, an extracted text, a chunk, an index, a transformed cell — is a URI. Each URI has a `Resource` (status, monotonic stamp, optional meta). Workers are async generators that consume an input stream of resources and yield an output stream. A fixpoint engine drives them to convergence. + +The package is small on purpose: one `Engine`, one `Store` interface with two interchangeable backends (`MemoryStore`, `SqlStore`), and a `topoLayers` helper for introspection. There is no run history, no staging table, no hash-based no-op rule, no scope/role machinery — those concerns either belong to the worker or are handled implicitly by the watermark. + +## Why it exists + +To replace heavier scanner-style pipelines with a single algorithm that works for: + +- file transformations (scan → extract → index) +- code transformations (TS/TSX → JS) +- code execution (ObservableHQ-style cells) +- in-process transient pipelines and durable ones, with the same code + +The algorithm: + +1. Each worker declares a single input scheme (`selects`) and a single output scheme (`emits`). +2. The engine reads the worker's last completion stamp. +3. The engine streams every resource whose latest event has `stamp > watermark` and whose URI begins with `selects` into the worker. +4. The worker yields output resources, each carrying a fresh stamp it minted via `ctx.newStamp()`. Each yield is persisted immediately by the engine. +5. 
On clean completion, the engine mints another stamp and writes it to `completions(worker, stamp)`. By construction this stamp is larger than every input or output stamp the run touched. +6. `stabilize()` repeats this round until no worker progressed. + +Crash safety falls out of the design: outputs are URI-keyed (idempotent on retry); the completion row is the last write of a run. A crashed run leaves no completion; the next round re-executes the same inputs and overwrites the same output URIs. + +## How to use + +```sh +pnpm add @statewalker/uri-graph +``` + +```ts +import { Engine, MemoryStore, type WorkerFn } from "@statewalker/uri-graph"; + +const store = new MemoryStore(); +const engine = new Engine(store); + +const scanner: WorkerFn = async function* (input, ctx) { + for await (const _tick of input) { + const stamp = await ctx.newStamp(); + yield { uri: "file://a.md", stamp, status: "added" }; + yield { uri: "file://b.md", stamp, status: "added" }; + } +}; + +const extractor: WorkerFn = async function* (input, ctx) { + for await (const r of input) { + if (!r.uri.endsWith(".md")) continue; + const stamp = await ctx.newStamp(); + yield { uri: `text://${r.uri.slice("file://".length)}`, stamp, status: r.status }; + } +}; + +await engine.register({ name: "scanner", selects: "tick://", emits: "file://" }, scanner); +await engine.register({ name: "extractor", selects: "file://", emits: "text://" }, extractor); + +// publish a tick to wake up the source worker +await store.put({ uri: "tick://run", stamp: await store.newStamp(), status: "updated" }); + +for await (const r of engine.stabilize()) { + console.log(r.uri, r.stamp, r.status); +} +``` + +### Choosing a backend + +| | `MemoryStore` | `SqlStore` | +|---|---|---| +| State | JS maps | libSQL/SQLite tables | +| Persistence | none (in-process) | the underlying `Db` | +| Best for | unit tests, in-memory cell evaluation, scratch pipelines | daemons, durable indexes, cross-restart work | + +```ts +import { 
newNodeTursoDb } from "@statewalker/db-turso-node"; +import { Engine, SqlStore } from "@statewalker/uri-graph"; + +const db = await newNodeTursoDb({ path: "./graph.db" }); +const store = new SqlStore(db); +const engine = new Engine(store); +// ... register, stabilize ... +await db.close(); // caller owns the Db +``` + +Workers do not branch on backend. The same `WorkerFn` runs against either store. + +### Operator actions + +- `store.invalidate(prefix)` — appends `'removed'` events for every live URI under the prefix. Downstream workers see these on the next `stabilize()` and cascade. +- `store.purgeResources({ keepLatestPerUri: true })` — collapses the event log to one row per URI. +- `store.purgeCompletions({ keepLatestPerWorker: N })` — trims completion history. +- `engine.unregister(name)` — removes the worker and its completion rows; resources stay. + +### Introspection + +```ts +import { topoLayers } from "@statewalker/uri-graph"; + +const layers = topoLayers(workers); +// [[w1, w2], [w3], [w4, w5]] — workers in the same layer have no dependency between them +``` + +`topoLayers` is for visualization and parallel scheduling decisions. The engine itself does not use it: the fixpoint loop is data-driven by stamps. + +## Internals + +### Storage shape + +Three append-only tables plus a worker registry and a stamp counter: + +``` +stamp_seq (id=1, next) — single-row counter, only UPDATE in the system +resources (uri, stamp, status, meta) PK (uri, stamp) +workers (name, selects, emits) CRUD +completions (worker, stamp, finished_at) PK (worker, stamp) +``` + +`get(uri)` reads `MAX(stamp)` per URI. `list({ prefix, afterStamp })` joins each URI's max-stamp row and filters. `allWatermarks()` is `SELECT worker, MAX(stamp) FROM completions GROUP BY worker`. The same shape backs `MemoryStore`. 
+ +### Watermark semantics + +A row in `completions(worker, stamp)` means: *this worker has fully processed every resource that existed when that stamp was minted.* Because the completion stamp is minted after the run finishes, it is strictly greater than every output stamp produced by the run, which is in turn greater than every input stamp consumed. + +A worker that consumes input but produces nothing (filter case) still advances its watermark — the engine writes a completion row whenever the worker's input stream yielded at least one resource. A worker that finds no input writes no completion row and re-runs cheaply on the next round. + +### Crash safety + +- `put(resource)` is `INSERT OR REPLACE` keyed by `(uri, stamp)`; safe to re-emit the same URI/stamp pair. +- `markCompleted` runs only on clean generator completion; a crash leaves the watermark unchanged. +- On restart, the engine sees the unchanged watermark and re-runs the worker with the same inputs. Workers are required to be deterministic on URI keys; re-emitted outputs overwrite previous ones. + +### Constraints + +- Single-writer engine. Multi-process is out of scope; if you need it, run a single engine and feed work in. +- `selects` and `emits` are scheme prefixes, not glob patterns. If you need finer filtering, do it inside the worker. +- Workers must produce deterministic output URIs (a function of input, never of time/randomness). Re-running yields the same URIs and overwrites prior values. +- `MemoryStore` keeps full state in memory; not intended for huge graphs. + +## Related + +- `@statewalker/db-api` — abstract `Db` interface used by `SqlStore`. +- `@statewalker/db-turso-node`, `@statewalker/db-turso-browser` — libSQL adapters that implement `Db`. + +## License + +MIT. 
diff --git a/packages/uri-graph/package.json b/packages/uri-graph/package.json new file mode 100644 index 0000000..9b52252 --- /dev/null +++ b/packages/uri-graph/package.json @@ -0,0 +1,49 @@ +{ + "name": "@statewalker/uri-graph", + "version": "0.2.0", + "private": false, + "type": "module", + "description": "Minimal persistent URI dependency graph: workers as async generators, stamp watermarks, fixpoint stabilizer over an in-memory or SQL store.", + "homepage": "https://github.com/statewalker/statewalker-content", + "author": { + "name": "Mikhail Kotelnikov", + "email": "mikhail.kotelnikov@gmail.com" + }, + "license": "MIT", + "repository": { + "type": "git", + "url": "git+ssh://git@github.com/statewalker/statewalker-content.git" + }, + "exports": { + ".": "./src/index.ts" + }, + "files": [ + "dist", + "src" + ], + "scripts": { + "build": "tsdown", + "dev": "tsdown --watch", + "test": "vitest run", + "test:watch": "vitest", + "typecheck": "tsc --noEmit", + "clean": "rimraf dist", + "lint": "biome check --write .", + "format": "biome format --write ." 
+ }, + "dependencies": { + "@statewalker/db-api": "workspace:*" + }, + "devDependencies": { + "@statewalker/db-turso-node": "workspace:*", + "@types/node": "catalog:", + "rimraf": "catalog:", + "tsdown": "catalog:", + "typescript": "catalog:", + "vitest": "catalog:" + }, + "sideEffects": false, + "publishConfig": { + "access": "public" + } +} diff --git a/packages/uri-graph/src/engine.ts b/packages/uri-graph/src/engine.ts new file mode 100644 index 0000000..83a860d --- /dev/null +++ b/packages/uri-graph/src/engine.ts @@ -0,0 +1,69 @@ +import type { Store } from "./store/store.js"; +import type { Resource, Worker, WorkerContext, WorkerFn } from "./types.js"; + +export class Engine { + private fns = new Map(); + + constructor(private store: Store) {} + + async register(worker: Worker, fn: WorkerFn): Promise { + await this.store.saveWorker(worker); + this.fns.set(worker.name, fn); + } + + async unregister(name: string): Promise { + await this.store.deleteWorker(name); + this.fns.delete(name); + } + + async *runWorker(name: string): AsyncIterable { + const worker = await this.store.getWorker(name); + if (!worker) return; + const watermarks = await this.store.allWatermarks(); + yield* this.runOne(worker, watermarks.get(name) ?? 0); + } + + async *stabilize(): AsyncIterable { + for (;;) { + const watermarks = await this.store.allWatermarks(); + let progressed = false; + for await (const worker of this.store.listWorkers()) { + const watermark = watermarks.get(worker.name) ?? 
0; + for await (const r of this.runOne(worker, watermark)) { + progressed = true; + yield r; + } + } + if (!progressed) break; + } + } + + private async *runOne(worker: Worker, watermark: number): AsyncIterable { + const fn = this.fns.get(worker.name); + if (!fn) return; + + const store = this.store; + let consumed = false; + const input = (async function* () { + for await (const r of store.list({ prefix: worker.selects, afterStamp: watermark })) { + consumed = true; + yield r; + } + })(); + + const ctx: WorkerContext = { + newStamp: () => store.newStamp(), + read: (uri) => store.get(uri), + }; + + for await (const out of fn(input, ctx)) { + await store.put(out); + yield out; + } + + if (consumed) { + const completionStamp = await store.newStamp(); + await store.markCompleted(worker.name, completionStamp); + } + } +} diff --git a/packages/uri-graph/src/index.ts b/packages/uri-graph/src/index.ts new file mode 100644 index 0000000..98a7793 --- /dev/null +++ b/packages/uri-graph/src/index.ts @@ -0,0 +1,11 @@ +export { Engine } from "./engine.js"; +export { MemoryStore } from "./store/memory.js"; +export { SqlStore } from "./store/sql.js"; +export type { + ListOptions, + PurgeCompletionsOptions, + PurgeResourcesOptions, + Store, +} from "./store/store.js"; +export { topoLayers } from "./topo-layers.js"; +export type { Resource, Status, Worker, WorkerContext, WorkerFn } from "./types.js"; diff --git a/packages/uri-graph/src/store/memory.ts b/packages/uri-graph/src/store/memory.ts new file mode 100644 index 0000000..578e273 --- /dev/null +++ b/packages/uri-graph/src/store/memory.ts @@ -0,0 +1,113 @@ +import type { Resource, Worker } from "../types.js"; +import type { + ListOptions, + PurgeCompletionsOptions, + PurgeResourcesOptions, + Store, +} from "./store.js"; + +export class MemoryStore implements Store { + private nextStampValue = 1; + private resourcesByUri = new Map(); + private workers = new Map(); + private completionsByWorker = new Map(); + + async newStamp(): 
Promise { + return this.nextStampValue++; + } + + async put(resource: Resource): Promise { + if (resource.stamp >= this.nextStampValue) { + this.nextStampValue = resource.stamp + 1; + } + const arr = this.resourcesByUri.get(resource.uri); + if (arr) { + arr.push(resource); + } else { + this.resourcesByUri.set(resource.uri, [resource]); + } + } + + async get(uri: string): Promise { + const arr = this.resourcesByUri.get(uri); + if (!arr || arr.length === 0) return undefined; + return arr[arr.length - 1]; + } + + async *list(options: ListOptions): AsyncIterable { + const after = options.afterStamp ?? 0; + const matches: Resource[] = []; + for (const [uri, arr] of this.resourcesByUri) { + if (!uri.startsWith(options.prefix)) continue; + const latest = arr[arr.length - 1]; + if (latest && latest.stamp > after) matches.push(latest); + } + matches.sort((a, b) => a.stamp - b.stamp); + for (const r of matches) yield r; + } + + async saveWorker(worker: Worker): Promise { + this.workers.set(worker.name, { ...worker }); + } + + async deleteWorker(name: string): Promise { + this.workers.delete(name); + this.completionsByWorker.delete(name); + } + + async getWorker(name: string): Promise { + const w = this.workers.get(name); + return w ? 
{ ...w } : undefined; + } + + async *listWorkers(): AsyncIterable { + for (const w of this.workers.values()) yield { ...w }; + } + + async markCompleted(worker: string, stamp: number): Promise { + const arr = this.completionsByWorker.get(worker); + if (arr) arr.push(stamp); + else this.completionsByWorker.set(worker, [stamp]); + } + + async allWatermarks(): Promise> { + const result = new Map(); + for (const [worker, stamps] of this.completionsByWorker) { + let max = 0; + for (const s of stamps) if (s > max) max = s; + result.set(worker, max); + } + return result; + } + + async invalidate(prefix: string): Promise { + const stamp = await this.newStamp(); + for (const [uri, arr] of this.resourcesByUri) { + if (!uri.startsWith(prefix)) continue; + const latest = arr[arr.length - 1]; + if (!latest || latest.status === "removed") continue; + arr.push({ uri, stamp, status: "removed" }); + } + } + + async purgeResources(options?: PurgeResourcesOptions): Promise { + if (options?.keepLatestPerUri !== true) return; + for (const [uri, arr] of this.resourcesByUri) { + if (arr.length > 1) { + const latest = arr[arr.length - 1]; + if (latest) this.resourcesByUri.set(uri, [latest]); + } + } + } + + async purgeCompletions(options?: PurgeCompletionsOptions): Promise { + const keep = options?.keepLatestPerWorker; + if (keep === undefined || keep < 1) return; + for (const [worker, stamps] of this.completionsByWorker) { + if (stamps.length > keep) { + stamps.sort((a, b) => a - b); + this.completionsByWorker.set(worker, stamps.slice(stamps.length - keep)); + } + } + } +} diff --git a/packages/uri-graph/src/store/sql.ts b/packages/uri-graph/src/store/sql.ts new file mode 100644 index 0000000..ebedc09 --- /dev/null +++ b/packages/uri-graph/src/store/sql.ts @@ -0,0 +1,224 @@ +import type { Db } from "@statewalker/db-api"; +import type { Resource, Status, Worker } from "../types.js"; +import type { + ListOptions, + PurgeCompletionsOptions, + PurgeResourcesOptions, + Store, +} from 
"./store.js"; + +const SCHEMA = ` +CREATE TABLE IF NOT EXISTS stamp_seq ( + id INTEGER PRIMARY KEY CHECK (id = 1), + next INTEGER NOT NULL +); +INSERT OR IGNORE INTO stamp_seq (id, next) VALUES (1, 1); + +CREATE TABLE IF NOT EXISTS resources ( + uri TEXT NOT NULL, + stamp INTEGER NOT NULL, + status TEXT NOT NULL, + meta TEXT, + PRIMARY KEY (uri, stamp) +); +CREATE INDEX IF NOT EXISTS resources_stamp ON resources(stamp); + +CREATE TABLE IF NOT EXISTS workers ( + name TEXT PRIMARY KEY, + selects TEXT NOT NULL, + emits TEXT NOT NULL +); + +CREATE TABLE IF NOT EXISTS completions ( + worker TEXT NOT NULL, + stamp INTEGER NOT NULL, + finished_at INTEGER NOT NULL, + PRIMARY KEY (worker, stamp) +); +CREATE INDEX IF NOT EXISTS completions_worker_stamp ON completions(worker, stamp DESC); +`; + +type ResourceRow = { + uri: string; + stamp: number; + status: string; + meta: string | null; +}; + +type WorkerRow = { + name: string; + selects: string; + emits: string; +}; + +function rowToResource(row: ResourceRow): Resource { + const r: Resource = { + uri: row.uri, + stamp: row.stamp, + status: row.status as Status, + }; + if (row.meta !== null) r.meta = JSON.parse(row.meta); + return r; +} + +export class SqlStore implements Store { + private initialized = false; + + constructor(private db: Db) {} + + private async ensureInit(): Promise { + if (this.initialized) return; + for (const stmt of SCHEMA.split(";")) { + const trimmed = stmt.trim(); + if (trimmed) await this.db.exec(trimmed); + } + this.initialized = true; + } + + async newStamp(): Promise { + await this.ensureInit(); + const rows = await this.db.query<{ next: number }>( + "UPDATE stamp_seq SET next = next + 1 WHERE id = 1 RETURNING next - 1 AS next", + ); + const row = rows[0]; + if (!row) throw new Error("stamp_seq is missing"); + return row.next; + } + + async put(resource: Resource): Promise { + await this.ensureInit(); + const meta = resource.meta === undefined ? 
null : JSON.stringify(resource.meta); + await this.db.query( + "INSERT OR REPLACE INTO resources (uri, stamp, status, meta) VALUES (?, ?, ?, ?)", + [resource.uri, resource.stamp, resource.status, meta], + ); + await this.db.query("UPDATE stamp_seq SET next = MAX(next, ? + 1) WHERE id = 1", [ + resource.stamp, + ]); + } + + async get(uri: string): Promise { + await this.ensureInit(); + const rows = await this.db.query( + "SELECT uri, stamp, status, meta FROM resources WHERE uri = ? ORDER BY stamp DESC LIMIT 1", + [uri], + ); + const row = rows[0]; + return row ? rowToResource(row) : undefined; + } + + async *list(options: ListOptions): AsyncIterable { + await this.ensureInit(); + const after = options.afterStamp ?? 0; + const rows = await this.db.query( + `WITH latest AS ( + SELECT uri, MAX(stamp) AS stamp FROM resources + WHERE uri LIKE ? || '%' + GROUP BY uri + ) + SELECT r.uri, r.stamp, r.status, r.meta + FROM latest l + JOIN resources r ON r.uri = l.uri AND r.stamp = l.stamp + WHERE r.stamp > ? + ORDER BY r.stamp ASC, r.uri ASC`, + [options.prefix, after], + ); + for (const row of rows) yield rowToResource(row); + } + + async saveWorker(worker: Worker): Promise { + await this.ensureInit(); + await this.db.query( + `INSERT INTO workers (name, selects, emits) VALUES (?, ?, ?) + ON CONFLICT(name) DO UPDATE SET selects = excluded.selects, emits = excluded.emits`, + [worker.name, worker.selects, worker.emits], + ); + } + + async deleteWorker(name: string): Promise { + await this.ensureInit(); + await this.db.query("DELETE FROM workers WHERE name = ?", [name]); + await this.db.query("DELETE FROM completions WHERE worker = ?", [name]); + } + + async getWorker(name: string): Promise { + await this.ensureInit(); + const rows = await this.db.query( + "SELECT name, selects, emits FROM workers WHERE name = ?", + [name], + ); + const row = rows[0]; + return row ? 
{ name: row.name, selects: row.selects, emits: row.emits } : undefined; + } + + async *listWorkers(): AsyncIterable { + await this.ensureInit(); + const rows = await this.db.query( + "SELECT name, selects, emits FROM workers ORDER BY name ASC", + ); + for (const row of rows) yield { name: row.name, selects: row.selects, emits: row.emits }; + } + + async markCompleted(worker: string, stamp: number): Promise { + await this.ensureInit(); + await this.db.query( + "INSERT OR REPLACE INTO completions (worker, stamp, finished_at) VALUES (?, ?, ?)", + [worker, stamp, Date.now()], + ); + } + + async allWatermarks(): Promise> { + await this.ensureInit(); + const rows = await this.db.query<{ worker: string; stamp: number }>( + "SELECT worker, MAX(stamp) AS stamp FROM completions GROUP BY worker", + ); + const result = new Map(); + for (const row of rows) result.set(row.worker, row.stamp); + return result; + } + + async invalidate(prefix: string): Promise { + await this.ensureInit(); + const stamp = await this.newStamp(); + await this.db.query( + `INSERT OR REPLACE INTO resources (uri, stamp, status, meta) + SELECT r.uri, ?, 'removed', NULL + FROM ( + SELECT uri, MAX(stamp) AS stamp FROM resources + WHERE uri LIKE ? 
|| '%' + GROUP BY uri + ) l + JOIN resources r ON r.uri = l.uri AND r.stamp = l.stamp + WHERE r.status != 'removed'`, + [stamp, prefix], + ); + } + + async purgeResources(options?: PurgeResourcesOptions): Promise { + await this.ensureInit(); + if (options?.keepLatestPerUri !== true) return; + await this.db.exec( + `DELETE FROM resources + WHERE (uri, stamp) NOT IN ( + SELECT uri, MAX(stamp) FROM resources GROUP BY uri + )`, + ); + } + + async purgeCompletions(options?: PurgeCompletionsOptions): Promise { + await this.ensureInit(); + const keep = options?.keepLatestPerWorker; + if (keep === undefined || keep < 1) return; + await this.db.query( + `DELETE FROM completions + WHERE rowid NOT IN ( + SELECT rowid FROM ( + SELECT rowid, + ROW_NUMBER() OVER (PARTITION BY worker ORDER BY stamp DESC) AS rn + FROM completions + ) WHERE rn <= ? + )`, + [keep], + ); + } +} diff --git a/packages/uri-graph/src/store/store.ts b/packages/uri-graph/src/store/store.ts new file mode 100644 index 0000000..19723e0 --- /dev/null +++ b/packages/uri-graph/src/store/store.ts @@ -0,0 +1,35 @@ +import type { Resource, Worker } from "../types.js"; + +export type ListOptions = { + prefix: string; + afterStamp?: number; +}; + +export type PurgeResourcesOptions = { + keepLatestPerUri?: boolean; +}; + +export type PurgeCompletionsOptions = { + keepLatestPerWorker?: number; +}; + +export interface Store { + newStamp(): Promise; + + put(resource: Resource): Promise; + get(uri: string): Promise; + list(options: ListOptions): AsyncIterable; + + saveWorker(worker: Worker): Promise; + deleteWorker(name: string): Promise; + getWorker(name: string): Promise; + listWorkers(): AsyncIterable; + + markCompleted(worker: string, stamp: number): Promise; + allWatermarks(): Promise>; + + invalidate(prefix: string): Promise; + + purgeResources(options?: PurgeResourcesOptions): Promise; + purgeCompletions(options?: PurgeCompletionsOptions): Promise; +} diff --git a/packages/uri-graph/src/topo-layers.ts 
b/packages/uri-graph/src/topo-layers.ts new file mode 100644 index 0000000..bcd790a --- /dev/null +++ b/packages/uri-graph/src/topo-layers.ts @@ -0,0 +1,45 @@ +import type { Worker } from "./types.js"; + +export function topoLayers(workers: Worker[]): Worker[][] { + const upstreamsOf = new Map>(); + for (const w of workers) { + const upstreams = new Set(); + if (w.selects !== "") { + for (const u of workers) { + if (u.name === w.name) continue; + if (u.emits === "") continue; + if (u.emits.startsWith(w.selects) || w.selects.startsWith(u.emits)) { + upstreams.add(u.name); + } + } + } + upstreamsOf.set(w.name, upstreams); + } + + const layers: Worker[][] = []; + const placed = new Set(); + while (placed.size < workers.length) { + const layer: Worker[] = []; + for (const w of workers) { + if (placed.has(w.name)) continue; + const upstreams = upstreamsOf.get(w.name); + if (!upstreams) continue; + let ready = true; + for (const u of upstreams) { + if (!placed.has(u)) { + ready = false; + break; + } + } + if (ready) layer.push(w); + } + if (layer.length === 0) { + const remaining = workers.filter((w) => !placed.has(w.name)).map((w) => w.name); + throw new Error(`cycle detected among workers: ${remaining.join(", ")}`); + } + layer.sort((a, b) => a.name.localeCompare(b.name)); + for (const w of layer) placed.add(w.name); + layers.push(layer); + } + return layers; +} diff --git a/packages/uri-graph/src/types.ts b/packages/uri-graph/src/types.ts new file mode 100644 index 0000000..1c4bd9a --- /dev/null +++ b/packages/uri-graph/src/types.ts @@ -0,0 +1,24 @@ +export type Status = "added" | "updated" | "removed"; + +export type Resource = { + uri: string; + stamp: number; + status: Status; + meta?: unknown; +}; + +export type Worker = { + name: string; + selects: string; + emits: string; +}; + +export type WorkerContext = { + newStamp: () => Promise; + read: (uri: string) => Promise; +}; + +export type WorkerFn = ( + input: AsyncIterable, + ctx: WorkerContext, +) => 
AsyncGenerator; diff --git a/packages/uri-graph/tests/e2e/pipeline.test.ts b/packages/uri-graph/tests/e2e/pipeline.test.ts new file mode 100644 index 0000000..c9bd354 --- /dev/null +++ b/packages/uri-graph/tests/e2e/pipeline.test.ts @@ -0,0 +1,110 @@ +import { newNodeTursoDb } from "@statewalker/db-turso-node"; +import { describe, expect, it } from "vitest"; +import { Engine, MemoryStore, SqlStore, type Store, type WorkerFn } from "../../src/index.js"; + +type StoreFactory = () => Promise<{ store: Store; close: () => Promise }>; + +const factories: Array<{ name: string; make: StoreFactory }> = [ + { + name: "MemoryStore", + make: async () => ({ store: new MemoryStore(), close: async () => {} }), + }, + { + name: "SqlStore", + make: async () => { + const db = await newNodeTursoDb(); + return { store: new SqlStore(db), close: async () => db.close() }; + }, + }, +]; + +async function collect(it: AsyncIterable): Promise { + const out: T[] = []; + for await (const x of it) out.push(x); + return out; +} + +for (const { name, make } of factories) { + describe(`Pipeline e2e — ${name}`, () => { + it("scanner → extractor (md only) → indexer", async () => { + const { store, close } = await make(); + try { + const engine = new Engine(store); + + const scanner: WorkerFn = async function* (input, ctx) { + for await (const _tick of input) { + const stamp = await ctx.newStamp(); + yield { uri: "file://a.md", stamp, status: "added" }; + yield { uri: "file://b.png", stamp, status: "added" }; + yield { uri: "file://c.md", stamp, status: "added" }; + } + }; + + const extractor: WorkerFn = async function* (input, ctx) { + for await (const r of input) { + if (!r.uri.endsWith(".md")) continue; + const stamp = await ctx.newStamp(); + yield { uri: `text://${r.uri.slice("file://".length)}`, stamp, status: "added" }; + } + }; + + const indexer: WorkerFn = async function* (input, ctx) { + for await (const r of input) { + const stamp = await ctx.newStamp(); + yield { uri: 
`db://${r.uri.slice("text://".length)}`, stamp, status: "added" }; + } + }; + + await store.put({ + uri: "tick://run", + stamp: await store.newStamp(), + status: "updated", + }); + + await engine.register({ name: "scanner", selects: "tick://", emits: "file://" }, scanner); + await engine.register( + { name: "extractor", selects: "file://", emits: "text://" }, + extractor, + ); + await engine.register({ name: "indexer", selects: "text://", emits: "db://" }, indexer); + + await collect(engine.stabilize()); + + const dbRows = await collect(store.list({ prefix: "db://" })); + expect(dbRows.map((r) => r.uri).sort()).toEqual(["db://a.md", "db://c.md"]); + + const png = await store.get("text://b.png"); + expect(png).toBeUndefined(); + } finally { + await close(); + } + }); + + it("invalidate triggers downstream re-execution", async () => { + const { store, close } = await make(); + try { + const engine = new Engine(store); + + const stamp1 = await store.newStamp(); + await store.put({ uri: "file://x", stamp: stamp1, status: "added" }); + + const echo: WorkerFn = async function* (input, ctx) { + for await (const r of input) { + const stamp = await ctx.newStamp(); + yield { uri: `text://${r.uri.slice("file://".length)}`, stamp, status: r.status }; + } + }; + + await engine.register({ name: "echo", selects: "file://", emits: "text://" }, echo); + await collect(engine.stabilize()); + expect((await store.get("text://x"))?.status).toBe("added"); + + await store.invalidate("file://"); + await collect(engine.stabilize()); + expect((await store.get("text://x"))?.status).toBe("removed"); + } finally { + await close(); + } + }); + }); +} diff --git a/packages/uri-graph/tests/engine.test.ts b/packages/uri-graph/tests/engine.test.ts new file mode 100644 index 0000000..57cce84 --- /dev/null +++ b/packages/uri-graph/tests/engine.test.ts @@ -0,0 +1,248 @@ +import { describe, expect, it } from "vitest"; +import { Engine, MemoryStore, type Resource, type WorkerFn } from "../src/index.js"; 
+ +async function collect(it: AsyncIterable): Promise { + const out: T[] = []; + for await (const x of it) out.push(x); + return out; +} + +describe("Engine.runWorker", () => { + it("runs a worker with empty input and writes no completion", async () => { + const store = new MemoryStore(); + const engine = new Engine(store); + + const fn: WorkerFn = async function* (input, ctx) { + for await (const r of input) { + yield { uri: r.uri, stamp: await ctx.newStamp(), status: "added" }; + } + }; + + await engine.register({ name: "w", selects: "x://", emits: "y://" }, fn); + + const out = await collect(engine.runWorker("w")); + expect(out).toEqual([]); + + const wm = await store.allWatermarks(); + expect(wm.get("w")).toBeUndefined(); + }); + + it("runs a worker, persists outputs, and writes a completion stamp", async () => { + const store = new MemoryStore(); + const engine = new Engine(store); + + await store.put({ uri: "file://a", stamp: 1, status: "added" }); + await store.put({ uri: "file://b", stamp: 2, status: "added" }); + + const fn: WorkerFn = async function* (input, ctx) { + const stamp = await ctx.newStamp(); + for await (const r of input) { + yield { + uri: `text://${r.uri.slice("file://".length)}`, + stamp, + status: "updated", + }; + } + }; + + await engine.register({ name: "extractor", selects: "file://", emits: "text://" }, fn); + + const out: Resource[] = await collect(engine.runWorker("extractor")); + expect(out.map((r) => r.uri).sort()).toEqual(["text://a", "text://b"]); + + const wm = await store.allWatermarks(); + const completion = wm.get("extractor"); + expect(completion).toBeDefined(); + if (!completion) return; + + for (const r of out) expect(r.stamp).toBeLessThan(completion); + }); + + it("filter worker (consumes input, produces nothing) still bumps watermark", async () => { + const store = new MemoryStore(); + const engine = new Engine(store); + + await store.put({ uri: "file://a.png", stamp: 1, status: "added" }); + await store.put({ uri: 
"file://b.png", stamp: 2, status: "added" }); + + const fn: WorkerFn = async function* (input, ctx) { + for await (const r of input) { + if (r.uri.endsWith(".md")) { + yield { uri: `text://${r.uri}`, stamp: await ctx.newStamp(), status: "added" }; + } + } + }; + + await engine.register({ name: "filter", selects: "file://", emits: "text://" }, fn); + + await collect(engine.runWorker("filter")); + + const wm = await store.allWatermarks(); + const first = wm.get("filter"); + expect(first).toBeDefined(); + if (!first) return; + expect(first).toBeGreaterThanOrEqual(2); + + await collect(engine.runWorker("filter")); + + const wm2 = await store.allWatermarks(); + expect(wm2.get("filter")).toBe(first); + }); + + it("does not advance watermark when worker generator throws", async () => { + const store = new MemoryStore(); + const engine = new Engine(store); + + await store.put({ uri: "file://a", stamp: 1, status: "added" }); + + const fn: WorkerFn = async function* (input, ctx) { + for await (const r of input) { + if (r.uri) throw new Error("boom"); + yield { uri: r.uri, stamp: await ctx.newStamp(), status: "added" }; + } + }; + + await engine.register({ name: "w", selects: "file://", emits: "x://" }, fn); + + await expect(collect(engine.runWorker("w"))).rejects.toThrow("boom"); + + const wm = await store.allWatermarks(); + expect(wm.get("w")).toBeUndefined(); + }); + + it("re-running with no new inputs is a no-op", async () => { + const store = new MemoryStore(); + const engine = new Engine(store); + + await store.put({ uri: "file://a", stamp: 1, status: "added" }); + + let runs = 0; + const fn: WorkerFn = async function* (input, ctx) { + runs++; + const stamp = await ctx.newStamp(); + for await (const r of input) { + yield { uri: `text://${r.uri}`, stamp, status: "updated" }; + } + }; + + await engine.register({ name: "x", selects: "file://", emits: "text://" }, fn); + + await collect(engine.runWorker("x")); + expect(runs).toBe(1); + + await 
collect(engine.runWorker("x")); + expect(runs).toBe(2); + const got = await collect(store.list({ prefix: "text://" })); + expect(got.length).toBe(1); + }); +}); + +describe("Engine.stabilize", () => { + it("cascades through a 3-stage pipeline", async () => { + const store = new MemoryStore(); + const engine = new Engine(store); + + const scanner: WorkerFn = async function* (input, ctx) { + for await (const _tick of input) { + const stamp = await ctx.newStamp(); + yield { uri: "file://a", stamp, status: "added" }; + yield { uri: "file://b", stamp, status: "added" }; + } + }; + + const extractor: WorkerFn = async function* (input, ctx) { + const stamp = await ctx.newStamp(); + for await (const r of input) { + yield { uri: `text://${r.uri.slice("file://".length)}`, stamp, status: "updated" }; + } + }; + + const indexer: WorkerFn = async function* (input, ctx) { + const stamp = await ctx.newStamp(); + for await (const r of input) { + yield { uri: `db://${r.uri.slice("text://".length)}`, stamp, status: "updated" }; + } + }; + + await store.put({ uri: "tick://run", stamp: await store.newStamp(), status: "updated" }); + + await engine.register({ name: "scanner", selects: "tick://", emits: "file://" }, scanner); + await engine.register({ name: "extractor", selects: "file://", emits: "text://" }, extractor); + await engine.register({ name: "indexer", selects: "text://", emits: "db://" }, indexer); + + await collect(engine.stabilize()); + + const dbRows = await collect(store.list({ prefix: "db://" })); + expect(dbRows.map((r) => r.uri).sort()).toEqual(["db://a", "db://b"]); + }); + + it("stabilize converges (terminates) when no worker has new input", async () => { + const store = new MemoryStore(); + const engine = new Engine(store); + + const fn: WorkerFn = async function* (input, ctx) { + const stamp = await ctx.newStamp(); + for await (const r of input) { + yield { uri: `text://${r.uri}`, stamp, status: "updated" }; + } + }; + + await store.put({ uri: "file://a", stamp: 
await store.newStamp(), status: "added" }); + await engine.register({ name: "w", selects: "file://", emits: "text://" }, fn); + + const out1 = await collect(engine.stabilize()); + expect(out1.length).toBeGreaterThan(0); + + const out2 = await collect(engine.stabilize()); + expect(out2).toEqual([]); + }); + + it("invalidate triggers re-execution of downstream workers", async () => { + const store = new MemoryStore(); + const engine = new Engine(store); + + const extractor: WorkerFn = async function* (input, ctx) { + const stamp = await ctx.newStamp(); + for await (const r of input) { + if (r.status === "removed") { + yield { uri: `text://${r.uri}`, stamp, status: "removed" }; + } else { + yield { uri: `text://${r.uri}`, stamp, status: "updated" }; + } + } + }; + + await store.put({ uri: "file://a", stamp: await store.newStamp(), status: "added" }); + await engine.register({ name: "extractor", selects: "file://", emits: "text://" }, extractor); + + await collect(engine.stabilize()); + expect((await store.get("text://file://a"))?.status).toBe("updated"); + + await store.invalidate("file://"); + await collect(engine.stabilize()); + expect((await store.get("text://file://a"))?.status).toBe("removed"); + }); +}); + +describe("Engine.unregister", () => { + it("removes worker and clears completions", async () => { + const store = new MemoryStore(); + const engine = new Engine(store); + + await store.put({ uri: "file://a", stamp: await store.newStamp(), status: "added" }); + + const fn: WorkerFn = async function* (input, ctx) { + const stamp = await ctx.newStamp(); + for await (const r of input) { + yield { uri: `text://${r.uri}`, stamp, status: "updated" }; + } + }; + await engine.register({ name: "w", selects: "file://", emits: "text://" }, fn); + await collect(engine.stabilize()); + + expect((await store.allWatermarks()).get("w")).toBeDefined(); + await engine.unregister("w"); + expect((await store.allWatermarks()).get("w")).toBeUndefined(); + expect(await 
store.getWorker("w")).toBeUndefined();
  });
}); diff --git a/packages/uri-graph/tests/store/contract.ts b/packages/uri-graph/tests/store/contract.ts new file mode 100644 index 0000000..cd7b566 --- /dev/null +++ b/packages/uri-graph/tests/store/contract.ts @@ -0,0 +1,200 @@ +import { describe, expect, it } from "vitest";
import type { Resource } from "../../src/index.js";
import type { Store } from "../../src/store/store.js";

export type StoreFactory = () => Promise<{ store: Store; close: () => Promise<void> }>;

async function collect<T>(it: AsyncIterable<T>): Promise<T[]> {
  const out: T[] = [];
  for await (const x of it) out.push(x);
  return out;
}

export function defineStoreContract(name: string, factory: StoreFactory): void {
  describe(`${name} — Store contract`, () => {
    it("mints monotonically increasing stamps", async () => {
      const { store, close } = await factory();
      try {
        const a = await store.newStamp();
        const b = await store.newStamp();
        const c = await store.newStamp();
        expect(a < b).toBe(true);
        expect(b < c).toBe(true);
      } finally {
        await close();
      }
    });

    it("put + get returns the latest event for a uri", async () => {
      const { store, close } = await factory();
      try {
        await store.put({ uri: "file://a", stamp: 1, status: "added" });
        await store.put({ uri: "file://a", stamp: 5, status: "updated", meta: { size: 10 } });
        const r = await store.get("file://a");
        expect(r?.stamp).toBe(5);
        expect(r?.status).toBe("updated");
        expect((r?.meta as { size: number }).size).toBe(10);
      } finally {
        await close();
      }
    });

    it("get returns undefined for unknown uri", async () => {
      const { store, close } = await factory();
      try {
        expect(await store.get("file://missing")).toBeUndefined();
      } finally {
        await close();
      }
    });

    it("list filters by prefix and returns latest event per uri sorted by stamp", async () => {
      const { store, close } = await factory();
      try {
        await store.put({ uri: "file://a", stamp: 1, 
status: "added" }); + await store.put({ uri: "file://b", stamp: 2, status: "added" }); + await store.put({ uri: "text://a", stamp: 3, status: "added" }); + await store.put({ uri: "file://a", stamp: 4, status: "updated" }); + + const files = await collect(store.list({ prefix: "file://" })); + expect(files.map((r) => r.uri)).toEqual(["file://b", "file://a"]); + expect(files.find((r) => r.uri === "file://a")?.stamp).toBe(4); + + const texts = await collect(store.list({ prefix: "text://" })); + expect(texts.map((r) => r.uri)).toEqual(["text://a"]); + } finally { + await close(); + } + }); + + it("list filters by afterStamp", async () => { + const { store, close } = await factory(); + try { + await store.put({ uri: "file://a", stamp: 1, status: "added" }); + await store.put({ uri: "file://b", stamp: 5, status: "added" }); + await store.put({ uri: "file://a", stamp: 10, status: "updated" }); + + const after3 = await collect(store.list({ prefix: "file://", afterStamp: 3 })); + expect(after3.map((r) => r.uri).sort()).toEqual(["file://a", "file://b"]); + + const after7 = await collect(store.list({ prefix: "file://", afterStamp: 7 })); + expect(after7.map((r) => r.uri)).toEqual(["file://a"]); + } finally { + await close(); + } + }); + + it("saves, gets, lists, deletes workers", async () => { + const { store, close } = await factory(); + try { + await store.saveWorker({ name: "scanner", selects: "", emits: "file://" }); + await store.saveWorker({ name: "extractor", selects: "file://", emits: "text://" }); + + expect((await store.getWorker("scanner"))?.emits).toBe("file://"); + + const all = await collect(store.listWorkers()); + expect(all.map((w) => w.name).sort()).toEqual(["extractor", "scanner"]); + + await store.deleteWorker("scanner"); + expect(await store.getWorker("scanner")).toBeUndefined(); + } finally { + await close(); + } + }); + + it("saveWorker upserts on conflict", async () => { + const { store, close } = await factory(); + try { + await store.saveWorker({ name: 
"w", selects: "a://", emits: "b://" }); + await store.saveWorker({ name: "w", selects: "x://", emits: "y://" }); + const w = await store.getWorker("w"); + expect(w?.selects).toBe("x://"); + expect(w?.emits).toBe("y://"); + } finally { + await close(); + } + }); + + it("allWatermarks returns max stamp per worker", async () => { + const { store, close } = await factory(); + try { + await store.markCompleted("a", 5); + await store.markCompleted("a", 10); + await store.markCompleted("a", 7); + await store.markCompleted("b", 3); + + const wm = await store.allWatermarks(); + expect(wm.get("a")).toBe(10); + expect(wm.get("b")).toBe(3); + expect(wm.get("c")).toBeUndefined(); + } finally { + await close(); + } + }); + + it("invalidate emits 'removed' events for matching uris", async () => { + const { store, close } = await factory(); + try { + await store.put({ uri: "text://a", stamp: 1, status: "added" }); + await store.put({ uri: "text://b", stamp: 2, status: "updated" }); + await store.put({ uri: "file://x", stamp: 3, status: "added" }); + + await store.invalidate("text://"); + + const a = await store.get("text://a"); + const b = await store.get("text://b"); + const x = await store.get("file://x"); + expect(a?.status).toBe("removed"); + expect(b?.status).toBe("removed"); + expect(x?.status).toBe("added"); + } finally { + await close(); + } + }); + + it("invalidate skips uris that are already removed", async () => { + const { store, close } = await factory(); + try { + await store.put({ uri: "text://a", stamp: 1, status: "removed" }); + + const wmBefore = await store.newStamp(); + await store.invalidate("text://"); + const r = await store.get("text://a"); + expect(r?.stamp).toBeLessThanOrEqual(wmBefore); + } finally { + await close(); + } + }); + + it("purgeResources({ keepLatestPerUri: true }) collapses to one event per uri", async () => { + const { store, close } = await factory(); + try { + await store.put({ uri: "file://a", stamp: 1, status: "added" }); + await 
store.put({ uri: "file://a", stamp: 2, status: "updated" }); + await store.put({ uri: "file://a", stamp: 3, status: "updated" }); + + await store.purgeResources({ keepLatestPerUri: true }); + + const r = await store.get("file://a"); + expect(r?.stamp).toBe(3); + + const all: Resource[] = await collect(store.list({ prefix: "file://" })); + expect(all.length).toBe(1); + } finally { + await close(); + } + }); + + it("purgeCompletions({ keepLatestPerWorker }) keeps only the N newest", async () => { + const { store, close } = await factory(); + try { + for (let s = 1; s <= 5; s++) await store.markCompleted("w", s); + await store.purgeCompletions({ keepLatestPerWorker: 2 }); + const wm = await store.allWatermarks(); + expect(wm.get("w")).toBe(5); + } finally { + await close(); + } + }); + }); +} diff --git a/packages/uri-graph/tests/store/memory.test.ts b/packages/uri-graph/tests/store/memory.test.ts new file mode 100644 index 0000000..569c6c0 --- /dev/null +++ b/packages/uri-graph/tests/store/memory.test.ts @@ -0,0 +1,7 @@ +import { MemoryStore } from "../../src/store/memory.js"; +import { defineStoreContract } from "./contract.js"; + +defineStoreContract("MemoryStore", async () => { + const store = new MemoryStore(); + return { store, close: async () => {} }; +}); diff --git a/packages/uri-graph/tests/store/sql.test.ts b/packages/uri-graph/tests/store/sql.test.ts new file mode 100644 index 0000000..ad28414 --- /dev/null +++ b/packages/uri-graph/tests/store/sql.test.ts @@ -0,0 +1,14 @@ +import { newNodeTursoDb } from "@statewalker/db-turso-node"; +import { SqlStore } from "../../src/store/sql.js"; +import { defineStoreContract } from "./contract.js"; + +defineStoreContract("SqlStore", async () => { + const db = await newNodeTursoDb(); + const store = new SqlStore(db); + return { + store, + close: async () => { + await db.close(); + }, + }; +}); diff --git a/packages/uri-graph/tests/topo-layers.test.ts b/packages/uri-graph/tests/topo-layers.test.ts new file mode 100644 
index 0000000..de96fd4 --- /dev/null +++ b/packages/uri-graph/tests/topo-layers.test.ts @@ -0,0 +1,45 @@ +import { describe, expect, it } from "vitest"; +import { topoLayers } from "../src/index.js"; + +describe("topoLayers", () => { + it("groups independent workers into the same layer", async () => { + const layers = topoLayers([ + { name: "a", selects: "x://", emits: "y://" }, + { name: "b", selects: "p://", emits: "q://" }, + ]); + expect(layers.length).toBe(1); + expect(layers[0]?.map((w) => w.name).sort()).toEqual(["a", "b"]); + }); + + it("orders dependents after their producers", async () => { + const layers = topoLayers([ + { name: "scanner", selects: "", emits: "file://" }, + { name: "extractor", selects: "file://", emits: "text://" }, + { name: "indexer", selects: "text://", emits: "db://" }, + ]); + expect(layers.map((l) => l.map((w) => w.name))).toEqual([ + ["scanner"], + ["extractor"], + ["indexer"], + ]); + }); + + it("places fan-in dependents in a layer after all upstreams", async () => { + const layers = topoLayers([ + { name: "a", selects: "", emits: "alpha://" }, + { name: "b", selects: "", emits: "beta://" }, + { name: "join", selects: "alpha://", emits: "gamma://" }, + ]); + expect(layers[0]?.map((w) => w.name).sort()).toEqual(["a", "b"]); + expect(layers[1]?.map((w) => w.name)).toEqual(["join"]); + }); + + it("throws on cycles", async () => { + expect(() => + topoLayers([ + { name: "a", selects: "x://", emits: "y://" }, + { name: "b", selects: "y://", emits: "x://" }, + ]), + ).toThrow(/cycle/); + }); +}); diff --git a/packages/uri-graph/tsconfig.json b/packages/uri-graph/tsconfig.json new file mode 100644 index 0000000..c342f5b --- /dev/null +++ b/packages/uri-graph/tsconfig.json @@ -0,0 +1,17 @@ +{ + "extends": "../../tsconfig.base.json", + "compilerOptions": { + "lib": ["ES2022", "DOM"], + "types": ["node"], + "verbatimModuleSyntax": true, + "isolatedModules": true, + "noUnusedLocals": true, + "noUnusedParameters": true, + 
"noImplicitReturns": true, + "noFallthroughCasesInSwitch": true, + "noUncheckedIndexedAccess": true, + "noEmit": true + }, + "include": ["./src", "./tests"], + "exclude": ["node_modules", "dist"] +}