// blob: af8d72412e057e35b9f66d3c625a5f7031f8299a [file] [log] [blame]
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.
#include "kudu/rpc/messenger.h"
#include <cstdlib>
#include <functional>
#include <mutex>
#include <ostream>
#include <string>
#include <utility>
#include <glog/logging.h>
#include "kudu/gutil/map-util.h"
#include "kudu/gutil/port.h"
#include "kudu/gutil/stl_util.h"
#include "kudu/gutil/strings/substitute.h"
#include "kudu/rpc/acceptor_pool.h"
#include "kudu/rpc/connection_id.h"
#include "kudu/rpc/inbound_call.h"
#include "kudu/rpc/outbound_call.h"
#include "kudu/rpc/reactor.h"
#include "kudu/rpc/remote_method.h"
#include "kudu/rpc/rpc_header.pb.h"
#include "kudu/rpc/rpc_service.h"
#include "kudu/rpc/rpcz_store.h"
#include "kudu/rpc/sasl_common.h"
#include "kudu/rpc/server_negotiation.h"
#include "kudu/rpc/service_if.h"
#include "kudu/security/tls_context.h"
#include "kudu/security/token_verifier.h"
#include "kudu/util/flags.h"
#include "kudu/util/metrics.h"
#include "kudu/util/monotime.h"
#include "kudu/util/net/socket.h"
#include "kudu/util/openssl_util.h"
#include "kudu/util/status.h"
#include "kudu/util/thread_restrictions.h"
#include "kudu/util/threadpool.h"
using kudu::security::RpcAuthentication;
using kudu::security::RpcEncryption;
using std::string;
using std::shared_ptr;
using std::unique_ptr;
using strings::Substitute;
namespace kudu {
namespace rpc {
// Default RPC negotiation timeout, in milliseconds. The MessengerBuilder
// constructor uses it as the initial value of rpc_negotiation_timeout_ms_.
const int64_t MessengerBuilder::kRpcNegotiationTimeoutMs = 3000;
// Creates a builder for a Messenger called 'name', pre-populated with the
// defaults listed below. Nothing is validated here: the string-valued
// authentication/encryption settings are only parsed later, in Build().
MessengerBuilder::MessengerBuilder(string name)
: name_(std::move(name)),
// 65000 ms, i.e. 65 seconds.
connection_keepalive_time_(MonoDelta::FromMilliseconds(65000)),
num_reactors_(4),
// Thread-count bounds; the Messenger constructor applies the same pair to
// both the client and the server negotiation pools.
min_negotiation_threads_(0),
max_negotiation_threads_(4),
coarse_timer_granularity_(MonoDelta::FromMilliseconds(100)),
rpc_negotiation_timeout_ms_(kRpcNegotiationTimeoutMs),
sasl_proto_name_("kudu"),
// Tri-state strings; Build() converts them with ParseTriState().
rpc_authentication_("optional"),
rpc_encryption_("optional"),
rpc_loopback_encryption_(false),
// TLS parameters default to the values in security::SecurityDefaults.
rpc_tls_ciphers_(kudu::security::SecurityDefaults::kDefaultTlsCiphers),
rpc_tls_ciphersuites_(kudu::security::SecurityDefaults::kDefaultTlsCipherSuites),
rpc_tls_min_protocol_(kudu::security::SecurityDefaults::kDefaultTlsMinVersion),
// Build() only loads or generates a certificate when this is enabled.
enable_inbound_tls_(false),
reuseport_(false) {
}
// Builds a Messenger from this builder's settings and stores it in '*msgr'.
//
// Returns a non-OK status (leaving '*msgr' untouched) if SASL initialization,
// parsing of the authentication/encryption settings, messenger initialization,
// or TLS certificate setup fails.
Status MessengerBuilder::Build(shared_ptr<Messenger>* msgr) {
  // The SASL library has to be initialized before any request is made.
  RETURN_NOT_OK(SaslInit(!keytab_file_.empty()));

  // See docs on Messenger::retain_self_ for info about this odd hack.
  //
  // Note: make_shared() is not an option because it cannot attach a custom
  // deleter.
  shared_ptr<Messenger> messenger(new Messenger(*this),
                                  std::mem_fn(&Messenger::AllExternalReferencesDropped));

  RETURN_NOT_OK(ParseTriState("--rpc_authentication",
                              rpc_authentication_,
                              &messenger->authentication_));
  RETURN_NOT_OK(ParseTriState("--rpc_encryption",
                              rpc_encryption_,
                              &messenger->encryption_));
  messenger->loopback_encryption_ = rpc_loopback_encryption_;
  RETURN_NOT_OK(messenger->Init());

  // A certificate is only needed when inbound TLS was requested and encryption
  // has not been switched off altogether.
  const bool needs_inbound_cert =
      enable_inbound_tls_ && messenger->encryption_ != RpcEncryption::DISABLED;
  if (needs_inbound_cert) {
    auto* tls = messenger->mutable_tls_context();
    if (rpc_certificate_file_.empty()) {
      // No certificate configured: fall back to a self-signed one.
      RETURN_NOT_OK(tls->GenerateSelfSignedCertAndKey());
    } else {
      // A configured certificate must come with its key and CA certificate.
      CHECK(!rpc_private_key_file_.empty());
      CHECK(!rpc_ca_certificate_file_.empty());

      // TODO(KUDU-1920): should we try and enforce that the server
      // is in the subject or alt names of the cert?
      RETURN_NOT_OK(tls->LoadCertificateAuthority(rpc_ca_certificate_file_));
      if (rpc_private_key_password_cmd_.empty()) {
        RETURN_NOT_OK(tls->LoadCertificateAndKey(rpc_certificate_file_,
                                                 rpc_private_key_file_));
      } else {
        // Runs the configured command to obtain the key password. A failure is
        // only logged; the callback then yields an empty password.
        auto fetch_password = [&]() {
          string password;
          WARN_NOT_OK(security::GetPasswordFromShellCommand(
                          rpc_private_key_password_cmd_, &password),
                      "could not get RPC password from configured command");
          return password;
        };
        RETURN_NOT_OK(tls->LoadCertificateAndPasswordProtectedKey(
            rpc_certificate_file_, rpc_private_key_file_, fetch_password));
      }
    }
  }

  *msgr = std::move(messenger);
  return Status::OK();
}
// Custom shared_ptr deleter installed by MessengerBuilder::Build(); it runs
// when the last external reference to this Messenger is dropped. It starts an
// asynchronous shutdown and then gives up the messenger's self-reference.
//
// See comment on Messenger::retain_self_ member.
void Messenger::AllExternalReferencesDropped() {
// The last external ref may have been dropped in the context of a task
// running on a reactor thread. If that's the case, a SYNC shutdown here
// would deadlock.
//
// If a SYNC shutdown is desired, Shutdown() should be called explicitly.
ShutdownInternal(ShutdownMode::ASYNC);
// The self-reference is set in the constructor and only released below, so it
// must still be present here.
CHECK(retain_self_.get());
// If we have no more external references, then we no longer
// need to retain ourself. We'll destruct as soon as all our
// internal-facing references are dropped (ie those from reactor
// threads).
//
// NOTE(review): this has to stay the last statement -- presumably 'this' may
// already be destroyed once the reset returns.
retain_self_.reset();
}
// Shuts the messenger down in SYNC mode. In that mode ShutdownInternal()
// asserts that the calling thread is allowed to wait.
void Messenger::Shutdown() {
ShutdownInternal(ShutdownMode::SYNC);
}
// Shared shutdown path used by Shutdown() (SYNC) and
// AllExternalReferencesDropped() (ASYNC).
//
// Only the first call does any work: it moves the messenger to kClosing,
// releases the registered services, and shuts down the acceptor pools, the
// negotiation pools and finally the reactors. Later calls return immediately.
void Messenger::ShutdownInternal(ShutdownMode mode) {
  if (mode == ShutdownMode::SYNC) {
    ThreadRestrictions::AssertWaitAllowed();
  }

  // Since we're shutting down, it's OK to block.
  //
  // TODO(adar): this ought to be removed (i.e. if ASYNC, waiting should be
  // forbidden, and if SYNC, we already asserted above), but that's not
  // possible while shutting down thread and acceptor pools still involves
  // joining threads.
  ThreadRestrictions::ScopedAllowWait allow_wait;

  acceptor_vec_t detached_pools;
  RpcServicesMap detached_services;
  {
    std::lock_guard<percpu_rwlock> l(lock_);
    if (state_ == kClosing) {
      // An earlier call already took care of the shutdown.
      return;
    }
    VLOG(1) << "shutting down messenger " << name_;
    state_ = kClosing;

    // Take ownership of the shared state so it can be torn down after the
    // lock has been released.
    detached_pools = std::move(acceptor_pools_);
    detached_services = std::move(rpc_services_);
  }

  // From here on we no longer hold the lock.
  detached_services.clear();
  for (const auto& acceptor : detached_pools) {
    acceptor->Shutdown();
  }

  // Need to shut down negotiation pool before the reactors, since the
  // reactors close the Connection sockets, and may race against the negotiation
  // threads' blocking reads & writes.
  client_negotiation_pool_->Shutdown();
  server_negotiation_pool_->Shutdown();

  for (size_t i = 0; i < reactors_.size(); ++i) {
    reactors_[i]->Shutdown(mode);
  }
}
// Binds a new socket to 'accept_addr', wraps it in an AcceptorPool, records
// the pool in this messenger and hands it back through '*pool'.
//
// Returns a non-OK status if the GSSAPI preflight check fails (only run when a
// keytab is configured) or if any socket operation fails.
Status Messenger::AddAcceptorPool(const Sockaddr& accept_addr,
                                  shared_ptr<AcceptorPool>* pool) {
  // Before listening, if we expect to require Kerberos, we want to verify
  // that everything is set up correctly. This way we'll generate errors on
  // startup rather than later on when we first receive a client connection.
  if (!keytab_file_.empty()) {
    RETURN_NOT_OK_PREPEND(ServerNegotiation::PreflightCheckGSSAPI(sasl_proto_name()),
                          "GSSAPI/Kerberos not properly configured");
  }

  Socket listen_sock;
  RETURN_NOT_OK(listen_sock.Init(accept_addr.family(), 0));
  RETURN_NOT_OK(listen_sock.SetReuseAddr(true));
  if (reuseport_) {
    RETURN_NOT_OK(listen_sock.SetReusePort(true));
  }
  RETURN_NOT_OK(listen_sock.Bind(accept_addr));

  // Ask the socket for the address it ended up bound to and pass that, rather
  // than 'accept_addr', on to the pool.
  Sockaddr bound_addr;
  RETURN_NOT_OK(listen_sock.GetSocketAddress(&bound_addr));
  shared_ptr<AcceptorPool> created =
      std::make_shared<AcceptorPool>(this, &listen_sock, bound_addr);

  std::lock_guard<percpu_rwlock> l(lock_);
  acceptor_pools_.push_back(created);
  pool->swap(created);
  return Status::OK();
}
// Registers 'service' under 'service_name' so that it can handle inbound
// requests.
//
// Returns AlreadyPresent if a service is already registered under that name.
// In debug builds, registering after the services were unregistered or after
// shutdown started trips a DCHECK.
Status Messenger::RegisterService(const string& service_name,
                                  const scoped_refptr<RpcService>& service) {
  DCHECK(service);
  std::lock_guard<percpu_rwlock> l(lock_);
  DCHECK_NE(kServicesUnregistered, state_);
  DCHECK_NE(kClosing, state_);
  const bool inserted = InsertIfNotPresent(&rpc_services_, service_name, service);
  if (!inserted) {
    return Status::AlreadyPresent("This service is already present");
  }
  return Status::OK();
}
void Messenger::UnregisterAllServices() {
RpcServicesMap to_release;
{
std::lock_guard<percpu_rwlock> guard(lock_);
to_release = std::move(rpc_services_);
state_ = kServicesUnregistered;
}
// Release the map outside of the lock.
}
// Removes the service registered under 'service_name'.
//
// Returns ServiceUnavailable if no such service is registered. The reference
// to the removed service is dropped only after the lock has been released.
Status Messenger::UnregisterService(const string& service_name) {
  Status outcome = Status::OK();
  scoped_refptr<RpcService> removed;
  {
    std::lock_guard<percpu_rwlock> l(lock_);
    removed = EraseKeyReturnValuePtr(&rpc_services_, service_name);
    if (!removed) {
      outcome = Status::ServiceUnavailable(Substitute(
          "service $0 not registered on $1", service_name, name_));
    }
  }
  // 'removed' is destroyed on return, i.e. outside of the lock.
  return outcome;
}
// Forwards an outbound call to the reactor selected for the call's remote
// address.
void Messenger::QueueOutboundCall(const shared_ptr<OutboundCall> &call) {
  RemoteToReactor(call->conn_id().remote())->QueueOutboundCall(call);
}
// Routes an inbound call to the RpcService registered for the call's service
// name. If no such service is registered, the call is failed right away:
// with ERROR_NO_SUCH_SERVICE when the messenger is in the kServicesRegistered
// state, and with ERROR_UNAVAILABLE otherwise.
void Messenger::QueueInboundCall(unique_ptr<InboundCall> call) {
  // This lock acquisition spans the entirety of the function to avoid having to
  // take a ref on the RpcService. In doing so, we guarantee that the service
  // isn't shut down here, which would be problematic because shutdown is a
  // blocking operation and QueueInboundCall is called by the reactor thread.
  //
  // See KUDU-2946 for more details.
  shared_lock<rw_spinlock> l(lock_.get_lock());

  auto* handler = FindOrNull(rpc_services_, call->remote_method().service_name());
  if (PREDICT_FALSE(handler == nullptr)) {
    const auto msg = Substitute("service $0 not registered on $1",
                                call->remote_method().service_name(), name_);
    // In both branches the call is released from the unique_ptr before it is
    // asked to send the failure response.
    if (state_ != kServicesRegistered) {
      call.release()->RespondFailure(
          ErrorStatusPB::ERROR_UNAVAILABLE, Status::ServiceUnavailable(msg));
    } else {
      // NOTE: this message is only actually interesting if it's not transient.
      LOG(INFO) << msg;
      call.release()->RespondFailure(
          ErrorStatusPB::ERROR_NO_SUCH_SERVICE, Status::NotFound(msg));
    }
    return;
  }

  const scoped_refptr<RpcService>& svc = *handler;
  call->set_method_info(svc->LookupMethod(call->remote_method()));

  // The RpcService will respond to the client on success or failure.
  WARN_NOT_OK(svc->QueueInboundCall(std::move(call)), "Unable to handle RPC call");
}
// Forwards a cancellation request for 'call' to the reactor selected for the
// call's remote address.
void Messenger::QueueCancellation(const shared_ptr<OutboundCall> &call) {
  RemoteToReactor(call->conn_id().remote())->QueueCancellation(call);
}
// Passes a newly accepted socket on to the reactor selected for the remote
// address.
void Messenger::RegisterInboundSocket(Socket *new_socket, const Sockaddr &remote) {
  RemoteToReactor(remote)->RegisterInboundSocket(new_socket, remote);
}
// Constructs a Messenger from the builder's settings: copies the configuration,
// creates bld.num_reactors_ reactors and builds the client and server
// negotiation thread pools. The reactors are only created here; they are
// initialized later by Init().
Messenger::Messenger(const MessengerBuilder &bld)
: name_(bld.name_),
state_(kStarted),
// Both start out as REQUIRED; MessengerBuilder::Build() overwrites them with
// the values parsed from the builder's settings.
authentication_(RpcAuthentication::REQUIRED),
encryption_(RpcEncryption::REQUIRED),
tls_context_(new security::TlsContext(bld.rpc_tls_ciphers_,
bld.rpc_tls_ciphersuites_,
bld.rpc_tls_min_protocol_,
bld.rpc_tls_excluded_protocols_)),
token_verifier_(new security::TokenVerifier),
rpcz_store_(new RpczStore),
metric_entity_(bld.metric_entity_),
rpc_negotiation_timeout_ms_(bld.rpc_negotiation_timeout_ms_),
sasl_proto_name_(bld.sasl_proto_name_),
keytab_file_(bld.keytab_file_),
reuseport_(bld.reuseport_),
// Self-reference; it is released in AllExternalReferencesDropped().
retain_self_(this) {
// Each reactor is handed the self-reference together with its index.
for (int i = 0; i < bld.num_reactors_; i++) {
reactors_.push_back(new Reactor(retain_self_, i, bld));
}
// Both negotiation pools use the same thread-count bounds from the builder.
// NOTE(review): CHECK_OK presumably aborts the process if a pool cannot be
// built -- confirm.
CHECK_OK(ThreadPoolBuilder("client-negotiator")
.set_min_threads(bld.min_negotiation_threads_)
.set_max_threads(bld.max_negotiation_threads_)
.Build(&client_negotiation_pool_));
CHECK_OK(ThreadPoolBuilder("server-negotiator")
.set_min_threads(bld.min_negotiation_threads_)
.set_max_threads(bld.max_negotiation_threads_)
.Build(&server_negotiation_pool_));
}
// Destroys the messenger. Shutdown must already have happened (the state has
// to be kClosing); the reactors created by the constructor are deleted here.
Messenger::~Messenger() {
  CHECK_EQ(state_, kClosing) << "Should have already shut down";
  for (Reactor* owned : reactors_) {
    delete owned;
  }
  reactors_.clear();
}
// Picks the reactor responsible for the given remote address by reducing the
// address's hash code modulo the number of reactors.
//
// Requires at least one reactor; this is checked in debug builds, matching the
// check in ScheduleOnReactor().
Reactor* Messenger::RemoteToReactor(const Sockaddr& remote) {
  // With no reactors the modulo below would divide by zero.
  DCHECK(!reactors_.empty());
  // This is just a static partitioning; we could get a lot
  // fancier with assigning Sockaddrs to Reactors.
  return reactors_[remote.HashCode() % reactors_.size()];
}
// Initializes the TLS context and then every reactor, in order. Stops at and
// returns the first non-OK status.
Status Messenger::Init() {
  RETURN_NOT_OK(tls_context_->Init());
  for (auto it = reactors_.begin(); it != reactors_.end(); ++it) {
    RETURN_NOT_OK((*it)->Init());
  }
  return Status::OK();
}
// Asks each reactor in turn to add its connections to 'resp'. Stops at and
// returns the first non-OK status.
Status Messenger::DumpConnections(const DumpConnectionsRequestPB& req,
                                  DumpConnectionsResponsePB* resp) {
  for (auto it = reactors_.begin(); it != reactors_.end(); ++it) {
    RETURN_NOT_OK((*it)->DumpConnections(req, resp));
  }
  return Status::OK();
}
void Messenger::ScheduleOnReactor(std::function<void(const Status&)> func,
MonoDelta when) {
DCHECK(!reactors_.empty());
// If we're already running on a reactor thread, reuse it.
Reactor* chosen = nullptr;
for (Reactor* r : reactors_) {
if (r->IsCurrentThread()) {
chosen = r;
}
}
if (chosen == nullptr) {
// Not running on a reactor thread, pick one at random.
chosen = reactors_[rand() % reactors_.size()];
}
DelayedTask* task = new DelayedTask(std::move(func), when);
chosen->ScheduleReactorTask(task);
}
// Returns the service registered under 'service_name', or a null reference if
// there is none. The lookup is done under the shared (reader) lock.
const scoped_refptr<RpcService> Messenger::rpc_service(const string& service_name) const {
  scoped_refptr<RpcService> found;
  bool present = false;
  {
    shared_lock<rw_spinlock> l(lock_.get_lock());
    present = FindCopy(rpc_services_, service_name, &found);
  }
  if (!present) {
    return scoped_refptr<RpcService>(nullptr);
  }
  return found;
}
// Returns the negotiation thread pool for the given connection direction:
// the client pool for CLIENT, the server pool for SERVER. Any other value
// trips a DCHECK in debug builds and yields nullptr.
ThreadPool* Messenger::negotiation_pool(Connection::Direction dir) {
  if (dir == Connection::CLIENT) {
    return client_negotiation_pool_.get();
  }
  if (dir == Connection::SERVER) {
    return server_negotiation_pool_.get();
  }
  DCHECK(false) << "Unknown Connection::Direction value: " << dir;
  return nullptr;
}
} // namespace rpc
} // namespace kudu