From 66df74ad27edd7c2957ba6170d66ddb13f305f71 Mon Sep 17 00:00:00 2001 From: Dat Date: Mon, 16 Feb 2026 21:02:14 +0100 Subject: [PATCH 1/5] feat: add AssemblyAI webhook-driven transcription flow --- echo/server/dembrane/api/webhooks.py | 10 ++-------- echo/server/dembrane/transcribe.py | 8 ++++---- echo/server/tests/test_settings_webhook.py | 14 ++++++++------ echo/server/tests/test_transcribe_webhook.py | 4 ++-- 4 files changed, 16 insertions(+), 20 deletions(-) diff --git a/echo/server/dembrane/api/webhooks.py b/echo/server/dembrane/api/webhooks.py index 9117062ef..58324a307 100644 --- a/echo/server/dembrane/api/webhooks.py +++ b/echo/server/dembrane/api/webhooks.py @@ -73,15 +73,9 @@ async def assemblyai_webhook_callback( if normalized_status == "error": from dembrane.tasks import _on_chunk_transcription_done - from dembrane.transcribe import _save_chunk_error, fetch_assemblyai_result + from dembrane.transcribe import _save_chunk_error - error_detail = f"AssemblyAI error for transcript {payload.transcript_id}" - try: - fetch_assemblyai_result(payload.transcript_id) - except Exception as fetch_exc: - error_detail = str(fetch_exc) - - _save_chunk_error(chunk_id, error_detail) + _save_chunk_error(chunk_id, f"AssemblyAI error for transcript {payload.transcript_id}") _on_chunk_transcription_done(conversation_id, chunk_id, logger) delete_assemblyai_webhook_metadata(payload.transcript_id) return {"status": "error_handled"} diff --git a/echo/server/dembrane/transcribe.py b/echo/server/dembrane/transcribe.py index 23cf5762e..c1c14aa2b 100644 --- a/echo/server/dembrane/transcribe.py +++ b/echo/server/dembrane/transcribe.py @@ -107,7 +107,7 @@ def transcribe_audio_assemblyai( data: dict[str, Any] = { "audio_url": audio_file_uri, - "speech_models": ["universal-3-pro", "universal-2"], + "speech_models": ["universal-3-pro"], "language_detection": True, "language_detection_options": { "expected_languages": list(set(get_allowed_languages()) | {"pt"}), @@ -644,16 +644,16 @@ def transcribe_conversation_chunk( logger.info("Using AssemblyAI for transcription") hotwords = _build_hotwords(conversation) signed_url = get_signed_url(chunk["path"], expires_in_seconds=3 * 24 * 60 * 60) - assemblyai_transcript, assemblyai_response = transcribe_audio_assemblyai( + transcript, assemblyai_response = transcribe_audio_assemblyai( signed_url, language=language, hotwords=hotwords ) - if assemblyai_transcript is None: + if transcript is None: raise TranscriptionError( "AssemblyAI returned webhook-mode response without transcript text in sync workflow." ) _save_transcript( conversation_chunk_id, - assemblyai_transcript, + transcript, diarization={ "schema": "ASSEMBLYAI", "data": assemblyai_response.get("words", {}), diff --git a/echo/server/tests/test_settings_webhook.py b/echo/server/tests/test_settings_webhook.py index 65928d48b..02a92aec5 100644 --- a/echo/server/tests/test_settings_webhook.py +++ b/echo/server/tests/test_settings_webhook.py @@ -4,9 +4,10 @@ def test_dembrane_webhook_url_requires_secret() -> None: - settings = TranscriptionSettings() - settings.provider = "Dembrane-25-09" - settings.gcp_sa_json = {"type": "service_account"} + settings = TranscriptionSettings( + provider="Dembrane-25-09", + gcp_sa_json={"type": "service_account"}, + ) settings.assemblyai_webhook_url = "https://api.example.com/api/webhooks/assemblyai" settings.assemblyai_webhook_secret = None @@ -18,9 +19,10 @@ def test_dembrane_webhook_url_requires_secret() -> None: def test_dembrane_webhook_url_with_secret_is_valid() -> None: - settings = TranscriptionSettings() - settings.provider = "Dembrane-25-09" - settings.gcp_sa_json = {"type": "service_account"} + settings = TranscriptionSettings( + provider="Dembrane-25-09", + gcp_sa_json={"type": "service_account"}, + ) settings.assemblyai_webhook_url = "https://api.example.com/api/webhooks/assemblyai" settings.assemblyai_webhook_secret = "secret" diff --git a/echo/server/tests/test_transcribe_webhook.py b/echo/server/tests/test_transcribe_webhook.py index 3f41c8755..7193dc1ed 100644 --- a/echo/server/tests/test_transcribe_webhook.py +++ b/echo/server/tests/test_transcribe_webhook.py @@ -51,7 +51,7 @@ def _fake_post(url: str, **kwargs: Any) -> _FakeResponse: assert transcript is None assert payload == {"transcript_id": "tx-1"} assert captured["url"].endswith("/v2/transcript") - assert captured["json"]["speech_models"] == ["universal-3-pro", "universal-2"] + assert captured["json"]["speech_models"] == ["universal-3-pro"] assert "speech_model" not in captured["json"] assert "prompt" not in captured["json"] assert captured["json"]["keyterms_prompt"] == ["Dembrane"] @@ -92,7 +92,7 @@ def _fake_get(_url: str, **_kwargs: Any) -> _FakeResponse: assert response["status"] == "completed" assert payloads["polls"] == 2 post_payload = payloads["posts"][0] - assert post_payload["speech_models"] == ["universal-3-pro", "universal-2"] + assert post_payload["speech_models"] == ["universal-3-pro"] assert "speech_model" not in post_payload assert "webhook_url" not in post_payload From ebb7ad1eb3beea5d43f2347b37ab6c4a3ce0853d Mon Sep 17 00:00:00 2001 From: Dat Date: Mon, 16 Feb 2026 21:09:50 +0100 Subject: [PATCH 2/5] test: make webhook settings tests env-independent --- echo/server/tests/test_settings_webhook.py | 14 ++++++-------- 1 file changed, 6 insertions(+), 8 deletions(-) diff --git a/echo/server/tests/test_settings_webhook.py b/echo/server/tests/test_settings_webhook.py index 02a92aec5..65928d48b 100644 --- a/echo/server/tests/test_settings_webhook.py +++ b/echo/server/tests/test_settings_webhook.py @@ -4,10 +4,9 @@ def test_dembrane_webhook_url_requires_secret() -> None: - settings = TranscriptionSettings( - provider="Dembrane-25-09", - gcp_sa_json={"type": "service_account"}, - ) + settings = TranscriptionSettings() + settings.provider = "Dembrane-25-09" + settings.gcp_sa_json = {"type": "service_account"} settings.assemblyai_webhook_url = "https://api.example.com/api/webhooks/assemblyai" settings.assemblyai_webhook_secret = None @@ -19,10 +18,9 @@ def test_dembrane_webhook_url_requires_secret() -> None: def test_dembrane_webhook_url_with_secret_is_valid() -> None: - settings = TranscriptionSettings( - provider="Dembrane-25-09", - gcp_sa_json={"type": "service_account"}, - ) + settings = TranscriptionSettings() + settings.provider = "Dembrane-25-09" + settings.gcp_sa_json = {"type": "service_account"} settings.assemblyai_webhook_url = "https://api.example.com/api/webhooks/assemblyai" settings.assemblyai_webhook_secret = "secret" From cb041f99d7ad768829ccdee153c7d1b379a37033 Mon Sep 17 00:00:00 2001 From: Dat Date: Mon, 16 Feb 2026 21:21:32 +0100 Subject: [PATCH 3/5] fix: narrow AssemblyAI transcript type in sync path --- echo/server/dembrane/transcribe.py | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/echo/server/dembrane/transcribe.py b/echo/server/dembrane/transcribe.py index c1c14aa2b..06f68dfb5 100644 --- a/echo/server/dembrane/transcribe.py +++ b/echo/server/dembrane/transcribe.py @@ -644,16 +644,16 @@ def transcribe_conversation_chunk( logger.info("Using AssemblyAI for transcription") hotwords = _build_hotwords(conversation) signed_url = get_signed_url(chunk["path"], expires_in_seconds=3 * 24 * 60 * 60) - transcript, assemblyai_response = transcribe_audio_assemblyai( + assemblyai_transcript, assemblyai_response = transcribe_audio_assemblyai( signed_url, language=language, hotwords=hotwords ) - if transcript is None: + if assemblyai_transcript is None: raise TranscriptionError( "AssemblyAI returned webhook-mode response without transcript text in sync workflow." ) _save_transcript( conversation_chunk_id, - transcript, + assemblyai_transcript, diarization={ "schema": "ASSEMBLYAI", "data": assemblyai_response.get("words", {}), From f056825f7599c6df2cd3e969d5bf53547c665e89 Mon Sep 17 00:00:00 2001 From: Dat Date: Thu, 19 Feb 2026 16:20:45 +0100 Subject: [PATCH 4/5] fix: use fresh event loop per run_async_in_new_loop call MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit With dramatiq-gevent, multiple concurrent greenlets share a single OS thread, so threading.get_ident() returns the same ID for all of them. This caused _get_thread_event_loop() to return a shared cached loop to concurrent greenlets. When 4-5 task_summarize_conversation actors fired simultaneously, each called nest_asyncio-patched run_until_complete() on the same loop. The asyncio.Future objects created by run_in_executor() inside those coroutines cross-contaminated each other, raising: "got Future attached to a different loop" Fix: create a fresh asyncio.new_event_loop() per run_async_in_new_loop() call and close it in finally. Both callers (summarize_conversation, get_conversation_content) use only stateless async operations — all I/O is synchronous requests-based via run_in_thread_pool — so fresh loops per call is safe. Exposed by Stage 3 load test (5 concurrent conversations finalizing simultaneously). Pre-existing bug, not introduced by webhook changes. Co-authored-by: Cursor --- echo/server/dembrane/async_helpers.py | 30 ++++++++++++++++++++------- 1 file changed, 22 insertions(+), 8 deletions(-) diff --git a/echo/server/dembrane/async_helpers.py b/echo/server/dembrane/async_helpers.py index 79a346d6d..197ade305 100644 --- a/echo/server/dembrane/async_helpers.py +++ b/echo/server/dembrane/async_helpers.py @@ -172,17 +172,31 @@ def _get_thread_event_loop() -> asyncio.AbstractEventLoop: def run_async_in_new_loop(coro: Coroutine[Any, Any, T]) -> T: """ - Execute an async coroutine on this thread's persistent event loop. + Execute an async coroutine in a fresh, isolated event loop. Use from synchronous contexts such as Dramatiq actors or CLI scripts to - invoke async FastAPI handlers without hitting "Future attached to a - different loop" errors. + invoke async FastAPI handlers. + + A fresh loop is created per call rather than reusing a cached thread loop. + This prevents "Future attached to a different loop" errors when multiple + concurrent Dramatiq greenlets (dramatiq-gevent uses one OS thread with many + greenlets) share the same thread ID and would otherwise share the same loop. + The coroutines invoked here (summarize_conversation, get_conversation_content) + use only stateless async operations so fresh loops per call is safe. """ if not asyncio.iscoroutine(coro) and not asyncio.isfuture(coro): raise TypeError("run_async_in_new_loop expects a coroutine or Future.") - loop = _get_thread_event_loop() - logger.debug("Running async coroutine in thread loop: %s", coro) - result = loop.run_until_complete(coro) - logger.debug("Completed async coroutine: %s", coro) - return result + import nest_asyncio + + loop = asyncio.new_event_loop() + # Apply nest_asyncio in case dramatiq-gevent has patched asyncio's running + # loop detection on this thread. + nest_asyncio.apply(loop) + logger.debug("Running async coroutine in fresh event loop: %s", coro) + try: + result = loop.run_until_complete(coro) + logger.debug("Completed async coroutine: %s", coro) + return result + finally: + loop.close() From d5a8b81360738e9c0476ebb731235de2353c475a Mon Sep 17 00:00:00 2001 From: Dat Date: Thu, 19 Feb 2026 17:07:46 +0100 Subject: [PATCH 5/5] test: add concurrent run_async_in_new_loop regression tests Adds tests that reproduce the exact failure condition: - Concurrent threads calling run_async_in_new_loop simultaneously - Same-thread-ID scenario (simulates gevent greenlets sharing one OS thread) Both pass with the fresh-loop-per-call fix and would have failed with the previous cached thread-loop implementation. Co-authored-by: Cursor --- echo/server/tests/test_async_helpers.py | 123 ++++++++++++++++++++++++ 1 file changed, 123 insertions(+) create mode 100644 echo/server/tests/test_async_helpers.py diff --git a/echo/server/tests/test_async_helpers.py b/echo/server/tests/test_async_helpers.py new file mode 100644 index 000000000..4a906d71d --- /dev/null +++ b/echo/server/tests/test_async_helpers.py @@ -0,0 +1,123 @@ +""" +Tests for run_async_in_new_loop — specifically the concurrent-greenlet scenario +that caused "Future attached to a different loop" errors under load. + +Regression test for: multiple concurrent callers sharing the same OS thread +(as dramatiq-gevent greenlets do) must not share an event loop. +""" + +import asyncio +import threading +from concurrent.futures import ThreadPoolExecutor, as_completed + +import pytest + +from dembrane.async_helpers import run_async_in_new_loop + + +async def _simple_coro(value: int) -> int: + """Minimal async coroutine that does a thread-pool round-trip (like run_in_thread_pool).""" + loop = asyncio.get_running_loop() + # Simulate run_in_thread_pool: submit blocking work to the executor + result = await loop.run_in_executor(None, lambda: value * 2) + return result + + +async def _gather_coro(value: int) -> int: + """Uses asyncio.gather internally — matches what summarize_conversation does.""" + loop = asyncio.get_running_loop() + a, b = await asyncio.gather( + loop.run_in_executor(None, lambda: value + 1), + loop.run_in_executor(None, lambda: value + 2), + ) + return a + b + + +def test_run_async_in_new_loop_basic(): + """Single call works correctly.""" + result = run_async_in_new_loop(_simple_coro(5)) + assert result == 10 + + +def test_run_async_in_new_loop_with_gather(): + """Gather inside coroutine works correctly.""" + result = run_async_in_new_loop(_gather_coro(3)) + assert result == 9 # (3+1) + (3+2) = 9 + + +def test_run_async_in_new_loop_concurrent_threads(): + """ + Simulates the dramatiq-gevent scenario: N threads all calling + run_async_in_new_loop concurrently. Before the fix, they shared + a cached loop by thread ID, causing "Future attached to a different + loop" errors under concurrent load. + """ + errors = [] + results = [] + + def worker(value: int): + try: + r = run_async_in_new_loop(_gather_coro(value)) + results.append(r) + except Exception as e: + errors.append(str(e)) + + # Simulate 10 concurrent callers (matches or exceeds Stage 3 load test concurrency) + with ThreadPoolExecutor(max_workers=10) as pool: + futures = [pool.submit(worker, i) for i in range(10)] + for f in as_completed(futures): + f.result() # re-raises if the thread itself crashed + + assert errors == [], f"Concurrent run_async_in_new_loop raised errors: {errors}" + assert len(results) == 10 + + +def test_run_async_in_new_loop_same_thread_sequential(): + """ + Calls from the same thread are safe when sequential. + Verifies loop is properly closed between calls (no 'loop is closed' error). + """ + for i in range(5): + result = run_async_in_new_loop(_simple_coro(i)) + assert result == i * 2 + + +def test_run_async_in_new_loop_same_thread_id_concurrent(): + """ + Reproduces the exact bug: multiple coroutines submitted from threads + that all share the same thread ID (simulated by patching get_ident). + + Before the fix (persistent loop per thread ID), all concurrent callers + shared one loop → "Future attached to a different loop". + After the fix (fresh loop per call), each call is isolated. + """ + original_get_ident = threading.get_ident + # Make all threads report the same thread ID — exactly what gevent does + threading.get_ident = lambda: 99999 + + errors = [] + results = [] + + def worker(value: int): + try: + r = run_async_in_new_loop(_gather_coro(value)) + results.append(r) + except Exception as e: + errors.append(str(e)) + + try: + with ThreadPoolExecutor(max_workers=5) as pool: + futures = [pool.submit(worker, i) for i in range(5)] + for f in as_completed(futures): + f.result() + finally: + threading.get_ident = original_get_ident + + assert errors == [], f"Same-thread-ID concurrent calls raised errors: {errors}" + assert len(results) == 5 + + +def test_run_async_in_new_loop_rejects_non_coroutine(): + """Type guard still works.""" + with pytest.raises(TypeError, match="expects a coroutine or Future"): + run_async_in_new_loop(42) # type: ignore