Merged
6 changes: 6 additions & 0 deletions Cargo.toml
@@ -70,7 +70,13 @@ required-features = ["rmcp", "test-utils"]
name = "transport_integration"
required-features = ["test-utils"]

[[test]]
name = "oversized_timeout_e2e"
required-features = ["rmcp", "test-utils"]

[dev-dependencies]
# `start_paused` deterministic-time tests (tokio's "full" does not include test-util).
tokio = { version = "1", features = ["test-util"] }
tokio-test = "0.4"
anyhow = "1"
schemars = "0.8"
8 changes: 8 additions & 0 deletions README.md
@@ -206,6 +206,12 @@ The in-repo Rust SDK guides live in [`docs/README.md`](docs/README.md):
Encryption uses **NIP-44** for payload encryption and **NIP-59** (Gift Wrap) for
metadata-private delivery. Server announcements (kinds 11316–11320) are always public.

Messages too large for a single relay event are fragmented into ordered frames
and reassembled by the receiver (CEP-22 oversized transfer, enabled by default;
it adds no event kind — frames ride inside `notifications/progress` messages).
See [docs/oversized-transfer.md](docs/oversized-transfer.md) for the timeout
model and tuning.

### Server Transport Config

| Field | Default | Description |
@@ -221,6 +227,7 @@ metadata-private delivery. Server announcements (kinds 11316–11320) are always
| `bootstrap_relay_urls` | `None` | Additional relays for publishing announcements (CEP-6/17) |
| `publish_relay_list` | `true` | Whether to publish kind 10002 relay list metadata |
| `profile_metadata` | `None` | Profile metadata for kind 0 publication (CEP-23) |
| `oversized_transfer` | enabled | CEP-22 oversized payload transfer config ([guide](docs/oversized-transfer.md)) |

### Client Transport Config

@@ -233,6 +240,7 @@ metadata-private delivery. Server announcements (kinds 11316–11320) are always
| `timeout` | `30s` | Response timeout |
| `discovery_relay_urls` | `None` (bootstrap relays) | Relays for CEP-17 kind 10002 discovery |
| `fallback_operational_relay_urls` | `None` | Relays probed in parallel with CEP-17 discovery |
| `oversized_transfer` | enabled | CEP-22 oversized payload transfer config ([guide](docs/oversized-transfer.md)) |

When `relay_urls` is empty, `start()` runs automatic relay resolution: configured relays > nprofile hints > CEP-17 kind 10002 discovery > fallback probing > bootstrap defaults.

1 change: 1 addition & 0 deletions docs/README.md
@@ -22,6 +22,7 @@ For most native Rust applications, the primary entry points are `NostrServerTran
- Encryption guide: plaintext, encrypted, and gift-wrap behavior
- Stateless mode guide: client-side initialize emulation and when to use it
- Discovery guide: public discovery helpers and event kinds
- Oversized transfer guide: CEP-22 fragmentation, the three-timer model, and progress-aware request options

### Bridging existing MCP applications

128 changes: 128 additions & 0 deletions docs/oversized-transfer.md
@@ -0,0 +1,128 @@
# Oversized Transfer Guide (CEP-22)

Nostr relays cap event sizes (commonly around 64 KiB on the wire, with a
65,535-byte NIP-44 plaintext ceiling for encrypted payloads), while MCP
results — file contents, tool output, resource reads — routinely exceed that.
CEP-22 closes the gap: a JSON-RPC message whose published form would not fit
in a single event is split into an ordered sequence of frames carried inside
MCP `notifications/progress` events, then reassembled and validated
(byte-length + SHA-256) by the receiver before it surfaces as one ordinary
message. Fragmentation is transparent to MCP consumers in both directions:
client→server requests and server→client responses.

## Enabled by default

Oversized transfer is **enabled by default** (matching the TypeScript SDK).
The negotiation gates make this safe with peers that don't support it:

- the **client** fragments only requests that carry a `progressToken` (rmcp
stamps one into every outgoing request), and advertises
`support_oversized_transfer` on its first message;
- the **server** fragments responses only for clients that advertised the
capability, and advertises it on announcements and its first response.

A peer without CEP-22 support just sees one extra discovery tag. To opt out:

```rust
// Whole-config form:
let config = NostrClientTransportConfig::default()
    .with_oversized_enabled(false);
// Same builder exists on NostrServerTransportConfig.
```

## Configuration

`OversizedTransferConfig` is attached to both transport configs via
`with_oversized_transfer(..)` (or the `with_oversized_enabled(..)` shorthand):

| Field | Default | Description |
|----------------------------|--------------|-----------------------------------------------------------------------------|
| `enabled` | `true` | Master gate: advertise + activate the capability |
| `threshold` | `48_000` | Published byte size at/above which the sender fragments |
| `chunk_size` | `48_000` | Upper bound on per-chunk payload bytes (shrunk automatically so every published frame stays under `threshold`) |
| `max_transfer_bytes` | `104_857_600` (100 MiB) | Receiver cap on a reassembled payload |
| `max_transfer_chunks` | `10_000` | Receiver cap on chunk count |
| `max_concurrent_transfers` | `64` | Receiver cap on simultaneously active transfers |
| `transfer_timeout_ms` | `300_000` | Receiver-side hard deadline per transfer, from admission; `0` disables the watchdog |
| `max_out_of_order_window` | `21` | How far ahead of the contiguous frontier a chunk may arrive and still be buffered |
| `max_out_of_order_chunks` | `42` | Cap on buffered out-of-order chunks |
| `accept_timeout_ms` | `30_000` | How long an uploading client waits for the server's `accept` handshake |
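
The two out-of-order limits in the table can be pictured with a small receiver-side buffer. The following is a minimal sketch, not the SDK's implementation: the `Reassembly` type, the `Accept` enum, and the exact comparison rules (a chunk is rejected when it is more than `max_window` positions ahead of the frontier, or when `max_pending` chunks are already buffered) are assumptions made for illustration. It also omits the byte-length and SHA-256 validation that the real receiver performs.

```rust
use std::collections::BTreeMap;

/// What happened to a chunk offered to the buffer (hypothetical type).
#[derive(Debug, PartialEq)]
enum Accept {
    Appended,
    Buffered,
    Duplicate,
    Rejected,
}

/// Hypothetical reassembly state for one transfer.
struct Reassembly {
    next_index: usize,                 // contiguous frontier: next chunk index we can append
    assembled: Vec<u8>,                // bytes appended in order so far
    pending: BTreeMap<usize, Vec<u8>>, // chunks that arrived ahead of the frontier
    max_window: usize,                 // stands in for `max_out_of_order_window`
    max_pending: usize,                // stands in for `max_out_of_order_chunks`
}

impl Reassembly {
    fn new(max_window: usize, max_pending: usize) -> Self {
        Self {
            next_index: 0,
            assembled: Vec::new(),
            pending: BTreeMap::new(),
            max_window,
            max_pending,
        }
    }

    fn push(&mut self, index: usize, chunk: Vec<u8>) -> Accept {
        // Already appended, or already waiting in the buffer.
        if index < self.next_index || self.pending.contains_key(&index) {
            return Accept::Duplicate;
        }
        if index > self.next_index {
            // Ahead of the frontier: buffer only within both limits.
            let too_far = index - self.next_index > self.max_window;
            let buffer_full = self.pending.len() >= self.max_pending;
            if too_far || buffer_full {
                return Accept::Rejected;
            }
            self.pending.insert(index, chunk);
            return Accept::Buffered;
        }
        // Exactly at the frontier: append, then drain any now-contiguous chunks.
        self.assembled.extend_from_slice(&chunk);
        self.next_index += 1;
        while let Some(next) = self.pending.remove(&self.next_index) {
            self.assembled.extend_from_slice(&next);
            self.next_index += 1;
        }
        Accept::Appended
    }
}
```

With the default limits (`21` and `42`), a chunk with index 21 arriving before chunk 0 would be buffered under these assumed rules, while index 22 would be rejected.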

The decision to fragment is made on the **final published event size**
(signed, JSON-escaped, and gift-wrapped when encryption is on), so `threshold`
is a real wire budget, not a payload-length heuristic.
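
To make the `threshold` / `chunk_size` relationship concrete, here is a simplified sketch of the sender-side decision and split. The helper names (`should_fragment`, `split_into_chunks`, `reassemble`) are hypothetical and not part of the SDK. The sketch works on raw payload bytes only: it does not model the signing, JSON-escaping, or gift-wrap overhead that the real decision accounts for, and it checks only the byte length on reassembly, not the SHA-256 digest.

```rust
/// Fragment when the published size is at or above the threshold
/// (the "at/above" rule from the table).
fn should_fragment(published_len: usize, threshold: usize) -> bool {
    published_len >= threshold
}

/// Split a payload into ordered chunks of at most `chunk_size` bytes.
fn split_into_chunks(payload: &[u8], chunk_size: usize) -> Vec<&[u8]> {
    assert!(chunk_size > 0, "chunk_size must be non-zero");
    payload.chunks(chunk_size).collect()
}

/// Join chunks in order and accept the result only if the total length
/// matches what the sender declared.
fn reassemble(chunks: &[&[u8]], expected_len: usize) -> Option<Vec<u8>> {
    let joined = chunks.concat();
    (joined.len() == expected_len).then_some(joined)
}
```

For example, a 100,000-byte payload with `chunk_size = 48_000` yields three chunks of 48,000, 48,000, and 4,000 bytes. In the real transport the per-chunk size may be smaller, since it is shrunk so that each published frame stays under `threshold`.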

## The three timers

Three independent timers govern a transfer; knowing who owns each one makes
timeout behavior predictable:

1. **Requester idle timeout** (rmcp, per request — opt-in). Fails the call if
no progress arrives for `idle`. The transports forward every inbound
transfer frame to the requester as a plain progress notification, so a
*live* transfer resets this timer chunk by chunk while a *stalled* one
fails after `idle`.
2. **Requester max-total timeout** (rmcp, per request — opt-in). Hard cap on
the whole call regardless of progress — a trickling transfer cannot hold a
request open forever.
3. **Receiver watchdog** (`transfer_timeout_ms`, transport-owned). A hard
memory bound on inbound reassembly state, measured from `start` admission
and never refreshed by activity. Reaping is local-only — no abort frame is
emitted; the requester's own timers fail the other side. A reaped token is
re-admittable by a fresh `start`.
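
The interaction of the three timers can be explored with a small time-free model. This is an illustrative sketch, not rmcp's or the transport's code: `simulate_request`, `watchdog_reaps`, and `Outcome` are hypothetical names, all times are offsets from the start of the request (or from admission, for the watchdog), and treating an arrival exactly at a deadline as a timeout is an assumption.

```rust
use std::time::Duration;

#[derive(Debug, PartialEq)]
enum Outcome {
    Completed,
    IdleTimeout,
    MaxTotalTimeout,
}

/// Requester-side model of timers 1 and 2. `progress_at` holds ascending
/// offsets at which progress notifications arrive; `done_at` is the offset
/// at which the final response arrives.
fn simulate_request(
    progress_at: &[Duration],
    done_at: Duration,
    idle: Duration,
    max_total: Duration,
) -> Outcome {
    let mut last_activity = Duration::ZERO;
    let arrivals = progress_at
        .iter()
        .copied()
        .filter(|&t| t < done_at)
        .chain(std::iter::once(done_at));
    for t in arrivals {
        // The idle deadline moves with every arrival; max_total never moves.
        let idle_deadline = last_activity + idle;
        if t >= idle_deadline.min(max_total) {
            return if idle_deadline < max_total {
                Outcome::IdleTimeout
            } else {
                Outcome::MaxTotalTimeout
            };
        }
        last_activity = t;
    }
    Outcome::Completed
}

/// Receiver-side model of timer 3: measured from admission, never refreshed,
/// and disabled when `transfer_timeout_ms` is 0.
fn watchdog_reaps(since_admission: Duration, transfer_timeout_ms: u64) -> bool {
    transfer_timeout_ms != 0 && since_admission >= Duration::from_millis(transfer_timeout_ms)
}
```

With `idle = 60 s` and `max_total = 300 s`, a transfer that delivers a chunk every 50 s and finishes at 240 s completes, one that goes silent after 50 s fails on the idle timer, and one that keeps trickling every 50 s past the 300 s mark fails on the max-total timer.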

The first two timers exist only when you pass request options: **a plain rmcp
`call_tool` has no timeout at all** (it awaits indefinitely). Avoiding that
pitfall is the reason for the options-taking call shown in the next section.

## Recommended MCP client usage

```rust
use std::time::Duration;
use contextvm_sdk::{progress_aware_options, PeerRequestOptionsExt};

let result = running_client
    .peer()
    .call_tool_with_options(
        params,
        progress_aware_options(Duration::from_secs(60), Duration::from_secs(300)),
    )
    .await?;
```

`progress_aware_options(idle, max_total)` builds
`PeerRequestOptions::with_timeout(idle).reset_timeout_on_progress().with_max_total_timeout(max_total)`.
The defaults `DEFAULT_OVERSIZED_IDLE_TIMEOUT` (60 s) and
`DEFAULT_OVERSIZED_MAX_TOTAL_TIMEOUT` (300 s) intentionally mirror the
transport's numbers: 60 s exceeds the worst-case 30 s accept wait
(`accept_timeout_ms`), and 300 s matches the receiver watchdog
(`transfer_timeout_ms`), so both sides give up in the same window. The peer
layer cannot read transport config, so re-align them manually if you tune
`OversizedTransferConfig`.

Unlike the TypeScript SDK's low-level `client.request()` path, rmcp's timer
reset is not tied to registering an `onprogress` callback — the reset is wired
through request options alone.

For request types beyond `call_tool`, the generic form is two lines on public
rmcp API:

```rust
let handle = peer.send_cancellable_request(request, options).await?;
let response = handle.await_response().await?;
```

### Upload (client→server) caveat

A fragmented *request* receives at most **one** inbound reset: the server's
`accept` handshake frame — and it reaches the rmcp service loop only after the
whole upload send returns. Size `idle` and `max_total` to cover the full
upload duration, not just inter-frame gaps.
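
As a rough way to reason about that sizing, the sketch below estimates a lower bound for `idle` on an upload. Everything in it is an assumption for illustration: `upload_idle_budget` is not an SDK function, the per-frame publish latency is something you would have to measure against your own relays, and the formula (frame count times per-frame latency, plus the accept wait) is a simple estimate rather than a rule the SDK applies.

```rust
use std::time::Duration;

/// Hypothetical estimate of the minimum idle timeout for a fragmented upload:
/// time to publish every frame, plus the wait for the server's `accept`.
fn upload_idle_budget(
    payload_len: usize,
    chunk_size: usize,
    per_frame: Duration,
    accept_wait: Duration,
) -> Duration {
    assert!(chunk_size > 0, "chunk_size must be non-zero");
    // Ceiling division: a trailing partial chunk still costs one frame.
    let frames = (payload_len + chunk_size - 1) / chunk_size;
    let frames = u32::try_from(frames).expect("frame count fits in u32");
    per_frame * frames + accept_wait
}
```

For instance, a 1,000,000-byte upload at `chunk_size = 48_000` needs 21 frames. At an assumed 500 ms per frame and a 30 s accept wait, the estimate is 40.5 s, so the 60 s default would leave some margin, while a noticeably larger payload or a slower relay would not.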

## Synthetic progress notifications

Because transfer frames are forwarded to the requester (stripped of their
`cvm` payload, with the request's original `progressToken` restored),
consumers with a custom progress handler observe chunk-granular progress for
oversized responses — usable as transfer-progress UX. Default rmcp handlers
ignore progress for tokens they didn't register, so no action is needed if you
don't want it. Nothing extra goes on the wire; the forwarding is in-process.
7 changes: 7 additions & 0 deletions src/lib.rs
@@ -90,3 +90,10 @@ pub use transport::server::{
// ── rmcp re-export ──────────────────────────────────────────────────
#[cfg(feature = "rmcp")]
pub use rmcp;

// ── CEP-22 progress-aware request helpers ───────────────────────────
#[cfg(feature = "rmcp")]
pub use rmcp_transport::progress::{
    progress_aware_options, PeerRequestOptionsExt, DEFAULT_OVERSIZED_IDLE_TIMEOUT,
    DEFAULT_OVERSIZED_MAX_TOTAL_TIMEOUT,
};
5 changes: 5 additions & 0 deletions src/rmcp_transport/mod.rs
@@ -4,6 +4,7 @@
//! ContextVM transports plug directly into rmcp service APIs.

pub mod convert;
pub mod progress;
pub mod transport;
pub mod worker;

@@ -14,4 +15,8 @@ pub use convert::{
internal_to_rmcp_client_rx, internal_to_rmcp_server_rx, rmcp_client_tx_to_internal,
rmcp_server_tx_to_internal,
};
pub use progress::{
    progress_aware_options, PeerRequestOptionsExt, DEFAULT_OVERSIZED_IDLE_TIMEOUT,
    DEFAULT_OVERSIZED_MAX_TOTAL_TIMEOUT,
};
pub use worker::{NostrClientWorker, NostrServerWorker};
170 changes: 170 additions & 0 deletions src/rmcp_transport/progress.rs
@@ -0,0 +1,170 @@
//! CEP-22: progress-aware request options for rmcp consumers.
//!
//! Under the rmcp fork, plain [`Peer`] calls such as `call_tool` use
//! `PeerRequestOptions::no_options()` — **no timeout at all**: a stalled
//! oversized response hangs the caller forever. This module closes that gap
//! the way the TS SDK's docs do — by passing request options on the existing
//! call — via an extension trait carrying an options-taking `call_tool`
//! variant, plus a constructor for the recommended progress-aware settings.
//!
//! With CEP-22 enabled, the Nostr transports forward each inbound transfer
//! frame to the requester as a plain progress notification, so an idle
//! timeout built by [`progress_aware_options`] resets on every chunk: a live
//! transfer can run long, a stalled one fails after `idle`, and
//! `max_total` caps the call regardless of progress.
//!
//! ```ignore
//! use contextvm_sdk::{progress_aware_options, PeerRequestOptionsExt};
//!
//! let result = running_service
//!     .peer()
//!     .call_tool_with_options(
//!         params,
//!         progress_aware_options(
//!             contextvm_sdk::DEFAULT_OVERSIZED_IDLE_TIMEOUT,
//!             contextvm_sdk::DEFAULT_OVERSIZED_MAX_TOTAL_TIMEOUT,
//!         ),
//!     )
//!     .await?;
//! ```

use std::time::Duration;

use rmcp::model::{
    CallToolRequest, CallToolRequestParams, CallToolResult, ClientRequest, ServerResult,
};
use rmcp::service::{Peer, PeerRequestOptions, ServiceError};
use rmcp::RoleClient;

/// Default requester-side idle timeout for requests that may trigger a CEP-22
/// oversized transfer: 60 s.
///
/// TS parity twice over: equals the upstream TS SDK's blanket per-request
/// timeout (`DEFAULT_REQUEST_TIMEOUT_MSEC`), and exceeds the worst-case
/// inter-chunk gap including the 30 s accept wait
/// ([`DEFAULT_ACCEPT_TIMEOUT_MS`](crate::transport::oversized_transfer::DEFAULT_ACCEPT_TIMEOUT_MS)).
pub const DEFAULT_OVERSIZED_IDLE_TIMEOUT: Duration = Duration::from_secs(60);

/// Default requester-side max-total timeout: 300 s.
///
/// Aligned with the receiver-side watchdog default
/// ([`DEFAULT_TRANSFER_TIMEOUT_MS`](crate::transport::oversized_transfer::DEFAULT_TRANSFER_TIMEOUT_MS))
/// so the requester gives up in the same window the receiver reaps state —
/// symmetric failure. (Upstream TS sets no max-total default; here it stays
/// opt-in via [`progress_aware_options`], never baked into plain calls.)
pub const DEFAULT_OVERSIZED_MAX_TOTAL_TIMEOUT: Duration = Duration::from_secs(300);

/// Build the recommended [`PeerRequestOptions`] for requests whose responses
/// may arrive as CEP-22 oversized transfers: an `idle` timeout that resets on
/// every progress notification, capped by `max_total`.
///
/// Equivalent to
/// `PeerRequestOptions::with_timeout(idle).reset_timeout_on_progress().with_max_total_timeout(max_total)`.
/// Named after the mechanism (progress-aware timeouts), not the oversized use
/// case — mirroring the TS SDK, where "oversized" appears in docs but never in
/// the API surface.
///
/// Sizing notes:
/// - `reset_timeout_on_progress` without an idle timeout is a **no-op** — the
/// fork registers a progress watcher only when *both* are set, which is why
/// this constructor takes `idle` rather than making it optional.
/// - The client→server upload direction receives at most **one** inbound
/// reset (the server's `accept` handshake frame) — and it reaches the rmcp
/// service loop only after the whole upload send returns. Size `idle` and
/// `max_total` to cover the full upload duration, not just the gaps.
/// - Sensible defaults: [`DEFAULT_OVERSIZED_IDLE_TIMEOUT`] /
/// [`DEFAULT_OVERSIZED_MAX_TOTAL_TIMEOUT`]. They intentionally mirror the
/// transport's `OversizedTransferConfig` numbers but are not read from it —
/// the peer layer has no access to transport config by design; align them
/// manually if you tune the transport.
pub fn progress_aware_options(idle: Duration, max_total: Duration) -> PeerRequestOptions {
    PeerRequestOptions::with_timeout(idle)
        .reset_timeout_on_progress()
        .with_max_total_timeout(max_total)
}

/// Options-taking call variants for [`Peer<RoleClient>`] — the rs-side analog
/// of passing `RequestOptions` inline to the TS SDK's `client.callTool`.
///
/// Without these, high-level fork calls (`peer.call_tool(..)` etc.) run with
/// `PeerRequestOptions::no_options()`: **no timeout, infinite await**. Pair
/// with [`progress_aware_options`] for any call whose response may be large
/// (CEP-22 fragments every rmcp request's response once the peer advertises
/// support — rmcp stamps a progress token into every outgoing request).
///
/// For request types without a dedicated variant here, the generic path is
/// two lines on public fork API — no wrapper needed:
///
/// ```ignore
/// let handle = peer.send_cancellable_request(request, options).await?;
/// let response = handle.await_response().await?;
/// ```
///
/// On timeout the call fails with `ServiceError::Timeout { timeout }` (the
/// value identifies which timer fired: `idle` vs `max_total`) and rmcp
/// publishes a `notifications/cancelled` for the request.
pub trait PeerRequestOptionsExt {
    /// `call_tool` with explicit [`PeerRequestOptions`] — the direct analog of
    /// TS `client.callTool(params, schema, options)`.
    fn call_tool_with_options(
        &self,
        params: CallToolRequestParams,
        options: PeerRequestOptions,
    ) -> impl std::future::Future<Output = Result<CallToolResult, ServiceError>> + Send;
}

impl PeerRequestOptionsExt for Peer<RoleClient> {
    async fn call_tool_with_options(
        &self,
        params: CallToolRequestParams,
        options: PeerRequestOptions,
    ) -> Result<CallToolResult, ServiceError> {
        // Mirrors the fork's `method!` expansion for `call_tool`
        // (service/client.rs), with options threaded through
        // `send_cancellable_request` instead of the option-less default.
        let result = self
            .send_cancellable_request(
                ClientRequest::CallToolRequest(CallToolRequest::new(params)),
                options,
            )
            .await?
            .await_response()
            .await?;
        match result {
            ServerResult::CallToolResult(result) => Ok(result),
            _ => Err(ServiceError::UnexpectedResponse),
        }
    }
}

#[cfg(test)]
mod tests {
    use super::*;

    /// Pins the `progress_aware_options` wiring: the constructor must (a) set the
    /// idle `timeout`, (b) enable `reset_timeout_on_progress` — without which the
    /// fork registers no progress watcher (`send_request_with_option` only arms
    /// one when *both* the flag and a timeout are set), so inbound chunks would
    /// never reset the timer — and (c) populate `max_total_timeout`. Distinct
    /// idle/max-total values also catch an accidental argument swap in the
    /// builder chain. End-to-end behavior is covered by the timeout tests in
    /// `tests/oversized_timeout_e2e.rs`; this is the fast unit guard on the flags.
    #[test]
    fn progress_aware_options_sets_reset_and_both_timeouts() {
        let idle = Duration::from_millis(250);
        let max_total = Duration::from_secs(42);

        let options = progress_aware_options(idle, max_total);

        assert_eq!(options.timeout, Some(idle), "idle timeout must be set");
        assert!(
            options.reset_timeout_on_progress,
            "reset_timeout_on_progress must be enabled or the fork arms no progress watcher"
        );
        assert_eq!(
            options.max_total_timeout,
            Some(max_total),
            "max_total_timeout must be set"
        );
    }
}
4 changes: 2 additions & 2 deletions src/transport/client/correlation_store.rs
@@ -97,8 +97,8 @@ impl ClientCorrelationStore {
}

/// Refresh a pending request's registration timestamp without disturbing its
/// `original_id`/`is_initialize`. Used by CEP-22 oversized transfers (OD-2):
/// each inbound frame "touches" the entry so [`sweep_expired`](Self::sweep_expired)
/// `original_id`/`is_initialize`. Used by CEP-22 oversized transfers: each
/// inbound frame "touches" the entry so [`sweep_expired`](Self::sweep_expired)
/// does not evict it mid-transfer. Returns `true` if the entry existed.
pub async fn touch(&self, event_id: &str) -> bool {
let mut cache = self.pending_requests.write().await;