feat: add framework-agnostic store engine#63
Conversation
Akryum
commented
Jun 4, 2026
- Move cache storage and merge logic into core store-engine modules
- Bridge Vue cache through fine-grained signals for item, list, and relation updates
- Add store-engine tests for writes, layers, observers, tombstones, serialization, and CRDT behavior
- Add Vue cache granularity coverage and legacy benchmarks
- Move cache storage and merge logic into core store-engine modules - Bridge Vue cache through fine-grained signals for item, list, and relation updates - Add store-engine tests for writes, layers, observers, tombstones, serialization, and CRDT behavior - Add Vue cache granularity coverage and legacy benchmarks
There was a problem hiding this comment.
Code Review
This pull request introduces a new framework-agnostic storage engine under packages/core/src/store-engine and refactors the Vue package to act as a thin reactivity bridge over it. This architecture provides fine-grained reactivity, ensuring that single-item writes do not trigger full collection list re-runs. The review feedback highlights several critical issues, including memory leaks in the Vue cache bridge (due to signals not being dropped on item eviction or cache reset), a bug in index mapping where explicit null updates are ignored, a bug in replaceModuleContents when handling arrays, a performance bottleneck in clearCollection due to multiple synchronous flushes, and dangling timers in the staggering controller upon engine disposal.
Important
The consumer version of Gemini Code Assist on GitHub is being sunset. Starting June 18, 2026, new organization installations will be blocked, and all code review activity will officially cease on July 17, 2026.
For more details on the timeline and next steps, please review the Help Documentation.
| function evictBaseWrappedItem(collection: ResolvedCollection<any, any, any>, key: string | number) { | ||
| const wrapKey = getItemWrapKey(collection, key, undefined) | ||
| wrappedItems.delete(wrapKey) | ||
| wrappedItemsMetadata.delete(wrapKey) | ||
| } |
There was a problem hiding this comment.
When a wrapped item is evicted from the base identity map (e.g., during a delete operation), its associated fine-grained signal subscription in the signals registry is not dropped. This leads to a memory leak where observer callbacks remain registered in the core engine indefinitely. We should call signals.dropItem(collection.name, key) to clean up the subscription.
| function evictBaseWrappedItem(collection: ResolvedCollection<any, any, any>, key: string | number) { | |
| const wrapKey = getItemWrapKey(collection, key, undefined) | |
| wrappedItems.delete(wrapKey) | |
| wrappedItemsMetadata.delete(wrapKey) | |
| } | |
| function evictBaseWrappedItem(collection: ResolvedCollection<any, any, any>, key: string | number) { | |
| const wrapKey = getItemWrapKey(collection, key, undefined) | |
| wrappedItems.delete(wrapKey) | |
| wrappedItemsMetadata.delete(wrapKey) | |
| signals.dropItem(collection.name, key) | |
| } |
| onReset: () => { | ||
| // A reset (clear / setState) invalidates every wrapped proxy. | ||
| wrappedItems.clear() | ||
| wrappedItemsMetadata.clear() | ||
| wrappedItemKeysPerLayer.clear() | ||
| const store = getStore() | ||
| store.$hooks.callHookSync('afterCacheReset', { store, meta: {} }) | ||
| }, |
There was a problem hiding this comment.
During a cache reset (such as clear or setState), the wrappedItems identity map is cleared, but the active observer subscriptions in the signals registry are not disposed. Since the engine's observer registry is not cleared, these callbacks remain registered and hold references to the old signals, causing a significant memory leak. Disposing of the signals on reset prevents this leak and allows them to be lazily recreated on subsequent reads.
onReset: () => {
// A reset (clear / setState) invalidates every wrapped proxy.
wrappedItems.clear()
wrappedItemsMetadata.clear()
wrappedItemKeysPerLayer.clear()
signals.dispose()
const store = getStore()
store.$hooks.callHookSync('afterCacheReset', { store, meta: {} })
},| } | ||
| } | ||
|
|
||
| const newValues = indexFields.map(f => newData[f] ?? previousData?.[f]) |
There was a problem hiding this comment.
Using the nullish coalescing operator (??) when mapping new index values causes explicit null updates (which represent clearing a field) to fall back to the previous field value. This results in the index incorrectly retaining the item under its old value instead of removing it. Checking if the field exists in newData using the in operator ensures that explicit null values are correctly respected.
| const newValues = indexFields.map(f => newData[f] ?? previousData?.[f]) | |
| const newValues = indexFields.map(f => f in newData ? newData[f] : previousData?.[f]) |
| function replaceModuleContents(target: any, source: any): void { | ||
| if (target && typeof target === 'object' && source && typeof source === 'object') { | ||
| for (const key of Object.keys(target)) { | ||
| delete target[key] | ||
| } | ||
| Object.assign(target, source) | ||
| } | ||
| } |
There was a problem hiding this comment.
The replaceModuleContents function uses delete target[key] to empty the target object. If the target is an array, deleting indices leaves empty slots and does not truncate the array's length property, leading to corrupted array states after setState or clear. We should explicitly handle arrays by setting length = 0 and pushing the new elements.
function replaceModuleContents(target: any, source: any): void {
if (target && typeof target === 'object' && source && typeof source === 'object') {
if (Array.isArray(target) && Array.isArray(source)) {
target.length = 0
target.push(...source)
} else {
for (const key of Object.keys(target)) {
delete target[key]
}
Object.assign(target, source)
}
}
}| clearCollection({ collection }) { | ||
| ctx.fieldTimestamps.delete(collection.name) | ||
| const tombs = Array.from(ctx.tombstones.entries(), ([, t]) => t) | ||
| .filter(t => t.collection === collection.name) | ||
| for (const t of tombs) { | ||
| ctx.tombstones.clear(t.collection, t.key) | ||
| } | ||
| const collectionState = ctx.collections.get(collection.name) | ||
| if (collectionState) { | ||
| // Snapshot keys: each delete mutates `base` as the queue drains. | ||
| for (const key of Array.from(collectionState.base.keys())) { | ||
| enqueueOperation(ctx, { type: 'deleteItem', params: { collection, key } }) | ||
| } | ||
| } | ||
| }, |
There was a problem hiding this comment.
In clearCollection, calling enqueueOperation for each key in a loop triggers a full queue flush and observer notification dispatch (ctx.observers.flush()) for every single key synchronously (when the queue is not paused). For large collections, this results in a severe performance bottleneck. Temporarily pausing the queue during the loop and resuming/flushing it once at the end batches all deletions into a single observer flush.
clearCollection({ collection }) {
const wasPaused = ctx.paused
ctx.paused = true
try {
ctx.fieldTimestamps.delete(collection.name)
const tombs = Array.from(ctx.tombstones.entries(), ([, t]) => t)
.filter(t => t.collection === collection.name)
for (const t of tombs) {
ctx.tombstones.clear(t.collection, t.key)
}
const collectionState = ctx.collections.get(collection.name)
if (collectionState) {
// Snapshot keys: each delete mutates `base` as the queue drains.
for (const key of Array.from(collectionState.base.keys())) {
enqueueOperation(ctx, { type: 'deleteItem', params: { collection, key } })
}
}
} finally {
ctx.paused = wasPaused
if (!wasPaused) {
flushQueuedOperations(ctx)
}
}
},| return { | ||
| enabled: budgetMax > 0, | ||
| canProcess() { | ||
| if (!budgetMax) { | ||
| return true | ||
| } | ||
| if (budget > 0) { | ||
| return true | ||
| } | ||
| scheduleReset() | ||
| return false | ||
| }, | ||
| consume() { | ||
| if (!budgetMax) { | ||
| return | ||
| } | ||
| scheduleReset() | ||
| budget = Math.max(0, budget - 1) | ||
| }, | ||
| setFlush(fn) { | ||
| flush = fn | ||
| }, | ||
| } |
There was a problem hiding this comment.
The staggering controller creates a timer via setTimeout to reset the write budget. However, there is no mechanism to clear this timer when the store engine is disposed, which can lead to dangling timers in tests or memory leaks in short-lived environments. We should add a dispose method to the staggering controller to clear the timer.
return {
enabled: budgetMax > 0,
canProcess() {
if (!budgetMax) {
return true
}
if (budget > 0) {
return true
}
scheduleReset()
return false
},
consume() {
if (!budgetMax) {
return
}
scheduleReset()
budget = Math.max(0, budget - 1)
},
setFlush(fn) {
flush = fn
},
dispose() {
if (resetTimer) {
clearTimeout(resetTimer)
resetTimer = undefined
}
},
}| dispose() { | ||
| stopTombstoneGc?.() | ||
| stopTombstoneGc = undefined | ||
| }, |
There was a problem hiding this comment.
Call staggering.dispose?.() inside the engine's dispose method to ensure any active staggering budget-reset timers are cleared, preventing memory leaks and dangling timers.
| dispose() { | |
| stopTombstoneGc?.() | |
| stopTombstoneGc = undefined | |
| }, | |
| dispose() { | |
| stopTombstoneGc?.() | |
| stopTombstoneGc = undefined | |
| staggering.dispose?.() | |
| }, |
@rstore/core
@rstore/devtools
@rstore/nuxt
@rstore/nuxt-directus
@rstore/nuxt-drizzle
@rstore/nuxt-multiplayer
@rstore/nuxt-multiplayer-server
@rstore/offline
@rstore/shared
@rstore/vue
@rstore/yjs
commit: |
…e cases - Release item signals + engine observers on delete/GC/reset to stop unbounded subscription growth; add SignalRegistry.size() introspection - Drop an item from its index bucket when an indexed field is cleared to null, while preserving partial-update inheritance for untouched fields - Truncate array module state in place on setState/clear to avoid stale slots and holes - Batch clearCollection deletes behind a single flush (was O(N) observer dispatches) - Cancel the staggering budget-reset timer on engine dispose so it can't re-drive a torn-down engine Co-Authored-By: Claude Opus 4.7 <noreply@anthropic.com>