Skip to content

Add ChangeStream support for observable queries#2057

Merged
einari merged 16 commits into
mainfrom
copilot/add-change-stream-support
Apr 11, 2026
Merged

Add ChangeStream support for observable queries#2057
einari merged 16 commits into
mainfrom
copilot/add-change-stream-support

Conversation

Copy link
Copy Markdown
Contributor

Copilot AI commented Apr 6, 2026

Adds a change stream layer on top of the existing observable query infrastructure so consumers can subscribe to per-update deltas (added / replaced / removed) instead of handling full collection replacements themselves.

Added

  • ChangeSet<T> TypeScript interface describing items added, replaced, and removed since the last update
  • IChangeStreamFor<T> TypeScript interface extending IObservableQueryFor<T[]>, automatically satisfied by all enumerable observable query proxies
  • ObservableQueryTransferMode enum (Delta | Full) in Globals; Delta is the default
  • observableQueryTransferMode prop on <Arc> wired through ArcContextBindingsGlobals
  • useChangeStream<TDataType, TQuery, TArguments>() React hook — subscribes via the same caching/connection infrastructure as useObservableQuery, computes deltas client-side between consecutive snapshots, and returns a ChangeSet<T> as React state. Accepts an optional getKey accessor; without it, additions and removals are detected via JSON-key comparison and replaced is not emitted
  • static useChangeStream() method auto-generated on all enumerable observable query proxies via ObservableQuery.hbs
  • ChangeSet C# type in Arc.Core/Queries as the protocol wire format, with a ChangeSet? property on QueryResult populated by the ObservableQueryDemultiplexer on every enumerable result
  • ChangeSetComputor C# class that computes identity-based (via a conventional Id property) or JSON-hash-fallback deltas between consecutive snapshots, used by ObservableQueryDemultiplexer
  • C# specs for ChangeSetComputor covering identity-based delta, JSON-hash fallback, and first-update behaviour
  • C# spec for ObservableQueryDemultiplexer verifying a ChangeSet with the correct added items is sent on the second enumerable SSE update
  • WaitFor polling helper extracted to the given/an_observable_query_demultiplexer base context and shared across all SSE subscribe specs
  • ProxyGenerator integration spec verifying useChangeStream is emitted on generated enumerable observable query proxies
  • TypeScript specs for useChangeStream: first update, key-based delta, server-provided change set, removed item detection, full transfer mode (first and subsequent updates), and disabled hook
  • Backend documentation Documentation/backend/queries/change-stream.md covering ChangeSetComputor, identity property discovery, and wire format
  • Frontend documentation Documentation/frontend/react/change-stream.md covering useChangeStream, transfer modes, generated proxy method, and IChangeStreamFor<T>
  • Frontend documentation for observableQueryTransferMode in arc.md (configuration table and dedicated section) and observable-query-multiplexing.md (props reference and configuration section)

Changed

  • ObservableQuery.hbs proxy template: enumerable queries now import ChangeSet and useChangeStream and expose a static useChangeStream() method alongside the existing use() / useWithPaging() / useSuspense() methods
  • ObservableQueryDemultiplexer now instantiates a ChangeSetComputor and attaches a computed ChangeSet to every QueryResult emitted for enumerable subjects

Fixed

  • Test isolation bug in when_creating_instance.ts: the shared QueryInstanceCache singleton caused the second test to see subscribed=true from the previous test's deferred teardown, skipping the subscribe() call. Fixed by providing a fresh QueryInstanceCacheContext.Provider per test

Usage:

// All.useChangeStream() is generated automatically for any enumerable observable query.
const changes = All.useChangeStream(
    undefined,           // optional query args
    item => item.id      // optional key accessor — enables "replaced" detection
);

useEffect(() => {
    changes.added.forEach(item => console.log('added', item));
    changes.replaced.forEach(item => console.log('replaced', item));
    changes.removed.forEach(item => console.log('removed', item));
}, [changes]);

The ObservableQueryTransferMode.Full mode treats the entire incoming collection as added on every update (useful for debugging or non-incremental consumers). The server attaches a ChangeSet to every QueryResult for enumerable queries; when available the client uses it directly instead of recomputing deltas from snapshots.

