// blob: 56d4fd18ecdbe1bebca042864ed9f4634642df49 [file] [log] [blame]
// Copyright 2013 Cloudera, Inc.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
#include "kudu/server/server_base.h"
#include <boost/foreach.hpp>
#include <boost/algorithm/string/predicate.hpp>
#include <gflags/gflags.h>
#include <string>
#include <vector>
#include "kudu/codegen/compilation_manager.h"
#include "kudu/common/wire_protocol.pb.h"
#include "kudu/fs/fs_manager.h"
#include "kudu/gutil/strings/strcat.h"
#include "kudu/gutil/strings/substitute.h"
#include "kudu/gutil/walltime.h"
#include "kudu/rpc/messenger.h"
#include "kudu/server/default-path-handlers.h"
#include "kudu/server/generic_service.h"
#include "kudu/server/glog_metrics.h"
#include "kudu/server/hybrid_clock.h"
#include "kudu/server/logical_clock.h"
#include "kudu/server/rpc_server.h"
#include "kudu/server/tcmalloc_metrics.h"
#include "kudu/server/webserver.h"
#include "kudu/server/rpcz-path-handler.h"
#include "kudu/server/server_base_options.h"
#include "kudu/server/server_base.pb.h"
#include "kudu/server/tracing-path-handlers.h"
#include "kudu/util/atomic.h"
#include "kudu/util/env.h"
#include "kudu/util/flag_tags.h"
#include "kudu/util/jsonwriter.h"
#include "kudu/util/mem_tracker.h"
#include "kudu/util/metrics.h"
#include "kudu/util/monotime.h"
#include "kudu/util/net/sockaddr.h"
#include "kudu/util/pb_util.h"
#include "kudu/util/rolling_log.h"
#include "kudu/util/spinlock_profiling.h"
#include "kudu/util/thread.h"
#include "kudu/util/version_info.h"
// Number of reactor threads the RPC messenger is built with; read in
// ServerBase::Init() and passed to MessengerBuilder::set_num_reactors().
DEFINE_int32(num_reactor_threads, 4, "Number of libev reactor threads to start.");
TAG_FLAG(num_reactor_threads, advanced);
// Only declared here (the definition is not in this file). This file reads it
// to pick the clock implementation in the constructor and to decide whether
// Init() checks clock synchronization.
DECLARE_bool(use_hybrid_clock);
using std::string;
using std::stringstream;
using std::vector;
using strings::Substitute;
namespace kudu {
namespace server {
namespace {
// Process-wide counter used to tell servers apart when several of them live
// in the same process (e.g. in a minicluster).
// NOTE(review): starts at -1, presumably so the first Increment() yields 0 --
// confirm against AtomicInt::Increment().
AtomicInt<int32_t> mem_tracker_id_counter(-1);

// Creates the MemTracker for one server instance.
//
// The tracker is named "server" when the counter value obtained for this
// server is 0, and "server <id>" otherwise. The tracker is created with a
// first argument of -1.
shared_ptr<MemTracker> CreateMemTrackerForServer() {
  const int32_t server_index = mem_tracker_id_counter.Increment();

  string tracker_name("server");
  if (server_index != 0) {
    StrAppend(&tracker_name, " ", server_index);
  }

  return shared_ptr<MemTracker>(MemTracker::CreateTracker(-1, tracker_name));
}
} // anonymous namespace
// Constructs the long-lived components of a server: memory tracker, metric
// registry/entity, RPC server, web server, filesystem manager and clock.
//
// 'name' is stored and later used to name the RPC messenger (see Init()).
// 'options' is copied into options_ and also supplies the RPC, webserver and
// filesystem settings. 'metric_namespace' is passed to the server metric
// entity when it is instantiated.
//
// Nothing is opened, bound or started here apart from thread and codegen
// instrumentation; failures of those two are fatal (CHECK_OK).
ServerBase::ServerBase(const string& name,
                       const ServerBaseOptions& options,
                       const string& metric_namespace)
  : name_(name),
    mem_tracker_(CreateMemTrackerForServer()),
    metric_registry_(new MetricRegistry()),
    metric_entity_(METRIC_ENTITY_server.Instantiate(metric_registry_.get(), metric_namespace)),
    rpc_server_(new RpcServer(options.rpc_opts)),
    web_server_(new Webserver(options.webserver_opts)),
    is_first_run_(false),
    options_(options),
    stop_metrics_logging_latch_(1) {
  // The FsManager reports into this server's metric entity and mem tracker,
  // and takes its WAL/data locations from the server options.
  FsManagerOpts fs_manager_opts;
  fs_manager_opts.wal_path = options.fs_opts.wal_path;
  fs_manager_opts.data_paths = options.fs_opts.data_paths;
  fs_manager_opts.parent_mem_tracker = mem_tracker_;
  fs_manager_opts.metric_entity = metric_entity_;
  fs_manager_.reset(new FsManager(options.env, fs_manager_opts));

  // Pick the clock implementation according to --use_hybrid_clock.
  if (!FLAGS_use_hybrid_clock) {
    clock_ = LogicalClock::CreateStartingAt(Timestamp::kInitialTimestamp);
  } else {
    clock_ = new HybridClock();
  }

  CHECK_OK(StartThreadInstrumentation(metric_entity_, web_server_.get()));
  CHECK_OK(codegen::CompilationManager::GetSingleton()->StartInstrumentation(
               metric_entity_));
}
// Shuts the server down (see Shutdown()) and then unregisters this server's
// MemTracker from its parent tracker.
ServerBase::~ServerBase() {
Shutdown();
mem_tracker_->UnregisterFromParent();
}
// Returns the first address reported by the RPC server as bound.
//
// A failure to fetch the bound addresses is only logged as a warning, but
// ending up with no address at all is fatal (CHECK).
Sockaddr ServerBase::first_rpc_address() const {
  vector<Sockaddr> addrs;
  const Status fetch_status = rpc_server_->GetBoundAddresses(&addrs);
  WARN_NOT_OK(fetch_status, "Couldn't get bound RPC address");
  CHECK(!addrs.empty()) << "Not bound";
  return addrs.front();
}
// Returns the first address reported by the web server as bound.
//
// A failure to fetch the bound addresses is only logged as a warning, but
// ending up with no address at all is fatal (CHECK).
Sockaddr ServerBase::first_http_address() const {
  vector<Sockaddr> addrs;
  const Status fetch_status = web_server_->GetBoundAddresses(&addrs);
  WARN_NOT_OK(fetch_status, "Couldn't get bound webserver addresses");
  CHECK(!addrs.empty()) << "Not bound";
  return addrs.front();
}
// Returns the node instance PB populated by GenerateInstanceID().
// DCHECK_NOTNULL guards against calling this before the PB has been set.
const NodeInstancePB& ServerBase::instance_pb() const {
  const NodeInstancePB* pb = DCHECK_NOTNULL(instance_pb_.get());
  return *pb;
}
void ServerBase::GenerateInstanceID() {
instance_pb_.reset(new NodeInstancePB);
instance_pb_->set_permanent_uuid(fs_manager_->uuid());
// TODO: maybe actually bump a sequence number on local disk instead of
// using time.
instance_pb_->set_instance_seqno(Env::Default()->NowMicros());
}
// Initializes the server's process-level metrics, on-disk layout, RPC
// messenger/server and clock, and finally starts periodic metrics logging.
//
// Returns the first non-OK status hit along the way (several are prefixed
// with a description of the failing step); returns Status::OK() otherwise.
Status ServerBase::Init() {
// Hook glog, tcmalloc and spinlock-contention metrics into this server's
// metric entity, and initialize spinlock contention profiling.
glog_metrics_.reset(new ScopedGLogMetrics(metric_entity_));
tcmalloc::RegisterMetrics(metric_entity_);
RegisterSpinLockContentionMetrics(metric_entity_);
InitSpinLockContentionProfiling();
// Check for NTP errors so it's less likely to get into a partially
// initialized state on disk during startup.
// TODO: Remove this check after fully fixing KUDU-1186.
if (FLAGS_use_hybrid_clock) {
RETURN_NOT_OK_PREPEND(CheckClockSynchronized(), "Clock is not synchronized");
}
// Open the existing filesystem layout. If none is found, create one, record
// that this is the first run, and open again.
Status s = fs_manager_->Open();
if (s.IsNotFound()) {
LOG(INFO) << "Could not load existing FS layout: " << s.ToString();
LOG(INFO) << "Creating new FS layout";
is_first_run_ = true;
RETURN_NOT_OK_PREPEND(fs_manager_->CreateInitialFileSystemLayout(),
"Could not create new FS layout");
s = fs_manager_->Open();
}
// Covers both a non-NotFound failure of the first Open() and any failure of
// the second one.
RETURN_NOT_OK_PREPEND(s, "Failed to load FS layout");
// Create the Messenger.
rpc::MessengerBuilder builder(name_);
builder.set_num_reactors(FLAGS_num_reactor_threads);
builder.set_metric_entity(metric_entity());
RETURN_NOT_OK(builder.Build(&messenger_));
// Attach the RPC server to the messenger and bind it; it is started later,
// in Start().
RETURN_NOT_OK(rpc_server_->Init(messenger_));
RETURN_NOT_OK(rpc_server_->Bind());
// Initialize the clock chosen in the constructor and expose its metrics.
RETURN_NOT_OK_PREPEND(clock_->Init(), "Cannot initialize clock");
clock_->RegisterMetrics(metric_entity_);
// No-op when metrics logging is disabled (see StartMetricsLogging()).
RETURN_NOT_OK_PREPEND(StartMetricsLogging(), "Could not enable metrics logging");
return Status::OK();
}
void ServerBase::GetStatusPB(ServerStatusPB* status) const {
// Node instance
status->mutable_node_instance()->CopyFrom(*instance_pb_);
// RPC ports
{
vector<Sockaddr> addrs;
CHECK_OK(rpc_server_->GetBoundAddresses(&addrs));
BOOST_FOREACH(const Sockaddr& addr, addrs) {
HostPortPB* pb = status->add_bound_rpc_addresses();
pb->set_host(addr.host());
pb->set_port(addr.port());
}
}
// HTTP ports
{
vector<Sockaddr> addrs;
CHECK_OK(web_server_->GetBoundAddresses(&addrs));
BOOST_FOREACH(const Sockaddr& addr, addrs) {
HostPortPB* pb = status->add_bound_http_addresses();
pb->set_host(addr.host());
pb->set_port(addr.port());
}
}
VersionInfo::GetVersionInfoPB(status->mutable_version_info());
}
// Writes this server's ServerStatusPB to 'path'.
//
// 'format' is matched case-insensitively: "json" writes pretty-printed JSON,
// "pb" writes the protobuf without syncing. Any other value yields
// Status::InvalidArgument. Write failures are returned to the caller.
Status ServerBase::DumpServerInfo(const string& path,
                                  const string& format) const {
  ServerStatusPB status;
  GetStatusPB(&status);

  const bool want_json = boost::iequals(format, "json");
  const bool want_pb = !want_json && boost::iequals(format, "pb");
  if (!want_json && !want_pb) {
    return Status::InvalidArgument("bad format", format);
  }

  if (want_json) {
    const string json = JsonWriter::ToJson(status, JsonWriter::PRETTY);
    RETURN_NOT_OK(WriteStringToFile(options_.env, Slice(json), path));
  } else {
    // TODO: Use PB container format?
    // Durability doesn't matter here, hence NO_SYNC.
    RETURN_NOT_OK(pb_util::WritePBToPath(options_.env, path, status,
                                         pb_util::NO_SYNC));
  }

  LOG(INFO) << "Dumped server information to " << path;
  return Status::OK();
}
// Registers 'rpc_impl' with the RPC server, handing it over via Pass(), and
// returns the RPC server's status.
Status ServerBase::RegisterService(gscoped_ptr<rpc::ServiceIf> rpc_impl) {
return rpc_server_->RegisterService(rpc_impl.Pass());
}
// Starts the "metrics-logger" thread running MetricsLoggingThread() when the
// configured metrics log interval is positive; otherwise does nothing and
// returns OK.
Status ServerBase::StartMetricsLogging() {
  if (options_.metrics_log_interval_ms > 0) {
    return Thread::Create("server", "metrics-logger", &ServerBase::MetricsLoggingThread,
                          this, &metrics_logging_thread_);
  }
  return Status::OK();
}
void ServerBase::MetricsLoggingThread() {
RollingLog log(Env::Default(), FLAGS_log_dir, "metrics");
// How long to wait before trying again if we experience a failure
// logging metrics.
const MonoDelta kWaitBetweenFailures = MonoDelta::FromSeconds(60);
MonoTime next_log = MonoTime::Now(MonoTime::FINE);
while (!stop_metrics_logging_latch_.WaitUntil(next_log)) {
next_log = MonoTime::Now(MonoTime::FINE);
next_log.AddDelta(MonoDelta::FromMilliseconds(options_.metrics_log_interval_ms));
std::stringstream buf;
buf << "metrics " << GetCurrentTimeMicros() << " ";
// Collect the metrics JSON string.
vector<string> metrics;
metrics.push_back("*");
MetricJsonOptions opts;
opts.include_raw_histograms = true;
JsonWriter writer(&buf, JsonWriter::COMPACT);
Status s = metric_registry_->WriteAsJson(&writer, metrics, opts);
if (!s.ok()) {
WARN_NOT_OK(s, "Unable to collect metrics to log");
next_log.AddDelta(kWaitBetweenFailures);
continue;
}
buf << "\n";
s = log.Append(buf.str());
if (!s.ok()) {
WARN_NOT_OK(s, "Unable to write metrics to log");
next_log.AddDelta(kWaitBetweenFailures);
continue;
}
}
WARN_NOT_OK(log.Close(), "Unable to close metric log");
}
std::string ServerBase::FooterHtml() const {
return Substitute("<pre>$0\nserver uuid $1</pre>",
VersionInfo::GetShortVersionString(),
instance_pb_->permanent_uuid());
}
// Starts serving: generates the instance ID, registers the generic RPC
// service, starts the RPC server, installs the web UI handlers and footer,
// starts the web server and, if a dump path is configured, dumps the server
// info to it.
//
// Returns the first non-OK status encountered, or Status::OK().
Status ServerBase::Start() {
// Must precede FooterHtml() and DumpServerInfo() below, both of which read
// the instance PB.
GenerateInstanceID();
RETURN_NOT_OK(RegisterService(make_gscoped_ptr<rpc::ServiceIf>(
new GenericServiceImpl(this))));
RETURN_NOT_OK(rpc_server_->Start());
// Register the web handlers and footer before starting the web server.
AddDefaultPathHandlers(web_server_.get());
AddRpczPathHandlers(messenger_, web_server_.get());
RegisterMetricsJsonHandler(web_server_.get(), metric_registry_.get());
TracingPathHandlers::RegisterHandlers(web_server_.get());
web_server_->set_footer_html(FooterHtml());
RETURN_NOT_OK(web_server_->Start());
// Done after both servers have started; DumpServerInfo() reports their
// bound addresses.
if (!options_.dump_info_path.empty()) {
RETURN_NOT_OK_PREPEND(DumpServerInfo(options_.dump_info_path, options_.dump_info_format),
"Failed to dump server info to " + options_.dump_info_path);
}
return Status::OK();
}
// Stops the metrics logging thread (if one was started), then stops the web
// server and shuts down the RPC server. Also invoked from the destructor.
void ServerBase::Shutdown() {
if (metrics_logging_thread_) {
// Count down the latch that MetricsLoggingThread() waits on, then wait for
// the thread to finish.
stop_metrics_logging_latch_.CountDown();
metrics_logging_thread_->Join();
}
web_server_->Stop();
rpc_server_->Shutdown();
}
} // namespace server
} // namespace kudu