Add ChangeStream support for observable queries#2057
Conversation
- Add ChangeSet<T> TypeScript interface (added, replaced, removed) - Add IChangeStreamFor<T> TypeScript interface extending IObservableQueryFor<T[]> - Add ObservableQueryTransferMode enum (Delta/Full) to Globals - Add observableQueryTransferMode to ArcContext, ArcProps, and Bindings - Add useChangeStream() React hook with client-side delta computation - Update ObservableQuery.hbs proxy template to add useChangeStream() static method for enumerable queries - Add ChangeSet C# type to Arc.Core/Queries (protocol wire format) - Update QueryResult.cs with optional ChangeSet property for future server-side delta Agent-Logs-Url: https://github.com/Cratis/Arc/sessions/d7119489-1c4f-41c0-b2f7-5d6e9a0b3aa5 Co-authored-by: einari <134365+einari@users.noreply.github.com>
- Restore Globals.microservice as default in ArcContext to preserve fallback behavior - Use stable getKeyRef to track getKey changes without triggering extra re-subscriptions - Extract Constructor<TQuery> cast to a single variable to eliminate duplication Agent-Logs-Url: https://github.com/Cratis/Arc/sessions/d7119489-1c4f-41c0-b2f7-5d6e9a0b3aa5 Co-authored-by: einari <134365+einari@users.noreply.github.com>
|
@copilot Great. We are going to need specs for all of this on all levels; backend -> frontend. And also integration specs through the ProxyGenerator as we tend to do. I don't see the hookup in the backend, I was expecting there to be need for MongoDB and EntityFramework specific implementations and in general the whole client observable infrastructure. So I think something is missing. Also, we are going to need documentation for this, both configuration and a page for each of the modes explaining how they work. The configuration should be for the React level and the underlying Arc package level. The modes explanation should be in the latter level. |
Agent-Logs-Url: https://github.com/Cratis/Arc/sessions/500a2ecb-d535-4f10-a5e6-56b3d298dc91 Co-authored-by: einari <134365+einari@users.noreply.github.com>
|
NuGet packages for this PR, e.g. Cratis.Arc: |
|
@copilot This looks great. Continue. But since you've introduced the ChangeSetComputor, which seems to do the trick in tracking what has changed, added, removed - you can disregard my comment about having to implement something for MongoDB and EntityFramework - much better solution you have done! |
- Split when_receiving_update_in_full_transfer_mode into separate spec files per outcome - Extract WaitFor polling helper to given/an_observable_query_demultiplexer base class - Add C# spec: ObservableQueryDemultiplexer sends ChangeSet on second enumerable update - Add TypeScript specs: full transfer mode (per-outcome), removed item, disabled hook Agent-Logs-Url: https://github.com/Cratis/Arc/sessions/fb5e1366-ab61-4bd2-a6f5-b74c8db16b26 Co-authored-by: einari <134365+einari@users.noreply.github.com>
Continuing from here. Added the remaining specs and addressed the code review feedback in commit New specs added:
All 877 C# specs and 7 TypeScript |
|
NuGet packages for this PR, e.g. Cratis.Arc: |
|
NuGet packages for this PR, e.g. Cratis.Arc: |
…InstanceCache per test The shared QueryInstanceCache singleton from QueryInstanceCacheContext caused the second 'it' block to see subscribed=true from the previous test's deferred teardown, skipping the subscribe() call and leaving subscribeCallbacks empty. Fix: initialise a new QueryInstanceCache in beforeEach and wrap each render with a QueryInstanceCacheContext.Provider supplying the fresh cache. Agent-Logs-Url: https://github.com/Cratis/Arc/sessions/ad26b323-bc53-4520-adc8-d79de7227488 Co-authored-by: einari <134365+einari@users.noreply.github.com>
Fixed in commit Fix: create a fresh Warning Firewall rules blocked me from connecting to one or more addresses (expand for details)I tried to connect to the following addresses, but was blocked by firewall rules:
If you need me to access, download, or install something from one of these locations, you can either:
|
|
@copilot Fix the PR description, its referring to completely unrelated issue numbers |
|
NuGet packages for this PR, e.g. Cratis.Arc: |
Extend ObservableQuerySubscriptionRequest with an optional TransferMode parameter and mirror it as transferMode? on the TypeScript SubscriptionRequest interface. Both default to null/undefined for backwards-compatible legacy behavior. The value is forwarded from client to server so the server can select the correct emission strategy per subscription.
…rough streaming results ObservableQueryDemultiplexer now reads TransferMode from the subscription request and applies one of three emission strategies per subscription: - delta: first push sends the full snapshot without a ChangeSet; subsequent pushes send only the ChangeSet with Data set to null, letting the client reconstruct state incrementally. - full: every push sends the complete snapshot; no ChangeSet is ever computed or sent. - legacy (no mode): existing behavior preserved — every push includes both Data and a computed ChangeSet. CorrelationId from the initial queryPipeline.Perform result is now captured and set on every streaming QueryResult emission (subject-based and async-enumerable paths), so the client receives a consistent correlation ID for the lifetime of the subscription rather than Guid.Empty on every push.
…e in useObservableQuery ObservableQueryMultiplexer now includes Globals.observableQueryTransferMode in every subscription request it builds, so the server can apply the correct emission strategy. useObservableQuery detects delta-mode subsequent pushes by checking for a non-null changeSet alongside an empty data array (the server serialises null Data as an empty array). When detected, applyChangeSet() merges the ChangeSet into the previous snapshot to reconstruct the full collection client-side, using id-based identity matching when available and JSON-equality fallback otherwise — mirroring the server-side ChangeSetComputor strategy.
Both ArcCore and AspNetCore test app toolbars now expose a Transfer Mode dropdown (Delta / Full) alongside the existing transport selector. Changing the mode triggers an Arc provider re-mount, which re-subscribes all observable queries using the selected emission strategy.
Introduces a ChangeStream feature in TestApps/Shared/Features/ChangeStream with: - ChangeStreamItem read model and three commands (Add, Update, Remove) - ChangeStreamPage.tsx split-layout view: live collection on the left, change-log on the right displaying each push as Added/Replaced/Removed - Navigation wired into App.tsx Also includes the regenerated ShowcaseItem.ts proxy (timestamp/hash update from the dotnet build that generated the ChangeStream proxies).
Three new BDD specs for ObservableQueryDemultiplexer covering: - Delta mode first emission: Data is present, ChangeSet is null, CorrelationId matches the one assigned at subscription time. - Delta mode subsequent emission: Data is null, ChangeSet carries only the incremental changes (1 Added item), CorrelationId is preserved. - Full mode emission: Data is present on both the first and second push, ChangeSet is null on every emission. All 31 ObservableQueryDemultiplexer specs pass.
|
NuGet packages for this PR, e.g. Cratis.Arc: |
…e-query-multiplexing.md Agent-Logs-Url: https://github.com/Cratis/Arc/sessions/2142771f-a655-48b7-acd6-b1f2ed068cce Co-authored-by: einari <134365+einari@users.noreply.github.com>
Fixed in commit
Added in commit
|
Adds a change stream layer on top of the existing observable query infrastructure so consumers can subscribe to per-update deltas (added / replaced / removed) instead of handling full collection replacements themselves.
Added
ChangeSet<T>TypeScript interface describing itemsadded,replaced, andremovedsince the last updateIChangeStreamFor<T>TypeScript interface extendingIObservableQueryFor<T[]>, automatically satisfied by all enumerable observable query proxiesObservableQueryTransferModeenum (Delta|Full) inGlobals;Deltais the defaultobservableQueryTransferModeprop on<Arc>wired throughArcContext→Bindings→GlobalsuseChangeStream<TDataType, TQuery, TArguments>()React hook — subscribes via the same caching/connection infrastructure asuseObservableQuery, computes deltas client-side between consecutive snapshots, and returns aChangeSet<T>as React state. Accepts an optionalgetKeyaccessor; without it, additions and removals are detected via JSON-key comparison andreplacedis not emittedstatic useChangeStream()method auto-generated on all enumerable observable query proxies viaObservableQuery.hbsChangeSetC# type inArc.Core/Queriesas the protocol wire format, with aChangeSet?property onQueryResultpopulated by theObservableQueryDemultiplexeron every enumerable resultChangeSetComputorC# class that computes identity-based (via a conventionalIdproperty) or JSON-hash-fallback deltas between consecutive snapshots, used byObservableQueryDemultiplexerChangeSetComputorcovering identity-based delta, JSON-hash fallback, and first-update behaviourObservableQueryDemultiplexerverifying aChangeSetwith the correct added items is sent on the second enumerable SSE updateWaitForpolling helper extracted to thegiven/an_observable_query_demultiplexerbase context and shared across all SSE subscribe specsuseChangeStreamis emitted on generated enumerable observable query proxiesuseChangeStream: first update, key-based delta, server-provided change set, removed item detection, full transfer mode (first and subsequent updates), and disabled hookDocumentation/backend/queries/change-stream.mdcoveringChangeSetComputor, identity property discovery, and wire formatDocumentation/frontend/react/change-stream.mdcoveringuseChangeStream, transfer modes, generated proxy method, andIChangeStreamFor<T>observableQueryTransferModeinarc.md(configuration table and dedicated section) andobservable-query-multiplexing.md(props reference and configuration section)Changed
ObservableQuery.hbsproxy template: enumerable queries now importChangeSetanduseChangeStreamand expose astatic useChangeStream()method alongside the existinguse()/useWithPaging()/useSuspense()methodsObservableQueryDemultiplexernow instantiates aChangeSetComputorand attaches a computedChangeSetto everyQueryResultemitted for enumerable subjectsFixed
when_creating_instance.ts: the sharedQueryInstanceCachesingleton caused the second test to seesubscribed=truefrom the previous test's deferred teardown, skipping thesubscribe()call. Fixed by providing a freshQueryInstanceCacheContext.Providerper testUsage:
The
ObservableQueryTransferMode.Fullmode treats the entire incoming collection asaddedon every update (useful for debugging or non-incremental consumers). The server attaches aChangeSetto everyQueryResultfor enumerable queries; when available the client uses it directly instead of recomputing deltas from snapshots.