Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion conanfile.py
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@

class HomestoreConan(ConanFile):
name = "homestore"
version = "7.5.10"
version = "7.5.11"

homepage = "https://github.com/eBay/Homestore"
description = "HomeStore Storage Engine"
Expand Down
45 changes: 31 additions & 14 deletions src/lib/replication/service/raft_repl_service.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -645,19 +645,6 @@ void RaftReplService::start_repl_service_timers() {
gc_repl_devs();
});

// Check for queued fetches at least once every second
uint64_t interval_ns = std::min(
HS_DYNAMIC_CONFIG(consensus.wait_data_write_timer_ms) * 1000 * 1000, 1ul * 1000 * 1000 * 1000);
m_rdev_fetch_timer_hdl = iomanager.schedule_thread_timer(interval_ns, true /* recurring */, nullptr,
[this](void*, uint64_t exp_count) {
if (exp_count > 1) {
LOGINFOMOD(replication,
"fetch pending data timer expired {} times, running once",
exp_count);
}
fetch_pending_data();
});

// Flush durable commit lsns to superblock
// FIXUP: what is the best value for flush_durable_commit_interval_ms?
m_flush_durable_commit_timer_hdl = iomanager.schedule_thread_timer(
Expand All @@ -682,13 +669,43 @@ void RaftReplService::start_repl_service_timers() {
}
});
latch.wait();

// Dedicated reactor for fetch_pending_data so that a blocking flush_durable_commit_lsn() or
// GC on the reaper fiber cannot delay queued fetch batches past consensus.data_receive_timeout_ms
// (default 10s), which would otherwise cause "Data fetch timeout" assertion / TIMEOUT errors.
std::latch fetcher_latch{1};
iomanager.create_reactor("raft_repl_fetcher", iomgr::INTERRUPT_LOOP, 1u,
[this, &fetcher_latch](bool is_started) {
if (is_started) {
m_fetcher_fiber = iomanager.iofiber_self();
// Check for queued fetches at least once every second
uint64_t interval_ns = std::min(
HS_DYNAMIC_CONFIG(consensus.wait_data_write_timer_ms) * 1000 * 1000,
1ul * 1000 * 1000 * 1000);
m_rdev_fetch_timer_hdl = iomanager.schedule_thread_timer(
interval_ns, true /* recurring */, nullptr,
[this](void*, uint64_t exp_count) {
if (exp_count > 1) {
LOGINFOMOD(replication,
"fetch pending data timer expired {} times, running once",
exp_count);
}
fetch_pending_data();
});
fetcher_latch.count_down();
}
});
fetcher_latch.wait();
}

void RaftReplService::stop_repl_service_timers() {
iomanager.run_on_wait(m_fetcher_fiber, [this]() {
LOGINFOMOD(replication, "Fetcher Thread: Stopping timer");
iomanager.cancel_timer(m_rdev_fetch_timer_hdl, true);
});
iomanager.run_on_wait(m_reaper_fiber, [this]() {
LOGINFOMOD(replication, "Reaper Thread: Stopping timers");
iomanager.cancel_timer(m_rdev_gc_timer_hdl, true);
iomanager.cancel_timer(m_rdev_fetch_timer_hdl, true);
iomanager.cancel_timer(m_flush_durable_commit_timer_hdl, true);
iomanager.cancel_timer(m_replace_member_sync_check_timer_hdl, true);
});
Expand Down
4 changes: 4 additions & 0 deletions src/lib/replication/service/raft_repl_service.h
Original file line number Diff line number Diff line change
Expand Up @@ -58,6 +58,10 @@ class RaftReplService : public GenericReplService,
iomgr::timer_handle_t m_flush_durable_commit_timer_hdl;
iomgr::timer_handle_t m_replace_member_sync_check_timer_hdl;
iomgr::io_fiber_t m_reaper_fiber;
// Dedicated fiber for the fetch_pending_data timer. Kept separate from the reaper fiber so that
// a slow flush_durable_commit_lsn() (which does synchronous metablk I/O under m_rd_map_mtx) cannot
// delay queued fetch batches past consensus.data_receive_timeout_ms.
iomgr::io_fiber_t m_fetcher_fiber;
std::atomic< int32_t > restart_counter{0};
std::mutex raft_restart_mutex;

Expand Down
Loading