Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1,152 changes: 1,152 additions & 0 deletions docs/superpowers/plans/2026-05-28-cache-buffer-perf.md

Large diffs are not rendered by default.

258 changes: 258 additions & 0 deletions docs/superpowers/specs/2026-05-28-cache-buffer-perf-design.md

Large diffs are not rendered by default.

121 changes: 91 additions & 30 deletions lib/CacheAndBufferLayer.ts
Original file line number Diff line number Diff line change
Expand Up @@ -159,9 +159,10 @@ export class Database {
private _flushPaused: SelfContainedPromise | null = null;
private _flushPausedCount = 0;
private readonly _locks: Map<string, SelfContainedPromise> = new Map();
private readonly _dirtyKeys: Set<string> = new Set();
private _flushDone: Promise<void> | null = null;
public metrics: Metrics;
private readonly flushInterval: ReturnType<typeof setInterval> | null;
private _flushTimer: ReturnType<typeof setTimeout> | null = null;

constructor(
wrappedDB: LegacyWrappedDB,
Expand Down Expand Up @@ -220,13 +221,6 @@ export class Database {
writesToDbFinished: 0,
writesToDbRetried: 0,
};

this.flushInterval =
this.settings.writeInterval > 0
? setInterval(() => {
void this.flush();
}, this.settings.writeInterval)
: null;
}

private async _lock(key: string): Promise<void> {
Expand All @@ -246,6 +240,28 @@ export class Database {
this._locks.delete(key);
}

private _scheduleFlush(): void {
if (this._flushTimer != null) return;
// A flush is already in progress. Arming a timer now would leave a stray timer behind if
// that flush drains the keys this write just dirtied: flush() iterates _dirtyKeys live and
// re-arms via its own postlude only when keys actually remain. Let it own the rescheduling.
if (this._flushDone != null) return;
if (this.settings.writeInterval <= 0) return;
if (this._dirtyKeys.size === 0) return;
this._flushTimer = setTimeout(() => {
this._flushTimer = null;
void this.flush();
}, this.settings.writeInterval);
this._flushTimer.unref?.();
}

private _cancelFlushTimer(): void {
if (this._flushTimer != null) {
clearTimeout(this._flushTimer);
this._flushTimer = null;
}
}

// Block flush() until _resumeFlush() is called. This ensures a flush() called after a write in
// the same macro/microtask sees the buffered write.
private _pauseFlush(): void {
Expand All @@ -267,21 +283,40 @@ export class Database {
}

async close(): Promise<void> {
clearInterval(this.flushInterval ?? undefined);
this._cancelFlushTimer();
await this.flush();
await this.wrappedDB!.close();
this.wrappedDB = null;
}

async get(key: string): Promise<unknown> {
// Fast path: cache hit + no writer currently holding the per-key lock.
// The check and the buffer read execute synchronously (no `await` until after the return),
// so no concurrent set/setSub can interleave. cloneOut produces a consistent snapshot.
if (!this._locks.has(key)) {
const entry = this.buffer.get(key);
if (entry != null) {
++this.metrics.reads;
++this.metrics.readsFromCache;
++this.metrics.readsFinished;
if (this.logger.isDebugEnabled()) {
this.logger.debug(
`GET - ${key} - ${JSON.stringify(entry.value)} - ` +
`from ${entry.dirty ? "dirty buffer" : "cache"}`,
);
}
return cloneOut(entry.value);
}
}
// Slow path: cache miss or a writer holds the lock.
let v: unknown;
await this._lock(key);
try {
v = await this._getLocked(key);
} finally {
this._unlock(key);
}
return clone(v);
return cloneOut(v);
}

private async _getLocked(key: string): Promise<unknown> {
Expand Down Expand Up @@ -344,7 +379,7 @@ export class Database {
`GET - ${key}-${notKey} - ${JSON.stringify(keyValues)} - from database `,
);
}
return clone(keyValues) as string[];
return cloneOut(keyValues) as string[];
}

async findKeysPaged(
Expand Down Expand Up @@ -379,7 +414,7 @@ export class Database {
}
return lo;
})();
return clone(all.slice(start, start + options.limit)) as string[];
return cloneOut(all.slice(start, start + options.limit)) as string[];
}
const keys = await this.wrappedDB!.findKeysPaged(key, notKey, options);
if (this.logger.isDebugEnabled()) {
Expand All @@ -388,7 +423,7 @@ export class Database {
`- ${JSON.stringify(keys)} - from database `,
);
}
return clone(keys) as string[];
return cloneOut(keys) as string[];
}

