Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
127 changes: 110 additions & 17 deletions livekit-agents/livekit/agents/voice/amd/classifier.py
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@
class AMDCategory(str, Enum):
HUMAN = "human"
MACHINE_IVR = "machine-ivr"
MACHINE_SCREENING = "machine-screening"
MACHINE_VM = "machine-vm"
MACHINE_UNAVAILABLE = "machine-unavailable"
UNCERTAIN = "uncertain"
Expand All @@ -40,6 +41,13 @@ class AMDPredictionEvent(BaseModel):
reason: str
transcript: str
delay: float
screening_detected: bool = False
"""Whether a call-screening prompt was encountered before this verdict."""
message_playback: Literal["played", "interrupted", "not_played"] = "not_played"
"""Outcome of the predefined message AMD auto-played for this verdict path
(``voicemail_message`` for a voicemail, ``screening_message`` for a human reached
after screening). ``not_played`` when no message applied or none was supplied;
``interrupted`` when the callee barged in before playout finished."""

@property
def is_human(self) -> bool:
Expand All @@ -49,6 +57,7 @@ def is_human(self) -> bool:
def is_machine(self) -> bool:
    """Return ``True`` when ``self.category`` is one of the machine categories.

    The machine categories checked here are IVR, call screening, voicemail,
    and unavailable mailbox.
    """
    machine_categories = (
        AMDCategory.MACHINE_IVR,
        AMDCategory.MACHINE_SCREENING,
        AMDCategory.MACHINE_VM,
        AMDCategory.MACHINE_UNAVAILABLE,
    )
    return self.category in machine_categories
Expand All @@ -60,6 +69,7 @@ def is_machine(self) -> bool:

human: A person answered (e.g., "Hello?", "This is John.").
machine-ivr: A prompt to press a key (e.g., "Press 1 to continue").
machine-screening: A call-screening prompt that asks the caller to identify themselves or state their reason before being connected (e.g., "Please state your name and why you're calling").
machine-vm: A voicemail greeting where leaving a message IS possible.
machine-unavailable: Any greeting indicating it's NOT possible to leave message, eg because mailbox is full, not setup, etc.
uncertain: For partial transcripts that are ambiguous.
Expand All @@ -75,8 +85,7 @@ def is_machine(self) -> bool:
Output: machine-ivr

Input: "Please state your name and why you're calling, and I will check if the person is available"
Output: machine-ivr
Note: this should apply for any call screening prompts.
Output: machine-screening

