Merged
3 changes: 3 additions & 0 deletions packages/uri-graph/.gitignore
@@ -0,0 +1,3 @@
node_modules
dist
*.tsbuildinfo
148 changes: 148 additions & 0 deletions packages/uri-graph/README.md
@@ -0,0 +1,148 @@
# @statewalker/uri-graph

## What it is

A minimal persistent dependency graph for URI-shaped work. Every observable thing — a file, an extracted text, a chunk, an index, a transformed cell — is a URI. Each URI has a `Resource` (status, monotonic stamp, optional meta). Workers are async generators that consume an input stream of resources and yield an output stream. A fixpoint engine drives them to convergence.

The package is small on purpose: one `Engine`, one `Store` interface with two interchangeable backends (`MemoryStore`, `SqlStore`), and a `topoLayers` helper for introspection. There is no run history, no staging table, no hash-based no-op rule, no scope/role machinery — those concerns either belong to the worker or are handled implicitly by the watermark.

## Why it exists

To replace heavier scanner-style pipelines with a single algorithm that works for:

- file transformations (scan → extract → index)
- code transformations (TS/TSX → JS)
- code execution (ObservableHQ-style cells)
- in-process transient pipelines and durable ones, with the same code

The algorithm:

1. Each worker declares a single input scheme (`selects`) and a single output scheme (`emits`).
2. The engine reads the worker's last completion stamp.
3. The engine streams every resource whose latest event has `stamp > watermark` and whose URI begins with `selects` into the worker.
4. The worker yields output resources, each carrying a fresh stamp it minted via `ctx.newStamp()`. Each yield is persisted immediately by the engine.
5. On clean completion, the engine mints another stamp and writes it to `completions(worker, stamp)`. By construction this stamp is larger than every input or output stamp the run touched.
6. `stabilize()` repeats this round until no worker progressed.
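
The six steps above can be sketched as a toy, synchronous model. This is not the package's `Engine`: `ToyWorker`, `runRound`, and `stabilizeToy` are hypothetical names, workers are plain functions rather than async generators, and all state lives in two maps. It only illustrates how stamps and watermarks drive the loop to a fixpoint.

```typescript
// Toy model of one engine round. Every name here is hypothetical.
type Res = { uri: string; stamp: number; status: string };
type ToyWorker = {
  name: string;
  selects: string;
  // Maps one input resource to zero or more output URIs.
  derive: (input: Res) => string[];
};

let nextStamp = 1;
const newStamp = (): number => nextStamp++;

const latest = new Map<string, Res>(); // latest event per URI
const watermarks = new Map<string, number>(); // last completion stamp per worker

function runRound(workers: ToyWorker[]): boolean {
  let progressed = false;
  for (const w of workers) {
    // Step 2: read the worker's last completion stamp.
    const watermark = watermarks.get(w.name) ?? 0;
    // Step 3: inputs are resources under the prefix that are newer than the watermark.
    const inputs = Array.from(latest.values())
      .filter((r) => r.uri.startsWith(w.selects) && r.stamp > watermark)
      .sort((a, b) => a.stamp - b.stamp);
    for (const input of inputs) {
      for (const uri of w.derive(input)) {
        // Step 4: each output carries a fresh stamp and is stored immediately.
        latest.set(uri, { uri, stamp: newStamp(), status: input.status });
        progressed = true;
      }
    }
    // Step 5: the completion stamp is minted last, so it exceeds every stamp the run touched.
    if (inputs.length > 0) watermarks.set(w.name, newStamp());
  }
  return progressed;
}

// Step 6: repeat rounds until no worker produced anything.
function stabilizeToy(workers: ToyWorker[]): number {
  let rounds = 0;
  while (runRound(workers)) rounds++;
  return rounds;
}

const toyWorkers: ToyWorker[] = [
  { name: "extractor", selects: "file://", derive: (r) => [`text://${r.uri.slice("file://".length)}`] },
  { name: "indexer", selects: "text://", derive: (r) => [`index://${r.uri.slice("text://".length)}`] },
];

latest.set("file://a.md", { uri: "file://a.md", stamp: newStamp(), status: "added" });
const rounds = stabilizeToy(toyWorkers);
console.log(rounds, Array.from(latest.keys()));
```

With the extractor listed before the indexer, one productive round is enough. Reversing the order would take two, because the indexer finds no `text://` input during the first round.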

Crash safety falls out of the design: outputs are URI-keyed (idempotent on retry); the completion row is the last write of a run. A crashed run leaves no completion; the next round re-executes the same inputs and overwrites the same output URIs.

## How to use

```sh
pnpm add @statewalker/uri-graph
```

```ts
import { Engine, MemoryStore, type WorkerFn } from "@statewalker/uri-graph";

const store = new MemoryStore();
const engine = new Engine(store);

const scanner: WorkerFn = async function* (input, ctx) {
  for await (const _tick of input) {
    const stamp = await ctx.newStamp();
    yield { uri: "file://a.md", stamp, status: "added" };
    yield { uri: "file://b.md", stamp, status: "added" };
  }
};

const extractor: WorkerFn = async function* (input, ctx) {
  for await (const r of input) {
    if (!r.uri.endsWith(".md")) continue;
    const stamp = await ctx.newStamp();
    yield { uri: `text://${r.uri.slice("file://".length)}`, stamp, status: r.status };
  }
};

await engine.register({ name: "scanner", selects: "tick://", emits: "file://" }, scanner);
await engine.register({ name: "extractor", selects: "file://", emits: "text://" }, extractor);

// publish a tick to wake up the source worker
await store.put({ uri: "tick://run", stamp: await store.newStamp(), status: "updated" });

for await (const r of engine.stabilize()) {
  console.log(r.uri, r.stamp, r.status);
}
```

### Choosing a backend

| | `MemoryStore` | `SqlStore` |
|---|---|---|
| State | JS maps | libSQL/SQLite tables |
| Persistence | none (in-process) | the underlying `Db` |
| Best for | unit tests, in-memory cell evaluation, scratch pipelines | daemons, durable indexes, cross-restart work |

```ts
import { newNodeTursoDb } from "@statewalker/db-turso-node";
import { Engine, SqlStore } from "@statewalker/uri-graph";

const db = await newNodeTursoDb({ path: "./graph.db" });
const store = new SqlStore(db);
const engine = new Engine(store);
// ... register, stabilize ...
await db.close(); // caller owns the Db
```

Workers do not branch on backend. The same `WorkerFn` runs against either store.

### Operator actions

- `store.invalidate(prefix)` — appends `'removed'` events for every live URI under the prefix. Downstream workers see these on the next `stabilize()` and cascade.
- `store.purgeResources({ keepLatestPerUri: true })` — collapses the event log to one row per URI.
- `store.purgeCompletions({ keepLatestPerWorker: N })` — trims completion history.
- `engine.unregister(name)` — removes the worker and its completion rows; resources stay.
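
As a rough illustration of the first two actions, the sketch below models the event log as a plain map and mirrors what `invalidate` and `purgeResources({ keepLatestPerUri: true })` are described as doing. The names `ResEvent`, `append`, `invalidatePrefix`, and `keepLatestPerUri` are hypothetical helpers, not part of the package API.

```typescript
type ResEvent = { uri: string; stamp: number; status: string };

// Hypothetical event log: per URI, events in append order (the last one is the latest).
const log = new Map<string, ResEvent[]>();
const append = (uri: string, stamp: number, status: string): void => {
  const events = log.get(uri) ?? [];
  events.push({ uri, stamp, status });
  log.set(uri, events);
};

// Like invalidate(prefix): append one 'removed' event to every live URI under the prefix.
function invalidatePrefix(prefix: string, stamp: number): number {
  let touched = 0;
  log.forEach((events, uri) => {
    const last = events[events.length - 1];
    if (!uri.startsWith(prefix) || !last || last.status === "removed") return;
    events.push({ uri, stamp, status: "removed" });
    touched++;
  });
  return touched;
}

// Like purgeResources({ keepLatestPerUri: true }): keep only the newest event per URI.
function keepLatestPerUri(): void {
  log.forEach((events, uri) => {
    if (events.length > 1) log.set(uri, events.slice(-1));
  });
}

append("file://a.md", 1, "added");
append("file://b.md", 2, "removed");
append("file://a.md", 3, "updated");
append("text://a.md", 4, "added");

const touched = invalidatePrefix("file://", 5); // only a.md is still live
keepLatestPerUri();
const remainingEvents = Array.from(log.values()).reduce((n, events) => n + events.length, 0);
console.log(touched, remainingEvents);
```

Because the `'removed'` event carries a fresh stamp, it lands above every downstream watermark, which is why the next `stabilize()` picks it up.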

### Introspection

```ts
import { topoLayers } from "@statewalker/uri-graph";

const layers = topoLayers(workers);
// [[w1, w2], [w3], [w4, w5]] — workers in the same layer have no dependency between them
```

`topoLayers` is for visualization and parallel scheduling decisions. The engine itself does not use it: the fixpoint loop is data-driven by stamps.
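
The source of `topoLayers` is not shown here, so the following is only a guess at how such layering could work: treat worker A as feeding worker B when A's `emits` prefix overlaps B's `selects` prefix, then peel off workers with no remaining upstream. `WorkerSpec` and `layerBySchemes` are hypothetical names, and this version returns worker names rather than worker objects.

```typescript
type WorkerSpec = { name: string; selects: string; emits: string };

// Hypothetical layering by scheme prefixes (Kahn-style peeling).
function layerBySchemes(specs: WorkerSpec[]): string[][] {
  // A feeds B when a URI emitted by A could match B's `selects` prefix.
  const feeds = (a: WorkerSpec, b: WorkerSpec): boolean =>
    a !== b && (b.selects.startsWith(a.emits) || a.emits.startsWith(b.selects));

  let remaining = specs.slice();
  const layers: string[][] = [];
  while (remaining.length > 0) {
    // A worker is ready when no remaining worker feeds it.
    const ready = remaining.filter((w) => !remaining.some((up) => feeds(up, w)));
    if (ready.length === 0) throw new Error("cycle between workers");
    layers.push(ready.map((w) => w.name));
    remaining = remaining.filter((w) => ready.indexOf(w) === -1);
  }
  return layers;
}

const layers = layerBySchemes([
  { name: "indexer", selects: "text://", emits: "index://" },
  { name: "scanner", selects: "tick://", emits: "file://" },
  { name: "extractor", selects: "file://", emits: "text://" },
  { name: "thumbs", selects: "file://", emits: "thumb://" },
]);
console.log(JSON.stringify(layers));
// [["scanner"],["extractor","thumbs"],["indexer"]]
```

Workers that share a layer, such as `extractor` and the made-up `thumbs` worker, read the same scheme but do not feed each other, so a scheduler could run them side by side.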

## Internals

### Storage shape

Two append-only tables (`resources`, `completions`) plus a worker registry and a stamp counter:

```
stamp_seq (id=1, next) — single-row counter, only UPDATE in the system
resources (uri, stamp, status, meta) PK (uri, stamp)
workers (name, selects, emits) CRUD
completions (worker, stamp, finished_at) PK (worker, stamp)
```

`get(uri)` reads `MAX(stamp)` per URI. `list({ prefix, afterStamp })` joins each URI's max-stamp row and filters. `allWatermarks()` is `SELECT worker, MAX(stamp) FROM completions GROUP BY worker`. The same shape backs `MemoryStore`.
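
The three reads can be approximated over flat arrays of rows. This is a sketch of the query semantics only, assuming the row shapes listed above; `ResourceRow`, `CompletionRow`, `latestPerUri`, and `listRows` are hypothetical names, and the ascending stamp order follows what `MemoryStore.list` does.

```typescript
// Hypothetical in-memory rows mirroring the `resources` and `completions` tables.
type ResourceRow = { uri: string; stamp: number; status: string };
type CompletionRow = { worker: string; stamp: number };

// get(uri) for every URI at once: the row with MAX(stamp) per URI.
function latestPerUri(rows: ResourceRow[]): Map<string, ResourceRow> {
  const out = new Map<string, ResourceRow>();
  for (const row of rows) {
    const seen = out.get(row.uri);
    if (!seen || row.stamp > seen.stamp) out.set(row.uri, row);
  }
  return out;
}

// list({ prefix, afterStamp }): filter the latest rows, oldest first.
function listRows(rows: ResourceRow[], prefix: string, afterStamp = 0): ResourceRow[] {
  return Array.from(latestPerUri(rows).values())
    .filter((r) => r.uri.startsWith(prefix) && r.stamp > afterStamp)
    .sort((a, b) => a.stamp - b.stamp);
}

// allWatermarks(): MAX(stamp) grouped by worker.
function allWatermarks(rows: CompletionRow[]): Map<string, number> {
  const out = new Map<string, number>();
  for (const { worker, stamp } of rows) {
    out.set(worker, Math.max(out.get(worker) ?? 0, stamp));
  }
  return out;
}

const resourceRows: ResourceRow[] = [
  { uri: "file://a.md", stamp: 1, status: "added" },
  { uri: "file://b.md", stamp: 2, status: "added" },
  { uri: "file://a.md", stamp: 5, status: "updated" },
  { uri: "text://a.md", stamp: 3, status: "added" },
];
const pending = listRows(resourceRows, "file://", 2);
const marks = allWatermarks([
  { worker: "extractor", stamp: 4 },
  { worker: "extractor", stamp: 9 },
]);
console.log(pending.map((r) => `${r.uri}@${r.stamp}`), marks.get("extractor"));
```

Note that `file://b.md` is filtered out because its latest stamp (2) is not above `afterStamp`, and the older `file://a.md` row at stamp 1 is hidden by the newer one at stamp 5.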

### Watermark semantics

A row in `completions(worker, stamp)` means: *this worker has fully processed every resource that existed when that stamp was minted.* Because the completion stamp is minted after the run finishes, it is strictly greater than every output stamp the run produced. Those output stamps, when minted through `ctx.newStamp()` during the run, are in turn greater than every input stamp the run consumed.

A worker that consumes input but produces nothing (filter case) still advances its watermark — the engine writes a completion row whenever the worker's input stream yielded at least one resource. A worker that finds no input writes no completion row and re-runs cheaply on the next round.

### Crash safety

- `put(resource)` is `INSERT OR REPLACE` keyed by `(uri, stamp)`; safe to re-emit the same URI/stamp pair.
- `markCompleted` runs only on clean generator completion; a crash leaves the watermark unchanged.
- On restart, the engine sees the unchanged watermark and re-runs the worker with the same inputs. Workers are required to be deterministic on URI keys; re-emitted outputs overwrite previous ones.
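
A small simulation can make the retry behavior concrete. It assumes a store keyed by `(uri, stamp)` and a completion list that is only appended to after a clean run; `Out`, `putRow`, and `runOnce` are hypothetical names, and the crash is simulated with a thrown error.

```typescript
type Out = { uri: string; stamp: number; status: string };

// Hypothetical store: rows keyed by `uri@stamp`, so re-putting a pair replaces it.
const rows = new Map<string, Out>();
const completions: number[] = [];
let counter = 1;
const mint = (): number => counter++;
const putRow = (r: Out): void => {
  rows.set(`${r.uri}@${r.stamp}`, r);
};

// One run: emit an output per input, then record completion as the last write.
function runOnce(inputs: string[], crashAfter?: number): void {
  let emitted = 0;
  for (const uri of inputs) {
    if (emitted === crashAfter) throw new Error("simulated crash");
    putRow({ uri: uri.replace("file://", "text://"), stamp: mint(), status: "added" });
    emitted++;
  }
  completions.push(mint()); // never reached when the run crashes
}

const fileInputs = ["file://a.md", "file://b.md"];
try {
  runOnce(fileInputs, 1); // crashes after the first output
} catch {
  // No completion was written, so the same inputs are offered again.
}
const completionsAfterCrash = completions.length;
runOnce(fileInputs); // retry: same output URIs, fresh stamps

const outputUris = new Set(Array.from(rows.values()).map((r) => r.uri));
console.log(completionsAfterCrash, completions.length, outputUris.size);
```

The crashed run leaves one partial row behind, but the retry writes to the same two URIs, so the set of output URIs is the same as if the crash had never happened.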

### Constraints

- Single-writer engine. Multi-process is out of scope; if you need it, run a single engine and feed work in.
- `selects` and `emits` are scheme prefixes, not glob patterns. If you need finer filtering, do it inside the worker.
- Workers must produce deterministic output URIs (a function of input, never of time/randomness). Re-running yields the same URIs and overwrites prior values.
- `MemoryStore` keeps full state in memory; not intended for huge graphs.
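
To illustrate the determinism constraint, here is a sketch of URI derivation helpers. The function names and the `chunk://` scheme are made up for this example; the point is only that the output URI is computed from the input and nothing else.

```typescript
// Deterministic: the output URI is a pure function of the input URI.
function textUriFor(fileUri: string): string {
  return `text://${fileUri.slice("file://".length)}`;
}

// Also deterministic: a chunk is addressed by its source and position.
function chunkUriFor(textUri: string, index: number): string {
  return `chunk://${textUri.slice("text://".length)}#${index}`;
}

// Avoid: a time-based URI changes on every retry, so a re-run would leave
// the earlier output orphaned instead of overwriting it.
function unstableUriFor(fileUri: string): string {
  return `${textUriFor(fileUri)}?t=${Date.now()}`;
}

console.log(textUriFor("file://a.md"), chunkUriFor("text://a.md", 2));
```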

## Related

- `@statewalker/db-api` — abstract `Db` interface used by `SqlStore`.
- `@statewalker/db-turso-node`, `@statewalker/db-turso-browser` — libSQL adapters that implement `Db`.

## License

MIT.
49 changes: 49 additions & 0 deletions packages/uri-graph/package.json
@@ -0,0 +1,49 @@
{
  "name": "@statewalker/uri-graph",
  "version": "0.2.0",
  "private": false,
  "type": "module",
  "description": "Minimal persistent URI dependency graph: workers as async generators, stamp watermarks, fixpoint stabilizer over an in-memory or SQL store.",
  "homepage": "https://github.com/statewalker/statewalker-content",
  "author": {
    "name": "Mikhail Kotelnikov",
    "email": "mikhail.kotelnikov@gmail.com"
  },
  "license": "MIT",
  "repository": {
    "type": "git",
    "url": "git+ssh://git@github.com/statewalker/statewalker-content.git"
  },
  "exports": {
    ".": "./src/index.ts"
  },
  "files": [
    "dist",
    "src"
  ],
  "scripts": {
    "build": "tsdown",
    "dev": "tsdown --watch",
    "test": "vitest run",
    "test:watch": "vitest",
    "typecheck": "tsc --noEmit",
    "clean": "rimraf dist",
    "lint": "biome check --write .",
    "format": "biome format --write ."
  },
  "dependencies": {
    "@statewalker/db-api": "workspace:*"
  },
  "devDependencies": {
    "@statewalker/db-turso-node": "workspace:*",
    "@types/node": "catalog:",
    "rimraf": "catalog:",
    "tsdown": "catalog:",
    "typescript": "catalog:",
    "vitest": "catalog:"
  },
  "sideEffects": false,
  "publishConfig": {
    "access": "public"
  }
}
69 changes: 69 additions & 0 deletions packages/uri-graph/src/engine.ts
@@ -0,0 +1,69 @@
import type { Store } from "./store/store.js";
import type { Resource, Worker, WorkerContext, WorkerFn } from "./types.js";

export class Engine {
  private fns = new Map<string, WorkerFn>();

  constructor(private store: Store) {}

  async register(worker: Worker, fn: WorkerFn): Promise<void> {
    await this.store.saveWorker(worker);
    this.fns.set(worker.name, fn);
  }

  async unregister(name: string): Promise<void> {
    await this.store.deleteWorker(name);
    this.fns.delete(name);
  }

  async *runWorker(name: string): AsyncIterable<Resource> {
    const worker = await this.store.getWorker(name);
    if (!worker) return;
    const watermarks = await this.store.allWatermarks();
    yield* this.runOne(worker, watermarks.get(name) ?? 0);
  }

  async *stabilize(): AsyncIterable<Resource> {
    for (;;) {
      const watermarks = await this.store.allWatermarks();
      let progressed = false;
      for await (const worker of this.store.listWorkers()) {
        const watermark = watermarks.get(worker.name) ?? 0;
        for await (const r of this.runOne(worker, watermark)) {
          progressed = true;
          yield r;
        }
      }
      if (!progressed) break;
    }
  }

  private async *runOne(worker: Worker, watermark: number): AsyncIterable<Resource> {
    const fn = this.fns.get(worker.name);
    if (!fn) return;

    const store = this.store;
    let consumed = false;
    const input = (async function* () {
      for await (const r of store.list({ prefix: worker.selects, afterStamp: watermark })) {
        consumed = true;
        yield r;
      }
    })();

    const ctx: WorkerContext = {
      newStamp: () => store.newStamp(),
      read: (uri) => store.get(uri),
    };

    for await (const out of fn(input, ctx)) {
      await store.put(out);
      yield out;
    }

    if (consumed) {
      const completionStamp = await store.newStamp();
      await store.markCompleted(worker.name, completionStamp);
    }
  }
}
11 changes: 11 additions & 0 deletions packages/uri-graph/src/index.ts
@@ -0,0 +1,11 @@
export { Engine } from "./engine.js";
export { MemoryStore } from "./store/memory.js";
export { SqlStore } from "./store/sql.js";
export type {
  ListOptions,
  PurgeCompletionsOptions,
  PurgeResourcesOptions,
  Store,
} from "./store/store.js";
export { topoLayers } from "./topo-layers.js";
export type { Resource, Status, Worker, WorkerContext, WorkerFn } from "./types.js";
113 changes: 113 additions & 0 deletions packages/uri-graph/src/store/memory.ts
@@ -0,0 +1,113 @@
import type { Resource, Worker } from "../types.js";
import type {
  ListOptions,
  PurgeCompletionsOptions,
  PurgeResourcesOptions,
  Store,
} from "./store.js";

export class MemoryStore implements Store {
  private nextStampValue = 1;
  private resourcesByUri = new Map<string, Resource[]>();
  private workers = new Map<string, Worker>();
  private completionsByWorker = new Map<string, number[]>();

  async newStamp(): Promise<number> {
    return this.nextStampValue++;
  }

  async put(resource: Resource): Promise<void> {
    if (resource.stamp >= this.nextStampValue) {
      this.nextStampValue = resource.stamp + 1;
    }
    const arr = this.resourcesByUri.get(resource.uri);
    if (arr) {
      arr.push(resource);
    } else {
      this.resourcesByUri.set(resource.uri, [resource]);
    }
  }

  async get(uri: string): Promise<Resource | undefined> {
    const arr = this.resourcesByUri.get(uri);
    if (!arr || arr.length === 0) return undefined;
    return arr[arr.length - 1];
  }

  async *list(options: ListOptions): AsyncIterable<Resource> {
    const after = options.afterStamp ?? 0;
    const matches: Resource[] = [];
    for (const [uri, arr] of this.resourcesByUri) {
      if (!uri.startsWith(options.prefix)) continue;
      const latest = arr[arr.length - 1];
      if (latest && latest.stamp > after) matches.push(latest);
    }
    matches.sort((a, b) => a.stamp - b.stamp);
    for (const r of matches) yield r;
  }

  async saveWorker(worker: Worker): Promise<void> {
    this.workers.set(worker.name, { ...worker });
  }

  async deleteWorker(name: string): Promise<void> {
    this.workers.delete(name);
    this.completionsByWorker.delete(name);
  }

  async getWorker(name: string): Promise<Worker | undefined> {
    const w = this.workers.get(name);
    return w ? { ...w } : undefined;
  }

  async *listWorkers(): AsyncIterable<Worker> {
    for (const w of this.workers.values()) yield { ...w };
  }

  async markCompleted(worker: string, stamp: number): Promise<void> {
    const arr = this.completionsByWorker.get(worker);
    if (arr) arr.push(stamp);
    else this.completionsByWorker.set(worker, [stamp]);
  }

  async allWatermarks(): Promise<Map<string, number>> {
    const result = new Map<string, number>();
    for (const [worker, stamps] of this.completionsByWorker) {
      let max = 0;
      for (const s of stamps) if (s > max) max = s;
      result.set(worker, max);
    }
    return result;
  }

  async invalidate(prefix: string): Promise<void> {
    const stamp = await this.newStamp();
    for (const [uri, arr] of this.resourcesByUri) {
      if (!uri.startsWith(prefix)) continue;
      const latest = arr[arr.length - 1];
      if (!latest || latest.status === "removed") continue;
      arr.push({ uri, stamp, status: "removed" });
    }
  }

  async purgeResources(options?: PurgeResourcesOptions): Promise<void> {
    if (options?.keepLatestPerUri !== true) return;
    for (const [uri, arr] of this.resourcesByUri) {
      if (arr.length > 1) {
        const latest = arr[arr.length - 1];
        if (latest) this.resourcesByUri.set(uri, [latest]);
      }
    }
  }

  async purgeCompletions(options?: PurgeCompletionsOptions): Promise<void> {
    const keep = options?.keepLatestPerWorker;
    if (keep === undefined || keep < 1) return;
    for (const [worker, stamps] of this.completionsByWorker) {
      if (stamps.length > keep) {
        stamps.sort((a, b) => a - b);
        this.completionsByWorker.set(worker, stamps.slice(stamps.length - keep));
      }
    }
  }
}