|
18 | 18 | import anyio |
19 | 19 | import httpx |
20 | 20 | import pytest |
| 21 | +from anyio.streams.memory import MemoryObjectReceiveStream, MemoryObjectSendStream |
21 | 22 | from httpx_sse import ServerSentEvent |
22 | 23 | from starlette.applications import Starlette |
23 | 24 | from starlette.requests import Request |
24 | 25 | from starlette.routing import Mount |
| 26 | +from starlette.types import Message, Scope |
25 | 27 |
|
26 | 28 | from mcp import MCPError, types |
27 | 29 | from mcp.client.session import ClientSession |
@@ -2231,11 +2233,10 @@ async def test_streamable_http_client_preserves_custom_with_mcp_headers(context_ |
2231 | 2233 | async def test_standalone_stream_teardown_mid_listen_is_not_an_error(caplog: pytest.LogCaptureFixture) -> None: |
2232 | 2234 | """Tearing down the standalone stream under its parked writer produces no error log. |
2233 | 2235 |
|
2234 | | - Cleanup closes the send side first, so a writer parked in receive() ends on a clean |
2235 | | - end-of-stream. This pins that close ordering: reversing it would wake the parked writer |
2236 | | - with ClosedResourceError on every disconnect. (The timing window where teardown lands |
2237 | | - between dequeues is handled by the writer's ClosedResourceError arm, which cannot be |
2238 | | - forced deterministically from the public surface.) |
| 2236 | + SDK-defined teardown behavior, driven through the full client/server path: the writer |
| 2237 | + is parked in receive() when teardown lands, and ends quietly. The companion test |
| 2238 | + test_standalone_stream_teardown_between_dequeues_is_not_an_error forces the other |
| 2239 | + teardown window, which this path cannot reach deterministically. |
2239 | 2240 | """ |
2240 | 2241 | session_manager = StreamableHTTPSessionManager( |
2241 | 2242 | app=_create_server(), |
@@ -2267,3 +2268,100 @@ async def message_handler( |
2267 | 2268 | (transport,) = session_manager._server_instances.values() # pyright: ignore[reportPrivateUsage] |
2268 | 2269 | await transport._clean_up_memory_streams(GET_STREAM_KEY) # pyright: ignore[reportPrivateUsage] |
2269 | 2270 | assert "Error in standalone SSE writer" not in caplog.text |
| 2271 | + |
| 2272 | + |
| 2273 | +@pytest.mark.anyio |
| 2274 | +async def test_standalone_stream_teardown_between_dequeues_is_not_an_error( |
| 2275 | + caplog: pytest.LogCaptureFixture, |
| 2276 | +) -> None: |
| 2277 | + """Teardown landing while the standalone writer is between dequeues produces no error log. |
| 2278 | +
|
| 2279 | + SDK-defined: after teardown, the writer's next dequeue hits its own closed stream |
| 2280 | + (ClosedResourceError), which is expected disconnect noise, not an error. The public |
| 2281 | + surface cannot force this window (the in-process client consumes SSE without |
| 2282 | + backpressure, so the writer is always parked in receive() when teardown runs), so this |
| 2283 | + drives the transport's ASGI entry point directly with a gated `send`. |
| 2284 | +
|
| 2285 | + Steps: |
| 2286 | + 1. A GET establishes the standalone SSE stream; the gated ASGI send keeps the |
| 2287 | + response from consuming any SSE data. |
| 2288 | + 2. An event sent into the standalone stream rendezvouses with the writer's receive(), |
| 2289 | + which then blocks forwarding it to the un-consumed SSE stream -- the |
| 2290 | + between-dequeues window. |
| 2291 | + 3. Stream cleanup runs inside that window, closing both standalone stream ends. |
| 2292 | + 4. The gate opens: the event reaches the wire, the writer's next dequeue hits the |
| 2293 | + closed stream, and the response completes cleanly with nothing logged as an error. |
| 2294 | + """ |
| 2295 | + transport = StreamableHTTPServerTransport( |
| 2296 | + mcp_session_id=None, |
| 2297 | + security_settings=TransportSecuritySettings(enable_dns_rebinding_protection=False), |
| 2298 | + ) |
| 2299 | + # The GET handler only checks that a read-stream writer exists; the standalone |
| 2300 | + # writer never touches it. |
| 2301 | + read_stream_writer, read_stream = create_context_streams[SessionMessage | Exception](0) |
| 2302 | + transport._read_stream_writer = read_stream_writer # pyright: ignore[reportPrivateUsage] |
| 2303 | + |
| 2304 | + stream_registered = anyio.Event() |
| 2305 | + |
| 2306 | + class SignalingStreams( |
| 2307 | + dict[types.RequestId, tuple[MemoryObjectSendStream[EventMessage], MemoryObjectReceiveStream[EventMessage]]] |
| 2308 | + ): |
| 2309 | + # Only the GET handler inserts here, so any insert is the standalone stream |
| 2310 | + # registration the test is waiting on. |
| 2311 | + def __setitem__( |
| 2312 | + self, |
| 2313 | + key: types.RequestId, |
| 2314 | + value: tuple[MemoryObjectSendStream[EventMessage], MemoryObjectReceiveStream[EventMessage]], |
| 2315 | + ) -> None: |
| 2316 | + super().__setitem__(key, value) |
| 2317 | + stream_registered.set() |
| 2318 | + |
| 2319 | + transport._request_streams = SignalingStreams() # pyright: ignore[reportPrivateUsage] |
| 2320 | + |
| 2321 | + gate = anyio.Event() |
| 2322 | + sent: list[Message] = [] |
| 2323 | + |
| 2324 | + async def asgi_send(message: Message) -> None: |
| 2325 | + sent.append(message) |
| 2326 | + await gate.wait() |
| 2327 | + |
| 2328 | + # Never delivers anything: parks the response's disconnect listener until the |
| 2329 | + # completed response cancels it. |
| 2330 | + disconnect_send, disconnect_receive = anyio.create_memory_object_stream[Message](0) |
| 2331 | + |
| 2332 | + async def asgi_receive() -> Message: |
| 2333 | + return await disconnect_receive.receive() |
| 2334 | + |
| 2335 | + scope: Scope = { |
| 2336 | + "type": "http", |
| 2337 | + "method": "GET", |
| 2338 | + "path": "/mcp", |
| 2339 | + "query_string": b"", |
| 2340 | + "headers": [(b"accept", b"text/event-stream")], |
| 2341 | + } |
| 2342 | + notification = types.JSONRPCNotification(jsonrpc="2.0", method="notifications/initialized") |
| 2343 | + |
| 2344 | + async with read_stream_writer, read_stream, disconnect_send, disconnect_receive: |
| 2345 | + with anyio.fail_after(5): |
| 2346 | + async with anyio.create_task_group() as tg: # pragma: no branch |
| 2347 | + tg.start_soon(transport.handle_request, scope, asgi_receive, asgi_send) |
| 2348 | + await stream_registered.wait() |
| 2349 | + standalone_send = transport._request_streams[GET_STREAM_KEY][0] # pyright: ignore[reportPrivateUsage] |
| 2350 | + # Zero-buffer rendezvous: send() returns only once the writer's receive() |
| 2351 | + # has taken the event, so the writer is now between dequeues, blocked |
| 2352 | + # forwarding to the SSE stream nothing consumes while the gate is closed. |
| 2353 | + await standalone_send.send(EventMessage(notification)) |
| 2354 | + await transport._clean_up_memory_streams(GET_STREAM_KEY) # pyright: ignore[reportPrivateUsage] |
| 2355 | + # Unblock the response: it consumes the forwarded event, and the writer's |
| 2356 | + # next dequeue hits its closed stream. |
| 2357 | + gate.set() |
| 2358 | + |
| 2359 | + # The event dequeued before teardown still reached the wire, and the response |
| 2360 | + # ended with a normal completion rather than an exception. |
| 2361 | + assert sent[0]["type"] == "http.response.start" |
| 2362 | + assert sent[0]["status"] == 200 |
| 2363 | + body_chunks = [message for message in sent if message["type"] == "http.response.body"] |
| 2364 | + assert b"notifications/initialized" in body_chunks[0]["body"] |
| 2365 | + assert body_chunks[-1] == {"type": "http.response.body", "body": b"", "more_body": False} |
| 2366 | + assert "Error in standalone SSE writer" not in caplog.text |
| 2367 | + assert "Error in standalone SSE response" not in caplog.text |
0 commit comments