blob: b811911b1219585d187d98fb7b938f6aeaa42ecd [file] [log] [blame]
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.
#include "kudu/rpc/connection.h"
#include <algorithm>
#include <boost/intrusive/list.hpp>
#include <gflags/gflags.h>
#include <glog/logging.h>
#include <iostream>
#include <set>
#include <stdint.h>
#include <string>
#include <vector>
#include "kudu/gutil/map-util.h"
#include "kudu/gutil/strings/human_readable.h"
#include "kudu/gutil/strings/substitute.h"
#include "kudu/rpc/auth_store.h"
#include "kudu/rpc/rpc_introspection.pb.h"
#include "kudu/rpc/constants.h"
#include "kudu/rpc/messenger.h"
#include "kudu/rpc/reactor.h"
#include "kudu/rpc/rpc_controller.h"
#include "kudu/rpc/rpc_header.pb.h"
#include "kudu/rpc/sasl_client.h"
#include "kudu/rpc/sasl_server.h"
#include "kudu/rpc/transfer.h"
#include "kudu/util/debug-util.h"
#include "kudu/util/net/sockaddr.h"
#include "kudu/util/status.h"
#include "kudu/util/trace.h"
using std::function;
using std::includes;
using std::set;
using std::shared_ptr;
using std::vector;
using strings::Substitute;
namespace kudu {
namespace rpc {
///
/// Connection
///
Connection::Connection(ReactorThread *reactor_thread, Sockaddr remote,
int socket, Direction direction)
: reactor_thread_(reactor_thread),
socket_(socket),
remote_(std::move(remote)),
direction_(direction),
last_activity_time_(MonoTime::Now(MonoTime::FINE)),
is_epoll_registered_(false),
next_call_id_(1),
sasl_client_(kSaslAppName, socket),
sasl_server_(kSaslAppName, socket),
negotiation_complete_(false) {}
Status Connection::SetNonBlocking(bool enabled) {
return socket_.SetNonBlocking(enabled);
}
void Connection::EpollRegister(ev::loop_ref& loop) {
DCHECK(reactor_thread_->IsCurrentThread());
DVLOG(4) << "Registering connection for epoll: " << ToString();
write_io_.set(loop);
write_io_.set(socket_.GetFd(), ev::WRITE);
write_io_.set<Connection, &Connection::WriteHandler>(this);
if (direction_ == CLIENT && negotiation_complete_) {
write_io_.start();
}
read_io_.set(loop);
read_io_.set(socket_.GetFd(), ev::READ);
read_io_.set<Connection, &Connection::ReadHandler>(this);
read_io_.start();
is_epoll_registered_ = true;
}
Connection::~Connection() {
// Must clear the outbound_transfers_ list before deleting.
CHECK(outbound_transfers_.begin() == outbound_transfers_.end());
// It's crucial that the connection is Shutdown first -- otherwise
// our destructor will end up calling read_io_.stop() and write_io_.stop()
// from a possibly non-reactor thread context. This can then make all
// hell break loose with libev.
CHECK(!is_epoll_registered_);
}
bool Connection::Idle() const {
DCHECK(reactor_thread_->IsCurrentThread());
// check if we're in the middle of receiving something
InboundTransfer *transfer = inbound_.get();
if (transfer && (transfer->TransferStarted())) {
return false;
}
// check if we still need to send something
if (!outbound_transfers_.empty()) {
return false;
}
// can't kill a connection if calls are waiting response
if (!awaiting_response_.empty()) {
return false;
}
if (!calls_being_handled_.empty()) {
return false;
}
// We are not idle if we are in the middle of connection negotiation.
if (!negotiation_complete_) {
return false;
}
return true;
}
void Connection::Shutdown(const Status &status) {
DCHECK(reactor_thread_->IsCurrentThread());
shutdown_status_ = status;
if (inbound_ && inbound_->TransferStarted()) {
double secs_since_active = reactor_thread_->cur_time()
.GetDeltaSince(last_activity_time_).ToSeconds();
LOG(WARNING) << "Shutting down connection " << ToString() << " with pending inbound data ("
<< inbound_->StatusAsString() << ", last active "
<< HumanReadableElapsedTime::ToShortString(secs_since_active)
<< " ago, status=" << status.ToString() << ")";
}
// Clear any calls which have been sent and were awaiting a response.
for (const car_map_t::value_type &v : awaiting_response_) {
CallAwaitingResponse *c = v.second;
if (c->call) {
c->call->SetFailed(status);
}
// And we must return the CallAwaitingResponse to the pool
car_pool_.Destroy(c);
}
awaiting_response_.clear();
// Clear any outbound transfers.
while (!outbound_transfers_.empty()) {
OutboundTransfer *t = &outbound_transfers_.front();
outbound_transfers_.pop_front();
delete t;
}
read_io_.stop();
write_io_.stop();
is_epoll_registered_ = false;
WARN_NOT_OK(socket_.Close(), "Error closing socket");
}
void Connection::QueueOutbound(gscoped_ptr<OutboundTransfer> transfer) {
DCHECK(reactor_thread_->IsCurrentThread());
if (!shutdown_status_.ok()) {
// If we've already shut down, then we just need to abort the
// transfer rather than bothering to queue it.
transfer->Abort(shutdown_status_);
return;
}
DVLOG(3) << "Queueing transfer: " << transfer->HexDump();
outbound_transfers_.push_back(*transfer.release());
if (negotiation_complete_ && !write_io_.is_active()) {
// If we weren't currently in the middle of sending anything,
// then our write_io_ interest is stopped. Need to re-start it.
// Only do this after connection negotiation is done doing its work.
write_io_.start();
}
}
Connection::CallAwaitingResponse::~CallAwaitingResponse() {
DCHECK(conn->reactor_thread_->IsCurrentThread());
}
void Connection::CallAwaitingResponse::HandleTimeout(ev::timer &watcher, int revents) {
conn->HandleOutboundCallTimeout(this);
}
void Connection::HandleOutboundCallTimeout(CallAwaitingResponse *car) {
DCHECK(reactor_thread_->IsCurrentThread());
DCHECK(car->call);
// The timeout timer is stopped by the car destructor exiting Connection::HandleCallResponse()
DCHECK(!car->call->IsFinished());
// Mark the call object as failed.
car->call->SetTimedOut();
// Drop the reference to the call. If the original caller has moved on after
// seeing the timeout, we no longer need to hold onto the allocated memory
// from the request.
car->call.reset();
// We still leave the CallAwaitingResponse in the map -- this is because we may still
// receive a response from the server, and we don't want a spurious log message
// when we do finally receive the response. The fact that CallAwaitingResponse::call
// is a NULL pointer indicates to the response processing code that the call
// already timed out.
}
// Callbacks after sending a call on the wire.
// This notifies the OutboundCall object to change its state to SENT once it
// has been fully transmitted.
struct CallTransferCallbacks : public TransferCallbacks {
public:
explicit CallTransferCallbacks(shared_ptr<OutboundCall> call)
: call_(std::move(call)) {}
virtual void NotifyTransferFinished() OVERRIDE {
// TODO: would be better to cancel the transfer while it is still on the queue if we
// timed out before the transfer started, but there is still a race in the case of
// a partial send that we have to handle here
if (call_->IsFinished()) {
DCHECK(call_->IsTimedOut());
} else {
call_->SetSent();
}
delete this;
}
virtual void NotifyTransferAborted(const Status &status) OVERRIDE {
VLOG(1) << "Connection torn down before " <<
call_->ToString() << " could send its call: " << status.ToString();
delete this;
}
private:
shared_ptr<OutboundCall> call_;
};
void Connection::QueueOutboundCall(const shared_ptr<OutboundCall> &call) {
DCHECK(call);
DCHECK_EQ(direction_, CLIENT);
DCHECK(reactor_thread_->IsCurrentThread());
if (PREDICT_FALSE(!shutdown_status_.ok())) {
// Already shutdown
call->SetFailed(shutdown_status_);
return;
}
// At this point the call has a serialized request, but no call header, since we haven't
// yet assigned a call ID.
DCHECK(!call->call_id_assigned());
// Assign the call ID.
int32_t call_id = GetNextCallId();
call->set_call_id(call_id);
// Serialize the actual bytes to be put on the wire.
slices_tmp_.clear();
Status s = call->SerializeTo(&slices_tmp_);
if (PREDICT_FALSE(!s.ok())) {
call->SetFailed(s);
return;
}
call->SetQueued();
scoped_car car(car_pool_.make_scoped_ptr(car_pool_.Construct()));
car->conn = this;
car->call = call;
// Set up the timeout timer.
const MonoDelta &timeout = call->controller()->timeout();
if (timeout.Initialized()) {
reactor_thread_->RegisterTimeout(&car->timeout_timer);
car->timeout_timer.set<CallAwaitingResponse, // NOLINT(*)
&CallAwaitingResponse::HandleTimeout>(car.get());
car->timeout_timer.set(timeout.ToSeconds(), 0);
car->timeout_timer.start();
}
TransferCallbacks *cb = new CallTransferCallbacks(call);
awaiting_response_[call_id] = car.release();
QueueOutbound(gscoped_ptr<OutboundTransfer>(
new OutboundTransfer(call_id, slices_tmp_, call->RequiredRpcFeatures(), cb)));
}
// Callbacks for sending an RPC call response from the server.
// This takes ownership of the InboundCall object so that, once it has
// been responded to, we can free up all of the associated memory.
struct ResponseTransferCallbacks : public TransferCallbacks {
public:
ResponseTransferCallbacks(gscoped_ptr<InboundCall> call,
Connection *conn) :
call_(std::move(call)),
conn_(conn)
{}
~ResponseTransferCallbacks() {
// Remove the call from the map.
InboundCall *call_from_map = EraseKeyReturnValuePtr(
&conn_->calls_being_handled_, call_->call_id());
DCHECK_EQ(call_from_map, call_.get());
}
virtual void NotifyTransferFinished() OVERRIDE {
delete this;
}
virtual void NotifyTransferAborted(const Status &status) OVERRIDE {
LOG(WARNING) << "Connection torn down before " <<
call_->ToString() << " could send its response";
delete this;
}
private:
gscoped_ptr<InboundCall> call_;
Connection *conn_;
};
// Reactor task which puts a transfer on the outbound transfer queue.
class QueueTransferTask : public ReactorTask {
public:
QueueTransferTask(gscoped_ptr<OutboundTransfer> transfer,
Connection *conn)
: transfer_(std::move(transfer)),
conn_(conn)
{}
virtual void Run(ReactorThread *thr) OVERRIDE {
conn_->QueueOutbound(std::move(transfer_));
delete this;
}
virtual void Abort(const Status &status) OVERRIDE {
transfer_->Abort(status);
delete this;
}
private:
gscoped_ptr<OutboundTransfer> transfer_;
Connection *conn_;
};
void Connection::QueueResponseForCall(gscoped_ptr<InboundCall> call) {
// This is usually called by the IPC worker thread when the response
// is set, but in some circumstances may also be called by the
// reactor thread (e.g. if the service has shut down)
DCHECK_EQ(direction_, SERVER);
// If the connection is torn down, then the QueueOutbound() call that
// eventually runs in the reactor thread will take care of calling
// ResponseTransferCallbacks::NotifyTransferAborted.
std::vector<Slice> slices;
call->SerializeResponseTo(&slices);
TransferCallbacks *cb = new ResponseTransferCallbacks(std::move(call), this);
// After the response is sent, can delete the InboundCall object.
// We set a dummy call ID and required feature set, since these are not needed
// when sending responses.
gscoped_ptr<OutboundTransfer> t(new OutboundTransfer(-1, slices, {}, cb));
QueueTransferTask *task = new QueueTransferTask(std::move(t), this);
reactor_thread_->reactor()->ScheduleReactorTask(task);
}
void Connection::set_user_credentials(const UserCredentials &user_credentials) {
user_credentials_.CopyFrom(user_credentials);
}
void Connection::ReadHandler(ev::io &watcher, int revents) {
DCHECK(reactor_thread_->IsCurrentThread());
DVLOG(3) << ToString() << " ReadHandler(revents=" << revents << ")";
if (revents & EV_ERROR) {
reactor_thread_->DestroyConnection(this, Status::NetworkError(ToString() +
": ReadHandler encountered an error"));
return;
}
last_activity_time_ = reactor_thread_->cur_time();
while (true) {
if (!inbound_) {
inbound_.reset(new InboundTransfer());
}
Status status = inbound_->ReceiveBuffer(socket_);
if (PREDICT_FALSE(!status.ok())) {
if (status.posix_code() == ESHUTDOWN) {
VLOG(1) << ToString() << " shut down by remote end.";
} else {
LOG(WARNING) << ToString() << " recv error: " << status.ToString();
}
reactor_thread_->DestroyConnection(this, status);
return;
}
if (!inbound_->TransferFinished()) {
DVLOG(3) << ToString() << ": read is not yet finished yet.";
return;
}
DVLOG(3) << ToString() << ": finished reading " << inbound_->data().size() << " bytes";
if (direction_ == CLIENT) {
HandleCallResponse(std::move(inbound_));
} else if (direction_ == SERVER) {
HandleIncomingCall(std::move(inbound_));
} else {
LOG(FATAL) << "Invalid direction: " << direction_;
}
// TODO: it would seem that it would be good to loop around and see if
// there is more data on the socket by trying another recv(), but it turns
// out that it really hurts throughput to do so. A better approach
// might be for each InboundTransfer to actually try to read an extra byte,
// and if it succeeds, then we'd copy that byte into a new InboundTransfer
// and loop around, since it's likely the next call also arrived at the
// same time.
break;
}
}
void Connection::HandleIncomingCall(gscoped_ptr<InboundTransfer> transfer) {
DCHECK(reactor_thread_->IsCurrentThread());
gscoped_ptr<InboundCall> call(new InboundCall(this));
Status s = call->ParseFrom(std::move(transfer));
if (!s.ok()) {
LOG(WARNING) << ToString() << ": received bad data: " << s.ToString();
// TODO: shutdown? probably, since any future stuff on this socket will be
// "unsynchronized"
return;
}
if (!InsertIfNotPresent(&calls_being_handled_, call->call_id(), call.get())) {
LOG(WARNING) << ToString() << ": received call ID " << call->call_id() <<
" but was already processing this ID! Ignoring";
reactor_thread_->DestroyConnection(
this, Status::RuntimeError("Received duplicate call id",
Substitute("$0", call->call_id())));
return;
}
reactor_thread_->reactor()->messenger()->QueueInboundCall(std::move(call));
}
void Connection::HandleCallResponse(gscoped_ptr<InboundTransfer> transfer) {
DCHECK(reactor_thread_->IsCurrentThread());
gscoped_ptr<CallResponse> resp(new CallResponse);
CHECK_OK(resp->ParseFrom(std::move(transfer)));
CallAwaitingResponse *car_ptr =
EraseKeyReturnValuePtr(&awaiting_response_, resp->call_id());
if (PREDICT_FALSE(car_ptr == nullptr)) {
LOG(WARNING) << ToString() << ": Got a response for call id " << resp->call_id() << " which "
<< "was not pending! Ignoring.";
return;
}
// The car->timeout_timer ev::timer will be stopped automatically by its destructor.
scoped_car car(car_pool_.make_scoped_ptr(car_ptr));
if (PREDICT_FALSE(car->call.get() == nullptr)) {
// The call already failed due to a timeout.
VLOG(1) << "Got response to call id " << resp->call_id() << " after client already timed out";
return;
}
car->call->SetResponse(std::move(resp));
}
void Connection::WriteHandler(ev::io &watcher, int revents) {
DCHECK(reactor_thread_->IsCurrentThread());
if (revents & EV_ERROR) {
reactor_thread_->DestroyConnection(this, Status::NetworkError(ToString() +
": writeHandler encountered an error"));
return;
}
DVLOG(3) << ToString() << ": writeHandler: revents = " << revents;
OutboundTransfer *transfer;
if (outbound_transfers_.empty()) {
LOG(WARNING) << ToString() << " got a ready-to-write callback, but there is "
"nothing to write.";
write_io_.stop();
return;
}
while (!outbound_transfers_.empty()) {
transfer = &(outbound_transfers_.front());
if (!transfer->TransferStarted()) {
// If this is the start of the transfer, then check if the server has the
// required RPC flags. We have to wait until just before the transfer in
// order to ensure that the negotiation has taken place, so that the flags
// are available.
const set<RpcFeatureFlag>& required_features = transfer->required_features();
const set<RpcFeatureFlag>& server_features = sasl_client_.server_features();
if (!includes(server_features.begin(), server_features.end(),
required_features.begin(), required_features.end())) {
outbound_transfers_.pop_front();
CallAwaitingResponse* car = FindOrDie(awaiting_response_, transfer->call_id());
Status s = Status::NotSupported("server does not support the required RPC features");
transfer->Abort(s);
car->call->SetFailed(s);
car->call.reset();
delete transfer;
continue;
}
}
last_activity_time_ = reactor_thread_->cur_time();
Status status = transfer->SendBuffer(socket_);
if (PREDICT_FALSE(!status.ok())) {
LOG(WARNING) << ToString() << " send error: " << status.ToString();
reactor_thread_->DestroyConnection(this, status);
return;
}
if (!transfer->TransferFinished()) {
DVLOG(3) << ToString() << ": writeHandler: xfer not finished.";
return;
}
outbound_transfers_.pop_front();
delete transfer;
}
// If we were able to write all of our outbound transfers,
// we don't have any more to write.
write_io_.stop();
}
std::string Connection::ToString() const {
// This may be called from other threads, so we cannot
// include anything in the output about the current state,
// which might concurrently change from another thread.
return strings::Substitute(
"$0 $1",
direction_ == SERVER ? "server connection from" : "client connection to",
remote_.ToString());
}
Status Connection::InitSaslClient() {
RETURN_NOT_OK(sasl_client().Init(kSaslProtoName));
RETURN_NOT_OK(sasl_client().EnableAnonymous());
RETURN_NOT_OK(sasl_client().EnablePlain(user_credentials().real_user(),
user_credentials().password()));
return Status::OK();
}
Status Connection::InitSaslServer() {
// TODO: Do necessary configuration plumbing to enable user authentication.
// Right now we just enable PLAIN with a "dummy" auth store, which allows everyone in.
RETURN_NOT_OK(sasl_server().Init(kSaslProtoName));
gscoped_ptr<AuthStore> auth_store(new DummyAuthStore());
RETURN_NOT_OK(sasl_server().EnablePlain(std::move(auth_store)));
return Status::OK();
}
// Reactor task that transitions this Connection from connection negotiation to
// regular RPC handling. Destroys Connection on negotiation error.
class NegotiationCompletedTask : public ReactorTask {
public:
NegotiationCompletedTask(Connection* conn,
const Status& negotiation_status)
: conn_(conn),
negotiation_status_(negotiation_status) {
}
virtual void Run(ReactorThread *rthread) OVERRIDE {
rthread->CompleteConnectionNegotiation(conn_, negotiation_status_);
delete this;
}
virtual void Abort(const Status &status) OVERRIDE {
DCHECK(conn_->reactor_thread()->reactor()->closing());
VLOG(1) << "Failed connection negotiation due to shut down reactor thread: " <<
status.ToString();
delete this;
}
private:
scoped_refptr<Connection> conn_;
Status negotiation_status_;
};
void Connection::CompleteNegotiation(const Status& negotiation_status) {
auto task = new NegotiationCompletedTask(this, negotiation_status);
reactor_thread_->reactor()->ScheduleReactorTask(task);
}
void Connection::MarkNegotiationComplete() {
DCHECK(reactor_thread_->IsCurrentThread());
negotiation_complete_ = true;
}
Status Connection::DumpPB(const DumpRunningRpcsRequestPB& req,
RpcConnectionPB* resp) {
DCHECK(reactor_thread_->IsCurrentThread());
resp->set_remote_ip(remote_.ToString());
if (negotiation_complete_) {
resp->set_state(RpcConnectionPB::OPEN);
resp->set_remote_user_credentials(user_credentials_.ToString());
} else {
// It's racy to dump credentials while negotiating, since the Connection
// object is owned by the negotiation thread at that point.
resp->set_state(RpcConnectionPB::NEGOTIATING);
}
if (direction_ == CLIENT) {
for (const car_map_t::value_type& entry : awaiting_response_) {
CallAwaitingResponse *c = entry.second;
if (c->call) {
c->call->DumpPB(req, resp->add_calls_in_flight());
}
}
} else if (direction_ == SERVER) {
for (const inbound_call_map_t::value_type& entry : calls_being_handled_) {
InboundCall* c = entry.second;
c->DumpPB(req, resp->add_calls_in_flight());
}
} else {
LOG(FATAL);
}
return Status::OK();
}
} // namespace rpc
} // namespace kudu