
Implement Server-to-Client Bidirectional Communication (Notifications, Sampling, Elicitation) #87

@avrabe

Summary

Implement full bidirectional communication support to pass the remaining 5 conformance tests:

  • tools-call-with-logging - Server sends notifications/message to client
  • tools-call-with-progress - Server sends notifications/progress to client
  • tools-call-sampling - Server makes sampling/createMessage request to client
  • tools-call-elicitation - Server makes elicitation/create request to client
  • elicitation-sep1034-defaults - Elicitation with default values

Current State

Conformance: 21/26 tests passing (81%)

The framework currently supports only client → server request/response patterns. The MCP spec also requires:

  1. Server → Client notifications (fire-and-forget)
  2. Server → Client requests (with response correlation)
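On the wire, the two patterns differ only in whether the JSON-RPC message carries an id. The following is a minimal sketch of that distinction, not framework code: the Outgoing type is hypothetical, params is assumed to be already-serialized JSON, and method names are assumed to need no escaping.

```rust
/// Hypothetical model of the two server-to-client message kinds.
pub enum Outgoing {
    /// Fire-and-forget: no `id`, so the client never replies.
    Notification { method: String, params: String },
    /// Carries an `id` that the client echoes in its response.
    Request { id: u64, method: String, params: String },
}

impl Outgoing {
    /// True when the server must keep state to correlate a reply.
    pub fn expects_response(&self) -> bool {
        matches!(self, Outgoing::Request { .. })
    }

    /// Renders the JSON-RPC envelope. `params` is inserted verbatim,
    /// so it must already be valid JSON.
    pub fn to_json(&self) -> String {
        match self {
            Outgoing::Notification { method, params } => format!(
                r#"{{"jsonrpc":"2.0","method":"{method}","params":{params}}}"#
            ),
            Outgoing::Request { id, method, params } => format!(
                r#"{{"jsonrpc":"2.0","id":{id},"method":"{method}","params":{params}}}"#
            ),
        }
    }
}
```

Only the Request variant needs the response correlation described in item 2.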

MCP Specification Requirements

1. Logging Notifications (notifications/message)

Servers must be able to send log messages during tool execution:

{
  "jsonrpc": "2.0",
  "method": "notifications/message",
  "params": {
    "level": "info",
    "logger": "tool_name",
    "data": { "message": "Processing step 1..." }
  }
}

Spec Reference: https://modelcontextprotocol.io/specification/2025-06-18/server/utilities/logging
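The logging spec uses syslog-style severity names and lets the client set a minimum level via logging/setLevel. A context implementation would probably filter before sending. A minimal sketch of that filter, assuming a LogLevel enum shaped like this one (the enum layout and the should_send helper are hypothetical, not existing framework API):

```rust
/// Severities used by MCP logging, ordered least to most severe.
/// Deriving `Ord` makes the declaration order the comparison order.
#[derive(Debug, Clone, Copy, PartialEq, Eq, PartialOrd, Ord)]
pub enum LogLevel {
    Debug,
    Info,
    Notice,
    Warning,
    Error,
    Critical,
    Alert,
    Emergency,
}

impl LogLevel {
    /// Wire name placed in the `level` field of `notifications/message`.
    pub fn as_str(self) -> &'static str {
        match self {
            LogLevel::Debug => "debug",
            LogLevel::Info => "info",
            LogLevel::Notice => "notice",
            LogLevel::Warning => "warning",
            LogLevel::Error => "error",
            LogLevel::Critical => "critical",
            LogLevel::Alert => "alert",
            LogLevel::Emergency => "emergency",
        }
    }
}

/// Hypothetical helper: true when a message at `level` meets the
/// minimum severity the client asked for.
pub fn should_send(min: LogLevel, level: LogLevel) -> bool {
    level >= min
}
```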

2. Progress Notifications (notifications/progress)

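Servers must be able to report progress during long-running operations. Progress notifications are sent only when the client included a progressToken in its request, and they echo that token: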

{
  "jsonrpc": "2.0",
  "method": "notifications/progress",
  "params": {
    "progressToken": "unique-token",
    "progress": 50,
    "total": 100
  }
}

Spec Reference: https://modelcontextprotocol.io/specification/2025-06-18/server/utilities/progress
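The progress spec requires the progress value to increase with each notification, and a notification makes sense only when the client supplied a token. A small guard can enforce both rules before anything reaches the transport. This is a sketch under those assumptions. ProgressReporter and its methods are hypothetical names, and the token is modeled as a string even though the spec also allows integer tokens.

```rust
/// Tracks progress for one request and decides whether a
/// notification should be emitted. Hypothetical helper type.
pub struct ProgressReporter {
    token: Option<String>,
    last: Option<u64>,
}

impl ProgressReporter {
    /// `token` is the progress token from the client's request, if any.
    pub fn new(token: Option<String>) -> Self {
        Self { token, last: None }
    }

    /// Returns the `(token, progress)` pair to send, or `None` when the
    /// client sent no token or the value did not increase.
    pub fn advance(&mut self, progress: u64) -> Option<(&str, u64)> {
        let token = self.token.as_deref()?;
        if matches!(self.last, Some(prev) if progress <= prev) {
            return None;
        }
        self.last = Some(progress);
        Some((token, progress))
    }
}
```

A context's send_progress could call advance first and skip the send when it returns None.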

3. Sampling Requests (sampling/createMessage)

Servers can request LLM completions from clients:

// Server → Client Request
{
  "jsonrpc": "2.0",
  "id": 1,
  "method": "sampling/createMessage",
  "params": {
    "messages": [{ "role": "user", "content": { "type": "text", "text": "Summarize this" } }],
    "maxTokens": 100
  }
}

// Client → Server Response
{
  "jsonrpc": "2.0",
  "id": 1,
  "result": {
    "role": "assistant",
    "content": { "type": "text", "text": "Here's the summary..." },
    "model": "claude-3-sonnet",
    "stopReason": "endTurn"
  }
}

Spec Reference: https://modelcontextprotocol.io/specification/2025-06-18/client/sampling

4. Elicitation Requests (elicitation/create)

Servers can request structured user input from clients:

// Server → Client Request
{
  "jsonrpc": "2.0",
  "id": 2,
  "method": "elicitation/create",
  "params": {
    "message": "Please provide your name",
    "requestedSchema": { "type": "object", "properties": { "name": { "type": "string" } } }
  }
}

Spec Reference: https://modelcontextprotocol.io/specification/2025-06-18/client/elicitation
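The client's response to elicitation/create carries an action field (accept, decline, or cancel) and, on accept, a content object with the submitted values. A minimal sketch of how ElicitationResult might model that, assuming string-only field values for brevity (the type and function names here are hypothetical):

```rust
use std::collections::BTreeMap;

/// The three outcomes defined for the `action` field.
#[derive(Debug, Clone, PartialEq)]
pub enum ElicitationOutcome {
    /// The user submitted the form; holds the submitted field values.
    Accept(BTreeMap<String, String>),
    /// The user explicitly declined to provide the information.
    Decline,
    /// The user dismissed the prompt without choosing.
    Cancel,
}

/// Hypothetical conversion from the already-decoded `action` and
/// `content` fields. Unknown actions are rejected instead of guessed.
pub fn outcome_from(
    action: &str,
    content: Option<BTreeMap<String, String>>,
) -> Result<ElicitationOutcome, String> {
    match action {
        "accept" => Ok(ElicitationOutcome::Accept(content.unwrap_or_default())),
        "decline" => Ok(ElicitationOutcome::Decline),
        "cancel" => Ok(ElicitationOutcome::Cancel),
        other => Err(format!("unknown elicitation action: {other}")),
    }
}
```

Modeling the outcome as an enum forces tool handlers to deal with decline and cancel instead of assuming the user always answers.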

Implementation Plan

Phase 1: Core Infrastructure

1.1 Define ToolContext Trait

Create a context object that tool handlers can use to communicate with clients:

// mcp-server/src/context.rs
use async_trait::async_trait;
use serde_json::Value;
// LogLevel, the request/result types, and the `Result` alias are assumed to be
// in scope from the protocol and server crates.

/// Context provided to tool handlers for bidirectional communication
#[async_trait]
pub trait ToolContext: Send + Sync {
    /// Send a log notification to the client
    async fn send_log(&self, level: LogLevel, logger: Option<&str>, data: Value) -> Result<()>;
    
    /// Send a progress notification to the client
    async fn send_progress(&self, token: &str, progress: u64, total: Option<u64>) -> Result<()>;
    
    /// Request LLM sampling from the client (awaits the client's response)
    async fn request_sampling(&self, params: CreateMessageRequest) -> Result<CreateMessageResult>;
    
    /// Request user input from the client (awaits the client's response)
    async fn request_elicitation(&self, params: ElicitationRequest) -> Result<ElicitationResult>;
    
    /// Get the progress token for this request (if provided by the client)
    fn progress_token(&self) -> Option<&str>;
}

1.2 Transport Layer Changes

Add notification/request sending capability to transports:

// mcp-transport/src/lib.rs
use std::time::Duration;

use async_trait::async_trait;
use serde_json::Value;

#[async_trait]
pub trait TransportSender: Send + Sync {
    /// Send a notification (no response expected)
    async fn send_notification(&self, method: &str, params: Value) -> Result<()>;
    
    /// Send a request and await the response, failing if `timeout` elapses first
    async fn send_request(&self, method: &str, params: Value, timeout: Duration) -> Result<Value>;
}

1.3 Session-Aware Request Handling

For the HTTP/SSE transport, notifications and server-initiated requests must be sent on the SSE stream of the session that made the tool call:

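// Track active sessions and their notification channels (tokio channels assumed)
use dashmap::DashMap;
use serde_json::Value;
use tokio::sync::{mpsc, oneshot};

pub struct SessionManager {
    /// One sender per session, feeding that session's SSE stream
    sessions: DashMap<SessionId, mpsc::Sender<Notification>>,
    /// Server-initiated requests awaiting a client response, keyed by JSON-RPC id
    pending_requests: DashMap<RequestId, oneshot::Sender<Value>>,
}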
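The pending-request map is what turns a later client response back into the return value of the original send_request call. The sketch below shows that correlation with a timeout, using only the standard library and blocking channels so it stays self-contained. A real implementation would presumably use async oneshot channels as in the struct above. PendingRequests, wait_for, and the String payload are all hypothetical stand-ins.

```rust
use std::collections::HashMap;
use std::sync::mpsc::{channel, Receiver, RecvTimeoutError, Sender};
use std::sync::Mutex;
use std::time::Duration;

/// Synchronous stand-in for the `pending_requests` map.
pub struct PendingRequests {
    next_id: Mutex<u64>,
    waiting: Mutex<HashMap<u64, Sender<String>>>,
}

impl PendingRequests {
    pub fn new() -> Self {
        Self { next_id: Mutex::new(0), waiting: Mutex::new(HashMap::new()) }
    }

    /// Allocates a fresh request id and returns it together with the
    /// receiver on which the matching response will arrive.
    pub fn register(&self) -> (u64, Receiver<String>) {
        let mut next = self.next_id.lock().unwrap();
        *next += 1;
        let id = *next;
        let (tx, rx) = channel();
        self.waiting.lock().unwrap().insert(id, tx);
        (id, rx)
    }

    /// Routes a client response to its waiter. Returns false when the id
    /// is unknown, for example because the request already timed out.
    pub fn resolve(&self, id: u64, result: String) -> bool {
        match self.waiting.lock().unwrap().remove(&id) {
            Some(tx) => tx.send(result).is_ok(),
            None => false,
        }
    }

    /// Forgets a waiter so abandoned requests do not accumulate.
    pub fn cancel(&self, id: u64) {
        self.waiting.lock().unwrap().remove(&id);
    }
}

/// Waits for the response to `id`, removing the entry if the wait fails.
pub fn wait_for(
    pending: &PendingRequests,
    id: u64,
    rx: Receiver<String>,
    timeout: Duration,
) -> Result<String, RecvTimeoutError> {
    let outcome = rx.recv_timeout(timeout);
    if outcome.is_err() {
        pending.cancel(id);
    }
    outcome
}
```

Removing the entry on timeout matters: otherwise a late or never-arriving response leaves a sender in the map for the lifetime of the session.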

Phase 2: Backend API Changes

2.1 Update McpBackend Trait

Option A: Add context parameter (breaking change)

async fn call_tool(
    &self,
    ctx: &dyn ToolContext,
    request: CallToolRequestParam,
) -> Result<CallToolResult, Self::Error>;

Option B: Use context injection via task-local storage (non-breaking)

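// Tool handlers access the context via:
let ctx = mcp_context::current();
// Report progress only when the client supplied a progress token.
if let Some(token) = ctx.progress_token() {
    ctx.send_progress(token, 50, Some(100)).await?;
}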
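The mechanics of Option B can be sketched with the standard library's thread_local! macro: install the context for the duration of a call, then restore the previous value. This is only an analogy under stated assumptions. Async tasks can move between threads, so a real implementation would need task-local storage (tokio provides a task_local! macro for this). with_context and current_token are hypothetical names, and the context is reduced to a progress token.

```rust
use std::cell::RefCell;

thread_local! {
    // Progress token of the request currently handled on this thread.
    static CURRENT_TOKEN: RefCell<Option<String>> = RefCell::new(None);
}

/// Runs `f` with `token` installed as the current context, then restores
/// the previous value so nested scopes behave as expected.
/// Note: the previous value is not restored if `f` panics.
pub fn with_context<R>(token: Option<String>, f: impl FnOnce() -> R) -> R {
    let previous = CURRENT_TOKEN.with(|slot| slot.replace(token));
    let result = f();
    CURRENT_TOKEN.with(|slot| slot.replace(previous));
    result
}

/// What a tool handler would call instead of taking `ctx` as a parameter.
pub fn current_token() -> Option<String> {
    CURRENT_TOKEN.with(|slot| slot.borrow().clone())
}
```

The trade-off against Option A is visibility: the dependency on a context no longer appears in the function signature, so calling a handler outside a with_context scope fails only at runtime.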

2.2 Protocol Types

Add missing types to mcp-protocol/src/model.rs:

/// Sampling request parameters
// `rename_all` maps every field to the camelCase names the spec uses
// (modelPreferences, systemPrompt, maxTokens); `Default` allows `..Default::default()`.
#[derive(Debug, Clone, Default, Serialize, Deserialize)]
#[serde(rename_all = "camelCase")]
pub struct CreateMessageRequest {
    pub messages: Vec<SamplingMessage>,
    #[serde(skip_serializing_if = "Option::is_none")]
    pub model_preferences: Option<ModelPreferences>,
    #[serde(skip_serializing_if = "Option::is_none")]
    pub system_prompt: Option<String>,
    pub max_tokens: u32,
}

/// Sampling response
#[derive(Debug, Clone, Serialize, Deserialize)]
#[serde(rename_all = "camelCase")]
pub struct CreateMessageResult {
    pub role: String,
    pub content: SamplingContent,
    pub model: String,
    pub stop_reason: String,
}

Phase 3: Macro Support

3.1 Update #[mcp_tool] Macro

Add context support to the tool macro:

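#[mcp_tool(
    name = "process_data",
    description = "Process data with progress reporting"
)]
// ToolContext is a trait, so the handler takes it by reference.
async fn process_data(ctx: &dyn ToolContext, input: String) -> Result<String> {
    for i in 0..100 {
        // Report progress only when the client supplied a progress token.
        if let Some(token) = ctx.progress_token() {
            ctx.send_progress(token, i, Some(100)).await?;
        }
        // ... processing logic
    }
    Ok("Done".to_string())
}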

3.2 Context-Aware Tool Signature Detection

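The macro should detect whether the first parameter is a ToolContext reference (&dyn ToolContext, since ToolContext is a trait) and adjust code generation accordingly: pass the context through when it is present, and generate the existing context-free call when it is not.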
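The detection rule itself is small. The sketch below expresses it over parameter types given as plain strings, a stand-in for the type paths a procedural macro would actually inspect. CallShape and call_shape are hypothetical names, and only shared references and module-qualified paths are handled.

```rust
/// How the generated wrapper should invoke the user's function.
#[derive(Debug, PartialEq)]
pub enum CallShape {
    /// First parameter is the context; pass it through.
    WithContext,
    /// No context parameter; generate the existing call.
    ArgsOnly,
}

/// Decides the call shape from the textual parameter types, in order.
pub fn call_shape(param_types: &[&str]) -> CallShape {
    let first = match param_types.first() {
        Some(ty) => ty.trim(),
        None => return CallShape::ArgsOnly,
    };
    // Strip `&` and `dyn` so `&dyn ToolContext` and `ToolContext` both match.
    let bare = first.trim_start_matches('&').trim_start();
    let bare = bare.strip_prefix("dyn ").unwrap_or(bare).trim();
    // Compare the last path segment so `mcp_server::ToolContext` matches too.
    let last = bare.rsplit("::").next().unwrap_or(bare);
    if last == "ToolContext" {
        CallShape::WithContext
    } else {
        CallShape::ArgsOnly
    }
}
```

Matching on the type name alone can be fooled by an unrelated type that happens to be called ToolContext. An explicit attribute on the parameter would be a stricter alternative.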

Phase 4: Testing

4.1 Unit Tests

#[tokio::test]
async fn test_tool_context_send_log() {
    let (ctx, mut receiver) = MockToolContext::new();
    ctx.send_log(LogLevel::Info, Some("test"), json!({"msg": "hello"})).await.unwrap();
    
    let notification = receiver.recv().await.unwrap();
    assert_eq!(notification.method, "notifications/message");
}

#[tokio::test]
async fn test_tool_context_request_sampling() {
    let (ctx, _handler) = MockToolContext::with_sampling_handler(|_req| {
        CreateMessageResult {
            role: "assistant".to_string(),
            content: SamplingContent::text("Response"),
            model: "test-model".to_string(),
            stop_reason: "endTurn".to_string(),
        }
    });
    
    let result = ctx.request_sampling(CreateMessageRequest { ... }).await.unwrap();
    assert_eq!(result.role, "assistant");
}

4.2 Integration Tests

#[tokio::test]
async fn test_conformance_logging() {
    let server = start_test_server().await;
    let client = TestClient::connect(&server).await;
    
    // Call tool that sends log notifications
    let (result, notifications) = client.call_tool_with_notifications(
        "test_tool_with_logging",
        json!({})
    ).await;
    
    assert!(!notifications.is_empty());
    assert_eq!(notifications[0].method, "notifications/message");
}

4.3 Conformance Test Coverage

Update conformance-server example to use new APIs:

"test_tool_with_logging" => {
    ctx.send_log(LogLevel::Info, Some("tool"), json!({"step": "starting"})).await?;
    ctx.send_log(LogLevel::Info, Some("tool"), json!({"step": "completed"})).await?;
    Ok(CallToolResult::text("Logged successfully"))
}

"test_tool_with_progress" => {
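    // Echo the token from the client's request; skip progress when none was sent.
    if let Some(token) = ctx.progress_token() {
        for i in 0..=100 {
            ctx.send_progress(token, i, Some(100)).await?;
        }
    }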
    Ok(CallToolResult::text("Progress complete"))
}

"test_sampling" => {
    let response = ctx.request_sampling(CreateMessageRequest {
        messages: vec![SamplingMessage::user("Summarize: test data")],
        max_tokens: 100,
        ..Default::default()
    }).await?;
    Ok(CallToolResult::text(format!("LLM said: {}", response.content.text())))
}

Phase 5: Documentation

  • Update McpBackend trait documentation
  • Add "Bidirectional Communication" guide to docs
  • Update macro documentation with context examples
  • Add migration guide for existing tools

Files to Modify/Create

New Files

  • mcp-server/src/context.rs - ToolContext trait and implementations
  • mcp-server/src/session.rs - Session management for request correlation
  • mcp-protocol/src/sampling.rs - Sampling types
  • mcp-protocol/src/elicitation.rs - Elicitation types (if not already complete)

Modified Files

  • mcp-server/src/backend.rs - Update trait signature
  • mcp-server/src/handler.rs - Wire up context to tool calls
  • mcp-transport/src/streamable_http.rs - Add notification sending
  • mcp-transport/src/stdio.rs - Add notification sending
  • mcp-macros/src/tool.rs - Support context parameter
  • examples/conformance-server/src/main.rs - Implement remaining tools

Success Criteria

$ npx @modelcontextprotocol/conformance server --url http://localhost:3000/mcp

=== SUMMARY ===
✓ All 26 scenarios passed
Total: 26 passed, 0 failed

Breaking Changes

This will be a breaking change for existing McpBackend implementations if we choose Option A (explicit context parameter). We should:

  1. Document the migration path clearly
  2. Consider a deprecation period with Option B (task-local storage)
  3. Update all examples and documentation
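One possible shape for the migration path, offered as an assumption and not as a decision: add the context-taking method alongside the existing one, with a default implementation that delegates to it. Existing backends keep compiling, and new ones override the new method. The sketch uses simplified synchronous signatures and hypothetical names (Backend, Ctx, call_tool_with_context) to show the pattern only.

```rust
/// Stand-in for the tool context; carries only a progress token here.
pub struct Ctx {
    pub progress_token: Option<String>,
}

pub trait Backend {
    /// Existing entry point, unchanged for current implementations.
    fn call_tool(&self, name: &str) -> String;

    /// New entry point. The default ignores the context and delegates,
    /// so implementing only `call_tool` remains valid.
    fn call_tool_with_context(&self, _ctx: &Ctx, name: &str) -> String {
        self.call_tool(name)
    }
}

/// A backend written before the change: implements only `call_tool`.
pub struct LegacyBackend;

impl Backend for LegacyBackend {
    fn call_tool(&self, name: &str) -> String {
        format!("legacy:{name}")
    }
}

/// A backend that opts in to the context.
pub struct ContextBackend;

impl Backend for ContextBackend {
    fn call_tool(&self, name: &str) -> String {
        format!("no-ctx:{name}")
    }

    fn call_tool_with_context(&self, ctx: &Ctx, name: &str) -> String {
        match &ctx.progress_token {
            Some(token) => format!("ctx:{name}:{token}"),
            None => self.call_tool(name),
        }
    }
}
```

The framework would then always call call_tool_with_context, and the old method could be deprecated once the examples and documentation have moved over.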
