diff --git a/conanfile.py b/conanfile.py index 87a52abba..4ec340dfb 100644 --- a/conanfile.py +++ b/conanfile.py @@ -9,7 +9,7 @@ class HomestoreConan(ConanFile): name = "homestore" - version = "7.5.10" + version = "7.5.11" homepage = "https://github.com/eBay/Homestore" description = "HomeStore Storage Engine" diff --git a/src/lib/replication/service/raft_repl_service.cpp b/src/lib/replication/service/raft_repl_service.cpp index f59c77d3e..a83d8e00a 100644 --- a/src/lib/replication/service/raft_repl_service.cpp +++ b/src/lib/replication/service/raft_repl_service.cpp @@ -645,19 +645,6 @@ void RaftReplService::start_repl_service_timers() { gc_repl_devs(); }); - // Check for queued fetches at the minimum every second - uint64_t interval_ns = std::min( - HS_DYNAMIC_CONFIG(consensus.wait_data_write_timer_ms) * 1000 * 1000, 1ul * 1000 * 1000 * 1000); - m_rdev_fetch_timer_hdl = iomanager.schedule_thread_timer(interval_ns, true /* recurring */, nullptr, - [this](void*, uint64_t exp_count) { - if (exp_count > 1) { - LOGINFOMOD(replication, - "fetch pending data timer expired {} times, running once", - exp_count); - } - fetch_pending_data(); - }); - // Flush durable commit lsns to superblock // FIXUP: what is the best value for flush_durable_commit_interval_ms? m_flush_durable_commit_timer_hdl = iomanager.schedule_thread_timer( @@ -682,13 +669,43 @@ void RaftReplService::start_repl_service_timers() { } }); latch.wait(); + + // Dedicated reactor for fetch_pending_data so that a blocking flush_durable_commit_lsn() or + // GC on the reaper fiber cannot delay queued fetch batches past consensus.data_receive_timeout_ms + // (default 10s), which would otherwise cause "Data fetch timeout" assertion / TIMEOUT errors. + std::latch fetcher_latch{1}; + iomanager.create_reactor("raft_repl_fetcher", iomgr::INTERRUPT_LOOP, 1u, + [this, &fetcher_latch](bool is_started) { + if (is_started) { + m_fetcher_fiber = iomanager.iofiber_self(); + // Check for queued fetches at the minimum every second + uint64_t interval_ns = std::min( + HS_DYNAMIC_CONFIG(consensus.wait_data_write_timer_ms) * 1000 * 1000, + 1ul * 1000 * 1000 * 1000); + m_rdev_fetch_timer_hdl = iomanager.schedule_thread_timer( + interval_ns, true /* recurring */, nullptr, + [this](void*, uint64_t exp_count) { + if (exp_count > 1) { + LOGINFOMOD(replication, + "fetch pending data timer expired {} times, running once", + exp_count); + } + fetch_pending_data(); + }); + fetcher_latch.count_down(); + } + }); + fetcher_latch.wait(); } void RaftReplService::stop_repl_service_timers() { + iomanager.run_on_wait(m_fetcher_fiber, [this]() { + LOGINFOMOD(replication, "Fetcher Thread: Stopping timer"); + iomanager.cancel_timer(m_rdev_fetch_timer_hdl, true); + }); iomanager.run_on_wait(m_reaper_fiber, [this]() { LOGINFOMOD(replication, "Reaper Thread: Stopping timers"); iomanager.cancel_timer(m_rdev_gc_timer_hdl, true); - iomanager.cancel_timer(m_rdev_fetch_timer_hdl, true); iomanager.cancel_timer(m_flush_durable_commit_timer_hdl, true); iomanager.cancel_timer(m_replace_member_sync_check_timer_hdl, true); }); diff --git a/src/lib/replication/service/raft_repl_service.h b/src/lib/replication/service/raft_repl_service.h index 59fe786f8..1fdd62134 100644 --- a/src/lib/replication/service/raft_repl_service.h +++ b/src/lib/replication/service/raft_repl_service.h @@ -58,6 +58,10 @@ class RaftReplService : public GenericReplService, iomgr::timer_handle_t m_flush_durable_commit_timer_hdl; iomgr::timer_handle_t m_replace_member_sync_check_timer_hdl; iomgr::io_fiber_t m_reaper_fiber; + // Dedicated fiber for the fetch_pending_data timer. Kept separate from the reaper fiber so that + // a slow flush_durable_commit_lsn() (which does synchronous metablk I/O under m_rd_map_mtx) cannot + // delay queued fetch batches past consensus.data_receive_timeout_ms. + iomgr::io_fiber_t m_fetcher_fiber; std::atomic< int32_t > restart_counter{0}; std::mutex raft_restart_mutex;