Skip to content

Commit 3facd07

Browse files
committed
more fixes, better test
1 parent 6126ae2 commit 3facd07

File tree

2 files changed

+44
-19
lines changed

2 files changed

+44
-19
lines changed

src/server/hset_family.cc

Lines changed: 16 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -138,6 +138,14 @@ struct HMapWrap {
138138
VisitMut(ov);
139139
}
140140

141+
void Launder(tiering::SerializedMapDecoder* dec) {
142+
Overloaded ov{
143+
[](StringMap* s) {},
144+
[&](detail::ListpackWrap& lw) { *dec->Write() = lw; },
145+
};
146+
VisitMut(ov);
147+
}
148+
141149
template <typename T> optional<T> Get() const {
142150
if (holds_alternative<T>(impl_))
143151
return get<T>(impl_);
@@ -244,9 +252,7 @@ template <typename F> auto ExecuteW(Transaction* tx, F&& f) {
244252
// Create wrapper from different types
245253
HMapWrap hw{*res.value()->Write()};
246254
fut.Resolve(f(hw));
247-
248-
// soak listpack wrapper back to get updated value
249-
*res.value()->Write() = *hw.Get<detail::ListpackWrap>();
255+
hw.Launder(*res);
250256
};
251257

252258
es->tiered_storage()->Read(op_args.db_cntx.db_index, key, pv, D{}, std::move(read_cb));
@@ -330,6 +336,10 @@ OpStatus OpIncrBy(const OpArgs& op_args, string_view key, string_view field, Inc
330336

331337
auto& add_res = *op_res;
332338
PrimeValue& pv = add_res.it->second;
339+
340+
if (pv.IsExternal() && !pv.IsCool())
341+
return OpStatus::CANCELLED; // Not supported for offloaded values
342+
333343
if (add_res.is_new) {
334344
pv.InitRobj(OBJ_HASH, kEncodingListPack, lpNew(0));
335345
} else {
@@ -471,11 +481,12 @@ OpResult<CbVariant<uint32_t>> OpSet(const OpArgs& op_args, string_view key, CmdA
471481

472482
// If the value is external, enqueue read and modify it there
473483
if (pv.IsExternal() && !pv.IsCool()) {
474-
CHECK(op_sp.ttl == UINT32_MAX); // TODO: remove
484+
if (op_sp.ttl != UINT32_MAX)
485+
return OpStatus::CANCELLED; // Don't support expiry with offloaded hashes
486+
475487
using D = tiering::SerializedMapDecoder;
476488
util::fb2::Future<OpResult<uint32_t>> fut;
477489
auto read_cb = [fut, values, &op_sp](io::Result<D*> res) mutable {
478-
// Create wrapper from different types
479490
auto& lw = *res.value()->Write();
480491
uint32_t created = 0;
481492
for (size_t i = 0; i < values.size(); i += 2) {

src/server/tiered_storage_test.cc

Lines changed: 28 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -493,8 +493,8 @@ TEST_F(PureDiskTSTest, Dump) {
493493
TEST_P(LatentCoolingTSTest, SimpleHash) {
494494
absl::FlagSaver saver;
495495
absl::SetFlag(&FLAGS_tiered_experimental_hash_support, true);
496-
// For now, never upload as its not implemented yet
497-
absl::SetFlag(&FLAGS_tiered_upload_threshold, 0.0);
496+
absl::SetFlag(&FLAGS_tiered_upload_threshold,
497+
0.0); // For now, never upload as its not implemented yet
498498
UpdateFromFlags();
499499

500500
const size_t kNUM = 100;
@@ -516,11 +516,9 @@ TEST_P(LatentCoolingTSTest, SimpleHash) {
516516
// Wait for all to be stashed or in end up in bins
517517
ExpectConditionWithinTimeout([=] {
518518
auto metrics = GetMetrics();
519-
VLOG(0) << metrics.tiered_stats.total_stashes << " "
520-
<< metrics.tiered_stats.small_bins_entries_cnt;
521-
return metrics.tiered_stats.total_stashes +
522-
metrics.tiered_stats.small_bins_filling_entries_cnt ==
523-
kNUM;
519+
size_t sum =
520+
metrics.tiered_stats.total_stashes + metrics.tiered_stats.small_bins_filling_entries_cnt;
521+
return sum == kNUM;
524522
});
525523

526524
// Verify correctness
@@ -533,20 +531,36 @@ TEST_P(LatentCoolingTSTest, SimpleHash) {
533531
EXPECT_EQ(resp, v);
534532
}
535533

536-
// Wait for all offloads again
537-
ExpectConditionWithinTimeout([=] {
534+
// Start offloading
535+
SetFlag(&FLAGS_tiered_offload_threshold, 1.0);
536+
UpdateFromFlags();
537+
auto wait_offloaded = [=] {
538538
auto metrics = GetMetrics();
539-
return metrics.db_stats[0].tiered_entries +
540-
metrics.tiered_stats.small_bins_filling_entries_cnt ==
541-
kNUM;
542-
});
539+
size_t sum =
540+
metrics.db_stats[0].tiered_entries + metrics.tiered_stats.small_bins_filling_entries_cnt;
541+
return sum == kNUM;
542+
};
543+
544+
// Wait for all offloads again
545+
ExpectConditionWithinTimeout(wait_offloaded);
543546

544547
// HDEL
545548
for (size_t i = 0; i < kNUM; i++) {
546549
string key = absl::StrCat("k", i);
547-
EXPECT_THAT(Run({"DEL", key, string{1, 'c'}}), IntArg(1));
550+
EXPECT_THAT(Run({"HDEL", key, string{1, 'c'}}), IntArg(1));
548551
EXPECT_THAT(Run({"HLEN", key}), IntArg(25));
549552
}
553+
554+
// Wait for all offloads again
555+
ExpectConditionWithinTimeout(wait_offloaded);
556+
557+
// HSET new field
558+
for (size_t i = 0; i < kNUM; i++) {
559+
string key = absl::StrCat("k", i);
560+
EXPECT_THAT(Run({"HSET", key, string{1, 'c'}, "Some new value"}), IntArg(1));
561+
EXPECT_THAT(Run({"HLEN", key}), IntArg(26));
562+
EXPECT_EQ(Run({"HGET", key, string{1, 'c'}}), "Some new value");
563+
}
550564
}
551565

552566
} // namespace dfly

0 commit comments

Comments
 (0)