Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions livekit-agents/livekit/agents/utils/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@
from .misc import is_dev_mode, is_given, is_hosted, nodename, shortuuid, time_ms
from .moving_average import MovingAverage
from .participant import wait_for_agent, wait_for_participant, wait_for_track_publication
from .robust_microphone import RobustMicrophone

EventEmitter = rtc.EventEmitter

Expand Down Expand Up @@ -41,6 +42,7 @@
"wait_for_participant",
"wait_for_track_publication",
"resolve_env_var",
"RobustMicrophone",
]

# Cleanup docs of unexported modules
Expand Down
110 changes: 110 additions & 0 deletions livekit-agents/livekit/agents/utils/robust_microphone.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,110 @@
import asyncio
import time
from typing import Any

from livekit import rtc
from livekit.agents.log import logger
from livekit.agents.utils import aio


class RobustMicrophone:
"""
A robust microphone capture utility that wraps rtc.MediaDevices().open_input()
and automatically restarts the audio stream if it stalls (e.g. if the microphone
cable becomes loose).

This class exposes an `rtc.AudioSource` as `.source` which you can use to
create your LocalAudioTrack.
"""

def __init__(
self,
*,
sample_rate: int = 48000,
num_channels: int = 1,
stall_timeout: float = 2.0,
**open_input_kwargs: Any,
) -> None:
self._sample_rate = sample_rate
self._num_channels = num_channels
self._stall_timeout = stall_timeout
self._kwargs = open_input_kwargs

self._devices = rtc.MediaDevices()
self._source = rtc.AudioSource(self._sample_rate, self._num_channels)

self._mic_obj: Any = None
self._mic_track: rtc.LocalAudioTrack | None = None
self._mic_stream: rtc.AudioStream | None = None

self._running = False
self._monitor_task: asyncio.Task[None] | None = None
self._last_frame_time: float = time.monotonic()

@property
def source(self) -> rtc.AudioSource:
return self._source

def start(self) -> None:
if self._running:
return
self._running = True
self._monitor_task = asyncio.create_task(self._monitor_loop())

async def aclose(self) -> None:
if not self._running:
return
self._running = False
if self._monitor_task:
await aio.cancel_and_wait(self._monitor_task)
await self._close_internal()

async def _close_internal(self) -> None:
if self._mic_stream:
await self._mic_stream.aclose()
self._mic_stream = None
self._mic_track = None
self._mic_obj = None

def _start_internal(self) -> None:
# Provide defaults for sample_rate and num_channels if not provided by user
kwargs = dict(self._kwargs)
if "sample_rate" not in kwargs:
kwargs["sample_rate"] = self._sample_rate
if "num_channels" not in kwargs:
kwargs["num_channels"] = self._num_channels

self._mic_obj = self._devices.open_input(**kwargs)
self._mic_track = rtc.LocalAudioTrack.create_audio_track("robust-mic-internal", self._mic_obj.source)
self._mic_stream = rtc.AudioStream.from_track(self._mic_track)

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

🚩 AudioStream.from_track called without sample_rate/num_channels unlike codebase pattern

The existing usage of rtc.AudioStream.from_track in livekit-agents/livekit/agents/voice/room_io/_input.py:303-308 passes sample_rate, num_channels, and frame_size_ms explicitly. The new code at line 79 calls rtc.AudioStream.from_track(self._mic_track) without these parameters. If the from_track defaults differ from self._sample_rate/self._num_channels, the frames forwarded to self._source could have mismatched format. This might be benign if the local mic track already outputs at the configured rate, but it deviates from established patterns and could cause subtle audio issues.

Open in Devin Review

Was this helpful? React with 👍 or 👎 to provide feedback.

self._last_frame_time = time.monotonic()

async def _monitor_loop(self) -> None:
self._start_internal()

while self._running:
try:
assert self._mic_stream is not None

# Wait for the next audio event with a timeout
event = await asyncio.wait_for(
self._mic_stream.__anext__(), timeout=self._stall_timeout
)
self._last_frame_time = time.monotonic()

# Forward the captured frame to our own source
await self._source.capture_frame(event.frame)

except asyncio.TimeoutError:
# Stall detected!
logger.warning(
f"RobustMicrophone: No audio frames received for {self._stall_timeout}s. Restarting microphone..."
)
await self._close_internal()
await asyncio.sleep(0.5) # Brief pause before reconnecting
self._start_internal()
except Exception as e:
logger.error(f"RobustMicrophone error in monitor loop: {e}", exc_info=e)
await asyncio.sleep(1.0)
await self._close_internal()
self._start_internal()
Comment on lines +98 to +110

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

🔴 Unprotected _start_internal() in error handlers crashes the monitor loop permanently

In _monitor_loop, both exception handlers call self._start_internal() (lines 105 and 110) without any try/except protection. If _start_internal() raises an exception (e.g., device unavailable after disconnect, permission error, or any failure in open_input()/create_audio_track()/from_track()), the exception propagates out of the while self._running: loop, causing the asyncio task to silently terminate. The _running flag remains True, so the RobustMicrophone appears operational but is permanently dead with no recovery possible. This completely defeats the class's stated purpose of automatically restarting on failure.

Affected code paths

Line 105 (after timeout stall detection):

except asyncio.TimeoutError:
    await self._close_internal()
    await asyncio.sleep(0.5)
    self._start_internal()  # unprotected - crashes loop on failure

Line 110 (after generic error):

except Exception as e:
    await asyncio.sleep(1.0)
    await self._close_internal()
    self._start_internal()  # unprotected - crashes loop on failure

The same issue exists at line 83 where the initial _start_internal() is called without protection.