Input: "I'm away from my desk. If you leave a message, I will get back to you."
Output: machine-vm
Expand Down Expand Up @@ -148,6 +157,7 @@ def __init__(
self._emitted = False
self._transcript = ""
self._extension_count = 0
self._amd_stt_seen = False

def start_detection_timer(self) -> None:
"""Arm the overall detection-timeout budget."""
Expand Down Expand Up @@ -382,9 +392,21 @@ def push_text(self, text: str, source: str = "stt") -> None:
if self._input_ch.closed:
logger.debug("push_text called after close")
return
# ignore text from other sources (e.g. when both session and AMD have STT specified)
if source != self._source:
return
# pick session stt when it arrives before AMD stt
if self._source == "amd_stt" and source == "stt" and not self._amd_stt_seen:
logger.warning("amd: session STT won the transcript race, using session STT")
self._source = "stt"
else:
# ignore text from other sources if it is not faster
return

if source == "amd_stt":
self._amd_stt_seen = True

if self._speech_started_at is None:
# treat this as a VAD false negative
self._synthesize_short_utterance()

if self._silence_timer is not None and self._silence_timer_trigger == "short_speech":
self._silence_timer.cancel()
Expand Down Expand Up @@ -496,6 +518,89 @@ async def _run(transcript: str) -> None:
if run_atask is not None:
await aio.cancel_and_wait(run_atask)

def _synthesize_short_utterance(self) -> None:
    """Fabricate a zero-length speech bracket for a transcript that had no VAD event.

    Called when text arrives while no speech start has been recorded. Per the
    original author's rationale, AMD timing is VAD-driven and VAD only drops
    utterances shorter than its ``min_speech_duration``, so such a transcript is
    most likely a short utterance that VAD missed.

    Effects:
      * speech start and speech end are both stamped with the current wall-clock time;
      * the silence timer is replaced by one that fires ``_on_silence_reached``
        after ``_human_silence_threshold`` and is tagged ``long_speech`` (the
        transcript is already available for classification, and the tag keeps
        ``push_text`` from cancelling the timer as a ``short_speech`` one);
      * the end-of-turn timer is replaced by one that fires ``_on_eot_reached``
        after ``_max_endpointing_delay``.

    Must be called from within a running asyncio event loop.
    """
    stamp = time.time()
    self._speech_started_at = self._speech_ended_at = stamp

    # Replace any pending silence timer with the short human-silence window.
    stale_silence = self._silence_timer
    if stale_silence is not None:
        stale_silence.cancel()
    loop = asyncio.get_running_loop()
    self._silence_timer = loop.call_later(
        self._human_silence_threshold, self._on_silence_reached
    )
    self._silence_timer_trigger = "long_speech"

    # Replace any pending end-of-turn timer as well.
    stale_eot = self._eot_timer
    if stale_eot is not None:
        stale_eot.cancel()
    self._eot_timer = loop.call_later(
        self._max_endpointing_delay, self._on_eot_reached
    )

def _cancel_timers(self) -> None:
    """Stop every outstanding timer handle and reset the timer slots to ``None``.

    Covers the no-speech, silence, detection-timeout and end-of-turn timers,
    and also clears the silence-timer trigger tag.
    """
    # Snapshot the handles first, then cancel whichever ones are armed.
    handles = (
        self._no_speech_timer,
        self._silence_timer,
        self._detection_timeout_timer,
        self._eot_timer,
    )
    for handle in handles:
        if handle is None:
            continue
        handle.cancel()

    for slot in (
        "_no_speech_timer",
        "_silence_timer",
        "_silence_timer_trigger",
        "_detection_timeout_timer",
        "_eot_timer",
    ):
        setattr(self, slot, None)

def switch_source(self, source: str) -> None:
    """Select the transcript source whose text this classifier consumes.

    Intended as a one-way fallback; this method itself only records the new
    source name and performs no validation.
    """
    self._source = source

async def reset(self) -> None:
    """Prepare the classifier for another detection turn.

    Does nothing once the classifier has been closed. Otherwise it cancels any
    in-flight classification task and all timers, replaces the input channel
    if it was closed, wipes the per-turn state (verdict, speech timestamps,
    flags, transcript, extension counter), and finally re-arms the detection
    timer and re-opens listening. The detector uses this between internal
    screening turns.
    """
    if self._closed:
        return

    # Tear down whatever the previous turn left running.
    pending_task = self._classify_task
    if pending_task is not None:
        await aio.cancel_and_wait(pending_task)
        self._classify_task = None
    self._cancel_timers()

    # Only a closed channel is replaced; an open one is kept as-is.
    if self._input_ch.closed:
        self._input_ch = aio.Chan()

    # Fresh per-turn state.
    self._verdict_result = None
    self._verdict_ready = asyncio.Event()
    self._speech_started_at = self._speech_ended_at = None
    self._silence_reached = self._eot_reached = False
    self._emitted = False
    self._transcript = ""
    self._extension_count = 0
    self._amd_stt_seen = False

    # Clear the listening flag before re-arming the timer and listening again.
    self._listening = False
    self.start_detection_timer()
    self.start_listening()

async def close(self) -> None:
if self._closed:
return
Expand All @@ -504,19 +609,7 @@ async def close(self) -> None:
if not self._input_ch.closed:
self._input_ch.close()

if self._no_speech_timer is not None:
self._no_speech_timer.cancel()
self._no_speech_timer = None
if self._silence_timer is not None:
self._silence_timer.cancel()
self._silence_timer = None
self._silence_timer_trigger = None
if self._detection_timeout_timer is not None:
self._detection_timeout_timer.cancel()
self._detection_timeout_timer = None
if self._eot_timer is not None:
self._eot_timer.cancel()
self._eot_timer = None
self._cancel_timers()

if self._classify_task is not None:
await aio.cancel_and_wait(self._classify_task)
Expand Down
Loading
Loading