diff --git a/src/Makefile.am b/src/Makefile.am index dd07a25a..2e8718f6 100644 --- a/src/Makefile.am +++ b/src/Makefile.am @@ -53,6 +53,7 @@ nutcracker_SOURCES = \ nc_array.c nc_array.h \ nc_util.c nc_util.h \ nc_queue.h \ + nc_monitor.c nc_monitor.h \ nc.c nutcracker_LDADD = $(top_builddir)/src/hashkit/libhashkit.a @@ -81,7 +82,8 @@ test_all_SOURCES = test_all.c \ nc_string.c nc_string.h \ nc_array.c nc_array.h \ nc_util.c nc_util.h \ - nc_queue.h + nc_queue.h \ + nc_monitor.c nc_monitor.h test_all_LDADD = $(top_builddir)/src/hashkit/libhashkit.a test_all_LDADD += $(top_builddir)/src/proto/libproto.a diff --git a/src/nc_array.c b/src/nc_array.c index efc8c08c..3753d1c8 100644 --- a/src/nc_array.c +++ b/src/nc_array.c @@ -202,3 +202,22 @@ array_each(const struct array *a, array_each_t func, void *data) return NC_OK; } + +rstatus_t +array_del(struct array *a, uint32_t idx) +{ + uint8_t *pos = NULL; + uint64_t len = 0; + + if (a->nelem == 0 || idx >= a->nelem) { + return NC_ERROR; + } + + pos = (uint8_t*)a->elem + (a->size * idx); + len = (a->nelem - idx - 1) * a->size; + + memmove(pos, pos + a->size, (size_t)len); + a->nelem--; + + return NC_OK; +} diff --git a/src/nc_array.h b/src/nc_array.h index 61669457..7f63dc57 100644 --- a/src/nc_array.h +++ b/src/nc_array.h @@ -69,5 +69,6 @@ void *array_top(const struct array *a); void array_swap(struct array *a, struct array *b); void array_sort(struct array *a, array_compare_t compare); rstatus_t array_each(const struct array *a, array_each_t func, void *data); +rstatus_t array_del(struct array *a, uint32_t idx); #endif diff --git a/src/nc_client.c b/src/nc_client.c index a3cecaac..a1f24f98 100644 --- a/src/nc_client.c +++ b/src/nc_client.c @@ -129,6 +129,11 @@ client_close(struct context *ctx, struct conn *conn) client_close_stats(ctx, conn->owner, conn->err, conn->eof); + /* when client close, if conn in monitor, delete it */ + if (conn->monitor_client) { + del_from_monitor(conn); + } + if (conn->sd < 0) { conn->unref(conn); conn_put(conn); diff --git a/src/nc_conf.c b/src/nc_conf.c index c4e48ddb..9f1ab3c4 100644 --- a/src/nc_conf.c +++ b/src/nc_conf.c @@ -19,6 +19,7 @@ #include #include #include +#include #define DEFINE_ACTION(_hash, _name) string(#_name), static const struct string hash_strings[] = { @@ -110,6 +111,10 @@ static const struct command conf_commands[] = { conf_add_server, offsetof(struct conf_pool, server) }, + { string("enable_monitor"), + conf_set_bool, + offsetof(struct conf_pool, enable_monitor) }, + null_command }; @@ -225,6 +230,8 @@ conf_pool_init(struct conf_pool *cp, const struct string *name) return status; } + cp->enable_monitor = CONF_UNSET_NUM; + log_debug(LOG_VVERB, "init conf pool %p, '%.*s'", cp, name->len, name->data); return NC_OK; @@ -311,6 +318,9 @@ conf_pool_each_transform(void *elem, void *data) return status; } + sp->enable_monitor = cp->enable_monitor ? 1 : 0; + monitor_init(sp); + log_debug(LOG_VERB, "transform to pool %"PRIu32" '%.*s'", sp->idx, sp->name.len, sp->name.data); @@ -1282,6 +1292,10 @@ conf_validate_pool(struct conf *cf, struct conf_pool *cp) return status; } + if (cp->enable_monitor == CONF_UNSET_NUM) { + cp->enable_monitor = CONF_DEFAULT_ENABLE_MONITOR; + } + cp->valid = 1; return NC_OK; diff --git a/src/nc_conf.h b/src/nc_conf.h index 6f861949..881d9e2a 100644 --- a/src/nc_conf.h +++ b/src/nc_conf.h @@ -55,6 +55,7 @@ #define CONF_DEFAULT_SERVER_CONNECTIONS 1 #define CONF_DEFAULT_KETAMA_PORT 11211 #define CONF_DEFAULT_TCPKEEPALIVE false +#define CONF_DEFAULT_ENABLE_MONITOR false struct conf_listen { struct string pname; /* listen: as "hostname:port" */ @@ -94,6 +95,7 @@ struct conf_pool { int server_retry_timeout; /* server_retry_timeout: in msec */ int server_failure_limit; /* server_failure_limit: */ struct array server; /* servers: conf_server[] */ + int enable_monitor; /* enable_monitor: */ unsigned valid:1; /* valid? */ }; diff --git a/src/nc_connection.c b/src/nc_connection.c index d37ac8bb..9c20a515 100644 --- a/src/nc_connection.c +++ b/src/nc_connection.c @@ -157,6 +157,7 @@ _conn_get(void) conn->done = 0; conn->redis = 0; conn->authenticated = 0; + conn->monitor_client = 0; ntotal_conn++; ncurr_conn++; diff --git a/src/nc_connection.h b/src/nc_connection.h index 2fe83d42..caf06257 100644 --- a/src/nc_connection.h +++ b/src/nc_connection.h @@ -89,6 +89,7 @@ struct conn { unsigned done:1; /* done? aka close? */ unsigned redis:1; /* redis? */ unsigned authenticated:1; /* authenticated? */ + unsigned monitor_client:1;/* monitor client? */ }; TAILQ_HEAD(conn_tqh, conn); diff --git a/src/nc_core.h b/src/nc_core.h index 3ac6dce6..0ea92d5b 100644 --- a/src/nc_core.h +++ b/src/nc_core.h @@ -117,6 +117,7 @@ struct event_base; #include #include #include +#include struct context { uint32_t id; /* unique context id */ diff --git a/src/nc_message.c b/src/nc_message.c index cf53ed46..b3fb5782 100644 --- a/src/nc_message.c +++ b/src/nc_message.c @@ -273,6 +273,7 @@ _msg_get(void) msg->fdone = 0; msg->swallow = 0; msg->redis = 0; + msg->monitor = 0; return msg; } @@ -910,3 +911,28 @@ bool msg_set_placeholder_key(struct msg *r) return true; } +rstatus_t +msg_append_full(struct msg *msg, uint8_t *pos, size_t n) +{ + struct mbuf *mbuf = NULL; + size_t cidx = 0; + size_t mbsize = 0; + size_t clen = 0; + + do { + mbuf = msg_ensure_mbuf(msg, n); + if (mbuf == NULL) { + return NC_ENOMEM; + } + + mbsize = mbuf_size(mbuf); + + clen = n > mbsize ? mbsize : n; + mbuf_copy(mbuf, pos+cidx, clen); + cidx += clen; + msg->mlen += (uint32_t)clen; + n -= clen; + } while(n); + + return NC_OK; +} diff --git a/src/nc_message.h b/src/nc_message.h index 26a063bc..b7276563 100644 --- a/src/nc_message.h +++ b/src/nc_message.h @@ -51,6 +51,7 @@ typedef enum msg_parse_result { ACTION( REQ_MC_TOUCH ) /* memcache touch request */ \ ACTION( REQ_MC_QUIT ) /* memcache quit request */ \ ACTION( REQ_MC_VERSION ) /* memcache version request */ \ + ACTION( REQ_MC_MONITOR ) /* memcache monitor request, only used for proxy */ \ ACTION( RSP_MC_NUM ) /* memcache arithmetic response */ \ ACTION( RSP_MC_STORED ) /* memcache cas and storage response */ \ ACTION( RSP_MC_NOT_STORED ) \ @@ -202,6 +203,7 @@ typedef enum msg_parse_result { ACTION( REQ_REDIS_SELECT) /* only during init */ \ ACTION( REQ_REDIS_COMMAND) /* Sent to random server for redis-cli completions*/ \ ACTION( REQ_REDIS_LOLWUT) /* Vitally important */ \ + ACTION( REQ_REDIS_MONITOR) /* monitor */ \ ACTION( RSP_REDIS_STATUS ) /* redis response */ \ ACTION( RSP_REDIS_ERROR ) \ ACTION( RSP_REDIS_ERROR_ERR ) \ @@ -300,6 +302,7 @@ struct msg { unsigned fdone:1; /* all fragments are done? */ unsigned swallow:1; /* swallow response? */ unsigned redis:1; /* redis? */ + unsigned monitor:1; /* monitor comamnd? */ }; TAILQ_HEAD(msg_tqh, msg); @@ -350,4 +353,6 @@ void rsp_recv_done(struct context *ctx, struct conn *conn, struct msg *msg, stru struct msg *rsp_send_next(struct context *ctx, struct conn *conn); void rsp_send_done(struct context *ctx, struct conn *conn, struct msg *msg); +rstatus_t msg_append_full(struct msg *msg, uint8_t *pos, size_t n); + #endif diff --git a/src/nc_monitor.c b/src/nc_monitor.c new file mode 100644 index 00000000..ab28b2af --- /dev/null +++ b/src/nc_monitor.c @@ -0,0 +1,174 @@ +/* + * twemproxy - A fast and lightweight proxy for memcached protocol. + * + * Copyright (C) 2021, wei huang + * All rights reserved. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +#include +#include + +void +monitor_init(struct server_pool *sp) +{ + if (sp->enable_monitor) { + array_init(&sp->monitor_conns, CONF_DEFAULT_ARRAY_MONITOR_NUM, sizeof(struct conn *)); + } +} + +void +monitor_deinit(struct server_pool *sp) +{ + struct array *monitor_conns = &sp->monitor_conns; + + ASSERT(monitor_conns != NULL); + + if (sp->enable_monitor) { + while (array_n(monitor_conns) > 0) { + array_pop(monitor_conns); + } + array_deinit(monitor_conns); + } +} + +rstatus_t +add_to_monitor(struct conn *c) +{ + struct server_pool *sp = c->owner; + + ASSERT(c->client && sp != NULL); + + struct conn **monitor = array_push(&sp->monitor_conns); + if (monitor == NULL) { + return NC_ENOMEM; + } + c->monitor_client = 1; + *monitor = c; + + return NC_OK; +} + +void +del_from_monitor(struct conn *c) +{ + uint32_t i; + struct conn **tmp_conn = NULL; + struct server_pool *sp = c->owner; + struct array *a = NULL; + + ASSERT(c->client && c->monitor_client); + ASSERT(sp != NULL); + + a = &sp->monitor_conns; + for (i = 0; i < array_n(a); i++) { + tmp_conn = array_get(a, i); + if (*tmp_conn == c) { + array_del(a, i); + break; + } + } +} + +struct monitor_data +{ + struct msg *m; + struct conn *c; + struct context *ctx; + struct string *d; +}; + +static int +monitor_callback(void *conn, void *data) +{ + struct monitor_data *mdata = data; + struct conn **monitor_conn = conn; + struct conn *req_c = *monitor_conn; + + struct msg *req = req_get(req_c); + if (req == NULL) { + return NC_ENOMEM; + } + struct msg *rsp = msg_get(req_c, 0, mdata->c->redis); + if (rsp == NULL) { + msg_put(req); + return NC_ENOMEM; + } + + req->peer = rsp; + rsp->peer = req; + + req->done = 1; + rsp->done = 1; + + if (msg_append_full(rsp, mdata->d->data, mdata->d->len) != NC_OK) { + msg_put(req); + msg_put(rsp); + return NC_ENOMEM; + } + req_c->enqueue_outq(mdata->ctx, req_c, req); + if (event_add_out(mdata->ctx->evb, req_c) != NC_OK) { + req_c->err = errno; + req_c->dequeue_outq(mdata->ctx, req_c, req); + msg_put(req); + msg_put(rsp); + return NC_ERROR; + } + + return NC_OK; +} + +rstatus_t rsp_send_monitor_msg(struct context *ctx, struct conn *c, struct msg *m) +{ + ASSERT(c->client); + + struct server_pool *sp = c->owner; + struct string monitor_message = null_string; + struct monitor_data mdata = {0}; + mdata.m = m; + mdata.c = c; + mdata.d = &monitor_message; + mdata.ctx = ctx; + struct keypos kpos = {0}; + struct keypos *tmp_kpos = NULL; + + /* Only redis command command has a fake key. */ + if (m->type != MSG_REQ_REDIS_COMMAND) { + tmp_kpos = array_get(m->keys, 0); + + kpos.start = tmp_kpos->start; + kpos.end = tmp_kpos->end; + } + + if (c->redis) { + string_printf(&monitor_message, "+%ld.%06ld [%s] command=%s key0=%.*s\r\n", + m->start_ts/1000000, m->start_ts%1000000, + nc_unresolve_peer_desc(c->sd), + (msg_type_string(m->type))->data, + kpos.end - kpos.start, kpos.start); + + } else { + /* This monitor protocol only for twemproxy. */ + string_printf(&monitor_message, "MONITOR\r\n%ld.%06ld [%s] command=%s key0=%.*s\r\nEND\r\n", + m->start_ts/1000000, m->start_ts%1000000, + nc_unresolve_peer_desc(c->sd), + (msg_type_string(m->type))->data, + kpos.end - kpos.start, kpos.start); + } + + array_each(&sp->monitor_conns, monitor_callback, &mdata); + + string_deinit(&monitor_message); + return NC_OK; +} diff --git a/src/nc_monitor.h b/src/nc_monitor.h new file mode 100644 index 00000000..76e897c2 --- /dev/null +++ b/src/nc_monitor.h @@ -0,0 +1,41 @@ +/* + * twemproxy - A fast and lightweight proxy for memcached protocol. + * + * Copyright (C) 2021, wei huang + * All rights reserved. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +#ifndef NC_MONITOR_H +#define NC_MONITOR_H + +#include +#include + +#define CONF_DEFAULT_ARRAY_MONITOR_NUM 2 + +void monitor_init(struct server_pool *sp); +void monitor_deinit(struct server_pool *sp); + +static inline bool +monitor_is_empty(struct server_pool *sp) +{ + return sp->monitor_conns.nelem == 0; +} + +rstatus_t add_to_monitor(struct conn *c); +void del_from_monitor(struct conn *c); +rstatus_t rsp_send_monitor_msg(struct context *ctx, struct conn *c, struct msg *m); + +#endif \ No newline at end of file diff --git a/src/nc_rbtree.h b/src/nc_rbtree.h index 4b6137ac..3d7bafe0 100644 --- a/src/nc_rbtree.h +++ b/src/nc_rbtree.h @@ -23,6 +23,7 @@ #define rbtree_is_red(_node) ((_node)->color) #define rbtree_is_black(_node) (!rbtree_is_red(_node)) #define rbtree_copy_color(_n1, _n2) ((_n1)->color = (_n2)->color) +#define rbtree_is_empty(_tree) ((_tree)->root == (_tree)->sentinel) struct rbnode { struct rbnode *left; /* left link */ diff --git a/src/nc_request.c b/src/nc_request.c index d69dd1ee..34ccfbc1 100644 --- a/src/nc_request.c +++ b/src/nc_request.c @@ -474,8 +474,13 @@ req_make_reply(struct context *ctx, struct conn *conn, struct msg *req) static bool req_filter(struct conn *conn, struct msg *msg) { + struct server_pool *sp; + ASSERT(conn->client && !conn->proxy); + sp = conn->owner; + ASSERT(sp != NULL); + if (msg_empty(msg)) { ASSERT(conn->rmsg == NULL); log_debug(LOG_VERB, "filter empty req %"PRIu64" from c %d", msg->id, @@ -503,6 +508,15 @@ req_filter(struct conn *conn, struct msg *msg) return true; } + /* + * Handle monitor command. + */ + if (sp->enable_monitor && msg->monitor) { + add_to_monitor(conn); + req_put(msg); + return true; + } + /* * If this conn is not authenticated, we will mark it as noforward, * and handle it in the redis_reply handler. @@ -667,8 +681,13 @@ req_recv_done(struct context *ctx, struct conn *conn, struct msg *msg, return; } - /* do fragment */ pool = conn->owner; + /* if have monitor client, make monitor info. */ + if (pool->enable_monitor && !monitor_is_empty(pool)) { + rsp_send_monitor_msg(ctx, conn, msg); + } + + /* do fragment */ TAILQ_INIT(&frag_msgq); status = msg->fragment(msg, array_n(&pool->server), &frag_msgq); if (status != NC_OK) { diff --git a/src/nc_server.c b/src/nc_server.c index dab6a79b..f4bd4d43 100644 --- a/src/nc_server.c +++ b/src/nc_server.c @@ -922,6 +922,7 @@ server_pool_deinit(struct array *server_pool) } server_deinit(&sp->server); + monitor_deinit(sp); log_debug(LOG_DEBUG, "deinit pool %"PRIu32" '%.*s'", sp->idx, sp->name.len, sp->name.data); diff --git a/src/nc_server.h b/src/nc_server.h index b9798db0..91ed7778 100644 --- a/src/nc_server.h +++ b/src/nc_server.h @@ -116,11 +116,13 @@ struct server_pool { int64_t server_retry_timeout; /* server retry timeout in usec */ uint32_t server_failure_limit; /* server failure limit */ struct string redis_auth; /* redis_auth password (matches requirepass on redis) */ + struct array monitor_conns; /* monitor connections */ unsigned require_auth; /* require_auth? */ unsigned auto_eject_hosts:1; /* auto_eject_hosts? */ unsigned preconnect:1; /* preconnect? */ unsigned redis:1; /* redis? */ unsigned tcpkeepalive:1; /* tcpkeepalive? */ + unsigned enable_monitor:1; /* enable_monitor? */ }; void server_ref(struct conn *conn, void *owner); diff --git a/src/nc_string.c b/src/nc_string.c index dffaeafd..92a99ab8 100644 --- a/src/nc_string.c +++ b/src/nc_string.c @@ -175,6 +175,34 @@ _safe_itoa(int base, int64_t val, char *buf) return buf + 1; } +static const char * +_safe_check_placeholder(const char *fmt, int32_t *placeholder_len, bool *is_variable) { + *placeholder_len = 0; + *is_variable = false; + + if (*fmt == '0') { + fmt++; + + while (isdigit(*fmt)) { + *placeholder_len = *placeholder_len * 10 + (*fmt - '0'); + fmt++; + } + } else if (*fmt == '.') { + fmt++; + if (*fmt == '*') { + *is_variable = true; + fmt++; + } else { + while (isdigit(*fmt)) { + *placeholder_len = *placeholder_len * 10 + (*fmt - '0'); + fmt++; + } + } + } + + return fmt; +} + static const char * _safe_check_longlong(const char *fmt, int32_t * have_longlong) { @@ -192,14 +220,20 @@ _safe_check_longlong(const char *fmt, int32_t * have_longlong) } int -_safe_vsnprintf(char *to, size_t size, const char *format, va_list ap) +_safe_vsnprintf(char *to, size_t size, int *parse_done, const char *format, va_list ap) { char *start = to; char *end = start + size - 1; + if (parse_done) *parse_done = 1; + bool is_variable = false; + for (; *format; ++format) { int32_t have_longlong = false; + int32_t placeholder_len = false; + int32_t placeholder_num = 0; if (*format != '%') { if (to == end) { /* end of buffer */ + if (parse_done) *parse_done = 0; break; } *to++ = *format; /* copy ordinary char */ @@ -207,6 +241,7 @@ _safe_vsnprintf(char *to, size_t size, const char *format, va_list ap) } ++format; /* skip '%' */ + format = _safe_check_placeholder(format, &placeholder_len, &is_variable); format = _safe_check_longlong(format, &have_longlong); switch (*format) { @@ -235,7 +270,7 @@ _safe_vsnprintf(char *to, size_t size, const char *format, va_list ap) } { - char buff[22]; + char buff[22] = {0}; const int base = (*format == 'x' || *format == 'p') ? 16 : 10; /* *INDENT-OFF* */ @@ -248,6 +283,14 @@ _safe_vsnprintf(char *to, size_t size, const char *format, va_list ap) if (*format == 'x' && !have_longlong && ival < 0) { val_as_str += 8; } + + if (placeholder_len) { + placeholder_num = (int32_t)(&buff[sizeof(buff) - 1] - val_as_str); + while (placeholder_len > placeholder_num && to < end) { + *to++ = '0'; + placeholder_num++; + } + } while (*val_as_str && to < end) { *to++ = *val_as_str++; @@ -257,11 +300,25 @@ _safe_vsnprintf(char *to, size_t size, const char *format, va_list ap) } case 's': { + if (is_variable) { + placeholder_len = (int32_t)va_arg(ap, int64_t); + } + const char *val = va_arg(ap, char *); if (!val) { val = "(null)"; + if (is_variable) { + /* placeholder_len = nc_strlen(val); */ + placeholder_len = 6; + } } while (*val && to < end) { + if (is_variable) { + if (placeholder_len == 0) { + break; + } + placeholder_len--; + } *to++ = *val++; } continue; @@ -278,7 +335,76 @@ _safe_snprintf(char *to, size_t n, const char *fmt, ...) int result; va_list args; va_start(args, fmt); - result = _safe_vsnprintf(to, n, fmt, args); + result = _safe_vsnprintf(to, n, NULL, fmt, args); va_end(args); return result; } + +rstatus_t +string_printf(struct string *s, const char *fmt, ...) +{ + char static_buff[1024] = {0}; + char *buf = NULL; + size_t buflen = sizeof(static_buff); + int bufstrlen, parse_done; + + buf = static_buff; + + va_list args, cpy; + va_start(args, fmt); + while(1) { + va_copy(cpy, args); + bufstrlen = _safe_vsnprintf(buf, buflen, &parse_done, fmt, cpy); + va_end(cpy); + + if (!parse_done) { + nc_free(buf); + buflen += 256; + buf = nc_alloc(buflen); + if (buf == NULL) { + return NC_ENOMEM; + } + continue; + } else { + buf = nc_zalloc(bufstrlen + 1); + if (buf == NULL) { + return NC_ENOMEM; + } + + memcpy(buf ,static_buff, (size_t)bufstrlen); + } + + break; + } + va_end(args); + + s->len = (uint32_t)bufstrlen; + s->data = (uint8_t*)buf; + + return NC_OK; +} + +rstatus_t string_cat_len(struct string *dst, uint8_t *data, uint32_t len) { + if (len == 0) { + return NC_OK; + } + + uint8_t *buf = dst->data; + uint32_t newlen = dst->len + len + 1; + + buf = nc_realloc(dst->data, newlen); + if (buf == NULL) { + return NC_ENOMEM; + } + + nc_memcpy(buf + dst->len, data, len); + buf[newlen-1] = '\0'; + dst->len = newlen-1; + dst->data = buf; + + return NC_OK; +} + +rstatus_t string_cat(struct string *dst, struct string *src) { + return string_cat_len(dst, src->data, src->len); +} diff --git a/src/nc_string.h b/src/nc_string.h index 755305ea..a842aeb4 100644 --- a/src/nc_string.h +++ b/src/nc_string.h @@ -117,14 +117,14 @@ int string_compare(const struct string *s1, const struct string *s2); * Does not support any width/precision * Implemented with simplicity, and async-signal-safety in mind */ -int _safe_vsnprintf(char *to, size_t size, const char *format, va_list ap); +int _safe_vsnprintf(char *to, size_t size, int *parse_done, const char *format, va_list ap); int _safe_snprintf(char *to, size_t n, const char *fmt, ...); #define nc_safe_snprintf(_s, _n, ...) \ _safe_snprintf((char *)(_s), (size_t)(_n), __VA_ARGS__) #define nc_safe_vsnprintf(_s, _n, _f, _a) \ - _safe_vsnprintf((char *)(_s), (size_t)(_n), _f, _a) + _safe_vsnprintf((char *)(_s), (size_t)(_n), NULL, _f, _a) static inline uint8_t * _nc_strchr(uint8_t *p, uint8_t *last, uint8_t c) @@ -152,4 +152,8 @@ _nc_strrchr(uint8_t *p, uint8_t *start, uint8_t c) return NULL; } +rstatus_t string_printf(struct string *s, const char *fmt, ...); +rstatus_t string_cat_len(struct string *dst, uint8_t *data, uint32_t len); +rstatus_t string_cat(struct string *dst, struct string *src); + #endif diff --git a/src/proto/nc_memcache.c b/src/proto/nc_memcache.c index 79545976..4c5f32e7 100644 --- a/src/proto/nc_memcache.c +++ b/src/proto/nc_memcache.c @@ -328,6 +328,12 @@ memcache_parse_req(struct msg *r) break; } + if (str7cmp(m, 'm', 'o', 'n', 'i', 't', 'o', 'r')) { + r->type = MSG_REQ_MC_MONITOR; + r->monitor = 1; + break; + } + break; } @@ -352,6 +358,7 @@ memcache_parse_req(struct msg *r) case MSG_REQ_MC_VERSION: case MSG_REQ_MC_QUIT: + case MSG_REQ_MC_MONITOR: p = p - 1; /* go back by 1 byte */ state = SW_CRLF; break; diff --git a/src/proto/nc_redis.c b/src/proto/nc_redis.c index a5ea210f..3a5b72f9 100644 --- a/src/proto/nc_redis.c +++ b/src/proto/nc_redis.c @@ -47,6 +47,7 @@ redis_argz(const struct msg *r) case MSG_REQ_REDIS_PING: case MSG_REQ_REDIS_QUIT: case MSG_REQ_REDIS_COMMAND: + case MSG_REQ_REDIS_MONITOR: return true; default: @@ -1105,6 +1106,13 @@ redis_parse_req(struct msg *r) break; } + if (str7icmp(m, 'm', 'o', 'n', 'i', 't', 'o', 'r')) { + r->type = MSG_REQ_REDIS_MONITOR; + r->noforward = 1; + r->monitor = 1; + break; + } + break; case 8: