Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
16 changes: 16 additions & 0 deletions include/mp/proxy-io.h
Original file line number Diff line number Diff line change
Expand Up @@ -340,6 +340,22 @@ class EventLoop

//! External context pointer.
void* m_context;

//! Hook called when ProxyServer<ThreadMap>::makeThread() is called.
std::function<void()> testing_hook_makethread;

//! Hook called on the worker thread inside makeThread(), after the thread
//! context is set up and thread_context promise is fulfilled, but before it
//! starts waiting for requests.
std::function<void()> testing_hook_makethread_created;

//! Hook called on the worker thread when it starts to execute an async
//! request. Used by tests to control timing or inject behavior at this
//! point in execution.
std::function<void()> testing_hook_async_request_start;

//! Hook called on the worker thread just before returning results.
std::function<void()> testing_hook_async_request_done;
};

//! Single element task queue used to handle recursive capnp calls. (If the
Expand Down
20 changes: 16 additions & 4 deletions include/mp/type-context.h
Original file line number Diff line number Diff line change
Expand Up @@ -61,8 +61,6 @@ auto PassField(Priority<1>, TypeList<>, ServerContext& server_context, const Fn&
std::is_same<decltype(Accessor::get(server_context.call_context.getParams())), Context::Reader>::value,
kj::Promise<typename ServerContext::CallContext>>::type
{
const auto& params = server_context.call_context.getParams();
Context::Reader context_arg = Accessor::get(params);
auto& server = server_context.proxy_server;
int req = server_context.req;
// Keep a reference to the ProxyServer instance by assigning it to the self
Expand All @@ -74,8 +72,8 @@ auto PassField(Priority<1>, TypeList<>, ServerContext& server_context, const Fn&
auto self = server.thisCap();
auto invoke = [self = kj::mv(self), call_context = kj::mv(server_context.call_context), &server, req, fn, args...](CancelMonitor& cancel_monitor) mutable {
MP_LOG(*server.m_context.loop, Log::Debug) << "IPC server executing request #" << req;
const auto& params = call_context.getParams();
Context::Reader context_arg = Accessor::get(params);
EventLoop& loop = *server.m_context.loop;
if (loop.testing_hook_async_request_start) loop.testing_hook_async_request_start();
ServerContext server_context{server, call_context, req};
{
// Before invoking the function, store a reference to the
Expand Down Expand Up @@ -127,6 +125,8 @@ auto PassField(Priority<1>, TypeList<>, ServerContext& server_context, const Fn&
server_context.request_canceled = true;
};
// Update requests_threads map if not canceled.
const auto& params = call_context.getParams();
Context::Reader context_arg = Accessor::get(params);
std::tie(request_thread, inserted) = SetThread(
GuardedRef{thread_context.waiter->m_mutex, request_threads}, server.m_context.connection,
[&] { return context_arg.getCallbackThread(); });
Expand All @@ -153,6 +153,15 @@ auto PassField(Priority<1>, TypeList<>, ServerContext& server_context, const Fn&
// the disconnect handler trying to destroy the thread
// client object.
server.m_context.loop->sync([&] {
// Clear cancellation callback. At this point the
// method invocation finished and the result is
// either being returned, or discarded if a
// cancellation happened. So we do not need to be
// notified of cancellations after this point. Also
// we do not want to be notified because
// cancel_mutex and server_context could be out of
// scope when it happens.
cancel_monitor.m_on_cancel = nullptr;
auto self_dispose{kj::mv(self)};
if (erase_thread) {
// Look up the thread again without using existing
Expand Down Expand Up @@ -183,12 +192,15 @@ auto PassField(Priority<1>, TypeList<>, ServerContext& server_context, const Fn&
}
// End of scope: if KJ_DEFER was reached, it runs here
}
if (loop.testing_hook_async_request_done) loop.testing_hook_async_request_done();
return call_context;
};

// Lookup Thread object specified by the client. The specified thread should
// be a local Thread::Server object, but it needs to be looked up
// asynchronously with getLocalServer().
const auto& params = server_context.call_context.getParams();
Context::Reader context_arg = Accessor::get(params);
auto thread_client = context_arg.getThread();
auto result = server.m_context.connection->m_threads.getLocalServer(thread_client)
.then([&server, invoke = kj::mv(invoke), req](const kj::Maybe<Thread::Server&>& perhaps) mutable {
Expand Down
7 changes: 5 additions & 2 deletions src/mp/proxy.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -411,13 +411,16 @@ ProxyServer<ThreadMap>::ProxyServer(Connection& connection) : m_connection(conne

kj::Promise<void> ProxyServer<ThreadMap>::makeThread(MakeThreadContext context)
{
if (m_connection.m_loop->testing_hook_makethread) m_connection.m_loop->testing_hook_makethread();
const std::string from = context.getParams().getName();
std::promise<ThreadContext*> thread_context;
std::thread thread([&thread_context, from, this]() {
g_thread_context.thread_name = ThreadName(m_connection.m_loop->m_exe_name) + " (from " + from + ")";
EventLoop& loop{*m_connection.m_loop};
g_thread_context.thread_name = ThreadName(loop.m_exe_name) + " (from " + from + ")";
g_thread_context.waiter = std::make_unique<Waiter>();
thread_context.set_value(&g_thread_context);
Lock lock(g_thread_context.waiter->m_mutex);
thread_context.set_value(&g_thread_context);
if (loop.testing_hook_makethread_created) loop.testing_hook_makethread_created();
// Wait for shutdown signal from ProxyServer<Thread> destructor (signal
// is just waiter getting set to null.)
g_thread_context.waiter->wait(lock, [] { return !g_thread_context.waiter; });
Expand Down
99 changes: 99 additions & 0 deletions test/mp/test/test.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@
#include <capnp/capability.h>
#include <capnp/rpc.h>
#include <cassert>
#include <chrono>
#include <condition_variable>
#include <cstdint>
#include <cstring>
Expand Down Expand Up @@ -63,6 +64,7 @@ class TestSetup
{
public:
std::function<void()> server_disconnect;
std::function<void()> server_disconnect_later;
std::function<void()> client_disconnect;
std::promise<std::unique_ptr<ProxyClient<messages::FooInterface>>> client_promise;
std::unique_ptr<ProxyClient<messages::FooInterface>> client;
Expand All @@ -88,6 +90,10 @@ class TestSetup
return capnp::Capability::Client(kj::mv(server_proxy));
});
server_disconnect = [&] { loop.sync([&] { server_connection.reset(); }); };
server_disconnect_later = [&] {
assert(std::this_thread::get_id() == loop.m_thread_id);
loop.m_task_set->add(kj::evalLater([&] { server_connection.reset(); }));
};
// Set handler to destroy the server when the client disconnects. This
// is ignored if server_disconnect() is called instead.
server_connection->onDisconnect([&] { server_connection.reset(); });
Expand Down Expand Up @@ -325,6 +331,99 @@ KJ_TEST("Calling IPC method, disconnecting and blocking during the call")
signal.set_value();
}

KJ_TEST("Worker thread destroyed before it is initialized")
{
// Regression test for bitcoin/bitcoin#34711, bitcoin/bitcoin#34756
// where worker thread is destroyed before it starts.
//
// The test works by using the `makethread` hook to start a disconnect as
// soon as ProxyServer<ThreadMap>::makeThread is called, and using the
// `makethread_created` hook to sleep 100ms after the thread is created but
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

In 88cacd4 test: worker thread destroyed before it is initialized: nit, it's only waiting 10ms

// before it starts waiting, so without the bugfix,
// ProxyServer<Thread>::~ProxyServer would run and destroy the waiter,
// causing a SIGSEGV in the worker thread after the sleep.
TestSetup setup;
ProxyClient<messages::FooInterface>* foo = setup.client.get();
foo->initThreadMap();
setup.server->m_impl->m_fn = [] {};

EventLoop& loop = *setup.server->m_context.connection->m_loop;
loop.testing_hook_makethread = [&] {
setup.server_disconnect_later();
};
loop.testing_hook_makethread_created = [&] {
std::this_thread::sleep_for(std::chrono::milliseconds(10));
};

bool disconnected{false};
try {
foo->callFnAsync();
} catch (const std::runtime_error& e) {
KJ_EXPECT(std::string_view{e.what()} == "IPC client method call interrupted by disconnect.");
disconnected = true;
}
KJ_EXPECT(disconnected);
}

KJ_TEST("Calling async IPC method, with server disconnect racing the call")
{
// Regression test for bitcoin/bitcoin#34777 heap-use-after-free where
// an async request is canceled before it starts to execute.
//
// Use testing_hook_async_request_start to trigger a disconnect from the
// worker thread as soon as it begins to execute an async request. Without
// the bugfix, the worker thread would trigger a SIGSEGV after this by
// calling call_context.getParams().
TestSetup setup;
ProxyClient<messages::FooInterface>* foo = setup.client.get();
foo->initThreadMap();
setup.server->m_impl->m_fn = [] {};

EventLoop& loop = *setup.server->m_context.connection->m_loop;
loop.testing_hook_async_request_start = [&] {
setup.server_disconnect();
// Sleep is neccessary to let the event loop fully clean up after the
// disconnect and trigger the SIGSEGV.
std::this_thread::sleep_for(std::chrono::milliseconds(10));
};

try {
foo->callFnAsync();
KJ_EXPECT(false);
} catch (const std::runtime_error& e) {
KJ_EXPECT(std::string_view{e.what()} == "IPC client method call interrupted by disconnect.");
}
}

KJ_TEST("Calling async IPC method, with server disconnect after cleanup")
{
// Regression test for bitcoin/bitcoin#34782 stack-use-after-return where
// an async request is canceled after it finishes executing but before the
// response is sent.
//
// Use testing_hook_async_request_done to trigger a disconnect from the
// worker thread after it execute an async requests but before it returns.
// Without the bugfix, the m_on_cancel callback would be called at this
// point accessing the cancel_mutex stack variable that had gone out of
// scope.
TestSetup setup;
ProxyClient<messages::FooInterface>* foo = setup.client.get();
foo->initThreadMap();
setup.server->m_impl->m_fn = [] {};

EventLoop& loop = *setup.server->m_context.connection->m_loop;
loop.testing_hook_async_request_done = [&] {
setup.server_disconnect();
};

try {
foo->callFnAsync();
KJ_EXPECT(false);
} catch (const std::runtime_error& e) {
KJ_EXPECT(std::string_view{e.what()} == "IPC client method call interrupted by disconnect.");
}
}

KJ_TEST("Make simultaneous IPC calls on single remote thread")
{
TestSetup setup;
Expand Down
Loading