diff --git a/.changeset/subscriptions-listen-client.md b/.changeset/subscriptions-listen-client.md new file mode 100644 index 0000000000..3cc5c10757 --- /dev/null +++ b/.changeset/subscriptions-listen-client.md @@ -0,0 +1,6 @@ +--- +'@modelcontextprotocol/core': minor +'@modelcontextprotocol/client': minor +--- + +`Client.listen(filter)` opens a `subscriptions/listen` stream on a 2026-07-28-era connection, resolving once the server's acknowledged notification arrives with an `McpSubscription { honoredFilter, close(), closed }`. `closed` is a `Promise<'local' | 'remote'>` that resolves exactly once when the subscription terminates (`'local'` = you called `close()`; `'remote'` = the server cancelled, the stream ended, or the transport dropped — re-listen if you still want events) and never rejects. Change notifications delivered on the stream dispatch to the existing `setNotificationHandler` registrations — the same handlers the 2025-era unsolicited notifications fire on a legacy connection — so `listen()` is era-transparent for consumers that already register those. `close()` aborts the listen request's stream (where the transport supports it) and sends `notifications/cancelled` referencing the listen id — both, on every transport; no automatic re-listen. On a 2025-era connection `listen()` throws a typed `MethodNotSupportedByProtocolVersion` steering to `resources/subscribe` and `ClientOptions.listChanged`. `ClientOptions.listChanged` now auto-opens a listen stream on a modern connection — the filter is the intersection of the configured sub-options and the server-advertised `listChanged` capabilities; auto-open is skipped (`client.autoOpenedSubscription` stays `undefined`) when that intersection is empty; otherwise the auto-opened subscription is exposed at `client.autoOpenedSubscription`. `TransportSendOptions` gains `requestSignal` (per-request abort) and `onRequestStreamEnd` (fires when a per-request response stream ends or errors for any non-deliberate reason) on the Streamable HTTP transport. diff --git a/docs/migration.md b/docs/migration.md index 445304b986..2e79e8964b 100644 --- a/docs/migration.md +++ b/docs/migration.md @@ -1159,6 +1159,23 @@ Resolution is per field, most specific author first: for each of `ttlMs` and `ca per-resource hint that sets only one field never suppresses the other field configured at the operation level. Configured hints are validated when they are configured — an invalid `ttlMs` (negative or non-integer) or `cacheScope` throws a `RangeError`. Responses on 2025-era connections never carry these fields, with or without configuration. +### `subscriptions/listen` (2026-07-28): change-notification streams replace unsolicited delivery + +The 2026-07-28 revision delivers `tools/prompts/resources` `list_changed` and `resources/updated` only on a `subscriptions/listen` stream the client opened — the server never sends an un-requested notification type. Both halves ship: + +**Server side.** Nothing to register: the serving entries handle `subscriptions/listen` themselves. `createMcpHandler` returns `.notify.{toolsChanged, promptsChanged, resourcesChanged, resourceUpdated(uri)}` typed publish sugar over an in-process bus (supply your own +`ServerEventBus` for multi-process deployments). On stdio, `serveStdio` routes the pinned instance's existing `send*ListChanged()` calls onto the active subscriptions automatically. The 2025-era unsolicited delivery model is unchanged on legacy connections. + +```typescript +const handler = createMcpHandler(() => buildServer()); +// after a tool registration changes: +handler.notify.toolsChanged(); +``` + +**Client side.** `ClientOptions.listChanged` keeps working: on a 2026-07-28 connection the SDK auto-opens a `subscriptions/listen` stream whose filter is the intersection of the configured sub-options and the server-advertised `listChanged` capabilities, so the same handlers +fire on every published change (the auto-opened subscription is exposed at `client.autoOpenedSubscription` for `close()`; when the intersection is empty auto-open is skipped and `autoOpenedSubscription` stays `undefined`). `client.listen(filter)` opens a stream explicitly and resolves once the server's acknowledged notification arrives with `{ honoredFilter, close(), closed }` (where `closed` is a `Promise<'local' | 'remote'>` that resolves once on termination — `'remote'` means the server cancelled, the stream ended, or the transport dropped, so re-listen if you still want events); change notifications dispatch to the existing `setNotificationHandler` +registrations. `resources/subscribe` is 2025-only — on a 2026-07-28 connection, request `notifications/resources/updated` via the `resourceSubscriptions` field of the listen filter instead. + ### Multi round-trip requests (2026-07-28): write-once handlers and the client auto-fulfilment driver The 2026-07-28 revision removes the server→client JSON-RPC request channel: servers obtain client input (elicitation, sampling, roots) **in-band**, by answering `tools/call`, `prompts/get`, or `resources/read` with an `input_required` result that embeds the requests, and the @@ -1192,9 +1209,9 @@ has: only `tools/call` has a catch-all that wraps handler failures into `isError **`requestState` is untrusted input — protect it yourself.** `inputRequired({ requestState })` lets a server round-trip opaque state through the client instead of holding it in memory. The SDK treats it as an opaque string end to end: the client echoes it back byte-exact and never parses it, and the server sees the echoed value raw at `ctx.mcpReq.requestState`. The specification's requirement is the consumer's obligation: the value comes back as **attacker-controlled input**, so if it influences authorization, resource access, or business logic you -MUST integrity-protect it when minting it (for example HMAC or AEAD over the payload, bound to the principal, the originating method/parameters, and an expiry) and MUST reject state that fails verification on re-entry. The SDK does not provide or apply any sealing of its own, -but it does provide the place to put your verification: configure `ServerOptions.requestState.verify`, and the seam runs it before the handler whenever `requestState` is present — a thrown rejection answers the client with a frozen `-32602` (above the tool funnel, so it is a -real JSON-RPC error rather than an `isError` result). See `examples/server/src/multiRoundTrip.ts` for a worked HMAC example. +MUST integrity-protect it when minting it (for example HMAC or AEAD over the payload, bound to the principal, the originating method/parameters, and an expiry) and MUST reject state that fails verification on re-entry. The SDK does not provide or apply any sealing of its own, but +it does provide the place to put your verification: configure `ServerOptions.requestState.verify`, and the seam runs it before the handler whenever `requestState` is present — a thrown rejection answers the client with a frozen `-32602` (above the tool funnel, so it is a real +JSON-RPC error rather than an `isError` result). See `examples/server/src/multiRoundTrip.ts` for a worked HMAC example. **Client side — auto-fulfilment by default.** When a call to `tools/call`, `prompts/get`, or `resources/read` on a 2026-07-28 connection answers `input_required`, the client fulfils the embedded requests through the same handlers registered with `setRequestHandler('elicitation/create' | 'sampling/createMessage' | 'roots/list', …)` and retries the original request (fresh request id, `inputResponses`, byte-exact `requestState` echo) up to `inputRequired.maxRounds` rounds (default 10). `client.callTool()` and its siblings diff --git a/packages/client/src/client/client.ts b/packages/client/src/client/client.ts index 4eef2f2ec8..e6ae442893 100644 --- a/packages/client/src/client/client.ts +++ b/packages/client/src/client/client.ts @@ -17,6 +17,7 @@ import type { InputRequiredOptions, JSONRPCNotification, JSONRPCRequest, + JSONRPCResponse, JsonSchemaType, JsonSchemaValidator, jsonSchemaValidator, @@ -45,6 +46,7 @@ import type { ServerCapabilities, StandardSchemaV1, SubscribeRequest, + SubscriptionFilter, Tool, Transport, UnsubscribeRequest @@ -57,6 +59,7 @@ import { CreateMessageResultWithToolsSchema, DEFAULT_REQUEST_TIMEOUT_MSEC, DiscoverResultSchema, + isJSONRPCErrorResponse, isJSONRPCRequest, isModernProtocolVersion, legacyProtocolVersions, @@ -70,7 +73,9 @@ import { resolveInputRequiredDriverConfig, runInputRequiredFlow, SdkError, - SdkErrorCode + SdkErrorCode, + SUBSCRIPTION_ID_META_KEY, + SubscriptionFilterSchema } from '@modelcontextprotocol/core'; import type { ResolvedVersionNegotiation, VersionNegotiationOptions } from './versionNegotiation.js'; @@ -252,6 +257,48 @@ export type ClientOptions = ProtocolOptions & { listChanged?: ListChangedHandlers; }; +/** + * A handle to an open `subscriptions/listen` stream (protocol revision + * 2026-07-28). Change notifications delivered on the stream dispatch to the + * existing {@linkcode Client.setNotificationHandler} registrations. + */ +export interface McpSubscription { + /** + * The subset of the requested filter the server agreed to honor (from + * `notifications/subscriptions/acknowledged`). + */ + readonly honoredFilter: SubscriptionFilter; + /** + * Tears the subscription down. Idempotent. Aborts the listen request's + * stream (where the transport supports it) AND sends + * `notifications/cancelled` referencing the listen request id — both, + * always, so close works on any transport. + */ + close(): Promise; + /** + * Resolves exactly once when the subscription has terminated. Never + * rejects — this is an observation, not an operation. + * + * - `'local'` — you called {@linkcode close} (or aborted the + * `RequestOptions.signal` you passed to `listen()`). + * - `'remote'` — the server cancelled, the stream ended, or the transport + * dropped. Re-listen if you still want events. + */ + readonly closed: Promise<'local' | 'remote'>; +} + +/** @internal */ +interface ListenStateEntry { + /** + * The single funnel for the per-listen `opening → open → closed` state + * machine. Every transport-level feed source — the `_onnotification` / + * `_onresponse` / `_onclose` overrides, `onRequestStreamEnd`, send + * failure, ack timeout, caller-signal abort, `_resetConnectionState` — + * routes through it. + */ + settle: (outcome: { ack: SubscriptionFilter } | { cause: 'local' | 'remote'; error?: Error }) => void; +} + /** * An MCP client on top of a pluggable transport. * @@ -294,11 +341,74 @@ export class Client extends Protocol { private _jsonSchemaValidator: jsonSchemaValidator; private _cachedToolOutputValidators: Map> = new Map(); private _listChangedDebounceTimers: Map> = new Map(); - private _pendingListChangedConfig?: ListChangedHandlers; + /** + * The constructor `listChanged` configuration. Durable across reconnects: + * read fresh on every connect (legacy or modern), never consumed. + */ + private readonly _listChangedConfig?: ListChangedHandlers; private _enforceStrictCapabilities: boolean; private _versionNegotiation?: VersionNegotiationOptions; private _supportedProtocolVersionsOption?: string[]; private _inputRequiredDriverConfig: ResolvedInputRequiredDriverConfig; + /** + * Active subscriptions/listen state, keyed by subscription id (= the + * listen request's JSON-RPC id verbatim). The id is a STRING from a + * Client-owned counter (`'listen:' + N`) — JSON-RPC permits string ids, + * and Protocol's numeric `_requestMessageId` counter only ever issues + * numbers, so listen ids cannot collide with ordinary request ids. + */ + private _listenState = new Map(); + private _nextListenId = 0; + /** The auto-opened subscription backing ClientOptions.listChanged on a modern connection. */ + private _autoOpenedSubscription?: McpSubscription; + + /** + * Clears every per-connection field in one place. Called at the start of + * each fresh (non-resuming) connect and from `close()`, so a stale + * negotiated era / server identity / auto-opened subscription cannot + * survive a reconnect. + */ + private _resetConnectionState(): void { + this._negotiatedProtocolVersion = undefined; + this._serverCapabilities = undefined; + this._serverVersion = undefined; + this._instructions = undefined; + this._autoOpenedSubscription = undefined; + // Settle every live per-listen state machine before clearing the map: + // a fresh connect (or close) on a connection whose prior transport + // never fired onclose would otherwise leave an in-flight listen() + // promise hanging forever. Each entry's settle() deletes only itself + // (Map self-delete during iteration is well-defined). + if (this._listenState.size > 0) { + const reason = new SdkError( + SdkErrorCode.ConnectionClosed, + 'subscriptions/listen: client reconnected or closed; subscription state from the previous connection was reset' + ); + for (const entry of this._listenState.values()) { + entry.settle({ cause: 'remote', error: reason }); + } + } + this._listenState.clear(); + // Debounce timers are connection-scoped: a callback armed on a + // connection that is now gone must not fire onto whatever connection + // (if any) replaces it. + for (const timer of this._listChangedDebounceTimers.values()) { + clearTimeout(timer); + } + this._listChangedDebounceTimers.clear(); + this._cachedToolOutputValidators.clear(); + } + + override async close(): Promise { + try { + await super.close(); + } finally { + // Per-connection state is cleared even when the transport's close + // rejects, so a stale negotiated era / live listen state cannot + // survive a failed close. + this._resetConnectionState(); + } + } /** * Initializes this client with the given name and version information. @@ -319,7 +429,7 @@ export class Client extends Protocol { // Store list changed config for setup after connection (when we know server capabilities) if (options?.listChanged) { - this._pendingListChangedConfig = options.listChanged; + this._listChangedConfig = options.listChanged; } } @@ -657,15 +767,15 @@ export class Client extends Protocol { } return; } - // Fresh connect: the negotiated protocol version is connection state — - // a value left over from a previous connection must not survive into a - // new handshake. Clearing it puts the instance back in the - // pre-negotiation phase, so the initialize exchange below rides the - // bootstrap method pins (legacy era) instead of a dead session's era. - // Without this, an instance that once negotiated a modern era could - // never re-run a fresh handshake: `initialize` is physically absent - // from the modern registry. (The resume branch above keeps it instead.) - this._negotiatedProtocolVersion = undefined; + // Fresh connect: per-connection state left over from a previous + // connection must not survive into a new handshake. Clearing it puts + // the instance back in the pre-negotiation phase, so the initialize + // exchange below rides the bootstrap method pins (legacy era) instead + // of a dead session's era. Without this, an instance that once + // negotiated a modern era could never re-run a fresh handshake: + // `initialize` is physically absent from the modern registry. (The + // resume branch above keeps it instead.) + this._resetConnectionState(); await this._legacyHandshake(transport, options); } @@ -731,9 +841,8 @@ export class Client extends Protocol { this._negotiatedProtocolVersion = result.protocolVersion; // Set up list changed handlers now that we know server capabilities - if (this._pendingListChangedConfig) { - this._setupListChangedHandlers(this._pendingListChangedConfig); - this._pendingListChangedConfig = undefined; + if (this._listChangedConfig) { + this._setupListChangedHandlers(this._listChangedConfig); } } catch (error) { // Disconnect if initialization fails. @@ -765,7 +874,7 @@ export class Client extends Protocol { // Fresh connect: stale connection state must not survive into a new // negotiation — every fresh negotiated connect re-runs the probe. - this._negotiatedProtocolVersion = undefined; + this._resetConnectionState(); let result: Awaited>; try { @@ -802,10 +911,90 @@ export class Client extends Protocol { transport.setProtocolVersion(result.version); } // The modern era has no notifications/initialized; list-changed handlers - // are configured straight from the advertised capabilities. - if (this._pendingListChangedConfig) { - this._setupListChangedHandlers(this._pendingListChangedConfig); - this._pendingListChangedConfig = undefined; + // are configured straight from the advertised capabilities. On a modern + // connection the configured handlers are fed by an auto-opened + // subscriptions/listen stream (the modern era never delivers change + // notifications unsolicited); on a legacy connection they fire on the + // 2025-era unsolicited notifications, no listen needed. + if (this._listChangedConfig) { + const config = this._listChangedConfig; + // Compute configured ∩ server-advertised ONCE and use that single + // value for BOTH handler registration and the auto-open filter, so + // a configured-but-not-advertised type is neither subscribed to + // nor handled (the two stay in lockstep). + const advertised = this._serverCapabilities; + const effective: ListChangedHandlers = { + ...(config.tools && advertised?.tools?.listChanged && { tools: config.tools }), + ...(config.prompts && advertised?.prompts?.listChanged && { prompts: config.prompts }), + ...(config.resources && advertised?.resources?.listChanged && { resources: config.resources }) + }; + // Handler registration validates the per-type options and can + // throw on misconfiguration; the modern connection IS established + // at this point and is fully usable without listChanged handlers, + // so a misconfiguration surfaces via onerror and connect resolves + // (matching the auto-open soft-fail posture). When registration + // fails the auto-open is SKIPPED — opening a listen stream for + // types whose handler never registered would consume a server + // slot to deliver notifications nothing handles. + let handlersRegistered = true; + try { + this._setupListChangedHandlers(effective); + } catch (error) { + handlersRegistered = false; + this.onerror?.(error instanceof Error ? error : new Error(String(error))); + } + const filter: SubscriptionFilter = handlersRegistered + ? { + ...(effective.tools && { toolsListChanged: true as const }), + ...(effective.prompts && { promptsListChanged: true as const }), + ...(effective.resources && { resourcesListChanged: true as const }) + } + : {}; + if (Object.keys(filter).length > 0) { + // A failed auto-open MUST NOT fail connect: the modern + // connection is fully usable without a listen stream (the + // server may not support it, or refuse on capacity). Surface + // via onerror; the consumer can call listen() later. + // + // listen() binds RequestOptions.signal to the SUBSCRIPTION + // lifetime, so connect()'s signal must NOT be forwarded + // verbatim — a connect-scoped `AbortSignal.timeout(30_000)` + // would silently tear the auto-opened stream down the moment + // it fires after connect has resolved. But connect()'s signal + // MUST still cancel the in-connect ack WAIT (otherwise an + // aborted connect blocks here for the full ack timeout). + // Derived one-shot: bound to connect()'s signal only for the + // duration of the listen() await; the listener is removed in + // `finally` so the auto-opened subscription outlives connect's + // signal. + const ackAbort = new AbortController(); + const onConnectAbort = (): void => ackAbort.abort(options?.signal?.reason); + // Handle the already-aborted case (aborted between the + // discover leg resolving and now): the listener never fires + // for a past event. + if (options?.signal?.aborted) onConnectAbort(); + options?.signal?.addEventListener('abort', onConnectAbort); + try { + this._autoOpenedSubscription = await this.listen(filter, { + timeout: options?.timeout, + signal: ackAbort.signal + }); + } catch (error) { + // Connect-signal abort during the ack wait propagates as a + // connect() rejection (caller asked to abort connect). The + // transport is already started, so close it before + // rethrowing — a connect() rejection MUST NOT leave a + // half-open connection. A server-side refusal stays a + // soft onerror (connect succeeds, no listen stream). + if (options?.signal?.aborted) { + await this.close().catch(() => {}); + throw error; + } + this.onerror?.(error instanceof Error ? error : new Error(String(error))); + } finally { + options?.signal?.removeEventListener('abort', onConnectAbort); + } + } } } @@ -1134,6 +1323,303 @@ export class Client extends Protocol { return this.request({ method: 'resources/unsubscribe', params }, options); } + /** + * Opens a `subscriptions/listen` stream (protocol revision 2026-07-28). + * + * Resolves once the server's `notifications/subscriptions/acknowledged` + * arrives (the standard request timeout applies to this ack phase). Change + * notifications delivered on the stream are dispatched to the existing + * {@linkcode setNotificationHandler} registrations — the same handlers the + * 2025-era unsolicited notifications fire on a legacy connection — so + * `listen()` is era-transparent for consumers that already register those. + * + * `close()` tears the subscription down by aborting the listen request's + * `requestSignal` (closes the SSE stream where the transport honors it) + * AND sending `notifications/cancelled` referencing the listen request id + * — both, unconditionally, so any spec-compliant server on any transport + * sees the cancel. No automatic re-listen — call `listen()` again to + * re-establish. + * + * On a 2025-era connection this throws a typed + * {@linkcode SdkErrorCode.MethodNotSupportedByProtocolVersion} steering to + * `resources/subscribe` and `ClientOptions.listChanged` (the legacy + * unsolicited delivery model still applies there); no transparent shim. + */ + async listen(filter: SubscriptionFilter, options?: RequestOptions): Promise { + // Connectivity is checked first so a closed instance rejects with + // NotConnected (no setup or ack timer is started); after close(), + // `_resetConnectionState` has also cleared the negotiated era, so the + // era guard alone would surface a misleading + // MethodNotSupportedByProtocolVersion. + if (this.transport === undefined) { + throw new SdkError(SdkErrorCode.NotConnected, 'Not connected'); + } + const negotiated = this._negotiatedProtocolVersion; + if (negotiated === undefined || !isModernProtocolVersion(negotiated)) { + throw new SdkError( + SdkErrorCode.MethodNotSupportedByProtocolVersion, + `subscriptions/listen requires a 2026-07-28-era connection (negotiated: ${negotiated ?? 'none'}). ` + + 'On a 2025-era connection, change notifications are delivered unsolicited: use ClientOptions.listChanged ' + + 'and resources/subscribe instead.', + { method: 'subscriptions/listen', protocolVersion: negotiated } + ); + } + + // Honor RequestOptions.signal exactly as request() does: an + // already-aborted signal rejects synchronously before any setup. + options?.signal?.throwIfAborted(); + + const requestAbort = new AbortController(); + // The listen request's JSON-RPC id (= the spec's subscription id + // verbatim). A STRING from a Client-owned counter so it cannot + // collide with Protocol's numeric `_requestMessageId` counter — the + // `_onresponse`/`_onnotification` overrides demux by string-id alone. + const listenId = `listen:${this._nextListenId++}`; + + // Explicit `opening → open → closed` state machine. Every termination + // path — ack-arrives, ack-timeout, server-cancelled, user-close, + // stream-end, transport-close, send-failure — funnels through the + // single `settle` below, which clears the ack timer, transitions + // state, and resolves/rejects the opening promise exactly once. The + // cancelled-before-ack / close-before-ack hangs are impossible by + // construction. + let state: 'opening' | 'open' | 'closed' = 'opening'; + let ackTimer: ReturnType | undefined; + let onCallerAbort: (() => void) | undefined; + let resolveOpening!: (honored: SubscriptionFilter) => void; + let rejectOpening!: (error: Error) => void; + const opening = new Promise((resolve, reject) => { + resolveOpening = resolve; + rejectOpening = reject; + }); + // The McpSubscription.closed observation. Resolved exactly once by + // settle()'s `→ closed` transition; never rejects. When listen() + // itself rejects (pre-ack) there is no McpSubscription to observe it + // on — settle() resolves it anyway so nothing dangles. + let resolveClosed!: (cause: 'local' | 'remote') => void; + const closed = new Promise<'local' | 'remote'>(resolve => { + resolveClosed = resolve; + }); + + const settle = (outcome: { ack: SubscriptionFilter } | { cause: 'local' | 'remote'; error?: Error }): void => { + if (state === 'closed') return; + const wasOpening = state === 'opening'; + if (ackTimer !== undefined) { + clearTimeout(ackTimer); + ackTimer = undefined; + } + if ('ack' in outcome) { + // The single `opening → open` transition; an ack after close + // hits the `closed` guard above and is a no-op. + state = 'open'; + resolveOpening(outcome.ack); + return; + } + state = 'closed'; + if (onCallerAbort !== undefined) { + options?.signal?.removeEventListener('abort', onCallerAbort); + } + this._listenState.delete(listenId); + // Abort the per-request signal so an HTTP SSE reader stops on a + // remote-initiated close too (server-cancel / stream-end / + // transport-drop). Idempotent; a no-op on transports that ignore + // requestSignal. wireTeardown() also aborts on the local paths — + // harmless redundancy. + requestAbort.abort(); + resolveClosed(outcome.cause); + if (wasOpening) { + rejectOpening( + outcome.error ?? + new SdkError(SdkErrorCode.ConnectionClosed, 'subscriptions/listen closed before the server acknowledged') + ); + } + }; + + // Wire-level teardown for a locally-initiated close (user close, ack + // timeout, caller-signal abort). Transport-agnostic: ALWAYS abort the + // request signal (closes the SSE stream where the transport honors + // `requestSignal` — HTTP does, stdio does not) AND send + // `notifications/cancelled` referencing the listen id (which the + // stdio listen router and any spec-compliant server honor). Sent via + // `notification()` so the modern auto-envelope is attached exactly as + // for every other outbound. Idempotent over HTTP — the cancelled + // notification is a no-op once the stream is gone; correct on every + // other transport. Not called when the server already terminated. + const wireTeardown = async (): Promise => { + requestAbort.abort(); + await this.notification({ method: 'notifications/cancelled', params: { requestId: listenId } }).catch(() => {}); + }; + + const close = async (): Promise => { + if (state === 'closed') return; + settle({ cause: 'local' }); + await wireTeardown(); + }; + + // The per-subscription state is registered BEFORE the request is sent + // so a synchronously-delivered ack (an in-process transport) cannot + // race the registration. + this._listenState.set(listenId, { settle }); + + const ackTimeout = options?.timeout ?? DEFAULT_REQUEST_TIMEOUT_MSEC; + ackTimer = setTimeout(() => { + settle({ + cause: 'remote', + error: new SdkError(SdkErrorCode.RequestTimeout, 'subscriptions/listen ack timed out', { timeout: ackTimeout }) + }); + void wireTeardown().catch(() => {}); + }, ackTimeout); + + // RequestOptions.signal aborts the subscription at any point in its + // lifecycle (mirrors request()'s cancel path). While `opening`, settle + // rejects the pending listen() promise with the signal's reason; while + // `open`, it transitions to `closed` (`closed` resolves `'local'`) and + // tears the wire down. The listener is removed by `settle()` once the + // subscription has closed. + if (options?.signal) { + const callerSignal = options.signal; + onCallerAbort = () => { + if (state === 'closed') return; + const reason = callerSignal.reason; + settle({ cause: 'local', error: reason instanceof Error ? reason : new Error(String(reason ?? 'Aborted')) }); + void wireTeardown().catch(() => {}); + }; + callerSignal.addEventListener('abort', onCallerAbort, { once: true }); + } + + // Send the listen request directly on the transport. The `_meta` + // envelope is built via the same `_outboundMetaEnvelope()` seam every + // other outbound uses (so a future envelope key cannot silently + // diverge here). `onRequestStreamEnd` feeds the per-request stream's + // non-deliberate end into the state machine on transports that open + // one (Streamable HTTP); stdio/InMemory ignore it. + const jsonrpcRequest: JSONRPCRequest = { + jsonrpc: '2.0', + id: listenId, + method: 'subscriptions/listen', + params: { _meta: { ...this._outboundMetaEnvelope() }, notifications: filter } + }; + try { + await this.transport.send(jsonrpcRequest, { + requestSignal: requestAbort.signal, + onRequestStreamEnd: () => settle({ cause: 'remote', error: new Error('subscriptions/listen: stream ended') }) + }); + } catch (error) { + // Synchronous OR awaited send failure (including a per-request + // abort fired before response headers — `streamableHttp._send` + // rethrows with onerror suppressed). `settle()` is idempotent so + // a locally-aborted send hitting this path after `close()` is a + // no-op. + settle({ cause: 'remote', error: error instanceof Error ? error : new Error(String(error)) }); + } + + const honored = await opening; + return { honoredFilter: honored, close, closed }; + } + + /** + * The subscription auto-opened by `ClientOptions.listChanged` on a modern + * connection — the listen filter is the intersection of the configured + * sub-options and the server-advertised `listChanged` capabilities. + * `undefined` on a legacy connection, before connect, or when that + * intersection is empty (auto-open skipped). Exposed so the consumer can + * `close()` it. + */ + get autoOpenedSubscription(): McpSubscription | undefined { + return this._autoOpenedSubscription; + } + + /** + * Transport-level demux for `subscriptions/listen` notifications, before + * any decoding/era-gating/handler dispatch. Consumes the leading + * `notifications/subscriptions/acknowledged` referencing a live + * subscription id (resolves the ack waiter) and an inbound + * `notifications/cancelled` referencing a live string-typed subscription + * id (server-side teardown on stdio). Change notifications carrying a + * subscription id pass through to the existing registered handlers via + * `super`. An unmatched ack/cancelled is NOT consumed: it reaches + * `setNotificationHandler` / `fallbackNotificationHandler` instead of + * being silently swallowed. + */ + protected override _onnotification(raw: JSONRPCNotification, extra?: MessageExtraInfo): void { + if (raw.method === 'notifications/subscriptions/acknowledged') { + const params = raw.params as { _meta?: Record; notifications?: unknown } | undefined; + const subscriptionId = params?._meta?.[SUBSCRIPTION_ID_META_KEY]; + const entry = typeof subscriptionId === 'string' ? this._listenState.get(subscriptionId) : undefined; + if (entry !== undefined) { + const honored = SubscriptionFilterSchema.safeParse(params?.notifications ?? {}); + entry.settle({ ack: honored.success ? honored.data : {} }); + return; + } + } + if (raw.method === 'notifications/cancelled') { + const cancelledId = (raw.params as { requestId?: unknown } | undefined)?.requestId; + const entry = typeof cancelledId === 'string' ? this._listenState.get(cancelledId) : undefined; + if (entry !== undefined) { + // Handles BOTH the pre-ack and post-ack server-side cancel: + // while opening, settle rejects the pending listen() promise; + // once open, settle transitions to closed and `closed` resolves + // 'remote' so the consumer can observe the server-initiated + // close. + entry.settle({ cause: 'remote', error: new Error('subscriptions/listen: server cancelled the subscription') }); + return; + } + } + super._onnotification(raw, extra); + } + + /** + * Transport-level demux for `subscriptions/listen` responses. The spec + * defines listen as never receiving a JSON-RPC result; a JSON-RPC ERROR + * for the listen id is the server's pre-ack capacity/params rejection. A + * string-id response that matches a live `_listenState` entry is consumed + * here (Protocol's `_responseHandlers` map is keyed by NUMBER and never + * holds a listen id, so passing a string-id response through would + * surface as "unknown message ID" via `onerror`). + */ + protected override _onresponse(response: JSONRPCResponse): void { + const id = response.id; + const entry = typeof id === 'string' ? this._listenState.get(id) : undefined; + if (entry !== undefined) { + if (isJSONRPCErrorResponse(response)) { + entry.settle({ + cause: 'remote', + error: ProtocolError.fromError(response.error.code, response.error.message, response.error.data) + }); + } else { + entry.settle({ + cause: 'remote', + error: new SdkError( + SdkErrorCode.InvalidResult, + 'server answered subscriptions/listen with a result; expected the acknowledged notification' + ) + }); + } + return; + } + super._onresponse(response); + } + + /** + * Settle every live per-listen state machine on a transport-initiated + * close (the server dropping the connection on stdio/InMemory) before + * Protocol's `_onclose` tears the transport down. The base + * `_responseHandlers` settlement does not reach `_listenState` (listen + * ids are never registered there), so without this override a remote + * close would leave an in-flight `listen()` / open `McpSubscription` + * hanging. + */ + protected override _onclose(): void { + if (this._listenState.size > 0) { + const reason = new SdkError(SdkErrorCode.ConnectionClosed, 'Connection closed'); + for (const entry of this._listenState.values()) { + entry.settle({ cause: 'remote', error: reason }); + } + this._listenState.clear(); + } + super._onclose(); + } + /** * Calls a tool on the connected server and returns the result. Automatically validates structured output * if the tool has an `outputSchema`. diff --git a/packages/client/src/client/streamableHttp.ts b/packages/client/src/client/streamableHttp.ts index 5dea9a7cc5..277e84e0fd 100644 --- a/packages/client/src/client/streamableHttp.ts +++ b/packages/client/src/client/streamableHttp.ts @@ -50,6 +50,24 @@ export interface StartSSEOptions { * so that the response can be associated with the new resumed request. */ replayMessageId?: string | number; + + /** + * The per-request abort signal supplied by the caller via + * `TransportSendOptions.requestSignal`. When this signal is aborted the + * originating POST and its SSE response stream are torn down + * intentionally — `_handleSseStream` treats it exactly like the + * transport-level abort: no `onerror`, no reconnect. + */ + requestSignal?: AbortSignal; + + /** + * The per-request stream-end callback supplied via + * `TransportSendOptions.onRequestStreamEnd`. Fired when the SSE response + * stream for the originating POST ends or errors for any non-deliberate + * reason (server closed, network dropped, reconnection exhausted) — never + * when `requestSignal` was aborted. + */ + onRequestStreamEnd?: () => void; } /** @@ -164,6 +182,43 @@ export type StreamableHTTPClientTransportOptions = { protocolVersion?: string; }; +/** + * `AbortSignal.any` with a manual fallback. `AbortSignal.any` landed in + * Node 20.3; this package's `engines` floor is `>=20`, so 20.0–20.2 must be + * served by the fallback combinator (a controller that aborts on the first + * of `a` or `b`). The native path is preferred because it propagates the + * originating signal's `reason` and participates in GC the way the spec + * defines. + */ +function anySignal(a: AbortSignal, b: AbortSignal): AbortSignal { + if (typeof AbortSignal.any === 'function') { + return AbortSignal.any([a, b]); + } + const controller = new AbortController(); + if (a.aborted) return (controller.abort(a.reason), controller.signal); + if (b.aborted) return (controller.abort(b.reason), controller.signal); + // Standard polyfill shape: when EITHER input fires, remove the listener + // registered on the OTHER input too. `{once:true}` alone leaks the + // sibling listener — for `_send()`, `a` is the transport-lifetime signal, + // so every request-scoped `b` that aborts would otherwise leave one + // listener + closure pinned on `a` for the life of the transport. + const cleanup = (): void => { + a.removeEventListener('abort', onA); + b.removeEventListener('abort', onB); + }; + function onA(): void { + cleanup(); + controller.abort(a.reason); + } + function onB(): void { + cleanup(); + controller.abort(b.reason); + } + a.addEventListener('abort', onA, { once: true }); + b.addEventListener('abort', onB, { once: true }); + return controller.signal; +} + /** * Client transport for Streamable HTTP: this implements the MCP Streamable HTTP transport specification. * It will connect to a server using HTTP `POST` for sending messages and HTTP `GET` with Server-Sent Events @@ -254,7 +309,13 @@ export class StreamableHTTPClientTransport implements Transport { } private async _startOrAuthSse(options: StartSSEOptions, isAuthRetry = false): Promise { - const { resumptionToken } = options; + const { resumptionToken, requestSignal } = options; + // Same guard as `_handleSseStream`: a resurrected listen stream (the + // POST-SSE → GET reconnect path threads `requestSignal` through + // `StartSSEOptions`) must honour the per-request abort exactly as the + // original POST did — both as a fetch signal and as a "do not surface + // onerror" gate. + const isIntentionalAbort = (): boolean => this._abortController?.signal.aborted === true || requestSignal?.aborted === true; try { // Try to open an initial SSE stream with GET to listen for server messages @@ -269,11 +330,16 @@ export class StreamableHTTPClientTransport implements Transport { headers.set('last-event-id', resumptionToken); } + const transportSignal = this._abortController?.signal; + const signal = + requestSignal !== undefined && transportSignal !== undefined + ? anySignal(transportSignal, requestSignal) + : (requestSignal ?? transportSignal); const response = await (this._fetch ?? fetch)(this._url, { ...this._requestInit, method: 'GET', headers, - signal: this._abortController?.signal + signal }); if (!response.ok) { @@ -309,6 +375,15 @@ export class StreamableHTTPClientTransport implements Transport { // 405 indicates that the server does not offer an SSE stream at GET endpoint // This is an expected case that should not trigger an error if (response.status === 405) { + // A 405 on the standalone-GET path is benign (the caller + // never had a per-request stream). On the POST→GET resume + // path it is a TERMINAL non-resumable outcome for a + // per-request stream the caller is observing — fire the + // stream-end callback so the caller can settle (otherwise + // a resumed listen subscription dead-ends silently). The + // standalone-GET callers never pass `onRequestStreamEnd`, + // so this is a no-op for them. + options.onRequestStreamEnd?.(); return; } @@ -320,7 +395,9 @@ export class StreamableHTTPClientTransport implements Transport { this._handleSseStream(response.body, options, true); } catch (error) { - this.onerror?.(error as Error); + if (!isIntentionalAbort()) { + this.onerror?.(error as Error); + } throw error; } } @@ -359,6 +436,8 @@ export class StreamableHTTPClientTransport implements Transport { // Check if we've exceeded maximum retry attempts if (attemptCount >= maxRetries) { this.onerror?.(new Error(`Maximum reconnection attempts (${maxRetries}) exceeded.`)); + // The per-request stream is now definitively gone. + options.onRequestStreamEnd?.(); return; } @@ -367,8 +446,12 @@ export class StreamableHTTPClientTransport implements Transport { const reconnect = (): void => { this._cancelReconnection = undefined; - if (this._abortController?.signal.aborted) return; + // Honour BOTH the transport-wide abort and the per-request abort + // (a listen subscription closed during the backoff delay): do not + // resurrect a stream the caller already tore down. + if (this._abortController?.signal.aborted || options.requestSignal?.aborted) return; this._startOrAuthSse(options).catch(error => { + if (this._abortController?.signal.aborted || options.requestSignal?.aborted) return; this.onerror?.(new Error(`Failed to reconnect SSE stream: ${error instanceof Error ? error.message : String(error)}`)); try { this._scheduleReconnection(options, attemptCount + 1); @@ -389,9 +472,20 @@ export class StreamableHTTPClientTransport implements Transport { private _handleSseStream(stream: ReadableStream | null, options: StartSSEOptions, isReconnectable: boolean): void { if (!stream) { + // A null body on a per-request stream (or its GET resume) is the + // same terminal non-resumable outcome as a 405 — fire the + // stream-end callback so the caller can settle. No-op for + // standalone-GET callers (they never pass `onRequestStreamEnd`). + options.onRequestStreamEnd?.(); return; } - const { onresumptiontoken, replayMessageId } = options; + const { onresumptiontoken, replayMessageId, requestSignal, onRequestStreamEnd } = options; + // An intentional abort — transport-wide close OR a per-request abort + // (McpSubscription.close() aborting its `requestSignal`) — must read as + // a clean shutdown: no misleading "SSE stream disconnected" onerror, + // and no GET+Last-Event-ID reconnect that would resurrect a stream the + // caller just tore down. + const isIntentionalAbort = (): boolean => this._abortController?.signal.aborted === true || requestSignal?.aborted === true; let lastEventId: string | undefined; // Track whether we've received a priming event (event with ID) @@ -460,17 +554,29 @@ export class StreamableHTTPClientTransport implements Transport { // BUT don't reconnect if we already received a response - the request is complete const canResume = isReconnectable || hasPrimingEvent; const needsReconnect = canResume && !receivedResponse; - if (needsReconnect && this._abortController && !this._abortController.signal.aborted) { + if (needsReconnect && this._abortController && !isIntentionalAbort()) { this._scheduleReconnection( { resumptionToken: lastEventId, onresumptiontoken, - replayMessageId + replayMessageId, + requestSignal, + onRequestStreamEnd }, 0 ); + } else if (!isIntentionalAbort()) { + // The per-request stream ended without reconnecting (no + // priming event for a POST stream, or response already + // received). Not a deliberate abort — notify the caller. + onRequestStreamEnd?.(); } } catch (error) { + if (isIntentionalAbort()) { + // The reader threw because we aborted it. Not an error; do + // not surface onerror, do not reconnect. + return; + } // Handle stream errors - likely a network disconnect this.onerror?.(new Error(`SSE stream disconnected: ${error}`)); @@ -479,20 +585,27 @@ export class StreamableHTTPClientTransport implements Transport { // BUT don't reconnect if we already received a response - the request is complete const canResume = isReconnectable || hasPrimingEvent; const needsReconnect = canResume && !receivedResponse; - if (needsReconnect && this._abortController && !this._abortController.signal.aborted) { + if (needsReconnect && this._abortController && !isIntentionalAbort()) { // Use the exponential backoff reconnection strategy try { this._scheduleReconnection( { resumptionToken: lastEventId, onresumptiontoken, - replayMessageId + replayMessageId, + requestSignal, + onRequestStreamEnd }, 0 ); } catch (error) { this.onerror?.(new Error(`Failed to reconnect: ${error instanceof Error ? error.message : String(error)}`)); + onRequestStreamEnd?.(); } + } else { + // Non-deliberate stream error without reconnection: the + // per-request stream is gone — notify the caller. + onRequestStreamEnd?.(); } } }; @@ -541,14 +654,26 @@ export class StreamableHTTPClientTransport implements Transport { async send( message: JSONRPCMessage | JSONRPCMessage[], - options?: { resumptionToken?: string; onresumptiontoken?: (token: string) => void } + options?: { + resumptionToken?: string; + onresumptiontoken?: (token: string) => void; + requestSignal?: AbortSignal; + onRequestStreamEnd?: () => void; + } ): Promise { return this._send(message, options, false); } private async _send( message: JSONRPCMessage | JSONRPCMessage[], - options: { resumptionToken?: string; onresumptiontoken?: (token: string) => void } | undefined, + options: + | { + resumptionToken?: string; + onresumptiontoken?: (token: string) => void; + requestSignal?: AbortSignal; + onRequestStreamEnd?: () => void; + } + | undefined, isAuthRetry: boolean ): Promise { try { @@ -569,12 +694,21 @@ export class StreamableHTTPClientTransport implements Transport { const types = [...(userAccept?.split(',').map(s => s.trim().toLowerCase()) ?? []), 'application/json', 'text/event-stream']; headers.set('accept', [...new Set(types)].join(', ')); + // Per-request abort: when the caller supplies a request-scoped + // signal (the `subscriptions/listen` driver), aborting it cancels + // this POST and its SSE response stream without closing the + // transport. + const transportSignal = this._abortController?.signal; + const signal = + options?.requestSignal !== undefined && transportSignal !== undefined + ? anySignal(transportSignal, options.requestSignal) + : (options?.requestSignal ?? transportSignal); const init = { ...this._requestInit, method: 'POST', headers, body: JSON.stringify(message), - signal: this._abortController?.signal + signal }; const response = await (this._fetch ?? fetch)(this._url, init); @@ -690,7 +824,15 @@ export class StreamableHTTPClientTransport implements Transport { // Handle SSE stream responses for requests // We use the same handler as standalone streams, which now supports // reconnection with the last event ID - this._handleSseStream(response.body, { onresumptiontoken }, false); + this._handleSseStream( + response.body, + { + onresumptiontoken, + requestSignal: options?.requestSignal, + onRequestStreamEnd: options?.onRequestStreamEnd + }, + false + ); } else if (contentType?.includes('application/json')) { // For non-streaming servers, we might get direct JSON responses const data = await response.json(); @@ -712,7 +854,15 @@ export class StreamableHTTPClientTransport implements Transport { await response.text?.().catch(() => {}); } } catch (error) { - this.onerror?.(error as Error); + // Intentional per-request abort BEFORE response headers (the + // `subscriptions/listen` driver aborting its `requestSignal`): + // fetch rejects with AbortError. Same guard as + // `_handleSseStream`'s `isIntentionalAbort` — do not surface a + // misleading onerror; still rethrow so `listen()`'s send-catch + // settles the per-subscription state machine. + if (options?.requestSignal?.aborted !== true) { + this.onerror?.(error as Error); + } throw error; } } diff --git a/packages/client/src/index.ts b/packages/client/src/index.ts index 678bb4d45d..7fe7acb958 100644 --- a/packages/client/src/index.ts +++ b/packages/client/src/index.ts @@ -52,7 +52,7 @@ export { PrivateKeyJwtProvider, StaticPrivateKeyJwtProvider } from './client/authExtensions.js'; -export type { ClientOptions } from './client/client.js'; +export type { ClientOptions, McpSubscription } from './client/client.js'; export { Client } from './client/client.js'; export { getSupportedElicitationModes } from './client/client.js'; export type { DiscoverAndRequestJwtAuthGrantOptions, JwtAuthGrantResult, RequestJwtAuthGrantOptions } from './client/crossAppAccess.js'; diff --git a/packages/client/test/client/listen.test.ts b/packages/client/test/client/listen.test.ts new file mode 100644 index 0000000000..2476325c23 --- /dev/null +++ b/packages/client/test/client/listen.test.ts @@ -0,0 +1,936 @@ +/** + * `Client.listen()` — the `subscriptions/listen` driver (protocol revision + * 2026-07-28). Covers ack-resolved-promise, change-notification dispatch to + * existing setNotificationHandler registrations, the F-12 legacy-era steer, + * transport-agnostic close (always sends notifications/cancelled), inbound + * server-side cancel, and ClientOptions.listChanged auto-open on a modern + * connection. + */ +import type { JSONRPCMessage, JSONRPCNotification } from '@modelcontextprotocol/core'; +import { + InMemoryTransport, + LATEST_PROTOCOL_VERSION, + PROTOCOL_VERSION_META_KEY, + SdkError, + SdkErrorCode, + SUBSCRIPTION_ID_META_KEY +} from '@modelcontextprotocol/core'; +import { afterEach, beforeEach, describe, expect, it, vi } from 'vitest'; + +import { Client } from '../../src/client/client.js'; + +const MODERN = '2026-07-28'; +const flush = () => new Promise(r => setTimeout(r, 10)); + +async function scriptedModern(onListen?: (id: number | string, filter: unknown, send: (m: JSONRPCMessage) => void) => void) { + const [clientTx, serverTx] = InMemoryTransport.createLinkedPair(); + const written: JSONRPCMessage[] = []; + serverTx.onmessage = message => { + written.push(message); + const req = message as { id?: number | string; method?: string; params?: { notifications?: unknown } }; + if (req.method === 'server/discover' && req.id !== undefined) { + void serverTx.send({ + jsonrpc: '2.0', + id: req.id, + result: { + resultType: 'complete', + supportedVersions: [MODERN], + capabilities: { tools: { listChanged: true }, prompts: { listChanged: true } }, + serverInfo: { name: 'scripted', version: '1' } + } + }); + } + if (req.method === 'subscriptions/listen' && req.id !== undefined) { + const filter = req.params?.notifications ?? {}; + const ack: JSONRPCMessage = { + jsonrpc: '2.0', + method: 'notifications/subscriptions/acknowledged', + params: { _meta: { [SUBSCRIPTION_ID_META_KEY]: req.id }, notifications: filter } + }; + void serverTx.send(ack); + onListen?.(req.id, filter, m => void serverTx.send(m)); + } + }; + await serverTx.start(); + return { clientTx, serverTx, written }; +} + +/** + * Like `scriptedModern` but does NOT auto-ack `subscriptions/listen`: the + * test drives ack / cancel / transport-close itself. + */ +async function scriptedModernNoAck() { + const [clientTx, serverTx] = InMemoryTransport.createLinkedPair(); + const written: JSONRPCMessage[] = []; + serverTx.onmessage = message => { + written.push(message); + const req = message as { id?: number | string; method?: string }; + if (req.method === 'server/discover' && req.id !== undefined) { + void serverTx.send({ + jsonrpc: '2.0', + id: req.id, + result: { + resultType: 'complete', + supportedVersions: [MODERN], + capabilities: { tools: { listChanged: true }, prompts: { listChanged: true } }, + serverInfo: { name: 'scripted', version: '1' } + } + }); + } + }; + await serverTx.start(); + return { clientTx, serverTx, written }; +} + +describe('Client.listen()', () => { + it('throws a typed steer on a legacy-era connection (no wire write)', async () => { + const [clientTx, serverTx] = InMemoryTransport.createLinkedPair(); + const written: JSONRPCMessage[] = []; + serverTx.onmessage = m => { + written.push(m); + const req = m as { id?: number | string; method?: string }; + if (req.method === 'initialize' && req.id !== undefined) { + void serverTx.send({ + jsonrpc: '2.0', + id: req.id, + result: { protocolVersion: LATEST_PROTOCOL_VERSION, capabilities: {}, serverInfo: { name: 's', version: '1' } } + }); + } + }; + await serverTx.start(); + const client = new Client({ name: 'c', version: '1' }, { versionNegotiation: { mode: 'legacy' } }); + await client.connect(clientTx); + written.length = 0; + + const error = await client.listen({ toolsListChanged: true }).catch(e => e as SdkError); + expect(error).toBeInstanceOf(SdkError); + expect((error as SdkError).code).toBe(SdkErrorCode.MethodNotSupportedByProtocolVersion); + expect((error as SdkError).message).toContain('resources/subscribe'); + expect((error as SdkError).message).toContain('listChanged'); + // The steer fires before any wire write. + expect(written.some(m => (m as { method?: string }).method === 'subscriptions/listen')).toBe(false); + await client.close(); + }); + + it('resolves on ack with the honored filter; change notifications reach setNotificationHandler', async () => { + let send!: (m: JSONRPCMessage) => void; + const { clientTx } = await scriptedModern((_id, _f, s) => { + send = s; + }); + const client = new Client({ name: 'c', version: '1' }, { versionNegotiation: { mode: 'auto' } }); + const seen: string[] = []; + client.setNotificationHandler('notifications/tools/list_changed', () => { + seen.push('tools'); + }); + await client.connect(clientTx); + + const sub = await client.listen({ toolsListChanged: true }); + expect(sub.honoredFilter).toEqual({ toolsListChanged: true }); + + send({ + jsonrpc: '2.0', + method: 'notifications/tools/list_changed', + params: { _meta: { [SUBSCRIPTION_ID_META_KEY]: 0 } } + }); + await flush(); + expect(seen).toEqual(['tools']); + await sub.close(); + await client.close(); + }); + + it('close() sends notifications/cancelled referencing the listen id on any transport', async () => { + // Plain InMemoryTransport (neither child-process nor SSE-stream + // semantics): close() must NOT depend on transport-kind detection — + // it always sends notifications/cancelled, so a spec-compliant server + // on InMemory / SSE / a custom transport tears the subscription down. + const { clientTx, written } = await scriptedModern(); + const client = new Client({ name: 'c', version: '1' }, { versionNegotiation: { mode: 'auto' } }); + await client.connect(clientTx); + const sub = await client.listen({ toolsListChanged: true }); + const listenId = (written.find(m => (m as { method?: string }).method === 'subscriptions/listen') as { id: number | string }).id; + written.length = 0; + await sub.close(); + expect(written).toHaveLength(1); + const cancel = written[0] as unknown as { method: string; params: { requestId: unknown; _meta?: Record } }; + expect(cancel.method).toBe('notifications/cancelled'); + expect(cancel.params.requestId).toBe(listenId); + // The listen-path cancel carries the same modern auto-envelope as + // every other outbound (request()'s cancel, Protocol.notification()). + expect(cancel.params._meta?.[PROTOCOL_VERSION_META_KEY]).toBe(MODERN); + // Idempotent. + await sub.close(); + expect(written).toHaveLength(1); + await client.close(); + }); + + it("inbound notifications/cancelled post-ack: closed resolves 'remote'; subscription torn down; handlers stop firing", async () => { + let listenId!: number | string; + let send!: (m: JSONRPCMessage) => void; + const { clientTx } = await scriptedModern((id, _f, s) => { + listenId = id; + send = s; + }); + const client = new Client({ name: 'c', version: '1' }, { versionNegotiation: { mode: 'auto' } }); + const seen: string[] = []; + client.setNotificationHandler('notifications/tools/list_changed', () => { + seen.push('tools'); + }); + await client.connect(clientTx); + const sub = await client.listen({ toolsListChanged: true }); + send({ jsonrpc: '2.0', method: 'notifications/cancelled', params: { requestId: listenId } } as JSONRPCNotification); + // The spec-defined remote termination signal is now observable on the + // subscription handle; settle() is the funnel and resolves it once. + await expect(sub.closed).resolves.toBe('remote'); + // Per-listen state is gone; the request signal was aborted (so an HTTP + // SSE reader would have stopped). + expect((client as unknown as { _listenState: Map })._listenState.size).toBe(0); + // After a server-side close, the server stops delivering on this stream + // — a notification carrying this subscription id is no longer routed + // through any per-listen entry (the entry is gone). The handler is the + // shared setNotificationHandler registration; assert no later + // dispatch from THIS subscription's stream by asserting no entry exists + // to demux it. + expect((client as unknown as { _listenState: Map })._listenState.has(listenId)).toBe(false); + expect(seen).toEqual([]); + // close() after server-cancel is idempotent and does NOT change the + // already-resolved cause. + await sub.close(); + await expect(sub.closed).resolves.toBe('remote'); + await client.close(); + }); + + it("close() resolves closed with 'local' exactly once", async () => { + const { clientTx } = await scriptedModern(); + const client = new Client({ name: 'c', version: '1' }, { versionNegotiation: { mode: 'auto' } }); + await client.connect(clientTx); + const sub = await client.listen({ toolsListChanged: true }); + await sub.close(); + await expect(sub.closed).resolves.toBe('local'); + // A second close() and a later remote signal cannot change it. + await sub.close(); + await expect(sub.closed).resolves.toBe('local'); + await client.close(); + }); + + it('closed resolves exactly once even when multiple termination signals arrive', async () => { + let listenId!: number | string; + let send!: (m: JSONRPCMessage) => void; + const { clientTx, serverTx } = await scriptedModern((id, _f, s) => { + listenId = id; + send = s; + }); + const client = new Client({ name: 'c', version: '1' }, { versionNegotiation: { mode: 'auto' } }); + await client.connect(clientTx); + const sub = await client.listen({ toolsListChanged: true }); + const resolutions: string[] = []; + void sub.closed.then(cause => resolutions.push(cause)); + // Three signals in quick succession: server-cancel, a duplicate + // server-cancel, then transport close. settle()'s `closed` guard + // means only the first transitions; `closed` resolves once. + send({ jsonrpc: '2.0', method: 'notifications/cancelled', params: { requestId: listenId } } as JSONRPCNotification); + send({ jsonrpc: '2.0', method: 'notifications/cancelled', params: { requestId: listenId } } as JSONRPCNotification); + await serverTx.close(); + await flush(); + expect(resolutions).toEqual(['remote']); + // sub.close() after the fact is still idempotent and cannot flip it. + await sub.close(); + await expect(sub.closed).resolves.toBe('remote'); + }); + + it('rejects with the typed pre-ack error when the server answers -32603', async () => { + const [clientTx, serverTx] = InMemoryTransport.createLinkedPair(); + serverTx.onmessage = m => { + const req = m as { id?: number | string; method?: string }; + if (req.method === 'server/discover' && req.id !== undefined) { + void serverTx.send({ + jsonrpc: '2.0', + id: req.id, + result: { + resultType: 'complete', + supportedVersions: [MODERN], + capabilities: {}, + serverInfo: { name: 's', version: '1' } + } + }); + } + if (req.method === 'subscriptions/listen' && req.id !== undefined) { + void serverTx.send({ jsonrpc: '2.0', id: req.id, error: { code: -32_603, message: 'Subscription limit reached' } }); + } + }; + await serverTx.start(); + const client = new Client({ name: 'c', version: '1' }, { versionNegotiation: { mode: 'auto' } }); + await client.connect(clientTx); + const error = await client.listen({ toolsListChanged: true }).catch(e => e as Error); + expect(error).toBeInstanceOf(Error); + expect((error as { code?: number }).code).toBe(-32_603); + await client.close(); + }); + + it('server cancels BEFORE the ack: listen() rejects immediately, no 60s hang', async () => { + const [clientTx, serverTx] = InMemoryTransport.createLinkedPair(); + serverTx.onmessage = m => { + const req = m as { id?: number | string; method?: string }; + if (req.method === 'server/discover' && req.id !== undefined) { + void serverTx.send({ + jsonrpc: '2.0', + id: req.id, + result: { + resultType: 'complete', + supportedVersions: [MODERN], + capabilities: {}, + serverInfo: { name: 's', version: '1' } + } + }); + } + if (req.method === 'subscriptions/listen' && req.id !== undefined) { + // Server cancels the listen id BEFORE sending the ack. + void serverTx.send({ jsonrpc: '2.0', method: 'notifications/cancelled', params: { requestId: req.id } }); + } + }; + await serverTx.start(); + const client = new Client({ name: 'c', version: '1' }, { versionNegotiation: { mode: 'auto' } }); + await client.connect(clientTx); + const t0 = Date.now(); + const error = await client.listen({ toolsListChanged: true }).catch(e => e as Error); + expect(error).toBeInstanceOf(Error); + expect((error as Error).message).toContain('server cancelled the subscription'); + // Rejected promptly (well under the 60s ack timeout). + expect(Date.now() - t0).toBeLessThan(1000); + // No leaked per-listen state for the listen id. + expect((client as unknown as { _listenState: Map })._listenState.size).toBe(0); + await client.close(); + }); + + it('an ack arriving AFTER the subscription was server-cancelled is a no-op', async () => { + let listenId!: number | string; + let send!: (m: JSONRPCMessage) => void; + const { clientTx } = await scriptedModern((id, _f, s) => { + listenId = id; + send = s; + }); + const client = new Client({ name: 'c', version: '1' }, { versionNegotiation: { mode: 'auto' } }); + await client.connect(clientTx); + const sub = await client.listen({ toolsListChanged: true }); + // Server tears the open subscription down. + send({ jsonrpc: '2.0', method: 'notifications/cancelled', params: { requestId: listenId } } as JSONRPCNotification); + await flush(); + // A late duplicate ack must not throw or resurrect state. + send({ + jsonrpc: '2.0', + method: 'notifications/subscriptions/acknowledged', + params: { _meta: { [SUBSCRIPTION_ID_META_KEY]: listenId }, notifications: {} } + }); + await flush(); + await sub.close(); + await client.close(); + }); + + it('a synchronously-delivered server-cancel during send does not leak a _listenState entry', async () => { + // In-process delivery: the server's notifications/cancelled arrives + // inside `transport.send()` (before the `await opening`). settle() + // must still drop the `_listenState` entry registered before send. + const [clientTx, serverTx] = InMemoryTransport.createLinkedPair(); + serverTx.onmessage = m => { + const req = m as { id?: number | string; method?: string }; + if (req.method === 'server/discover' && req.id !== undefined) { + void serverTx.send({ + jsonrpc: '2.0', + id: req.id, + result: { + resultType: 'complete', + supportedVersions: [MODERN], + capabilities: {}, + serverInfo: { name: 's', version: '1' } + } + }); + } + if (req.method === 'subscriptions/listen' && req.id !== undefined) { + void serverTx.send({ jsonrpc: '2.0', method: 'notifications/cancelled', params: { requestId: req.id } }); + } + }; + await serverTx.start(); + const client = new Client({ name: 'c', version: '1' }, { versionNegotiation: { mode: 'auto' } }); + await client.connect(clientTx); + const listenState = (client as unknown as { _listenState: Map })._listenState; + const before = listenState.size; + const error = await client.listen({ toolsListChanged: true }).catch(e => e as Error); + expect((error as Error).message).toContain('server cancelled the subscription'); + // No leaked _listenState entry for the listen id. + expect(listenState.size).toBe(before); + await client.close(); + }); + + it('a synchronous transport.send throw does not leak a _listenState entry', async () => { + const { clientTx } = await scriptedModern(); + const client = new Client({ name: 'c', version: '1' }, { versionNegotiation: { mode: 'auto' } }); + await client.connect(clientTx); + const realSend = clientTx.send.bind(clientTx); + clientTx.send = () => { + throw new Error('send blew up'); + }; + const error = await client.listen({ toolsListChanged: true }).catch(e => e as Error); + expect((error as Error).message).toContain('send blew up'); + // settle() in the catch path dropped the _listenState entry that was + // registered before send threw; listen() never registers in + // Protocol's `_responseHandlers` so there is nothing to leak there. + expect((client as unknown as { _listenState: Map })._listenState.size).toBe(0); + expect((client as unknown as { _responseHandlers: Map })._responseHandlers.size).toBe(0); + clientTx.send = realSend; + await client.close(); + }); + + it('options.signal aborted while opening: listen() rejects fast with the signal reason', async () => { + const [clientTx, serverTx] = InMemoryTransport.createLinkedPair(); + const written: JSONRPCMessage[] = []; + serverTx.onmessage = m => { + written.push(m); + const req = m as { id?: number | string; method?: string }; + if (req.method === 'server/discover' && req.id !== undefined) { + void serverTx.send({ + jsonrpc: '2.0', + id: req.id, + result: { + resultType: 'complete', + supportedVersions: [MODERN], + capabilities: {}, + serverInfo: { name: 's', version: '1' } + } + }); + } + // No ack for subscriptions/listen — stays in `opening`. + }; + await serverTx.start(); + const client = new Client({ name: 'c', version: '1' }, { versionNegotiation: { mode: 'auto' } }); + await client.connect(clientTx); + const ac = new AbortController(); + const t0 = Date.now(); + const pending = client.listen({ toolsListChanged: true }, { signal: ac.signal }); + ac.abort(new Error('caller-abort')); + const error = await pending.catch(e => e as Error); + expect((error as Error).message).toBe('caller-abort'); + expect(Date.now() - t0).toBeLessThan(1000); + // wireTeardown sent notifications/cancelled referencing the listen id. + await flush(); + const listenId = (written.find(m => (m as { method?: string }).method === 'subscriptions/listen') as { id: number | string }).id; + const cancelled = written.find(m => (m as { method?: string }).method === 'notifications/cancelled') as + | { params: { requestId: unknown } } + | undefined; + expect(cancelled?.params.requestId).toBe(listenId); + // No leaked state. + expect((client as unknown as { _listenState: Map })._listenState.size).toBe(0); + await client.close(); + }); + + it('options.signal aborted while open: closes the subscription (notifications/cancelled sent)', async () => { + const { clientTx, written } = await scriptedModern(); + const client = new Client({ name: 'c', version: '1' }, { versionNegotiation: { mode: 'auto' } }); + await client.connect(clientTx); + const ac = new AbortController(); + const sub = await client.listen({ toolsListChanged: true }, { signal: ac.signal }); + const listenId = (written.find(m => (m as { method?: string }).method === 'subscriptions/listen') as { id: number | string }).id; + written.length = 0; + ac.abort(); + await flush(); + expect(written).toHaveLength(1); + expect((written[0] as JSONRPCNotification).method).toBe('notifications/cancelled'); + expect((written[0] as unknown as { params: { requestId: unknown } }).params.requestId).toBe(listenId); + // Caller-signal abort is consumer-initiated → 'local'. + await expect(sub.closed).resolves.toBe('local'); + // close() after signal-abort is idempotent. + await sub.close(); + expect(written).toHaveLength(1); + await client.close(); + }); + + it('rejects with NotConnected (as a rejected promise, no setup) when no transport is connected', async () => { + const { clientTx } = await scriptedModern(); + const client = new Client({ name: 'c', version: '1' }, { versionNegotiation: { mode: 'auto' } }); + await client.connect(clientTx); + await client.close(); + // listen() is async, so a pre-send guard throw is delivered as the + // returned promise's rejection (no ack timer started, no park state). + const pending = client.listen({ toolsListChanged: true }); + const error = await pending.catch(e => e as SdkError); + expect(error).toBeInstanceOf(SdkError); + expect((error as SdkError).code).toBe(SdkErrorCode.NotConnected); + }); + + it('ClientOptions.listChanged auto-opens a listen stream on a modern connection (filter = configured ∩ server-advertised)', async () => { + const filters: unknown[] = []; + const { clientTx } = await scriptedModern((_id, filter) => filters.push(filter)); + const onChanged = () => {}; + const client = new Client( + { name: 'c', version: '1' }, + { versionNegotiation: { mode: 'auto' }, listChanged: { tools: { onChanged }, prompts: { onChanged } } } + ); + await client.connect(clientTx); + expect(filters).toEqual([{ toolsListChanged: true, promptsListChanged: true }]); + expect(client.autoOpenedSubscription).toBeDefined(); + expect(client.autoOpenedSubscription!.honoredFilter).toEqual({ toolsListChanged: true, promptsListChanged: true }); + await client.autoOpenedSubscription!.close(); + await client.close(); + }); + + it('autoOpenedSubscription is cleared on close() and on a fresh reconnect', async () => { + const onChanged = () => {}; + const client = new Client( + { name: 'c', version: '1' }, + { versionNegotiation: { mode: 'auto' }, listChanged: { tools: { onChanged } } } + ); + const { clientTx } = await scriptedModern(); + await client.connect(clientTx); + expect(client.autoOpenedSubscription).toBeDefined(); + await client.close(); + // close() clears every per-connection field. + expect(client.autoOpenedSubscription).toBeUndefined(); + expect(client.getServerCapabilities()).toBeUndefined(); + expect(client.getNegotiatedProtocolVersion()).toBeUndefined(); + }); + + it('auto-open filter is configured ∩ server-advertised; empty intersection skips auto-open', async () => { + const filters: unknown[] = []; + // scriptedModern advertises tools.listChanged + prompts.listChanged but NOT resources. + const { clientTx } = await scriptedModern((_id, filter) => filters.push(filter)); + const onChanged = () => {}; + const client = new Client( + { name: 'c', version: '1' }, + // Configures tools + resources; server advertises tools + prompts. + { versionNegotiation: { mode: 'auto' }, listChanged: { tools: { onChanged }, resources: { onChanged } } } + ); + await client.connect(clientTx); + // Intersection = tools only. + expect(filters).toEqual([{ toolsListChanged: true }]); + expect(client.autoOpenedSubscription?.honoredFilter).toEqual({ toolsListChanged: true }); + await client.close(); + + // Empty intersection: configures resources only; server advertises tools+prompts. + const filters2: unknown[] = []; + const { clientTx: clientTx2 } = await scriptedModern((_id, filter) => filters2.push(filter)); + const client2 = new Client( + { name: 'c', version: '1' }, + { versionNegotiation: { mode: 'auto' }, listChanged: { resources: { onChanged } } } + ); + await client2.connect(clientTx2); + expect(filters2).toEqual([]); + expect(client2.autoOpenedSubscription).toBeUndefined(); + await client2.close(); + }); + + it('a failed auto-open surfaces via onerror and does NOT fail connect', async () => { + const [clientTx, serverTx] = InMemoryTransport.createLinkedPair(); + serverTx.onmessage = m => { + const req = m as { id?: number | string; method?: string }; + if (req.method === 'server/discover' && req.id !== undefined) { + void serverTx.send({ + jsonrpc: '2.0', + id: req.id, + result: { + resultType: 'complete', + supportedVersions: [MODERN], + capabilities: { tools: { listChanged: true } }, + serverInfo: { name: 's', version: '1' } + } + }); + } + if (req.method === 'subscriptions/listen' && req.id !== undefined) { + // Server refuses listen (capacity guard / not supported). + void serverTx.send({ jsonrpc: '2.0', id: req.id, error: { code: -32_603, message: 'Subscription limit reached' } }); + } + }; + await serverTx.start(); + const onChanged = () => {}; + const client = new Client( + { name: 'c', version: '1' }, + { versionNegotiation: { mode: 'auto' }, listChanged: { tools: { onChanged } } } + ); + const errors: Error[] = []; + client.onerror = e => errors.push(e); + // connect MUST resolve: the modern connection is usable without listen. + await client.connect(clientTx); + expect(client.autoOpenedSubscription).toBeUndefined(); + expect(errors).toHaveLength(1); + expect((errors[0] as { code?: number }).code).toBe(-32_603); + await client.close(); + }); + + it('a misconfigured listChanged handler surfaces via onerror and SKIPS auto-open (no wire write)', async () => { + // Regression: when handler registration threw (the soft-fail catch), + // the auto-open filter was still built from the same `effective`, + // opening a listen stream for types whose handler never registered — + // delivered notifications dropped on the floor while consuming a + // server slot. Now a registration failure skips auto-open entirely. + const { clientTx, written } = await scriptedModernNoAck(); + const onChanged = () => {}; + const client = new Client( + { name: 'c', version: '1' }, + { versionNegotiation: { mode: 'auto' }, listChanged: { tools: { onChanged, debounceMs: -1 } } } + ); + const errors: Error[] = []; + client.onerror = e => errors.push(e); + // connect MUST resolve: the modern connection is usable without listen. + await client.connect(clientTx); + expect(errors).toHaveLength(1); + expect(errors[0]!.message).toContain('Invalid tools listChanged options'); + // Auto-open SKIPPED: no listen request hit the wire, no subscription. + expect(client.autoOpenedSubscription).toBeUndefined(); + expect(written.some(m => (m as { method?: string }).method === 'subscriptions/listen')).toBe(false); + expect((client as unknown as { _listenState: Map })._listenState.size).toBe(0); + await client.close(); + }); + + it('connect-scoped signal does NOT bind to the auto-opened subscription lifetime', async () => { + // Regression: forwarding connect()'s full RequestOptions into the + // auto-open listen() call meant a connect-scoped signal — typically + // `AbortSignal.timeout(30_000)` for the handshake — was bound to the + // SUBSCRIPTION lifetime. When it fired after connect resolved, the + // auto-opened stream was silently torn down. + const { clientTx, written } = await scriptedModern(); + const onChanged = () => {}; + const client = new Client( + { name: 'c', version: '1' }, + { versionNegotiation: { mode: 'auto' }, listChanged: { tools: { onChanged } } } + ); + const errors: Error[] = []; + client.onerror = e => errors.push(e); + const connectScoped = new AbortController(); + await client.connect(clientTx, { signal: connectScoped.signal }); + expect(client.autoOpenedSubscription).toBeDefined(); + written.length = 0; + + // The connect-scoped signal fires AFTER connect resolved (as a + // handshake `AbortSignal.timeout` would). + connectScoped.abort(); + await flush(); + + // The auto-opened subscription is still live: no wire teardown + // (`notifications/cancelled`) was sent, and the per-listen state + // entry is still registered. + expect(written.some(m => (m as JSONRPCNotification).method === 'notifications/cancelled')).toBe(false); + expect((client as unknown as { _listenState: Map })._listenState.size).toBe(1); + expect(errors).toHaveLength(0); + await client.close(); + }); + + it('connect-scoped signal aborted DURING the auto-open ack wait: connect rejects fast (no 60s hang)', async () => { + // Regression: forwarding only {timeout} into the auto-open listen() + // meant connect()'s signal could not cancel the in-connect ack wait — + // an aborted connect blocked here for the full ack timeout. + const { clientTx } = await scriptedModernNoAck(); + const closeSpy = vi.spyOn(clientTx, 'close'); + const onChanged = () => {}; + const client = new Client( + { name: 'c', version: '1' }, + { versionNegotiation: { mode: 'auto' }, listChanged: { tools: { onChanged } } } + ); + const connectScoped = new AbortController(); + const t0 = Date.now(); + const pending = client.connect(clientTx, { signal: connectScoped.signal }); + // discover resolves; connect is now awaiting the auto-open ack. + await flush(); + connectScoped.abort(new Error('connect-abort')); + const error = await pending.catch(e => e as Error); + expect(error).toBeInstanceOf(Error); + expect(Date.now() - t0).toBeLessThan(1000); + // No leaked per-listen state on the aborted connect. + expect((client as unknown as { _listenState: Map })._listenState.size).toBe(0); + // A connect() rejection MUST NOT leave a half-open connection: the + // transport was closed before rethrowing (b142b80ea regression assertion). + await flush(); + expect(closeSpy).toHaveBeenCalled(); + expect(client.transport).toBeUndefined(); + await client.close(); + }); + + it('server answers listen with a JSON-RPC RESULT during opening: rejects with a typed InvalidResult (not 60s)', async () => { + const [clientTx, serverTx] = InMemoryTransport.createLinkedPair(); + serverTx.onmessage = m => { + const req = m as { id?: number | string; method?: string }; + if (req.method === 'server/discover' && req.id !== undefined) { + void serverTx.send({ + jsonrpc: '2.0', + id: req.id, + result: { + resultType: 'complete', + supportedVersions: [MODERN], + capabilities: { tools: { listChanged: true } }, + serverInfo: { name: 's', version: '1' } + } + }); + } + if (req.method === 'subscriptions/listen' && req.id !== undefined) { + // Buggy server: answers with a result instead of the + // acknowledged notification. Spec defines listen as never + // receiving a result. + void serverTx.send({ jsonrpc: '2.0', id: req.id, result: {} }); + } + }; + await serverTx.start(); + const client = new Client({ name: 'c', version: '1' }, { versionNegotiation: { mode: 'auto' } }); + await client.connect(clientTx); + const t0 = Date.now(); + const error = await client.listen({ toolsListChanged: true }).catch(e => e as SdkError); + expect(error).toBeInstanceOf(SdkError); + expect((error as SdkError).code).toBe(SdkErrorCode.InvalidResult); + expect((error as SdkError).message).toContain('expected the acknowledged notification'); + expect(Date.now() - t0).toBeLessThan(1000); + expect((client as unknown as { _listenState: Map })._listenState.size).toBe(0); + await client.close(); + }); + + it('transport closes BEFORE the ack: listen() rejects fast', async () => { + const { clientTx, serverTx } = await scriptedModernNoAck(); + const client = new Client({ name: 'c', version: '1' }, { versionNegotiation: { mode: 'auto' } }); + await client.connect(clientTx); + const t0 = Date.now(); + const pending = client.listen({ toolsListChanged: true }); + await flush(); + // Server-side transport closes before ever acking → Client's + // `_onclose` override settles every per-listen state machine. + await serverTx.close(); + const error = await pending.catch(e => e as Error); + expect(error).toBeInstanceOf(Error); + expect(Date.now() - t0).toBeLessThan(1000); + expect((client as unknown as { _listenState: Map })._listenState.size).toBe(0); + expect((client as unknown as { _responseHandlers: Map })._responseHandlers.size).toBe(0); + }); + + it("transport closes WHILE the subscription is open: closed resolves 'remote'; close() is a no-op", async () => { + const { clientTx, serverTx, written } = await scriptedModern(); + const client = new Client({ name: 'c', version: '1' }, { versionNegotiation: { mode: 'auto' } }); + await client.connect(clientTx); + const sub = await client.listen({ toolsListChanged: true }); + expect((client as unknown as { _listenState: Map })._listenState.size).toBe(1); + await serverTx.close(); + await expect(sub.closed).resolves.toBe('remote'); + // Transport-close settled the per-listen machine; nothing leaks. + expect((client as unknown as { _listenState: Map })._listenState.size).toBe(0); + // sub.close() after transport-close is a no-op (state already 'closed'): + // no notifications/cancelled lands on a future connection. + written.length = 0; + await sub.close(); + expect(written.some(m => (m as { method?: string }).method === 'notifications/cancelled')).toBe(false); + }); + + it('concurrent listens are independent (each ack resolves its own promise; closing one leaves the other open)', async () => { + const ids: (number | string)[] = []; + const { clientTx, written } = await scriptedModern(id => ids.push(id)); + const client = new Client({ name: 'c', version: '1' }, { versionNegotiation: { mode: 'auto' } }); + await client.connect(clientTx); + const [a, b] = await Promise.all([client.listen({ toolsListChanged: true }), client.listen({ promptsListChanged: true })]); + expect(a.honoredFilter).toEqual({ toolsListChanged: true }); + expect(b.honoredFilter).toEqual({ promptsListChanged: true }); + expect(ids).toHaveLength(2); + expect(ids[0]).not.toBe(ids[1]); + const listenState = (client as unknown as { _listenState: Map })._listenState; + expect(listenState.size).toBe(2); + written.length = 0; + await a.close(); + // Only `a`'s id is cancelled; `b` stays open. + expect(written).toHaveLength(1); + expect((written[0] as JSONRPCNotification).method).toBe('notifications/cancelled'); + expect((written[0] as unknown as { params: { requestId: unknown } }).params.requestId).toBe(ids[0]); + expect(listenState.size).toBe(1); + await b.close(); + expect(listenState.size).toBe(0); + await client.close(); + }); + + it('after close(): nothing further dispatched into the per-listen machine; late ack passes through unconsumed', async () => { + let listenId!: number | string; + let send!: (m: JSONRPCMessage) => void; + const { clientTx } = await scriptedModern((id, _f, s) => { + listenId = id; + send = s; + }); + const client = new Client({ name: 'c', version: '1' }, { versionNegotiation: { mode: 'auto' } }); + await client.connect(clientTx); + const sub = await client.listen({ toolsListChanged: true }); + await sub.close(); + // The per-listen entry is gone; a late server-side ack and a late + // server-side cancel for this id are NOT consumed by the + // `_onnotification` override (no entry matches) and reach the + // fallback handler. + const fallback: string[] = []; + client.fallbackNotificationHandler = async n => { + fallback.push(n.method); + }; + send({ + jsonrpc: '2.0', + method: 'notifications/subscriptions/acknowledged', + params: { _meta: { [SUBSCRIPTION_ID_META_KEY]: listenId }, notifications: {} } + }); + send({ jsonrpc: '2.0', method: 'notifications/cancelled', params: { requestId: listenId } } as JSONRPCNotification); + await flush(); + expect(fallback).toContain('notifications/subscriptions/acknowledged'); + // The state machine stayed closed throughout (no leak, no resurrection). + expect((client as unknown as { _listenState: Map })._listenState.size).toBe(0); + await client.close(); + }); + + it('an unmatched ack passes through to fallbackNotificationHandler (not silently swallowed)', async () => { + const { clientTx, serverTx } = await scriptedModern(); + const client = new Client({ name: 'c', version: '1' }, { versionNegotiation: { mode: 'auto' } }); + const fallback: string[] = []; + client.fallbackNotificationHandler = async n => { + fallback.push(n.method); + }; + await client.connect(clientTx); + // One listen is active; a stray ack referencing a FOREIGN id must + // reach the fallback handler instead of being silently swallowed. + const sub = await client.listen({ toolsListChanged: true }); + await serverTx.send({ + jsonrpc: '2.0', + method: 'notifications/subscriptions/acknowledged', + params: { _meta: { [SUBSCRIPTION_ID_META_KEY]: 'foreign-id' }, notifications: {} } + }); + await flush(); + expect(fallback).toEqual(['notifications/subscriptions/acknowledged']); + await sub.close(); + await client.close(); + }); + + it('a fresh connect without an intervening close settles in-flight listen() from the prior connection', async () => { + // Edge: prior transport never fires onclose; consumer calls connect() + // again. The in-flight listen() promise from the old connection must + // reject with a clear "client reconnected/closed" error rather than + // hang on the (now-discarded) ack timer. + const { clientTx } = await scriptedModernNoAck(); + const client = new Client({ name: 'c', version: '1' }, { versionNegotiation: { mode: 'auto' } }); + await client.connect(clientTx); + const pending = client.listen({ toolsListChanged: true }); + await flush(); + expect((client as unknown as { _listenState: Map })._listenState.size).toBe(1); + // Fresh connect on a new transport — _resetConnectionState runs. + const { clientTx: clientTx2 } = await scriptedModern(); + await client.connect(clientTx2); + const error = await pending.catch(e => e as Error); + expect(error).toBeInstanceOf(SdkError); + expect((error as SdkError).code).toBe(SdkErrorCode.ConnectionClosed); + expect((error as SdkError).message).toContain('reconnected or closed'); + // No leaked per-listen state from the old connection. + expect((client as unknown as { _listenState: Map })._listenState.size).toBe(0); + await client.close(); + }); + + it("the listen request id is a STRING on the wire ('listen:N'); cancel echoes it verbatim", async () => { + const { clientTx, written } = await scriptedModern(); + const client = new Client({ name: 'c', version: '1' }, { versionNegotiation: { mode: 'auto' } }); + await client.connect(clientTx); + const sub = await client.listen({ toolsListChanged: true }); + const wireListen = written.find(m => (m as { method?: string }).method === 'subscriptions/listen') as { + id: unknown; + params: { _meta?: Record }; + }; + // String id from a Client-owned counter — JSON-RPC valid; spec + // subscriptionId is the request id verbatim; zero collision with + // Protocol's numeric counter. + expect(typeof wireListen.id).toBe('string'); + expect(wireListen.id).toMatch(/^listen:\d+$/); + // The auto-envelope is on the wire too. + expect(wireListen.params._meta?.[PROTOCOL_VERSION_META_KEY]).toBe(MODERN); + written.length = 0; + await sub.close(); + const cancel = written[0] as unknown as { method: string; params: { requestId: unknown } }; + expect(cancel.params.requestId).toBe(wireListen.id); + await client.close(); + }); + + it("transport-level per-request stream end (onRequestStreamEnd) → closed resolves 'remote'", async () => { + // Mock a transport that captures the per-request `onRequestStreamEnd` + // callback and fires it after the ack — simulating a Streamable HTTP + // server closing the listen request's SSE stream. + const { clientTx, serverTx } = await scriptedModern(); + let onStreamEnd: (() => void) | undefined; + const realSend = clientTx.send.bind(clientTx); + clientTx.send = (m, opts) => { + if ((m as { method?: string }).method === 'subscriptions/listen') { + onStreamEnd = (opts as { onRequestStreamEnd?: () => void } | undefined)?.onRequestStreamEnd; + } + return realSend(m, opts); + }; + const client = new Client({ name: 'c', version: '1' }, { versionNegotiation: { mode: 'auto' } }); + await client.connect(clientTx); + const sub = await client.listen({ toolsListChanged: true }); + expect(onStreamEnd).toBeDefined(); + // Transport reports the per-request stream ended (server closed the + // SSE response, network dropped it, reconnection exhausted). + onStreamEnd!(); + await expect(sub.closed).resolves.toBe('remote'); + expect((client as unknown as { _listenState: Map })._listenState.size).toBe(0); + // close() after stream-end is a no-op (state already 'closed'). + await sub.close(); + await serverTx.close(); + }); + + it('close() resets per-connection state even when transport.close() rejects', async () => { + const { clientTx } = await scriptedModern(); + const client = new Client({ name: 'c', version: '1' }, { versionNegotiation: { mode: 'auto' } }); + await client.connect(clientTx); + expect(client.getNegotiatedProtocolVersion()).toBe(MODERN); + clientTx.close = () => Promise.reject(new Error('close blew up')); + await expect(client.close()).rejects.toThrow('close blew up'); + // Per-connection state was cleared regardless. + expect(client.getNegotiatedProtocolVersion()).toBeUndefined(); + expect((client as unknown as { _listenState: Map })._listenState.size).toBe(0); + }); +}); + +describe('_resetConnectionState() clears connection-scoped debounce timers (fake timers)', () => { + beforeEach(() => vi.useFakeTimers()); + afterEach(() => vi.useRealTimers()); + + it('a debounced listChanged callback armed on a closed connection never fires', async () => { + const { clientTx, serverTx } = await scriptedModernNoAck(); + const calls: unknown[] = []; + const client = new Client( + { name: 'c', version: '1' }, + { + versionNegotiation: { mode: 'auto' }, + listChanged: { tools: { onChanged: (e, items) => calls.push({ e, items }), autoRefresh: false, debounceMs: 100 } } + } + ); + const connecting = client.connect(clientTx); + await vi.runAllTimersAsync(); + await connecting; + // Arm the debounce timer for `tools` on the current connection. + await serverTx.send({ jsonrpc: '2.0', method: 'notifications/tools/list_changed' }); + await vi.advanceTimersByTimeAsync(0); + expect((client as unknown as { _listChangedDebounceTimers: Map })._listChangedDebounceTimers.size).toBe(1); + // close() → _resetConnectionState() must clear the armed timer so the + // callback for the dead connection never fires. + await client.close(); + expect((client as unknown as { _listChangedDebounceTimers: Map })._listChangedDebounceTimers.size).toBe(0); + await vi.advanceTimersByTimeAsync(200); + expect(calls).toEqual([]); + }); +}); + +describe('Client.listen() — ack timeout (fake timers)', () => { + beforeEach(() => vi.useFakeTimers()); + afterEach(() => vi.useRealTimers()); + + it('ack timer firing rejects with RequestTimeout and tears the wire down', async () => { + const { clientTx, written } = await scriptedModernNoAck(); + const client = new Client({ name: 'c', version: '1' }, { versionNegotiation: { mode: 'auto' } }); + const connecting = client.connect(clientTx); + await vi.runAllTimersAsync(); + await connecting; + const pending = client.listen({ toolsListChanged: true }, { timeout: 1000 }); + // Capture rejection to avoid an unhandled-rejection on the timer tick. + const settled = pending.catch(e => e as SdkError); + await vi.advanceTimersByTimeAsync(1000); + const error = await settled; + expect(error).toBeInstanceOf(SdkError); + expect((error as SdkError).code).toBe(SdkErrorCode.RequestTimeout); + // wireTeardown sent notifications/cancelled referencing the listen id. + const listenId = (written.find(m => (m as { method?: string }).method === 'subscriptions/listen') as { id: number | string }).id; + const cancelled = written.find(m => (m as JSONRPCNotification).method === 'notifications/cancelled'); + expect(cancelled).toMatchObject({ params: { requestId: listenId } }); + // No leaked state. + expect((client as unknown as { _listenState: Map })._listenState.size).toBe(0); + expect((client as unknown as { _responseHandlers: Map })._responseHandlers.size).toBe(0); + // Restore real timers before close to avoid hanging on transport timers. + vi.useRealTimers(); + await client.close(); + }); +}); diff --git a/packages/client/test/client/streamableHttp.test.ts b/packages/client/test/client/streamableHttp.test.ts index 6542302c9d..04d4615b41 100644 --- a/packages/client/test/client/streamableHttp.test.ts +++ b/packages/client/test/client/streamableHttp.test.ts @@ -1102,6 +1102,407 @@ describe('StreamableHTTPClientTransport', () => { expect(fetchMock.mock.calls[0]![1]?.method).toBe('POST'); }); + it('per-request requestSignal abort: no onerror, no reconnect (McpSubscription.close())', async () => { + // ARRANGE — a POST stream that has been primed with an SSE event id + // (server-side resumability), so without the per-request abort + // guard the transport WOULD schedule a GET+Last-Event-ID reconnect. + transport = new StreamableHTTPClientTransport(new URL('http://localhost:1234/mcp'), { + reconnectionOptions: { + initialReconnectionDelay: 10, + maxRetries: 1, + maxReconnectionDelay: 1000, + reconnectionDelayGrowFactor: 1 + } + }); + const errorSpy = vi.fn(); + transport.onerror = errorSpy; + + let streamController!: ReadableStreamDefaultController; + const primedStream = new ReadableStream({ + start(controller) { + streamController = controller; + // Priming event with an id — would arm POST-stream resumability. + controller.enqueue(new TextEncoder().encode('id: ev-1\ndata: \n\n')); + } + }); + const fetchMock = globalThis.fetch as Mock; + fetchMock.mockImplementationOnce((_url, init: RequestInit) => { + // Propagate abort to the stream the way fetch does. + init.signal?.addEventListener('abort', () => streamController.error(init.signal?.reason), { once: true }); + return Promise.resolve({ + ok: true, + status: 200, + headers: new Headers({ 'content-type': 'text/event-stream' }), + body: primedStream + }); + }); + + const requestAbort = new AbortController(); + await transport.start(); + await transport.send( + { jsonrpc: '2.0', method: 'subscriptions/listen', id: 'listen-1', params: {} }, + { requestSignal: requestAbort.signal } + ); + await vi.advanceTimersByTimeAsync(5); + expect(fetchMock).toHaveBeenCalledTimes(1); + + // ACT — McpSubscription.close() aborts the per-request signal. + requestAbort.abort(); + await vi.advanceTimersByTimeAsync(50); + + // ASSERT — intentional per-request abort: no onerror, no reconnect. + expect(errorSpy).not.toHaveBeenCalled(); + expect(fetchMock).toHaveBeenCalledTimes(1); + }); + + it('onRequestStreamEnd fires when the per-request POST stream ends gracefully without reconnecting', async () => { + // ARRANGE — a POST stream with NO priming event id (so the + // graceful-close path does NOT schedule a reconnect): the + // per-request stream simply ends. + transport = new StreamableHTTPClientTransport(new URL('http://localhost:1234/mcp')); + let streamController!: ReadableStreamDefaultController; + const unprimedStream = new ReadableStream({ + start(controller) { + streamController = controller; + // An ack frame with no SSE event id — does NOT arm POST-stream resumability. + controller.enqueue( + new TextEncoder().encode( + 'data: {"jsonrpc":"2.0","method":"notifications/subscriptions/acknowledged","params":{}}\n\n' + ) + ); + } + }); + const fetchMock = globalThis.fetch as Mock; + fetchMock.mockImplementationOnce(() => + Promise.resolve({ + ok: true, + status: 200, + headers: new Headers({ 'content-type': 'text/event-stream' }), + body: unprimedStream + }) + ); + + const requestAbort = new AbortController(); + const onStreamEnd = vi.fn(); + await transport.start(); + await transport.send( + { jsonrpc: '2.0', method: 'subscriptions/listen', id: 'listen:0', params: {} }, + { requestSignal: requestAbort.signal, onRequestStreamEnd: onStreamEnd } + ); + await vi.advanceTimersByTimeAsync(5); + expect(onStreamEnd).not.toHaveBeenCalled(); + + // ACT — server gracefully closes the SSE stream. + streamController.close(); + await vi.advanceTimersByTimeAsync(5); + + // ASSERT — non-deliberate stream end without reconnecting: + // onRequestStreamEnd fired exactly once; no further fetches. + expect(onStreamEnd).toHaveBeenCalledTimes(1); + expect(fetchMock).toHaveBeenCalledTimes(1); + }); + + it('onRequestStreamEnd does NOT fire on a deliberate per-request abort', async () => { + // Same shape as the no-onerror/no-reconnect test, but assert the + // stream-end callback is NEVER invoked when `requestSignal` was the + // abort source. + transport = new StreamableHTTPClientTransport(new URL('http://localhost:1234/mcp')); + let streamController!: ReadableStreamDefaultController; + const stream = new ReadableStream({ + start(controller) { + streamController = controller; + } + }); + const fetchMock = globalThis.fetch as Mock; + fetchMock.mockImplementationOnce((_url, init: RequestInit) => { + init.signal?.addEventListener('abort', () => streamController.error(init.signal?.reason), { once: true }); + return Promise.resolve({ + ok: true, + status: 200, + headers: new Headers({ 'content-type': 'text/event-stream' }), + body: stream + }); + }); + + const requestAbort = new AbortController(); + const onStreamEnd = vi.fn(); + await transport.start(); + await transport.send( + { jsonrpc: '2.0', method: 'subscriptions/listen', id: 'listen:0', params: {} }, + { requestSignal: requestAbort.signal, onRequestStreamEnd: onStreamEnd } + ); + await vi.advanceTimersByTimeAsync(5); + + // ACT — deliberate per-request abort. + requestAbort.abort(); + await vi.advanceTimersByTimeAsync(50); + + // ASSERT — deliberate abort: onRequestStreamEnd never fires. + expect(onStreamEnd).not.toHaveBeenCalled(); + }); + + it('onRequestStreamEnd fires when reconnection attempts are exhausted (maxRetries reached)', async () => { + // ARRANGE — a primed POST stream (so a non-deliberate close + // schedules a GET resume); every GET resume fails; maxRetries 1 + // means the second schedule hits the exhausted branch. + transport = new StreamableHTTPClientTransport(new URL('http://localhost:1234/mcp'), { + reconnectionOptions: { + initialReconnectionDelay: 5, + maxRetries: 1, + maxReconnectionDelay: 1000, + reconnectionDelayGrowFactor: 1 + } + }); + const errorSpy = vi.fn(); + transport.onerror = errorSpy; + + let streamController!: ReadableStreamDefaultController; + const primedStream = new ReadableStream({ + start(controller) { + streamController = controller; + controller.enqueue(new TextEncoder().encode('id: ev-1\ndata: \n\n')); + } + }); + const fetchMock = globalThis.fetch as Mock; + fetchMock.mockResolvedValueOnce({ + ok: true, + status: 200, + headers: new Headers({ 'content-type': 'text/event-stream' }), + body: primedStream + }); + // The GET resume fails with a 5xx → reconnect catch reschedules → exhausted. + fetchMock.mockResolvedValue({ ok: false, status: 503, statusText: 'unavailable', headers: new Headers() }); + + const onStreamEnd = vi.fn(); + await transport.start(); + await transport.send( + { jsonrpc: '2.0', method: 'subscriptions/listen', id: 'listen:0', params: {} }, + { requestSignal: new AbortController().signal, onRequestStreamEnd: onStreamEnd } + ); + await vi.advanceTimersByTimeAsync(5); + expect(onStreamEnd).not.toHaveBeenCalled(); + + // ACT — server closes the primed POST stream non-deliberately. + streamController.close(); + await vi.advanceTimersByTimeAsync(100); + + // ASSERT — exhausted: onRequestStreamEnd fired exactly once (the + // max-retries branch); the exhausted onerror surfaced. + expect(onStreamEnd).toHaveBeenCalledTimes(1); + expect(errorSpy).toHaveBeenCalledWith( + expect.objectContaining({ message: expect.stringContaining('Maximum reconnection attempts') }) + ); + }); + + it('onRequestStreamEnd fires when the per-request POST stream ERRORS without reconnecting', async () => { + // ARRANGE — a POST stream with NO priming event id; the body + // errors (network drop). The error-branch `else` (no reconnect, + // not intentional-abort) must fire onRequestStreamEnd. + transport = new StreamableHTTPClientTransport(new URL('http://localhost:1234/mcp')); + const failingStream = new ReadableStream({ + start(controller) { + controller.enqueue( + new TextEncoder().encode( + 'data: {"jsonrpc":"2.0","method":"notifications/subscriptions/acknowledged","params":{}}\n\n' + ) + ); + queueMicrotask(() => controller.error(new Error('network drop'))); + } + }); + const fetchMock = globalThis.fetch as Mock; + fetchMock.mockResolvedValueOnce({ + ok: true, + status: 200, + headers: new Headers({ 'content-type': 'text/event-stream' }), + body: failingStream + }); + + const onStreamEnd = vi.fn(); + await transport.start(); + await transport.send( + { jsonrpc: '2.0', method: 'subscriptions/listen', id: 'listen:0', params: {} }, + { requestSignal: new AbortController().signal, onRequestStreamEnd: onStreamEnd } + ); + await vi.advanceTimersByTimeAsync(50); + + // ASSERT — error-branch fired exactly once; no reconnection + // attempted (POST stream wasn't primed). + expect(onStreamEnd).toHaveBeenCalledTimes(1); + expect(fetchMock).toHaveBeenCalledTimes(1); + }); + + it('onRequestStreamEnd does NOT fire on transport.close()', async () => { + // The transport-wide abort is the OTHER deliberate teardown + // (`isIntentionalAbort()` checks both signals): a per-request + // stream-end callback must not fire when close() tore the stream + // down — `_onclose` is the settle path for that. + transport = new StreamableHTTPClientTransport(new URL('http://localhost:1234/mcp')); + let streamController!: ReadableStreamDefaultController; + const stream = new ReadableStream({ + start(controller) { + streamController = controller; + controller.enqueue(new TextEncoder().encode('id: ev-1\ndata: \n\n')); + } + }); + const fetchMock = globalThis.fetch as Mock; + fetchMock.mockImplementationOnce((_url, init: RequestInit) => { + init.signal?.addEventListener('abort', () => streamController.error(init.signal?.reason), { once: true }); + return Promise.resolve({ + ok: true, + status: 200, + headers: new Headers({ 'content-type': 'text/event-stream' }), + body: stream + }); + }); + + const onStreamEnd = vi.fn(); + await transport.start(); + await transport.send( + { jsonrpc: '2.0', method: 'subscriptions/listen', id: 'listen:0', params: {} }, + { requestSignal: new AbortController().signal, onRequestStreamEnd: onStreamEnd } + ); + await vi.advanceTimersByTimeAsync(5); + + // ACT — transport-wide close. + await transport.close(); + await vi.advanceTimersByTimeAsync(50); + + // ASSERT — deliberate transport close: onRequestStreamEnd never fires. + expect(onStreamEnd).not.toHaveBeenCalled(); + }); + + it('onRequestStreamEnd fires when a primed POST→GET resume hits 405 (non-resumable terminal)', async () => { + // R1 regression: against a server that stamps SSE event ids on the + // listen POST stream but returns 405 on the GET resume, + // `_startOrAuthSse` resolved without a stream and nothing fired — + // the subscription dead-ended silently. The 405 is now a terminal + // per-request stream-end. ALSO asserts the GET resume carried the + // per-request `requestSignal` (the close-after-reconnect path). + transport = new StreamableHTTPClientTransport(new URL('http://localhost:1234/mcp'), { + reconnectionOptions: { + initialReconnectionDelay: 5, + maxRetries: 3, + maxReconnectionDelay: 1000, + reconnectionDelayGrowFactor: 1 + } + }); + let streamController!: ReadableStreamDefaultController; + const primedStream = new ReadableStream({ + start(controller) { + streamController = controller; + controller.enqueue(new TextEncoder().encode('id: ev-1\ndata: \n\n')); + } + }); + const fetchMock = globalThis.fetch as Mock; + let getSignal: AbortSignal | null | undefined; + fetchMock.mockResolvedValueOnce({ + ok: true, + status: 200, + headers: new Headers({ 'content-type': 'text/event-stream' }), + body: primedStream + }); + fetchMock.mockImplementationOnce((_url, init: RequestInit) => { + getSignal = init.signal; + return Promise.resolve({ ok: false, status: 405, headers: new Headers() }); + }); + + const requestAbort = new AbortController(); + const onStreamEnd = vi.fn(); + await transport.start(); + await transport.send( + { jsonrpc: '2.0', method: 'subscriptions/listen', id: 'listen:0', params: {} }, + { requestSignal: requestAbort.signal, onRequestStreamEnd: onStreamEnd } + ); + await vi.advanceTimersByTimeAsync(5); + + // ACT — server closes the primed POST stream → schedules a GET resume → 405. + streamController.close(); + await vi.advanceTimersByTimeAsync(50); + + // ASSERT — onRequestStreamEnd fired exactly once on the 405; the + // resume was a single GET (no further retries — 405 resolves). + expect(onStreamEnd).toHaveBeenCalledTimes(1); + expect(fetchMock).toHaveBeenCalledTimes(2); + expect(fetchMock.mock.calls[1]![1]?.method).toBe('GET'); + // requestSignal threaded through the GET reconnect: aborting the + // per-request signal aborts the resume's fetch signal. + expect(getSignal).toBeDefined(); + expect(getSignal?.aborted).toBe(false); + requestAbort.abort(); + expect(getSignal?.aborted).toBe(true); + }); + + it('per-request requestSignal abort BEFORE response headers: no misleading onerror; send() still rejects', async () => { + // ARRANGE — fetch is in flight (pending promise) when the + // requestSignal aborts; fetch rejects with AbortError before the + // SSE stream handler ever runs. _send's catch must apply the same + // intentional-abort guard as _handleSseStream. + transport = new StreamableHTTPClientTransport(new URL('http://localhost:1234/mcp')); + const errorSpy = vi.fn(); + transport.onerror = errorSpy; + const fetchMock = globalThis.fetch as Mock; + fetchMock.mockImplementationOnce( + (_url, init: RequestInit) => + new Promise((_resolve, reject) => { + init.signal?.addEventListener('abort', () => reject(init.signal?.reason), { once: true }); + }) + ); + + const requestAbort = new AbortController(); + await transport.start(); + const sent = transport.send( + { jsonrpc: '2.0', method: 'subscriptions/listen', id: 'listen-1', params: {} }, + { requestSignal: requestAbort.signal } + ); + // Let _send reach the in-flight fetch. + await vi.advanceTimersByTimeAsync(0); + expect(fetchMock).toHaveBeenCalledTimes(1); + + // ACT — abort before headers. + requestAbort.abort(new Error('intentional')); + + // ASSERT — send() rejects (so listen()'s send-catch settles), but no onerror. + await expect(sent).rejects.toThrow(); + expect(errorSpy).not.toHaveBeenCalled(); + }); + + it('anySignal fallback removes the sibling listener (no leak on the transport-lifetime signal)', async () => { + // ARRANGE — force the manual fallback path (Node 20.0–20.2). + const nativeAny = AbortSignal.any; + (AbortSignal as { any?: unknown }).any = undefined; + try { + transport = new StreamableHTTPClientTransport(new URL('http://localhost:1234/mcp')); + const fetchMock = globalThis.fetch as Mock; + fetchMock.mockResolvedValue({ ok: true, status: 202, headers: new Headers() }); + await transport.start(); + + const transportSignal = (transport as unknown as { _abortController: AbortController })._abortController.signal; + const addSpy = vi.spyOn(transportSignal, 'addEventListener'); + const removeSpy = vi.spyOn(transportSignal, 'removeEventListener'); + + // ACT — N sends each with a fresh request-scoped signal that + // aborts after the send completes (the McpSubscription.close() + // pattern). Each send registers one fallback listener on the + // transport-lifetime signal; aborting the request-scoped + // signal must remove it. + for (let i = 0; i < 5; i++) { + const requestAbort = new AbortController(); + await transport.send( + { jsonrpc: '2.0', method: 'subscriptions/listen', id: `listen-${i}`, params: {} }, + { requestSignal: requestAbort.signal } + ); + requestAbort.abort(); + } + + // ASSERT — every listener registered on the transport-lifetime + // signal was removed; nothing accrues per send(). + expect(addSpy.mock.calls.length).toBeGreaterThan(0); + expect(removeSpy.mock.calls.length).toBe(addSpy.mock.calls.length); + } finally { + (AbortSignal as { any?: unknown }).any = nativeAny; + } + }); + it('should NOT reconnect a POST stream when error response was received', async () => { // ARRANGE transport = new StreamableHTTPClientTransport(new URL('http://localhost:1234/mcp'), { diff --git a/packages/core/src/shared/protocol.ts b/packages/core/src/shared/protocol.ts index 873aa6f92a..eaf14d5c67 100644 --- a/packages/core/src/shared/protocol.ts +++ b/packages/core/src/shared/protocol.ts @@ -735,7 +735,12 @@ export abstract class Protocol { await this._transport.start(); } - private _onclose(): void { + /** + * Transport-close hook. Subclass overrides MUST call `super._onclose()` + * after their own cleanup — base teardown (response-handler settlement, + * timeout clearing, in-flight request abort) does not run otherwise. + */ + protected _onclose(): void { const responseHandlers = this._responseHandlers; this._responseHandlers = new Map(); this._progressHandlers.clear(); @@ -770,7 +775,13 @@ export abstract class Protocol { this.onerror?.(error); } - private _onnotification(rawNotification: JSONRPCNotification, extra?: MessageExtraInfo): void { + /** + * Inbound-notification dispatch. Subclass overrides MUST delegate + * unmatched traffic to `super._onnotification(rawNotification, extra)` — + * an override that consumes only what it owns and falls through to base + * dispatch for everything else. + */ + protected _onnotification(rawNotification: JSONRPCNotification, extra?: MessageExtraInfo): void { // Hide wire-only material from notification handlers too — but ONLY // the reserved envelope `_meta` keys (the retry params names are // reserved on requests, not notifications). There is no @@ -1086,7 +1097,13 @@ export abstract class Protocol { handler(params); } - private _onresponse(response: JSONRPCResponse | JSONRPCErrorResponse): void { + /** + * Inbound-response dispatch. Subclass overrides MUST delegate unmatched + * traffic to `super._onresponse(response)` — an override that consumes + * only what it owns and falls through to base dispatch for everything + * else. + */ + protected _onresponse(response: JSONRPCResponse | JSONRPCErrorResponse): void { const messageId = Number(response.id); const handler = this._responseHandlers.get(messageId); diff --git a/packages/core/src/shared/transport.ts b/packages/core/src/shared/transport.ts index c606e2e3b5..c9be6ee56c 100644 --- a/packages/core/src/shared/transport.ts +++ b/packages/core/src/shared/transport.ts @@ -67,6 +67,26 @@ export type TransportSendOptions = { * This allows clients to persist the latest token for potential reconnection. */ onresumptiontoken?: ((token: string) => void) | undefined; + + /** + * An abort signal for THIS outbound message's underlying request, when the + * transport sends one outbound message per underlying request (the + * Streamable HTTP transport's POST-per-request model). Aborting it cancels + * the underlying request (and its SSE response stream) without closing the + * transport. Transports that share a single channel (stdio, in-memory) + * ignore it. + */ + requestSignal?: AbortSignal | undefined; + + /** + * Fired by transports that open a per-request stream (the Streamable HTTP + * transport's POST-per-request SSE response) when that stream ends or + * errors for any reason OTHER than a deliberate `requestSignal` abort — + * i.e. the server closed the stream, the network dropped it, or + * reconnection was exhausted. Transports that share a single channel + * (stdio, in-memory) ignore it. + */ + onRequestStreamEnd?: (() => void) | undefined; }; /** * Describes the minimal contract for an MCP transport that a client or server can communicate over. diff --git a/packages/core/src/types/types.ts b/packages/core/src/types/types.ts index 3d4bd9940e..4f072fd748 100644 --- a/packages/core/src/types/types.ts +++ b/packages/core/src/types/types.ts @@ -566,7 +566,7 @@ export type ResultTypeMap = { // `subscriptions/listen` never receives a JSON-RPC result on the wire: // termination is stream close (HTTP) or `notifications/cancelled` (stdio). // The `EmptyResult` entry exists only to keep the mapped types total — - // see the serving entries' listen routers. + // see `Client.listen()` and the serving entries' listen routers. 'subscriptions/listen': EmptyResult; 'tools/call': CallToolResult; 'tools/list': ListToolsResult; diff --git a/packages/core/src/wire/rev2026-07-28/registry.ts b/packages/core/src/wire/rev2026-07-28/registry.ts index 49df49429f..969c719bbd 100644 --- a/packages/core/src/wire/rev2026-07-28/registry.ts +++ b/packages/core/src/wire/rev2026-07-28/registry.ts @@ -23,7 +23,9 @@ * (SEP-1865): 2026-only vocabulary, present here as registry shells. * Dispatch never reaches a registered handler — the serving entries * (`createMcpHandler`, `serveStdio`) recognize listen at the entry layer - * and own ack/filter/stamp/teardown themselves. + * and own ack/filter/stamp/teardown themselves; on the client side + * `Client.listen()` sends directly on the transport (string-typed + * request id, transport-level demux) rather than via `request()`. */ import type * as z from 'zod/v4'; diff --git a/test/e2e/CLAUDE.md b/test/e2e/CLAUDE.md index d44ba8a03a..586f390698 100644 --- a/test/e2e/CLAUDE.md +++ b/test/e2e/CLAUDE.md @@ -73,7 +73,7 @@ entryExclusions: [{ arm: 'entryModern', reason: 'method-not-in-modern-registry' ``` Omitting `arm` excludes both arms. The reasons (`EntryExclusionReason` in types.ts) are the acceptance checklist for re-admitting cells when the corresponding entry feature lands; a coverage gate rejects annotations that would never have an effect. Requirement families that the -per-request entry structurally cannot serve at all (server→client requests, sessions/resumability, standalone GET streams, subscriptions) are already expressed through their `transports` restrictions and need no annotation. +per-request entry structurally cannot serve at all (server→client requests, sessions/resumability, standalone GET streams) are already expressed through their `transports` restrictions and need no annotation. Arm-specific helpers: `wire()`'s fourth argument also accepts `entry` (createMcpHandler hosting overrides — e.g. a `responseMode` or a different `legacy` posture), the returned `Wired.httpLog` records every HTTP exchange (request body, status, content-type, a readable response clone) for raw wire assertions, factories may accept the optional per-request context (`EntryServerFactory`), and `modernEnvelopeMeta()` builds the envelope for bodies that POST raw 2026-era requests through `wired.fetch`. Compositions that the entry no longer expresses through diff --git a/test/e2e/requirements.ts b/test/e2e/requirements.ts index 48c715ab66..5058dcdd5d 100644 --- a/test/e2e/requirements.ts +++ b/test/e2e/requirements.ts @@ -2667,6 +2667,53 @@ export const REQUIREMENTS: Record = { 'The SDK provides a server-side legacy HTTP+SSE transport so existing SSE deployments can be hosted on SDK components alone.', transports: ['sse'], note: 'This asserts the availability of the server half of the legacy SSE transport (SSEServerTransport from @modelcontextprotocol/server-legacy/sse); the matrix transport arg is ignored, so it runs as a single sse-labelled cell.' + }, + 'subscriptions:listen:ack-first-stamped': { + source: 'https://modelcontextprotocol.io/specification/draft/basic/patterns/subscriptions#acknowledgment', + behavior: + "notifications/subscriptions/acknowledged is the first message on a subscriptions/listen stream and carries the listen request's JSON-RPC id verbatim under the io.modelcontextprotocol/subscriptionId _meta key, plus the honored subset of the requested filter.", + addedInSpecVersion: '2026-07-28', + transports: ['entryModern'], + note: 'Hosted by the test body via createMcpHandler so it can publish via handler.notify.' + }, + 'subscriptions:listen:per-stream-filter': { + source: 'https://modelcontextprotocol.io/specification/draft/basic/patterns/subscriptions#notification-filter', + behavior: + 'A subscriptions/listen stream receives only the notification types its filter explicitly requested; an un-requested type is provably never delivered. Change notifications dispatch to the existing setNotificationHandler registrations.', + addedInSpecVersion: '2026-07-28', + transports: ['entryModern'], + note: 'Hosted by the test body via createMcpHandler so it can publish via handler.notify.' + }, + 'subscriptions:listen:honored-filter-narrows-to-advertised': { + source: 'https://modelcontextprotocol.io/specification/draft/basic/patterns/subscriptions#acknowledgment', + behavior: + "The acknowledged filter on a subscriptions/listen stream is the requested set narrowed against the server's declared listChanged/subscribe capability bits — a requested type the server does not advertise is dropped from honoredFilter and is never delivered.", + addedInSpecVersion: '2026-07-28', + transports: ['entryModern'], + note: 'Hosted by the test body via createMcpHandler so it can publish via handler.notify. A stdio e2e of the modern listen path is not yet feasible without harness changes (the e2e stdio arms wire the standard child-process StdioServerTransport, not the serveStdio entry); stdio narrowing is covered at unit level in serveStdioListen.test.ts.' + }, + 'subscriptions:listen:capacity-guard': { + source: 'sdk', + behavior: + "A subscriptions/listen request is refused with -32603 'Subscription limit reached' (in-band on HTTP 200, before the ack) when the configured maxSubscriptions is reached.", + addedInSpecVersion: '2026-07-28', + transports: ['entryModern'], + note: 'Hosted by the test body via createMcpHandler with maxSubscriptions: 1.' + }, + 'typescript:subscriptions:listChanged-auto-open-modern': { + source: 'sdk', + behavior: + 'ClientOptions.listChanged auto-opens a subscriptions/listen stream on a modern connection — the filter is the intersection of the configured sub-options and the server-advertised listChanged capabilities (auto-open is skipped and autoOpenedSubscription stays undefined when the intersection is empty) — so the configured handlers fire on every published change. The auto-opened subscription is exposed for close.', + addedInSpecVersion: '2026-07-28', + transports: ['entryModern'], + note: 'Hosted by the test body via createMcpHandler so it can publish via handler.notify.' + }, + 'typescript:subscriptions:listen:legacy-era-steer': { + source: 'sdk', + behavior: + 'On a 2025-era connection, Client.listen() throws a typed MethodNotSupportedByProtocolVersion error steering to resources/subscribe and ClientOptions.listChanged before any wire write (no transparent shim).', + removedInSpecVersion: '2026-07-28', + note: 'Runs on the 2025-era arms; the entryModern arm is bound out by the removedInSpecVersion.' } } satisfies Record; diff --git a/test/e2e/scenarios/subscriptions.test.ts b/test/e2e/scenarios/subscriptions.test.ts new file mode 100644 index 0000000000..bbca6105d5 --- /dev/null +++ b/test/e2e/scenarios/subscriptions.test.ts @@ -0,0 +1,161 @@ +/** + * `subscriptions/listen` (SEP-1865, protocol revision 2026-07-28) through the + * public surface: ack-first, subscription-id stamping, per-stream filtering, + * the listChanged auto-open bridge, and the F-12 legacy steer. + * + * The 2026-era cells host `createMcpHandler` themselves (the test publishes + * via `handler.notify.*`); the legacy cell runs on the standard arms. + */ +import { Client, SdkError, SdkErrorCode, StreamableHTTPClientTransport } from '@modelcontextprotocol/client'; +import { createMcpHandler, McpServer, SUBSCRIPTION_ID_META_KEY } from '@modelcontextprotocol/server'; +import { expect } from 'vitest'; +import { z } from 'zod/v4'; + +import { modernEnvelopeMeta, wire } from '../helpers/index.js'; +import { verifies } from '../helpers/verifies.js'; +import type { TestArgs } from '../types.js'; + +function makeServer() { + const server = new McpServer({ name: 'subs-e2e', version: '1' }); + server.registerTool('greet', { inputSchema: z.object({}) }, async () => ({ content: [] })); + return server; +} + +async function hostListen() { + const handler = createMcpHandler(() => makeServer(), { legacy: 'reject', keepAliveMs: 0 }); + const url = new URL('http://in-process/mcp'); + const fetch = (u: URL | string, init?: RequestInit) => handler.fetch(new Request(u, init)); + const client = new Client({ name: 'subs-e2e-client', version: '1' }, { versionNegotiation: { mode: 'auto' } }); + await client.connect(new StreamableHTTPClientTransport(url, { fetch })); + expect(client.getNegotiatedProtocolVersion()).toBe('2026-07-28'); + return { + client, + handler, + fetch, + url, + [Symbol.asyncDispose]: () => Promise.all([client.close(), handler.close()]).then(() => {}) + }; +} + +verifies('subscriptions:listen:ack-first-stamped', async () => { + const handler = createMcpHandler(() => makeServer(), { legacy: 'reject', keepAliveMs: 0 }); + const response = await handler.fetch( + new Request('http://in-process/mcp', { + method: 'POST', + headers: { 'Content-Type': 'application/json', Accept: 'application/json, text/event-stream' }, + body: JSON.stringify({ + jsonrpc: '2.0', + id: 'sub-1', + method: 'subscriptions/listen', + params: { _meta: modernEnvelopeMeta(), notifications: { toolsListChanged: true } } + }) + }) + ); + expect(response.headers.get('Content-Type')).toBe('text/event-stream'); + const reader = response.body!.getReader(); + const { value } = await reader.read(); + const frame = new TextDecoder().decode(value); + const ack = JSON.parse(frame.slice(frame.indexOf('data: ') + 6, frame.indexOf('\n\n'))) as { + method: string; + params: { _meta: Record; notifications: unknown }; + }; + expect(ack.method).toBe('notifications/subscriptions/acknowledged'); + expect(ack.params._meta[SUBSCRIPTION_ID_META_KEY]).toBe('sub-1'); + expect(ack.params.notifications).toEqual({ toolsListChanged: true }); + await reader.cancel(); + await handler.close(); +}); + +verifies('subscriptions:listen:per-stream-filter', async () => { + await using h = await hostListen(); + const seen: string[] = []; + h.client.setNotificationHandler('notifications/tools/list_changed', () => void seen.push('tools')); + h.client.setNotificationHandler('notifications/prompts/list_changed', () => void seen.push('prompts')); + const sub = await h.client.listen({ toolsListChanged: true }); + h.handler.notify.promptsChanged(); + h.handler.notify.toolsChanged(); + await new Promise(r => setTimeout(r, 30)); + // The un-requested type was provably never delivered. + expect(seen).toEqual(['tools']); + await sub.close(); +}); + +verifies('typescript:subscriptions:listChanged-auto-open-modern', async () => { + const handler = createMcpHandler(() => makeServer(), { legacy: 'reject', keepAliveMs: 0 }); + const fetch = (u: URL | string, init?: RequestInit) => handler.fetch(new Request(u, init)); + let count = 0; + let done!: () => void; + const finished = new Promise(r => { + done = r; + }); + const client = new Client( + { name: 'subs-e2e-client', version: '1' }, + { + versionNegotiation: { mode: 'auto' }, + listChanged: { tools: { autoRefresh: false, onChanged: () => (++count >= 1 ? done() : undefined) } } + } + ); + await client.connect(new StreamableHTTPClientTransport(new URL('http://in-process/mcp'), { fetch })); + expect(client.autoOpenedSubscription?.honoredFilter).toEqual({ toolsListChanged: true }); + handler.notify.toolsChanged(); + await finished; + expect(count).toBe(1); + await client.autoOpenedSubscription!.close(); + await client.close(); + await handler.close(); +}); + +verifies('typescript:subscriptions:listen:legacy-era-steer', async ({ transport }: TestArgs) => { + const client = new Client({ name: 'c', version: '0' }); + await using _ = await wire(transport, makeServer, client); + const error = await client.listen({ toolsListChanged: true }).catch(error_ => error_ as SdkError); + expect(error).toBeInstanceOf(SdkError); + expect((error as SdkError).code).toBe(SdkErrorCode.MethodNotSupportedByProtocolVersion); + expect((error as SdkError).message).toContain('resources/subscribe'); +}); + +verifies('subscriptions:listen:honored-filter-narrows-to-advertised', async () => { + // makeServer registers a tool but no prompts/resources: a listen requesting + // toolsListChanged + promptsListChanged + resourcesListChanged must come + // back honored as toolsListChanged only — the ack reflects only what the + // server advertises. + await using h = await hostListen(); + const sub = await h.client.listen({ toolsListChanged: true, promptsListChanged: true, resourcesListChanged: true }); + expect(sub.honoredFilter).toEqual({ toolsListChanged: true }); + // And nothing the server doesn't advertise reaches the stream: the entry + // delivers via the same narrowed filter it acknowledged. + const seen: string[] = []; + h.client.setNotificationHandler('notifications/prompts/list_changed', () => void seen.push('prompts')); + h.client.setNotificationHandler('notifications/tools/list_changed', () => void seen.push('tools')); + h.handler.notify.promptsChanged(); + h.handler.notify.toolsChanged(); + await new Promise(r => setTimeout(r, 30)); + expect(seen).toEqual(['tools']); + await sub.close(); +}); + +verifies('subscriptions:listen:capacity-guard', async () => { + const handler = createMcpHandler(() => makeServer(), { legacy: 'reject', keepAliveMs: 0, maxSubscriptions: 1 }); + const post = (id: number) => + handler.fetch( + new Request('http://in-process/mcp', { + method: 'POST', + headers: { 'Content-Type': 'application/json', Accept: 'application/json, text/event-stream' }, + body: JSON.stringify({ + jsonrpc: '2.0', + id, + method: 'subscriptions/listen', + params: { _meta: modernEnvelopeMeta(), notifications: {} } + }) + }) + ); + const first = await post(1); + expect(first.headers.get('Content-Type')).toBe('text/event-stream'); + const second = await post(2); + expect(second.headers.get('Content-Type')).toContain('application/json'); + const body = (await second.json()) as { error: { code: number; message: string } }; + expect(body.error.code).toBe(-32_603); + expect(body.error.message).toBe('Subscription limit reached'); + await first.body!.cancel(); + await handler.close(); +});