Commit 83e4b52

chore(tiering): Optimize serialization
1 parent ff51f27 commit 83e4b52

15 files changed: +154 −124 lines

src/core/compact_object.cc

Lines changed: 10 additions & 5 deletions
@@ -7,6 +7,8 @@
 // #define XXH_INLINE_ALL
 #include <xxhash.h>
 
+#include <array>
+
 extern "C" {
 #include "redis/intset.h"
 #include "redis/listpack.h"
@@ -1609,19 +1611,22 @@ void CompactObj::EncodeString(string_view str, bool is_key) {
   u_.r_obj.SetString(encoded, tl.local_mr);
 }
 
-StringOrView CompactObj::GetRawString() const {
+std::array<std::string_view, 2> CompactObj::GetRawString() const {
   DCHECK(!IsExternal());
 
   if (taglen_ == ROBJ_TAG) {
     CHECK_EQ(OBJ_STRING, u_.r_obj.type());
     DCHECK_EQ(OBJ_ENCODING_RAW, u_.r_obj.encoding());
-    return StringOrView::FromView(u_.r_obj.AsView());
+    return {u_.r_obj.AsView(), {}};
   }
 
   if (taglen_ == SMALL_TAG) {
-    string tmp;
-    u_.small_str.Get(&tmp);
-    return StringOrView::FromString(std::move(tmp));
+    std::string_view arr[2];
+    u_.small_str.GetV(arr);
+    std::array<std::string_view, 2> out;  // TODO: use c++ 20 to_array
+    out[0] = arr[0];
+    out[1] = arr[1];
+    return out;
   }
 
   LOG(FATAL) << "Unsupported tag for GetRawString(): " << int(taglen_);
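
Not part of the diff: a minimal caller-side sketch of the new two-view return contract. It assumes only what the commit shows (the second view is empty for regular strings and carries the second segment of a small string); the helper name JoinRawViews is invented for illustration.

```cpp
#include <array>
#include <string>
#include <string_view>

// Hypothetical helper (not in the commit): materialize the two raw views
// into a single owned string. For ROBJ-backed strings the second view is
// empty, so the extra append is a no-op.
std::string JoinRawViews(const std::array<std::string_view, 2>& views) {
  std::string out;
  out.reserve(views[0].size() + views[1].size());
  out.append(views[0]);
  out.append(views[1]);
  return out;
}
```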

src/core/compact_object.h

Lines changed: 2 additions & 2 deletions
@@ -429,9 +429,9 @@ class CompactObj {
     memory_resource()->deallocate(ptr, sizeof(T), alignof(T));
   }
 
-  // returns raw (non-decoded) string. Used to bypass decoding layer.
+  // Return raw (non-decoded) string as two views. First is guaranteed to be non-empty.
   // Precondition: the object is a non-inline string.
-  StringOrView GetRawString() const;
+  std::array<std::string_view, 2> GetRawString() const;
 
   StrEncoding GetStrEncoding() const {
     return StrEncoding{mask_bits_.encoding, bool(huffman_domain_)};

src/core/compact_object_test.cc

Lines changed: 2 additions & 1 deletion
@@ -613,7 +613,8 @@ TEST_F(CompactObjectTest, StrEncodingAndMaterialize) {
   obj.SetString(test_str, false);
 
   // Test StrEncoding helper
-  string raw_str = obj.GetRawString().Take();
+  auto strs = obj.GetRawString();
+  string raw_str = string{strs[0]} + string{strs[1]};
   CompactObj::StrEncoding enc = obj.GetStrEncoding();
   EXPECT_EQ(test_str, enc.Decode(raw_str).Take());
 

src/core/detail/listpack_wrap.cc

Lines changed: 4 additions & 0 deletions
@@ -118,6 +118,10 @@ ListpackWrap::Iterator ListpackWrap::end() const {
   return Iterator{lp_, nullptr, intbuf_};
 }
 
+size_t ListpackWrap::Bytes() const {
+  return lpBytes(lp_);
+}
+
 std::string_view ListpackWrap::GetView(uint8_t* lp_it, uint8_t int_buf[]) {
   int64_t ele_len = 0;
   uint8_t* elem = lpGet(lp_it, &ele_len, int_buf);

src/core/detail/listpack_wrap.h

Lines changed: 1 addition & 0 deletions
@@ -59,6 +59,7 @@ struct ListpackWrap {
   Iterator begin() const;
   Iterator end() const;
   size_t size() const;  // number of entries
+  size_t Bytes() const;
 
   // Get view from raw listpack iterator
   static std::string_view GetView(uint8_t* lp_it, uint8_t int_buf[]);
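
A short usage sketch (not from the commit) of how the new Bytes() accessor pairs with size() when sizing a serialization buffer. SerializedMap::EstimateSize comes from a later hunk in this commit; the include path for its header, the namespace qualifications, and the helper function itself are assumptions.

```cpp
#include <cstdint>
#include <vector>

#include "core/detail/listpack_wrap.h"
#include "server/tiering/serialized_map.h"  // assumed header location for SerializedMap

// Hypothetical helper: size a destination buffer for a listpack-encoded hash
// without walking its entries. Bytes() reports the raw listpack blob size,
// which the commit feeds to EstimateSize as the data-byte component.
inline std::vector<char> MakeSerializationBuffer(uint8_t* listpack) {
  dfly::detail::ListpackWrap lw{listpack};
  return std::vector<char>(
      dfly::tiering::SerializedMap::EstimateSize(lw.Bytes(), lw.size()));
}
```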

src/server/tiered_storage.cc

Lines changed: 27 additions & 41 deletions
@@ -14,6 +14,7 @@
 
 #include "absl/cleanup/cleanup.h"
 #include "absl/flags/internal/flag.h"
+#include "absl/functional/bind_front.h"
 #include "base/flag_utils.h"
 #include "base/flags.h"
 #include "base/logging.h"
@@ -82,23 +83,24 @@ tiering::DiskSegment FromCoolItem(const PrimeValue::CoolItem& item) {
 }
 
 // Determine required byte size and encoding type based on value.
-// TODO(vlad): Maybe split into different accessors?
 // Do NOT enforce rules depending on dynamic runtime values as this is called
 // when scheduling stash and just before succeeeding and is expected to return the same results
 pair<size_t /*size*/, CompactObj::ExternalRep> DetermineSerializationParams(const PrimeValue& pv) {
   switch (pv.ObjType()) {
-    case OBJ_STRING:
+    case OBJ_STRING: {
       if (pv.IsInline())
        return {};
-      return std::make_pair(pv.GetRawString().view().size(), CompactObj::ExternalRep::STRING);
-    case OBJ_HASH:
+      auto strs = pv.GetRawString();
+      return std::make_pair(strs[0].size() + strs[1].size(), CompactObj::ExternalRep::STRING);
+    }
+    case OBJ_HASH: {
      if (pv.Encoding() == kEncodingListPack) {
-        auto* lp = static_cast<uint8_t*>(pv.RObjPtr());
-        size_t bytes = 4 + lpBytes(lp);  // encoded length and data bytes
-        bytes += lpLength(lp) * 2 * 4;   // 4 bytes for encoded key/value lengths
-        return std::make_pair(bytes, CompactObj::ExternalRep::SERIALIZED_MAP);
+        detail::ListpackWrap lw{static_cast<uint8_t*>(pv.RObjPtr())};
+        return std::make_pair(tiering::SerializedMap::EstimateSize(lw.Bytes(), lw.size()),
+                              CompactObj::ExternalRep::SERIALIZED_MAP);
       }
       return {};
+    }
     default:
       return {};
   };
@@ -108,18 +110,17 @@ size_t Serialize(CompactObj::ExternalRep rep, const PrimeValue& pv, io::MutableB
   DCHECK_LE(DetermineSerializationParams(pv).first, buffer.size());
   switch (rep) {
     case CompactObj::ExternalRep::STRING: {
-      auto sv = pv.GetRawString();
-      memcpy(buffer.data(), sv.view().data(), sv.view().size());
-      return sv.view().size();
+      auto strs = pv.GetRawString();
+      memcpy(buffer.data(), strs[0].data(), strs[0].size());
+      if (!strs[1].empty())
+        memcpy(buffer.data() + strs[0].size(), strs[1].data(), strs[1].size());
+      return strs[0].size() + strs[1].size();
     }
     case CompactObj::ExternalRep::SERIALIZED_MAP: {
       DCHECK_EQ(pv.Encoding(), kEncodingListPack);
-
-      // TODO(vlad): Optimize copy for serialization
       detail::ListpackWrap lw{static_cast<uint8_t*>(pv.RObjPtr())};
-      vector<pair<string, string>> entries(lw.begin(), lw.end());
       return tiering::SerializedMap::Serialize(
-          entries, {reinterpret_cast<char*>(buffer.data()), buffer.length()});
+          lw, {reinterpret_cast<char*>(buffer.data()), buffer.length()});
     }
   };
   return 0;
@@ -447,11 +448,9 @@ std::optional<util::fb2::Future<bool>> TieredStorage::TryStash(DbIndex dbid, str
   if (!ShouldStash(*value))
     return {};
 
-  // This invariant should always hold because ShouldStash tests for IoPending flag.
-  CHECK(!bins_->IsPending(dbid, key));
+  CHECK(!bins_->IsPending(dbid, key));  // Because has stash pending is false (ShouldStash checks)
 
-  // TODO: When we are low on memory we should introduce a back-pressure, to avoid OOMs
-  // with a lot of underutilized disk space.
+  // Limit write depth. TODO: Provide backpressure?
   if (op_manager_->GetStats().pending_stash_cnt >= config_.write_depth_limit) {
     ++stats_.stash_overflow_cnt;
     return {};
@@ -463,35 +462,22 @@ std::optional<util::fb2::Future<bool>> TieredStorage::TryStash(DbIndex dbid, str
   tiering::OpManager::EntryId id;
   error_code ec;
 
-  value->SetStashPending(true);
+  value->SetStashPending(true);  // Optimistically set ahead, unset in case of error
+
   if (OccupiesWholePages(est_size)) {  // large enough for own page
     id = KeyRef(dbid, key);
-    if (auto prepared = op_manager_->PrepareStash(est_size); prepared) {
-      auto [offset, buf] = *prepared;
-      size_t written = Serialize(rep, *value, buf.bytes);
-      tiering::DiskSegment segment{offset, written};
-      op_manager_->Stash(id, segment, buf);
-    } else {
-      ec = prepared.error();
-    }
+    auto serialize = absl::bind_front(Serialize, rep, cref(*value));
+    ec = op_manager_->PrepareAndStash(id, est_size, serialize);
   } else if (auto bin = bins_->Stash(dbid, key, SerializeToString(*value)); bin) {
-    id = bin->first;
-    // TODO(vlad): Write bin to prepared buffer instead of allocating one
-    if (auto prepared = op_manager_->PrepareStash(bin->second.length()); prepared) {
-      auto [offset, buf] = *prepared;
-      memcpy(buf.bytes.data(), bin->second.data(), bin->second.size());
-      tiering::DiskSegment segment{offset, bin->second.size()};
-      op_manager_->Stash(id, segment, buf);
-    } else {
-      ec = prepared.error();
-      bins_->ReportStashAborted(bin->first);
-    }
+    id = bin->id;
+    auto serialize = absl::bind_front(&tiering::SmallBins::SerializeBin, bins_.get(), &*bin);
+    ec = op_manager_->PrepareAndStash(id, 4_KB, serialize);
  } else {
-    return {};  // silently added to bin
+    return {};  // added to bin, no operations pending
  }
 
+  // Set stash pending to false on single value or whole bin
   if (ec) {
-    value->SetStashPending(false);
     LOG_IF(ERROR, ec != errc::file_too_large) << "Stash failed immediately" << ec.message();
     visit([this](auto id) { op_manager_->ClearIoPending(id); }, id);
     return {};
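
A standalone toy (stand-in types, not Dragonfly code) showing the shape of the absl::bind_front adaptation used above: binding the first two arguments of a three-argument serializer yields the single-argument writer callable that PrepareAndStash expects.

```cpp
#include <algorithm>
#include <cstddef>
#include <functional>
#include <span>
#include <string>

#include "absl/functional/bind_front.h"

struct ToyValue {
  std::string payload;
};

// Three-argument serializer, analogous in shape to the file-local
// Serialize(rep, value, buffer) above.
size_t ToySerialize(int rep, const ToyValue& value, std::span<char> buffer) {
  (void)rep;  // unused in this toy
  size_t n = std::min(buffer.size(), value.payload.size());
  std::copy_n(value.payload.begin(), n, buffer.begin());
  return n;  // bytes written into the buffer
}

int main() {
  ToyValue value{"hello"};
  char buf[16];
  // Same pattern as: absl::bind_front(Serialize, rep, cref(*value)).
  // The result is callable with just the destination buffer.
  auto writer = absl::bind_front(ToySerialize, /*rep=*/0, std::cref(value));
  size_t written = writer(std::span<char>(buf));
  return written == 5 ? 0 : 1;
}
```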

src/server/tiering/op_manager.cc

Lines changed: 10 additions & 0 deletions
@@ -99,6 +99,16 @@ void OpManager::Stash(EntryId id_ref, tiering::DiskSegment segment, util::fb2::U
   storage_.Stash(segment, buf, std::move(io_cb));
 }
 
+std::error_code OpManager::PrepareAndStash(EntryId id, size_t length,
+                                           const std::function<size_t(io::MutableBytes)>& writer) {
+  auto buf = PrepareStash(length);
+  if (!buf.has_value())
+    return buf.error();
+
+  size_t written = writer(buf->second.bytes);
+  Stash(id, {buf->first, written}, buf->second);
+}
+
 OpManager::ReadOp& OpManager::PrepareRead(DiskSegment aligned_segment) {
   DCHECK_EQ(aligned_segment.offset % kPageSize, 0u);
   DCHECK_EQ(aligned_segment.length % kPageSize, 0u);

src/server/tiering/op_manager.h

Lines changed: 5 additions & 0 deletions
@@ -68,6 +68,11 @@ class OpManager {
   // Stash value to be offloaded. It is opaque to OpManager.
   void Stash(EntryId id, tiering::DiskSegment segment, util::fb2::UringBuf buf);
 
+  // PrepareStash + Stash via function
+  std::error_code PrepareAndStash(
+      EntryId id, size_t length,
+      const std::function<size_t /*written*/ (io::MutableBytes)>& writer);
+
   Stats GetStats() const;
 
  protected:

src/server/tiering/op_manager_test.cc

Lines changed: 4 additions & 9 deletions
@@ -83,15 +83,10 @@ struct OpManagerTest : PoolTestBase, OpManager {
   }
 
   std::error_code Stash(EntryId id, std::string_view value) {
-    auto prepared = storage_.PrepareStash(value.size());
-    if (!prepared.has_value())
-      return prepared.error();
-
-    auto [offset, buf] = *prepared;
-    memcpy(buf.bytes.data(), value.data(), value.size());
-    DiskSegment segment{offset, value.size()};
-    OpManager::Stash(id, segment, buf);
-    return {};
+    return PrepareAndStash(id, value.size(), [=](io::MutableBytes bytes) {
+      memcpy(bytes.data(), value.data(), value.size());
+      return value.size();
+    });
   }
 
   absl::flat_hash_map<EntryId, std::string> fetched_;

src/server/tiering/serialized_map.cc

Lines changed: 12 additions & 13 deletions
@@ -3,11 +3,14 @@
 #include <absl/base/internal/endian.h>
 
 #include "base/logging.h"
+#include "core/detail/listpack_wrap.h"
 
 namespace dfly::tiering {
 
+constexpr size_t kLenBytes = 4;
+
 SerializedMap::Iterator& SerializedMap::Iterator::operator++() {
-  slice_.remove_prefix(8 + key_.size() + value_.size());
+  slice_.remove_prefix(2 * kLenBytes + key_.size() + value_.size());
   Read();
   return *this;
 }
@@ -37,7 +40,7 @@ SerializedMap::Iterator SerializedMap::Find(std::string_view key) const {
 }
 
 SerializedMap::Iterator SerializedMap::begin() const {
-  return Iterator{slice_.substr(4)};
+  return Iterator{slice_.substr(kLenBytes)};
 }
 
 SerializedMap::Iterator SerializedMap::end() const {
@@ -52,22 +55,18 @@ size_t SerializedMap::DataBytes() const {
   return slice_.size() - 4 - size() * 2 * 4;
 }
 
-constexpr size_t kLenBytes = 4;
-
-size_t SerializedMap::SerializeSize(Input input) {
-  size_t out = kLenBytes;  // number of entries
-  for (const auto& [key, value] : input)
-    out += kLenBytes * 2 /* string lengts */ + key.size() + value.size();
-  return out;
+size_t SerializedMap::EstimateSize(size_t data_bytes, size_t entries) {
+  return kLenBytes /* entry number */ + data_bytes + entries * 2 * kLenBytes /* string lenghts */;
 }
 
-size_t SerializedMap::Serialize(Input input, absl::Span<char> buffer) {
-  DCHECK_GE(buffer.size(), SerializeSize(input));
+size_t SerializedMap::Serialize(const detail::ListpackWrap& lw, absl::Span<char> buffer) {
+  DCHECK_GE(buffer.size(), EstimateSize(lw.Bytes(), lw.size()));
+
   char* ptr = buffer.data();
-  absl::little_endian::Store32(ptr, input.size());
+  absl::little_endian::Store32(ptr, lw.size());
   ptr += kLenBytes;
 
-  for (const auto& [key, value] : input) {
+  for (const auto& [key, value] : lw) {
     absl::little_endian::Store32(ptr, key.length());
     ptr += kLenBytes;
     absl::little_endian::Store32(ptr, value.length());