blob: 87056d1292e57e45dce0d1a8e0efb10596264784 [file] [log] [blame]
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.
#include "kudu/rpc/reactor.h"
#include <arpa/inet.h>
#include <netinet/in.h>
#include <stdlib.h>
#include <sys/socket.h>
#include <sys/types.h>
#include <unistd.h>
#include <mutex>
#include <string>
#include <boost/intrusive/list.hpp>
#include <ev++.h>
#include <glog/logging.h>
#include "kudu/gutil/ref_counted.h"
#include "kudu/gutil/stringprintf.h"
#include "kudu/rpc/client_negotiation.h"
#include "kudu/rpc/connection.h"
#include "kudu/rpc/messenger.h"
#include "kudu/rpc/negotiation.h"
#include "kudu/rpc/rpc_controller.h"
#include "kudu/rpc/rpc_introspection.pb.h"
#include "kudu/rpc/server_negotiation.h"
#include "kudu/rpc/transfer.h"
#include "kudu/util/countdown_latch.h"
#include "kudu/util/debug/sanitizer_scopes.h"
#include "kudu/util/errno.h"
#include "kudu/util/flag_tags.h"
#include "kudu/util/monotime.h"
#include "kudu/util/net/socket.h"
#include "kudu/util/status.h"
#include "kudu/util/thread.h"
#include "kudu/util/thread_restrictions.h"
#include "kudu/util/threadpool.h"
#include "kudu/util/trace.h"
// libev backend selection for every reactor loop.
//
// On Mac OS X libev would otherwise default to 'select', and 'select' cannot
// handle connections once the process has more than 1024 file descriptors
// open. So on Apple platforms we explicitly ask for 'kqueue'; everywhere else
// libev is left to pick the best available backend (ev::AUTO).
static const int kDefaultLibEvFlags =
#if defined(__APPLE__)
    ev::KQUEUE;
#else
    ev::AUTO;
