From 558830ee8b440a665b17e0733298b4a95356b3e6 Mon Sep 17 00:00:00 2001 From: MuhammadAliShahzad Date: Mon, 11 May 2026 23:01:28 +0200 Subject: [PATCH 1/3] Fix transport leak when Connection.close() hits its exception path MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Connection._disconnect() early-returned when state was already END: async def _disconnect(self) -> None: if self.state == ConnectionState.END: return await self._set_state(ConnectionState.END) await self._transport.close() Connection.close() wraps the AMQP close handshake in try/except/finally. On any error during the handshake (timeout, network error, peer already closed, etc.) the except block sets state to END: except Exception as exc: await self._set_state(ConnectionState.END) finally: await self._disconnect() The finally then called _disconnect(), which saw state == END and early-returned, so self._transport.close() was never called. With TransportType.AmqpOverWebsocket this leaked the aiohttp ClientSession (observed as a burst of "Unclosed client session" warnings on shutdown, one per partition). The early return is not arbitrary — it prevents a double-close in the happy path, where _incoming_close() already calls _disconnect() before Connection.close()'s finally runs. The fix is to track close at the connection level via a self._transport_closed flag so transport.close() runs exactly once regardless of code path. Applied to both _connection_async.py and _connection.py (sync sibling has the identical pattern; the user-visible leak is less severe for sync because that transport does not hold an aiohttp ClientSession, but the logic flaw is the same). Adds 4 unit tests per file covering the regression scenario, the idempotency invariant, the normal state transition, and resilience to transport.close() errors. Fixes #46830 Co-Authored-By: Claude Opus 4.7 --- sdk/eventhub/azure-eventhub/CHANGELOG.md | 12 +++ .../azure/eventhub/_pyamqp/_connection.py | 24 +++++- .../eventhub/_pyamqp/aio/_connection_async.py | 26 +++++- .../test_connection_disconnect_async_unit.py | 85 +++++++++++++++++++ .../test_connection_disconnect_unit.py | 67 +++++++++++++++ 5 files changed, 206 insertions(+), 8 deletions(-) create mode 100644 sdk/eventhub/azure-eventhub/tests/pyamqp_tests/asynctests/test_connection_disconnect_async_unit.py create mode 100644 sdk/eventhub/azure-eventhub/tests/pyamqp_tests/synctests/test_connection_disconnect_unit.py diff --git a/sdk/eventhub/azure-eventhub/CHANGELOG.md b/sdk/eventhub/azure-eventhub/CHANGELOG.md index f287cb0b8fde..bdaf55c7390b 100644 --- a/sdk/eventhub/azure-eventhub/CHANGELOG.md +++ b/sdk/eventhub/azure-eventhub/CHANGELOG.md @@ -1,5 +1,17 @@ # Release History +## 5.15.2 (Unreleased) + +### Features Added + +### Breaking Changes + +### Bugs Fixed + +- Fixed a bug where `Connection._disconnect()` early-returned when state was already `END`, so the underlying transport was never closed if `Connection.close()` entered its exception handler (e.g. network error, timeout, or already-closed peer during the AMQP close handshake). With `TransportType.AmqpOverWebsocket`, the leaked transport's `aiohttp.ClientSession` produced an `Unclosed client session` warning per affected partition. + +### Other Changes + ## 5.15.1 (2025-11-11) ### Bugs Fixed diff --git a/sdk/eventhub/azure-eventhub/azure/eventhub/_pyamqp/_connection.py b/sdk/eventhub/azure-eventhub/azure/eventhub/_pyamqp/_connection.py index 1dfd1d852f6b..5b56605fc225 100644 --- a/sdk/eventhub/azure-eventhub/azure/eventhub/_pyamqp/_connection.py +++ b/sdk/eventhub/azure-eventhub/azure/eventhub/_pyamqp/_connection.py @@ -202,6 +202,7 @@ def __init__( # pylint:disable=too-many-locals self._error: Optional[AMQPConnectionError] = None self._outgoing_endpoints: Dict[int, Session] = {} self._incoming_endpoints: Dict[int, Session] = {} + self._transport_closed: bool = False def __enter__(self) -> "Connection": self.open() @@ -258,11 +259,26 @@ def _connect(self) -> None: ) from exc def _disconnect(self) -> None: - """Disconnect the transport and set state to END.""" - if self.state == ConnectionState.END: + """Disconnect the transport and set state to END. + + ``transport.close()`` is gated on ``self._transport_closed`` so that it + runs exactly once regardless of which code path drives the shutdown. + Without this, ``Connection.close()`` could enter its exception handler, + set the state to ``END`` without closing the transport, and the + subsequent ``_disconnect()`` call from the ``finally`` block would + early-return on the state check — leaking the underlying transport. + """ + if self._transport_closed: return - self._set_state(ConnectionState.END) - self._transport.close() + self._transport_closed = True + if self.state != ConnectionState.END: + self._set_state(ConnectionState.END) + try: + self._transport.close() + except Exception as e: # pylint: disable=broad-except + _LOGGER.debug( + "Error closing transport: %r", e, extra=self._network_trace_params + ) def _can_read(self) -> bool: """Whether the connection is in a state where it is legal to read for incoming frames. diff --git a/sdk/eventhub/azure-eventhub/azure/eventhub/_pyamqp/aio/_connection_async.py b/sdk/eventhub/azure-eventhub/azure/eventhub/_pyamqp/aio/_connection_async.py index 681cf72c8082..6d1f8bc4da3c 100644 --- a/sdk/eventhub/azure-eventhub/azure/eventhub/_pyamqp/aio/_connection_async.py +++ b/sdk/eventhub/azure-eventhub/azure/eventhub/_pyamqp/aio/_connection_async.py @@ -183,6 +183,7 @@ def __init__( # pylint:disable=too-many-locals self._error: Optional[AMQPConnectionError] = None self._outgoing_endpoints: Dict[int, Session] = {} self._incoming_endpoints: Dict[int, Session] = {} + self._transport_closed: bool = False async def __aenter__(self) -> "Connection": await self.open() @@ -240,11 +241,28 @@ async def _connect(self) -> None: ) from exc async def _disconnect(self) -> None: - """Disconnect the transport and set state to END.""" - if self.state == ConnectionState.END: + """Disconnect the transport and set state to END. + + ``transport.close()`` is gated on ``self._transport_closed`` so that it + runs exactly once regardless of which code path drives the shutdown. + Without this, ``Connection.close()`` could enter its exception handler, + set the state to ``END`` without closing the transport, and the + subsequent ``_disconnect()`` call from the ``finally`` block would + early-return on the state check — leaking the underlying transport + (most notably the aiohttp ``ClientSession`` used by the websocket + transport). + """ + if self._transport_closed: return - await self._set_state(ConnectionState.END) - await self._transport.close() + self._transport_closed = True + if self.state != ConnectionState.END: + await self._set_state(ConnectionState.END) + try: + await self._transport.close() + except Exception as e: # pylint: disable=broad-except + _LOGGER.debug( + "Error closing transport: %r", e, extra=self._network_trace_params + ) def _can_read(self) -> bool: """Whether the connection is in a state where it is legal to read for incoming frames. diff --git a/sdk/eventhub/azure-eventhub/tests/pyamqp_tests/asynctests/test_connection_disconnect_async_unit.py b/sdk/eventhub/azure-eventhub/tests/pyamqp_tests/asynctests/test_connection_disconnect_async_unit.py new file mode 100644 index 000000000000..f5ede0e694f6 --- /dev/null +++ b/sdk/eventhub/azure-eventhub/tests/pyamqp_tests/asynctests/test_connection_disconnect_async_unit.py @@ -0,0 +1,85 @@ +# -------------------------------------------------------------------------------------------- +# Copyright (c) Microsoft Corporation. All rights reserved. +# Licensed under the MIT License. See License.txt in the project root for license information. +# -------------------------------------------------------------------------------------------- +"""Unit tests for Connection._disconnect() cleanup paths. + +Covers the regression where Connection.close() entering its exception handler +set state to END without closing the transport, and the subsequent +_disconnect() call (from the finally block) early-returned and never closed +the transport, leaking the aiohttp ClientSession. +""" + +import pytest +from unittest.mock import AsyncMock, MagicMock + +from azure.eventhub._pyamqp.aio._connection_async import Connection +from azure.eventhub._pyamqp.constants import ConnectionState + + +def _make_connection(): + """Build a Connection without going through __init__ (which opens a real + transport). Only the attributes touched by _disconnect/close are set.""" + connection = Connection.__new__(Connection) + connection.state = ConnectionState.START + connection._transport = MagicMock() + connection._transport.close = AsyncMock() + connection._network_trace_params = { + "amqpConnection": "test", + "amqpSession": "", + "amqpLink": "", + } + connection._outgoing_endpoints = {} + connection._transport_closed = False + return connection + + +@pytest.mark.asyncio +async def test_disconnect_closes_transport_when_state_already_end(): + """When Connection.close() enters its exception handler it sets state to + END before calling _disconnect() in the finally block. The previous + implementation early-returned in that case and never closed the transport. + """ + connection = _make_connection() + connection.state = ConnectionState.END + + await connection._disconnect() + + connection._transport.close.assert_awaited_once() + + +@pytest.mark.asyncio +async def test_disconnect_is_idempotent(): + """_disconnect() may be called more than once (e.g. once from + _incoming_close and again from Connection.close()'s finally). The transport + must only be closed once.""" + connection = _make_connection() + + await connection._disconnect() + await connection._disconnect() + + connection._transport.close.assert_awaited_once() + + +@pytest.mark.asyncio +async def test_disconnect_sets_state_to_end_when_not_already(): + """The normal _disconnect() path still transitions state to END.""" + connection = _make_connection() + assert connection.state != ConnectionState.END + + await connection._disconnect() + + assert connection.state == ConnectionState.END + connection._transport.close.assert_awaited_once() + + +@pytest.mark.asyncio +async def test_disconnect_swallows_transport_close_errors(): + """Errors from transport.close() must not propagate out of _disconnect() — + the connection is shutting down and any leaked resource will be GC'd.""" + connection = _make_connection() + connection._transport.close = AsyncMock(side_effect=RuntimeError("boom")) + + await connection._disconnect() + + connection._transport.close.assert_awaited_once() diff --git a/sdk/eventhub/azure-eventhub/tests/pyamqp_tests/synctests/test_connection_disconnect_unit.py b/sdk/eventhub/azure-eventhub/tests/pyamqp_tests/synctests/test_connection_disconnect_unit.py new file mode 100644 index 000000000000..237c63da7865 --- /dev/null +++ b/sdk/eventhub/azure-eventhub/tests/pyamqp_tests/synctests/test_connection_disconnect_unit.py @@ -0,0 +1,67 @@ +# -------------------------------------------------------------------------------------------- +# Copyright (c) Microsoft Corporation. All rights reserved. +# Licensed under the MIT License. See License.txt in the project root for license information. +# -------------------------------------------------------------------------------------------- +"""Unit tests for Connection._disconnect() cleanup paths (sync). + +Mirrors the async tests in tests/pyamqp_tests/asynctests/. See that file for +the full bug description. +""" + +from unittest.mock import MagicMock + +from azure.eventhub._pyamqp._connection import Connection +from azure.eventhub._pyamqp.constants import ConnectionState + + +def _make_connection(): + """Build a Connection without going through __init__ (which opens a real + transport). Only the attributes touched by _disconnect/close are set.""" + connection = Connection.__new__(Connection) + connection.state = ConnectionState.START + connection._transport = MagicMock() + connection._network_trace_params = { + "amqpConnection": "test", + "amqpSession": "", + "amqpLink": "", + } + connection._outgoing_endpoints = {} + connection._transport_closed = False + return connection + + +def test_disconnect_closes_transport_when_state_already_end(): + connection = _make_connection() + connection.state = ConnectionState.END + + connection._disconnect() + + connection._transport.close.assert_called_once() + + +def test_disconnect_is_idempotent(): + connection = _make_connection() + + connection._disconnect() + connection._disconnect() + + connection._transport.close.assert_called_once() + + +def test_disconnect_sets_state_to_end_when_not_already(): + connection = _make_connection() + assert connection.state != ConnectionState.END + + connection._disconnect() + + assert connection.state == ConnectionState.END + connection._transport.close.assert_called_once() + + +def test_disconnect_swallows_transport_close_errors(): + connection = _make_connection() + connection._transport.close.side_effect = RuntimeError("boom") + + connection._disconnect() + + connection._transport.close.assert_called_once() From 63cb7ead95c0510c3a7b03b7168ccc4b46dfc505 Mon Sep 17 00:00:00 2001 From: MuhammadAliShahzad Date: Mon, 11 May 2026 23:25:00 +0200 Subject: [PATCH 2/3] Bump version to 5.15.2 and trim empty CHANGELOG sections Build Analyze (Verify ChangeLogEntries) requires that the entry matching VERSION in _version.py be the topmost entry in CHANGELOG.md, and that no subsections under it are empty. Bump VERSION to 5.15.2 so the verifier matches the new top-of-file entry, and remove the empty Features Added / Breaking Changes / Other Changes subsections. Co-Authored-By: Claude Opus 4.7 --- sdk/eventhub/azure-eventhub/CHANGELOG.md | 6 ------ sdk/eventhub/azure-eventhub/azure/eventhub/_version.py | 2 +- 2 files changed, 1 insertion(+), 7 deletions(-) diff --git a/sdk/eventhub/azure-eventhub/CHANGELOG.md b/sdk/eventhub/azure-eventhub/CHANGELOG.md index bdaf55c7390b..1253ff9b264a 100644 --- a/sdk/eventhub/azure-eventhub/CHANGELOG.md +++ b/sdk/eventhub/azure-eventhub/CHANGELOG.md @@ -2,16 +2,10 @@ ## 5.15.2 (Unreleased) -### Features Added - -### Breaking Changes - ### Bugs Fixed - Fixed a bug where `Connection._disconnect()` early-returned when state was already `END`, so the underlying transport was never closed if `Connection.close()` entered its exception handler (e.g. network error, timeout, or already-closed peer during the AMQP close handshake). With `TransportType.AmqpOverWebsocket`, the leaked transport's `aiohttp.ClientSession` produced an `Unclosed client session` warning per affected partition. -### Other Changes - ## 5.15.1 (2025-11-11) ### Bugs Fixed diff --git a/sdk/eventhub/azure-eventhub/azure/eventhub/_version.py b/sdk/eventhub/azure-eventhub/azure/eventhub/_version.py index 5bb5c30266da..5d660bbf8ea3 100644 --- a/sdk/eventhub/azure-eventhub/azure/eventhub/_version.py +++ b/sdk/eventhub/azure-eventhub/azure/eventhub/_version.py @@ -3,4 +3,4 @@ # Licensed under the MIT License. # ------------------------------------ -VERSION = "5.15.1" +VERSION = "5.15.2" From 4a423035e921c946a3460af61d4899d3aa2004ed Mon Sep 17 00:00:00 2001 From: MuhammadAliShahzad Date: Wed, 17 Jun 2026 22:46:39 +0200 Subject: [PATCH 3/3] Add public-API close() regression tests for transport leak Drive Connection.close() through its try/except/finally with _outgoing_close raising, asserting the transport is still closed exactly once rather than leaked. Locks the regression at the public API surface rather than only at _disconnect(). Co-Authored-By: Claude Opus 4.8 --- .../test_connection_disconnect_async_unit.py | 25 +++++++++++++++++++ .../test_connection_disconnect_unit.py | 22 ++++++++++++++++ 2 files changed, 47 insertions(+) diff --git a/sdk/eventhub/azure-eventhub/tests/pyamqp_tests/asynctests/test_connection_disconnect_async_unit.py b/sdk/eventhub/azure-eventhub/tests/pyamqp_tests/asynctests/test_connection_disconnect_async_unit.py index f5ede0e694f6..e974dcef9e67 100644 --- a/sdk/eventhub/azure-eventhub/tests/pyamqp_tests/asynctests/test_connection_disconnect_async_unit.py +++ b/sdk/eventhub/azure-eventhub/tests/pyamqp_tests/asynctests/test_connection_disconnect_async_unit.py @@ -83,3 +83,28 @@ async def test_disconnect_swallows_transport_close_errors(): await connection._disconnect() connection._transport.close.assert_awaited_once() + + +@pytest.mark.asyncio +async def test_close_closes_transport_when_outgoing_close_raises(): + """Regression test at the public API surface. + + When Connection.close() hits its exception path (here, _outgoing_close + raising), it sets state to END and falls through to _disconnect() in the + finally block. The transport (the aiohttp ClientSession for the websocket + transport) must still be closed exactly once rather than leaked.""" + connection = _make_connection() + connection.state = ConnectionState.OPENED + connection._error = None + connection._outgoing_close = AsyncMock(side_effect=RuntimeError("boom")) + + async def _set_state(new_state): + connection.state = new_state + + connection._set_state = AsyncMock(side_effect=_set_state) + + await connection.close() + + connection._outgoing_close.assert_awaited_once() + assert connection.state == ConnectionState.END + connection._transport.close.assert_awaited_once() diff --git a/sdk/eventhub/azure-eventhub/tests/pyamqp_tests/synctests/test_connection_disconnect_unit.py b/sdk/eventhub/azure-eventhub/tests/pyamqp_tests/synctests/test_connection_disconnect_unit.py index 237c63da7865..640f2c7130e6 100644 --- a/sdk/eventhub/azure-eventhub/tests/pyamqp_tests/synctests/test_connection_disconnect_unit.py +++ b/sdk/eventhub/azure-eventhub/tests/pyamqp_tests/synctests/test_connection_disconnect_unit.py @@ -65,3 +65,25 @@ def test_disconnect_swallows_transport_close_errors(): connection._disconnect() connection._transport.close.assert_called_once() + + +def test_close_closes_transport_when_outgoing_close_raises(): + """Regression test at the public API surface. + + When Connection.close() hits its exception path (here, _outgoing_close + raising), it sets state to END and falls through to _disconnect() in the + finally block. The transport must still be closed exactly once rather than + leaked.""" + connection = _make_connection() + connection.state = ConnectionState.OPENED + connection._error = None + connection._outgoing_close = MagicMock(side_effect=RuntimeError("boom")) + connection._set_state = MagicMock( + side_effect=lambda new_state: setattr(connection, "state", new_state) + ) + + connection.close() + + connection._outgoing_close.assert_called_once() + assert connection.state == ConnectionState.END + connection._transport.close.assert_called_once()