From d470862a9d4215a0fba8286192db1b48779feca6 Mon Sep 17 00:00:00 2001 From: lhh Date: Mon, 10 Nov 2025 17:32:04 +0800 Subject: [PATCH 1/2] Fix span lifecycle with smart pointers to prevent use-after-free in async RPC callbacks (#3068) --- src/brpc/builtin/rpcz_service.cpp | 82 ++-- src/brpc/channel.cpp | 22 +- src/brpc/controller.cpp | 65 ++-- src/brpc/controller.h | 3 +- .../details/controller_private_accessor.h | 4 +- src/brpc/global.cpp | 8 +- src/brpc/policy/baidu_rpc_protocol.cpp | 19 +- src/brpc/policy/esp_protocol.cpp | 6 +- src/brpc/policy/http_rpc_protocol.cpp | 10 +- src/brpc/policy/hulu_pbrpc_protocol.cpp | 9 +- src/brpc/policy/memcache_binary_protocol.cpp | 3 +- src/brpc/policy/nova_pbrpc_protocol.cpp | 3 +- src/brpc/policy/nshead_mcpack_protocol.cpp | 3 +- src/brpc/policy/nshead_protocol.cpp | 13 +- src/brpc/policy/public_pbrpc_protocol.cpp | 5 +- src/brpc/policy/redis_protocol.cpp | 3 +- src/brpc/policy/rtmp_protocol.cpp | 3 +- src/brpc/policy/sofa_pbrpc_protocol.cpp | 7 +- src/brpc/policy/thrift_protocol.cpp | 10 +- src/brpc/policy/ubrpc2pb_protocol.cpp | 3 +- src/brpc/span.cpp | 359 +++++++++++++----- src/brpc/span.h | 153 ++++++-- src/brpc/traceprintf.h | 17 + src/bthread/bthread.cpp | 9 - src/bthread/bthread.h | 28 ++ src/bthread/key.cpp | 37 ++ src/bthread/task_group.cpp | 27 +- src/bthread/task_meta.h | 23 +- src/bthread/unstable.h | 3 - src/bvar/collector.cpp | 5 + test/brpc_channel_unittest.cpp | 11 +- test/bthread_unittest.cpp | 12 +- 32 files changed, 683 insertions(+), 282 deletions(-) diff --git a/src/brpc/builtin/rpcz_service.cpp b/src/brpc/builtin/rpcz_service.cpp index d9121eb555..4e56e2dee5 100644 --- a/src/brpc/builtin/rpcz_service.cpp +++ b/src/brpc/builtin/rpcz_service.cpp @@ -185,16 +185,35 @@ static void PrintElapse(std::ostream& os, int64_t cur_time, static void PrintAnnotations( std::ostream& os, int64_t cur_time, int64_t* last_time, - SpanInfoExtractor** extractors, int num_extr) { + SpanInfoExtractor** extractors, int 
num_extr, const RpczSpan* span) { int64_t anno_time; std::string a; + const char* span_type_str = "Span"; + if (span) { + switch (span->type()) { + case SPAN_TYPE_SERVER: + span_type_str = "ServerSpan"; + break; + case SPAN_TYPE_CLIENT: + span_type_str = "ClientSpan"; + break; + case SPAN_TYPE_BTHREAD: + span_type_str = "BthreadSpan"; + break; + } + } + // TODO: Going through all extractors is not strictly correct because // later extractors may have earlier annotations. for (int i = 0; i < num_extr; ++i) { while (extractors[i]->PopAnnotation(cur_time, &anno_time, &a)) { PrintRealTime(os, anno_time); PrintElapse(os, anno_time, last_time); - os << ' ' << WebEscape(a); + os << ' '; + if (span) { + os << '[' << span_type_str << ' ' << SPAN_ID_STR << '=' << Hex(span->span_id()) << "] "; + } + os << WebEscape(a); if (a.empty() || butil::back_char(a) != '\n') { os << '\n'; } @@ -204,12 +223,12 @@ static void PrintAnnotations( static bool PrintAnnotationsAndRealTimeSpan( std::ostream& os, int64_t cur_time, int64_t* last_time, - SpanInfoExtractor** extr, int num_extr) { + SpanInfoExtractor** extr, int num_extr, const RpczSpan* span) { if (cur_time == 0) { // the field was not set. return false; } - PrintAnnotations(os, cur_time, last_time, extr, num_extr); + PrintAnnotations(os, cur_time, last_time, extr, num_extr, span); PrintRealTime(os, cur_time); PrintElapse(os, cur_time, last_time); return true; @@ -239,9 +258,10 @@ static void PrintClientSpan( extr[num_extr++] = server_extr; } extr[num_extr++] = &client_extr; - // start_send_us is always set for client spans. - CHECK(PrintAnnotationsAndRealTimeSpan(os, span.start_send_real_us(), - last_time, extr, num_extr)); + if (!PrintAnnotationsAndRealTimeSpan(os, span.start_send_real_us(), + last_time, extr, num_extr, &span)) { + os << " start_send_real_us:not-set"; + } const Protocol* protocol = FindProtocol(span.protocol()); const char* protocol_name = (protocol ? 
protocol->name : "Unknown"); const butil::EndPoint remote_side(butil::int2ip(span.remote_ip()), span.remote_port()); @@ -271,12 +291,12 @@ static void PrintClientSpan( os << std::endl; if (PrintAnnotationsAndRealTimeSpan(os, span.sent_real_us(), - last_time, extr, num_extr)) { - os << " Requested(" << span.request_size() << ") [1]" << std::endl; + last_time, extr, num_extr, &span)) { + os << " [ClientSpan " << SPAN_ID_STR << '=' << Hex(span.span_id()) << "] Requested(" << span.request_size() << ") [1]" << std::endl; } if (PrintAnnotationsAndRealTimeSpan(os, span.received_real_us(), - last_time, extr, num_extr)) { - os << " Received response(" << span.response_size() << ")"; + last_time, extr, num_extr, &span)) { + os << " [ClientSpan " << SPAN_ID_STR << '=' << Hex(span.span_id()) << "] Received response(" << span.response_size() << ")"; if (span.base_cid() != 0 && span.ending_cid() != 0) { int64_t ver = span.ending_cid() - span.base_cid(); if (ver >= 1) { @@ -289,18 +309,18 @@ static void PrintClientSpan( } if (PrintAnnotationsAndRealTimeSpan(os, span.start_parse_real_us(), - last_time, extr, num_extr)) { - os << " Processing the response in a new bthread" << std::endl; + last_time, extr, num_extr, &span)) { + os << " [ClientSpan " << SPAN_ID_STR << '=' << Hex(span.span_id()) << "] Processing the response in a new bthread" << std::endl; } if (PrintAnnotationsAndRealTimeSpan( os, span.start_callback_real_us(), - last_time, extr, num_extr)) { - os << (span.async() ? " Enter user's done" : " Back to user's callsite") << std::endl; + last_time, extr, num_extr, &span)) { + os << " [ClientSpan " << SPAN_ID_STR << '=' << Hex(span.span_id()) << "] " << (span.async() ? 
" Enter user's done" : " Back to user's callsite") << std::endl; } PrintAnnotations(os, std::numeric_limits::max(), - last_time, extr, num_extr); + last_time, extr, num_extr, &span); } static void PrintClientSpan(std::ostream& os,const RpczSpan& span, @@ -318,7 +338,15 @@ static void PrintBthreadSpan(std::ostream& os, const RpczSpan& span, int64_t* la extr[num_extr++] = server_extr; } extr[num_extr++] = &client_extr; - PrintAnnotations(os, std::numeric_limits::max(), last_time, extr, num_extr); + + // Print span id for bthread span context identification + os << " [BthreadSpan " << SPAN_ID_STR << '=' << Hex(span.span_id()); + if (span.parent_span_id() != 0) { + os << " parent_span=" << Hex(span.parent_span_id()); + } + os << "] "; + + PrintAnnotations(os, std::numeric_limits::max(), last_time, extr, num_extr, &span); } static void PrintServerSpan(std::ostream& os, const RpczSpan& span, @@ -348,16 +376,16 @@ static void PrintServerSpan(std::ostream& os, const RpczSpan& span, os << std::endl; if (PrintAnnotationsAndRealTimeSpan( os, span.start_parse_real_us(), - &last_time, extr, ARRAY_SIZE(extr))) { - os << " Processing the request in a new bthread" << std::endl; + &last_time, extr, ARRAY_SIZE(extr), &span)) { + os << " [ServerSpan " << SPAN_ID_STR << '=' << Hex(span.span_id()) << "] Processing the request in a new bthread" << std::endl; } bool entered_user_method = false; if (PrintAnnotationsAndRealTimeSpan( os, span.start_callback_real_us(), - &last_time, extr, ARRAY_SIZE(extr))) { + &last_time, extr, ARRAY_SIZE(extr), &span)) { entered_user_method = true; - os << " Enter " << WebEscape(span.full_method_name()) << std::endl; + os << " [ServerSpan " << SPAN_ID_STR << '=' << Hex(span.span_id()) << "] Enter " << WebEscape(span.full_method_name()) << std::endl; } const int nclient = span.client_spans_size(); @@ -372,22 +400,22 @@ static void PrintServerSpan(std::ostream& os, const RpczSpan& span, if (PrintAnnotationsAndRealTimeSpan( os, span.start_send_real_us(), - 
&last_time, extr, ARRAY_SIZE(extr))) { + &last_time, extr, ARRAY_SIZE(extr), &span)) { if (entered_user_method) { - os << " Leave " << WebEscape(span.full_method_name()) << std::endl; + os << " [ServerSpan " << SPAN_ID_STR << '=' << Hex(span.span_id()) << "] Leave " << WebEscape(span.full_method_name()) << std::endl; } else { - os << " Responding" << std::endl; + os << " [ServerSpan " << SPAN_ID_STR << '=' << Hex(span.span_id()) << "] Responding" << std::endl; } } if (PrintAnnotationsAndRealTimeSpan( os, span.sent_real_us(), - &last_time, extr, ARRAY_SIZE(extr))) { - os << " Responded(" << span.response_size() << ')' << std::endl; + &last_time, extr, ARRAY_SIZE(extr), &span)) { + os << " [ServerSpan " << SPAN_ID_STR << '=' << Hex(span.span_id()) << "] Responded(" << span.response_size() << ')' << std::endl; } PrintAnnotations(os, std::numeric_limits::max(), - &last_time, extr, ARRAY_SIZE(extr)); + &last_time, extr, ARRAY_SIZE(extr), &span); } class RpczSpanFilter : public SpanFilter { diff --git a/src/brpc/channel.cpp b/src/brpc/channel.cpp index 0252e97d74..6872ce117f 100644 --- a/src/brpc/channel.cpp +++ b/src/brpc/channel.cpp @@ -37,6 +37,7 @@ #include "brpc/details/usercode_backup_pool.h" // TooManyUserCode #include "brpc/rdma/rdma_helper.h" #include "brpc/policy/esp_authenticator.h" +#include "brpc/details/controller_private_accessor.h" namespace brpc { @@ -490,7 +491,7 @@ void Channel::CallMethod(const google::protobuf::MethodDescriptor* method, } cntl->set_used_by_rpc(); - if (cntl->_sender == NULL && IsTraceable(Span::tls_parent())) { + if (cntl->_sender == NULL && IsTraceable(Span::tls_parent().get())) { const int64_t start_send_us = butil::cpuwide_time_us(); const std::string* method_name = NULL; if (_get_method_name) { @@ -501,13 +502,16 @@ void Channel::CallMethod(const google::protobuf::MethodDescriptor* method, const static std::string NULL_METHOD_STR = "null-method"; method_name = &NULL_METHOD_STR; } - Span* span = Span::CreateClientSpan( + 
std::shared_ptr span = Span::CreateClientSpan( *method_name, start_send_real_us - start_send_us); - span->set_log_id(cntl->log_id()); - span->set_base_cid(correlation_id); - span->set_protocol(_options.protocol); - span->set_start_send_us(start_send_us); - cntl->_span = span; + if (span) { + ControllerPrivateAccessor accessor(cntl); + span->set_log_id(cntl->log_id()); + span->set_base_cid(correlation_id); + span->set_protocol(_options.protocol); + span->set_start_send_us(start_send_us); + accessor.set_span(span); + } } // Override some options if they haven't been set by Controller if (cntl->timeout_ms() == UNSET_MAGIC_NUM) { @@ -608,9 +612,7 @@ void Channel::CallMethod(const google::protobuf::MethodDescriptor* method, // be woken up by callback when RPC finishes (succeeds or still // fails after retry) Join(correlation_id); - if (cntl->_span) { - cntl->SubmitSpan(); - } + cntl->SubmitSpan(); cntl->OnRPCEnd(butil::gettimeofday_us()); } } diff --git a/src/brpc/controller.cpp b/src/brpc/controller.cpp index d4dbab951b..d44f3e4c8f 100644 --- a/src/brpc/controller.cpp +++ b/src/brpc/controller.cpp @@ -183,8 +183,8 @@ static void CreateIgnoreAllRead() { s_ignore_all_read = new IgnoreAllRead; } // you don't have to set the fields to initial state after deletion since // they'll be set uniformly after this method is called. void Controller::ResetNonPods() { - if (_span) { - Span::Submit(_span, butil::cpuwide_time_us()); + if (auto span = _span.lock()) { + Span::Submit(span, butil::cpuwide_time_us()); } _error_text.clear(); _remote_side = butil::EndPoint(); @@ -240,7 +240,7 @@ void Controller::ResetNonPods() { void Controller::ResetPods() { // NOTE: Make the sequence of assignments same with the order that they're // defined in header. Better for cpu cache and faster for lookup. 
- _span = NULL; + _span.reset(); _flags = 0; #ifndef BAIDU_INTERNAL set_pb_bytes_to_base64(true); @@ -450,9 +450,9 @@ void Controller::SetFailed(const std::string& reason) { AppendServerIdentiy(); } _error_text.append(reason); - if (_span) { - _span->set_error_code(_error_code); - _span->Annotate(reason); + if (auto span = _span.lock()) { + span->set_error_code(_error_code); + span->Annotate(reason); } UpdateResponseHeader(this); } @@ -479,9 +479,9 @@ void Controller::SetFailed(int error_code, const char* reason_fmt, ...) { va_start(ap, reason_fmt); butil::string_vappendf(&_error_text, reason_fmt, ap); va_end(ap); - if (_span) { - _span->set_error_code(_error_code); - _span->AnnotateCStr(_error_text.c_str() + old_size, 0); + if (auto span = _span.lock()) { + span->set_error_code(_error_code); + span->AnnotateCStr(_error_text.c_str() + old_size, 0); } UpdateResponseHeader(this); } @@ -507,9 +507,9 @@ void Controller::CloseConnection(const char* reason_fmt, ...) { va_start(ap, reason_fmt); butil::string_vappendf(&_error_text, reason_fmt, ap); va_end(ap); - if (_span) { - _span->set_error_code(_error_code); - _span->AnnotateCStr(_error_text.c_str() + old_size, 0); + if (auto span = _span.lock()) { + span->set_error_code(_error_code); + span->AnnotateCStr(_error_text.c_str() + old_size, 0); } UpdateResponseHeader(this); } @@ -943,9 +943,9 @@ void Controller::EndRPC(const CompletionInfo& info) { } // RPC finished, now it's safe to release `LoadBalancerWithNaming' _lb.reset(); - if (_span) { - _span->set_ending_cid(info.id); - _span->set_async(_done); + if (auto span = _span.lock()) { + span->set_ending_cid(info.id); + span->set_async(_done); // Submit the span if we're in async RPC. For sync RPC, the span // is submitted after Join() to get a more accurate resuming timestamp. 
if (_done) { @@ -1019,12 +1019,16 @@ void Controller::DoneInBackupThread() { void Controller::SubmitSpan() { const int64_t now = butil::cpuwide_time_us(); - _span->set_start_callback_us(now); - if (_span->local_parent()) { - _span->local_parent()->AsParent(); + if (auto span = _span.lock()) { + span->set_start_callback_us(now); + if (auto parent_span = span->local_parent().lock()) { + if (parent_span->is_active()) { + parent_span->AsParent(); + } + } + Span::Submit(span, now); + _span.reset(); } - Span::Submit(_span, now); - _span = NULL; } void Controller::HandleSendFailed() { @@ -1122,8 +1126,7 @@ void Controller::IssueRPC(int64_t start_realtime_us) { CHECK_EQ(_remote_side, tmp_sock->remote_side()); } - Span* span = _span; - if (span) { + if (auto span = _span.lock()) { if (_current_call.nretry == 0) { span->set_remote_side(_remote_side); } else { @@ -1235,7 +1238,7 @@ void Controller::IssueRPC(int64_t start_realtime_us) { int rc; size_t packet_size = 0; if (user_packet_guard) { - if (span) { + if (auto span = _span.lock()) { packet_size = user_packet_guard->EstimatedByteSize(); } rc = _current_call.sending_sock->Write(user_packet_guard, &wopt); @@ -1243,7 +1246,7 @@ void Controller::IssueRPC(int64_t start_realtime_us) { packet_size = packet.size(); rc = _current_call.sending_sock->Write(&packet, &wopt); } - if (span) { + if (auto span = _span.lock()) { if (_current_call.nretry == 0) { span->set_sent_us(butil::cpuwide_time_us()); span->set_request_size(packet_size); @@ -1387,8 +1390,18 @@ const Controller* Controller::sub(int index) const { return NULL; } -uint64_t Controller::trace_id() const { return _span ? _span->trace_id() : 0; } -uint64_t Controller::span_id() const { return _span ? 
_span->span_id() : 0; } +uint64_t Controller::trace_id() const { + if (auto span = _span.lock()) { + return span->trace_id(); + } + return 0; +} +uint64_t Controller::span_id() const { + if (auto span = _span.lock()) { + return span->span_id(); + } + return 0; +} void* Controller::session_local_data() { if (_session_local_data) { diff --git a/src/brpc/controller.h b/src/brpc/controller.h index 69d859ea8f..45f71b72f6 100644 --- a/src/brpc/controller.h +++ b/src/brpc/controller.h @@ -25,6 +25,7 @@ #include // std::function #include // Users often need gflags #include +#include #include "butil/intrusive_ptr.hpp" // butil::intrusive_ptr #include "bthread/errno.h" // Redefine errno #include "butil/endpoint.h" // butil::EndPoint @@ -803,7 +804,7 @@ friend void policy::ProcessThriftRequest(InputMessageBase*); private: // NOTE: align and group fields to make Controller as compact as possible. - Span* _span; + std::weak_ptr _span; uint32_t _flags; // all boolean fields inside Controller int32_t _error_code; std::string _error_text; diff --git a/src/brpc/details/controller_private_accessor.h b/src/brpc/details/controller_private_accessor.h index 1a9d7062af..1a2c631208 100644 --- a/src/brpc/details/controller_private_accessor.h +++ b/src/brpc/details/controller_private_accessor.h @@ -90,7 +90,7 @@ class ControllerPrivateAccessor { return *this; } - ControllerPrivateAccessor &set_span(Span* span) { + ControllerPrivateAccessor &set_span(std::shared_ptr span) { _cntl->_span = span; return *this; } @@ -100,7 +100,7 @@ class ControllerPrivateAccessor { return *this; } - Span* span() const { return _cntl->_span; } + std::shared_ptr span() const { return _cntl->_span.lock(); } uint32_t pipelined_count() const { return _cntl->_pipelined_count; } void set_pipelined_count(uint32_t count) { _cntl->_pipelined_count = count; } diff --git a/src/brpc/global.cpp b/src/brpc/global.cpp index 0196b6d008..0009266958 100644 --- a/src/brpc/global.cpp +++ b/src/brpc/global.cpp @@ -54,6 +54,7 @@ // 
Span #include "brpc/span.h" #include "bthread/unstable.h" +#include "bthread/bthread.h" // Compress handlers #include "brpc/compress.h" @@ -342,8 +343,11 @@ static void GlobalInitializeOrDieImpl() { SetLogHandler(&BaiduStreamingLogHandler); #endif - // Set bthread create span function - bthread_set_create_span_func(CreateBthreadSpan); + if (bthread_set_span_funcs(CreateBthreadSpanAsVoid, + DestroyRpczParentSpan, + EndBthreadSpan) != 0) { + LOG(FATAL) << "Failed to register span callbacks to bthread"; + } // Setting the variable here does not work, the profiler probably check // the variable before main() for only once. diff --git a/src/brpc/policy/baidu_rpc_protocol.cpp b/src/brpc/policy/baidu_rpc_protocol.cpp index 5adf77b2c5..f17e16e820 100644 --- a/src/brpc/policy/baidu_rpc_protocol.cpp +++ b/src/brpc/policy/baidu_rpc_protocol.cpp @@ -269,9 +269,9 @@ struct BaiduProxyPBMessages : public RpcPBMessages { // Used by UT, can't be static. void SendRpcResponse(int64_t correlation_id, Controller* cntl, RpcPBMessages* messages, const Server* server, - MethodStatus* method_status, int64_t received_us) { + MethodStatus* method_status, int64_t received_us, + std::shared_ptr span) { ControllerPrivateAccessor accessor(cntl); - Span* span = accessor.span(); if (span) { span->set_start_send_us(butil::cpuwide_time_us()); } @@ -642,7 +642,7 @@ void ProcessRpcRequest(InputMessageBase* msg_base) { bthread_assign_data((void*)&server->thread_local_options()); } - Span* span = NULL; + std::shared_ptr span; if (IsTraceable(request_meta.has_trace_id())) { span = Span::CreateServerSpan( request_meta.trace_id(), request_meta.span_id(), @@ -824,9 +824,9 @@ void ProcessRpcRequest(InputMessageBase* msg_base) { // `socket' will be held until response has been sent google::protobuf::Closure* done = ::brpc::NewCallback< int64_t, Controller*, RpcPBMessages*, - const Server*, MethodStatus*, int64_t>( + const Server*, MethodStatus*, int64_t, std::shared_ptr>( &SendRpcResponse, 
meta.correlation_id(),cntl.get(), - messages, server, method_status, msg->received_us()); + messages, server, method_status, msg->received_us(), span); // optional, just release resource ASAP msg.reset(); @@ -855,10 +855,11 @@ void ProcessRpcRequest(InputMessageBase* msg_base) { // `cntl', `req' and `res' will be deleted inside `SendRpcResponse' // `socket' will be held until response has been sent + SendRpcResponse(meta.correlation_id(), cntl.release(), messages, server, method_status, - msg->received_us()); + msg->received_us(), span); } bool VerifyRpcRequest(const InputMessageBase* msg_base) { @@ -945,8 +946,7 @@ void ProcessRpcResponse(InputMessageBase* msg_base) { } cntl->set_rpc_received_us(msg->received_us()); - Span* span = accessor.span(); - if (span) { + if (auto span = accessor.span()) { span->set_base_real_us(msg->base_real_us()); span->set_received_us(msg->received_us()); span->set_response_size(msg->meta.size() + msg->payload.size() + 12); @@ -1116,8 +1116,7 @@ void PackRpcRequest(butil::IOBuf* req_buf, } meta.set_content_type(cntl->request_content_type()); - Span* span = accessor.span(); - if (span) { + if (auto span = accessor.span()) { request_meta->set_trace_id(span->trace_id()); request_meta->set_span_id(span->span_id()); request_meta->set_parent_span_id(span->parent_span_id()); diff --git a/src/brpc/policy/esp_protocol.cpp b/src/brpc/policy/esp_protocol.cpp index 5925796b88..ee8464b85e 100644 --- a/src/brpc/policy/esp_protocol.cpp +++ b/src/brpc/policy/esp_protocol.cpp @@ -101,8 +101,7 @@ void PackEspRequest(butil::IOBuf* packet_buf, } accessor.get_sending_socket()->set_correlation_id(correlation_id); - Span* span = accessor.span(); - if (span) { + if (auto span = accessor.span()) { span->set_request_size(request.length()); } @@ -131,8 +130,7 @@ void ProcessEspResponse(InputMessageBase* msg_base) { } ControllerPrivateAccessor accessor(cntl); - Span* span = accessor.span(); - if (span) { + if (auto span = accessor.span()) { 
span->set_base_real_us(msg->base_real_us()); span->set_received_us(msg->received_us()); span->set_response_size(msg->payload.length()); diff --git a/src/brpc/policy/http_rpc_protocol.cpp b/src/brpc/policy/http_rpc_protocol.cpp index 872c2897cc..e4885bab29 100644 --- a/src/brpc/policy/http_rpc_protocol.cpp +++ b/src/brpc/policy/http_rpc_protocol.cpp @@ -369,8 +369,7 @@ void ProcessHttpResponse(InputMessageBase* msg) { ControllerPrivateAccessor accessor(cntl); - Span* span = accessor.span(); - if (span) { + if (auto span = accessor.span()) { span->set_base_real_us(msg->base_real_us()); span->set_received_us(msg->received_us()); // TODO: changing when imsg_guard->read_body_progressively() is true @@ -717,8 +716,7 @@ void SerializeHttpRequest(butil::IOBuf* /*not used*/, hreq.uri().set_path(path); } - Span* span = accessor.span(); - if (span) { + if (auto span = accessor.span()) { hreq.SetHeader("x-bd-trace-id", butil::string_printf( "%llu", (unsigned long long)span->trace_id())); hreq.SetHeader("x-bd-span-id", butil::string_printf( @@ -834,7 +832,7 @@ HttpResponseSender::~HttpResponseSender() { return; } ControllerPrivateAccessor accessor(cntl); - Span* span = accessor.span(); + auto span = accessor.span(); if (span) { span->set_start_send_us(butil::cpuwide_time_us()); } @@ -1487,7 +1485,7 @@ void ProcessHttpRequest(InputMessageBase *msg) { bthread_assign_data((void*)&server->thread_local_options()); } - Span* span = NULL; + std::shared_ptr span; const std::string& path = req_header.uri().path(); const std::string* trace_id_str = req_header.GetHeader("x-bd-trace-id"); if (IsTraceable(trace_id_str)) { diff --git a/src/brpc/policy/hulu_pbrpc_protocol.cpp b/src/brpc/policy/hulu_pbrpc_protocol.cpp index 02ec8efcad..4978873743 100644 --- a/src/brpc/policy/hulu_pbrpc_protocol.cpp +++ b/src/brpc/policy/hulu_pbrpc_protocol.cpp @@ -227,7 +227,7 @@ static void SendHuluResponse(int64_t correlation_id, MethodStatus* method_status, int64_t received_us) { ControllerPrivateAccessor 
accessor(cntl); - Span* span = accessor.span(); + auto span = accessor.span(); if (span) { span->set_start_send_us(butil::cpuwide_time_us()); } @@ -411,7 +411,7 @@ void ProcessHuluRequest(InputMessageBase* msg_base) { bthread_assign_data((void*)&server->thread_local_options()); } - Span* span = NULL; + std::shared_ptr span; if (IsTraceable(meta.has_trace_id())) { span = Span::CreateServerSpan( meta.trace_id(), meta.span_id(), meta.parent_span_id(), @@ -608,8 +608,7 @@ void ProcessHuluResponse(InputMessageBase* msg_base) { } ControllerPrivateAccessor accessor(cntl); - Span* span = accessor.span(); - if (span) { + if (auto span = accessor.span()) { span->set_base_real_us(msg->base_real_us()); span->set_received_us(msg->received_us()); span->set_response_size(msg->meta.size() + msg->payload.size() + 12); @@ -711,7 +710,7 @@ void PackHuluRequest(butil::IOBuf* req_buf, } // else don't set user_mesage_size when there's no attachment, otherwise // existing hulu-pbrpc server may complain about empty attachment. 
- Span* span = ControllerPrivateAccessor(cntl).span(); + auto span = ControllerPrivateAccessor(cntl).span(); if (span) { meta.set_trace_id(span->trace_id()); meta.set_span_id(span->span_id()); diff --git a/src/brpc/policy/memcache_binary_protocol.cpp b/src/brpc/policy/memcache_binary_protocol.cpp index d4c39dfd33..46432c4f7e 100644 --- a/src/brpc/policy/memcache_binary_protocol.cpp +++ b/src/brpc/policy/memcache_binary_protocol.cpp @@ -164,8 +164,7 @@ void ProcessMemcacheResponse(InputMessageBase* msg_base) { } ControllerPrivateAccessor accessor(cntl); - Span* span = accessor.span(); - if (span) { + if (auto span = accessor.span()) { span->set_base_real_us(msg->base_real_us()); span->set_received_us(msg->received_us()); span->set_response_size(msg->meta.length()); diff --git a/src/brpc/policy/nova_pbrpc_protocol.cpp b/src/brpc/policy/nova_pbrpc_protocol.cpp index 249e35c7a1..a1d88f2562 100644 --- a/src/brpc/policy/nova_pbrpc_protocol.cpp +++ b/src/brpc/policy/nova_pbrpc_protocol.cpp @@ -121,8 +121,7 @@ void ProcessNovaResponse(InputMessageBase* msg_base) { } ControllerPrivateAccessor accessor(cntl); - Span* span = accessor.span(); - if (span) { + if (auto span = accessor.span()) { span->set_base_real_us(msg->base_real_us()); span->set_received_us(msg->received_us()); span->set_response_size(msg->meta.size() + msg->payload.size()); diff --git a/src/brpc/policy/nshead_mcpack_protocol.cpp b/src/brpc/policy/nshead_mcpack_protocol.cpp index 40d38836c6..b8a7fecbee 100644 --- a/src/brpc/policy/nshead_mcpack_protocol.cpp +++ b/src/brpc/policy/nshead_mcpack_protocol.cpp @@ -111,8 +111,7 @@ void ProcessNsheadMcpackResponse(InputMessageBase* msg_base) { } ControllerPrivateAccessor accessor(cntl); - Span* span = accessor.span(); - if (span) { + if (auto span = accessor.span()) { span->set_base_real_us(msg->base_real_us()); span->set_received_us(msg->received_us()); span->set_response_size(msg->meta.size() + msg->payload.size()); diff --git a/src/brpc/policy/nshead_protocol.cpp 
b/src/brpc/policy/nshead_protocol.cpp index a26dc96857..82f696e3c2 100644 --- a/src/brpc/policy/nshead_protocol.cpp +++ b/src/brpc/policy/nshead_protocol.cpp @@ -69,7 +69,7 @@ void NsheadClosure::Run() { std::unique_ptr recycle_ctx(this); ControllerPrivateAccessor accessor(&_controller); - Span* span = accessor.span(); + auto span = accessor.span(); if (span) { span->set_start_send_us(butil::cpuwide_time_us()); } @@ -144,8 +144,7 @@ void NsheadClosure::Run() { void NsheadClosure::SetMethodName(const std::string& full_method_name) { ControllerPrivateAccessor accessor(&_controller); - Span* span = accessor.span(); - if (span) { + if (auto span = accessor.span()) { span->ResetServerSpanName(full_method_name); } } @@ -298,7 +297,7 @@ void ProcessNsheadRequest(InputMessageBase* msg_base) { bthread_assign_data((void*)&server->thread_local_options()); } - Span* span = NULL; + std::shared_ptr span; if (IsTraceable(false)) { span = Span::CreateServerSpan(0, 0, 0, msg->base_real_us()); accessor.set_span(span); @@ -369,8 +368,7 @@ void ProcessNsheadResponse(InputMessageBase* msg_base) { } ControllerPrivateAccessor accessor(cntl); - Span* span = accessor.span(); - if (span) { + if (auto span = accessor.span()) { span->set_base_real_us(msg->base_real_us()); span->set_received_us(msg->received_us()); span->set_response_size(msg->payload.length()); @@ -439,8 +437,7 @@ void PackNsheadRequest( // pack the field. accessor.get_sending_socket()->set_correlation_id(correlation_id); - Span* span = accessor.span(); - if (span) { + if (auto span = accessor.span()) { span->set_request_size(request.length()); // TODO: Nowhere to set tracing ids. 
// request_meta->set_trace_id(span->trace_id()); diff --git a/src/brpc/policy/public_pbrpc_protocol.cpp b/src/brpc/policy/public_pbrpc_protocol.cpp index 38a749dc72..a4298a15da 100644 --- a/src/brpc/policy/public_pbrpc_protocol.cpp +++ b/src/brpc/policy/public_pbrpc_protocol.cpp @@ -174,8 +174,7 @@ void ProcessPublicPbrpcResponse(InputMessageBase* msg_base) { } ControllerPrivateAccessor accessor(cntl); - Span* span = accessor.span(); - if (span) { + if (auto span = accessor.span()) { span->set_base_real_us(msg->base_real_us()); span->set_received_us(msg->received_us()); span->set_response_size(msg->meta.size() + msg->payload.size()); @@ -269,7 +268,7 @@ void PackPublicPbrpcRequest(butil::IOBuf* buf, nshead.body_len = GetProtobufByteSize(pbreq); buf->append(&nshead, sizeof(nshead)); - Span* span = ControllerPrivateAccessor(controller).span(); + auto span = ControllerPrivateAccessor(controller).span(); if (span) { // TODO: Nowhere to set tracing ids. // request_meta->set_trace_id(span->trace_id()); diff --git a/src/brpc/policy/redis_protocol.cpp b/src/brpc/policy/redis_protocol.cpp index f8acf49d6a..790db6f300 100644 --- a/src/brpc/policy/redis_protocol.cpp +++ b/src/brpc/policy/redis_protocol.cpp @@ -237,8 +237,7 @@ void ProcessRedisResponse(InputMessageBase* msg_base) { } ControllerPrivateAccessor accessor(cntl); - Span* span = accessor.span(); - if (span) { + if (auto span = accessor.span()) { span->set_base_real_us(msg->base_real_us()); span->set_received_us(msg->received_us()); span->set_response_size(msg->response.ByteSize()); diff --git a/src/brpc/policy/rtmp_protocol.cpp b/src/brpc/policy/rtmp_protocol.cpp index 8b251eb2de..d706468650 100644 --- a/src/brpc/policy/rtmp_protocol.cpp +++ b/src/brpc/policy/rtmp_protocol.cpp @@ -3540,8 +3540,7 @@ void OnServerStreamCreated::Run(bool error, break; } } while (0); - Span* span = accessor.span(); - if (span) { + if (auto span = accessor.span()) { span->set_base_real_us(base_realtime); 
span->set_received_us(received_us); span->set_response_size(istream->popped_bytes()); diff --git a/src/brpc/policy/sofa_pbrpc_protocol.cpp b/src/brpc/policy/sofa_pbrpc_protocol.cpp index 9ee772dcff..3eaee4b0e6 100644 --- a/src/brpc/policy/sofa_pbrpc_protocol.cpp +++ b/src/brpc/policy/sofa_pbrpc_protocol.cpp @@ -212,7 +212,7 @@ static void SendSofaResponse(int64_t correlation_id, MethodStatus* method_status, int64_t received_us) { ControllerPrivateAccessor accessor(cntl); - Span* span = accessor.span(); + auto span = accessor.span(); if (span) { span->set_start_send_us(butil::cpuwide_time_us()); } @@ -371,7 +371,7 @@ void ProcessSofaRequest(InputMessageBase* msg_base) { bthread_assign_data((void*)&server->thread_local_options()); } - Span* span = NULL; + std::shared_ptr span; if (IsTraceable(false)) { span = Span::CreateServerSpan( 0/*meta.trace_id()*/, 0/*meta.span_id()*/, @@ -514,8 +514,7 @@ void ProcessSofaResponse(InputMessageBase* msg_base) { } ControllerPrivateAccessor accessor(cntl); - Span* span = accessor.span(); - if (span) { + if (auto span = accessor.span()) { span->set_base_real_us(msg->base_real_us()); span->set_received_us(msg->received_us()); span->set_response_size(msg->meta.size() + msg->payload.size() + 24); diff --git a/src/brpc/policy/thrift_protocol.cpp b/src/brpc/policy/thrift_protocol.cpp index 1e25066d9f..2b5739ea3e 100755 --- a/src/brpc/policy/thrift_protocol.cpp +++ b/src/brpc/policy/thrift_protocol.cpp @@ -243,7 +243,7 @@ void ThriftClosure::DoRun() { const Server* server = _controller.server(); ControllerPrivateAccessor accessor(&_controller); - Span* span = accessor.span(); + auto span = accessor.span(); if (span) { span->set_start_send_us(butil::cpuwide_time_us()); } @@ -515,7 +515,7 @@ void ProcessThriftRequest(InputMessageBase* msg_base) { bthread_assign_data((void*)&server->thread_local_options()); } - Span* span = NULL; + std::shared_ptr span; if (IsTraceable(false)) { span = Span::CreateServerSpan(0, 0, 0, msg->base_real_us()); 
accessor.set_span(span); @@ -584,8 +584,7 @@ void ProcessThriftResponse(InputMessageBase* msg_base) { } ControllerPrivateAccessor accessor(cntl); - Span* span = accessor.span(); - if (span) { + if (auto span = accessor.span()) { span->set_base_real_us(msg->base_real_us()); span->set_received_us(msg->received_us()); span->set_response_size(msg->payload.length()); @@ -752,8 +751,7 @@ void PackThriftRequest( // pack the field. accessor.get_sending_socket()->set_correlation_id(correlation_id); - Span* span = accessor.span(); - if (span) { + if (auto span = accessor.span()) { span->set_request_size(request.length()); // TODO: Nowhere to set tracing ids. // request_meta->set_trace_id(span->trace_id()); diff --git a/src/brpc/policy/ubrpc2pb_protocol.cpp b/src/brpc/policy/ubrpc2pb_protocol.cpp index 312ec5d92a..e68fb08b45 100644 --- a/src/brpc/policy/ubrpc2pb_protocol.cpp +++ b/src/brpc/policy/ubrpc2pb_protocol.cpp @@ -453,8 +453,7 @@ void ProcessUbrpcResponse(InputMessageBase* msg_base) { } ControllerPrivateAccessor accessor(cntl); - Span* span = accessor.span(); - if (span) { + if (auto span = accessor.span()) { span->set_base_real_us(msg->base_real_us()); span->set_received_us(msg->received_us()); span->set_response_size(msg->meta.size() + msg->payload.size()); diff --git a/src/brpc/span.cpp b/src/brpc/span.cpp index 8e9af46cc6..350b006bde 100644 --- a/src/brpc/span.cpp +++ b/src/brpc/span.cpp @@ -37,9 +37,97 @@ #define BRPC_SPAN_INFO_SEP "\1" - namespace brpc { +// Callback for creating a new bthread span when creating a new bthread. +// This is called by bthread layer when BTHREAD_INHERIT_SPAN flag is set. +// Returns a heap-allocated weak_ptr* as void*, or NULL if span creation fails. 
+void* CreateBthreadSpanAsVoid() { + const int64_t received_us = butil::cpuwide_time_us(); + const int64_t base_realtime = butil::gettimeofday_us() - received_us; + std::shared_ptr span = Span::CreateBthreadSpan("Bthread", base_realtime); + + if (!span) { + return NULL; + } + return new std::weak_ptr(span); +} + +void DestroyRpczParentSpan(void* ptr) { + if (ptr) { + delete static_cast*>(ptr); + } +} + +void EndBthreadSpan() { + std::shared_ptr span = GetTlsParentSpan(); + if (span) { + bthread_id_t id = {bthread_self()}; + span->set_ending_cid(id); + } + + ClearTlsParentSpan(); +} + +void SetTlsParentSpan(std::shared_ptr span) { + using namespace bthread; + LocalStorage ls = BAIDU_GET_VOLATILE_THREAD_LOCAL(tls_bls); + if (ls.rpcz_parent_span) { + delete static_cast*>(ls.rpcz_parent_span); + } + ls.rpcz_parent_span = new std::weak_ptr(span); + BAIDU_SET_VOLATILE_THREAD_LOCAL(tls_bls, ls); +} + +std::shared_ptr GetTlsParentSpan() { + using namespace bthread; + LocalStorage ls = BAIDU_GET_VOLATILE_THREAD_LOCAL(tls_bls); + if (!ls.rpcz_parent_span) { + return nullptr; + } + + auto* weak_ptr = static_cast*>(ls.rpcz_parent_span); + return weak_ptr->lock(); +} + +void ClearTlsParentSpan() { + using namespace bthread; + LocalStorage ls = BAIDU_GET_VOLATILE_THREAD_LOCAL(tls_bls); + if (ls.rpcz_parent_span) { + delete static_cast*>(ls.rpcz_parent_span); + ls.rpcz_parent_span = nullptr; + BAIDU_SET_VOLATILE_THREAD_LOCAL(tls_bls, ls); + } +} + +bool HasTlsParentSpan() { + using namespace bthread; + LocalStorage ls = BAIDU_GET_VOLATILE_THREAD_LOCAL(tls_bls); + if (!ls.rpcz_parent_span) { + return false; + } + + auto* weak_ptr = static_cast*>(ls.rpcz_parent_span); + return !weak_ptr->expired(); +} + + +void SpanDeleter::operator()(Span* r) const { + if (r == NULL) { + return; + } + + // All children will be destroyed automatically along with the list. + // The list holds std::shared_ptr<> which will trigger deletion of + // children. 
+ r->_client_list.clear(); + r->_info.clear(); + // Destroy the spinlocks, as the destructor might not be invoked. + pthread_spin_destroy(&r->_client_list_spinlock); + pthread_spin_destroy(&r->_info_spinlock); + butil::return_object(r); +} + const int64_t SPAN_DELETE_INTERVAL_US = 10000000L/*10s*/; DEFINE_string(rpcz_database_dir, "./rpc_data/rpcz", @@ -104,12 +192,24 @@ inline uint64_t GenerateTraceId() { return (g->current_random & 0xFFFFFFFFFFFF0000ULL) | g->seq++; } -Span* Span::CreateClientSpan(const std::string& full_method_name, - int64_t base_real_us) { - Span* span = butil::get_object(Forbidden()); - if (__builtin_expect(span == NULL, 0)) { - return NULL; +Span::Span(Forbidden) { + CHECK_EQ(0, pthread_spin_init(&_info_spinlock, 0)) + << "Failed to initialize _info_spinlock"; + CHECK_EQ(0, pthread_spin_init(&_client_list_spinlock, 0)) + << "Failed to initialize _client_list_spinlock"; +} + +Span::~Span() { + // The destruction of the spinlock has been handled in SpanDeleter. +} + +std::shared_ptr Span::CreateClientSpan(const std::string& full_method_name, + int64_t base_real_us) { + Span* span_raw = butil::get_object(Forbidden()); + if (__builtin_expect(span_raw == NULL, 0)) { + return nullptr; } + std::shared_ptr span(span_raw, SpanDeleter()); span->_log_id = 0; span->_base_cid = INVALID_BTHREAD_ID; span->_ending_cid = INVALID_BTHREAD_ID; @@ -125,37 +225,33 @@ Span* Span::CreateClientSpan(const std::string& full_method_name, span->_start_callback_real_us = 0; span->_start_send_real_us = 0; span->_sent_real_us = 0; - span->_next_client = NULL; - span->_client_list = NULL; - span->_tls_next = NULL; span->_full_method_name = full_method_name; span->_info.clear(); - Span* parent = static_cast(bthread::tls_bls.rpcz_parent_span); + std::shared_ptr parent = Span::tls_parent(); if (parent) { span->_trace_id = parent->trace_id(); span->_parent_span_id = parent->span_id(); span->_local_parent = parent; - span->_next_client = parent->_client_list; - 
parent->_client_list = span; + { + BAIDU_SCOPED_LOCK(parent->_client_list_spinlock); + parent->_client_list.push_back(span); + } } else { span->_trace_id = GenerateTraceId(); span->_parent_span_id = 0; - span->_local_parent = NULL; } span->_span_id = GenerateSpanId(); return span; } -Span* Span::CreateBthreadSpan(const std::string& full_method_name, - int64_t base_real_us) { - Span* parent = static_cast(bthread::tls_bls.rpcz_parent_span); - if (parent == NULL) { - return NULL; - } - Span* span = butil::get_object(Forbidden()); - if (__builtin_expect(span == NULL, 0)) { - return NULL; +std::shared_ptr Span::CreateBthreadSpan(const std::string& full_method_name, + int64_t base_real_us) { + std::shared_ptr parent = Span::tls_parent(); + Span* span_raw = butil::get_object(Forbidden()); + if (__builtin_expect(span_raw == NULL, 0)) { + return nullptr; } + std::shared_ptr span(span_raw, SpanDeleter()); span->_log_id = 0; span->_base_cid = INVALID_BTHREAD_ID; span->_ending_cid = INVALID_BTHREAD_ID; @@ -171,17 +267,21 @@ Span* Span::CreateBthreadSpan(const std::string& full_method_name, span->_start_callback_real_us = 0; span->_start_send_real_us = 0; span->_sent_real_us = 0; - span->_next_client = NULL; - span->_client_list = NULL; - span->_tls_next = NULL; span->_full_method_name = full_method_name; span->_info.clear(); - span->_trace_id = parent->trace_id(); - span->_parent_span_id = parent->span_id(); - span->_local_parent = parent; - span->_next_client = parent->_client_list; - parent->_client_list = span; + if (parent) { + span->_trace_id = parent->trace_id(); + span->_parent_span_id = parent->span_id(); + span->_local_parent = parent; + { + BAIDU_SCOPED_LOCK(parent->_client_list_spinlock); + parent->_client_list.push_back(span); + } + } else { + span->_trace_id = GenerateTraceId(); + span->_parent_span_id = 0; + } span->_span_id = GenerateSpanId(); return span; @@ -193,14 +293,15 @@ inline const std::string& unknown_span_name() { return s_unknown_method_name; } 
-Span* Span::CreateServerSpan( +std::shared_ptr Span::CreateServerSpan( const std::string& full_method_name, uint64_t trace_id, uint64_t span_id, uint64_t parent_span_id, int64_t base_real_us) { - Span* span = butil::get_object(Forbidden()); - if (__builtin_expect(span == NULL, 0)) { - return NULL; + Span* span_raw = butil::get_object(Forbidden()); + if (__builtin_expect(span_raw == NULL, 0)) { + return nullptr; } + std::shared_ptr span(span_raw, SpanDeleter()); span->_trace_id = (trace_id ? trace_id : GenerateTraceId()); span->_span_id = (span_id ? span_id : GenerateSpanId()); span->_parent_span_id = parent_span_id; @@ -219,17 +320,13 @@ Span* Span::CreateServerSpan( span->_start_callback_real_us = 0; span->_start_send_real_us = 0; span->_sent_real_us = 0; - span->_next_client = NULL; - span->_client_list = NULL; - span->_tls_next = NULL; span->_full_method_name = (!full_method_name.empty() ? full_method_name : unknown_span_name()); span->_info.clear(); - span->_local_parent = NULL; return span; } -Span* Span::CreateServerSpan( +std::shared_ptr Span::CreateServerSpan( uint64_t trace_id, uint64_t span_id, uint64_t parent_span_id, int64_t base_real_us) { return CreateServerSpan(unknown_span_name(), trace_id, span_id, @@ -241,26 +338,22 @@ void Span::ResetServerSpanName(const std::string& full_method_name) { full_method_name : unknown_span_name()); } -void Span::destroy() { +void Span::submit(int64_t cpuwide_us) { + // Note: this method is not called for client-side spans. EndAsParent(); - traversal(this, [](Span* r) { - r->_info.clear(); - butil::return_object(r); - }); -} - -void Span::traversal(Span* r, const std::function& f) const { - if (r == NULL) { - return; - } - for (auto p = r->_client_list; p != NULL; p = p->_next_client) { - traversal(p, f); + SpanContainer* container = new(std::nothrow) SpanContainer(shared_from_this()); + // If memory allocation fails, the server span will not be submitted for persistence. 
+ // The server span will be destroyed later when its shared_ptr refcount drops to zero + // Child spans (held in _client_list) will also be destroyed when + // their refcounts reach zero. + if (container) { + container->submit(cpuwide_us); } - f(r); } void Span::Annotate(const char* fmt, ...) { const int64_t anno_time = butil::cpuwide_time_us() + _base_real_us; + BAIDU_SCOPED_LOCK(_info_spinlock); butil::string_appendf(&_info, BRPC_SPAN_INFO_SEP "%lld ", (long long)anno_time); va_list ap; @@ -271,6 +364,7 @@ void Span::Annotate(const char* fmt, ...) { void Span::Annotate(const char* fmt, va_list args) { const int64_t anno_time = butil::cpuwide_time_us() + _base_real_us; + BAIDU_SCOPED_LOCK(_info_spinlock); butil::string_appendf(&_info, BRPC_SPAN_INFO_SEP "%lld ", (long long)anno_time); butil::string_vappendf(&_info, fmt, args); @@ -278,6 +372,7 @@ void Span::Annotate(const char* fmt, va_list args) { void Span::Annotate(const std::string& info) { const int64_t anno_time = butil::cpuwide_time_us() + _base_real_us; + BAIDU_SCOPED_LOCK(_info_spinlock); butil::string_appendf(&_info, BRPC_SPAN_INFO_SEP "%lld ", (long long)anno_time); _info.append(info); @@ -285,6 +380,7 @@ void Span::Annotate(const std::string& info) { void Span::AnnotateCStr(const char* info, size_t length) { const int64_t anno_time = butil::cpuwide_time_us() + _base_real_us; + BAIDU_SCOPED_LOCK(_info_spinlock); butil::string_appendf(&_info, BRPC_SPAN_INFO_SEP "%lld ", (long long)anno_time); if (length <= 0) { @@ -295,9 +391,14 @@ void Span::AnnotateCStr(const char* info, size_t length) { } size_t Span::CountClientSpans() const { - size_t n = 0; - traversal(const_cast(this), [&](Span*) { ++n; }); - return n - 1; + size_t n = 1; + { + BAIDU_SCOPED_LOCK(_client_list_spinlock); + for (const auto& child : _client_list) { + n += child->CountClientSpans(); + } + } + return n; } int64_t Span::GetStartRealTimeUs() const { @@ -345,15 +446,26 @@ bool SpanInfoExtractor::PopAnnotation( } bool CanAnnotateSpan() { - 
return bthread::tls_bls.rpcz_parent_span; + return HasTlsParentSpan(); } void AnnotateSpan(const char* fmt, ...) { - Span* span = static_cast(bthread::tls_bls.rpcz_parent_span); - va_list ap; - va_start(ap, fmt); - span->Annotate(fmt, ap); - va_end(ap); + std::shared_ptr span = GetTlsParentSpan(); + if (span) { // TRACEPRINTF checks CanAnnotateSpan, but this is safer. + va_list ap; + va_start(ap, fmt); + span->Annotate(fmt, ap); + va_end(ap); + } +} + +void AnnotateSpanEx(std::shared_ptr span, const char* fmt, ...) { + if (span) { + va_list ap; + va_start(ap, fmt); + span->Annotate(fmt, ap); + va_end(ap); + } } class SpanDB : public SharedObject { @@ -365,7 +477,7 @@ class SpanDB : public SharedObject { SpanDB() : id_db(NULL), time_db(NULL) { } static SpanDB* Open(); - leveldb::Status Index(const Span* span, std::string* value_buf); + leveldb::Status Index(std::shared_ptr span, std::string* value_buf); leveldb::Status RemoveSpansBefore(int64_t tm); private: @@ -405,10 +517,14 @@ static bvar::DisplaySamplingRatio s_display_sampling_ratio( "rpcz_sampling_ratio", &g_span_sl); struct SpanEarlier { - bool operator()(bvar::Collected* c1, bvar::Collected* c2) const { - const Span* span1 = static_cast(c1); - const Span* span2 = static_cast(c2); - return span1->GetStartRealTimeUs() < span2->GetStartRealTimeUs(); + bool operator()(const bvar::Collected* c1, const bvar::Collected* c2) const { + const SpanContainer* container1 = static_cast(c1); + const SpanContainer* container2 = static_cast(c2); + + const int64_t time1 = container1->span()->GetStartRealTimeUs(); + const int64_t time2 = container2->span()->GetStartRealTimeUs(); + + return time1 < time2; } }; class SpanPreprocessor : public bvar::CollectorPreprocessor { @@ -471,8 +587,13 @@ inline int GetSpanDB(butil::intrusive_ptr* db) { return -1; } -void Span::Submit(Span* span, int64_t cpuwide_time_us) { - if (span->local_parent() == NULL) { +void Span::Submit(std::shared_ptr span, int64_t cpuwide_time_us) { + // Only 
submit spans without a local parent (i.e., server spans). + // Server spans hold shared_ptr references to their child spans (via _client_list), + // ensuring child spans remain alive until the server span is submitted and dumped. + // Client spans are not submitted here because their lifetime is managed by their + // parent server span. + if (span->local_parent().expired()) { span->submit(cpuwide_time_us); } } @@ -497,6 +618,7 @@ static void Span2Proto(const Span* span, RpczSpan* out) { out->set_start_send_real_us(span->start_send_real_us()); out->set_sent_real_us(span->sent_real_us()); out->set_full_method_name(span->full_method_name()); + // info() returns by value for thread safety (see span.h for details). out->set_info(span->info()); out->set_error_code(span->error_code()); } @@ -571,7 +693,7 @@ SpanDB* SpanDB::Open() { return db; } -leveldb::Status SpanDB::Index(const Span* span, std::string* value_buf) { +leveldb::Status SpanDB::Index(std::shared_ptr span, std::string* value_buf) { leveldb::WriteOptions options; options.sync = false; @@ -637,20 +759,46 @@ leveldb::Status SpanDB::Index(const Span* span, std::string* value_buf) { ToBigEndian(span->span_id(), key_data + 2); leveldb::Slice key((char*)key_data, sizeof(key_data)); RpczSpan value_proto; - Span2Proto(span, &value_proto); - // client spans should be reversed. 
- size_t client_span_count = span->CountClientSpans(); - for (size_t i = 0; i < client_span_count; ++i) { - value_proto.add_client_spans(); - } - size_t i = 0; - span->traversal(const_cast(span), [&](Span* p) { - if (span == p) { - return; + Span2Proto(span.get(), &value_proto); + + std::vector> all_child_spans; + + std::function)> collect_all_spans = + [&](std::shared_ptr current_span) { + if (!current_span) { + return; + } + + std::vector> children; + { + BAIDU_SCOPED_LOCK(current_span->_client_list_spinlock); + children.reserve(current_span->_client_list.size()); + for (const auto& child_span : current_span->_client_list) { + if (child_span) { + children.push_back(child_span); + } + } + } + + for (const auto& child : children) { + collect_all_spans(child); + } + + all_child_spans.push_back(current_span); + }; + + collect_all_spans(span); + + // Traverse in reverse order and insert child elements. + // Only collect ended spans to avoid race conditions - active spans may still + // be modified by other threads, which could lead to inconsistent data when + // serializing to database. + for (auto it = all_child_spans.rbegin(); it != all_child_spans.rend(); ++it) { + if (*it && it->get() != span.get() && !(*it)->is_active()) { + RpczSpan* child_proto = value_proto.add_client_spans(); + Span2Proto((*it).get(), child_proto); } - Span2Proto(p, value_proto.mutable_client_spans(client_span_count - i - 1)); - ++i; - }); + } if (!value_proto.SerializeToString(value_buf)) { return leveldb::Status::InvalidArgument( leveldb::Slice("Fail to serialize RpczSpan")); @@ -691,7 +839,7 @@ leveldb::Status SpanDB::RemoveSpansBefore(int64_t tm) { break; } } else { - LOG(ERROR) << "Fail to parse from value"; + LOG(ERROR) << "Fail to parse value"; } rc = time_db->Delete(options, it->key()); if (!rc.ok()) { @@ -704,7 +852,7 @@ leveldb::Status SpanDB::RemoveSpansBefore(int64_t tm) { } // Write span into leveldb. 
-void Span::dump_and_destroy(size_t /*round*/) { +void Span::dump_to_db() { StartIndexingIfNeeded(); std::string value_buf; @@ -712,21 +860,18 @@ void Span::dump_and_destroy(size_t /*round*/) { butil::intrusive_ptr db; if (GetSpanDB(&db) != 0) { if (g_span_ending) { - destroy(); return; } SpanDB* db2 = SpanDB::Open(); if (db2 == NULL) { LOG(WARNING) << "Fail to open SpanDB"; - destroy(); return; } ResetSpanDB(db2); db.reset(db2); } - leveldb::Status st = db->Index(this, &value_buf); - destroy(); + leveldb::Status st = db->Index(shared_from_this(), &value_buf); if (!st.ok()) { LOG(WARNING) << st.ToString(); if (st.IsNotFound() || st.IsIOError() || st.IsCorruption()) { @@ -751,6 +896,42 @@ void Span::dump_and_destroy(size_t /*round*/) { } } +// ========== SpanContainer ============ + +// Destroy the span container without persisting to database. +// This is called in abnormal scenarios: +// 1. When the pending sample queue is full (to prevent memory explosion) +// 2. When grab_thread hasn't run for too long (system overload) +// In these cases, we discard the span quickly without expensive I/O. +void SpanContainer::destroy() { + delete this; +} + +// The round parameter is required by bvar::Collected interface but unused here. +// Other implementations (e.g., SampledRequest in rpc_dump.cpp) use it to detect +// new batches and trigger per-round operations like reloading gflags or switching +// output files. SpanContainer doesn't need batch-level operations since it writes +// directly to leveldb without buffering or configuration reloading. 
+void SpanContainer::dump_and_destroy(size_t round) { + if (_span) { + _span->dump_to_db(); + } + destroy(); +} + +void SpanContainer::submit(int64_t cpuwide_us) { + bvar::Collected::submit(cpuwide_us); +} + +bvar::CollectorSpeedLimit* SpanContainer::speed_limit() { + if (_span) { + return _span->speed_limit(); + } + return NULL; +} + +// ===================================== + int FindSpan(uint64_t trace_id, uint64_t span_id, RpczSpan* response) { butil::intrusive_ptr db; if (GetSpanDB(&db) != 0) { diff --git a/src/brpc/span.h b/src/brpc/span.h index 75d8e7fc05..e785f3a5e5 100644 --- a/src/brpc/span.h +++ b/src/brpc/span.h @@ -23,8 +23,11 @@ #include #include +#include #include #include +#include +#include #include "butil/macros.h" #include "butil/endpoint.h" #include "butil/string_splitter.h" @@ -37,28 +40,48 @@ namespace bthread { extern __thread bthread::LocalStorage tls_bls; } - namespace brpc { +class Span; + +void SetTlsParentSpan(std::shared_ptr span); +std::shared_ptr GetTlsParentSpan(); +void ClearTlsParentSpan(); +bool HasTlsParentSpan(); + +void* CreateBthreadSpanAsVoid(); +void DestroyRpczParentSpan(void* ptr); +void EndBthreadSpan(); + DECLARE_bool(enable_rpcz); +class Span; +class SpanContainer; + +// Deleter for Span. +struct SpanDeleter { + void operator()(Span* r) const; +}; + // Collect information required by /rpcz and tracing system whose idea is // described in http://static.googleusercontent.com/media/research.google.com/en//pubs/archive/36356.pdf -class Span : public bvar::Collected { +class Span : public std::enable_shared_from_this { friend class SpanDB; - struct Forbidden {}; +friend struct SpanDeleter; +friend class SpanContainer; public: + struct Forbidden {}; // Call CreateServerSpan/CreateClientSpan instead. - Span(Forbidden) {} - ~Span() {} + Span(Forbidden); + ~Span(); // Create a span to track a request inside server. 
- static Span* CreateServerSpan( + static std::shared_ptr CreateServerSpan( const std::string& full_method_name, uint64_t trace_id, uint64_t span_id, uint64_t parent_span_id, int64_t base_real_us); // Create a span without name to track a request inside server. - static Span* CreateServerSpan( + static std::shared_ptr CreateServerSpan( uint64_t trace_id, uint64_t span_id, uint64_t parent_span_id, int64_t base_real_us); @@ -66,18 +89,24 @@ friend class SpanDB; void ResetServerSpanName(const std::string& name); // Create a span to track a request inside channel. - static Span* CreateClientSpan(const std::string& full_method_name, - int64_t base_real_us); + static std::shared_ptr CreateClientSpan(const std::string& full_method_name, + int64_t base_real_us); // Create a span to track start bthread - static Span* CreateBthreadSpan(const std::string& full_method_name, - int64_t base_real_us); - - static void Submit(Span* span, int64_t cpuwide_time_us); - - // Set tls parent. + static std::shared_ptr CreateBthreadSpan(const std::string& full_method_name, + int64_t base_real_us); + + static void Submit(std::shared_ptr span, int64_t cpuwide_time_us); + + // Set this span as the TLS parent for subsequent child span creation. + // Typical flow: + // 1. Server span calls AsParent() before user callback to enable tracing + // 2. Client spans created in user code automatically link to this parent + // 3. When client RPC completes, it restores its own parent via AsParent() + // to maintain the trace chain (see Controller::SubmitSpan) + // 4. Server span calls EndAsParent() when submitting to clear TLS parent void AsParent() { - bthread::tls_bls.rpcz_parent_span = this; + SetTlsParentSpan(shared_from_this()); } // Add log with time. 
@@ -115,9 +144,15 @@ friend class SpanDB; void set_sent_us(int64_t tm) { _sent_real_us = tm + _base_real_us; } - Span* local_parent() const { return _local_parent; } - static Span* tls_parent() { - return static_cast(bthread::tls_bls.rpcz_parent_span); + bool is_active() const { return _ending_cid == INVALID_BTHREAD_ID; } + + std::weak_ptr local_parent() const { return _local_parent; } + static std::shared_ptr tls_parent() { + auto parent = GetTlsParentSpan(); + if (parent && parent->is_active()) { + return parent; + } + return nullptr; } uint64_t trace_id() const { return _trace_id; } @@ -139,20 +174,38 @@ friend class SpanDB; int64_t sent_real_us() const { return _sent_real_us; } bool async() const { return _async; } const std::string& full_method_name() const { return _full_method_name; } - const std::string& info() const { return _info; } + + // Returns a copy instead of a reference for thread safety. + // + // Current usage: Only called by Span2Proto() which immediately passes the result + // to protobuf's set_info(). In this specific scenario, returning a reference would + // also be safe because set_info() copies the string before the reference could be + // invalidated by concurrent Annotate() calls. + // + // However, returning by value is more robust: it prevents potential data races if + // future code holds the reference longer, and has no performance penalty due to + // C++11 move semantics (the temporary is moved, not copied, into protobuf). + std::string info() const { + BAIDU_SCOPED_LOCK(_info_spinlock); + return _info; + } private: DISALLOW_COPY_AND_ASSIGN(Span); - void dump_and_destroy(size_t round_index); - void destroy(); - void traversal(Span*, const std::function&) const; + void dump_to_db(); + void submit(int64_t cpuwide_us); bvar::CollectorSpeedLimit* speed_limit(); bvar::CollectorPreprocessor* preprocessor(); + // Clear this span from TLS parent if it's currently set as the parent. 
+ // Called when server span is being submitted to prevent subsequent spans + // from incorrectly linking to an ended span. Only clears if the current + // TLS parent is this span (avoids clearing if another span has taken over). void EndAsParent() { - if (this == static_cast(bthread::tls_bls.rpcz_parent_span)) { - bthread::tls_bls.rpcz_parent_span = NULL; + std::shared_ptr current_parent = GetTlsParentSpan(); + if (current_parent.get() == this) { + ClearTlsParentSpan(); } } @@ -181,11 +234,38 @@ friend class SpanDB; // time2_us \s annotation2 // ... std::string _info; + // Protects _info from concurrent modifications. + // Multiple threads may call Annotate() simultaneously (e.g., retry logic, + // network layer, user code via TRACEPRINTF), causing data corruption in + // string concatenation without synchronization. + mutable pthread_spinlock_t _info_spinlock; + + std::weak_ptr _local_parent; + std::list> _client_list; + // Protects _client_list from concurrent modifications. + // In some scenarios, multiple bthreads may simultaneously create child spans + // (e.g., a Raft leader issuing parallel RPCs to followers) and push_back to the parent's _client_list. + // Also protects against concurrent iteration (e.g., CountClientSpans, SpanDB::Index) + // while the list is being modified.
+ mutable pthread_spinlock_t _client_list_spinlock; +}; + +class SpanContainer : public bvar::Collected { +public: + explicit SpanContainer(std::shared_ptr span) : _span(span) {} + ~SpanContainer() {} + + // Implementations of bvar::Collected + void dump_and_destroy(size_t round_index) override; + void destroy() override; + bvar::CollectorSpeedLimit* speed_limit() override; - Span* _local_parent; - Span* _next_client; - Span* _client_list; - Span* _tls_next; + void submit(int64_t cpuwide_us); + + const std::shared_ptr& span() const { return _span; } + +private: + std::shared_ptr _span; }; // Extract name and annotations from Span::info() @@ -198,11 +278,14 @@ class SpanInfoExtractor { butil::StringSplitter _sp; }; -// These two functions can be used for composing TRACEPRINT as well as hiding -// span implementations. -bool CanAnnotateSpan(); +// These two functions can be used for composing TRACEPRINT// Add an annotation to the current span. +// If current bthread is not tracing, this function does nothing. void AnnotateSpan(const char* fmt, ...); +// Add an annotation to the given span. +// If the span is NULL, this function does nothing. 
+void AnnotateSpanEx(std::shared_ptr span, const char* fmt, ...); + class SpanFilter { public: @@ -240,12 +323,6 @@ inline bool IsTraceable(bool is_upstream_traced) { (FLAGS_enable_rpcz && bvar::is_collectable(&g_span_sl)); } -inline void* CreateBthreadSpan() { - const int64_t received_us = butil::cpuwide_time_us(); - const int64_t base_realtime = butil::gettimeofday_us() - received_us; - return Span::CreateBthreadSpan("Bthread", base_realtime); -} - } // namespace brpc diff --git a/src/brpc/traceprintf.h b/src/brpc/traceprintf.h index 513daf2b00..47a8dcf33a 100644 --- a/src/brpc/traceprintf.h +++ b/src/brpc/traceprintf.h @@ -19,6 +19,7 @@ #ifndef BRPC_TRACEPRINTF_H #define BRPC_TRACEPRINTF_H +#include #include "butil/macros.h" // To brpc developers: This is a header included by user, don't depend @@ -27,9 +28,15 @@ namespace brpc { +// Forward declaration +class Span; + bool CanAnnotateSpan(); void AnnotateSpan(const char* fmt, ...); +// Declarations for AnnotateSpanEx used by TRACEPRINTF_SPAN macro +void AnnotateSpanEx(std::shared_ptr span, const char* fmt, ...); + } // namespace brpc @@ -43,4 +50,14 @@ void AnnotateSpan(const char* fmt, ...); } \ } while (0) + +// Use this macro to print log to a specific span. +// If span_ptr is NULL, arguments to this macro are NOT evaluated. +#define TRACEPRINTF_SPAN(span_ptr, fmt, args...)
\ + do { \ + if ((span_ptr)) { \ + ::brpc::AnnotateSpanEx((span_ptr), "[" __FILE__ ":" BAIDU_SYMBOLSTR(__LINE__) "] " fmt, ##args); \ + } \ + } while (0) + #endif // BRPC_TRACEPRINTF_H diff --git a/src/bthread/bthread.cpp b/src/bthread/bthread.cpp index 085d814df6..920d369ca8 100644 --- a/src/bthread/bthread.cpp +++ b/src/bthread/bthread.cpp @@ -90,7 +90,6 @@ extern BAIDU_THREAD_LOCAL TaskGroup* tls_task_group; EXTERN_BAIDU_VOLATILE_THREAD_LOCAL(TaskGroup*, tls_task_group); extern void (*g_worker_startfn)(); extern void (*g_tagged_worker_startfn)(bthread_tag_t); -extern void* (*g_create_span_func)(); inline TaskControl* get_task_control() { return g_task_control; @@ -597,14 +596,6 @@ int bthread_set_tagged_worker_startfn(void (*start_fn)(bthread_tag_t)) { return 0; } -int bthread_set_create_span_func(void* (*func)()) { - if (func == NULL) { - return EINVAL; - } - bthread::g_create_span_func = func; - return 0; -} - void bthread_stop_world() { bthread::TaskControl* c = bthread::get_task_control(); if (c != NULL) { diff --git a/src/bthread/bthread.h b/src/bthread/bthread.h index 7e42c96c9f..4ea0afcc52 100644 --- a/src/bthread/bthread.h +++ b/src/bthread/bthread.h @@ -428,6 +428,34 @@ extern int bthread_once(bthread_once_t* once_control, void (*init_routine)()); */ extern uint64_t bthread_cpu_clock_ns(void); +// Span callback function types for tracing bthread lifecycle. +// These callbacks are typically set by upper-layer frameworks (e.g., brpc) +// to integrate distributed tracing with bthread execution. +typedef void* (*bthread_create_span_fn)(void); +typedef void (*bthread_destroy_span_fn)(void*); +typedef void (*bthread_end_span_fn)(void); + +// Set span-related callbacks for bthread tracing. +// This should be called during framework initialization (e.g., in GlobalInitializeOrDie). +// +// Parameters: +// create_fn - Called when creating a bthread with BTHREAD_INHERIT_SPAN flag. +// Should return a heap-allocated span context (e.g., weak_ptr*). 
+// Returns NULL if span creation is disabled or fails. +// destroy_fn - Called to destroy the span context when bthread exits or cleans up. +// Receives the pointer returned by create_fn. +// end_fn - Called when bthread ends to finalize the span (e.g., set end time). +// +// All three callbacks must be provided together, or all NULL to disable span tracking. +// This function should only be called once during initialization. +// +// Returns: +// 0 on success +// -1 if parameters are invalid (sets errno to EINVAL) +extern int bthread_set_span_funcs(bthread_create_span_fn create_fn, + bthread_destroy_span_fn destroy_fn, + bthread_end_span_fn end_fn); + __END_DECLS #endif // BTHREAD_BTHREAD_H diff --git a/src/bthread/key.cpp b/src/bthread/key.cpp index 00215d7f3b..85f2913555 100644 --- a/src/bthread/key.cpp +++ b/src/bthread/key.cpp @@ -22,6 +22,7 @@ #include #include +#include "bthread/bthread.h" // bthread_create_span_fn and related types #include "bthread/errno.h" // EAGAIN #include "bthread/task_group.h" // TaskGroup #include "butil/atomicops.h" @@ -34,6 +35,12 @@ namespace bthread { +void* (*g_create_bthread_span)() = NULL; + +void (*g_rpcz_parent_span_dtor)(void*) = NULL; + +void (*g_end_bthread_span)() = NULL; + DEFINE_uint32(key_table_list_size, 4000, "The maximum length of the KeyTableList. Once this value is " "exceeded, a portion of the KeyTables will be moved to the " @@ -245,6 +252,11 @@ class BAIDU_CACHELINE_ALIGNMENT KeyTableList { if (g) { g->current_task()->local_storage.keytable = old_kt; } + + if (tls_bls.rpcz_parent_span && g_rpcz_parent_span_dtor) { + g_rpcz_parent_span_dtor(tls_bls.rpcz_parent_span); + tls_bls.rpcz_parent_span = NULL; + } } void append(KeyTable* keytable) { @@ -405,6 +417,11 @@ static void cleanup_pthread(void* arg) { delete kt; // After deletion: tls may be set during deletion. 
tls_bls.keytable = NULL; + + if (tls_bls.rpcz_parent_span && g_rpcz_parent_span_dtor) { + g_rpcz_parent_span_dtor(tls_bls.rpcz_parent_span); + tls_bls.rpcz_parent_span = NULL; + } } } @@ -483,6 +500,11 @@ int bthread_keytable_pool_destroy(bthread_keytable_pool_t* pool) { if (g) { g->current_task()->local_storage.keytable = old_kt; } + + if (bthread::tls_bls.rpcz_parent_span && bthread::g_rpcz_parent_span_dtor) { + bthread::g_rpcz_parent_span_dtor(bthread::tls_bls.rpcz_parent_span); + bthread::tls_bls.rpcz_parent_span = NULL; + } // TODO: return_keytable may race with this function, we don't destroy // the mutex right now. // pthread_mutex_destroy(&pool->mutex); @@ -672,4 +694,19 @@ void* bthread_get_assigned_data() { return bthread::tls_bls.assigned_data; } +int bthread_set_span_funcs(bthread_create_span_fn create_fn, + bthread_destroy_span_fn destroy_fn, + bthread_end_span_fn end_fn) { + if ((create_fn && destroy_fn && end_fn) || + (!create_fn && !destroy_fn && !end_fn)) { + bthread::g_create_bthread_span = create_fn; + bthread::g_rpcz_parent_span_dtor = destroy_fn; + bthread::g_end_bthread_span = end_fn; + return 0; + } + + errno = EINVAL; + return -1; +} + } // extern "C" diff --git a/src/bthread/task_group.cpp b/src/bthread/task_group.cpp index 40daaa1ace..dc413fc923 100644 --- a/src/bthread/task_group.cpp +++ b/src/bthread/task_group.cpp @@ -78,15 +78,6 @@ BAIDU_VOLATILE_THREAD_LOCAL(void*, tls_unique_user_ptr, NULL); const TaskStatistics EMPTY_STAT = { 0, 0, 0 }; -void* (*g_create_span_func)() = NULL; - -void* run_create_span_func() { - if (g_create_span_func) { - return g_create_span_func(); - } - return BAIDU_GET_VOLATILE_THREAD_LOCAL(tls_bls).rpcz_parent_span; -} - AtomicInteger128::Value AtomicInteger128::load() const { #if __x86_64__ || __ARM_NEON // Supress compiler warning. 
@@ -391,6 +382,12 @@ void TaskGroup::task_runner(intptr_t skip_remained) { thread_return = e.value(); } + if (m->attr.flags & BTHREAD_INHERIT_SPAN) { + if (g_end_bthread_span) { + g_end_bthread_span(); + } + } + // TODO: Save thread_return (void)thread_return; @@ -493,7 +490,11 @@ int TaskGroup::start_foreground(TaskGroup** pg, m->attr = using_attr; m->local_storage = LOCAL_STORAGE_INIT; if (using_attr.flags & BTHREAD_INHERIT_SPAN) { - m->local_storage.rpcz_parent_span = run_create_span_func(); + if (g_create_bthread_span) { + m->local_storage.rpcz_parent_span = g_create_bthread_span(); + } else { + m->local_storage.rpcz_parent_span = BAIDU_GET_VOLATILE_THREAD_LOCAL(tls_bls).rpcz_parent_span; + } } m->cpuwide_start_ns = start_ns; m->stat = EMPTY_STAT; @@ -558,7 +559,11 @@ int TaskGroup::start_background(bthread_t* __restrict th, m->attr = using_attr; m->local_storage = LOCAL_STORAGE_INIT; if (using_attr.flags & BTHREAD_INHERIT_SPAN) { - m->local_storage.rpcz_parent_span = run_create_span_func(); + if (g_create_bthread_span) { + m->local_storage.rpcz_parent_span = g_create_bthread_span(); + } else { + m->local_storage.rpcz_parent_span = BAIDU_GET_VOLATILE_THREAD_LOCAL(tls_bls).rpcz_parent_span; + } } m->cpuwide_start_ns = start_ns; m->stat = EMPTY_STAT; diff --git a/src/bthread/task_meta.h b/src/bthread/task_meta.h index 1b77c0b601..a2490b4553 100644 --- a/src/bthread/task_meta.h +++ b/src/bthread/task_meta.h @@ -28,6 +28,7 @@ #include "bthread/types.h" // bthread_attr_t #include "bthread/stack.h" // ContextualStack #include "bthread/timer_thread.h" +#include "butil/thread_local.h" namespace bthread { @@ -43,13 +44,15 @@ struct ButexWaiter; struct LocalStorage { KeyTable* keytable; void* assigned_data; - void* rpcz_parent_span; + void* rpcz_parent_span; // Points to std::weak_ptr* (managed by brpc) }; #define BTHREAD_LOCAL_STORAGE_INITIALIZER { NULL, NULL, NULL } const static LocalStorage LOCAL_STORAGE_INIT = BTHREAD_LOCAL_STORAGE_INITIALIZER; 
+EXTERN_BAIDU_VOLATILE_THREAD_LOCAL(LocalStorage, tls_bls); + enum TaskStatus { TASK_STATUS_UNKNOWN, TASK_STATUS_CREATED, @@ -149,6 +152,24 @@ struct TaskMeta { } }; +// Global callback for creating a new bthread span when creating a new bthread. +// This is set by brpc layer. When a bthread is created with BTHREAD_INHERIT_SPAN, +// this callback is invoked to create a new span for the bthread. +// The returned void* points to a heap-allocated weak_ptr* managed by brpc layer. +// Returns NULL if span creation is disabled or fails. +extern void* (*g_create_bthread_span)(); + +// Global destructor callback for rpcz_parent_span. +// This is set by brpc layer to clean up the heap-allocated weak_ptr. +// bthread layer doesn't know the concrete type, it just calls this function +// with the void* pointer when cleaning up LocalStorage. +extern void (*g_rpcz_parent_span_dtor)(void*); + +// Global callback invoked when a bthread ends (used by higher layers to +// observe and react to bthread end events, e.g., to finish spans). This +// pointer is set by the upper layer during initialization. +extern void (*g_end_bthread_span)(); + } // namespace bthread #endif // BTHREAD_TASK_META_H diff --git a/src/bthread/unstable.h b/src/bthread/unstable.h index 4580202f87..186d9ce65b 100644 --- a/src/bthread/unstable.h +++ b/src/bthread/unstable.h @@ -92,9 +92,6 @@ extern int bthread_set_worker_startfn(void (*start_fn)()); // Add a startup function with tag extern int bthread_set_tagged_worker_startfn(void (*start_fn)(bthread_tag_t)); -// Add a create span function -extern int bthread_set_create_span_func(void* (*func)()); - // Stop all bthread and worker pthreads. // You should avoid calling this function which may cause bthread after main() // suspend indefinitely. 
diff --git a/src/bvar/collector.cpp b/src/bvar/collector.cpp index 34713a4a00..a01f45fdbc 100644 --- a/src/bvar/collector.cpp +++ b/src/bvar/collector.cpp @@ -410,6 +410,11 @@ void Collector::dump_thread() { } } +// Submit a sample for asynchronous dumping. The Collector holds only the Collected* +// pointer (e.g., SpanContainer*). Regardless of which branch is taken below, the +// sample will eventually be destroyed via either dump_and_destroy() or destroy(), +// both of which call 'delete this' to release the container and decrement the +// reference count of any managed resources (e.g., shared_ptr). void Collected::submit(int64_t cpuwide_us) { Collector* d = butil::get_leaky_singleton(); // Destroy the sample in-place if the grab_thread did not run for twice diff --git a/test/brpc_channel_unittest.cpp b/test/brpc_channel_unittest.cpp index 66d1fbad9b..84a40fcbf1 100644 --- a/test/brpc_channel_unittest.cpp +++ b/test/brpc_channel_unittest.cpp @@ -47,13 +47,15 @@ namespace brpc { DECLARE_int32(idle_timeout_second); DECLARE_int32(max_connection_pool_size); class Server; +class Span; class MethodStatus; namespace policy { void SendRpcResponse(int64_t correlation_id, Controller* cntl, RpcPBMessages* messages, const Server* server_raw, - MethodStatus *, int64_t); + MethodStatus *, int64_t, + std::shared_ptr span); } // policy } // brpc @@ -279,9 +281,10 @@ class ChannelTest : public ::testing::Test{ int64_t, brpc::Controller*, brpc::RpcPBMessages*, const brpc::Server*, - brpc::MethodStatus*, int64_t>(&brpc::policy::SendRpcResponse, - meta.correlation_id(), cntl, - messages, &ts->_dummy, NULL, -1); + brpc::MethodStatus*, int64_t, std::shared_ptr>( + &brpc::policy::SendRpcResponse, + meta.correlation_id(), cntl, + messages, &ts->_dummy, NULL, -1, nullptr); ts->_svc.CallMethod(method, cntl, req, res, done); } diff --git a/test/bthread_unittest.cpp b/test/bthread_unittest.cpp index dcb8d87323..bd31a3c430 100644 --- a/test/bthread_unittest.cpp +++ 
b/test/bthread_unittest.cpp @@ -17,6 +17,7 @@ #include #include +#include #include "butil/time.h" #include "butil/macros.h" #include "butil/logging.h" @@ -566,6 +567,13 @@ void* create_span_func() { return (void*)targets[idx]; } +void destroy_span_func(void* span) { + LOG(INFO) << "Destroy span " << (uint64_t)span; +} + +void end_span_func() { +} + TEST_F(BthreadTest, test_span) { uint64_t p1 = 0; uint64_t p2 = 0; @@ -587,7 +595,7 @@ TEST_F(BthreadTest, test_span) { LOG(INFO) << "Test bthread create span"; - bthread_set_create_span_func(create_span_func); + ASSERT_EQ(0, bthread_set_span_funcs(create_span_func, destroy_span_func, end_span_func)); bthread_t multi_th1; bthread_t multi_th2; @@ -602,6 +610,8 @@ TEST_F(BthreadTest, test_span) { ASSERT_NE(multi_p1, multi_p2); ASSERT_NE(std::find(targets, targets + 4, multi_p1), targets + 4); ASSERT_NE(std::find(targets, targets + 4, multi_p2), targets + 4); + + ASSERT_EQ(0, bthread_set_span_funcs(NULL, NULL, NULL)); } void* dummy_thread(void*) { From c5a8dee8d0acdd545b7cb064a66ef5926f679829 Mon Sep 17 00:00:00 2001 From: lhh Date: Tue, 11 Nov 2025 13:52:00 +0800 Subject: [PATCH 2/2] Add BRPC_SPAN_ENABLE_SHARED_PTR_API flag for backward-compatible Span lifecycle management (#3068) --- src/brpc/channel.cpp | 4 ++ src/brpc/controller.cpp | 27 ++++++++++++++ .../details/controller_private_accessor.h | 24 +++++++++--- src/brpc/policy/baidu_rpc_protocol.cpp | 4 ++ src/brpc/policy/http_rpc_protocol.cpp | 4 ++ src/brpc/policy/hulu_pbrpc_protocol.cpp | 4 ++ src/brpc/policy/nshead_protocol.cpp | 4 ++ src/brpc/policy/sofa_pbrpc_protocol.cpp | 4 ++ src/brpc/policy/thrift_protocol.cpp | 4 ++ src/brpc/span.h | 37 ++++++++++++++++++- 10 files changed, 109 insertions(+), 7 deletions(-) diff --git a/src/brpc/channel.cpp b/src/brpc/channel.cpp index 6872ce117f..29d5f48311 100644 --- a/src/brpc/channel.cpp +++ b/src/brpc/channel.cpp @@ -510,7 +510,11 @@ void Channel::CallMethod(const google::protobuf::MethodDescriptor* method, 
span->set_base_cid(correlation_id); span->set_protocol(_options.protocol); span->set_start_send_us(start_send_us); +#if BRPC_SPAN_ENABLE_SHARED_PTR_API accessor.set_span(span); +#else + accessor.set_span(span.get()); +#endif } } // Override some options if they haven't been set by Controller diff --git a/src/brpc/controller.cpp b/src/brpc/controller.cpp index d44f3e4c8f..fe06b9414e 100644 --- a/src/brpc/controller.cpp +++ b/src/brpc/controller.cpp @@ -1727,4 +1727,31 @@ void Controller::DoPrintLogPrefix(std::ostream& os) const { } } + +#if BRPC_SPAN_ENABLE_SHARED_PTR_API +ControllerPrivateAccessor& ControllerPrivateAccessor::set_span( + std::shared_ptr span) { + _cntl->_span = span; + return *this; +} +#else +ControllerPrivateAccessor& ControllerPrivateAccessor::set_span(Span* span) { + if (span) { + _cntl->_span = span->shared_from_this(); + } else { + _cntl->_span.reset(); + } + return *this; +} +#endif + +SpanPtr ControllerPrivateAccessor::span() const { + std::shared_ptr span_internal = _cntl->_span.lock(); +#if BRPC_SPAN_ENABLE_SHARED_PTR_API + return span_internal; +#else + return span_internal.get(); +#endif +} + } // namespace brpc diff --git a/src/brpc/details/controller_private_accessor.h b/src/brpc/details/controller_private_accessor.h index 1a2c631208..65e05bb2f3 100644 --- a/src/brpc/details/controller_private_accessor.h +++ b/src/brpc/details/controller_private_accessor.h @@ -30,9 +30,20 @@ class Message; } } - namespace brpc { +class Span; + +#ifndef BRPC_SPAN_ENABLE_SHARED_PTR_API +#define BRPC_SPAN_ENABLE_SHARED_PTR_API 0 +#endif + +#if BRPC_SPAN_ENABLE_SHARED_PTR_API +using SpanPtr = std::shared_ptr; +#else +using SpanPtr = Span*; +#endif + class AuthContext; // A wrapper to access some private methods/fields of `Controller' @@ -90,17 +101,18 @@ class ControllerPrivateAccessor { return *this; } - ControllerPrivateAccessor &set_span(std::shared_ptr span) { - _cntl->_span = span; - return *this; - } +#if BRPC_SPAN_ENABLE_SHARED_PTR_API + 
ControllerPrivateAccessor &set_span(std::shared_ptr span); +#else + ControllerPrivateAccessor &set_span(Span* span); +#endif ControllerPrivateAccessor &set_request_protocol(ProtocolType protocol) { _cntl->_request_protocol = protocol; return *this; } - std::shared_ptr span() const { return _cntl->_span.lock(); } + SpanPtr span() const; uint32_t pipelined_count() const { return _cntl->_pipelined_count; } void set_pipelined_count(uint32_t count) { _cntl->_pipelined_count = count; } diff --git a/src/brpc/policy/baidu_rpc_protocol.cpp b/src/brpc/policy/baidu_rpc_protocol.cpp index f17e16e820..f2cfdbeaeb 100644 --- a/src/brpc/policy/baidu_rpc_protocol.cpp +++ b/src/brpc/policy/baidu_rpc_protocol.cpp @@ -647,7 +647,11 @@ void ProcessRpcRequest(InputMessageBase* msg_base) { span = Span::CreateServerSpan( request_meta.trace_id(), request_meta.span_id(), request_meta.parent_span_id(), msg->base_real_us()); +#if BRPC_SPAN_ENABLE_SHARED_PTR_API accessor.set_span(span); +#else + accessor.set_span(span.get()); +#endif span->set_log_id(request_meta.log_id()); span->set_remote_side(cntl->remote_side()); span->set_protocol(PROTOCOL_BAIDU_STD); diff --git a/src/brpc/policy/http_rpc_protocol.cpp b/src/brpc/policy/http_rpc_protocol.cpp index e4885bab29..8dd3697a74 100644 --- a/src/brpc/policy/http_rpc_protocol.cpp +++ b/src/brpc/policy/http_rpc_protocol.cpp @@ -1506,7 +1506,11 @@ void ProcessHttpRequest(InputMessageBase *msg) { } span = Span::CreateServerSpan( path, trace_id, span_id, parent_span_id, msg->base_real_us()); +#if BRPC_SPAN_ENABLE_SHARED_PTR_API accessor.set_span(span); +#else + accessor.set_span(span.get()); +#endif span->set_log_id(cntl->log_id()); span->set_remote_side(user_addr); span->set_received_us(msg->received_us()); diff --git a/src/brpc/policy/hulu_pbrpc_protocol.cpp b/src/brpc/policy/hulu_pbrpc_protocol.cpp index 4978873743..a789dee472 100644 --- a/src/brpc/policy/hulu_pbrpc_protocol.cpp +++ b/src/brpc/policy/hulu_pbrpc_protocol.cpp @@ -416,7 +416,11 @@ void 
ProcessHuluRequest(InputMessageBase* msg_base) { span = Span::CreateServerSpan( meta.trace_id(), meta.span_id(), meta.parent_span_id(), msg->base_real_us()); +#if BRPC_SPAN_ENABLE_SHARED_PTR_API accessor.set_span(span); +#else + accessor.set_span(span.get()); +#endif span->set_log_id(meta.log_id()); span->set_remote_side(cntl->remote_side()); span->set_protocol(PROTOCOL_HULU_PBRPC); diff --git a/src/brpc/policy/nshead_protocol.cpp b/src/brpc/policy/nshead_protocol.cpp index 82f696e3c2..2fcff1a3b8 100644 --- a/src/brpc/policy/nshead_protocol.cpp +++ b/src/brpc/policy/nshead_protocol.cpp @@ -300,7 +300,11 @@ void ProcessNsheadRequest(InputMessageBase* msg_base) { std::shared_ptr span; if (IsTraceable(false)) { span = Span::CreateServerSpan(0, 0, 0, msg->base_real_us()); +#if BRPC_SPAN_ENABLE_SHARED_PTR_API accessor.set_span(span); +#else + accessor.set_span(span.get()); +#endif span->set_log_id(req_head->log_id); span->set_remote_side(cntl->remote_side()); span->set_protocol(PROTOCOL_NSHEAD); diff --git a/src/brpc/policy/sofa_pbrpc_protocol.cpp b/src/brpc/policy/sofa_pbrpc_protocol.cpp index 3eaee4b0e6..352c33b435 100644 --- a/src/brpc/policy/sofa_pbrpc_protocol.cpp +++ b/src/brpc/policy/sofa_pbrpc_protocol.cpp @@ -376,7 +376,11 @@ void ProcessSofaRequest(InputMessageBase* msg_base) { span = Span::CreateServerSpan( 0/*meta.trace_id()*/, 0/*meta.span_id()*/, 0/*meta.parent_span_id()*/, msg->base_real_us()); +#if BRPC_SPAN_ENABLE_SHARED_PTR_API accessor.set_span(span); +#else + accessor.set_span(span.get()); +#endif span->set_remote_side(cntl->remote_side()); span->set_protocol(PROTOCOL_SOFA_PBRPC); span->set_received_us(msg->received_us()); diff --git a/src/brpc/policy/thrift_protocol.cpp b/src/brpc/policy/thrift_protocol.cpp index 2b5739ea3e..2e4d636937 100755 --- a/src/brpc/policy/thrift_protocol.cpp +++ b/src/brpc/policy/thrift_protocol.cpp @@ -518,7 +518,11 @@ void ProcessThriftRequest(InputMessageBase* msg_base) { std::shared_ptr span; if (IsTraceable(false)) { 
span = Span::CreateServerSpan(0, 0, 0, msg->base_real_us()); +#if BRPC_SPAN_ENABLE_SHARED_PTR_API accessor.set_span(span); +#else + accessor.set_span(span.get()); +#endif span->set_log_id(seq_id); span->set_remote_side(cntl->remote_side()); span->set_protocol(PROTOCOL_THRIFT); diff --git a/src/brpc/span.h b/src/brpc/span.h index e785f3a5e5..20cf5e58d9 100644 --- a/src/brpc/span.h +++ b/src/brpc/span.h @@ -42,8 +42,43 @@ extern __thread bthread::LocalStorage tls_bls; namespace brpc { +// ============================================================================ +// Span Lifecycle Management API Compatibility Layer +// ============================================================================ +// +// COMPATIBILITY NOTE: +// brpc uses std::shared_ptr internally +// to prevent use-after-free bugs in async RPC callbacks. +// +// For backward compatibility with existing protocol extensions, external APIs +// can return raw pointers (Span*) by default. To enable the modern shared_ptr +// API, compile with -DBRPC_SPAN_ENABLE_SHARED_PTR_API=1. +// +// MIGRATION GUIDE: +// - Legacy mode (default): SpanPtr = Span* +// Users must ensure the Span outlives their usage (typically by keeping +// the Controller alive). +// +// - Modern mode (recommended): SpanPtr = std::shared_ptr +// Automatic lifetime management, safer for async operations. 
+// +// ============================================================================ + +#ifndef BRPC_SPAN_ENABLE_SHARED_PTR_API +#define BRPC_SPAN_ENABLE_SHARED_PTR_API 0 // Default: legacy mode for compatibility +#endif + class Span; +#if BRPC_SPAN_ENABLE_SHARED_PTR_API +// Modern API: Return shared_ptr for safe lifecycle management +using SpanPtr = std::shared_ptr; +#else +// Legacy API: Return raw pointer for backward compatibility +// WARNING: Users must ensure the Span outlives their usage +using SpanPtr = Span*; +#endif + void SetTlsParentSpan(std::shared_ptr span); std::shared_ptr GetTlsParentSpan(); void ClearTlsParentSpan(); @@ -252,7 +287,7 @@ friend class SpanContainer; class SpanContainer : public bvar::Collected { public: - explicit SpanContainer(std::shared_ptr span) : _span(span) {} + explicit SpanContainer(const std::shared_ptr& span) : _span(span) {} ~SpanContainer() {} // Implementations of bvar::Collected