#endif
using std::string;
using std::shared_ptr;
// TODO(KUDU-1580): This timeout has been bumped from 3 seconds up to
// 15 seconds to work around a bug. We should drop it back down when
// KUDU-1580 is fixed.
//
// Maximum time, in milliseconds, allowed for negotiating a new RPC connection.
// ReactorThread::StartConnectionNegotiation() reads this flag every time it
// submits a negotiation (for both inbound and outbound connections) and turns
// it into an absolute deadline. The flag is tagged 'advanced' and 'runtime'.
DEFINE_int64(rpc_negotiation_timeout_ms, 15000,
"Timeout for negotiating an RPC connection.");
TAG_FLAG(rpc_negotiation_timeout_ms, advanced);
TAG_FLAG(rpc_negotiation_timeout_ms, runtime);
namespace kudu {
namespace rpc {
namespace {

// Builds the Status reported for work that cannot proceed because the reactor
// is shutting down. Both variants carry the message "reactor is shutting down"
// and the POSIX code ESHUTDOWN; only the Status category differs:
//
//   aborted == true   -> Status::Aborted
//   aborted == false  -> Status::ServiceUnavailable
Status ShutdownError(bool aborted) {
  const char* const kMessage = "reactor is shutting down";
  if (!aborted) {
    return Status::ServiceUnavailable(kMessage, "", ESHUTDOWN);
  }
  return Status::Aborted(kMessage, "", ESHUTDOWN);
}

} // anonymous namespace
// Builds the per-reactor event-loop state from the messenger builder's
// settings. loop_ is constructed with kDefaultLibEvFlags (backend choice);
// no watchers are armed and no thread is started here -- that is Init()'s job.
//
// NOTE: last_unused_tcp_scan_ is initialized from cur_time_, so this relies on
// cur_time_ being declared before last_unused_tcp_scan_ in the class
// definition (members are initialized in declaration order).
ReactorThread::ReactorThread(Reactor *reactor, const MessengerBuilder& bld)
: loop_(kDefaultLibEvFlags),
cur_time_(MonoTime::Now()),
last_unused_tcp_scan_(cur_time_),
reactor_(reactor),
// How long an idle inbound connection may live (see ScanIdleConnections()).
connection_keepalive_time_(bld.connection_keepalive_time_),
// Period of the repeating timer_ armed in Init().
coarse_timer_granularity_(bld.coarse_timer_granularity_) {
}
// Arms this reactor's libev watchers and spawns the reactor thread.
//
// Two watchers are attached to loop_:
//  - async_: other threads signal it through WakeThread(); it dispatches to
//    AsyncHandler() on the reactor thread.
//  - timer_: repeats every coarse_timer_granularity_ and dispatches to
//    TimerHandler(), which refreshes cur_time_ and scans for idle connections.
//
// May only be called once (DCHECKed). Returns the status of creating the
// thread, which runs RunThread().
Status ReactorThread::Init() {
  DCHECK(thread_.get() == nullptr) << "Already started";
  DVLOG(6) << "Called ReactorThread::Init()";

  // Cross-thread wake-up channel.
  async_.set(loop_);
  async_.set<ReactorThread, &ReactorThread::AsyncHandler>(this);
  async_.start();

  // Periodic housekeeping timer: first fires after one period, then repeats.
  const auto period_secs = coarse_timer_granularity_.ToSeconds();
  timer_.set(loop_);
  timer_.set<ReactorThread, &ReactorThread::TimerHandler>(this); // NOLINT(*)
  timer_.start(period_secs, period_secs);

  return kudu::Thread::Create("reactor", "rpc reactor",
                              &ReactorThread::RunThread, this, &thread_);
}
// Requests that the reactor thread stop. This only wakes the loop; the actual
// teardown (ShutdownInternal()) happens on the reactor thread when
// AsyncHandler() observes reactor_->closing(). The owning Reactor must have
// set its closing flag beforehand (CHECKed).
void ReactorThread::Shutdown() {
  CHECK(reactor_->closing()) << "Should be called after setting closing_ flag";
  VLOG(1) << name() << ": shutting down Reactor thread.";

  // Kick the event loop so AsyncHandler() runs and sees the closing flag.
  WakeThread();
}
void ReactorThread::ShutdownInternal() {
DCHECK(IsCurrentThread());
// Tear down any outbound TCP connections.
Status service_unavailable = ShutdownError(false);
VLOG(1) << name() << ": tearing down outbound TCP connections...";
for (auto c = client_conns_.begin(); c != client_conns_.end();
c = client_conns_.begin()) {
const scoped_refptr<Connection>& conn = (*c).second;
VLOG(1) << name() << ": shutting down " << conn->ToString();
conn->Shutdown(service_unavailable);
client_conns_.erase(c);
}
// Tear down any inbound TCP connections.
VLOG(1) << name() << ": tearing down inbound TCP connections...";
for (const scoped_refptr<Connection>& conn : server_conns_) {
VLOG(1) << name() << ": shutting down " << conn->ToString();
conn->Shutdown(service_unavailable);
}
server_conns_.clear();
// Abort any scheduled tasks.
//
// These won't be found in the ReactorThread's list of pending tasks
// because they've been "run" (that is, they've been scheduled).
Status aborted = ShutdownError(true); // aborted
for (DelayedTask* task : scheduled_tasks_) {
task->Abort(aborted); // should also free the task.
}
scheduled_tasks_.clear();
}
// ReactorTask has no state of its own to set up or tear down; subclasses
// implement Run()/Abort().
ReactorTask::ReactorTask() = default;

ReactorTask::~ReactorTask() = default;
// Reports how many outbound (client) and inbound (server) connections this
// reactor currently tracks. Must run on the reactor thread, which is the only
// thread this file touches the connection containers from. Always returns OK.
Status ReactorThread::GetMetrics(ReactorMetrics *metrics) {
  DCHECK(IsCurrentThread());
  ReactorMetrics& out = *metrics;
  out.num_client_connections_ = client_conns_.size();
  out.num_server_connections_ = server_conns_.size();
  return Status::OK();
}
// Appends one entry per connection to 'resp': inbound connections go into
// inbound_connections, outbound ones into outbound_connections. Each entry is
// filled by Connection::DumpPB(); the first error stops the dump and is
// returned. Must run on the reactor thread.
Status ReactorThread::DumpRunningRpcs(const DumpRunningRpcsRequestPB& req,
                                      DumpRunningRpcsResponsePB* resp) {
  DCHECK(IsCurrentThread());
  for (const auto& server_conn : server_conns_) {
    RETURN_NOT_OK(server_conn->DumpPB(req, resp->add_inbound_connections()));
  }
  for (const auto& id_and_conn : client_conns_) {
    Connection* client_conn = id_and_conn.second.get();
    RETURN_NOT_OK(client_conn->DumpPB(req, resp->add_outbound_connections()));
  }
  return Status::OK();
}
// Wakes the reactor's event loop from any thread by signalling async_, which
// makes AsyncHandler() run on the reactor thread.
void ReactorThread::WakeThread() {
  // libev synchronizes ev_async with lock-free code that carries no TSAN
  // annotations (see http://lists.schmorp.de/pipermail/libev/2013q2/002178.html
  // or KUDU-366), so TSAN reports are suppressed while sending.
  debug::ScopedTSANIgnoreReadsAndWrites tsan_suppressor;
  async_.send();
}
// Handler for async_ notifications. Other threads call WakeThread() when they
// need the reactor's attention, e.g. because the reactor is shutting down or
// because new ReactorTasks (such as an outbound call to assign) were queued.
//
// If the reactor is closing, everything is torn down and the event loop is
// broken, which lets RunThread() return. Otherwise the Reactor's pending task
// queue is drained and the tasks are run in FIFO order on this thread.
void ReactorThread::AsyncHandler(ev::async& /*watcher*/, int /*revents*/) {
  DCHECK(IsCurrentThread());

  if (PREDICT_FALSE(reactor_->closing())) {
    ShutdownInternal();
    loop_.break_loop(); // break the epoll loop and terminate the thread
    return;
  }

  boost::intrusive::list<ReactorTask> drained;
  reactor_->DrainTaskQueue(&drained);

  // Unlink each task before running it: Run() implementations may delete the
  // task object.
  while (!drained.empty()) {
    ReactorTask* next = &drained.front();
    drained.pop_front();
    next->Run(this);
  }
}
// Adopts a newly accepted inbound connection on the reactor thread: submits
// its negotiation and, if that submission succeeds, tracks it in
// server_conns_. If negotiation cannot be started, the error is logged and the
// connection is destroyed instead.
void ReactorThread::RegisterConnection(scoped_refptr<Connection> conn) {
  DCHECK(IsCurrentThread());

  Status negotiation_start = StartConnectionNegotiation(conn);
  if (PREDICT_FALSE(!negotiation_start.ok())) {
    LOG(ERROR) << "Server connection negotiation failed: " << negotiation_start.ToString();
    DestroyConnection(conn.get(), negotiation_start);
    return;
  }

  server_conns_.emplace_back(std::move(conn));
}
void ReactorThread::AssignOutboundCall(const shared_ptr<OutboundCall>& call) {
DCHECK(IsCurrentThread());
scoped_refptr<Connection> conn;
Status s = FindOrStartConnection(call->conn_id(), &conn);
if (PREDICT_FALSE(!s.ok())) {
call->SetFailed(s);
return;
}
conn->QueueOutboundCall(call);
}
// Handles ticks of the repeating timer_ armed in Init(), which fires every
// coarse_timer_granularity_. On each tick it:
//
// 1. refreshes ReactorThread::cur_time_ (the cached time returned by
//    cur_time()), and
// 2. calls ScanIdleConnections(), which shuts down inbound connections that
//    have been idle for longer than connection_keepalive_time_.
//
// If libev reports EV_ERROR for the watcher, a warning is logged and neither
// step is performed.
void ReactorThread::TimerHandler(ev::timer& /*watcher*/, int revents) {
  DCHECK(IsCurrentThread());
  if (EV_ERROR & revents) {
    LOG(WARNING) << "Reactor " << name() << " got an error in "
        "the timer handler.";
    return;
  }
  cur_time_ = MonoTime::Now();
  ScanIdleConnections();
}
// Binds 'watcher' to this reactor's event loop. Only the loop is set; the
// caller remains responsible for configuring and starting the timer.
void ReactorThread::RegisterTimeout(ev::timer* watcher) {
  ev::timer& timer = *watcher;
  timer.set(loop_);
}
// Enforces the keepalive timeout on inbound (server-side) connections.
//
// Each connection that reports Idle() and whose last activity is more than
// connection_keepalive_time_ before the cached cur_time_ is shut down with a
// NetworkError and removed from server_conns_. Connections that are not idle
// are left alone regardless of their last activity time.
//
// TODO: this only times out the server side. Clients may want to set their
// keepalive timeout as well.
void ReactorThread::ScanIdleConnections() {
  DCHECK(IsCurrentThread());
  uint64_t timed_out = 0;

  auto it = server_conns_.begin();
  while (it != server_conns_.end()) {
    const scoped_refptr<Connection>& conn = *it;
    if (!conn->Idle()) {
      VLOG(10) << "Connection " << conn->ToString() << " not idle";
      ++it;
      continue;
    }

    MonoDelta connection_delta(cur_time_ - conn->last_activity_time());
    if (connection_delta > connection_keepalive_time_) {
      // MonoDelta::ToString() already carries its unit (it formats as "%.3fs"),
      // so the message must not append " seconds" as well.
      conn->Shutdown(Status::NetworkError(
          StringPrintf("connection timed out after %s",
                       connection_keepalive_time_.ToString().c_str())));
      VLOG(1) << "Timing out connection " << conn->ToString() << " - it has been idle for "
              << connection_delta.ToSeconds() << "s";
      // erase() returns the next valid position; 'conn' refers to the erased
      // element and must not be used after this line.
      it = server_conns_.erase(it);
      ++timed_out;
    } else {
      ++it;
    }
  }

  VLOG_IF(1, timed_out > 0) << name() << ": timed out " << timed_out << " TCP connections.";
}
const std::string& ReactorThread::name() const {
return reactor_->name();
}
// Cached "now", refreshed by TimerHandler() on every timer tick (and set once
// at construction). Cheaper but coarser than calling MonoTime::Now().
MonoTime ReactorThread::cur_time() const { return cur_time_; }
// The Reactor that owns this thread (non-owning pointer).
Reactor* ReactorThread::reactor() { return reactor_; }
// True iff the calling thread is this reactor's thread (as created in Init()).
bool ReactorThread::IsCurrentThread() const {
  const auto* caller = kudu::Thread::current_thread();
  return caller == thread_.get();
}
// Body of the reactor thread created by Init(). Blocking waits and IO are
// disallowed on this thread via ThreadRestrictions; the thread then runs the
// libev loop until it is broken (AsyncHandler() does so on shutdown).
void ReactorThread::RunThread() {
  ThreadRestrictions::SetWaitAllowed(false);
  ThreadRestrictions::SetIOAllowed(false);
  DVLOG(6) << "Calling ReactorThread::RunThread()...";

  loop_.run(0);
  VLOG(1) << name() << " thread exiting.";

  // Drop this reactor's reference to the messenger, so that the messenger is
  // deleted once all the reactors have exited.
  reactor_->messenger_.reset();
}
// Sets *conn to the outbound connection for 'conn_id', creating one if this
// reactor does not have it yet. Must run on the reactor thread.
//
// For a new connection this:
//  1. creates a non-blocking, no-delay client socket;
//  2. starts a connect() to the remote (it may still be in progress);
//  3. wraps the socket in a CLIENT Connection carrying the caller's user
//     credentials;
//  4. submits its negotiation; and
//  5. records it in client_conns_ so later calls with the same ConnectionId
//     reuse it.
//
// On any error, *conn is left untouched and nothing is added to
// client_conns_. If the negotiation pool rejects the task with IllegalState
// (presumably because the pool is shutting down), ServiceUnavailable is
// returned instead of the pool's generic error.
Status ReactorThread::FindOrStartConnection(const ConnectionId& conn_id,
                                            scoped_refptr<Connection>* conn) {
  DCHECK(IsCurrentThread());
  conn_map_t::const_iterator existing = client_conns_.find(conn_id);
  if (existing != client_conns_.end()) {
    *conn = existing->second;
    return Status::OK();
  }

  // No connection to this remote. Need to create one.
  VLOG(2) << name() << " FindOrStartConnection: creating "
          << "new connection for " << conn_id.remote().ToString();

  // Create a new socket and start connecting to the remote.
  Socket sock;
  RETURN_NOT_OK(CreateClientSocket(&sock));
  // StartConnect() requires this out-parameter; its value is not consulted here.
  bool connect_in_progress;
  RETURN_NOT_OK(StartConnect(&sock, conn_id.remote(), &connect_in_progress));
  std::unique_ptr<Socket> new_socket(new Socket(sock.Release()));

  // Build the connection in a local and only hand it to the caller once
  // negotiation has been submitted, so a failure never leaks a half-built,
  // unregistered Connection through *conn.
  scoped_refptr<Connection> new_conn(
      new Connection(this, conn_id.remote(), std::move(new_socket), Connection::CLIENT));
  new_conn->set_local_user_credentials(conn_id.user_credentials());

  // Kick off blocking client connection negotiation.
  Status s = StartConnectionNegotiation(new_conn);
  if (s.IsIllegalState()) {
    // Forwarding the status as-is would surface something generic like
    // "ThreadPool is closing"; return a clearer message to the user instead.
    return Status::ServiceUnavailable("Client RPC Messenger shutting down");
  }
  // Propagate any other errors as-is.
  RETURN_NOT_OK_PREPEND(s, "Unable to start connection negotiation thread");

  // Insert into the client connection map to avoid duplicate connection requests.
  client_conns_.insert(conn_map_t::value_type(conn_id, new_conn));
  *conn = new_conn;
  return Status::OK();
}
// Submits a negotiation task for 'conn' to the messenger's negotiation pool.
// Used for both inbound connections (RegisterConnection()) and outbound ones
// (FindOrStartConnection()). The submitted closure is
// Negotiation::RunNegotiation, bound with the messenger's authentication and
// encryption settings and an absolute deadline derived from
// --rpc_negotiation_timeout_ms. Only the submission happens here; the result
// is presumably delivered later via CompleteConnectionNegotiation() -- confirm
// in negotiation.cc. Returns the pool's error if submission fails.
Status ReactorThread::StartConnectionNegotiation(const scoped_refptr<Connection>& conn) {
DCHECK(IsCurrentThread());
// Set a limit on how long negotiation may take, for client and server
// connections alike.
MonoTime deadline = MonoTime::Now() +
MonoDelta::FromMilliseconds(FLAGS_rpc_negotiation_timeout_ms);
// This trace is adopted only for the remainder of this function; it records
// the submission message below.
scoped_refptr<Trace> trace(new Trace());
ADOPT_TRACE(trace.get());
TRACE("Submitting negotiation task for $0", conn->ToString());
auto authentication = reactor()->messenger()->authentication();
auto encryption = reactor()->messenger()->encryption();
RETURN_NOT_OK(reactor()->messenger()->negotiation_pool()->SubmitClosure(
Bind(&Negotiation::RunNegotiation, conn, authentication, encryption, deadline)));
return Status::OK();
}
void ReactorThread::CompleteConnectionNegotiation(const scoped_refptr<Connection>& conn,
const Status& status) {
DCHECK(IsCurrentThread());
if (PREDICT_FALSE(!status.ok())) {
DestroyConnection(conn.get(), status);
return;
}
// Switch the socket back to non-blocking mode after negotiation.
Status s = conn->SetNonBlocking(true);
if (PREDICT_FALSE(!s.ok())) {
LOG(DFATAL) << "Unable to set connection to non-blocking mode: " << s.ToString();
DestroyConnection(conn.get(), s);
return;
}
conn->MarkNegotiationComplete();
conn->EpollRegister(loop_);
}
// Initializes 'sock' as a non-blocking socket with TCP_NODELAY enabled. Any
// failure is logged as a warning and returned.
Status ReactorThread::CreateClientSocket(Socket *sock) {
  Status result = sock->Init(Socket::FLAG_NONBLOCKING);
  if (result.ok()) {
    result = sock->SetNoDelay(true);
  }
  if (!result.ok()) {
    LOG(WARNING)
        << "failed to create an outbound connection because a new socket could not be created: "
        << result.ToString();
  }
  return result;
}
// Begins connecting 'sock' to 'remote'.
//  - connect() completed immediately: *in_progress = false, returns OK.
//  - connect() reported EINPROGRESS or a temporary socket error:
//    *in_progress = true, returns OK.
//  - any other error: logged as a warning and returned; *in_progress is not
//    written.
Status ReactorThread::StartConnect(Socket *sock, const Sockaddr& remote, bool *in_progress) {
  Status connect_status = sock->Connect(remote);
  if (!connect_status.ok()) {
    int err = connect_status.posix_code();
    const bool still_connecting =
        Socket::IsTemporarySocketError(err) || err == EINPROGRESS;
    if (!still_connecting) {
      LOG(WARNING) << "Failed to create an outbound connection to " << remote.ToString()
                   << " because connect() failed: " << connect_status.ToString();
      return connect_status;
    }
    VLOG(3) << "StartConnect: connect in progress for " << remote.ToString();
    *in_progress = true;
    return Status::OK();
  }

  VLOG(3) << "StartConnect: connect finished immediately for " << remote.ToString();
  *in_progress = false;
  return Status::OK();
}
// Shuts 'conn' down with 'conn_status' and stops tracking it.
//  - CLIENT connections are looked up in client_conns_ by (remote, local user
//    credentials); not finding one is a fatal CHECK failure.
//  - SERVER connections are removed from server_conns_ if present (they may
//    not be, e.g. when negotiation could not even be started).
void ReactorThread::DestroyConnection(Connection *conn,
                                      const Status& conn_status) {
  DCHECK(IsCurrentThread());
  conn->Shutdown(conn_status);

  if (conn->direction() == Connection::CLIENT) {
    ConnectionId key(conn->remote(), conn->local_user_credentials());
    auto found = client_conns_.find(key);
    CHECK(found != client_conns_.end()) << "Couldn't find connection " << conn->ToString();
    client_conns_.erase(found);
  } else if (conn->direction() == Connection::SERVER) {
    for (auto pos = server_conns_.begin(); pos != server_conns_.end(); ++pos) {
      if (pos->get() == conn) {
        server_conns_.erase(pos);
        break;
      }
    }
  }
}
// A task that, once scheduled on a reactor thread (see Run()), invokes 'func'
// after 'when'. 'func' receives Status::OK() when the timer fires normally, or
// an error Status if the task is aborted. thread_ stays null until the task is
// scheduled.
DelayedTask::DelayedTask(boost::function<void(const Status&)> func,
                         MonoDelta when)
    : func_(std::move(func)),
      when_(when),
      thread_(nullptr) {}
// ReactorTask hook, executed on the reactor thread. Rather than doing the work
// immediately, it arms a one-shot timer on that thread's loop that fires after
// when_, and records the task in scheduled_tasks_ so that shutdown can abort
// it. A DelayedTask may be scheduled only once (DCHECKed).
void DelayedTask::Run(ReactorThread* thread) {
  DCHECK(thread_ == nullptr) << "Task has already been scheduled";
  DCHECK(thread->IsCurrentThread());

  thread_ = thread;
  timer_.set(thread->loop_);
  timer_.set<DelayedTask, &DelayedTask::TimerHandler>(this);
  const double after_secs = when_.ToSeconds();
  const double no_repeat = 0;
  timer_.start(after_secs, no_repeat);
  thread_->scheduled_tasks_.insert(this);
}
// Hands 'abort_status' to the callback and frees the task; 'this' is invalid
// once this returns.
void DelayedTask::Abort(const Status& abort_status) {
  func_(abort_status);
  delete this;  // Tasks are heap-allocated and own themselves.
}
void DelayedTask::TimerHandler(ev::timer& watcher, int revents) {
// We will free this task's memory.
thread_->scheduled_tasks_.erase(this);
if (EV_ERROR & revents) {
string msg = "Delayed task got an error in its timer handler";
LOG(WARNING) << msg;
Abort(Status::Aborted(msg)); // Will delete 'this'.
} else {
func_(Status::OK());
delete this;
}
}
// Creates a reactor bound to 'messenger'. Its name is
// "<messenger name>_R<index as 3 zero-padded digits>" (e.g. "foo_R001").
// The ReactorThread is constructed here but not started; see Init().
//
// NOTE: name_ is built from messenger_ (the member), not from the moved-from
// 'messenger' parameter, so this relies on messenger_ being declared before
// name_ in the class definition.
Reactor::Reactor(shared_ptr<Messenger> messenger,
int index, const MessengerBuilder& bld)
: messenger_(std::move(messenger)),
name_(StringPrintf("%s_R%03d", messenger_->name().c_str(), index)),
closing_(false),
thread_(this, bld) {
}
// Starts the reactor by initializing (and thereby spawning) its
// ReactorThread. Returns ReactorThread::Init()'s status.
Status Reactor::Init() {
  DVLOG(6) << "Called Reactor::Init()";
  Status started = thread_.Init();
  return started;
}
void Reactor::Shutdown() {
{
std::lock_guard<LockType> l(lock_);
if (closing_) {
return;
}
closing_ = true;
}
thread_.Shutdown();
// Abort all pending tasks. No new tasks can get scheduled after this
// because ScheduleReactorTask() tests the closing_ flag set above.
Status aborted = ShutdownError(true);
while (!pending_tasks_.empty()) {
ReactorTask& task = pending_tasks_.front();
pending_tasks_.pop_front();
task.Abort(aborted);
}
}
// A reactor is always shut down before it is destroyed. Shutdown() returns
// early if it has already run, so an earlier explicit call is harmless.
Reactor::~Reactor() { Shutdown(); }
// Name assigned at construction ("<messenger>_R<index>").
const std::string& Reactor::name() const { return name_; }
// Whether Shutdown() has been initiated. The flag is read under lock_, so
// this may be called from any thread.
bool Reactor::closing() const {
  std::lock_guard<LockType> guard(lock_);
  const bool is_closing = closing_;
  return is_closing;
}
// ReactorTask that runs an arbitrary Status-returning function on the reactor
// thread while the submitting thread blocks in Wait(). Whether the function
// runs (Run()) or the task is aborted (Abort()), the outcome is stored and the
// latch is released, so Wait() returns as soon as either one happens.
class RunFunctionTask : public ReactorTask {
 public:
  explicit RunFunctionTask(boost::function<Status()> f)
      : function_(std::move(f)), latch_(1) {}

