Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
34 changes: 34 additions & 0 deletions example/http_c++/http_client.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -25,8 +25,11 @@
#include <gflags/gflags.h>
#include <butil/logging.h>
#include <brpc/channel.h>
#include "bthread/countdown_event.h"

DEFINE_string(d, "", "POST this data to the http server");
DEFINE_bool(progressive, false, "whether or not progressive read data from server");
DEFINE_int32(progressive_read_timeout_ms, 5000, "progressive read data idle timeout in milliseconds");
DEFINE_string(load_balancer, "", "The algorithm for load balancing");
DEFINE_int32(timeout_ms, 2000, "RPC timeout in milliseconds");
DEFINE_int32(max_retry, 3, "Max retries(not including the first RPC)");
Expand All @@ -36,6 +39,25 @@ namespace brpc {
DECLARE_bool(http_verbose);
}

class PartDataReader: public brpc::ProgressiveReader {
public:
explicit PartDataReader(bthread::CountdownEvent* done): _done(done){}

butil::Status OnReadOnePart(const void* data, size_t length) {
memcpy(_buffer, data, length);
LOG(INFO) << "data : " << _buffer << " size : " << length;
return butil::Status::OK();
}

void OnEndOfMessage(const butil::Status& status) {
_done->signal();
LOG(INFO) << "progressive read data final status : " << status;
}
private:
char _buffer[1024];
bthread::CountdownEvent* _done;
};

