diff --git a/docs/mcp-session-architecture.md b/docs/mcp-session-architecture.md new file mode 100644 index 00000000..2cd3ce33 --- /dev/null +++ b/docs/mcp-session-architecture.md @@ -0,0 +1,337 @@ +# MCP Session Architecture + +## Understanding the Layers + +There are distinct layers in the MCP server architecture: + +### Layer 1: HTTP Server (Network Layer) + +- **ONE** `http.Server` instance per VS Code extension +- Listens on a **single port** (e.g., `http://localhost:45678/mcp`) +- Receives all incoming HTTP requests +- Routes requests based on session ID +- Lives for the entire lifetime of the extension + +### Layer 2: MCP SDK Server Instances (Protocol Layer) + +- Handles MCP protocol logic (tool registration, request/response) +- **Current (Stateless)**: ONE shared instance +- **New (Stateful)**: MULTIPLE instances (one per session) + +### Layer 3: Transport Layer + +- Manages request/response streaming +- **Current (Stateless)**: New transport per HTTP request +- **New (Stateful)**: One transport per session, reused across requests + +## Current Architecture (Stateless) + +``` +┌─────────────────────────────────────────────────────┐ +│ VS Code Extension Process │ +│ │ +│ ┌────────────────────────────────────────────┐ │ +│ │ McpServer Class Instance │ │ +│ │ │ │ +│ │ http.Server (Port 45678) ◄───────────────┼─────┼─── Client Request 1 +│ │ │ │ │ +│ │ │ │ │ +│ │ ├─► Create Transport 1 │ │ +│ │ │ Connect SdkMcpServer ──────┐ │ │ +│ │ │ Handle Request │ │ │ +│ │ │ Close Transport │ │ │ +│ │ │ │ │ │ +│ │ ├─► Create Transport 2 ◄──────┼───┼─────┼─── Client Request 2 +│ │ │ Connect SdkMcpServer ──┐ │ │ │ (parallel) +│ │ │ Handle Request │ │ │ │ +│ │ │ Close Transport │ │ │ │ +│ │ │ │ │ │ │ +│ │ SHARED SdkMcpServer Instance ◄───┴───┴───┼─────┼─── ⚠️ RACE CONDITION! +│ │ (ONE instance, multiple connections) │ │ +│ │ │ │ +│ └────────────────────────────────────────────┘ │ +│ │ +└─────────────────────────────────────────────────────┘ +``` + +**Problem**: Multiple transports trying to connect to the **same** SDK server instance simultaneously. + +## New Architecture (Stateful with Sessions) + +``` +┌─────────────────────────────────────────────────────────────────┐ +│ VS Code Extension Process │ +│ │ +│ ┌──────────────────────────────────────────────────────────┐ │ +│ │ McpServer Class Instance │ │ +│ │ │ │ +│ │ http.Server (Port 45678) ◄─────────────────────────────┼──┼─── Initialize Request +│ │ │ │ │ (no session ID) +│ │ │ │ │ +│ │ ├─► Detect Initialize Request │ │ +│ │ │ Create NEW SdkMcpServer for Session A │ │ +│ │ │ Create Transport A │ │ +│ │ │ Connect Server A to Transport A (ONCE) │ │ +│ │ │ Store in Maps │ │ +│ │ │ Return session ID: "session-abc" │ │ +│ │ │ │ │ +│ │ ├─► Request with session-abc ◄──────────────────┼──┼─── Follow-up Request 1 +│ │ │ Lookup Transport A │ │ +│ │ │ Reuse (no new connection) │ │ +│ │ │ │ │ +│ │ ├─► Request with session-abc ◄──────────────────┼──┼─── Follow-up Request 2 +│ │ │ Lookup Transport A │ │ (parallel) +│ │ │ Reuse (no new connection) │ │ +│ │ │ │ │ +│ │ ├─► Initialize Request (different client) ◄──────┼──┼─── New Session +│ │ │ Create NEW SdkMcpServer for Session B │ │ +│ │ │ Create Transport B │ │ +│ │ │ Connect Server B to Transport B (ONCE) │ │ +│ │ │ Store in Maps │ │ +│ │ │ Return session ID: "session-xyz" │ │ +│ │ │ │ │ +│ │ Session Storage: │ │ +│ │ ┌─────────────────────────────────────────────┐ │ │ +│ │ │ transports Map │ │ │ +│ │ │ "session-abc" → Transport A ──┐ │ │ │ +│ │ │ "session-xyz" → Transport B ──┼─┐ │ │ │ +│ │ └──────────────────────────────────┼─┼─────────┘ │ │ +│ │ │ │ │ │ +│ │ ┌─────────────────────────────────┼─┼─────────┐ │ │ +│ │ │ servers Map │ │ │ │ │ +│ │ │ "session-abc" → SdkMcpServer A│ │ │ │ │ +│ │ │ "session-xyz" → SdkMcpServer B │ │ │ │ +│ │ └────────────────────────────────────┼─────────┘ │ │ +│ │ │ │ │ +│ │ SdkMcpServer Instance A ◄───────────┘ │ │ +│ │ (tools registered, connected to Transport A) │ │ +│ │ │ │ +│ │ SdkMcpServer Instance B ◄──────────────────────────┐ │ │ +│ │ (tools registered, connected to Transport B) │ │ │ +│ │ │ │ │ +│ └───────────────────────────────────────────────────────┘ │ +│ │ +└──────────────────────────────────────────────────────────────┘ +``` + +**Solution**: Each session has its own isolated SDK server + transport pair. + +## Key Points + +### 1. HTTP Server (Always ONE) + +```typescript +class McpServer { + private httpServer: http.Server | null = null; // ← ONE instance + + async start(port: number) { + // Create ONE HTTP server that listens on ONE port + this.httpServer = http.createServer(async (req, res) => { + // Route to appropriate session based on session ID + }); + + this.httpServer.listen(port); + } +} +``` + +- **Never changes**: Always one HTTP server per extension instance +- **Port**: Single port shared by all sessions +- **Routing**: Uses `mcp-session-id` header to route to correct session + +### 2. SDK Server Instances (ONE → MANY) + +**Current (Stateless)**: + +```typescript +class McpServer { + private server: SdkMcpServer; // ← Shared by all requests + + constructor() { + this.server = new SdkMcpServer({...}); + // Register tools once + } +} +``` + +**New (Stateful)**: + +```typescript +class McpServer { + private servers: Map; // ← One per session + + private createServer(): SdkMcpServer { + const server = new SdkMcpServer({...}); + // Register tools on this instance + return server; + } + + async handleInitialize() { + const server = this.createServer(); // New instance! + const sessionId = randomUUID(); + this.servers.set(sessionId, server); + } +} +``` + +### 3. Request Flow Examples + +#### Example 1: Single Client, Multiple Requests + +``` +Client 1 (VS Code Copilot) + │ + ├─► POST /mcp (initialize) ──┐ + │ No session ID │ + │ ← Response: session-abc │ + │ │ Same Session + ├─► POST /mcp (list tools) ──┤ Same Server Instance + │ Session: session-abc │ Same Transport + │ │ + ├─► POST /mcp (call tool) ──┤ + │ Session: session-abc │ + │ │ + └─► POST /mcp (call tool) ──┘ + Session: session-abc +``` + +**Result**: + +- 4 HTTP requests → ONE HTTP server +- 1 session → ONE SDK server instance +- 1 session → ONE transport (reused 4 times) + +#### Example 2: Multiple Clients (Parallel Sessions) + +``` +Client 1 (VS Code) Client 2 (Windsurf) + │ │ + ├─► POST /mcp (init) ├─► POST /mcp (init) + │ ← session-abc │ ← session-xyz + │ │ + │ Different Sessions │ + │ Different SDK Servers │ + │ Isolated from each other │ + │ │ + ├─► POST /mcp (tool) ├─► POST /mcp (tool) + │ session-abc │ session-xyz + │ │ + └─► POST /mcp (tool) └─► POST /mcp (tool) + session-abc session-xyz +``` + +**Result**: + +- 6 HTTP requests → ONE HTTP server (handles all) +- 2 sessions → TWO SDK server instances +- 2 transports (one per session) + +#### Example 3: Parallel Requests in Same Session + +``` +Client (parallel tool calls) + │ + ├──┬─► POST /mcp (tool A) ─┐ + │ │ session-abc │ + │ │ ├─► Same Transport + │ └─► POST /mcp (tool B) ─┘ Queued internally + │ session-abc + │ (parallel) +``` + +**Result**: + +- Both requests arrive at HTTP server simultaneously +- Both lookup same transport from `transports.get("session-abc")` +- Transport handles queueing internally +- No race condition because server is already connected + +## Why This Architecture? + +### Network Constraints + +- **ONE port per service**: Can't have multiple HTTP servers on same port +- **Solution**: One HTTP server routes to multiple sessions + +### Protocol Isolation + +- **Sessions must be isolated**: Different clients shouldn't interfere +- **Solution**: Separate SDK server instance per session + +### Connection Stability + +- **Avoid reconnection overhead**: `server.connect()` should happen once +- **Solution**: Create connection during initialization, reuse transport + +## Memory Implications + +### Current (Stateless) + +``` +Memory per request: +- Transport: ~1KB +- Connection overhead: ~100ms +- Total: Minimal but inefficient (created/destroyed constantly) +``` + +### New (Stateful) + +``` +Memory per session: +- SDK Server instance: ~50KB +- Transport: ~1KB +- Event store (optional): ~10KB +- Total: ~60KB per active session + +Typical usage: +- 1-2 active sessions (one per IDE) +- ~120KB total +- Sessions cleaned up when idle +``` + +**Trade-off**: Slightly more memory for much better performance and new capabilities. + +## Implementation Details + +### HTTP Request Handler Pseudocode + +```typescript +this.httpServer = http.createServer(async (req, res) => { + // Extract session ID from header + const sessionId = req.headers["mcp-session-id"]; + + if (sessionId && this.transports.has(sessionId)) { + // EXISTING SESSION: Reuse transport + const transport = this.transports.get(sessionId); + await transport.handleRequest(req, res, body); + } else if (!sessionId && isInitializeRequest(body)) { + // NEW SESSION: Create server + transport + const server = this.createServer(); // New SdkMcpServer + const transport = new StreamableHTTPServerTransport({ + sessionIdGenerator: () => randomUUID(), + onsessioninitialized: sessionId => { + this.servers.set(sessionId, server); + this.transports.set(sessionId, transport); + }, + }); + + await server.connect(transport); // Connect ONCE + await transport.handleRequest(req, res, body); + } else { + // ERROR: Invalid request + res.status(400).json({ error: "Invalid session" }); + } +}); +``` + +## Summary + +| Layer | Current (Stateless) | New (Stateful) | +| --------------- | ------------------- | --------------------------- | +| **HTTP Server** | 1 instance | 1 instance (no change) | +| **Port** | 1 port | 1 port (no change) | +| **SDK Servers** | 1 shared instance | N instances (1 per session) | +| **Transports** | 1 per request | 1 per session | +| **Connections** | N per request | 1 per session | + +The HTTP server is just a **router** - it receives requests and dispatches them to the appropriate session's SDK server instance based on the session ID. diff --git a/docs/mcp.md b/docs/mcp.md index 953d26a5..46fc52b5 100644 --- a/docs/mcp.md +++ b/docs/mcp.md @@ -152,7 +152,7 @@ The MCP server provides tools for: - `getLogs` - Retrieve server or debug logs. - `showOutputPanel` - Display output panel in VS Code. -For detailed documentation on each tool including parameters, return types, and examples, see [MCP Tool Reference](mcp-tools.md). +For detailed documentation on each tool including parameters, return types, and examples, see the [MCP Tool Reference](mcp-tools.md). For technical details about the MCP server's session-based architecture, see the [MCP Session Architecture](mcp-session-architecture.md) reference. ## Agent Skills @@ -190,6 +190,46 @@ In addition to MCP tools, the extension provides agent skills that can be regist - [deephaven-docs-searching/SKILL.md](https://github.com/deephaven/vscode-deephaven/tree/main/skills/deephaven-docs-searching/SKILL.md) - For querying Deephaven documentation - Install the skill(s) according to your AI assistant's documentation. +## Troubleshooting + +### Session Not Found (404) + +**Symptom**: The MCP server returns a `404` error with a message about the session not being found. + +**Causes**: + +- The session expired or was cleaned up (e.g., after extension restart or VS Code reload). +- The client is sending a stale session ID from a previous connection. + +**Resolution**: Restart the AI assistant session or MCP client. Most clients will automatically re-initialize and obtain a new session ID. + +### Session Initialization Failures (400) + +**Symptom**: Requests fail with a `400 Bad Request` error. + +**Causes**: + +- A request was sent without a session ID but was not an `initialize` request. +- The client is not following the MCP session protocol. + +**Resolution**: Ensure the client sends an `initialize` request first to establish a session before sending other requests. + +### Stale Sessions After Extension Restart + +**Symptom**: MCP tools stop responding or return errors after the Deephaven extension restarts. + +**Cause**: When the extension restarts, all active sessions are terminated. Clients holding old session IDs will receive errors. + +**Resolution**: Restart the AI assistant session or reload the MCP configuration so the client re-initializes with a fresh session. + +### Port Changes After Workspace Switch + +**Symptom**: MCP connection fails after switching VS Code workspaces. + +**Cause**: Each workspace uses an auto-allocated port. Switching workspaces changes the port. + +**Resolution**: Check the `MCP:` status bar item for the current port and update your MCP configuration accordingly. You may also need to restart the AI assistant session. + ## Tool Response Format All MCP tools follow a consistent response structure: diff --git a/src/mcp/McpServer.spec.ts b/src/mcp/McpServer.spec.ts new file mode 100644 index 00000000..2a9cfdea --- /dev/null +++ b/src/mcp/McpServer.spec.ts @@ -0,0 +1,439 @@ +import * as http from 'http'; +import { afterEach, beforeEach, describe, expect, it, vi } from 'vitest'; +import type { McpTool, McpToolSpec } from '../types'; +import type { OutputChannelWithHistory } from '../util'; + +/* eslint-disable @typescript-eslint/naming-convention */ +// HTTP headers and MCP protocol headers must use their spec-defined names + +vi.mock('vscode'); + +// Mock all tool creators to return minimal stub tools +vi.mock('./tools', () => ({ + createAddRemoteFileSourcesTool: vi.fn(() => + makeStubTool('addRemoteFileSources') + ), + createConnectToServerTool: vi.fn(() => makeStubTool('connectToServer')), + createGetColumnStatsTool: vi.fn(() => makeStubTool('getColumnStats')), + createGetLogsTool: vi.fn(() => makeStubTool('getLogs')), + createGetTableDataTool: vi.fn(() => makeStubTool('getTableData')), + createGetTableStatsTool: vi.fn(() => makeStubTool('getTableStats')), + createListConnectionsTool: vi.fn(() => makeStubTool('listConnections')), + createListVariablesTool: vi.fn(() => makeStubTool('listVariables')), + createListRemoteFileSourcesTool: vi.fn(() => + makeStubTool('listRemoteFileSources') + ), + createListServersTool: vi.fn(() => makeStubTool('listServers')), + createOpenFilesInEditorTool: vi.fn(() => makeStubTool('openFilesInEditor')), + createOpenVariablePanelsTool: vi.fn(() => makeStubTool('openVariablePanels')), + createRemoveRemoteFileSourcesTool: vi.fn(() => + makeStubTool('removeRemoteFileSources') + ), + createRunCodeFromUriTool: vi.fn(() => makeStubTool('runCodeFromUri')), + createRunCodeTool: vi.fn(() => makeStubTool('runCode')), + createSetEditorConnectionTool: vi.fn(() => + makeStubTool('setEditorConnection') + ), + createShowOutputPanelTool: vi.fn(() => makeStubTool('showOutputPanel')), +})); + +vi.mock('./tools/connectToServer', () => ({ + createConnectToServerTool: vi.fn(() => makeStubTool('connectToServer')), +})); + +/** Create a minimal stub tool that echoes back its args */ +function makeStubTool(name: string): McpTool { + return { + name, + spec: { + title: name, + description: `Stub tool: ${name}`, + inputSchema: {}, + outputSchema: {}, + }, + handler: vi.fn().mockResolvedValue({ + content: [{ type: 'text', text: `stub result for ${name}` }], + }), + }; +} + +/** Create a minimal mock for OutputChannelWithHistory */ +function makeMockOutputChannel(): OutputChannelWithHistory { + return { + appendLine: vi.fn(), + append: vi.fn(), + show: vi.fn(), + clear: vi.fn(), + dispose: vi.fn(), + hide: vi.fn(), + replace: vi.fn(), + name: 'mock', + } as unknown as OutputChannelWithHistory; +} + +// Helper: send an HTTP POST to the MCP server and collect the full response +function postToMcp( + port: number, + body: unknown, + headers: Record = {} +): Promise<{ + status: number; + headers: http.IncomingMessage['headers']; + body: string; +}> { + return new Promise((resolve, reject) => { + const payload = JSON.stringify(body); + const req = http.request( + { + hostname: '127.0.0.1', + port, + path: '/mcp', + method: 'POST', + headers: { + 'Content-Type': 'application/json', + // StreamableHTTPServerTransport checks for acceptable response formats. + // Include both JSON and SSE to satisfy the transport's Accept check. + Accept: 'application/json, text/event-stream', + 'Content-Length': Buffer.byteLength(payload), + ...headers, + }, + }, + res => { + let data = ''; + res.on('data', chunk => (data += chunk)); + res.on('end', () => + resolve({ + status: res.statusCode ?? 0, + headers: res.headers, + body: data, + }) + ); + } + ); + req.on('error', reject); + req.write(payload); + req.end(); + }); +} + +const INITIALIZE_REQUEST = { + jsonrpc: '2.0', + id: 1, + method: 'initialize', + params: { + protocolVersion: '2024-11-05', + capabilities: {}, + clientInfo: { name: 'test-client', version: '0.0.1' }, + }, +}; + +const LIST_TOOLS_REQUEST = { + jsonrpc: '2.0', + id: 2, + method: 'tools/list', + params: {}, +}; + +describe('McpServer', () => { + // Dynamic import so mocks above apply + let McpServer: (typeof import('./McpServer'))['McpServer']; + let server: import('./McpServer').McpServer; + let port: number; + + beforeEach(async () => { + ({ McpServer } = await import('./McpServer')); + + server = new McpServer( + {} as any, // coreJsApiCache + makeMockOutputChannel() as any, // outputChannel + makeMockOutputChannel() as any, // outputChannelDebug + {} as any, // panelService + {} as any, // pythonDiagnostics + {} as any, // pythonWorkspace + {} as any // serverManager + ); + + port = await server.start(); + }); + + afterEach(async () => { + await server.stop(); + }); + + // ───────────────────────────────────────────────────────────────────────── + // 1. Initialize request creates a new session + // ───────────────────────────────────────────────────────────────────────── + describe('session initialization', () => { + it('should accept an initialize request and return a session ID', async () => { + const res = await postToMcp(port, INITIALIZE_REQUEST); + + expect(res.status).toBe(200); + // The session ID is returned either in the response header or body + const sessionId = + (res.headers['mcp-session-id'] as string | undefined) ?? + ((): string | undefined => { + try { + return JSON.parse(res.body)?.sessionId as string | undefined; + } catch { + return undefined; + } + })(); + + expect(sessionId).toBeDefined(); + expect(typeof sessionId).toBe('string'); + expect(sessionId!.length).toBeGreaterThan(0); + }); + + it('should reject a non-initialize request with no session ID', async () => { + const res = await postToMcp(port, LIST_TOOLS_REQUEST); + + expect(res.status).toBe(400); + const body = JSON.parse(res.body); + expect(body.error).toBeDefined(); + expect(body.error.code).toBe(-32600); + }); + + it('should reject unsupported methods (PUT, DELETE, etc.) with 405', async () => { + const res = await new Promise<{ status: number }>((resolve, reject) => { + const req = http.request( + { hostname: '127.0.0.1', port, path: '/mcp', method: 'PUT' }, + res => resolve({ status: res.statusCode ?? 0 }) + ); + req.on('error', reject); + req.end(); + }); + + expect(res.status).toBe(405); + }); + }); + + // ───────────────────────────────────────────────────────────────────────── + // 2. Session persistence — same session ID reuses the session + // ───────────────────────────────────────────────────────────────────────── + describe('session persistence', () => { + it('should reuse an existing session for subsequent requests', async () => { + // Step 1: initialize + const initRes = await postToMcp(port, INITIALIZE_REQUEST); + expect(initRes.status).toBe(200); + const sessionId = initRes.headers['mcp-session-id'] as string; + expect(sessionId).toBeDefined(); + + // Step 2: send tools/list with the same session ID + const toolsRes = await postToMcp(port, LIST_TOOLS_REQUEST, { + 'mcp-session-id': sessionId, + }); + + expect(toolsRes.status).toBe(200); + const toolsBody = JSON.parse(toolsRes.body); + // Should have a result (not an error) + expect(toolsBody.error).toBeUndefined(); + expect(toolsBody.result).toBeDefined(); + }); + + it('should return all registered tools via tools/list', async () => { + const initRes = await postToMcp(port, INITIALIZE_REQUEST); + const sessionId = initRes.headers['mcp-session-id'] as string; + + const toolsRes = await postToMcp(port, LIST_TOOLS_REQUEST, { + 'mcp-session-id': sessionId, + }); + + const toolsBody = JSON.parse(toolsRes.body); + expect(toolsBody.result?.tools).toBeInstanceOf(Array); + // All 17 stub tools should be present + expect(toolsBody.result.tools.length).toBe(17); + }); + }); + + // ───────────────────────────────────────────────────────────────────────── + // 3. Session isolation — different session IDs get isolated server instances + // ───────────────────────────────────────────────────────────────────────── + describe('session isolation', () => { + it('should create independent sessions for separate initialize requests', async () => { + const [res1, res2] = await Promise.all([ + postToMcp(port, INITIALIZE_REQUEST), + postToMcp(port, INITIALIZE_REQUEST), + ]); + + expect(res1.status).toBe(200); + expect(res2.status).toBe(200); + + const session1 = res1.headers['mcp-session-id'] as string; + const session2 = res2.headers['mcp-session-id'] as string; + + expect(session1).toBeDefined(); + expect(session2).toBeDefined(); + // Sessions should be distinct + expect(session1).not.toBe(session2); + }); + + it('should reject requests using an unknown session ID', async () => { + const res = await postToMcp(port, LIST_TOOLS_REQUEST, { + 'mcp-session-id': 'non-existent-session-id', + }); + + // Either 400 (bad session) or the server sends back an error JSON + expect([400, 200]).toContain(res.status); + if (res.status === 200) { + const body = JSON.parse(res.body); + // If 200, should contain an error in the JSON-RPC response + expect(body.error).toBeDefined(); + } + }); + }); + + // ───────────────────────────────────────────────────────────────────────── + // 4. Parallel requests — no race conditions + // ───────────────────────────────────────────────────────────────────────── + describe('parallel requests', () => { + it('should handle parallel tool calls on the same session without errors', async () => { + const initRes = await postToMcp(port, INITIALIZE_REQUEST); + const sessionId = initRes.headers['mcp-session-id'] as string; + + // Fire 5 concurrent tools/list requests on the same session + const results = await Promise.all( + Array.from({ length: 5 }, (_, i) => + postToMcp( + port, + { ...LIST_TOOLS_REQUEST, id: i + 10 }, + { 'mcp-session-id': sessionId } + ) + ) + ); + + // All should succeed without server errors + for (const res of results) { + expect(res.status).toBe(200); + const body = JSON.parse(res.body); + expect(body.error).toBeUndefined(); + expect(body.result).toBeDefined(); + } + }); + + it('should handle parallel requests across different sessions without errors', async () => { + // Initialize two sessions simultaneously + const [init1, init2] = await Promise.all([ + postToMcp(port, INITIALIZE_REQUEST), + postToMcp(port, INITIALIZE_REQUEST), + ]); + + const session1 = init1.headers['mcp-session-id'] as string; + const session2 = init2.headers['mcp-session-id'] as string; + + // Then fire requests on both sessions in parallel + const [res1a, res1b, res2a, res2b] = await Promise.all([ + postToMcp( + port, + { ...LIST_TOOLS_REQUEST, id: 21 }, + { 'mcp-session-id': session1 } + ), + postToMcp( + port, + { ...LIST_TOOLS_REQUEST, id: 22 }, + { 'mcp-session-id': session1 } + ), + postToMcp( + port, + { ...LIST_TOOLS_REQUEST, id: 23 }, + { 'mcp-session-id': session2 } + ), + postToMcp( + port, + { ...LIST_TOOLS_REQUEST, id: 24 }, + { 'mcp-session-id': session2 } + ), + ]); + + for (const res of [res1a, res1b, res2a, res2b]) { + expect(res.status).toBe(200); + const body = JSON.parse(res.body); + expect(body.error).toBeUndefined(); + expect(body.result).toBeDefined(); + } + }); + }); + + // ───────────────────────────────────────────────────────────────────────── + // 5. Tools functionality + // ───────────────────────────────────────────────────────────────────────── + describe('MCP tools availability', () => { + it('should have addRemoteFileSources tool registered', async () => { + const initRes = await postToMcp(port, INITIALIZE_REQUEST); + const sessionId = initRes.headers['mcp-session-id'] as string; + + const toolsRes = await postToMcp(port, LIST_TOOLS_REQUEST, { + 'mcp-session-id': sessionId, + }); + const tools: { name: string }[] = + JSON.parse(toolsRes.body).result?.tools ?? []; + expect(tools.some(t => t.name === 'addRemoteFileSources')).toBe(true); + }); + + it('should have listServers tool registered', async () => { + const initRes = await postToMcp(port, INITIALIZE_REQUEST); + const sessionId = initRes.headers['mcp-session-id'] as string; + + const toolsRes = await postToMcp(port, LIST_TOOLS_REQUEST, { + 'mcp-session-id': sessionId, + }); + const tools: { name: string }[] = + JSON.parse(toolsRes.body).result?.tools ?? []; + expect(tools.some(t => t.name === 'listServers')).toBe(true); + }); + + it('should have runCode tool registered', async () => { + const initRes = await postToMcp(port, INITIALIZE_REQUEST); + const sessionId = initRes.headers['mcp-session-id'] as string; + + const toolsRes = await postToMcp(port, LIST_TOOLS_REQUEST, { + 'mcp-session-id': sessionId, + }); + const tools: { name: string }[] = + JSON.parse(toolsRes.body).result?.tools ?? []; + expect(tools.some(t => t.name === 'runCode')).toBe(true); + }); + }); + + // ───────────────────────────────────────────────────────────────────────── + // 6. Session cleanup + // ───────────────────────────────────────────────────────────────────────── + describe('session cleanup on stop()', () => { + it('should clear all sessions when stop() is called', async () => { + // Create two sessions + await Promise.all([ + postToMcp(port, INITIALIZE_REQUEST), + postToMcp(port, INITIALIZE_REQUEST), + ]); + + // Access internal Maps for verification + const mcpServer = server as unknown as { + transports: Map; + servers: Map; + }; + + expect(mcpServer.transports.size).toBe(2); + expect(mcpServer.servers.size).toBe(2); + + await server.stop(); + + expect(mcpServer.transports.size).toBe(0); + expect(mcpServer.servers.size).toBe(0); + }); + }); + + // ───────────────────────────────────────────────────────────────────────── + // 7. Port allocation + // ───────────────────────────────────────────────────────────────────────── + describe('port management', () => { + it('should return the allocated port via getPort()', () => { + expect(server.getPort()).toBe(port); + expect(typeof port).toBe('number'); + expect(port).toBeGreaterThan(0); + }); + + it('should return null from getPort() after stop()', async () => { + await server.stop(); + expect(server.getPort()).toBeNull(); + }); + }); +}); diff --git a/src/mcp/McpServer.ts b/src/mcp/McpServer.ts index 72de9733..0b031cfd 100644 --- a/src/mcp/McpServer.ts +++ b/src/mcp/McpServer.ts @@ -1,7 +1,9 @@ import * as vscode from 'vscode'; +import { randomUUID } from 'node:crypto'; import type { dh as DhcType } from '@deephaven/jsapi-types'; import { McpServer as SdkMcpServer } from '@modelcontextprotocol/sdk/server/mcp.js'; import { StreamableHTTPServerTransport } from '@modelcontextprotocol/sdk/server/streamableHttp.js'; +import { isInitializeRequest } from '@modelcontextprotocol/sdk/types.js'; import * as http from 'http'; import type { IAsyncCacheService, @@ -39,9 +41,10 @@ import { createConnectToServerTool } from './tools/connectToServer'; * Provides tools for AI assistants (like GitHub Copilot) to interact with Deephaven. */ export class McpServer extends DisposableBase { - private server: SdkMcpServer; private httpServer: http.Server | null = null; private port: number | null = null; + private transports: Map = new Map(); + private servers: Map = new Map(); constructor( readonly coreJsApiCache: IAsyncCacheService, @@ -55,42 +58,161 @@ export class McpServer extends DisposableBase { readonly serverManager: IServerManager ) { super(); + } - // Create an MCP server - this.server = new SdkMcpServer({ + private createServer(): SdkMcpServer { + const server = new SdkMcpServer({ name: MCP_SERVER_NAME, version: '1.0.0', }); - this.registerTool(createAddRemoteFileSourcesTool()); - this.registerTool(createConnectToServerTool(this)); - this.registerTool(createGetColumnStatsTool(this)); - this.registerTool(createGetLogsTool(this)); - this.registerTool(createGetTableDataTool(this)); - this.registerTool(createGetTableStatsTool(this)); - this.registerTool(createListConnectionsTool(this)); - this.registerTool(createListVariablesTool(this)); - this.registerTool(createListRemoteFileSourcesTool(this)); - this.registerTool(createListServersTool(this)); - this.registerTool(createOpenFilesInEditorTool()); - this.registerTool(createOpenVariablePanelsTool(this)); - this.registerTool(createRemoveRemoteFileSourcesTool()); - this.registerTool(createRunCodeFromUriTool(this)); - this.registerTool(createRunCodeTool(this)); - this.registerTool(createShowOutputPanelTool(this)); + this.registerToolsOnServer(server); + + return server; + } + + private registerToolsOnServer(server: SdkMcpServer): void { + this.registerTool(server, createAddRemoteFileSourcesTool()); + this.registerTool(server, createConnectToServerTool(this)); + this.registerTool(server, createGetColumnStatsTool(this)); + this.registerTool(server, createGetLogsTool(this)); + this.registerTool(server, createGetTableDataTool(this)); + this.registerTool(server, createGetTableStatsTool(this)); + this.registerTool(server, createListConnectionsTool(this)); + this.registerTool(server, createListVariablesTool(this)); + this.registerTool(server, createListRemoteFileSourcesTool(this)); + this.registerTool(server, createListServersTool(this)); + this.registerTool(server, createOpenFilesInEditorTool()); + this.registerTool(server, createOpenVariablePanelsTool(this)); + this.registerTool(server, createRemoveRemoteFileSourcesTool()); + this.registerTool(server, createRunCodeFromUriTool(this)); + this.registerTool(server, createRunCodeTool(this)); + this.registerTool(server, createShowOutputPanelTool(this)); + } + + private registerTool( + server: SdkMcpServer, + { name, spec, handler }: McpTool + ): void { + server.registerTool(name, spec, handler); + } + + private handleInvalidPath(res: http.ServerResponse): void { + res.writeHead(404, { contentType: 'text/plain' }); + res.end('Not found'); + } + + private handleInvalidMethod(res: http.ServerResponse): void { + res.writeHead(405, { + contentType: 'text/plain', + allow: 'GET, POST', + }); + res.end('Method Not Allowed'); + } + + private handleSessionNotFound( + res: http.ServerResponse, + sessionId: string + ): void { + // eslint-disable-next-line @typescript-eslint/naming-convention + res.writeHead(400, { 'Content-Type': 'application/json' }); + res.end( + JSON.stringify({ + jsonrpc: '2.0', + error: { + code: -32600, + message: `Bad Request: session ID ${sessionId} not found`, + }, + id: null, + }) + ); + } + + private async handleExistingSession( + req: http.IncomingMessage, + res: http.ServerResponse, + requestBody: unknown, + sessionId: string + ): Promise { + if (!this.transports.has(sessionId)) { + this.handleSessionNotFound(res, sessionId); + return; + } + + // Existing session — reuse transport + const transport = this.transports.get(sessionId)!; + await transport.handleRequest(req, res, requestBody); } - private registerTool({ - name, - spec, - handler, - }: McpTool): void { - this.server.registerTool(name, spec, handler); + private handleInvalidSessionIdForRequestType( + res: http.ServerResponse, + message: string + ): void { + // eslint-disable-next-line @typescript-eslint/naming-convention + res.writeHead(400, { 'Content-Type': 'application/json' }); + res.end( + JSON.stringify({ + jsonrpc: '2.0', + error: { + code: -32600, + message, + }, + id: null, + }) + ); + } + + private async handleNewSession( + req: http.IncomingMessage, + res: http.ServerResponse, + requestBody: unknown + ): Promise { + // New session — create isolated server + transport pair + const server = this.createServer(); + const transport = new StreamableHTTPServerTransport({ + sessionIdGenerator: (): string => randomUUID(), + enableJsonResponse: true, + onsessioninitialized: (sid): void => { + this.transports.set(sid, transport); + this.servers.set(sid, server); + }, + }); + + transport.onclose = async (): Promise => { + try { + const sid = transport.sessionId; + if (sid) { + this.transports.delete(sid); + const closingServer = this.servers.get(sid); + this.servers.delete(sid); + await closingServer?.close(); + } + } catch (error) { + this.outputChannelDebug.appendLine( + `[McpServer] Error during session cleanup: ${error instanceof Error ? error.message : String(error)}` + ); + } + }; + + await server.connect(transport); + await transport.handleRequest(req, res, requestBody); + } + + private handleRequestError(res: http.ServerResponse, error: unknown): void { + res.writeHead(500, { contentType: 'application/json' }); + res.end( + JSON.stringify({ + error: `Failed to process request: ${error instanceof Error ? error.message : String(error)}`, + }) + ); } /** * Start the MCP server on an HTTP endpoint. - * Creates a new transport for each request (stateless operation). + * Uses stateful session management: each initialize request creates a new + * isolated server+transport pair stored by session ID. Subsequent requests + * from the same session reuse the existing transport, eliminating race + * conditions from multiple concurrent requests. * * @param preferredPort Optional port to try first. If not provided or unavailable, will auto-allocate. * @returns The actual port the server is listening on @@ -102,50 +224,59 @@ export class McpServer extends DisposableBase { this.httpServer = http.createServer(async (req, res) => { if (req.url !== '/mcp') { - res.writeHead(404, { contentType: 'text/plain' }); - res.end('Not found'); + this.handleInvalidPath(res); + return; } - // Only accept POST requests since we don't currenlty support SSE. TBD - // whether we need SSE in the future. - if (req.method !== 'POST') { - res.writeHead(405, { - contentType: 'text/plain', - allow: 'POST', - }); - res.end('Method Not Allowed'); + // Accept GET (for SSE) and POST (for JSON-RPC) requests. + // Other methods are not supported by the MCP protocol. + if (req.method !== 'GET' && req.method !== 'POST') { + this.handleInvalidMethod(res); return; } - // Collect the request body + // Collect body only for POST requests let body = ''; - req.on('data', chunk => { - body += chunk.toString(); - }); + if (req.method === 'POST') { + req.on('data', chunk => { + body += chunk.toString(); + }); + } req.on('end', async () => { try { - const requestBody = JSON.parse(body); + // Parse body if present, otherwise undefined for GET requests + const requestBody = body ? JSON.parse(body) : undefined; + const sessionId = req.headers['mcp-session-id'] as string | undefined; + const hasSessionId = sessionId != null; - // Create a new transport for each request - const transport = new StreamableHTTPServerTransport({ - sessionIdGenerator: undefined, - enableJsonResponse: true, - }); + // Only POST requests can be an initialize request + const isInitializeReq = requestBody + ? isInitializeRequest(requestBody) + : false; - res.on('close', () => { - transport.close(); - }); + // Validate: initialize requests must NOT have a session ID, + // and non-initialize requests MUST have a session ID. + if (hasSessionId === isInitializeReq) { + this.handleInvalidSessionIdForRequestType( + res, + hasSessionId + ? 'Bad Request: initialize request must not include mcp-session-id header' + : 'Bad Request: include mcp-session-id header for existing sessions, or send an initialize request to start a new session' + ); + return; + } - await this.server.connect(transport); - await transport.handleRequest(req, res, requestBody); + sessionId == null + ? await this.handleNewSession(req, res, requestBody) + : await this.handleExistingSession( + req, + res, + requestBody, + sessionId + ); } catch (error) { - res.writeHead(500, { contentType: 'application/json' }); - res.end( - JSON.stringify({ - error: `Failed to process request: ${error instanceof Error ? error.message : String(error)}`, - }) - ); + this.handleRequestError(res, error); } }); }); @@ -209,6 +340,20 @@ export class McpServer extends DisposableBase { return; } + // Close all active sessions before shutting down the HTTP server + for (const [sid, transport] of this.transports) { + try { + await transport.close(); + await this.servers.get(sid)?.close(); + } catch (error) { + this.outputChannelDebug.appendLine( + `[McpServer] Error closing session ${sid}: ${error instanceof Error ? error.message : String(error)}` + ); + } + } + this.transports.clear(); + this.servers.clear(); + const { resolve, reject, promise } = withResolvers(); this.httpServer.close(err => { diff --git a/src/services/DhcService.ts b/src/services/DhcService.ts index c679b47b..3c400245 100644 --- a/src/services/DhcService.ts +++ b/src/services/DhcService.ts @@ -461,6 +461,10 @@ export class DhcService extends DisposableBase implements IDhcService { } async getConnection(): Promise { + if (this.cn == null) { + await this.initSession(); + } + return this.cn; } diff --git a/src/services/ServerManager.ts b/src/services/ServerManager.ts index c4ade86c..334b5029 100644 --- a/src/services/ServerManager.ts +++ b/src/services/ServerManager.ts @@ -52,6 +52,7 @@ export class ServerManager implements IServerManager { ) { this._configService = configService; this._connectionMap = new URLMap(); + this._pendingConnectionMap = new URLMap>(); this._coreClientCache = coreClientCache; this._dhcServiceFactory = dhcServiceFactory; this._dheClientCache = dheClientCache; @@ -70,6 +71,9 @@ export class ServerManager implements IServerManager { private readonly _configService: IConfigService; private readonly _connectionMap: URLMap; + private readonly _pendingConnectionMap: URLMap< + Promise + >; private readonly _coreClientCache: URLMap; private readonly _dhcServiceFactory: IDhcServiceFactory; private readonly _dheClientCache: URLMap; @@ -115,6 +119,7 @@ export class ServerManager implements IServerManager { await Promise.all([ this._connectionMap.dispose(), + this._pendingConnectionMap.dispose(), this._serverMap.dispose(), this._uriConnectionsMap.dispose(), this._workerURLToServerURLMap.dispose(), @@ -200,18 +205,52 @@ export class ServerManager implements IServerManager { const serverState = this._serverMap.get(serverUrl); if (serverState == null) { - return null; + throw new Error(`Server with URL '${serverUrl}' not found.`); } - // DHE supports multiple connections, but DHC does not. - if ( - !this._dheServiceCache.has(serverUrl) && - serverState.connectionCount > 0 - ) { - logger.info('Already connected to server:', serverUrl); - return null; + // We only support 1 connection for DHC servers in the extension + if (serverState.type === 'DHC' && serverState.connectionCount > 0) { + logger.info('Already connected to server:', serverUrl.href); + return this._connectionMap.getOrThrow(serverUrl); + } + + if (this._pendingConnectionMap.has(serverUrl)) { + logger.debug('Connection already in progress:', serverUrl.href); + return this._pendingConnectionMap.getOrThrow(serverUrl); + } + + const connectionPromise = this._doConnectToServer( + serverState, + workerConsoleType, + operateAsAnotherUser + ); + + // We only support 1 connection for DHC servers in the extension, but the + // count doesn't get updated until the connection is established, so we need + // to mark pending connections to prevent multiple simultaneous connection + // attempts to the same DHC server. + if (serverState.type === 'DHC') { + this._pendingConnectionMap.set( + serverUrl, + connectionPromise.then(result => { + this._pendingConnectionMap.delete(serverUrl); + return result; + }) + ); } + return connectionPromise; + }; + + private _doConnectToServer = async ( + serverState: ServerState, + workerConsoleType?: ConsoleType, + operateAsAnotherUser: boolean = false + ): Promise => { + let serverUrl = serverState.url; + + logger.debug('Connecting to server:', serverUrl.href); + let tagId: UniqueID | undefined; let placeholderUrl: URL | undefined;