async remove(key: string): Promise<void> {
Expand All @@ -397,7 +432,7 @@ export class Database {
}

async set(key: string, value: unknown): Promise<void> {
value = clone(value);
value = cloneIn(value);
let p!: Promise<void>;
this._pauseFlush();
try {
Expand Down Expand Up @@ -429,6 +464,8 @@ export class Database {
entry.value = value;
if (!entry.dirty) entry.dirty = new SelfContainedPromise();
this.buffer.set(key, entry);
this._dirtyKeys.add(key);
this._scheduleFlush();
const buffered = this.settings.writeInterval > 0;
if (this.logger.isDebugEnabled()) {
this.logger.debug(
Expand All @@ -446,7 +483,7 @@ export class Database {
}

async setSub(key: string, sub: string[], value: unknown): Promise<void> {
value = clone(value);
value = cloneIn(value);
if (this.logger.isDebugEnabled()) {
this.logger.debug(`SETSUB - ${key}${JSON.stringify(sub)} - ${JSON.stringify(value)}`);
}
Expand Down Expand Up @@ -519,23 +556,25 @@ export class Database {
if (this.logger.isDebugEnabled()) {
this.logger.debug(`GETSUB - ${key}${JSON.stringify(sub)} - ${JSON.stringify(v)}`);
}
return clone(v);
return cloneOut(v);
} finally {
this._unlock(key);
}
}

async flush(): Promise<void> {
// Cancel any pending lazy-flush timer — we are doing the work right now.
this._cancelFlushTimer();
if (this._flushDone == null) {
this._flushDone = (async () => {
while (true) {
while (this._flushPaused != null) await this._flushPaused;
const dirtyEntries: [string, CacheEntry][] = [];
for (const entry of this.buffer) {
if (entry[1].dirty && !entry[1].writingInProgress) {
dirtyEntries.push(entry);
if (this.settings.bulkLimit && dirtyEntries.length >= this.settings.bulkLimit) break;
}
for (const key of this._dirtyKeys) {
const entry = this.buffer.get(key, false);
if (!entry || !entry.dirty || entry.writingInProgress) continue;
dirtyEntries.push([key, entry]);
if (this.settings.bulkLimit && dirtyEntries.length >= this.settings.bulkLimit) break;
}
if (dirtyEntries.length === 0) return;
await this._write(dirtyEntries);
Expand All @@ -544,15 +583,21 @@ export class Database {
}
await this._flushDone;
this._flushDone = null;
if (this._dirtyKeys.size > 0) this._scheduleFlush();
}

private async _write(dirtyEntries: [string, CacheEntry][]): Promise<void> {
const markDone = (entry: CacheEntry, err?: Error | null): void => {
const markDone = (key: string, entry: CacheEntry, err?: Error | null): void => {
if (entry.writingInProgress) {
entry.writingInProgress = false;
if (err != null) ++this.metrics.writesToDbFailed;
++this.metrics.writesToDbFinished;
}
// Reference-equality: only clear the dirty marker for THIS key if the entry currently
// in the buffer is the same one we just wrote. If a re-set during the write replaced it
// with a fresh dirty entry, the new entry must stay marked dirty.
const current = this.buffer.get(key, false);
if (current === entry) this._dirtyKeys.delete(key);
entry.dirty!.done(err);
entry.dirty = null;
};
Expand All @@ -565,10 +610,10 @@ export class Database {
if (this.settings.json && entry.value != null) {
serialized = JSON.stringify(entry.value);
} else {
serialized = clone(entry.value) as string | null;
serialized = cloneOut(entry.value) as string | null;
}
} catch (err) {
markDone(entry, err as Error);
markDone(key, entry, err as Error);
continue;
}
entry.writingInProgress = true;
Expand All @@ -590,7 +635,7 @@ export class Database {
} catch (err) {
writeErr = err instanceof Error ? err : new Error(String(err));
}
markDone(entry, writeErr);
markDone(op.key, entry, writeErr);
};

if (ops.length === 1) {
Expand All @@ -609,18 +654,20 @@ export class Database {
this.metrics.writesToDbRetried += ops.length;
await Promise.all(ops.map(async (op, i) => writeOneOp(op, entries[i])));
}
if (success) entries.forEach((entry) => markDone(entry, null));
if (success) {
for (let i = 0; i < entries.length; i++) markDone(ops[i].key, entries[i], null);
}
}
// Evict here to enforce cache = 0 semantics (reads must not hit cache after write completes).
this.buffer.evictOld();
}
}

const clone = (obj: unknown, key = ""): unknown => {
const cloneIn = (obj: unknown, key = ""): unknown => {
if (obj == null || typeof obj !== "object") return obj;

if (typeof (obj as Record<string, unknown>).toJSON === "function") {
return clone((obj as { toJSON(k: string): unknown }).toJSON(key));
return cloneIn((obj as { toJSON(k: string): unknown }).toJSON(key));
}

if (obj instanceof Date) {
Expand All @@ -630,18 +677,32 @@ const clone = (obj: unknown, key = ""): unknown => {
}

if (Array.isArray(obj)) {
return obj.map((item, i) => clone(item, String(i)));
return obj.map((item, i) => cloneIn(item, String(i)));
}

if (obj instanceof Object) {
const copy: Record<string, unknown> = {};
for (const attr of Object.keys(obj)) {
copy[attr] = clone((obj as Record<string, unknown>)[attr], attr);
copy[attr] = cloneIn((obj as Record<string, unknown>)[attr], attr);
}
return copy;
}

throw new Error("Unable to copy obj! Its type isn't supported.");
};

// Read-direction clone. The buffered value has typically been processed by cloneIn (which strips
// toJSON methods at the root and resolves them recursively) or by JSON.parse (always JSON-safe).
// Functions inside cloneIn-output objects ARE preserved as-is, so structuredClone may still throw
// DataCloneError. Fall back to cloneIn (which preserves non-cloneable values by passing them
// through unchanged) to keep parity with the pre-refactor clone() semantics.
const cloneOut = (v: unknown): unknown => {
if (v == null || typeof v !== "object") return v;
try {
return structuredClone(v);
} catch {
return cloneIn(v);
}
};

export const exportedForTesting = { LRU };
11 changes: 11 additions & 0 deletions test/memory/test_tojson.spec.ts
Original file line number Diff line number Diff line change
Expand Up @@ -32,4 +32,15 @@ describe(__filename, () => {
// @ts-expect-error TS(2775): Assertions require every name in the call target t... Remove this comment to see the full error message
assert.deepEqual(await db.get("key"), ["toJSON 0"]);
});
it("object property containing a function survives the round-trip via the cache", async () => {
// cloneIn preserves functions inside objects (they hit the `typeof !== 'object'` branch and
// pass through unchanged). cloneOut therefore needs to tolerate function-containing values
// when reading from the cache; structuredClone would throw DataCloneError without the
// cloneIn fallback in cloneOut.
const fn = () => "hello";
await db.set("key", { fn });
const out: any = await db.get("key");
assert.equal(typeof out.fn, "function");
assert.equal(out.fn(), "hello");
});
});
Loading
Loading