| // Copyright 2013 Cloudera, Inc. |
| // |
| // Licensed under the Apache License, Version 2.0 (the "License"); |
| // you may not use this file except in compliance with the License. |
| // You may obtain a copy of the License at |
| // |
| // http://www.apache.org/licenses/LICENSE-2.0 |
| // |
| // Unless required by applicable law or agreed to in writing, software |
| // distributed under the License is distributed on an "AS IS" BASIS, |
| // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
| // See the License for the specific language governing permissions and |
| // limitations under the License. |
| |
| #include <boost/foreach.hpp> |
| #include <list> |
| #include <string> |
| #include <vector> |
| |
| #include <gflags/gflags.h> |
| |
| #include "kudu/gutil/casts.h" |
| #include "kudu/gutil/gscoped_ptr.h" |
| #include "kudu/gutil/strings/substitute.h" |
| #include "kudu/rpc/acceptor_pool.h" |
| #include "kudu/rpc/messenger.h" |
| #include "kudu/rpc/service_if.h" |
| #include "kudu/rpc/service_pool.h" |
| #include "kudu/server/rpc_server.h" |
| #include "kudu/util/flag_tags.h" |
| #include "kudu/util/net/net_util.h" |
| #include "kudu/util/net/sockaddr.h" |
| #include "kudu/util/status.h" |
| |
| using std::string; |
| using std::tr1::shared_ptr; |
| using std::vector; |
| using kudu::rpc::AcceptorPool; |
| using kudu::rpc::Messenger; |
| using kudu::rpc::ServiceIf; |
| using strings::Substitute; |
| |
| DEFINE_string(rpc_bind_addresses, "0.0.0.0", |
| "Comma-separated list of addresses to bind to for RPC connections. " |
| "Currently, ephemeral ports (i.e. port 0) are not allowed."); |
| TAG_FLAG(rpc_bind_addresses, stable); |
| |
| DEFINE_int32(rpc_num_acceptors_per_address, 1, |
| "Number of RPC acceptor threads for each bound address"); |
| TAG_FLAG(rpc_num_acceptors_per_address, advanced); |
| |
| DEFINE_int32(rpc_num_service_threads, 10, |
| "Number of RPC worker threads to run"); |
| TAG_FLAG(rpc_num_service_threads, advanced); |
| |
| DEFINE_int32(rpc_service_queue_length, 50, |
| "Default length of queue for incoming RPC requests"); |
| TAG_FLAG(rpc_service_queue_length, advanced); |
| |
| DEFINE_bool(rpc_server_allow_ephemeral_ports, false, |
| "Allow binding to ephemeral ports. This can cause problems, so currently " |
| "only allowed in tests."); |
| TAG_FLAG(rpc_server_allow_ephemeral_ports, unsafe); |
| |
| namespace kudu { |
| |
| RpcServerOptions::RpcServerOptions() |
| : rpc_bind_addresses(FLAGS_rpc_bind_addresses), |
| num_acceptors_per_address(FLAGS_rpc_num_acceptors_per_address), |
| num_service_threads(FLAGS_rpc_num_service_threads), |
| default_port(0), |
| service_queue_length(FLAGS_rpc_service_queue_length) { |
| } |
| |
| RpcServer::RpcServer(const RpcServerOptions& opts) |
| : server_state_(UNINITIALIZED), |
| options_(opts) { |
| } |
| |
| RpcServer::~RpcServer() { |
| Shutdown(); |
| } |
| |
| string RpcServer::ToString() const { |
| // TODO: include port numbers, etc. |
| return "RpcServer"; |
| } |
| |
| Status RpcServer::Init(const std::tr1::shared_ptr<Messenger>& messenger) { |
| CHECK_EQ(server_state_, UNINITIALIZED); |
| messenger_ = messenger; |
| |
| RETURN_NOT_OK(ParseAddressList(options_.rpc_bind_addresses, |
| options_.default_port, |
| &rpc_bind_addresses_)); |
| BOOST_FOREACH(const Sockaddr& addr, rpc_bind_addresses_) { |
| if (IsPrivilegedPort(addr.port())) { |
| LOG(WARNING) << "May be unable to bind to privileged port for address " |
| << addr.ToString(); |
| } |
| |
| // Currently, we can't support binding to ephemeral ports outside of |
| // unit tests, because consensus caches RPC ports of other servers |
| // across restarts. See KUDU-334. |
| if (addr.port() == 0 && !FLAGS_rpc_server_allow_ephemeral_ports) { |
| LOG(FATAL) << "Binding to ephemeral ports not supported (RPC address " |
| << "configured to " << addr.ToString() << ")"; |
| } |
| } |
| |
| server_state_ = INITIALIZED; |
| return Status::OK(); |
| } |
| |
| Status RpcServer::RegisterService(gscoped_ptr<rpc::ServiceIf> service) { |
| CHECK(server_state_ == INITIALIZED || |
| server_state_ == BOUND) << "bad state: " << server_state_; |
| const scoped_refptr<MetricEntity>& metric_entity = messenger_->metric_entity(); |
| string service_name = service->service_name(); |
| scoped_refptr<rpc::ServicePool> service_pool = |
| new rpc::ServicePool(service.Pass(), metric_entity, options_.service_queue_length); |
| RETURN_NOT_OK(service_pool->Init(options_.num_service_threads)); |
| RETURN_NOT_OK(messenger_->RegisterService(service_name, service_pool)); |
| return Status::OK(); |
| } |
| |
| Status RpcServer::Bind() { |
| CHECK_EQ(server_state_, INITIALIZED); |
| |
| // Create the Acceptor pools (one per bind address) |
| vector<shared_ptr<AcceptorPool> > new_acceptor_pools; |
| // Create the AcceptorPool for each bind address. |
| BOOST_FOREACH(const Sockaddr& bind_addr, rpc_bind_addresses_) { |
| shared_ptr<rpc::AcceptorPool> pool; |
| RETURN_NOT_OK(messenger_->AddAcceptorPool( |
| bind_addr, |
| &pool)); |
| new_acceptor_pools.push_back(pool); |
| } |
| acceptor_pools_.swap(new_acceptor_pools); |
| |
| server_state_ = BOUND; |
| return Status::OK(); |
| } |
| |
| Status RpcServer::Start() { |
| if (server_state_ == INITIALIZED) { |
| RETURN_NOT_OK(Bind()); |
| } |
| CHECK_EQ(server_state_, BOUND); |
| server_state_ = STARTED; |
| |
| BOOST_FOREACH(const shared_ptr<AcceptorPool>& pool, acceptor_pools_) { |
| RETURN_NOT_OK(pool->Start(options_.num_acceptors_per_address)); |
| } |
| |
| vector<Sockaddr> bound_addrs; |
| RETURN_NOT_OK(GetBoundAddresses(&bound_addrs)); |
| string bound_addrs_str; |
| BOOST_FOREACH(const Sockaddr& bind_addr, bound_addrs) { |
| if (!bound_addrs_str.empty()) bound_addrs_str += ", "; |
| bound_addrs_str += bind_addr.ToString(); |
| } |
| LOG(INFO) << "RPC server started. Bound to: " << bound_addrs_str; |
| |
| return Status::OK(); |
| } |
| |
| void RpcServer::Shutdown() { |
| BOOST_FOREACH(const shared_ptr<AcceptorPool>& pool, acceptor_pools_) { |
| pool->Shutdown(); |
| } |
| acceptor_pools_.clear(); |
| |
| if (messenger_) { |
| WARN_NOT_OK(messenger_->UnregisterAllServices(), "Unable to unregister our services"); |
| } |
| } |
| |
| Status RpcServer::GetBoundAddresses(vector<Sockaddr>* addresses) const { |
| CHECK(server_state_ == BOUND || |
| server_state_ == STARTED) << "bad state: " << server_state_; |
| BOOST_FOREACH(const shared_ptr<AcceptorPool>& pool, acceptor_pools_) { |
| Sockaddr bound_addr; |
| RETURN_NOT_OK_PREPEND(pool->GetBoundAddress(&bound_addr), |
| "Unable to get bound address from AcceptorPool"); |
| addresses->push_back(bound_addr); |
| } |
| return Status::OK(); |
| } |
| |
| const rpc::ServicePool* RpcServer::service_pool(const string& service_name) const { |
| return down_cast<rpc::ServicePool*>(messenger_->rpc_service(service_name).get()); |
| } |
| |
| } // namespace kudu |