Copilot AI linked an issue Apr 6, 2026 that may be closed by this pull request
Copilot AI and others added 2 commits April 6, 2026 16:59
- Add ChangeSet<T> TypeScript interface (added, replaced, removed)
- Add IChangeStreamFor<T> TypeScript interface extending IObservableQueryFor<T[]>
- Add ObservableQueryTransferMode enum (Delta/Full) to Globals
- Add observableQueryTransferMode to ArcContext, ArcProps, and Bindings
- Add useChangeStream() React hook with client-side delta computation
- Update ObservableQuery.hbs proxy template to add useChangeStream() static method for enumerable queries
- Add ChangeSet C# type to Arc.Core/Queries (protocol wire format)
- Update QueryResult.cs with optional ChangeSet property for future server-side delta

Agent-Logs-Url: https://github.com/Cratis/Arc/sessions/d7119489-1c4f-41c0-b2f7-5d6e9a0b3aa5

Co-authored-by: einari <134365+einari@users.noreply.github.com>
- Restore Globals.microservice as default in ArcContext to preserve fallback behavior
- Use stable getKeyRef to track getKey changes without triggering extra re-subscriptions
- Extract Constructor<TQuery> cast to a single variable to eliminate duplication

Agent-Logs-Url: https://github.com/Cratis/Arc/sessions/d7119489-1c4f-41c0-b2f7-5d6e9a0b3aa5

Co-authored-by: einari <134365+einari@users.noreply.github.com>
Copilot AI changed the title [WIP] Add support for change stream type extension for observable queries Add ChangeStream support for observable queries Apr 6, 2026
Copilot AI requested a review from einari April 6, 2026 17:06
@einari
Copy link
Copy Markdown
Contributor

einari commented Apr 6, 2026

@copilot Great. We are going to need specs for all of this on all levels; backend -> frontend. And also integration specs through the ProxyGenerator as we tend to do. I don't see the hookup in the backend, I was expecting there to be need for MongoDB and EntityFramework specific implementations and in general the whole client observable infrastructure. So I think something is missing.

Also, we are going to need documentation for this, both configuration and a page for each of the modes explaining how they work. The configuration should be for the React level and the underlying Arc package level. The modes explanation should be in the latter level.

Agent-Logs-Url: https://github.com/Cratis/Arc/sessions/500a2ecb-d535-4f10-a5e6-56b3d298dc91

Co-authored-by: einari <134365+einari@users.noreply.github.com>
@github-actions
Copy link
Copy Markdown

github-actions Bot commented Apr 6, 2026

NuGet packages for this PR, e.g. Cratis.Arc:
https://github.com/cratis/arc/packages/1655206?version=20.3.0-pr2057.ce4e35c

@einari
Copy link
Copy Markdown
Contributor

einari commented Apr 7, 2026

@copilot This looks great. Continue. But since you've introduced the ChangeSetComputor, which seems to do the trick in tracking what has changed, added, removed - you can disregard my comment about having to implement something for MongoDB and EntityFramework - much better solution you have done!

- Split when_receiving_update_in_full_transfer_mode into separate spec files per outcome
- Extract WaitFor polling helper to given/an_observable_query_demultiplexer base class
- Add C# spec: ObservableQueryDemultiplexer sends ChangeSet on second enumerable update
- Add TypeScript specs: full transfer mode (per-outcome), removed item, disabled hook

Agent-Logs-Url: https://github.com/Cratis/Arc/sessions/fb5e1366-ab61-4bd2-a6f5-b74c8db16b26

Co-authored-by: einari <134365+einari@users.noreply.github.com>
Copy link
Copy Markdown
Contributor Author

Copilot AI commented Apr 7, 2026

@copilot This looks great. Continue. But since you've introduced the ChangeSetComputor, which seems to do the trick in tracking what has changed, added, removed - you can disregard my comment about ha...

Continuing from here. Added the remaining specs and addressed the code review feedback in commit 037e3ad:

