blob: f2d1c0871c2397e284d1331ace919b2958ecea4a [file] [log] [blame]
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.
#include <inttypes.h>
#include <gflags/gflags.h>
#include "butil/fd_guard.h" // fd_guard
#include "butil/fd_utility.h" // make_close_on_exec
#include "butil/time.h" // gettimeofday_us
#include "brpc/rdma/rdma_endpoint.h"
#include "brpc/acceptor.h"
namespace brpc {
static const int INITIAL_CONNECTION_CAP = 65536;
Acceptor::Acceptor(bthread_keytable_pool_t* pool)
: InputMessenger()
, _keytable_pool(pool)
, _status(UNINITIALIZED)
, _idle_timeout_sec(-1)
, _close_idle_tid(INVALID_BTHREAD)
, _listened_fd(-1)
, _acception_id(0)
, _empty_cond(&_map_mutex)
, _force_ssl(false)
, _ssl_ctx(NULL)
, _use_rdma(false) {
}
Acceptor::~Acceptor() {
StopAccept(0);
Join();
}
int Acceptor::StartAccept(int listened_fd, int idle_timeout_sec,
const std::shared_ptr<SocketSSLContext>& ssl_ctx,
bool force_ssl) {
if (listened_fd < 0) {
LOG(FATAL) << "Invalid listened_fd=" << listened_fd;
return -1;
}
if (!ssl_ctx && force_ssl) {
LOG(ERROR) << "Fail to force SSL for all connections "
" because ssl_ctx is NULL";
return -1;
}
BAIDU_SCOPED_LOCK(_map_mutex);
if (_status == UNINITIALIZED) {
if (Initialize() != 0) {
LOG(FATAL) << "Fail to initialize Acceptor";
return -1;
}
_status = READY;
}
if (_status != READY) {
LOG(FATAL) << "Acceptor hasn't stopped yet: status=" << status();
return -1;
}
if (idle_timeout_sec > 0) {
if (bthread_start_background(&_close_idle_tid, NULL,
CloseIdleConnections, this) != 0) {
LOG(FATAL) << "Fail to start bthread";
return -1;
}
}
_idle_timeout_sec = idle_timeout_sec;
_force_ssl = force_ssl;
_ssl_ctx = ssl_ctx;
// Creation of _acception_id is inside lock so that OnNewConnections
// (which may run immediately) should see sane fields set below.
SocketOptions options;
options.fd = listened_fd;
options.user = this;
options.on_edge_triggered_events = OnNewConnections;
if (Socket::Create(options, &_acception_id) != 0) {
// Close-idle-socket thread will be stopped inside destructor
LOG(FATAL) << "Fail to create _acception_id";
return -1;
}
_listened_fd = listened_fd;
_status = RUNNING;
return 0;
}
void* Acceptor::CloseIdleConnections(void* arg) {
Acceptor* am = static_cast<Acceptor*>(arg);
std::vector<SocketId> checking_fds;
const uint64_t CHECK_INTERVAL_US = 1000000UL;
while (bthread_usleep(CHECK_INTERVAL_US) == 0) {
// TODO: this is not efficient for a lot of connections(>100K)
am->ListConnections(&checking_fds);
for (size_t i = 0; i < checking_fds.size(); ++i) {
SocketUniquePtr s;
if (Socket::Address(checking_fds[i], &s) == 0) {
s->ReleaseReferenceIfIdle(am->_idle_timeout_sec);
}
}
}
return NULL;
}
void Acceptor::StopAccept(int /*closewait_ms*/) {
// Currently `closewait_ms' is useless since we have to wait until
// existing requests are finished. Otherwise, contexts depended by
// the requests may be deleted and invalid.
{
BAIDU_SCOPED_LOCK(_map_mutex);
if (_status != RUNNING) {
return;
}
_status = STOPPING;
}
// Don't set _acception_id to 0 because BeforeRecycle needs it.
Socket::SetFailed(_acception_id);
// SetFailed all existing connections. Connections added after this piece
// of code will be SetFailed directly in OnNewConnectionsUntilEAGAIN
std::vector<SocketId> erasing_ids;
ListConnections(&erasing_ids);
for (size_t i = 0; i < erasing_ids.size(); ++i) {
SocketUniquePtr socket;
if (Socket::Address(erasing_ids[i], &socket) == 0) {
if (socket->shall_fail_me_at_server_stop()) {
// Mainly streaming connections, should be SetFailed() to
// trigger callbacks to NotifyOnFailed() to remove references,
// otherwise the sockets are often referenced by corresponding
// objects and delay server's stopping which requires all
// existing sockets to be recycled.
socket->SetFailed(ELOGOFF, "Server is stopping");
} else {
// Message-oriented RPC connections. Just release the addtional
// reference in the socket, which will be recycled when current
// requests have been processed.
socket->ReleaseAdditionalReference();
}
} // else: This socket already called `SetFailed' before
}
}
int Acceptor::Initialize() {
if (_socket_map.init(INITIAL_CONNECTION_CAP) != 0) {
LOG(FATAL) << "Fail to initialize FlatMap, size="
<< INITIAL_CONNECTION_CAP;
return -1;
}
return 0;
}
// NOTE: Join() can happen before StopAccept()
void Acceptor::Join() {
std::unique_lock<butil::Mutex> mu(_map_mutex);
if (_status != STOPPING && _status != RUNNING) { // no need to join.
return;
}
// `_listened_fd' will be set to -1 once it has been recycled
while (_listened_fd > 0 || !_socket_map.empty()) {
_empty_cond.Wait();
}
const int saved_idle_timeout_sec = _idle_timeout_sec;
_idle_timeout_sec = 0;
const bthread_t saved_close_idle_tid = _close_idle_tid;
mu.unlock();
// Join the bthread outside lock.
if (saved_idle_timeout_sec > 0) {
bthread_stop(saved_close_idle_tid);
bthread_join(saved_close_idle_tid, NULL);
}
{
BAIDU_SCOPED_LOCK(_map_mutex);
_status = READY;
}
}
size_t Acceptor::ConnectionCount() const {
// Notice that _socket_map may be modified concurrently. This actually
// assumes that size() is safe to call concurrently.
return _socket_map.size();
}
void Acceptor::ListConnections(std::vector<SocketId>* conn_list,
size_t max_copied) {
if (conn_list == NULL) {
LOG(FATAL) << "Param[conn_list] is NULL";
return;
}
conn_list->clear();
// Add additional 10(randomly small number) so that even if
// ConnectionCount is inaccurate, enough space is reserved
conn_list->reserve(ConnectionCount() + 10);
std::unique_lock<butil::Mutex> mu(_map_mutex);
if (!_socket_map.initialized()) {
// Optional. Uninitialized FlatMap should be iteratable.
return;
}
// Copy all the SocketId (protected by mutex) into a temporary
// container to avoid dealing with sockets inside the mutex.
size_t ntotal = 0;
size_t n = 0;
for (SocketMap::const_iterator it = _socket_map.begin();
it != _socket_map.end(); ++it, ++ntotal) {
if (ntotal >= max_copied) {
return;
}
if (++n >= 256/*max iterated one pass*/) {
SocketMap::PositionHint hint;
_socket_map.save_iterator(it, &hint);
n = 0;
mu.unlock(); // yield
mu.lock();
it = _socket_map.restore_iterator(hint);
if (it == _socket_map.begin()) { // resized
conn_list->clear();
}
if (it == _socket_map.end()) {
break;
}
}
conn_list->push_back(it->first);
}
}
void Acceptor::ListConnections(std::vector<SocketId>* conn_list) {
return ListConnections(conn_list, std::numeric_limits<size_t>::max());
}
void Acceptor::OnNewConnectionsUntilEAGAIN(Socket* acception) {
while (1) {
struct sockaddr_storage in_addr;
bzero(&in_addr, sizeof(in_addr));
socklen_t in_len = sizeof(in_addr);
butil::fd_guard in_fd(accept(acception->fd(), (sockaddr*)&in_addr, &in_len));
if (in_fd < 0) {
// no EINTR because listened fd is non-blocking.
if (errno == EAGAIN) {
return;
}
// Do NOT return -1 when `accept' failed, otherwise `_listened_fd'
// will be closed. Continue to consume all the events until EAGAIN
// instead.
// If the accept was failed, the error may repeat constantly,
// limit frequency of logging.
PLOG_EVERY_SECOND(ERROR)
<< "Fail to accept from listened_fd=" << acception->fd();
continue;
}
Acceptor* am = dynamic_cast<Acceptor*>(acception->user());
if (NULL == am) {
LOG(FATAL) << "Impossible! acception->user() MUST be Acceptor";
acception->SetFailed(EINVAL, "Impossible! acception->user() MUST be Acceptor");
return;
}
SocketId socket_id;
SocketOptions options;
options.keytable_pool = am->_keytable_pool;
options.fd = in_fd;
butil::sockaddr2endpoint(&in_addr, in_len, &options.remote_side);
options.user = acception->user();
options.force_ssl = am->_force_ssl;
options.initial_ssl_ctx = am->_ssl_ctx;
#if BRPC_WITH_RDMA
if (am->_use_rdma) {
options.on_edge_triggered_events = rdma::RdmaEndpoint::OnNewDataFromTcp;
} else {
#else
{
#endif
options.on_edge_triggered_events = InputMessenger::OnNewMessages;
}
options.use_rdma = am->_use_rdma;
if (Socket::Create(options, &socket_id) != 0) {
LOG(ERROR) << "Fail to create Socket";
continue;
}
in_fd.release(); // transfer ownership to socket_id
// There's a funny race condition here. After Socket::Create, messages
// from the socket are already handled and a RPC is possibly done
// before the socket is added into _socket_map below. This is found in
// ChannelTest.skip_parallel in test/brpc_channel_unittest.cpp (running
// on machines with few cores) where the _messenger.ConnectionCount()
// may surprisingly be 0 even if the RPC is already done.
SocketUniquePtr sock;
if (Socket::AddressFailedAsWell(socket_id, &sock) >= 0) {
bool is_running = true;
{
BAIDU_SCOPED_LOCK(am->_map_mutex);
is_running = (am->status() == RUNNING);
// Always add this socket into `_socket_map' whether it
// has been `SetFailed' or not, whether `Acceptor' is
// running or not. Otherwise, `Acceptor::BeforeRecycle'
// may be called (inside Socket::OnRecycle) after `Acceptor'
// has been destroyed
am->_socket_map.insert(socket_id, ConnectStatistics());
}
if (!is_running) {
LOG(WARNING) << "Acceptor on fd=" << acception->fd()
<< " has been stopped, discard newly created " << *sock;
sock->SetFailed(ELOGOFF, "Acceptor on fd=%d has been stopped, "
"discard newly created %s", acception->fd(),
sock->description().c_str());
return;
}
} // else: The socket has already been destroyed, Don't add its id
// into _socket_map
}
}
void Acceptor::OnNewConnections(Socket* acception) {
int progress = Socket::PROGRESS_INIT;
do {
OnNewConnectionsUntilEAGAIN(acception);
if (acception->Failed()) {
return;
}
} while (acception->MoreReadEvents(&progress));
}
void Acceptor::BeforeRecycle(Socket* sock) {
BAIDU_SCOPED_LOCK(_map_mutex);
if (sock->id() == _acception_id) {
// Set _listened_fd to -1 when acception socket has been recycled
// so that we are ensured no more events will arrive (and `Join'
// will return to its caller)
_listened_fd = -1;
_empty_cond.Broadcast();
return;
}
// If a Socket could not be addressed shortly after its creation, it
// was not added into `_socket_map'.
_socket_map.erase(sock->id());
if (_socket_map.empty()) {
_empty_cond.Broadcast();
}
}
} // namespace brpc