From f2d0cc2feb606e8d96316f0f1bfad60c4e7fff80 Mon Sep 17 00:00:00 2001 From: Ming Yang Date: Tue, 4 Nov 2025 19:08:39 -0800 Subject: [PATCH 1/8] catch and send errors on stream --- src/mcp/client/streamable_http.py | 52 ++++++++++++++++++------------- 1 file changed, 31 insertions(+), 21 deletions(-) diff --git a/src/mcp/client/streamable_http.py b/src/mcp/client/streamable_http.py index 57df647057..7c4f450a7d 100644 --- a/src/mcp/client/streamable_http.py +++ b/src/mcp/client/streamable_http.py @@ -251,6 +251,13 @@ async def _handle_resumption_request(self, ctx: RequestContext) -> None: await event_source.response.aclose() break + async def _send_error_response(self, ctx: RequestContext, error: Exception) -> None: + """Send an error response to the client.""" + error_data = ErrorData(code=32000, message=str(error)) + jsonrpc_error = JSONRPCError(jsonrpc="2.0", id=ctx.session_message.message.root.id, error=error_data) + session_message = SessionMessage(message=JSONRPCMessage(jsonrpc_error)) + await ctx.read_stream_writer.send(session_message) + async def _handle_post_request(self, ctx: RequestContext) -> None: """Handle a POST request with response processing.""" headers = self._prepare_request_headers(ctx.headers) @@ -321,23 +328,23 @@ async def _handle_sse_response( is_initialization: bool = False, ) -> None: """Handle SSE response from the server.""" - try: - event_source = EventSource(response) - async for sse in event_source.aiter_sse(): - is_complete = await self._handle_sse_event( - sse, - ctx.read_stream_writer, - resumption_callback=(ctx.metadata.on_resumption_token_update if ctx.metadata else None), - is_initialization=is_initialization, - ) - # If the SSE event indicates completion, like returning respose/error - # break the loop - if is_complete: - await response.aclose() - break - except Exception as e: - logger.exception("Error reading SSE stream:") - await ctx.read_stream_writer.send(e) + event_source = EventSource(response) + finished = False + async for sse in event_source.aiter_sse(): + is_complete = await self._handle_sse_event( + sse, + ctx.read_stream_writer, + resumption_callback=(ctx.metadata.on_resumption_token_update if ctx.metadata else None), + is_initialization=is_initialization, + ) + # If the SSE event indicates completion, like returning respose/error + # break the loop + if is_complete: + finished = True + await response.aclose() + break + if not finished: + raise Exception("SSE stream ended without completing") async def _handle_unexpected_content_type( self, @@ -403,10 +410,13 @@ async def post_writer( ) async def handle_request_async(): - if is_resumption: - await self._handle_resumption_request(ctx) - else: - await self._handle_post_request(ctx) + try: + if is_resumption: + await self._handle_resumption_request(ctx) + else: + await self._handle_post_request(ctx) + except Exception as e: + await self._send_error_response(ctx, e) # If this is a request, start a new task to handle it if isinstance(message.root, JSONRPCRequest): From 333b32cc30450dfe36a1e1fc5dfd44843570e560 Mon Sep 17 00:00:00 2001 From: Ming Yang Date: Thu, 6 Nov 2025 14:54:38 -0800 Subject: [PATCH 2/8] only catch errors in _handle_sse_response --- src/mcp/client/streamable_http.py | 49 ++++++++++++++++--------------- 1 file changed, 25 insertions(+), 24 deletions(-) diff --git a/src/mcp/client/streamable_http.py b/src/mcp/client/streamable_http.py index 7c4f450a7d..853d2fec15 100644 --- a/src/mcp/client/streamable_http.py +++ b/src/mcp/client/streamable_http.py @@ -328,23 +328,27 @@ async def _handle_sse_response( is_initialization: bool = False, ) -> None: """Handle SSE response from the server.""" - event_source = EventSource(response) - finished = False - async for sse in event_source.aiter_sse(): - is_complete = await self._handle_sse_event( - sse, - ctx.read_stream_writer, - resumption_callback=(ctx.metadata.on_resumption_token_update if ctx.metadata else None), - is_initialization=is_initialization, - ) - # If the SSE event indicates completion, like returning respose/error - # break the loop - if is_complete: - finished = True - await response.aclose() - break - if not finished: - raise Exception("SSE stream ended without completing") + try: + event_source = EventSource(response) + finished = False + async for sse in event_source.aiter_sse(): + is_complete = await self._handle_sse_event( + sse, + ctx.read_stream_writer, + resumption_callback=(ctx.metadata.on_resumption_token_update if ctx.metadata else None), + is_initialization=is_initialization, + ) + # If the SSE event indicates completion, like returning respose/error + # break the loop + if is_complete: + finished = True + await response.aclose() + break + if not finished: + raise Exception("SSE stream ended without completing") + except Exception as exc: + logger.exception("Error handling SSE response") + await self._send_error_response(ctx, exc) async def _handle_unexpected_content_type( self, @@ -410,13 +414,10 @@ async def post_writer( ) async def handle_request_async(): - try: - if is_resumption: - await self._handle_resumption_request(ctx) - else: - await self._handle_post_request(ctx) - except Exception as e: - await self._send_error_response(ctx, e) + if is_resumption: + await self._handle_resumption_request(ctx) + else: + await self._handle_post_request(ctx) # If this is a request, start a new task to handle it if isinstance(message.root, JSONRPCRequest): From aca621e4cc0066e2a2fc9da52e80658d644f247e Mon Sep 17 00:00:00 2001 From: Ming Yang Date: Thu, 6 Nov 2025 15:19:40 -0800 Subject: [PATCH 3/8] add typecheck --- src/mcp/client/streamable_http.py | 7 ++++--- 1 file changed, 4 insertions(+), 3 deletions(-) diff --git a/src/mcp/client/streamable_http.py b/src/mcp/client/streamable_http.py index 853d2fec15..a307eda815 100644 --- a/src/mcp/client/streamable_http.py +++ b/src/mcp/client/streamable_http.py @@ -254,9 +254,10 @@ async def _handle_resumption_request(self, ctx: RequestContext) -> None: async def _send_error_response(self, ctx: RequestContext, error: Exception) -> None: """Send an error response to the client.""" error_data = ErrorData(code=32000, message=str(error)) - jsonrpc_error = JSONRPCError(jsonrpc="2.0", id=ctx.session_message.message.root.id, error=error_data) - session_message = SessionMessage(message=JSONRPCMessage(jsonrpc_error)) - await ctx.read_stream_writer.send(session_message) + if isinstance(ctx.session_message.message.root, JSONRPCRequest): + jsonrpc_error = JSONRPCError(jsonrpc="2.0", id=ctx.session_message.message.root.id, error=error_data) + session_message = SessionMessage(message=JSONRPCMessage(jsonrpc_error)) + await ctx.read_stream_writer.send(session_message) async def _handle_post_request(self, ctx: RequestContext) -> None: """Handle a POST request with response processing.""" From 5e2aea49549cf3369ccbf47feaec632b44934734 Mon Sep 17 00:00:00 2001 From: Ming Yang Date: Fri, 7 Nov 2025 12:31:56 -0800 Subject: [PATCH 4/8] catch errors in _handle_post_request --- src/mcp/client/streamable_http.py | 70 +++++++++++++++++-------------- 1 file changed, 38 insertions(+), 32 deletions(-) diff --git a/src/mcp/client/streamable_http.py b/src/mcp/client/streamable_http.py index a307eda815..d610559f26 100644 --- a/src/mcp/client/streamable_http.py +++ b/src/mcp/client/streamable_http.py @@ -265,41 +265,47 @@ async def _handle_post_request(self, ctx: RequestContext) -> None: message = ctx.session_message.message is_initialization = self._is_initialization_request(message) - async with ctx.client.stream( - "POST", - self.url, - json=message.model_dump(by_alias=True, mode="json", exclude_none=True), - headers=headers, - ) as response: - if response.status_code == 202: - logger.debug("Received 202 Accepted") - return + try: + async with ctx.client.stream( + "POST", + self.url, + json=message.model_dump(by_alias=True, mode="json", exclude_none=True), + headers=headers, + ) as response: + if response.status_code == 202: + logger.debug("Received 202 Accepted") + return - if response.status_code == 404: - if isinstance(message.root, JSONRPCRequest): - await self._send_session_terminated_error( - ctx.read_stream_writer, - message.root.id, - ) - return + if response.status_code == 404: + if isinstance(message.root, JSONRPCRequest): + await self._send_session_terminated_error( + ctx.read_stream_writer, + message.root.id, + ) + return + + response.raise_for_status() + if is_initialization: + self._maybe_extract_session_id_from_response(response) - response.raise_for_status() + # Per https://modelcontextprotocol.io/specification/2025-06-18/basic#notifications: + # The server MUST NOT send a response to notifications. + if isinstance(message.root, JSONRPCRequest): + content_type = response.headers.get(CONTENT_TYPE, "").lower() + if content_type.startswith(JSON): + await self._handle_json_response(response, ctx.read_stream_writer, is_initialization) + elif content_type.startswith(SSE): + await self._handle_sse_response(response, ctx, is_initialization) + else: + await self._handle_unexpected_content_type( + content_type, + ctx.read_stream_writer, + ) + except Exception as exc: if is_initialization: - self._maybe_extract_session_id_from_response(response) - - # Per https://modelcontextprotocol.io/specification/2025-06-18/basic#notifications: - # The server MUST NOT send a response to notifications. - if isinstance(message.root, JSONRPCRequest): - content_type = response.headers.get(CONTENT_TYPE, "").lower() - if content_type.startswith(JSON): - await self._handle_json_response(response, ctx.read_stream_writer, is_initialization) - elif content_type.startswith(SSE): - await self._handle_sse_response(response, ctx, is_initialization) - else: - await self._handle_unexpected_content_type( - content_type, - ctx.read_stream_writer, - ) + raise exc + else: + await self._send_error_response(ctx, exc) async def _handle_json_response( self, From 2e5704ce69650a3dbd7233bc4c96ca61128ba3d3 Mon Sep 17 00:00:00 2001 From: Ming Yang Date: Mon, 22 Dec 2025 19:05:59 -0800 Subject: [PATCH 5/8] fix --- src/mcp/client/streamable_http.py | 33 ++++++++++++++++++++----------- 1 file changed, 21 insertions(+), 12 deletions(-) diff --git a/src/mcp/client/streamable_http.py b/src/mcp/client/streamable_http.py index 475cb7d12a..c7765494a7 100644 --- a/src/mcp/client/streamable_http.py +++ b/src/mcp/client/streamable_http.py @@ -420,7 +420,7 @@ async def _handle_sse_response( try: event_source = EventSource(response) - async for sse in event_source.aiter_sse(): # pragma: no branch + async for sse in event_source.aiter_sse(): # Track last event ID for potential reconnection if sse.id: last_event_id = sse.id @@ -440,13 +440,17 @@ async def _handle_sse_response( if is_complete: await response.aclose() return # Normal completion, no reconnect needed - except Exception as e: # pragma: no cover - logger.debug(f"SSE stream ended: {e}") - # Stream ended without response - reconnect if we received an event with ID - if last_event_id is not None: # pragma: no branch - logger.info("SSE stream disconnected, reconnecting...") - await self._handle_reconnection(ctx, last_event_id, retry_interval_ms) + # Stream ended without response - try to reconnect if we have an event ID + if last_event_id is not None: + logger.info("SSE stream disconnected, reconnecting...") + await self._handle_reconnection(ctx, last_event_id, retry_interval_ms) + else: + # No event ID received, can't reconnect - report error + raise Exception("SSE stream ended without completing") + except Exception as exc: + logger.exception("Error handling SSE response") + await self._send_error_response(ctx, exc) async def _handle_reconnection( self, @@ -455,11 +459,16 @@ async def _handle_reconnection( retry_interval_ms: int | None = None, attempt: int = 0, ) -> None: - """Reconnect with Last-Event-ID to resume stream after server disconnect.""" + """Reconnect with Last-Event-ID to resume stream after server disconnect. + + Raises: + Exception: If max reconnection attempts exceeded or reconnection fails. + """ # Bail if max retries exceeded - if attempt >= MAX_RECONNECTION_ATTEMPTS: # pragma: no cover - logger.debug(f"Max reconnection attempts ({MAX_RECONNECTION_ATTEMPTS}) exceeded") - return + if attempt >= MAX_RECONNECTION_ATTEMPTS: + raise Exception( + f"SSE stream reconnection failed after {MAX_RECONNECTION_ATTEMPTS} attempts" + ) # Always wait - use server value or default delay_ms = retry_interval_ms if retry_interval_ms is not None else DEFAULT_RECONNECTION_DELAY_MS @@ -506,7 +515,7 @@ async def _handle_reconnection( # Stream ended again without response - reconnect again (reset attempt counter) logger.info("SSE stream disconnected, reconnecting...") await self._handle_reconnection(ctx, reconnect_last_event_id, reconnect_retry_ms, 0) - except Exception as e: # pragma: no cover + except Exception as e: logger.debug(f"Reconnection failed: {e}") # Try to reconnect again if we still have an event ID await self._handle_reconnection(ctx, last_event_id, retry_interval_ms, attempt + 1) From c9c08728a1f29a91766ce401248f3501ddee8367 Mon Sep 17 00:00:00 2001 From: Ming Yang Date: Mon, 22 Dec 2025 19:28:29 -0800 Subject: [PATCH 6/8] lint --- src/mcp/client/streamable_http.py | 4 +--- 1 file changed, 1 insertion(+), 3 deletions(-) diff --git a/src/mcp/client/streamable_http.py b/src/mcp/client/streamable_http.py index c7765494a7..f12c0ebf1a 100644 --- a/src/mcp/client/streamable_http.py +++ b/src/mcp/client/streamable_http.py @@ -466,9 +466,7 @@ async def _handle_reconnection( """ # Bail if max retries exceeded if attempt >= MAX_RECONNECTION_ATTEMPTS: - raise Exception( - f"SSE stream reconnection failed after {MAX_RECONNECTION_ATTEMPTS} attempts" - ) + raise Exception(f"SSE stream reconnection failed after {MAX_RECONNECTION_ATTEMPTS} attempts") # Always wait - use server value or default delay_ms = retry_interval_ms if retry_interval_ms is not None else DEFAULT_RECONNECTION_DELAY_MS From a51e6a1f970367bb6e2051f976a3779b8970dad4 Mon Sep 17 00:00:00 2001 From: Ming Yang Date: Mon, 22 Dec 2025 19:37:02 -0800 Subject: [PATCH 7/8] add tests --- tests/shared/test_streamable_http.py | 267 +++++++++++++++++++++++++++ 1 file changed, 267 insertions(+) diff --git a/tests/shared/test_streamable_http.py b/tests/shared/test_streamable_http.py index e95c309fbc..d419560d56 100644 --- a/tests/shared/test_streamable_http.py +++ b/tests/shared/test_streamable_http.py @@ -4,6 +4,7 @@ Contains tests for both server and client sides of the StreamableHTTP transport. """ +import contextlib import json import multiprocessing import socket @@ -2393,3 +2394,269 @@ async def test_streamablehttp_client_deprecation_warning(basic_server: None, bas await session.initialize() tools = await session.list_tools() assert len(tools.tools) > 0 + + +@pytest.mark.anyio +async def test_sse_stream_ends_without_completing_no_event_id() -> None: + """Test that SSE stream ending without completing and no event ID sends error response.""" + from unittest.mock import MagicMock, patch + + from mcp.client.streamable_http import RequestContext, StreamableHTTPTransport + from mcp.shared.message import SessionMessage + from mcp.types import JSONRPCError, JSONRPCMessage, JSONRPCRequest + + transport = StreamableHTTPTransport(url="http://localhost:8000/mcp") + + # Create a mock response that returns an empty SSE stream (no events) + mock_response = MagicMock() + + async def mock_aclose() -> None: + pass + + mock_response.aclose = mock_aclose + + # Create a mock EventSource that yields no events + async def empty_iter(): + return + yield # Make it an async generator that yields nothing + + mock_event_source = MagicMock() + mock_event_source.aiter_sse = empty_iter + + # Create streams for testing + write_stream, read_stream = anyio.create_memory_object_stream[SessionMessage | Exception](10) + + # Create a request context + mock_client = MagicMock() + mock_message = JSONRPCMessage(root=JSONRPCRequest(jsonrpc="2.0", id="test-1", method="test")) + session_message = SessionMessage(message=mock_message) + + ctx = RequestContext( + client=mock_client, + session_id="test-session", + session_message=session_message, + metadata=None, + read_stream_writer=write_stream, + ) + + try: + with patch("mcp.client.streamable_http.EventSource", return_value=mock_event_source): + await transport._handle_sse_response(mock_response, ctx, is_initialization=False) + + # Should have received an error response + received = await read_stream.receive() + assert isinstance(received, SessionMessage) + assert isinstance(received.message.root, JSONRPCError) + assert "SSE stream ended without completing" in received.message.root.error.message + finally: + await write_stream.aclose() + await read_stream.aclose() + + +@pytest.mark.anyio +async def test_handle_post_request_non_init_error_sends_error_response() -> None: + """Test that non-initialization request errors send error response instead of raising.""" + from mcp.client.streamable_http import RequestContext, StreamableHTTPTransport + from mcp.shared.message import SessionMessage + from mcp.types import JSONRPCError, JSONRPCMessage, JSONRPCRequest + + transport = StreamableHTTPTransport(url="http://localhost:8000/mcp") + + # Create streams for testing + write_stream, read_stream = anyio.create_memory_object_stream[SessionMessage | Exception](10) + + # Create a non-initialization request + mock_message = JSONRPCMessage(root=JSONRPCRequest(jsonrpc="2.0", id="test-1", method="tools/list")) + session_message = SessionMessage(message=mock_message) + + # Create a mock client that raises an exception + mock_client = MagicMock() + + # Create an async context manager that raises + class FailingStream: + async def __aenter__(self) -> None: + raise httpx.HTTPStatusError("Server error", request=MagicMock(), response=MagicMock(status_code=500)) + + async def __aexit__(self, exc_type: Any, exc_val: Any, exc_tb: Any) -> None: + pass + + mock_client.stream = MagicMock(return_value=FailingStream()) + + ctx = RequestContext( + client=mock_client, + session_id="test-session", + session_message=session_message, + metadata=None, + read_stream_writer=write_stream, + ) + + try: + # This should NOT raise, but send an error response + await transport._handle_post_request(ctx) + + # Should have received an error response + received = await read_stream.receive() + assert isinstance(received, SessionMessage) + assert isinstance(received.message.root, JSONRPCError) + finally: + await write_stream.aclose() + await read_stream.aclose() + + +@pytest.mark.anyio +async def test_handle_post_request_init_error_raises() -> None: + """Test that initialization request errors are raised, not sent as error response.""" + from mcp.client.streamable_http import RequestContext, StreamableHTTPTransport + from mcp.shared.message import SessionMessage + from mcp.types import JSONRPCMessage, JSONRPCRequest + + transport = StreamableHTTPTransport(url="http://localhost:8000/mcp") + + # Create streams for testing + write_stream, read_stream = anyio.create_memory_object_stream[SessionMessage | Exception](10) + + # Create an initialization request + mock_message = JSONRPCMessage( + root=JSONRPCRequest( + jsonrpc="2.0", + id="init-1", + method="initialize", + params={ + "clientInfo": {"name": "test", "version": "1.0"}, + "protocolVersion": "2025-03-26", + "capabilities": {}, + }, + ) + ) + session_message = SessionMessage(message=mock_message) + + # Create a mock client that raises an exception + mock_client = MagicMock() + + class FailingStream: + async def __aenter__(self) -> None: + raise httpx.HTTPStatusError("Server error", request=MagicMock(), response=MagicMock(status_code=500)) + + async def __aexit__(self, exc_type: Any, exc_val: Any, exc_tb: Any) -> None: + pass + + mock_client.stream = MagicMock(return_value=FailingStream()) + + ctx = RequestContext( + client=mock_client, + session_id=None, + session_message=session_message, + metadata=None, + read_stream_writer=write_stream, + ) + + try: + # This SHOULD raise for initialization requests + with pytest.raises(httpx.HTTPStatusError): + await transport._handle_post_request(ctx) + finally: + await write_stream.aclose() + await read_stream.aclose() + + +@pytest.mark.anyio +async def test_handle_reconnection_max_attempts_exceeded() -> None: + """Test that _handle_reconnection raises when max attempts exceeded.""" + from mcp.client.streamable_http import ( + MAX_RECONNECTION_ATTEMPTS, + RequestContext, + StreamableHTTPTransport, + ) + from mcp.shared.message import SessionMessage + from mcp.types import JSONRPCMessage, JSONRPCRequest + + transport = StreamableHTTPTransport(url="http://localhost:8000/mcp") + + # Create streams for testing + write_stream, read_stream = anyio.create_memory_object_stream[SessionMessage | Exception](10) + + # Create a request context + mock_message = JSONRPCMessage(root=JSONRPCRequest(jsonrpc="2.0", id="test-1", method="test")) + session_message = SessionMessage(message=mock_message) + + ctx = RequestContext( + client=MagicMock(), + session_id="test-session", + session_message=session_message, + metadata=None, + read_stream_writer=write_stream, + ) + + try: + # Call with attempt >= MAX_RECONNECTION_ATTEMPTS should raise + with pytest.raises(Exception, match="SSE stream reconnection failed"): + await transport._handle_reconnection( + ctx, + last_event_id="test-event-id", + retry_interval_ms=1, # Use 1ms to speed up test + attempt=MAX_RECONNECTION_ATTEMPTS, + ) + finally: + await write_stream.aclose() + await read_stream.aclose() + + +@pytest.mark.anyio +async def test_handle_reconnection_failure_retries() -> None: + """Test that _handle_reconnection retries on failure and eventually raises.""" + from collections.abc import AsyncGenerator + from unittest.mock import MagicMock, patch + + from mcp.client.streamable_http import ( + MAX_RECONNECTION_ATTEMPTS, + RequestContext, + StreamableHTTPTransport, + ) + from mcp.shared.message import SessionMessage + from mcp.types import JSONRPCMessage, JSONRPCRequest + + transport = StreamableHTTPTransport(url="http://localhost:8000/mcp") + + # Create streams for testing + write_stream, read_stream = anyio.create_memory_object_stream[SessionMessage | Exception](10) + + # Create a mock client + mock_client = MagicMock() + + # Create a request context + mock_message = JSONRPCMessage(root=JSONRPCRequest(jsonrpc="2.0", id="test-1", method="test")) + session_message = SessionMessage(message=mock_message) + + ctx = RequestContext( + client=mock_client, + session_id="test-session", + session_message=session_message, + metadata=None, + read_stream_writer=write_stream, + ) + + # Track how many times aconnect_sse is called + call_count = 0 + + @contextlib.asynccontextmanager + async def failing_aconnect_sse(*args: Any, **kwargs: Any) -> AsyncGenerator[None, None]: + nonlocal call_count + call_count += 1 + raise httpx.HTTPStatusError("Connection failed", request=MagicMock(), response=MagicMock(status_code=503)) + yield # Make it an async generator + + try: + with patch("mcp.client.streamable_http.aconnect_sse", failing_aconnect_sse): + with pytest.raises(Exception, match="SSE stream reconnection failed"): + await transport._handle_reconnection( + ctx, + last_event_id="test-event-id", + retry_interval_ms=1, # Use 1ms to speed up test + attempt=0, + ) + + # Should have tried MAX_RECONNECTION_ATTEMPTS times + assert call_count == MAX_RECONNECTION_ATTEMPTS + finally: + await write_stream.aclose() + await read_stream.aclose() From 2f6efdb8dad3dcdf34443da4978b63649be70dc3 Mon Sep 17 00:00:00 2001 From: Ming Yang Date: Tue, 23 Dec 2025 11:21:37 -0800 Subject: [PATCH 8/8] lint --- src/mcp/client/streamable_http.py | 2 +- tests/shared/test_streamable_http.py | 6 +++--- 2 files changed, 4 insertions(+), 4 deletions(-) diff --git a/src/mcp/client/streamable_http.py b/src/mcp/client/streamable_http.py index f12c0ebf1a..9ba81773a4 100644 --- a/src/mcp/client/streamable_http.py +++ b/src/mcp/client/streamable_http.py @@ -334,7 +334,7 @@ async def _handle_resumption_request(self, ctx: RequestContext) -> None: async def _send_error_response(self, ctx: RequestContext, error: Exception) -> None: """Send an error response to the client.""" error_data = ErrorData(code=32000, message=str(error)) - if isinstance(ctx.session_message.message.root, JSONRPCRequest): + if isinstance(ctx.session_message.message.root, JSONRPCRequest): # pragma: no branch jsonrpc_error = JSONRPCError(jsonrpc="2.0", id=ctx.session_message.message.root.id, error=error_data) session_message = SessionMessage(message=JSONRPCMessage(jsonrpc_error)) await ctx.read_stream_writer.send(session_message) diff --git a/tests/shared/test_streamable_http.py b/tests/shared/test_streamable_http.py index d419560d56..146bebfe55 100644 --- a/tests/shared/test_streamable_http.py +++ b/tests/shared/test_streamable_http.py @@ -2411,7 +2411,7 @@ async def test_sse_stream_ends_without_completing_no_event_id() -> None: mock_response = MagicMock() async def mock_aclose() -> None: - pass + pass # pragma: no cover mock_response.aclose = mock_aclose @@ -2478,7 +2478,7 @@ async def __aenter__(self) -> None: raise httpx.HTTPStatusError("Server error", request=MagicMock(), response=MagicMock(status_code=500)) async def __aexit__(self, exc_type: Any, exc_val: Any, exc_tb: Any) -> None: - pass + pass # pragma: no cover mock_client.stream = MagicMock(return_value=FailingStream()) @@ -2538,7 +2538,7 @@ async def __aenter__(self) -> None: raise httpx.HTTPStatusError("Server error", request=MagicMock(), response=MagicMock(status_code=500)) async def __aexit__(self, exc_type: Any, exc_val: Any, exc_tb: Any) -> None: - pass + pass # pragma: no cover mock_client.stream = MagicMock(return_value=FailingStream())