New specs added:

  • C#: and_connection_is_known_and_enumerable_query_emits_change_set — verifies ObservableQueryDemultiplexer attaches a ChangeSet with the correct Added items on the second SSE update
  • WaitFor polling helper extracted to given/an_observable_query_demultiplexer base class (used by all SSE subscribe specs)
  • TypeScript: when_receiving_second_update_with_removed_item — key-based removal detection
  • TypeScript: when_hook_is_disabled — verifies no-op returns empty ChangeSet
  • TypeScript: when_receiving_update_in_full_transfer_mode/and_first_update_arrives — all items reported as added
  • TypeScript: when_receiving_update_in_full_transfer_mode/and_subsequent_update_arrives — full mode continues reporting all items as added on every update

All 877 C# specs and 7 TypeScript useChangeStream specs pass. The ProxyGenerator integration spec (when_observing_all_items_via_change_stream) also passes.

@github-actions
Copy link
Copy Markdown

github-actions Bot commented Apr 7, 2026

NuGet packages for this PR, e.g. Cratis.Arc:
https://github.com/cratis/arc/packages/1655206?version=20.3.1-pr2057.037e3ad

@github-actions
Copy link
Copy Markdown

github-actions Bot commented Apr 7, 2026

NuGet packages for this PR, e.g. Cratis.Arc:
https://github.com/cratis/arc/packages/1655206?version=20.4.0-pr2057.0ca69e0

…InstanceCache per test

The shared QueryInstanceCache singleton from QueryInstanceCacheContext caused the
second 'it' block to see subscribed=true from the previous test's deferred teardown,
skipping the subscribe() call and leaving subscribeCallbacks empty.

Fix: initialise a new QueryInstanceCache in beforeEach and wrap each render with a
QueryInstanceCacheContext.Provider supplying the fresh cache.

Agent-Logs-Url: https://github.com/Cratis/Arc/sessions/ad26b323-bc53-4520-adc8-d79de7227488

Co-authored-by: einari <134365+einari@users.noreply.github.com>
Copy link
Copy Markdown
Contributor Author

Copilot AI commented Apr 10, 2026

@copilot The JavaScript Build fails: https://productionresultssa6.blob.core.windows.net/actions-results/98ebbdde-fc51-4cee-8241-7cb0fe143290/workflow-job-run-dcf46236-7f70-5a84-9822-f6d74961e450/logs...

Fixed in commit 8877f53. The failure was a test isolation issue in when_creating_instance.ts: both it blocks shared the module-level QueryInstanceCache singleton from QueryInstanceCacheContext. The deferred teardown (setTimeout(0)) from the first test hadn't fired when the second test started, so the cache still held subscribed=true for FakeObservableQuery. The second render saw this and skipped the subscribe() call, leaving subscribeCallbacks empty.

Fix: create a fresh QueryInstanceCache in beforeEach and wrap each render() with a QueryInstanceCacheContext.Provider supplying it, ensuring each test sees a clean subscription state.

Warning

Firewall rules blocked me from connecting to one or more addresses (expand for details)

I tried to connect to the following addresses, but was blocked by firewall rules:

  • example.com
    • Triggering command: /opt/hostedtoolcache/node/24.14.1/x64/bin/node /opt/hostedtoolcache/node/24.14.1/x64/bin/node /home/REDACTED/work/Arc/Arc/node_modules/vitest/vitest.mjs run (dns block)

If you need me to access, download, or install something from one of these locations, you can either:

@einari
Copy link
Copy Markdown
Contributor

einari commented Apr 11, 2026

@copilot Fix the PR description, its referring to completely unrelated issue numbers

@github-actions
Copy link
Copy Markdown

NuGet packages for this PR, e.g. Cratis.Arc:
https://github.com/cratis/arc/packages/1655206?version=20.5.1-pr2057.fe20f8d

einari added 6 commits April 11, 2026 17:41
Extend ObservableQuerySubscriptionRequest with an optional TransferMode
parameter and mirror it as transferMode? on the TypeScript SubscriptionRequest
interface. Both default to null/undefined for backwards-compatible legacy
behavior. The value is forwarded from client to server so the server can select
the correct emission strategy per subscription.
…rough streaming results

