[EventHub] Fix transport leak when Connection.close() hits its exception path#46831
MuhammadAliShahzad wants to merge 4 commits into
@microsoft-github-policy-service agree |
|
Thank you for your contribution @MuhammadAliShahzad! We will review the pull request and get back to you soon. |
Pull request overview
Fixes a shutdown/cleanup bug in the pyAMQP Connection.close() exception path where _disconnect() could skip transport.close() (because state was already END), leaking transport resources—most notably aiohttp.ClientSession when using AmqpOverWebsocket.
Changes:
- Add a connection-level _transport_closed flag in both async and sync Connection implementations to ensure transport.close() is executed exactly once across all shutdown paths.
- Update _disconnect() to always transition to END (if needed) and to swallow/log transport close errors during shutdown.
- Add new unit tests (async + sync) covering the regression case, idempotency, state transition, and close-error resilience; document the fix in CHANGELOG.md.
Reviewed changes
Copilot reviewed 5 out of 5 changed files in this pull request and generated no comments.
| File | Description |
|---|---|
| sdk/eventhub/azure-eventhub/azure/eventhub/_pyamqp/aio/_connection_async.py | Ensures transport closure occurs exactly once (even when close() hits exception paths) via _transport_closed gating. |
| sdk/eventhub/azure-eventhub/azure/eventhub/_pyamqp/_connection.py | Sync equivalent of the async fix to prevent skipped transport closure on shutdown error paths. |
| sdk/eventhub/azure-eventhub/tests/pyamqp_tests/asynctests/test_connection_disconnect_async_unit.py | New async unit tests validating _disconnect() behavior (regression + idempotency + error swallowing). |
| sdk/eventhub/azure-eventhub/tests/pyamqp_tests/synctests/test_connection_disconnect_unit.py | New sync unit tests mirroring the async coverage for _disconnect(). |
| sdk/eventhub/azure-eventhub/CHANGELOG.md | Adds an unreleased bugfix entry describing the transport leak fix. |
Pushed 49467b6 to address
Note: both PRs target the same package version (5.15.2). Whichever lands first, the other can be merged afterwards without further version changes. |
Fix looks right. The flag splits what state END was overloading (logical state vs "cleanup ran"), and the four call sites (L227, L490, L506, L879) all behave correctly with it.
One ask: the unit tests exercise _disconnect() directly but never drive Connection.close() through its try/except/finally with _outgoing_close raising, which is the actual regression path. A test like:
    connection._outgoing_close = AsyncMock(side_effect=RuntimeError("close-handshake failed"))
    await connection.close()
    connection._transport.close.assert_awaited_once()
would lock in the contract.
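The suggested test can be tried out in isolation against a toy stand-in. The sketch below is not the SDK's test or its Connection class: ToyConnection, its string states, and check_close_still_closes_transport are hypothetical names that only mirror the try/except/finally shape described in this PR, with the flag-based _disconnect() assumed.

```python
import asyncio
from unittest.mock import AsyncMock, MagicMock


class ToyConnection:
    """Hypothetical stand-in for the pyAMQP Connection; not the real class."""

    def __init__(self, transport):
        self._transport = transport
        self._transport_closed = False  # flag name taken from the PR text
        self.state = "OPENED"

    async def _outgoing_close(self):
        """Placeholder for sending the AMQP close frame."""

    async def _disconnect(self):
        # Gate on the flag rather than on state, so a state of END
        # set by the except block cannot skip the transport close.
        if self._transport_closed:
            return
        self._transport_closed = True
        self.state = "END"
        await self._transport.close()

    async def close(self):
        try:
            await self._outgoing_close()
        except Exception:
            self.state = "END"
        finally:
            await self._disconnect()


async def check_close_still_closes_transport():
    transport = MagicMock()
    transport.close = AsyncMock()
    connection = ToyConnection(transport)
    # Force the exception path, as in the review comment.
    connection._outgoing_close = AsyncMock(
        side_effect=RuntimeError("close-handshake failed")
    )
    await connection.close()
    connection._transport.close.assert_awaited_once()
    return connection
```

Running asyncio.run(check_close_still_closes_transport()) raises an AssertionError if the transport close is skipped or awaited more than once.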
Minor: the docstring describes the original bug ("would early-return on the state check"), but the new code has no state check. Reads better describing the flag's role instead.
|
Mypy failures in Path to green here: once #46988 lands, rebase this branch onto main. Happy to push the rebase from the maintainer side when that happens. The earlier ask (a test that drives |
|
@MuhammadAliShahzad A release will pull in my changes which fix the failing CI tests. |
Connection._disconnect() early-returned when state was already END:
    async def _disconnect(self) -> None:
        if self.state == ConnectionState.END:
            return
        await self._set_state(ConnectionState.END)
        await self._transport.close()
Connection.close() wraps the AMQP close handshake in try/except/finally.
On any error during the handshake (timeout, network error, peer already
closed, etc.) the except block sets state to END:
    except Exception as exc:
        await self._set_state(ConnectionState.END)
    finally:
        await self._disconnect()
The finally then called _disconnect(), which saw state == END and
early-returned, so self._transport.close() was never called. With
TransportType.AmqpOverWebsocket this leaked the aiohttp ClientSession
(observed as a burst of "Unclosed client session" warnings on shutdown,
one per partition).
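A minimal reproduction of the failure mode described above, as a sketch rather than the SDK code: OldConnection, FakeTransport, and the two-member ConnectionState enum are simplified, hypothetical stand-ins, and the handshake failure is simulated with a TimeoutError.

```python
import asyncio
from enum import Enum


class ConnectionState(Enum):
    # Simplified stand-in; the real enum has more members.
    OPENED = "OPENED"
    END = "END"


class FakeTransport:
    """Counts close() calls instead of holding real resources."""

    def __init__(self):
        self.close_calls = 0

    async def close(self):
        self.close_calls += 1


class OldConnection:
    """Hypothetical model of the pre-fix control flow."""

    def __init__(self, transport):
        self._transport = transport
        self.state = ConnectionState.OPENED

    async def _set_state(self, new_state):
        self.state = new_state

    async def _outgoing_close(self):
        raise TimeoutError("simulated close-handshake failure")

    async def _disconnect(self):
        if self.state == ConnectionState.END:
            return  # early return that skips the transport close
        await self._set_state(ConnectionState.END)
        await self._transport.close()

    async def close(self):
        try:
            await self._outgoing_close()
        except Exception:
            await self._set_state(ConnectionState.END)
        finally:
            await self._disconnect()


def run_old_close():
    transport = FakeTransport()
    asyncio.run(OldConnection(transport).close())
    return transport.close_calls
```

In this model run_old_close() reports zero close calls, which corresponds to the leaked transport on the exception path.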
The early return is not arbitrary — it prevents a double-close in the
happy path, where _incoming_close() already calls _disconnect() before
Connection.close()'s finally runs. The fix is to track close at the
connection level via a self._transport_closed flag so transport.close()
runs exactly once regardless of code path.
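One way the flag-based approach could look, shown for the sync shape. This is a sketch under assumptions, not the merged code: SketchConnection, CountingTransport, the _handshake helper, and the choice to set the flag before calling transport.close() are all illustrative.

```python
import logging

_LOGGER = logging.getLogger(__name__)


class CountingTransport:
    """Hypothetical transport that counts close() calls and can fail."""

    def __init__(self, fail=False):
        self.close_calls = 0
        self._fail = fail

    def close(self):
        self.close_calls += 1
        if self._fail:
            raise OSError("simulated transport close failure")


class SketchConnection:
    """Hypothetical model of the post-fix control flow."""

    def __init__(self, transport, handshake_fails=False):
        self._transport = transport
        self._transport_closed = False
        self._handshake_fails = handshake_fails
        self.state = "OPENED"

    def _disconnect(self):
        if self.state != "END":
            self.state = "END"
        if self._transport_closed:
            return  # cleanup already ran on another path
        self._transport_closed = True
        try:
            self._transport.close()
        except Exception as exc:
            # Shutting down: log and swallow instead of propagating.
            _LOGGER.debug("Ignoring transport close error: %r", exc)

    def _handshake(self):
        if self._handshake_fails:
            raise TimeoutError("simulated close-handshake failure")
        # Happy path: handling the peer's close already disconnects.
        self._disconnect()

    def close(self):
        try:
            self._handshake()
        except Exception:
            self.state = "END"
        finally:
            self._disconnect()
```

In this model the transport is closed once on the happy path (where _disconnect() runs twice), once on the exception path, and a failing transport.close() does not escape close().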
Applied to both _connection_async.py and _connection.py (sync sibling
has the identical pattern; the user-visible leak is less severe for
sync because that transport does not hold an aiohttp ClientSession,
but the logic flaw is the same).
Adds 4 unit tests per file covering the regression scenario, the
idempotency invariant, the normal state transition, and resilience to
transport.close() errors.
Fixes Azure#46830
Co-Authored-By: Claude Opus 4.7 <noreply@anthropic.com>
Build Analyze (Verify ChangeLogEntries) requires that the entry matching VERSION in _version.py be the topmost entry in CHANGELOG.md, and that no subsections under it are empty. Bump VERSION to 5.15.2 so the verifier matches the new top-of-file entry, and remove the empty Features Added / Breaking Changes / Other Changes subsections.
Co-Authored-By: Claude Opus 4.7 <noreply@anthropic.com>
Drive Connection.close() through its try/except/finally with _outgoing_close raising, asserting the transport is still closed exactly once rather than leaked. Locks the regression at the public API surface rather than only at _disconnect().
Co-Authored-By: Claude Opus 4.8 <noreply@anthropic.com>
49467b6 to 4a42303
|
Rebased onto current main now that #46988 has merged, which clears the pre-existing mypy failures. Also added the requested public-API regression test (sync + async): it drives |
Summary
Fixes #46830.
Connection._disconnect() early-returned when state was already END. Connection.close() wraps the AMQP close handshake in try/except/finally; on any error during the handshake (timeout, network error, peer already closed, etc.) the except block sets state to END and the finally then calls _disconnect(). With the old code, _disconnect() saw state == END and early-returned, so self._transport.close() was never called.
With TransportType.AmqpOverWebsocket this leaked the aiohttp.ClientSession. We observed bursts of Unclosed client session warnings on instance shutdown, one per partition, because each partition's close-handshake races a network teardown.
Why the early return existed
It's not arbitrary: it prevents a double-close in the happy path. Tracing Connection.close(wait=True):
1. State moves to CLOSE_SENT.
2. _wait_for_response(wait=True, END) blocks.
3. _incoming_close() runs, which calls _disconnect() while state is CLOSE_SENT → transport correctly closed.
4. State becomes END; _wait_for_response returns; control returns to close().
5. finally calls _disconnect() again → previously early-returned (correct, but for the wrong reason).
The bug was only on the exception path, where _disconnect() had not yet been called.
Fix
Track close at the connection level via a self._transport_closed flag, so transport.close() runs exactly once regardless of which code path drives shutdown. Set the state to END if it isn't already; log + swallow errors from transport.close() (we're shutting down, so propagating the error helps no one).
Applied to both:
- azure/eventhub/_pyamqp/aio/_connection_async.py
- azure/eventhub/_pyamqp/_connection.py
The sync sibling has the identical pattern; the user-visible leak is less severe for sync (no aiohttp.ClientSession to leak) but the logic flaw is the same.
Relationship to PR #46829
PR #46829 fixes the transport-level cleanup (WebSocketTransportAsync.close() and .connect()) so transport-level resources are properly released and transport.close() becomes idempotent. This PR fixes the connection-level code path that was skipping transport.close() entirely in the error case.
The two PRs are independent (either can land without the other), but together they close the full leak path observed in production. With both merged, leaked aiohttp.ClientSession warnings should disappear from EventHub websocket consumers.
Test plan
- […] main for the right reasons (state == END causes early-return → transport.close() never called)
- […] unknown-option-value warnings from the pylint-guidelines-checker plugin in my env)
- […] _process_incoming_frame calls remain
- […] azpysdk checks), to be validated on PR run
🤖 Generated with Claude Code