Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
18 changes: 7 additions & 11 deletions examples/voice_agents/error_callback.py
Original file line number Diff line number Diff line change
@@ -1,12 +1,10 @@
import asyncio
import logging
import os
import pathlib

from dotenv import load_dotenv

from livekit.agents import AgentServer, JobContext, cli, inference
from livekit.agents.utils.audio import audio_frames_from_file
from livekit.agents.voice import Agent, AgentSession
from livekit.agents.voice.events import CloseEvent, ErrorEvent
from livekit.rtc import ParticipantKind
Expand All @@ -29,24 +27,22 @@ async def entrypoint(ctx: JobContext):
stt=inference.STT("deepgram/nova-3"),
llm=inference.LLM("openai/gpt-4.1-mini"),
tts=inference.TTS("cartesia/sonic-3"),
# play a pre-recorded file (or any AudioSource) before the session closes on an
# unrecoverable error; bypasses TTS, so it's still heard when TTS is the failed
# resource. A non-file str is synthesized through TTS instead.
unrecoverable_error_message=str(pathlib.Path(__file__).parent / "error_message.ogg"),
)

custom_error_audio = os.path.join(pathlib.Path(__file__).parent.absolute(), "error_message.ogg")

# Advanced path: for full control (e.g. continuing the conversation on recoverable
# errors, or triggering a SIP transfer) handle the "error" event yourself. This runs
# in addition to the built-in unrecoverable_error_message above.
@session.on("error")
def on_error(ev: ErrorEvent):
if ev.error.recoverable:
return

logger.info(f"session is closing due to unrecoverable error {ev.error}")

# To bypass the TTS service in case it's unavailable, we use a custom audio file instead
session.say(
"I'm having trouble connecting right now. Let me transfer your call.",
audio=audio_frames_from_file(custom_error_audio),
allow_interruptions=False,
)

# If want to continue the conversation, we can set the recoverable to True