ObservableQueryDemultiplexer now reads TransferMode from the subscription
request and applies one of three emission strategies per subscription:

- delta: first push sends the full snapshot without a ChangeSet; subsequent
  pushes send only the ChangeSet with Data set to null, letting the client
  reconstruct state incrementally.
- full: every push sends the complete snapshot; no ChangeSet is ever computed
  or sent.
- legacy (no mode): existing behavior preserved — every push includes both
  Data and a computed ChangeSet.

CorrelationId from the initial queryPipeline.Perform result is now captured and
set on every streaming QueryResult emission (subject-based and async-enumerable
paths), so the client receives a consistent correlation ID for the lifetime of
the subscription rather than Guid.Empty on every push.
…e in useObservableQuery

ObservableQueryMultiplexer now includes Globals.observableQueryTransferMode in
every subscription request it builds, so the server can apply the correct
emission strategy.

useObservableQuery detects delta-mode subsequent pushes by checking for a
non-null changeSet alongside an empty data array (the server serialises null
Data as an empty array). When detected, applyChangeSet() merges the ChangeSet
into the previous snapshot to reconstruct the full collection client-side,
using id-based identity matching when available and JSON-equality fallback
otherwise — mirroring the server-side ChangeSetComputor strategy.
Both ArcCore and AspNetCore test app toolbars now expose a Transfer Mode
dropdown (Delta / Full) alongside the existing transport selector. Changing
the mode triggers an Arc provider re-mount, which re-subscribes all observable
queries using the selected emission strategy.
Introduces a ChangeStream feature in TestApps/Shared/Features/ChangeStream with:
- ChangeStreamItem read model and three commands (Add, Update, Remove)
- ChangeStreamPage.tsx split-layout view: live collection on the left,
  change-log on the right displaying each push as Added/Replaced/Removed
- Navigation wired into App.tsx

Also includes the regenerated ShowcaseItem.ts proxy (timestamp/hash update from
the dotnet build that generated the ChangeStream proxies).
Three new BDD specs for ObservableQueryDemultiplexer covering:

- Delta mode first emission: Data is present, ChangeSet is null, CorrelationId
  matches the one assigned at subscription time.
- Delta mode subsequent emission: Data is null, ChangeSet carries only the
  incremental changes (1 Added item), CorrelationId is preserved.
- Full mode emission: Data is present on both the first and second push,
  ChangeSet is null on every emission.

All 31 ObservableQueryDemultiplexer specs pass.
@github-actions
Copy link
Copy Markdown

NuGet packages for this PR, e.g. Cratis.Arc:
https://github.com/cratis/arc/packages/1655206?version=20.6.0-pr2057.c5fdb22

Copy link
Copy Markdown
Contributor

@einari einari left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@copilot We also need documentation for the observableQueryTransferMode for the frontend config and the Arc context.

…e-query-multiplexing.md

Agent-Logs-Url: https://github.com/Cratis/Arc/sessions/2142771f-a655-48b7-acd6-b1f2ed068cce

Co-authored-by: einari <134365+einari@users.noreply.github.com>
Copy link
Copy Markdown
Contributor Author

Copilot AI commented Apr 11, 2026

@copilot Fix the PR description, its referring to completely unrelated issue numbers

Fixed in commit f372664 — all (#496) issue references have been removed from the PR description.

@copilot We also need documentation for the observableQueryTransferMode for the frontend config and the Arc context.

Added in commit f372664:

  • arc.mdobservableQueryTransferMode added to the configuration options table, plus a new Observable Query Transfer Mode section explaining both Delta and Full values with a code example.
  • observable-query-multiplexing.md — new Controlling change-stream transfer mode section with an example and value table, and the prop added to the Props Reference table.

Copilot AI requested a review from einari April 11, 2026 15:52
@einari einari marked this pull request as ready for review April 11, 2026 15:56
@einari einari merged commit 41904ef into main Apr 11, 2026
9 checks passed
@einari einari deleted the copilot/add-change-stream-support branch April 11, 2026 15:56
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

Projects

None yet

Development

Successfully merging this pull request may close these issues.

"ChangeStream" for Observable queries

2 participants