blob: 699cc8ed6077f091edd667984b8df268907cc3b9 [file] [log] [blame]
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.
#include "kudu/tserver/tablet_server.h"
#include <cstddef>
#include <ostream>
#include <type_traits>
#include <utility>
#include <glog/logging.h>
#include "kudu/cfile/block_cache.h"
#include "kudu/fs/error_manager.h"
#include "kudu/fs/fs_manager.h"
#include "kudu/gutil/bind.h"
#include "kudu/gutil/bind_helpers.h"
#include "kudu/gutil/strings/substitute.h"
#include "kudu/rpc/service_if.h"
#include "kudu/tserver/heartbeater.h"
#include "kudu/tserver/scanners.h"
#include "kudu/tserver/tablet_copy_service.h"
#include "kudu/tserver/tablet_service.h"
#include "kudu/tserver/ts_tablet_manager.h"
#include "kudu/tserver/tserver_path_handlers.h"
#include "kudu/util/maintenance_manager.h"
#include "kudu/util/net/net_util.h"
#include "kudu/util/status.h"
using std::string;
using kudu::fs::ErrorHandlerType;
using kudu::rpc::ServiceIf;
namespace kudu {
namespace tserver {
TabletServer::TabletServer(const TabletServerOptions& opts)
: KuduServer("TabletServer", opts, "kudu.tabletserver"),
initted_(false),
fail_heartbeats_for_tests_(false),
opts_(opts),
tablet_manager_(new TSTabletManager(this)),
scanner_manager_(new ScannerManager(metric_entity())),
path_handlers_(new TabletServerPathHandlers(this)) {
}
TabletServer::~TabletServer() {
Shutdown();
}
string TabletServer::ToString() const {
// TODO: include port numbers, etc.
return "TabletServer";
}
Status TabletServer::ValidateMasterAddressResolution() const {
for (const HostPort& master_addr : opts_.master_addresses) {
RETURN_NOT_OK_PREPEND(master_addr.ResolveAddresses(NULL),
strings::Substitute(
"Couldn't resolve master service address '$0'",
master_addr.ToString()));
}
return Status::OK();
}
Status TabletServer::Init() {
CHECK(!initted_);
cfile::BlockCache::GetSingleton()->StartInstrumentation(metric_entity());
// Validate that the passed master address actually resolves.
// We don't validate that we can connect at this point -- it should
// be allowed to start the TS and the master in whichever order --
// our heartbeat thread will loop until successfully connecting.
RETURN_NOT_OK(ValidateMasterAddressResolution());
RETURN_NOT_OK(KuduServer::Init());
if (web_server_) {
RETURN_NOT_OK(path_handlers_->Register(web_server_.get()));
}
maintenance_manager_.reset(new MaintenanceManager(
MaintenanceManager::kDefaultOptions, fs_manager_->uuid()));
heartbeater_.reset(new Heartbeater(opts_, this));
RETURN_NOT_OK_PREPEND(tablet_manager_->Init(),
"Could not init Tablet Manager");
RETURN_NOT_OK_PREPEND(scanner_manager_->StartRemovalThread(),
"Could not start expired Scanner removal thread");
initted_ = true;
return Status::OK();
}
Status TabletServer::WaitInited() {
return tablet_manager_->WaitForAllBootstrapsToFinish();
}
Status TabletServer::Start() {
CHECK(initted_);
fs_manager_->SetErrorNotificationCb(ErrorHandlerType::DISK_ERROR,
Bind(&TSTabletManager::FailTabletsInDataDir, Unretained(tablet_manager_.get())));
fs_manager_->SetErrorNotificationCb(ErrorHandlerType::CFILE_CORRUPTION,
Bind(&TSTabletManager::FailTabletAndScheduleShutdown, Unretained(tablet_manager_.get())));
gscoped_ptr<ServiceIf> ts_service(new TabletServiceImpl(this));
gscoped_ptr<ServiceIf> admin_service(new TabletServiceAdminImpl(this));
gscoped_ptr<ServiceIf> consensus_service(new ConsensusServiceImpl(this, tablet_manager_.get()));
gscoped_ptr<ServiceIf> tablet_copy_service(new TabletCopyServiceImpl(
this, tablet_manager_.get()));
RETURN_NOT_OK(RegisterService(std::move(ts_service)));
RETURN_NOT_OK(RegisterService(std::move(admin_service)));
RETURN_NOT_OK(RegisterService(std::move(consensus_service)));
RETURN_NOT_OK(RegisterService(std::move(tablet_copy_service)));
RETURN_NOT_OK(KuduServer::Start());
RETURN_NOT_OK(heartbeater_->Start());
RETURN_NOT_OK(maintenance_manager_->Start());
google::FlushLogFiles(google::INFO); // Flush the startup messages.
return Status::OK();
}
void TabletServer::Shutdown() {
if (initted_) {
string name = ToString();
LOG(INFO) << name << " shutting down...";
// 1. Stop accepting new RPCs.
UnregisterAllServices();
// 2. Shut down the tserver's subsystems.
maintenance_manager_->Shutdown();
WARN_NOT_OK(heartbeater_->Stop(), "Failed to stop TS Heartbeat thread");
fs_manager_->UnsetErrorNotificationCb(ErrorHandlerType::DISK_ERROR);
fs_manager_->UnsetErrorNotificationCb(ErrorHandlerType::CFILE_CORRUPTION);
tablet_manager_->Shutdown();
// 3. Shut down generic subsystems.
KuduServer::Shutdown();
LOG(INFO) << name << " shutdown complete.";
}
}
} // namespace tserver
} // namespace kudu