Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
82 changes: 55 additions & 27 deletions src/brpc/builtin/rpcz_service.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -185,16 +185,35 @@ static void PrintElapse(std::ostream& os, int64_t cur_time,

static void PrintAnnotations(
std::ostream& os, int64_t cur_time, int64_t* last_time,
SpanInfoExtractor** extractors, int num_extr) {
SpanInfoExtractor** extractors, int num_extr, const RpczSpan* span) {
int64_t anno_time;
std::string a;
const char* span_type_str = "Span";
if (span) {
switch (span->type()) {
case SPAN_TYPE_SERVER:
span_type_str = "ServerSpan";
break;
case SPAN_TYPE_CLIENT:
span_type_str = "ClientSpan";
break;
case SPAN_TYPE_BTHREAD:
span_type_str = "BthreadSpan";
break;
}
}

// TODO: Going through all extractors is not strictly correct because
// later extractors may have earlier annotations.
for (int i = 0; i < num_extr; ++i) {
while (extractors[i]->PopAnnotation(cur_time, &anno_time, &a)) {
PrintRealTime(os, anno_time);
PrintElapse(os, anno_time, last_time);
os << ' ' << WebEscape(a);
os << ' ';
if (span) {
os << '[' << span_type_str << ' ' << SPAN_ID_STR << '=' << Hex(span->span_id()) << "] ";
}
os << WebEscape(a);
if (a.empty() || butil::back_char(a) != '\n') {
os << '\n';
}
Expand All @@ -204,12 +223,12 @@ static void PrintAnnotations(

static bool PrintAnnotationsAndRealTimeSpan(
std::ostream& os, int64_t cur_time, int64_t* last_time,
SpanInfoExtractor** extr, int num_extr) {
SpanInfoExtractor** extr, int num_extr, const RpczSpan* span) {
if (cur_time == 0) {
// the field was not set.
return false;
}
PrintAnnotations(os, cur_time, last_time, extr, num_extr);
PrintAnnotations(os, cur_time, last_time, extr, num_extr, span);
PrintRealTime(os, cur_time);
PrintElapse(os, cur_time, last_time);
return true;
Expand Down Expand Up @@ -239,9 +258,10 @@ static void PrintClientSpan(
extr[num_extr++] = server_extr;
}
extr[num_extr++] = &client_extr;
// start_send_us is always set for client spans.
CHECK(PrintAnnotationsAndRealTimeSpan(os, span.start_send_real_us(),
last_time, extr, num_extr));
if (!PrintAnnotationsAndRealTimeSpan(os, span.start_send_real_us(),
last_time, extr, num_extr, &span)) {
os << " start_send_real_us:not-set";
}
const Protocol* protocol = FindProtocol(span.protocol());
const char* protocol_name = (protocol ? protocol->name : "Unknown");
const butil::EndPoint remote_side(butil::int2ip(span.remote_ip()), span.remote_port());
Expand Down Expand Up @@ -271,12 +291,12 @@ static void PrintClientSpan(
os << std::endl;

if (PrintAnnotationsAndRealTimeSpan(os, span.sent_real_us(),
last_time, extr, num_extr)) {
os << " Requested(" << span.request_size() << ") [1]" << std::endl;
last_time, extr, num_extr, &span)) {
os << " [ClientSpan " << SPAN_ID_STR << '=' << Hex(span.span_id()) << "] Requested(" << span.request_size() << ") [1]" << std::endl;
}
if (PrintAnnotationsAndRealTimeSpan(os, span.received_real_us(),
last_time, extr, num_extr)) {
os << " Received response(" << span.response_size() << ")";
last_time, extr, num_extr, &span)) {
os << " [ClientSpan " << SPAN_ID_STR << '=' << Hex(span.span_id()) << "] Received response(" << span.response_size() << ")";
if (span.base_cid() != 0 && span.ending_cid() != 0) {
int64_t ver = span.ending_cid() - span.base_cid();
if (ver >= 1) {
Expand All @@ -289,18 +309,18 @@ static void PrintClientSpan(
}

if (PrintAnnotationsAndRealTimeSpan(os, span.start_parse_real_us(),
last_time, extr, num_extr)) {
os << " Processing the response in a new bthread" << std::endl;
last_time, extr, num_extr, &span)) {
os << " [ClientSpan " << SPAN_ID_STR << '=' << Hex(span.span_id()) << "] Processing the response in a new bthread" << std::endl;
}

if (PrintAnnotationsAndRealTimeSpan(
os, span.start_callback_real_us(),
last_time, extr, num_extr)) {
os << (span.async() ? " Enter user's done" : " Back to user's callsite") << std::endl;
last_time, extr, num_extr, &span)) {
os << " [ClientSpan " << SPAN_ID_STR << '=' << Hex(span.span_id()) << "] " << (span.async() ? " Enter user's done" : " Back to user's callsite") << std::endl;
}

PrintAnnotations(os, std::numeric_limits<int64_t>::max(),
last_time, extr, num_extr);
last_time, extr, num_extr, &span);
}

static void PrintClientSpan(std::ostream& os,const RpczSpan& span,
Expand All @@ -318,7 +338,15 @@ static void PrintBthreadSpan(std::ostream& os, const RpczSpan& span, int64_t* la
extr[num_extr++] = server_extr;
}
extr[num_extr++] = &client_extr;
PrintAnnotations(os, std::numeric_limits<int64_t>::max(), last_time, extr, num_extr);

// Print span id for bthread span context identification
os << " [BthreadSpan " << SPAN_ID_STR << '=' << Hex(span.span_id());
if (span.parent_span_id() != 0) {
os << " parent_span=" << Hex(span.parent_span_id());
}
os << "] ";

PrintAnnotations(os, std::numeric_limits<int64_t>::max(), last_time, extr, num_extr, &span);
}

static void PrintServerSpan(std::ostream& os, const RpczSpan& span,
Expand Down Expand Up @@ -348,16 +376,16 @@ static void PrintServerSpan(std::ostream& os, const RpczSpan& span,
os << std::endl;
if (PrintAnnotationsAndRealTimeSpan(
os, span.start_parse_real_us(),
&last_time, extr, ARRAY_SIZE(extr))) {
os << " Processing the request in a new bthread" << std::endl;
&last_time, extr, ARRAY_SIZE(extr), &span)) {
os << " [ServerSpan " << SPAN_ID_STR << '=' << Hex(span.span_id()) << "] Processing the request in a new bthread" << std::endl;
}

bool entered_user_method = false;
if (PrintAnnotationsAndRealTimeSpan(
os, span.start_callback_real_us(),
&last_time, extr, ARRAY_SIZE(extr))) {
&last_time, extr, ARRAY_SIZE(extr), &span)) {
entered_user_method = true;
os << " Enter " << WebEscape(span.full_method_name()) << std::endl;
os << " [ServerSpan " << SPAN_ID_STR << '=' << Hex(span.span_id()) << "] Enter " << WebEscape(span.full_method_name()) << std::endl;
}

const int nclient = span.client_spans_size();
Expand All @@ -372,22 +400,22 @@ static void PrintServerSpan(std::ostream& os, const RpczSpan& span,

if (PrintAnnotationsAndRealTimeSpan(
os, span.start_send_real_us(),
&last_time, extr, ARRAY_SIZE(extr))) {
&last_time, extr, ARRAY_SIZE(extr), &span)) {
if (entered_user_method) {
os << " Leave " << WebEscape(span.full_method_name()) << std::endl;
os << " [ServerSpan " << SPAN_ID_STR << '=' << Hex(span.span_id()) << "] Leave " << WebEscape(span.full_method_name()) << std::endl;
} else {
os << " Responding" << std::endl;
os << " [ServerSpan " << SPAN_ID_STR << '=' << Hex(span.span_id()) << "] Responding" << std::endl;
}
}

if (PrintAnnotationsAndRealTimeSpan(
os, span.sent_real_us(),
&last_time, extr, ARRAY_SIZE(extr))) {
os << " Responded(" << span.response_size() << ')' << std::endl;
&last_time, extr, ARRAY_SIZE(extr), &span)) {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Could you please post some of the display information?

Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

As shown below, the type of span (server span, client span, bthread span) is indicated at the beginning to facilitate troubleshooting.

Fetching new trace TRACE#B from HOST#X

Received request REQ#65674 from HOST#Y via PROTO#1 log=0 trace=TRACE#B span=SPAN#S1

[ServerSpan span=SPAN#S1] Processing the request in a new worker
[ServerSpan span=SPAN#S1] ParseStats[cut=?, queue=?, worker=?, rpc=?]
[ServerSpan span=SPAN#S1] Enter Ecom.OrderService.PlaceOrder

[ClientSpan span=SPAN#C1] Cart.AddItem item=ITEM# qty=QTY# price=PRICE# user=USER# session=SESSION#
[ClientSpan span=SPAN#C1] Cart.VerifyOperation item=ITEM# op=OP# user=USER# session=SESSION#
[ClientSpan span=SPAN#C1] Cart.ProposeUpdate item=ITEM# op=OP# user=USER# session=SESSION# attempt=SN#
[ClientSpan span=SPAN#C1] Orchestration.Manager: enqueue step=PlaceOrder first=I# last=I#
[ClientSpan span=SPAN#C1] Orchestration.Queue: pending step

Requesting Ecom.Consensus.Append@HOST#Z PROTO#1 call=CALL# trace=TRACE#B span=SPAN#C1
[ClientSpan span=SPAN#C1] Requested(REQ#65778) [1]
[ClientSpan span=SPAN#C1] Received response(RSP#) of request[1]
[ClientSpan span=SPAN#C1] Processing the response in a new worker
[ClientSpan span=SPAN#C1] Enter client callback

[ClientSpan span=SPAN#C1] Workflow.OnApply, ongoing tasks: 0
[ClientSpan span=SPAN#C1] TaskQueue status running=R# queues=Q# active=A#
[ClientSpan span=SPAN#C1] OrderPipeline.WriteOrder
[ClientSpan span=SPAN#C1] Payment.Authorize
[ClientSpan span=SPAN#C1] Inventory.Reserve
[ClientSpan span=SPAN#C1] Promotion.Apply
[ClientSpan span=SPAN#C1] Promotion.Apply_done
[ClientSpan span=SPAN#C1] Notification.Prepare
[ClientSpan span=SPAN#C1] Notification.Prepare_done
[ClientSpan span=SPAN#C1] Inventory.Reserve_done
[ClientSpan span=SPAN#C1] Payment.Authorize_done
[ClientSpan span=SPAN#C1] OrderPipeline.Sync
[ClientSpan span=SPAN#C1] OrderPipeline.WriteOrder done (~2.7ms)
[ClientSpan span=SPAN#C1] Latency stats: avg≈58us p90≈66us p99≈160us
[ClientSpan span=SPAN#C1] Protocol response before join
[ClientSpan span=SPAN#C1] Protocol response after join

[ClientSpan span=SPAN#C1] FulfillmentWorker: ongoing tasks=3
[ClientSpan span=SPAN#C1] FulfillmentWorker: global queue running=R# queues=Q# active=A#
[ClientSpan span=SPAN#C1] FulfillmentWorker: enter
[ClientSpan span=SPAN#C1] Batcher.append
[ClientSpan span=SPAN#C1] Batcher.flush queue_wait≈~6.7ms

Requesting Ecom.Consensus.Append@HOST#Y PROTO#1 call=CALL# trace=TRACE#B span=SPAN#C2
[ClientSpan span=SPAN#C2] Requested(REQ#65778) [1]
[ClientSpan span=SPAN#C2] Received response(RSP#) of request[1]
[ClientSpan span=SPAN#C2] Processing the response in a new worker
[ClientSpan span=SPAN#C2] Enter client callback

[ServerSpan span=SPAN#S1] Leave Ecom.OrderService.PlaceOrder
[ServerSpan span=SPAN#S1] Responded

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This format is a little bit tedious. How about [Server SPAN#S1] Processing the request in a new worker?

os << " [ServerSpan " << SPAN_ID_STR << '=' << Hex(span.span_id()) << "] Responded(" << span.response_size() << ')' << std::endl;
}

PrintAnnotations(os, std::numeric_limits<int64_t>::max(),
&last_time, extr, ARRAY_SIZE(extr));
&last_time, extr, ARRAY_SIZE(extr), &span);
}

class RpczSpanFilter : public SpanFilter {
Expand Down
26 changes: 16 additions & 10 deletions src/brpc/channel.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,7 @@
#include "brpc/details/usercode_backup_pool.h" // TooManyUserCode
#include "brpc/rdma/rdma_helper.h"
#include "brpc/policy/esp_authenticator.h"
#include "brpc/details/controller_private_accessor.h"

namespace brpc {

Expand Down Expand Up @@ -490,7 +491,7 @@ void Channel::CallMethod(const google::protobuf::MethodDescriptor* method,
}
cntl->set_used_by_rpc();

if (cntl->_sender == NULL && IsTraceable(Span::tls_parent())) {
if (cntl->_sender == NULL && IsTraceable(Span::tls_parent().get())) {
const int64_t start_send_us = butil::cpuwide_time_us();
const std::string* method_name = NULL;
if (_get_method_name) {
Expand All @@ -501,13 +502,20 @@ void Channel::CallMethod(const google::protobuf::MethodDescriptor* method,
const static std::string NULL_METHOD_STR = "null-method";
method_name = &NULL_METHOD_STR;
}
Span* span = Span::CreateClientSpan(
std::shared_ptr<Span> span = Span::CreateClientSpan(
*method_name, start_send_real_us - start_send_us);
span->set_log_id(cntl->log_id());
span->set_base_cid(correlation_id);
span->set_protocol(_options.protocol);
span->set_start_send_us(start_send_us);
cntl->_span = span;
if (span) {
ControllerPrivateAccessor accessor(cntl);
span->set_log_id(cntl->log_id());
span->set_base_cid(correlation_id);
span->set_protocol(_options.protocol);
span->set_start_send_us(start_send_us);
#if BRPC_SPAN_ENABLE_SHARED_PTR_API
accessor.set_span(span);
#else
accessor.set_span(span.get());
#endif
}
}
// Override some options if they haven't been set by Controller
if (cntl->timeout_ms() == UNSET_MAGIC_NUM) {
Expand Down Expand Up @@ -608,9 +616,7 @@ void Channel::CallMethod(const google::protobuf::MethodDescriptor* method,
// be woken up by callback when RPC finishes (succeeds or still
// fails after retry)
Join(correlation_id);
if (cntl->_span) {
cntl->SubmitSpan();
}
cntl->SubmitSpan();
cntl->OnRPCEnd(butil::gettimeofday_us());
}
}
Expand Down
Loading
Loading