Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion conanfile.py
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,7 @@

class HomeBlocksConan(ConanFile):
name = "homeblocks"
version = "6.0.0"
version = "6.0.1"

homepage = "https://github.com/eBay/HomeBlocks"
description = "Block Store built on HomeStore"
Expand Down
14 changes: 11 additions & 3 deletions src/include/homeblks/home_blocks.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -76,12 +76,18 @@ ENUM(volume_state, uint32_t,
DESTROYED, // fully torn down
READONLY);

// Selects the replication backend for a volume.
// DISABLED — existing ReplDev (raid1-style, data through RAFT log).
// CRAFT — new CraftReplDev (data via client broadcast; RAFT carries only sync-LSN and login entries).
ENUM(replication_mode, uint8_t, DISABLED = 1, CRAFT);

struct volume_info {
volume_id_t id;
uint64_t size_bytes{0};
uint64_t page_size{0}; // logical block size for this volume (a per-volume runtime setting)
std::string name;
uint64_t ordinal{0}; // internal: chunk-selector ordinal, assigned by homeblocks on create/recover
replication_mode repl_mode{replication_mode::DISABLED};

volume_info() = default;
volume_info(const volume_info&) = delete;
Expand All @@ -90,7 +96,8 @@ struct volume_info {
size_bytes(rhs.size_bytes),
page_size(rhs.page_size),
name(std::move(rhs.name)),
ordinal(rhs.ordinal) {}
ordinal(rhs.ordinal),
repl_mode(rhs.repl_mode) {}
volume_info(volume_id_t id_in, uint64_t size, uint64_t psize, std::string in_name) :
id(id_in), size_bytes(size), page_size(psize), name(std::move(in_name)) {}
volume_info(volume_id_t id_in, uint64_t size, uint64_t psize, std::string in_name, uint64_t ord) :
Expand All @@ -101,8 +108,9 @@ struct volume_info {
}
auto operator==(volume_info const& rhs) const { return id == rhs.id; }
std::string to_string() const {
return fmt::format("volume_info: id={} size_bytes={}, page_size={}, name={} ordinal={}",
boost::uuids::to_string(id), size_bytes, page_size, name, ordinal);
return fmt::format("volume_info: id={} size_bytes={}, page_size={}, name={} ordinal={} repl_mode={}",
boost::uuids::to_string(id), size_bytes, page_size, name, ordinal,
enum_name(repl_mode));
}
};

Expand Down
1 change: 1 addition & 0 deletions src/lib/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -29,5 +29,6 @@ settings_gen_cpp(

#add_subdirectory(homestore_backend)
#add_subdirectory(memory_backend)
add_subdirectory(craft)
add_subdirectory(volume)
add_subdirectory(tests)
9 changes: 9 additions & 0 deletions src/lib/craft/CMakeLists.txt
Original file line number Diff line number Diff line change
@@ -0,0 +1,9 @@
cmake_minimum_required (VERSION 3.11)

add_library(${PROJECT_NAME}_craft OBJECT)
target_sources(${PROJECT_NAME}_craft PRIVATE
craft_repl_dev.cpp
)
target_link_libraries(${PROJECT_NAME}_craft
${COMMON_DEPS}
)
110 changes: 110 additions & 0 deletions src/lib/craft/craft_repl_dev.cpp
Original file line number Diff line number Diff line change
@@ -0,0 +1,110 @@
/*********************************************************************************
* Modifications Copyright 2026 eBay Inc.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
* https://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software distributed
* under the License is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR
* CONDITIONS OF ANY KIND, either express or implied. See the License for the
* specific language governing permissions and limitations under the License.
*
*********************************************************************************/

#include "craft_repl_dev.hpp"

namespace homeblocks {

// ─── constructor ──────────────────────────────────────────────────────────────

CraftReplDev::CraftReplDev(volume_id_t vol_id, unique< CraftJournalBackend > journal)
: vol_id_{vol_id}, journal_{std::move(journal)}, raft_listener_{this} {}

// ─── get_lsns / get_rs_commit_lsn ────────────────────────────────────────────
// These are real: they just snapshot the in-memory partition state.

async_result< LSNPair > CraftReplDev::get_lsns(volume_id_t /* vol_id */) {
co_return LSNPair{state_.commit_lsn, state_.last_append_lsn};
}

async_result< LSNPair > CraftReplDev::get_rs_commit_lsn() {
co_return LSNPair{state_.commit_lsn, state_.last_append_lsn};
}

// ─── stubs (S2–S7 will implement these) ──────────────────────────────────────

async_result< LoginResult > CraftReplDev::login(uint64_t /* client_token */,
volume_id_t /* vol_id */) {
LOGW("CraftReplDev::login not yet implemented");
co_return std::unexpected(std::make_error_condition(std::errc::not_supported));
}

async_status CraftReplDev::write(uint64_t /* term */, int64_t /* lsn */, int64_t /* glsn */,
lba_t /* lba */, lba_count_t /* len */,
sisl::sg_list /* data */) {
LOGW("CraftReplDev::write not yet implemented");
co_return std::unexpected(std::make_error_condition(std::errc::not_supported));
}

async_result< sisl::sg_list > CraftReplDev::read(uint64_t /* term */,
int64_t /* min_commit_lsn */,
lba_t /* lba */, lba_count_t /* len */) {
LOGW("CraftReplDev::read not yet implemented");
co_return std::unexpected(std::make_error_condition(std::errc::not_supported));
}

async_status CraftReplDev::commit(uint64_t /* term */, int64_t /* lsn */) {
LOGW("CraftReplDev::commit not yet implemented");
co_return std::unexpected(std::make_error_condition(std::errc::not_supported));
}

async_status CraftReplDev::keep_alive(int64_t /* commit_lsn */) {
LOGW("CraftReplDev::keep_alive not yet implemented");
co_return std::unexpected(std::make_error_condition(std::errc::not_supported));
}

async_status CraftReplDev::truncate(int64_t /* lsn */) {
LOGW("CraftReplDev::truncate not yet implemented");
co_return std::unexpected(std::make_error_condition(std::errc::not_supported));
}

async_status CraftReplDev::append(int64_t /* sync_to */, uint64_t /* client_token */) {
LOGW("CraftReplDev::append not yet implemented");
co_return std::unexpected(std::make_error_condition(std::errc::not_supported));
}

async_result< std::vector< JournalSlot > >
CraftReplDev::fetch_data(std::vector< int64_t > /* lsns */) {
LOGW("CraftReplDev::fetch_data not yet implemented");
co_return std::unexpected(std::make_error_condition(std::errc::not_supported));
}

// ─── RAFT listener ────────────────────────────────────────────────────────────

void CraftReplDev::CraftRaftListener::on_commit(
int64_t lsn, sisl::blob const& /* header */, sisl::blob const& /* key */,
std::vector< homestore::multi_blk_id > const& /* blkids */,
cintrusive< homestore::repl_req_ctx >& /* ctx */) {
// S5 will parse the entry type from `header` and dispatch to
// owner_->apply_sync_rs_commit_lsn() or owner_->apply_internal_login().
LOGD("CraftRaftListener::on_commit lsn={} (entry dispatch not yet implemented)", lsn);
}

// ─── RAFT apply helpers (S5 will implement) ───────────────────────────────────

void CraftReplDev::apply_sync_rs_commit_lsn(int64_t rs_commit_lsn,
uint64_t /* client_token */) {
// Advance commit_lsn to rs_commit_lsn, calling fetch_data() for any missing slots.
LOGD("apply_sync_rs_commit_lsn rs_commit_lsn={} (not yet implemented)", rs_commit_lsn);
}

void CraftReplDev::apply_internal_login(uint64_t client_token, uint64_t term) {
// Update state_.client_token and state_.term; subsequent IOs with a different
// term will be rejected with ETERM.
LOGD("apply_internal_login client_token={} term={} (not yet implemented)", client_token,
term);
}

} // namespace homeblocks
202 changes: 202 additions & 0 deletions src/lib/craft/craft_repl_dev.hpp
Original file line number Diff line number Diff line change
@@ -0,0 +1,202 @@
/*********************************************************************************
* Modifications Copyright 2026 eBay Inc.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
* https://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software distributed
* under the License is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR
* CONDITIONS OF ANY KIND, either express or implied. See the License for the
* specific language governing permissions and limitations under the License.
*
*********************************************************************************/
#pragma once

#include "../hb_internal.hpp"
#include <homestore/replication/repl_dev.hpp>

#include <atomic>
#include <mutex>
#include <set>
#include <vector>

namespace homeblocks {

// ─── wire-protocol types ──────────────────────────────────────────────────────

// Network address of a replica returned by login().
struct replica_endpoint {
peer_id_t id;
std::string addr; // "host:port"
};

// Per-partition in-memory state. Authoritative in memory; recovered from
// journal + superblock on restart.
struct CraftPartitionState {
int64_t commit_lsn {-1}; // highest committed dLSN (applied to LBA index)
int64_t last_append_lsn {-1}; // highest appended dLSN (may be uncommitted)
uint64_t client_token {0}; // token from the last successful InternalLogin
uint64_t term {0}; // current RAFT term
};

// Returned by get_lsns() / get_rs_commit_lsn().
struct LSNPair {
int64_t commit_lsn;
int64_t last_append_lsn;
};

// Returned by login().
struct LoginResult {
std::vector< replica_endpoint > members;
int64_t dLSN; // starting LSN for new I/O
int64_t gLSN; // global (volume-level) LSN
uint64_t term;
};

// One journal slot, returned by fetch_data(). is_empty == true means the LSN
// never reached any replica and the data fields are invalid.
struct JournalSlot {
int64_t lsn;
bool is_empty{false};
lba_t lba{0};
lba_count_t len{0};
sisl::sg_list data;
};

// ─── journal backend abstraction ─────────────────────────────────────────────
//
// Injected into CraftReplDev so unit tests can supply a mock without touching
// HomeStore. Production code passes HomeStoreCraftJournalBackend (defined in
// craft_repl_dev.cpp).

class CraftJournalBackend {
public:
virtual async_status write_slot(int64_t lsn, lba_t lba, lba_count_t len,
sisl::sg_list data) = 0;
virtual async_result<JournalSlot> read_slot(int64_t lsn) = 0;
virtual async_status truncate_to(int64_t lsn) = 0;
virtual ~CraftJournalBackend() = default;
};

// ─── CraftReplDev ─────────────────────────────────────────────────────────────
//
// Parallel to HomeStore's ReplDisk. Each CRAFT-mode volume owns one instance
// instead of the solo repl_dev. Non-CRAFT volumes are unaffected.

class CraftReplDev {
public:
explicit CraftReplDev(volume_id_t vol_id, unique< CraftJournalBackend > journal);
~CraftReplDev() = default;

// ── client-facing ──────────────────────────────────────────────────────

// Full login sequence (leader-only). Serialized: at most one in-flight
// login per partition at a time.
async_result< LoginResult > login(uint64_t client_token, volume_id_t vol_id);

// Append data to the journal at the client-assigned LSN slot. Zero-copy;
// does NOT apply data to the LBA index.
async_status write(uint64_t term, int64_t lsn, int64_t glsn,
lba_t lba, lba_count_t len, sisl::sg_list data);

// Inline-commit up to min_commit_lsn if needed, then serve from the LBA index.
async_result< sisl::sg_list > read(uint64_t term, int64_t min_commit_lsn,
lba_t lba, lba_count_t len);

// Apply journal entries (current_commit, lsn] to the LBA index.
async_status commit(uint64_t term, int64_t lsn);

// Same as commit + reset the client-timeout watchdog.
async_status keep_alive(int64_t commit_lsn);

// ── internal / peer API ────────────────────────────────────────────────

// Return {commit_lsn, last_append_lsn} for the local partition.
async_result< LSNPair > get_lsns(volume_id_t vol_id);

// Alias of get_lsns exposed to peer servers during GetRSCommitLSN broadcast.
async_result< LSNPair > get_rs_commit_lsn();

// Drop all journal entries with dLSN > lsn; clear missing-set entries above lsn.
async_status truncate(int64_t lsn);

// Propose a SyncRSCommitLSN RAFT entry (called by watchdog or leader during login).
async_status append(int64_t sync_to, uint64_t client_token);

// Return raw journal data for the requested LSNs. Empty slots return
// JournalSlot{.is_empty=true} rather than an error.
async_result< std::vector< JournalSlot > > fetch_data(std::vector< int64_t > lsns);

private:
// ── RAFT listener ──────────────────────────────────────────────────────
//
// Handles the two CRAFT RAFT entry types (SyncRSCommitLSN, InternalLogin).
// All other HomeStore callbacks are no-ops for this backend.

class CraftRaftListener : public homestore::repl_dev_listener {
public:
explicit CraftRaftListener(CraftReplDev* owner) : owner_{owner} {}

// Dispatches on entry type; the real work is in the two apply_* helpers below.
void on_commit(int64_t lsn, sisl::blob const& header, sisl::blob const& key,
std::vector< homestore::multi_blk_id > const& blkids,
cintrusive< homestore::repl_req_ctx >& ctx) override;

// ── no-ops ────────────────────────────────────────────────────────
bool on_pre_commit(int64_t, const sisl::blob&, const sisl::blob&,
cintrusive< homestore::repl_req_ctx >&) override { return true; }
void on_error(homestore::ReplServiceError, const sisl::blob&, const sisl::blob&,
cintrusive< homestore::repl_req_ctx >&) override {}
homestore::result< homestore::blk_alloc_hints >
get_blk_alloc_hints(sisl::blob const&, uint32_t,
cintrusive< homestore::repl_req_ctx >&) override {
return homestore::blk_alloc_hints{};
}
void on_destroy(const homestore::group_id_t&) override {}
void on_start_replace_member(const std::string&, const homestore::replica_member_info&,
const homestore::replica_member_info&,
homestore::trace_id_t) override {}
void on_complete_replace_member(const std::string&, const homestore::replica_member_info&,
const homestore::replica_member_info&,
homestore::trace_id_t) override {}
void on_clean_replace_member_task(const std::string&, const homestore::replica_member_info&,
const homestore::replica_member_info&,
homestore::trace_id_t) override {}
void on_remove_member(const homestore::replica_id_t&, homestore::trace_id_t) override {}
void on_rollback(int64_t, const sisl::blob&, const sisl::blob&,
cintrusive< homestore::repl_req_ctx >&) override {}
void on_restart() override {}
homestore::async_status
create_snapshot(std::shared_ptr< homestore::snapshot_context >) override { co_return homestore::ok(); }
bool apply_snapshot(std::shared_ptr< homestore::snapshot_context >) override { return true; }
std::shared_ptr< homestore::snapshot_context > last_snapshot() override { return nullptr; }
int read_snapshot_obj(std::shared_ptr< homestore::snapshot_context >,
std::shared_ptr< homestore::snapshot_obj >) override { return 0; }
void write_snapshot_obj(std::shared_ptr< homestore::snapshot_context >,
std::shared_ptr< homestore::snapshot_obj >) override {}
void free_user_snp_ctx(void*&) override {}
void on_no_space_left(homestore::repl_lsn_t, sisl::blob const&) override {}
void notify_committed_lsn(int64_t) override {}
void on_config_rollback(int64_t) override {}

private:
CraftReplDev* owner_;
};

// Called from CraftRaftListener::on_commit after deserialising the entry type.
void apply_sync_rs_commit_lsn(int64_t rs_commit_lsn, uint64_t client_token);
void apply_internal_login(uint64_t client_token, uint64_t term);

volume_id_t vol_id_;
unique< CraftJournalBackend > journal_;
CraftPartitionState state_;
std::set< int64_t > missing_lsns_; // gaps between commit_lsn and last_append_lsn
std::mutex missing_mu_;
bool login_in_progress_{false};
std::mutex login_mu_;
CraftRaftListener raft_listener_;
};

} // namespace homeblocks
1 change: 1 addition & 0 deletions src/lib/volume/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@ target_sources("${PROJECT_NAME}" PRIVATE
volume.cpp
volume_chunk_selector.cpp
$<TARGET_OBJECTS:${PROJECT_NAME}_core>
$<TARGET_OBJECTS:${PROJECT_NAME}_craft>
)

target_link_libraries("${PROJECT_NAME}" PUBLIC
Expand Down
Loading