@@ -14,6 +14,7 @@

 #include "absl/cleanup/cleanup.h"
 #include "absl/flags/internal/flag.h"
+#include "absl/functional/bind_front.h"
 #include "base/flag_utils.h"
 #include "base/flags.h"
 #include "base/logging.h"
@@ -82,23 +83,24 @@ tiering::DiskSegment FromCoolItem(const PrimeValue::CoolItem& item) {
 }

 // Determine required byte size and encoding type based on value.
-// TODO(vlad): Maybe split into different accessors?
 // Do NOT enforce rules depending on dynamic runtime values as this is called
 // when scheduling stash and just before succeeding, and is expected to return the same results
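+// (the stash buffer is sized from the first result, so a larger second result would overflow it)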
 pair<size_t /* size */, CompactObj::ExternalRep> DetermineSerializationParams(const PrimeValue& pv) {
   switch (pv.ObjType()) {
-    case OBJ_STRING:
+    case OBJ_STRING: {
       if (pv.IsInline())
         return {};
-      return std::make_pair(pv.GetRawString().view().size(), CompactObj::ExternalRep::STRING);
-    case OBJ_HASH:
+      auto strs = pv.GetRawString();
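+      // GetRawString() returns the payload as two spans; the second span may be empty.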
+      return std::make_pair(strs[0].size() + strs[1].size(), CompactObj::ExternalRep::STRING);
+    }
+    case OBJ_HASH: {
       if (pv.Encoding() == kEncodingListPack) {
-        auto* lp = static_cast<uint8_t*>(pv.RObjPtr());
-        size_t bytes = 4 + lpBytes(lp);  // encoded length and data bytes
-        bytes += lpLength(lp) * 2 * 4;   // 4 bytes for encoded key/value lengths
-        return std::make_pair(bytes, CompactObj::ExternalRep::SERIALIZED_MAP);
+        detail::ListpackWrap lw{static_cast<uint8_t*>(pv.RObjPtr())};
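+        // Estimate an upper bound from the listpack's byte size and entry count.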
+        return std::make_pair(tiering::SerializedMap::EstimateSize(lw.UsedBytes(), lw.size()),
+                              CompactObj::ExternalRep::SERIALIZED_MAP);
       }
       return {};
+    }
     default:
       return {};
   };
@@ -108,18 +110,17 @@ size_t Serialize(CompactObj::ExternalRep rep, const PrimeValue& pv, io::MutableB
   DCHECK_LE(DetermineSerializationParams(pv).first, buffer.size());
   switch (rep) {
     case CompactObj::ExternalRep::STRING: {
-      auto sv = pv.GetRawString();
-      memcpy(buffer.data(), sv.view().data(), sv.view().size());
-      return sv.view().size();
+      auto strs = pv.GetRawString();
+      memcpy(buffer.data(), strs[0].data(), strs[0].size());
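+      // Copy the second span right after the first to make the buffer contiguous.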
+      if (!strs[1].empty())
+        memcpy(buffer.data() + strs[0].size(), strs[1].data(), strs[1].size());
+      return strs[0].size() + strs[1].size();
     }
     case CompactObj::ExternalRep::SERIALIZED_MAP: {
       DCHECK_EQ(pv.Encoding(), kEncodingListPack);
-
-      // TODO(vlad): Optimize copy for serialization
       detail::ListpackWrap lw{static_cast<uint8_t*>(pv.RObjPtr())};
-      vector<pair<string, string>> entries(lw.begin(), lw.end());
       return tiering::SerializedMap::Serialize(
-          entries, {reinterpret_cast<char*>(buffer.data()), buffer.length()});
+          lw, {reinterpret_cast<char*>(buffer.data()), buffer.length()});
     }
   };
   return 0;
@@ -447,11 +448,9 @@ std::optional<util::fb2::Future<bool>> TieredStorage::TryStash(DbIndex dbid, str
   if (!ShouldStash(*value))
     return {};

-  // This invariant should always hold because ShouldStash tests for IoPending flag.
-  CHECK(!bins_->IsPending(dbid, key));
+  CHECK(!bins_->IsPending(dbid, key));  // Holds because stash pending is false (ShouldStash checks it)

-  // TODO: When we are low on memory we should introduce a back-pressure, to avoid OOMs
-  // with a lot of underutilized disk space.
+  // Limit write depth. TODO: Provide backpressure?
   if (op_manager_->GetStats().pending_stash_cnt >= config_.write_depth_limit) {
     ++stats_.stash_overflow_cnt;
     return {};
@@ -463,35 +462,22 @@ std::optional<util::fb2::Future<bool>> TieredStorage::TryStash(DbIndex dbid, str
   tiering::OpManager::EntryId id;
   error_code ec;

-  value->SetStashPending(true);
+  value->SetStashPending(true);  // Set optimistically in advance; unset in case of error
+
   if (OccupiesWholePages(est_size)) {  // large enough for own page
     id = KeyRef(dbid, key);
-    if (auto prepared = op_manager_->PrepareStash(est_size); prepared) {
-      auto [offset, buf] = *prepared;
-      size_t written = Serialize(rep, *value, buf.bytes);
-      tiering::DiskSegment segment{offset, written};
-      op_manager_->Stash(id, segment, buf);
-    } else {
-      ec = prepared.error();
-    }
+    auto serialize = absl::bind_front(Serialize, rep, cref(*value));
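+    // bind_front fixes rep and the value; the callable takes only the destination buffer.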
+    ec = op_manager_->PrepareAndStash(id, est_size, serialize);
   } else if (auto bin = bins_->Stash(dbid, key, SerializeToString(*value)); bin) {
-    id = bin->first;
-    // TODO(vlad): Write bin to prepared buffer instead of allocating one
-    if (auto prepared = op_manager_->PrepareStash(bin->second.length()); prepared) {
-      auto [offset, buf] = *prepared;
-      memcpy(buf.bytes.data(), bin->second.data(), bin->second.size());
-      tiering::DiskSegment segment{offset, bin->second.size()};
-      op_manager_->Stash(id, segment, buf);
-    } else {
-      ec = prepared.error();
-      bins_->ReportStashAborted(bin->first);
-    }
+    id = bin->id;
+    auto serialize = absl::bind_front(&tiering::SmallBins::SerializeBin, bins_.get(), &*bin);
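+    // SerializeBin writes the pending bin into the prepared buffer; a bin spans a single 4_KB page.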
+    ec = op_manager_->PrepareAndStash(id, 4_KB, serialize);
   } else {
-    return {};  // silently added to bin
+    return {};  // added to bin, no operations pending
   }

+  // On error, unset stash pending for the single value or the whole bin
   if (ec) {
-    value->SetStashPending(false);
     LOG_IF(ERROR, ec != errc::file_too_large) << "Stash failed immediately" << ec.message();
     visit([this](auto id) { op_manager_->ClearIoPending(id); }, id);
     return {};