# TTS and LLM errors can be marked as recoverable
Expand Down
4 changes: 3 additions & 1 deletion livekit-agents/livekit/agents/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -107,7 +107,8 @@
AMDCategory,
AMDPredictionEvent,
)
from .voice.background_audio import AudioConfig, BackgroundAudioPlayer, BuiltinAudioClip, PlayHandle
from .voice.audio_source import AudioSource, BuiltinAudioClip
from .voice.background_audio import AudioConfig, BackgroundAudioPlayer, PlayHandle
from .voice.room_io import RoomInputOptions, RoomIO, RoomOutputOptions
from .voice.run_result import (
AgentHandoffEvent,
Expand Down Expand Up @@ -220,6 +221,7 @@ def __getattr__(name: str) -> typing.Any:
"DEFAULT_API_CONNECT_OPTIONS",
"BackgroundAudioPlayer",
"BuiltinAudioClip",
"AudioSource",
"AudioConfig",
"PlayHandle",
"FlushSentinel",
Expand Down
28 changes: 22 additions & 6 deletions livekit-agents/livekit/agents/_exceptions.py
Original file line number Diff line number Diff line change
Expand Up @@ -29,20 +29,33 @@ class APIError(Exception):
"""

retryable: bool = False
"""Whether the error can be retried."""
"""Whether the error can be retried (within the request's retry loop)."""

def __init__(self, message: str, *, body: object | None = None, retryable: bool = True) -> None:
terminal: bool = False
"""Whether the error is terminal — it will fail identically on every turn, so
callers should surface it immediately rather than absorbing it under a
transient-error tolerance (e.g. ``AgentSession``'s ``max_unrecoverable_errors``)."""

def __init__(
self,
message: str,
*,
body: object | None = None,
retryable: bool = True,
terminal: bool = False,
) -> None:
super().__init__(message)

self.message = message
self.body = body
self.retryable = retryable
self.retryable = retryable and not terminal
self.terminal = terminal

def __str__(self) -> str:
return self.message

def __repr__(self) -> str:
return f"{self.__class__.__name__}({self.message!r}, body={self.body!r}, retryable={self.retryable!r})"
return f"{self.__class__.__name__}({self.message!r}, body={self.body!r}, retryable={self.retryable!r}, terminal={self.terminal!r})"


class APIStatusError(APIError):
Expand All @@ -62,6 +75,7 @@ def __init__(
request_id: str | None = None,
body: object | None = None,
retryable: bool | None = None,
terminal: bool = False,
) -> None:
if retryable is None:
retryable = True
Expand All @@ -73,7 +87,7 @@ def __init__(
if 400 <= status_code < 500 and status_code not in (408, 429, 499):
retryable = False

super().__init__(message, body=body, retryable=retryable)
super().__init__(message, body=body, retryable=retryable, terminal=terminal)

self.status_code = status_code
self.request_id = request_id
Expand All @@ -83,6 +97,7 @@ def __str__(self) -> str:
f"message={self.message!r}",
f"status_code={self.status_code}",
f"retryable={self.retryable}",
f"terminal={self.terminal}",
]
if self.request_id:
parts.append(f"request_id={self.request_id}")
Expand All @@ -96,7 +111,8 @@ def __repr__(self) -> str:
f"status_code={self.status_code!r}, "
f"request_id={self.request_id!r}, "
f"body={self.body!r}, "
f"retryable={self.retryable!r})"
f"retryable={self.retryable!r}, "
f"terminal={self.terminal!r})"
)


Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,11 +18,10 @@
from ...voice import room_io
from ...voice.agent import Agent, AgentTask
from ...voice.agent_session import AgentSession
from ...voice.audio_source import AudioSource, BuiltinAudioClip
from ...voice.background_audio import (
AudioConfig,
AudioSource,
BackgroundAudioPlayer,
BuiltinAudioClip,
PlayHandle,
)
from .utils import InstructionParts
Expand Down
2 changes: 2 additions & 0 deletions livekit-agents/livekit/agents/inference/__init__.py
Original file line number Diff line number Diff line change
@@ -1,3 +1,4 @@
from ._exceptions import InferenceQuotaExceededError
from .eot import TurnDetector, TurnDetectorModels, TurnDetectorVersions
from .interruption import (
AdaptiveInterruptionDetector,
Expand All @@ -22,6 +23,7 @@
"VADModels",
"AdaptiveInterruptionDetector",
"InterruptionDetectionError",
"InferenceQuotaExceededError",
"OverlappingSpeechEvent",
"InterruptionDataFrameType",
"TurnDetector",
Expand Down
140 changes: 140 additions & 0 deletions livekit-agents/livekit/agents/inference/_exceptions.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,140 @@
from __future__ import annotations

from .._exceptions import APIStatusError

_INFERENCE_QUOTA_EXCEEDED_TYPE = "inference_quota_exceeded"
"""Value of the ``type`` field in a LiveKit Inference 429 quota response body."""

_TERMINAL_QUOTA_CATEGORIES = frozenset(
{"MaxGatewayCredits", "MaxBargeInRequests", "MaxEotRequests"}
)


def _str_or_none(value: object) -> str | None:
"""Coerce an untrusted JSON field to ``str``; non-str values become ``None``."""
return value if isinstance(value, str) else None


class InferenceQuotaExceededError(APIStatusError):
"""Raised when the LiveKit Inference gateway rejects a request because a usage quota
or rate limit has been exhausted.

The gateway answers an exhausted project with ``HTTP 429`` and a structured JSON
body (``type == "inference_quota_exceeded"``). This error surfaces the fields of that
body directly so callers can log the quota state or forward it to their frontend
instead of leaving the agent silent.

The gateway uses this single ``type`` for two different conditions, told apart by
``category``:

* **Credit/quota exhaustion** (``MaxGatewayCredits``, ``MaxBargeInRequests``,
``MaxEotRequests``) — recovers only at the next billing cycle, so it is
:attr:`terminal` and ``retryable=False``.
* **Rate / concurrency limits** (e.g. ``MaxConcurrentGatewayLLMRpm`` / ``…Tpm``) —
recover within ~a minute via backoff, so they stay ``retryable=True`` and
non-terminal (they fall through the usual transient-error handling).

``retryable`` / ``terminal`` are derived from ``category`` automatically; pass them
explicitly to override.

On a terminal quota error, ``AgentSession`` by default speaks a generic,
provider-agnostic message and closes on the first occurrence (see
``AgentSession(unrecoverable_error_message=...)``); transient variants go through the
normal retry/tolerance path. The gateway ``hint`` is never spoken — quota details
aren't surfaced to end users. Subscribe to ``error`` when you need the structured
fields, e.g. to log the quota state or forward an "out of credits" state to your
frontend. ``ErrorEvent.error`` is the ``LLMError``/``STTError``/… wrapper, so the
underlying exception is at ``ev.error.error``:

Example:
```python
from livekit.agents import ErrorEvent
from livekit.agents.inference import InferenceQuotaExceededError


@session.on("error")
def _on_error(ev: ErrorEvent) -> None:
err = ev.error.error
if isinstance(err, InferenceQuotaExceededError):
logger.warning("inference quota exceeded: %s (%s)", err.hint, err.quota_type)
```
"""

quota_type: str | None
"""Quota resource: ``"llm"``, ``"stt"``, ``"tts"``, ``"bargein"`` or ``"eot"``."""

category: str | None
"""Quota error category. Credit-exhaustion categories (``"MaxGatewayCredits"``,
``"MaxBargeInRequests"``, ``"MaxEotRequests"``) are terminal; rate-limit variants
such as ``"MaxConcurrentGatewayLLMRpm"`` are transient."""

hint: str | None
"""Human-readable explanation from the error."""

remaining_limit: str | None
"""Remaining quota for ``quota_type`` as reported by the gateway; ``"0"`` when
fully exhausted. An opaque diagnostic string (not guaranteed numeric)."""

def __init__(
self,
message: str,
*,
status_code: int = 429,
request_id: str | None = None,
body: object | None = None,
retryable: bool | None = None,
terminal: bool | None = None,
quota_type: str | None = None,
category: str | None = None,
hint: str | None = None,
remaining_limit: str | None = None,
) -> None:
if isinstance(body, dict):
if quota_type is None:
quota_type = _str_or_none(body.get("quota_type"))
if category is None:
category = _str_or_none(body.get("category"))
if hint is None:
hint = _str_or_none(body.get("hint"))
if remaining_limit is None:
remaining_limit = _str_or_none(body.get("remaining_limit"))

is_credit_exhaustion = category in _TERMINAL_QUOTA_CATEGORIES
if terminal is None:
terminal = is_credit_exhaustion
if retryable is None:
retryable = not is_credit_exhaustion

super().__init__(
message,
status_code=status_code,
request_id=request_id,
body=body,
retryable=retryable,
terminal=terminal,
)

self.quota_type = quota_type
self.category = category
self.hint = hint
self.remaining_limit = remaining_limit

@classmethod
def from_response(
cls,
message: str,
*,
status_code: int = 429,
request_id: str | None = None,
body: object | None = None,
) -> InferenceQuotaExceededError | None:
"""Build an :class:`InferenceQuotaExceededError` from a response body, or return
``None`` if the body isn't a LiveKit Inference quota-exceeded payload.

Lets plugins centralize quota detection: pass the decoded JSON body and
raise the result when it isn't ``None``.
"""
if not (isinstance(body, dict) and body.get("type") == _INFERENCE_QUOTA_EXCEEDED_TYPE):
return None

return cls(message, status_code=status_code, request_id=request_id, body=body)
24 changes: 23 additions & 1 deletion livekit-agents/livekit/agents/inference/llm.py
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
from __future__ import annotations

import asyncio
import contextlib
import os
from dataclasses import dataclass
from typing import Any, Literal, cast
Expand All @@ -21,13 +22,18 @@
from typing_extensions import TypedDict

from .. import llm
from .._exceptions import APIConnectionError, APIStatusError, APITimeoutError
from .._exceptions import (
APIConnectionError,
APIStatusError,
APITimeoutError,
)
from ..llm import ToolChoice, utils as llm_utils
from ..llm.chat_context import ChatContext
from ..llm.tool_context import Tool
from ..log import logger
from ..types import DEFAULT_API_CONNECT_OPTIONS, NOT_GIVEN, APIConnectOptions, NotGivenOr
from ..utils import is_given
from ._exceptions import InferenceQuotaExceededError
from ._utils import (
HEADER_INFERENCE_PRIORITY,
HEADER_INFERENCE_PROVIDER,
Expand Down Expand Up @@ -454,6 +460,22 @@ async def _run(self) -> None:
except openai.APITimeoutError:
raise APITimeoutError(retryable=retryable) from None
except openai.APIStatusError as e:
# OpenAI's SDK stores only body["error"] on the exception; for gateway quota
# payloads that's a bare string, dropping the `type`/`category` fields we need,
# so re-parse the raw response to recover the full JSON body.
body: object = e.body
if not isinstance(body, dict):
with contextlib.suppress(Exception):
body = e.response.json()

if quota_error := InferenceQuotaExceededError.from_response(
e.message,
status_code=e.status_code,
request_id=e.request_id,
body=body,
):
raise quota_error from None

raise APIStatusError(
e.message,
status_code=e.status_code,
Expand Down
3 changes: 3 additions & 0 deletions livekit-agents/livekit/agents/inference/stt.py
Original file line number Diff line number Diff line change
Expand Up @@ -881,6 +881,9 @@ async def _connect_ws(
params["type"] = "session.create"
await ws.send_str(json.dumps(params))
except aiohttp.ClientResponseError as e:
# aiohttp discards the failed-handshake body, so we can't pass the quota
# JSON to body=; a quota 429 stays a plain APIStatusError here. Without the
# body's `category` it can't be classified anyway. Typing it is future work.
raise create_api_error_from_http(e.message, status=e.status) from e
except asyncio.TimeoutError as e:
raise APITimeoutError("LiveKit Inference STT connection timed out.") from e
Expand Down
6 changes: 6 additions & 0 deletions livekit-agents/livekit/agents/inference/tts.py
Original file line number Diff line number Diff line change
Expand Up @@ -493,6 +493,9 @@ async def _connect_ws(self, timeout: float) -> aiohttp.ClientWebSocketResponse:
timeout,
)
except aiohttp.ClientResponseError as e:
# aiohttp discards the failed-handshake body, so we can't pass the quota
# JSON to body=; a quota 429 stays a plain APIStatusError here. Without the
# body's `category` it can't be classified anyway. Typing it is future work.
raise create_api_error_from_http(e.message, status=e.status) from e
except asyncio.TimeoutError as e:
raise APITimeoutError("LiveKit Inference TTS connection timed out.") from e
Expand Down Expand Up @@ -734,6 +737,9 @@ async def _recv_task(ws: aiohttp.ClientWebSocketResponse) -> None:
raise APITimeoutError() from None

except aiohttp.ClientResponseError as e:
# No body= for the same reason as the connect path above (see :496): aiohttp
# discards the failed-response body, so a quota 429 stays a plain
# APIStatusError. Typing it is future work.
raise create_api_error_from_http(e.message, status=e.status) from None

except APIError:
Expand Down
Loading