| // Licensed to the Apache Software Foundation (ASF) under one |
| // or more contributor license agreements. See the NOTICE file |
| // distributed with this work for additional information |
| // regarding copyright ownership. The ASF licenses this file |
| // to you under the Apache License, Version 2.0 (the |
| // "License"); you may not use this file except in compliance |
| // with the License. You may obtain a copy of the License at |
| // |
| // http://www.apache.org/licenses/LICENSE-2.0 |
| // |
| // Unless required by applicable law or agreed to in writing, |
| // software distributed under the License is distributed on an |
| // "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY |
| // KIND, either express or implied. See the License for the |
| // specific language governing permissions and limitations |
| // under the License. |
| |
| #include "kudu/rpc/messenger.h" |
| |
| #include <cstdlib> |
| #include <functional> |
| #include <mutex> |
| #include <ostream> |
| #include <string> |
| #include <utility> |
| |
| #include <glog/logging.h> |
| |
| #include "kudu/gutil/map-util.h" |
| #include "kudu/gutil/port.h" |
| #include "kudu/gutil/stl_util.h" |
| #include "kudu/gutil/strings/substitute.h" |
| #include "kudu/rpc/acceptor_pool.h" |
| #include "kudu/rpc/connection_id.h" |
| #include "kudu/rpc/inbound_call.h" |
| #include "kudu/rpc/outbound_call.h" |
| #include "kudu/rpc/reactor.h" |
| #include "kudu/rpc/remote_method.h" |
| #include "kudu/rpc/rpc_header.pb.h" |
| #include "kudu/rpc/rpc_service.h" |
| #include "kudu/rpc/rpcz_store.h" |
| #include "kudu/rpc/sasl_common.h" |
| #include "kudu/rpc/server_negotiation.h" |
| #include "kudu/rpc/service_if.h" |
| #include "kudu/security/openssl_util.h" |
| #include "kudu/security/tls_context.h" |
| #include "kudu/security/token_verifier.h" |
| #include "kudu/util/flags.h" |
| #include "kudu/util/metrics.h" |
| #include "kudu/util/monotime.h" |
| #include "kudu/util/net/socket.h" |
| #include "kudu/util/status.h" |
| #include "kudu/util/thread_restrictions.h" |
| #include "kudu/util/threadpool.h" |
| |
| using kudu::security::RpcAuthentication; |
| using kudu::security::RpcEncryption; |
| using std::string; |
| using std::shared_ptr; |
| using std::unique_ptr; |
| using strings::Substitute; |
| |
| namespace kudu { |
| namespace rpc { |
| |
// Default timeout for a single connection negotiation attempt: 3 seconds.
const int64_t MessengerBuilder::kRpcNegotiationTimeoutMs = 3000;

// Collects the options used to construct a Messenger. Every field initialized
// here has a corresponding setter on MessengerBuilder and may be overridden
// before Build() is called.
MessengerBuilder::MessengerBuilder(string name)
    : name_(std::move(name)),
      // 65 seconds of idle time before a connection is torn down.
      connection_keepalive_time_(MonoDelta::FromMilliseconds(65000)),
      num_reactors_(4),
      min_negotiation_threads_(0),
      max_negotiation_threads_(4),
      // Granularity of the reactors' coarse timers (100ms).
      coarse_timer_granularity_(MonoDelta::FromMilliseconds(100)),
      rpc_negotiation_timeout_ms_(kRpcNegotiationTimeoutMs),
      // SASL protocol name used during connection negotiation.
      sasl_proto_name_("kudu"),
      // Tri-state strings ("disabled"/"optional"/"required"); parsed by
      // ParseTriState() in Build().
      rpc_authentication_("optional"),
      rpc_encryption_("optional"),
      rpc_tls_ciphers_(kudu::security::SecurityDefaults::kDefaultTlsCiphers),
      rpc_tls_ciphersuites_(kudu::security::SecurityDefaults::kDefaultTlsCipherSuites),
      rpc_tls_min_protocol_(kudu::security::SecurityDefaults::kDefaultTlsMinVersion),
      enable_inbound_tls_(false),
      reuseport_(false) {
}
| |
| Status MessengerBuilder::Build(shared_ptr<Messenger>* msgr) { |
| // Initialize SASL library before we start making requests |
| RETURN_NOT_OK(SaslInit(!keytab_file_.empty())); |
| |
| // See docs on Messenger::retain_self_ for info about this odd hack. |
| // |
| // Note: can't use make_shared() as it doesn't support custom deleters. |
| shared_ptr<Messenger> new_msgr(new Messenger(*this), |
| std::mem_fn(&Messenger::AllExternalReferencesDropped)); |
| RETURN_NOT_OK(ParseTriState("--rpc_authentication", |
| rpc_authentication_, |
| &new_msgr->authentication_)); |
| RETURN_NOT_OK(ParseTriState("--rpc_encryption", |
| rpc_encryption_, |
| &new_msgr->encryption_)); |
| RETURN_NOT_OK(new_msgr->Init()); |
| if (new_msgr->encryption_ != RpcEncryption::DISABLED && enable_inbound_tls_) { |
| auto* tls_context = new_msgr->mutable_tls_context(); |
| |
| if (!rpc_certificate_file_.empty()) { |
| CHECK(!rpc_private_key_file_.empty()); |
| CHECK(!rpc_ca_certificate_file_.empty()); |
| |
| // TODO(KUDU-1920): should we try and enforce that the server |
| // is in the subject or alt names of the cert? |
| RETURN_NOT_OK(tls_context->LoadCertificateAuthority(rpc_ca_certificate_file_)); |
| if (rpc_private_key_password_cmd_.empty()) { |
| RETURN_NOT_OK(tls_context->LoadCertificateAndKey(rpc_certificate_file_, |
| rpc_private_key_file_)); |
| } else { |
| RETURN_NOT_OK(tls_context->LoadCertificateAndPasswordProtectedKey( |
| rpc_certificate_file_, rpc_private_key_file_, |
| [&](){ |
| string ret; |
| WARN_NOT_OK(security::GetPasswordFromShellCommand( |
| rpc_private_key_password_cmd_, &ret), |
| "could not get RPC password from configured command"); |
| return ret; |
| } |
| )); |
| } |
| } else { |
| RETURN_NOT_OK(tls_context->GenerateSelfSignedCertAndKey()); |
| } |
| } |
| |
| *msgr = std::move(new_msgr); |
| return Status::OK(); |
| } |
| |
// See comment on Messenger::retain_self_ member.
//
// Invoked as the custom deleter of the external shared_ptr handed out by
// MessengerBuilder::Build(), i.e. when the last external reference is dropped.
void Messenger::AllExternalReferencesDropped() {
  // The last external ref may have been dropped in the context of a task
  // running on a reactor thread. If that's the case, a SYNC shutdown here
  // would deadlock.
  //
  // If a SYNC shutdown is desired, Shutdown() should be called explicitly.
  ShutdownInternal(ShutdownMode::ASYNC);

  CHECK(retain_self_.get());
  // If we have no more external references, then we no longer
  // need to retain ourself. We'll destruct as soon as all our
  // internal-facing references are dropped (ie those from reactor
  // threads).
  retain_self_.reset();
}
| |
// Synchronously shuts down the messenger. SYNC mode asserts that waiting is
// allowed on this thread, so this must not be called from a reactor thread
// (use AllExternalReferencesDropped's ASYNC path in that context).
void Messenger::Shutdown() {
  ShutdownInternal(ShutdownMode::SYNC);
}
| |
// Tears down the messenger: registered services, acceptor pools, negotiation
// thread pools, and finally the reactors. Idempotent: only the first call
// transitions state_ to kClosing; later calls return early.
void Messenger::ShutdownInternal(ShutdownMode mode) {
  if (mode == ShutdownMode::SYNC) {
    ThreadRestrictions::AssertWaitAllowed();
  }

  // Since we're shutting down, it's OK to block.
  //
  // TODO(adar): this ought to be removed (i.e. if ASYNC, waiting should be
  // forbidden, and if SYNC, we already asserted above), but that's not
  // possible while shutting down thread and acceptor pools still involves
  // joining threads.
  ThreadRestrictions::ScopedAllowWait allow_wait;

  // Snapshot the services and acceptor pools under the lock, then release
  // and shut them down after dropping it, since their teardown may block.
  acceptor_vec_t pools_to_shutdown;
  RpcServicesMap services_to_release;
  {
    std::lock_guard<percpu_rwlock> guard(lock_);
    if (state_ == kClosing) {
      // Shutdown already started by a previous call; nothing to do.
      return;
    }
    VLOG(1) << "shutting down messenger " << name_;
    state_ = kClosing;

    services_to_release = std::move(rpc_services_);
    pools_to_shutdown = std::move(acceptor_pools_);
  }

  // Destroy state outside of the lock.
  services_to_release.clear();
  for (const auto& p : pools_to_shutdown) {
    p->Shutdown();
  }

  // Need to shut down negotiation pool before the reactors, since the
  // reactors close the Connection sockets, and may race against the negotiation
  // threads' blocking reads & writes.
  client_negotiation_pool_->Shutdown();
  server_negotiation_pool_->Shutdown();

  for (Reactor* reactor : reactors_) {
    reactor->Shutdown(mode);
  }
}
| |
| Status Messenger::AddAcceptorPool(const Sockaddr& accept_addr, |
| shared_ptr<AcceptorPool>* pool) { |
| // Before listening, if we expect to require Kerberos, we want to verify |
| // that everything is set up correctly. This way we'll generate errors on |
| // startup rather than later on when we first receive a client connection. |
| if (!keytab_file_.empty()) { |
| RETURN_NOT_OK_PREPEND(ServerNegotiation::PreflightCheckGSSAPI(sasl_proto_name()), |
| "GSSAPI/Kerberos not properly configured"); |
| } |
| |
| Socket sock; |
| RETURN_NOT_OK(sock.Init(accept_addr.family(), 0)); |
| RETURN_NOT_OK(sock.SetReuseAddr(true)); |
| if (reuseport_) { |
| RETURN_NOT_OK(sock.SetReusePort(true)); |
| } |
| RETURN_NOT_OK(sock.Bind(accept_addr)); |
| Sockaddr remote; |
| RETURN_NOT_OK(sock.GetSocketAddress(&remote)); |
| auto acceptor_pool(std::make_shared<AcceptorPool>(this, &sock, remote)); |
| |
| std::lock_guard<percpu_rwlock> guard(lock_); |
| acceptor_pools_.push_back(acceptor_pool); |
| pool->swap(acceptor_pool); |
| return Status::OK(); |
| } |
| |
| // Register a new RpcService to handle inbound requests. |
| Status Messenger::RegisterService(const string& service_name, |
| const scoped_refptr<RpcService>& service) { |
| DCHECK(service); |
| std::lock_guard<percpu_rwlock> guard(lock_); |
| DCHECK_NE(kServicesUnregistered, state_); |
| DCHECK_NE(kClosing, state_); |
| if (InsertIfNotPresent(&rpc_services_, service_name, service)) { |
| return Status::OK(); |
| } |
| return Status::AlreadyPresent("This service is already present"); |
| } |
| |
| void Messenger::UnregisterAllServices() { |
| RpcServicesMap to_release; |
| { |
| std::lock_guard<percpu_rwlock> guard(lock_); |
| to_release = std::move(rpc_services_); |
| state_ = kServicesUnregistered; |
| } |
| // Release the map outside of the lock. |
| } |
| |
| Status Messenger::UnregisterService(const string& service_name) { |
| scoped_refptr<RpcService> to_release; |
| { |
| std::lock_guard<percpu_rwlock> guard(lock_); |
| to_release = EraseKeyReturnValuePtr(&rpc_services_, service_name); |
| if (!to_release) { |
| return Status::ServiceUnavailable(Substitute( |
| "service $0 not registered on $1", service_name, name_)); |
| } |
| } |
| // Release the service outside of the lock. |
| return Status::OK(); |
| } |
| |
| void Messenger::QueueOutboundCall(const shared_ptr<OutboundCall> &call) { |
| Reactor *reactor = RemoteToReactor(call->conn_id().remote()); |
| reactor->QueueOutboundCall(call); |
| } |
| |
// Routes an inbound call to the RpcService registered under the call's
// target service name, or responds to the caller with an error if the
// service isn't registered (NO_SUCH_SERVICE while running, UNAVAILABLE
// during startup/shutdown).
void Messenger::QueueInboundCall(unique_ptr<InboundCall> call) {
  // This lock acquisition spans the entirety of the function to avoid having to
  // take a ref on the RpcService. In doing so, we guarantee that the service
  // isn't shut down here, which would be problematic because shutdown is a
  // blocking operation and QueueInboundCall is called by the reactor thread.
  //
  // See KUDU-2946 for more details.
  shared_lock<rw_spinlock> guard(lock_.get_lock());
  scoped_refptr<RpcService>* service = FindOrNull(rpc_services_,
                                                  call->remote_method().service_name());
  if (PREDICT_FALSE(!service)) {
    const auto msg = Substitute("service $0 not registered on $1",
                                call->remote_method().service_name(), name_);
    if (state_ == kServicesRegistered) {
      // NOTE: this message is only actually interesting if it's not transient.
      LOG(INFO) << msg;
      // NOTE(review): release() hands ownership to RespondFailure(), which is
      // expected to delete the call after responding -- confirm against
      // InboundCall's contract.
      call.release()->RespondFailure(ErrorStatusPB::ERROR_NO_SUCH_SERVICE, Status::NotFound(msg));
    } else {
      call.release()->RespondFailure(
          ErrorStatusPB::ERROR_UNAVAILABLE, Status::ServiceUnavailable(msg));
    }
    return;
  }

  call->set_method_info((*service)->LookupMethod(call->remote_method()));

  // The RpcService will respond to the client on success or failure.
  WARN_NOT_OK((*service)->QueueInboundCall(std::move(call)), "Unable to handle RPC call");
}
| |
| void Messenger::QueueCancellation(const shared_ptr<OutboundCall> &call) { |
| Reactor *reactor = RemoteToReactor(call->conn_id().remote()); |
| reactor->QueueCancellation(call); |
| } |
| |
| void Messenger::RegisterInboundSocket(Socket *new_socket, const Sockaddr &remote) { |
| Reactor *reactor = RemoteToReactor(remote); |
| reactor->RegisterInboundSocket(new_socket, remote); |
| } |
| |
// Constructs a messenger from the builder's options, creating the reactors
// and the client/server negotiation thread pools. The messenger starts out
// holding a reference to itself (retain_self_), which is released in
// AllExternalReferencesDropped() once the last external reference goes away.
Messenger::Messenger(const MessengerBuilder &bld)
    : name_(bld.name_),
      state_(kStarted),
      // Authentication/encryption default to REQUIRED here;
      // MessengerBuilder::Build() overwrites them from the builder's
      // tri-state strings right after construction.
      authentication_(RpcAuthentication::REQUIRED),
      encryption_(RpcEncryption::REQUIRED),
      tls_context_(new security::TlsContext(bld.rpc_tls_ciphers_,
                                            bld.rpc_tls_ciphersuites_,
                                            bld.rpc_tls_min_protocol_,
                                            bld.rpc_tls_excluded_protocols_)),
      token_verifier_(new security::TokenVerifier),
      rpcz_store_(new RpczStore),
      metric_entity_(bld.metric_entity_),
      rpc_negotiation_timeout_ms_(bld.rpc_negotiation_timeout_ms_),
      sasl_proto_name_(bld.sasl_proto_name_),
      keytab_file_(bld.keytab_file_),
      reuseport_(bld.reuseport_),
      retain_self_(this) {
  for (int i = 0; i < bld.num_reactors_; i++) {
    reactors_.push_back(new Reactor(retain_self_, i, bld));
  }
  // Client- and server-side connection negotiation run on separate pools,
  // both bounded by the builder's min/max negotiation thread settings.
  CHECK_OK(ThreadPoolBuilder("client-negotiator")
           .set_min_threads(bld.min_negotiation_threads_)
           .set_max_threads(bld.max_negotiation_threads_)
           .Build(&client_negotiation_pool_));
  CHECK_OK(ThreadPoolBuilder("server-negotiator")
           .set_min_threads(bld.min_negotiation_threads_)
           .set_max_threads(bld.max_negotiation_threads_)
           .Build(&server_negotiation_pool_));
}
| |
Messenger::~Messenger() {
  // Shutdown (via Shutdown() or AllExternalReferencesDropped()) must have
  // transitioned the state to kClosing before destruction.
  CHECK_EQ(state_, kClosing) << "Should have already shut down";
  // The reactors are raw-owned by the messenger; delete them now that all
  // references are gone.
  STLDeleteElements(&reactors_);
}
| |
| Reactor* Messenger::RemoteToReactor(const Sockaddr& remote) { |
| // This is just a static partitioning; we could get a lot |
| // fancier with assigning Sockaddrs to Reactors. |
| return reactors_[remote.HashCode() % reactors_.size()]; |
| } |
| |
| Status Messenger::Init() { |
| RETURN_NOT_OK(tls_context_->Init()); |
| for (Reactor* r : reactors_) { |
| RETURN_NOT_OK(r->Init()); |
| } |
| |
| return Status::OK(); |
| } |
| |
| Status Messenger::DumpConnections(const DumpConnectionsRequestPB& req, |
| DumpConnectionsResponsePB* resp) { |
| for (Reactor* reactor : reactors_) { |
| RETURN_NOT_OK(reactor->DumpConnections(req, resp)); |
| } |
| return Status::OK(); |
| } |
| |
| void Messenger::ScheduleOnReactor(std::function<void(const Status&)> func, |
| MonoDelta when) { |
| DCHECK(!reactors_.empty()); |
| |
| // If we're already running on a reactor thread, reuse it. |
| Reactor* chosen = nullptr; |
| for (Reactor* r : reactors_) { |
| if (r->IsCurrentThread()) { |
| chosen = r; |
| } |
| } |
| if (chosen == nullptr) { |
| // Not running on a reactor thread, pick one at random. |
| chosen = reactors_[rand() % reactors_.size()]; |
| } |
| |
| DelayedTask* task = new DelayedTask(std::move(func), when); |
| chosen->ScheduleReactorTask(task); |
| } |
| |
| const scoped_refptr<RpcService> Messenger::rpc_service(const string& service_name) const { |
| scoped_refptr<RpcService> service; |
| { |
| shared_lock<rw_spinlock> guard(lock_.get_lock()); |
| if (!FindCopy(rpc_services_, service_name, &service)) { |
| return scoped_refptr<RpcService>(nullptr); |
| } |
| } |
| return service; |
| } |
| |
| ThreadPool* Messenger::negotiation_pool(Connection::Direction dir) { |
| switch (dir) { |
| case Connection::CLIENT: return client_negotiation_pool_.get(); |
| case Connection::SERVER: return server_negotiation_pool_.get(); |
| } |
| DCHECK(false) << "Unknown Connection::Direction value: " << dir; |
| return nullptr; |
| } |
| |
| } // namespace rpc |
| } // namespace kudu |