blob: 1a742d1b5877eb88da1936c4bc6b7034b3cc0fe0 [file] [log] [blame]
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
//   http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.
#include "kudu/server/server_base.h"
#include <cstdint>
#include <functional>
#include <sstream>
#include <string>
#include <type_traits>
#include <utility>
#include <vector>
#include <boost/algorithm/string/predicate.hpp>
#include <boost/optional/optional.hpp>
#include <gflags/gflags.h>
#include <gflags/gflags_declare.h>
#include <glog/logging.h>
#include "kudu/clock/clock.h"
#include "kudu/clock/hybrid_clock.h"
#include "kudu/clock/logical_clock.h"
#include "kudu/codegen/compilation_manager.h"
#include "kudu/common/timestamp.h"
#include "kudu/common/wire_protocol.h"
#include "kudu/common/wire_protocol.pb.h"
#include "kudu/fs/fs_manager.h"
#include "kudu/fs/fs_report.h"
#include "kudu/gutil/port.h"
#include "kudu/gutil/strings/strcat.h"
#include "kudu/gutil/strings/substitute.h"
#include "kudu/rpc/messenger.h"
#include "kudu/rpc/remote_user.h"
#include "kudu/rpc/result_tracker.h"
#include "kudu/rpc/rpc_context.h"
#include "kudu/rpc/service_if.h"
#include "kudu/rpc/service_pool.h"
#include "kudu/security/init.h"
#include "kudu/security/security_flags.h"
#include "kudu/server/default_path_handlers.h"
#include "kudu/server/diagnostics_log.h"
#include "kudu/server/generic_service.h"
#include "kudu/server/glog_metrics.h"
#include "kudu/server/rpc_server.h"
#include "kudu/server/rpcz-path-handler.h"
#include "kudu/server/server_base.pb.h"
#include "kudu/server/server_base_options.h"
#include "kudu/server/tcmalloc_metrics.h"
#include "kudu/server/tracing_path_handlers.h"
#include "kudu/server/webserver.h"
#include "kudu/util/atomic.h"
#include "kudu/util/env.h"
#include "kudu/util/flag_tags.h"
#include "kudu/util/flag_validators.h"
#include "kudu/util/flags.h"
#include "kudu/util/jsonwriter.h"
#include "kudu/util/logging.h"
#include "kudu/util/mem_tracker.h"
#include "kudu/util/metrics.h"
#include "kudu/util/minidump.h"
#include "kudu/util/monotime.h"
#include "kudu/util/net/net_util.h"
#include "kudu/util/net/sockaddr.h"
#include "kudu/util/pb_util.h"
#include "kudu/util/slice.h"
#include "kudu/util/spinlock_profiling.h"
#include "kudu/util/thread.h"
#include "kudu/util/user.h"
#include "kudu/util/version_info.h"
// Sizing of the RPC reactor and connection-negotiation thread pools, and the
// timeout for negotiating a new RPC connection (see the help strings).
DEFINE_int32(num_reactor_threads, 4, "Number of libev reactor threads to start.");
TAG_FLAG(num_reactor_threads, advanced);
DEFINE_int32(min_negotiation_threads, 0, "Minimum number of connection negotiation threads.");
TAG_FLAG(min_negotiation_threads, advanced);
DEFINE_int32(max_negotiation_threads, 50, "Maximum number of connection negotiation threads.");
TAG_FLAG(max_negotiation_threads, advanced);
DEFINE_int64(rpc_negotiation_timeout_ms, 3000,
"Timeout for negotiating an RPC connection.");
TAG_FLAG(rpc_negotiation_timeout_ms, advanced);
// Tagged 'runtime': presumably safe to change on a running server.
TAG_FLAG(rpc_negotiation_timeout_ms, runtime);
// Read by the ServerBase constructor: when false, no Webserver is created and
// every code path guarded by `if (web_server_)` is skipped.
DEFINE_bool(webserver_enabled, true, "Whether to enable the web server on this daemon. "
"NOTE: disabling the web server is also likely to prevent monitoring systems "
"from properly capturing metrics.");
TAG_FLAG(webserver_enabled, advanced);
// Access-control lists consumed by ServerBase::InitAcls(). An empty
// --superuser_acl makes the service user (keytab user or local Unix user)
// the only superuser.
DEFINE_string(superuser_acl, "",
"The list of usernames to allow as super users, comma-separated. "
"A '*' entry indicates that all authenticated users are allowed. "
"If this is left unset or blank, the default behavior is that the "
"identity of the daemon itself determines the superuser. If the "
"daemon is logged in from a Keytab, then the local username from "
"the Kerberos principal is used; otherwise, the local Unix "
"username is used.");
TAG_FLAG(superuser_acl, stable);
TAG_FLAG(superuser_acl, sensitive);
DEFINE_string(user_acl, "*",
"The list of usernames who may access the cluster, comma-separated. "
"A '*' entry indicates that all authenticated users are allowed.");
TAG_FLAG(user_acl, stable);
TAG_FLAG(user_acl, sensitive);
// --principal and --keytab_file are both passed to
// security::InitKerberosForServer() from ServerBase::Init().
DEFINE_string(principal, "kudu/_HOST",
"Kerberos principal that this daemon will log in as. The special token "
"_HOST will be replaced with the FQDN of the local host.");
TAG_FLAG(principal, experimental);
// This is currently tagged as unsafe because there is no way for users to configure
// clients to expect a non-default principal. As such, configuring a server to login
// as a different one would end up with a cluster that can't be connected to.
// See KUDU-1884.
TAG_FLAG(principal, unsafe);
// Also checked by ValidateKeytabPermissions() (must not be world-readable) and by
// ValidateRpcAuthnFlags() (a keytab counts as an authentication mechanism).
DEFINE_string(keytab_file, "",
"Path to the Kerberos Keytab file for this server. Specifying a "
"keytab file will cause the server to kinit, and enable Kerberos "
"to be used to authenticate RPC connections.");
TAG_FLAG(keytab_file, stable);
// Disables the world-readable permission checks performed in
// ValidateKeytabPermissions() and ValidateExternalPkiFlags().
DEFINE_bool(allow_world_readable_credentials, false,
"Enable the use of keytab files and TLS private keys with "
"world-readable permissions.");
TAG_FLAG(allow_world_readable_credentials, unsafe);
// Tri-state ('disabled'/'optional'/'required') security policies. Each value is
// syntax-checked by its DEFINE_validator below (via ParseTriState), and the pair
// is cross-checked by ValidateRpcAuthnFlags().
DEFINE_string(rpc_authentication, "optional",
"Whether to require RPC connections to authenticate. Must be one "
"of 'disabled', 'optional', or 'required'. If 'optional', "
"authentication will be used when the remote end supports it. If "
"'required', connections which are not able to authenticate "
"(because the remote end lacks support) are rejected. Secure "
"clusters should use 'required'.");
DEFINE_string(rpc_encryption, "optional",
"Whether to require RPC connections to be encrypted. Must be one "
"of 'disabled', 'optional', or 'required'. If 'optional', "
"encryption will be used when the remote end supports it. If "
"'required', connections which are not able to use encryption "
"(because the remote end lacks support) are rejected. If 'disabled', "
"encryption will not be used, and RPC authentication "
"(--rpc_authentication) must also be disabled as well. "
"Secure clusters should use 'required'.");
TAG_FLAG(rpc_authentication, evolving);
TAG_FLAG(rpc_encryption, evolving);
// NOTE(review): the opening `DEFINE_string(rpc_tls_ciphers, <default>,` line
// appears to be missing here. The help text below has no macro around it, yet
// TAG_FLAG(rpc_tls_ciphers, ...) refers to the flag. The default presumably comes
// from kudu/security/security_flags.h (included above) -- confirm and restore.
"The cipher suite preferences to use for TLS-secured RPC connections. "
"Uses the OpenSSL cipher preference list format. See man (1) ciphers "
"for more information.");
TAG_FLAG(rpc_tls_ciphers, advanced);
// NOTE(review): same problem for rpc_tls_min_protocol. Its DEFINE_string header
// is missing, and the help text below stops mid-sentence without the last
// protocol name or the closing `");`.
"The minimum protocol version to allow when for securing RPC "
"connections with TLS. May be one of 'TLSv1', 'TLSv1.1', or "
TAG_FLAG(rpc_tls_min_protocol, advanced);
// External-PKI TLS material. ValidateExternalPkiFlags() requires that the
// certificate, private key and CA certificate flags are all set or all unset.
// It also rejects a world-readable private key unless
// --allow_world_readable_credentials is set.
DEFINE_string(rpc_certificate_file, "",
"Path to a PEM encoded X509 certificate to use for securing RPC "
"connections with SSL/TLS. If set, '--rpc_private_key_file' and "
"'--rpc_ca_certificate_file' must be set as well.");
DEFINE_string(rpc_private_key_file, "",
"Path to a PEM encoded private key paired with the certificate "
"from '--rpc_certificate_file'");
DEFINE_string(rpc_ca_certificate_file, "",
"Path to the PEM encoded X509 certificate of the trusted external "
"certificate authority. The provided certificate should be the root "
"issuer of the certificate passed in '--rpc_certificate_file'.");
DEFINE_string(rpc_private_key_password_cmd, "", "A Unix command whose output "
"returns the password used to decrypt the RPC server's private key "
"file specified in --rpc_private_key_file. If the .PEM key file is "
"not password-protected, this flag does not need to be set. "
"Trailing whitespace will be trimmed before it is used to decrypt "
"the private key.");
// Setting TLS certs and keys via CLI flags is only necessary for external
// PKI-based security, which is not yet production ready. Instead, see
// internal PKI (ipki) and Kerberos-based authentication.
TAG_FLAG(rpc_certificate_file, experimental);
TAG_FLAG(rpc_private_key_file, experimental);
TAG_FLAG(rpc_ca_certificate_file, experimental);
// Idle timeout for server-side RPC connections; a negative value disables it
// (per the help text).
DEFINE_int32(rpc_default_keepalive_time_ms, 65000,
"If an RPC connection from a client is idle for this amount of time, the server "
"will disconnect the client. Setting this to any negative value keeps connections "
"always alive.");
TAG_FLAG(rpc_default_keepalive_time_ms, advanced);
using kudu::security::RpcAuthentication;
using kudu::security::RpcEncryption;
using std::ostringstream;
using std::shared_ptr;
using std::string;
using std::unique_ptr;
using std::vector;
using strings::Substitute;
namespace kudu {
class HostPortPB;
namespace server {
namespace {
bool ValidateKeytabPermissions() {
if (!FLAGS_keytab_file.empty() && !FLAGS_allow_world_readable_credentials) {
bool world_readable_keytab;
Status s = Env::Default()->IsFileWorldReadable(FLAGS_keytab_file, &world_readable_keytab);
if (!s.ok()) {
LOG(ERROR) << Substitute("$0: could not verify keytab file does not have world-readable "
"permissions: $1", FLAGS_keytab_file, s.ToString());
return false;
if (world_readable_keytab) {
LOG(ERROR) << "cannot use keytab file with world-readable permissions: "
<< FLAGS_keytab_file;
return false;
return true;
GROUP_FLAG_VALIDATOR(keytab_permissions, &ValidateKeytabPermissions);
} // namespace
static bool ValidateRpcAuthentication(const char* flag_name, const string& flag_value) {
security::RpcAuthentication result;
Status s = ParseTriState(flag_name, flag_value, &result);
if (!s.ok()) {
LOG(ERROR) << s.message().ToString();
return false;
return true;
DEFINE_validator(rpc_authentication, &ValidateRpcAuthentication);
static bool ValidateRpcEncryption(const char* flag_name, const string& flag_value) {
security::RpcEncryption result;
Status s = ParseTriState(flag_name, flag_value, &result);
if (!s.ok()) {
LOG(ERROR) << s.message().ToString();
return false;
return true;
DEFINE_validator(rpc_encryption, &ValidateRpcEncryption);
static bool ValidateRpcAuthnFlags() {
security::RpcAuthentication authentication;
CHECK_OK(ParseTriState("--rpc_authentication", FLAGS_rpc_authentication, &authentication));
security::RpcEncryption encryption;
CHECK_OK(ParseTriState("--rpc_encryption", FLAGS_rpc_encryption, &encryption));
if (encryption == RpcEncryption::DISABLED && authentication != RpcAuthentication::DISABLED) {
LOG(ERROR) << "RPC authentication (--rpc_authentication) must be disabled "
"if RPC encryption (--rpc_encryption) is disabled";
return false;
const bool has_keytab = !FLAGS_keytab_file.empty();
const bool has_cert = !FLAGS_rpc_certificate_file.empty();
if (authentication == RpcAuthentication::REQUIRED && !has_keytab && !has_cert) {
LOG(ERROR) << "RPC authentication (--rpc_authentication) may not be "
"required unless Kerberos (--keytab_file) or external PKI "
"(--rpc_certificate_file et al) are configured";
return false;
return true;
GROUP_FLAG_VALIDATOR(rpc_authn_flags, ValidateRpcAuthnFlags);
static bool ValidateExternalPkiFlags() {
bool has_cert = !FLAGS_rpc_certificate_file.empty();
bool has_key = !FLAGS_rpc_private_key_file.empty();
bool has_ca = !FLAGS_rpc_ca_certificate_file.empty();
if (has_cert != has_key || has_cert != has_ca) {
LOG(ERROR) << "--rpc_certificate_file, --rpc_private_key_file, and "
"--rpc_ca_certificate_file flags must be set as a group; "
"i.e. either set all or none of them.";
return false;
if (has_key && !FLAGS_allow_world_readable_credentials) {
bool world_readable_private_key;
Status s = Env::Default()->IsFileWorldReadable(FLAGS_rpc_private_key_file,
if (!s.ok()) {
LOG(ERROR) << Substitute("$0: could not verify private key file does not have "
"world-readable permissions: $1",
FLAGS_rpc_private_key_file, s.ToString());
return false;
if (world_readable_private_key) {
LOG(ERROR) << "cannot use private key file with world-readable permissions: "
<< FLAGS_rpc_private_key_file;
return false;
return true;
GROUP_FLAG_VALIDATOR(external_pki_flags, ValidateExternalPkiFlags);
namespace {

// Process-wide counter that gives each server's MemTracker a distinct name when
// several servers live in one process (e.g. a minicluster). It starts at -1 so
// the first Increment() yields 0.
AtomicInt<int32_t> mem_tracker_id_counter(-1);

// Creates a MemTracker (first argument -1) named "server" for the first server
// (id 0) and "server <id>" for every later one.
shared_ptr<MemTracker> CreateMemTrackerForServer() {
  const int32_t id = mem_tracker_id_counter.Increment();
  const string tracker_name = (id == 0) ? string("server") : Substitute("server $0", id);
  return shared_ptr<MemTracker>(MemTracker::CreateTracker(-1, tracker_name));
}

} // anonymous namespace
// Constructs the shared server scaffolding: metric registry, RPC server,
// result tracker, FsManager (built from options.fs_opts), the clock (hybrid or
// logical per --use_hybrid_clock) and, if --webserver_enabled, the embedded
// Webserver. Nothing is opened or started here; see Init() and Start().
// NOTE(review): 'metric_namespace' and 'options' are not stored into any member
// in the visible initializer list (options_ is read elsewhere) -- confirm the
// missing initializers.
ServerBase::ServerBase(string name, const ServerBaseOptions& options,
const string& metric_namespace)
: name_(std::move(name)),
minidump_handler_(new MinidumpExceptionHandler()),
metric_registry_(new MetricRegistry()),
rpc_server_(new RpcServer(options.rpc_opts)),
// NOTE(review): mem_tracker_ is passed as the parent tracker here, but no
// initializer for it is visible in this list. Confirm it is initialized (and
// declared before result_tracker_) so it is not read uninitialized.
result_tracker_(new rpc::ResultTracker(shared_ptr<MemTracker>(
MemTracker::CreateTracker(-1, "result-tracker", mem_tracker_)))),
stop_background_threads_latch_(1) {
// NOTE(review): metric_entity_ is read here and below without a visible
// initializer -- confirm it is set up in the initializer list.
FsManagerOpts fs_opts;
fs_opts.metric_entity = metric_entity_;
fs_opts.parent_mem_tracker = mem_tracker_;
fs_opts.block_manager_type = options.fs_opts.block_manager_type;
fs_opts.wal_root = options.fs_opts.wal_root;
fs_opts.data_roots = options.fs_opts.data_roots;
fs_manager_.reset(new FsManager(options.env, std::move(fs_opts)));
// The clock is only constructed here; ServerBase::Init() calls clock_->Init().
if (FLAGS_use_hybrid_clock) {
clock_ = new clock::HybridClock();
} else {
clock_ = clock::LogicalClock::CreateStartingAt(Timestamp::kInitialTimestamp);
if (FLAGS_webserver_enabled) {
web_server_.reset(new Webserver(options.webserver_opts));
CHECK_OK(StartThreadInstrumentation(metric_entity_, web_server_.get()));
// Destructor. NOTE(review): its body is not visible in this view.
ServerBase::~ServerBase() {
// Returns the first address the RPC server is bound to. CHECK-fails if the
// server is not bound to any address.
Sockaddr ServerBase::first_rpc_address() const {
vector<Sockaddr> addrs;
// NOTE(review): the statement that fills 'addrs' is truncated in this view;
// only its message argument survives on the next line.
"Couldn't get bound RPC address");
CHECK(!addrs.empty()) << "Not bound";
return addrs[0];
// Returns the first address the embedded webserver is bound to. CHECK-fails if
// it is not bound to any address.
// NOTE(review): web_server_ is null when --webserver_enabled=false (see the
// constructor); no null check is visible here.
Sockaddr ServerBase::first_http_address() const {
vector<Sockaddr> addrs;
// NOTE(review): the statement that fills 'addrs' is truncated in this view;
// only its message argument survives on the next line.
"Couldn't get bound webserver addresses");
CHECK(!addrs.empty()) << "Not bound";
return addrs[0];
// Returns this server's node-instance descriptor. instance_pb_ is (re)allocated
// in GenerateInstanceID(); debug builds fail fast if it is read while still null.
const NodeInstancePB& ServerBase::instance_pb() const {
  const NodeInstancePB* pb = DCHECK_NOTNULL(instance_pb_.get());
  return *pb;
}
// Replaces instance_pb_ with a freshly allocated NodeInstancePB.
// NOTE(review): the statements that populate its fields are not visible in this
// view. The TODO below suggests one field is derived from the current time.
void ServerBase::GenerateInstanceID() {
instance_pb_.reset(new NodeInstancePB);
// TODO: maybe actually bump a sequence number on local disk instead of
// using time.
// One-time server initialization, in order: glog metrics, clock, Kerberos
// login, opening (or creating) the on-disk FS layout, building the RPC
// messenger, and starting metrics logging. Returns the first failure,
// prefixed with context.
Status ServerBase::Init() {
glog_metrics_.reset(new ScopedGLogMetrics(metric_entity_));
// Initialize the clock immediately. This checks that the clock is synchronized
// so we're less likely to get into a partially initialized state on disk during startup
// if we're having clock problems.
RETURN_NOT_OK_PREPEND(clock_->Init(), "Cannot initialize clock");
RETURN_NOT_OK(security::InitKerberosForServer(FLAGS_principal, FLAGS_keytab_file));
fs::FsReport report;
Status s = fs_manager_->Open(&report);
// A NotFound result is treated as a brand-new node: create the layout, record
// that this is the first run, then open it again.
if (s.IsNotFound()) {
LOG(INFO) << "Could not load existing FS layout: " << s.ToString();
LOG(INFO) << "Attempting to create new FS layout instead";
is_first_run_ = true;
s = fs_manager_->CreateInitialFileSystemLayout();
if (s.IsAlreadyPresent()) {
// The operator is likely trying to start up with an extra entry in their
// `fs_data_dirs` configuration.
LOG(INFO) << "To start Kudu with a different FS layout, the `kudu fs "
"update_dirs` tool must be run first";
return s.CloneAndPrepend("FS layout already exists; not overwriting existing layout");
RETURN_NOT_OK_PREPEND(s, "Could not create new FS layout");
s = fs_manager_->Open(&report);
RETURN_NOT_OK_PREPEND(s, "Failed to load FS layout");
// Create the Messenger.
// NOTE(review): the builder configuration chain below is only partially
// visible; several setter calls and the Build() call are missing from this view.
rpc::MessengerBuilder builder(name_);
.set_epki_cert_key_files(FLAGS_rpc_certificate_file, FLAGS_rpc_private_key_file)
if (options_.rpc_opts.rpc_reuseport) {
// Binds ServiceQueueOverflowed as a callback taking one argument (the
// overflowing ServicePool); the enclosing call is not visible here.
&ServerBase::ServiceQueueOverflowed, this, std::placeholders::_1));
RETURN_NOT_OK_PREPEND(StartMetricsLogging(), "Could not enable metrics logging");
return Status::OK();
// Initializes the superuser, user and service ACLs. The service user is the
// keytab's username when logged in from a keytab, otherwise the local user.
// --superuser_acl falls back to the service user when empty.
// NOTE(review): several statements (local-username lookup and ACL parsing
// calls) are truncated in this view; only their error messages survive.
Status ServerBase::InitAcls() {
string service_user;
boost::optional<string> keytab_user = security::GetLoggedInUsernameFromKeytab();
if (keytab_user) {
// If we're logged in from a keytab, then everyone should be, and we expect them
// to use the same mapped username.
service_user = *keytab_user;
} else {
// If we aren't logged in from a keytab, then just assume that the services
// will be running as the same Unix user as we are.
// NOTE(review): "deterine" in the message below is a typo for "determine".
"could not deterine local username");
// If the user has specified a superuser acl, use that. Otherwise, assume
// that the same user running the service acts as superuser.
if (!FLAGS_superuser_acl.empty()) {
"could not parse --superuser_acl flag");
} else {
superuser_acl_.Reset({ service_user });
"could not parse --user_acl flag");
// For the "service" ACL, we currently don't allow it to be user-configured,
// but instead assume that all of the services will be running the same
// way.
service_acl_.Reset({ service_user });
return Status::OK();
// Fills 'status' with this server's bound RPC addresses and, when the webserver
// is enabled, its bound HTTP addresses. Each Sockaddr is converted with
// HostPortFromSockaddrReplaceWildcard (presumably substituting a concrete host
// for a wildcard bind address -- confirm).
// NOTE(review): the node-instance section and several calls (address lookup,
// HostPort-to-PB conversion) are truncated in this view.
Status ServerBase::GetStatusPB(ServerStatusPB* status) const {
// Node instance
// RPC ports
vector<Sockaddr> addrs;
"could not get bound RPC addresses");
for (const Sockaddr& addr : addrs) {
HostPort hp;
RETURN_NOT_OK_PREPEND(HostPortFromSockaddrReplaceWildcard(addr, &hp),
"could not get RPC hostport");
HostPortPB* pb = status->add_bound_rpc_addresses();
"could not convert RPC hostport");
// HTTP ports
if (web_server_) {
vector<Sockaddr> addrs;
"could not get bound web addresses");
for (const Sockaddr& addr : addrs) {
HostPort hp;
RETURN_NOT_OK_PREPEND(HostPortFromSockaddrReplaceWildcard(addr, &hp),
"could not get web hostport");
HostPortPB* pb = status->add_bound_http_addresses();
"could not convert web hostport");
return Status::OK();
// Emits a WARNING naming the rejected "<service>.<method>" and the requestor.
void ServerBase::LogUnauthorizedAccess(rpc::RpcContext* rpc) const {
  const string qualified_method =
      Substitute("$0.$1", rpc->service_name(), rpc->method_name());
  LOG(WARNING) << "Unauthorized access attempt to method " << qualified_method
               << " from " << rpc->requestor_string();
}
// Returns true if the caller's username is allowed by the ACL of any role whose
// bit is set in 'allowed_roles' (SUPER_USER, USER, SERVICE_USER, checked in that
// order). Otherwise responds to the RPC with NotAuthorized and returns false.
// NOTE(review): the RespondFailure call below is truncated in this view, and no
// call to LogUnauthorizedAccess() is visible -- confirm whether one is intended.
bool ServerBase::Authorize(rpc::RpcContext* rpc, uint32_t allowed_roles) {
if ((allowed_roles & SUPER_USER) &&
superuser_acl_.UserAllowed(rpc->remote_user().username())) {
return true;
if ((allowed_roles & USER) &&
user_acl_.UserAllowed(rpc->remote_user().username())) {
return true;
if ((allowed_roles & SERVICE_USER) &&
service_acl_.UserAllowed(rpc->remote_user().username())) {
return true;
rpc->RespondFailure(Status::NotAuthorized("unauthorized access to method",
return false;
// Writes this server's ServerStatusPB to 'path'. 'format' is matched
// case-insensitively: "json" writes pretty-printed JSON, "pb" writes the
// serialized protobuf without syncing. Any other format yields InvalidArgument,
// but only after the status has been collected successfully.
Status ServerBase::DumpServerInfo(const string& path,
                                  const string& format) const {
  ServerStatusPB status;
  RETURN_NOT_OK_PREPEND(GetStatusPB(&status), "could not get server status");

  const bool as_json = boost::iequals(format, "json");
  const bool as_pb = !as_json && boost::iequals(format, "pb");
  if (!as_json && !as_pb) {
    return Status::InvalidArgument("bad format", format);
  }

  if (as_json) {
    const string json = JsonWriter::ToJson(status, JsonWriter::PRETTY);
    RETURN_NOT_OK(WriteStringToFile(options_.env, Slice(json), path));
  } else {
    // TODO: Use PB container format?
    // Durability doesn't matter for this dump, so the file is not synced.
    RETURN_NOT_OK(pb_util::WritePBToPath(options_.env, path, status, pb_util::NO_SYNC));
  }

  LOG(INFO) << "Dumped server information to " << path;
  return Status::OK();
}
// Registers an RPC service implementation with this server's RpcServer.
// 'rpc_impl' is moved into the call, and the RpcServer's result is returned.
Status ServerBase::RegisterService(gscoped_ptr<rpc::ServiceIf> rpc_impl) {
  Status registration = rpc_server_->RegisterService(std::move(rpc_impl));
  return registration;
}
// Creates the diagnostics log (in --log_dir) that holds metrics output.
// Metrics logging is skipped when options_.metrics_log_interval_ms <= 0 or when
// no log directory is configured.
// NOTE(review): as shown, options_.metrics_log_interval_ms is only used as an
// on/off switch. The DiagnosticsLog is never given that interval and never
// started before being stored in diag_log_. Lines may be missing from this
// view -- confirm.
Status ServerBase::StartMetricsLogging() {
if (options_.metrics_log_interval_ms <= 0) {
return Status::OK();
if (FLAGS_log_dir.empty()) {
LOG(INFO) << "Not starting metrics log since no log directory was specified.";
return Status::OK();
unique_ptr<DiagnosticsLog> l(new DiagnosticsLog(FLAGS_log_dir, metric_registry_.get()));
diag_log_ = std::move(l);
return Status::OK();
// Runs one synchronous cleanup pass (only when not logging to stderr), then
// starts the "excess-log-deleter" thread running ExcessLogFileDeleterThread().
// NOTE(review): the two cleanup calls inside the if-block are truncated in this
// view; only their error messages survive.
Status ServerBase::StartExcessLogFileDeleterThread() {
// Try synchronously deleting excess log files once at startup to make sure it
// works, then start a background thread to continue deleting them in the
// future. Same with minidumps.
if (!FLAGS_logtostderr) {
"Unable to delete excess log files");
"Unable to delete excess minidump files");
return Thread::Create("server", "excess-log-deleter", &ServerBase::ExcessLogFileDeleterThread,
this, &excess_log_deleter_thread_);
// Background loop. Every kWait it deletes excess glog files (and, per the
// truncated line below, excess minidumps), logging failures as warnings. It
// exits once the wait on stop_background_threads_latch_ returns true,
// presumably when Shutdown() counts the latch down -- confirm.
void ServerBase::ExcessLogFileDeleterThread() {
// How often to attempt to clean up excess glog and minidump files.
const MonoDelta kWait = MonoDelta::FromSeconds(60);
while (!stop_background_threads_latch_.WaitUntil(MonoTime::Now() + kWait)) {
WARN_NOT_OK(DeleteExcessLogFiles(options_.env), "Unable to delete excess log files");
// NOTE(review): the minidump-cleanup call is truncated in this view.
"Unable to delete excess minidump files");
// Returns an HTML <pre> footer with two lines: "$0" on the first and
// "server uuid $1" on the second.
// NOTE(review): the Substitute arguments are truncated in this view, so what
// fills $0 and $1 cannot be confirmed here.
std::string ServerBase::FooterHtml() const {
return Substitute("<pre>$0\nserver uuid $1</pre>",
// Starts serving. The visible steps register a GenericServiceImpl, add the
// rpcz and metrics-JSON handlers to the webserver when it exists, and
// optionally dump server info to options_.dump_info_path.
// NOTE(review): several statements (including the start of the service
// registration call) are missing from this view.
Status ServerBase::Start() {
new GenericServiceImpl(this))));
if (web_server_) {
AddRpczPathHandlers(messenger_, web_server_.get());
RegisterMetricsJsonHandler(web_server_.get(), metric_registry_.get());
if (!options_.dump_info_path.empty()) {
RETURN_NOT_OK_PREPEND(DumpServerInfo(options_.dump_info_path, options_.dump_info_format),
"Failed to dump server info to " + options_.dump_info_path);
return Status::OK();
// Stops the server. Each optional component (webserver, messenger, diagnostics
// log, excess-log deleter thread) is guarded by a null check.
// NOTE(review): the bodies of these guarded branches are not visible in this
// view.
void ServerBase::Shutdown() {
// First, stop accepting incoming requests and wait for any outstanding
// requests to finish processing.
// Note: prior to Messenger::Shutdown, it is assumed that any incoming RPCs
// deferred from reactor threads have already been cleaned up.
if (web_server_) {
if (messenger_) {
// Next, shut down remaining server components.
if (diag_log_) {
if (excess_log_deleter_thread_) {
// NOTE(review): the body of this method is not visible in this view.
void ServerBase::UnregisterAllServices() {
// Called with the ServicePool whose queue overflowed. Dumps all thread stacks
// to the diagnostics log (when one exists), throttled to at most once every
// kStackDumpFrequencySecs via a function-local static LogThrottler.
void ServerBase::ServiceQueueOverflowed(rpc::ServicePool* service) {
if (!diag_log_) return;
// Logging all of the stacks is relatively heavy-weight, so if we are in a persistent
// state of overload, it's probably not a good idea to start compounding the issue with
// a lot of stack-logging activity. So, we limit the frequency of stack-dumping.
static logging::LogThrottler throttler;
const int kStackDumpFrequencySecs = 5;
int suppressed = 0;
// NOTE(review): as shown, DumpStacksNow sits inside the branch taken when the
// throttler says NOT to log, which inverts the throttling. An early `return;`
// (and the closing brace) appear to be missing here -- confirm.
if (PREDICT_TRUE(!throttler.ShouldLog(kStackDumpFrequencySecs, "", &suppressed))) {
diag_log_->DumpStacksNow(Substitute("service queue overflowed for $0",
} // namespace server
} // namespace kudu