  void Run(ReactorThread* /*reactor*/) override {
    RecordAndRelease(function_());
  }

  void Abort(const Status& status) override {
    RecordAndRelease(status);
  }

  // Blocks until Run() or Abort() has been called, then returns the function's
  // Status (or the abort Status).
  Status Wait() {
    latch_.Wait();
    return status_;
  }

 private:
  // Stores the outcome, then wakes the waiter.
  void RecordAndRelease(const Status& outcome) {
    status_ = outcome;
    latch_.CountDown();
  }

  boost::function<Status()> function_;
  Status status_;
  CountDownLatch latch_;
};
// Fills 'metrics' by running ReactorThread::GetMetrics() on the reactor thread
// and waiting for it. Returns its status, or a shutdown error if the task is
// aborted because the reactor is closing.
Status Reactor::GetMetrics(ReactorMetrics *metrics) {
  boost::function<Status()> collect =
      boost::bind(&ReactorThread::GetMetrics, &thread_, metrics);
  return RunOnReactorThread(collect);
}
// Runs 'f' on the reactor thread and blocks until it has either run or been
// aborted by shutdown, returning the resulting Status. The task can live on
// this stack frame because Wait() does not return until Run() or Abort() has
// finished with it.
//
// NOTE: calling this from the reactor thread itself would block that thread
// while waiting for a task only it can run.
Status Reactor::RunOnReactorThread(const boost::function<Status()>& f) {
  RunFunctionTask on_reactor(f);
  ScheduleReactorTask(&on_reactor);
  Status outcome = on_reactor.Wait();
  return outcome;
}
// Fills 'resp' with this reactor's connection state by running
// ReactorThread::DumpRunningRpcs() on the reactor thread and waiting for it.
Status Reactor::DumpRunningRpcs(const DumpRunningRpcsRequestPB& req,
                                DumpRunningRpcsResponsePB* resp) {
  // boost::ref makes bind hold a reference to 'req' instead of copying it;
  // RunOnReactorThread() blocks, so 'req' outlives the call.
  boost::function<Status()> dump =
      boost::bind(&ReactorThread::DumpRunningRpcs, &thread_, boost::ref(req), resp);
  return RunOnReactorThread(dump);
}
// Heap-allocated task that hands a newly accepted inbound connection to the
// reactor thread (ReactorThread::RegisterConnection()). It deletes itself
// after running or being aborted.
class RegisterConnectionTask : public ReactorTask {
 public:
  explicit RegisterConnectionTask(scoped_refptr<Connection> conn)
      : conn_(std::move(conn)) {}

