diff --git a/CHANGELOG.md b/CHANGELOG.md index 1066eba2..50a1a277 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -4,6 +4,9 @@ All notable changes to this project will be documented in this file. The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.0.0/), and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0.html). +## [14.5.0] - 2026-06-03 +- Shared awaitable + ## [14.1.0] - 2026-05-10 - Add header-only async building blocks for sender-driven storage code. cqe_state bridges io_uring CQEs to callback-owned operation state; io_uring_scheduler exposes single-threaded schedule_at() and async_submit() senders on a caller-owned ring, batches submission in poll_once(), and flushes pending SQEs when the diff --git a/conanfile.py b/conanfile.py index 33e1e2ea..fd03849f 100644 --- a/conanfile.py +++ b/conanfile.py @@ -10,7 +10,7 @@ class SISLConan(ConanFile): name = "sisl" - version = "14.4.1" + version = "14.5.0" homepage = "https://github.com/eBay/sisl" description = "Library for fast data structures, utilities" diff --git a/include/sisl/async/shared_awaitable.hpp b/include/sisl/async/shared_awaitable.hpp new file mode 100644 index 00000000..3ebec4a2 --- /dev/null +++ b/include/sisl/async/shared_awaitable.hpp @@ -0,0 +1,98 @@ +#pragma once + +#include +#include +#include +#include +#include + +namespace sisl::async { + +// Cross-thread, multi-consumer (broadcast) awaitable carrying a value of type T. +// +// The multi-waiter generalization of value_awaitable: where value_awaitable resumes exactly ONE waiter and +// MOVES its result out (single-shot), shared_awaitable resumes ALL installed waiters and hands each a COPY of +// the result (broadcast). It is the folly::SharedPromise replacement -- the pattern where several callers +// await the same in-flight operation and all observe its single completion (e.g. N threads triggering one +// checkpoint flush, or several openers of one log store). T must therefore be copyable. +// +// USAGE: one producer holds the shared_awaitable (typically inside a std::shared_ptr) and calls complete(value) +// once when the operation finishes. Each consumer co_awaits the SAME object (the object IS the awaitable, like +// value_awaitable) -- usually a coroutine that captures a std::shared_ptr to it so it stays alive across the +// suspend. A consumer that co_awaits AFTER completion takes the fast path and resumes inline. +// +// THREAD SAFETY: complete() and any number of await_suspend()/await_resume() may run on different threads. A +// single mutex guards the waiter list and the result; resumes are performed OUTSIDE the lock (the waiter list +// is swapped out under the lock, then drained) so a resumed coroutine can re-enter (await again, or complete a +// nested awaitable) without self-deadlock. The handshake has no lost-wakeup: a consumer whose await_suspend +// races a concurrent complete() either gets installed-then-resumed, or observes _done and resumes itself -- +// exactly once either way. +// +// LIFETIME: the object must outlive every waiter's resume AND any late (post-completion) co_await. complete() +// resumes installed waiters inline, so they have run past await_resume by the time it returns; the canonical +// arrangement is a std::shared_ptr held by BOTH the producer and each awaiting coroutine frame, so the last +// reference (producer or consumer) keeps the result readable. +// +// EXACTLY-ONCE: complete() must be called; a second call is a no-op (the broadcast already fired). await_resume +// copies the result, so the awaitable can be observed any number of times after completion. +// +// EXCEPTION DISCIPLINE: complete() resumes waiters via handle.resume(); an exception escaping a resumed frame +// propagates out of complete() (same contract as value_awaitable). A producer running on a noexcept completion +// boundary must ensure the resumed coroutine bodies cannot throw across it. +template < typename T > +struct shared_awaitable { + mutable std::mutex _mtx{}; + bool _done{false}; + std::optional< T > _result{}; + std::vector< std::coroutine_handle<> > _waiters{}; + + shared_awaitable() = default; + shared_awaitable(const shared_awaitable&) = delete; + shared_awaitable& operator=(const shared_awaitable&) = delete; + + // Producer side (any thread). Publishes the value, marks done, and resumes every installed waiter. The + // waiters are swapped out under the lock and resumed outside it so a resumed coroutine may re-enter safely. + // A second complete() is ignored. + void complete(T value) { + std::vector< std::coroutine_handle<> > to_resume; + { + std::lock_guard lg{_mtx}; + if (_done) { return; } + _result.emplace(std::move(value)); + _done = true; + to_resume.swap(_waiters); + } + for (auto h : to_resume) { + h.resume(); + } + } + + // True once complete() has fired. Also the awaiter fast-path check. + [[nodiscard]] bool is_ready() const { + std::lock_guard lg{_mtx}; + return _done; + } + + // ----- awaiter interface (the object is co_await-ed directly; many coroutines may await one object) ----- + + [[nodiscard]] bool await_ready() const { + std::lock_guard lg{_mtx}; + return _done; + } + + // Install this coroutine as a waiter, unless completion already landed (return false -> resume immediately). + bool await_suspend(std::coroutine_handle<> h) { + std::lock_guard lg{_mtx}; + if (_done) { return false; } + _waiters.push_back(h); + return true; + } + + // Broadcast: hand back a COPY so every waiter observes the result independently. + T await_resume() const { + std::lock_guard lg{_mtx}; + return *_result; + } +}; + +} // namespace sisl::async diff --git a/src/async/CMakeLists.txt b/src/async/CMakeLists.txt index aa3f85ad..066fc30a 100644 --- a/src/async/CMakeLists.txt +++ b/src/async/CMakeLists.txt @@ -17,6 +17,13 @@ target_sources(test_value_awaitable PRIVATE tests/test_value_awaitable.cpp) target_link_libraries(test_value_awaitable PRIVATE GTest::gtest GTest::gtest_main) add_test(NAME ValueAwaitable COMMAND test_value_awaitable) +# shared_awaitable is the multi-consumer (broadcast) sibling of value_awaitable (the folly::SharedPromise +# replacement); also header-only and stdexec-free. +add_executable(test_shared_awaitable) +target_sources(test_shared_awaitable PRIVATE tests/test_shared_awaitable.cpp) +target_link_libraries(test_shared_awaitable PRIVATE GTest::gtest GTest::gtest_main) +add_test(NAME SharedAwaitable COMMAND test_shared_awaitable) + find_package(stdexec QUIET) if (TARGET stdexec::stdexec) add_executable(test_cqe_awaitable) diff --git a/src/async/tests/test_shared_awaitable.cpp b/src/async/tests/test_shared_awaitable.cpp new file mode 100644 index 00000000..70f5cd22 --- /dev/null +++ b/src/async/tests/test_shared_awaitable.cpp @@ -0,0 +1,135 @@ +// Unit tests for sisl::async::shared_awaitable -- the multi-consumer (broadcast) generalization of +// value_awaitable used to replace folly::SharedPromise (N callers awaiting one in-flight completion). +// +// Mirrors test_value_awaitable.cpp: std::noop_coroutine() stands in for suspended coroutine handles (no real +// frames), and the protocol is driven through the public complete()/await_* API so both completion orderings +// and the broadcast (many waiters, one complete) are exercised deterministically. Header-only, stdexec-free. + +#include +#include +#include +#include +#include +#include + +#include + +#include + +namespace { + +using string_await = sisl::async::shared_awaitable< std::string >; + +// not-ready before completion +TEST(shared_awaitable, AwaitReadyFalseWhenNotReady) { + string_await state{}; + EXPECT_FALSE(state.await_ready()); + EXPECT_FALSE(state.is_ready()); +} + +// synchronous fast path: completion BEFORE the consumer co_awaits +TEST(shared_awaitable, CompleteThenAwaitReadyTrue) { + string_await state{}; + state.complete("hello"); + EXPECT_TRUE(state.await_ready()); + EXPECT_EQ(state.await_resume(), "hello"); +} + +// suspend-then-complete: consumer suspends first, completer resumes it +TEST(shared_awaitable, SuspendThenCompleteResumesWaiter) { + string_await state{}; + auto const h = std::noop_coroutine(); + EXPECT_TRUE(state.await_suspend(h)); // not yet completed -> stay suspended + state.complete("world"); + EXPECT_EQ(state.await_resume(), "world"); +} + +// complete-then-suspend race: completion lands before await_suspend installs the waiter; no lost wakeup. +TEST(shared_awaitable, CompleteThenSuspendDoesNotSuspend) { + string_await state{}; + state.complete("early"); + auto const h = std::noop_coroutine(); + EXPECT_FALSE(state.await_suspend(h)); // do not suspend; result already available + EXPECT_EQ(state.await_resume(), "early"); +} + +// broadcast: MANY waiters installed before completion are ALL resumed, each observing a copy. +TEST(shared_awaitable, BroadcastResumesAllWaiters) { + sisl::async::shared_awaitable< int > state{}; + + // A handful of distinct suspended coroutines (noop handles all compare/resume fine). + constexpr int kWaiters = 8; + for (int i = 0; i < kWaiters; ++i) { + EXPECT_TRUE(state.await_suspend(std::noop_coroutine())); + } + state.complete(77); // single completion fans out to all installed waiters + + // Every (late) observation returns the same broadcast value -- the result was copied, not moved out. + for (int i = 0; i < kWaiters; ++i) { + EXPECT_EQ(state.await_resume(), 77); + } +} + +// second complete() is a no-op (the broadcast already fired) +TEST(shared_awaitable, SecondCompleteIsIgnored) { + sisl::async::shared_awaitable< int > state{}; + state.complete(1); + state.complete(2); // ignored + EXPECT_TRUE(state.await_ready()); + EXPECT_EQ(state.await_resume(), 1); +} + +// copyable payload broadcasts to multiple late observers (the shared_ptr shape) +TEST(shared_awaitable, CopyablePayloadBroadcasts) { + sisl::async::shared_awaitable< std::shared_ptr< int > > state{}; + state.complete(std::make_shared< int >(123)); + auto a = state.await_resume(); + auto b = state.await_resume(); + ASSERT_TRUE(a); + ASSERT_TRUE(b); + EXPECT_EQ(a.get(), b.get()); // same underlying object, shared + EXPECT_EQ(*a, 123); +} + +// refcounted shared state: the producer drops its ref after completing while a consumer ref keeps it readable. +TEST(shared_awaitable, SharedStateSurvivesProducerDestruction) { + auto state = std::make_shared< string_await >(); + { + auto producer_ref = state; + producer_ref->complete("persisted"); + } + ASSERT_EQ(state.use_count(), 1); + EXPECT_TRUE(state->await_ready()); + EXPECT_EQ(state->await_resume(), "persisted"); +} + +// Cross-thread broadcast: several threads install waiters while one delivers the completion. Every waiter is +// resumed exactly once and observes the value without a data race. Run under TSAN to catch ordering regressions. +TEST(shared_awaitable, CrossThreadBroadcastIsRaceFree) { + constexpr int kIters = 1000; + for (int i = 0; i < kIters; ++i) { + sisl::async::shared_awaitable< int > state{}; + std::atomic< bool > go{false}; + + std::vector< std::thread > waiters; + for (int w = 0; w < 4; ++w) { + waiters.emplace_back([&] { + while (!go.load(std::memory_order_acquire)) {} + (void)state.await_suspend(std::noop_coroutine()); + }); + } + std::thread completer([&] { + while (!go.load(std::memory_order_acquire)) {} + state.complete(i); + }); + + go.store(true, std::memory_order_release); + for (auto& t : waiters) { + t.join(); + } + completer.join(); + EXPECT_EQ(state.await_resume(), i); + } +} + +} // namespace