Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 5 additions & 0 deletions include/mp/proxy-io.h
Original file line number Diff line number Diff line change
Expand Up @@ -461,6 +461,11 @@ class Connection
//! ThreadMap.makeThread) used to service requests to clients.
::capnp::CapabilityServerSet<Thread> m_threads;

//! Hook called on the worker thread inside makeThread, right after
//! set_value. Used by tests to verify the waiter mutex is held when
//! the thread context is published.
std::function<void()> testing_hook_makethread;

//! Canceler for canceling promises that we want to discard when the
//! connection is destroyed. This is used to interrupt method calls that are
//! still executing at time of disconnection.
Expand Down
5 changes: 5 additions & 0 deletions include/mp/proxy.h
Original file line number Diff line number Diff line change
Expand Up @@ -70,6 +70,11 @@ struct ProxyContext
Connection* connection;
EventLoopRef loop;
CleanupList cleanup_fns;
//! Hook called on the worker thread just before loop->sync() in PassField
//! for Context arguments. Used by tests to inject precise disconnect timing.
std::function<void()> testing_hook_before_sync;
//! Hook called on the worker thread just before returning results.
std::function<void()> testing_hook_after_cleanup;

ProxyContext(Connection* connection);
};
Expand Down
22 changes: 18 additions & 4 deletions include/mp/type-context.h
Original file line number Diff line number Diff line change
Expand Up @@ -61,8 +61,6 @@ auto PassField(Priority<1>, TypeList<>, ServerContext& server_context, const Fn&
std::is_same<decltype(Accessor::get(server_context.call_context.getParams())), Context::Reader>::value,
kj::Promise<typename ServerContext::CallContext>>::type
{
const auto& params = server_context.call_context.getParams();
Context::Reader context_arg = Accessor::get(params);
auto& server = server_context.proxy_server;
int req = server_context.req;
// Keep a reference to the ProxyServer instance by assigning it to the self
Expand All @@ -74,8 +72,10 @@ auto PassField(Priority<1>, TypeList<>, ServerContext& server_context, const Fn&
auto self = server.thisCap();
auto invoke = [self = kj::mv(self), call_context = kj::mv(server_context.call_context), &server, req, fn, args...](CancelMonitor& cancel_monitor) mutable {
MP_LOG(*server.m_context.loop, Log::Debug) << "IPC server executing request #" << req;
const auto& params = call_context.getParams();
Context::Reader context_arg = Accessor::get(params);
if (server.m_context.testing_hook_before_sync) server.m_context.testing_hook_before_sync();
// Save testing_hook_after_cleanup to a local because the
// server may be freed in the cleanup sync() below.
auto testing_hook_after_cleanup = std::move(server.m_context.testing_hook_after_cleanup);
ServerContext server_context{server, call_context, req};
{
// Before invoking the function, store a reference to the
Expand Down Expand Up @@ -127,6 +127,8 @@ auto PassField(Priority<1>, TypeList<>, ServerContext& server_context, const Fn&
server_context.request_canceled = true;
};
// Update requests_threads map if not canceled.
const auto& params = call_context.getParams();
Context::Reader context_arg = Accessor::get(params);
std::tie(request_thread, inserted) = SetThread(
GuardedRef{thread_context.waiter->m_mutex, request_threads}, server.m_context.connection,
[&] { return context_arg.getCallbackThread(); });
Expand All @@ -153,6 +155,15 @@ auto PassField(Priority<1>, TypeList<>, ServerContext& server_context, const Fn&
// the disconnect handler trying to destroy the thread
// client object.
server.m_context.loop->sync([&] {
// Clear cancellation callback. At this point the
// method invocation finished and the result is
// either being returned, or discarded if a
// cancellation happened. So we do not need to be
// notified of cancellations after this point. Also
// we do not want to be notified because
// cancel_mutex and server_context could be out of
// scope when it happens.
cancel_monitor.m_on_cancel = nullptr;
auto self_dispose{kj::mv(self)};
if (erase_thread) {
// Look up the thread again without using existing
Expand Down Expand Up @@ -183,12 +194,15 @@ auto PassField(Priority<1>, TypeList<>, ServerContext& server_context, const Fn&
}
// End of scope: if KJ_DEFER was reached, it runs here
}
if (testing_hook_after_cleanup) testing_hook_after_cleanup();
return call_context;
};

// Lookup Thread object specified by the client. The specified thread should
// be a local Thread::Server object, but it needs to be looked up
// asynchronously with getLocalServer().
const auto& params = server_context.call_context.getParams();
Context::Reader context_arg = Accessor::get(params);
auto thread_client = context_arg.getThread();
auto result = server.m_context.connection->m_threads.getLocalServer(thread_client)
.then([&server, invoke = kj::mv(invoke), req](const kj::Maybe<Thread::Server&>& perhaps) mutable {
Expand Down
3 changes: 2 additions & 1 deletion src/mp/proxy.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -416,8 +416,9 @@ kj::Promise<void> ProxyServer<ThreadMap>::makeThread(MakeThreadContext context)
std::thread thread([&thread_context, from, this]() {
g_thread_context.thread_name = ThreadName(m_connection.m_loop->m_exe_name) + " (from " + from + ")";
g_thread_context.waiter = std::make_unique<Waiter>();
thread_context.set_value(&g_thread_context);
Lock lock(g_thread_context.waiter->m_mutex);
thread_context.set_value(&g_thread_context);
if (this->m_connection.testing_hook_makethread) this->m_connection.testing_hook_makethread();
// Wait for shutdown signal from ProxyServer<Thread> destructor (signal
// is just waiter getting set to null.)
g_thread_context.waiter->wait(lock, [] { return !g_thread_context.waiter; });
Expand Down
107 changes: 107 additions & 0 deletions test/mp/test/test.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@
#include <kj/memory.h>
#include <kj/test.h>
#include <memory>
#include <mutex>
#include <mp/proxy.h>
#include <mp/proxy.capnp.h>
#include <mp/proxy-io.h>
Expand Down Expand Up @@ -325,6 +326,112 @@ KJ_TEST("Calling IPC method, disconnecting and blocking during the call")
signal.set_value();
}

KJ_TEST("Worker thread destroyed before it is initialized")
{
    // Regression test for bitcoin/bitcoin#34711, bitcoin/bitcoin#34756,
    // where a worker thread could be destroyed before it acquired the
    // waiter mutex. The fix takes the lock before calling set_value, so the
    // ProxyServer<Thread> destructor can no longer null the waiter while
    // the worker sits between set_value and Lock.
    //
    // Strategy: testing_hook_makethread fires right after set_value on the
    // makeThread worker thread. A separate probe thread then calls try_lock
    // on the waiter mutex. With the fix (Lock taken before set_value) the
    // mutex is already held and try_lock fails; without the fix the hook
    // runs before Lock, try_lock succeeds, and the race window is exposed.
    TestSetup setup;
    ProxyClient<messages::FooInterface>* foo = setup.client.get();
    foo->initThreadMap();
    setup.server->m_impl->m_fn = [] {};

    std::atomic<bool> mutex_was_locked{false};
    std::promise<std::mutex*> waiter_mutex;
    std::promise<void> probe_done;

    Connection* conn = setup.server->m_context.connection;
    conn->testing_hook_makethread = [&waiter_mutex, &probe_done] {
        // Publish the waiter mutex to the probe thread, then block until the
        // probe has finished so the worker cannot advance past this point.
        waiter_mutex.set_value(&g_thread_context.waiter->m_mutex.m_mutex);
        probe_done.get_future().wait();
    };

    std::thread probe_thread([&] {
        std::mutex* target = waiter_mutex.get_future().get();
        // try_lock failing means the worker already holds the mutex, which
        // is exactly what the fix guarantees at this point.
        if (target->try_lock()) {
            target->unlock();
        } else {
            mutex_was_locked = true;
        }
        probe_done.set_value();
    });

    foo->callFnAsync();
    probe_thread.join();
    KJ_EXPECT(mutex_was_locked);
}

KJ_TEST("Calling async IPC method, with server disconnect racing the call")
{
    // Regression test for bitcoin/bitcoin#34777, a heap-use-after-free where
    // getParams() ran on the worker thread after the event loop thread had
    // already freed the RpcCallContext on disconnect. The fix moves the
    // getParams() call inside loop->sync() so it always executes on the
    // event loop thread.
    //
    // Strategy: testing_hook_before_sync parks the worker thread right
    // before it enters loop->sync(); a helper thread then disconnects the
    // server and releases the worker, reproducing the problematic timing.
    TestSetup setup;
    ProxyClient<messages::FooInterface>* foo = setup.client.get();
    foo->initThreadMap();
    setup.server->m_impl->m_fn = [] {};

    std::promise<void> sync_reached;
    std::promise<void> disconnect_finished;
    auto disconnect_future = disconnect_finished.get_future().share();
    setup.server->m_context.testing_hook_before_sync = [&sync_reached, disconnect_future] {
        // Signal that the worker is parked, then hold it here until the
        // disconnect has completed.
        sync_reached.set_value();
        disconnect_future.wait();
    };

    std::thread disconnect_thread([&] {
        sync_reached.get_future().wait();
        setup.server_disconnect();
        disconnect_finished.set_value();
    });

    // The client call must fail with the disconnect error rather than crash.
    bool threw = false;
    try {
        foo->callFnAsync();
    } catch (const std::runtime_error& e) {
        threw = true;
        KJ_EXPECT(std::string_view{e.what()} == "IPC client method call interrupted by disconnect.");
    }
    KJ_EXPECT(threw);
    disconnect_thread.join();
}

KJ_TEST("Calling async IPC method, with server disconnect after cleanup")
{
    // Regression test for bitcoin/bitcoin#34782, a stack-use-after-return
    // where the m_on_cancel callback touched stack-local cancel_mutex and
    // server_context after the invoke lambda's inner scope had exited. The
    // fix nulls m_on_cancel inside the cleanup loop->sync(), so nothing can
    // fire it once the scope is gone.
    //
    // Strategy: testing_hook_after_cleanup disconnects the server right
    // after the inner scope exits (cancel_mutex already destroyed). Without
    // the fix, that disconnect would invoke m_on_cancel and access the
    // destroyed mutex.
    TestSetup setup;
    ProxyClient<messages::FooInterface>* foo = setup.client.get();
    foo->initThreadMap();
    setup.server->m_impl->m_fn = [] {};

    setup.server->m_context.testing_hook_after_cleanup = [&setup] {
        setup.server_disconnect();
    };

    // The client call must fail with the disconnect error rather than crash.
    bool threw = false;
    try {
        foo->callFnAsync();
    } catch (const std::runtime_error& e) {
        threw = true;
        KJ_EXPECT(std::string_view{e.what()} == "IPC client method call interrupted by disconnect.");
    }
    KJ_EXPECT(threw);
}

KJ_TEST("Make simultaneous IPC calls on single remote thread")
{
TestSetup setup;
Expand Down
Loading