blob: d39547ad20200a20107c2c79a946a77632ac7133 [file] [log] [blame]
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.
#include "kudu/master/master_service.h"
#include <gflags/gflags.h>
#include <memory>
#include <string>
#include <vector>
#include "kudu/common/wire_protocol.h"
#include "kudu/master/catalog_manager.h"
#include "kudu/master/master.h"
#include "kudu/master/ts_descriptor.h"
#include "kudu/master/ts_manager.h"
#include "kudu/rpc/rpc_context.h"
#include "kudu/server/webserver.h"
#include "kudu/util/flag_tags.h"
// Latency-injection knob. When positive, GetTabletLocations and
// GetTableLocations sleep for this many milliseconds (after the leader check
// has passed) before performing the lookup; values <= 0 disable the delay.
// Tagged 'unsafe' and 'hidden', so it is not intended for normal operation.
DEFINE_int32(master_inject_latency_on_tablet_lookups_ms, 0,
"Number of milliseconds that the master will sleep before responding to "
"requests for tablet locations.");
TAG_FLAG(master_inject_latency_on_tablet_lookups_ms, unsafe);
TAG_FLAG(master_inject_latency_on_tablet_lookups_ms, hidden);
namespace kudu {
namespace master {
using consensus::RaftPeerPB;
using std::string;
using std::vector;
using std::shared_ptr;
using strings::Substitute;
namespace {
// If 's' is not OK and 'resp' has no application specific error set,
// set the error field of 'resp' to match 's' and set the code to
// UNKNOWN_ERROR.
template<class RespClass>
void CheckRespErrorOrSetUnknown(const Status& s, RespClass* resp) {
if (PREDICT_FALSE(!s.ok() && !resp->has_error())) {
StatusToPB(s, resp->mutable_error()->mutable_status());
resp->mutable_error()->set_code(MasterErrorPB::UNKNOWN_ERROR);
}
}
} // anonymous namespace
// Binds the RPC service to its owning Master. The generated service base is
// wired to the master's metric entity and result tracker. 'server' is stored
// as a raw pointer and is not owned by this object.
MasterServiceImpl::MasterServiceImpl(Master* server)
    : MasterServiceIf(server->metric_entity(), server->result_tracker()),
      server_(server) {}
// Liveness probe. Nothing is read from the request or written to the
// response; the RPC is simply acknowledged.
void MasterServiceImpl::Ping(const PingRequestPB* /* req */,
                             PingResponsePB* /* resp */,
                             rpc::RpcContext* rpc) {
  rpc->RespondSuccess();
}
// Handles a heartbeat from a tablet server.
//
// Steps:
//  - Registers the tserver, or looks it up if no registration is attached.
//  - Refreshes its heartbeat time and live-replica count.
//  - If this master is the leader, processes any attached tablet report.
//
// Every successful response carries this master's instance and whether it is
// currently the leader. A tserver that is not known to the TSManager is asked
// to re-register (and, on the leader, to send a full tablet report) instead
// of receiving an RPC failure.
void MasterServiceImpl::TSHeartbeat(const TSHeartbeatRequestPB* req,
TSHeartbeatResponsePB* resp,
rpc::RpcContext* rpc) {
// 1. If the CatalogManager is not initialized, we don't even know whether
// or not we will be a leader (so we can't tell whether or not we can
// accept tablet reports).
CatalogManager::ScopedLeaderSharedLock l(server_->catalog_manager());
if (!l.CheckIsInitializedOrRespond(resp, rpc)) {
return;
}
// Followers still handle registration and liveness below; only tablet
// report processing (step 5) is restricted to the leader.
bool is_leader_master = l.leader_status().ok();
// 2. All responses contain this.
resp->mutable_master_instance()->CopyFrom(server_->instance_pb());
resp->set_leader_master(is_leader_master);
// 3. Register or look up the tserver.
shared_ptr<TSDescriptor> ts_desc;
if (req->has_registration()) {
Status s = server_->ts_manager()->RegisterTS(req->common().ts_instance(),
req->registration(),
&ts_desc);
if (!s.ok()) {
LOG(WARNING) << Substitute("Unable to register tserver ($0): $1",
rpc->requestor_string(), s.ToString());
// TODO: add service-specific errors
rpc->RespondFailure(s);
return;
}
} else {
Status s = server_->ts_manager()->LookupTS(req->common().ts_instance(), &ts_desc);
if (s.IsNotFound()) {
LOG(INFO) << Substitute("Got heartbeat from unknown tserver ($0) as $1; "
"Asking this server to re-register.",
req->common().ts_instance().ShortDebugString(), rpc->requestor_string());
resp->set_needs_reregister(true);
// Don't bother asking for a full tablet report if we're a follower;
// it'll just get ignored anyway.
resp->set_needs_full_tablet_report(is_leader_master);
rpc->RespondSuccess();
return;
} else if (!s.ok()) {
// NOTE(review): req->DebugString() dumps the whole request, including any
// attached tablet report, into a WARNING log line. Consider logging only
// req->common().ts_instance() here.
LOG(WARNING) << Substitute("Unable to look up tserver for heartbeat "
"request $0 from $1: $2", req->DebugString(),
rpc->requestor_string(), s.ToString());
rpc->RespondFailure(s.CloneAndPrepend("Unable to lookup tserver"));
return;
}
}
// 4. Update tserver soft state based on the heartbeat contents.
// This happens on leaders and followers alike.
ts_desc->UpdateHeartbeatTime();
ts_desc->set_num_live_replicas(req->num_live_tablets());
// 5. Only leaders handle tablet reports.
if (is_leader_master && req->has_tablet_report()) {
Status s = server_->catalog_manager()->ProcessTabletReport(
ts_desc.get(), req->tablet_report(), resp->mutable_tablet_report(), rpc);
if (!s.ok()) {
rpc->RespondFailure(s.CloneAndPrepend("Failed to process tablet report"));
return;
}
}
rpc->RespondSuccess();
}
// Looks up the replica locations of each tablet ID in the request.
//
// Must be served by an initialized leader master. Lookup failures are
// reported per tablet in resp->errors rather than failing the RPC. Only
// successful lookups appear in resp->tablet_locations.
void MasterServiceImpl::GetTabletLocations(const GetTabletLocationsRequestPB* req,
                                           GetTabletLocationsResponsePB* resp,
                                           rpc::RpcContext* rpc) {
  CatalogManager::ScopedLeaderSharedLock l(server_->catalog_manager());
  if (!l.CheckIsInitializedAndIsLeaderOrRespond(resp, rpc)) {
    return;
  }

  // Latency injection controlled by an unsafe/hidden flag; disabled when <= 0.
  if (PREDICT_FALSE(FLAGS_master_inject_latency_on_tablet_lookups_ms > 0)) {
    SleepFor(MonoDelta::FromMilliseconds(FLAGS_master_inject_latency_on_tablet_lookups_ms));
  }

  for (const string& tablet_id : req->tablet_ids()) {
    // TODO: once we have catalog data. ACL checks would also go here, probably.
    TabletLocationsPB* locs_pb = resp->add_tablet_locations();
    Status s = server_->catalog_manager()->GetTabletLocations(tablet_id, locs_pb);
    if (!s.ok()) {
      // Discard the entry appended above (it may have been partially filled)
      // and record the failure for this tablet instead.
      resp->mutable_tablet_locations()->RemoveLast();
      GetTabletLocationsResponsePB::Error* err = resp->add_errors();
      err->set_tablet_id(tablet_id);
      StatusToPB(s, err->mutable_status());
    }
  }
  rpc->RespondSuccess();
}
void MasterServiceImpl::CreateTable(const CreateTableRequestPB* req,
CreateTableResponsePB* resp,
rpc::RpcContext* rpc) {
CatalogManager::ScopedLeaderSharedLock l(server_->catalog_manager());
if (!l.CheckIsInitializedAndIsLeaderOrRespond(resp, rpc)) {
return;
}
Status s = server_->catalog_manager()->CreateTable(req, resp, rpc);
CheckRespErrorOrSetUnknown(s, resp);
rpc->RespondSuccess();
}
void MasterServiceImpl::IsCreateTableDone(const IsCreateTableDoneRequestPB* req,
IsCreateTableDoneResponsePB* resp,
rpc::RpcContext* rpc) {
CatalogManager::ScopedLeaderSharedLock l(server_->catalog_manager());
if (!l.CheckIsInitializedAndIsLeaderOrRespond(resp, rpc)) {
return;
}
Status s = server_->catalog_manager()->IsCreateTableDone(req, resp);
CheckRespErrorOrSetUnknown(s, resp);
rpc->RespondSuccess();
}
void MasterServiceImpl::DeleteTable(const DeleteTableRequestPB* req,
DeleteTableResponsePB* resp,
rpc::RpcContext* rpc) {
CatalogManager::ScopedLeaderSharedLock l(server_->catalog_manager());
if (!l.CheckIsInitializedAndIsLeaderOrRespond(resp, rpc)) {
return;
}
Status s = server_->catalog_manager()->DeleteTable(req, resp, rpc);
CheckRespErrorOrSetUnknown(s, resp);
rpc->RespondSuccess();
}
void MasterServiceImpl::AlterTable(const AlterTableRequestPB* req,
AlterTableResponsePB* resp,
rpc::RpcContext* rpc) {
CatalogManager::ScopedLeaderSharedLock l(server_->catalog_manager());
if (!l.CheckIsInitializedAndIsLeaderOrRespond(resp, rpc)) {
return;
}
Status s = server_->catalog_manager()->AlterTable(req, resp, rpc);
CheckRespErrorOrSetUnknown(s, resp);
rpc->RespondSuccess();
}
void MasterServiceImpl::IsAlterTableDone(const IsAlterTableDoneRequestPB* req,
IsAlterTableDoneResponsePB* resp,
rpc::RpcContext* rpc) {
CatalogManager::ScopedLeaderSharedLock l(server_->catalog_manager());
if (!l.CheckIsInitializedAndIsLeaderOrRespond(resp, rpc)) {
return;
}
Status s = server_->catalog_manager()->IsAlterTableDone(req, resp, rpc);
CheckRespErrorOrSetUnknown(s, resp);
rpc->RespondSuccess();
}
void MasterServiceImpl::ListTables(const ListTablesRequestPB* req,
ListTablesResponsePB* resp,
rpc::RpcContext* rpc) {
CatalogManager::ScopedLeaderSharedLock l(server_->catalog_manager());
if (!l.CheckIsInitializedAndIsLeaderOrRespond(resp, rpc)) {
return;
}
Status s = server_->catalog_manager()->ListTables(req, resp);
CheckRespErrorOrSetUnknown(s, resp);
rpc->RespondSuccess();
}
void MasterServiceImpl::GetTableLocations(const GetTableLocationsRequestPB* req,
GetTableLocationsResponsePB* resp,
rpc::RpcContext* rpc) {
CatalogManager::ScopedLeaderSharedLock l(server_->catalog_manager());
if (!l.CheckIsInitializedAndIsLeaderOrRespond(resp, rpc)) {
return;
}
if (PREDICT_FALSE(FLAGS_master_inject_latency_on_tablet_lookups_ms > 0)) {
SleepFor(MonoDelta::FromMilliseconds(FLAGS_master_inject_latency_on_tablet_lookups_ms));
}
Status s = server_->catalog_manager()->GetTableLocations(req, resp);
CheckRespErrorOrSetUnknown(s, resp);
rpc->RespondSuccess();
}
void MasterServiceImpl::GetTableSchema(const GetTableSchemaRequestPB* req,
GetTableSchemaResponsePB* resp,
rpc::RpcContext* rpc) {
CatalogManager::ScopedLeaderSharedLock l(server_->catalog_manager());
if (!l.CheckIsInitializedAndIsLeaderOrRespond(resp, rpc)) {
return;
}
Status s = server_->catalog_manager()->GetTableSchema(req, resp);
CheckRespErrorOrSetUnknown(s, resp);
rpc->RespondSuccess();
}
void MasterServiceImpl::ListTabletServers(const ListTabletServersRequestPB* req,
ListTabletServersResponsePB* resp,
rpc::RpcContext* rpc) {
vector<std::shared_ptr<TSDescriptor> > descs;
server_->ts_manager()->GetAllDescriptors(&descs);
for (const std::shared_ptr<TSDescriptor>& desc : descs) {
ListTabletServersResponsePB::Entry* entry = resp->add_servers();
desc->GetNodeInstancePB(entry->mutable_instance_id());
desc->GetRegistration(entry->mutable_registration());
entry->set_millis_since_heartbeat(desc->TimeSinceHeartbeat().ToMilliseconds());
}
rpc->RespondSuccess();
}
// Lists the masters this master knows about.
//
// On failure, the status is returned in resp->error and the RPC itself
// still succeeds.
void MasterServiceImpl::ListMasters(const ListMastersRequestPB* req,
                                    ListMastersResponsePB* resp,
                                    rpc::RpcContext* rpc) {
  vector<ServerEntryPB> masters;
  Status s = server_->ListMasters(&masters);
  if (!s.ok()) {
    // resp->error is an AppStatusPB. StatusToPB() records both the message
    // and the code that corresponds to 's'. Previously the code was then
    // overwritten with UNKNOWN_ERROR, which discarded the real error kind
    // before it reached the client.
    StatusToPB(s, resp->mutable_error());
  } else {
    for (const ServerEntryPB& master : masters) {
      resp->add_masters()->CopyFrom(master);
    }
  }
  rpc->RespondSuccess();
}
void MasterServiceImpl::GetMasterRegistration(const GetMasterRegistrationRequestPB* req,
GetMasterRegistrationResponsePB* resp,
rpc::RpcContext* rpc) {
// instance_id must always be set in order for status pages to be useful.
resp->mutable_instance_id()->CopyFrom(server_->instance_pb());
CatalogManager::ScopedLeaderSharedLock l(server_->catalog_manager());
if (!l.CheckIsInitializedOrRespond(resp, rpc)) {
return;
}
Status s = server_->GetMasterRegistration(resp->mutable_registration());
CheckRespErrorOrSetUnknown(s, resp);
resp->set_role(server_->catalog_manager()->Role());
rpc->RespondSuccess();
}
// Returns true only for the optional master features this implementation
// handles: range partition bounds and adding/dropping range partitions.
bool MasterServiceImpl::SupportsFeature(uint32_t feature) const {
  return feature == MasterFeatures::RANGE_PARTITION_BOUNDS ||
         feature == MasterFeatures::ADD_DROP_RANGE_PARTITIONS;
}
} // namespace master
} // namespace kudu