diff --git a/Cargo.toml b/Cargo.toml index 6105f39..df2ca99 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -70,7 +70,13 @@ required-features = ["rmcp", "test-utils"] name = "transport_integration" required-features = ["test-utils"] +[[test]] +name = "oversized_timeout_e2e" +required-features = ["rmcp", "test-utils"] + [dev-dependencies] +# `start_paused` deterministic-time tests (tokio's "full" does not include test-util). +tokio = { version = "1", features = ["test-util"] } tokio-test = "0.4" anyhow = "1" schemars = "0.8" diff --git a/README.md b/README.md index aae7a56..a5fa746 100644 --- a/README.md +++ b/README.md @@ -206,6 +206,12 @@ The in-repo Rust SDK guides live in [`docs/README.md`](docs/README.md): Encryption uses **NIP-44** for payload encryption and **NIP-59** (Gift Wrap) for metadata-private delivery. Server announcements (kinds 11316–11320) are always public. +Messages too large for a single relay event are fragmented into ordered frames +and reassembled by the receiver (CEP-22 oversized transfer, enabled by default; +it adds no event kind — frames ride inside `notifications/progress` messages). +See [docs/oversized-transfer.md](docs/oversized-transfer.md) for the timeout +model and tuning. + ### Server Transport Config | Field | Default | Description | @@ -221,6 +227,7 @@ metadata-private delivery. Server announcements (kinds 11316–11320) are always | `bootstrap_relay_urls` | `None` | Additional relays for publishing announcements (CEP-6/17) | | `publish_relay_list` | `true` | Whether to publish kind 10002 relay list metadata | | `profile_metadata` | `None` | Profile metadata for kind 0 publication (CEP-23) | +| `oversized_transfer` | enabled | CEP-22 oversized payload transfer config ([guide](docs/oversized-transfer.md)) | ### Client Transport Config @@ -233,6 +240,7 @@ metadata-private delivery. Server announcements (kinds 11316–11320) are always | `timeout` | `30s` | Response timeout | | `discovery_relay_urls` | `None` (bootstrap relays) | Relays for CEP-17 kind 10002 discovery | | `fallback_operational_relay_urls` | `None` | Relays probed in parallel with CEP-17 discovery | +| `oversized_transfer` | enabled | CEP-22 oversized payload transfer config ([guide](docs/oversized-transfer.md)) | When `relay_urls` is empty, `start()` runs automatic relay resolution: configured relays > nprofile hints > CEP-17 kind 10002 discovery > fallback probing > bootstrap defaults. diff --git a/docs/README.md b/docs/README.md index 400ac2c..6f4439d 100644 --- a/docs/README.md +++ b/docs/README.md @@ -22,6 +22,7 @@ For most native Rust applications, the primary entry points are `NostrServerTran - Encryption guide: plaintext, encrypted, and gift-wrap behavior - Stateless mode guide: client-side initialize emulation and when to use it - Discovery guide: public discovery helpers and event kinds +- Oversized transfer guide: CEP-22 fragmentation, the three-timer model, and progress-aware request options ### Bridging existing MCP applications diff --git a/docs/oversized-transfer.md b/docs/oversized-transfer.md new file mode 100644 index 0000000..9a35362 --- /dev/null +++ b/docs/oversized-transfer.md @@ -0,0 +1,128 @@ +# Oversized Transfer Guide (CEP-22) + +Nostr relays cap event sizes (commonly around 64 KiB on the wire, with a +65,535-byte NIP-44 plaintext ceiling for encrypted payloads), while MCP +results — file contents, tool output, resource reads — routinely exceed that. +CEP-22 closes the gap: a JSON-RPC message whose published form would not fit +in a single event is split into an ordered sequence of frames carried inside +MCP `notifications/progress` events, then reassembled and validated +(byte-length + SHA-256) by the receiver before it surfaces as one ordinary +message. Fragmentation is transparent to MCP consumers in both directions: +client→server requests and server→client responses. + +## Enabled by default + +Oversized transfer is **enabled by default** (matching the TypeScript SDK). +The negotiation gates make this safe with peers that don't support it: + +- the **client** fragments only requests that carry a `progressToken` (rmcp + stamps one into every outgoing request), and advertises + `support_oversized_transfer` on its first message; +- the **server** fragments responses only for clients that advertised the + capability, and advertises it on announcements and its first response. + +A peer without CEP-22 support just sees one extra discovery tag. To opt out: + +```rust +// Whole-config form: +let config = NostrClientTransportConfig::default() + .with_oversized_enabled(false); +// Same builder exists on NostrServerTransportConfig. +``` + +## Configuration + +`OversizedTransferConfig` is attached to both transport configs via +`with_oversized_transfer(..)` (or the `with_oversized_enabled(..)` shorthand): + +| Field | Default | Description | +|----------------------------|--------------|-----------------------------------------------------------------------------| +| `enabled` | `true` | Master gate: advertise + activate the capability | +| `threshold` | `48_000` | Published byte size at/above which the sender fragments | +| `chunk_size` | `48_000` | Upper bound on per-chunk payload bytes (shrunk automatically so every published frame stays under `threshold`) | +| `max_transfer_bytes` | `104_857_600` (100 MiB) | Receiver cap on a reassembled payload | +| `max_transfer_chunks` | `10_000` | Receiver cap on chunk count | +| `max_concurrent_transfers` | `64` | Receiver cap on simultaneously active transfers | +| `transfer_timeout_ms` | `300_000` | Receiver-side hard deadline per transfer, from admission; `0` disables the watchdog | +| `max_out_of_order_window` | `21` | How far ahead of the contiguous frontier a chunk may arrive and still be buffered | +| `max_out_of_order_chunks` | `42` | Cap on buffered out-of-order chunks | +| `accept_timeout_ms` | `30_000` | How long an uploading client waits for the server's `accept` handshake | + +The decision to fragment is made on the **final published event size** +(signed, JSON-escaped, and gift-wrapped when encryption is on), so `threshold` +is a real wire budget, not a payload-length heuristic. + +## The three timers + +Three independent timers govern a transfer; knowing who owns each one makes +timeout behavior predictable: + +1. **Requester idle timeout** (rmcp, per request — opt-in). Fails the call if + no progress arrives for `idle`. The transports forward every inbound + transfer frame to the requester as a plain progress notification, so a + *live* transfer resets this timer chunk by chunk while a *stalled* one + fails after `idle`. +2. **Requester max-total timeout** (rmcp, per request — opt-in). Hard cap on + the whole call regardless of progress — a trickling transfer cannot hold a + request open forever. +3. **Receiver watchdog** (`transfer_timeout_ms`, transport-owned). A hard + memory bound on inbound reassembly state, measured from `start` admission + and never refreshed by activity. Reaping is local-only — no abort frame is + emitted; the requester's own timers fail the other side. A reaped token is + re-admittable by a fresh `start`. + +The first two exist only when you pass request options — **a plain rmcp +`call_tool` has no timeout at all** (infinite await). That is the main +consumer footgun this SDK papers over: + +## Recommended MCP client usage + +```rust +use std::time::Duration; +use contextvm_sdk::{progress_aware_options, PeerRequestOptionsExt}; + +let result = running_client + .peer() + .call_tool_with_options( + params, + progress_aware_options(Duration::from_secs(60), Duration::from_secs(300)), + ) + .await?; +``` + +`progress_aware_options(idle, max_total)` builds +`PeerRequestOptions::with_timeout(idle).reset_timeout_on_progress().with_max_total_timeout(max_total)`. +The defaults `DEFAULT_OVERSIZED_IDLE_TIMEOUT` (60 s) and +`DEFAULT_OVERSIZED_MAX_TOTAL_TIMEOUT` (300 s) intentionally mirror the +transport's numbers (60 s covers the worst-case accept wait; 300 s matches the +receiver watchdog so both sides give up in the same window) — but the peer +layer cannot read transport config, so re-align them manually if you tune +`OversizedTransferConfig`. + +Unlike the TypeScript SDK's low-level `client.request()` path, rmcp's timer +reset is not tied to registering an `onprogress` callback — the reset is wired +through request options alone. + +For request types beyond `call_tool`, the generic form is two lines on public +rmcp API: + +```rust +let handle = peer.send_cancellable_request(request, options).await?; +let response = handle.await_response().await?; +``` + +### Upload (client→server) caveat + +A fragmented *request* receives at most **one** inbound reset: the server's +`accept` handshake frame — and it reaches the rmcp service loop only after the +whole upload send returns. Size `idle` and `max_total` to cover the full +upload duration, not just inter-frame gaps. + +## Synthetic progress notifications + +Because transfer frames are forwarded to the requester (stripped of their +`cvm` payload, with the request's original `progressToken` restored), +consumers with a custom progress handler observe chunk-granular progress for +oversized responses — usable as transfer-progress UX. Default rmcp handlers +ignore progress for tokens they didn't register, so no action is needed if you +don't want it. Nothing extra goes on the wire; the forwarding is in-process. diff --git a/src/lib.rs b/src/lib.rs index d8aab47..c231863 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -90,3 +90,10 @@ pub use transport::server::{ // ── rmcp re-export ────────────────────────────────────────────────── #[cfg(feature = "rmcp")] pub use rmcp; + +// ── CEP-22 progress-aware request helpers ─────────────────────────── +#[cfg(feature = "rmcp")] +pub use rmcp_transport::progress::{ + progress_aware_options, PeerRequestOptionsExt, DEFAULT_OVERSIZED_IDLE_TIMEOUT, + DEFAULT_OVERSIZED_MAX_TOTAL_TIMEOUT, +}; diff --git a/src/rmcp_transport/mod.rs b/src/rmcp_transport/mod.rs index 436f1dd..8cc1211 100644 --- a/src/rmcp_transport/mod.rs +++ b/src/rmcp_transport/mod.rs @@ -4,6 +4,7 @@ //! ContextVM transports plug directly into rmcp service APIs. pub mod convert; +pub mod progress; pub mod transport; pub mod worker; @@ -14,4 +15,8 @@ pub use convert::{ internal_to_rmcp_client_rx, internal_to_rmcp_server_rx, rmcp_client_tx_to_internal, rmcp_server_tx_to_internal, }; +pub use progress::{ + progress_aware_options, PeerRequestOptionsExt, DEFAULT_OVERSIZED_IDLE_TIMEOUT, + DEFAULT_OVERSIZED_MAX_TOTAL_TIMEOUT, +}; pub use worker::{NostrClientWorker, NostrServerWorker}; diff --git a/src/rmcp_transport/progress.rs b/src/rmcp_transport/progress.rs new file mode 100644 index 0000000..07914d0 --- /dev/null +++ b/src/rmcp_transport/progress.rs @@ -0,0 +1,170 @@ +//! CEP-22: progress-aware request options for rmcp consumers. +//! +//! Under the rmcp fork, plain [`Peer`] calls such as `call_tool` use +//! `PeerRequestOptions::no_options()` — **no timeout at all**: a stalled +//! oversized response hangs the caller forever. This module closes that gap +//! the way the TS SDK's docs do — by passing request options on the existing +//! call — via an extension trait carrying an options-taking `call_tool` +//! variant, plus a constructor for the recommended progress-aware settings. +//! +//! With CEP-22 enabled, the Nostr transports forward each inbound transfer +//! frame to the requester as a plain progress notification, so an idle +//! timeout built by [`progress_aware_options`] resets on every chunk: a live +//! transfer can run long, a stalled one fails after `idle`, and +//! `max_total` caps the call regardless of progress. +//! +//! ```ignore +//! use contextvm_sdk::{progress_aware_options, PeerRequestOptionsExt}; +//! +//! let result = running_service +//! .peer() +//! .call_tool_with_options( +//! params, +//! progress_aware_options( +//! contextvm_sdk::DEFAULT_OVERSIZED_IDLE_TIMEOUT, +//! contextvm_sdk::DEFAULT_OVERSIZED_MAX_TOTAL_TIMEOUT, +//! ), +//! ) +//! .await?; +//! ``` + +use std::time::Duration; + +use rmcp::model::{ + CallToolRequest, CallToolRequestParams, CallToolResult, ClientRequest, ServerResult, +}; +use rmcp::service::{Peer, PeerRequestOptions, ServiceError}; +use rmcp::RoleClient; + +/// Default requester-side idle timeout for requests that may trigger a CEP-22 +/// oversized transfer: 60 s. +/// +/// TS parity twice over: equals the upstream TS SDK's blanket per-request +/// timeout (`DEFAULT_REQUEST_TIMEOUT_MSEC`), and exceeds the worst-case +/// inter-chunk gap including the 30 s accept wait +/// ([`DEFAULT_ACCEPT_TIMEOUT_MS`](crate::transport::oversized_transfer::DEFAULT_ACCEPT_TIMEOUT_MS)). +pub const DEFAULT_OVERSIZED_IDLE_TIMEOUT: Duration = Duration::from_secs(60); + +/// Default requester-side max-total timeout: 300 s. +/// +/// Aligned with the receiver-side watchdog default +/// ([`DEFAULT_TRANSFER_TIMEOUT_MS`](crate::transport::oversized_transfer::DEFAULT_TRANSFER_TIMEOUT_MS)) +/// so the requester gives up in the same window the receiver reaps state — +/// symmetric failure. (Upstream TS sets no max-total default; here it stays +/// opt-in via [`progress_aware_options`], never baked into plain calls.) +pub const DEFAULT_OVERSIZED_MAX_TOTAL_TIMEOUT: Duration = Duration::from_secs(300); + +/// Build the recommended [`PeerRequestOptions`] for requests whose responses +/// may arrive as CEP-22 oversized transfers: an `idle` timeout that resets on +/// every progress notification, capped by `max_total`. +/// +/// Equivalent to +/// `PeerRequestOptions::with_timeout(idle).reset_timeout_on_progress().with_max_total_timeout(max_total)`. +/// Named after the mechanism (progress-aware timeouts), not the oversized use +/// case — mirroring the TS SDK, where "oversized" appears in docs but never in +/// the API surface. +/// +/// Sizing notes: +/// - `reset_timeout_on_progress` without an idle timeout is a **no-op** — the +/// fork registers a progress watcher only when *both* are set, which is why +/// this constructor takes `idle` rather than making it optional. +/// - The client→server upload direction receives at most **one** inbound +/// reset (the server's `accept` handshake frame) — and it reaches the rmcp +/// service loop only after the whole upload send returns. Size `idle` and +/// `max_total` to cover the full upload duration, not just the gaps. +/// - Sensible defaults: [`DEFAULT_OVERSIZED_IDLE_TIMEOUT`] / +/// [`DEFAULT_OVERSIZED_MAX_TOTAL_TIMEOUT`]. They intentionally mirror the +/// transport's `OversizedTransferConfig` numbers but are not read from it — +/// the peer layer has no access to transport config by design; align them +/// manually if you tune the transport. +pub fn progress_aware_options(idle: Duration, max_total: Duration) -> PeerRequestOptions { + PeerRequestOptions::with_timeout(idle) + .reset_timeout_on_progress() + .with_max_total_timeout(max_total) +} + +/// Options-taking call variants for [`Peer`] — the rs-side analog +/// of passing `RequestOptions` inline to the TS SDK's `client.callTool`. +/// +/// Without these, high-level fork calls (`peer.call_tool(..)` etc.) run with +/// `PeerRequestOptions::no_options()`: **no timeout, infinite await**. Pair +/// with [`progress_aware_options`] for any call whose response may be large +/// (CEP-22 fragments every rmcp request's response once the peer advertises +/// support — rmcp stamps a progress token into every outgoing request). +/// +/// For request types without a dedicated variant here, the generic path is +/// two lines on public fork API — no wrapper needed: +/// +/// ```ignore +/// let handle = peer.send_cancellable_request(request, options).await?; +/// let response = handle.await_response().await?; +/// ``` +/// +/// On timeout the call fails with `ServiceError::Timeout { timeout }` (the +/// value identifies which timer fired: `idle` vs `max_total`) and rmcp +/// publishes a `notifications/cancelled` for the request. +pub trait PeerRequestOptionsExt { + /// `call_tool` with explicit [`PeerRequestOptions`] — the direct analog of + /// TS `client.callTool(params, schema, options)`. + fn call_tool_with_options( + &self, + params: CallToolRequestParams, + options: PeerRequestOptions, + ) -> impl std::future::Future> + Send; +} + +impl PeerRequestOptionsExt for Peer { + async fn call_tool_with_options( + &self, + params: CallToolRequestParams, + options: PeerRequestOptions, + ) -> Result { + // Mirrors the fork's `method!` expansion for `call_tool` + // (service/client.rs), with options threaded through + // `send_cancellable_request` instead of the option-less default. + let result = self + .send_cancellable_request( + ClientRequest::CallToolRequest(CallToolRequest::new(params)), + options, + ) + .await? + .await_response() + .await?; + match result { + ServerResult::CallToolResult(result) => Ok(result), + _ => Err(ServiceError::UnexpectedResponse), + } + } +} + +#[cfg(test)] +mod tests { + use super::*; + + /// Pins the `progress_aware_options` wiring: the constructor must (a) set the + /// idle `timeout`, (b) enable `reset_timeout_on_progress` — without which the + /// fork registers no progress watcher (`send_request_with_option` only arms + /// one when *both* the flag and a timeout are set), so inbound chunks would + /// never reset the timer — and (c) populate `max_total_timeout`. Distinct + /// idle/max-total values also catch an accidental argument swap in the + /// builder chain. End-to-end behavior is covered by the timeout tests in + /// `tests/oversized_timeout_e2e.rs`; this is the fast unit guard on the flags. + #[test] + fn progress_aware_options_sets_reset_and_both_timeouts() { + let idle = Duration::from_millis(250); + let max_total = Duration::from_secs(42); + + let options = progress_aware_options(idle, max_total); + + assert_eq!(options.timeout, Some(idle), "idle timeout must be set"); + assert!( + options.reset_timeout_on_progress, + "reset_timeout_on_progress must be enabled or the fork arms no progress watcher" + ); + assert_eq!( + options.max_total_timeout, + Some(max_total), + "max_total_timeout must be set" + ); + } +} diff --git a/src/transport/client/correlation_store.rs b/src/transport/client/correlation_store.rs index 2b1ddd2..6ee6558 100644 --- a/src/transport/client/correlation_store.rs +++ b/src/transport/client/correlation_store.rs @@ -97,8 +97,8 @@ impl ClientCorrelationStore { } /// Refresh a pending request's registration timestamp without disturbing its - /// `original_id`/`is_initialize`. Used by CEP-22 oversized transfers (OD-2): - /// each inbound frame "touches" the entry so [`sweep_expired`](Self::sweep_expired) + /// `original_id`/`is_initialize`. Used by CEP-22 oversized transfers: each + /// inbound frame "touches" the entry so [`sweep_expired`](Self::sweep_expired) /// does not evict it mid-transfer. Returns `true` if the entry existed. pub async fn touch(&self, event_id: &str) -> bool { let mut cache = self.pending_requests.write().await; diff --git a/src/transport/client/mod.rs b/src/transport/client/mod.rs index 4f87f5c..a88473b 100644 --- a/src/transport/client/mod.rs +++ b/src/transport/client/mod.rs @@ -31,9 +31,9 @@ use crate::relay::{RelayPool, RelayPoolTrait}; use crate::transport::base::BaseTransport; use crate::transport::discovery_tags::{parse_discovered_peer_capabilities, PeerCapabilities}; use crate::transport::oversized_transfer::{ - build_oversized_frames, resolve_safe_chunk_size, send_oversized_transfer, OversizedFrame, - OversizedSenderOptions, OversizedTransferConfig, OversizedTransferReceiver, - NOTIFICATIONS_PROGRESS_METHOD, + build_oversized_frames, progress_token_string, resolve_safe_chunk_size, + send_oversized_transfer, OversizedFrame, OversizedSenderOptions, OversizedTransferConfig, + OversizedTransferReceiver, NOTIFICATIONS_PROGRESS_METHOD, }; const LOG_TARGET: &str = "contextvm_sdk::transport::client"; @@ -66,7 +66,7 @@ pub struct NostrClientTransportConfig { pub discovery_relay_urls: Option>, /// Non-authoritative operational relays probed in parallel with CEP-17 discovery. pub fallback_operational_relay_urls: Option>, - /// CEP-22 oversized payload transfer configuration. Disabled by default. + /// CEP-22 oversized payload transfer configuration. Enabled by default. pub oversized_transfer: OversizedTransferConfig, } @@ -171,6 +171,14 @@ pub struct NostrClientTransport { /// `send()` awaiting the server's `accept` registers a one-shot here before /// publishing `start`; the event loop fires it when the `accept` frame arrives. accept_waiters: Arc>>>, + /// CEP-22: original `_meta.progressToken` JSON values of sent + /// oversized-eligible requests, keyed by their stringified form. Frames + /// stringify tokens on the wire (both SDKs), so the original value — + /// `Number` for rmcp-issued tokens — survives only here; progress forwarded + /// to the requester must restore it for rmcp's watcher lookup to match + /// (`Number(5)` ≠ `String("5")`). LRU-bounded; entries are dropped when + /// their transfer concludes and cleared on [`close`](Self::close). + original_progress_tokens: Arc>>, /// Channel for receiving processed MCP messages from the event loop. message_tx: Option>, message_rx: Option>, @@ -233,6 +241,9 @@ impl NostrClientTransport { (&config.oversized_transfer).into(), ))); let accept_waiters = Arc::new(Mutex::new(HashMap::new())); + let original_progress_tokens = Arc::new(Mutex::new(LruCache::new( + NonZeroUsize::new(DEFAULT_LRU_SIZE).expect("DEFAULT_LRU_SIZE must be non-zero"), + ))); Ok(Self { base: BaseTransport { @@ -242,6 +253,7 @@ impl NostrClientTransport { }, oversized_receiver, accept_waiters, + original_progress_tokens, config, server_pubkey, hinted_relay_urls, @@ -303,6 +315,9 @@ impl NostrClientTransport { (&config.oversized_transfer).into(), ))); let accept_waiters = Arc::new(Mutex::new(HashMap::new())); + let original_progress_tokens = Arc::new(Mutex::new(LruCache::new( + NonZeroUsize::new(DEFAULT_LRU_SIZE).expect("DEFAULT_LRU_SIZE must be non-zero"), + ))); Ok(Self { base: BaseTransport { @@ -312,6 +327,7 @@ impl NostrClientTransport { }, oversized_receiver, accept_waiters, + original_progress_tokens, config, server_pubkey, hinted_relay_urls, @@ -403,6 +419,8 @@ impl NostrClientTransport { let seen_gift_wrap_ids = self.seen_gift_wrap_ids.clone(); let oversized_receiver = self.oversized_receiver.clone(); let accept_waiters = self.accept_waiters.clone(); + let original_progress_tokens = self.original_progress_tokens.clone(); + let oversized_enabled = self.config.oversized_transfer.enabled; let timeout = self.config.timeout; let token = self.cancellation_token.child_token(); @@ -420,6 +438,8 @@ impl NostrClientTransport { seen_gift_wrap_ids, oversized_receiver, accept_waiters, + original_progress_tokens, + oversized_enabled, timeout, token, ) @@ -458,6 +478,13 @@ impl NostrClientTransport { }; waiters.clear(); } + { + let mut originals = match self.original_progress_tokens.lock() { + Ok(g) => g, + Err(p) => p.into_inner(), + }; + originals.clear(); + } self.base.disconnect().await } @@ -491,18 +518,25 @@ impl NostrClientTransport { // CEP-22: only a request carrying a `progressToken` is eligible for oversized // fragmentation (the token addresses the frames); extract it once up front. + // Tokens may be JSON strings or numbers (rmcp issues numbers): the + // stringified form keys all transport state, and the original value is + // recorded so progress forwarded to the requester can restore the token's + // wire type. let oversized_token: Option = if is_request && self.config.oversized_transfer.enabled { - match message { + let original = match message { JsonRpcMessage::Request(req) => req .params .as_ref() .and_then(|p| p.get("_meta")) - .and_then(|m| m.get("progressToken")) - .and_then(|t| t.as_str()) - .map(String::from), + .and_then(|m| m.get("progressToken")), _ => None, + }; + let token = original.and_then(progress_token_string); + if let (Some(token), Some(original)) = (token.as_deref(), original) { + self.record_original_progress_token(token, original); } + token } else { None }; @@ -665,7 +699,7 @@ impl NostrClientTransport { /// /// Builds `start → chunks… → end` frames, registers an `accept` waiter before /// publishing `start` when the server's support is not yet known, drives the - /// §1 [`send_oversized_transfer`] sequencer, and registers the pending request + /// [`send_oversized_transfer`] sequencer, and registers the pending request /// against the **end** frame's event id (the value the server correlates its /// response to). One-shot discovery tags ride the `start` frame only. async fn send_oversized_request( @@ -801,6 +835,98 @@ impl NostrClientTransport { Ok(()) } + /// CEP-22: record the original `_meta.progressToken` value of an + /// outbound request under its stringified form, replacing any stale entry + /// for the same key. See [`Self::original_progress_tokens`]. + fn record_original_progress_token(&self, token: &str, original: &serde_json::Value) { + let mut originals = match self.original_progress_tokens.lock() { + Ok(g) => g, + Err(p) => p.into_inner(), + }; + originals.push(token.to_string(), original.clone()); + } + + /// CEP-22: drop — and return — the original `progressToken` value + /// recorded for `token`, once its transfer concludes (delivered or failed). + fn remove_original_progress_token( + originals: &Mutex>, + token: Option<&str>, + ) -> Option { + let token = token?; + let mut originals = match originals.lock() { + Ok(g) => g, + Err(p) => p.into_inner(), + }; + originals.pop(token) + } + + /// CEP-22: look up — without removing — the original `progressToken` + /// value recorded for `token`, promoting its LRU recency so an in-flight + /// transfer's record outlives idle ones. + fn original_progress_token( + originals: &Mutex>, + token: &str, + ) -> Option { + let mut originals = match originals.lock() { + Ok(g) => g, + Err(p) => p.into_inner(), + }; + originals.get(token).cloned() + } + + /// CEP-22: build the plain `notifications/progress` forwarded to the + /// local consumer for an inbound oversized-transfer frame: `progress` is + /// copied verbatim (plus `total`/`message` when present), the `cvm` frame + /// payload is omitted, and `progressToken` is set to `original_token` — + /// the value recorded at send time, NOT the frame's wire token. The + /// wire stringifies every token, but rmcp's progress-watcher map is keyed + /// by exact JSON type (`Number(5)` ≠ `String("5")`), so only the recorded + /// original resets the requester's idle timer. Returns `None` when the + /// frame has no `progress` (malformed; nothing worth forwarding). + fn stripped_progress_notification( + params: &serde_json::Value, + original_token: &serde_json::Value, + ) -> Option { + let mut stripped = serde_json::Map::new(); + stripped.insert("progressToken".to_string(), original_token.clone()); + stripped.insert("progress".to_string(), params.get("progress")?.clone()); + for key in ["total", "message"] { + if let Some(value) = params.get(key) { + stripped.insert(key.to_string(), value.clone()); + } + } + Some(JsonRpcMessage::Notification(JsonRpcNotification { + jsonrpc: "2.0".to_string(), + method: NOTIFICATIONS_PROGRESS_METHOD.to_string(), + params: Some(serde_json::Value::Object(stripped)), + })) + } + + /// CEP-22: forward one stripped progress notification for + /// the oversized frame `notif` onto the consumer channel, restoring the + /// token recorded at send time. Falls back to the wire token for + /// transfers with no record (e.g. a transfer addressed to a token this + /// transport never sent); rmcp ignores tokens it never issued, so the + /// fallback forward is harmless. + fn forward_stripped_progress( + notif: &JsonRpcNotification, + token: &str, + originals: &Mutex>, + tx: &tokio::sync::mpsc::UnboundedSender, + ) { + let Some(params) = notif.params.as_ref() else { + return; + }; + let Some(original) = Self::original_progress_token(originals, token) + .or_else(|| params.get("progressToken").cloned()) + else { + return; + }; + if let Some(stripped) = Self::stripped_progress_notification(params, &original) { + let _ = tx.send(stripped); + } + } + /// Take the message receiver for consuming incoming messages. pub fn take_message_receiver( &mut self, @@ -844,6 +970,8 @@ impl NostrClientTransport { seen_gift_wrap_ids: Arc>>, oversized_receiver: Arc>, accept_waiters: Arc>>>, + original_progress_tokens: Arc>>, + oversized_enabled: bool, timeout: Duration, cancel: CancellationToken, ) { @@ -888,6 +1016,7 @@ impl NostrClientTransport { &seen_gift_wrap_ids, &oversized_receiver, &accept_waiters, + &original_progress_tokens, &relay_pool, ) .await; @@ -902,6 +1031,28 @@ impl NostrClientTransport { "Swept stale pending requests (rmcp handles timeout errors)" ); } + // CEP-22: reap inbound transfers past their hard deadline. + // Local-only (no abort frame is emitted): the requester's + // own timeout fails the call, and late frames are + // orphan-ignored. `remove_expired` no-ops when + // `transfer_timeout_ms` is 0; the sync guard is dropped + // before anything awaits. + if oversized_enabled { + let reaped = { + let mut receiver = match oversized_receiver.lock() { + Ok(g) => g, + Err(p) => p.into_inner(), + }; + receiver.remove_expired() + }; + for token in reaped { + tracing::warn!( + target: LOG_TARGET, + token = %token, + "Oversized transfer reaped by watchdog" + ); + } + } } } } @@ -1033,6 +1184,7 @@ impl NostrClientTransport { seen_gift_wrap_ids: &Arc>>, oversized_receiver: &Arc>, accept_waiters: &Arc>>>, + original_progress_tokens: &Arc>>, relay_pool: &Arc, ) { let event = match notification { @@ -1185,11 +1337,13 @@ impl NostrClientTransport { if notif.method == NOTIFICATIONS_PROGRESS_METHOD && OversizedTransferReceiver::is_oversized_frame(¬if) { + // Token extraction accepts string or number — defensive only: + // every known sender stringifies tokens into frames. let token = notif .params .as_ref() .and_then(|p| p.get("progressToken")) - .and_then(|t| t.as_str()); + .and_then(progress_token_string); // Route `accept` frames to the waiting sender by progressToken // (their e-tag is the start-frame id, which is not in `pending`). @@ -1200,7 +1354,7 @@ impl NostrClientTransport { .and_then(OversizedFrame::from_cvm_value) .is_some_and(|f| matches!(f, OversizedFrame::Accept)); if is_accept { - if let Some(token) = token { + if let Some(ref token) = token { let waiter = { let mut waiters = match accept_waiters.lock() { Ok(g) => g, @@ -1210,35 +1364,82 @@ impl NostrClientTransport { }; if let Some(waiter) = waiter { let _ = waiter.send(()); + // The accept is the one inbound frame of a + // client→server upload — forward it (stripped) so + // the requester's idle timer re-arms for the + // response-wait phase. Only for a live waiter: a + // duplicate or stray accept must not poke the timer. + Self::forward_stripped_progress( + ¬if, + token, + original_progress_tokens, + tx, + ); } } return; } - // OD-2: touch the pending entry so the sweep does not evict the + // Touch the pending entry so the sweep does not evict the // request mid-transfer (chunks do not otherwise refresh it). if let Some(ref correlated_id) = e_tag { pending.touch(correlated_id.as_str()).await; } // Feed the frame to the reassembler (process_frame is sync; the - // guard is dropped before any await). - let outcome = { + // guard is dropped before any await or channel send). + let (outcome, tracked) = { let mut receiver = match oversized_receiver.lock() { Ok(g) => g, Err(p) => p.into_inner(), }; - receiver.process_frame(¬if) + let outcome = receiver.process_frame(¬if); + // Zombie guard: forward progress only for transfers still + // tracked after this frame — a late/orphan frame must not + // keep a dead request's idle timer alive. + let tracked = token + .as_deref() + .is_some_and(|token| receiver.is_tracking(token)); + (outcome, tracked) }; match outcome { - // start/chunk consumed — do NOT touch `pending` or validate. - Ok(None) => return, + // start/chunk consumed — forward a stripped (cvm-less) + // progress notification carrying the original token so the + // requester's progress-aware idle timeout resets. + Ok(None) => { + if tracked { + if let Some(ref token) = token { + Self::forward_stripped_progress( + ¬if, + token, + original_progress_tokens, + tx, + ); + } + } + return; + } // end frame: deliver the reassembled (already-validated, may - // exceed 1 MB) message and clear the pending entry. + // exceed 1 MB) message and clear the pending entry. No extra + // progress forward — the response itself resolves the request. Ok(Some(message)) => { if let Some(ref correlated_id) = e_tag { pending.remove(correlated_id.as_str()).await; + } else { + // Matches the TS SDK: an oversized response that + // reassembles without a correlation `e` tag is still + // delivered (rmcp matches it by JSON-RPC id), but the + // missing transport-level correlation is worth a warn. + tracing::warn!( + target: LOG_TARGET, + "Oversized transfer completed without a correlation `e` tag; \ + delivering the reassembled response uncorrelated" + ); } + Self::remove_original_progress_token( + original_progress_tokens, + token.as_deref(), + ); let _ = tx.send(message); return; } @@ -1249,6 +1450,10 @@ impl NostrClientTransport { error = %error, "Inbound oversized transfer failed" ); + Self::remove_original_progress_token( + original_progress_tokens, + token.as_deref(), + ); return; } } @@ -1615,6 +1820,9 @@ mod tests { seen_gift_wrap_ids: Arc::new(Mutex::new(LruCache::new(NonZeroUsize::new(10).unwrap()))), oversized_receiver: Arc::new(Mutex::new(OversizedTransferReceiver::new())), accept_waiters: Arc::new(Mutex::new(HashMap::new())), + original_progress_tokens: Arc::new(Mutex::new(LruCache::new( + NonZeroUsize::new(10).unwrap(), + ))), message_tx: Some(tokio::sync::mpsc::unbounded_channel().0), message_rx: None, cancellation_token: CancellationToken::new(), @@ -1637,9 +1845,14 @@ mod tests { let t = make_transport_for_tags(EncryptionMode::Optional, GiftWrapMode::Optional); let tags = t.get_client_capability_tags(); let names = tag_names(&tags); + // The oversized tag (default-on) is pushed last. assert_eq!( names, - vec!["support_encryption", "support_encryption_ephemeral"] + vec![ + "support_encryption", + "support_encryption_ephemeral", + "support_oversized_transfer" + ] ); } @@ -1647,7 +1860,8 @@ mod tests { fn client_capability_tags_encryption_disabled() { let t = make_transport_for_tags(EncryptionMode::Disabled, GiftWrapMode::Optional); let tags = t.get_client_capability_tags(); - assert!(tags.is_empty()); + // No encryption tags; the default-on oversized tag remains. + assert_eq!(tag_names(&tags), vec!["support_oversized_transfer"]); } #[test] @@ -1655,13 +1869,28 @@ mod tests { let t = make_transport_for_tags(EncryptionMode::Optional, GiftWrapMode::Persistent); let tags = t.get_client_capability_tags(); let names = tag_names(&tags); - assert_eq!(names, vec!["support_encryption"]); + assert_eq!( + names, + vec!["support_encryption", "support_oversized_transfer"] + ); } #[test] - fn client_capability_tags_oversized_disabled_by_default() { + fn client_capability_tags_oversized_enabled_by_default() { let t = make_transport_for_tags(EncryptionMode::Optional, GiftWrapMode::Optional); - assert!(!t.config.oversized_transfer.enabled); + assert!(t.config.oversized_transfer.enabled); + let names = tag_names(&t.get_client_capability_tags()); + assert!( + names.contains(&"support_oversized_transfer".to_string()), + "oversized tag must be advertised by default" + ); + } + + #[test] + fn client_capability_tags_oversized_opt_out() { + // The opt-out gate still works: disabling suppresses the tag. + let mut t = make_transport_for_tags(EncryptionMode::Optional, GiftWrapMode::Optional); + t.config.oversized_transfer = OversizedTransferConfig::default().with_enabled(false); let names = tag_names(&t.get_client_capability_tags()); assert!( !names.contains(&"support_oversized_transfer".to_string()), @@ -1699,6 +1928,161 @@ mod tests { assert_eq!(cfg.oversized_transfer.chunk_size, 1024); } + // ── CEP-22 original progressToken record/restore ───────────── + + #[test] + fn original_progress_token_roundtrip_preserves_numeric_type() { + let t = make_transport_for_tags(EncryptionMode::Optional, GiftWrapMode::Optional); + // rmcp stamps numeric tokens; the record keys them by stringified form. + t.record_original_progress_token("7", &serde_json::json!(7)); + let restored = NostrClientTransport::remove_original_progress_token( + &t.original_progress_tokens, + Some("7"), + ); + assert_eq!(restored, Some(serde_json::json!(7))); + // Dropped on first take — the transfer concluded. + assert_eq!( + NostrClientTransport::remove_original_progress_token( + &t.original_progress_tokens, + Some("7"), + ), + None + ); + } + + #[test] + fn original_progress_token_string_never_parsed_to_number() { + // A legitimate String("5") token must restore as a string — restoring + // by parsing numeric-looking wire strings would corrupt it. + let t = make_transport_for_tags(EncryptionMode::Optional, GiftWrapMode::Optional); + t.record_original_progress_token("5", &serde_json::json!("5")); + assert_eq!( + NostrClientTransport::remove_original_progress_token( + &t.original_progress_tokens, + Some("5"), + ), + Some(serde_json::json!("5")) + ); + } + + #[test] + fn remove_original_progress_token_handles_missing() { + let t = make_transport_for_tags(EncryptionMode::Optional, GiftWrapMode::Optional); + assert_eq!( + NostrClientTransport::remove_original_progress_token(&t.original_progress_tokens, None,), + None + ); + assert_eq!( + NostrClientTransport::remove_original_progress_token( + &t.original_progress_tokens, + Some("unknown"), + ), + None + ); + } + + /// `send()` must record the original token value for every + /// oversized-eligible request — including sub-threshold ones, whose + /// *responses* may still come back fragmented. + #[tokio::test] + async fn send_records_numeric_progress_token_original() { + let mut t = make_transport_for_tags(EncryptionMode::Disabled, GiftWrapMode::Optional); + t.config.oversized_transfer.enabled = true; + let request = JsonRpcMessage::Request(JsonRpcRequest { + jsonrpc: "2.0".to_string(), + id: serde_json::json!(1), + method: "tools/call".to_string(), + params: Some(serde_json::json!({ "_meta": { "progressToken": 7 } })), + }); + t.send(&request).await.expect("send small request"); + + let recorded = NostrClientTransport::remove_original_progress_token( + &t.original_progress_tokens, + Some("7"), + ); + assert_eq!( + recorded, + Some(serde_json::json!(7)), + "numeric token must be recorded under its stringified form" + ); + } + + /// With oversized transfer disabled (explicit opt-out) nothing is recorded. + #[tokio::test] + async fn send_records_nothing_when_oversized_disabled() { + let mut t = make_transport_for_tags(EncryptionMode::Disabled, GiftWrapMode::Optional); + t.config.oversized_transfer = OversizedTransferConfig::default().with_enabled(false); + let request = JsonRpcMessage::Request(JsonRpcRequest { + jsonrpc: "2.0".to_string(), + id: serde_json::json!(1), + method: "tools/call".to_string(), + params: Some(serde_json::json!({ "_meta": { "progressToken": 7 } })), + }); + t.send(&request).await.expect("send small request"); + + assert_eq!( + NostrClientTransport::remove_original_progress_token( + &t.original_progress_tokens, + Some("7"), + ), + None + ); + } + + // ── CEP-22 stripped progress construction ──────────────────── + + #[test] + fn stripped_progress_notification_strips_cvm_and_restores_token() { + let params = serde_json::json!({ + "progressToken": "7", + "progress": 3, + "total": 5, + "message": "transferring", + "cvm": { "type": "oversized-transfer", "frameType": "chunk", "data": "x" }, + }); + let stripped = + NostrClientTransport::stripped_progress_notification(¶ms, &serde_json::json!(7)) + .expect("frame carries progress"); + let JsonRpcMessage::Notification(n) = stripped else { + panic!("expected a notification"); + }; + assert_eq!(n.method, NOTIFICATIONS_PROGRESS_METHOD); + let p = n.params.expect("params"); + assert_eq!( + p["progressToken"], + serde_json::json!(7), + "token must be the restored original, not the wire string" + ); + assert_eq!(p["progress"], serde_json::json!(3)); + assert_eq!(p["total"], serde_json::json!(5)); + assert_eq!(p["message"], serde_json::json!("transferring")); + assert!(p.get("cvm").is_none(), "cvm payload must be stripped"); + } + + #[test] + fn stripped_progress_notification_requires_progress_and_omits_absent_fields() { + // No `progress` → nothing worth forwarding. + let malformed = serde_json::json!({ "progressToken": "7", "cvm": {} }); + assert!(NostrClientTransport::stripped_progress_notification( + &malformed, + &serde_json::json!(7) + ) + .is_none()); + + // Absent total/message are omitted, not nulled. + let minimal = serde_json::json!({ "progressToken": "7", "progress": 1 }); + let stripped = + NostrClientTransport::stripped_progress_notification(&minimal, &serde_json::json!("7")) + .expect("progress present"); + let JsonRpcMessage::Notification(n) = stripped else { + panic!("expected a notification"); + }; + let p = n.params.expect("params"); + let keys = p.as_object().expect("object params"); + assert_eq!(keys.len(), 2, "only progressToken + progress: {p}"); + assert_eq!(p["progressToken"], serde_json::json!("7")); + } + #[test] fn client_discovery_tags_sent_once() { let t = make_transport_for_tags(EncryptionMode::Optional, GiftWrapMode::Optional); diff --git a/src/transport/oversized_transfer/constants.rs b/src/transport/oversized_transfer/constants.rs index e541d01..66f6764 100644 --- a/src/transport/oversized_transfer/constants.rs +++ b/src/transport/oversized_transfer/constants.rs @@ -33,8 +33,9 @@ pub const DEFAULT_MAX_CONCURRENT_TRANSFERS: usize = 64; /// Default hard timeout for an in-flight transfer (milliseconds). /// -/// Not enforced by the pure engine (no timers yet); wired into the -/// transport watchdog once transport integration lands. +/// Measured from `start` admission, never refreshed by chunk activity, and +/// enforced by sweeping +/// [`OversizedTransferReceiver::remove_expired`](super::OversizedTransferReceiver::remove_expired). pub const DEFAULT_TRANSFER_TIMEOUT_MS: u64 = 5 * 60 * 1000; /// Default maximum forward gap between the next expected chunk and an diff --git a/src/transport/oversized_transfer/frame.rs b/src/transport/oversized_transfer/frame.rs index b5242e2..452143e 100644 --- a/src/transport/oversized_transfer/frame.rs +++ b/src/transport/oversized_transfer/frame.rs @@ -109,7 +109,10 @@ impl OversizedFrame { /// Wrap this frame in a `notifications/progress` [`JsonRpcNotification`]. /// /// Builds the outer `params` with `progressToken`, `progress`, an optional - /// human-readable `message` (non-normative UX), and the `cvm` frame. + /// human-readable `message` (non-normative UX), and the `cvm` frame. The + /// token is always emitted as a JSON **string**, even when the originating + /// request carried a numeric one (matching the TS SDK's + /// `String(progressToken)`); see [`progress_token_string`]. pub fn into_progress_notification( &self, progress_token: &str, @@ -135,6 +138,23 @@ impl OversizedFrame { } } +/// Coerce a `progressToken` JSON value to its canonical string form. +/// +/// MCP progress tokens may be JSON strings **or numbers** (rmcp stamps a +/// numeric token into every outgoing request), so token extraction must accept +/// both; all transport-internal keying (correlation routes, accept waiters, +/// reassembly state, frame addressing) uses the stringified form. The wire +/// format is unchanged — frames always carry a string token +/// ([`OversizedFrame::into_progress_notification`]), exactly like the TS SDK's +/// `String(progressToken)`. Returns `None` for any other JSON type. +pub fn progress_token_string(value: &Value) -> Option { + match value { + Value::String(s) => Some(s.clone()), + Value::Number(n) => Some(n.to_string()), + _ => None, + } +} + #[cfg(test)] mod tests { use super::*; @@ -244,4 +264,24 @@ mod tests { let params = notification.params.as_ref().unwrap(); assert!(!params.as_object().unwrap().contains_key("message")); } + + #[test] + fn progress_token_string_accepts_string_and_number() { + assert_eq!( + progress_token_string(&json!("tok-1")), + Some("tok-1".to_string()) + ); + assert_eq!(progress_token_string(&json!(7)), Some("7".to_string())); + assert_eq!(progress_token_string(&json!(0)), Some("0".to_string())); + assert_eq!(progress_token_string(&json!(-3)), Some("-3".to_string())); + assert_eq!(progress_token_string(&json!(7.5)), Some("7.5".to_string())); + } + + #[test] + fn progress_token_string_rejects_other_types() { + assert_eq!(progress_token_string(&json!(null)), None); + assert_eq!(progress_token_string(&json!(true)), None); + assert_eq!(progress_token_string(&json!({ "t": 1 })), None); + assert_eq!(progress_token_string(&json!([1])), None); + } } diff --git a/src/transport/oversized_transfer/mod.rs b/src/transport/oversized_transfer/mod.rs index f517725..828ab12 100644 --- a/src/transport/oversized_transfer/mod.rs +++ b/src/transport/oversized_transfer/mod.rs @@ -8,10 +8,11 @@ //! `sdk/src/transport/oversized-transfer/`. //! //! This module is the **pure engine**: building frames ([`codec`]) and -//! reassembling them ([`receiver`]). It carries no transport, I/O, or timers — -//! those are wired in by the client and server transports once transport -//! integration lands. Until then the module is intentionally unused by the -//! rest of the crate. +//! reassembling them ([`receiver`]). It carries no transport, I/O, or live +//! timers — the client and server transports drive it. The hard per-transfer +//! watchdog (`transfer_timeout_ms`) is tracked from `start` admission and +//! reaped via [`OversizedTransferReceiver::remove_expired`] when the owning +//! transport sweeps. //! //! ``` //! use contextvm_sdk::transport::oversized_transfer::{ @@ -56,26 +57,31 @@ pub use codec::{ }; pub use constants::*; pub use errors::OversizedTransferError; -pub use frame::{CompletionMode, OversizedFrame}; +pub use frame::{progress_token_string, CompletionMode, OversizedFrame}; pub use receiver::{OversizedTransferReceiver, TransferPolicy}; pub use sender::send_oversized_transfer; pub use sizing::{measure_published_event_size, resolve_safe_chunk_size}; /// CEP-22 oversized-transfer configuration shared by both transports. /// -/// Bundles the capability gate plus the sender/receiver tuning knobs (D6) so the +/// Bundles the capability gate plus the sender/receiver tuning knobs so the /// nine numeric defaults don't clutter the flat transport configs. Attached to /// [`NostrServerTransportConfig`](crate::transport::NostrServerTransportConfig) /// and [`NostrClientTransportConfig`](crate::transport::NostrClientTransportConfig) /// via their `with_oversized_transfer` / `with_oversized_enabled` builders. /// -/// **Disabled by default** — until a peer opts in, no `support_oversized_transfer` -/// capability is advertised and the server never learns or activates the feature. +/// **Enabled by default** (TS parity) — opt out with +/// [`with_enabled(false)`](Self::with_enabled) or the transports' +/// `with_oversized_enabled(false)` builders. The negotiation gates make the +/// default safe for non-oversized peers: the server activates only for +/// clients that advertise support, and the client fragments only requests +/// carrying a `progressToken` — a disabled peer just sees one extra +/// `support_oversized_transfer` tag. #[derive(Debug, Clone)] #[non_exhaustive] pub struct OversizedTransferConfig { - /// Master gate. When `false` (default) the capability is neither advertised - /// nor activated, and the server does not learn a client's flag. + /// Master gate, `true` by default. When `false` the capability is neither + /// advertised nor activated, and the server does not learn a client's flag. pub enabled: bool, /// Serialized byte length at or above which the sender switches to oversized transfer. pub threshold: usize, @@ -87,7 +93,8 @@ pub struct OversizedTransferConfig { pub max_transfer_chunks: u64, /// Upper bound on concurrently active receiver-side transfers. pub max_concurrent_transfers: usize, - /// Hard timeout for an in-flight transfer (milliseconds). + /// Hard timeout for an in-flight transfer (milliseconds), measured from + /// admission. `0` disables the receiver-side watchdog. pub transfer_timeout_ms: u64, /// Maximum forward gap between the next expected chunk and an out-of-order /// chunk that will still be buffered. @@ -95,13 +102,19 @@ pub struct OversizedTransferConfig { /// Maximum number of buffered out-of-order chunks. pub max_out_of_order_chunks: usize, /// Timeout a sender waits for an `accept` frame before giving up (milliseconds). + /// + /// Used by the **client** transport only. The client is the sole party that + /// sends a `start` frame with an accept handshake and then waits for the + /// `accept`. On the **server** transport this field is inert: a server is + /// always the *receiver* of the handshake — it emits the `accept`, it never + /// waits for one — so its value is never read server-side. pub accept_timeout_ms: u64, } impl Default for OversizedTransferConfig { fn default() -> Self { Self { - enabled: false, + enabled: true, threshold: DEFAULT_OVERSIZED_THRESHOLD, chunk_size: DEFAULT_CHUNK_SIZE, max_transfer_bytes: DEFAULT_MAX_TRANSFER_BYTES, @@ -117,6 +130,9 @@ impl Default for OversizedTransferConfig { impl OversizedTransferConfig { /// An explicitly enabled config with all other knobs at their defaults. + /// + /// Redundant since `enabled` defaulted to `true` (kept for API + /// stability); equivalent to [`OversizedTransferConfig::default`]. pub fn enabled() -> Self { Self { enabled: true, @@ -187,7 +203,7 @@ impl OversizedTransferConfig { impl From<&OversizedTransferConfig> for TransferPolicy { /// Project the receiver-relevant knobs of an [`OversizedTransferConfig`] into - /// a [`TransferPolicy`] (D6 → receiver admission policy). + /// a [`TransferPolicy`] (the receiver admission policy). fn from(config: &OversizedTransferConfig) -> Self { TransferPolicy { max_transfer_bytes: config.max_transfer_bytes, diff --git a/src/transport/oversized_transfer/receiver.rs b/src/transport/oversized_transfer/receiver.rs index bc4b44f..8f8811f 100644 --- a/src/transport/oversized_transfer/receiver.rs +++ b/src/transport/oversized_transfer/receiver.rs @@ -5,11 +5,14 @@ //! and returns the reassembled [`JsonRpcMessage`] once a transfer completes and //! passes byte-length, SHA-256, and JSON-RPC validation. //! -//! This engine is **pure and synchronous**: it owns no timers. The hard -//! per-transfer watchdog (`transfer_timeout_ms`) and the sender-side -//! accept-waiter are added when the engine is wired into the transport. +//! This engine is **pure and synchronous**: it owns no live timers. The hard +//! per-transfer watchdog deadline (`transfer_timeout_ms`) is measured from +//! `start` admission and enforced by the owning transport calling +//! [`OversizedTransferReceiver::remove_expired`] on its sweep tick; the +//! sender-side accept-waiter lives in the transport. use std::collections::{BTreeMap, HashMap}; +use std::time::{Duration, Instant}; use serde_json::Value; @@ -23,7 +26,7 @@ use super::constants::{ DEFAULT_TRANSFER_TIMEOUT_MS, DIGEST_PREFIX, }; use super::errors::OversizedTransferError; -use super::frame::OversizedFrame; +use super::frame::{progress_token_string, OversizedFrame}; /// Receiver-side admission and out-of-order policy. /// @@ -41,8 +44,10 @@ pub struct TransferPolicy { pub max_out_of_order_window: u64, /// Maximum number of buffered out-of-order chunks. pub max_out_of_order_chunks: usize, - /// Hard per-transfer timeout (milliseconds). Reserved for the transport - /// watchdog; the pure engine does not enforce it. + /// Hard per-transfer timeout (milliseconds), measured from `start` + /// admission and never refreshed by chunk activity (TS parity). + /// Enforced by sweeping [`OversizedTransferReceiver::remove_expired`]; + /// `0` disables the watchdog. pub transfer_timeout_ms: u64, } @@ -72,6 +77,9 @@ struct ActiveTransfer { highest_observed_progress: u64, /// Chunk fragments keyed by the outer `progress` value (canonical index). chunks: BTreeMap, + /// When the `start` frame was admitted. The watchdog deadline is measured + /// from here and never refreshed (a hard cap, not an idle timer). + admitted_at: Instant, } impl ActiveTransfer { @@ -158,6 +166,10 @@ pub struct OversizedTransferReceiver { max_concurrent_transfers: usize, max_out_of_order_window: u64, max_out_of_order_chunks: usize, + /// Hard per-transfer deadline in milliseconds, from `start` admission. + /// `0` disables the watchdog: [`Self::remove_expired`] skips the sweep + /// entirely rather than expiring transfers instantly. + transfer_timeout_ms: u64, transfers: HashMap, } @@ -181,6 +193,7 @@ impl OversizedTransferReceiver { max_concurrent_transfers: policy.max_concurrent_transfers, max_out_of_order_window: policy.max_out_of_order_window, max_out_of_order_chunks: policy.max_out_of_order_chunks, + transfer_timeout_ms: policy.transfer_timeout_ms, transfers: HashMap::new(), } } @@ -190,6 +203,43 @@ impl OversizedTransferReceiver { self.transfers.len() } + /// Whether a transfer keyed by `token` is currently in flight. + /// + /// Lets the transport gate progress forwarding on tracked transfers, so a + /// late or orphan frame (e.g. after watchdog reaping) cannot keep a dead + /// request's timer alive. + pub fn is_tracking(&self, token: &str) -> bool { + self.transfers.contains_key(token) + } + + /// Reap transfers whose age since `start` admission exceeds the policy's + /// `transfer_timeout_ms`, returning the reaped tokens (for logging). + /// + /// The deadline is hard — never refreshed by chunk activity (TS parity): + /// liveness is the requester's idle timer; this sweep is the receiver's + /// memory bound. A `transfer_timeout_ms` of `0` disables the watchdog (no + /// sweep). Reaping is local-only — no abort frame is emitted; the peer's + /// own timeout covers the other side. The token slot is freed: a later + /// `start` re-using a reaped token is admitted as a fresh transfer, while + /// its late chunk/end frames are orphan-ignored. + pub fn remove_expired(&mut self) -> Vec { + if self.transfer_timeout_ms == 0 { + return Vec::new(); + } + let deadline = Duration::from_millis(self.transfer_timeout_ms); + let now = Instant::now(); + let mut reaped = Vec::new(); + self.transfers.retain(|token, transfer| { + if now.duration_since(transfer.admitted_at) > deadline { + reaped.push(token.clone()); + false + } else { + true + } + }); + reaped + } + /// Release all in-flight transfer state. pub fn clear(&mut self) { self.transfers.clear(); @@ -299,6 +349,7 @@ impl OversizedTransferReceiver { next_expected_chunk_progress: None, highest_observed_progress: progress, chunks: BTreeMap::new(), + admitted_at: Instant::now(), }, ); Ok(None) @@ -480,11 +531,7 @@ impl OversizedTransferReceiver { /// Coerce a `progressToken` value to a string (mirrors TS `String(token ?? '')`). fn token_to_string(value: Option<&Value>) -> String { - match value { - Some(Value::String(s)) => s.clone(), - Some(Value::Number(n)) => n.to_string(), - _ => String::new(), - } + value.and_then(progress_token_string).unwrap_or_default() } /// A non-empty token is required on every frame. @@ -631,7 +678,7 @@ mod tests { #[test] fn roundtrip_multibyte_payload() { - // Small chunks force boundaries inside multibyte runs (CEP-22 D4). + // Small chunks force boundaries inside multibyte UTF-8 runs. let message = sample_response(7, "héllo 🦀 wörld 日本語 ☃ même 🚀🚀"); let frames = build(&message, 5); @@ -774,6 +821,55 @@ mod tests { assert_eq!(receiver.active_transfer_count(), 0); } + /// An `accept` on the wrong progress slot is rejected by the receiver, so a + /// transport that wakes the sender's accept-waiter only on a successful + /// `process_frame` leaves the oneshot unfired — the sender stays blocked and + /// falls back to its accept-timeout → abort (see [`super`]'s sibling sender + /// test `missed_accept_returns_abort`). + /// + /// Scope: this pins the engine-level accept validation (`handle_accept` + /// requires `progress > start_progress`). The live *client* upload path wakes + /// its waiter on `cvm.frameType == "accept"` + a matching token alone (the + /// accept's slot is only meaningful to a reassembling receiver), so this models + /// the receiver/reassembly contract, not that path. + #[test] + fn invalid_accept_leaves_oneshot_unfired() { + use tokio::sync::oneshot; + + let mut receiver = OversizedTransferReceiver::new(); + receiver + .process_frame(&start_frame(TOKEN, START_PROGRESS, "sha256:abcd", 4, 1)) + .unwrap(); + + // The sender registers a oneshot before publishing `start`; a transport + // wakes it only once the receiver has accepted the accept frame. + let (accept_tx, mut accept_rx) = oneshot::channel::<()>(); + + // An accept on the start slot is invalid (it must advance past start). + let invalid_accept = OversizedFrame::Accept + .into_progress_notification(TOKEN, START_PROGRESS, None) + .unwrap(); + + // Fire-on-Ok: wake the waiter only if the receiver accepts the frame. + match receiver.process_frame(&invalid_accept) { + Ok(_) => { + let _ = accept_tx.send(()); + } + Err(error) => assert!(matches!(error, OversizedTransferError::Sequence(_))), + } + + // The accept was rejected, so the oneshot was never fired: still `Empty` + // (the sender — held alive here — has not been signalled). + assert!( + matches!( + accept_rx.try_recv(), + Err(oneshot::error::TryRecvError::Empty) + ), + "an invalid accept must not fire the sender's accept-waiter" + ); + assert_eq!(receiver.active_transfer_count(), 0); + } + // ── integrity ─────────────────────────────────────────────────── #[test] @@ -1080,4 +1176,121 @@ mod tests { serde_json::to_value(&message).unwrap() ); } + + // ── watchdog: admitted_at / remove_expired / is_tracking ──────────── + + fn watchdog_policy(transfer_timeout_ms: u64) -> TransferPolicy { + TransferPolicy { + transfer_timeout_ms, + ..TransferPolicy::default() + } + } + + /// Backdate a tracked transfer's admission so deadline checks are + /// deterministic (no sleeping in unit tests). + fn backdate_admission(receiver: &mut OversizedTransferReceiver, token: &str, ms: u64) { + receiver + .transfers + .get_mut(token) + .expect("transfer must be tracked") + .admitted_at -= Duration::from_millis(ms); + } + + #[test] + fn remove_expired_reaps_past_deadline_and_orphans_late_frames() { + let mut receiver = OversizedTransferReceiver::with_policy(watchdog_policy(50)); + let mut frames = build(&sample_response(1, "payload"), 4).into_ordered(); + let rest = frames.split_off(1); + receiver.process_frame(&frames[0]).unwrap(); + assert!(receiver.is_tracking(TOKEN)); + + backdate_admission(&mut receiver, TOKEN, 100); + let reaped = receiver.remove_expired(); + assert_eq!(reaped, vec![TOKEN.to_string()]); + assert!(!receiver.is_tracking(TOKEN)); + assert_eq!(receiver.active_transfer_count(), 0); + + // Late chunk/end frames of the reaped transfer are orphan-ignored: + // no error, and nothing is ever surfaced. + for frame in rest { + assert!(receiver.process_frame(&frame).unwrap().is_none()); + } + } + + #[test] + fn remove_expired_skips_unexpired_transfers() { + let mut receiver = OversizedTransferReceiver::with_policy(watchdog_policy(50)); + receiver + .process_frame(&start_frame(TOKEN, 1, "sha256:abcd", 4, 1)) + .unwrap(); + receiver + .process_frame(&start_frame("tok-stale", 1, "sha256:abcd", 4, 1)) + .unwrap(); + + backdate_admission(&mut receiver, "tok-stale", 100); + let reaped = receiver.remove_expired(); + assert_eq!(reaped, vec!["tok-stale".to_string()]); + assert!(receiver.is_tracking(TOKEN), "fresh transfer must survive"); + assert!(!receiver.is_tracking("tok-stale")); + } + + #[test] + fn remove_expired_zero_timeout_disables_watchdog() { + // 0 means "no watchdog" (sweep skipped), NOT instant expiry. + let mut receiver = OversizedTransferReceiver::with_policy(watchdog_policy(0)); + receiver + .process_frame(&start_frame(TOKEN, 1, "sha256:abcd", 4, 1)) + .unwrap(); + backdate_admission(&mut receiver, TOKEN, 200); + + assert!(receiver.remove_expired().is_empty()); + assert!(receiver.is_tracking(TOKEN)); + } + + #[test] + fn reaped_token_is_readmittable_as_fresh_transfer() { + // remove_expired frees the slot; a later `start` re-using the token is + // a fresh transfer (duplicate-start would error if state lingered), and + // it runs to completion. + let mut receiver = OversizedTransferReceiver::with_policy(watchdog_policy(50)); + let message = sample_response(7, "again"); + let mut first = build(&message, 4).into_ordered(); + first.truncate(2); // start + one chunk, then the sender stalls + for frame in &first { + receiver.process_frame(frame).unwrap(); + } + + backdate_admission(&mut receiver, TOKEN, 100); + assert_eq!(receiver.remove_expired(), vec![TOKEN.to_string()]); + + let reconstructed = run_to_completion(&mut receiver, build(&message, 4)); + assert_eq!( + serde_json::to_value(&reconstructed).unwrap(), + serde_json::to_value(&message).unwrap() + ); + assert!( + !receiver.is_tracking(TOKEN), + "completed transfer is released" + ); + } + + #[test] + fn is_tracking_reflects_transfer_lifecycle() { + let mut receiver = OversizedTransferReceiver::new(); + assert!(!receiver.is_tracking(TOKEN)); + + let frames = build(&sample_response(1, "lifecycle"), 4); + let ordered = frames.into_ordered(); + let (end, mid) = ordered.split_last().expect("frames are never empty"); + + for frame in mid { + receiver.process_frame(frame).unwrap(); + assert!(receiver.is_tracking(TOKEN), "tracked while in flight"); + } + receiver + .process_frame(end) + .unwrap() + .expect("end frame completes the transfer"); + assert!(!receiver.is_tracking(TOKEN), "released after completion"); + } } diff --git a/src/transport/oversized_transfer/sender.rs b/src/transport/oversized_transfer/sender.rs index e3dbb36..a649065 100644 --- a/src/transport/oversized_transfer/sender.rs +++ b/src/transport/oversized_transfer/sender.rs @@ -9,8 +9,8 @@ //! `prepare_mcp_message` + `publish_event` (registering a pending entry between //! the two), the server via `base.send_mcp_message`. Those signatures differ, so //! the publish step is injected as a closure and this module stays transport-free -//! (D11 / OD-5: there is no active `abort` frame in v1 — on a missed `accept` the -//! sender fails and cleans up locally, letting the peer's own timeout fire). +//! (there is no active `abort` frame in v1 — on a missed `accept` the sender +//! fails and cleans up locally, letting the peer's own timeout fire). use std::future::Future; use std::time::Duration; diff --git a/src/transport/server/mod.rs b/src/transport/server/mod.rs index 251b79c..7455869 100644 --- a/src/transport/server/mod.rs +++ b/src/transport/server/mod.rs @@ -30,8 +30,9 @@ use crate::relay::{RelayPool, RelayPoolTrait}; use crate::transport::base::BaseTransport; use crate::transport::discovery_tags::learn_peer_capabilities; use crate::transport::oversized_transfer::{ - build_oversized_frames, resolve_safe_chunk_size, OversizedFrame, OversizedSenderOptions, - OversizedTransferConfig, OversizedTransferReceiver, TransferPolicy, ACCEPT_PROGRESS, + build_oversized_frames, progress_token_string, resolve_safe_chunk_size, OversizedFrame, + OversizedSenderOptions, OversizedTransferConfig, OversizedTransferReceiver, TransferPolicy, + ACCEPT_PROGRESS, }; const LOG_TARGET: &str = "contextvm_sdk::transport::server"; @@ -103,7 +104,7 @@ pub struct NostrServerTransportConfig { pub publish_relay_list: bool, /// Optional NIP-01 profile metadata (kind 0) to publish at startup. pub profile_metadata: Option, - /// CEP-22 oversized payload transfer configuration. Disabled by default. + /// CEP-22 oversized payload transfer configuration. Enabled by default. pub oversized_transfer: OversizedTransferConfig, } @@ -1076,6 +1077,35 @@ impl NostrServerTransport { }) } + /// CEP-22: one watchdog sweep over the per-peer reassembly engines. Reaps + /// transfers past their hard deadline — local-only (no abort frame is + /// emitted): the requester's own timeout fails the call, and late frames + /// are orphan-ignored — then drops now-empty receivers so long-gone peers + /// stop pinning LRU slots (admission recreates them on demand). + async fn sweep_oversized_receivers( + oversized_receiver: &Arc>>, + ) { + let mut receivers = oversized_receiver.write().await; + let mut empty_peers: Vec = Vec::new(); + for (peer, receiver) in receivers.iter_mut() { + for token in receiver.remove_expired() { + tracing::warn!( + target: LOG_TARGET, + client_pubkey = %peer, + token = %token, + "Oversized transfer reaped by watchdog" + ); + } + if receiver.active_transfer_count() == 0 { + empty_peers.push(peer.clone()); + } + } + // Keys collected first, popped after: never mutate mid-iteration. + for peer in empty_peers { + receivers.pop(&peer); + } + } + #[allow(clippy::too_many_arguments)] async fn event_loop( relay_pool: Arc, @@ -1097,6 +1127,15 @@ impl NostrServerTransport { ) { let mut notifications = relay_pool.notifications(); + // CEP-22: receiver-side watchdog sweep. Same clamp formula as the + // client's correlation sweep; the arm is disabled entirely when the + // feature is off or the deadline is 0 (no watchdog). + let watchdog_enabled = oversized_enabled && transfer_policy.transfer_timeout_ms != 0; + let sweep_interval = (Duration::from_millis(transfer_policy.transfer_timeout_ms) / 2) + .clamp(Duration::from_secs(1), Duration::from_secs(30)); + let mut sweep_timer = + tokio::time::interval_at(tokio::time::Instant::now() + sweep_interval, sweep_interval); + loop { let notification = tokio::select! { _ = cancel.cancelled() => { @@ -1106,6 +1145,10 @@ impl NostrServerTransport { ); break; } + _ = sweep_timer.tick(), if watchdog_enabled => { + Self::sweep_oversized_receivers(&oversized_receiver).await; + continue; + } result = notifications.recv() => { match result { Ok(n) => n, @@ -1372,7 +1415,7 @@ impl NostrServerTransport { let discovered = learn_peer_capabilities(&inner_tags); session.supports_encryption |= discovered.supports_encryption; session.supports_ephemeral_encryption |= discovered.supports_ephemeral_encryption; - // CEP-22 (OD-4): snapshot the flag BEFORE the learning gate mutates + // CEP-22: snapshot the flag BEFORE the learning gate mutates // it — the very `start` frame carries the client's support tag, so // without this snapshot the first transfer would never get an `accept`. let client_already_supported = session.supports_oversized_transfer; @@ -1382,7 +1425,7 @@ impl NostrServerTransport { // CEP-22: intercept oversized-transfer frames before request // correlation/dispatch. A disabled server forwards raw progress - // notifications as before (OD-6). + // notifications as before. if oversized_enabled { if let JsonRpcMessage::Notification(ref n) = mcp_msg { if OversizedTransferReceiver::is_oversized_frame(n) { @@ -1414,14 +1457,18 @@ impl NostrServerTransport { if let JsonRpcMessage::Request(ref req) = mcp_msg { let original_id = req.id.clone(); - // Extract progress token from _meta if present. + // Extract progress token from _meta if present. String or + // number (rmcp issues numbers): without numeric acceptance + // the response eligibility gate in `send_response` never + // opens for rmcp clients. Normalized to its stringified form + // for routing and frame addressing (the wire keeps emitting + // string tokens). let progress_token = req .params .as_ref() .and_then(|p| p.get("_meta")) .and_then(|m| m.get("progressToken")) - .and_then(|t| t.as_str()) - .map(String::from); + .and_then(progress_token_string); // Duplicate into session fields (kept for backward compat). session @@ -1483,7 +1530,7 @@ impl NostrServerTransport { /// CEP-22 server inbound: process one oversized-transfer frame. /// /// Emits an `accept` on the opening frame when the client's support is not yet - /// known (OD-4), feeds the frame to this peer's reassembler, and — on the + /// known, feeds the frame to this peer's reassembler, and — on the /// `end` frame — registers a response route and dispatches the reassembled /// request as a synthetic [`IncomingRequest`] (keyed by the end frame's real /// carrying event id, collision-free against the reserved sentinels). @@ -1506,12 +1553,13 @@ impl NostrServerTransport { tx: &tokio::sync::mpsc::UnboundedSender, ) { // The outer progressToken keys the transfer (needed for accept + route). + // String or number — defensive only: every known sender stringifies + // tokens into frames. let token = frame .params .as_ref() .and_then(|p| p.get("progressToken")) - .and_then(|t| t.as_str()) - .map(String::from); + .and_then(progress_token_string); // 1. Emit `accept` on the opening frame if support is not yet known. let is_start = frame @@ -1598,7 +1646,7 @@ impl NostrServerTransport { is_encrypted, }); } - // D11: clean up locally, let the peer's own timeout fire. + // Clean up locally, let the peer's own timeout fire. Err(error) => { tracing::warn!( target: LOG_TARGET, @@ -1630,18 +1678,21 @@ impl NostrServerTransport { Err(_) => return, }; let event_id_parsed = EventId::from_hex(start_event_id).unwrap_or(EventId::all_zeros()); - let accept = - match OversizedFrame::Accept.into_progress_notification(token, ACCEPT_PROGRESS, None) { - Ok(n) => JsonRpcMessage::Notification(n), - Err(error) => { - tracing::error!( - target: LOG_TARGET, - error = %error, - "Failed to build oversized-transfer accept frame" - ); - return; - } - }; + let accept = match OversizedFrame::Accept.into_progress_notification( + token, + ACCEPT_PROGRESS, + Some("oversized request accepted"), + ) { + Ok(n) => JsonRpcMessage::Notification(n), + Err(error) => { + tracing::error!( + target: LOG_TARGET, + error = %error, + "Failed to build oversized-transfer accept frame" + ); + return; + } + }; let tags = BaseTransport::create_response_tags(&client_pk, &event_id_parsed); let base = BaseTransport { relay_pool: Arc::clone(relay_pool), @@ -2278,14 +2329,15 @@ mod tests { } #[test] - fn test_oversized_disabled_by_default() { + fn test_oversized_enabled_by_default() { let config = NostrServerTransportConfig::default(); - assert!(!config.oversized_transfer.enabled); + assert!(config.oversized_transfer.enabled); } #[test] fn test_oversized_support_tags_helper() { - let mut config = NostrServerTransportConfig::default(); + // Start from an explicit opt-out: the default is now enabled. + let mut config = NostrServerTransportConfig::default().with_oversized_enabled(false); assert!(oversized_support_tags(&config).is_empty()); config.oversized_transfer.enabled = true; let names = first_tag_values(&oversized_support_tags(&config)); diff --git a/tests/conformance_cep22_wire_format.rs b/tests/conformance_cep22_wire_format.rs new file mode 100644 index 0000000..5fa8c1c --- /dev/null +++ b/tests/conformance_cep22_wire_format.rs @@ -0,0 +1,323 @@ +//! Conformance tests: CEP-22 oversized-transfer wire format. +//! +//! A CEP-22 frame is an MCP `notifications/progress` JSON-RPC notification whose +//! `params.cvm` object carries the transfer envelope. These tests pin the exact +//! on-wire JSON — method, progress-slot layout, camelCase field names, the +//! `oversized-transfer` type tag, and the `sha256:` digest format — so the +//! serialization cannot drift from the spec and stays byte-compatible with the +//! TypeScript SDK, which emits the identical shape. +//! +//! Frames are serialized through `JsonRpcMessage` (the type both transports +//! publish, `#[serde(untagged)]`), so the asserted JSON is exactly what lands in +//! a kind-25910 event's `content`. No transport or I/O — pure serialization. + +use serde_json::Value; + +use contextvm_sdk::transport::oversized_transfer::{ + build_oversized_frames, sha256_digest, BuiltOversizedFrames, OversizedFrame, + OversizedSenderOptions, ACCEPT_PROGRESS, START_PROGRESS, +}; +use contextvm_sdk::{JsonRpcMessage, JsonRpcNotification}; + +const TOKEN: &str = "tok-1"; + +// ── helpers ─────────────────────────────────────────────────────────────────── + +/// Serialize a frame exactly as a transport publishes it. `JsonRpcMessage` is +/// `#[serde(untagged)]`, so this is the literal kind-25910 `content`. +fn wire(notif: &JsonRpcNotification) -> Value { + serde_json::to_value(JsonRpcMessage::Notification(notif.clone())) + .expect("frame must serialize to JSON") +} + +/// The `params.cvm` object of a frame's wire form. +fn cvm(notif: &JsonRpcNotification) -> Value { + wire(notif)["params"]["cvm"].clone() +} + +/// The `params.progress` slot of a frame's wire form. +fn progress(notif: &JsonRpcNotification) -> Option { + wire(notif)["params"]["progress"].as_u64() +} + +/// The canonical 3-chunk transfer of `"hello world"` (11 bytes, chunk size 4). +fn three_chunk_transfer(handshake: bool) -> BuiltOversizedFrames { + let opts = OversizedSenderOptions::new(TOKEN) + .with_chunk_size(4) + .with_accept_handshake(handshake); + build_oversized_frames("hello world", &opts).expect("build frames") +} + +/// A standalone `accept` frame on its reserved slot (the server emits this; the +/// codec's `build_oversized_frames` lays out start/chunk/end but never accept). +fn accept_frame() -> JsonRpcNotification { + OversizedFrame::Accept + .into_progress_notification(TOKEN, ACCEPT_PROGRESS, None) + .expect("build accept frame") +} + +/// A standalone `abort` frame. +fn abort_frame() -> JsonRpcNotification { + OversizedFrame::Abort { + reason: Some("peer cancelled".to_string()), + } + .into_progress_notification(TOKEN, 9, None) + .expect("build abort frame") +} + +// ── frame envelope ────────────────────────────────────────────────────────── + +#[test] +fn frame_is_a_notifications_progress_notification() { + let frames = three_chunk_transfer(false); + let w = wire(&frames.start); + + assert_eq!( + w["jsonrpc"], + Value::String("2.0".to_string()), + "frame must be JSON-RPC 2.0" + ); + assert_eq!( + w["method"], + Value::String("notifications/progress".to_string()), + "frame method must be notifications/progress" + ); + + let params = &w["params"]; + assert!(params.is_object(), "params must be an object"); + assert_eq!( + params["progressToken"], + Value::String(TOKEN.to_string()), + "params must carry the progressToken" + ); + assert!( + params["progress"].is_number(), + "params.progress must be a number" + ); + assert!(params["cvm"].is_object(), "params.cvm must be an object"); +} + +// ── cvm.type ────────────────────────────────────────────────────────────────── + +#[test] +fn every_frame_carries_cvm_type_oversized_transfer() { + let frames = three_chunk_transfer(false); + let accept = accept_frame(); + let abort = abort_frame(); + + for notif in [ + &frames.start, + &frames.chunks[0], + &frames.end, + &accept, + &abort, + ] { + assert_eq!( + cvm(notif)["type"], + Value::String("oversized-transfer".to_string()), + "every cvm object must carry type=oversized-transfer" + ); + } +} + +// ── cvm.frameType ───────────────────────────────────────────────────────────── + +#[test] +fn cvm_frame_type_discriminates_each_variant_in_lowercase() { + let frames = three_chunk_transfer(false); + + assert_eq!( + cvm(&frames.start)["frameType"], + Value::String("start".into()) + ); + assert_eq!( + cvm(&accept_frame())["frameType"], + Value::String("accept".into()) + ); + assert_eq!( + cvm(&frames.chunks[0])["frameType"], + Value::String("chunk".into()) + ); + assert_eq!(cvm(&frames.end)["frameType"], Value::String("end".into())); + assert_eq!( + cvm(&abort_frame())["frameType"], + Value::String("abort".into()) + ); + + // The discriminator key is camelCase `frameType`, never snake_case. + let start_cvm = cvm(&frames.start); + let obj = start_cvm.as_object().unwrap(); + assert!( + obj.contains_key("frameType"), + "must use camelCase frameType" + ); + assert!( + !obj.contains_key("frame_type"), + "snake_case frame_type must not appear on the wire" + ); +} + +// ── start frame fields ──────────────────────────────────────────────────────── + +#[test] +fn start_frame_fields_are_camelcase_and_correctly_typed() { + // "hello world" is 11 bytes; chunk size 4 → 3 chunks. + let c = cvm(&three_chunk_transfer(false).start); + + assert_eq!( + c["completionMode"], + Value::String("render".to_string()), + "v1 completion mode must be render" + ); + assert_eq!( + c["totalBytes"].as_u64(), + Some(11), + "totalBytes must equal the payload's UTF-8 byte length" + ); + assert_eq!( + c["totalChunks"].as_u64(), + Some(3), + "totalChunks must equal the chunk count" + ); + assert!( + c["digest"].as_str().unwrap().starts_with("sha256:"), + "digest must carry the sha256: prefix" + ); + + // Field names are camelCase, never snake_case. + let obj = c.as_object().unwrap(); + for camel in ["completionMode", "digest", "totalBytes", "totalChunks"] { + assert!( + obj.contains_key(camel), + "start cvm missing camelCase key {camel}" + ); + } + for snake in ["completion_mode", "total_bytes", "total_chunks"] { + assert!( + !obj.contains_key(snake), + "snake_case key {snake} must not appear on the wire" + ); + } +} + +// ── chunk frame fields ──────────────────────────────────────────────────────── + +#[test] +fn chunk_frames_carry_data_that_reconstructs_the_payload() { + let frames = three_chunk_transfer(false); + + let mut reassembled = String::new(); + for chunk in &frames.chunks { + let data = cvm(chunk)["data"] + .as_str() + .expect("chunk cvm.data must be a string") + .to_string(); + reassembled.push_str(&data); + } + assert_eq!( + reassembled, "hello world", + "concatenated chunk data must reproduce the exact payload" + ); +} + +// ── progress slots ──────────────────────────────────────────────────────────── + +#[test] +fn canonical_progress_slots_match_spec() { + assert_eq!(START_PROGRESS, 1, "start frame occupies progress slot 1"); + assert_eq!(ACCEPT_PROGRESS, 2, "accept frame occupies progress slot 2"); +} + +#[test] +fn progress_slots_without_handshake() { + // start=1; with no accept, chunks reuse the reserved slot 2; end = start + N + 1. + let frames = three_chunk_transfer(false); + assert_eq!(frames.chunks.len(), 3); + + assert_eq!(progress(&frames.start), Some(1)); + assert_eq!(progress(&frames.chunks[0]), Some(2)); + assert_eq!(progress(&frames.chunks[1]), Some(3)); + assert_eq!(progress(&frames.chunks[2]), Some(4)); + assert_eq!( + progress(&frames.end), + Some(5), + "end = start(1) + chunks(3) + 1" + ); +} + +#[test] +fn progress_slots_with_handshake() { + // start=1; accept reserved at slot 2; first chunk shifts to 3; end follows. + let frames = three_chunk_transfer(true); + assert_eq!(frames.chunks.len(), 3); + + assert_eq!(progress(&frames.start), Some(1)); + assert_eq!(progress(&accept_frame()), Some(2)); + assert_eq!(progress(&frames.chunks[0]), Some(3)); + assert_eq!(progress(&frames.chunks[1]), Some(4)); + assert_eq!(progress(&frames.chunks[2]), Some(5)); + assert_eq!(progress(&frames.end), Some(6), "end follows the last chunk"); +} + +// ── digest format ───────────────────────────────────────────────────────────── + +#[test] +fn digest_is_sha256_prefix_plus_lowercase_hex() { + let digest = cvm(&three_chunk_transfer(false).start)["digest"] + .as_str() + .unwrap() + .to_string(); + + assert!(digest.starts_with("sha256:"), "must start with sha256:"); + let hex = &digest["sha256:".len()..]; + assert_eq!(hex.len(), 64, "SHA-256 hex must be 64 characters"); + assert!( + hex.chars() + .all(|ch| ch.is_ascii_hexdigit() && !ch.is_ascii_uppercase()), + "digest hex must be lowercase" + ); +} + +#[test] +fn digest_is_over_the_full_payload_not_per_chunk() { + // Known-answer vector: SHA-256("abc"). Even fragmented into 3 single-byte + // chunks, the start digest is the hash of the WHOLE payload. + let abc = build_oversized_frames( + "abc", + &OversizedSenderOptions::new(TOKEN).with_chunk_size(1), + ) + .expect("build frames"); + assert_eq!( + abc.chunks.len(), + 3, + "chunk size 1 must yield 3 chunks for abc" + ); + assert_eq!( + cvm(&abc.start)["digest"].as_str(), + Some("sha256:ba7816bf8f01cfea414140de5dae2223b00361a396177a9cb410ff61f20015ad"), + "digest must be SHA-256 of the full payload \"abc\", independent of chunking" + ); + + // For an arbitrary multi-chunk payload, the start digest equals the hash of + // the payload reconstructed from the chunk data (i.e. the full message). + let payload = "the quick brown fox jumps"; + let frames = build_oversized_frames( + payload, + &OversizedSenderOptions::new(TOKEN).with_chunk_size(5), + ) + .expect("build frames"); + let reconstructed: String = frames + .chunks + .iter() + .map(|c| cvm(c)["data"].as_str().unwrap().to_string()) + .collect(); + assert_eq!( + reconstructed, payload, + "chunks must reconstruct the payload" + ); + assert_eq!( + cvm(&frames.start)["digest"].as_str(), + Some(sha256_digest(payload).as_str()), + "digest must hash the full payload" + ); +} diff --git a/tests/e2e_happy_path.rs b/tests/e2e_happy_path.rs index 505470e..a6003ca 100644 --- a/tests/e2e_happy_path.rs +++ b/tests/e2e_happy_path.rs @@ -318,14 +318,14 @@ async fn run_e2e_scenario(mode: EncryptionMode) { // ── Tests ───────────────────────────────────────────────────────────────── -// NOTE: EncryptionMode::Disabled is intentionally NOT tested here. -// MockRelayPool broadcasts all events to all receivers, including the publisher. -// In Disabled mode (plaintext kind 25910), the server receives its own responses -// and the RMCP handler rejects them. In encrypted modes, the server naturally -// can't decrypt events gift-wrapped for the client, so they're filtered out. -// Real Nostr relays do not echo events back to the publisher, so this is a -// mock-only limitation. Disabled mode is tested at the transport level in -// tests/transport_integration.rs. +#[tokio::test(flavor = "multi_thread", worker_threads = 2)] +async fn e2e_happy_path_encryption_disabled() { + // Plaintext full-stack works since MockRelayPool started respecting + // per-subscription filters (commit a0f9413): the server's own responses + // are p-tagged to the client, so they no longer echo back into the + // server's subscription as they did with the old broadcast-to-all mock. + run_e2e_scenario(EncryptionMode::Disabled).await; +} #[tokio::test(flavor = "multi_thread", worker_threads = 2)] async fn e2e_happy_path_encryption_optional() { diff --git a/tests/oversized_timeout_e2e.rs b/tests/oversized_timeout_e2e.rs new file mode 100644 index 0000000..4cf5996 --- /dev/null +++ b/tests/oversized_timeout_e2e.rs @@ -0,0 +1,1045 @@ +//! CEP-22: rmcp-level oversized-transfer timeout e2e tests. +//! +//! Covers progress-aware idle/max-total timeout semantics, stripped-progress +//! forwarding with token-type restoration, watchdog reaping, and the +//! default-on roundtrip. +//! +//! Declared in `Cargo.toml` with `required-features = ["rmcp", "test-utils"]` +//! (same as `e2e_happy_path`) so plain `cargo test` skips it and stays green. + +use std::sync::Arc; +use std::time::Duration; + +use async_trait::async_trait; +use contextvm_sdk::core::constants::CTXVM_MESSAGES_KIND; +use contextvm_sdk::core::types::EncryptionMode; +use contextvm_sdk::relay::mock::MockRelayPool; +use contextvm_sdk::transport::base::BaseTransport; +use contextvm_sdk::transport::client::{NostrClientTransport, NostrClientTransportConfig}; +use contextvm_sdk::transport::oversized_transfer::{ + build_oversized_frames, OversizedFrame, OversizedSenderOptions, OversizedTransferConfig, +}; +use contextvm_sdk::transport::server::{NostrServerTransport, NostrServerTransportConfig}; +use contextvm_sdk::{ + progress_aware_options, JsonRpcMessage, JsonRpcNotification, JsonRpcRequest, JsonRpcResponse, + PeerRequestOptionsExt, RelayPoolTrait, +}; +use nostr_sdk::prelude::*; +use rmcp::handler::server::router::tool::ToolRouter; +use rmcp::handler::server::wrapper::Parameters; +use rmcp::model::{ + CallToolRequestParams, CallToolResult, Content, ErrorData, Implementation, RawContent, + ServerCapabilities, +}; +use rmcp::service::ServiceError; +use rmcp::{schemars, tool, tool_handler, tool_router, ClientHandler, ServerHandler, ServiceExt}; + +// ── harness ────────────────────────────────────────────────────────────────── + +/// Let spawned event loops call `notifications()` before we publish anything. +async fn let_event_loops_start() { + tokio::time::sleep(Duration::from_millis(10)).await; +} + +/// A started client transport over `client_pool` with the given config — plus +/// its message receiver. +async fn start_client_with( + client_pool: MockRelayPool, + config: NostrClientTransportConfig, +) -> ( + NostrClientTransport, + tokio::sync::mpsc::UnboundedReceiver, +) { + let mut client = NostrClientTransport::with_relay_pool( + config, + Arc::new(client_pool) as Arc, + ) + .await + .expect("create client transport"); + let rx = client.take_message_receiver().expect("client rx"); + client.start().await.expect("client start"); + let_event_loops_start().await; + (client, rx) +} + +/// [`start_client_with`] for the common case: plaintext, default timeouts, +/// the given oversized config. +async fn start_client( + client_pool: MockRelayPool, + server_pubkey: &PublicKey, + oversized: OversizedTransferConfig, +) -> ( + NostrClientTransport, + tokio::sync::mpsc::UnboundedReceiver, +) { + start_client_with( + client_pool, + NostrClientTransportConfig::default() + .with_relay_urls(vec!["wss://mock.relay".to_string()]) + .with_server_pubkey(server_pubkey.to_hex()) + .with_encryption_mode(EncryptionMode::Disabled) + .with_oversized_transfer(oversized), + ) + .await +} + +/// A plaintext `BaseTransport` over the (greybox) server's pool, used to +/// hand-publish frames to the client. +fn greybox_server_base(server_pool: &Arc) -> BaseTransport { + BaseTransport { + relay_pool: Arc::clone(server_pool) as Arc, + encryption_mode: EncryptionMode::Disabled, + is_connected: true, + } +} + +/// Publish one frame as a plaintext kind-25910 event with the given tags. +async fn publish_frame( + base: &BaseTransport, + recipient: &PublicKey, + tags: &[Tag], + frame: JsonRpcNotification, +) { + base.send_mcp_message( + &JsonRpcMessage::Notification(frame), + recipient, + CTXVM_MESSAGES_KIND, + tags.to_vec(), + Some(false), + None, + ) + .await + .expect("publish frame"); +} + +/// Poll the shared store until an event matching `pred` lands; return its id. +async fn poll_for_event( + pool: &MockRelayPool, + what: &str, + pred: impl Fn(&Event) -> bool, +) -> EventId { + for _ in 0..200 { + if let Some(event) = pool + .stored_events() + .await + .iter() + .find(|e| e.kind == Kind::Custom(CTXVM_MESSAGES_KIND) && pred(e)) + { + return event.id; + } + tokio::time::sleep(Duration::from_millis(10)).await; + } + panic!("{what} never reached the relay store"); +} + +/// Count stored kind-25910 events carrying an oversized `cvm` frame. +async fn count_oversized_frames(pool: &MockRelayPool) -> usize { + pool.stored_events() + .await + .iter() + .filter(|e| e.kind == Kind::Custom(CTXVM_MESSAGES_KIND)) + .filter(|e| { + serde_json::from_str::(&e.content) + .ok() + .is_some_and(|v| v.get("params").and_then(|p| p.get("cvm")).is_some()) + }) + .count() +} + +/// `true` when the event carries an oversized frame of the given `frameType`. +fn is_frame_of_type(event: &Event, frame_type: &str) -> bool { + serde_json::from_str::(&event.content) + .ok() + .and_then(|v| { + v.get("params") + .and_then(|p| p.get("cvm")) + .and_then(|c| c.get("frameType")) + .and_then(|f| f.as_str().map(|f| f == frame_type)) + }) + .unwrap_or(false) +} + +/// Receive the next client message within `ms`, panicking on timeout/close. +async fn recv_within( + rx: &mut tokio::sync::mpsc::UnboundedReceiver, + ms: u64, + what: &str, +) -> JsonRpcMessage { + tokio::time::timeout(Duration::from_millis(ms), rx.recv()) + .await + .unwrap_or_else(|_| panic!("timed out waiting for {what}")) + .expect("client channel closed") +} + +/// Drain client messages for up to `ms`, returning the first *response* and +/// skipping stripped progress forwards. `None` when the window closes first. +async fn try_recv_response( + rx: &mut tokio::sync::mpsc::UnboundedReceiver, + ms: u64, +) -> Option { + let deadline = tokio::time::Instant::now() + Duration::from_millis(ms); + loop { + match tokio::time::timeout_at(deadline, rx.recv()).await { + Err(_) => return None, + Ok(None) => panic!("client channel closed"), + Ok(Some(msg)) if msg.is_response() => return Some(msg), + Ok(Some(_)) => continue, + } + } +} + +/// Assert `msg` is a stripped progress forward: right method, token restored +/// to `expected_token`, the expected `progress` slot, and no `cvm` payload. +fn assert_stripped_forward( + msg: JsonRpcMessage, + expected_token: &serde_json::Value, + expected_progress: u64, +) { + let JsonRpcMessage::Notification(n) = msg else { + panic!("expected a stripped progress notification"); + }; + assert_eq!(n.method, "notifications/progress"); + let params = n.params.expect("forwarded progress has params"); + assert_eq!( + ¶ms["progressToken"], expected_token, + "token must be restored to the original JSON value, got {params}" + ); + assert_eq!(params["progress"], serde_json::json!(expected_progress)); + assert!( + params.get("cvm").is_none(), + "cvm payload must be stripped, got {params}" + ); +} + +// ── rmcp full-stack fixtures ───────────────────────────────────────────────── + +/// Wraps a pool so every publish lands `publish_delay` apart — deterministic +/// inter-frame gaps for an oversized response (à la `transport_integration`'s +/// `TestRelayPool::with_publish_delay`). +struct DelayedRelayPool { + inner: Arc, + publish_delay: Duration, +} + +#[async_trait] +impl RelayPoolTrait for DelayedRelayPool { + async fn connect(&self, relay_urls: &[String]) -> contextvm_sdk::Result<()> { + self.inner.connect(relay_urls).await + } + + async fn disconnect(&self) -> contextvm_sdk::Result<()> { + self.inner.disconnect().await + } + + async fn publish_event(&self, event: &Event) -> contextvm_sdk::Result { + tokio::time::sleep(self.publish_delay).await; + self.inner.publish_event(event).await + } + + async fn publish(&self, builder: EventBuilder) -> contextvm_sdk::Result { + tokio::time::sleep(self.publish_delay).await; + self.inner.publish(builder).await + } + + async fn sign(&self, builder: EventBuilder) -> contextvm_sdk::Result { + self.inner.sign(builder).await + } + + async fn signer(&self) -> contextvm_sdk::Result> { + self.inner.signer().await + } + + fn notifications(&self) -> tokio::sync::broadcast::Receiver { + self.inner.notifications() + } + + async fn public_key(&self) -> contextvm_sdk::Result { + self.inner.public_key().await + } + + async fn subscribe(&self, filters: Vec) -> contextvm_sdk::Result<()> { + self.inner.subscribe(filters).await + } + + async fn publish_to( + &self, + urls: &[String], + builder: EventBuilder, + ) -> contextvm_sdk::Result { + self.inner.publish_to(urls, builder).await + } + + async fn fetch_events( + &self, + filters: Vec, + timeout: Duration, + ) -> contextvm_sdk::Result> { + self.inner.fetch_events(filters, timeout).await + } +} + +#[derive(Debug, serde::Deserialize, schemars::JsonSchema)] +struct BigParams { + len: usize, +} + +/// rmcp server with one tool: `big(len)` returns a `len`-byte text, driving +/// the response over the oversized threshold on demand. +#[derive(Clone)] +struct BigServer { + tool_router: ToolRouter, +} + +impl BigServer { + fn new() -> Self { + Self { + tool_router: Self::tool_router(), + } + } +} + +#[tool_router] +impl BigServer { + #[tool(description = "Return a text payload of `len` bytes")] + fn big( + &self, + Parameters(BigParams { len }): Parameters, + ) -> Result { + Ok(CallToolResult::success(vec![Content::text( + "B".repeat(len), + )])) + } +} + +#[tool_handler] +impl ServerHandler for BigServer { + fn get_info(&self) -> rmcp::model::ServerInfo { + rmcp::model::ServerInfo::new(ServerCapabilities::builder().enable_tools().build()) + .with_server_info(Implementation::new("oversized-e2e-server", "0.1.0")) + } +} + +#[derive(Clone, Default)] +struct DemoClient; +impl ClientHandler for DemoClient {} + +fn first_text(result: &CallToolResult) -> String { + result + .content + .iter() + .find_map(|c| match &c.raw { + RawContent::Text(t) => Some(t.text.clone()), + _ => None, + }) + .unwrap_or_default() +} + +fn call_params(name: &'static str, args: serde_json::Value) -> CallToolRequestParams { + let mut params = CallToolRequestParams::new(name); + if let Ok(v) = serde_json::from_value(args) { + params = params.with_arguments(v); + } + params +} + +/// Greybox: wait for the rmcp client's `tools/call` request event; return its +/// id (for response e-tagging) and its `_meta.progressToken` in wire-string +/// form (rmcp stamps JSON numbers; frames carry the stringified token). +async fn wait_for_rmcp_request(pool: &MockRelayPool) -> (EventId, String) { + for _ in 0..500 { + for event in pool.stored_events().await { + if event.kind != Kind::Custom(CTXVM_MESSAGES_KIND) { + continue; + } + let Ok(v) = serde_json::from_str::(&event.content) else { + continue; + }; + if v.get("method").and_then(|m| m.as_str()) != Some("tools/call") { + continue; + } + let token = match v + .get("params") + .and_then(|p| p.get("_meta")) + .and_then(|m| m.get("progressToken")) + { + Some(serde_json::Value::Number(n)) => n.to_string(), + Some(serde_json::Value::String(s)) => s.clone(), + _ => continue, + }; + return (event.id, token); + } + tokio::time::sleep(Duration::from_millis(10)).await; + } + panic!("rmcp tools/call request never reached the relay store"); +} + +/// Assert the requester published a `notifications/cancelled` carrying +/// `reason` (the fork's timeout-cancel publication). +async fn assert_cancelled_with_reason(pool: &MockRelayPool, reason: &str) { + for _ in 0..200 { + for event in pool.stored_events().await { + if event.kind != Kind::Custom(CTXVM_MESSAGES_KIND) { + continue; + } + let Ok(v) = serde_json::from_str::(&event.content) else { + continue; + }; + if v.get("method").and_then(|m| m.as_str()) == Some("notifications/cancelled") + && v.get("params") + .and_then(|p| p.get("reason")) + .and_then(|r| r.as_str()) + == Some(reason) + { + return; + } + } + tokio::time::sleep(Duration::from_millis(10)).await; + } + panic!("no notifications/cancelled with reason {reason:?} observed"); +} + +/// Greybox harness for the stalled/trickle timeout tests: a stateless rmcp +/// client (initialize emulated locally, so the test-driven "server" only ever +/// sees `tools/call`), with the +/// `call_tool_with_options` future spawned and timed from issue to settle. +async fn spawn_greybox_call( + client_pool: MockRelayPool, + server_pubkey: &PublicKey, + idle: Duration, + max_total: Duration, +) -> tokio::task::JoinHandle<(Result, Duration)> { + let client_transport = NostrClientTransport::with_relay_pool( + NostrClientTransportConfig::default() + .with_relay_urls(vec!["wss://mock.relay".to_string()]) + .with_server_pubkey(server_pubkey.to_hex()) + .with_encryption_mode(EncryptionMode::Disabled) + .with_stateless(true) + .with_oversized_transfer(OversizedTransferConfig::enabled()), + Arc::new(client_pool) as Arc, + ) + .await + .expect("create client transport"); + + let client = DemoClient + .serve(client_transport) + .await + .expect("client init (stateless)"); + + tokio::spawn(async move { + let started = tokio::time::Instant::now(); + let result = client + .peer() + .call_tool_with_options( + call_params("big", serde_json::json!({ "len": 1 })), + progress_aware_options(idle, max_total), + ) + .await; + // Keep the running service alive until the call settles. + drop(client); + (result, started.elapsed()) + }) +} + +// ── harness smoke test ─────────────────────────────────────────────────────── + +/// Smoke test pinning the target's harness wiring: feature gates plus a linked +/// mock relay pair with distinct signing identities. +#[test] +fn harness_mock_relay_pair_is_linked() { + let (client_pool, server_pool) = MockRelayPool::create_pair(); + assert_ne!( + client_pool.mock_public_key(), + server_pool.mock_public_key(), + "paired mock pools must have distinct signing identities" + ); +} + +// ── stripped-progress forwarding ───────────────────────────────────────────── + +/// Inbound start/chunk frames of an oversized response are forwarded to the +/// local consumer as stripped progress notifications whose `progressToken` is +/// restored to the JSON **number** recorded at send time (the wire carries the +/// stringified token, as every real sender emits); the `end` frame yields only +/// the reassembled response. +#[tokio::test(flavor = "multi_thread", worker_threads = 2)] +async fn oversized_progress_token_restored() { + let (client_pool, server_pool) = MockRelayPool::create_pair(); + let server_pubkey = server_pool.mock_public_key(); + let client_pubkey = client_pool.mock_public_key(); + let server_pool = Arc::new(server_pool); + + let (client, mut client_rx) = start_client( + client_pool, + &server_pubkey, + OversizedTransferConfig::enabled(), + ) + .await; + + // Request with the JSON number 7 as its token — the rmcp shape. The + // transport records the original keyed by "7". Small request → single event. + let request = JsonRpcMessage::Request(JsonRpcRequest { + jsonrpc: "2.0".to_string(), + id: serde_json::json!("e4-1"), + method: "tools/call".to_string(), + params: Some(serde_json::json!({ "_meta": { "progressToken": 7 } })), + }); + client.send(&request).await.expect("send request"); + + // Greybox server: pick up the request event for response correlation. + let request_event_id = poll_for_event(&server_pool, "request event", |e| { + serde_json::from_str::(&e.content) + .ok() + .is_some_and(|v| v.get("method").and_then(|m| m.as_str()) == Some("tools/call")) + }) + .await; + + // A 3-chunk response whose frames carry the realistic WIRE-STRING token + // "7" (`into_progress_notification` stringifies; TS does the same). + let response = JsonRpcMessage::Response(JsonRpcResponse { + jsonrpc: "2.0".to_string(), + id: serde_json::json!("e4-1"), + result: serde_json::json!({ "blob": "Z".repeat(90) }), + }); + let serialized = serde_json::to_string(&response).unwrap(); + let frames = build_oversized_frames( + &serialized, + &OversizedSenderOptions::new("7").with_chunk_size(serialized.len().div_ceil(3)), + ) + .unwrap(); + assert_eq!(frames.chunks.len(), 3, "harness wants exactly 3 chunks"); + + let base = greybox_server_base(&server_pool); + let tags = BaseTransport::create_response_tags(&client_pubkey, &request_event_id); + for frame in frames.into_ordered() { + publish_frame(&base, &client_pubkey, &tags, frame).await; + } + + // Forward scope: start + each chunk (progress slots 1..=4, no handshake), + // each restored to Number(7) — a verbatim wire clone would surface + // String("7") and fail here. + for expected_progress in 1u64..=4 { + let msg = recv_within(&mut client_rx, 1000, "stripped progress forward").await; + assert_stripped_forward(msg, &serde_json::json!(7), expected_progress); + } + + // The end frame yields exactly the reassembled response… + let msg = recv_within(&mut client_rx, 1000, "reassembled response").await; + assert!(msg.is_response()); + assert_eq!(msg.id(), Some(&serde_json::json!("e4-1"))); + + let extra = tokio::time::timeout(Duration::from_millis(150), client_rx.recv()).await; + assert!( + extra.is_err(), + "no extra forward expected for the end frame" + ); +} + +/// The server's `accept` for a client upload forwards exactly one stripped +/// progress notification (re-arming the requester's idle timer for the +/// response-wait phase), with the original JSON-number token restored from the +/// recorded value, and the blocked oversized send completes. +#[tokio::test(flavor = "multi_thread", worker_threads = 2)] +async fn accept_frame_forwards_one_progress_reset() { + let (client_pool, server_pool) = MockRelayPool::create_pair(); + let server_pubkey = server_pool.mock_public_key(); + let client_pubkey = client_pool.mock_public_key(); + let server_pool = Arc::new(server_pool); + + // Low threshold so a ~2 KB request fragments; server support is unknown + // (no discovery yet) so the send blocks on the accept handshake. + let (client, mut client_rx) = start_client( + client_pool, + &server_pubkey, + OversizedTransferConfig::enabled() + .with_threshold(600) + .with_chunk_size(600), + ) + .await; + let client = Arc::new(client); + + let request = JsonRpcMessage::Request(JsonRpcRequest { + jsonrpc: "2.0".to_string(), + id: serde_json::json!("e5-1"), + method: "tools/call".to_string(), + params: Some(serde_json::json!({ + "_meta": { "progressToken": 8 }, + "blob": "Q".repeat(2000), + })), + }); + let send_task = { + let client = Arc::clone(&client); + tokio::spawn(async move { client.send(&request).await }) + }; + + // Greybox server: wait for the start frame, echo `accept` e-tagged to it. + // The wire token is the stringified echo — exactly what a real server + // emits (`into_progress_notification`). + let start_event_id = poll_for_event(&server_pool, "start frame", |e| { + is_frame_of_type(e, "start") + }) + .await; + let accept = OversizedFrame::Accept + .into_progress_notification("8", 2, None) + .expect("build accept frame"); + let tags = BaseTransport::create_response_tags(&client_pubkey, &start_event_id); + publish_frame( + &greybox_server_base(&server_pool), + &client_pubkey, + &tags, + accept, + ) + .await; + + // The blocked send unblocks and completes the upload. + send_task + .await + .expect("send task join") + .expect("oversized send completes after accept"); + + // Exactly one stripped forward surfaced for the accept (slot 2), token + // restored to the original JSON number. + let msg = recv_within(&mut client_rx, 1000, "accept progress forward").await; + assert_stripped_forward(msg, &serde_json::json!(8), 2); + + let extra = tokio::time::timeout(Duration::from_millis(150), client_rx.recv()).await; + assert!( + extra.is_err(), + "exactly one forward expected for the accept handshake" + ); +} + +// ── receiver-side watchdog ─────────────────────────────────────────────────── + +/// The client watchdog reaps a transfer stalled past its hard deadline; the +/// late remainder of the transfer is orphan-ignored (nothing delivered), and a +/// fresh transfer re-using the reaped token completes — proving the slot was +/// actually freed (duplicate-`start` would error if state lingered). +/// +/// Timing: 200 ms deadline, 1 s sweep tick (correlation TTL 2 s → clamp +/// floor), 3 s stall = 3 sweep ticks and 15× the deadline. +#[tokio::test(flavor = "multi_thread", worker_threads = 2)] +async fn watchdog_reaps_stalled_inbound_transfer_client() { + let (client_pool, server_pool) = MockRelayPool::create_pair(); + let server_pubkey = server_pool.mock_public_key(); + let client_pubkey = client_pool.mock_public_key(); + let server_pool = Arc::new(server_pool); + + let (_client, mut client_rx) = start_client_with( + client_pool, + NostrClientTransportConfig::default() + .with_relay_urls(vec!["wss://mock.relay".to_string()]) + .with_server_pubkey(server_pubkey.to_hex()) + .with_encryption_mode(EncryptionMode::Disabled) + .with_timeout(Duration::from_secs(2)) + .with_oversized_transfer( + OversizedTransferConfig::enabled().with_transfer_timeout_ms(200), + ), + ) + .await; + + // An unsolicited 3-chunk inbound message keyed by token "w6" (delivery + // does not require a pending request; frames only need the client p-tag). + let payload = JsonRpcMessage::Response(JsonRpcResponse { + jsonrpc: "2.0".to_string(), + id: serde_json::json!("w6-1"), + result: serde_json::json!({ "blob": "Y".repeat(90) }), + }); + let serialized = serde_json::to_string(&payload).unwrap(); + let build = || { + build_oversized_frames( + &serialized, + &OversizedSenderOptions::new("w6").with_chunk_size(serialized.len().div_ceil(3)), + ) + .expect("build frames") + }; + + let base = greybox_server_base(&server_pool); + let tags = BaseTransport::create_recipient_tags(&client_pubkey); + + // start + first chunk, then stall past the deadline and ≥ 3 sweep ticks. + let mut stalled = build().into_ordered(); + let rest = stalled.split_off(2); + for frame in stalled { + publish_frame(&base, &client_pubkey, &tags, frame).await; + } + tokio::time::sleep(Duration::from_secs(3)).await; + + // The late remainder hits a reaped slot: orphan-ignored, nothing surfaces. + for frame in rest { + publish_frame(&base, &client_pubkey, &tags, frame).await; + } + assert!( + try_recv_response(&mut client_rx, 500).await.is_none(), + "a reaped transfer must never deliver a message" + ); + + // The same token is re-admittable; a fresh full transfer completes. + for frame in build().into_ordered() { + publish_frame(&base, &client_pubkey, &tags, frame).await; + } + let delivered = try_recv_response(&mut client_rx, 1500) + .await + .expect("fresh same-token transfer must deliver after the reap"); + assert_eq!(delivered.id(), Some(&serde_json::json!("w6-1"))); +} + +/// Mirror of the client watchdog test against the server transport — its sweep +/// arm reaps the stalled per-peer transfer (and drops the now-empty receiver), +/// the late remainder is orphan-ignored, and a fresh same-token upload +/// reassembles into an `IncomingRequest` on the server receiver. +#[tokio::test(flavor = "multi_thread", worker_threads = 2)] +async fn watchdog_reaps_stalled_inbound_transfer_server() { + let (client_pool, server_pool) = MockRelayPool::create_pair(); + let server_pubkey = server_pool.mock_public_key(); + let server_pool = Arc::new(server_pool); + + // 200 ms hard deadline → 1 s sweep tick (clamp floor). + let mut server = NostrServerTransport::with_relay_pool( + NostrServerTransportConfig::default() + .with_encryption_mode(EncryptionMode::Disabled) + .with_oversized_transfer( + OversizedTransferConfig::enabled().with_transfer_timeout_ms(200), + ), + Arc::clone(&server_pool) as Arc, + ) + .await + .expect("create server transport"); + let mut server_rx = server.take_message_receiver().expect("server rx"); + server.start().await.expect("server start"); + let_event_loops_start().await; + + // Greybox client upload: handshake-layout frames (the server emits an + // accept to the unknown client; the test ignores it). + let request = JsonRpcMessage::Request(JsonRpcRequest { + jsonrpc: "2.0".to_string(), + id: serde_json::json!("w7-1"), + method: "tools/call".to_string(), + params: Some(serde_json::json!({ + "_meta": { "progressToken": "w7" }, + "blob": "Q".repeat(300), + })), + }); + let serialized = serde_json::to_string(&request).unwrap(); + let build = || { + build_oversized_frames( + &serialized, + &OversizedSenderOptions::new("w7") + .with_chunk_size(96) + .with_accept_handshake(true), + ) + .expect("build frames") + }; + + let base = BaseTransport { + relay_pool: Arc::new(client_pool) as Arc, + encryption_mode: EncryptionMode::Disabled, + is_connected: true, + }; + let tags = BaseTransport::create_recipient_tags(&server_pubkey); + + // start + first chunk, then stall past the deadline and ≥ 3 sweep ticks. + let mut stalled = build().into_ordered(); + let rest = stalled.split_off(2); + for frame in stalled { + publish_frame(&base, &server_pubkey, &tags, frame).await; + } + tokio::time::sleep(Duration::from_secs(3)).await; + + // Late remainder → reaped slot → orphan-ignored; no request surfaces. + for frame in rest { + publish_frame(&base, &server_pubkey, &tags, frame).await; + } + let nothing = tokio::time::timeout(Duration::from_millis(500), server_rx.recv()).await; + assert!( + nothing.is_err(), + "a reaped transfer must never deliver a request" + ); + + // Fresh same-token upload completes (the swept-empty receiver was dropped + // and is recreated on admission). + for frame in build().into_ordered() { + publish_frame(&base, &server_pubkey, &tags, frame).await; + } + let incoming = tokio::time::timeout(Duration::from_millis(1500), server_rx.recv()) + .await + .expect("fresh same-token transfer must deliver after the reap") + .expect("server channel closed"); + assert_eq!(incoming.message.method(), Some("tools/call")); + assert_eq!(incoming.message.id(), Some(&serde_json::json!("w7-1"))); +} + +// ── progress-aware request timeouts ────────────────────────────────────────── + +/// Full-stack: a `call_tool_with_options` whose oversized response outlasts the +/// idle timeout several times over still succeeds, because every forwarded +/// chunk resets the idle timer. Without the forwarding seam this exact call +/// fails with `Timeout{400ms}`. +/// +/// Timing: 150 ms publish delay vs 400 ms idle (≥ 2.5×); ~240 KB payload = ≥ 5 +/// chunks at the 48 000 B default chunk size, pinned above the 65 535 B +/// single-event NIP-44 cap so an unfragmented path cannot pass. +#[tokio::test(flavor = "multi_thread", worker_threads = 2)] +async fn oversized_response_progress_resets_idle_timeout() { + let (client_pool, server_pool) = MockRelayPool::create_pair(); + let server_pubkey = server_pool.mock_public_key(); + let server_pool = Arc::new(server_pool); + + // The server publishes through a 150 ms-delay pool: response frames land + // one delay apart, so the full transfer takes ≥ 5 × 150 ms > idle. + let delayed: Arc = Arc::new(DelayedRelayPool { + inner: Arc::clone(&server_pool), + publish_delay: Duration::from_millis(150), + }); + let server_transport = NostrServerTransport::with_relay_pool( + NostrServerTransportConfig::default() + .with_encryption_mode(EncryptionMode::Disabled) + .with_oversized_transfer(OversizedTransferConfig::enabled()), + delayed, + ) + .await + .expect("create server transport"); + + let client_transport = NostrClientTransport::with_relay_pool( + NostrClientTransportConfig::default() + .with_relay_urls(vec!["wss://mock.relay".to_string()]) + .with_server_pubkey(server_pubkey.to_hex()) + .with_encryption_mode(EncryptionMode::Disabled) + .with_oversized_transfer(OversizedTransferConfig::enabled()), + Arc::new(client_pool) as Arc, + ) + .await + .expect("create client transport"); + + let server_handle = tokio::spawn(async move { + let running = BigServer::new() + .serve(server_transport) + .await + .expect("server serve failed"); + let _ = running.waiting().await; + }); + tokio::time::sleep(Duration::from_millis(20)).await; + + let client = tokio::time::timeout(Duration::from_secs(10), DemoClient.serve(client_transport)) + .await + .expect("client startup timed out") + .expect("client init failed"); + + let result = client + .peer() + .call_tool_with_options( + call_params("big", serde_json::json!({ "len": 240_000 })), + progress_aware_options(Duration::from_millis(400), Duration::from_secs(10)), + ) + .await + .expect("oversized call must succeed via per-chunk idle resets"); + assert_eq!(first_text(&result).len(), 240_000); + + // Fragmentation actually happened (not a single-event fluke). + let frames = count_oversized_frames(&server_pool).await; + assert!(frames >= 5, "expected ≥5 oversized frames, got {frames}"); + + server_handle.abort(); +} + +/// Greybox, paused time: start + 2 chunks then silence — the idle timer, reset +/// by each forwarded frame, trips ~idle after the LAST chunk (not after request +/// issue), with `Timeout{timeout == idle}` and a published +/// `notifications/cancelled` carrying the idle reason. +#[tokio::test(start_paused = true)] +async fn oversized_stalled_transfer_trips_idle_timeout() { + let (client_pool, server_pool) = MockRelayPool::create_pair(); + let server_pubkey = server_pool.mock_public_key(); + let client_pubkey = client_pool.mock_public_key(); + let server_pool = Arc::new(server_pool); + + let idle = Duration::from_millis(400); + let call = spawn_greybox_call(client_pool, &server_pubkey, idle, Duration::from_secs(10)).await; + + let (request_event_id, wire_token) = wait_for_rmcp_request(&server_pool).await; + + // A 5-chunk response; publish start + 2 chunks 200 ms apart, then stall. + let payload = JsonRpcMessage::Response(JsonRpcResponse { + jsonrpc: "2.0".to_string(), + id: serde_json::json!(0), + result: serde_json::json!({ "blob": "S".repeat(200) }), + }); + let serialized = serde_json::to_string(&payload).unwrap(); + let frames = build_oversized_frames( + &serialized, + &OversizedSenderOptions::new(&wire_token).with_chunk_size(serialized.len().div_ceil(5)), + ) + .expect("build frames"); + let base = greybox_server_base(&server_pool); + let tags = BaseTransport::create_response_tags(&client_pubkey, &request_event_id); + for frame in frames.into_ordered().into_iter().take(3) { + publish_frame(&base, &client_pubkey, &tags, frame).await; + tokio::time::sleep(Duration::from_millis(200)).await; + } + + let (result, elapsed) = call.await.expect("join call task"); + match result { + Err(ServiceError::Timeout { timeout }) => assert_eq!(timeout, idle, "idle timer fired"), + other => panic!("expected idle Timeout, got {other:?}"), + } + // Two 200 ms-spaced resets pushed expiry well past issue+idle: the error + // lands ~idle after the LAST frame (≈ 0.6 s publish phase + 0.4 s idle). + assert!( + elapsed >= Duration::from_millis(700), + "resets must precede expiry; elapsed {elapsed:?}" + ); + assert_cancelled_with_reason(&server_pool, "request timeout").await; +} + +/// Greybox, paused time: chunks trickle every 150 ms — idle (400 ms) never +/// fires, but the max-total cap (1 s) does, despite continuous progress, with +/// `Timeout{timeout == max_total}` and the +/// max-total cancellation reason. +#[tokio::test(start_paused = true)] +async fn oversized_trickle_trips_max_total_timeout() { + let (client_pool, server_pool) = MockRelayPool::create_pair(); + let server_pubkey = server_pool.mock_public_key(); + let client_pubkey = client_pool.mock_public_key(); + let server_pool = Arc::new(server_pool); + + let idle = Duration::from_millis(400); + let max_total = Duration::from_secs(1); + let call = spawn_greybox_call(client_pool, &server_pubkey, idle, max_total).await; + + let (request_event_id, wire_token) = wait_for_rmcp_request(&server_pool).await; + + // ≥ 12 chunks available; publish start + chunks every 150 ms — the + // trickle outlasts max_total while every gap stays under idle. + let payload = JsonRpcMessage::Response(JsonRpcResponse { + jsonrpc: "2.0".to_string(), + id: serde_json::json!(0), + result: serde_json::json!({ "blob": "T".repeat(500) }), + }); + let serialized = serde_json::to_string(&payload).unwrap(); + let frames = build_oversized_frames( + &serialized, + &OversizedSenderOptions::new(&wire_token).with_chunk_size(40), + ) + .expect("build frames"); + let base = greybox_server_base(&server_pool); + let tags = BaseTransport::create_response_tags(&client_pubkey, &request_event_id); + for frame in frames.into_ordered().into_iter().take(10) { + publish_frame(&base, &client_pubkey, &tags, frame).await; + tokio::time::sleep(Duration::from_millis(150)).await; + } + + let (result, elapsed) = call.await.expect("join call task"); + match result { + Err(ServiceError::Timeout { timeout }) => { + assert_eq!(timeout, max_total, "max-total timer fired") + } + other => panic!("expected max-total Timeout, got {other:?}"), + } + // Fired at ~max_total despite continuous progress (publishes kept going + // past it; the call settled on its own cap). + assert!( + elapsed >= Duration::from_millis(900) && elapsed <= Duration::from_millis(1400), + "max-total should cap the call at ~1 s; elapsed {elapsed:?}" + ); + assert_cancelled_with_reason(&server_pool, "maximum total timeout exceeded").await; +} + +// ── default-on e2e roundtrip ───────────────────────────────────────────────── + +/// `true` when the (plaintext) event carries a tag whose name is `name`. +fn event_has_tag(event: &Event, name: &str) -> bool { + event + .tags + .iter() + .any(|t| t.clone().to_vec().first().map(String::as_str) == Some(name)) +} + +/// With **no oversized config at all** — both transports on +/// defaults — a > 65 535-byte tool result roundtrips through the full rmcp +/// stack. That size exceeds the single-event NIP-44 plaintext cap, so the +/// roundtrip succeeding proves the server fragmented, which it only does after +/// learning support from the client's default-advertised tag. Capability +/// discovery is asserted on the wire (both directions); the transport's +/// `discovered_server_capabilities()` accessor itself is consumed by +/// `serve()`, and its transport-level behavior is already pinned by +/// `oversized_response_roundtrip_server_to_client`. +#[tokio::test(flavor = "multi_thread", worker_threads = 2)] +async fn oversized_default_on_e2e_roundtrip() { + let (client_pool, server_pool) = MockRelayPool::create_pair(); + let server_pubkey = server_pool.mock_public_key(); + let client_pubkey = client_pool.mock_public_key(); + let server_pool = Arc::new(server_pool); + + // Default configs: oversized transfer untouched (default-on is the point). + // Plaintext mode so the stored events' discovery tags are inspectable. + let server_transport = NostrServerTransport::with_relay_pool( + NostrServerTransportConfig::default().with_encryption_mode(EncryptionMode::Disabled), + Arc::clone(&server_pool) as Arc, + ) + .await + .expect("create server transport"); + let client_transport = NostrClientTransport::with_relay_pool( + NostrClientTransportConfig::default() + .with_relay_urls(vec!["wss://mock.relay".to_string()]) + .with_server_pubkey(server_pubkey.to_hex()) + .with_encryption_mode(EncryptionMode::Disabled), + Arc::new(client_pool) as Arc, + ) + .await + .expect("create client transport"); + + let server_handle = tokio::spawn(async move { + let running = BigServer::new() + .serve(server_transport) + .await + .expect("server serve failed"); + let _ = running.waiting().await; + }); + tokio::time::sleep(Duration::from_millis(20)).await; + + let client = tokio::time::timeout(Duration::from_secs(10), DemoClient.serve(client_transport)) + .await + .expect("client startup timed out") + .expect("client init failed"); + + // Plain `call_tool` — no helper, no options: the pure default experience. + // (External timeout only bounds the test; plain calls have none.) + let result = tokio::time::timeout( + Duration::from_secs(15), + client.call_tool(call_params("big", serde_json::json!({ "len": 80_000 }))), + ) + .await + .expect("default-on oversized roundtrip timed out") + .expect("call_tool failed"); + assert_eq!( + first_text(&result).len(), + 80_000, + "the >65 535-byte payload must roundtrip intact" + ); + + // Fragmentation actually happened (a single event cannot carry it). + let frames = count_oversized_frames(&server_pool).await; + assert!( + frames >= 3, + "expected at least start+chunk+end cvm frames, got {frames}" + ); + + // Both sides advertised by default: the client's first request and the + // server's first response each carried the discovery tag — the learning + // inputs for the response-fragmentation gate and the client capability + // snapshot respectively. + let events = server_pool.stored_events().await; + assert!( + events + .iter() + .any(|e| e.pubkey == client_pubkey && event_has_tag(e, "support_oversized_transfer")), + "client's first request must advertise support_oversized_transfer" + ); + assert!( + events + .iter() + .any(|e| e.pubkey == server_pubkey && event_has_tag(e, "support_oversized_transfer")), + "server's first response must advertise support_oversized_transfer" + ); + + server_handle.abort(); +} diff --git a/tests/transport_integration.rs b/tests/transport_integration.rs index 55fdbea..cdada83 100644 --- a/tests/transport_integration.rs +++ b/tests/transport_integration.rs @@ -3650,7 +3650,7 @@ async fn server_close_stops_event_loop() { ); } -// ── CEP-22 PR 3 (M3): oversized transfer end-to-end wiring ─────────────────── +// ── CEP-22: oversized transfer end-to-end wiring ───────────────────────────── /// C→S: a request whose serialized size exceeds the threshold is fragmented into /// multiple kind-25910 frames and reassembled into exactly one IncomingRequest. @@ -3726,6 +3726,143 @@ async fn oversized_request_roundtrip_client_to_server() { assert!(second.is_err(), "only one reassembled request expected"); } +/// CEP-22 parity (TS T3.1): one-shot discovery-tag placement across an oversized +/// *request*'s frames. The client advertises `support_oversized_transfer` on the +/// `start` frame only; the continuation (`chunk`/`end`) frames carry the bare +/// recipient tags. Encryption is Disabled so the plaintext frame tags are directly +/// inspectable — and `support_oversized_transfer` is advertised independently of +/// encryption mode (unlike `support_encryption`; see `get_client_capability_tags`), +/// so Disabled is a valid mode for this assertion. +#[tokio::test(flavor = "multi_thread", worker_threads = 2)] +async fn oversized_continuation_frames_carry_no_discovery_tags() { + let (client_pool, server_pool) = MockRelayPool::create_pair(); + let server_pubkey = server_pool.mock_public_key(); + let server_pool = Arc::new(server_pool); + + let oversized = OversizedTransferConfig::enabled() + .with_threshold(6000) + .with_chunk_size(6000); + + let mut server = NostrServerTransport::with_relay_pool( + NostrServerTransportConfig::default() + .with_encryption_mode(EncryptionMode::Disabled) + .with_oversized_transfer(oversized.clone()), + Arc::clone(&server_pool) as Arc, + ) + .await + .expect("create server transport"); + + let mut client = NostrClientTransport::with_relay_pool( + NostrClientTransportConfig::default() + .with_relay_urls(vec!["wss://mock.relay".to_string()]) + .with_server_pubkey(server_pubkey.to_hex()) + .with_encryption_mode(EncryptionMode::Disabled) + .with_oversized_transfer(oversized), + as_pool(client_pool), + ) + .await + .expect("create client transport"); + + let mut server_rx = server.take_message_receiver().expect("server rx"); + server.start().await.expect("server start"); + client.start().await.expect("client start"); + let_event_loops_start().await; + + // The client's first (and only) request is oversized, so the one-shot discovery + // tags ride its `start` frame. + let request = JsonRpcMessage::Request(JsonRpcRequest { + jsonrpc: "2.0".to_string(), + id: serde_json::json!("disco-1"), + method: "tools/call".to_string(), + params: Some(serde_json::json!({ + "_meta": { "progressToken": "tok-disco" }, + "blob": "A".repeat(10000), + })), + }); + client.send(&request).await.expect("send oversized request"); + + // Confirm the transfer reassembled (so every frame has landed in the store). + let incoming = tokio::time::timeout(Duration::from_millis(1000), server_rx.recv()) + .await + .expect("timeout waiting for reassembled request") + .expect("server channel closed"); + assert_eq!(incoming.message.id(), Some(&serde_json::json!("disco-1"))); + + // Classify every cvm-bearing frame in the store by its `cvm.frameType`. The + // client authors start/chunk/end; the server's `accept` frame matches none of + // the three and is naturally excluded. + let events = server_pool.stored_events().await; + let mut start_frames: Vec<&Event> = Vec::new(); + let mut chunk_frames: Vec<&Event> = Vec::new(); + let mut end_frames: Vec<&Event> = Vec::new(); + for e in events + .iter() + .filter(|e| e.kind == Kind::Custom(CTXVM_MESSAGES_KIND)) + { + let frame_type = serde_json::from_str::(&e.content) + .ok() + .as_ref() + .and_then(|v| v.get("params")) + .and_then(|p| p.get("cvm")) + .and_then(|c| c.get("frameType")) + .and_then(|f| f.as_str()) + .map(str::to_string); + match frame_type.as_deref() { + Some("start") => start_frames.push(e), + Some("chunk") => chunk_frames.push(e), + Some("end") => end_frames.push(e), + _ => {} + } + } + + assert_eq!( + start_frames.len(), + 1, + "expected exactly one start frame, got {}", + start_frames.len() + ); + assert!( + !chunk_frames.is_empty(), + "expected at least one chunk frame for a fragmented request" + ); + assert_eq!( + end_frames.len(), + 1, + "expected exactly one end frame, got {}", + end_frames.len() + ); + + // The start frame advertises oversized support… + assert!( + has_tag_name(start_frames[0], tags::SUPPORT_OVERSIZED_TRANSFER), + "the start frame must carry support_oversized_transfer" + ); + // …and every continuation frame (chunk/end) must NOT repeat it. + for &frame in chunk_frames.iter().chain(end_frames.iter()) { + assert!( + !has_tag_name(frame, tags::SUPPORT_OVERSIZED_TRANSFER), + "continuation frames must not carry support_oversized_transfer" + ); + } +} + +/// Receive the next non-notification client message, skipping the stripped +/// progress forwards that precede an oversized response (their shape is pinned +/// by `oversized_progress_token_restored` in `oversized_timeout_e2e.rs`). +async fn recv_skipping_progress_forwards( + rx: &mut tokio::sync::mpsc::UnboundedReceiver, +) -> JsonRpcMessage { + loop { + let msg = tokio::time::timeout(Duration::from_millis(1000), rx.recv()) + .await + .expect("timeout waiting for client message") + .expect("client channel closed"); + if !matches!(msg, JsonRpcMessage::Notification(_)) { + return msg; + } + } +} + /// S→C: a large response is fragmented by the server and reassembled by the /// client into a single response delivered on its receiver. #[tokio::test(flavor = "multi_thread", worker_threads = 2)] @@ -3790,11 +3927,9 @@ async fn oversized_response_roundtrip_server_to_client() { .await .expect("server send oversized response"); - // The client reassembles and delivers the full response. - let client_msg = tokio::time::timeout(Duration::from_millis(1000), client_rx.recv()) - .await - .expect("timeout waiting for reassembled response") - .expect("client channel closed"); + // The client reassembles and delivers the full response (preceded by + // stripped progress forwards, skipped here). + let client_msg = recv_skipping_progress_forwards(&mut client_rx).await; assert!(client_msg.is_response()); assert_eq!(client_msg.id(), Some(&serde_json::json!("rsp-1"))); if let JsonRpcMessage::Response(r) = client_msg { @@ -3803,8 +3938,8 @@ async fn oversized_response_roundtrip_server_to_client() { panic!("expected a response"); } - // Test gap 6: the start frame of the oversized response carried the server's - // discovery tags, so the client must have learned the server supports oversized + // The start frame of the oversized response carried the server's discovery + // tags, so the client must have learned the server supports oversized // transfer by the time the reassembled response is delivered. assert!( client @@ -3814,7 +3949,203 @@ async fn oversized_response_roundtrip_server_to_client() { ); } -// ── CEP-22 PR 3 (M4): remaining §7 oversized-transfer tests ────────────────── +// ── CEP-22: number-or-string progressToken plumbing ────────────────────────── + +/// Collect the `params` of every cvm-bearing (oversized-transfer) frame stored +/// in `pool`. +async fn stored_oversized_frame_params(pool: &MockRelayPool) -> Vec { + pool.stored_events() + .await + .iter() + .filter(|e| e.kind == Kind::Custom(CTXVM_MESSAGES_KIND)) + .filter_map(|e| serde_json::from_str::(&e.content).ok()) + .filter_map(|v| v.get("params").cloned()) + .filter(|p| p.get("cvm").is_some()) + .collect() +} + +/// C→S: rmcp stamps *numeric* progressTokens; a request carrying one must +/// still fragment (string-only extraction returned `None` and silently fell +/// back to the single-event path). On the wire every frame carries the +/// stringified token; the reassembled request preserves the original number. +#[tokio::test(flavor = "multi_thread", worker_threads = 2)] +async fn oversized_request_roundtrip_numeric_token() { + let (client_pool, server_pool) = MockRelayPool::create_pair(); + let server_pubkey = server_pool.mock_public_key(); + let server_pool = Arc::new(server_pool); + + let oversized = OversizedTransferConfig::enabled() + .with_threshold(6000) + .with_chunk_size(6000); + + let mut server = NostrServerTransport::with_relay_pool( + NostrServerTransportConfig::default() + .with_encryption_mode(EncryptionMode::Disabled) + .with_oversized_transfer(oversized.clone()), + Arc::clone(&server_pool) as Arc, + ) + .await + .expect("create server transport"); + + let mut client = NostrClientTransport::with_relay_pool( + NostrClientTransportConfig::default() + .with_relay_urls(vec!["wss://mock.relay".to_string()]) + .with_server_pubkey(server_pubkey.to_hex()) + .with_encryption_mode(EncryptionMode::Disabled) + .with_oversized_transfer(oversized), + as_pool(client_pool), + ) + .await + .expect("create client transport"); + + let mut server_rx = server.take_message_receiver().expect("server rx"); + server.start().await.expect("server start"); + client.start().await.expect("client start"); + let_event_loops_start().await; + + // The token is the JSON number 7 — the shape rmcp emits. + let request = JsonRpcMessage::Request(JsonRpcRequest { + jsonrpc: "2.0".to_string(), + id: serde_json::json!("big-num-1"), + method: "tools/call".to_string(), + params: Some(serde_json::json!({ + "_meta": { "progressToken": 7 }, + "blob": "A".repeat(10000), + })), + }); + client.send(&request).await.expect("send oversized request"); + + // Fragmentation must have happened, and every frame (start/chunks/end plus + // the server's accept echo) must address the transfer by the *stringified* + // token — numbers never appear on the wire. + let frame_params = stored_oversized_frame_params(&server_pool).await; + assert!( + frame_params.len() > 1, + "numeric-token oversized request must publish multiple frames, got {}", + frame_params.len() + ); + for params in &frame_params { + assert_eq!( + params.get("progressToken"), + Some(&serde_json::json!("7")), + "frames must carry the stringified token, got {params}" + ); + } + + // Exactly one reassembled request arrives, original numeric token intact. + let incoming = tokio::time::timeout(Duration::from_millis(1000), server_rx.recv()) + .await + .expect("timeout waiting for reassembled request") + .expect("server channel closed"); + assert_eq!(incoming.message.method(), Some("tools/call")); + assert_eq!(incoming.message.id(), Some(&serde_json::json!("big-num-1"))); + if let JsonRpcMessage::Request(r) = &incoming.message { + assert_eq!( + r.params.as_ref().unwrap()["_meta"]["progressToken"], + serde_json::json!(7), + "reassembly must reproduce the original numeric token" + ); + } else { + panic!("expected a request"); + } + + let second = tokio::time::timeout(Duration::from_millis(100), server_rx.recv()).await; + assert!(second.is_err(), "only one reassembled request expected"); +} + +/// S→C: a request whose `_meta.progressToken` is numeric must still open +/// the server's response fragmentation gate (string-only extraction left +/// `route.progress_token = None`, so responses to rmcp clients never +/// fragmented). Response frames carry the stringified token on the wire. +#[tokio::test(flavor = "multi_thread", worker_threads = 2)] +async fn oversized_response_roundtrip_numeric_token() { + let (client_pool, server_pool) = MockRelayPool::create_pair(); + let server_pubkey = server_pool.mock_public_key(); + let server_pool = Arc::new(server_pool); + + let oversized = OversizedTransferConfig::enabled() + .with_threshold(6000) + .with_chunk_size(6000); + + let mut server = NostrServerTransport::with_relay_pool( + NostrServerTransportConfig::default() + .with_encryption_mode(EncryptionMode::Disabled) + .with_oversized_transfer(oversized.clone()), + Arc::clone(&server_pool) as Arc, + ) + .await + .expect("create server transport"); + + let mut client = NostrClientTransport::with_relay_pool( + NostrClientTransportConfig::default() + .with_relay_urls(vec!["wss://mock.relay".to_string()]) + .with_server_pubkey(server_pubkey.to_hex()) + .with_encryption_mode(EncryptionMode::Disabled) + .with_oversized_transfer(oversized), + as_pool(client_pool), + ) + .await + .expect("create client transport"); + + let mut server_rx = server.take_message_receiver().expect("server rx"); + let mut client_rx = client.take_message_receiver().expect("client rx"); + server.start().await.expect("server start"); + client.start().await.expect("client start"); + let_event_loops_start().await; + + // Small request (single event) carrying the JSON number 7 as its token. + let request = JsonRpcMessage::Request(JsonRpcRequest { + jsonrpc: "2.0".to_string(), + id: serde_json::json!("rsp-num-1"), + method: "tools/call".to_string(), + params: Some(serde_json::json!({ "_meta": { "progressToken": 7 } })), + }); + client.send(&request).await.expect("send request"); + + let incoming = tokio::time::timeout(Duration::from_millis(1000), server_rx.recv()) + .await + .expect("timeout waiting for request") + .expect("server channel closed"); + + // Server replies with a payload well over the threshold. + let response = JsonRpcMessage::Response(JsonRpcResponse { + jsonrpc: "2.0".to_string(), + id: serde_json::json!("rsp-num-1"), + result: serde_json::json!({ "blob": "B".repeat(10000) }), + }); + server + .send_response(&incoming.event_id, response) + .await + .expect("server send oversized response"); + + // The client reassembles and delivers the full response (preceded by + // stripped progress forwards, skipped here). + let client_msg = recv_skipping_progress_forwards(&mut client_rx).await; + assert!(client_msg.is_response()); + assert_eq!(client_msg.id(), Some(&serde_json::json!("rsp-num-1"))); + if let JsonRpcMessage::Response(r) = client_msg { + assert_eq!(r.result["blob"], serde_json::json!("B".repeat(10000))); + } else { + panic!("expected a response"); + } + + // The response was actually fragmented, addressed by the stringified token. + let frame_params = stored_oversized_frame_params(&server_pool).await; + assert!( + frame_params.len() > 1, + "numeric-token response must fragment, got {} frames", + frame_params.len() + ); + for params in &frame_params { + assert_eq!( + params.get("progressToken"), + Some(&serde_json::json!("7")), + "response frames must carry the stringified token, got {params}" + ); + } +} + +// ── CEP-22: remaining oversized-transfer tests ─────────────────────────────── /// Start an oversized-enabled server over `server_pool`, returning the live /// transport (keep alive) and its request receiver. @@ -4176,8 +4507,13 @@ async fn oversized_gate_off_sends_single_event() { .with_relay_urls(vec!["wss://mock.relay".to_string()]) .with_server_pubkey(server_pubkey.to_hex()) .with_encryption_mode(EncryptionMode::Disabled) - // Disabled gate; threshold present only to prove it is above-threshold. - .with_oversized_transfer(OversizedTransferConfig::default().with_threshold(256)), + // Explicitly disabled gate (the default flipped to enabled); + // threshold present only to prove it is above-threshold. + .with_oversized_transfer( + OversizedTransferConfig::default() + .with_enabled(false) + .with_threshold(256), + ), as_pool(client_pool), ) .await @@ -4378,7 +4714,7 @@ async fn oversized_decision_accounts_for_encryption_inflation() { ); } -// ── CEP-22 PR 3 gap 4: server accept-frame emission shape ──────────────────── +// ── CEP-22: server accept-frame emission shape ─────────────────────────────── /// Poll the shared event store for the server's emitted `accept` frame (a /// plaintext kind-25910 notification authored by the server whose `cvm.frameType` @@ -4464,4 +4800,9 @@ async fn server_emits_accept_frame_with_expected_shape() { Some("tok-acc"), "accept frame must echo the transfer's progressToken" ); + assert_eq!( + params["message"].as_str(), + Some("oversized request accepted"), + "accept frame must carry the TS-matching human-readable message" + ); }