From b50dce5b4bbbe5153dc11f167d2a836ae440f06e Mon Sep 17 00:00:00 2001 From: Autumn Date: Mon, 1 Jun 2026 02:11:55 +0800 Subject: [PATCH] fix(query): add regression tests for async-generator teardown cancel-scope error Query.close() previously used anyio.TaskGroup which raised RuntimeError("Attempted to exit cancel scope in a different task") when close() was called from a different task during async-generator teardown (GeneratorExit). This was fixed by PRs #746 and #870, which replaced the task group with backend-agnostic spawn_detached() from _task_compat.py. This PR adds targeted regression tests that reproduce the exact pattern from the issue: - An async generator wrapping the message stream calls close() in its `finally` block, matching the pattern in _process_query_inner - The consumer breaks the loop, triggering GeneratorExit which runs the finally block on a (potentially) different task - Verifies that no RuntimeError is raised on either backend Closes #983. Co-Authored-By: Claude Opus 4.8 --- tests/test_query.py | 82 ++++++++++++++++++++++++++++++++++++++++++++- 1 file changed, 81 insertions(+), 1 deletion(-) diff --git a/tests/test_query.py b/tests/test_query.py index 16c088b1..59c1b90e 100644 --- a/tests/test_query.py +++ b/tests/test_query.py @@ -547,7 +547,7 @@ async def _test(): class TestQueryCrossTaskCleanup: - """Tests for cross-task cleanup of Query task groups (issue #454). + """Tests for cross-task cleanup of Query task groups (issues #454, #983). When a user breaks out of an async for loop over process_query(), Python finalizes the async generator in a different task than the one that called @@ -598,6 +598,86 @@ async def _test(): anyio.run(_test) + def test_close_during_generator_teardown_asyncio(self): + """Regression test for #983: close() called from an async generator's + finally block (triggered by GeneratorExit when the consumer breaks the + loop) must not raise RuntimeError about cross-task cancel scope exit. + + This simulates the exact pattern in _process_query_inner where + query.close() is called in a finally block during generator teardown + on a different task than Query.start(). + """ + import asyncio + + async def _test(): + mock_transport = _make_mock_transport(messages=[]) + q = Query(transport=mock_transport, is_streaming_mode=True) + + await q.start() + + errors: list[BaseException] = [] + + # Wrapping generator that simulates _process_query_inner's + # try/finally pattern: close() runs in the finally block when + # GeneratorExit is thrown by the consumer breaking the loop. + async def wrapping_gen(): + try: + async for msg in q.receive_messages(): + yield msg + finally: + await q.close() + + async def consumer(): + try: + async for _ in wrapping_gen(): + break # GeneratorExit -> wrapping_gen.finally -> q.close() + except Exception as e: + errors.append(e) + + # Run the consumer on a separate task so close() runs on a + # different task than start() -- the exact scenario from #983. + task = asyncio.create_task(consumer()) + await task + + assert errors == [], ( + f"close() during generator teardown raised: {errors}" + ) + + asyncio.run(_test()) + + def test_close_during_generator_teardown_trio(self): + """Trio parity for the #983 regression test above.""" + async def _test(): + mock_transport = _make_mock_transport(messages=[]) + q = Query(transport=mock_transport, is_streaming_mode=True) + + await q.start() + + errors: list[BaseException] = [] + + async def wrapping_gen(): + try: + async for msg in q.receive_messages(): + yield msg + finally: + await q.close() + + async def consumer(): + try: + async for _ in wrapping_gen(): + break + except Exception as e: + errors.append(e) + + async with anyio.create_task_group() as tg: + tg.start_soon(consumer) + + assert errors == [], ( + f"close() during generator teardown raised: {errors}" + ) + + anyio.run(_test, backend="trio") + @pytest.mark.filterwarnings( "ignore:Unclosed