Skip to content

Commit 5bd5856

Browse files
authored
feat: auto failover APIs with LK Cloud (#733)
1 parent 0fa2e6e commit 5bd5856

14 files changed

Lines changed: 506 additions & 43 deletions

.github/workflows/test-api.yml

Lines changed: 67 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,67 @@
1+
# Copyright 2026 LiveKit, Inc.
2+
#
3+
# Licensed under the Apache License, Version 2.0 (the "License");
4+
# you may not use this file except in compliance with the License.
5+
# You may obtain a copy of the License at
6+
#
7+
# http://www.apache.org/licenses/LICENSE-2.0
8+
#
9+
# Unless required by applicable law or agreed to in writing, software
10+
# distributed under the License is distributed on an "AS IS" BASIS,
11+
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12+
# See the License for the specific language governing permissions and
13+
# limitations under the License.
14+
15+
name: Test API
16+
17+
permissions:
18+
contents: read
19+
20+
on:
21+
workflow_dispatch:
22+
push:
23+
branches: [main]
24+
pull_request:
25+
branches: [main]
26+
27+
jobs:
28+
failover:
29+
runs-on: ubuntu-latest
30+
services:
31+
mock-server:
32+
image: livekit/test-server:latest
33+
ports:
34+
- 9999:9999
35+
- 10000:10000
36+
- 10001:10001
37+
- 10002:10002
38+
steps:
39+
- uses: actions/checkout@de0fac2e4500dabe0009e67214ff5f5447ce83dd # v6
40+
with:
41+
submodules: true
42+
43+
- uses: actions/setup-python@a309ff8b426b58ec0e2a45f0f869d46889d02405 # v6
44+
with:
45+
python-version: "3.12"
46+
47+
- name: Install uv
48+
uses: astral-sh/setup-uv@08807647e7069bb48b6ef5acd8ec9567f424441b # v8.1.0
49+
50+
- name: Install livekit-api (without livekit-rtc)
51+
run: |
52+
uv venv .failover-venv
53+
source .failover-venv/bin/activate
54+
uv pip install ./livekit-protocol ./livekit-api pytest aiohttp
55+
56+
- name: Wait for mock server
57+
run: |
58+
for i in $(seq 1 30); do
59+
curl -sf http://127.0.0.1:9999/settings/regions >/dev/null && exit 0
60+
sleep 1
61+
done
62+
echo "mock server did not become ready" && exit 1
63+
64+
- name: Run API tests
65+
run: |
66+
source .failover-venv/bin/activate
67+
pytest tests/api -v
Lines changed: 150 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,150 @@
1+
# Copyright 2026 LiveKit, Inc.
2+
#
3+
# Licensed under the Apache License, Version 2.0 (the "License");
4+
# you may not use this file except in compliance with the License.
5+
# You may obtain a copy of the License at
6+
#
7+
# http://www.apache.org/licenses/LICENSE-2.0
8+
#
9+
# Unless required by applicable law or agreed to in writing, software
10+
# distributed under the License is distributed on an "AS IS" BASIS,
11+
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12+
# See the License for the specific language governing permissions and
13+
# limitations under the License.
14+
15+
"""Region failover for the Twirp API clients.
16+
17+
On a retryable failure (any transport error or HTTP 5xx) the client discovers
18+
alternative LiveKit Cloud regions via ``/settings/regions`` and replays the
19+
request against the next region, with exponential backoff. 4xx responses are
20+
returned immediately.
21+
"""
22+
23+
from __future__ import annotations
24+
25+
import time
26+
from dataclasses import dataclass
27+
from typing import Dict, List, Optional
28+
from urllib.parse import urlparse
29+
30+
import aiohttp
31+
32+
FAILOVER_MAX_ATTEMPTS = 3
33+
FAILOVER_BACKOFF_BASE = 0.2 # seconds
34+
35+
36+
def failover_attempts(enabled: bool, host: Optional[str], force: bool = False) -> int:
    """Return how many request attempts a call against ``host`` may make.

    A result of 1 means the request is sent once and never failed over.
    ``FAILOVER_MAX_ATTEMPTS`` is returned only when failover is ``enabled``
    and either ``host`` is a LiveKit Cloud domain or ``force`` is set.
    ``force`` skips the cloud-host check and is for internal testing only.
    """
    # Disabled failover wins over everything else, including ``force``.
    if not enabled:
        return 1
    # ``force`` is checked first so the cloud-host test is skipped when set.
    eligible = force or (host is not None and is_cloud(host))
    return FAILOVER_MAX_ATTEMPTS if eligible else 1
44+
45+
46+
def is_cloud(host: str) -> bool:
    """Return True when ``host`` is a LiveKit Cloud project domain.

    Failover only engages for hosts under ``.livekit.cloud``. Hostnames are
    case-insensitive, so the comparison is done on the lower-cased name, and a
    single trailing dot (fully-qualified form) is ignored. The bare apex
    ``livekit.cloud`` is not a project domain and returns False.
    """
    normalized = host.lower()
    # "proj.livekit.cloud." is the same host written as an absolute DNS name.
    if normalized.endswith("."):
        normalized = normalized[:-1]
    return normalized.endswith(".livekit.cloud")
49+
50+
51+
def to_http(url: str) -> str:
    """Normalize a region URL to an http(s) scheme (ws -> http, wss -> https).

    Only a real ``ws://`` or ``wss://`` scheme is rewritten; the scheme is
    matched case-insensitively. Any other input, including scheme-less
    strings that merely begin with the letters "ws", is returned unchanged.
    """
    scheme, separator, remainder = url.partition("://")
    lowered = scheme.lower()
    if separator and lowered in ("ws", "wss"):
        # "ws" -> "http", "wss" -> "https": keep the optional trailing "s".
        return "http" + lowered[2:] + separator + remainder
    return url
56+
57+
58+
def origin_of(url: str) -> str:
    """Reduce a URL to its ``scheme://host[:port]`` origin.

    Path, query and fragment are discarded; the network location is kept
    exactly as ``urlparse`` reports it.
    """
    parts = urlparse(url)
    return "://".join((parts.scheme, parts.netloc))
62+
63+
64+
def host_key(url: str) -> str:
    """Build the dedup key for a URL: its lower-cased ``host[:port]``.

    Two URLs that differ only in scheme, path or letter case of the host map
    to the same key, so an already-attempted host is recognised across
    attempts.
    """
    network_location = urlparse(url).netloc
    return network_location.lower()
67+
68+
69+
def pick_next(region_origins: List[str], attempted: set[str]) -> Optional[str]:
    """Choose the next region to try.

    Walks ``region_origins`` in order and returns the first origin whose
    ``host_key`` is not in ``attempted``; returns ``None`` when every listed
    host has already been tried (or the list is empty).
    """
    untried = (candidate for candidate in region_origins if host_key(candidate) not in attempted)
    return next(untried, None)
75+
76+
77+
@dataclass
class _CacheEntry:
    """One cached region list plus the data needed to judge its freshness."""

    # Alternative region origins, in the order the discovery response listed them.
    origins: List[str]
    # time.monotonic() reading taken when this entry was stored.
    fetched_at: float
    # Lifetime in seconds, parsed from the discovery response's Cache-Control max-age.
    ttl: float
82+
83+
84+
class RegionCache:
    """Process-wide cache of the LiveKit Cloud region list, keyed by host."""

    def __init__(self) -> None:
        # Maps host_key(origin) to the most recently stored region list.
        self._entries: Dict[str, _CacheEntry] = {}

    async def region_origins(
        self,
        session: aiohttp.ClientSession,
        origin: str,
        headers: Dict[str, str],
    ) -> List[str]:
        """Return the alternative region origins for ``origin``.

        A cached list is returned while it is younger than its TTL. Otherwise
        ``/settings/regions`` is fetched again. The lookup is best-effort: if
        the fetch fails, the stale cached list is returned when one exists,
        and an empty list otherwise. ``headers`` are forwarded so that a valid
        token, and any test directives, reach the discovery endpoint.
        """
        cache_key = host_key(origin)
        cached = self._entries.get(cache_key)
        if cached is not None:
            age = time.monotonic() - cached.fetched_at
            if age < cached.ttl:
                return cached.origins

        try:
            fresh, max_age = await self._fetch(session, origin, headers)
        except Exception:
            # Discovery must never break the caller: fall back to whatever
            # was cached before, even if it has expired.
            if cached is None:
                return []
            return cached.origins

        # A zero TTL (e.g. Cache-Control: max-age=0) means "do not cache".
        if max_age > 0:
            self._entries[cache_key] = _CacheEntry(
                origins=fresh,
                fetched_at=time.monotonic(),
                ttl=max_age,
            )
        return fresh

    async def _fetch(
        self,
        session: aiohttp.ClientSession,
        origin: str,
        headers: Dict[str, str],
    ) -> tuple[List[str], float]:
        """Fetch ``{origin}/settings/regions`` and return ``(origins, ttl)``.

        Raises ``RuntimeError`` on any non-200 status; transport and decoding
        errors propagate from the HTTP session unchanged.
        """
        # Body-describing headers belong to the original request, not to this GET.
        skipped = ("content-type", "content-length")
        forwarded = {name: value for name, value in headers.items() if name.lower() not in skipped}
        discovery_url = f"{origin}/settings/regions"
        # Short timeout so a slow/unreachable discovery endpoint doesn't stall
        # the failover path.
        short_timeout = aiohttp.ClientTimeout(total=2)
        async with session.get(discovery_url, headers=forwarded, timeout=short_timeout) as resp:
            if resp.status != 200:
                raise RuntimeError(f"region discovery failed: {resp.status}")
            ttl = _parse_max_age(resp.headers.get("Cache-Control"))
            payload = await resp.json()
            origins = []
            for region in payload.get("regions", []):
                # Entries without a usable URL are skipped.
                if region.get("url"):
                    origins.append(origin_of(to_http(region["url"])))
            return origins, ttl
138+
139+
140+
def _parse_max_age(cache_control: Optional[str]) -> float:
    """Extract ``max-age`` (in seconds) from a Cache-Control header value.

    Returns 0.0 when the header is missing or empty, when it has no
    ``max-age`` directive, or when the first ``max-age`` directive does not
    hold an integer. Directive names are matched case-insensitively.
    """
    if not cache_control:
        return 0.0
    for raw_directive in cache_control.split(","):
        name, equals, value = raw_directive.strip().lower().partition("=")
        if name != "max-age" or not equals:
            continue
        # The first max-age directive decides the result, valid or not.
        try:
            seconds = int(value)
        except ValueError:
            return 0.0
        return float(seconds)
    return 0.0

livekit-api/livekit/api/_service.py

Lines changed: 9 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -9,8 +9,15 @@
99

1010

1111
class Service(ABC):
12-
def __init__(self, session: aiohttp.ClientSession, host: str, api_key: str, api_secret: str):
13-
self._client = TwirpClient(session, host, "livekit")
12+
def __init__(
13+
self,
14+
session: aiohttp.ClientSession,
15+
host: str,
16+
api_key: str,
17+
api_secret: str,
18+
failover: bool = True,
19+
):
20+
self._client = TwirpClient(session, host, "livekit", failover=failover)
1421
self.api_key = api_key
1522
self.api_secret = api_secret
1623

livekit-api/livekit/api/access_token.py

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -271,9 +271,10 @@ def verify(self, token: str, *, verify_signature: bool = True) -> Claims:
271271

272272
if claims.get("roomPreset"):
273273
grant_claims.room_preset = claims.get("roomPreset")
274-
if claims.get("roomConfig"):
274+
room_config = claims.get("roomConfig")
275+
if room_config:
275276
grant_claims.room_config = ParseDict(
276-
claims.get("roomConfig"),
277+
room_config,
277278
RoomConfiguration(),
278279
ignore_unknown_fields=True,
279280
)

livekit-api/livekit/api/agent_dispatch_service.py

Lines changed: 9 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -26,8 +26,15 @@ class AgentDispatchService(Service):
2626
```
2727
"""
2828

29-
def __init__(self, session: aiohttp.ClientSession, url: str, api_key: str, api_secret: str):
30-
super().__init__(session, url, api_key, api_secret)
29+
def __init__(
30+
self,
31+
session: aiohttp.ClientSession,
32+
url: str,
33+
api_key: str,
34+
api_secret: str,
35+
failover: bool = True,
36+
):
37+
super().__init__(session, url, api_key, api_secret, failover=failover)
3138

3239
async def create_dispatch(self, req: CreateAgentDispatchRequest) -> AgentDispatch:
3340
"""Create an explicit dispatch for an agent to join a room.

livekit-api/livekit/api/connector_service.py

Lines changed: 9 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -35,8 +35,15 @@ class ConnectorService(Service):
3535
```
3636
"""
3737

38-
def __init__(self, session: aiohttp.ClientSession, url: str, api_key: str, api_secret: str):
39-
super().__init__(session, url, api_key, api_secret)
38+
def __init__(
39+
self,
40+
session: aiohttp.ClientSession,
41+
url: str,
42+
api_key: str,
43+
api_secret: str,
44+
failover: bool = True,
45+
):
46+
super().__init__(session, url, api_key, api_secret, failover=failover)
4047

4148
async def dial_whatsapp_call(
4249
self, request: DialWhatsAppCallRequest

livekit-api/livekit/api/egress_service.py

Lines changed: 9 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -33,8 +33,15 @@ class EgressService(Service):
3333
Also see https://docs.livekit.io/home/egress/overview/
3434
"""
3535

36-
def __init__(self, session: aiohttp.ClientSession, url: str, api_key: str, api_secret: str):
37-
super().__init__(session, url, api_key, api_secret)
36+
def __init__(
37+
self,
38+
session: aiohttp.ClientSession,
39+
url: str,
40+
api_key: str,
41+
api_secret: str,
42+
failover: bool = True,
43+
):
44+
super().__init__(session, url, api_key, api_secret, failover=failover)
3845

3946
async def start_room_composite_egress(self, start: RoomCompositeEgressRequest) -> EgressInfo:
4047
"""Starts a composite recording of a room."""

livekit-api/livekit/api/ingress_service.py

Lines changed: 9 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -28,8 +28,15 @@ class IngressService(Service):
2828
Also see https://docs.livekit.io/home/ingress/overview/
2929
"""
3030

31-
def __init__(self, session: aiohttp.ClientSession, url: str, api_key: str, api_secret: str):
32-
super().__init__(session, url, api_key, api_secret)
31+
def __init__(
32+
self,
33+
session: aiohttp.ClientSession,
34+
url: str,
35+
api_key: str,
36+
api_secret: str,
37+
failover: bool = True,
38+
):
39+
super().__init__(session, url, api_key, api_secret, failover=failover)
3340

3441
async def create_ingress(self, create: CreateIngressRequest) -> IngressInfo:
3542
return await self._client.request(

livekit-api/livekit/api/livekit_api.py

Lines changed: 9 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -31,6 +31,7 @@ def __init__(
3131
*,
3232
timeout: Optional[aiohttp.ClientTimeout] = None,
3333
session: Optional[aiohttp.ClientSession] = None,
34+
failover: bool = True,
3435
):
3536
"""Create a new LiveKitAPI instance.
3637
@@ -59,12 +60,14 @@ def __init__(
5960
timeout = aiohttp.ClientTimeout(total=60)
6061
self._session = aiohttp.ClientSession(timeout=timeout)
6162

62-
self._room = RoomService(self._session, url, api_key, api_secret)
63-
self._ingress = IngressService(self._session, url, api_key, api_secret)
64-
self._egress = EgressService(self._session, url, api_key, api_secret)
65-
self._sip = SipService(self._session, url, api_key, api_secret)
66-
self._agent_dispatch = AgentDispatchService(self._session, url, api_key, api_secret)
67-
self._connector = ConnectorService(self._session, url, api_key, api_secret)
63+
self._room = RoomService(self._session, url, api_key, api_secret, failover)
64+
self._ingress = IngressService(self._session, url, api_key, api_secret, failover)
65+
self._egress = EgressService(self._session, url, api_key, api_secret, failover)
66+
self._sip = SipService(self._session, url, api_key, api_secret, failover)
67+
self._agent_dispatch = AgentDispatchService(
68+
self._session, url, api_key, api_secret, failover
69+
)
70+
self._connector = ConnectorService(self._session, url, api_key, api_secret, failover)
6871

6972
@property
7073
def agent_dispatch(self) -> AgentDispatchService:

livekit-api/livekit/api/room_service.py

Lines changed: 9 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -45,8 +45,15 @@ class RoomService(Service):
4545
Also see https://docs.livekit.io/home/server/managing-rooms/ and https://docs.livekit.io/home/server/managing-participants/
4646
"""
4747

48-
def __init__(self, session: aiohttp.ClientSession, url: str, api_key: str, api_secret: str):
49-
super().__init__(session, url, api_key, api_secret)
48+
def __init__(
49+
self,
50+
session: aiohttp.ClientSession,
51+
url: str,
52+
api_key: str,
53+
api_secret: str,
54+
failover: bool = True,
55+
):
56+
super().__init__(session, url, api_key, api_secret, failover=failover)
5057

5158
async def create_room(
5259
self,

0 commit comments

Comments
 (0)