From c224ac856ec457dfd6c1386d73646d77c030ab97 Mon Sep 17 00:00:00 2001 From: Dat Date: Mon, 16 Feb 2026 21:02:14 +0100 Subject: [PATCH 1/3] feat: add AssemblyAI webhook-driven transcription flow --- echo/server/.env.sample | 3 + echo/server/dembrane/api/api.py | 2 + echo/server/dembrane/api/webhooks.py | 122 +++++++++ echo/server/dembrane/coordination.py | 104 +++++++ echo/server/dembrane/settings.py | 21 ++ echo/server/dembrane/tasks.py | 165 ++++++++++- echo/server/dembrane/transcribe.py | 69 ++++- echo/server/tests/api/test_webhooks.py | 273 +++++++++++++++++++ echo/server/tests/test_settings_webhook.py | 29 ++ echo/server/tests/test_tasks_webhook.py | 220 +++++++++++++++ echo/server/tests/test_transcribe_webhook.py | 131 +++++++++ 11 files changed, 1133 insertions(+), 6 deletions(-) create mode 100644 echo/server/dembrane/api/webhooks.py create mode 100644 echo/server/tests/api/test_webhooks.py create mode 100644 echo/server/tests/test_settings_webhook.py create mode 100644 echo/server/tests/test_tasks_webhook.py create mode 100644 echo/server/tests/test_transcribe_webhook.py diff --git a/echo/server/.env.sample b/echo/server/.env.sample index d51a787e7..25be277e8 100644 --- a/echo/server/.env.sample +++ b/echo/server/.env.sample @@ -38,6 +38,9 @@ DISABLE_SENTRY=0 TRANSCRIPTION_PROVIDER= ASSEMBLYAI_API_KEY= ASSEMBLYAI_BASE_URL=https://api.eu.assemblyai.com +# Optional webhook mode for Dembrane-25-09 +ASSEMBLYAI_WEBHOOK_URL= +ASSEMBLYAI_WEBHOOK_SECRET= # LiteLLM transcription (used when TRANSCRIPTION_PROVIDER=LiteLLM) LITELLM_TRANSCRIPTION_MODEL=whisper-1 diff --git a/echo/server/dembrane/api/api.py b/echo/server/dembrane/api/api.py index 640498acf..06a5754b3 100644 --- a/echo/server/dembrane/api/api.py +++ b/echo/server/dembrane/api/api.py @@ -10,6 +10,7 @@ from dembrane.api.verify import VerifyRouter from dembrane.api.agentic import AgenticRouter from dembrane.api.project import ProjectRouter +from dembrane.api.webhooks import WebhooksRouter from dembrane.api.stateless import StatelessRouter from dembrane.api.participant import ParticipantRouter from dembrane.api.conversation import ConversationRouter @@ -37,3 +38,4 @@ async def health() -> dict: api.include_router(SearchRouter) api.include_router(UserSettingsRouter, prefix="/user-settings") api.include_router(StatsRouter, prefix="/stats") +api.include_router(WebhooksRouter) diff --git a/echo/server/dembrane/api/webhooks.py b/echo/server/dembrane/api/webhooks.py new file mode 100644 index 000000000..58324a307 --- /dev/null +++ b/echo/server/dembrane/api/webhooks.py @@ -0,0 +1,122 @@ +""" +Incoming webhook endpoints for third-party service callbacks. + +These endpoints are public and authenticated via shared secrets. +""" + +import hmac +from logging import getLogger + +from fastapi import Request, APIRouter, HTTPException +from pydantic import BaseModel + +from dembrane.settings import get_settings + +logger = getLogger("api.webhooks") + +WebhooksRouter = APIRouter(tags=["webhooks"]) + + +class AssemblyAIWebhookPayload(BaseModel): + transcript_id: str + status: str + + +@WebhooksRouter.post("/webhooks/assemblyai") +async def assemblyai_webhook_callback( + payload: AssemblyAIWebhookPayload, + request: Request, +) -> dict[str, str]: + """Handle AssemblyAI transcript completion callbacks.""" + settings = get_settings() + expected_secret = settings.transcription.assemblyai_webhook_secret + if not expected_secret: + raise HTTPException(status_code=503, detail="AssemblyAI webhook secret is not configured") + + received_secret = request.headers.get("X-AssemblyAI-Webhook-Secret", "") + if not hmac.compare_digest(received_secret, expected_secret): + logger.warning("AssemblyAI webhook auth failed for transcript %s", payload.transcript_id) + raise HTTPException(status_code=401, detail="Invalid webhook secret") + + logger.info( + "AssemblyAI webhook received (transcript_id=%s status=%s)", + payload.transcript_id, + payload.status, + ) + + from dembrane.coordination import ( + get_assemblyai_webhook_metadata, + delete_assemblyai_webhook_metadata, + mark_assemblyai_webhook_processing, + clear_assemblyai_webhook_processing, + ) + + metadata = get_assemblyai_webhook_metadata(payload.transcript_id) + if not metadata: + logger.warning( + "AssemblyAI webhook ignored: metadata missing for transcript %s", + payload.transcript_id, + ) + return {"status": "ignored"} + + if not mark_assemblyai_webhook_processing(payload.transcript_id): + logger.info( + "AssemblyAI webhook duplicate/in-flight ignored for transcript %s", + payload.transcript_id, + ) + return {"status": "ignored"} + + try: + chunk_id = metadata["chunk_id"] + conversation_id = metadata["conversation_id"] + normalized_status = payload.status.lower() + + if normalized_status == "error": + from dembrane.tasks import _on_chunk_transcription_done + from dembrane.transcribe import _save_chunk_error + + _save_chunk_error(chunk_id, f"AssemblyAI error for transcript {payload.transcript_id}") + _on_chunk_transcription_done(conversation_id, chunk_id, logger) + delete_assemblyai_webhook_metadata(payload.transcript_id) + return {"status": "error_handled"} + + if normalized_status == "completed": + from dembrane.tasks import task_correct_transcript + from dembrane.transcribe import _save_transcript, fetch_assemblyai_result + + transcript_text, full_response = fetch_assemblyai_result(payload.transcript_id) + anonymize_transcripts = bool(metadata.get("anonymize_transcripts", False)) + + if not anonymize_transcripts: + _save_transcript( + chunk_id, + transcript_text, + diarization={ + "schema": "Dembrane-25-09-assemblyai-partial", + "data": full_response, + }, + ) + + task_correct_transcript.send( + chunk_id=chunk_id, + conversation_id=conversation_id, + audio_file_uri=metadata["audio_file_uri"], + candidate_transcript=transcript_text, + hotwords=metadata.get("hotwords"), + use_pii_redaction=bool(metadata.get("use_pii_redaction", False)), + custom_guidance_prompt=metadata.get("custom_guidance_prompt"), + assemblyai_response=full_response, + anonymize_transcripts=anonymize_transcripts, + ) + + delete_assemblyai_webhook_metadata(payload.transcript_id) + return {"status": "ok"} + + logger.warning( + "AssemblyAI webhook ignored unknown status %s for transcript %s", + payload.status, + payload.transcript_id, + ) + return {"status": "ignored"} + finally: + clear_assemblyai_webhook_processing(payload.transcript_id) diff --git a/echo/server/dembrane/coordination.py b/echo/server/dembrane/coordination.py index 5781b8e61..2233c8cc2 100644 --- a/echo/server/dembrane/coordination.py +++ b/echo/server/dembrane/coordination.py @@ -13,6 +13,7 @@ transcriptions completed. """ +import json from typing import Any from logging import getLogger @@ -476,3 +477,106 @@ def cleanup_conversation_coordination(conversation_id: str) -> None: logger.debug(f"Cleaned up {deleted} coordination keys for {conversation_id}") finally: client.close() + + +# ------------------------------------------------------------------------------ +# AssemblyAI Webhook Coordination +# ------------------------------------------------------------------------------ + +_AAI_WEBHOOK_TTL_SECONDS = 60 * 60 * 24 # 24 hours +_AAI_WEBHOOK_PROCESSING_TTL_SECONDS = 60 * 10 # 10 minutes + + +def _assemblyai_webhook_key(transcript_id: str) -> str: + return f"{_KEY_PREFIX}:aai_transcript:{transcript_id}" + + +def _assemblyai_webhook_processing_key(transcript_id: str) -> str: + return f"{_KEY_PREFIX}:aai_transcript_processing:{transcript_id}" + + +def store_assemblyai_webhook_metadata( + transcript_id: str, + chunk_id: str, + conversation_id: str, + audio_file_uri: str, + language: str | None, + hotwords: list[str] | None, + use_pii_redaction: bool, + custom_guidance_prompt: str | None, + anonymize_transcripts: bool, +) -> None: + """Store metadata for webhook callback processing.""" + client = _get_sync_redis_client() + key = _assemblyai_webhook_key(transcript_id) + payload = { + "chunk_id": chunk_id, + "conversation_id": conversation_id, + "audio_file_uri": audio_file_uri, + "language": language, + "hotwords": hotwords or [], + "use_pii_redaction": use_pii_redaction, + "custom_guidance_prompt": custom_guidance_prompt, + "anonymize_transcripts": anonymize_transcripts, + } + + try: + client.set(key, json.dumps(payload)) + client.expire(key, _AAI_WEBHOOK_TTL_SECONDS) + logger.debug( + "Stored AssemblyAI webhook metadata for transcript %s chunk %s", + transcript_id, + chunk_id, + ) + finally: + client.close() + + +def get_assemblyai_webhook_metadata(transcript_id: str) -> dict[str, Any] | None: + """Fetch AssemblyAI webhook metadata by transcript ID.""" + client = _get_sync_redis_client() + key = _assemblyai_webhook_key(transcript_id) + + try: + raw = client.get(key) + if not raw: + return None + return json.loads(raw) + finally: + client.close() + + +def delete_assemblyai_webhook_metadata(transcript_id: str) -> None: + """Delete webhook metadata after terminal handling.""" + client = _get_sync_redis_client() + key = _assemblyai_webhook_key(transcript_id) + + try: + client.delete(key) + finally: + client.close() + + +def mark_assemblyai_webhook_processing(transcript_id: str) -> bool: + """Acquire a per-transcript webhook processing lock.""" + client = _get_sync_redis_client() + key = _assemblyai_webhook_processing_key(transcript_id) + + try: + was_set = client.setnx(key, "1") + if was_set: + client.expire(key, _AAI_WEBHOOK_PROCESSING_TTL_SECONDS) + return bool(was_set) + finally: + client.close() + + +def clear_assemblyai_webhook_processing(transcript_id: str) -> None: + """Release a per-transcript webhook processing lock.""" + client = _get_sync_redis_client() + key = _assemblyai_webhook_processing_key(transcript_id) + + try: + client.delete(key) + finally: + client.close() diff --git a/echo/server/dembrane/settings.py b/echo/server/dembrane/settings.py index 8d2533b2e..e87dc2232 100644 --- a/echo/server/dembrane/settings.py +++ b/echo/server/dembrane/settings.py @@ -466,6 +466,22 @@ class TranscriptionSettings(BaseSettings): alias="ASSEMBLYAI_BASE_URL", validation_alias=AliasChoices("ASSEMBLYAI_BASE_URL", "TRANSCRIPTION__ASSEMBLYAI__BASE_URL"), ) + assemblyai_webhook_url: Optional[str] = Field( + default=None, + alias="ASSEMBLYAI_WEBHOOK_URL", + validation_alias=AliasChoices( + "ASSEMBLYAI_WEBHOOK_URL", + "TRANSCRIPTION__ASSEMBLYAI__WEBHOOK_URL", + ), + ) + assemblyai_webhook_secret: Optional[str] = Field( + default=None, + alias="ASSEMBLYAI_WEBHOOK_SECRET", + validation_alias=AliasChoices( + "ASSEMBLYAI_WEBHOOK_SECRET", + "TRANSCRIPTION__ASSEMBLYAI__WEBHOOK_SECRET", + ), + ) litellm_model: Optional[str] = Field( default=None, alias="LITELLM_TRANSCRIPTION_MODEL", @@ -524,6 +540,11 @@ def ensure_valid(self) -> None: raise ValueError( "GCP_SA_JSON must be provided when TRANSCRIPTION_PROVIDER=Dembrane-25-09" ) + if self.assemblyai_webhook_url and not self.assemblyai_webhook_secret: + raise ValueError( + "ASSEMBLYAI_WEBHOOK_SECRET must be set when " + "ASSEMBLYAI_WEBHOOK_URL is configured for TRANSCRIPTION_PROVIDER=Dembrane-25-09" + ) class AppSettings: diff --git a/echo/server/dembrane/tasks.py b/echo/server/dembrane/tasks.py index e5128ca4a..e94a9c2ca 100644 --- a/echo/server/dembrane/tasks.py +++ b/echo/server/dembrane/tasks.py @@ -1,6 +1,6 @@ # ruff: noqa: E402 import logging -from typing import Optional +from typing import Any, Optional from logging import getLogger import dramatiq @@ -107,7 +107,10 @@ def decode(self, data: bytes) -> MessageData: # Transcription Task @dramatiq.actor(queue_name="network", priority=0) def task_transcribe_chunk( - conversation_chunk_id: str, conversation_id: str, use_pii_redaction: bool = False, anonymize_transcripts: bool = False + conversation_chunk_id: str, + conversation_id: str, + use_pii_redaction: bool = False, + anonymize_transcripts: bool = False, ) -> None: """ Transcribe a conversation chunk. @@ -124,6 +127,61 @@ def task_transcribe_chunk( event_prefix="task_transcribe_chunk", message=f"for chunk {conversation_chunk_id}", ): + from dembrane.s3 import get_signed_url + from dembrane.transcribe import ( + ASSEMBLYAI_WEBHOOK_URL, + TRANSCRIPTION_PROVIDER, + ASSEMBLYAI_WEBHOOK_SECRET, + _fetch_chunk, + _build_hotwords, + _fetch_conversation, + transcribe_audio_assemblyai, + ) + from dembrane.coordination import store_assemblyai_webhook_metadata + + if TRANSCRIPTION_PROVIDER == "Dembrane-25-09" and ASSEMBLYAI_WEBHOOK_URL: + chunk = _fetch_chunk(conversation_chunk_id) + conversation = _fetch_conversation(chunk["conversation_id"]) + language = conversation["project_id"]["language"] or "en" + hotwords = _build_hotwords(conversation) + custom_guidance_prompt = conversation["project_id"].get( + "default_conversation_transcript_prompt" + ) + signed_url = get_signed_url(chunk["path"], expires_in_seconds=3 * 24 * 60 * 60) + + _, response = transcribe_audio_assemblyai( + signed_url, + language=language, + hotwords=hotwords, + webhook_url=ASSEMBLYAI_WEBHOOK_URL, + webhook_secret=ASSEMBLYAI_WEBHOOK_SECRET, + ) + + transcript_id = response.get("transcript_id") + if not transcript_id: + raise ValueError( + "AssemblyAI webhook submission succeeded but transcript_id was missing." + ) + + store_assemblyai_webhook_metadata( + transcript_id=transcript_id, + chunk_id=conversation_chunk_id, + conversation_id=conversation_id, + audio_file_uri=signed_url, + language=language, + hotwords=hotwords, + use_pii_redaction=use_pii_redaction, + custom_guidance_prompt=custom_guidance_prompt, + anonymize_transcripts=anonymize_transcripts, + ) + + logger.info( + "Webhook mode: submitted transcript %s for chunk %s and freed worker", + transcript_id, + conversation_chunk_id, + ) + return + transcribe_conversation_chunk(conversation_chunk_id, use_pii_redaction, anonymize_transcripts) # Transcription succeeded - decrement counter and check for finalization @@ -196,6 +254,108 @@ def _on_chunk_transcription_done( logger.error(f"Error checking conversation state for {conversation_id}: {e}") +@dramatiq.actor(queue_name="network", priority=0) +def task_correct_transcript( + chunk_id: str, + conversation_id: str, + audio_file_uri: str, + candidate_transcript: str, + hotwords: list[str] | None, + use_pii_redaction: bool, + custom_guidance_prompt: str | None, + assemblyai_response: dict[str, Any], + anonymize_transcripts: bool = False, +) -> None: + """Run transcript correction and persist the final transcript for webhook mode.""" + task_logger = getLogger("dembrane.tasks.task_correct_transcript") + + from dembrane.transcribe import ( + _save_transcript, + _save_chunk_error, + _transcript_correction_workflow, + ) + + fallback_transcript = candidate_transcript or "[Nothing to transcribe]" + + try: + if anonymize_transcripts: + from dembrane.pii_regex import regex_redact_pii + + fallback_transcript = regex_redact_pii(fallback_transcript) or "[Nothing to transcribe]" + + with ProcessingStatusContext( + conversation_id=conversation_id, + event_prefix="task_correct_transcript", + message=f"for chunk {chunk_id}", + ): + corrected_transcript, note = _transcript_correction_workflow( + audio_file_uri=audio_file_uri, + candidate_transcript=fallback_transcript, + hotwords=hotwords, + use_pii_redaction=True if anonymize_transcripts else use_pii_redaction, + custom_guidance_prompt=custom_guidance_prompt, + ) + + final_transcript = corrected_transcript or "[Nothing to transcribe]" + if anonymize_transcripts: + diarization = { + "schema": "Dembrane-26-01-redaction", + "data": { + "note": note, + "raw": {}, + "error": None, + }, + } + else: + diarization = { + "schema": "Dembrane-25-09", + "data": { + "note": note, + "raw": assemblyai_response, + "error": None, + }, + } + + _save_transcript(chunk_id, final_transcript, diarization=diarization) + except Exception as e: + task_logger.error("Gemini correction failed for chunk %s: %s", chunk_id, e) + + try: + if anonymize_transcripts: + fallback_diarization = { + "schema": "Dembrane-26-01-redaction", + "data": { + "note": None, + "raw": {}, + "error": str(e), + }, + } + else: + fallback_diarization = { + "schema": "Dembrane-25-09", + "data": { + "note": None, + "raw": assemblyai_response, + "error": str(e), + }, + } + + _save_transcript( + chunk_id, + fallback_transcript or "[Nothing to transcribe]", + diarization=fallback_diarization, + ) + except Exception as save_error: + task_logger.error( + "Failed to save fallback transcript for chunk %s: %s", + chunk_id, + save_error, + ) + _save_chunk_error(chunk_id, f"Failed to save fallback transcript: {save_error}") + finally: + _on_chunk_transcription_done(conversation_id, chunk_id, task_logger) + + @dramatiq.actor(queue_name="network", priority=20) def task_finalize_conversation(conversation_id: str) -> None: """ @@ -950,4 +1110,3 @@ def task_dispatch_webhook(webhook_id: str, payload: dict) -> None: except Exception as e: logger.error(f"Webhook {webhook_id} dispatch failed: {e}") raise # Retry on network errors etc. - diff --git a/echo/server/dembrane/transcribe.py b/echo/server/dembrane/transcribe.py index b717f26fe..c1c14aa2b 100644 --- a/echo/server/dembrane/transcribe.py +++ b/echo/server/dembrane/transcribe.py @@ -34,6 +34,8 @@ GCP_SA_JSON = transcription_cfg.gcp_sa_json ASSEMBLYAI_API_KEY = transcription_cfg.assemblyai_api_key ASSEMBLYAI_BASE_URL = transcription_cfg.assemblyai_base_url +ASSEMBLYAI_WEBHOOK_URL = transcription_cfg.assemblyai_webhook_url +ASSEMBLYAI_WEBHOOK_SECRET = transcription_cfg.assemblyai_webhook_secret TRANSCRIPTION_PROVIDER = transcription_cfg.provider LITELLM_TRANSCRIPTION_MODEL = transcription_cfg.litellm_model LITELLM_TRANSCRIPTION_API_KEY = transcription_cfg.litellm_api_key @@ -91,7 +93,9 @@ def transcribe_audio_assemblyai( audio_file_uri: str, language: Optional[str], # pyright: ignore[reportUnusedParameter] hotwords: Optional[List[str]], -) -> tuple[str, dict[str, Any]]: + webhook_url: Optional[str] = None, + webhook_secret: Optional[str] = None, +) -> tuple[Optional[str], dict[str, Any]]: """Transcribe audio through AssemblyAI""" logger = logging.getLogger("transcribe.transcribe_audio_assemblyai") logger.info("Submitting AssemblyAI transcription request for %s", audio_file_uri) @@ -103,7 +107,7 @@ def transcribe_audio_assemblyai( data: dict[str, Any] = { "audio_url": audio_file_uri, - "speech_model": "universal", + "speech_models": ["universal-3-pro"], "language_detection": True, "language_detection_options": { "expected_languages": list(set(get_allowed_languages()) | {"pt"}), @@ -121,6 +125,27 @@ def transcribe_audio_assemblyai( # We slice to ensure we don't exceed the limit data["keyterms_prompt"] = hotwords[:ASSEMBLYAI_MAX_HOTWORDS] + # Webhook mode: submit and return immediately. + if webhook_url: + logger.info("AssemblyAI submit payload uses speech_models=%s", data["speech_models"]) + data["webhook_url"] = webhook_url + if webhook_secret: + data["webhook_auth_header_name"] = "X-AssemblyAI-Webhook-Secret" + data["webhook_auth_header_value"] = webhook_secret + + response = requests.post(f"{ASSEMBLYAI_BASE_URL}/v2/transcript", headers=headers, json=data) + if response.status_code == 200: + transcript_id = response.json()["id"] + logger.info( + "AssemblyAI job submitted in webhook mode (transcript_id=%s)", + transcript_id, + ) + return None, {"transcript_id": transcript_id} + if response.status_code == 400: + raise TranscriptionError(f"Transcription failed: {response.json()['error']}") + raise Exception(f"Transcription failed: {response.json()['error']}") + + logger.info("AssemblyAI submit payload uses speech_models=%s", data["speech_models"]) response = requests.post(f"{ASSEMBLYAI_BASE_URL}/v2/transcript", headers=headers, json=data) if response.status_code == 200: @@ -168,6 +193,32 @@ def transcribe_audio_assemblyai( raise Exception(f"Transcription failed: {response.json()['error']}") +def fetch_assemblyai_result(transcript_id: str) -> tuple[str, dict[str, Any]]: + """Fetch a completed AssemblyAI transcript by ID.""" + fetch_logger = logging.getLogger("transcribe.fetch_assemblyai_result") + headers = {"Authorization": f"Bearer {ASSEMBLYAI_API_KEY}"} + url = f"{ASSEMBLYAI_BASE_URL}/v2/transcript/{transcript_id}" + response = requests.get(url, headers=headers) + + if response.status_code != 200: + raise TranscriptionError( + f"Failed to fetch transcript {transcript_id}: HTTP {response.status_code}" + ) + + data = response.json() + status = data.get("status") + if status == "error": + raise TranscriptionError(f"Transcript {transcript_id} failed: {data.get('error')}") + if status != "completed": + raise TranscriptionError( + f"Transcript {transcript_id} not completed: status={status}" + ) + + text = data.get("text", "") + fetch_logger.info("Fetched transcript %s (%d chars)", transcript_id, len(text)) + return text, data + + def _get_audio_file_object(audio_file_uri: str) -> Any: try: audio_stream = file_service.get_stream(audio_file_uri) @@ -298,6 +349,10 @@ def transcribe_audio_dembrane_25_09( assemblyai_response_failed = False try: transcript, response = transcribe_audio_assemblyai(audio_file_uri, language, hotwords) + if transcript is None: + raise TranscriptionError( + "AssemblyAI returned webhook-mode response without transcript text in sync workflow." + ) logger.debug(f"transcript from assemblyai: {transcript}") except TranscriptionError as e: assemblyai_response_failed = True @@ -361,7 +416,11 @@ def transcribe_audio_dembrane_26_01_redaction( assemblyai_response_failed = False try: - transcript, response = transcribe_audio_assemblyai(audio_file_uri, language, hotwords) + transcript, _ = transcribe_audio_assemblyai(audio_file_uri, language, hotwords) + if transcript is None: + raise TranscriptionError( + "AssemblyAI returned webhook-mode response without transcript text in sync workflow." + ) logger.debug(f"transcript from assemblyai: {transcript}") except TranscriptionError as e: assemblyai_response_failed = True @@ -588,6 +647,10 @@ def transcribe_conversation_chunk( transcript, assemblyai_response = transcribe_audio_assemblyai( signed_url, language=language, hotwords=hotwords ) + if transcript is None: + raise TranscriptionError( + "AssemblyAI returned webhook-mode response without transcript text in sync workflow." + ) _save_transcript( conversation_chunk_id, transcript, diff --git a/echo/server/tests/api/test_webhooks.py b/echo/server/tests/api/test_webhooks.py new file mode 100644 index 000000000..12e73a6e0 --- /dev/null +++ b/echo/server/tests/api/test_webhooks.py @@ -0,0 +1,273 @@ +from __future__ import annotations + +from types import SimpleNamespace +from typing import Any, AsyncIterator +from contextlib import asynccontextmanager + +import pytest +from httpx import AsyncClient, ASGITransport +from fastapi import FastAPI + +import dembrane.tasks as tasks +import dembrane.transcribe as transcribe +import dembrane.api.webhooks as webhooks_api +import dembrane.coordination as coordination + + +@asynccontextmanager +async def _build_client() -> AsyncIterator[AsyncClient]: + app = FastAPI() + app.include_router(webhooks_api.WebhooksRouter, prefix="/api") + transport = ASGITransport(app=app) + async with AsyncClient(transport=transport, base_url="http://testserver") as client: + yield client + + +def _set_secret(monkeypatch: pytest.MonkeyPatch, secret: str) -> None: + monkeypatch.setattr( + webhooks_api, + "get_settings", + lambda: SimpleNamespace( + transcription=SimpleNamespace(assemblyai_webhook_secret=secret), + ), + ) + + +@pytest.mark.asyncio +async def test_assemblyai_webhook_auth_rejected(monkeypatch: pytest.MonkeyPatch) -> None: + _set_secret(monkeypatch, "expected-secret") + + async with _build_client() as client: + response = await client.post( + "/api/webhooks/assemblyai", + headers={"X-AssemblyAI-Webhook-Secret": "wrong-secret"}, + json={"transcript_id": "tx-1", "status": "completed"}, + ) + + assert response.status_code == 401 + + +@pytest.mark.asyncio +async def test_assemblyai_webhook_completed_standard(monkeypatch: pytest.MonkeyPatch) -> None: + _set_secret(monkeypatch, "expected-secret") + state: dict[str, Any] = {} + + monkeypatch.setattr( + coordination, + "get_assemblyai_webhook_metadata", + lambda _transcript_id: { + "chunk_id": "chunk-1", + "conversation_id": "conv-1", + "audio_file_uri": "https://signed/audio.mp3", + "hotwords": ["Dembrane"], + "use_pii_redaction": False, + "custom_guidance_prompt": "guide", + "anonymize_transcripts": False, + }, + ) + monkeypatch.setattr(coordination, "mark_assemblyai_webhook_processing", lambda _id: True) + monkeypatch.setattr( + coordination, + "delete_assemblyai_webhook_metadata", + lambda _id: state.update({"deleted": True}), + ) + monkeypatch.setattr( + coordination, + "clear_assemblyai_webhook_processing", + lambda _id: state.update({"cleared": True}), + ) + monkeypatch.setattr( + transcribe, + "fetch_assemblyai_result", + lambda _id: ("hello world", {"status": "completed", "text": "hello world"}), + ) + monkeypatch.setattr( + transcribe, + "_save_transcript", + lambda chunk_id, transcript, diarization=None: state.update( + {"partial_chunk_id": chunk_id, "partial_transcript": transcript, "partial": diarization} + ), + ) + monkeypatch.setattr( + tasks.task_correct_transcript, + "send", + lambda **kwargs: state.update({"task_payload": kwargs}), + ) + + async with _build_client() as client: + response = await client.post( + "/api/webhooks/assemblyai", + headers={"X-AssemblyAI-Webhook-Secret": "expected-secret"}, + json={"transcript_id": "tx-1", "status": "completed"}, + ) + + assert response.status_code == 200 + assert response.json()["status"] == "ok" + assert state["partial_chunk_id"] == "chunk-1" + assert state["partial"]["schema"] == "Dembrane-25-09-assemblyai-partial" + assert state["task_payload"]["anonymize_transcripts"] is False + assert state["deleted"] is True + assert state["cleared"] is True + + +@pytest.mark.asyncio +async def test_assemblyai_webhook_completed_anonymized_skips_partial( + monkeypatch: pytest.MonkeyPatch, +) -> None: + _set_secret(monkeypatch, "expected-secret") + state: dict[str, Any] = {"partial_calls": 0} + + monkeypatch.setattr( + coordination, + "get_assemblyai_webhook_metadata", + lambda _transcript_id: { + "chunk_id": "chunk-2", + "conversation_id": "conv-2", + "audio_file_uri": "https://signed/audio.mp3", + "hotwords": [], + "use_pii_redaction": True, + "custom_guidance_prompt": None, + "anonymize_transcripts": True, + }, + ) + monkeypatch.setattr(coordination, "mark_assemblyai_webhook_processing", lambda _id: True) + monkeypatch.setattr(coordination, "delete_assemblyai_webhook_metadata", lambda _id: None) + monkeypatch.setattr( + coordination, + "clear_assemblyai_webhook_processing", + lambda _id: state.update({"cleared": True}), + ) + monkeypatch.setattr( + transcribe, + "fetch_assemblyai_result", + lambda _id: ("hello world", {"status": "completed", "text": "hello world"}), + ) + monkeypatch.setattr( + transcribe, + "_save_transcript", + lambda *_args, **_kwargs: state.update({"partial_calls": state["partial_calls"] + 1}), + ) + monkeypatch.setattr( + tasks.task_correct_transcript, + "send", + lambda **kwargs: state.update({"task_payload": kwargs}), + ) + + async with _build_client() as client: + response = await client.post( + "/api/webhooks/assemblyai", + headers={"X-AssemblyAI-Webhook-Secret": "expected-secret"}, + json={"transcript_id": "tx-2", "status": "completed"}, + ) + + assert response.status_code == 200 + assert response.json()["status"] == "ok" + assert state["partial_calls"] == 0 + assert state["task_payload"]["anonymize_transcripts"] is True + assert state["cleared"] is True + + +@pytest.mark.asyncio +async def test_assemblyai_webhook_error_status(monkeypatch: pytest.MonkeyPatch) -> None: + _set_secret(monkeypatch, "expected-secret") + state: dict[str, Any] = {} + + monkeypatch.setattr( + coordination, + "get_assemblyai_webhook_metadata", + lambda _transcript_id: {"chunk_id": "chunk-3", "conversation_id": "conv-3"}, + ) + monkeypatch.setattr(coordination, "mark_assemblyai_webhook_processing", lambda _id: True) + monkeypatch.setattr( + coordination, + "delete_assemblyai_webhook_metadata", + lambda _id: state.update({"deleted": True}), + ) + monkeypatch.setattr( + coordination, + "clear_assemblyai_webhook_processing", + lambda _id: state.update({"cleared": True}), + ) + monkeypatch.setattr( + transcribe, + "_save_chunk_error", + lambda chunk_id, message: state.update({"chunk_id": chunk_id, "message": message}), + ) + monkeypatch.setattr( + tasks, + "_on_chunk_transcription_done", + lambda conversation_id, chunk_id, _logger: state.update( + {"conversation_id": conversation_id, "done_chunk_id": chunk_id} + ), + ) + + async with _build_client() as client: + response = await client.post( + "/api/webhooks/assemblyai", + headers={"X-AssemblyAI-Webhook-Secret": "expected-secret"}, + json={"transcript_id": "tx-3", "status": "error"}, + ) + + assert response.status_code == 200 + assert response.json()["status"] == "error_handled" + assert state["chunk_id"] == "chunk-3" + assert state["conversation_id"] == "conv-3" + assert state["deleted"] is True + assert state["cleared"] is True + + +@pytest.mark.asyncio +async def test_assemblyai_webhook_missing_metadata_is_ignored( + monkeypatch: pytest.MonkeyPatch, +) -> None: + _set_secret(monkeypatch, "expected-secret") + state: dict[str, Any] = {"marked": 0} + + monkeypatch.setattr(coordination, "get_assemblyai_webhook_metadata", lambda _id: None) + monkeypatch.setattr( + coordination, + "mark_assemblyai_webhook_processing", + lambda _id: state.update({"marked": state["marked"] + 1}), + ) + + async with _build_client() as client: + response = await client.post( + "/api/webhooks/assemblyai", + headers={"X-AssemblyAI-Webhook-Secret": "expected-secret"}, + json={"transcript_id": "tx-4", "status": "completed"}, + ) + + assert response.status_code == 200 + assert response.json()["status"] == "ignored" + assert state["marked"] == 0 + + +@pytest.mark.asyncio +async def test_assemblyai_webhook_duplicate_inflight_is_ignored( + monkeypatch: pytest.MonkeyPatch, +) -> None: + _set_secret(monkeypatch, "expected-secret") + state: dict[str, Any] = {"cleared": 0} + + monkeypatch.setattr( + coordination, + "get_assemblyai_webhook_metadata", + lambda _id: {"chunk_id": "chunk-5", "conversation_id": "conv-5"}, + ) + monkeypatch.setattr(coordination, "mark_assemblyai_webhook_processing", lambda _id: False) + monkeypatch.setattr( + coordination, + "clear_assemblyai_webhook_processing", + lambda _id: state.update({"cleared": state["cleared"] + 1}), + ) + + async with _build_client() as client: + response = await client.post( + "/api/webhooks/assemblyai", + headers={"X-AssemblyAI-Webhook-Secret": "expected-secret"}, + json={"transcript_id": "tx-5", "status": "completed"}, + ) + + assert response.status_code == 200 + assert response.json()["status"] == "ignored" + assert state["cleared"] == 0 diff --git a/echo/server/tests/test_settings_webhook.py b/echo/server/tests/test_settings_webhook.py new file mode 100644 index 000000000..02a92aec5 --- /dev/null +++ b/echo/server/tests/test_settings_webhook.py @@ -0,0 +1,29 @@ +import pytest + +from dembrane.settings import TranscriptionSettings + + +def test_dembrane_webhook_url_requires_secret() -> None: + settings = TranscriptionSettings( + provider="Dembrane-25-09", + gcp_sa_json={"type": "service_account"}, + ) + settings.assemblyai_webhook_url = "https://api.example.com/api/webhooks/assemblyai" + settings.assemblyai_webhook_secret = None + + with pytest.raises( + ValueError, + match="ASSEMBLYAI_WEBHOOK_SECRET must be set when ASSEMBLYAI_WEBHOOK_URL is configured", + ): + settings.ensure_valid() + + +def test_dembrane_webhook_url_with_secret_is_valid() -> None: + settings = TranscriptionSettings( + provider="Dembrane-25-09", + gcp_sa_json={"type": "service_account"}, + ) + settings.assemblyai_webhook_url = "https://api.example.com/api/webhooks/assemblyai" + settings.assemblyai_webhook_secret = "secret" + + settings.ensure_valid() diff --git a/echo/server/tests/test_tasks_webhook.py b/echo/server/tests/test_tasks_webhook.py new file mode 100644 index 000000000..c1561e49d --- /dev/null +++ b/echo/server/tests/test_tasks_webhook.py @@ -0,0 +1,220 @@ +from __future__ import annotations + +from typing import Any +from contextlib import nullcontext + +import dembrane.tasks as tasks +import dembrane.transcribe as transcribe + + +def test_task_transcribe_chunk_webhook_mode_returns_early(monkeypatch) -> None: + stored: dict[str, Any] = {} + called: dict[str, Any] = {"transcribe": False, "decrement": False} + + monkeypatch.setattr(tasks, "ProcessingStatusContext", lambda **_kwargs: nullcontext()) + monkeypatch.setattr(transcribe, "TRANSCRIPTION_PROVIDER", "Dembrane-25-09") + monkeypatch.setattr(transcribe, "ASSEMBLYAI_WEBHOOK_URL", "https://api.example.com/hook") + monkeypatch.setattr(transcribe, "ASSEMBLYAI_WEBHOOK_SECRET", "secret") + monkeypatch.setattr( + transcribe, + "_fetch_chunk", + lambda _chunk_id: {"conversation_id": "conv-1", "path": "uploads/chunk.mp3"}, + ) + monkeypatch.setattr( + transcribe, + "_fetch_conversation", + lambda _conversation_id: { + "project_id": { + "language": "en", + "default_conversation_transcript_prompt": "Dembrane", + } + }, + ) + monkeypatch.setattr(transcribe, "_build_hotwords", lambda _conversation: ["Dembrane"]) + monkeypatch.setattr( + transcribe, + "transcribe_audio_assemblyai", + lambda *_args, **_kwargs: (None, {"transcript_id": "tx-100"}), + ) + + import dembrane.s3 as s3 + import dembrane.coordination as coordination + + monkeypatch.setattr( + s3, + "get_signed_url", + lambda _path, expires_in_seconds: f"https://signed/{expires_in_seconds}", + ) + monkeypatch.setattr( + coordination, + "store_assemblyai_webhook_metadata", + lambda **kwargs: stored.update(kwargs), + ) + + def _unexpected_transcribe(*_args, **_kwargs) -> None: + called["transcribe"] = True + raise AssertionError("polling transcription path should not run in webhook mode") + + def _unexpected_decrement(*_args, **_kwargs) -> None: + called["decrement"] = True + raise AssertionError("decrement should not run before webhook completion") + + monkeypatch.setattr(tasks, "transcribe_conversation_chunk", _unexpected_transcribe) + monkeypatch.setattr(tasks, "_on_chunk_transcription_done", _unexpected_decrement) + + tasks.task_transcribe_chunk.fn( + conversation_chunk_id="chunk-1", + conversation_id="conv-1", + use_pii_redaction=True, + anonymize_transcripts=True, + ) + + assert called["transcribe"] is False + assert called["decrement"] is False + assert stored["transcript_id"] == "tx-100" + assert stored["chunk_id"] == "chunk-1" + assert stored["conversation_id"] == "conv-1" + assert stored["anonymize_transcripts"] is True + + +def test_task_correct_transcript_standard_mode(monkeypatch) -> None: + saved: dict[str, Any] = {} + decremented: dict[str, Any] = {} + + monkeypatch.setattr(tasks, "ProcessingStatusContext", lambda **_kwargs: nullcontext()) + monkeypatch.setattr( + transcribe, + "_transcript_correction_workflow", + lambda **_kwargs: ("corrected text", "note"), + ) + monkeypatch.setattr( + transcribe, + "_save_transcript", + lambda chunk_id, transcript, diarization=None: saved.update( + {"chunk_id": chunk_id, "transcript": transcript, "diarization": diarization} + ), + ) + monkeypatch.setattr( + tasks, + "_on_chunk_transcription_done", + lambda conversation_id, chunk_id, _logger: decremented.update( + {"conversation_id": conversation_id, "chunk_id": chunk_id} + ), + ) + + tasks.task_correct_transcript.fn( + chunk_id="chunk-2", + conversation_id="conv-2", + audio_file_uri="https://signed/audio.mp3", + candidate_transcript="candidate", + hotwords=["Dembrane"], + use_pii_redaction=False, + custom_guidance_prompt="guide", + assemblyai_response={"words": []}, + anonymize_transcripts=False, + ) + + assert saved["chunk_id"] == "chunk-2" + assert saved["transcript"] == "corrected text" + assert saved["diarization"]["schema"] == "Dembrane-25-09" + assert saved["diarization"]["data"]["raw"] == {"words": []} + assert decremented == {"conversation_id": "conv-2", "chunk_id": "chunk-2"} + + +def test_task_correct_transcript_anonymized_mode(monkeypatch) -> None: + saved: dict[str, Any] = {} + observed: dict[str, Any] = {} + decremented: dict[str, Any] = {} + + monkeypatch.setattr(tasks, "ProcessingStatusContext", lambda **_kwargs: nullcontext()) + import dembrane.pii_regex as pii_regex + + monkeypatch.setattr(pii_regex, "regex_redact_pii", lambda text: f"redacted:{text}") + + def _fake_workflow(**kwargs): + observed["candidate_transcript"] = kwargs["candidate_transcript"] + observed["use_pii_redaction"] = kwargs["use_pii_redaction"] + return ("clean transcript", "note") + + monkeypatch.setattr(transcribe, "_transcript_correction_workflow", _fake_workflow) + monkeypatch.setattr( + transcribe, + "_save_transcript", + lambda chunk_id, transcript, diarization=None: saved.update( + {"chunk_id": chunk_id, "transcript": transcript, "diarization": diarization} + ), + ) + monkeypatch.setattr( + tasks, + "_on_chunk_transcription_done", + lambda conversation_id, chunk_id, _logger: decremented.update( + {"conversation_id": conversation_id, "chunk_id": chunk_id} + ), + ) + + tasks.task_correct_transcript.fn( + chunk_id="chunk-3", + conversation_id="conv-3", + audio_file_uri="https://signed/audio.mp3", + candidate_transcript="my raw transcript", + hotwords=[], + use_pii_redaction=False, + custom_guidance_prompt=None, + assemblyai_response={"words": [{"text": "secret"}]}, + anonymize_transcripts=True, + ) + + assert observed["candidate_transcript"] == "redacted:my raw transcript" + assert observed["use_pii_redaction"] is True + assert saved["diarization"]["schema"] == "Dembrane-26-01-redaction" + assert saved["diarization"]["data"]["raw"] == {} + assert decremented == {"conversation_id": "conv-3", "chunk_id": "chunk-3"} + + +def test_task_correct_transcript_fallback_and_error_save(monkeypatch) -> None: + state: dict[str, Any] = {"save_calls": 0} + decremented: dict[str, Any] = {} + + monkeypatch.setattr(tasks, "ProcessingStatusContext", lambda **_kwargs: nullcontext()) + monkeypatch.setattr( + transcribe, + "_transcript_correction_workflow", + lambda **_kwargs: (_ for _ in ()).throw(RuntimeError("workflow failed")), + ) + + def _failing_save(*_args, **_kwargs) -> None: + state["save_calls"] += 1 + raise RuntimeError("save failed") + + monkeypatch.setattr(transcribe, "_save_transcript", _failing_save) + monkeypatch.setattr( + transcribe, + "_save_chunk_error", + lambda chunk_id, error_message: state.update( + {"chunk_id": chunk_id, "error_message": error_message} + ), + ) + monkeypatch.setattr( + tasks, + "_on_chunk_transcription_done", + lambda conversation_id, chunk_id, _logger: decremented.update( + {"conversation_id": conversation_id, "chunk_id": chunk_id} + ), + ) + + tasks.task_correct_transcript.fn( + chunk_id="chunk-4", + conversation_id="conv-4", + audio_file_uri="https://signed/audio.mp3", + candidate_transcript="candidate", + hotwords=None, + use_pii_redaction=False, + custom_guidance_prompt=None, + assemblyai_response={}, + anonymize_transcripts=False, + ) + + assert state["save_calls"] == 1 + assert state["chunk_id"] == "chunk-4" + assert "Failed to save fallback transcript" in state["error_message"] + assert decremented == {"conversation_id": "conv-4", "chunk_id": "chunk-4"} diff --git a/echo/server/tests/test_transcribe_webhook.py b/echo/server/tests/test_transcribe_webhook.py new file mode 100644 index 000000000..7193dc1ed --- /dev/null +++ b/echo/server/tests/test_transcribe_webhook.py @@ -0,0 +1,131 @@ +from __future__ import annotations + +from typing import Any + +import pytest + +import dembrane.transcribe as transcribe +from dembrane.transcribe import TranscriptionError + + +class _FakeResponse: + def __init__(self, status_code: int, payload: dict[str, Any]) -> None: + self.status_code = status_code + self._payload = payload + + def json(self) -> dict[str, Any]: + return self._payload + + +def _stub_languages(monkeypatch: pytest.MonkeyPatch) -> None: + monkeypatch.setattr(transcribe, "get_allowed_languages", lambda: ["en", "nl"]) + + +def test_transcribe_audio_assemblyai_webhook_mode(monkeypatch: pytest.MonkeyPatch) -> None: + _stub_languages(monkeypatch) + captured: dict[str, Any] = {} + + def _fake_post(url: str, **kwargs: Any) -> _FakeResponse: + captured["url"] = url + captured["headers"] = kwargs["headers"] + captured["json"] = kwargs["json"] + return _FakeResponse(200, {"id": "tx-1"}) + + monkeypatch.setattr(transcribe.requests, "post", _fake_post) + monkeypatch.setattr( + transcribe.requests, + "get", + lambda *_args, **_kwargs: (_ for _ in ()).throw( + AssertionError("polling GET must not run in webhook mode") + ), + ) + + transcript, payload = transcribe.transcribe_audio_assemblyai( + audio_file_uri="https://example.com/audio.mp3", + language="en", + hotwords=["Dembrane"], + webhook_url="https://api.example.com/api/webhooks/assemblyai", + webhook_secret="top-secret", + ) + + assert transcript is None + assert payload == {"transcript_id": "tx-1"} + assert captured["url"].endswith("/v2/transcript") + assert captured["json"]["speech_models"] == ["universal-3-pro"] + assert "speech_model" not in captured["json"] + assert "prompt" not in captured["json"] + assert captured["json"]["keyterms_prompt"] == ["Dembrane"] + assert captured["json"]["webhook_url"] == "https://api.example.com/api/webhooks/assemblyai" + assert captured["json"]["webhook_auth_header_name"] == "X-AssemblyAI-Webhook-Secret" + assert captured["json"]["webhook_auth_header_value"] == "top-secret" + + +def test_transcribe_audio_assemblyai_polling_mode(monkeypatch: pytest.MonkeyPatch) -> None: + _stub_languages(monkeypatch) + payloads: dict[str, Any] = {"posts": [], "polls": 0} + poll_responses = iter( + [ + {"status": "processing"}, + {"status": "completed", "text": "hello", "words": [{"text": "hello"}]}, + ] + ) + + def _fake_post(_url: str, **kwargs: Any) -> _FakeResponse: + payloads["posts"].append(kwargs["json"]) + return _FakeResponse(200, {"id": "tx-2"}) + + def _fake_get(_url: str, **_kwargs: Any) -> _FakeResponse: + payloads["polls"] += 1 + return _FakeResponse(200, next(poll_responses)) + + monkeypatch.setattr(transcribe.requests, "post", _fake_post) + monkeypatch.setattr(transcribe.requests, "get", _fake_get) + monkeypatch.setattr(transcribe.time, "sleep", lambda *_args, **_kwargs: None) + + transcript, response = transcribe.transcribe_audio_assemblyai( + audio_file_uri="https://example.com/audio.mp3", + language="en", + hotwords=["Dembrane"], + ) + + assert transcript == "hello" + assert response["status"] == "completed" + assert payloads["polls"] == 2 + post_payload = payloads["posts"][0] + assert post_payload["speech_models"] == ["universal-3-pro"] + assert "speech_model" not in post_payload + assert "webhook_url" not in post_payload + + +def test_fetch_assemblyai_result_success(monkeypatch: pytest.MonkeyPatch) -> None: + def _fake_get(_url: str, **_kwargs: Any) -> _FakeResponse: + return _FakeResponse(200, {"status": "completed", "text": "done"}) + + monkeypatch.setattr(transcribe.requests, "get", _fake_get) + + text, response = transcribe.fetch_assemblyai_result("tx-3") + assert text == "done" + assert response["status"] == "completed" + + +@pytest.mark.parametrize( + ("status_code", "payload", "message"), + [ + (500, {"error": "boom"}, "HTTP 500"), + (200, {"status": "error", "error": "failed"}, "failed"), + (200, {"status": "processing"}, "not completed"), + ], +) +def test_fetch_assemblyai_result_errors( + monkeypatch: pytest.MonkeyPatch, + status_code: int, + payload: dict[str, Any], + message: str, +) -> None: + def _fake_get(_url: str, **_kwargs: Any) -> _FakeResponse: + return _FakeResponse(status_code, payload) + + monkeypatch.setattr(transcribe.requests, "get", _fake_get) + + with pytest.raises(TranscriptionError, match=message): + transcribe.fetch_assemblyai_result("tx-4") From 2e6195abab63dc3743801010570bc105cf88e275 Mon Sep 17 00:00:00 2001 From: Dat Date: Mon, 16 Feb 2026 21:09:50 +0100 Subject: [PATCH 2/3] test: make webhook settings tests env-independent --- echo/server/tests/test_settings_webhook.py | 14 ++++++-------- 1 file changed, 6 insertions(+), 8 deletions(-) diff --git a/echo/server/tests/test_settings_webhook.py b/echo/server/tests/test_settings_webhook.py index 02a92aec5..65928d48b 100644 --- a/echo/server/tests/test_settings_webhook.py +++ b/echo/server/tests/test_settings_webhook.py @@ -4,10 +4,9 @@ def test_dembrane_webhook_url_requires_secret() -> None: - settings = TranscriptionSettings( - provider="Dembrane-25-09", - gcp_sa_json={"type": "service_account"}, - ) + settings = TranscriptionSettings() + settings.provider = "Dembrane-25-09" + settings.gcp_sa_json = {"type": "service_account"} settings.assemblyai_webhook_url = "https://api.example.com/api/webhooks/assemblyai" settings.assemblyai_webhook_secret = None @@ -19,10 +18,9 @@ def test_dembrane_webhook_url_requires_secret() -> None: def test_dembrane_webhook_url_with_secret_is_valid() -> None: - settings = TranscriptionSettings( - provider="Dembrane-25-09", - gcp_sa_json={"type": "service_account"}, - ) + settings = TranscriptionSettings() + settings.provider = "Dembrane-25-09" + settings.gcp_sa_json = {"type": "service_account"} settings.assemblyai_webhook_url = "https://api.example.com/api/webhooks/assemblyai" settings.assemblyai_webhook_secret = "secret" From ffdc724169285ad093719f7d4d218922fe6654ce Mon Sep 17 00:00:00 2001 From: Dat Date: Mon, 16 Feb 2026 21:21:32 +0100 Subject: [PATCH 3/3] fix: narrow AssemblyAI transcript type in sync path --- echo/server/dembrane/transcribe.py | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/echo/server/dembrane/transcribe.py b/echo/server/dembrane/transcribe.py index c1c14aa2b..06f68dfb5 100644 --- a/echo/server/dembrane/transcribe.py +++ b/echo/server/dembrane/transcribe.py @@ -644,16 +644,16 @@ def transcribe_conversation_chunk( logger.info("Using AssemblyAI for transcription") hotwords = _build_hotwords(conversation) signed_url = get_signed_url(chunk["path"], expires_in_seconds=3 * 24 * 60 * 60) - transcript, assemblyai_response = transcribe_audio_assemblyai( + assemblyai_transcript, assemblyai_response = transcribe_audio_assemblyai( signed_url, language=language, hotwords=hotwords ) - if transcript is None: + if assemblyai_transcript is None: raise TranscriptionError( "AssemblyAI returned webhook-mode response without transcript text in sync workflow." ) _save_transcript( conversation_chunk_id, - transcript, + assemblyai_transcript, diarization={ "schema": "ASSEMBLYAI", "data": assemblyai_response.get("words", {}),