diff --git a/livekit-api/livekit/api/_dial_timeout.py b/livekit-api/livekit/api/_dial_timeout.py new file mode 100644 index 00000000..a703b204 --- /dev/null +++ b/livekit-api/livekit/api/_dial_timeout.py @@ -0,0 +1,54 @@ +from __future__ import annotations + +from typing import Optional, Union + +from livekit.protocol.connector_whatsapp import AcceptWhatsAppCallRequest +from livekit.protocol.sip import CreateSIPParticipantRequest, TransferSIPParticipantRequest + +# Requests that carry wait_until_answered / ringing_timeout and share the +# phone-dialing timeout behavior. +DialRequest = Union[ + CreateSIPParticipantRequest, + TransferSIPParticipantRequest, + AcceptWhatsAppCallRequest, +] +"""@private""" + +# Ring window (seconds) assumed when a request doesn't set ringing_timeout; +# matches the server default. A dialing request must outlast it. +DEFAULT_RINGING_TIMEOUT = 30.0 +"""@private""" + +# A dialing request must outlast the ringing window, or it would abort before +# the call can be answered. Keep the request timeout at least this many seconds +# above the ringing timeout. +RINGING_TIMEOUT_MARGIN = 2.0 +"""@private""" + + +def pin_ringing_timeout(request: DialRequest) -> None: + """Set the ring window explicitly on a dialing request when the caller left it + unset, so the derived request timeout doesn't depend on the server's default + (which could change out from under us). + + @private + """ + if not request.HasField("ringing_timeout"): + request.ringing_timeout.seconds = int(DEFAULT_RINGING_TIMEOUT) + + +def dial_timeout(user_timeout: Optional[float], request: DialRequest) -> float: + """Request timeout (seconds) for a phone-dialing call: the ring window plus a + margin, so the request doesn't abort before the call can be answered. The + ring window is the request's ringing_timeout when set, else + DEFAULT_RINGING_TIMEOUT. A longer user_timeout is honored; a shorter one is + raised to the floor. + + @private + """ + if request.HasField("ringing_timeout"): + ring: float = request.ringing_timeout.seconds + else: + ring = DEFAULT_RINGING_TIMEOUT + floor = ring + RINGING_TIMEOUT_MARGIN + return max(user_timeout if user_timeout else floor, floor) diff --git a/livekit-api/livekit/api/_failover.py b/livekit-api/livekit/api/_failover.py index 49d239ed..4babb74c 100644 --- a/livekit-api/livekit/api/_failover.py +++ b/livekit-api/livekit/api/_failover.py @@ -31,16 +31,28 @@ FAILOVER_MAX_ATTEMPTS = 3 FAILOVER_BACKOFF_BASE = 0.2 # seconds - - -def failover_attempts(enabled: bool, host: Optional[str], force: bool = False) -> int: +# Below this per-request timeout (seconds) a retry is unlikely to help and many +# clients would retry in lockstep across regions, so a short request gets a +# single attempt (thundering-herd guard). +MIN_FAILOVER_TIMEOUT = 5.0 + + +def failover_attempts( + enabled: bool, + host: Optional[str], + force: bool = False, + timeout: Optional[float] = None, +) -> int: """Total request attempts for a host; 1 means no failover. Failover only - engages when enabled and the host is a LiveKit Cloud domain. ``force`` - bypasses the cloud-host check and is for internal testing only. + engages when enabled, the host is a LiveKit Cloud domain, and the request + timeout is long enough to retry. ``force`` bypasses the cloud-host check and + is for internal testing only. """ - if enabled and (force or (host is not None and is_cloud(host))): - return FAILOVER_MAX_ATTEMPTS - return 1 + if not (enabled and (force or (host is not None and is_cloud(host)))): + return 1 + if timeout is not None and 0 < timeout < MIN_FAILOVER_TIMEOUT: + return 1 + return FAILOVER_MAX_ATTEMPTS def is_cloud(host: str) -> bool: diff --git a/livekit-api/livekit/api/connector_service.py b/livekit-api/livekit/api/connector_service.py index d5eaf2a1..2c464bde 100644 --- a/livekit-api/livekit/api/connector_service.py +++ b/livekit-api/livekit/api/connector_service.py @@ -1,6 +1,7 @@ from __future__ import annotations import aiohttp +from typing import Optional from livekit.protocol.connector_whatsapp import ( DialWhatsAppCallRequest, @@ -17,6 +18,7 @@ ConnectTwilioCallResponse, ) from ._service import Service +from ._dial_timeout import dial_timeout, pin_ringing_timeout from .access_token import VideoGrants SVC = "Connector" @@ -106,23 +108,41 @@ async def connect_whatsapp_call( ) async def accept_whatsapp_call( - self, request: AcceptWhatsAppCallRequest + self, + request: AcceptWhatsAppCallRequest, + *, + timeout: Optional[float] = None, ) -> AcceptWhatsAppCallResponse: """ Accept an inbound WhatsApp call Args: request: AcceptWhatsAppCallRequest containing call parameters and SDP + timeout: Optional request timeout in seconds. When the request waits + for an answer (wait_until_answered), it defaults to a longer value + (dialing takes time) and is raised, if needed, to stay above the + request's ringing_timeout. Returns: AcceptWhatsAppCallResponse with the room name """ + client_timeout: Optional[aiohttp.ClientTimeout] = None + if request.wait_until_answered: + # Waiting for the call to be answered dials a phone, which takes + # longer than a normal request and must outlast ringing. Pin the ring + # window so the timeout doesn't depend on the server's default. + pin_ringing_timeout(request) + client_timeout = aiohttp.ClientTimeout(total=dial_timeout(timeout, request)) + elif timeout: + client_timeout = aiohttp.ClientTimeout(total=timeout) + return await self._client.request( SVC, "AcceptWhatsAppCall", request, self._auth_header(VideoGrants(room_create=True)), AcceptWhatsAppCallResponse, + timeout=client_timeout, ) async def connect_twilio_call( diff --git a/livekit-api/livekit/api/livekit_api.py b/livekit-api/livekit/api/livekit_api.py index bc0fcfaf..5973a918 100644 --- a/livekit-api/livekit/api/livekit_api.py +++ b/livekit-api/livekit/api/livekit_api.py @@ -39,7 +39,7 @@ def __init__( url: LiveKit server URL (read from `LIVEKIT_URL` environment variable if not provided) api_key: API key (read from `LIVEKIT_API_KEY` environment variable if not provided) api_secret: API secret (read from `LIVEKIT_API_SECRET` environment variable if not provided) - timeout: Request timeout (default: 60 seconds) + timeout: Request timeout (default: 10 seconds) session: aiohttp.ClientSession instance to use for requests, if not provided, a new one will be created """ url = url or os.getenv("LIVEKIT_URL") @@ -57,7 +57,7 @@ def __init__( if not self._session: self._custom_session = False if not timeout: - timeout = aiohttp.ClientTimeout(total=60) + timeout = aiohttp.ClientTimeout(total=10) self._session = aiohttp.ClientSession(timeout=timeout) self._room = RoomService(self._session, url, api_key, api_secret, failover) diff --git a/livekit-api/livekit/api/sip_service.py b/livekit-api/livekit/api/sip_service.py index 36d215f3..6ad7c483 100644 --- a/livekit-api/livekit/api/sip_service.py +++ b/livekit-api/livekit/api/sip_service.py @@ -35,6 +35,7 @@ SIPTransport, ) from ._service import Service +from ._dial_timeout import dial_timeout as _dial_timeout, pin_ringing_timeout as _pin_ringing_timeout from .access_token import VideoGrants, SIPGrants SVC = "SIP" @@ -781,17 +782,15 @@ async def create_sip_participant( SIPError: If the SIP operation fails """ client_timeout: Optional[aiohttp.ClientTimeout] = None - if timeout: - # obay user specified timeout + if create.wait_until_answered: + # Dialing a phone and waiting for an answer takes longer than a + # normal call, and the request must outlast ringing. Pin the ring + # window so the timeout doesn't depend on the server's default. + _pin_ringing_timeout(create) + client_timeout = aiohttp.ClientTimeout(total=_dial_timeout(timeout, create)) + elif timeout: + # obey user specified timeout client_timeout = aiohttp.ClientTimeout(total=timeout) - elif create.wait_until_answered: - # ensure default timeout isn't too short when using sync mode - if ( - self._client._session.timeout - and self._client._session.timeout.total - and self._client._session.timeout.total < 20 - ): - client_timeout = aiohttp.ClientTimeout(total=20) if trunk_id: create.sip_trunk_id = trunk_id @@ -809,16 +808,27 @@ async def create_sip_participant( ) async def transfer_sip_participant( - self, transfer: TransferSIPParticipantRequest + self, + transfer: TransferSIPParticipantRequest, + *, + timeout: Optional[float] = None, ) -> SIPParticipantInfo: """Transfer a SIP participant to a different room. Args: transfer: Request containing transfer details + timeout: Optional request timeout in seconds. Transferring dials a + phone, which takes longer than normal, so it defaults to a + longer timeout when unset. Returns: Updated SIP participant information """ + # Transferring a call dials a phone, which takes longer than a normal + # call, so keep the request alive past ringing. Pin the ring window so the + # timeout doesn't depend on the server's default. + _pin_ringing_timeout(transfer) + client_timeout = aiohttp.ClientTimeout(total=_dial_timeout(timeout, transfer)) return await self._client.request( SVC, "TransferSIPParticipant", @@ -831,6 +841,7 @@ async def transfer_sip_participant( sip=SIPGrants(call=True), ), SIPParticipantInfo, + timeout=client_timeout, ) def _admin_headers(self) -> dict[str, str]: diff --git a/livekit-api/livekit/api/twirp_client.py b/livekit-api/livekit/api/twirp_client.py index 1c8d9966..8eba94fd 100644 --- a/livekit-api/livekit/api/twirp_client.py +++ b/livekit-api/livekit/api/twirp_client.py @@ -150,8 +150,15 @@ async def request( headers["Content-Type"] = "application/protobuf" serialized_data = data.SerializeToString() + # The effective per-attempt timeout is the per-call override, or the + # session default; used to gate failover for short requests. + effective_timeout = timeout.total if timeout else None + if effective_timeout is None and self._session.timeout is not None: + effective_timeout = self._session.timeout.total host = urlparse(self._origin).hostname - max_attempts = failover_attempts(self._failover, host, self._failover_force) + max_attempts = failover_attempts( + self._failover, host, self._failover_force, effective_timeout + ) attempted = {host_key(self._origin)} region_origins: Optional[List[str]] = None current_origin = self._origin