Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 3 additions & 0 deletions echo/server/.env.sample
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,9 @@ DISABLE_SENTRY=0
TRANSCRIPTION_PROVIDER=
ASSEMBLYAI_API_KEY=
ASSEMBLYAI_BASE_URL=https://api.eu.assemblyai.com
# Optional webhook mode for Dembrane-25-09
ASSEMBLYAI_WEBHOOK_URL=
ASSEMBLYAI_WEBHOOK_SECRET=

# LiteLLM transcription (used when TRANSCRIPTION_PROVIDER=LiteLLM)
LITELLM_TRANSCRIPTION_MODEL=whisper-1
Expand Down
2 changes: 2 additions & 0 deletions echo/server/dembrane/api/api.py
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@
from dembrane.api.verify import VerifyRouter
from dembrane.api.agentic import AgenticRouter
from dembrane.api.project import ProjectRouter
from dembrane.api.webhooks import WebhooksRouter
from dembrane.api.stateless import StatelessRouter
from dembrane.api.participant import ParticipantRouter
from dembrane.api.conversation import ConversationRouter
Expand Down Expand Up @@ -37,3 +38,4 @@ async def health() -> dict:
api.include_router(SearchRouter)
api.include_router(UserSettingsRouter, prefix="/user-settings")
api.include_router(StatsRouter, prefix="/stats")
api.include_router(WebhooksRouter)
122 changes: 122 additions & 0 deletions echo/server/dembrane/api/webhooks.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,122 @@
"""
Incoming webhook endpoints for third-party service callbacks.

These endpoints are public and authenticated via shared secrets.
"""

import hmac
from logging import getLogger

from fastapi import Request, APIRouter, HTTPException
from pydantic import BaseModel

from dembrane.settings import get_settings

logger = getLogger("api.webhooks")

WebhooksRouter = APIRouter(tags=["webhooks"])


class AssemblyAIWebhookPayload(BaseModel):
transcript_id: str
status: str


@WebhooksRouter.post("/webhooks/assemblyai")
async def assemblyai_webhook_callback(
payload: AssemblyAIWebhookPayload,
request: Request,
) -> dict[str, str]:
"""Handle AssemblyAI transcript completion callbacks."""
settings = get_settings()
expected_secret = settings.transcription.assemblyai_webhook_secret
if not expected_secret:
raise HTTPException(status_code=503, detail="AssemblyAI webhook secret is not configured")

received_secret = request.headers.get("X-AssemblyAI-Webhook-Secret", "")
if not hmac.compare_digest(received_secret, expected_secret):
logger.warning("AssemblyAI webhook auth failed for transcript %s", payload.transcript_id)
raise HTTPException(status_code=401, detail="Invalid webhook secret")

logger.info(
"AssemblyAI webhook received (transcript_id=%s status=%s)",
payload.transcript_id,
payload.status,
)

from dembrane.coordination import (
get_assemblyai_webhook_metadata,
delete_assemblyai_webhook_metadata,
mark_assemblyai_webhook_processing,
clear_assemblyai_webhook_processing,
)

metadata = get_assemblyai_webhook_metadata(payload.transcript_id)
if not metadata:
logger.warning(
"AssemblyAI webhook ignored: metadata missing for transcript %s",
payload.transcript_id,
)
return {"status": "ignored"}

if not mark_assemblyai_webhook_processing(payload.transcript_id):
logger.info(
"AssemblyAI webhook duplicate/in-flight ignored for transcript %s",
payload.transcript_id,
)
return {"status": "ignored"}

try:
chunk_id = metadata["chunk_id"]
conversation_id = metadata["conversation_id"]
normalized_status = payload.status.lower()

if normalized_status == "error":
from dembrane.tasks import _on_chunk_transcription_done
from dembrane.transcribe import _save_chunk_error

_save_chunk_error(chunk_id, f"AssemblyAI error for transcript {payload.transcript_id}")
_on_chunk_transcription_done(conversation_id, chunk_id, logger)
delete_assemblyai_webhook_metadata(payload.transcript_id)
return {"status": "error_handled"}

if normalized_status == "completed":
from dembrane.tasks import task_correct_transcript
from dembrane.transcribe import _save_transcript, fetch_assemblyai_result

transcript_text, full_response = fetch_assemblyai_result(payload.transcript_id)
anonymize_transcripts = bool(metadata.get("anonymize_transcripts", False))

if not anonymize_transcripts:
_save_transcript(
chunk_id,
transcript_text,
diarization={
"schema": "Dembrane-25-09-assemblyai-partial",
"data": full_response,
},
)

task_correct_transcript.send(
chunk_id=chunk_id,
conversation_id=conversation_id,
audio_file_uri=metadata["audio_file_uri"],
candidate_transcript=transcript_text,
hotwords=metadata.get("hotwords"),
use_pii_redaction=bool(metadata.get("use_pii_redaction", False)),
custom_guidance_prompt=metadata.get("custom_guidance_prompt"),
assemblyai_response=full_response,
anonymize_transcripts=anonymize_transcripts,
)

delete_assemblyai_webhook_metadata(payload.transcript_id)
return {"status": "ok"}