int main(int argc, char* argv[]) {
// Parse gflags. We recommend you to use gflags as well.
GFLAGS_NAMESPACE::ParseCommandLineFlags(&argc, &argv, true);
Expand Down Expand Up @@ -71,13 +93,25 @@ int main(int argc, char* argv[]) {
cntl.request_attachment().append(FLAGS_d);
}

if (FLAGS_progressive) {
cntl.set_progressive_read_timeout_ms(FLAGS_progressive_read_timeout_ms);
cntl.response_will_be_read_progressively();
}

// Because `done'(last parameter) is NULL, this function waits until
// the response comes back or error occurs(including timedout).
channel.CallMethod(NULL, &cntl, NULL, NULL, NULL);
if (cntl.Failed()) {
std::cerr << cntl.ErrorText() << std::endl;
return -1;
}

if (FLAGS_progressive) {
bthread::CountdownEvent done(1);
cntl.ReadProgressiveAttachmentBy(new PartDataReader(&done));
done.wait();
LOG(INFO) << "wait client progressive read done safely";
}
// If -http_verbose is on, brpc already prints the response to stderr.
if (!brpc::FLAGS_http_verbose) {
std::cout << cntl.response_attachment() << std::endl;
Expand Down
7 changes: 7 additions & 0 deletions example/http_c++/http_server.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@ DEFINE_int32(idle_timeout_s, -1, "Connection will be closed if there is no "
DEFINE_string(certificate, "cert.pem", "Certificate file path to enable SSL");
DEFINE_string(private_key, "key.pem", "Private key file path to enable SSL");
DEFINE_string(ciphers, "", "Cipher suite used for SSL connections");
DEFINE_bool(enable_progressive_timeout, false, "whether or not trigger progressive write attachement data timeout");

namespace example {

Expand Down Expand Up @@ -104,6 +105,9 @@ class FileServiceImpl : public FileService {

// sleep a while to send another part.
bthread_usleep(10000);
if (FLAGS_enable_progressive_timeout && i > 50) {
bthread_usleep(100000000UL);
}
}
return NULL;
}
Expand Down Expand Up @@ -194,6 +198,9 @@ class HttpSSEServiceImpl : public HttpSSEService {

// sleep a while to send another part.
bthread_usleep(10000 * 10);
if (FLAGS_enable_progressive_timeout && i > 50) {
bthread_usleep(100000000UL);
}
}
return NULL;
}
Expand Down
92 changes: 91 additions & 1 deletion src/brpc/controller.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -95,7 +95,8 @@ DEFINE_bool(graceful_quit_on_sigterm, false,
"Register SIGTERM handle func to quit graceful");
DEFINE_bool(graceful_quit_on_sighup, false,
"Register SIGHUP handle func to quit graceful");

DEFINE_bool(log_idle_progressive_read_close, false,
"Print log when an idle progressive read is closed");
const IdlNames idl_single_req_single_res = { "req", "res" };
const IdlNames idl_single_req_multi_res = { "req", "" };
const IdlNames idl_multi_req_single_res = { "", "res" };
Expand Down Expand Up @@ -174,6 +175,80 @@ class IgnoreAllRead : public ProgressiveReader {
void OnEndOfMessage(const butil::Status&) {}
};

class ProgressiveTimeoutReader : public ProgressiveReader {
public:
explicit ProgressiveTimeoutReader(SocketId id, int32_t read_timeout_ms, ProgressiveReader* reader):
_socket_id(id),
_read_timeout_ms(read_timeout_ms),
_reader(reader),
_timeout_id(0),
_is_read_timeout(false) {
AddIdleReadTimeoutMonitor();
}

~ProgressiveTimeoutReader() {
if(_timeout_id > 0) {
bthread_timer_del(_timeout_id);
}
}

butil::Status OnReadOnePart(const void* data, size_t length) {
return _reader->OnReadOnePart(data, length);
}

void OnEndOfMessage(const butil::Status& status) {
if (_is_read_timeout) {
_reader->OnEndOfMessage(butil::Status(EPROGREADTIMEOUT, "The progressive read timeout"));
} else {
_reader->OnEndOfMessage(status);
}
if(_timeout_id > 0) {
bthread_timer_del(_timeout_id);
_timeout_id = 0;
}
}

private:
static void HandleIdleProgressiveReader(void* arg) {
if(arg == nullptr){
LOG(ERROR) << "Controller::HandleIdleProgressiveReader arg is null.";
return;
}
ProgressiveTimeoutReader* reader = static_cast<ProgressiveTimeoutReader*>(arg);
SocketUniquePtr s;
if (Socket::Address(reader->_socket_id, &s) != 0) {
LOG(ERROR) << "not found the socket id : " << reader->_socket_id;
return;
}
auto log_idle = FLAGS_log_idle_progressive_read_close;
reader->_is_read_timeout = true;
LOG_IF(INFO, log_idle) << "progressive read timeout socket id : " << reader->_socket_id
<< " progressive read timeout us : " << reader->_read_timeout_ms;
if (s->parsing_context() != NULL) {
s->parsing_context()->Destroy();
}
s->ReleaseReferenceIfIdle(0);
}
void AddIdleReadTimeoutMonitor() {
if (_read_timeout_ms <= 0) {
return;
}
bthread_timer_add(&_timeout_id,
butil::milliseconds_from_now(_read_timeout_ms),
HandleIdleProgressiveReader,
this
);
}

private:
SocketId _socket_id;
int32_t _read_timeout_ms;
ProgressiveReader* _reader;
// Timer registered to trigger progressive timeout event
bthread_timer_t _timeout_id;
butil::atomic<bool> _is_read_timeout;
};

static IgnoreAllRead* s_ignore_all_read = NULL;
static pthread_once_t s_ignore_all_read_once = PTHREAD_ONCE_INIT;
static void CreateIgnoreAllRead() { s_ignore_all_read = new IgnoreAllRead; }
Expand Down Expand Up @@ -260,6 +335,7 @@ void Controller::ResetPods() {
_backup_request_ms = UNSET_MAGIC_NUM;
_backup_request_policy = NULL;
_connect_timeout_ms = UNSET_MAGIC_NUM;
_progressive_read_timeout_ms = UNSET_MAGIC_NUM;
_real_timeout_ms = UNSET_MAGIC_NUM;
_deadline_us = -1;
_timeout_id = 0;
Expand Down Expand Up @@ -331,6 +407,15 @@ void Controller::Call::Reset() {
stream_user_data = NULL;
}

void Controller::set_progressive_read_timeout_ms(int32_t progressive_read_timeout_ms){
if(progressive_read_timeout_ms <= 0x7fffffff){
_progressive_read_timeout_ms = progressive_read_timeout_ms;
} else {
_progressive_read_timeout_ms = 0x7fffffff;
LOG(WARNING) << "progressive_read_timeout_seconds is limited to 0x7fffffff";
}
}

void Controller::set_timeout_ms(int64_t timeout_ms) {
if (timeout_ms <= 0x7fffffff) {
_timeout_ms = timeout_ms;
Expand Down Expand Up @@ -1028,6 +1113,7 @@ void Controller::SubmitSpan() {
_span = NULL;
}


void Controller::HandleSendFailed() {
if (!FailedInline()) {
SetFailed("Must be SetFailed() before calling HandleSendFailed()");
Expand Down Expand Up @@ -1543,6 +1629,10 @@ void Controller::ReadProgressiveAttachmentBy(ProgressiveReader* r) {
__FUNCTION__));
}
add_flag(FLAGS_PROGRESSIVE_READER);
if (progressive_read_timeout_ms() > 0) {
auto reader = new ProgressiveTimeoutReader(_rpa->GetSocketId(), _progressive_read_timeout_ms, r);
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I believe that _current_call.peer_id can be used instead of GetSocketId.

return _rpa->ReadProgressiveAttachmentBy(reader);
}
return _rpa->ReadProgressiveAttachmentBy(r);
}

Expand Down
10 changes: 7 additions & 3 deletions src/brpc/controller.h
Original file line number Diff line number Diff line change
Expand Up @@ -47,7 +47,6 @@
#include "brpc/grpc.h"
#include "brpc/kvmap.h"
#include "brpc/rpc_dump.h"

// EAUTH is defined in MAC
#ifndef EAUTH
#define EAUTH ERPCAUTH
Expand Down Expand Up @@ -163,7 +162,6 @@ friend void policy::ProcessThriftRequest(InputMessageBase*);
uint64_t log_id;
std::string request_id;
};

public:
Controller();
Controller(const Inheritable& parent_ctx);
Expand All @@ -177,6 +175,9 @@ friend void policy::ProcessThriftRequest(InputMessageBase*);

// Set/get timeout in milliseconds for the RPC call. Use
// ChannelOptions.timeout_ms on unset.
void set_progressive_read_timeout_ms(int32_t progressive_read_timeout_ms);
int32_t progressive_read_timeout_ms() const { return _progressive_read_timeout_ms; }

void set_timeout_ms(int64_t timeout_ms);
int64_t timeout_ms() const { return _timeout_ms; }

Expand Down Expand Up @@ -323,7 +324,9 @@ friend void policy::ProcessThriftRequest(InputMessageBase*);

// Make the RPC end when the HTTP response has complete headers and let
// user read the remaining body by using ReadProgressiveAttachmentBy().
void response_will_be_read_progressively() { add_flag(FLAGS_READ_PROGRESSIVELY); }
void response_will_be_read_progressively() {
add_flag(FLAGS_READ_PROGRESSIVELY);
}
// Make the RPC end when the HTTP request has complete headers and let
// user read the remaining body by using ReadProgressiveAttachmentBy().
void request_will_be_read_progressively() { add_flag(FLAGS_READ_PROGRESSIVELY); }
Expand Down Expand Up @@ -837,6 +840,7 @@ friend void policy::ProcessThriftRequest(InputMessageBase*);
int32_t _timeout_ms;
int32_t _connect_timeout_ms;
int32_t _backup_request_ms;
int32_t _progressive_read_timeout_ms;
// Priority: `_backup_request_policy' > `_backup_request_ms'.
BackupRequestPolicy* _backup_request_policy;
// If this rpc call has retry/backup request,this var save the real timeout for current call
Expand Down
3 changes: 2 additions & 1 deletion src/brpc/errno.proto
Original file line number Diff line number Diff line change
Expand Up @@ -41,7 +41,8 @@ enum Errno {
ESSL = 1016; // SSL related error
EH2RUNOUTSTREAMS = 1017; // The H2 socket was run out of streams
EREJECT = 1018; // The Request is rejected

EPROGREADTIMEOUT = 1019; // The Progressive read timeout

// Errno caused by server
EINTERNAL = 2001; // Internal Server Error
ERESPONSE = 2002; // Bad Response
Expand Down
1 change: 1 addition & 0 deletions src/brpc/policy/http_rpc_protocol.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -1197,6 +1197,7 @@ ParseResult ParseHttpMessage(butil::IOBuf *source, Socket *socket,
LOG(FATAL) << "Fail to new HttpContext";
return MakeParseError(PARSE_ERROR_NO_RESOURCE);
}
http_imsg->SetSocketId(socket->id());
// Parsing http is costly, parsing an incomplete http message from the
// beginning repeatedly should be avoided, otherwise the cost may reach
// O(n^2) in the worst case. Save incomplete http messages in sockets
Expand Down
12 changes: 11 additions & 1 deletion src/brpc/policy/http_rpc_protocol.h
Original file line number Diff line number Diff line change
Expand Up @@ -87,11 +87,20 @@ class HttpContext : public ReadableProgressiveAttachment
, public InputMessageBase
, public HttpMessage {
public:
SocketId GetSocketId() override {
return _socket_id;
}

void SetSocketId(SocketId id) {
_socket_id = id;
}

explicit HttpContext(bool read_body_progressively,
HttpMethod request_method = HTTP_METHOD_GET)
: InputMessageBase()
, HttpMessage(read_body_progressively, request_method)
, _is_stage2(false) {
, _is_stage2(false)
, _socket_id(0) {
// add one ref for Destroy
butil::intrusive_ptr<HttpContext>(this).detach();
}
Expand Down Expand Up @@ -122,6 +131,7 @@ class HttpContext : public ReadableProgressiveAttachment

private:
bool _is_stage2;
SocketId _socket_id;
};

// Implement functions required in protocol.h
Expand Down
2 changes: 2 additions & 0 deletions src/brpc/progressive_reader.h
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@
#define BRPC_PROGRESSIVE_READER_H

#include "brpc/shared_object.h"
#include "brpc/socket.h"


namespace brpc {
Expand Down Expand Up @@ -84,6 +85,7 @@ class ReadableProgressiveAttachment : public SharedObject {
// Any error occurred should destroy the reader by calling r->Destroy().
// r->Destroy() should be guaranteed to be called once and only once.
virtual void ReadProgressiveAttachmentBy(ProgressiveReader* r) = 0;
virtual SocketId GetSocketId() = 0;
};

} // namespace brpc
Expand Down
Loading