| diff --git a/src/brpc/controller.cpp b/src/brpc/controller.cpp |
| index b6c8e750..c52e9451 100644 |
| --- a/src/brpc/controller.cpp |
| +++ b/src/brpc/controller.cpp |
| @@ -1169,6 +1169,7 @@ void Controller::IssueRPC(int64_t start_realtime_us) { |
| wopt.pipelined_count = _pipelined_count; |
| wopt.auth_flags = _auth_flags; |
| wopt.ignore_eovercrowded = has_flag(FLAGS_IGNORE_EOVERCROWDED); |
| + wopt.write_in_background = write_to_socket_in_background(); |
| int rc; |
| size_t packet_size = 0; |
| if (user_packet_guard) { |
| diff --git a/src/brpc/controller.h b/src/brpc/controller.h |
| index 658cc695..9221f583 100644 |
| --- a/src/brpc/controller.h |
| +++ b/src/brpc/controller.h |
| @@ -144,6 +144,7 @@ friend void policy::ProcessThriftRequest(InputMessageBase*); |
| static const uint32_t FLAGS_HEALTH_CHECK_CALL = (1 << 19); |
| static const uint32_t FLAGS_PB_SINGLE_REPEATED_TO_ARRAY = (1 << 20); |
| static const uint32_t FLAGS_MANAGE_HTTP_BODY_ON_ERROR = (1 << 21); |
| + static const uint32_t FLAGS_WRITE_TO_SOCKET_IN_BACKGROUND = (1 << 22); |
| |
| public: |
| struct Inheritable { |
| @@ -350,6 +351,17 @@ public: |
| bool is_done_allowed_to_run_in_place() const |
| { return has_flag(FLAGS_ALLOW_DONE_TO_RUN_IN_PLACE); } |
| |
| + // Create a background KEEPWRITE bthread to write to socket when issuing |
| + // RPCs, instead of trying to write to socket once in calling thread (see |
| + // `Socket::StartWrite` in socket.cpp). |
| + // The socket write could take some time (several microseconds maybe), if |
| + // you cares about it and don't want the calling thread to be blocked, you |
| + // can set this flag. |
| + // Should provides better batch effect in situations like when you are |
| + // continually issuing lots of async RPC calls in only one thread. |
| + void set_write_to_socket_in_background(bool f) { set_flag(FLAGS_WRITE_TO_SOCKET_IN_BACKGROUND, f); } |
| + bool write_to_socket_in_background() const { return has_flag(FLAGS_WRITE_TO_SOCKET_IN_BACKGROUND); } |
| + |
| // ------------------------------------------------------------------------ |
| // Server-side methods. |
| // These calls shall be made from the server side only. Their results are |
| diff --git a/src/brpc/socket.cpp b/src/brpc/socket.cpp |
| index e3878c19..27748434 100644 |
| --- a/src/brpc/socket.cpp |
| +++ b/src/brpc/socket.cpp |
| @@ -1620,7 +1620,7 @@ int Socket::StartWrite(WriteRequest* req, const WriteOptions& opt) { |
| // in some protocols(namely RTMP). |
| req->Setup(this); |
| |
| - if (ssl_state() != SSL_OFF) { |
| + if (opt.write_in_background || ssl_state() != SSL_OFF) { |
| // Writing into SSL may block the current bthread, always write |
| // in the background. |
| goto KEEPWRITE_IN_BACKGROUND; |
| diff --git a/src/brpc/socket.h b/src/brpc/socket.h |
| index 6f710ee2..28a7ada6 100644 |
| --- a/src/brpc/socket.h |
| +++ b/src/brpc/socket.h |
| @@ -269,10 +269,20 @@ public: |
| // Default: false |
| bool ignore_eovercrowded; |
| |
| + // The calling thread directly creates KeepWrite thread to write into |
| + // this socket, skipping writing once. |
| + // In situations like when you are continually issuing lots of |
| + // StreamWrite or async RPC calls in only one thread, directly creating |
| + // KeepWrite thread at first provides batch write effect and better |
| + // performance. Otherwise, each write only writes one `msg` into socket |
| + // and no KeepWrite thread can be created, which brings poor |
| + // performance. |
| + bool write_in_background; |
| + |
| WriteOptions() |
| : id_wait(INVALID_BTHREAD_ID), abstime(NULL) |
| , pipelined_count(0), auth_flags(0) |
| - , ignore_eovercrowded(false) {} |
| + , ignore_eovercrowded(false), write_in_background(false) {} |
| }; |
| int Write(butil::IOBuf *msg, const WriteOptions* options = NULL); |
| |
| diff --git a/src/brpc/stream.cpp b/src/brpc/stream.cpp |
| index d8466d2a..2d565759 100644 |
| --- a/src/brpc/stream.cpp |
| +++ b/src/brpc/stream.cpp |
| @@ -271,7 +271,8 @@ void Stream::TriggerOnConnectIfNeed() { |
| bthread_mutex_unlock(&_connect_mutex); |
| } |
| |
| -int Stream::AppendIfNotFull(const butil::IOBuf &data) { |
| +int Stream::AppendIfNotFull(const butil::IOBuf &data, |
| + const StreamWriteOptions* options) { |
| if (_cur_buf_size > 0) { |
| std::unique_lock<bthread_mutex_t> lck(_congestion_control_mutex); |
| if (_produced >= _remote_consumed + _cur_buf_size) { |
| @@ -290,7 +291,9 @@ int Stream::AppendIfNotFull(const butil::IOBuf &data) { |
| |
| size_t data_length = data.length(); |
| butil::IOBuf copied_data(data); |
| - const int rc = _fake_socket_weak_ref->Write(&copied_data); |
| + Socket::WriteOptions wopt; |
| + wopt.write_in_background = options != NULL && options->write_in_background; |
| + const int rc = _fake_socket_weak_ref->Write(&copied_data, &wopt); |
| if (rc != 0) { |
| // Stream may be closed by peer before |
| LOG(WARNING) << "Fail to write to _fake_socket, " << berror(); |
| @@ -679,13 +682,14 @@ void Stream::HandleRpcResponse(butil::IOBuf* response_buffer) { |
| policy::ProcessRpcResponse(msg); |
| } |
| |
| -int StreamWrite(StreamId stream_id, const butil::IOBuf &message) { |
| +int StreamWrite(StreamId stream_id, const butil::IOBuf &message, |
| + const StreamWriteOptions* options) { |
| SocketUniquePtr ptr; |
| if (Socket::Address(stream_id, &ptr) != 0) { |
| return EINVAL; |
| } |
| Stream* s = (Stream*)ptr->conn(); |
| - const int rc = s->AppendIfNotFull(message); |
| + const int rc = s->AppendIfNotFull(message, options); |
| if (rc == 0) { |
| return 0; |
| } |
| diff --git a/src/brpc/stream.h b/src/brpc/stream.h |
| index fbf2d51d..410a5a09 100644 |
| --- a/src/brpc/stream.h |
| +++ b/src/brpc/stream.h |
| @@ -82,6 +82,18 @@ struct StreamOptions { |
| StreamInputHandler* handler; |
| }; |
| |
| +struct StreamWriteOptions |
| +{ |
| + StreamWriteOptions() : write_in_background(false) {} |
| + |
| + // Write message to socket in background thread. |
| + // Provides batch write effect and better performance in situations when |
| + // you are continually issuing lots of StreamWrite or async RPC calls in |
| + // only one thread. Otherwise, each StreamWrite directly writes message into |
| + // socket and brings poor performance. |
| + bool write_in_background; |
| +}; |
| + |
| // [Called at the client side] |
| // Create a stream at client-side along with the |cntl|, which will be connected |
| // when receiving the response with a stream from server-side. If |options| is |
| @@ -104,7 +116,8 @@ int StreamAccept(StreamId* response_stream, Controller &cntl, |
| // - EAGAIN: |stream_id| is created with positive |max_buf_size| and buf size |
| // which the remote side hasn't consumed yet excceeds the number. |
| // - EINVAL: |stream_id| is invalied or has been closed |
| -int StreamWrite(StreamId stream_id, const butil::IOBuf &message); |
| +int StreamWrite(StreamId stream_id, const butil::IOBuf &message, |
| + const StreamWriteOptions* options = NULL); |
| |
| // Write util the pending buffer size is less than |max_buf_size| or orrur |
| // occurs |
| diff --git a/src/brpc/stream_impl.h b/src/brpc/stream_impl.h |
| index 259f0b77..f24b75a3 100644 |
| --- a/src/brpc/stream_impl.h |
| +++ b/src/brpc/stream_impl.h |
| @@ -42,7 +42,8 @@ public: |
| |
| // --------------------- SocketConnection -------------- |
| |
| - int AppendIfNotFull(const butil::IOBuf& msg); |
| + int AppendIfNotFull(const butil::IOBuf& msg, |
| + const StreamWriteOptions* options = NULL); |
| static int Create(const StreamOptions& options, |
| const StreamSettings *remote_settings, |
| StreamId *id); |