Skip to content

Commit a0e5c83

Browse files
authored
chore: hold memcache value in BackedArguments (#6193)
STORE-like memached commands have exactly two arguments: a key and a value. This PR aggregates both under the same BackedArguments object and simplifies the logic around MemcacheParser::Command. Signed-off-by: Roman Gershman <roman@dragonflydb.io>
1 parent ec2d44f commit a0e5c83

File tree

8 files changed

+193
-96
lines changed

8 files changed

+193
-96
lines changed

src/common/backed_args.h

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -133,6 +133,13 @@ class BackedArguments {
133133
storage_.clear();
134134
}
135135

136+
// Reserves space for additional argument of given length at the end.
137+
void PushArg(size_t len) {
138+
size_t old_size = storage_.size();
139+
offsets_.push_back(old_size);
140+
storage_.resize(old_size + len + 1);
141+
}
142+
136143
protected:
137144
absl::InlinedVector<uint32_t, kLenCap> offsets_;
138145
StorageType storage_;

src/facade/dragonfly_connection.cc

Lines changed: 28 additions & 47 deletions
Original file line numberDiff line numberDiff line change
@@ -381,11 +381,6 @@ class PipelineCacheSizeTracker {
381381

382382
thread_local PipelineCacheSizeTracker tl_pipe_cache_sz_tracker;
383383

384-
Connection::MCPipelineMessage::MCPipelineMessage(MemcacheParser::Command&& cmd_in,
385-
std::string_view value_in)
386-
: cmd{std::move(cmd_in)}, value{value_in} {
387-
}
388-
389384
size_t Connection::MessageHandle::UsedMemory() const {
390385
struct MessageSize {
391386
size_t operator()(const PubMessagePtr& msg) {
@@ -415,7 +410,7 @@ size_t Connection::MessageHandle::UsedMemory() const {
415410
return 0;
416411
}
417412
size_t operator()(const MCPipelineMessagePtr& msg) {
418-
return sizeof(MCPipelineMessage) + msg->cmd.HeapMemory() + msg->value.capacity();
413+
return sizeof(MCPipelineMessage) + msg->HeapMemory();
419414
}
420415
};
421416

@@ -426,7 +421,7 @@ bool Connection::MessageHandle::IsReplying() const {
426421
return IsPubMsg() || holds_alternative<MonitorMessage>(handle) ||
427422
holds_alternative<PipelineMessagePtr>(handle) ||
428423
(holds_alternative<MCPipelineMessagePtr>(handle) &&
429-
!get<MCPipelineMessagePtr>(handle)->cmd.no_reply);
424+
!get<MCPipelineMessagePtr>(handle)->no_reply);
430425
}
431426

432427
struct Connection::AsyncOperations {
@@ -516,9 +511,8 @@ void Connection::AsyncOperations::operator()(Connection::PipelineMessage& msg) {
516511

517512
void Connection::AsyncOperations::operator()(const MCPipelineMessage& msg) {
518513
++self->local_stats_.cmds;
519-
self->service_->DispatchMC(msg.cmd, msg.value,
520-
static_cast<MCReplyBuilder*>(self->reply_builder_.get()),
521-
self->cc_.get());
514+
self->service_->DispatchMC(
515+
msg, msg.value(), static_cast<MCReplyBuilder*>(self->reply_builder_.get()), self->cc_.get());
522516
self->last_interaction_ = time(nullptr);
523517
}
524518

@@ -1238,17 +1232,17 @@ auto Connection::ParseMemcache() -> ParserStatus {
12381232
uint32_t consumed = 0;
12391233
MemcacheParser::Result result = MemcacheParser::OK;
12401234

1241-
MemcacheParser::Command cmd;
1242-
string_view value;
1243-
1244-
auto dispatch_sync = [this, &cmd, &value] {
1245-
service_->DispatchMC(cmd, value, static_cast<MCReplyBuilder*>(reply_builder_.get()), cc_.get());
1235+
auto dispatch_sync = [this] {
1236+
service_->DispatchMC(mc_cmd_, mc_cmd_.value(),
1237+
static_cast<MCReplyBuilder*>(reply_builder_.get()), cc_.get());
12461238
};
12471239

1248-
auto dispatch_async = [&cmd, &value]() -> MessageHandle {
1249-
return {make_unique<MCPipelineMessage>(std::move(cmd), value)};
1240+
auto dispatch_async = [this]() -> MessageHandle {
1241+
return {make_unique<MCPipelineMessage>(std::move(mc_cmd_))};
12501242
};
12511243

1244+
DCHECK(io_buf_.InputLen() > 0);
1245+
12521246
MCReplyBuilder* builder = static_cast<MCReplyBuilder*>(reply_builder_.get());
12531247

12541248
do {
@@ -1258,34 +1252,29 @@ auto Connection::ParseMemcache() -> ParserStatus {
12581252
return OK;
12591253
}
12601254

1261-
result = memcache_parser_->Parse(str, &consumed, &cmd);
1255+
result = memcache_parser_->Parse(str, &consumed, &mc_cmd_);
1256+
1257+
io_buf_.ConsumeInput(consumed);
12621258

1263-
DVLOG(2) << "mc_result " << result << " consumed: " << consumed << " type " << cmd.type;
1264-
if (result != MemcacheParser::OK) {
1265-
io_buf_.ConsumeInput(consumed);
1259+
DVLOG(2) << "mc_result " << result << " consumed: " << consumed << " type " << mc_cmd_.type;
1260+
if (result == MemcacheParser::INPUT_PENDING) {
12661261
break;
12671262
}
12681263

1269-
size_t total_len = consumed;
1270-
if (MemcacheParser::IsStoreCmd(cmd.type)) {
1271-
total_len += cmd.bytes_len + 2;
1272-
if (io_buf_.InputLen() >= total_len) {
1273-
string_view parsed_value = str.substr(consumed, cmd.bytes_len + 2);
1274-
if (parsed_value[cmd.bytes_len] != '\r' && parsed_value[cmd.bytes_len + 1] != '\n') {
1275-
builder->SendClientError("bad data chunk");
1276-
// We consume the whole buffer because we don't really know where it ends
1277-
// since the value length exceeds the cmd.bytes_len.
1278-
io_buf_.ConsumeInput(io_buf_.InputLen());
1279-
return OK;
1280-
}
1281-
1282-
value = parsed_value.substr(0, cmd.bytes_len);
1283-
} else {
1284-
return NEED_MORE;
1264+
memcache_parser_->Reset();
1265+
if (result == MemcacheParser::OK) {
1266+
DispatchSingle(io_buf_.InputLen() > 0, dispatch_sync, dispatch_async);
1267+
} else {
1268+
if (result == MemcacheParser::UNKNOWN_CMD) {
1269+
builder->SendSimpleString("ERROR");
1270+
} else if (result == MemcacheParser::PARSE_ERROR) {
1271+
builder->SendClientError("bad data chunk");
1272+
} else if (result == MemcacheParser::BAD_DELTA) {
1273+
builder->SendClientError("invalid numeric delta argument");
1274+
} else if (result != MemcacheParser::OK) {
1275+
builder->SendClientError("bad command line format");
12851276
}
12861277
}
1287-
DispatchSingle(total_len < io_buf_.InputLen(), dispatch_sync, dispatch_async);
1288-
io_buf_.ConsumeInput(total_len);
12891278
} while (!builder->GetError());
12901279

12911280
parser_error_ = result;
@@ -1294,14 +1283,6 @@ auto Connection::ParseMemcache() -> ParserStatus {
12941283
return NEED_MORE;
12951284
}
12961285

1297-
if (result == MemcacheParser::PARSE_ERROR || result == MemcacheParser::UNKNOWN_CMD) {
1298-
builder->SendSimpleString("ERROR");
1299-
} else if (result == MemcacheParser::BAD_DELTA) {
1300-
builder->SendClientError("invalid numeric delta argument");
1301-
} else if (result != MemcacheParser::OK) {
1302-
builder->SendClientError("bad command line format");
1303-
}
1304-
13051286
return OK;
13061287
}
13071288

src/facade/dragonfly_connection.h

Lines changed: 2 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -84,12 +84,7 @@ class Connection : public util::Connection {
8484
using PipelineMessage = cmn::BackedArguments;
8585

8686
// Pipeline message, accumulated Memcached command to be executed.
87-
struct MCPipelineMessage {
88-
MCPipelineMessage(MemcacheParser::Command&& cmd, std::string_view value);
89-
90-
MemcacheParser::Command cmd;
91-
std::string value;
92-
};
87+
using MCPipelineMessage = MemcacheParser::Command;
9388

9489
// Monitor message, carries a simple payload with the registered event to be sent.
9590
struct MonitorMessage : public std::string {};
@@ -414,6 +409,7 @@ class Connection : public util::Connection {
414409
io::IoBuf io_buf_; // used in io loop and parsers
415410
std::unique_ptr<RedisParser> redis_parser_;
416411
std::unique_ptr<MemcacheParser> memcache_parser_;
412+
MemcacheParser::Command mc_cmd_;
417413

418414
uint32_t id_;
419415
Protocol protocol_;

src/facade/memcache_parser.cc

Lines changed: 73 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -71,9 +71,10 @@ MP::Result ParseStore(ArgSlice tokens, MP::Command* res) {
7171
}
7272

7373
// tokens[0] is key
74+
uint32_t bytes_len = 0;
7475
uint32_t flags;
7576
if (!absl::SimpleAtoi(tokens[1], &flags) || !absl::SimpleAtoi(tokens[2], &res->expire_ts) ||
76-
!absl::SimpleAtoi(tokens[3], &res->bytes_len))
77+
!absl::SimpleAtoi(tokens[3], &bytes_len))
7778
return MP::BAD_INT;
7879

7980
if (res->type == MP::CAS && !absl::SimpleAtoi(tokens[4], &res->cas_unique)) {
@@ -93,6 +94,7 @@ MP::Result ParseStore(ArgSlice tokens, MP::Command* res) {
9394

9495
string_view key = tokens[0];
9596
res->Assign(&key, &key + 1, 1);
97+
res->PushArg(bytes_len);
9698

9799
return MP::OK;
98100
}
@@ -199,12 +201,12 @@ MP::Result ParseMeta(ArgSlice tokens, MP::Command* res) {
199201
return MP::PARSE_ERROR;
200202

201203
res->meta = true;
202-
res->bytes_len = 0;
203204
res->flags = 0;
204205
res->expire_ts = 0;
205206

206207
string_view arg0 = tokens[0];
207208
tokens.remove_prefix(1);
209+
uint32_t bytes_len = 0;
208210

209211
// We emulate the behavior by returning the high level commands.
210212
// TODO: we should reverse the interface in the future, so that a high level command
@@ -218,10 +220,9 @@ MP::Result ParseMeta(ArgSlice tokens, MP::Command* res) {
218220
res->type = MP::DELETE;
219221
break;
220222
case MP::META_SET:
221-
if (tokens.empty()) {
223+
if (tokens.empty())
222224
return MP::PARSE_ERROR;
223-
}
224-
if (!absl::SimpleAtoi(tokens[0], &res->bytes_len))
225+
if (!absl::SimpleAtoi(tokens[0], &bytes_len))
225226
return MP::BAD_INT;
226227

227228
res->type = MP::SET;
@@ -292,16 +293,25 @@ MP::Result ParseMeta(ArgSlice tokens, MP::Command* res) {
292293
}
293294
}
294295
res->Assign(&arg0, &arg0 + 1, 1);
295-
296+
if (MP::IsStoreCmd(res->type)) {
297+
res->PushArg(bytes_len);
298+
}
296299
return MP::OK;
297300
}
298301

299302
} // namespace
300303

301304
auto MP::Parse(string_view str, uint32_t* consumed, Command* cmd) -> Result {
305+
DVLOG(1) << "Parsing memcache input: [" << str << "]";
306+
307+
*consumed = 0;
308+
309+
if (val_len_to_read_ > 0) {
310+
return ConsumeValue(str, consumed, cmd);
311+
}
312+
302313
cmd->no_reply = false; // re-initialize
303314
auto pos = str.find("\r\n");
304-
*consumed = 0;
305315
if (pos == string_view::npos) {
306316
// We need more data to parse the command. For get/gets commands this line can be very long.
307317
// we limit maximum buffer capacity in the higher levels using max_client_iobuf_len.
@@ -338,11 +348,26 @@ auto MP::Parse(string_view str, uint32_t* consumed, Command* cmd) -> Result {
338348
return MP::PARSE_ERROR;
339349
}
340350

341-
return ParseStore(tokens_view, cmd);
351+
auto res = ParseStore(tokens_view, cmd);
352+
if (res != MP::OK)
353+
return res;
354+
val_len_to_read_ = cmd->value().size() + 2;
355+
return ConsumeValue(str.substr(pos + 2), consumed, cmd);
342356
}
343357

344358
if (cmd->type >= META_SET) {
345-
return tokens_view.empty() ? MP::PARSE_ERROR : ParseMeta(tokens_view, cmd);
359+
if (tokens_view.empty())
360+
return MP::PARSE_ERROR;
361+
362+
auto res = ParseMeta(tokens_view, cmd);
363+
if (res != MP::OK)
364+
return res;
365+
366+
if (IsStoreCmd(cmd->type)) {
367+
val_len_to_read_ = cmd->value().size() + 2;
368+
res = ConsumeValue(str.substr(pos + 2), consumed, cmd);
369+
}
370+
return res;
346371
}
347372

348373
if (tokens_view.empty()) {
@@ -356,4 +381,43 @@ auto MP::Parse(string_view str, uint32_t* consumed, Command* cmd) -> Result {
356381
return ParseValueless(tokens_view, &tmp_args_, cmd);
357382
};
358383

384+
auto MP::ConsumeValue(std::string_view str, uint32_t* consumed, Command* dest) -> Result {
385+
DCHECK_EQ(dest->size(), 2u); // key and value
386+
DCHECK_GT(val_len_to_read_, 0u);
387+
388+
if (val_len_to_read_ > 2) {
389+
uint32_t need_copy = val_len_to_read_ - 2;
390+
uint32_t dest_len = dest->elem_len(1);
391+
DCHECK_GE(dest_len, need_copy); // should be ensured during parsing
392+
393+
char* start = dest->value_ptr() + (dest_len - need_copy);
394+
uint32_t to_fill = std::min<uint32_t>(need_copy, str.size());
395+
if (to_fill) {
396+
memcpy(start, str.data(), to_fill);
397+
val_len_to_read_ -= to_fill;
398+
*consumed += to_fill;
399+
str.remove_prefix(to_fill);
400+
}
401+
}
402+
403+
if (str.empty()) {
404+
return MP::INPUT_PENDING;
405+
}
406+
407+
DCHECK(val_len_to_read_ <= 2u && val_len_to_read_ > 0);
408+
// consume \r\n
409+
char end[] = "\r\n";
410+
411+
do {
412+
if (str.front() != end[2 - val_len_to_read_]) // val_len_to_read_ 2 -> '\r', 1 -> '\n'
413+
return MP::PARSE_ERROR;
414+
415+
++(*consumed);
416+
--val_len_to_read_;
417+
str.remove_prefix(1);
418+
} while (val_len_to_read_ && !str.empty());
419+
420+
return val_len_to_read_ > 0 ? MP::INPUT_PENDING : MP::OK;
421+
}
422+
359423
} // namespace facade

src/facade/memcache_parser.h

Lines changed: 21 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -10,6 +10,7 @@
1010
#include <vector>
1111

1212
#include "common/backed_args.h"
13+
#include "facade/facade_types.h"
1314

1415
namespace facade {
1516

@@ -63,14 +64,21 @@ class MemcacheParser {
6364
std::string_view key() const {
6465
return empty() ? std::string_view{} : Front();
6566
}
67+
68+
// For STORE commands, value is at index 1.
69+
// For both key and value we provide convenience accessors that return empty string_view
70+
// if not present.
71+
std::string_view value() const {
72+
return size() < 2 ? std::string_view{} : at(1);
73+
}
74+
6675
union {
6776
uint64_t cas_unique = 0; // for CAS COMMAND
6877
uint64_t delta; // for DECR/INCR commands.
6978
};
7079

7180
uint32_t expire_ts =
7281
0; // relative (expire_ts <= month) or unix time (expire_ts > month) in seconds
73-
uint32_t bytes_len = 0;
7482
uint32_t flags = 0;
7583
bool no_reply = false; // q
7684
bool meta = false;
@@ -83,9 +91,13 @@ class MemcacheParser {
8391
bool return_access_time = false; // l
8492
bool return_hit = false; // h
8593
bool return_version = false; // c
94+
95+
char* value_ptr() {
96+
return storage_.data() + elem_capacity(0);
97+
}
8698
};
8799

88-
enum Result {
100+
enum Result : uint8_t {
89101
OK,
90102
INPUT_PENDING,
91103
UNKNOWN_CMD,
@@ -102,9 +114,16 @@ class MemcacheParser {
102114
return tmp_args_.capacity() * sizeof(std::string_view);
103115
}
104116

117+
void Reset() {
118+
val_len_to_read_ = 0;
119+
}
120+
105121
Result Parse(std::string_view str, uint32_t* consumed, Command* res);
106122

107123
private:
124+
Result ConsumeValue(std::string_view str, uint32_t* consumed, Command* dest);
125+
126+
uint32_t val_len_to_read_ = 0;
108127
std::vector<std::string_view> tmp_args_;
109128
};
110129

0 commit comments

Comments
 (0)