  void Run(ReactorThread* reactor) override {
    reactor->RegisterConnection(std::move(conn_));
    delete this;
  }

  void Abort(const Status& /*status*/) override {
    // Nothing to shut down: the connection was never registered. This task is
    // only used for inbound connections, which never have calls added to them
    // before they are registered.
    delete this;
  }

 private:
  scoped_refptr<Connection> conn_;
};
// Takes over the descriptor held by 'socket' (via Release()), wraps it in a
// SERVER Connection for 'remote', and queues a task that registers that
// connection on the reactor thread. If the reactor is already closing, the
// task is aborted immediately and the connection is dropped.
void Reactor::RegisterInboundSocket(Socket *socket, const Sockaddr& remote) {
  VLOG(3) << name_ << ": new inbound connection to " << remote.ToString();
  std::unique_ptr<Socket> owned_socket(new Socket(socket->Release()));
  auto* inbound =
      new Connection(&thread_, remote, std::move(owned_socket), Connection::SERVER);
  ScheduleReactorTask(new RegisterConnectionTask(inbound));
}
// Heap-allocated task that, on the reactor thread, assigns an outbound call to
// a connection (ReactorThread::AssignOutboundCall()). If aborted, the call is
// failed with the abort status instead. Deletes itself either way.
class AssignOutboundCallTask : public ReactorTask {
 public:
  explicit AssignOutboundCallTask(shared_ptr<OutboundCall> call)
      : call_(std::move(call)) {}

