From 7fdfaa82b36f150504ae3f3298f8b0e468a8cad8 Mon Sep 17 00:00:00 2001 From: David Zhao Date: Sun, 28 Jun 2026 17:15:49 +0200 Subject: [PATCH 01/13] feat: auto failover APIs with LK Cloud --- .github/workflows/test-api.yml | 64 ++++++ livekit-api/Cargo.toml | 2 +- livekit-api/src/lib.rs | 11 + livekit-api/src/region.rs | 86 ++++++++ livekit-api/src/services/agent_dispatch.rs | 7 + livekit-api/src/services/api_test.rs | 177 ++++++++++++++++ livekit-api/src/services/connector.rs | 7 + livekit-api/src/services/egress.rs | 7 + livekit-api/src/services/failover.rs | 224 +++++++++++++++++++++ livekit-api/src/services/ingress.rs | 7 + livekit-api/src/services/mod.rs | 4 + livekit-api/src/services/room.rs | 7 + livekit-api/src/services/sip.rs | 7 + livekit-api/src/services/twirp_client.rs | 121 ++++++++++- livekit-api/src/signal_client/region.rs | 65 +----- 15 files changed, 725 insertions(+), 71 deletions(-) create mode 100644 .github/workflows/test-api.yml create mode 100644 livekit-api/src/region.rs create mode 100644 livekit-api/src/services/api_test.rs create mode 100644 livekit-api/src/services/failover.rs diff --git a/.github/workflows/test-api.yml b/.github/workflows/test-api.yml new file mode 100644 index 000000000..2625940fc --- /dev/null +++ b/.github/workflows/test-api.yml @@ -0,0 +1,64 @@ +# Copyright 2026 LiveKit, Inc. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +name: Test API + +permissions: + contents: read + +on: + workflow_dispatch: + push: + branches: [main] + pull_request: + branches: [main] + +jobs: + failover: + runs-on: ubuntu-latest + services: + mock-server: + image: livekit/test-server:latest + ports: + - 9999:9999 + - 10000:10000 + - 10001:10001 + - 10002:10002 + steps: + - uses: actions/checkout@f43a0e5ff2bd294095638e18286ca9a3d1956744 # v3.6.0 + with: + submodules: true + + - name: Setup Rust toolchain + uses: actions-rust-lang/setup-rust-toolchain@46268bd060767258de96ed93c1251119784f2ab6 # v1.16.1 + with: + cache: false + rustflags: "" + + - name: Install Protoc + uses: arduino/setup-protoc@a8b67ba40b37d35169e222f3bb352603327985b6 # v2.1.0 + with: + version: "25.2" + repo-token: ${{ secrets.GITHUB_TOKEN }} + + - name: Wait for mock server + run: | + for i in $(seq 1 30); do + curl -sf http://127.0.0.1:9999/settings/regions >/dev/null && exit 0 + sleep 1 + done + echo "mock server did not become ready" && exit 1 + + - name: Run API tests + run: cargo test -p livekit-api --lib services::api_test -- --nocapture diff --git a/livekit-api/Cargo.toml b/livekit-api/Cargo.toml index b4253e91c..4487e4d76 100644 --- a/livekit-api/Cargo.toml +++ b/livekit-api/Cargo.toml @@ -45,7 +45,7 @@ __signal-client-async-compatible = [ ] -services-tokio = ["dep:reqwest"] +services-tokio = ["dep:reqwest", "dep:tokio", "tokio/time"] services-async = ["dep:isahc"] access-token = ["dep:jsonwebtoken", "dep:hmac", "dep:signature"] webhooks = ["access-token", "dep:serde_json", "dep:base64"] diff --git a/livekit-api/src/lib.rs b/livekit-api/src/lib.rs index 850772f4a..386bce68f 100644 --- a/livekit-api/src/lib.rs +++ b/livekit-api/src/lib.rs @@ -39,6 +39,17 @@ pub mod signal_client; ))] mod http_client; +// Region-discovery helpers shared by the signaling region provider +// (signal_client::region) and the API failover region cache (services::failover). +#[cfg(any( + feature = "signal-client-tokio", + feature = "signal-client-async", + feature = "signal-client-dispatcher", + feature = "services-tokio", + feature = "services-async" +))] +mod region; + #[cfg(feature = "webhooks")] pub mod webhooks; diff --git a/livekit-api/src/region.rs b/livekit-api/src/region.rs new file mode 100644 index 000000000..43dbbd7d7 --- /dev/null +++ b/livekit-api/src/region.rs @@ -0,0 +1,86 @@ +// Copyright 2026 LiveKit, Inc. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +//! Region-discovery primitives shared by the two `/settings/regions` consumers: +//! the signaling region provider ([`crate::signal_client::region`]) and the API +//! failover region cache ([`crate::services::failover`]). +//! +//! Only the feature-independent pieces live here. The caches themselves are +//! deliberately separate: the signaling path keeps `wss://` URLs, de-duplicates +//! in-flight fetches and prunes failed regions, while the API path rewrites URLs +//! to `http(s)` and forwards the caller's request headers. The two are also in +//! independently compiled feature islands (the API SDK builds without the signal +//! client), so neither can depend on the other. + +use std::time::Duration; + +use serde::Deserialize; + +/// Response body of the LiveKit Cloud `/settings/regions` endpoint. Extra fields +/// (`region`, `distance`, …) are ignored; both consumers only need the URLs. A +/// body missing `regions` is rejected so a malformed discovery response surfaces +/// as an error rather than an empty region list. +#[derive(Debug, Deserialize)] +pub(crate) struct RegionsResponse { + pub regions: Vec, +} + +#[derive(Debug, Deserialize)] +pub(crate) struct RegionInfo { + pub url: String, +} + +/// Reports whether `host` belongs to a LiveKit Cloud project — a +/// `*.livekit.cloud` or `*.livekit.run` subdomain. Region discovery and API +/// failover only engage for these hosts. +pub(crate) fn is_cloud_host(host: &str) -> bool { + host.ends_with(".livekit.cloud") || host.ends_with(".livekit.run") +} + +/// Parses the `max-age` directive (in seconds) out of a `Cache-Control` header +/// value, e.g. `"public, max-age=300"` -> `Some(300s)`. Returns `None` when the +/// directive is absent or unparseable. Directive names are case-insensitive. +pub(crate) fn parse_max_age(cache_control: &str) -> Option { + cache_control.split(',').find_map(|directive| { + let (name, value) = directive.split_once('=')?; + name.trim().eq_ignore_ascii_case("max-age").then_some(())?; + value.trim().parse::().ok().map(Duration::from_secs) + }) +} + +#[cfg(test)] +mod tests { + use super::*; + + #[test] + fn test_is_cloud_host() { + assert!(is_cloud_host("myapp.livekit.cloud")); + assert!(is_cloud_host("myapp.region.livekit.cloud")); + assert!(is_cloud_host("myapp.livekit.run")); + assert!(!is_cloud_host("localhost")); + assert!(!is_cloud_host("example.com")); + assert!(!is_cloud_host("livekit.cloud.example.com")); + assert!(!is_cloud_host("127.0.0.1")); + } + + #[test] + fn test_parse_max_age() { + assert_eq!(parse_max_age("max-age=300"), Some(Duration::from_secs(300))); + assert_eq!(parse_max_age("public, max-age=300"), Some(Duration::from_secs(300))); + assert_eq!(parse_max_age("MAX-AGE=0, no-cache"), Some(Duration::ZERO)); + assert_eq!(parse_max_age("no-store"), None); + assert_eq!(parse_max_age("max-age=notanumber"), None); + assert_eq!(parse_max_age(""), None); + } +} diff --git a/livekit-api/src/services/agent_dispatch.rs b/livekit-api/src/services/agent_dispatch.rs index a2a0bd45d..45cb7b94b 100644 --- a/livekit-api/src/services/agent_dispatch.rs +++ b/livekit-api/src/services/agent_dispatch.rs @@ -41,6 +41,13 @@ impl AgentDispatchClient { Ok(Self::with_api_key(host, &api_key, &api_secret)) } + /// Enables or disables region failover (enabled by default). Failover only + /// engages for LiveKit Cloud hosts. + pub fn with_failover(mut self, enabled: bool) -> Self { + self.client = self.client.with_failover(enabled); + self + } + /// Creates an explicit dispatch for an agent to join a room. /// /// To use explicit dispatch, your agent must be registered with an `agent_name`. diff --git a/livekit-api/src/services/api_test.rs b/livekit-api/src/services/api_test.rs new file mode 100644 index 000000000..73b1bcb70 --- /dev/null +++ b/livekit-api/src/services/api_test.rs @@ -0,0 +1,177 @@ +// Copyright 2026 LiveKit, Inc. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +//! API tests against the shared mock LiveKit API server (livekit/livekit +//! cmd/test-server). Point them at a running instance with LK_TEST_SERVER_URL +//! (default http://127.0.0.1:9999); they no-op when no server is reachable. In +//! CI the server is booted as a Docker container. +//! +//! See cmd/test-server/README.md for the X-Lk-Mock-* control protocol. These +//! tests drive TwirpClient::request() directly because the public service +//! methods do not expose per-call headers. + +use std::time::Duration; + +use http::header::{HeaderMap, HeaderName, HeaderValue, AUTHORIZATION}; +use livekit_protocol as proto; + +use super::failover::FailoverConfig; +use super::twirp_client::{TwirpClient, TwirpError, TwirpResult}; +use super::LIVEKIT_PACKAGE; + +fn base_url() -> String { + std::env::var("LK_TEST_SERVER_URL").unwrap_or_else(|_| "http://127.0.0.1:9999".to_owned()) +} + +async fn reachable(base: &str) -> bool { + reqwest::Client::new() + .get(format!("{base}/settings/regions")) + .send() + .await + .map(|r| r.status().is_success()) + .unwrap_or(false) +} + +// `force` bypasses the cloud-host check (the mock is on 127.0.0.1) and the tiny +// backoff keeps tests fast — both are internal, test-only knobs. +fn config(enabled: bool, force: bool) -> FailoverConfig { + FailoverConfig { enabled, force, backoff_base: Duration::from_millis(1) } +} + +async fn call( + base: &str, + cfg: FailoverConfig, + directives: &[(&'static str, &str)], +) -> TwirpResult { + let client = TwirpClient::new(base, LIVEKIT_PACKAGE, None).with_failover_config(cfg); + let mut headers = HeaderMap::new(); + headers.insert(AUTHORIZATION, HeaderValue::from_static("Bearer test-token")); + for (k, v) in directives { + headers.insert(HeaderName::from_static(k), HeaderValue::from_str(v).unwrap()); + } + client + .request::( + "RoomService", + "CreateRoom", + proto::CreateRoomRequest::default(), + headers, + ) + .await +} + +macro_rules! skip_if_offline { + ($base:expr) => { + if !reachable(&$base).await { + eprintln!("skipping: mock test server not reachable at {}", $base); + return; + } + }; +} + +#[tokio::test] +async fn healthy() { + let base = base_url(); + skip_if_offline!(base); + call(&base, config(true, true), &[]).await.expect("healthy request should succeed"); +} + +#[tokio::test] +async fn primary_unavailable() { + let base = base_url(); + skip_if_offline!(base); + call(&base, config(true, true), &[("x-lk-mock-fail-regions", "0")]) + .await + .expect("should fail over to a healthy region"); +} + +#[tokio::test] +async fn two_regions_unavailable() { + let base = base_url(); + skip_if_offline!(base); + call(&base, config(true, true), &[("x-lk-mock-fail-regions", "0,1")]) + .await + .expect("should fail over to region 2 on the 3rd attempt"); +} + +#[tokio::test] +async fn all_unavailable() { + let base = base_url(); + skip_if_offline!(base); + let err = call(&base, config(true, true), &[("x-lk-mock-fail-regions", "0,1,2,3")]) + .await + .expect_err("all regions down should surface an error"); + assert!(matches!(err, TwirpError::Twirp(_))); +} + +#[tokio::test] +async fn client_error_not_retried() { + let base = base_url(); + skip_if_offline!(base); + let err = call( + &base, + config(true, true), + &[("x-lk-mock-fail-regions", "0"), ("x-lk-mock-fail-status", "400")], + ) + .await + .expect_err("a 4xx must be returned without failover"); + match err { + TwirpError::Twirp(code) => assert_eq!(code.code, "invalid_argument"), + other => panic!("expected a twirp error, got {other:?}"), + } +} + +#[tokio::test] +async fn transport_error_failover() { + let base = base_url(); + skip_if_offline!(base); + call( + &base, + config(true, true), + &[("x-lk-mock-fail-regions", "0"), ("x-lk-mock-fail-mode", "drop")], + ) + .await + .expect("a dropped connection should fail over to a healthy region"); +} + +#[tokio::test] +async fn region_discovery_unreachable() { + let base = base_url(); + skip_if_offline!(base); + call( + &base, + config(true, true), + &[("x-lk-mock-fail-regions", "0"), ("x-lk-mock-regions-status", "500")], + ) + .await + .expect_err("no fallback hosts means the original 5xx is surfaced"); +} + +#[tokio::test] +async fn not_cloud_host() { + let base = base_url(); + skip_if_offline!(base); + // Enabled but not forced; 127.0.0.1 is not a cloud host, so no failover. + call(&base, config(true, false), &[("x-lk-mock-fail-regions", "0")]) + .await + .expect_err("failover should be cloud-gated for a non-cloud host"); +} + +#[tokio::test] +async fn disabled() { + let base = base_url(); + skip_if_offline!(base); + call(&base, config(false, true), &[("x-lk-mock-fail-regions", "0")]) + .await + .expect_err("disabled failover should not retry"); +} diff --git a/livekit-api/src/services/connector.rs b/livekit-api/src/services/connector.rs index b5f0818f1..4a7535146 100644 --- a/livekit-api/src/services/connector.rs +++ b/livekit-api/src/services/connector.rs @@ -98,6 +98,13 @@ impl ConnectorClient { Ok(Self::with_api_key(host, &api_key, &api_secret)) } + /// Enables or disables region failover (enabled by default). Failover only + /// engages for LiveKit Cloud hosts. + pub fn with_failover(mut self, enabled: bool) -> Self { + self.client = self.client.with_failover(enabled); + self + } + /// Dials a WhatsApp call /// /// # Arguments diff --git a/livekit-api/src/services/egress.rs b/livekit-api/src/services/egress.rs index 4cfa71a24..ccd1b9114 100644 --- a/livekit-api/src/services/egress.rs +++ b/livekit-api/src/services/egress.rs @@ -121,6 +121,13 @@ impl EgressClient { Ok(Self::with_api_key(host, &api_key, &api_secret)) } + /// Enables or disables region failover (enabled by default). Failover only + /// engages for LiveKit Cloud hosts. + pub fn with_failover(mut self, enabled: bool) -> Self { + self.client = self.client.with_failover(enabled); + self + } + pub async fn start_room_composite_egress( &self, room: &str, diff --git a/livekit-api/src/services/failover.rs b/livekit-api/src/services/failover.rs new file mode 100644 index 000000000..518069bf6 --- /dev/null +++ b/livekit-api/src/services/failover.rs @@ -0,0 +1,224 @@ +// Copyright 2026 LiveKit, Inc. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +//! Region failover for the Twirp API clients. +//! +//! On a retryable failure (any transport error or HTTP 5xx) the [`TwirpClient`] +//! discovers alternative LiveKit Cloud regions via `/settings/regions` and +//! replays the request against the next region, with exponential backoff. 4xx +//! responses are returned immediately. See [`TwirpClient::request`]. + +use std::{ + collections::HashMap, + sync::{Mutex, OnceLock}, + time::{Duration, Instant}, +}; + +use http::header::{HeaderMap, CONTENT_LENGTH, CONTENT_TYPE}; +use url::Url; + +use crate::region::{is_cloud_host, parse_max_age, RegionsResponse}; + +/// Total attempts (the original request plus fallback regions) and the base +/// retry backoff are fixed, not user-configurable, so retries can't be tuned to +/// values that could overwhelm the server. +pub(crate) const MAX_ATTEMPTS: u32 = 3; +pub(crate) const BACKOFF_BASE: Duration = Duration::from_millis(200); + +/// Internal region-failover configuration. The public API exposes only the +/// `enabled` toggle (default true); `force` and `backoff_base` are test-only. +#[derive(Debug, Clone, Copy)] +pub(crate) struct FailoverConfig { + pub enabled: bool, + /// Bypasses the cloud-host check. Internal testing only. + pub force: bool, + /// Retry backoff base. Internal testing only. + pub backoff_base: Duration, +} + +impl Default for FailoverConfig { + fn default() -> Self { + Self { enabled: true, force: false, backoff_base: BACKOFF_BASE } + } +} + +impl FailoverConfig { + /// Total request attempts for a host; 1 means no failover. Failover only + /// engages when enabled and the host is a LiveKit Cloud domain. `force` + /// bypasses the cloud-host check and is for internal testing only. + pub(crate) fn attempts(&self, host: Option<&str>) -> u32 { + if self.enabled && (self.force || host.map(is_cloud_host).unwrap_or(false)) { + MAX_ATTEMPTS + } else { + 1 + } + } +} + +/// Normalizes a region URL to an http(s) scheme (ws -> http, wss -> https), +/// mirroring the other SDKs and the server. +fn to_http_url(url: &str) -> String { + if let Some(rest) = url.strip_prefix("ws") { + format!("http{rest}") + } else { + url.to_owned() + } +} + +/// A stable key identifying a host (including port) for dedup across attempts. +pub(crate) fn host_key(url: &Url) -> String { + format!("{}:{}", url.host_str().unwrap_or(""), url.port_or_known_default().unwrap_or(0)) +} + +/// Returns the first region URL whose host has not yet been attempted. +pub(crate) fn pick_next(region_urls: &[String], attempted: &[String]) -> Option { + for raw in region_urls { + let Ok(url) = Url::parse(raw) else { continue }; + if url.host_str().is_none() { + continue; + } + if !attempted.iter().any(|a| a == &host_key(&url)) { + return Some(url); + } + } + None +} + +/// Sleeps for `d` before a retry. Backoff is applied on the tokio backend; the +/// async backend retries without delay (the failover path is short). +pub(crate) async fn backoff_sleep(d: Duration) { + if d.is_zero() { + return; + } + #[cfg(feature = "services-tokio")] + { + tokio::time::sleep(d).await; + } + #[cfg(not(feature = "services-tokio"))] + { + let _ = d; + } +} + +/// Region discovery (`/settings/regions`) uses a short timeout so a slow or +/// unreachable endpoint doesn't stall the failover path. +const DISCOVERY_TIMEOUT: Duration = Duration::from_secs(2); + +struct CacheEntry { + urls: Vec, + fetched_at: Instant, + ttl: Duration, +} + +fn cache() -> &'static Mutex> { + static CACHE: OnceLock>> = OnceLock::new(); + CACHE.get_or_init(|| Mutex::new(HashMap::new())) +} + +/// Returns the alternative region URLs for `base`, fetching `/settings/regions` +/// if the cache is stale. Best-effort: on a fetch failure it serves a stale +/// cached list when available, otherwise an empty list (the caller then stops +/// failing over). Forwards the caller's headers so a valid token — and any test +/// directives — reach the discovery endpoint. +pub(crate) async fn region_urls(base: &Url, headers: &HeaderMap) -> Vec { + let key = host_key(base); + + { + let c = cache().lock().unwrap(); + if let Some(entry) = c.get(&key) { + if entry.fetched_at.elapsed() < entry.ttl { + return entry.urls.clone(); + } + } + } + + match fetch(base, headers).await { + Ok((urls, ttl)) => { + // A zero TTL (e.g. Cache-Control: max-age=0) means "do not cache". + if !ttl.is_zero() { + cache().lock().unwrap().insert( + key, + CacheEntry { urls: urls.clone(), fetched_at: Instant::now(), ttl }, + ); + } + urls + } + Err(()) => cache().lock().unwrap().get(&key).map(|e| e.urls.clone()).unwrap_or_default(), + } +} + +/// Builds the header set forwarded to the discovery endpoint: the caller's +/// headers minus body-specific ones. +fn forward_headers(headers: &HeaderMap) -> HeaderMap { + let mut out = headers.clone(); + out.remove(CONTENT_TYPE); + out.remove(CONTENT_LENGTH); + out +} + +/// The discovery response carries `wss://` region URLs; the API client speaks +/// HTTP, so rewrite each to its `http(s)` equivalent. +fn normalize(list: RegionsResponse) -> Vec { + list.regions.into_iter().filter(|r| !r.url.is_empty()).map(|r| to_http_url(&r.url)).collect() +} + +/// Reads the cache TTL from a `Cache-Control` header value; absent or +/// unparseable means a zero TTL ("do not cache"). +fn ttl_from_cache_control(value: Option<&str>) -> Duration { + value.and_then(parse_max_age).unwrap_or(Duration::ZERO) +} + +#[cfg(feature = "services-tokio")] +async fn fetch(base: &Url, headers: &HeaderMap) -> Result<(Vec, Duration), ()> { + let mut url = base.clone(); + url.set_path("/settings/regions"); + + let resp = reqwest::Client::new() + .get(url) + .headers(forward_headers(headers)) + .timeout(DISCOVERY_TIMEOUT) + .send() + .await + .map_err(|_| ())?; + if !resp.status().is_success() { + return Err(()); + } + let ttl = + ttl_from_cache_control(resp.headers().get("cache-control").and_then(|v| v.to_str().ok())); + let list: RegionsResponse = resp.json().await.map_err(|_| ())?; + Ok((normalize(list), ttl)) +} + +#[cfg(all(feature = "services-async", not(feature = "services-tokio")))] +async fn fetch(base: &Url, headers: &HeaderMap) -> Result<(Vec, Duration), ()> { + use isahc::config::Configurable; + use isahc::AsyncReadResponseExt; + + let mut url = base.clone(); + url.set_path("/settings/regions"); + + let mut builder = isahc::Request::get(url.as_str()).timeout(DISCOVERY_TIMEOUT); + for (name, value) in forward_headers(headers).iter() { + builder = builder.header(name, value); + } + let request = builder.body(()).map_err(|_| ())?; + let mut resp = isahc::send_async(request).await.map_err(|_| ())?; + if !resp.status().is_success() { + return Err(()); + } + let ttl = + ttl_from_cache_control(resp.headers().get("cache-control").and_then(|v| v.to_str().ok())); + let list: RegionsResponse = resp.json().await.map_err(|_| ())?; + Ok((normalize(list), ttl)) +} diff --git a/livekit-api/src/services/ingress.rs b/livekit-api/src/services/ingress.rs index 8437d5c21..e885e8da9 100644 --- a/livekit-api/src/services/ingress.rs +++ b/livekit-api/src/services/ingress.rs @@ -72,6 +72,13 @@ impl IngressClient { Ok(Self::with_api_key(host, &api_key, &api_secret)) } + /// Enables or disables region failover (enabled by default). Failover only + /// engages for LiveKit Cloud hosts. + pub fn with_failover(mut self, enabled: bool) -> Self { + self.client = self.client.with_failover(enabled); + self + } + pub async fn create_ingress( &self, input_type: proto::IngressInput, diff --git a/livekit-api/src/services/mod.rs b/livekit-api/src/services/mod.rs index 65617db82..257ac05b2 100644 --- a/livekit-api/src/services/mod.rs +++ b/livekit-api/src/services/mod.rs @@ -28,8 +28,12 @@ pub mod ingress; pub mod room; pub mod sip; +mod failover; mod twirp_client; +#[cfg(all(test, feature = "services-tokio"))] +mod api_test; + pub const LIVEKIT_PACKAGE: &str = "livekit"; #[derive(Debug, Error)] diff --git a/livekit-api/src/services/room.rs b/livekit-api/src/services/room.rs index 6c3da17f4..28fa48a28 100644 --- a/livekit-api/src/services/room.rs +++ b/livekit-api/src/services/room.rs @@ -73,6 +73,13 @@ impl RoomClient { Ok(Self::with_api_key(host, &api_key, &api_secret)) } + /// Enables or disables region failover (enabled by default). Failover only + /// engages for LiveKit Cloud hosts. + pub fn with_failover(mut self, enabled: bool) -> Self { + self.client = self.client.with_failover(enabled); + self + } + pub async fn create_room( &self, name: &str, diff --git a/livekit-api/src/services/sip.rs b/livekit-api/src/services/sip.rs index fde982b16..83b25e443 100644 --- a/livekit-api/src/services/sip.rs +++ b/livekit-api/src/services/sip.rs @@ -169,6 +169,13 @@ impl SIPClient { Ok(Self::with_api_key(host, &api_key, &api_secret)) } + /// Enables or disables region failover (enabled by default). Failover only + /// engages for LiveKit Cloud hosts. + pub fn with_failover(mut self, enabled: bool) -> Self { + self.client = self.client.with_failover(enabled); + self + } + fn duration_to_proto(d: Option) -> Option { d.map(|d| ProtoDuration { seconds: d.as_secs() as i64, nanos: d.subsec_nanos() as i32 }) } diff --git a/livekit-api/src/services/twirp_client.rs b/livekit-api/src/services/twirp_client.rs index 1570febca..a3f1f4d06 100644 --- a/livekit-api/src/services/twirp_client.rs +++ b/livekit-api/src/services/twirp_client.rs @@ -20,7 +20,9 @@ use http::{ }; use serde::Deserialize; use thiserror::Error; +use url::Url; +use super::failover::{self, FailoverConfig}; use crate::http_client; pub const DEFAULT_PREFIX: &str = "/twirp"; @@ -82,6 +84,7 @@ pub struct TwirpClient { pkg: String, prefix: String, client: http_client::Client, + failover: FailoverConfig, } impl TwirpClient { @@ -91,9 +94,30 @@ impl TwirpClient { pkg: pkg.to_owned(), prefix: prefix.unwrap_or(DEFAULT_PREFIX).to_owned(), client: http_client::Client::new(), + failover: FailoverConfig::default(), } } + /// Enables or disables region failover (enabled by default). Failover only + /// engages for LiveKit Cloud hosts. + pub fn with_failover(mut self, enabled: bool) -> Self { + self.failover.enabled = enabled; + self + } + + /// Overrides the full failover configuration, including the internal + /// test-only `force` and `backoff_base` knobs. + #[cfg(test)] + pub(crate) fn with_failover_config(mut self, config: FailoverConfig) -> Self { + self.failover = config; + self + } + + /// Issues a Twirp request, failing over to alternative regions on retryable + /// errors. On any transport error or HTTP 5xx it discovers regions via + /// `/settings/regions` and replays the request — body and headers intact — + /// against the next untried region, with exponential backoff. A 4xx is + /// returned immediately. pub async fn request( &self, service: &str, @@ -101,19 +125,96 @@ impl TwirpClient { data: D, mut headers: HeaderMap, ) -> TwirpResult { - let mut url = url::Url::parse(&self.host)?; - - url.set_path(&format!("{}/{}.{}/{}", self.prefix, self.pkg, service, method)); - + let original = Url::parse(&self.host)?; + let path = format!("{}/{}.{}/{}", self.prefix, self.pkg, service, method); + let forward = headers.clone(); // headers for the discovery fetch (no content-type yet) headers.insert(CONTENT_TYPE, HeaderValue::from_static("application/protobuf")); + let body = data.encode_to_vec(); + + let max_attempts = self.failover.attempts(original.host_str()); + let mut attempted = vec![failover::host_key(&original)]; + let mut region_urls: Option> = None; + let mut current = original.clone(); + + for attempt in 0..max_attempts { + let is_last = attempt + 1 >= max_attempts; + let mut url = current.clone(); + url.set_path(&path); + + let send = + self.client.post(url).headers(headers.clone()).body(body.clone()).send().await; + match send { + Ok(resp) => { + let status = resp.status(); + if status == StatusCode::OK { + return Ok(R::decode(resp.bytes().await?)?); + } + // 4xx is terminal; only 5xx is retryable. + if is_last || status.as_u16() < 500 { + let err: TwirpErrorCode = resp.json().await?; + return Err(TwirpError::Twirp(err)); + } + match self.next_region(&original, &forward, &mut region_urls, &attempted).await + { + Some(next) => { + log::warn!( + "livekit API request to {} failed with status {}, retrying with fallback url {}", + current.host_str().unwrap_or_default(), + status, + next, + ); + drop(resp); + failover::backoff_sleep(self.backoff(attempt)).await; + attempted.push(failover::host_key(&next)); + current = next; + } + None => { + let err: TwirpErrorCode = resp.json().await?; + return Err(TwirpError::Twirp(err)); + } + } + } + Err(err) => { + if is_last { + return Err(err.into()); + } + match self.next_region(&original, &forward, &mut region_urls, &attempted).await + { + Some(next) => { + log::warn!( + "livekit API request to {} failed ({}), retrying with fallback url {}", + current.host_str().unwrap_or_default(), + err, + next, + ); + failover::backoff_sleep(self.backoff(attempt)).await; + attempted.push(failover::host_key(&next)); + current = next; + } + None => return Err(err.into()), + } + } + } + } + unreachable!("failover loop always returns within the attempt budget") + } - let resp = self.client.post(url).headers(headers).body(data.encode_to_vec()).send().await?; + fn backoff(&self, attempt: u32) -> std::time::Duration { + self.failover.backoff_base * (1u32 << attempt) + } - if resp.status() == StatusCode::OK { - Ok(R::decode(resp.bytes().await?)?) - } else { - let err: TwirpErrorCode = resp.json().await?; - Err(TwirpError::Twirp(err)) + /// Resolves the next untried region, fetching the region list lazily on the + /// first retryable failure and reusing it thereafter. + async fn next_region( + &self, + original: &Url, + forward: &HeaderMap, + region_urls: &mut Option>, + attempted: &[String], + ) -> Option { + if region_urls.is_none() { + *region_urls = Some(failover::region_urls(original, forward).await); } + failover::pick_next(region_urls.as_ref().unwrap(), attempted) } } diff --git a/livekit-api/src/signal_client/region.rs b/livekit-api/src/signal_client/region.rs index ce76ce0d5..d0b3e4835 100644 --- a/livekit-api/src/signal_client/region.rs +++ b/livekit-api/src/signal_client/region.rs @@ -21,10 +21,10 @@ use std::{ use http::header::{HeaderMap, HeaderValue, AUTHORIZATION, CACHE_CONTROL}; use parking_lot::Mutex; -use serde::Deserialize; use tokio::sync::Mutex as AsyncMutex; use crate::http_client; +use crate::region::{is_cloud_host, parse_max_age, RegionsResponse}; use super::{SignalError, SignalResult, REGION_FETCH_TIMEOUT}; @@ -161,18 +161,6 @@ fn error_with_chain(err: &dyn StdError) -> String { pub struct RegionUrlProvider; -#[derive(Deserialize)] -pub struct RegionUrlResponse { - pub regions: Vec, -} - -#[derive(Deserialize)] -pub struct RegionUrlInfo { - pub region: String, - pub url: String, - pub distance: String, -} - impl RegionUrlProvider { /// Fetch the ordered list of region signalling URLs for a LiveKit Cloud /// host. Non-cloud (direct / self-hosted) URLs have no regions, so this @@ -184,11 +172,12 @@ impl RegionUrlProvider { /// de-duplicated: only one fetch runs at a time and the rest reuse its /// result. pub async fn fetch_region_urls(url: &str, token: &str) -> SignalResult> { - if !is_cloud_url(url)? { + let host = region_host(url)?; + // Non-cloud (direct / self-hosted) hosts have no regions. + if !is_cloud_host(&host) { return Ok(vec![]); } - let host = region_host(url)?; let cache = RegionCache::shared(); // Fast path: a fresh entry needs neither a fetch nor the fetch lock. @@ -282,7 +271,7 @@ pub(crate) async fn fetch_from_endpoint( res.headers().get(CACHE_CONTROL).and_then(|v| v.to_str().ok()).and_then(parse_max_age); let res = res - .json::() + .json::() .await .map_err(|e| SignalError::RegionError(error_with_chain(&e)))?; Ok((res.regions.into_iter().map(|i| i.url).collect(), max_age)) @@ -293,29 +282,6 @@ pub(crate) async fn fetch_from_endpoint( .map_err(|_| SignalError::RegionError("region fetch timed out".into()))? } -/// Parses the `max-age` directive (in seconds) out of a `Cache-Control` header -/// value, e.g. `"max-age=300, public"` -> `Some(300s)`. Returns `None` when the -/// directive is absent or unparseable, leaving the caller on the default TTL. -fn parse_max_age(cache_control: &str) -> Option { - cache_control.split(',').find_map(|directive| { - let (name, value) = directive.split_once('=')?; - name.trim().eq_ignore_ascii_case("max-age").then_some(())?; - value.trim().parse::().ok().map(Duration::from_secs) - }) -} - -fn is_cloud_url(url: &str) -> SignalResult { - let url = url::Url::parse(url).map_err(|err| SignalError::UrlParse(err.to_string()))?; - let host = match url.host() { - Some(host) => host.to_string(), - None => { - return Err(SignalError::UrlParse("invalid hostname".into())); - } - }; - - Ok(host.ends_with(".livekit.cloud") || host.ends_with(".livekit.run")) -} - fn region_endpoint(url: &str) -> SignalResult { let mut url = url::Url::parse(url).map_err(|err| SignalError::UrlParse(err.to_string()))?; match url.scheme() { @@ -476,17 +442,6 @@ mod tests { ); } - #[test] - fn test_is_cloud_url() { - assert!(is_cloud_url("wss://myapp.livekit.cloud").unwrap()); - assert!(is_cloud_url("wss://myapp.livekit.run").unwrap()); - assert!(is_cloud_url("https://myapp.livekit.cloud").unwrap()); - - assert!(!is_cloud_url("wss://localhost:7880").unwrap()); - assert!(!is_cloud_url("wss://example.com").unwrap()); - assert!(!is_cloud_url("wss://livekit.cloud.example.com").unwrap()); - } - #[test] fn test_region_host() { assert_eq!(region_host("wss://myapp.livekit.cloud").unwrap(), "myapp.livekit.cloud"); @@ -597,16 +552,6 @@ mod tests { assert!(!Arc::ptr_eq(&a1, &b), "different hosts get distinct fetch locks"); } - #[test] - fn test_parse_max_age() { - assert_eq!(parse_max_age("max-age=300"), Some(Duration::from_secs(300))); - assert_eq!(parse_max_age("public, max-age=300"), Some(Duration::from_secs(300))); - assert_eq!(parse_max_age("MAX-AGE=0, no-cache"), Some(Duration::ZERO)); - assert_eq!(parse_max_age("no-store"), None); - assert_eq!(parse_max_age("max-age=notanumber"), None); - assert_eq!(parse_max_age(""), None); - } - #[test] fn test_region_endpoint() { assert_eq!( From 0b74585389cd415a820beada4b493f15bfd732fd Mon Sep 17 00:00:00 2001 From: David Zhao Date: Sun, 28 Jun 2026 17:20:41 +0200 Subject: [PATCH 02/13] rename for clarity --- livekit-api/src/lib.rs | 3 ++- livekit-api/src/region.rs | 2 +- livekit-api/src/signal_client/mod.rs | 12 ++++++------ .../{region.rs => region_url_provider.rs} | 0 4 files changed, 9 insertions(+), 8 deletions(-) rename livekit-api/src/signal_client/{region.rs => region_url_provider.rs} (100%) diff --git a/livekit-api/src/lib.rs b/livekit-api/src/lib.rs index 386bce68f..878da8147 100644 --- a/livekit-api/src/lib.rs +++ b/livekit-api/src/lib.rs @@ -40,7 +40,8 @@ pub mod signal_client; mod http_client; // Region-discovery helpers shared by the signaling region provider -// (signal_client::region) and the API failover region cache (services::failover). +// (signal_client::region_url_provider) and the API failover region cache +// (services::failover). #[cfg(any( feature = "signal-client-tokio", feature = "signal-client-async", diff --git a/livekit-api/src/region.rs b/livekit-api/src/region.rs index 43dbbd7d7..9d32412af 100644 --- a/livekit-api/src/region.rs +++ b/livekit-api/src/region.rs @@ -13,7 +13,7 @@ // limitations under the License. //! Region-discovery primitives shared by the two `/settings/regions` consumers: -//! the signaling region provider ([`crate::signal_client::region`]) and the API +//! the signaling region provider ([`crate::signal_client::region_url_provider`]) and the API //! failover region cache ([`crate::services::failover`]). //! //! Only the feature-independent pieces live here. The caches themselves are diff --git a/livekit-api/src/signal_client/mod.rs b/livekit-api/src/signal_client/mod.rs index e2448d1d1..76b7ba491 100644 --- a/livekit-api/src/signal_client/mod.rs +++ b/livekit-api/src/signal_client/mod.rs @@ -41,10 +41,10 @@ use async_tungstenite::tungstenite::Error as WsError; use crate::{http_client, signal_client::signal_stream::SignalStream}; -mod region; +mod region_url_provider; mod signal_stream; -pub use region::RegionUrlProvider; +pub use region_url_provider::RegionUrlProvider; pub type SignalEmitter = mpsc::UnboundedSender; pub type SignalEvents = mpsc::UnboundedReceiver; @@ -1307,7 +1307,7 @@ mod tests { }); let endpoint = format!("http://127.0.0.1:{}/settings/regions", addr.port()); - let result = region::fetch_from_endpoint(&endpoint, "fake-token").await; + let result = region_url_provider::fetch_from_endpoint(&endpoint, "fake-token").await; let (urls, _max_age) = result.unwrap(); assert_eq!( @@ -1339,7 +1339,7 @@ mod tests { }); let endpoint = format!("http://127.0.0.1:{}/settings/regions", addr.port()); - let result = region::fetch_from_endpoint(&endpoint, "fake-token").await; + let result = region_url_provider::fetch_from_endpoint(&endpoint, "fake-token").await; assert!(result.is_err()); let err = result.unwrap_err(); @@ -1358,7 +1358,7 @@ mod tests { // Try to connect to a port that's definitely not listening // This simulates a network-level failure let endpoint = "http://127.0.0.1:1/settings/regions"; - let result = region::fetch_from_endpoint(endpoint, "fake-token").await; + let result = region_url_provider::fetch_from_endpoint(endpoint, "fake-token").await; assert!(result.is_err()); let err = result.unwrap_err(); @@ -1416,7 +1416,7 @@ mod tests { }); let endpoint = format!("http://127.0.0.1:{}/settings/regions", addr.port()); - let result = region::fetch_from_endpoint(&endpoint, "fake-token").await; + let result = region_url_provider::fetch_from_endpoint(&endpoint, "fake-token").await; assert!(result.is_err()); let err = result.unwrap_err(); diff --git a/livekit-api/src/signal_client/region.rs b/livekit-api/src/signal_client/region_url_provider.rs similarity index 100% rename from livekit-api/src/signal_client/region.rs rename to livekit-api/src/signal_client/region_url_provider.rs From 08806fb92c20dc69f31094798fe1b417be4d1353 Mon Sep 17 00:00:00 2001 From: David Zhao Date: Sun, 28 Jun 2026 08:21:00 -0700 Subject: [PATCH 03/13] Create feat_auto_failover_apis_with_lk_cloud.md --- .changeset/feat_auto_failover_apis_with_lk_cloud.md | 8 ++++++++ 1 file changed, 8 insertions(+) create mode 100644 .changeset/feat_auto_failover_apis_with_lk_cloud.md diff --git a/.changeset/feat_auto_failover_apis_with_lk_cloud.md b/.changeset/feat_auto_failover_apis_with_lk_cloud.md new file mode 100644 index 000000000..7931afc10 --- /dev/null +++ b/.changeset/feat_auto_failover_apis_with_lk_cloud.md @@ -0,0 +1,8 @@ +--- +livekit: patch +livekit-api: patch +livekit-ffi: patch +livekit-uniffi: patch +--- + +feat: auto failover APIs with LK Cloud - #1196 (@davidzhao) From d3da895a8f9304bf31d5e8afddaf308c26295dfa Mon Sep 17 00:00:00 2001 From: David Zhao Date: Mon, 29 Jun 2026 21:01:11 +0200 Subject: [PATCH 04/13] skip perms check --- livekit-api/src/services/api_test.rs | 2 ++ 1 file changed, 2 insertions(+) diff --git a/livekit-api/src/services/api_test.rs b/livekit-api/src/services/api_test.rs index 73b1bcb70..b4883619d 100644 --- a/livekit-api/src/services/api_test.rs +++ b/livekit-api/src/services/api_test.rs @@ -57,6 +57,8 @@ async fn call( let client = TwirpClient::new(base, LIVEKIT_PACKAGE, None).with_failover_config(cfg); let mut headers = HeaderMap::new(); headers.insert(AUTHORIZATION, HeaderValue::from_static("Bearer test-token")); + // These tests exercise failover, not authz; skip the mock's permission check. + headers.insert(HeaderName::from_static("x-lk-mock-skip-auth"), HeaderValue::from_static("true")); for (k, v) in directives { headers.insert(HeaderName::from_static(k), HeaderValue::from_str(v).unwrap()); } From f67ad77193ef34604e4bb47448cd67360c2ed46e Mon Sep 17 00:00:00 2001 From: David Zhao Date: Tue, 30 Jun 2026 01:26:14 +0200 Subject: [PATCH 05/13] fmt --- livekit-api/src/services/api_test.rs | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/livekit-api/src/services/api_test.rs b/livekit-api/src/services/api_test.rs index b4883619d..36d52fa9f 100644 --- a/livekit-api/src/services/api_test.rs +++ b/livekit-api/src/services/api_test.rs @@ -58,7 +58,8 @@ async fn call( let mut headers = HeaderMap::new(); headers.insert(AUTHORIZATION, HeaderValue::from_static("Bearer test-token")); // These tests exercise failover, not authz; skip the mock's permission check. - headers.insert(HeaderName::from_static("x-lk-mock-skip-auth"), HeaderValue::from_static("true")); + headers + .insert(HeaderName::from_static("x-lk-mock-skip-auth"), HeaderValue::from_static("true")); for (k, v) in directives { headers.insert(HeaderName::from_static(k), HeaderValue::from_str(v).unwrap()); } From 7aa409e12ef4235302b3107cb9c6a124b10fea59 Mon Sep 17 00:00:00 2001 From: David Zhao Date: Tue, 30 Jun 2026 17:48:15 +0200 Subject: [PATCH 06/13] implement timeouts --- livekit-api/src/http_client.rs | 6 + livekit-api/src/services/agent_dispatch.rs | 6 + livekit-api/src/services/api_test.rs | 31 +++++ livekit-api/src/services/connector.rs | 6 + livekit-api/src/services/egress.rs | 6 + livekit-api/src/services/failover.rs | 24 +++- livekit-api/src/services/ingress.rs | 6 + livekit-api/src/services/room.rs | 6 + livekit-api/src/services/sip.rs | 139 ++++++++++++++------- livekit-api/src/services/twirp_client.rs | 36 +++++- 10 files changed, 212 insertions(+), 54 deletions(-) diff --git a/livekit-api/src/http_client.rs b/livekit-api/src/http_client.rs index a39cc7c26..1ab8826c0 100644 --- a/livekit-api/src/http_client.rs +++ b/livekit-api/src/http_client.rs @@ -163,6 +163,12 @@ mod async_std { self } + pub fn timeout(mut self, timeout: std::time::Duration) -> Self { + use isahc::config::Configurable; + self.builder = self.builder.timeout(timeout); + self + } + pub async fn send(self) -> io::Result { let request = self.builder.body(self.body).unwrap(); let response = self.client.send_async(request).await?; diff --git a/livekit-api/src/services/agent_dispatch.rs b/livekit-api/src/services/agent_dispatch.rs index 45cb7b94b..5f0501a20 100644 --- a/livekit-api/src/services/agent_dispatch.rs +++ b/livekit-api/src/services/agent_dispatch.rs @@ -48,6 +48,12 @@ impl AgentDispatchClient { self } + /// Overrides the default per-request timeout (10s) for calls on this client. + pub fn with_request_timeout(mut self, timeout: std::time::Duration) -> Self { + self.client = self.client.with_request_timeout(timeout); + self + } + /// Creates an explicit dispatch for an agent to join a room. /// /// To use explicit dispatch, your agent must be registered with an `agent_name`. diff --git a/livekit-api/src/services/api_test.rs b/livekit-api/src/services/api_test.rs index 36d52fa9f..b86ccfde6 100644 --- a/livekit-api/src/services/api_test.rs +++ b/livekit-api/src/services/api_test.rs @@ -178,3 +178,34 @@ async fn disabled() { .await .expect_err("disabled failover should not retry"); } + +// Pure unit test (no mock server): cloud-gating and the thundering-herd guard. +#[test] +fn attempts_gating_and_timeout_guard() { + use super::failover::{DEFAULT_REQUEST_TIMEOUT, MAX_ATTEMPTS, MIN_FAILOVER_TIMEOUT}; + + let cloud = "myproject.livekit.cloud"; + let ok = DEFAULT_REQUEST_TIMEOUT; // comfortably above the guard threshold + + // Enabled (the default): only *.livekit.cloud project domains fail over. + assert_eq!(config(true, false).attempts(Some(cloud), ok), MAX_ATTEMPTS); + assert_eq!( + config(true, false).attempts(Some("myproject.region.livekit.cloud"), ok), + MAX_ATTEMPTS + ); + assert_eq!(config(true, false).attempts(Some("myproject.livekit.io"), ok), 1); + assert_eq!(config(true, false).attempts(Some("example.com"), ok), 1); + assert_eq!(config(true, false).attempts(Some("127.0.0.1"), ok), 1); + assert_eq!(config(true, false).attempts(Some("notlivekit.cloud"), ok), 1); + + // force bypasses the cloud-host check; disabled never fails over. + assert_eq!(config(true, true).attempts(Some("127.0.0.1"), ok), MAX_ATTEMPTS); + assert_eq!(config(false, true).attempts(Some(cloud), ok), 1); + assert_eq!(config(false, false).attempts(Some(cloud), ok), 1); + + // Thundering-herd guard: a sub-threshold per-attempt timeout collapses to a + // single attempt even on a cloud host; exactly the threshold still fails over. + let below = MIN_FAILOVER_TIMEOUT - Duration::from_millis(1); + assert_eq!(config(true, true).attempts(Some(cloud), below), 1); + assert_eq!(config(true, true).attempts(Some(cloud), MIN_FAILOVER_TIMEOUT), MAX_ATTEMPTS); +} diff --git a/livekit-api/src/services/connector.rs b/livekit-api/src/services/connector.rs index 4a7535146..294f4f796 100644 --- a/livekit-api/src/services/connector.rs +++ b/livekit-api/src/services/connector.rs @@ -105,6 +105,12 @@ impl ConnectorClient { self } + /// Overrides the default per-request timeout (10s) for calls on this client. + pub fn with_request_timeout(mut self, timeout: std::time::Duration) -> Self { + self.client = self.client.with_request_timeout(timeout); + self + } + /// Dials a WhatsApp call /// /// # Arguments diff --git a/livekit-api/src/services/egress.rs b/livekit-api/src/services/egress.rs index ccd1b9114..9834b0f2a 100644 --- a/livekit-api/src/services/egress.rs +++ b/livekit-api/src/services/egress.rs @@ -128,6 +128,12 @@ impl EgressClient { self } + /// Overrides the default per-request timeout (10s) for calls on this client. + pub fn with_request_timeout(mut self, timeout: std::time::Duration) -> Self { + self.client = self.client.with_request_timeout(timeout); + self + } + pub async fn start_room_composite_egress( &self, room: &str, diff --git a/livekit-api/src/services/failover.rs b/livekit-api/src/services/failover.rs index 518069bf6..906fa4719 100644 --- a/livekit-api/src/services/failover.rs +++ b/livekit-api/src/services/failover.rs @@ -36,6 +36,15 @@ use crate::region::{is_cloud_host, parse_max_age, RegionsResponse}; pub(crate) const MAX_ATTEMPTS: u32 = 3; pub(crate) const BACKOFF_BASE: Duration = Duration::from_millis(200); +/// Default per-request timeout, applied to each attempt. Calls that dial a +/// phone (see [`crate::services::sip`]) override it with a longer budget. +pub(crate) const DEFAULT_REQUEST_TIMEOUT: Duration = Duration::from_secs(10); + +/// Below this per-request timeout a retry is unlikely to help, and many clients +/// would retry in lockstep across regions, so a short request gets a single +/// attempt (thundering-herd guard). +pub(crate) const MIN_FAILOVER_TIMEOUT: Duration = Duration::from_secs(5); + /// Internal region-failover configuration. The public API exposes only the /// `enabled` toggle (default true); `force` and `backoff_base` are test-only. #[derive(Debug, Clone, Copy)] @@ -55,14 +64,17 @@ impl Default for FailoverConfig { impl FailoverConfig { /// Total request attempts for a host; 1 means no failover. Failover only - /// engages when enabled and the host is a LiveKit Cloud domain. `force` + /// engages when enabled, the host is a LiveKit Cloud domain, and the + /// per-attempt `timeout` is long enough that retrying is worthwhile. `force` /// bypasses the cloud-host check and is for internal testing only. - pub(crate) fn attempts(&self, host: Option<&str>) -> u32 { - if self.enabled && (self.force || host.map(is_cloud_host).unwrap_or(false)) { - MAX_ATTEMPTS - } else { - 1 + pub(crate) fn attempts(&self, host: Option<&str>, timeout: Duration) -> u32 { + if !(self.enabled && (self.force || host.map(is_cloud_host).unwrap_or(false))) { + return 1; + } + if timeout < MIN_FAILOVER_TIMEOUT { + return 1; } + MAX_ATTEMPTS } } diff --git a/livekit-api/src/services/ingress.rs b/livekit-api/src/services/ingress.rs index e885e8da9..ecce4e1f9 100644 --- a/livekit-api/src/services/ingress.rs +++ b/livekit-api/src/services/ingress.rs @@ -79,6 +79,12 @@ impl IngressClient { self } + /// Overrides the default per-request timeout (10s) for calls on this client. + pub fn with_request_timeout(mut self, timeout: std::time::Duration) -> Self { + self.client = self.client.with_request_timeout(timeout); + self + } + pub async fn create_ingress( &self, input_type: proto::IngressInput, diff --git a/livekit-api/src/services/room.rs b/livekit-api/src/services/room.rs index 28fa48a28..e54fc5eb6 100644 --- a/livekit-api/src/services/room.rs +++ b/livekit-api/src/services/room.rs @@ -80,6 +80,12 @@ impl RoomClient { self } + /// Overrides the default per-request timeout (10s) for calls on this client. + pub fn with_request_timeout(mut self, timeout: std::time::Duration) -> Self { + self.client = self.client.with_request_timeout(timeout); + self + } + pub async fn create_room( &self, name: &str, diff --git a/livekit-api/src/services/sip.rs b/livekit-api/src/services/sip.rs index 83b25e443..85ac4afaf 100644 --- a/livekit-api/src/services/sip.rs +++ b/livekit-api/src/services/sip.rs @@ -24,6 +24,26 @@ use pbjson_types::Duration as ProtoDuration; const SVC: &str = "SIP"; +/// CreateSIPParticipant with `wait_until_answered` dials a phone and waits for +/// the answer, which takes longer than a normal request. +const SIP_DIAL_TIMEOUT: Duration = Duration::from_secs(30); + +/// A dialing request must outlast the ringing window, or it would abort before +/// the call can be answered. Keep the request timeout at least this far above +/// the ringing timeout. +const RINGING_TIMEOUT_MARGIN: Duration = Duration::from_secs(2); + +/// Request timeout for a phone-dialing call: the caller's `timeout` (or the dial +/// default) raised, when needed, to stay at least [`RINGING_TIMEOUT_MARGIN`] +/// above the ringing timeout. +fn sip_dial_timeout(timeout: Option, ringing_timeout: Option) -> Duration { + let mut effective = timeout.unwrap_or(SIP_DIAL_TIMEOUT); + if let Some(ringing) = ringing_timeout { + effective = effective.max(ringing + RINGING_TIMEOUT_MARGIN); + } + effective +} + #[derive(Debug)] pub struct SIPClient { base: ServiceBase, @@ -154,6 +174,17 @@ pub struct CreateSIPParticipantOptions { pub ringing_timeout: Option, pub max_call_duration: Option, pub enable_krisp: Option, + /// SIP headers sent as-is on the INVITE; may help the SIP endpoint identify + /// the call as coming from LiveKit. + pub headers: Option>, + /// Which SIP response headers to map to `sip.h.*` participant attributes. + pub include_headers: Option, + /// Media encryption policy for the call. + pub media_encryption: Option, + /// Per-request timeout override. Defaults to a longer value when + /// `wait_until_answered` is set (dialing takes time), otherwise the client + /// default. Raised, if needed, to stay above `ringing_timeout`. + pub timeout: Option, } impl SIPClient { @@ -176,6 +207,14 @@ impl SIPClient { self } + /// Overrides the default per-request timeout (10s) for calls on this client. + /// `create_sip_participant` can still override it per call via + /// [`CreateSIPParticipantOptions::timeout`]. + pub fn with_request_timeout(mut self, timeout: Duration) -> Self { + self.client = self.client.with_request_timeout(timeout); + self + } + fn duration_to_proto(d: Option) -> Option { d.map(|d| ProtoDuration { seconds: d.as_secs() as i64, nanos: d.subsec_nanos() as i32 }) } @@ -442,49 +481,61 @@ impl SIPClient { options: CreateSIPParticipantOptions, outbound_trunk_config: Option, ) -> ServiceResult { - self.client - .request( - SVC, - "CreateSIPParticipant", - proto::CreateSipParticipantRequest { - sip_trunk_id: sip_trunk_id.to_owned(), - trunk: outbound_trunk_config, - sip_call_to: call_to.to_owned(), - sip_number: options.sip_number.to_owned().unwrap_or_default(), - room_name: room_name.to_owned(), - participant_identity: options.participant_identity.to_owned(), - participant_name: options.participant_name.to_owned().unwrap_or_default(), - participant_metadata: options - .participant_metadata - .to_owned() - .unwrap_or_default(), - participant_attributes: options - .participant_attributes - .to_owned() - .unwrap_or_default(), - dtmf: options.dtmf.to_owned().unwrap_or_default(), - wait_until_answered: options.wait_until_answered.unwrap_or(false), - play_ringtone: options.play_dialtone.unwrap_or(false), - play_dialtone: options.play_dialtone.unwrap_or(false), - hide_phone_number: options.hide_phone_number.unwrap_or(false), - max_call_duration: Self::duration_to_proto(options.max_call_duration), - ringing_timeout: Self::duration_to_proto(options.ringing_timeout), - - // TODO: rename local proto as well - krisp_enabled: options.enable_krisp.unwrap_or(false), - - // TODO: support these attributes - headers: Default::default(), - include_headers: Default::default(), - media_encryption: Default::default(), - ..Default::default() - }, - self.base.auth_header( - Default::default(), - Some(SIPGrants { call: true, ..Default::default() }), - )?, - ) - .await - .map_err(Into::into) + let wait_until_answered = options.wait_until_answered.unwrap_or(false); + let user_timeout = options.timeout; + let ringing_timeout = options.ringing_timeout; + let request = proto::CreateSipParticipantRequest { + sip_trunk_id: sip_trunk_id.to_owned(), + trunk: outbound_trunk_config, + sip_call_to: call_to.to_owned(), + sip_number: options.sip_number.to_owned().unwrap_or_default(), + room_name: room_name.to_owned(), + participant_identity: options.participant_identity.to_owned(), + participant_name: options.participant_name.to_owned().unwrap_or_default(), + participant_metadata: options.participant_metadata.to_owned().unwrap_or_default(), + participant_attributes: options.participant_attributes.to_owned().unwrap_or_default(), + dtmf: options.dtmf.to_owned().unwrap_or_default(), + wait_until_answered, + play_ringtone: options.play_dialtone.unwrap_or(false), + play_dialtone: options.play_dialtone.unwrap_or(false), + hide_phone_number: options.hide_phone_number.unwrap_or(false), + max_call_duration: Self::duration_to_proto(options.max_call_duration), + ringing_timeout: Self::duration_to_proto(ringing_timeout), + krisp_enabled: options.enable_krisp.unwrap_or(false), + headers: options.headers.unwrap_or_default(), + include_headers: options.include_headers.map(|h| h as i32).unwrap_or_default(), + media_encryption: options.media_encryption.map(|e| e as i32).unwrap_or_default(), + ..Default::default() + }; + let headers = self.base.auth_header( + Default::default(), + Some(SIPGrants { call: true, ..Default::default() }), + )?; + + // A user-specified timeout wins; otherwise waiting for an answer dials a + // phone, which takes longer and must outlast ringing. Without waiting the + // request returns immediately, so the client default applies. + if wait_until_answered { + self.client + .request_with_timeout( + SVC, + "CreateSIPParticipant", + request, + headers, + sip_dial_timeout(user_timeout, ringing_timeout), + ) + .await + .map_err(Into::into) + } else if let Some(timeout) = user_timeout { + self.client + .request_with_timeout(SVC, "CreateSIPParticipant", request, headers, timeout) + .await + .map_err(Into::into) + } else { + self.client + .request(SVC, "CreateSIPParticipant", request, headers) + .await + .map_err(Into::into) + } } } diff --git a/livekit-api/src/services/twirp_client.rs b/livekit-api/src/services/twirp_client.rs index a3f1f4d06..954a97583 100644 --- a/livekit-api/src/services/twirp_client.rs +++ b/livekit-api/src/services/twirp_client.rs @@ -12,7 +12,7 @@ // See the License for the specific language governing permissions and // limitations under the License. -use std::fmt::Display; +use std::{fmt::Display, time::Duration}; use http::{ header::{HeaderMap, HeaderValue, CONTENT_TYPE}, @@ -85,6 +85,7 @@ pub struct TwirpClient { prefix: String, client: http_client::Client, failover: FailoverConfig, + request_timeout: Duration, } impl TwirpClient { @@ -95,6 +96,7 @@ impl TwirpClient { prefix: prefix.unwrap_or(DEFAULT_PREFIX).to_owned(), client: http_client::Client::new(), failover: FailoverConfig::default(), + request_timeout: failover::DEFAULT_REQUEST_TIMEOUT, } } @@ -105,6 +107,13 @@ impl TwirpClient { self } + /// Overrides the default per-attempt request timeout (10s) applied to calls + /// that don't pass their own. Each failover attempt gets the full budget. + pub fn with_request_timeout(mut self, timeout: Duration) -> Self { + self.request_timeout = timeout; + self + } + /// Overrides the full failover configuration, including the internal /// test-only `force` and `backoff_base` knobs. #[cfg(test)] @@ -119,11 +128,24 @@ impl TwirpClient { /// against the next untried region, with exponential backoff. A 4xx is /// returned immediately. pub async fn request( + &self, + service: &str, + method: &str, + data: D, + headers: HeaderMap, + ) -> TwirpResult { + self.request_with_timeout(service, method, data, headers, self.request_timeout).await + } + + /// Like [`request`](Self::request) but with an explicit per-attempt timeout, + /// for calls (e.g. SIP dialing) that need a longer budget than the default. + pub async fn request_with_timeout( &self, service: &str, method: &str, data: D, mut headers: HeaderMap, + timeout: Duration, ) -> TwirpResult { let original = Url::parse(&self.host)?; let path = format!("{}/{}.{}/{}", self.prefix, self.pkg, service, method); @@ -131,7 +153,7 @@ impl TwirpClient { headers.insert(CONTENT_TYPE, HeaderValue::from_static("application/protobuf")); let body = data.encode_to_vec(); - let max_attempts = self.failover.attempts(original.host_str()); + let max_attempts = self.failover.attempts(original.host_str(), timeout); let mut attempted = vec![failover::host_key(&original)]; let mut region_urls: Option> = None; let mut current = original.clone(); @@ -141,8 +163,14 @@ impl TwirpClient { let mut url = current.clone(); url.set_path(&path); - let send = - self.client.post(url).headers(headers.clone()).body(body.clone()).send().await; + let send = self + .client + .post(url) + .headers(headers.clone()) + .body(body.clone()) + .timeout(timeout) + .send() + .await; match send { Ok(resp) => { let status = resp.status(); From 7cd47e1544ebfc6ff5c21a02c0834f97701c033a Mon Sep 17 00:00:00 2001 From: David Zhao Date: Wed, 1 Jul 2026 10:21:47 +0200 Subject: [PATCH 07/13] handle connectors timeout correctly too --- livekit-api/src/services/connector.rs | 91 ++++++++++++++++-------- livekit-api/src/services/dial_timeout.rs | 42 +++++++++++ livekit-api/src/services/mod.rs | 1 + livekit-api/src/services/sip.rs | 23 +----- 4 files changed, 108 insertions(+), 49 deletions(-) create mode 100644 livekit-api/src/services/dial_timeout.rs diff --git a/livekit-api/src/services/connector.rs b/livekit-api/src/services/connector.rs index 294f4f796..bfdc47bea 100644 --- a/livekit-api/src/services/connector.rs +++ b/livekit-api/src/services/connector.rs @@ -13,9 +13,12 @@ // limitations under the License. use livekit_protocol as proto; +use pbjson_types::Duration as ProtoDuration; use std::collections::HashMap; +use std::time::Duration; use super::{ServiceBase, ServiceResult, LIVEKIT_PACKAGE}; +use crate::services::dial_timeout::dial_timeout; use crate::{access_token::VideoGrants, get_env_keys, services::twirp_client::TwirpClient}; const SVC: &str = "Connector"; @@ -60,6 +63,15 @@ pub struct AcceptWhatsAppCallOptions { pub participant_attributes: Option>, /// Optional - Country where the call terminates as ISO 3166-1 alpha-2 pub destination_country: Option, + /// Optional - Max time for the callee to answer the call + pub ringing_timeout: Option, + /// Optional - Wait for the call to be answered before returning. When set, + /// the request blocks until the call is answered or fails. + pub wait_until_answered: Option, + /// Optional - Per-request timeout override. When `wait_until_answered` is + /// set, defaults to a longer value (dialing takes time) and is raised, if + /// needed, to stay above `ringing_timeout`; otherwise the client default. + pub timeout: Option, } /// Options for connecting a Twilio call @@ -236,34 +248,57 @@ impl ConnectorClient { sdp: proto::SessionDescription, options: AcceptWhatsAppCallOptions, ) -> ServiceResult { - self.client - .request( - SVC, - "AcceptWhatsAppCall", - proto::AcceptWhatsAppCallRequest { - whatsapp_phone_number_id: phone_number_id.into(), - whatsapp_api_key: api_key.into(), - whatsapp_cloud_api_version: cloud_api_version.into(), - whatsapp_call_id: call_id.into(), - whatsapp_biz_opaque_callback_data: options - .biz_opaque_callback_data - .unwrap_or_default(), - sdp: Some(sdp), - room_name: options.room_name.unwrap_or_default(), - agents: options.agents.unwrap_or_default(), - participant_identity: options.participant_identity.unwrap_or_default(), - participant_name: options.participant_name.unwrap_or_default(), - participant_metadata: options.participant_metadata.unwrap_or_default(), - participant_attributes: options.participant_attributes.unwrap_or_default(), - destination_country: options.destination_country.unwrap_or_default(), - ringing_timeout: Default::default(), - wait_until_answered: Default::default(), - }, - self.base - .auth_header(VideoGrants { room_create: true, ..Default::default() }, None)?, - ) - .await - .map_err(Into::into) + let wait_until_answered = options.wait_until_answered.unwrap_or(false); + let user_timeout = options.timeout; + let ringing_timeout = options.ringing_timeout; + let request = proto::AcceptWhatsAppCallRequest { + whatsapp_phone_number_id: phone_number_id.into(), + whatsapp_api_key: api_key.into(), + whatsapp_cloud_api_version: cloud_api_version.into(), + whatsapp_call_id: call_id.into(), + whatsapp_biz_opaque_callback_data: options.biz_opaque_callback_data.unwrap_or_default(), + sdp: Some(sdp), + room_name: options.room_name.unwrap_or_default(), + agents: options.agents.unwrap_or_default(), + participant_identity: options.participant_identity.unwrap_or_default(), + participant_name: options.participant_name.unwrap_or_default(), + participant_metadata: options.participant_metadata.unwrap_or_default(), + participant_attributes: options.participant_attributes.unwrap_or_default(), + destination_country: options.destination_country.unwrap_or_default(), + ringing_timeout: ringing_timeout.map(|d| ProtoDuration { + seconds: d.as_secs() as i64, + nanos: d.subsec_nanos() as i32, + }), + wait_until_answered, + }; + let headers = + self.base.auth_header(VideoGrants { room_create: true, ..Default::default() }, None)?; + + // Waiting for the call to be answered dials a phone, which takes longer + // than a normal request and must outlast ringing. Without waiting the + // request returns promptly, so the client default applies. + if wait_until_answered { + self.client + .request_with_timeout( + SVC, + "AcceptWhatsAppCall", + request, + headers, + dial_timeout(user_timeout, ringing_timeout), + ) + .await + .map_err(Into::into) + } else if let Some(timeout) = user_timeout { + self.client + .request_with_timeout(SVC, "AcceptWhatsAppCall", request, headers, timeout) + .await + .map_err(Into::into) + } else { + self.client + .request(SVC, "AcceptWhatsAppCall", request, headers) + .await + .map_err(Into::into) + } } /// Connects a Twilio call diff --git a/livekit-api/src/services/dial_timeout.rs b/livekit-api/src/services/dial_timeout.rs new file mode 100644 index 000000000..e10121b4f --- /dev/null +++ b/livekit-api/src/services/dial_timeout.rs @@ -0,0 +1,42 @@ +// Copyright 2026 LiveKit, Inc. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +//! Request-timeout handling shared by calls that dial a phone and wait for an +//! answer (SIP CreateSIPParticipant/TransferSIPParticipant, WhatsApp +//! AcceptWhatsAppCall). These take longer than a normal request, and the request +//! must outlast ringing or it would abort before the call can be answered. + +use std::time::Duration; + +/// Default per-request timeout for a call that dials a phone and waits. +pub(crate) const DIAL_TIMEOUT: Duration = Duration::from_secs(30); + +/// A dialing request must outlast the ringing window, or it would abort before +/// the call can be answered. Keep the request timeout at least this far above +/// the ringing timeout. +pub(crate) const RINGING_TIMEOUT_MARGIN: Duration = Duration::from_secs(2); + +/// Request timeout for a phone-dialing call: the caller's `timeout` (or the dial +/// default) raised, when needed, to stay at least [`RINGING_TIMEOUT_MARGIN`] +/// above the ringing timeout. +pub(crate) fn dial_timeout( + timeout: Option, + ringing_timeout: Option, +) -> Duration { + let mut effective = timeout.unwrap_or(DIAL_TIMEOUT); + if let Some(ringing) = ringing_timeout { + effective = effective.max(ringing + RINGING_TIMEOUT_MARGIN); + } + effective +} diff --git a/livekit-api/src/services/mod.rs b/livekit-api/src/services/mod.rs index 257ac05b2..6ef262543 100644 --- a/livekit-api/src/services/mod.rs +++ b/livekit-api/src/services/mod.rs @@ -28,6 +28,7 @@ pub mod ingress; pub mod room; pub mod sip; +mod dial_timeout; mod failover; mod twirp_client; diff --git a/livekit-api/src/services/sip.rs b/livekit-api/src/services/sip.rs index 85ac4afaf..7a79b2cc6 100644 --- a/livekit-api/src/services/sip.rs +++ b/livekit-api/src/services/sip.rs @@ -18,32 +18,13 @@ use std::time::Duration; use crate::access_token::SIPGrants; use crate::get_env_keys; +use crate::services::dial_timeout::dial_timeout; use crate::services::twirp_client::TwirpClient; use crate::services::{ServiceBase, ServiceResult, LIVEKIT_PACKAGE}; use pbjson_types::Duration as ProtoDuration; const SVC: &str = "SIP"; -/// CreateSIPParticipant with `wait_until_answered` dials a phone and waits for -/// the answer, which takes longer than a normal request. -const SIP_DIAL_TIMEOUT: Duration = Duration::from_secs(30); - -/// A dialing request must outlast the ringing window, or it would abort before -/// the call can be answered. Keep the request timeout at least this far above -/// the ringing timeout. -const RINGING_TIMEOUT_MARGIN: Duration = Duration::from_secs(2); - -/// Request timeout for a phone-dialing call: the caller's `timeout` (or the dial -/// default) raised, when needed, to stay at least [`RINGING_TIMEOUT_MARGIN`] -/// above the ringing timeout. -fn sip_dial_timeout(timeout: Option, ringing_timeout: Option) -> Duration { - let mut effective = timeout.unwrap_or(SIP_DIAL_TIMEOUT); - if let Some(ringing) = ringing_timeout { - effective = effective.max(ringing + RINGING_TIMEOUT_MARGIN); - } - effective -} - #[derive(Debug)] pub struct SIPClient { base: ServiceBase, @@ -522,7 +503,7 @@ impl SIPClient { "CreateSIPParticipant", request, headers, - sip_dial_timeout(user_timeout, ringing_timeout), + dial_timeout(user_timeout, ringing_timeout), ) .await .map_err(Into::into) From 3ab7b08c39afd6ed9faa790ac13b3460307b90c2 Mon Sep 17 00:00:00 2001 From: David Zhao Date: Wed, 1 Jul 2026 10:55:31 +0200 Subject: [PATCH 08/13] set default ringingTimeout --- livekit-api/src/services/connector.rs | 7 +++++-- livekit-api/src/services/dial_timeout.rs | 20 ++++++++++---------- livekit-api/src/services/sip.rs | 7 +++++-- 3 files changed, 20 insertions(+), 14 deletions(-) diff --git a/livekit-api/src/services/connector.rs b/livekit-api/src/services/connector.rs index bfdc47bea..e2400e91c 100644 --- a/livekit-api/src/services/connector.rs +++ b/livekit-api/src/services/connector.rs @@ -18,7 +18,7 @@ use std::collections::HashMap; use std::time::Duration; use super::{ServiceBase, ServiceResult, LIVEKIT_PACKAGE}; -use crate::services::dial_timeout::dial_timeout; +use crate::services::dial_timeout::{dial_timeout, DEFAULT_RINGING_TIMEOUT}; use crate::{access_token::VideoGrants, get_env_keys, services::twirp_client::TwirpClient}; const SVC: &str = "Connector"; @@ -250,7 +250,10 @@ impl ConnectorClient { ) -> ServiceResult { let wait_until_answered = options.wait_until_answered.unwrap_or(false); let user_timeout = options.timeout; - let ringing_timeout = options.ringing_timeout; + // When waiting for an answer, pin the ring window explicitly so our request + // timeout doesn't depend on the server's default (which could change). + let ringing_timeout = + options.ringing_timeout.or(wait_until_answered.then_some(DEFAULT_RINGING_TIMEOUT)); let request = proto::AcceptWhatsAppCallRequest { whatsapp_phone_number_id: phone_number_id.into(), whatsapp_api_key: api_key.into(), diff --git a/livekit-api/src/services/dial_timeout.rs b/livekit-api/src/services/dial_timeout.rs index e10121b4f..2665b5dbb 100644 --- a/livekit-api/src/services/dial_timeout.rs +++ b/livekit-api/src/services/dial_timeout.rs @@ -19,24 +19,24 @@ use std::time::Duration; -/// Default per-request timeout for a call that dials a phone and waits. -pub(crate) const DIAL_TIMEOUT: Duration = Duration::from_secs(30); +/// Ring window assumed when a request doesn't set a ringing timeout; matches the +/// server default. A dialing request must outlast it. +pub(crate) const DEFAULT_RINGING_TIMEOUT: Duration = Duration::from_secs(30); /// A dialing request must outlast the ringing window, or it would abort before /// the call can be answered. Keep the request timeout at least this far above /// the ringing timeout. pub(crate) const RINGING_TIMEOUT_MARGIN: Duration = Duration::from_secs(2); -/// Request timeout for a phone-dialing call: the caller's `timeout` (or the dial -/// default) raised, when needed, to stay at least [`RINGING_TIMEOUT_MARGIN`] -/// above the ringing timeout. +/// Request timeout for a phone-dialing call: the ring window plus a margin, so +/// the request doesn't abort before the call can be answered. The ring window is +/// `ringing_timeout` when set, else [`DEFAULT_RINGING_TIMEOUT`]. A longer caller +/// `timeout` is honored; a shorter one is raised to the floor. pub(crate) fn dial_timeout( timeout: Option, ringing_timeout: Option, ) -> Duration { - let mut effective = timeout.unwrap_or(DIAL_TIMEOUT); - if let Some(ringing) = ringing_timeout { - effective = effective.max(ringing + RINGING_TIMEOUT_MARGIN); - } - effective + let ring = ringing_timeout.unwrap_or(DEFAULT_RINGING_TIMEOUT); + let floor = ring + RINGING_TIMEOUT_MARGIN; + timeout.unwrap_or(floor).max(floor) } diff --git a/livekit-api/src/services/sip.rs b/livekit-api/src/services/sip.rs index 7a79b2cc6..f41baf861 100644 --- a/livekit-api/src/services/sip.rs +++ b/livekit-api/src/services/sip.rs @@ -18,7 +18,7 @@ use std::time::Duration; use crate::access_token::SIPGrants; use crate::get_env_keys; -use crate::services::dial_timeout::dial_timeout; +use crate::services::dial_timeout::{dial_timeout, DEFAULT_RINGING_TIMEOUT}; use crate::services::twirp_client::TwirpClient; use crate::services::{ServiceBase, ServiceResult, LIVEKIT_PACKAGE}; use pbjson_types::Duration as ProtoDuration; @@ -464,7 +464,10 @@ impl SIPClient { ) -> ServiceResult { let wait_until_answered = options.wait_until_answered.unwrap_or(false); let user_timeout = options.timeout; - let ringing_timeout = options.ringing_timeout; + // When waiting for an answer, pin the ring window explicitly so our request + // timeout doesn't depend on the server's default (which could change). + let ringing_timeout = + options.ringing_timeout.or(wait_until_answered.then_some(DEFAULT_RINGING_TIMEOUT)); let request = proto::CreateSipParticipantRequest { sip_trunk_id: sip_trunk_id.to_owned(), trunk: outbound_trunk_config, From 056913f103476c3f67ed11ce856d8c3e2e1994e3 Mon Sep 17 00:00:00 2001 From: David Zhao Date: Wed, 1 Jul 2026 16:16:51 +0200 Subject: [PATCH 09/13] correctly handle whatsapp dial/accept flow --- livekit-api/src/services/connector.rs | 46 +++++++++++---------------- 1 file changed, 19 insertions(+), 27 deletions(-) diff --git a/livekit-api/src/services/connector.rs b/livekit-api/src/services/connector.rs index e2400e91c..84b2ec4a5 100644 --- a/livekit-api/src/services/connector.rs +++ b/livekit-api/src/services/connector.rs @@ -18,7 +18,7 @@ use std::collections::HashMap; use std::time::Duration; use super::{ServiceBase, ServiceResult, LIVEKIT_PACKAGE}; -use crate::services::dial_timeout::{dial_timeout, DEFAULT_RINGING_TIMEOUT}; +use crate::services::dial_timeout::DEFAULT_RINGING_TIMEOUT; use crate::{access_token::VideoGrants, get_env_keys, services::twirp_client::TwirpClient}; const SVC: &str = "Connector"; @@ -249,11 +249,6 @@ impl ConnectorClient { options: AcceptWhatsAppCallOptions, ) -> ServiceResult { let wait_until_answered = options.wait_until_answered.unwrap_or(false); - let user_timeout = options.timeout; - // When waiting for an answer, pin the ring window explicitly so our request - // timeout doesn't depend on the server's default (which could change). - let ringing_timeout = - options.ringing_timeout.or(wait_until_answered.then_some(DEFAULT_RINGING_TIMEOUT)); let request = proto::AcceptWhatsAppCallRequest { whatsapp_phone_number_id: phone_number_id.into(), whatsapp_api_key: api_key.into(), @@ -268,7 +263,7 @@ impl ConnectorClient { participant_metadata: options.participant_metadata.unwrap_or_default(), participant_attributes: options.participant_attributes.unwrap_or_default(), destination_country: options.destination_country.unwrap_or_default(), - ringing_timeout: ringing_timeout.map(|d| ProtoDuration { + ringing_timeout: options.ringing_timeout.map(|d| ProtoDuration { seconds: d.as_secs() as i64, nanos: d.subsec_nanos() as i32, }), @@ -277,30 +272,27 @@ impl ConnectorClient { let headers = self.base.auth_header(VideoGrants { room_create: true, ..Default::default() }, None)?; - // Waiting for the call to be answered dials a phone, which takes longer - // than a normal request and must outlast ringing. Without waiting the - // request returns promptly, so the client default applies. - if wait_until_answered { - self.client - .request_with_timeout( - SVC, - "AcceptWhatsAppCall", - request, - headers, - dial_timeout(user_timeout, ringing_timeout), - ) - .await - .map_err(Into::into) - } else if let Some(timeout) = user_timeout { - self.client + // Accept can block until the call is answered, so default the request + // timeout to the standard ring window; the caller overrides via + // `options.timeout` and should set it above the ringing_timeout passed to + // `dial_whatsapp_call`. Without waiting, the request returns promptly and + // the client default applies. + let timeout = if wait_until_answered { + Some(options.timeout.unwrap_or(DEFAULT_RINGING_TIMEOUT)) + } else { + options.timeout + }; + match timeout { + Some(timeout) => self + .client .request_with_timeout(SVC, "AcceptWhatsAppCall", request, headers, timeout) .await - .map_err(Into::into) - } else { - self.client + .map_err(Into::into), + None => self + .client .request(SVC, "AcceptWhatsAppCall", request, headers) .await - .map_err(Into::into) + .map_err(Into::into), } } From 2dd47a9e2310d8f4f518c52f085be8b2542a82c9 Mon Sep 17 00:00:00 2001 From: David Zhao Date: Thu, 2 Jul 2026 15:39:45 +0200 Subject: [PATCH 10/13] address comment --- livekit-api/src/services/twirp_client.rs | 9 +++++---- 1 file changed, 5 insertions(+), 4 deletions(-) diff --git a/livekit-api/src/services/twirp_client.rs b/livekit-api/src/services/twirp_client.rs index 954a97583..f028529eb 100644 --- a/livekit-api/src/services/twirp_client.rs +++ b/livekit-api/src/services/twirp_client.rs @@ -240,9 +240,10 @@ impl TwirpClient { region_urls: &mut Option>, attempted: &[String], ) -> Option { - if region_urls.is_none() { - *region_urls = Some(failover::region_urls(original, forward).await); - } - failover::pick_next(region_urls.as_ref().unwrap(), attempted) + let region_urls = match region_urls { + Some(urls) => urls, + None => region_urls.insert(failover::region_urls(original, forward).await), + }; + failover::pick_next(region_urls, attempted) } } From 1075be094f857e16f6be32b49e5b996b84d5f761 Mon Sep 17 00:00:00 2001 From: David Zhao Date: Thu, 2 Jul 2026 18:52:36 +0200 Subject: [PATCH 11/13] use common cache implementation --- Cargo.lock | 11 +- livekit-api/Cargo.toml | 4 +- livekit-api/src/region.rs | 203 ++++++++++++++- livekit-api/src/services/failover.rs | 87 +++---- .../src/signal_client/region_url_provider.rs | 239 +++--------------- 5 files changed, 278 insertions(+), 266 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index 9e716b92d..f0dc4eef1 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -2516,6 +2516,12 @@ version = "0.3.32" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "037711b3d59c33004d3856fbdc83b99d4ff37a24768fa1be9ce3538a1cde4393" +[[package]] +name = "futures-timer" +version = "3.0.4" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "af43fadb8a98512d547e37b4e92e0ced13e205c061b87b4623eff01d918d6968" + [[package]] name = "futures-util" version = "0.3.32" @@ -3856,6 +3862,7 @@ dependencies = [ "bytes", "device-info", "flate2", + "futures-timer", "futures-util", "hmac", "http 1.4.0", @@ -5648,7 +5655,7 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "22505a5c94da8e3b7c2996394d1c933236c4d743e81a410bcca4e6989fc066a4" dependencies = [ "bytes", - "heck 0.4.1", + "heck 0.5.0", "itertools 0.12.1", "log", "multimap", @@ -5668,7 +5675,7 @@ version = "0.14.3" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "343d3bd7056eda839b03204e68deff7d1b13aba7af2b2fd16890697274262ee7" dependencies = [ - "heck 0.4.1", + "heck 0.5.0", "itertools 0.14.0", "log", "multimap", diff --git a/livekit-api/Cargo.toml b/livekit-api/Cargo.toml index 4487e4d76..8afd55fc7 100644 --- a/livekit-api/Cargo.toml +++ b/livekit-api/Cargo.toml @@ -46,7 +46,7 @@ __signal-client-async-compatible = [ services-tokio = ["dep:reqwest", "dep:tokio", "tokio/time"] -services-async = ["dep:isahc"] +services-async = ["dep:isahc", "dep:futures-timer"] access-token = ["dep:jsonwebtoken", "dep:hmac", "dep:signature"] webhooks = ["access-token", "dep:serde_json", "dep:base64"] @@ -134,6 +134,8 @@ bytes = { workspace = true, optional = true } http = "1.1" reqwest = { version = "0.12", default-features = false, features = [ "json" ], optional = true } isahc = { version = "1.7.2", default-features = false, features = [ "json", "text-decoding" ], optional = true } +# Runtime-agnostic timer for failover backoff on the non-tokio (isahc) backend. +futures-timer = { version = "3", optional = true } flate2 = { version = "1", optional = true } scopeguard = "1.2.0" diff --git a/livekit-api/src/region.rs b/livekit-api/src/region.rs index 9d32412af..e0e9848c2 100644 --- a/livekit-api/src/region.rs +++ b/livekit-api/src/region.rs @@ -12,18 +12,23 @@ // See the License for the specific language governing permissions and // limitations under the License. -//! Region-discovery primitives shared by the two `/settings/regions` consumers: -//! the signaling region provider ([`crate::signal_client::region_url_provider`]) and the API -//! failover region cache ([`crate::services::failover`]). +//! Region discovery shared by the two `/settings/regions` consumers: the +//! signaling region provider ([`crate::signal_client::region_url_provider`]) and +//! the API failover ([`crate::services::failover`]). //! -//! Only the feature-independent pieces live here. The caches themselves are -//! deliberately separate: the signaling path keeps `wss://` URLs, de-duplicates -//! in-flight fetches and prunes failed regions, while the API path rewrites URLs -//! to `http(s)` and forwards the caller's request headers. The two are also in -//! independently compiled feature islands (the API SDK builds without the signal -//! client), so neither can depend on the other. +//! This holds the runtime-independent pieces both need: the response types, the +//! cloud-host / `Cache-Control` helpers, and the [`RegionCache`] (TTL lookups, +//! stale fallback, pruning). Each consumer owns its own cache instance — they +//! store URLs in different schemes (`wss://` for signaling, `http(s)` for the API) +//! — but the caching logic lives here once. The network fetch stays with each +//! consumer: they authenticate differently (bearer token vs. forwarded headers) +//! and run on different HTTP clients / feature islands. -use std::time::Duration; +use std::{ + collections::HashMap, + sync::Mutex, + time::{Duration, Instant}, +}; use serde::Deserialize; @@ -59,6 +64,94 @@ pub(crate) fn parse_max_age(cache_control: &str) -> Option { }) } +struct CachedRegions { + urls: Vec, + fetched_at: Instant, + /// Effective lifetime of this entry: the server's `Cache-Control: max-age` + /// when present, otherwise [`RegionCache::DEFAULT_TTL`]. + ttl: Duration, +} + +/// Outcome of a [`RegionCache::get`] lookup. +pub(crate) enum Cached { + /// Entry exists and is within the TTL — safe to use without re-fetching. + Fresh(Vec), + /// Entry exists but is older than the TTL — the caller should re-fetch, but + /// may fall back to these URLs if the re-fetch fails. + Stale(Vec), + /// No entry for this host. + Miss, +} + +/// Process-wide region-list cache keyed by host. Each consumer owns its own +/// instance (see the module docs); this type holds the shared caching logic: +/// TTL derived from the server's `Cache-Control: max-age`, fresh/stale/miss +/// lookups (stale entries are retained for fallback), and pruning of failed +/// regions. +pub(crate) struct RegionCache { + entries: Mutex>, + default_ttl: Duration, +} + +impl RegionCache { + /// Fallback entry lifetime, used when the region response carries no + /// `Cache-Control: max-age`. Matches client-sdk-js's `DEFAULT_MAX_AGE_MS`. + pub(crate) const DEFAULT_TTL: Duration = Duration::from_secs(5); + + pub(crate) fn new(default_ttl: Duration) -> Self { + Self { entries: Mutex::new(HashMap::new()), default_ttl } + } + + /// Looks up the cached region URLs for `host`, reporting whether the entry is + /// fresh (within its TTL), stale, or absent. + pub(crate) fn get(&self, host: &str) -> Cached { + let entries = self.entries.lock().unwrap(); + match entries.get(host) { + Some(e) if e.fetched_at.elapsed() < e.ttl => Cached::Fresh(e.urls.clone()), + Some(e) => Cached::Stale(e.urls.clone()), + None => Cached::Miss, + } + } + + /// Stores `urls` for `host`, honouring the server's `Cache-Control: max-age` + /// (`max_age`) as the entry's TTL and falling back to [`Self::DEFAULT_TTL`] + /// when the header is absent. + pub(crate) fn insert(&self, host: String, urls: Vec, max_age: Option) { + let ttl = max_age.unwrap_or(self.default_ttl); + self.entries + .lock() + .unwrap() + .insert(host, CachedRegions { urls, fetched_at: Instant::now(), ttl }); + } + + /// Removes `failed_url` from the cached list for `host` so it is not handed + /// out again. If that empties the list, the entry is dropped entirely, + /// forcing a re-fetch on the next lookup. + // Only the signaling consumer prunes individual failed regions. + #[allow(dead_code)] + pub(crate) fn mark_failed(&self, host: &str, failed_url: &str) { + let mut entries = self.entries.lock().unwrap(); + if let Some(entry) = entries.get_mut(host) { + entry.urls.retain(|u| u != failed_url); + if entry.urls.is_empty() { + entries.remove(host); + } + } + } + + /// Drops the cached entry for `host`, forcing a re-fetch on the next lookup. + #[allow(dead_code)] + pub(crate) fn invalidate(&self, host: &str) { + self.entries.lock().unwrap().remove(host); + } + + /// Drops every cached entry. + #[allow(dead_code)] + pub(crate) fn clear(&self) { + self.entries.lock().unwrap().clear(); + } +} + #[cfg(test)] mod tests { use super::*; @@ -83,4 +176,94 @@ mod tests { assert_eq!(parse_max_age("max-age=notanumber"), None); assert_eq!(parse_max_age(""), None); } + + #[test] + fn region_cache_reports_fresh_stale_and_miss() { + let cache = RegionCache::new(RegionCache::DEFAULT_TTL); + + let host = "cache-fresh.livekit.cloud"; + assert!(matches!(cache.get(host), Cached::Miss), "unknown host is a miss"); + + let urls = vec!["wss://r1.livekit.cloud".to_string(), "wss://r2.livekit.cloud".to_string()]; + cache.insert(host.to_string(), urls.clone(), None); + assert!( + matches!(cache.get(host), Cached::Fresh(u) if u == urls), + "fresh entry is a fresh hit" + ); + + // An entry older than its TTL is reported as stale (retained for fallback). + let stale_host = "cache-stale.livekit.cloud"; + let stale_urls = vec!["wss://old.livekit.cloud".to_string()]; + if let Some(past) = Instant::now().checked_sub(RegionCache::DEFAULT_TTL * 2) { + cache.entries.lock().unwrap().insert( + stale_host.to_string(), + CachedRegions { + urls: stale_urls.clone(), + fetched_at: past, + ttl: RegionCache::DEFAULT_TTL, + }, + ); + assert!( + matches!(cache.get(stale_host), Cached::Stale(u) if u == stale_urls), + "expired entry is a stale hit" + ); + } + } + + #[test] + fn region_cache_honors_server_max_age() { + // A short max-age expires before the (longer) default TTL would, proving + // the server's Cache-Control wins over the default. + let cache = RegionCache::new(Duration::from_secs(3600)); + let host = "max-age.livekit.cloud"; + let urls = vec!["wss://r1.livekit.cloud".to_string()]; + + cache.insert(host.to_string(), urls.clone(), Some(Duration::ZERO)); + assert!( + matches!(cache.get(host), Cached::Stale(u) if u == urls), + "max-age=0 entry is immediately stale despite the long default TTL" + ); + } + + #[test] + fn region_cache_mark_failed_prunes_then_drops() { + let cache = RegionCache::new(RegionCache::DEFAULT_TTL); + let host = "mark-failed.livekit.cloud"; + let r1 = "wss://r1.livekit.cloud".to_string(); + let r2 = "wss://r2.livekit.cloud".to_string(); + cache.insert(host.to_string(), vec![r1.clone(), r2.clone()], None); + + // Pruning one URL keeps the entry with the survivors. + cache.mark_failed(host, &r1); + assert!( + matches!(cache.get(host), Cached::Fresh(u) if u == vec![r2.clone()]), + "failed URL is pruned, the rest remain" + ); + + // Removing the last URL drops the entry entirely, forcing a re-fetch. + cache.mark_failed(host, &r2); + assert!(matches!(cache.get(host), Cached::Miss), "emptied entry is dropped"); + + // Marking an unknown host is a no-op. + cache.mark_failed("unknown.livekit.cloud", &r1); + } + + #[test] + fn region_cache_invalidate_and_clear() { + let cache = RegionCache::new(RegionCache::DEFAULT_TTL); + let a = "a.livekit.cloud"; + let b = "b.livekit.cloud"; + let urls = vec!["wss://r.livekit.cloud".to_string()]; + cache.insert(a.to_string(), urls.clone(), None); + cache.insert(b.to_string(), urls.clone(), None); + + // invalidate drops only the targeted host. + cache.invalidate(a); + assert!(matches!(cache.get(a), Cached::Miss), "invalidated host is a miss"); + assert!(matches!(cache.get(b), Cached::Fresh(_)), "other host is untouched"); + + // clear drops everything. + cache.clear(); + assert!(matches!(cache.get(b), Cached::Miss), "clear removes all entries"); + } } diff --git a/livekit-api/src/services/failover.rs b/livekit-api/src/services/failover.rs index 906fa4719..b06056f17 100644 --- a/livekit-api/src/services/failover.rs +++ b/livekit-api/src/services/failover.rs @@ -19,16 +19,12 @@ //! replays the request against the next region, with exponential backoff. 4xx //! responses are returned immediately. See [`TwirpClient::request`]. -use std::{ - collections::HashMap, - sync::{Mutex, OnceLock}, - time::{Duration, Instant}, -}; +use std::{sync::OnceLock, time::Duration}; use http::header::{HeaderMap, CONTENT_LENGTH, CONTENT_TYPE}; use url::Url; -use crate::region::{is_cloud_host, parse_max_age, RegionsResponse}; +use crate::region::{is_cloud_host, parse_max_age, Cached, RegionCache, RegionsResponse}; /// Total attempts (the original request plus fallback regions) and the base /// retry backoff are fixed, not user-configurable, so retries can't be tuned to @@ -107,8 +103,9 @@ pub(crate) fn pick_next(region_urls: &[String], attempted: &[String]) -> Option< None } -/// Sleeps for `d` before a retry. Backoff is applied on the tokio backend; the -/// async backend retries without delay (the failover path is short). +/// Sleeps for `d` before a retry, so failover backs off between attempts on +/// either backend (retrying with no delay would risk hammering the server). +/// `futures-timer` keeps the async path runtime-agnostic. pub(crate) async fn backoff_sleep(d: Duration) { if d.is_zero() { return; @@ -117,9 +114,9 @@ pub(crate) async fn backoff_sleep(d: Duration) { { tokio::time::sleep(d).await; } - #[cfg(not(feature = "services-tokio"))] + #[cfg(all(feature = "services-async", not(feature = "services-tokio")))] { - let _ = d; + futures_timer::Delay::new(d).await; } } @@ -127,15 +124,11 @@ pub(crate) async fn backoff_sleep(d: Duration) { /// unreachable endpoint doesn't stall the failover path. const DISCOVERY_TIMEOUT: Duration = Duration::from_secs(2); -struct CacheEntry { - urls: Vec, - fetched_at: Instant, - ttl: Duration, -} - -fn cache() -> &'static Mutex> { - static CACHE: OnceLock>> = OnceLock::new(); - CACHE.get_or_init(|| Mutex::new(HashMap::new())) +/// Process-wide region cache for the API failover path. Owns the API instance of +/// the shared [`RegionCache`] (which stores `http(s)` URLs; see [`crate::region`]). +fn region_cache() -> &'static RegionCache { + static CACHE: OnceLock = OnceLock::new(); + CACHE.get_or_init(|| RegionCache::new(RegionCache::DEFAULT_TTL)) } /// Returns the alternative region URLs for `base`, fetching `/settings/regions` @@ -145,28 +138,26 @@ fn cache() -> &'static Mutex> { /// directives — reach the discovery endpoint. pub(crate) async fn region_urls(base: &Url, headers: &HeaderMap) -> Vec { let key = host_key(base); + let cache = region_cache(); - { - let c = cache().lock().unwrap(); - if let Some(entry) = c.get(&key) { - if entry.fetched_at.elapsed() < entry.ttl { - return entry.urls.clone(); - } - } - } + let stale = match cache.get(&key) { + Cached::Fresh(urls) => return urls, + Cached::Stale(urls) => Some(urls), + Cached::Miss => None, + }; match fetch(base, headers).await { - Ok((urls, ttl)) => { - // A zero TTL (e.g. Cache-Control: max-age=0) means "do not cache". - if !ttl.is_zero() { - cache().lock().unwrap().insert( - key, - CacheEntry { urls: urls.clone(), fetched_at: Instant::now(), ttl }, - ); + Ok((urls, max_age)) => { + // A zero max-age (e.g. `Cache-Control: max-age=0`) means "do not + // cache"; skip the insert so the next call re-fetches rather than + // serving a stale entry. An absent max-age uses the cache default. + if max_age != Some(Duration::ZERO) { + cache.insert(key, urls.clone(), max_age); } urls } - Err(()) => cache().lock().unwrap().get(&key).map(|e| e.urls.clone()).unwrap_or_default(), + // The fresh fetch failed; fall back to the stale entry if we have one. + Err(()) => stale.unwrap_or_default(), } } @@ -185,14 +176,14 @@ fn normalize(list: RegionsResponse) -> Vec { list.regions.into_iter().filter(|r| !r.url.is_empty()).map(|r| to_http_url(&r.url)).collect() } -/// Reads the cache TTL from a `Cache-Control` header value; absent or -/// unparseable means a zero TTL ("do not cache"). -fn ttl_from_cache_control(value: Option<&str>) -> Duration { - value.and_then(parse_max_age).unwrap_or(Duration::ZERO) +/// Reads the `Cache-Control: max-age` from a header value for use as the cache +/// TTL; `None` (absent or unparseable) falls back to the cache's default TTL. +fn max_age_from_cache_control(value: Option<&str>) -> Option { + value.and_then(parse_max_age) } #[cfg(feature = "services-tokio")] -async fn fetch(base: &Url, headers: &HeaderMap) -> Result<(Vec, Duration), ()> { +async fn fetch(base: &Url, headers: &HeaderMap) -> Result<(Vec, Option), ()> { let mut url = base.clone(); url.set_path("/settings/regions"); @@ -206,14 +197,15 @@ async fn fetch(base: &Url, headers: &HeaderMap) -> Result<(Vec, Duration if !resp.status().is_success() { return Err(()); } - let ttl = - ttl_from_cache_control(resp.headers().get("cache-control").and_then(|v| v.to_str().ok())); + let max_age = max_age_from_cache_control( + resp.headers().get("cache-control").and_then(|v| v.to_str().ok()), + ); let list: RegionsResponse = resp.json().await.map_err(|_| ())?; - Ok((normalize(list), ttl)) + Ok((normalize(list), max_age)) } #[cfg(all(feature = "services-async", not(feature = "services-tokio")))] -async fn fetch(base: &Url, headers: &HeaderMap) -> Result<(Vec, Duration), ()> { +async fn fetch(base: &Url, headers: &HeaderMap) -> Result<(Vec, Option), ()> { use isahc::config::Configurable; use isahc::AsyncReadResponseExt; @@ -229,8 +221,9 @@ async fn fetch(base: &Url, headers: &HeaderMap) -> Result<(Vec, Duration if !resp.status().is_success() { return Err(()); } - let ttl = - ttl_from_cache_control(resp.headers().get("cache-control").and_then(|v| v.to_str().ok())); + let max_age = max_age_from_cache_control( + resp.headers().get("cache-control").and_then(|v| v.to_str().ok()), + ); let list: RegionsResponse = resp.json().await.map_err(|_| ())?; - Ok((normalize(list), ttl)) + Ok((normalize(list), max_age)) } diff --git a/livekit-api/src/signal_client/region_url_provider.rs b/livekit-api/src/signal_client/region_url_provider.rs index d0b3e4835..c27515aa7 100644 --- a/livekit-api/src/signal_client/region_url_provider.rs +++ b/livekit-api/src/signal_client/region_url_provider.rs @@ -16,7 +16,7 @@ use std::{ collections::HashMap, error::Error as StdError, sync::{Arc, OnceLock}, - time::{Duration, Instant}, + time::Duration, }; use http::header::{HeaderMap, HeaderValue, AUTHORIZATION, CACHE_CONTROL}; @@ -24,114 +24,32 @@ use parking_lot::Mutex; use tokio::sync::Mutex as AsyncMutex; use crate::http_client; -use crate::region::{is_cloud_host, parse_max_age, RegionsResponse}; +use crate::region::{is_cloud_host, parse_max_age, Cached, RegionCache, RegionsResponse}; use super::{SignalError, SignalResult, REGION_FETCH_TIMEOUT}; -struct CachedRegions { - urls: Vec, - fetched_at: Instant, - /// Effective lifetime of this entry: the server's `Cache-Control: max-age` - /// when present, otherwise [`RegionCache::default_ttl`]. - ttl: Duration, +/// Process-wide region cache for the signaling path. Persisting it here (rather +/// than on a per-connection object) means it survives across reconnect attempts +/// — each of which rebuilds the SignalClient — so the reconnect loop does not +/// re-pay the region fetch on every attempt. The caching logic lives in +/// [`RegionCache`]; this owns the signaling instance (which stores `wss://` URLs). +fn region_cache() -> &'static RegionCache { + static CACHE: OnceLock = OnceLock::new(); + CACHE.get_or_init(|| RegionCache::new(RegionCache::DEFAULT_TTL)) } -/// Outcome of a [`RegionCache::get`] lookup. -enum Cached { - /// Entry exists and is within the TTL — safe to use without re-fetching. - Fresh(Vec), - /// Entry exists but is older than the TTL — the caller should re-fetch, but - /// may fall back to these URLs if the re-fetch fails. - Stale(Vec), - /// No entry for this host. - Miss, -} - -/// Process-wide region-list cache keyed by host, mirroring client-sdk-js's -/// static `RegionUrlProvider.cache`. Persisting it here (rather than on a -/// per-connection object) means it survives across reconnect attempts — each of -/// which rebuilds the SignalClient — so the reconnect loop does not re-pay the -/// region fetch on every attempt. -struct RegionCache { - entries: Mutex>, - /// Per-host locks that serialise in-flight fetches, so concurrent cache - /// misses for the same host collapse into a single network request rather - /// than each issuing their own (single-flight). - fetch_locks: Mutex>>>, - default_ttl: Duration, -} - -impl RegionCache { - /// Fallback entry lifetime, used when the server's region response carries - /// no `Cache-Control: max-age`. Matches client-sdk-js's `DEFAULT_MAX_AGE_MS`. - const DEFAULT_TTL: Duration = Duration::from_secs(5); - - fn shared() -> &'static RegionCache { - static CACHE: OnceLock = OnceLock::new(); - CACHE.get_or_init(|| Self::new(Self::DEFAULT_TTL)) - } - - fn new(default_ttl: Duration) -> Self { - Self { - entries: Mutex::new(HashMap::new()), - fetch_locks: Mutex::new(HashMap::new()), - default_ttl, - } - } - - /// Returns the per-host fetch lock for `host`, creating it on first use. - /// Held across the network request so only one fetch per host runs at a - /// time; callers that wait on it then pick up the result from the cache. - fn fetch_lock(&self, host: &str) -> Arc> { - self.fetch_locks - .lock() - .entry(host.to_string()) - .or_insert_with(|| Arc::new(AsyncMutex::new(()))) - .clone() - } - - /// Looks up the cached region URLs for `host`, reporting whether the entry - /// is fresh (within its TTL), stale, or absent. A stale entry is retained so - /// callers can fall back to it when a re-fetch fails. - fn get(&self, host: &str) -> Cached { - let entries = self.entries.lock(); - match entries.get(host) { - Some(e) if e.fetched_at.elapsed() < e.ttl => Cached::Fresh(e.urls.clone()), - Some(e) => Cached::Stale(e.urls.clone()), - None => Cached::Miss, - } - } - - /// Stores `urls` for `host`, honouring the server's `Cache-Control: max-age` - /// (`max_age`) as the entry's TTL and falling back to [`Self::default_ttl`] - /// when the header is absent. - fn insert(&self, host: String, urls: Vec, max_age: Option) { - let ttl = max_age.unwrap_or(self.default_ttl); - self.entries.lock().insert(host, CachedRegions { urls, fetched_at: Instant::now(), ttl }); - } - - /// Removes `failed_url` from the cached list for `host` so it is not handed - /// out again. If that empties the list, the entry is dropped entirely, - /// forcing a re-fetch on the next lookup. - fn mark_failed(&self, host: &str, failed_url: &str) { - let mut entries = self.entries.lock(); - if let Some(entry) = entries.get_mut(host) { - entry.urls.retain(|u| u != failed_url); - if entry.urls.is_empty() { - entries.remove(host); - } - } - } - - /// Drops the cached entry for `host`, forcing a re-fetch on the next lookup. - fn invalidate(&self, host: &str) { - self.entries.lock().remove(host); - } - - /// Drops every cached entry. - fn clear(&self) { - self.entries.lock().clear(); - } +/// Returns the per-host fetch lock, creating it on first use. Held across the +/// network request so only one fetch per host runs at a time — concurrent cache +/// misses for the same host collapse into a single request (single-flight) — +/// after which the waiters pick up the result from the cache. +fn fetch_lock(host: &str) -> Arc> { + static LOCKS: OnceLock>>>> = OnceLock::new(); + LOCKS + .get_or_init(|| Mutex::new(HashMap::new())) + .lock() + .entry(host.to_string()) + .or_insert_with(|| Arc::new(AsyncMutex::new(()))) + .clone() } fn region_host(url: &str) -> SignalResult { @@ -178,7 +96,7 @@ impl RegionUrlProvider { return Ok(vec![]); } - let cache = RegionCache::shared(); + let cache = region_cache(); // Fast path: a fresh entry needs neither a fetch nor the fetch lock. let stale = match cache.get(&host) { @@ -189,8 +107,8 @@ impl RegionUrlProvider { // Single-flight: serialise concurrent fetches for the same host so they // collapse into one network request. - let fetch_lock = cache.fetch_lock(&host); - let _guard = fetch_lock.lock().await; + let host_lock = fetch_lock(&host); + let _guard = host_lock.lock().await; // Another caller may have refreshed the entry while we waited on the lock. if let Cached::Fresh(urls) = cache.get(&host) { @@ -223,7 +141,7 @@ impl RegionUrlProvider { /// entry is invalidated, forcing a fresh fetch on the next attempt. pub fn mark_failed(url: &str, failed_url: &str) { if let Ok(host) = region_host(url) { - RegionCache::shared().mark_failed(&host, failed_url); + region_cache().mark_failed(&host, failed_url); } } @@ -231,7 +149,7 @@ impl RegionUrlProvider { /// fetch on the next [`Self::fetch_region_urls`] call. pub fn invalidate(url: &str) { if let Ok(host) = region_host(url) { - RegionCache::shared().invalidate(&host); + region_cache().invalidate(&host); } } @@ -240,7 +158,7 @@ impl RegionUrlProvider { /// can invalidate every cached host at once. #[allow(dead_code)] pub fn clear() { - RegionCache::shared().clear(); + region_cache().clear(); } } @@ -449,105 +367,14 @@ mod tests { assert!(region_host("not a url").is_err()); } - #[test] - fn region_cache_reports_fresh_stale_and_miss() { - let cache = RegionCache::new(RegionCache::DEFAULT_TTL); - - let host = "cache-fresh.livekit.cloud"; - assert!(matches!(cache.get(host), Cached::Miss), "unknown host is a miss"); - - let urls = vec!["wss://r1.livekit.cloud".to_string(), "wss://r2.livekit.cloud".to_string()]; - cache.insert(host.to_string(), urls.clone(), None); - assert!( - matches!(cache.get(host), Cached::Fresh(u) if u == urls), - "fresh entry is a fresh hit" - ); - - // An entry older than its TTL is reported as stale (retained for fallback). - let stale_host = "cache-stale.livekit.cloud"; - let stale_urls = vec!["wss://old.livekit.cloud".to_string()]; - if let Some(past) = Instant::now().checked_sub(RegionCache::DEFAULT_TTL * 2) { - cache.entries.lock().insert( - stale_host.to_string(), - CachedRegions { - urls: stale_urls.clone(), - fetched_at: past, - ttl: RegionCache::DEFAULT_TTL, - }, - ); - assert!( - matches!(cache.get(stale_host), Cached::Stale(u) if u == stale_urls), - "expired entry is a stale hit" - ); - } - } - - #[test] - fn region_cache_honors_server_max_age() { - // A short max-age expires before the (longer) default TTL would, proving - // the server's Cache-Control wins over the default. - let cache = RegionCache::new(Duration::from_secs(3600)); - let host = "max-age.livekit.cloud"; - let urls = vec!["wss://r1.livekit.cloud".to_string()]; - - cache.insert(host.to_string(), urls.clone(), Some(Duration::ZERO)); - assert!( - matches!(cache.get(host), Cached::Stale(u) if u == urls), - "max-age=0 entry is immediately stale despite the long default TTL" - ); - } - - #[test] - fn region_cache_mark_failed_prunes_then_drops() { - let cache = RegionCache::new(RegionCache::DEFAULT_TTL); - let host = "mark-failed.livekit.cloud"; - let r1 = "wss://r1.livekit.cloud".to_string(); - let r2 = "wss://r2.livekit.cloud".to_string(); - cache.insert(host.to_string(), vec![r1.clone(), r2.clone()], None); - - // Pruning one URL keeps the entry with the survivors. - cache.mark_failed(host, &r1); - assert!( - matches!(cache.get(host), Cached::Fresh(u) if u == vec![r2.clone()]), - "failed URL is pruned, the rest remain" - ); - - // Removing the last URL drops the entry entirely, forcing a re-fetch. - cache.mark_failed(host, &r2); - assert!(matches!(cache.get(host), Cached::Miss), "emptied entry is dropped"); - - // Marking an unknown host is a no-op. - cache.mark_failed("unknown.livekit.cloud", &r1); - } - - #[test] - fn region_cache_invalidate_and_clear() { - let cache = RegionCache::new(RegionCache::DEFAULT_TTL); - let a = "a.livekit.cloud"; - let b = "b.livekit.cloud"; - let urls = vec!["wss://r.livekit.cloud".to_string()]; - cache.insert(a.to_string(), urls.clone(), None); - cache.insert(b.to_string(), urls.clone(), None); - - // invalidate drops only the targeted host. - cache.invalidate(a); - assert!(matches!(cache.get(a), Cached::Miss), "invalidated host is a miss"); - assert!(matches!(cache.get(b), Cached::Fresh(_)), "other host is untouched"); - - // clear drops everything. - cache.clear(); - assert!(matches!(cache.get(b), Cached::Miss), "clear removes all entries"); - } - #[test] fn fetch_lock_is_shared_per_host() { - let cache = RegionCache::new(RegionCache::DEFAULT_TTL); - // Same host hands back the same lock, so concurrent callers contend on a - // single fetch; distinct hosts get independent locks. - let a1 = cache.fetch_lock("a.livekit.cloud"); - let a2 = cache.fetch_lock("a.livekit.cloud"); - let b = cache.fetch_lock("b.livekit.cloud"); + // single fetch; distinct hosts get independent locks. (RegionCache's own + // caching behavior is unit-tested in crate::region.) + let a1 = fetch_lock("a.livekit.cloud"); + let a2 = fetch_lock("a.livekit.cloud"); + let b = fetch_lock("b.livekit.cloud"); assert!(Arc::ptr_eq(&a1, &a2), "same host shares one fetch lock"); assert!(!Arc::ptr_eq(&a1, &b), "different hosts get distinct fetch locks"); } From 1d94f4eef8bf73e36b4516835a8040a7ba499230 Mon Sep 17 00:00:00 2001 From: David Zhao Date: Fri, 3 Jul 2026 00:27:39 +0200 Subject: [PATCH 12/13] simplify failover logic --- livekit-api/src/services/twirp_client.rs | 70 +++++++++++------------- 1 file changed, 31 insertions(+), 39 deletions(-) diff --git a/livekit-api/src/services/twirp_client.rs b/livekit-api/src/services/twirp_client.rs index f028529eb..2d4443071 100644 --- a/livekit-api/src/services/twirp_client.rs +++ b/livekit-api/src/services/twirp_client.rs @@ -171,58 +171,50 @@ impl TwirpClient { .timeout(timeout) .send() .await; - match send { + // The next untried region to fail over to, and a description of the + // failure for logging. `None` next means give up and surface the error. + let (next, reason) = match send { Ok(resp) => { let status = resp.status(); if status == StatusCode::OK { return Ok(R::decode(resp.bytes().await?)?); } // 4xx is terminal; only 5xx is retryable. - if is_last || status.as_u16() < 500 { + let next = if is_last || status.as_u16() < 500 { + None + } else { + self.next_region(&original, &forward, &mut region_urls, &attempted).await + }; + // No fallback: surface the server's error (needs the body). + let Some(next) = next else { let err: TwirpErrorCode = resp.json().await?; return Err(TwirpError::Twirp(err)); - } - match self.next_region(&original, &forward, &mut region_urls, &attempted).await - { - Some(next) => { - log::warn!( - "livekit API request to {} failed with status {}, retrying with fallback url {}", - current.host_str().unwrap_or_default(), - status, - next, - ); - drop(resp); - failover::backoff_sleep(self.backoff(attempt)).await; - attempted.push(failover::host_key(&next)); - current = next; - } - None => { - let err: TwirpErrorCode = resp.json().await?; - return Err(TwirpError::Twirp(err)); - } - } + }; + drop(resp); // release the connection before backing off + (next, format!("status {status}")) } Err(err) => { - if is_last { - return Err(err.into()); - } - match self.next_region(&original, &forward, &mut region_urls, &attempted).await - { - Some(next) => { - log::warn!( - "livekit API request to {} failed ({}), retrying with fallback url {}", - current.host_str().unwrap_or_default(), - err, - next, - ); - failover::backoff_sleep(self.backoff(attempt)).await; - attempted.push(failover::host_key(&next)); - current = next; - } + let next = if is_last { + None + } else { + self.next_region(&original, &forward, &mut region_urls, &attempted).await + }; + match next { + Some(next) => (next, err.to_string()), None => return Err(err.into()), } } - } + }; + + log::warn!( + "livekit API request to {} failed ({}), retrying with fallback url {}", + current.host_str().unwrap_or_default(), + reason, + next, + ); + failover::backoff_sleep(self.backoff(attempt)).await; + attempted.push(failover::host_key(&next)); + current = next; } unreachable!("failover loop always returns within the attempt budget") } From 5fb340a8931c90a32f3dc444b8cb76e88d94b3d7 Mon Sep 17 00:00:00 2001 From: David Zhao Date: Fri, 3 Jul 2026 13:19:11 +0200 Subject: [PATCH 13/13] fix changeset --- .changeset/feat_auto_failover_apis_with_lk_cloud.md | 3 +-- 1 file changed, 1 insertion(+), 2 deletions(-) diff --git a/.changeset/feat_auto_failover_apis_with_lk_cloud.md b/.changeset/feat_auto_failover_apis_with_lk_cloud.md index 7931afc10..f3dcb0adb 100644 --- a/.changeset/feat_auto_failover_apis_with_lk_cloud.md +++ b/.changeset/feat_auto_failover_apis_with_lk_cloud.md @@ -1,8 +1,7 @@ --- livekit: patch -livekit-api: patch +livekit-api: minor livekit-ffi: patch -livekit-uniffi: patch --- feat: auto failover APIs with LK Cloud - #1196 (@davidzhao)