Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 0 additions & 1 deletion .github/workflows/version_change_check.yml
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,6 @@ on:
types: [opened, edited, synchronize]
branches:
- stable/v4.x
- dev/refactor_create_seal_shard
jobs:
check-file:
runs-on: ubuntu-latest
Expand Down
7 changes: 4 additions & 3 deletions src/include/homeobject/shard_manager.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -35,14 +35,14 @@ struct ShardInfo {
shard_id_t id;
pg_id_t placement_group;
State state;
uint64_t lsn; // created_lsn
uint64_t sealed_lsn{INT64_MAX}; // sealed_lsn
uint64_t lsn; // created_lsn
uint64_t created_time;
uint64_t last_modified_time;
uint64_t available_capacity_bytes;
uint64_t total_capacity_bytes;
std::optional< peer_id_t > current_leader{std::nullopt};
uint8_t meta[meta_length]{};
uint64_t sealed_lsn{INT64_MAX}; // appended after all v2 fields for forward compat

auto operator<=>(ShardInfo const& rhs) const { return id <=> rhs.id; }
auto operator==(ShardInfo const& rhs) const { return id == rhs.id; }
Expand All @@ -58,7 +58,8 @@ class ShardManager : public Manager< ShardError > {

virtual AsyncResult< ShardInfo > get_shard(shard_id_t id, trace_id_t tid = 0) const = 0;
virtual AsyncResult< InfoList > list_shards(pg_id_t id, trace_id_t tid = 0) const = 0;
virtual AsyncResult< ShardInfo > create_shard(pg_id_t pg_owner, uint64_t size_bytes, std::string meta, trace_id_t tid = 0) = 0;
virtual AsyncResult< ShardInfo > create_shard(pg_id_t pg_owner, uint64_t size_bytes, std::string meta,
trace_id_t tid = 0) = 0;
virtual AsyncResult< ShardInfo > seal_shard(shard_id_t id, trace_id_t tid = 0) = 0;
};

Expand Down
33 changes: 11 additions & 22 deletions src/lib/homestore_backend/hs_homeobject.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -106,9 +106,9 @@ class HSHomeObject : public HomeObjectImpl {
std::vector< shard_id_t > shards_to_migrate_;

public:
// Old version shard_info_superblk (v0.01) - for backward compatibility testing and migration
// v1 ShardInfo did not have the meta field
struct v1_ShardInfo {
// Old version shard_info_superblk (v0.02) - for backward compatibility testing and migration
// v2 ShardInfo did not have the sealed_lsn
struct v2_ShardInfo {
shard_id_t id;
pg_id_t placement_group;
ShardInfo::State state;
Expand All @@ -118,7 +118,7 @@ class HSHomeObject : public HomeObjectImpl {
uint64_t available_capacity_bytes;
uint64_t total_capacity_bytes;
std::optional< peer_id_t > current_leader{std::nullopt};
// Note: meta field was added in v2
uint8_t meta[ShardInfo::meta_length]{};
};

#pragma pack(1)
Expand Down Expand Up @@ -207,20 +207,21 @@ class HSHomeObject : public HomeObjectImpl {
};

struct shard_info_superblk : DataHeader {
// This version is a common version of DataHeader, each derived struct can have its own version.
static constexpr uint8_t shard_sb_version = 0x02;
// v2: added meta, v3: added sealed_lsn & changed the order of info to be the end
static constexpr uint8_t shard_sb_version = 0x03;

uint8_t sb_version{shard_sb_version};
ShardInfo info;
homestore::chunk_num_t p_chunk_id{0};
homestore::chunk_num_t v_chunk_id{0};

ShardInfo info;
// backward compatibility
bool valid() const { return DataHeader::valid() && sb_version <= shard_sb_version; }
};

struct v1_shard_info_superblk : DataHeader {
v1_ShardInfo info;
// v2 superblk: sb_version=0x02, v2_ShardInfo (no sealed_lsn), chunk IDs after info
struct v2_shard_info_superblk : DataHeader {
uint8_t sb_version{0x02};
v2_ShardInfo info;
homestore::chunk_num_t p_chunk_id{0};
homestore::chunk_num_t v_chunk_id{0};
};
Expand Down Expand Up @@ -948,18 +949,6 @@ class HSHomeObject : public HomeObjectImpl {
*/
void on_replica_restart();

/**
* @brief Releases a chunk based on the information provided in a CREATE_SHARD message.
*
* This function is invoked during log rollback or when the proposer encounters an error.
* Its primary purpose is to ensure that the state of pg_chunks is reverted to the correct state.
*
* @param header The message header that includes the shard_info_superblk, which contains the data necessary for
* extracting and mapping the chunk ID.
* @return Returns true if the chunk was successfully released, false otherwise.
*/
bool release_chunk_based_on_create_shard_message(sisl::blob const& header);

/**
* @brief check whether the chunks in a given pg can be gc.
*
Expand Down
147 changes: 70 additions & 77 deletions src/lib/homestore_backend/hs_shard_manager.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,37 @@ SISL_LOGGING_DECL(shardmgr)
#define SLOGE(trace_id, shard_id, msg, ...) SLOG(ERROR, trace_id, shard_id, msg, ##__VA_ARGS__)
#define SLOGC(trace_id, shard_id, msg, ...) SLOG(CRITICAL, trace_id, shard_id, msg, ##__VA_ARGS__)

static ShardInfo shardinfo_from_v2(const HSHomeObject::v2_ShardInfo& v2) {
ShardInfo info;
info.id = v2.id;
info.placement_group = v2.placement_group;
info.state = v2.state;
info.lsn = v2.lsn;
info.sealed_lsn = INT64_MAX;
info.created_time = v2.created_time;
info.last_modified_time = v2.last_modified_time;
info.available_capacity_bytes = v2.available_capacity_bytes;
info.total_capacity_bytes = v2.total_capacity_bytes;
info.current_leader = v2.current_leader;
std::memcpy(info.meta, v2.meta, ShardInfo::meta_length);
return info;
}

// Core decode: given a typed shard_info_superblk pointer, extract ShardInfo + chunk IDs.
// Handles all supported on-disk versions. Returns nullopt for unknown versions.
static std::optional< std::tuple< ShardInfo, homestore::chunk_num_t, homestore::chunk_num_t > >
decode_shard_sb(const HSHomeObject::shard_info_superblk* sb) {
if (sb->sb_version >= HSHomeObject::shard_info_superblk::shard_sb_version) {
return std::make_tuple(sb->info, sb->p_chunk_id, sb->v_chunk_id);
} else if (sb->sb_version == 0x02) {
const auto* v2sb = r_cast< const HSHomeObject::v2_shard_info_superblk* >(sb);
return std::make_tuple(shardinfo_from_v2(v2sb->info), v2sb->p_chunk_id, v2sb->v_chunk_id);
} else {
LOGW("Unsupported shard_info_superblk sb_version={}", sb->sb_version);
return std::nullopt;
}
}

ShardError toShardError(ReplServiceError const& e) {
switch (e) {
case ReplServiceError::BAD_REQUEST:
Expand Down Expand Up @@ -108,7 +139,9 @@ ShardInfo HSHomeObject::deserialize_shard_info(const char* json_str, size_t str_
shard_info.placement_group = shard_json["shard_info"]["pg_id_t"].get< pg_id_t >();
shard_info.state = static_cast< ShardInfo::State >(shard_json["shard_info"]["state"].get< int >());
shard_info.lsn = shard_json["shard_info"]["lsn"].get< uint64_t >();
shard_info.sealed_lsn = shard_json["shard_info"]["sealed_lsn"].get< uint64_t >();
shard_info.sealed_lsn = shard_json["shard_info"].contains("sealed_lsn")
? shard_json["shard_info"]["sealed_lsn"].get< uint64_t >()
: INT64_MAX;
shard_info.created_time = shard_json["shard_info"]["created_time"].get< uint64_t >();
shard_info.last_modified_time = shard_json["shard_info"]["modified_time"].get< uint64_t >();
shard_info.available_capacity_bytes = shard_json["shard_info"]["available_capacity"].get< uint64_t >();
Expand Down Expand Up @@ -207,7 +240,6 @@ ShardManager::AsyncResult< ShardInfo > HSHomeObject::_create_shard(pg_id_t pg_ow
.placement_group = pg_owner,
.state = ShardInfo::State::OPEN,
.lsn = 0,
.sealed_lsn = INT64_MAX,
.created_time = create_time,
.last_modified_time = create_time,
.available_capacity_bytes = size_bytes,
Expand Down Expand Up @@ -375,8 +407,10 @@ bool HSHomeObject::on_shard_message_pre_commit(int64_t lsn, sisl::blob const& he

switch (msg_header->msg_type) {
case ReplicationMessageType::SEAL_SHARD_MSG: {
auto sb = r_cast< shard_info_superblk const* >(header.cbytes() + sizeof(ReplicationMessageHeader));
auto const shard_info = sb->info;
auto decoded_shard_sb =
decode_shard_sb(r_cast< const shard_info_superblk* >(header.cbytes() + sizeof(ReplicationMessageHeader)));
RELEASE_ASSERT(decoded_shard_sb.has_value(), "failed to decode shard superblk in pre_commit SEAL_SHARD_MSG");
auto& [shard_info, p_chunk_id_unused, v_chunk_id_unused] = decoded_shard_sb.value();

{
std::scoped_lock lock_guard(_shard_lock);
Expand Down Expand Up @@ -425,8 +459,10 @@ void HSHomeObject::on_shard_message_rollback(int64_t lsn, sisl::blob const& head
break;
}
case ReplicationMessageType::SEAL_SHARD_MSG: {
auto sb = r_cast< shard_info_superblk const* >(header.cbytes() + sizeof(ReplicationMessageHeader));
auto const shard_info = sb->info;
auto decoded_shard_sb =
decode_shard_sb(r_cast< const shard_info_superblk* >(header.cbytes() + sizeof(ReplicationMessageHeader)));
RELEASE_ASSERT(decoded_shard_sb.has_value(), "failed to decode shard superblk in rollback SEAL_SHARD_MSG");
auto& [shard_info, p_chunk_id_unused, v_chunk_id_unused] = decoded_shard_sb.value();
{
std::scoped_lock lock_guard(_shard_lock);
auto iter = _shard_map.find(shard_info.id);
Expand Down Expand Up @@ -511,9 +547,10 @@ void HSHomeObject::on_shard_message_commit(int64_t lsn, sisl::blob const& h, sha

switch (header->msg_type) {
case ReplicationMessageType::CREATE_SHARD_MSG: {
auto sb = r_cast< shard_info_superblk const* >(h.cbytes() + sizeof(ReplicationMessageHeader));
auto shard_info = sb->info;
auto v_chunk_id = sb->v_chunk_id;
auto decoded_shard_sb =
decode_shard_sb(r_cast< const shard_info_superblk* >(h.cbytes() + sizeof(ReplicationMessageHeader)));
RELEASE_ASSERT(decoded_shard_sb.has_value(), "failed to decode shard superblk in commit CREATE_SHARD_MSG");
auto& [shard_info, p_chunk_id_unused, v_chunk_id] = decoded_shard_sb.value();
shard_info.lsn = lsn;

local_create_shard(shard_info, v_chunk_id, tid);
Expand All @@ -525,8 +562,10 @@ void HSHomeObject::on_shard_message_commit(int64_t lsn, sisl::blob const& h, sha
}

case ReplicationMessageType::SEAL_SHARD_MSG: {
auto sb = r_cast< shard_info_superblk const* >(h.cbytes() + sizeof(ReplicationMessageHeader));
auto shard_info = sb->info;
auto decoded_shard_sb =
decode_shard_sb(r_cast< const shard_info_superblk* >(h.cbytes() + sizeof(ReplicationMessageHeader)));
RELEASE_ASSERT(decoded_shard_sb.has_value(), "failed to decode shard superblk in commit SEAL_SHARD_MSG");
auto& [shard_info, p_chunk_id_unused, v_chunk_id_unused] = decoded_shard_sb.value();

ShardInfo::State state;
{
Expand Down Expand Up @@ -584,56 +623,39 @@ void HSHomeObject::on_shard_message_commit(int64_t lsn, sisl::blob const& h, sha
}

void HSHomeObject::on_shard_meta_blk_found(homestore::meta_blk* mblk, sisl::byte_view buf) {
// First peek at the version
auto* header = reinterpret_cast< const DataHeader* >(buf.bytes());
RELEASE_ASSERT(header->version == DataHeader::data_header_version, "Unknown shard superblock DataHeader version {}",
header->version);

homestore::superblk< shard_info_superblk > sb(_shard_meta_name);

// Detect and migrate old version data (DataHeader version 0x01)
if (header->version == 0x01) {
// Read data from buffer with old v1 layout
auto* old_sb = reinterpret_cast< const v1_shard_info_superblk* >(buf.bytes());
auto* shard_sb = reinterpret_cast< const shard_info_superblk* >(buf.bytes());
if (shard_sb->sb_version == 0x02) {
auto decoded_shard_sb = decode_shard_sb(shard_sb);
RELEASE_ASSERT(decoded_shard_sb.has_value(), "failed to decode v2 shard superblk");
auto [v2_info, saved_p_chunk, saved_v_chunk] = decoded_shard_sb.value();

const auto v1_info = old_sb->info;
const auto saved_p_chunk_id = old_sb->p_chunk_id;
const auto saved_v_chunk_id = old_sb->v_chunk_id;
LOGI("Detected v2 shard superblk (shard={}, pg={}, p_chunk={}, v_chunk={}), migrating to v3", v2_info.id,
v2_info.placement_group, saved_p_chunk, saved_v_chunk);

LOGI("Detected v1 shard superblk (shard={}, pg={}, p_chunk={}, v_chunk={}), migrating to v2", v1_info.id,
v1_info.placement_group, saved_p_chunk_id, saved_v_chunk_id);

// Create a new v2 superblk with the given mblk
sb.load(buf, mblk);
sb.resize(sizeof(shard_info_superblk));
sb->magic = DataHeader::data_header_magic;
sb->version = DataHeader::data_header_version; // 0x02
sb->type = DataHeader::data_type_t::SHARD_INFO;
sb->sb_version = shard_info_superblk::shard_sb_version; // 0x02

// Migrate v1_ShardInfo to v2 ShardInfo (v2 added meta field)
sb->info.id = v1_info.id;
sb->info.placement_group = v1_info.placement_group;
sb->info.state = v1_info.state;
sb->info.lsn = v1_info.lsn;
sb->info.created_time = v1_info.created_time;
sb->info.last_modified_time = v1_info.last_modified_time;
sb->info.available_capacity_bytes = v1_info.available_capacity_bytes;
sb->info.total_capacity_bytes = v1_info.total_capacity_bytes;
sb->info.current_leader = v1_info.current_leader;
// meta field is new in v2, initialize to empty
std::memset(sb->info.meta, 0, ShardInfo::meta_length);

sb->p_chunk_id = saved_p_chunk_id;
sb->v_chunk_id = saved_v_chunk_id;
sb->sb_version = shard_info_superblk::shard_sb_version; // 0x03
sb->p_chunk_id = saved_p_chunk;
sb->v_chunk_id = saved_v_chunk;
sb->info = v2_info;

// Save shard_id and old mblk pointer for migration (cannot write or remove during callback due to
// metasvc lock held - would cause deadlock)
shards_to_migrate_.push_back(v1_info.id);

LOGI("Queued shard_id={} for migration write and old metablk removal after recovery", v1_info.id);
} else if (header->version == DataHeader::data_header_version) {
shards_to_migrate_.push_back(v2_info.id);
LOGI("Queued shard_id={} for v2->v3 migration write after recovery", v2_info.id);
} else if (shard_sb->sb_version == shard_info_superblk::shard_sb_version) {
sb.load(buf, mblk);
} else {
RELEASE_ASSERT(false, "Unknown shard superblock version {}", header->version);
RELEASE_ASSERT(false, "Unsupported shard superblock version {}", shard_sb->sb_version);
}

add_new_shard_to_map(std::make_unique< HS_Shard >(std::move(sb)));
Expand Down Expand Up @@ -663,7 +685,7 @@ void HSHomeObject::write_migrated_shard_metablks() {
// Write all migrated shards to disk
// This is called AFTER read_sub_sb() returns to avoid deadlock with metasvc lock
if (!shards_to_migrate_.empty()) {
LOGI("Writing {} migrated shard v2 superblocks", shards_to_migrate_.size());
LOGI("Writing {} migrated shard v3 superblocks", shards_to_migrate_.size());

std::scoped_lock lock_guard(_shard_lock);
for (auto& shard_id : shards_to_migrate_) {
Expand All @@ -677,15 +699,15 @@ void HSHomeObject::write_migrated_shard_metablks() {

try {
hs_shard->sb_.write();
LOGI("Successfully wrote migrated v2 shard superblk for shard_id={}", shard_id);
LOGI("Successfully wrote migrated v3 shard superblk for shard_id={}", shard_id);
} catch (const std::exception& e) {
LOGE("Failed to migrate shard_id={}: {}", shard_id, e.what());
// Continue with other shards even if one fails
}
}

shards_to_migrate_.clear();
LOGI("Completed migrating all shard superblocks from v1 to v2");
LOGI("Completed migrating all shard superblocks from v2 to v3");
}
}

Expand Down Expand Up @@ -815,35 +837,6 @@ std::optional< homestore::chunk_num_t > HSHomeObject::get_shard_v_chunk_id(const
return std::make_optional< homestore::chunk_num_t >(hs_shard->sb_->v_chunk_id);
}

bool HSHomeObject::release_chunk_based_on_create_shard_message(sisl::blob const& header) {
const ReplicationMessageHeader* msg_header = r_cast< const ReplicationMessageHeader* >(header.cbytes());
if (msg_header->corrupted()) {
LOGW("replication message header is corrupted with crc error");
return false;
}

switch (msg_header->msg_type) {
case ReplicationMessageType::CREATE_SHARD_MSG: {
const pg_id_t pg_id = msg_header->pg_id;
if (!pg_exists(pg_id)) {
LOGW("shardID=0x{:x}, pg={}, shard=0x{:x}, Requesting a chunk for an unknown pg={}", msg_header->shard_id,
(msg_header->shard_id >> homeobject::shard_width), (msg_header->shard_id & homeobject::shard_mask),
pg_id);
return false;
}
auto sb = r_cast< shard_info_superblk const* >(header.cbytes() + sizeof(ReplicationMessageHeader));
bool res = chunk_selector_->release_chunk(sb->info.placement_group, sb->v_chunk_id);
if (!res) { LOGW("Failed to release chunk {} to pg={}", sb->v_chunk_id, sb->info.placement_group); }
return res;
}
default: {
LOGW("Unexpected message type encountered={}. This function should only be called with 'CREATE_SHARD_MSG'.",
msg_header->msg_type);
return false;
}
}
}

void HSHomeObject::destroy_shards(pg_id_t pg_id) {
auto lg = std::scoped_lock(_pg_lock, _shard_lock);
auto hs_pg = _get_hs_pg_unlocked(pg_id);
Expand Down
25 changes: 16 additions & 9 deletions src/lib/homestore_backend/replication_state_machine.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -181,8 +181,7 @@ void ReplicationStateMachine::on_error(ReplServiceError error, const sisl::blob&
break;
}
case ReplicationMessageType::CREATE_SHARD_MSG: {
bool res = home_object_->release_chunk_based_on_create_shard_message(header);
if (!res) { LOGW("failed to release chunk based on create shard msg"); }
// Create shard is log only, no need to release chunk anymore, just return error to caller.
auto result_ctx = boost::static_pointer_cast< repl_result_ctx< ShardManager::Result< ShardInfo > > >(ctx).get();
result_ctx->promise_.setValue(folly::makeUnexpected(toShardError(error)));
break;
Expand Down Expand Up @@ -213,13 +212,21 @@ ReplicationStateMachine::get_blk_alloc_hints(sisl::blob const& header, uint32_t
switch (msg_header->msg_type) {
case ReplicationMessageType::CREATE_SHARD_MSG:
case ReplicationMessageType::SEAL_SHARD_MSG: {
// CREATE_SHARD and SEAL_SHARD are log-only messages (no data blocks), so get_blk_alloc_hints
// should never be called for them. If we reach here, something is wrong.
RELEASE_ASSERT(false,
"get_blk_alloc_hints called for log-only message type={}, shard={}, pg={} -- "
"this should never happen",
msg_header->msg_type, msg_header->shard_id, msg_header->pg_id);
return folly::makeUnexpected(homestore::ReplServiceError::FAILED);
if (data_size == 0) {
// New log-only format: should not reach here, but handle gracefully.
LOGW("get_blk_alloc_hints called for log-only shard message type={}, shard={}, pg={}", msg_header->msg_type,
msg_header->shard_id, msg_header->pg_id);
return homestore::blk_alloc_hints{};
} else {
// Old format: shard message carried shard header/footer data blocks. Return committed_blk_id
// so HomeStore skips block allocation and data write. Shard on_commit will ignore pbas.
homestore::blk_alloc_hints hints;
hints.committed_blk_id = homestore::MultiBlkId{homestore::BlkId{0, 1, 0xFFFF}};
LOGW("get_blk_alloc_hints called for old shard message type={}, shard={}, pg={}, return committed_blk hint "
"to skip blk allocation",
msg_header->msg_type, msg_header->shard_id, msg_header->pg_id);
return hints;
}
}

case ReplicationMessageType::PUT_BLOB_MSG:
Expand Down
Loading
Loading