From 8e41a4d3d46e14f1d388d959876b804b6ab6644c Mon Sep 17 00:00:00 2001 From: Vladislav Oleshko Date: Wed, 5 Nov 2025 21:01:35 +0300 Subject: [PATCH 1/9] feat(tiering): Serialize hashes Signed-off-by: Vladislav Oleshko fixes Signed-off-by: Vladislav Oleshko fixes and small tests Signed-off-by: Vladislav Oleshko small fix extend test more fixes and uploading fix: use size estimation for page deduction rename fixes --- src/core/compact_object.cc | 3 +++ src/server/tiered_storage.cc | 4 ++++ 2 files changed, 7 insertions(+) diff --git a/src/core/compact_object.cc b/src/core/compact_object.cc index ad546f59233d..631258ec8edd 100644 --- a/src/core/compact_object.cc +++ b/src/core/compact_object.cc @@ -879,10 +879,13 @@ CompactObjType CompactObj::ObjType() const { return OBJ_STRING; if (taglen_ == EXTERNAL_TAG) { + VLOG(0) << "My type is external"; switch (static_cast(u_.ext_ptr.representation)) { case ExternalRep::STRING: + VLOG(0) << "My type is a string"; return OBJ_STRING; case ExternalRep::SERIALIZED_MAP: + VLOG(0) << "Mype is a hash map"; return OBJ_HASH; }; } diff --git a/src/server/tiered_storage.cc b/src/server/tiered_storage.cc index e24bc99eb1c4..ad2958b0eb56 100644 --- a/src/server/tiered_storage.cc +++ b/src/server/tiered_storage.cc @@ -223,7 +223,11 @@ class TieredStorage::ShardOpManager : public tiering::OpManager { stats->tiered_used_bytes += segment.length; stats_.total_stashes++; +<<<<<<< HEAD CompactObj::ExternalRep rep = DetermineSerializationParams(*pv).second; +======= + CompactObj::ExternalRep rep = EstimateSerializedSize(*pv)->second; +>>>>>>> 77a24f54 (feat(tiering): Serialize hashes) if (ts_->config_.experimental_cooling) { RetireColdEntries(pv->MallocUsed()); ts_->CoolDown(key.first, key.second, segment, rep, pv); From 28e2128fbfd077e72b90b7e9fea98fc6efa4d716 Mon Sep 17 00:00:00 2001 From: Vladislav Oleshko Date: Sun, 16 Nov 2025 23:46:29 +0300 Subject: [PATCH 2/9] more than POc Signed-off-by: Vladislav Oleshko --- src/core/compact_object.cc | 3 - src/core/detail/listpack_wrap.cc | 8 +-- src/core/detail/listpack_wrap.h | 3 - src/server/hset_family.cc | 94 +++++++++++++++++++++++++------- src/server/tiered_storage.cc | 2 +- src/server/tiering/decoders.cc | 37 +++++++++++-- src/server/tiering/decoders.h | 12 +++- 7 files changed, 117 insertions(+), 42 deletions(-) diff --git a/src/core/compact_object.cc b/src/core/compact_object.cc index 631258ec8edd..ad546f59233d 100644 --- a/src/core/compact_object.cc +++ b/src/core/compact_object.cc @@ -879,13 +879,10 @@ CompactObjType CompactObj::ObjType() const { return OBJ_STRING; if (taglen_ == EXTERNAL_TAG) { - VLOG(0) << "My type is external"; switch (static_cast(u_.ext_ptr.representation)) { case ExternalRep::STRING: - VLOG(0) << "My type is a string"; return OBJ_STRING; case ExternalRep::SERIALIZED_MAP: - VLOG(0) << "Mype is a hash map"; return OBJ_HASH; }; } diff --git a/src/core/detail/listpack_wrap.cc b/src/core/detail/listpack_wrap.cc index 091a091a77cc..289b80f5acb6 100644 --- a/src/core/detail/listpack_wrap.cc +++ b/src/core/detail/listpack_wrap.cc @@ -33,8 +33,8 @@ void ListpackWrap::Iterator::Read() { next_ptr_ = lpNext(lp_, next_ptr_); } -ListpackWrap::~ListpackWrap() { - DCHECK(!dirty_); +ListpackWrap ListpackWrap::WithCapacity(size_t capacity) { + return ListpackWrap{lpNew(capacity)}; } ListpackWrap ListpackWrap::WithCapacity(size_t capacity) { @@ -42,7 +42,6 @@ ListpackWrap ListpackWrap::WithCapacity(size_t capacity) { } uint8_t* ListpackWrap::GetPointer() { - dirty_ = false; return lp_; } @@ -63,7 +62,6 @@ bool ListpackWrap::Delete(std::string_view key) { return false; lp_ = lpDeleteRangeWithEntry(lp_, &ptr, 2); - dirty_ = true; return true; } @@ -90,7 +88,6 @@ bool ListpackWrap::Insert(std::string_view key, std::string_view value, bool ski lp_ = lpReplace(lp_, &vptr, vsrc, value.size()); DCHECK_EQ(0u, lpLength(lp_) % 2); - dirty_ = true; updated = true; } } @@ -100,7 +97,6 @@ bool ListpackWrap::Insert(std::string_view key, std::string_view value, bool ski // TODO: we should at least allocate once for both elements lp_ = lpAppend(lp_, fsrc, key.size()); lp_ = lpAppend(lp_, vsrc, value.size()); - dirty_ = true; } return !updated; diff --git a/src/core/detail/listpack_wrap.h b/src/core/detail/listpack_wrap.h index 06e035061209..c2474c6a344d 100644 --- a/src/core/detail/listpack_wrap.h +++ b/src/core/detail/listpack_wrap.h @@ -15,8 +15,6 @@ struct ListpackWrap { using IntBuf = uint8_t[2][24]; public: - ~ListpackWrap(); - struct Iterator { using iterator_category = std::forward_iterator_tag; using difference_type = std::ptrdiff_t; @@ -66,7 +64,6 @@ struct ListpackWrap { private: uint8_t* lp_; // the listpack itself mutable IntBuf intbuf_; // buffer for integers decoded to strings - bool dirty_ = false; // whether lp_ was updated, but never retrieved with GetPointer }; } // namespace dfly::detail diff --git a/src/server/hset_family.cc b/src/server/hset_family.cc index bc371c264002..4d13b898740d 100644 --- a/src/server/hset_family.cc +++ b/src/server/hset_family.cc @@ -76,6 +76,7 @@ struct HMapWrap { } public: + // Create from non-external prime value HMapWrap(const PrimeValue& pv, DbContext db_cntx) { DCHECK(!pv.IsExternal() || pv.IsCool()); if (pv.Encoding() == kEncodingListPack) @@ -84,6 +85,9 @@ struct HMapWrap { impl_ = GetStringMap(pv, db_cntx); } + explicit HMapWrap(detail::ListpackWrap lw) : impl_{std::move(lw)} { + } + explicit HMapWrap(tiering::SerializedMap* sm) : impl_{sm} { } @@ -194,7 +198,12 @@ OpResult ExecuteRO(Transaction* tx, F&& f) { using D = tiering::SerializedMapDecoder; util::fb2::Future> fut; auto read_cb = [fut, f = std::move(f)](io::Result res) mutable { - HMapWrap hw{res.value()->Get()}; + // Create wrapper from different types + Overloaded ov{ + [](tiering::SerializedMap* sm) { return HMapWrap{sm}; }, + [](detail::ListpackWrap* lw) { return HMapWrap{*lw}; }, + }; + auto hw = visit(ov, res.value()->Read()); fut.Resolve(f(hw)); }; @@ -217,15 +226,34 @@ OpResult ExecuteRO(Transaction* tx, F&& f) { } // Wrap write handler -template auto WrapW(F&& f) { - using RT = std::invoke_result_t; - return [f = std::forward(f)](Transaction* t, EngineShard* es) -> RT { +template auto ExecuteW(Transaction* tx, F&& f) { + using T = typename std::invoke_result_t::Type; + auto shard_cb = [f = std::forward(f)](Transaction* t, + EngineShard* es) -> OpResult> { + // Fetch value of hash type auto [key, op_args] = KeyAndArgs(t, es); auto it_res = op_args.GetDbSlice().FindMutable(op_args.db_cntx, key, OBJ_HASH); RETURN_ON_BAD_STATUS(it_res); auto& pv = it_res->it->second; + // Enqueue read for future values + if (pv.IsExternal() && !pv.IsCool()) { + using D = tiering::SerializedMapDecoder; + util::fb2::Future> fut; + auto read_cb = [fut, f = std::move(f)](io::Result res) mutable { + // Create wrapper from different types + HMapWrap hw{*res.value()->Write()}; + fut.Resolve(f(hw)); + + // soak listpack wrapper back to get updated value + *res.value()->Write() = *hw.Get(); + }; + + es->tiered_storage()->Read(op_args.db_cntx.db_index, key, pv, D{}, std::move(read_cb)); + return CbVariant{std::move(fut)}; + } + // Remove document before modification op_args.shard->search_indices()->RemoveDoc(key, op_args.db_cntx, pv); @@ -241,8 +269,11 @@ template auto WrapW(F&& f) { else op_args.shard->search_indices()->AddDoc(key, op_args.db_cntx, pv); - return res; + RETURN_ON_BAD_STATUS(res); + return CbVariant{std::move(res).value()}; }; + + return Unwrap(tx->ScheduleSingleHopT(std::move(shard_cb))); } size_t EstimateListpackMinBytes(CmdArgList members) { @@ -392,14 +423,20 @@ OpResult> OpHMGet(const HMapWrap& hw, CmdArgList fields) { DCHECK(!fields.empty()); std::vector result(fields.size()); - if (auto lw = hw.Get(); lw) { + if (auto sm = hw.Get(); sm) { + for (size_t i = 0; i < fields.size(); ++i) { + if (auto it = (*sm)->Find(fields[i]); it != (*sm)->end()) { + result[i].emplace(it->second, sdslen(it->second)); + } + } + } else { absl::flat_hash_map> reverse; reverse.reserve(fields.size() + 1); for (size_t i = 0; i < fields.size(); ++i) { reverse[ArgS(fields, i)].push_back(i); // map fields to their index. } - for (const auto [key, value] : *lw) { + for (const auto [key, value] : hw.Range()) { if (auto it = reverse.find(key); it != reverse.end()) { for (size_t index : it->second) { DCHECK_LT(index, result.size()); @@ -407,13 +444,6 @@ OpResult> OpHMGet(const HMapWrap& hw, CmdArgList fields) { } } } - } else { - StringMap* sm = *hw.Get(); - for (size_t i = 0; i < fields.size(); ++i) { - if (auto it = sm->Find(fields[i]); it != sm->end()) { - result[i].emplace(it->second, sdslen(it->second)); - } - } } return result; @@ -425,8 +455,8 @@ struct OpSetParams { bool keepttl = false; }; -OpResult OpSet(const OpArgs& op_args, string_view key, CmdArgList values, - const OpSetParams& op_sp = OpSetParams{}) { +OpResult> OpSet(const OpArgs& op_args, string_view key, CmdArgList values, + const OpSetParams& op_sp = OpSetParams{}) { DCHECK(!values.empty() && 0 == values.size() % 2); VLOG(2) << "OpSet(" << key << ")"; @@ -439,6 +469,26 @@ OpResult OpSet(const OpArgs& op_args, string_view key, CmdArgList valu auto& it = add_res.it; PrimeValue& pv = it->second; + // If the value is external, enqueue read and modify it there + if (pv.IsExternal() && !pv.IsCool()) { + CHECK(op_sp.ttl == UINT32_MAX); // TODO: remove + using D = tiering::SerializedMapDecoder; + util::fb2::Future> fut; + auto read_cb = [fut, values, &op_sp](io::Result res) mutable { + // Create wrapper from different types + auto& lw = *res.value()->Write(); + uint32_t created = 0; + for (size_t i = 0; i < values.size(); i += 2) { + created += lw.Insert(values[i], values[i + 1], op_sp.skip_if_exists); + } + fut.Resolve(created); + }; + + op_args.shard->tiered_storage()->Read(op_args.db_cntx.db_index, key, pv, D{}, + std::move(read_cb)); + return CbVariant{std::move(fut)}; + } + if (add_res.is_new) { if (op_sp.ttl == UINT32_MAX) { lp = lpNew(0); @@ -496,7 +546,7 @@ OpResult OpSet(const OpArgs& op_args, string_view key, CmdArgList valu if (auto* ts = op_args.shard->tiered_storage(); ts) ts->TryStash(op_args.db_cntx.db_index, key, &pv); - return created; + return CbVariant{created}; } void HGetGeneric(CmdArgList args, uint8_t getall_mask, Transaction* tx, SinkReplyBuilder* builder) { @@ -587,7 +637,8 @@ void HSetEx(CmdArgList args, CommandContext* cmd_cntx) { return OpSet(t->GetOpArgs(shard), key, fields, op_sp); }; - OpResult result = cmd_cntx->tx->ScheduleSingleHopT(std::move(cb)); + auto delayed_result = cmd_cntx->tx->ScheduleSingleHopT(std::move(cb)); + OpResult result = Unwrap(std::move(delayed_result)); if (result) { rb->SendLong(*result); } else { @@ -616,7 +667,7 @@ void CmdHDel(CmdArgList args, CommandContext* cmd_cntx) { deleted += hw.Erase(s); return deleted; }; - HSetReplies{cmd_cntx->rb()}.Send(cmd_cntx->tx->ScheduleSingleHopT(WrapW(cb))); + HSetReplies{cmd_cntx->rb}.Send(ExecuteW(cmd_cntx->tx, std::move(cb))); } void CmdHExpire(CmdArgList args, CommandContext* cmd_cntx) { @@ -859,7 +910,8 @@ void CmdHSet(CmdArgList args, CommandContext* cmd_cntx) { return OpSet(t->GetOpArgs(shard), key, args); }; - OpResult result = cmd_cntx->tx->ScheduleSingleHopT(std::move(cb)); + auto delayed_result = cmd_cntx->tx->ScheduleSingleHopT(std::move(cb)); + OpResult result = Unwrap(std::move(delayed_result)); if (result && cmd == "HSET") { rb->SendLong(*result); @@ -874,7 +926,7 @@ void CmdHSetNx(CmdArgList args, CommandContext* cmd_cntx) { auto cb = [&](Transaction* t, EngineShard* shard) { return OpSet(t->GetOpArgs(shard), key, args.subspan(1), OpSetParams{.skip_if_exists = true}); }; - HSetReplies{cmd_cntx->rb()}.Send(cmd_cntx->tx->ScheduleSingleHopT(cb)); + HSetReplies{cmd_cntx->rb}.Send(Unwrap(cmd_cntx->tx->ScheduleSingleHopT(cb))); } void StrVecEmplaceBack(StringVec& str_vec, const listpackEntry& lp) { diff --git a/src/server/tiered_storage.cc b/src/server/tiered_storage.cc index ad2958b0eb56..f2f4433d4456 100644 --- a/src/server/tiered_storage.cc +++ b/src/server/tiered_storage.cc @@ -196,7 +196,6 @@ class TieredStorage::ShardOpManager : public tiering::OpManager { // Set value to be an in-memory type again. Update memory stats. void Upload(DbIndex dbid, string_view value, PrimeValue* pv) { DCHECK(!value.empty()); - switch (pv->GetExternalRep()) { case CompactObj::ExternalRep::STRING: pv->Materialize(value, true); @@ -322,6 +321,7 @@ bool TieredStorage::ShardOpManager::NotifyFetched(EntryId id, tiering::DiskSegme auto key = get(id); auto* pv = Find(key); if (pv && pv->IsExternal() && segment == pv->GetExternalSlice()) { + VLOG(0) << "Touched? " << pv->WasTouched(); if (metrics.modified || pv->WasTouched()) { ++stats_.total_uploads; decoder->Upload(pv); diff --git a/src/server/tiering/decoders.cc b/src/server/tiering/decoders.cc index 7268cca207c5..0878437224a4 100644 --- a/src/server/tiering/decoders.cc +++ b/src/server/tiering/decoders.cc @@ -4,7 +4,11 @@ #include "server/tiering/decoders.h" +<<<<<<< HEAD #include "base/logging.h" +======= +#include "core/compact_object.h" +>>>>>>> fbcc8d59 (more than POc) #include "core/detail/listpack_wrap.h" #include "server/tiering/serialized_map.h" @@ -78,19 +82,40 @@ void SerializedMapDecoder::Initialize(std::string_view slice) { } Decoder::UploadMetrics SerializedMapDecoder::GetMetrics() const { - return UploadMetrics{.modified = false, + return UploadMetrics{.modified = modified_, .estimated_mem_usage = map_->DataBytes() + map_->size() * 2 * 8}; } void SerializedMapDecoder::Upload(CompactObj* obj) { + if (std::holds_alternative>(map_)) + MakeOwned(); + + obj->InitRobj(OBJ_HASH, kEncodingListPack, Write()->GetPointer()); +} + +std::variant SerializedMapDecoder::Read() const { + using RT = std::variant; + return std::visit([](auto& ptr) -> RT { return ptr.get(); }, map_); +} + +detail::ListpackWrap* SerializedMapDecoder::Write() { + if (std::holds_alternative>(map_)) + return std::get>(map_).get(); + + // Convert SerializedMap to listpack + MakeOwned(); + modified_ = true; + return Write(); +} + +void SerializedMapDecoder::MakeOwned() { + auto& map = std::get>(map_); + auto lw = detail::ListpackWrap::WithCapacity(GetMetrics().estimated_mem_usage); - for (const auto& [key, value] : *map_) + for (const auto& [key, value] : *map) lw.Insert(key, value, true); - obj->InitRobj(OBJ_HASH, kEncodingListPack, lw.GetPointer()); -} -SerializedMap* SerializedMapDecoder::Get() const { - return map_.get(); + map_ = std::make_unique(lw); } } // namespace dfly::tiering diff --git a/src/server/tiering/decoders.h b/src/server/tiering/decoders.h index 2adacaa2044e..2ff3d38b3007 100644 --- a/src/server/tiering/decoders.h +++ b/src/server/tiering/decoders.h @@ -11,6 +11,10 @@ #include "core/compact_object.h" +namespace dfly::detail { +struct ListpackWrap; +} + namespace dfly::tiering { struct SerializedMap; @@ -76,10 +80,14 @@ struct SerializedMapDecoder : public Decoder { UploadMetrics GetMetrics() const override; void Upload(CompactObj* obj) override; - SerializedMap* Get() const; + std::variant Read() const; + dfly::detail::ListpackWrap* Write(); private: - std::unique_ptr map_; + void MakeOwned(); // Convert to listpack + + bool modified_; + std::variant, std::unique_ptr> map_; }; } // namespace dfly::tiering From f750651df6a5106f681f53377245458a7d9c7b70 Mon Sep 17 00:00:00 2001 From: Vladislav Oleshko Date: Mon, 17 Nov 2025 17:49:04 +0300 Subject: [PATCH 3/9] fiixxes --- src/server/tiered_storage.cc | 1 - 1 file changed, 1 deletion(-) diff --git a/src/server/tiered_storage.cc b/src/server/tiered_storage.cc index f2f4433d4456..917082a2b6ba 100644 --- a/src/server/tiered_storage.cc +++ b/src/server/tiered_storage.cc @@ -321,7 +321,6 @@ bool TieredStorage::ShardOpManager::NotifyFetched(EntryId id, tiering::DiskSegme auto key = get(id); auto* pv = Find(key); if (pv && pv->IsExternal() && segment == pv->GetExternalSlice()) { - VLOG(0) << "Touched? " << pv->WasTouched(); if (metrics.modified || pv->WasTouched()) { ++stats_.total_uploads; decoder->Upload(pv); From 77cc3746736d19f2f1e5f196ad967ad5978a35f2 Mon Sep 17 00:00:00 2001 From: Vladislav Oleshko Date: Mon, 17 Nov 2025 18:03:14 +0300 Subject: [PATCH 4/9] more fixes --- src/server/hset_family.cc | 17 +++++++++++++---- 1 file changed, 13 insertions(+), 4 deletions(-) diff --git a/src/server/hset_family.cc b/src/server/hset_family.cc index 4d13b898740d..3af23f8d011f 100644 --- a/src/server/hset_family.cc +++ b/src/server/hset_family.cc @@ -456,7 +456,8 @@ struct OpSetParams { }; OpResult> OpSet(const OpArgs& op_args, string_view key, CmdArgList values, - const OpSetParams& op_sp = OpSetParams{}) { + const OpSetParams& op_sp = OpSetParams{}, + optional>* bp_anker = nullptr) { DCHECK(!values.empty() && 0 == values.size() % 2); VLOG(2) << "OpSet(" << key << ")"; @@ -543,8 +544,11 @@ OpResult> OpSet(const OpArgs& op_args, string_view key, CmdA op_args.shard->search_indices()->AddDoc(key, op_args.db_cntx, pv); - if (auto* ts = op_args.shard->tiered_storage(); ts) - ts->TryStash(op_args.db_cntx.db_index, key, &pv); + if (auto* ts = op_args.shard->tiered_storage(); ts) { + auto bp = ts->TryStash(op_args.db_cntx.db_index, key, &pv, true); + if (bp && bp_anker) + *bp_anker = std::move(*bp); + } return CbVariant{created}; } @@ -905,14 +909,19 @@ void CmdHSet(CmdArgList args, CommandContext* cmd_cntx) { return rb->SendError(facade::WrongNumArgsError(cmd), kSyntaxErrType); } + optional> tiered_backpressure; + args.remove_prefix(1); auto cb = [&](Transaction* t, EngineShard* shard) { - return OpSet(t->GetOpArgs(shard), key, args); + return OpSet(t->GetOpArgs(shard), key, args, OpSetParams{}, &tiered_backpressure); }; auto delayed_result = cmd_cntx->tx->ScheduleSingleHopT(std::move(cb)); OpResult result = Unwrap(std::move(delayed_result)); + if (tiered_backpressure) + tiered_backpressure->GetFor(10ms); + if (result && cmd == "HSET") { rb->SendLong(*result); } else { From 8a00e9b680ec21eb76bbd0e405ad840c55b382f1 Mon Sep 17 00:00:00 2001 From: Vladislav Oleshko Date: Mon, 17 Nov 2025 18:25:06 +0300 Subject: [PATCH 5/9] cooling fixes --- src/server/tiered_storage.cc | 9 +++++++++ 1 file changed, 9 insertions(+) diff --git a/src/server/tiered_storage.cc b/src/server/tiered_storage.cc index 917082a2b6ba..3c5acd8f92a2 100644 --- a/src/server/tiered_storage.cc +++ b/src/server/tiered_storage.cc @@ -711,8 +711,13 @@ bool TieredStorage::ShouldStash(const PrimeValue& pv) const { } void TieredStorage::CoolDown(DbIndex db_ind, std::string_view str, +<<<<<<< HEAD const tiering::DiskSegment& segment, CompactObj::ExternalRep rep, PrimeValue* pv) { +======= + const tiering::DiskSegment& segment, PrimeValue* pv, + CompactObj::ExternalRep rep) { +>>>>>>> 5b1ceb13 (cooling fixes) detail::TieredColdRecord* record = CompactObj::AllocateMR(); cool_queue_.push_front(*record); stats_.cool_memory_used += (sizeof(detail::TieredColdRecord) + pv->MallocUsed()); @@ -722,7 +727,11 @@ void TieredStorage::CoolDown(DbIndex db_ind, std::string_view str, record->page_index = segment.offset / tiering::kPageSize; record->value = std::move(*pv); +<<<<<<< HEAD pv->SetCool(segment.offset, segment.length, rep, record); +======= + pv->SetCool(segment.offset, segment.length, record, rep); +>>>>>>> 5b1ceb13 (cooling fixes) } PrimeValue TieredStorage::Warmup(DbIndex dbid, PrimeValue::CoolItem item) { From 44a933ba43b9e4260218a800a9e8381ec24aad6c Mon Sep 17 00:00:00 2001 From: Vladislav Oleshko Date: Wed, 19 Nov 2025 10:43:57 +0300 Subject: [PATCH 6/9] tmp Signed-off-by: Vladislav Oleshko --- src/core/detail/listpack_wrap.cc | 8 ++++---- src/core/detail/listpack_wrap.h | 2 ++ src/server/tiered_storage.cc | 13 ------------- src/server/tiered_storage_test.cc | 17 +++++++++++++++++ src/server/tiering/decoders.cc | 13 +++++++------ 5 files changed, 30 insertions(+), 23 deletions(-) diff --git a/src/core/detail/listpack_wrap.cc b/src/core/detail/listpack_wrap.cc index 289b80f5acb6..7d86c8d73c1d 100644 --- a/src/core/detail/listpack_wrap.cc +++ b/src/core/detail/listpack_wrap.cc @@ -37,10 +37,6 @@ ListpackWrap ListpackWrap::WithCapacity(size_t capacity) { return ListpackWrap{lpNew(capacity)}; } -ListpackWrap ListpackWrap::WithCapacity(size_t capacity) { - return ListpackWrap{lpNew(capacity)}; -} - uint8_t* ListpackWrap::GetPointer() { return lp_; } @@ -106,6 +102,10 @@ size_t ListpackWrap::size() const { return lpLength(lp_) / 2; } +size_t ListpackWrap::DataBytes() const { + return lpBytes(lp_); +} + ListpackWrap::Iterator ListpackWrap::begin() const { return Iterator{lp_, lpFirst(lp_), intbuf_}; } diff --git a/src/core/detail/listpack_wrap.h b/src/core/detail/listpack_wrap.h index c2474c6a344d..80f5e9484b1b 100644 --- a/src/core/detail/listpack_wrap.h +++ b/src/core/detail/listpack_wrap.h @@ -58,6 +58,8 @@ struct ListpackWrap { Iterator end() const; size_t size() const; // number of entries + size_t DataBytes() const; + // Get view from raw listpack iterator static std::string_view GetView(uint8_t* lp_it, uint8_t int_buf[]); diff --git a/src/server/tiered_storage.cc b/src/server/tiered_storage.cc index 3c5acd8f92a2..9fc8f4b53b55 100644 --- a/src/server/tiered_storage.cc +++ b/src/server/tiered_storage.cc @@ -222,11 +222,7 @@ class TieredStorage::ShardOpManager : public tiering::OpManager { stats->tiered_used_bytes += segment.length; stats_.total_stashes++; -<<<<<<< HEAD CompactObj::ExternalRep rep = DetermineSerializationParams(*pv).second; -======= - CompactObj::ExternalRep rep = EstimateSerializedSize(*pv)->second; ->>>>>>> 77a24f54 (feat(tiering): Serialize hashes) if (ts_->config_.experimental_cooling) { RetireColdEntries(pv->MallocUsed()); ts_->CoolDown(key.first, key.second, segment, rep, pv); @@ -711,13 +707,8 @@ bool TieredStorage::ShouldStash(const PrimeValue& pv) const { } void TieredStorage::CoolDown(DbIndex db_ind, std::string_view str, -<<<<<<< HEAD const tiering::DiskSegment& segment, CompactObj::ExternalRep rep, PrimeValue* pv) { -======= - const tiering::DiskSegment& segment, PrimeValue* pv, - CompactObj::ExternalRep rep) { ->>>>>>> 5b1ceb13 (cooling fixes) detail::TieredColdRecord* record = CompactObj::AllocateMR(); cool_queue_.push_front(*record); stats_.cool_memory_used += (sizeof(detail::TieredColdRecord) + pv->MallocUsed()); @@ -727,11 +718,7 @@ void TieredStorage::CoolDown(DbIndex db_ind, std::string_view str, record->page_index = segment.offset / tiering::kPageSize; record->value = std::move(*pv); -<<<<<<< HEAD pv->SetCool(segment.offset, segment.length, rep, record); -======= - pv->SetCool(segment.offset, segment.length, record, rep); ->>>>>>> 5b1ceb13 (cooling fixes) } PrimeValue TieredStorage::Warmup(DbIndex dbid, PrimeValue::CoolItem item) { diff --git a/src/server/tiered_storage_test.cc b/src/server/tiered_storage_test.cc index 2498fbb7ffc7..e688b79db180 100644 --- a/src/server/tiered_storage_test.cc +++ b/src/server/tiered_storage_test.cc @@ -522,6 +522,8 @@ TEST_P(LatentCoolingTSTest, SimpleHash) { // Wait for all to be stashed or in end up in bins ExpectConditionWithinTimeout([=] { auto metrics = GetMetrics(); + VLOG(0) << metrics.tiered_stats.total_stashes << " " + << metrics.tiered_stats.small_bins_entries_cnt; return metrics.tiered_stats.total_stashes + metrics.tiered_stats.small_bins_filling_entries_cnt == kNUM; @@ -536,6 +538,21 @@ TEST_P(LatentCoolingTSTest, SimpleHash) { auto v = string{31, 'x'} + 'f'; EXPECT_EQ(resp, v); } + + // Wait for all offloads again + ExpectConditionWithinTimeout([=] { + auto metrics = GetMetrics(); + return metrics.db_stats[0].tiered_entries + + metrics.tiered_stats.small_bins_filling_entries_cnt == + kNUM; + }); + + // HDEL + for (size_t i = 0; i < kNUM; i++) { + string key = absl::StrCat("k", i); + EXPECT_THAT(Run({"DEL", key, string{1, 'c'}}), IntArg(1)); + EXPECT_THAT(Run({"HLEN", key}), IntArg(25)); + } } } // namespace dfly diff --git a/src/server/tiering/decoders.cc b/src/server/tiering/decoders.cc index 0878437224a4..920184f8f4f6 100644 --- a/src/server/tiering/decoders.cc +++ b/src/server/tiering/decoders.cc @@ -4,12 +4,9 @@ #include "server/tiering/decoders.h" -<<<<<<< HEAD -#include "base/logging.h" -======= #include "core/compact_object.h" ->>>>>>> fbcc8d59 (more than POc) #include "core/detail/listpack_wrap.h" +#include "core/overloaded.h" #include "server/tiering/serialized_map.h" extern "C" { @@ -82,8 +79,12 @@ void SerializedMapDecoder::Initialize(std::string_view slice) { } Decoder::UploadMetrics SerializedMapDecoder::GetMetrics() const { - return UploadMetrics{.modified = modified_, - .estimated_mem_usage = map_->DataBytes() + map_->size() * 2 * 8}; + Overloaded ov{ + [](const SerializedMap& sm) { return sm.DataBytes() + sm.size() * 8; }, + [](const detail::ListpackWrap& lw) { return lw.DataBytes(); }, + }; + size_t bytes = visit(Overloaded{ov, [&](const auto& ptr) { return ov(*ptr); }}, map_); + return UploadMetrics{.modified = modified_, .estimated_mem_usage = bytes}; } void SerializedMapDecoder::Upload(CompactObj* obj) { From 80e6f672003002e43c55c59d478f88980b79cc1f Mon Sep 17 00:00:00 2001 From: Vladislav Oleshko Date: Thu, 4 Dec 2025 18:38:12 +0300 Subject: [PATCH 7/9] more fixes, better test --- src/server/hset_family.cc | 21 ++++++++++++---- src/server/tiered_storage_test.cc | 42 ++++++++++++++++++++----------- 2 files changed, 44 insertions(+), 19 deletions(-) diff --git a/src/server/hset_family.cc b/src/server/hset_family.cc index 3af23f8d011f..2b4dc2e9882f 100644 --- a/src/server/hset_family.cc +++ b/src/server/hset_family.cc @@ -139,6 +139,14 @@ struct HMapWrap { VisitMut(ov); } + void Launder(tiering::SerializedMapDecoder* dec) { + Overloaded ov{ + [](StringMap* s) {}, + [&](detail::ListpackWrap& lw) { *dec->Write() = lw; }, + }; + VisitMut(ov); + } + template optional Get() const { if (holds_alternative(impl_)) return get(impl_); @@ -245,9 +253,7 @@ template auto ExecuteW(Transaction* tx, F&& f) { // Create wrapper from different types HMapWrap hw{*res.value()->Write()}; fut.Resolve(f(hw)); - - // soak listpack wrapper back to get updated value - *res.value()->Write() = *hw.Get(); + hw.Launder(*res); }; es->tiered_storage()->Read(op_args.db_cntx.db_index, key, pv, D{}, std::move(read_cb)); @@ -331,6 +337,10 @@ OpStatus OpIncrBy(const OpArgs& op_args, string_view key, string_view field, Inc auto& add_res = *op_res; PrimeValue& pv = add_res.it->second; + + if (pv.IsExternal() && !pv.IsCool()) + return OpStatus::CANCELLED; // Not supported for offloaded values + if (add_res.is_new) { pv.InitRobj(OBJ_HASH, kEncodingListPack, lpNew(0)); } else { @@ -472,11 +482,12 @@ OpResult> OpSet(const OpArgs& op_args, string_view key, CmdA // If the value is external, enqueue read and modify it there if (pv.IsExternal() && !pv.IsCool()) { - CHECK(op_sp.ttl == UINT32_MAX); // TODO: remove + if (op_sp.ttl != UINT32_MAX) + return OpStatus::CANCELLED; // Don't support expiry with offloaded hashes + using D = tiering::SerializedMapDecoder; util::fb2::Future> fut; auto read_cb = [fut, values, &op_sp](io::Result res) mutable { - // Create wrapper from different types auto& lw = *res.value()->Write(); uint32_t created = 0; for (size_t i = 0; i < values.size(); i += 2) { diff --git a/src/server/tiered_storage_test.cc b/src/server/tiered_storage_test.cc index e688b79db180..95b14b57089f 100644 --- a/src/server/tiered_storage_test.cc +++ b/src/server/tiered_storage_test.cc @@ -499,8 +499,8 @@ TEST_F(PureDiskTSTest, Dump) { TEST_P(LatentCoolingTSTest, SimpleHash) { absl::FlagSaver saver; absl::SetFlag(&FLAGS_tiered_experimental_hash_support, true); - // For now, never upload as its not implemented yet - absl::SetFlag(&FLAGS_tiered_upload_threshold, 0.0); + absl::SetFlag(&FLAGS_tiered_upload_threshold, + 0.0); // For now, never upload as its not implemented yet UpdateFromFlags(); const size_t kNUM = 100; @@ -522,11 +522,9 @@ TEST_P(LatentCoolingTSTest, SimpleHash) { // Wait for all to be stashed or in end up in bins ExpectConditionWithinTimeout([=] { auto metrics = GetMetrics(); - VLOG(0) << metrics.tiered_stats.total_stashes << " " - << metrics.tiered_stats.small_bins_entries_cnt; - return metrics.tiered_stats.total_stashes + - metrics.tiered_stats.small_bins_filling_entries_cnt == - kNUM; + size_t sum = + metrics.tiered_stats.total_stashes + metrics.tiered_stats.small_bins_filling_entries_cnt; + return sum == kNUM; }); // Verify correctness @@ -539,20 +537,36 @@ TEST_P(LatentCoolingTSTest, SimpleHash) { EXPECT_EQ(resp, v); } - // Wait for all offloads again - ExpectConditionWithinTimeout([=] { + // Start offloading + SetFlag(&FLAGS_tiered_offload_threshold, 1.0); + UpdateFromFlags(); + auto wait_offloaded = [=] { auto metrics = GetMetrics(); - return metrics.db_stats[0].tiered_entries + - metrics.tiered_stats.small_bins_filling_entries_cnt == - kNUM; - }); + size_t sum = + metrics.db_stats[0].tiered_entries + metrics.tiered_stats.small_bins_filling_entries_cnt; + return sum == kNUM; + }; + + // Wait for all offloads again + ExpectConditionWithinTimeout(wait_offloaded); // HDEL for (size_t i = 0; i < kNUM; i++) { string key = absl::StrCat("k", i); - EXPECT_THAT(Run({"DEL", key, string{1, 'c'}}), IntArg(1)); + EXPECT_THAT(Run({"HDEL", key, string{1, 'c'}}), IntArg(1)); EXPECT_THAT(Run({"HLEN", key}), IntArg(25)); } + + // Wait for all offloads again + ExpectConditionWithinTimeout(wait_offloaded); + + // HSET new field + for (size_t i = 0; i < kNUM; i++) { + string key = absl::StrCat("k", i); + EXPECT_THAT(Run({"HSET", key, string{1, 'c'}, "Some new value"}), IntArg(1)); + EXPECT_THAT(Run({"HLEN", key}), IntArg(26)); + EXPECT_EQ(Run({"HGET", key, string{1, 'c'}}), "Some new value"); + } } } // namespace dfly From fd1b9ec798c224be91f3321f19fdaeda0b3329ad Mon Sep 17 00:00:00 2001 From: Vladislav Oleshko Date: Sun, 7 Dec 2025 13:19:36 +0300 Subject: [PATCH 8/9] fixes --- src/server/hset_family.cc | 14 ++++++++------ src/server/tiered_storage_test.cc | 3 +-- 2 files changed, 9 insertions(+), 8 deletions(-) diff --git a/src/server/hset_family.cc b/src/server/hset_family.cc index 2b4dc2e9882f..800fb6b2a022 100644 --- a/src/server/hset_family.cc +++ b/src/server/hset_family.cc @@ -463,11 +463,12 @@ struct OpSetParams { bool skip_if_exists = false; uint32_t ttl = UINT32_MAX; bool keepttl = false; + + optional>* backpressure = nullptr; }; OpResult> OpSet(const OpArgs& op_args, string_view key, CmdArgList values, - const OpSetParams& op_sp = OpSetParams{}, - optional>* bp_anker = nullptr) { + const OpSetParams& op_sp = OpSetParams{}) { DCHECK(!values.empty() && 0 == values.size() % 2); VLOG(2) << "OpSet(" << key << ")"; @@ -487,7 +488,7 @@ OpResult> OpSet(const OpArgs& op_args, string_view key, CmdA using D = tiering::SerializedMapDecoder; util::fb2::Future> fut; - auto read_cb = [fut, values, &op_sp](io::Result res) mutable { + auto read_cb = [fut, values, op_sp](io::Result res) mutable { auto& lw = *res.value()->Write(); uint32_t created = 0; for (size_t i = 0; i < values.size(); i += 2) { @@ -557,8 +558,8 @@ OpResult> OpSet(const OpArgs& op_args, string_view key, CmdA if (auto* ts = op_args.shard->tiered_storage(); ts) { auto bp = ts->TryStash(op_args.db_cntx.db_index, key, &pv, true); - if (bp && bp_anker) - *bp_anker = std::move(*bp); + if (bp && op_sp.backpressure) + *op_sp.backpressure = std::move(*bp); } return CbVariant{created}; @@ -921,10 +922,11 @@ void CmdHSet(CmdArgList args, CommandContext* cmd_cntx) { } optional> tiered_backpressure; + OpSetParams params{.backpressure = &tiered_backpressure}; args.remove_prefix(1); auto cb = [&](Transaction* t, EngineShard* shard) { - return OpSet(t->GetOpArgs(shard), key, args, OpSetParams{}, &tiered_backpressure); + return OpSet(t->GetOpArgs(shard), key, args, params); }; auto delayed_result = cmd_cntx->tx->ScheduleSingleHopT(std::move(cb)); diff --git a/src/server/tiered_storage_test.cc b/src/server/tiered_storage_test.cc index 95b14b57089f..136541d74973 100644 --- a/src/server/tiered_storage_test.cc +++ b/src/server/tiered_storage_test.cc @@ -499,8 +499,7 @@ TEST_F(PureDiskTSTest, Dump) { TEST_P(LatentCoolingTSTest, SimpleHash) { absl::FlagSaver saver; absl::SetFlag(&FLAGS_tiered_experimental_hash_support, true); - absl::SetFlag(&FLAGS_tiered_upload_threshold, - 0.0); // For now, never upload as its not implemented yet + absl::SetFlag(&FLAGS_tiered_upload_threshold, 0.0); // never upload UpdateFromFlags(); const size_t kNUM = 100; From f2cdc9f820bbb6140db127f1a600d5ca136d3180 Mon Sep 17 00:00:00 2001 From: Vladislav Oleshko Date: Tue, 16 Dec 2025 18:43:57 +0300 Subject: [PATCH 9/9] fixes --- src/server/cluster/cluster_family.cc | 2 +- src/server/hset_family.cc | 4 ++-- 2 files changed, 3 insertions(+), 3 deletions(-) diff --git a/src/server/cluster/cluster_family.cc b/src/server/cluster/cluster_family.cc index ebb40291cea8..c11a8dfd3403 100644 --- a/src/server/cluster/cluster_family.cc +++ b/src/server/cluster/cluster_family.cc @@ -81,7 +81,7 @@ ClusterFamily::ClusterFamily(ServerFamily* server_family) : server_family_(serve } void ClusterFamily::Shutdown() { - Coordinator::Current().Shutdown(); + // Coordinator::Current().Shutdown(); shard_set->pool()->at(0)->Await([this]() ABSL_LOCKS_EXCLUDED(set_config_mu) { PreparedToRemoveOutgoingMigrations outgoing_migrations; // should be removed without mutex lock { diff --git a/src/server/hset_family.cc b/src/server/hset_family.cc index 800fb6b2a022..6a78959e3841 100644 --- a/src/server/hset_family.cc +++ b/src/server/hset_family.cc @@ -683,7 +683,7 @@ void CmdHDel(CmdArgList args, CommandContext* cmd_cntx) { deleted += hw.Erase(s); return deleted; }; - HSetReplies{cmd_cntx->rb}.Send(ExecuteW(cmd_cntx->tx, std::move(cb))); + HSetReplies{cmd_cntx->rb()}.Send(ExecuteW(cmd_cntx->tx, std::move(cb))); } void CmdHExpire(CmdArgList args, CommandContext* cmd_cntx) { @@ -948,7 +948,7 @@ void CmdHSetNx(CmdArgList args, CommandContext* cmd_cntx) { auto cb = [&](Transaction* t, EngineShard* shard) { return OpSet(t->GetOpArgs(shard), key, args.subspan(1), OpSetParams{.skip_if_exists = true}); }; - HSetReplies{cmd_cntx->rb}.Send(Unwrap(cmd_cntx->tx->ScheduleSingleHopT(cb))); + HSetReplies{cmd_cntx->rb()}.Send(Unwrap(cmd_cntx->tx->ScheduleSingleHopT(cb))); } void StrVecEmplaceBack(StringVec& str_vec, const listpackEntry& lp) {