blob: 1a46c4f7691fb06daa325867cd82cecda540c680 [file] [log] [blame]
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.
#include "kudu/kserver/kserver.h"
#include <algorithm>
#include <cstdint>
#include <initializer_list>
#include <memory>
#include <mutex>
#include <ostream>
#include <string>
#include <gflags/gflags.h>
#include <glog/logging.h>
#include "kudu/fs/fs_manager.h"
#include "kudu/gutil/gscoped_ptr.h"
#include "kudu/gutil/integral_types.h"
#include "kudu/gutil/strings/numbers.h"
#include "kudu/gutil/strings/substitute.h"
#include "kudu/rpc/messenger.h"
#include "kudu/util/env.h"
#include "kudu/util/faststring.h"
#include "kudu/util/flag_tags.h"
#include "kudu/util/metrics.h"
#include "kudu/util/status.h"
#include "kudu/util/threadpool.h"
DEFINE_int32(server_thread_pool_max_thread_count, -1,
"Maximum number of threads to allow in each server-wide thread "
"pool. If -1, Kudu will automatically calculate this value. It "
"is an error to use a value of 0.");
TAG_FLAG(server_thread_pool_max_thread_count, advanced);
TAG_FLAG(server_thread_pool_max_thread_count, evolving);
static bool ValidateThreadPoolThreadLimit(const char* /*flagname*/, int32_t value) {
if (value == 0 || value < -1) {
LOG(ERROR) << "Invalid thread pool thread limit: cannot be " << value;
return false;
}
return true;
}
DEFINE_validator(server_thread_pool_max_thread_count, &ValidateThreadPoolThreadLimit);
using kudu::server::ServerBaseOptions;
using std::string;
using strings::Substitute;
namespace kudu {
namespace kserver {
METRIC_DEFINE_histogram(server, op_apply_queue_length, "Operation Apply Queue Length",
MetricUnit::kTasks,
"Number of operations waiting to be applied to the tablet. "
"High queue lengths indicate that the server is unable to process "
"operations as fast as they are being written to the WAL.",
kudu::MetricLevel::kWarn,
10000, 2);
METRIC_DEFINE_histogram(server, op_apply_queue_time, "Operation Apply Queue Time",
MetricUnit::kMicroseconds,
"Time that operations spent waiting in the apply queue before being "
"processed. High queue times indicate that the server is unable to "
"process operations as fast as they are being written to the WAL.",
kudu::MetricLevel::kWarn,
10000000, 2);
METRIC_DEFINE_histogram(server, op_apply_run_time, "Operation Apply Run Time",
MetricUnit::kMicroseconds,
"Time that operations spent being applied to the tablet. "
"High values may indicate that the server is under-provisioned or "
"that operations consist of very large batches.",
kudu::MetricLevel::kWarn,
10000000, 2);
namespace {
int GetThreadPoolThreadLimit(Env* env) {
// Maximize this process' running thread limit first, if possible.
static std::once_flag once;
std::call_once(once, [&]() {
env->IncreaseResourceLimit(Env::ResourceLimitType::RUNNING_THREADS_PER_EUID);
});
uint64_t rlimit = env->GetResourceLimit(Env::ResourceLimitType::RUNNING_THREADS_PER_EUID);
// See server_thread_pool_max_thread_count.
if (FLAGS_server_thread_pool_max_thread_count == -1) {
// Use both pid_max and threads-max as possible upper bounds.
faststring buf;
uint64_t buf_val;
for (const auto& proc_file : { "/proc/sys/kernel/pid_max",
"/proc/sys/kernel/threads-max" }) {
if (ReadFileToString(env, proc_file, &buf).ok() &&
safe_strtou64(buf.ToString(), &buf_val)) {
rlimit = std::min(rlimit, buf_val);
}
}
// Callers of this function expect a signed 32-bit integer, so we need to
// further cap the limit just in case it's too large.
rlimit = std::min(rlimit, static_cast<uint64_t>(kint32max));
// Take only 10% of the calculated limit; we don't want to hog the system.
return static_cast<int32_t>(rlimit) / 10;
}
LOG_IF(FATAL, FLAGS_server_thread_pool_max_thread_count > rlimit) <<
Substitute(
"Configured server-wide thread pool running thread limit "
"(server_thread_pool_max_thread_count) $0 exceeds euid running "
"thread limit (ulimit) $1",
FLAGS_server_thread_pool_max_thread_count, rlimit);
return FLAGS_server_thread_pool_max_thread_count;
}
} // anonymous namespace
KuduServer::KuduServer(string name,
const ServerBaseOptions& options,
const string& metric_namespace)
: ServerBase(std::move(name), options, metric_namespace) {
}
Status KuduServer::Init() {
RETURN_NOT_OK(ServerBase::Init());
ThreadPoolMetrics metrics = {
METRIC_op_apply_queue_length.Instantiate(metric_entity_),
METRIC_op_apply_queue_time.Instantiate(metric_entity_),
METRIC_op_apply_run_time.Instantiate(metric_entity_)
};
RETURN_NOT_OK(ThreadPoolBuilder("apply")
.set_metrics(std::move(metrics))
.Build(&tablet_apply_pool_));
// These pools are shared by all replicas hosted by this server, and thus
// are capped at a portion of the overall per-euid thread resource limit.
int server_wide_pool_limit = GetThreadPoolThreadLimit(fs_manager_->env());
RETURN_NOT_OK(ThreadPoolBuilder("prepare")
.set_max_threads(server_wide_pool_limit)
.Build(&tablet_prepare_pool_));
RETURN_NOT_OK(ThreadPoolBuilder("raft")
.set_trace_metric_prefix("raft")
.set_max_threads(server_wide_pool_limit)
.Build(&raft_pool_));
return Status::OK();
}
Status KuduServer::Start() {
return ServerBase::Start();
}
void KuduServer::Shutdown() {
// Shut down the messenger early, waiting for any reactor threads to finish
// running. This ensures that any ref-counted objects inside closures run by
// reactor threads will be destroyed before we shut down server-wide thread
// pools below, which is important because those objects may own tokens
// belonging to the pools.
//
// Note: prior to this call, it is assumed that any incoming RPCs deferred
// from reactor threads have already been cleaned up.
if (messenger_) {
messenger_->Shutdown();
}
// The shutdown order here shouldn't matter; shutting down the messenger
// first ensures that all outstanding RaftConsensus instances are destroyed.
// Thus, there shouldn't be lingering activity on any of these pools.
if (raft_pool_) {
raft_pool_->Shutdown();
}
if (tablet_apply_pool_) {
tablet_apply_pool_->Shutdown();
}
if (tablet_prepare_pool_) {
tablet_prepare_pool_->Shutdown();
}
ServerBase::Shutdown();
}
} // namespace kserver
} // namespace kudu