-
Notifications
You must be signed in to change notification settings - Fork 23
Support SSE + streaming with MCP notifications. #1025
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Conversation
3de659c to
12d245d
Compare
| // Check if it's a notification by looking for "method" field | ||
| if (jsonData.contains("\"method\"")) { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Parse into a Document and then extract the "method" field. You're looking for a top-level JSON key named "method" but the string "method" can ostensibly appear anywhere in the payload.
mcp/mcp-server/src/main/java/software/amazon/smithy/java/mcp/server/HttpMcpProxy.java
Show resolved
Hide resolved
mcp/mcp-server/src/main/java/software/amazon/smithy/java/mcp/server/HttpMcpProxy.java
Show resolved
Hide resolved
| StringBuilder dataBuffer = new StringBuilder(); | ||
|
|
||
| for (String line : lines) { | ||
| if (line.startsWith("data: ")) { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
The space after data: is optional
| String responseSessionId = response.headers().firstValue("Mcp-Session-Id"); | ||
| if (responseSessionId != null) { | ||
| sessionId = responseSessionId; | ||
| LOG.debug("Stored session ID from response: {}", responseSessionId); | ||
| } |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This is only allowed during initialize but we are accepting it during any responses.
| if (isNotification(jsonData)) { | ||
| // This is a notification - parse as JsonRpcRequest and forward | ||
| JsonRpcRequest notification = | ||
| JSON_CODEC.deserializeShape(jsonData, JsonRpcRequest.builder()); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
We are double parsing the JSON here. You can convert the Document to JsonRpcRequest.
mcp/mcp-server/src/main/java/software/amazon/smithy/java/mcp/server/HttpMcpProxy.java
Show resolved
Hide resolved
mcp/mcp-server/src/main/java/software/amazon/smithy/java/mcp/server/McpServerProxy.java
Outdated
Show resolved
Hide resolved
mcp/mcp-server/src/main/java/software/amazon/smithy/java/mcp/server/McpService.java
Outdated
Show resolved
Hide resolved
mcp/mcp-server/src/main/java/software/amazon/smithy/java/mcp/server/McpService.java
Outdated
Show resolved
Hide resolved
mcp/mcp-server/src/main/java/software/amazon/smithy/java/mcp/server/McpService.java
Show resolved
Hide resolved
mcp/mcp-server/src/main/java/software/amazon/smithy/java/mcp/server/McpService.java
Outdated
Show resolved
Hide resolved
- Respond to list/changed notifications by invalidation currently cached tools. - Track session ID header values when response recieved from initialize method call. - Forward notifications to notificaiton writers. - Handle SSE format (when handling text/event-stream), read line by line.
mcp/mcp-server/src/main/java/software/amazon/smithy/java/mcp/server/HttpMcpProxy.java
Outdated
Show resolved
Hide resolved
mcp/mcp-server/src/main/java/software/amazon/smithy/java/mcp/server/HttpMcpProxy.java
Outdated
Show resolved
Hide resolved
mcp/mcp-server/src/main/java/software/amazon/smithy/java/mcp/server/HttpMcpProxy.java
Outdated
Show resolved
Hide resolved
mcp/mcp-server/src/main/java/software/amazon/smithy/java/mcp/server/HttpMcpProxy.java
Outdated
Show resolved
Hide resolved
mcp/mcp-server/src/main/java/software/amazon/smithy/java/mcp/server/HttpMcpProxy.java
Outdated
Show resolved
Hide resolved
mcp/mcp-server/src/main/java/software/amazon/smithy/java/mcp/server/HttpMcpProxy.java
Outdated
Show resolved
Hide resolved
|
|
||
| for (String line : lines) { | ||
| if (line.startsWith("data:")) { | ||
| dataBuffer.append(line.substring(5).replaceFirst("^ ", "")); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
replaceFirst compiles and evaluates a regular expression, which is expensive. Can we do a cheaper if (line.length() > 6 && Charater.isWhitespace(line.charAt(5)) check instead?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I took a different approach. We can't use Charater.isWhitespace because the SSE spec only allows a U+0020 space. https://html.spec.whatwg.org/multipage/server-sent-events.html#parsing-an-event-stream
The proxy implementations do not currently support SSE formatting.
Description of changes:
Added support for reading streamed HTTP responses with SSE formatting (e.g. data: prefix). Responses are read from the buffer, line by line and notifications processed.
Notifications are passed to a new notification listener for JsonRPCRequest. The current one used JsonRPCRequest.
When a notification/list-changed response is received, we invalidate the local cache and make sure the proxies to do a full refresh the next time list/tools is called.
If an MCP server includes Mcp-Session-Id in its response we cache this and provide it forward for subsequent requests. This is the spec for this https://modelcontextprotocol.io/specification/2025-06-18/basic/transports#session-management.
Tests pass and run locally to ensure proxy behaviour works as expected.