diff --git a/src/brpc/policy/baidu_rpc_protocol.cpp b/src/brpc/policy/baidu_rpc_protocol.cpp index b19fbb379b..a01c5cc0b7 100644 --- a/src/brpc/policy/baidu_rpc_protocol.cpp +++ b/src/brpc/policy/baidu_rpc_protocol.cpp @@ -25,6 +25,7 @@ #include "butil/iobuf.h" // butil::IOBuf #include "butil/raw_pack.h" // RawPacker RawUnpacker #include "brpc/controller.h" // Controller +#include "brpc/errno.pb.h" #include "brpc/socket.h" // Socket #include "brpc/server.h" // Server #include "brpc/span.h" @@ -216,7 +217,15 @@ void SendRpcResponse(int64_t correlation_id, if (Socket::Address(response_stream_id, &stream_ptr) == 0) { Stream* s = (Stream*)stream_ptr->conn(); s->FillSettings(meta.mutable_stream_settings()); - s->SetHostSocket(sock); + // If failed to set host socket here, + // s->SetConnected will fail at CHECK(_host_socket != NULL) + if (s->SetHostSocket(sock) != 0) { + LOG(WARNING) << "Failed to set host socket " << *sock; + cntl->SetFailed(EFAILEDSOCKET, "Fail to set host socket %s", + sock->description().c_str()); + ((Stream *)stream_ptr->conn())->Close(); + return; + } } else { LOG(WARNING) << "Stream=" << response_stream_id << " was closed before sending response"; @@ -247,6 +256,14 @@ void SendRpcResponse(int64_t correlation_id, // Send rpc response over stream even if server side failed to create // stream for some reason. if(cntl->has_remote_stream()){ + // If we don't set connected here before send the response, + // client-side may close the stream before server-side set connected. + // This will cause missing on_closed message on the client-side. + if (stream_ptr) { + // Now it's ok the mark this server-side stream as connectted as all the + // written user data would follower the RPC response. + ((Stream*)stream_ptr->conn())->SetConnected(); + } // Send the response over stream to notify that this stream connection // is successfully built. // Response_stream can be INVALID_STREAM_ID when error occurs. @@ -262,12 +279,6 @@ void SendRpcResponse(int64_t correlation_id, } return; } - - if(stream_ptr) { - // Now it's ok the mark this server-side stream as connected as all the - // written user data would follower the RPC response. - ((Stream*)stream_ptr->conn())->SetConnected(); - } } else{ // Have the risk of unlimited pending responses, in which case, tell // users to set max_concurrency.