// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.
#include "butil/compat.h" // OS_MACOSX
#include <openssl/ssl.h>
#include <openssl/err.h>
#ifdef USE_MESALINK
#include <mesalink/openssl/ssl.h>
#include <mesalink/openssl/err.h>
#include <mesalink/openssl/x509.h>
#endif
#include <netinet/tcp.h> // getsockopt
#include <gflags/gflags.h>
#include "bthread/unstable.h" // bthread_timer_del
#include "butil/fd_utility.h" // make_non_blocking
#include "butil/fd_guard.h" // fd_guard
#include "butil/time.h" // cpuwide_time_us
#include "butil/object_pool.h" // get_object
#include "butil/logging.h" // CHECK
#include "butil/macros.h"
#include "butil/class_name.h" // butil::class_name
#include "brpc/log.h"
#include "brpc/reloadable_flags.h" // BRPC_VALIDATE_GFLAG
#include "brpc/errno.pb.h"
#include "brpc/event_dispatcher.h" // RemoveConsumer
#include "brpc/socket.h"
#include "brpc/describable.h" // Describable
#include "brpc/circuit_breaker.h" // CircuitBreaker
#include "brpc/input_messenger.h"
#include "brpc/details/sparse_minute_counter.h"
#include "brpc/stream_impl.h"
#include "brpc/shared_object.h"
#include "brpc/policy/rtmp_protocol.h" // FIXME
#include "brpc/periodic_task.h"
#include "brpc/details/health_check.h"
#include "brpc/rdma/rdma_endpoint.h"
#include "brpc/rdma/rdma_helper.h"
#if defined(OS_MACOSX)
#include <sys/event.h>
#endif
namespace bthread {
size_t __attribute__((weak))
get_sizes(const bthread_id_list_t* list, size_t* cnt, size_t n);
}
namespace brpc {
// NOTE: This flag was true by default before r31206. Establishing a
// connection is not an important event now; if in doubt, we can check
// the connection in /connections.
DEFINE_bool(log_connected, false, "Print log when a connection is established");
BRPC_VALIDATE_GFLAG(log_connected, PassValidate);
DEFINE_bool(log_idle_connection_close, false,
"Print log when an idle connection is closed");
BRPC_VALIDATE_GFLAG(log_idle_connection_close, PassValidate);
DEFINE_int32(socket_recv_buffer_size, -1,
"Set the recv buffer size of socket if this value is positive");
// Default value of SNDBUF is 2500000 on most machines.
DEFINE_int32(socket_send_buffer_size, -1,
"Set send buffer size of sockets if this value is positive");
DEFINE_int32(ssl_bio_buffer_size, 16*1024, "Set buffer size for SSL read/write");
DEFINE_int64(socket_max_unwritten_bytes, 64 * 1024 * 1024,
"Max unwritten bytes in each socket, if the limit is reached,"
" Socket.Write fails with EOVERCROWDED");
DEFINE_int64(socket_max_streams_unconsumed_bytes, 0,
"Max stream receivers' unconsumed bytes in one socket,"
" it used in stream for receiver buffer control.");
DEFINE_int32(max_connection_pool_size, 100,
"Max number of pooled connections to a single endpoint");
BRPC_VALIDATE_GFLAG(max_connection_pool_size, PassValidate);
DEFINE_int32(connect_timeout_as_unreachable, 3,
"If the socket failed to connect due to ETIMEDOUT for so many "
"times *continuously*, the error is changed to ENETUNREACH which "
"fails the main socket as well when this socket is pooled.");
DECLARE_int32(health_check_timeout_ms);
static bool validate_connect_timeout_as_unreachable(const char*, int32_t v) {
return v >= 2 && v < 1000/*large enough*/;
}
BRPC_VALIDATE_GFLAG(connect_timeout_as_unreachable,
validate_connect_timeout_as_unreachable);
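// Bound on how long KeepWrite waits for EPOLLOUT before re-checking
// pending WriteRequests (see the NOTE inside KeepWrite below).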
const int WAIT_EPOLLOUT_TIMEOUT_MS = 50;
class BAIDU_CACHELINE_ALIGNMENT SocketPool {
friend class Socket;
public:
explicit SocketPool(const SocketOptions& opt);
~SocketPool();
// Get an addressable socket. If the pool is empty, create one.
// Returns 0 on success.
int GetSocket(SocketUniquePtr* ptr);
// Return a socket (obtained from GetSocket) back to the pool; if the
// pool is already full, SetFailed() the socket instead.
void ReturnSocket(Socket* sock);
// Get all pooled sockets inside.
void ListSockets(std::vector<SocketId>* list, size_t max_count);
private:
// options used to create this instance
SocketOptions _options;
butil::Mutex _mutex;
std::vector<SocketId> _pool;
butil::EndPoint _remote_side;
butil::atomic<int> _numfree; // #free sockets in all sub pools.
butil::atomic<int> _numinflight; // #inflight sockets in all sub pools.
};
// NOTE: sizeof(ExtendedSocketStat) is about 1200 bytes. With 10K sockets
// the total memory is ~12MB: not lightweight, but acceptable.
struct ExtendedSocketStat : public SocketStat {
// For computing stat.
size_t last_in_size;
size_t last_in_num_messages;
size_t last_out_size;
size_t last_out_num_messages;
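// Naming convention (inferred from UpdateStatsEverySecond below):
// *_s fields hold the delta of the last second; *_m fields (declared
// in SocketStat) hold rolling sums over the last minute, maintained
// via _minute_counter.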
struct Sampled {
uint32_t in_size_s;
uint32_t in_num_messages_s;
uint32_t out_size_s;
uint32_t out_num_messages_s;
};
SparseMinuteCounter<Sampled> _minute_counter;
ExtendedSocketStat()
: last_in_size(0)
, last_in_num_messages(0)
, last_out_size(0)
, last_out_num_messages(0) {
memset((SocketStat*)this, 0, sizeof(SocketStat));
}
};
// Shared by main socket and derivative sockets.
class Socket::SharedPart : public SharedObject {
public:
// A pool of sockets on which only one request can be in flight at a
// time, corresponding to CONNECTION_TYPE_POOLED. When an RPC begins,
// it picks one socket from this pool to send the request; when the RPC
// ends, it returns the socket back to this pool.
// Before r32136, the pool was managed globally in socket_map.cpp, which
// had the disadvantage that accesses to different pools contended with
// each other.
butil::atomic<SocketPool*> socket_pool;
// The socket that created (new-ed) this object.
SocketId creator_socket_id;
// Counting number of continuous ETIMEDOUT
butil::atomic<int> num_continuous_connect_timeouts;
// _in_size, _in_num_messages, _out_size, _out_num_messages of pooled
// sockets are counted into the corresponding fields in their _main_socket.
butil::atomic<size_t> in_size;
butil::atomic<size_t> in_num_messages;
butil::atomic<size_t> out_size;
butil::atomic<size_t> out_num_messages;
// For computing stats.
ExtendedSocketStat* extended_stat;
CircuitBreaker circuit_breaker;
butil::atomic<uint64_t> recent_error_count;
explicit SharedPart(SocketId creator_socket_id);
~SharedPart();
// Call this method every second (roughly)
void UpdateStatsEverySecond(int64_t now_ms);
};
Socket::SharedPart::SharedPart(SocketId creator_socket_id2)
: socket_pool(NULL)
, creator_socket_id(creator_socket_id2)
, num_continuous_connect_timeouts(0)
, in_size(0)
, in_num_messages(0)
, out_size(0)
, out_num_messages(0)
, extended_stat(NULL)
, recent_error_count(0) {
}
Socket::SharedPart::~SharedPart() {
delete extended_stat;
extended_stat = NULL;
delete socket_pool.exchange(NULL, butil::memory_order_relaxed);
}
void Socket::SharedPart::UpdateStatsEverySecond(int64_t now_ms) {
ExtendedSocketStat* stat = extended_stat;
if (stat == NULL) {
stat = new (std::nothrow) ExtendedSocketStat;
if (stat == NULL) {
return;
}
extended_stat = stat;
}
// Save volatile counters.
const size_t in_sz = in_size.load(butil::memory_order_relaxed);
const size_t in_nmsg = in_num_messages.load(butil::memory_order_relaxed);
const size_t out_sz = out_size.load(butil::memory_order_relaxed);
const size_t out_nmsg = out_num_messages.load(butil::memory_order_relaxed);
// Notice that we don't normalize any data, mainly because normalization
// often makes data inaccurate and confuses users. This assumes that this
// function is called exactly every second. This may not be true when the
// running machine gets very busy. TODO(gejun): Figure out a method to
// selectively normalize data when the calling interval is far from 1 second.
stat->in_size_s = in_sz - stat->last_in_size;
stat->in_num_messages_s = in_nmsg - stat->last_in_num_messages;
stat->out_size_s = out_sz - stat->last_out_size;
stat->out_num_messages_s = out_nmsg - stat->last_out_num_messages;
stat->last_in_size = in_sz;
stat->last_in_num_messages = in_nmsg;
stat->last_out_size = out_sz;
stat->last_out_num_messages = out_nmsg;
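// Push this second's non-zero sample into the per-minute window; when
// Add() hands back a sample falling out of the minute, subtract it from
// the *_m sums. TryPop() below drains samples older than one minute
// when no new sample arrives.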
ExtendedSocketStat::Sampled popped;
if (stat->in_size_s |/*bitwise or*/
stat->in_num_messages_s |
stat->out_size_s |
stat->out_num_messages_s) {
ExtendedSocketStat::Sampled s = {
stat->in_size_s, stat->in_num_messages_s,
stat->out_size_s, stat->out_num_messages_s
};
stat->in_size_m += s.in_size_s;
stat->in_num_messages_m += s.in_num_messages_s;
stat->out_size_m += s.out_size_s;
stat->out_num_messages_m += s.out_num_messages_s;
if (stat->_minute_counter.Add(now_ms, s, &popped)) {
stat->in_size_m -= popped.in_size_s;
stat->in_num_messages_m -= popped.in_num_messages_s;
stat->out_size_m -= popped.out_size_s;
stat->out_num_messages_m -= popped.out_num_messages_s;
}
}
while (stat->_minute_counter.TryPop(now_ms, &popped)) {
stat->in_size_m -= popped.in_size_s;
stat->in_num_messages_m -= popped.in_num_messages_s;
stat->out_size_m -= popped.out_size_s;
stat->out_num_messages_m -= popped.out_num_messages_s;
}
}
SocketVarsCollector* g_vars = NULL;
static pthread_once_t s_create_vars_once = PTHREAD_ONCE_INIT;
static void CreateVars() {
g_vars = new SocketVarsCollector;
}
void Socket::CreateVarsOnce() {
CHECK_EQ(0, pthread_once(&s_create_vars_once, CreateVars));
}
// Used by ConnectionService
int64_t GetChannelConnectionCount() {
if (g_vars) {
return g_vars->channel_conn.get_value();
}
return 0;
}
bool Socket::CreatedByConnect() const {
return _user == static_cast<SocketUser*>(get_client_side_messenger());
}
SocketMessage* const DUMMY_USER_MESSAGE = (SocketMessage*)0x1;
const uint32_t MAX_PIPELINED_COUNT = 16384;
struct BAIDU_CACHELINE_ALIGNMENT Socket::WriteRequest {
static WriteRequest* const UNCONNECTED;
butil::IOBuf data;
WriteRequest* next;
bthread_id_t id_wait;
Socket* socket;
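// Bit layout of _pc_and_udmsg, inferred from the masks below:
//   bits 63-62: auth flags (2 bits)
//   bits 61-48: pipelined count (14 bits)
//   bits 47-0 : the SocketMessage* (48 bits are enough for user-space
//               pointers on mainstream 64-bit platforms)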
uint32_t pipelined_count() const {
return (_pc_and_udmsg >> 48) & 0x3FFF;
}
uint32_t get_auth_flags() const {
return (_pc_and_udmsg >> 62) & 0x03;
}
void clear_pipelined_count_and_auth_flags() {
_pc_and_udmsg &= 0xFFFFFFFFFFFFULL;
}
SocketMessage* user_message() const {
return (SocketMessage*)(_pc_and_udmsg & 0xFFFFFFFFFFFFULL);
}
void clear_user_message() {
_pc_and_udmsg &= 0xFFFF000000000000ULL;
}
void set_pipelined_count_and_user_message(
uint32_t pc, SocketMessage* msg, uint32_t auth_flags) {
if (auth_flags) {
pc |= (auth_flags & 0x03) << 14;
}
_pc_and_udmsg = ((uint64_t)pc << 48) | (uint64_t)(uintptr_t)msg;
}
bool reset_pipelined_count_and_user_message() {
SocketMessage* msg = user_message();
if (msg) {
if (msg != DUMMY_USER_MESSAGE) {
butil::IOBuf dummy_buf;
// We don't care about the return value since the request
// is already failed.
(void)msg->AppendAndDestroySelf(&dummy_buf, NULL);
}
set_pipelined_count_and_user_message(0, NULL, 0);
return true;
}
return false;
}
// Register pipelined_count and user_message
void Setup(Socket* s);
private:
uint64_t _pc_and_udmsg;
};
void Socket::WriteRequest::Setup(Socket* s) {
SocketMessage* msg = user_message();
if (msg) {
clear_user_message();
if (msg != DUMMY_USER_MESSAGE) {
butil::Status st = msg->AppendAndDestroySelf(&data, s);
if (!st.ok()) {
// Abandon the request.
data.clear();
bthread_id_error2(id_wait, st.error_code(), st.error_cstr());
return;
}
}
const int64_t before_write =
s->_unwritten_bytes.fetch_add(data.size(), butil::memory_order_relaxed);
if (before_write + (int64_t)data.size() >= FLAGS_socket_max_unwritten_bytes) {
s->_overcrowded = true;
}
}
const uint32_t pc = pipelined_count();
if (pc) {
// For positional correspondence between responses and requests,
// which is common in cache servers: memcache, redis...
// The struct will be popped when reading a message from the socket.
PipelinedInfo pi;
pi.count = pc;
pi.auth_flags = get_auth_flags();
pi.id_wait = id_wait;
clear_pipelined_count_and_auth_flags(); // avoid being pushed again
s->PushPipelinedInfo(pi);
}
}
Socket::WriteRequest* const Socket::WriteRequest::UNCONNECTED =
(Socket::WriteRequest*)(intptr_t)-1;
class Socket::EpollOutRequest : public SocketUser {
public:
EpollOutRequest() : fd(-1), timer_id(0)
, on_epollout_event(NULL), data(NULL) {}
~EpollOutRequest() {
// Remove the timer last, inside the destructor, to avoid racing
// with the code that registers the timer
if (timer_id) {
bthread_timer_del(timer_id);
timer_id = 0;
}
}
void BeforeRecycle(Socket*) {
// Recycle itself
delete this;
}
int fd;
bthread_timer_t timer_id;
int (*on_epollout_event)(int fd, int err, void* data);
void* data;
};
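// Bit layout of _auth_flag_error (see FightAuthentication below):
// bit 32 marks authentication as finished, the low 32 bits hold the
// resulting error code.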
static const uint64_t AUTH_FLAG = (1ul << 32);
Socket::Socket(Forbidden)
// must be even because Address() relies on evenness of version
: _versioned_ref(0)
, _shared_part(NULL)
, _nevent(0)
, _keytable_pool(NULL)
, _fd(-1)
, _tos(0)
, _reset_fd_real_us(-1)
, _on_edge_triggered_events(NULL)
, _user(NULL)
, _conn(NULL)
, _this_id(0)
, _preferred_index(-1)
, _hc_count(0)
, _last_msg_size(0)
, _avg_msg_size(0)
, _last_readtime_us(0)
, _parsing_context(NULL)
, _correlation_id(0)
, _health_check_interval_s(-1)
, _is_hc_related_ref_held(false)
, _hc_started(false)
, _ninprocess(1)
, _auth_flag_error(0)
, _auth_id(INVALID_BTHREAD_ID)
, _auth_context(NULL)
, _ssl_state(SSL_UNKNOWN)
, _ssl_session(NULL)
, _rdma_ep(NULL)
, _rdma_state(RDMA_OFF)
, _connection_type_for_progressive_read(CONNECTION_TYPE_UNKNOWN)
, _controller_released_socket(false)
, _overcrowded(false)
, _fail_me_at_server_stop(false)
, _logoff_flag(false)
, _additional_ref_status(REF_USING)
, _error_code(0)
, _pipeline_q(NULL)
, _last_writetime_us(0)
, _unwritten_bytes(0)
, _epollout_butex(NULL)
, _write_head(NULL)
, _stream_set(NULL)
, _total_streams_unconsumed_size(0)
, _ninflight_app_health_check(0)
{
CreateVarsOnce();
pthread_mutex_init(&_id_wait_list_mutex, NULL);
_epollout_butex = bthread::butex_create_checked<butil::atomic<int> >();
}
Socket::~Socket() {
pthread_mutex_destroy(&_id_wait_list_mutex);
bthread::butex_destroy(_epollout_butex);
}
void Socket::ReturnSuccessfulWriteRequest(Socket::WriteRequest* p) {
DCHECK(p->data.empty());
AddOutputMessages(1);
const bthread_id_t id_wait = p->id_wait;
butil::return_object(p);
if (id_wait != INVALID_BTHREAD_ID) {
NotifyOnFailed(id_wait);
}
}
void Socket::ReturnFailedWriteRequest(Socket::WriteRequest* p, int error_code,
const std::string& error_text) {
if (!p->reset_pipelined_count_and_user_message()) {
CancelUnwrittenBytes(p->data.size());
}
p->data.clear(); // data is probably not written.
const bthread_id_t id_wait = p->id_wait;
butil::return_object(p);
if (id_wait != INVALID_BTHREAD_ID) {
bthread_id_error2(id_wait, error_code, error_text);
}
}
Socket::WriteRequest* Socket::ReleaseWriteRequestsExceptLast(
Socket::WriteRequest* req, int error_code, const std::string& error_text) {
WriteRequest* p = req;
while (p->next != NULL) {
WriteRequest* const saved_next = p->next;
ReturnFailedWriteRequest(p, error_code, error_text);
p = saved_next;
}
return p;
}
void Socket::ReleaseAllFailedWriteRequests(Socket::WriteRequest* req) {
CHECK(Failed());
pthread_mutex_lock(&_id_wait_list_mutex);
const int error_code = non_zero_error_code();
const std::string error_text = _error_text;
pthread_mutex_unlock(&_id_wait_list_mutex);
// Notice that `req' is not tail if Address after IsWriteComplete fails.
do {
req = ReleaseWriteRequestsExceptLast(req, error_code, error_text);
if (!req->reset_pipelined_count_and_user_message()) {
CancelUnwrittenBytes(req->data.size());
}
req->data.clear(); // MUST, otherwise IsWriteComplete is false
} while (!IsWriteComplete(req, true, NULL));
ReturnFailedWriteRequest(req, error_code, error_text);
}
int Socket::ResetFileDescriptor(int fd) {
// Reset message sizes when fd is changed.
_last_msg_size = 0;
_avg_msg_size = 0;
// MUST store `_fd' before adding itself into epoll device to avoid
// race conditions with the callback function inside epoll
_fd.store(fd, butil::memory_order_release);
_reset_fd_real_us = butil::gettimeofday_us();
if (!ValidFileDescriptor(fd)) {
return 0;
}
// OK to fail, non-socket fd does not support this.
if (butil::get_local_side(fd, &_local_side) != 0) {
_local_side = butil::EndPoint();
}
// FIXME: close-on-exec should be set by the fd-creating syscalls
// themselves (e.g. SOCK_CLOEXEC) or, failing that, right after the
// fd-creation syscall. Setting it here has a higher probability of racing.
butil::make_close_on_exec(fd);
// Make the fd non-blocking.
if (butil::make_non_blocking(fd) != 0) {
PLOG(ERROR) << "Fail to set fd=" << fd << " to non-blocking";
return -1;
}
// Turn off Nagle's algorithm.
// OK to fail, e.g. unix domain sockets do not support this.
butil::make_no_delay(fd);
if (_tos > 0 &&
setsockopt(fd, IPPROTO_IP, IP_TOS, &_tos, sizeof(_tos)) < 0) {
PLOG(FATAL) << "Fail to set tos of fd=" << fd << " to " << _tos;
}
if (FLAGS_socket_send_buffer_size > 0) {
int buff_size = FLAGS_socket_send_buffer_size;
socklen_t size = sizeof(buff_size);
if (setsockopt(fd, SOL_SOCKET, SO_SNDBUF, &buff_size, size) != 0) {
PLOG(FATAL) << "Fail to set sndbuf of fd=" << fd << " to "
<< buff_size;
}
}
if (FLAGS_socket_recv_buffer_size > 0) {
int buff_size = FLAGS_socket_recv_buffer_size;
socklen_t size = sizeof(buff_size);
if (setsockopt(fd, SOL_SOCKET, SO_RCVBUF, &buff_size, size) != 0) {
PLOG(FATAL) << "Fail to set rcvbuf of fd=" << fd << " to "
<< buff_size;
}
}
if (_on_edge_triggered_events) {
if (GetGlobalEventDispatcher(fd).AddConsumer(id(), fd) != 0) {
PLOG(ERROR) << "Fail to add SocketId=" << id()
<< " into EventDispatcher";
_fd.store(-1, butil::memory_order_release);
return -1;
}
}
return 0;
}
// SocketId = 32-bit version + 32-bit slot.
// version: from the version part of _versioned_ref, must be an EVEN number.
// slot: designated by ResourcePool.
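// i.e. a SocketId is roughly ((uint64_t)version << 32) | slot
// (see MakeSocketId).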
int Socket::Create(const SocketOptions& options, SocketId* id) {
butil::ResourceId<Socket> slot;
Socket* const m = butil::get_resource(&slot, Forbidden());
if (m == NULL) {
LOG(FATAL) << "Fail to get_resource<Socket>";
return -1;
}
g_vars->nsocket << 1;
CHECK(NULL == m->_shared_part.load(butil::memory_order_relaxed));
m->_nevent.store(0, butil::memory_order_relaxed);
m->_keytable_pool = options.keytable_pool;
m->_tos = 0;
m->_remote_side = options.remote_side;
m->_on_edge_triggered_events = options.on_edge_triggered_events;
m->_user = options.user;
m->_conn = options.conn;
m->_app_connect = options.app_connect;
// nref can be non-zero due to concurrent AddressSocket().
// _this_id will only be used in destructor/Destroy of referenced
// slots, which is safe and properly fenced. Although it's better
// to put the id into SocketUniquePtr.
m->_this_id = MakeSocketId(
VersionOfVRef(m->_versioned_ref.fetch_add(
1, butil::memory_order_release)), slot);
m->_preferred_index = -1;
m->_hc_count = 0;
CHECK(m->_read_buf.empty());
const int64_t cpuwide_now = butil::cpuwide_time_us();
m->_last_readtime_us.store(cpuwide_now, butil::memory_order_relaxed);
m->reset_parsing_context(options.initial_parsing_context);
m->_correlation_id = 0;
m->_health_check_interval_s = options.health_check_interval_s;
m->_is_hc_related_ref_held = false;
m->_hc_started.store(false, butil::memory_order_relaxed);
m->_ninprocess.store(1, butil::memory_order_relaxed);
m->_auth_flag_error.store(0, butil::memory_order_relaxed);
const int rc2 = bthread_id_create(&m->_auth_id, NULL, NULL);
if (rc2) {
LOG(ERROR) << "Fail to create auth_id: " << berror(rc2);
m->SetFailed(rc2, "Fail to create auth_id: %s", berror(rc2));
return -1;
}
// Disable SSL check if there is no SSL context
m->_ssl_state = (options.initial_ssl_ctx == NULL ? SSL_OFF : SSL_UNKNOWN);
m->_ssl_session = NULL;
m->_ssl_ctx = options.initial_ssl_ctx;
#if BRPC_WITH_RDMA
CHECK(m->_rdma_ep == NULL);
if (options.use_rdma) {
m->_rdma_ep = new (std::nothrow)rdma::RdmaEndpoint(m);
if (!m->_rdma_ep) {
const int saved_errno = errno;
PLOG(ERROR) << "Fail to create RdmaEndpoint";
m->SetFailed(saved_errno, "Fail to create RdmaEndpoint: %s",
berror(saved_errno));
return -1;
}
m->_rdma_state = RDMA_UNKNOWN;
} else {
m->_rdma_state = RDMA_OFF;
}
#endif
m->_connection_type_for_progressive_read = CONNECTION_TYPE_UNKNOWN;
m->_controller_released_socket.store(false, butil::memory_order_relaxed);
m->_overcrowded = false;
// May be non-zero for RTMP connections.
m->_fail_me_at_server_stop = false;
m->_logoff_flag.store(false, butil::memory_order_relaxed);
m->_additional_ref_status.store(REF_USING, butil::memory_order_relaxed);
m->_error_code = 0;
m->_error_text.clear();
m->_agent_socket_id.store(INVALID_SOCKET_ID, butil::memory_order_relaxed);
m->_total_streams_unconsumed_size.store(0, butil::memory_order_relaxed);
m->_ninflight_app_health_check.store(0, butil::memory_order_relaxed);
// NOTE: last two params are useless in bthread > r32787
const int rc = bthread_id_list_init(&m->_id_wait_list, 512, 512);
if (rc) {
LOG(ERROR) << "Fail to init _id_wait_list: " << berror(rc);
m->SetFailed(rc, "Fail to init _id_wait_list: %s", berror(rc));
return -1;
}
m->_last_writetime_us.store(cpuwide_now, butil::memory_order_relaxed);
m->_unwritten_bytes.store(0, butil::memory_order_relaxed);
CHECK(NULL == m->_write_head.load(butil::memory_order_relaxed));
// Must be the last step! Internal fields of this Socket may be accessed
// right after ResetFileDescriptor is called.
if (m->ResetFileDescriptor(options.fd) != 0) {
const int saved_errno = errno;
PLOG(ERROR) << "Fail to ResetFileDescriptor";
m->SetFailed(saved_errno, "Fail to ResetFileDescriptor: %s",
berror(saved_errno));
return -1;
}
*id = m->_this_id;
return 0;
}
int Socket::WaitAndReset(int32_t expected_nref) {
const uint32_t id_ver = VersionOfSocketId(_this_id);
uint64_t vref;
// Wait until nref == expected_nref.
while (1) {
// The acquire fence pairs with release fence in Dereference to avoid
// inconsistent states to be seen by others.
vref = _versioned_ref.load(butil::memory_order_acquire);
if (VersionOfVRef(vref) != id_ver + 1) {
LOG(WARNING) << "SocketId=" << _this_id << " is already alive or recycled";
return -1;
}
if (NRefOfVRef(vref) > expected_nref) {
if (bthread_usleep(1000L/*FIXME*/) < 0) {
PLOG_IF(FATAL, errno != ESTOP) << "Fail to sleep";
return -1;
}
} else if (NRefOfVRef(vref) < expected_nref) {
RPC_VLOG << "SocketId=" << _this_id
<< " was abandoned during health checking";
return -1;
} else {
// nobody holds a health-checking-related reference,
// so no need to do health checking.
if (!_is_hc_related_ref_held) {
RPC_VLOG << "Nobody holds a health-checking-related reference"
<< " for SocketId=" << _this_id;
return -1;
}
break;
}
}
// It's safe to close previous fd (provided expected_nref is correct).
const int prev_fd = _fd.exchange(-1, butil::memory_order_relaxed);
if (ValidFileDescriptor(prev_fd)) {
if (_on_edge_triggered_events != NULL) {
GetGlobalEventDispatcher(prev_fd).RemoveConsumer(prev_fd);
}
close(prev_fd);
if (CreatedByConnect()) {
g_vars->channel_conn << -1;
}
}
#if BRPC_WITH_RDMA
if (_rdma_ep) {
_rdma_ep->Reset();
_rdma_state = RDMA_UNKNOWN;
}
#endif
_local_side = butil::EndPoint();
if (_ssl_session) {
SSL_free(_ssl_session);
_ssl_session = NULL;
}
_ssl_state = SSL_UNKNOWN;
_nevent.store(0, butil::memory_order_relaxed);
// parsing_context is very likely to be associated with the fd,
// removing it is a safer choice and required by http2.
reset_parsing_context(NULL);
// Must clear _read_buf, otherwise even if the connection is recovered,
// the kept old data is likely to make parsing fail.
_read_buf.clear();
_ninprocess.store(1, butil::memory_order_relaxed);
_auth_flag_error.store(0, butil::memory_order_relaxed);
bthread_id_error(_auth_id, 0);
const int rc = bthread_id_create(&_auth_id, NULL, NULL);
if (rc != 0) {
LOG(FATAL) << "Fail to create _auth_id, " << berror(rc);
return -1;
}
const int64_t cpuwide_now = butil::cpuwide_time_us();
_last_readtime_us.store(cpuwide_now, butil::memory_order_relaxed);
_last_writetime_us.store(cpuwide_now, butil::memory_order_relaxed);
_logoff_flag.store(false, butil::memory_order_relaxed);
{
BAIDU_SCOPED_LOCK(_pipeline_mutex);
if (_pipeline_q) {
_pipeline_q->clear();
}
}
SharedPart* sp = GetSharedPart();
if (sp) {
sp->circuit_breaker.Reset();
sp->recent_error_count.store(0, butil::memory_order_relaxed);
}
return 0;
}
// We don't care about the return value of Revive.
void Socket::Revive() {
const uint32_t id_ver = VersionOfSocketId(_this_id);
uint64_t vref = _versioned_ref.load(butil::memory_order_relaxed);
_additional_ref_status.store(REF_REVIVING, butil::memory_order_relaxed);
while (1) {
CHECK_EQ(id_ver + 1, VersionOfVRef(vref));
int32_t nref = NRefOfVRef(vref);
if (nref <= 1) {
// Set status to REF_RECYCLED since no one uses this socket
_additional_ref_status.store(REF_RECYCLED, butil::memory_order_relaxed);
CHECK_EQ(1, nref);
LOG(WARNING) << *this << " was abandoned during revival";
return;
}
// +1 is the additional ref added in Create(). TODO(gejun): we should
// remove this additional nref someday.
if (_versioned_ref.compare_exchange_weak(
vref, MakeVRef(id_ver, nref + 1/*note*/),
butil::memory_order_release,
butil::memory_order_relaxed)) {
// Set status to REF_USING since we add additional ref again
_additional_ref_status.store(REF_USING, butil::memory_order_relaxed);
if (_user) {
_user->AfterRevived(this);
} else {
LOG(INFO) << "Revived " << *this << " (Connectable)";
}
return;
}
}
}
int Socket::ReleaseAdditionalReference() {
do {
AdditionalRefStatus expect = REF_USING;
if (_additional_ref_status.compare_exchange_strong(
expect,
REF_RECYCLED,
butil::memory_order_relaxed,
butil::memory_order_relaxed)) {
return Dereference();
}
if (expect == REF_REVIVING) { // sched_yield to wait until status is not REF_REVIVING
sched_yield();
} else {
return -1; // REF_RECYCLED
}
} while (1);
}
void Socket::AddRecentError() {
SharedPart* sp = GetSharedPart();
if (sp) {
sp->recent_error_count.fetch_add(1, butil::memory_order_relaxed);
}
}
int64_t Socket::recent_error_count() const {
SharedPart* sp = GetSharedPart();
if (sp) {
return sp->recent_error_count.load(butil::memory_order_relaxed);
}
return 0;
}
int Socket::isolated_times() const {
SharedPart* sp = GetSharedPart();
if (sp) {
return sp->circuit_breaker.isolated_times();
}
return 0;
}
int Socket::SetFailed(int error_code, const char* error_fmt, ...) {
if (error_code == 0) {
CHECK(false) << "error_code is 0";
error_code = EFAILEDSOCKET;
}
const uint32_t id_ver = VersionOfSocketId(_this_id);
uint64_t vref = _versioned_ref.load(butil::memory_order_relaxed);
for (;;) { // need iteration to retry compare_exchange_strong
if (VersionOfVRef(vref) != id_ver) {
return -1;
}
// Try to set version=id_ver+1 (to make later Address() return NULL),
// retry on fail.
if (_versioned_ref.compare_exchange_strong(
vref, MakeVRef(id_ver + 1, NRefOfVRef(vref)),
butil::memory_order_release,
butil::memory_order_relaxed)) {
// Update _error_text
std::string error_text;
if (error_fmt != NULL) {
va_list ap;
va_start(ap, error_fmt);
butil::string_vprintf(&error_text, error_fmt, ap);
va_end(ap);
}
pthread_mutex_lock(&_id_wait_list_mutex);
_error_code = error_code;
_error_text = error_text;
pthread_mutex_unlock(&_id_wait_list_mutex);
// Do health-checking even if we were never connected before; this is
// needed by Channel to revive a never-connected socket when the
// server side comes online.
if (_health_check_interval_s > 0) {
bool expect = false;
if (_hc_started.compare_exchange_strong(expect,
true,
butil::memory_order_relaxed,
butil::memory_order_relaxed)) {
GetOrNewSharedPart()->circuit_breaker.MarkAsBroken();
StartHealthCheck(id(),
GetOrNewSharedPart()->circuit_breaker.isolation_duration_ms());
} else {
// No need to run two health checks at the same time.
RPC_VLOG << "There is already a health checking running "
"for SocketId=" << _this_id;
}
}
// Wake up all threads waiting on EPOLLOUT when closing fd
_epollout_butex->fetch_add(1, butil::memory_order_relaxed);
bthread::butex_wake_all(_epollout_butex);
// Wake up all unresponded RPC.
CHECK_EQ(0, bthread_id_list_reset2_pthreadsafe(
&_id_wait_list, error_code, error_text,
&_id_wait_list_mutex));
ResetAllStreams();
// _app_connect shouldn't be set to NULL in SetFailed, otherwise
// HC would never be supported.
// FIXME: Design a better interface for AppConnect
// if (_app_connect) {
// AppConnect* const saved_app_connect = _app_connect;
// _app_connect = NULL;
// saved_app_connect->StopConnect(this);
// }
// Release the additional reference added at creation so that this
// Socket's refcount hits 0 (recycle) when no one addresses it.
ReleaseAdditionalReference();
// NOTE: This Socket may be recycled at this point, don't
// touch anything.
return 0;
}
}
}
int Socket::SetFailed() {
return SetFailed(EFAILEDSOCKET, NULL);
}
void Socket::FeedbackCircuitBreaker(int error_code, int64_t latency_us) {
if (!GetOrNewSharedPart()->circuit_breaker.OnCallEnd(error_code, latency_us)) {
if (SetFailed(main_socket_id()) == 0) {
LOG(ERROR) << "Socket[" << *this << "] isolated by circuit breaker";
}
}
}
int Socket::ReleaseReferenceIfIdle(int idle_seconds) {
const int64_t last_active_us = last_active_time_us();
if (butil::cpuwide_time_us() - last_active_us <= idle_seconds * 1000000L) {
return 0;
}
LOG_IF(WARNING, FLAGS_log_idle_connection_close)
<< "Close " << *this << " due to no data transmission for "
<< idle_seconds << " seconds";
if (shall_fail_me_at_server_stop()) {
// sockets for streaming purposes (say RTMP) are probably referenced
// in many places; ReleaseAdditionalReference() cannot notify those
// places to release their refs, so SetFailed() is a must.
return SetFailed(EUNUSED, "No data transmission for %d seconds",
idle_seconds);
}
return ReleaseAdditionalReference();
}
int Socket::SetFailed(SocketId id) {
SocketUniquePtr ptr;
if (Address(id, &ptr) != 0) {
return -1;
}
return ptr->SetFailed();
}
void Socket::NotifyOnFailed(bthread_id_t id) {
pthread_mutex_lock(&_id_wait_list_mutex);
if (!Failed()) {
const int rc = bthread_id_list_add(&_id_wait_list, id);
pthread_mutex_unlock(&_id_wait_list_mutex);
if (rc != 0) {
bthread_id_error(id, rc);
}
} else {
const int rc = non_zero_error_code();
const std::string desc = _error_text;
pthread_mutex_unlock(&_id_wait_list_mutex);
bthread_id_error2(id, rc, desc);
}
}
// For unit-test.
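// Returns 0 if the socket is addressable, 1 if it has been SetFailed()
// but not yet recycled, -1 if the id is invalid or already recycled.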
int Socket::Status(SocketId id, int32_t* nref) {
const butil::ResourceId<Socket> slot = SlotOfSocketId(id);
Socket* const m = address_resource(slot);
if (m != NULL) {
const uint64_t vref = m->_versioned_ref.load(butil::memory_order_relaxed);
if (VersionOfVRef(vref) == VersionOfSocketId(id)) {
if (nref) {
*nref = NRefOfVRef(vref);
}
return 0;
} else if (VersionOfVRef(vref) == VersionOfSocketId(id) + 1) {
if (nref) {
*nref = NRefOfVRef(vref);
}
return 1;
}
}
return -1;
}
void Socket::OnRecycle() {
const bool create_by_connect = CreatedByConnect();
if (_app_connect) {
std::shared_ptr<AppConnect> tmp;
_app_connect.swap(tmp);
tmp->StopConnect(this);
}
if (_conn) {
SocketConnection* const saved_conn = _conn;
_conn = NULL;
saved_conn->BeforeRecycle(this);
}
if (_user) {
SocketUser* const saved_user = _user;
_user = NULL;
saved_user->BeforeRecycle(this);
}
SharedPart* sp = _shared_part.exchange(NULL, butil::memory_order_acquire);
if (sp) {
sp->RemoveRefManually();
}
const int prev_fd = _fd.exchange(-1, butil::memory_order_relaxed);
if (ValidFileDescriptor(prev_fd)) {
if (_on_edge_triggered_events != NULL) {
GetGlobalEventDispatcher(prev_fd).RemoveConsumer(prev_fd);
}
close(prev_fd);
if (create_by_connect) {
g_vars->channel_conn << -1;
}
}
#if BRPC_WITH_RDMA
if (_rdma_ep) {
delete _rdma_ep;
_rdma_ep = NULL;
_rdma_state = RDMA_UNKNOWN;
}
#endif
reset_parsing_context(NULL);
_read_buf.clear();
_auth_flag_error.store(0, butil::memory_order_relaxed);
bthread_id_error(_auth_id, 0);
bthread_id_list_destroy(&_id_wait_list);
if (_ssl_session) {
SSL_free(_ssl_session);
_ssl_session = NULL;
}
_ssl_ctx = NULL;
delete _pipeline_q;
_pipeline_q = NULL;
delete _auth_context;
_auth_context = NULL;
delete _stream_set;
_stream_set = NULL;
const SocketId asid = _agent_socket_id.load(butil::memory_order_relaxed);
if (asid != INVALID_SOCKET_ID) {
SocketUniquePtr ptr;
if (Socket::Address(asid, &ptr) == 0) {
ptr->ReleaseAdditionalReference();
}
}
g_vars->nsocket << -1;
}
void* Socket::ProcessEvent(void* arg) {
// the enclosed Socket is valid and free to access inside this function.
SocketUniquePtr s(static_cast<Socket*>(arg));
s->_on_edge_triggered_events(s.get());
return NULL;
}
// Check if there're new requests appended.
// If yes, point old_head to reversed new requests and return false;
// If no:
// old_head is fully written, set _write_head to NULL and return true;
// old_head is not written yet, keep _write_head unchanged and return false;
// `old_head' is the last new_head returned by this function or (in other
// words) the tail of the current writing list.
// `singular_node' is true iff `old_head' is the only node in its list.
bool Socket::IsWriteComplete(Socket::WriteRequest* old_head,
bool singular_node,
Socket::WriteRequest** new_tail) {
CHECK(NULL == old_head->next);
// Try to set _write_head to NULL to mark that the write is done.
WriteRequest* new_head = old_head;
WriteRequest* desired = NULL;
bool return_when_no_more = true;
if (!old_head->data.empty() || !singular_node) {
desired = old_head;
// Write is obviously not complete if old_head is not fully written.
return_when_no_more = false;
}
if (_write_head.compare_exchange_strong(
new_head, desired, butil::memory_order_acquire)) {
// No one added new requests.
if (new_tail) {
*new_tail = old_head;
}
return return_when_no_more;
}
CHECK_NE(new_head, old_head);
// The acquire fence above pairs with the release fence of the exchange
// in Write() to make sure that we see all fields of the requests set.
// Someone added new requests.
// Reverse the list until old_head.
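// e.g. new_head -> B -> A -> old_head turns into
// old_head -> A -> B -> new_head(new tail), i.e. oldest to newest,
// so requests are written in FIFO order.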
WriteRequest* tail = NULL;
WriteRequest* p = new_head;
do {
while (p->next == WriteRequest::UNCONNECTED) {
// TODO(gejun): elaborate this
sched_yield();
}
WriteRequest* const saved_next = p->next;
p->next = tail;
tail = p;
p = saved_next;
CHECK(p != NULL);
} while (p != old_head);
// Link old list with new list.
old_head->next = tail;
// Call Setup() from oldest to newest. Notice that the calling sequence
// matters for protocols using pipelined_count; this is why we don't
// call Setup in the loop above, which runs from newest to oldest.
for (WriteRequest* q = tail; q; q = q->next) {
q->Setup(this);
}
if (new_tail) {
*new_tail = new_head;
}
return false;
}
int Socket::WaitEpollOut(int fd, bool pollin, const timespec* abstime) {
if (!ValidFileDescriptor(fd)) {
return 0;
}
// No need to check addressability since this may be called by the
// health checker, which has already called `SetFailed'
const int expected_val = _epollout_butex->load(butil::memory_order_relaxed);
EventDispatcher& edisp = GetGlobalEventDispatcher(fd);
if (edisp.AddEpollOut(id(), fd, pollin) != 0) {
return -1;
}
int rc = bthread::butex_wait(_epollout_butex, expected_val, abstime);
const int saved_errno = errno;
if (rc < 0 && errno == EWOULDBLOCK) {
// Could be writable or spurious wakeup
rc = 0;
}
// Ignore return value since `fd' might have been removed
// by `RemoveConsumer' in `SetFailed'
butil::ignore_result(edisp.RemoveEpollOut(id(), fd, pollin));
errno = saved_errno;
// Could be writable or spurious wakeup (by former epollout)
return rc;
}
int Socket::Connect(const timespec* abstime,
int (*on_connect)(int, int, void*), void* data) {
if (_ssl_ctx) {
_ssl_state = SSL_CONNECTING;
} else {
_ssl_state = SSL_OFF;
}
struct sockaddr_storage serv_addr;
socklen_t addr_size = 0;
if (butil::endpoint2sockaddr(remote_side(), &serv_addr, &addr_size) != 0) {
PLOG(ERROR) << "Fail to get sockaddr";
return -1;
}
butil::fd_guard sockfd(socket(serv_addr.ss_family, SOCK_STREAM, 0));
if (sockfd < 0) {
PLOG(ERROR) << "Fail to create socket";
return -1;
}
CHECK_EQ(0, butil::make_close_on_exec(sockfd));
// We need to do async connect (to manage the timeout by ourselves).
CHECK_EQ(0, butil::make_non_blocking(sockfd));
const int rc = ::connect(
sockfd, (struct sockaddr*)&serv_addr, addr_size);
if (rc != 0 && errno != EINPROGRESS) {
PLOG(WARNING) << "Fail to connect to " << remote_side();
return -1;
}
if (on_connect) {
EpollOutRequest* req = new(std::nothrow) EpollOutRequest;
if (req == NULL) {
LOG(FATAL) << "Fail to new EpollOutRequest";
return -1;
}
req->fd = sockfd;
req->timer_id = 0;
req->on_epollout_event = on_connect;
req->data = data;
// A temporary Socket to hold `EpollOutRequest', which will
// be added into epoll device soon
SocketId connect_id;
SocketOptions options;
options.user = req;
if (Socket::Create(options, &connect_id) != 0) {
LOG(FATAL) << "Fail to create Socket";
delete req;
return -1;
}
// From now on, ownership of `req' has been transferred to
// `connect_id'. We hold an additional reference here to
// ensure `req' to be valid in this scope
SocketUniquePtr s;
CHECK_EQ(0, Socket::Address(connect_id, &s));
// Add `sockfd' into epoll so that `HandleEpollOutRequest' will
// be called with `req' when epoll event reaches
if (GetGlobalEventDispatcher(sockfd).
AddEpollOut(connect_id, sockfd, false) != 0) {
const int saved_errno = errno;
PLOG(WARNING) << "Fail to add fd=" << sockfd << " into epoll";
s->SetFailed(saved_errno, "Fail to add fd=%d into epoll: %s",
(int)sockfd, berror(saved_errno));
return -1;
}
// Register a timer for EpollOutRequest. Note that the timeout
// callback has no race with the one above as both of them try
// to `SetFailed' `connect_id' while only one of them can succeed
// It also works when `HandleEpollOutRequest' has already been
// called before adding the timer since it will be removed
// inside destructor of `EpollOutRequest' after leaving this scope
if (abstime) {
int rc = bthread_timer_add(&req->timer_id, *abstime,
HandleEpollOutTimeout,
(void*)connect_id);
if (rc) {
LOG(ERROR) << "Fail to add timer: " << berror(rc);
s->SetFailed(rc, "Fail to add timer: %s", berror(rc));
return -1;
}
}
} else {
if (WaitEpollOut(sockfd, false, abstime) != 0) {
PLOG(WARNING) << "Fail to wait EPOLLOUT of fd=" << sockfd;
return -1;
}
if (CheckConnected(sockfd) != 0) {
return -1;
}
}
return sockfd.release();
}
int Socket::CheckConnected(int sockfd) {
if (sockfd == STREAM_FAKE_FD) {
return 0;
}
int err = 0;
socklen_t errlen = sizeof(err);
if (getsockopt(sockfd, SOL_SOCKET, SO_ERROR, &err, &errlen) < 0) {
PLOG(ERROR) << "Fail to getsockopt of fd=" << sockfd;
return -1;
}
if (err != 0) {
CHECK_NE(err, EINPROGRESS);
errno = err;
return -1;
}
butil::EndPoint local_point;
CHECK_EQ(0, butil::get_local_side(sockfd, &local_point));
LOG_IF(INFO, FLAGS_log_connected)
<< "Connected to " << remote_side()
<< " via fd=" << (int)sockfd << " SocketId=" << id()
<< " local_side=" << local_point;
if (CreatedByConnect()) {
g_vars->channel_conn << 1;
}
// Doing SSL handshake after TCP connected
return SSLHandshake(sockfd, false);
}
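// Returns 0 if the fd is already valid, 1 if an asynchronous connect has
// been started (KeepWriteIfConnected will be called with `req' later),
// and -1 on failure.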
int Socket::ConnectIfNot(const timespec* abstime, WriteRequest* req) {
if (_fd.load(butil::memory_order_consume) >= 0) {
return 0;
}
// Have to hold a reference for `req'
SocketUniquePtr s;
ReAddress(&s);
req->socket = s.get();
if (_conn) {
if (_conn->Connect(this, abstime, KeepWriteIfConnected, req) < 0) {
return -1;
}
} else {
if (Connect(abstime, KeepWriteIfConnected, req) < 0) {
return -1;
}
}
s.release();
return 1;
}
void Socket::WakeAsEpollOut() {
_epollout_butex->fetch_add(1, butil::memory_order_release);
bthread::butex_wake_except(_epollout_butex, 0);
}
int Socket::HandleEpollOut(SocketId id) {
SocketUniquePtr s;
// Since Sockets might have been `SetFailed' before they were
// added into epoll, these sockets miss the signal inside
// `SetFailed' and therefore must be signalled here using
// `AddressFailedAsWell' to prevent waiting forever
if (Socket::AddressFailedAsWell(id, &s) < 0) {
// Ignore recycled sockets
return -1;
}
EpollOutRequest* req = dynamic_cast<EpollOutRequest*>(s->user());
if (req != NULL) {
return s->HandleEpollOutRequest(0, req);
}
// Currently `WaitEpollOut' needs `_epollout_butex'
// TODO(jiangrujie): Remove this in the future
s->_epollout_butex->fetch_add(1, butil::memory_order_relaxed);
bthread::butex_wake_except(s->_epollout_butex, 0);
return 0;
}
void Socket::HandleEpollOutTimeout(void* arg) {
SocketId id = (SocketId)arg;
SocketUniquePtr s;
if (Socket::Address(id, &s) != 0) {
return;
}
EpollOutRequest* req = dynamic_cast<EpollOutRequest*>(s->user());
if (req == NULL) {
LOG(FATAL) << "Impossible! SocketUser MUST be EpollOutRequest here";
return;
}
s->HandleEpollOutRequest(ETIMEDOUT, req);
}
int Socket::HandleEpollOutRequest(int error_code, EpollOutRequest* req) {
// Only one thread can `SetFailed' this `Socket' successfully
// Also, after this point `req' will be destroyed when its reference
// hits zero
if (SetFailed() != 0) {
return -1;
}
// We've got the right to call user callback
// The timer will be removed inside destructor of EpollOutRequest
GetGlobalEventDispatcher(req->fd).RemoveEpollOut(id(), req->fd, false);
return req->on_epollout_event(req->fd, error_code, req->data);
}
void Socket::AfterAppConnected(int err, void* data) {
WriteRequest* req = static_cast<WriteRequest*>(data);
if (err == 0) {
Socket* const s = req->socket;
SharedPart* sp = s->GetSharedPart();
if (sp) {
sp->num_continuous_connect_timeouts.store(0, butil::memory_order_relaxed);
}
// requests are not setup yet. check the comment on Setup() in Write()
req->Setup(s);
bthread_t th;
if (bthread_start_background(
&th, &BTHREAD_ATTR_NORMAL, KeepWrite, req) != 0) {
PLOG(WARNING) << "Fail to start KeepWrite";
KeepWrite(req);
}
} else {
SocketUniquePtr s(req->socket);
if (err == ETIMEDOUT) {
SharedPart* sp = s->GetOrNewSharedPart();
if (sp->num_continuous_connect_timeouts.fetch_add(
1, butil::memory_order_relaxed) + 1 >=
FLAGS_connect_timeout_as_unreachable) {
// the race between store and fetch_add(in another thread) is
// OK since a critical error is about to be returned.
sp->num_continuous_connect_timeouts.store(
0, butil::memory_order_relaxed);
err = ENETUNREACH;
}
}
s->SetFailed(err, "Fail to connect %s: %s",
s->description().c_str(), berror(err));
s->ReleaseAllFailedWriteRequests(req);
}
}
static void* RunClosure(void* arg) {
google::protobuf::Closure* done = (google::protobuf::Closure*)arg;
done->Run();
return NULL;
}
int Socket::KeepWriteIfConnected(int fd, int err, void* data) {
WriteRequest* req = static_cast<WriteRequest*>(data);
Socket* s = req->socket;
if (err == 0 && s->ssl_state() == SSL_CONNECTING) {
// Run ssl connect in a new bthread to avoid blocking
// the current bthread (thus blocking the EventDispatcher)
bthread_t th;
google::protobuf::Closure* thrd_func = brpc::NewCallback(
Socket::CheckConnectedAndKeepWrite, fd, err, data);
if ((err = bthread_start_background(&th, &BTHREAD_ATTR_NORMAL,
RunClosure, thrd_func)) == 0) {
return 0;
} else {
PLOG(ERROR) << "Fail to start bthread";
// Fall through with non zero `err'
}
}
CheckConnectedAndKeepWrite(fd, err, data);
return 0;
}
void Socket::CheckConnectedAndKeepWrite(int fd, int err, void* data) {
butil::fd_guard sockfd(fd);
WriteRequest* req = static_cast<WriteRequest*>(data);
Socket* s = req->socket;
CHECK_GE(sockfd, 0);
if (err == 0 && s->CheckConnected(sockfd) == 0
&& s->ResetFileDescriptor(sockfd) == 0) {
if (s->_app_connect) {
s->_app_connect->StartConnect(req->socket, AfterAppConnected, req);
} else {
// Successfully created a connection
AfterAppConnected(0, req);
}
// Release this socket for KeepWrite
sockfd.release();
} else {
if (err == 0) {
err = errno ? errno : -1;
}
AfterAppConnected(err, req);
}
}
inline int SetError(bthread_id_t id_wait, int ec) {
if (id_wait != INVALID_BTHREAD_ID) {
bthread_id_error(id_wait, ec);
return 0;
} else {
errno = ec;
return -1;
}
}
int Socket::ConductError(bthread_id_t id_wait) {
pthread_mutex_lock(&_id_wait_list_mutex);
if (Failed()) {
const int error_code = non_zero_error_code();
if (id_wait != INVALID_BTHREAD_ID) {
const std::string error_text = _error_text;
pthread_mutex_unlock(&_id_wait_list_mutex);
bthread_id_error2(id_wait, error_code, error_text);
return 0;
} else {
pthread_mutex_unlock(&_id_wait_list_mutex);
errno = error_code;
return -1;
}
} else {
pthread_mutex_unlock(&_id_wait_list_mutex);
return 1;
}
}
X509* Socket::GetPeerCertificate() const {
if (ssl_state() != SSL_CONNECTED) {
return NULL;
}
return SSL_get_peer_certificate(_ssl_session);
}
int Socket::Write(butil::IOBuf* data, const WriteOptions* options_in) {
WriteOptions opt;
if (options_in) {
opt = *options_in;
}
if (data->empty()) {
return SetError(opt.id_wait, EINVAL);
}
if (opt.pipelined_count > MAX_PIPELINED_COUNT) {
LOG(ERROR) << "pipelined_count=" << opt.pipelined_count
<< " is too large";
return SetError(opt.id_wait, EOVERFLOW);
}
if (Failed()) {
const int rc = ConductError(opt.id_wait);
if (rc <= 0) {
return rc;
}
}
if (!opt.ignore_eovercrowded && _overcrowded) {
return SetError(opt.id_wait, EOVERCROWDED);
}
WriteRequest* req = butil::get_object<WriteRequest>();
if (!req) {
return SetError(opt.id_wait, ENOMEM);
}
req->data.swap(*data);
// Set `req->next' to UNCONNECTED so that the KeepWrite thread will
// wait until it points to a valid WriteRequest or NULL.
req->next = WriteRequest::UNCONNECTED;
req->id_wait = opt.id_wait;
req->set_pipelined_count_and_user_message(
opt.pipelined_count, DUMMY_USER_MESSAGE, opt.auth_flags);
return StartWrite(req, opt);
}
int Socket::Write(SocketMessagePtr<>& msg, const WriteOptions* options_in) {
WriteOptions opt;
if (options_in) {
opt = *options_in;
}
if (opt.pipelined_count > MAX_PIPELINED_COUNT) {
LOG(ERROR) << "pipelined_count=" << opt.pipelined_count
<< " is too large";
return SetError(opt.id_wait, EOVERFLOW);
}
if (Failed()) {
const int rc = ConductError(opt.id_wait);
if (rc <= 0) {
return rc;
}
}
if (!opt.ignore_eovercrowded && _overcrowded) {
return SetError(opt.id_wait, EOVERCROWDED);
}
WriteRequest* req = butil::get_object<WriteRequest>();
if (!req) {
return SetError(opt.id_wait, ENOMEM);
}
// Set `req->next' to UNCONNECTED so that the KeepWrite thread will
// wait until it points to a valid WriteRequest or NULL.
req->next = WriteRequest::UNCONNECTED;
req->id_wait = opt.id_wait;
req->set_pipelined_count_and_user_message(opt.pipelined_count, msg.release(), opt.auth_flags);
return StartWrite(req, opt);
}
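// Roughly, `_write_head' works as an MPSC stack: Write() pushes requests
// with a release exchange, and the thread that installs the first node
// (seeing prev_head == NULL) wins the right to write, draining the list
// inline or in a background KeepWrite bthread.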
int Socket::StartWrite(WriteRequest* req, const WriteOptions& opt) {
// Release fence makes sure the thread getting request sees *req
WriteRequest* const prev_head =
_write_head.exchange(req, butil::memory_order_release);
if (prev_head != NULL) {
// Someone is writing to the fd. The KeepWrite thread may spin
// until req->next becomes non-UNCONNECTED. This process is not
// lock-free, but the duration is so short (1~2 instructions,
// depending on the compiler) that the spin rarely occurs in practice
// (I've not seen any spin in highly contended tests).
req->next = prev_head;
return 0;
}
int saved_errno = 0;
bthread_t th;
SocketUniquePtr ptr_for_keep_write;
ssize_t nw = 0;
// We've got the right to write.
req->next = NULL;
// Connect to remote_side() if not.
int ret = ConnectIfNot(opt.abstime, req);
if (ret < 0) {
saved_errno = errno;
SetFailed(errno, "Fail to connect %s directly: %m", description().c_str());
goto FAIL_TO_WRITE;
} else if (ret == 1) {
// The connection is in progress. Callback `KeepWriteIfConnected'
// will be called with `req' at some moment afterwards
return 0;
}
// NOTE: Setup() MUST be called after Connect which may call app_connect,
// which is assumed to run before any SocketMessage.AppendAndDestroySelf()
// in some protocols (namely RTMP).
req->Setup(this);
if (ssl_state() != SSL_OFF) {
// Writing into SSL may block the current bthread, always write
// in the background.
goto KEEPWRITE_IN_BACKGROUND;
}
// Write once in the calling thread. If the write is not complete,
// continue it in KeepWrite thread.
if (_conn) {
butil::IOBuf* data_arr[1] = { &req->data };
nw = _conn->CutMessageIntoFileDescriptor(fd(), data_arr, 1);
} else {
#if BRPC_WITH_RDMA
if (_rdma_ep && _rdma_state != RDMA_OFF) {
butil::IOBuf* data_arr[1] = { &req->data };
nw = _rdma_ep->CutFromIOBufList(data_arr, 1);
} else {
#else
{
#endif
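// With BRPC_WITH_RDMA the preprocessor above forms an if/else around
// this plain write; without it, just a plain scope. The closing brace
// below matches either configuration.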
nw = req->data.cut_into_file_descriptor(fd());
}
}
if (nw < 0) {
// RTMP may return EOVERCROWDED
if (errno != EAGAIN && errno != EOVERCROWDED) {
saved_errno = errno;
// EPIPE is common in pooled connections + backup requests.
PLOG_IF(WARNING, errno != EPIPE) << "Fail to write into " << *this;
SetFailed(saved_errno, "Fail to write into %s: %s",
description().c_str(), berror(saved_errno));
goto FAIL_TO_WRITE;
}
} else {
AddOutputBytes(nw);
}
if (IsWriteComplete(req, true, NULL)) {
ReturnSuccessfulWriteRequest(req);
return 0;
}
KEEPWRITE_IN_BACKGROUND:
ReAddress(&ptr_for_keep_write);
req->socket = ptr_for_keep_write.release();
if (bthread_start_background(&th, &BTHREAD_ATTR_NORMAL,
KeepWrite, req) != 0) {
LOG(FATAL) << "Fail to start KeepWrite";
KeepWrite(req);
}
return 0;
FAIL_TO_WRITE:
// `SetFailed' before `ReturnFailedWriteRequest' (which calls the
// `on_reset' callback inside the id object) so that we immediately
// know this socket has failed inside the `on_reset' callback
ReleaseAllFailedWriteRequests(req);
errno = saved_errno;
return -1;
}
static const size_t DATA_LIST_MAX = 256;
void* Socket::KeepWrite(void* void_arg) {
g_vars->nkeepwrite << 1;
WriteRequest* req = static_cast<WriteRequest*>(void_arg);
SocketUniquePtr s(req->socket);
// When an error occurs, spin until there are no more requests instead
// of returning directly, otherwise _write_head stays permanently
// non-NULL, which makes later Write() abnormal.
WriteRequest* cur_tail = NULL;
do {
// req was written, skip it.
if (req->next != NULL && req->data.empty()) {
WriteRequest* const saved_req = req;
req = req->next;
s->ReturnSuccessfulWriteRequest(saved_req);
}
const ssize_t nw = s->DoWrite(req);
if (nw < 0) {
if (errno != EAGAIN && errno != EOVERCROWDED) {
const int saved_errno = errno;
PLOG(WARNING) << "Fail to keep-write into " << *s;
s->SetFailed(saved_errno, "Fail to keep-write into %s: %s",
s->description().c_str(), berror(saved_errno));
break;
}
} else {
s->AddOutputBytes(nw);
}
// Release WriteRequest until non-empty data or last request.
while (req->next != NULL && req->data.empty()) {
WriteRequest* const saved_req = req;
req = req->next;
s->ReturnSuccessfulWriteRequest(saved_req);
}
// TODO(gejun): wait for epollout when we actually have written
// all the data. This weird heuristic reduces 30us delay...
// Update(12/22/2015): seem not working. better switch to correct code.
// Update(1/8/2016, r31823): Still working.
// Update(8/15/2017): Not working, performance downgraded.
//if (nw <= 0 || req->data.empty()/*note*/) {
if (nw <= 0) {
// NOTE: Waiting epollout within timeout is a must to force
// KeepWrite to check and setup pending WriteRequests periodically,
// which may turn on _overcrowded to stop pending requests from
// growing infinitely.
const timespec duetime =
butil::milliseconds_from_now(WAIT_EPOLLOUT_TIMEOUT_MS);
#if BRPC_WITH_RDMA
if (s->_rdma_state == RDMA_ON) {
const int expected_val = s->_epollout_butex
->load(butil::memory_order_acquire);
CHECK(s->_rdma_ep != NULL);
if (!s->_rdma_ep->IsWritable()) {
g_vars->nwaitepollout << 1;
if (bthread::butex_wait(s->_epollout_butex,
expected_val, &duetime) < 0) {
if (errno != EAGAIN && errno != ETIMEDOUT) {
const int saved_errno = errno;
PLOG(WARNING) << "Fail to wait rdma window of " << *s;
s->SetFailed(saved_errno, "Fail to wait rdma window of %s: %s",
s->description().c_str(), berror(saved_errno));
}
if (s->Failed()) {
// NOTE:
// Different from TCP, we cannot detect that the RDMA channel
// has failed by writing to it. Thus we must check whether it
// has already failed here.
break;
}
}
}
} else {
#else
{
#endif
g_vars->nwaitepollout << 1;
bool pollin = (s->_on_edge_triggered_events != NULL);
const int rc = s->WaitEpollOut(s->fd(), pollin, &duetime);
if (rc < 0 && errno != ETIMEDOUT) {
const int saved_errno = errno;
PLOG(WARNING) << "Fail to wait epollout of " << *s;
s->SetFailed(saved_errno, "Fail to wait epollout of %s: %s",
s->description().c_str(), berror(saved_errno));
break;
}
}
}
if (NULL == cur_tail) {
for (cur_tail = req; cur_tail->next != NULL;
cur_tail = cur_tail->next);
}
// Return when there's no more WriteRequests and req is completely
// written.
if (s->IsWriteComplete(cur_tail, (req == cur_tail), &cur_tail)) {
CHECK_EQ(cur_tail, req);
s->ReturnSuccessfulWriteRequest(req);
return NULL;
}
} while (1);
// Error occurred, release all requests until no new requests.
s->ReleaseAllFailedWriteRequests(req);
return NULL;
}
ssize_t Socket::DoWrite(WriteRequest* req) {
// Group butil::IOBuf in the list into a batch array.
butil::IOBuf* data_list[DATA_LIST_MAX];
size_t ndata = 0;
for (WriteRequest* p = req; p != NULL && ndata < DATA_LIST_MAX;
p = p->next) {
data_list[ndata++] = &p->data;
}
if (ssl_state() == SSL_OFF) {
// Write IOBuf in the batch array into the fd.
if (_conn) {
return _conn->CutMessageIntoFileDescriptor(fd(), data_list, ndata);
} else {
#if BRPC_WITH_RDMA
if (_rdma_ep && _rdma_state != RDMA_OFF) {
return _rdma_ep->CutFromIOBufList(data_list, ndata);
}
#endif
return butil::IOBuf::cut_multiple_into_file_descriptor(
fd(), data_list, ndata);
}
}
CHECK_EQ(SSL_CONNECTED, ssl_state());
if (_conn) {
// TODO: Separate SSL stuff from SocketConnection
return _conn->CutMessageIntoSSLChannel(_ssl_session, data_list, ndata);
}
int ssl_error = 0;
ssize_t nw = butil::IOBuf::cut_multiple_into_SSL_channel(
_ssl_session, data_list, ndata, &ssl_error);
switch (ssl_error) {
case SSL_ERROR_NONE:
break;
case SSL_ERROR_WANT_READ:
// Disable renegotiation
errno = EPROTO;
return -1;
case SSL_ERROR_WANT_WRITE:
errno = EAGAIN;
break;
default: {
const unsigned long e = ERR_get_error();
if (e != 0) {
LOG(WARNING) << "Fail to write into ssl_fd=" << fd() << ": "
<< SSLError(e);
errno = ESSL;
} else {
// System error with corresponding errno set
PLOG(WARNING) << "Fail to write into ssl_fd=" << fd();
}
break;
}
}
return nw;
}
int Socket::SSLHandshake(int fd, bool server_mode) {
if (_ssl_ctx == NULL) {
if (server_mode) {
LOG(ERROR) << "Lack SSL configuration to handle SSL request";
return -1;
}
return 0;
}
// TODO: Reuse ssl session id for client
if (_ssl_session) {
// Free the previous session, which may have become invalid when the socket failed
SSL_free(_ssl_session);
}
_ssl_session = CreateSSLSession(_ssl_ctx->raw_ctx, id(), fd, server_mode);
if (_ssl_session == NULL) {
LOG(ERROR) << "Fail to CreateSSLSession";
return -1;
}
#if defined(SSL_CTRL_SET_TLSEXT_HOSTNAME) || defined(USE_MESALINK)
if (!_ssl_ctx->sni_name.empty()) {
SSL_set_tlsext_host_name(_ssl_session, _ssl_ctx->sni_name.c_str());
}
#endif
_ssl_state = SSL_CONNECTING;
// Loop until the SSL handshake has completed. For SSL_ERROR_WANT_READ/WRITE,
// we use bthread_fd_wait as the polling mechanism instead of EventDispatcher,
// as the latter may confuse the original event-processing code.
while (true) {
ERR_clear_error();
int rc = SSL_do_handshake(_ssl_session);
if (rc == 1) {
_ssl_state = SSL_CONNECTED;
AddBIOBuffer(_ssl_session, fd, FLAGS_ssl_bio_buffer_size);
return 0;
}
int ssl_error = SSL_get_error(_ssl_session, rc);
switch (ssl_error) {
case SSL_ERROR_WANT_READ:
#if defined(OS_LINUX)
if (bthread_fd_wait(fd, EPOLLIN) != 0) {
#elif defined(OS_MACOSX)
if (bthread_fd_wait(fd, EVFILT_READ) != 0) {
#endif
return -1;
}
break;
case SSL_ERROR_WANT_WRITE:
#if defined(OS_LINUX)
if (bthread_fd_wait(fd, EPOLLOUT) != 0) {
#elif defined(OS_MACOSX)
if (bthread_fd_wait(fd, EVFILT_WRITE) != 0) {
#endif
return -1;
}
break;
default: {
const unsigned long e = ERR_get_error();
if (ssl_error == SSL_ERROR_ZERO_RETURN || e == 0) {
errno = ECONNRESET;
LOG(ERROR) << "SSL connection was shutdown by peer: " << _remote_side;
} else if (ssl_error == SSL_ERROR_SYSCALL) {
PLOG(ERROR) << "Fail to SSL_do_handshake";
} else {
errno = ESSL;
LOG(ERROR) << "Fail to SSL_do_handshake: " << SSLError(e);
}
return -1;
}
}
}
}
ssize_t Socket::DoRead(size_t size_hint) {
if (ssl_state() == SSL_UNKNOWN) {
int error_code = 0;
_ssl_state = DetectSSLState(fd(), &error_code);
switch (ssl_state()) {
case SSL_UNKNOWN:
if (error_code == 0) { // EOF
return 0;
} else {
errno = error_code;
return -1;
}
case SSL_CONNECTING:
if (SSLHandshake(fd(), true) != 0) {
errno = EINVAL;
return -1;
}
break;
case SSL_CONNECTED:
CHECK(false) << "Impossible to reach here";
break;
case SSL_OFF:
break;
}
}
// _ssl_state has been set
if (ssl_state() == SSL_OFF) {
CHECK(_rdma_state == RDMA_OFF);
return _read_buf.append_from_file_descriptor(fd(), size_hint);
}
CHECK_EQ(SSL_CONNECTED, ssl_state());
int ssl_error = 0;
ssize_t nr = _read_buf.append_from_SSL_channel(_ssl_session, &ssl_error, size_hint);
switch (ssl_error) {
case SSL_ERROR_NONE: // `nr' > 0
break;
case SSL_ERROR_WANT_READ:
// Regard this error as EAGAIN
errno = EAGAIN;
break;
case SSL_ERROR_WANT_WRITE:
// Disable renegotiation
errno = EPROTO;
return -1;
default: {
const unsigned long e = ERR_get_error();
if (nr == 0) {
// Socket EOF or SSL session EOF
} else if (e != 0) {
LOG(WARNING) << "Fail to read from ssl_fd=" << fd()
<< ": " << SSLError(e);
errno = ESSL;
} else {
// System error with corresponding errno set
PLOG(WARNING) << "Fail to read from ssl_fd=" << fd();
}
break;
}
}
return nr;
}
int Socket::FightAuthentication(int* auth_error) {
// Use a relaxed fence since `bthread_id_trylock' ensures thread safety.
// Here `flag_error' merely acts as cached information.
uint64_t flag_error = _auth_flag_error.load(butil::memory_order_relaxed);
if (flag_error & AUTH_FLAG) {
// Already authenticated
*auth_error = (int32_t)(flag_error & 0xFFFFFFFFul);
return EINVAL;
}
if (0 == bthread_id_trylock(_auth_id, NULL)) {
// Winner
return 0;
} else {
// Use a relaxed fence since `bthread_id_join' has an acquire fence that
// ensures `_auth_flag_error' is the latest value.
bthread_id_join(_auth_id);
flag_error = _auth_flag_error.load(butil::memory_order_relaxed);
*auth_error = (int32_t)(flag_error & 0xFFFFFFFFul);
return EINVAL;
}
}
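// A hedged caller-side sketch of the fight/join protocol above (the real
// callsites live in the protocol implementations; DoAppLevelAuth is purely
// illustrative):
//   int auth_error = 0;
//   if (sock->FightAuthentication(&auth_error) == 0) {
//       // Winner: run the handshake, then publish the result to joiners.
//       sock->SetAuthentication(DoAppLevelAuth(sock));
//   } else if (auth_error != 0) {
//       // Authentication already finished on this socket and failed.
//   }  // auth_error == 0 means it already succeeded.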
void Socket::SetAuthentication(int error_code) {
uint64_t expected = 0;
// `bthread_id_destroy' has a release fence that prevents this CAS from
// being reordered after it.
if (_auth_flag_error.compare_exchange_strong(
expected, (AUTH_FLAG | error_code),
butil::memory_order_relaxed)) {
// As expected
if (error_code != 0) {
SetFailed(error_code, "Fail to authenticate %s", description().c_str());
}
CHECK_EQ(0, bthread_id_unlock_and_destroy(_auth_id));
}
}
AuthContext* Socket::mutable_auth_context() {
if (_auth_context != NULL) {
LOG(FATAL) << "Impossible! This function is supposed to be called "
"only once when verification succeeds in server side";
return NULL;
}
_auth_context = new(std::nothrow) AuthContext();
CHECK(_auth_context);
return _auth_context;
}
int Socket::StartInputEvent(SocketId id, uint32_t events,
const bthread_attr_t& thread_attr) {
SocketUniquePtr s;
if (Address(id, &s) < 0) {
return -1;
}
if (NULL == s->_on_edge_triggered_events) {
// The callback can be NULL when receiving error epoll events
// (the fd was added into epoll by `WaitConnected')
return 0;
}
if (s->fd() < 0) {
#if defined(OS_LINUX)
CHECK(!(events & EPOLLIN)) << "epoll_events=" << events;
#elif defined(OS_MACOSX)
CHECK((short)events != EVFILT_READ) << "kqueue filter=" << events;
#endif
return -1;
}
// if (events & has_epollrdhup) {
// s->_eof = 1;
// }
// Passing e[i].events would cause complex visibility issues and require
// stronger memory fences; since reading the fd returns the error as well,
// we don't pass the events.
if (s->_nevent.fetch_add(1, butil::memory_order_acq_rel) == 0) {
// According to the stats, the fetch_add above is very effective at
// suppressing redundant bthread creation: in a server processing 1 million
// requests per second, this counter is only around 1500~1700/s
g_vars->neventthread << 1;
bthread_t tid;
// transfer ownership as well, don't use s anymore!
Socket* const p = s.release();
bthread_attr_t attr = thread_attr;
attr.keytable_pool = p->_keytable_pool;
if (bthread_start_urgent(&tid, &attr, ProcessEvent, p) != 0) {
LOG(FATAL) << "Fail to start ProcessEvent";
ProcessEvent(p);
}
}
return 0;
}
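// Note on the fetch_add in StartInputEvent(): only the caller that bumps
// _nevent from 0 to 1 starts a ProcessEvent bthread; concurrent events
// merely increment the counter and rely on the already-running consumer to
// notice them (the draining happens in the event-processing code, not shown
// here).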
void DereferenceSocket(Socket* s) {
if (s) {
s->Dereference();
}
}
void Socket::UpdateStatsEverySecond(int64_t now_ms) {
SharedPart* sp = GetSharedPart();
if (sp) {
sp->UpdateStatsEverySecond(now_ms);
}
}
template <typename T>
struct ObjectPtr {
ObjectPtr(const T* obj) : _obj(obj) {}
const T* _obj;
};
template <typename T>
ObjectPtr<T> ShowObject(const T* obj) { return ObjectPtr<T>(obj); }
template <typename T>
std::ostream& operator<<(std::ostream& os, const ObjectPtr<T>& obj) {
if (obj._obj != NULL) {
os << '(' << butil::class_name_str(*obj._obj) << "*)";
}
return os << obj._obj;
}
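// Example of the helper above: the `user=' line printed by DebugSocket()
// below goes through ShowObject(), yielding something like
// "(brpc::InputMessenger*)0x7f..." for a non-NULL pointer, or just the
// platform's rendering of a null pointer otherwise.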
void Socket::DebugSocket(std::ostream& os, SocketId id) {
SocketUniquePtr ptr;
int ret = Socket::AddressFailedAsWell(id, &ptr);
if (ret < 0) {
os << "SocketId=" << id << " is invalid or recycled";
return;
} else if (ret > 0) {
// NOTE: Printing a broken socket w/o HC is informational for
// debugging referential issues.
// if (ptr->_health_check_interval_s <= 0) {
// // Sockets without HC will soon be destroyed
// os << "Invalid SocketId=" << id;
// return;
// }
os << "# This is a broken Socket\n";
}
const uint64_t vref = ptr->_versioned_ref.load(butil::memory_order_relaxed);
size_t npipelined = 0;
size_t idsizes[4];
size_t nidsize = 0;
{
BAIDU_SCOPED_LOCK(ptr->_pipeline_mutex);
if (ptr->_pipeline_q) {
npipelined = ptr->_pipeline_q->size();
}
}
{
BAIDU_SCOPED_LOCK(ptr->_id_wait_list_mutex);
if (bthread::get_sizes) {
nidsize = bthread::get_sizes(
&ptr->_id_wait_list, idsizes, arraysize(idsizes));
}
}
const int preferred_index = ptr->preferred_index();
SharedPart* sp = ptr->GetSharedPart();
os << "version=" << VersionOfVRef(vref);
if (sp) {
os << "\nshared_part={\n ref_count=" << sp->ref_count()
<< "\n socket_pool=";
SocketPool* pool = sp->socket_pool.load(butil::memory_order_consume);
if (pool) {
os << '[';
std::vector<SocketId> pooled_sockets;
pool->ListSockets(&pooled_sockets, 0);
for (size_t i = 0; i < pooled_sockets.size(); ++i) {
if (i) {
os << ' ';
}
os << pooled_sockets[i];
}
os << "]\n numfree="
<< pool->_numfree.load(butil::memory_order_relaxed)
<< "\n numinflight="
<< pool->_numinflight.load(butil::memory_order_relaxed);
} else {
os << "null";
}
os << "\n creator_socket=" << sp->creator_socket_id
<< "\n in_size=" << sp->in_size.load(butil::memory_order_relaxed)
<< "\n in_num_messages=" << sp->in_num_messages.load(butil::memory_order_relaxed)
<< "\n out_size=" << sp->out_size.load(butil::memory_order_relaxed)
<< "\n out_num_messages=" << sp->out_num_messages.load(butil::memory_order_relaxed)
<< "\n}";
}
const int fd = ptr->_fd.load(butil::memory_order_relaxed);
os << "\nnref=" << NRefOfVRef(vref) - 1
// ^
// minus the ref held by the current callsite (this DebugSocket call)
<< "\nnevent=" << ptr->_nevent.load(butil::memory_order_relaxed)
<< "\nfd=" << fd
<< "\ntos=" << ptr->_tos
<< "\nreset_fd_to_now=" << butil::gettimeofday_us() - ptr->_reset_fd_real_us << "us"
<< "\nremote_side=" << ptr->_remote_side
<< "\nlocal_side=" << ptr->_local_side
<< "\non_et_events=" << (void*)ptr->_on_edge_triggered_events
<< "\nuser=" << ShowObject(ptr->_user)
<< "\nthis_id=" << ptr->_this_id
<< "\npreferred_index=" << preferred_index;
InputMessenger* messenger = dynamic_cast<InputMessenger*>(ptr->user());
if (messenger != NULL) {
os << " (" << messenger->NameOfProtocol(preferred_index) << ')';
}
const int64_t cpuwide_now = butil::cpuwide_time_us();
os << "\nhc_count=" << ptr->_hc_count
<< "\navg_input_msg_size=" << ptr->_avg_msg_size
// NOTE: We assume that butil::IOBuf.size() is thread-safe. It is for now,
// but that is not guaranteed.
<< "\nread_buf=" << ptr->_read_buf.size()
<< "\nlast_read_to_now=" << cpuwide_now - ptr->_last_readtime_us << "us"
<< "\nlast_write_to_now=" << cpuwide_now - ptr->_last_writetime_us << "us"
<< "\novercrowded=" << ptr->_overcrowded;
os << "\nid_wait_list={";
for (size_t i = 0; i < nidsize; ++i) {
if (i) {
os << ' ';
}
os << idsizes[i];
}
os << '}';
Destroyable* const parsing_context = ptr->parsing_context();
Describable* parsing_context_desc = dynamic_cast<Describable*>(parsing_context);
if (parsing_context_desc) {
os << "\nparsing_context=" << butil::class_name_str(*parsing_context) << '{';
DescribeOptions opt;
opt.verbose = true;
IndentingOStream os2(os, 2);
parsing_context_desc->Describe(os2, opt);
os << '}';
} else {
os << "\nparsing_context=" << ShowObject(parsing_context);
}
const SSLState ssl_state = ptr->ssl_state();
os << "\npipeline_q=" << npipelined
<< "\nhc_interval_s=" << ptr->_health_check_interval_s
<< "\nis_hc_related_ref_held=" << ptr->_is_hc_related_ref_held
<< "\nninprocess=" << ptr->_ninprocess.load(butil::memory_order_relaxed)
<< "\nauth_flag_error=" << ptr->_auth_flag_error.load(butil::memory_order_relaxed)
<< "\nauth_id=" << ptr->_auth_id.value
<< "\nauth_context=" << ptr->_auth_context
<< "\nlogoff_flag=" << ptr->_logoff_flag.load(butil::memory_order_relaxed)
<< "\n_additional_ref_status="
<< ptr->_additional_ref_status.load(butil::memory_order_relaxed)
<< "\ntotal_streams_buffer_size="
<< ptr->_total_streams_unconsumed_size.load(butil::memory_order_relaxed)
<< "\nninflight_app_health_check="
<< ptr->_ninflight_app_health_check.load(butil::memory_order_relaxed)
<< "\nagent_socket_id=";
const SocketId asid = ptr->_agent_socket_id.load(butil::memory_order_relaxed);
if (asid != INVALID_SOCKET_ID) {
os << asid;
} else {
os << "(none)";
}
os << "\ncid=" << ptr->_correlation_id
<< "\nwrite_head=" << ptr->_write_head.load(butil::memory_order_relaxed)
<< "\nssl_state=" << SSLStateToString(ssl_state);
const SocketSSLContext* ssl_ctx = ptr->_ssl_ctx.get();
if (ssl_ctx) {
os << "\ninitial_ssl_ctx=" << ssl_ctx->raw_ctx;
if (!ssl_ctx->sni_name.empty()) {
os << "\nsni_name=" << ssl_ctx->sni_name;
}
}
if (ssl_state == SSL_CONNECTED) {
os << "\nssl_session={\n ";
Print(os, ptr->_ssl_session, "\n ");
os << "\n}";
}
#if defined(OS_MACOSX)
struct tcp_connection_info ti;
socklen_t len = sizeof(ti);
if (fd >= 0 && getsockopt(fd, IPPROTO_TCP, TCP_CONNECTION_INFO, &ti, &len) == 0) {
os << "\ntcpi={\n state=" << (uint32_t)ti.tcpi_state
<< "\n snd_wscale=" << (uint32_t)ti.tcpi_snd_wscale
<< "\n rcv_wscale=" << (uint32_t)ti.tcpi_rcv_wscale
<< "\n options=" << (uint32_t)ti.tcpi_options
<< "\n flags=" << (uint32_t)ti.tcpi_flags
<< "\n rto=" << ti.tcpi_rto
<< "\n maxseg=" << ti.tcpi_maxseg
<< "\n snd_ssthresh=" << ti.tcpi_snd_ssthresh
<< "\n snd_cwnd=" << ti.tcpi_snd_cwnd
<< "\n snd_wnd=" << ti.tcpi_snd_wnd
<< "\n snd_sbbytes=" << ti.tcpi_snd_sbbytes
<< "\n rcv_wnd=" << ti.tcpi_rcv_wnd
<< "\n srtt=" << ti.tcpi_srtt
<< "\n rttvar=" << ti.tcpi_rttvar
<< "\n}";
}
#elif defined(OS_LINUX)
struct tcp_info ti;
socklen_t len = sizeof(ti);
if (fd >= 0 && getsockopt(fd, SOL_TCP, TCP_INFO, &ti, &len) == 0) {
os << "\ntcpi={\n state=" << (uint32_t)ti.tcpi_state
<< "\n ca_state=" << (uint32_t)ti.tcpi_ca_state
<< "\n retransmits=" << (uint32_t)ti.tcpi_retransmits
<< "\n probes=" << (uint32_t)ti.tcpi_probes
<< "\n backoff=" << (uint32_t)ti.tcpi_backoff
<< "\n options=" << (uint32_t)ti.tcpi_options
<< "\n snd_wscale=" << (uint32_t)ti.tcpi_snd_wscale
<< "\n rcv_wscale=" << (uint32_t)ti.tcpi_rcv_wscale
<< "\n rto=" << ti.tcpi_rto
<< "\n ato=" << ti.tcpi_ato
<< "\n snd_mss=" << ti.tcpi_snd_mss
<< "\n rcv_mss=" << ti.tcpi_rcv_mss
<< "\n unacked=" << ti.tcpi_unacked
<< "\n sacked=" << ti.tcpi_sacked
<< "\n lost=" << ti.tcpi_lost
<< "\n retrans=" << ti.tcpi_retrans
<< "\n fackets=" << ti.tcpi_fackets
<< "\n last_data_sent=" << ti.tcpi_last_data_sent
<< "\n last_ack_sent=" << ti.tcpi_last_ack_sent
<< "\n last_data_recv=" << ti.tcpi_last_data_recv
<< "\n last_ack_recv=" << ti.tcpi_last_ack_recv
<< "\n pmtu=" << ti.tcpi_pmtu
<< "\n rcv_ssthresh=" << ti.tcpi_rcv_ssthresh
<< "\n rtt=" << ti.tcpi_rtt // smoothed
<< "\n rttvar=" << ti.tcpi_rttvar
<< "\n snd_ssthresh=" << ti.tcpi_snd_ssthresh
<< "\n snd_cwnd=" << ti.tcpi_snd_cwnd
<< "\n advmss=" << ti.tcpi_advmss
<< "\n reordering=" << ti.tcpi_reordering
<< "\n}";
}
#endif
#if BRPC_WITH_RDMA
if (ptr->_rdma_state == RDMA_ON && ptr->_rdma_ep) {
ptr->_rdma_ep->DebugInfo(os);
}
#endif
}
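// A hedged usage sketch: DebugSocket() writes a human-readable dump of one
// socket, so it can back a diagnostic page or ad-hoc logging
// (some_socket_id below is illustrative):
//   std::ostringstream oss;
//   brpc::Socket::DebugSocket(oss, some_socket_id);
//   LOG(INFO) << oss.str();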
int Socket::CheckHealth() {
if (_hc_count == 0) {
LOG(INFO) << "Checking " << *this;
}
const timespec duetime =
butil::milliseconds_from_now(FLAGS_health_check_timeout_ms);
const int connected_fd = Connect(&duetime, NULL, NULL);
if (connected_fd >= 0) {
::close(connected_fd);
return 0;
}
return errno;
}
int Socket::AddStream(StreamId stream_id) {
_stream_mutex.lock();
if (Failed()) {
_stream_mutex.unlock();
return -1;
}
if (_stream_set == NULL) {
_stream_set = new std::set<StreamId>();
}
_stream_set->insert(stream_id);
_stream_mutex.unlock();
return 0;
}
int Socket::RemoveStream(StreamId stream_id) {
_stream_mutex.lock();
if (_stream_set == NULL) {
_stream_mutex.unlock();
CHECK(false) << "AddStream was not called";
return -1;
}
_stream_set->erase(stream_id);
_stream_mutex.unlock();
return 0;
}
void Socket::ResetAllStreams() {
DCHECK(Failed());
std::set<StreamId> saved_stream_set;
_stream_mutex.lock();
if (_stream_set != NULL) {
// Don't delete _stream_set: if the Socket is still in use, more streams
// are likely to be added after revival; otherwise it will be deleted in
// OnRecycle()
saved_stream_set.swap(*_stream_set);
}
_stream_mutex.unlock();
for (std::set<StreamId>::const_iterator
it = saved_stream_set.begin(); it != saved_stream_set.end(); ++it) {
Stream::SetFailed(*it);
}
}
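// Swapping the set out under the lock and failing the streams afterwards
// is deliberate: Stream::SetFailed() never runs with _stream_mutex held,
// which avoids potential lock-order issues with the streams' own locking.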
int SocketUser::CheckHealth(Socket* ptr) {
return ptr->CheckHealth();
}
void SocketUser::AfterRevived(Socket* ptr) {
LOG(INFO) << "Revived " << *ptr << " (Connectable)";
}
////////// SocketPool //////////////
inline SocketPool::SocketPool(const SocketOptions& opt)
: _options(opt)
, _remote_side(opt.remote_side)
, _numfree(0)
, _numinflight(0) {
}
inline SocketPool::~SocketPool() {
for (std::vector<SocketId>::iterator it = _pool.begin();
it != _pool.end(); ++it) {
SocketUniquePtr ptr;
if (Socket::Address(*it, &ptr) == 0) {
ptr->ReleaseAdditionalReference();
}
}
}
inline int SocketPool::GetSocket(SocketUniquePtr* ptr) {
const int connection_pool_size = FLAGS_max_connection_pool_size;
// In a previous revision, SocketPool could be sharded into multiple
// SubSocketPools to reduce thread contention, with a sharding key mixed
// from the pthread id to preserve data locality.
// However, sharding also makes sockets be created and closed more
// frequently, especially in real-world applications where one client
// connects to many servers and each socket is lightly contended: different
// threads accessing the socket may create pooled sockets in different sub
// pools without reusing sockets left in other sub pools, which will
// probably be closed by the CloseIdleConnections thread in socket_map.cpp,
// resulting in frequent creation and closing of connections.
// Thus sharding is only meaningful in benchmarking scenarios where one
// server is connected by one client with many threads. Starting from
// r32203, the sharding capability is removed.
SocketId sid = 0;
if (connection_pool_size > 0) {
for (;;) {
{
BAIDU_SCOPED_LOCK(_mutex);
if (_pool.empty()) {
break;
}
sid = _pool.back();
_pool.pop_back();
}
_numfree.fetch_sub(1, butil::memory_order_relaxed);
// Don't address inside the lock since the pooled socket is likely to be
// valid most of the time.
if (Socket::Address(sid, ptr) == 0) {
_numinflight.fetch_add(1, butil::memory_order_relaxed);
return 0;
}
}
}
// Not found in pool
SocketOptions opt = _options;
opt.health_check_interval_s = -1;
if (get_client_side_messenger()->Create(opt, &sid) == 0 &&
Socket::Address(sid, ptr) == 0) {
_numinflight.fetch_add(1, butil::memory_order_relaxed);
return 0;
}
return -1;
}
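// Hedged sketch of the accounting contract between GetSocket/ReturnSocket
// (illustrative only; real callers go through Socket::GetPooledSocket and
// Socket::ReturnToPool):
//   SocketUniquePtr s;
//   if (pool->GetSocket(&s) == 0) {      // _numinflight += 1
//       /* exchange messages on `s' */
//       pool->ReturnSocket(s.get());     // _numinflight -= 1; pooled or closed
//   }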
inline void SocketPool::ReturnSocket(Socket* sock) {
// NOTE: save the gflag which may be reloaded at any time.
const int connection_pool_size = FLAGS_max_connection_pool_size;
// Check if the pool is full.
if (_numfree.fetch_add(1, butil::memory_order_relaxed) <
connection_pool_size) {
const SocketId sid = sock->id();
BAIDU_SCOPED_LOCK(_mutex);
_pool.push_back(sid);
} else {
// Cancel the addition and close the pooled socket.
_numfree.fetch_sub(1, butil::memory_order_relaxed);
sock->SetFailed(EUNUSED, "Close unused pooled socket");
}
_numinflight.fetch_sub(1, butil::memory_order_relaxed);
}
inline void SocketPool::ListSockets(std::vector<SocketId>* out, size_t max_count) {
out->clear();
// NOTE: Reading size() of a vector concurrently with modifications is
// thread-unsafe and may return a very large bogus value during resizing.
_mutex.lock();
size_t expected_size = _pool.size();
if (max_count > 0 && max_count < _pool.size()) {
expected_size = max_count;
}
if (out->capacity() < expected_size) {
_mutex.unlock();
out->reserve(expected_size + 4); // pool may add sockets.
_mutex.lock();
}
if (max_count == 0) {
out->insert(out->end(), _pool.begin(), _pool.end());
} else {
for (size_t i = 0; i < expected_size; ++i) {
out->push_back(_pool[i]);
}
}
_mutex.unlock();
}
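// The unlock/reserve/relock sequence above keeps the memory allocation out
// of the critical section; the 4 extra slots tolerate sockets appended to
// _pool while the mutex was briefly released.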
Socket::SharedPart* Socket::GetOrNewSharedPartSlower() {
// Create _shared_part optimistically.
SharedPart* shared_part = GetSharedPart();
if (shared_part == NULL) {
shared_part = new SharedPart(id());
shared_part->AddRefManually();
SharedPart* expected = NULL;
if (!_shared_part.compare_exchange_strong(
expected, shared_part, butil::memory_order_acq_rel)) {
shared_part->RemoveRefManually();
CHECK(expected);
shared_part = expected;
}
}
return shared_part;
}
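// This is the usual optimistic-creation pattern: allocate speculatively,
// try to CAS the pointer in, and on losing the race drop the speculative
// object and adopt the winner's. Either way exactly one SharedPart ends up
// referenced by _shared_part.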
void Socket::ShareStats(Socket* main_socket) {
SharedPart* main_sp = main_socket->GetOrNewSharedPart();
main_sp->AddRefManually();
SharedPart* my_sp =
_shared_part.exchange(main_sp, butil::memory_order_acq_rel);
if (my_sp) {
my_sp->RemoveRefManually();
}
}
int Socket::GetPooledSocket(SocketUniquePtr* pooled_socket) {
if (pooled_socket == NULL) {
LOG(ERROR) << "pooled_socket is NULL";
return -1;
}
SharedPart* main_sp = GetOrNewSharedPart();
if (main_sp == NULL) {
LOG(ERROR) << "_shared_part is NULL";
return -1;
}
// Create socket_pool optimistically.
SocketPool* socket_pool = main_sp->socket_pool.load(butil::memory_order_consume);
if (socket_pool == NULL) {
SocketOptions opt;
opt.remote_side = remote_side();
opt.user = user();
opt.on_edge_triggered_events = _on_edge_triggered_events;
opt.initial_ssl_ctx = _ssl_ctx;
opt.keytable_pool = _keytable_pool;
opt.app_connect = _app_connect;
opt.use_rdma = (_rdma_ep) ? true : false;
socket_pool = new SocketPool(opt);
SocketPool* expected = NULL;
if (!main_sp->socket_pool.compare_exchange_strong(
expected, socket_pool, butil::memory_order_acq_rel)) {
delete socket_pool;
CHECK(expected);
socket_pool = expected;
}
}
if (socket_pool->GetSocket(pooled_socket) != 0) {
return -1;
}
(*pooled_socket)->ShareStats(this);
CHECK((*pooled_socket)->parsing_context() == NULL)
<< "context=" << (*pooled_socket)->parsing_context()
<< " is not NULL when " << *(*pooled_socket) << " is got from"
" SocketPool, the protocol implementation is buggy";
return 0;
}
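// Hedged usage sketch (illustrative; the real callsites are in the client
// code that implements pooled connections):
//   SocketUniquePtr pooled;
//   if (main_socket->GetPooledSocket(&pooled) == 0) {
//       /* write one request and read its response through `pooled' */
//       pooled->ReturnToPool();   // hand it back on success,
//                                 // or SetFailed() on error
//   }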
int Socket::ReturnToPool() {
SharedPart* sp = _shared_part.exchange(NULL, butil::memory_order_acquire);
if (sp == NULL) {
LOG(ERROR) << "_shared_part is NULL";
SetFailed(EINVAL, "_shared_part is NULL");
return -1;
}
SocketPool* pool = sp->socket_pool.load(butil::memory_order_consume);
if (pool == NULL) {
LOG(ERROR) << "_shared_part->socket_pool is NULL";
SetFailed(EINVAL, "_shared_part->socket_pool is NULL");
sp->RemoveRefManually();
return -1;
}
CHECK(parsing_context() == NULL)
<< "context=" << parsing_context() << " is not released when "
<< *this << " is returned to SocketPool, the protocol "
"implementation is buggy";
// NOTE: be careful with the sequence.
// - related fields must be reset before returning to pool
// - sp must be released after returning to pool because it owns pool
_connection_type_for_progressive_read = CONNECTION_TYPE_UNKNOWN;
_controller_released_socket.store(false, butil::memory_order_relaxed);
// Reset the write timestamp so that the returned connection lives longer.
// This is useful when a fake Socket + SocketConnection impl. is used to
// integrate a 3rd-party client (like a MySQL client) into bRPC.
_last_writetime_us.store(butil::cpuwide_time_us(), butil::memory_order_relaxed);
pool->ReturnSocket(this);
sp->RemoveRefManually();
return 0;
}
bool Socket::HasSocketPool() const {
SharedPart* sp = GetSharedPart();
if (sp != NULL) {
return sp->socket_pool.load(butil::memory_order_consume) != NULL;
}
return false;
}
void Socket::ListPooledSockets(std::vector<SocketId>* out, size_t max_count) {
out->clear();
SharedPart* sp = GetSharedPart();
if (sp == NULL) {
return;
}
SocketPool* pool = sp->socket_pool.load(butil::memory_order_consume);
if (pool == NULL) {
return;
}
pool->ListSockets(out, max_count);
}
bool Socket::GetPooledSocketStats(int* numfree, int* numinflight) {
SharedPart* sp = GetSharedPart();
if (sp == NULL) {
return false;
}
SocketPool* pool = sp->socket_pool.load(butil::memory_order_consume);
if (pool == NULL) {
return false;
}
*numfree = pool->_numfree.load(butil::memory_order_relaxed);
*numinflight = pool->_numinflight.load(butil::memory_order_relaxed);
return true;
}
int Socket::GetShortSocket(SocketUniquePtr* short_socket) {
if (short_socket == NULL) {
LOG(ERROR) << "short_socket is NULL";
return -1;
}
SocketId id;
SocketOptions opt;
opt.remote_side = remote_side();
opt.user = user();
opt.on_edge_triggered_events = _on_edge_triggered_events;
opt.initial_ssl_ctx = _ssl_ctx;
opt.keytable_pool = _keytable_pool;
opt.app_connect = _app_connect;
opt.use_rdma = (_rdma_ep) ? true : false;
if (get_client_side_messenger()->Create(opt, &id) != 0 ||
Socket::Address(id, short_socket) != 0) {
return -1;
}
(*short_socket)->ShareStats(this);
return 0;
}
int Socket::GetAgentSocket(SocketUniquePtr* out, bool (*checkfn)(Socket*)) {
SocketId id = _agent_socket_id.load(butil::memory_order_relaxed);
SocketUniquePtr tmp_sock;
do {
if (Socket::Address(id, &tmp_sock) == 0) {
if (checkfn == NULL || checkfn(tmp_sock.get())) {
out->swap(tmp_sock);
return 0;
}
tmp_sock->ReleaseAdditionalReference();
}
do {
if (GetShortSocket(&tmp_sock) != 0) {
LOG(ERROR) << "Fail to get short socket from " << *this;
return -1;
}
if (checkfn == NULL || checkfn(tmp_sock.get())) {
break;
}
tmp_sock->ReleaseAdditionalReference();
} while (1);
if (_agent_socket_id.compare_exchange_strong(
id, tmp_sock->id(), butil::memory_order_acq_rel)) {
out->swap(tmp_sock);
return 0;
}
tmp_sock->ReleaseAdditionalReference();
// id was updated, re-address
} while (1);
}
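// Note on the retry loop above: if another thread published a different
// agent socket between our load and the compare_exchange, the CAS fails,
// the freshly created short socket is dropped, and the loop re-addresses
// the id that won.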
int Socket::PeekAgentSocket(SocketUniquePtr* out) const {
SocketId id = _agent_socket_id.load(butil::memory_order_relaxed);
if (id == INVALID_SOCKET_ID) {
return -1;
}
return Address(id, out);
}
void Socket::GetStat(SocketStat* s) const {
BAIDU_CASSERT(offsetof(Socket, _preferred_index) >= 64, different_cacheline);
BAIDU_CASSERT(sizeof(WriteRequest) == 64, sizeof_write_request_is_64);
SharedPart* sp = GetSharedPart();
if (sp != NULL && sp->extended_stat != NULL) {
*s = *sp->extended_stat;
} else {
memset(s, 0, sizeof(*s));
}
}
void Socket::AddInputBytes(size_t bytes) {
GetOrNewSharedPart()->in_size.fetch_add(bytes, butil::memory_order_relaxed);
}
void Socket::AddInputMessages(size_t count) {
GetOrNewSharedPart()->in_num_messages.fetch_add(count, butil::memory_order_relaxed);
}
void Socket::CancelUnwrittenBytes(size_t bytes) {
const int64_t before_minus =
_unwritten_bytes.fetch_sub(bytes, butil::memory_order_relaxed);
if (before_minus < (int64_t)bytes + FLAGS_socket_max_unwritten_bytes) {
_overcrowded = false;
}
}
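// Worked example with the default limit of 64MB: if _unwritten_bytes was
// 66MB before a 4MB write completes, then 66MB < 4MB + 64MB holds (the new
// value 62MB is under the limit), so _overcrowded is cleared.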
void Socket::AddOutputBytes(size_t bytes) {
GetOrNewSharedPart()->out_size.fetch_add(bytes, butil::memory_order_relaxed);
_last_writetime_us.store(butil::cpuwide_time_us(),
butil::memory_order_relaxed);
CancelUnwrittenBytes(bytes);
}
void Socket::AddOutputMessages(size_t count) {
GetOrNewSharedPart()->out_num_messages.fetch_add(count, butil::memory_order_relaxed);
}
SocketId Socket::main_socket_id() const {
SharedPart* sp = GetSharedPart();
if (sp) {
return sp->creator_socket_id;
}
return INVALID_SOCKET_ID;
}
void Socket::OnProgressiveReadCompleted() {
if (is_read_progressive() &&
(_controller_released_socket.load(butil::memory_order_relaxed) ||
_controller_released_socket.exchange(
true, butil::memory_order_relaxed))) {
if (_connection_type_for_progressive_read == CONNECTION_TYPE_POOLED) {
ReturnToPool();
} else if (_connection_type_for_progressive_read == CONNECTION_TYPE_SHORT) {
SetFailed(EUNUSED, "[%s]Close short connection", __FUNCTION__);
}
}
}
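// The load-then-exchange above implements a two-party rendezvous on
// _controller_released_socket: the first party to finish (this completion
// or, presumably, the Controller releasing the socket) only sets the flag,
// and the second one actually recycles the connection.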
std::string Socket::description() const {
// NOTE: The output should be consistent with operator<<()
std::string result;
result.reserve(64);
butil::string_appendf(&result, "Socket{id=%" PRIu64, id());
const int saved_fd = fd();
if (saved_fd >= 0) {
butil::string_appendf(&result, " fd=%d", saved_fd);
}
butil::string_appendf(&result, " addr=%s",
butil::endpoint2str(remote_side()).c_str());
const int local_port = local_side().port;
if (local_port > 0) {
butil::string_appendf(&result, ":%d", local_port);
}
butil::string_appendf(&result, "} (0x%p)", this);
return result;
}
SocketSSLContext::SocketSSLContext()
: raw_ctx(NULL)
{}
SocketSSLContext::~SocketSSLContext() {
if (raw_ctx) {
SSL_CTX_free(raw_ctx);
}
}
} // namespace brpc
namespace std {
ostream& operator<<(ostream& os, const brpc::Socket& sock) {
// NOTE: The output should be consistent with Socket::description()
os << "Socket{id=" << sock.id();
const int fd = sock.fd();
if (fd >= 0) {
os << " fd=" << fd;
}
os << " addr=" << sock.remote_side();
const int local_port = sock.local_side().port;
if (local_port > 0) {
os << ':' << local_port;
}
os << "} (" << (void*)&sock << ')';
return os;
}
}