|
28 | 28 | from mcp.client.streamable_http import StreamableHTTPTransport, streamable_http_client |
29 | 29 | from mcp.server import Server, ServerRequestContext |
30 | 30 | from mcp.server.streamable_http import ( |
| 31 | + GET_STREAM_KEY, |
31 | 32 | MCP_PROTOCOL_VERSION_HEADER, |
32 | 33 | MCP_SESSION_ID_HEADER, |
33 | 34 | SESSION_ID_PATTERN, |
@@ -2143,3 +2144,44 @@ async def test_streamable_http_client_preserves_custom_with_mcp_headers(context_ |
2143 | 2144 |
|
2144 | 2145 | assert "content-type" in headers_data |
2145 | 2146 | assert headers_data["content-type"] == "application/json" |
| 2147 | + |
| 2148 | + |
| 2149 | +@pytest.mark.anyio |
| 2150 | +async def test_standalone_stream_teardown_mid_listen_is_not_an_error(caplog: pytest.LogCaptureFixture) -> None: |
| 2151 | + """Tearing down the standalone stream under its parked writer produces no error log. |
| 2152 | +
|
| 2153 | + Cleanup closes the send side first, so a writer parked in receive() ends on a clean |
| 2154 | + end-of-stream. This pins that close ordering: reversing it would wake the parked writer |
| 2155 | + with ClosedResourceError on every disconnect. (The timing window where teardown lands |
| 2156 | + between dequeues is handled by the writer's ClosedResourceError arm, which cannot be |
| 2157 | + forced deterministically from the public surface.) |
| 2158 | + """ |
| 2159 | + session_manager = StreamableHTTPSessionManager( |
| 2160 | + app=_create_server(), |
| 2161 | + security_settings=TransportSecuritySettings(enable_dns_rebinding_protection=False), |
| 2162 | + ) |
| 2163 | + app = Starlette(routes=[Mount("/mcp", app=session_manager.handle_request)]) |
| 2164 | + notified = anyio.Event() |
| 2165 | + |
| 2166 | + async def message_handler( |
| 2167 | + message: RequestResponder[types.ServerRequest, types.ClientResult] | types.ServerNotification | Exception, |
| 2168 | + ) -> None: |
| 2169 | + if isinstance(message, types.ResourceUpdatedNotification): |
| 2170 | + notified.set() |
| 2171 | + |
| 2172 | + async with session_manager.run(): |
| 2173 | + async with ( |
| 2174 | + make_client(app) as http_client, |
| 2175 | + streamable_http_client(f"{BASE_URL}/mcp", http_client=http_client) as (read_stream, write_stream), |
| 2176 | + ClientSession(read_stream, write_stream, message_handler=message_handler) as session, |
| 2177 | + ): |
| 2178 | + await session.initialize() |
| 2179 | + # Prove the standalone GET writer is live: a notification with no |
| 2180 | + # related request rides the GET stream to the client. |
| 2181 | + await session.call_tool("test_tool_with_standalone_notification", {}) |
| 2182 | + with anyio.fail_after(5): |
| 2183 | + await notified.wait() |
| 2184 | + # Tear the standalone stream down while the writer is parked on it. |
| 2185 | + (transport,) = session_manager._server_instances.values() # pyright: ignore[reportPrivateUsage] |
| 2186 | + await transport._clean_up_memory_streams(GET_STREAM_KEY) # pyright: ignore[reportPrivateUsage] |
| 2187 | + assert "Error in standalone SSE writer" not in caplog.text |
0 commit comments