Skip to content

Commit 31e0ea1

Browse files
Fix FfiClient listener use-after-free during Room teardown (#171)
Fixes a use-after-free crash when a Room (or any listener owner) is destroyed while the FFI thread is still dispatching an event into its FfiClient listener. Observed both in flaky integration tests and user-reported. * Introduce ListenerSlot (additional state around Listener) to track in-flight listener callbacks; removeListener() and shutdown() block until active callbacks finish before teardown proceeds * Set a removed flag before draining so stale pushEvent snapshots do not start new callbacks after unregister * Replace the initialized_ bool with a LifecycleState enum and compare_exchange_strong for single-flight initialize() / shutdown(), and to drop events / reject new listeners during shutdown * Added 2 unit tests to verify fixes are resiliant
1 parent 3a63ba6 commit 31e0ea1

3 files changed

Lines changed: 439 additions & 26 deletions

File tree

src/ffi_client.cpp

Lines changed: 159 additions & 19 deletions
Original file line numberDiff line numberDiff line change
@@ -18,6 +18,7 @@
1818

1919
#include <cassert>
2020
#include <csignal>
21+
#include <cstdio>
2122

2223
#include "data_track.pb.h"
2324
#include "e2ee.pb.h"
@@ -146,44 +147,147 @@ FfiClient& FfiClient::instance() noexcept {
146147
return instance;
147148
}
148149

149-
// clang-tidy flags this as a trivial destructor in release mode
150-
// due to the assert being pre-processed out
151-
// NOLINTNEXTLINE(modernize-use-equals-default)
152150
FfiClient::~FfiClient() {
153-
assert(!initialized_.load() &&
154-
"LiveKit SDK was not shut down before process exit. "
155-
"Call livekit::shutdown().");
151+
if (lifecycle_state_.load() == LifecycleState::Initialized) {
152+
// Explicitly use this over spdlog/std::cerr which can throw
153+
// Wrapping spdlog try/catch also flags "empty catch" clang-tidy check
154+
std::fputs("[livekit] [warning] SDK was not shut down before process exit. Use livekit::shutdown()\n", stderr);
155+
std::fflush(stderr);
156+
}
156157
}
157158

158159
void FfiClient::shutdown() noexcept {
159-
if (!isInitialized()) {
160-
return;
160+
// Don't use string to avoid exceptions
161+
// (Also cleaner with exception.what() and printing)
162+
const char* shutdown_error = nullptr;
163+
try {
164+
// compare_exchange_strong atomically claims Initialized -> ShuttingDown so only one
165+
// concurrent shutdown() drains listeners and disposes the FFI.
166+
LifecycleState expected = LifecycleState::Initialized;
167+
if (!lifecycle_state_.compare_exchange_strong(expected, LifecycleState::ShuttingDown, std::memory_order_acq_rel)) {
168+
// If not Initialized, return early to avoid unnecessary cleanup
169+
std::fputs("[livekit] [warning] SDK was shutdown while not initialized\n", stderr);
170+
std::fflush(stderr);
171+
return;
172+
}
173+
174+
std::vector<std::shared_ptr<ListenerSlot>> listeners_to_drain;
175+
std::vector<std::unique_ptr<PendingBase>> pending_to_cancel;
176+
{
177+
const std::scoped_lock<std::mutex> guard(lock_);
178+
listeners_to_drain.reserve(listeners_.size());
179+
for (auto& [id, slot] : listeners_) {
180+
(void)id;
181+
if (slot) {
182+
// Mark the listener as removed to prevent race conditions
183+
{
184+
const std::scoped_lock<std::mutex> slot_guard(slot->mutex);
185+
slot->removed = true;
186+
}
187+
// Add the listener to the list of listeners to drain
188+
listeners_to_drain.push_back(std::move(slot));
189+
}
190+
}
191+
listeners_.clear();
192+
193+
// Add the pending operations to the list of pending operations to cancel
194+
pending_to_cancel.reserve(pending_by_id_.size());
195+
for (auto& [async_id, pending] : pending_by_id_) {
196+
(void)async_id;
197+
if (pending) {
198+
pending_to_cancel.push_back(std::move(pending));
199+
}
200+
}
201+
pending_by_id_.clear();
202+
}
203+
204+
// Cancel the pending operations
205+
for (auto& pending : pending_to_cancel) {
206+
pending->cancel();
207+
}
208+
209+
const auto this_thread = std::this_thread::get_id();
210+
// Wait for all in-flight listener callbacks to complete
211+
for (const auto& slot : listeners_to_drain) {
212+
std::unique_lock<std::mutex> slot_lock(slot->mutex);
213+
214+
// When shutdown() isn't on a listener thread, self_active is 0 and we wait for active_callbacks == 0. When it's
215+
// called from inside a listener, self_active is 1 and the wait succeeds immediately with active_callbacks == 1,
216+
// so we don't wait on our own in-flight callback
217+
slot->cv.wait(slot_lock, [&slot, this_thread] {
218+
const auto thread_it = slot->active_threads.find(this_thread);
219+
const int self_active = thread_it == slot->active_threads.end() ? 0 : thread_it->second;
220+
return slot->active_callbacks == self_active;
221+
});
222+
}
223+
} catch (const std::exception& e) {
224+
shutdown_error = e.what();
225+
} catch (...) {
226+
shutdown_error = "unknown exception";
161227
}
162-
initialized_.store(false, std::memory_order_release);
228+
163229
livekit_ffi_dispose();
230+
lifecycle_state_.store(LifecycleState::Uninitialized, std::memory_order_release);
231+
232+
if (shutdown_error != nullptr) {
233+
// Explicitly use this over spdlog (method is noexcept)
234+
(void)std::fputs("[livekit] [error] SDK shutdown failed during local cleanup: ", stderr);
235+
(void)std::fputs(shutdown_error, stderr);
236+
(void)std::fputs("\n", stderr);
237+
(void)std::fflush(stderr);
238+
}
164239
}
165240

166241
bool FfiClient::initialize(bool capture_logs) {
167-
if (isInitialized()) {
242+
LifecycleState expected = LifecycleState::Uninitialized;
243+
if (!lifecycle_state_.compare_exchange_strong(expected, LifecycleState::Initializing, std::memory_order_acq_rel)) {
168244
return false;
169245
}
170-
initialized_.store(true, std::memory_order_release);
171-
livekit_ffi_initialize(&ffiEventCallback, capture_logs, LIVEKIT_BUILD_FLAVOR, LIVEKIT_BUILD_VERSION);
246+
247+
try {
248+
livekit_ffi_initialize(&ffiEventCallback, capture_logs, LIVEKIT_BUILD_FLAVOR, LIVEKIT_BUILD_VERSION);
249+
} catch (...) {
250+
lifecycle_state_.store(LifecycleState::Uninitialized, std::memory_order_release);
251+
throw;
252+
}
253+
254+
lifecycle_state_.store(LifecycleState::Initialized, std::memory_order_release);
172255
return true;
173256
}
174257

175-
bool FfiClient::isInitialized() const noexcept { return initialized_.load(std::memory_order_acquire); }
258+
bool FfiClient::isInitialized() const noexcept {
259+
return lifecycle_state_.load(std::memory_order_acquire) == LifecycleState::Initialized;
260+
}
176261

177262
FfiClient::ListenerId FfiClient::addListener(const FfiClient::Listener& listener) {
178263
const std::scoped_lock<std::mutex> guard(lock_);
264+
if (lifecycle_state_.load(std::memory_order_acquire) == LifecycleState::ShuttingDown) {
265+
logAndThrow("FfiClient::addListener failed: LiveKit is shutting down");
266+
}
179267
const FfiClient::ListenerId id = next_listener_id++;
180-
listeners_[id] = listener;
268+
listeners_[id] = std::make_shared<ListenerSlot>(listener);
181269
return id;
182270
}
183271

184272
void FfiClient::removeListener(ListenerId id) {
185-
const std::scoped_lock<std::mutex> guard(lock_);
186-
listeners_.erase(id);
273+
std::shared_ptr<ListenerSlot> slot;
274+
{
275+
const std::scoped_lock<std::mutex> guard(lock_);
276+
auto it = listeners_.find(id);
277+
if (it == listeners_.end()) {
278+
return;
279+
}
280+
slot = std::move(it->second);
281+
listeners_.erase(it);
282+
}
283+
284+
const auto this_thread = std::this_thread::get_id();
285+
std::unique_lock<std::mutex> slot_lock(slot->mutex);
286+
slot->cv.wait(slot_lock, [&slot, this_thread] {
287+
const auto self_active = slot->active_threads.count(this_thread) != 0;
288+
return slot->active_callbacks == 0 || (self_active && slot->active_callbacks == 1);
289+
});
290+
slot->removed = true;
187291
}
188292

189293
proto::FfiResponse FfiClient::sendRequest(const proto::FfiRequest& request) const {
@@ -221,9 +325,12 @@ proto::FfiResponse FfiClient::sendRequest(const proto::FfiRequest& request) cons
221325

222326
void FfiClient::pushEvent(const proto::FfiEvent& event) const {
223327
std::unique_ptr<PendingBase> to_complete;
224-
std::vector<Listener> listeners_copy;
328+
std::vector<std::shared_ptr<ListenerSlot>> listeners_copy;
225329
{
226330
const std::scoped_lock<std::mutex> guard(lock_);
331+
if (lifecycle_state_.load(std::memory_order_acquire) != LifecycleState::Initialized) {
332+
return;
333+
}
227334

228335
// Complete pending future if this event is a callback with async_id
229336
if (auto async_id = ExtractAsyncId(event)) {
@@ -246,8 +353,41 @@ void FfiClient::pushEvent(const proto::FfiEvent& event) const {
246353
}
247354

248355
// Notify listeners outside lock
249-
for (auto& listener : listeners_copy) {
250-
listener(event);
356+
for (const auto& slot : listeners_copy) {
357+
Listener listener;
358+
const auto this_thread = std::this_thread::get_id();
359+
{
360+
const std::scoped_lock<std::mutex> slot_guard(slot->mutex);
361+
if (slot->removed) {
362+
continue;
363+
}
364+
++slot->active_callbacks;
365+
++slot->active_threads[this_thread];
366+
listener = slot->listener;
367+
}
368+
369+
try {
370+
listener(event);
371+
} catch (const std::exception& e) {
372+
LK_LOG_ERROR("FfiClient listener threw: {}", e.what());
373+
} catch (...) {
374+
LK_LOG_ERROR("FfiClient listener threw: unknown exception");
375+
}
376+
377+
{
378+
const std::scoped_lock<std::mutex> slot_guard(slot->mutex);
379+
const auto thread_it = slot->active_threads.find(this_thread);
380+
if (thread_it != slot->active_threads.end()) {
381+
--thread_it->second;
382+
if (thread_it->second == 0) {
383+
slot->active_threads.erase(thread_it);
384+
}
385+
}
386+
--slot->active_callbacks;
387+
}
388+
389+
// Notify in case this listener was marked for removal during the callback (will be waiting on this)
390+
slot->cv.notify_all();
251391
}
252392
}
253393

src/ffi_client.h

Lines changed: 37 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -17,14 +17,17 @@
1717
#pragma once
1818

1919
#include <atomic>
20+
#include <condition_variable>
2021
#include <cstdint>
2122
#include <functional>
2223
#include <future>
2324
#include <memory>
2425
#include <mutex>
2526
#include <optional>
2627
#include <stdexcept>
28+
#include <thread>
2729
#include <unordered_map>
30+
#include <utility>
2831

2932
#include "data_track.pb.h"
3033
#include "livekit/data_track_error.h"
@@ -147,6 +150,15 @@ class LIVEKIT_INTERNAL_API FfiClient {
147150
private:
148151
FfiClient() = default;
149152

153+
/// Lifecycle state of the FfiClient
154+
/// This is used to prevent race conditions/use-after-free scenarios
155+
enum class LifecycleState : std::uint8_t {
156+
Uninitialized,
157+
Initializing,
158+
Initialized,
159+
ShuttingDown,
160+
};
161+
150162
// Base class for type-erased pending ops
151163
struct PendingBase {
152164
AsyncId async_id = 0; // Client-generated async ID for cancellation
@@ -176,6 +188,25 @@ class LIVEKIT_INTERNAL_API FfiClient {
176188
}
177189
};
178190

191+
/// Additional data structure to track listener callbacks and their state.
192+
/// This is used to coordinate the FFI thread and the app thread, and prevent race conditions/use-after-free scenarios
193+
struct ListenerSlot {
194+
explicit ListenerSlot(Listener cb) : listener(std::move(cb)) {}
195+
196+
/// The user-provided listener callback
197+
Listener listener;
198+
/// Mutex to protect the listener slot
199+
std::mutex mutex;
200+
/// Condition variable to wait for the listener to finish
201+
std::condition_variable cv;
202+
/// Map of thread IDs to the number of active threads
203+
std::unordered_map<std::thread::id, int> active_threads;
204+
/// Number of active callbacks
205+
int active_callbacks = 0;
206+
/// Whether the listener has been removed (used for race mitigation before removal)
207+
bool removed = false;
208+
};
209+
179210
template <typename T>
180211
std::future<T> registerAsync(AsyncId async_id, std::function<bool(const proto::FfiEvent&)> match,
181212
std::function<void(const proto::FfiEvent&, std::promise<T>&)> handler);
@@ -187,14 +218,18 @@ class LIVEKIT_INTERNAL_API FfiClient {
187218
// removed.
188219
bool cancelPendingByAsyncId(AsyncId async_id);
189220

190-
std::unordered_map<ListenerId, Listener> listeners_;
221+
/// Map of listener IDs to listener slots
222+
std::unordered_map<ListenerId, std::shared_ptr<ListenerSlot>> listeners_;
223+
/// Next listener ID to generate
191224
std::atomic<ListenerId> next_listener_id{1};
192225
mutable std::mutex lock_;
226+
/// Map of async IDs to pending operations
193227
mutable std::unordered_map<AsyncId, std::unique_ptr<PendingBase>> pending_by_id_;
228+
/// Next async ID to generate
194229
std::atomic<AsyncId> next_async_id_{1};
195230

196231
void pushEvent(const proto::FfiEvent& event) const;
197232
friend void ffiEventCallback(const uint8_t* buf, size_t len);
198-
std::atomic<bool> initialized_{false};
233+
std::atomic<LifecycleState> lifecycle_state_{LifecycleState::Uninitialized};
199234
};
200235
} // namespace livekit

0 commit comments

Comments
 (0)