  void Run(ReactorThread* reactor) override {
    reactor->AssignOutboundCall(call_);
    delete this;
  }

  void Abort(const Status& status) override {
    call_->SetFailed(status);
    delete this;
  }

 private:
  shared_ptr<OutboundCall> call_;
};
// Queues 'call' for assignment to a connection on the reactor thread. If the
// reactor is closing, the call is failed instead (via the task's Abort()).
void Reactor::QueueOutboundCall(const shared_ptr<OutboundCall>& call) {
  DVLOG(3) << name_ << ": queueing outbound call "
           << call->ToString() << " to remote " << call->conn_id().remote().ToString();
  ScheduleReactorTask(new AssignOutboundCallTask(call));
}
// Enqueues 'task' for the reactor thread and wakes it. If the reactor is
// closing, the task is instead aborted with a ServiceUnavailable shutdown
// error. Abort() is always invoked without lock_ held.
void Reactor::ScheduleReactorTask(ReactorTask *task) {
  std::unique_lock<LockType> guard(lock_);
  if (!closing_) {
    pending_tasks_.push_back(*task);
    guard.unlock();
    thread_.WakeThread();
    return;
  }
  // We guarantee the reactor lock is not taken when calling Abort().
  guard.unlock();
  task->Abort(ShutdownError(false));
}
// Moves all pending tasks into '*tasks' by swapping lists under lock_, and
// returns true. Once the reactor is closing, nothing is moved and false is
// returned. Callers are expected to pass an empty list.
bool Reactor::DrainTaskQueue(boost::intrusive::list<ReactorTask> *tasks) { // NOLINT(*)
  std::lock_guard<LockType> guard(lock_);
  const bool accepting = !closing_;
  if (accepting) {
    tasks->swap(pending_tasks_);
  }
  return accepting;
}
} // namespace rpc
} // namespace kudu