Prompt for agents
The _start_internal() calls inside both exception handlers (lines 105 and 110) and at line 83 need to be wrapped in try/except. If _start_internal() fails, the monitor loop should continue running and retry after a delay, rather than letting the exception propagate out and kill the task.

A possible approach:
1. Wrap each _start_internal() call in a try/except that catches Exception
2. On failure, log the error and continue the while loop (which will try again on the next iteration)
3. Consider adding a retry delay and/or exponential backoff for repeated _start_internal() failures
4. Consider adding a max_retries parameter to eventually give up and signal failure

The initial _start_internal() at line 83 should also be inside the while loop's try/except, or have its own retry logic, since device initialization can fail at startup too.

Relevant file: livekit-agents/livekit/agents/utils/robust_microphone.py, function _monitor_loop (lines 82-110).
Open in Devin Review

Was this helpful? React with 👍 or 👎 to provide feedback.

70 changes: 70 additions & 0 deletions tests/test_robust_microphone.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,70 @@
import asyncio
from unittest.mock import AsyncMock, MagicMock, patch

import pytest
from livekit import rtc

Check failure on line 5 in tests/test_robust_microphone.py

View workflow job for this annotation

GitHub Actions / ruff

ruff (F401)

tests/test_robust_microphone.py:5:21: F401 `livekit.rtc` imported but unused help: Remove unused import: `livekit.rtc`
from livekit.agents.utils.robust_microphone import RobustMicrophone

Check failure on line 6 in tests/test_robust_microphone.py

View workflow job for this annotation

GitHub Actions / ruff

ruff (I001)

tests/test_robust_microphone.py:1:1: I001 Import block is un-sorted or un-formatted help: Organize imports

pytestmark = pytest.mark.unit


@pytest.mark.asyncio
async def test_robust_microphone_startup_and_shutdown():
with patch("livekit.rtc.MediaDevices", autospec=True) as mock_media_devices, \
patch("livekit.rtc.AudioSource", autospec=True) as mock_audio_source, \

Check failure on line 14 in tests/test_robust_microphone.py

View workflow job for this annotation

GitHub Actions / ruff

ruff (F841)

tests/test_robust_microphone.py:14:61: F841 Local variable `mock_audio_source` is assigned to but never used help: Remove assignment to unused variable `mock_audio_source`
patch("livekit.rtc.LocalAudioTrack.create_audio_track", autospec=True) as mock_create_track, \
patch("livekit.rtc.AudioStream.from_track", autospec=True) as mock_from_track:

# Mock the stream iterator to just hang (we'll shut down before it times out)
mock_stream = AsyncMock()
mock_stream.__anext__.side_effect = asyncio.TimeoutError
mock_from_track.return_value = mock_stream

mic = RobustMicrophone(stall_timeout=10.0)
mic.start()

Check failure on line 25 in tests/test_robust_microphone.py

View workflow job for this annotation

GitHub Actions / ruff

ruff (W293)

tests/test_robust_microphone.py:25:1: W293 Blank line contains whitespace help: Remove whitespace from blank line
# Give it a moment to start the internal stream
await asyncio.sleep(0.1)

Check failure on line 28 in tests/test_robust_microphone.py

View workflow job for this annotation

GitHub Actions / ruff

ruff (W293)

tests/test_robust_microphone.py:28:1: W293 Blank line contains whitespace help: Remove whitespace from blank line
assert mock_media_devices.called
assert mock_create_track.called
assert mock_from_track.called

await mic.aclose()


@pytest.mark.asyncio
async def test_robust_microphone_restarts_on_stall():
with patch("livekit.rtc.MediaDevices", autospec=True) as mock_media_devices, \

Check failure on line 38 in tests/test_robust_microphone.py

View workflow job for this annotation

GitHub Actions / ruff

ruff (F841)

tests/test_robust_microphone.py:38:62: F841 Local variable `mock_media_devices` is assigned to but never used help: Remove assignment to unused variable `mock_media_devices`
patch("livekit.rtc.AudioSource", autospec=True) as mock_audio_source, \

Check failure on line 39 in tests/test_robust_microphone.py

View workflow job for this annotation

GitHub Actions / ruff

ruff (F841)

tests/test_robust_microphone.py:39:61: F841 Local variable `mock_audio_source` is assigned to but never used help: Remove assignment to unused variable `mock_audio_source`
patch("livekit.rtc.LocalAudioTrack.create_audio_track", autospec=True) as mock_create_track, \

Check failure on line 40 in tests/test_robust_microphone.py

View workflow job for this annotation

GitHub Actions / ruff

ruff (F841)

tests/test_robust_microphone.py:40:84: F841 Local variable `mock_create_track` is assigned to but never used help: Remove assignment to unused variable `mock_create_track`
patch("livekit.rtc.AudioStream.from_track", autospec=True) as mock_from_track:

mock_stream = AsyncMock()
mock_stream.aclose = AsyncMock()

Check failure on line 45 in tests/test_robust_microphone.py

View workflow job for this annotation

GitHub Actions / ruff

ruff (W293)

tests/test_robust_microphone.py:45:1: W293 Blank line contains whitespace help: Remove whitespace from blank line
# First call times out immediately, causing a restart
# Second call hangs so we can shut down
call_count = 0
async def mock_anext():
nonlocal call_count
call_count += 1
if call_count == 1:
raise asyncio.TimeoutError()
else:
await asyncio.sleep(10)
return MagicMock()

mock_stream.__anext__ = mock_anext
mock_from_track.return_value = mock_stream

mic = RobustMicrophone(stall_timeout=0.1)
mic.start()

# Wait for the timeout and restart to happen
await asyncio.sleep(0.5)

# MediaDevices.open_input should have been called twice (initial + restart)
assert mock_from_track.call_count >= 2

await mic.aclose()
Loading