diff --git a/echo/server/dembrane/api/webhooks.py b/echo/server/dembrane/api/webhooks.py index 9117062ef..58324a307 100644 --- a/echo/server/dembrane/api/webhooks.py +++ b/echo/server/dembrane/api/webhooks.py @@ -73,15 +73,9 @@ async def assemblyai_webhook_callback( if normalized_status == "error": from dembrane.tasks import _on_chunk_transcription_done - from dembrane.transcribe import _save_chunk_error, fetch_assemblyai_result + from dembrane.transcribe import _save_chunk_error - error_detail = f"AssemblyAI error for transcript {payload.transcript_id}" - try: - fetch_assemblyai_result(payload.transcript_id) - except Exception as fetch_exc: - error_detail = str(fetch_exc) - - _save_chunk_error(chunk_id, error_detail) + _save_chunk_error(chunk_id, f"AssemblyAI error for transcript {payload.transcript_id}") _on_chunk_transcription_done(conversation_id, chunk_id, logger) delete_assemblyai_webhook_metadata(payload.transcript_id) return {"status": "error_handled"} diff --git a/echo/server/dembrane/async_helpers.py b/echo/server/dembrane/async_helpers.py index 79a346d6d..197ade305 100644 --- a/echo/server/dembrane/async_helpers.py +++ b/echo/server/dembrane/async_helpers.py @@ -172,17 +172,31 @@ def _get_thread_event_loop() -> asyncio.AbstractEventLoop: def run_async_in_new_loop(coro: Coroutine[Any, Any, T]) -> T: """ - Execute an async coroutine on this thread's persistent event loop. + Execute an async coroutine in a fresh, isolated event loop. Use from synchronous contexts such as Dramatiq actors or CLI scripts to - invoke async FastAPI handlers without hitting "Future attached to a - different loop" errors. + invoke async FastAPI handlers. + + A fresh loop is created per call rather than reusing a cached thread loop. + This prevents "Future attached to a different loop" errors when multiple + concurrent Dramatiq greenlets (dramatiq-gevent uses one OS thread with many + greenlets) share the same thread ID and would otherwise share the same loop. + The coroutines invoked here (summarize_conversation, get_conversation_content) + use only stateless async operations so fresh loops per call is safe. """ if not asyncio.iscoroutine(coro) and not asyncio.isfuture(coro): raise TypeError("run_async_in_new_loop expects a coroutine or Future.") - loop = _get_thread_event_loop() - logger.debug("Running async coroutine in thread loop: %s", coro) - result = loop.run_until_complete(coro) - logger.debug("Completed async coroutine: %s", coro) - return result + import nest_asyncio + + loop = asyncio.new_event_loop() + # Apply nest_asyncio in case dramatiq-gevent has patched asyncio's running + # loop detection on this thread. + nest_asyncio.apply(loop) + logger.debug("Running async coroutine in fresh event loop: %s", coro) + try: + result = loop.run_until_complete(coro) + logger.debug("Completed async coroutine: %s", coro) + return result + finally: + loop.close() diff --git a/echo/server/dembrane/transcribe.py b/echo/server/dembrane/transcribe.py index 23cf5762e..06f68dfb5 100644 --- a/echo/server/dembrane/transcribe.py +++ b/echo/server/dembrane/transcribe.py @@ -107,7 +107,7 @@ def transcribe_audio_assemblyai( data: dict[str, Any] = { "audio_url": audio_file_uri, - "speech_models": ["universal-3-pro", "universal-2"], + "speech_models": ["universal-3-pro"], "language_detection": True, "language_detection_options": { "expected_languages": list(set(get_allowed_languages()) | {"pt"}), diff --git a/echo/server/tests/test_async_helpers.py b/echo/server/tests/test_async_helpers.py new file mode 100644 index 000000000..4a906d71d --- /dev/null +++ b/echo/server/tests/test_async_helpers.py @@ -0,0 +1,123 @@ +""" +Tests for run_async_in_new_loop — specifically the concurrent-greenlet scenario +that caused "Future attached to a different loop" errors under load. + +Regression test for: multiple concurrent callers sharing the same OS thread +(as dramatiq-gevent greenlets do) must not share an event loop. +""" + +import asyncio +import threading +from concurrent.futures import ThreadPoolExecutor, as_completed + +import pytest + +from dembrane.async_helpers import run_async_in_new_loop + + +async def _simple_coro(value: int) -> int: + """Minimal async coroutine that does a thread-pool round-trip (like run_in_thread_pool).""" + loop = asyncio.get_running_loop() + # Simulate run_in_thread_pool: submit blocking work to the executor + result = await loop.run_in_executor(None, lambda: value * 2) + return result + + +async def _gather_coro(value: int) -> int: + """Uses asyncio.gather internally — matches what summarize_conversation does.""" + loop = asyncio.get_running_loop() + a, b = await asyncio.gather( + loop.run_in_executor(None, lambda: value + 1), + loop.run_in_executor(None, lambda: value + 2), + ) + return a + b + + +def test_run_async_in_new_loop_basic(): + """Single call works correctly.""" + result = run_async_in_new_loop(_simple_coro(5)) + assert result == 10 + + +def test_run_async_in_new_loop_with_gather(): + """Gather inside coroutine works correctly.""" + result = run_async_in_new_loop(_gather_coro(3)) + assert result == 9 # (3+1) + (3+2) = 9 + + +def test_run_async_in_new_loop_concurrent_threads(): + """ + Simulates the dramatiq-gevent scenario: N threads all calling + run_async_in_new_loop concurrently. Before the fix, they shared + a cached loop by thread ID, causing "Future attached to a different + loop" errors under concurrent load. + """ + errors = [] + results = [] + + def worker(value: int): + try: + r = run_async_in_new_loop(_gather_coro(value)) + results.append(r) + except Exception as e: + errors.append(str(e)) + + # Simulate 10 concurrent callers (matches or exceeds Stage 3 load test concurrency) + with ThreadPoolExecutor(max_workers=10) as pool: + futures = [pool.submit(worker, i) for i in range(10)] + for f in as_completed(futures): + f.result() # re-raises if the thread itself crashed + + assert errors == [], f"Concurrent run_async_in_new_loop raised errors: {errors}" + assert len(results) == 10 + + +def test_run_async_in_new_loop_same_thread_sequential(): + """ + Calls from the same thread are safe when sequential. + Verifies loop is properly closed between calls (no 'loop is closed' error). + """ + for i in range(5): + result = run_async_in_new_loop(_simple_coro(i)) + assert result == i * 2 + + +def test_run_async_in_new_loop_same_thread_id_concurrent(): + """ + Reproduces the exact bug: multiple coroutines submitted from threads + that all share the same thread ID (simulated by patching get_ident). + + Before the fix (persistent loop per thread ID), all concurrent callers + shared one loop → "Future attached to a different loop". + After the fix (fresh loop per call), each call is isolated. + """ + original_get_ident = threading.get_ident + # Make all threads report the same thread ID — exactly what gevent does + threading.get_ident = lambda: 99999 + + errors = [] + results = [] + + def worker(value: int): + try: + r = run_async_in_new_loop(_gather_coro(value)) + results.append(r) + except Exception as e: + errors.append(str(e)) + + try: + with ThreadPoolExecutor(max_workers=5) as pool: + futures = [pool.submit(worker, i) for i in range(5)] + for f in as_completed(futures): + f.result() + finally: + threading.get_ident = original_get_ident + + assert errors == [], f"Same-thread-ID concurrent calls raised errors: {errors}" + assert len(results) == 5 + + +def test_run_async_in_new_loop_rejects_non_coroutine(): + """Type guard still works.""" + with pytest.raises(TypeError, match="expects a coroutine or Future"): + run_async_in_new_loop(42) # type: ignore diff --git a/echo/server/tests/test_transcribe_webhook.py b/echo/server/tests/test_transcribe_webhook.py index 3f41c8755..7193dc1ed 100644 --- a/echo/server/tests/test_transcribe_webhook.py +++ b/echo/server/tests/test_transcribe_webhook.py @@ -51,7 +51,7 @@ def _fake_post(url: str, **kwargs: Any) -> _FakeResponse: assert transcript is None assert payload == {"transcript_id": "tx-1"} assert captured["url"].endswith("/v2/transcript") - assert captured["json"]["speech_models"] == ["universal-3-pro", "universal-2"] + assert captured["json"]["speech_models"] == ["universal-3-pro"] assert "speech_model" not in captured["json"] assert "prompt" not in captured["json"] assert captured["json"]["keyterms_prompt"] == ["Dembrane"] @@ -92,7 +92,7 @@ def _fake_get(_url: str, **_kwargs: Any) -> _FakeResponse: assert response["status"] == "completed" assert payloads["polls"] == 2 post_payload = payloads["posts"][0] - assert post_payload["speech_models"] == ["universal-3-pro", "universal-2"] + assert post_payload["speech_models"] == ["universal-3-pro"] assert "speech_model" not in post_payload assert "webhook_url" not in post_payload