logger.warning(
"AssemblyAI webhook ignored unknown status %s for transcript %s",
payload.status,
payload.transcript_id,
)
return {"status": "ignored"}
finally:
clear_assemblyai_webhook_processing(payload.transcript_id)
104 changes: 104 additions & 0 deletions echo/server/dembrane/coordination.py
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@
transcriptions completed.
"""

import json
from typing import Any
from logging import getLogger

Expand Down Expand Up @@ -476,3 +477,106 @@ def cleanup_conversation_coordination(conversation_id: str) -> None:
logger.debug(f"Cleaned up {deleted} coordination keys for {conversation_id}")
finally:
client.close()


# ------------------------------------------------------------------------------
# AssemblyAI Webhook Coordination
# ------------------------------------------------------------------------------

_AAI_WEBHOOK_TTL_SECONDS = 60 * 60 * 24 # 24 hours
_AAI_WEBHOOK_PROCESSING_TTL_SECONDS = 60 * 10 # 10 minutes


def _assemblyai_webhook_key(transcript_id: str) -> str:
return f"{_KEY_PREFIX}:aai_transcript:{transcript_id}"


def _assemblyai_webhook_processing_key(transcript_id: str) -> str:
return f"{_KEY_PREFIX}:aai_transcript_processing:{transcript_id}"


def store_assemblyai_webhook_metadata(
transcript_id: str,
chunk_id: str,
conversation_id: str,
audio_file_uri: str,
language: str | None,
hotwords: list[str] | None,
use_pii_redaction: bool,
custom_guidance_prompt: str | None,
anonymize_transcripts: bool,
) -> None:
"""Store metadata for webhook callback processing."""
client = _get_sync_redis_client()
key = _assemblyai_webhook_key(transcript_id)
payload = {
"chunk_id": chunk_id,
"conversation_id": conversation_id,
"audio_file_uri": audio_file_uri,
"language": language,
"hotwords": hotwords or [],
"use_pii_redaction": use_pii_redaction,
"custom_guidance_prompt": custom_guidance_prompt,
"anonymize_transcripts": anonymize_transcripts,
}

try:
client.set(key, json.dumps(payload))
client.expire(key, _AAI_WEBHOOK_TTL_SECONDS)
logger.debug(
"Stored AssemblyAI webhook metadata for transcript %s chunk %s",
transcript_id,
chunk_id,
)
finally:
client.close()


def get_assemblyai_webhook_metadata(transcript_id: str) -> dict[str, Any] | None:
"""Fetch AssemblyAI webhook metadata by transcript ID."""
client = _get_sync_redis_client()
key = _assemblyai_webhook_key(transcript_id)

try:
raw = client.get(key)
if not raw:
return None
return json.loads(raw)
finally:
client.close()


def delete_assemblyai_webhook_metadata(transcript_id: str) -> None:
"""Delete webhook metadata after terminal handling."""
client = _get_sync_redis_client()
key = _assemblyai_webhook_key(transcript_id)

try:
client.delete(key)
finally:
client.close()


def mark_assemblyai_webhook_processing(transcript_id: str) -> bool:
"""Acquire a per-transcript webhook processing lock."""
client = _get_sync_redis_client()
key = _assemblyai_webhook_processing_key(transcript_id)

try:
was_set = client.setnx(key, "1")
if was_set:
client.expire(key, _AAI_WEBHOOK_PROCESSING_TTL_SECONDS)
return bool(was_set)
finally:
client.close()


def clear_assemblyai_webhook_processing(transcript_id: str) -> None:
"""Release a per-transcript webhook processing lock."""
client = _get_sync_redis_client()
key = _assemblyai_webhook_processing_key(transcript_id)

try:
client.delete(key)
finally:
client.close()
21 changes: 21 additions & 0 deletions echo/server/dembrane/settings.py
Original file line number Diff line number Diff line change
Expand Up @@ -466,6 +466,22 @@ class TranscriptionSettings(BaseSettings):
alias="ASSEMBLYAI_BASE_URL",
validation_alias=AliasChoices("ASSEMBLYAI_BASE_URL", "TRANSCRIPTION__ASSEMBLYAI__BASE_URL"),
)
assemblyai_webhook_url: Optional[str] = Field(
default=None,
alias="ASSEMBLYAI_WEBHOOK_URL",
validation_alias=AliasChoices(
"ASSEMBLYAI_WEBHOOK_URL",
"TRANSCRIPTION__ASSEMBLYAI__WEBHOOK_URL",
),
)
assemblyai_webhook_secret: Optional[str] = Field(
default=None,
alias="ASSEMBLYAI_WEBHOOK_SECRET",
validation_alias=AliasChoices(
"ASSEMBLYAI_WEBHOOK_SECRET",
"TRANSCRIPTION__ASSEMBLYAI__WEBHOOK_SECRET",
),
)
litellm_model: Optional[str] = Field(
default=None,
alias="LITELLM_TRANSCRIPTION_MODEL",
Expand Down Expand Up @@ -524,6 +540,11 @@ def ensure_valid(self) -> None:
raise ValueError(
"GCP_SA_JSON must be provided when TRANSCRIPTION_PROVIDER=Dembrane-25-09"
)
if self.assemblyai_webhook_url and not self.assemblyai_webhook_secret:
raise ValueError(
"ASSEMBLYAI_WEBHOOK_SECRET must be set when "
"ASSEMBLYAI_WEBHOOK_URL is configured for TRANSCRIPTION_PROVIDER=Dembrane-25-09"
)


class AppSettings:
Expand Down
Loading