[EventHub] Fix transport leak when Connection.close() hits its exception path #46831

Open
MuhammadAliShahzad wants to merge 4 commits into Azure:main from MuhammadAliShahzad:fix/eventhub-connection-disconnect-leak

@MuhammadAliShahzad

Summary

Fixes #46830.

Connection._disconnect() early-returned when state was already END. Connection.close() wraps the AMQP close handshake in try/except/finally; on any error during the handshake (timeout, network error, peer already closed, etc.) the except block sets state to END and the finally then calls _disconnect(). With the old code, _disconnect() saw state == END and early-returned, so self._transport.close() was never called.

With TransportType.AmqpOverWebsocket this leaked the aiohttp.ClientSession. We observed bursts of Unclosed client session warnings on instance shutdown — one per partition — because each partition's close-handshake races a network teardown.

Why the early return existed

It's not arbitrary — it prevents a double-close in the happy path. Tracing Connection.close(wait=True):

  1. Send Close frame, state → CLOSE_SENT
  2. _wait_for_response(wait=True, END) blocks
  3. Peer sends Close back; the frame loop invokes _incoming_close(), which calls _disconnect() while state is CLOSE_SENT → transport correctly closed
  4. State is now END; _wait_for_response returns; control returns to close()
  5. finally calls _disconnect() again → previously early-returned (correct, but for the wrong reason)

The bug was only on the exception path, where _disconnect() had not yet been called.
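The two paths can be reproduced with a toy model. This is a minimal sketch, not the SDK code: `OldConnection`, `_handshake`, and the `fail_handshake` switch are hypothetical stand-ins, and only the early return in `_disconnect()` and the try/except/finally shape of `close()` are taken from the description above.

```python
import asyncio
from enum import Enum
from unittest.mock import AsyncMock


class ConnectionState(Enum):
    # Only the two states the trace above mentions; the real enum has more.
    CLOSE_SENT = "CLOSE_SENT"
    END = "END"


class OldConnection:
    """Hypothetical stand-in for the pre-fix Connection (not the SDK class)."""

    def __init__(self, transport, fail_handshake):
        self._transport = transport
        self._fail_handshake = fail_handshake
        self.state = None

    async def _disconnect(self):
        if self.state == ConnectionState.END:
            return  # the early return described above
        self.state = ConnectionState.END
        await self._transport.close()

    async def _handshake(self):
        # Stands in for sending Close and waiting for the peer's reply.
        self.state = ConnectionState.CLOSE_SENT
        if self._fail_handshake:
            raise TimeoutError("close handshake timed out")
        # Happy path: the peer's Close frame ends up calling _disconnect().
        await self._disconnect()

    async def close(self):
        try:
            await self._handshake()
        except Exception:
            self.state = ConnectionState.END
        finally:
            await self._disconnect()


async def run(fail_handshake):
    transport = AsyncMock()
    await OldConnection(transport, fail_handshake).close()
    return transport.close.await_count


happy_path_closes = asyncio.run(run(fail_handshake=False))
error_path_closes = asyncio.run(run(fail_handshake=True))
print(happy_path_closes, error_path_closes)  # prints "1 0"
```

In the happy path the early return absorbs the second `_disconnect()` call; in the error path it absorbs the only one, so the transport is never closed.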

Fix

Track close at the connection level via a self._transport_closed flag, so transport.close() runs exactly once regardless of which code path drives shutdown. Set the state to END if it isn't already; log + swallow errors from transport.close() (we're shutting down — propagating the error helps no one).

Applied to both:

  • azure/eventhub/_pyamqp/aio/_connection_async.py
  • azure/eventhub/_pyamqp/_connection.py

The sync sibling has the identical pattern; the user-visible leak is less severe for sync (no aiohttp.ClientSession to leak) but the logic flaw is the same.
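For readers skimming the description, the flag-based `_disconnect()` can be sketched roughly as follows. This is an illustrative approximation under stated assumptions, not the diff itself: `FixedConnection` is a hypothetical toy class, states are plain strings, and setting the flag before awaiting `transport.close()` is one possible ordering (it guarantees a single attempt even if the close raises).

```python
import asyncio
import logging
from unittest.mock import AsyncMock

_LOGGER = logging.getLogger(__name__)


class FixedConnection:
    """Hypothetical toy class illustrating the flag; not the SDK Connection."""

    def __init__(self, transport):
        self._transport = transport
        self._transport_closed = False  # tracks cleanup, independent of state
        self.state = "OPENED"

    async def _disconnect(self):
        # Always land in END, whichever path got us here.
        if self.state != "END":
            self.state = "END"
        # Gate on the flag rather than on the state.
        if self._transport_closed:
            return
        self._transport_closed = True
        try:
            await self._transport.close()
        except Exception as exc:
            # Shutting down: log and swallow instead of propagating.
            _LOGGER.debug("Transport close failed during shutdown: %r", exc)


async def demo():
    transport = AsyncMock()
    conn = FixedConnection(transport)
    conn.state = "END"  # what close()'s except block leaves behind
    await conn._disconnect()  # closes the transport despite state == END
    await conn._disconnect()  # second call is a no-op

    failing = AsyncMock()
    failing.close.side_effect = OSError("socket already gone")
    broken = FixedConnection(failing)
    await broken._disconnect()  # the error is swallowed, not raised
    return transport.close.await_count, broken.state


closes, broken_state = asyncio.run(demo())
```

The point of the design is that "state is END" and "cleanup has run" are tracked separately, so neither the happy path nor the exception path can skip or repeat the transport close.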

Relationship to PR #46829

PR #46829 fixes the transport-level cleanup (WebSocketTransportAsync.close() and .connect()) so transport-level resources are properly released and transport.close() becomes idempotent. This PR fixes the connection-level code path that was skipping transport.close() entirely in the error case.

The two PRs are independent — either can land without the other — but together they close the full leak path observed in production. With both merged, leaked aiohttp.ClientSession warnings should disappear from EventHub websocket consumers.

Test plan

  • New unit tests fail against current main for the right reasons (state == END causes early-return → transport.close() never called)
  • All 8 new unit tests pass against the fixed code (4 async + 4 sync, covering: regression scenario, idempotency, normal state transition, transport.close() error resilience)
  • Pylint clean for both modified files (rated 9.99/10 and 10/10 locally; only pre-existing unknown-option-value warnings from the pylint-guidelines-checker plugin in my env)
  • Pyright reports zero new errors on the modified code; 2 pre-existing errors on unchanged _process_incoming_frame calls remain
  • Mypy reports zero new errors on the modified code
  • CI (azpysdk checks) — to be validated on PR run

🤖 Generated with Claude Code

Copilot AI review requested due to automatic review settings May 11, 2026 21:01
@MuhammadAliShahzad

Author

@microsoft-github-policy-service agree

@github-actions github-actions Bot added the Community Contribution, customer-reported, and Event Hubs labels May 11, 2026
@github-actions

Contributor

Thank you for your contribution @MuhammadAliShahzad! We will review the pull request and get back to you soon.

Copilot AI left a comment

Contributor

Pull request overview

Fixes a shutdown/cleanup bug in the pyAMQP Connection.close() exception path where _disconnect() could skip transport.close() (because state was already END), leaking transport resources—most notably aiohttp.ClientSession when using AmqpOverWebsocket.

Changes:

  • Add a connection-level _transport_closed flag in both async and sync Connection implementations to ensure transport.close() is executed exactly once across all shutdown paths.
  • Update _disconnect() to always transition to END (if needed) and to swallow/log transport close errors during shutdown.
  • Add new unit tests (async + sync) covering the regression case, idempotency, state transition, and close-error resilience; document the fix in CHANGELOG.md.

Reviewed changes

Copilot reviewed 5 out of 5 changed files in this pull request and generated no comments.

| File | Description |
| --- | --- |
| sdk/eventhub/azure-eventhub/azure/eventhub/_pyamqp/aio/_connection_async.py | Ensures transport closure occurs exactly once (even when close() hits exception paths) via _transport_closed gating. |
| sdk/eventhub/azure-eventhub/azure/eventhub/_pyamqp/_connection.py | Sync equivalent of the async fix to prevent skipped transport closure on shutdown error paths. |
| sdk/eventhub/azure-eventhub/tests/pyamqp_tests/asynctests/test_connection_disconnect_async_unit.py | New async unit tests validating _disconnect() behavior (regression + idempotency + error swallowing). |
| sdk/eventhub/azure-eventhub/tests/pyamqp_tests/synctests/test_connection_disconnect_unit.py | New sync unit tests mirroring the async coverage for _disconnect(). |
| sdk/eventhub/azure-eventhub/CHANGELOG.md | Adds an unreleased bugfix entry describing the transport leak fix. |

@MuhammadAliShahzad

Author

Pushed 49467b6 to address Build Analyze failures:

  • Verify ChangeLogEntries: bumped VERSION in _version.py from 5.15.1 → 5.15.2 (so the verifier matches my new top-of-file entry) and trimmed the empty ### Features Added / ### Breaking Changes / ### Other Changes subsections.
  • Run MyPy: the failures are all pre-existing on main in files not touched by this PR (_common.py, _producer.py, _consumer.py, _consumer_async.py). Verified by running the same mypy command locally against upstream/main. Happy to defer to maintainers on whether those should be fixed in a separate PR or whether the CI mypy check needs to be relaxed.

Note: this PR and #46829 both target the same package version (5.15.2). Whichever lands first, the other can be merged afterwards without further version changes.

@j7nw4r j7nw4r left a comment

Member

Fix looks right. The flag splits what state END was overloading (logical state vs "cleanup ran"), and the four call sites (L227, L490, L506, L879) all behave correctly with it.

One ask: the unit tests exercise _disconnect() directly but never drive Connection.close() through its try/except/finally with _outgoing_close raising, which is the actual regression path. A test like:

connection._outgoing_close = AsyncMock(side_effect=RuntimeError("close-handshake failed"))
await connection.close()
connection._transport.close.assert_awaited_once()

would lock in the contract.

Minor: the docstring describes the original bug ("would early-return on the state check"), but the new code has no state check. Reads better describing the flag's role instead.

@j7nw4r

j7nw4r commented May 22, 2026

Member

Mypy failures in Run MyPy are pre-existing on main and unrelated to this change. I put up #46988 to fix all 16 of them (the _common.py setters / annotation accesses, the keep_alive_interval: Optional[int] widening in the transport layer, and the event_position_selector / create_source signature changes).

Path to green here: once #46988 lands, rebase this branch onto main. Happy to push the rebase from the maintainer side when that happens.

The earlier ask (a test that drives Connection.close() through its try/except/finally with _outgoing_close raising, so the regression case is locked in at the public API rather than just at _disconnect()) still stands.

@j7nw4r

j7nw4r commented May 22, 2026

Member

@MuhammadAliShahzad A release will pull in my changes which fix the failing CI tests.

MuhammadAliShahzad and others added 3 commits June 17, 2026 22:46

Connection._disconnect() early-returned when state was already END:

    async def _disconnect(self) -> None:
        if self.state == ConnectionState.END:
            return
        await self._set_state(ConnectionState.END)
        await self._transport.close()

Connection.close() wraps the AMQP close handshake in try/except/finally.
On any error during the handshake (timeout, network error, peer already
closed, etc.) the except block sets state to END:

    except Exception as exc:
        await self._set_state(ConnectionState.END)
    finally:
        await self._disconnect()

The finally then called _disconnect(), which saw state == END and
early-returned, so self._transport.close() was never called. With
TransportType.AmqpOverWebsocket this leaked the aiohttp ClientSession
(observed as a burst of "Unclosed client session" warnings on shutdown,
one per partition).

The early return is not arbitrary — it prevents a double-close in the
happy path, where _incoming_close() already calls _disconnect() before
Connection.close()'s finally runs. The fix is to track close at the
connection level via a self._transport_closed flag so transport.close()
runs exactly once regardless of code path.

Applied to both _connection_async.py and _connection.py (sync sibling
has the identical pattern; the user-visible leak is less severe for
sync because that transport does not hold an aiohttp ClientSession,
but the logic flaw is the same).

Adds 4 unit tests per file covering the regression scenario, the
idempotency invariant, the normal state transition, and resilience to
transport.close() errors.

Fixes Azure#46830

Co-Authored-By: Claude Opus 4.7 <noreply@anthropic.com>

Build Analyze (Verify ChangeLogEntries) requires that the entry matching
VERSION in _version.py be the topmost entry in CHANGELOG.md, and that
no subsections under it are empty. Bump VERSION to 5.15.2 so the
verifier matches the new top-of-file entry, and remove the empty
Features Added / Breaking Changes / Other Changes subsections.

Co-Authored-By: Claude Opus 4.7 <noreply@anthropic.com>

Drive Connection.close() through its try/except/finally with
_outgoing_close raising, asserting the transport is still closed exactly
once rather than leaked. Locks the regression at the public API surface
rather than only at _disconnect().

Co-Authored-By: Claude Opus 4.8 <noreply@anthropic.com>

@MuhammadAliShahzad MuhammadAliShahzad force-pushed the fix/eventhub-connection-disconnect-leak branch from 49467b6 to 4a42303 June 17, 2026 20:47
@MuhammadAliShahzad

MuhammadAliShahzad commented Jun 17, 2026

Author

Rebased onto current main now that #46988 has merged, which clears the pre-existing mypy failures.

Also added the requested public-API regression test (sync + async): it drives Connection.close() through its try/except/finally with _outgoing_close raising and asserts the transport is still closed exactly once. This locks the leak fix at the public API surface rather than only at _disconnect().
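The shape of that test, shown against a trimmed-down stand-in so it runs on its own, is roughly the following. `SketchConnection` is hypothetical and only mirrors the try/except/finally structure and the `_transport_closed` flag discussed in this PR; the real tests target the SDK's `Connection` and may differ in setup and naming.

```python
import asyncio
from unittest.mock import AsyncMock


class SketchConnection:
    """Hypothetical, trimmed-down stand-in for the fixed Connection."""

    def __init__(self, transport):
        self._transport = transport
        self._transport_closed = False
        self.state = "OPENED"

    async def _outgoing_close(self):
        self.state = "CLOSE_SENT"

    async def _disconnect(self):
        self.state = "END"
        if not self._transport_closed:
            self._transport_closed = True
            await self._transport.close()

    async def close(self):
        try:
            await self._outgoing_close()
        except Exception:
            self.state = "END"
        finally:
            await self._disconnect()


async def check_close_releases_transport_when_handshake_fails():
    connection = SketchConnection(AsyncMock())
    # Force the close handshake to fail, as in the reviewer's suggestion.
    connection._outgoing_close = AsyncMock(
        side_effect=RuntimeError("close-handshake failed")
    )
    await connection.close()  # must not raise
    # The transport is closed exactly once, not skipped and not doubled.
    connection._transport.close.assert_awaited_once()
    return connection


connection = asyncio.run(check_close_releases_transport_when_handshake_fails())
```

Driving the public `close()` rather than `_disconnect()` means the test keeps failing if a future refactor reintroduces any early exit between the except block and the transport close.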
@j7nw4r

Labels

Community Contribution (Community members are working on the issue)
customer-reported (Issues that are reported by GitHub users external to the Azure organization.)
Event Hubs

Projects

None yet

Development

Successfully merging this pull request may close these issues.

Connection._disconnect() early-return skips transport.close() on close-handshake errors, leaking aiohttp ClientSession

3 participants