blob: 4ea77d543ad3a576f7f6a00d316ce5a1a4b07ebe [file] [log] [blame]
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.
#include "kudu/master/master_service.h"
#include <gflags/gflags.h>
#include <memory>
#include <string>
#include <vector>
#include "kudu/common/wire_protocol.h"
#include "kudu/master/catalog_manager.h"
#include "kudu/master/master.h"
#include "kudu/master/ts_descriptor.h"
#include "kudu/master/ts_manager.h"
#include "kudu/rpc/rpc_context.h"
#include "kudu/server/webserver.h"
#include "kudu/util/flag_tags.h"
// Latency-injection knob. When set to a positive value, the tablet-location
// lookup handlers in this file (GetTabletLocations and GetTableLocations)
// sleep for this many milliseconds after their leader/initialization check
// and before performing the lookup. The default of 0 disables the sleep.
DEFINE_int32(master_inject_latency_on_tablet_lookups_ms, 0,
"Number of milliseconds that the master will sleep before responding to "
"requests for tablet locations.");
// NOTE(review): the exact effect of the 'unsafe' and 'hidden' tags is defined
// in kudu/util/flag_tags.h, not here -- presumably they keep this flag away
// from ordinary users; confirm there.
TAG_FLAG(master_inject_latency_on_tablet_lookups_ms, unsafe);
TAG_FLAG(master_inject_latency_on_tablet_lookups_ms, hidden);
namespace kudu {
namespace master {
using consensus::RaftPeerPB;
using std::string;
using std::vector;
using std::shared_ptr;
namespace {
// Verifies that the master's catalog manager has been initialized.
//
// Returns true when it has. Otherwise fills resp->error() with a
// ServiceUnavailable status and the CATALOG_MANAGER_NOT_INITIALIZED code,
// responds to 'rpc', and returns false -- the caller must not touch 'rpc'
// again in that case.
//
// NOTE(review): SetupErrorAndRespond() is defined further down in this file.
// The call below takes resp->mutable_error(), whose type depends on
// RespClass; presumably that is what lets the name resolve at instantiation
// time -- keep that argument dependent when editing.
template<class RespClass>
bool CheckCatalogManagerInitializedOrRespond(Master* master,
                                             RespClass* resp,
                                             rpc::RpcContext* rpc) {
  if (PREDICT_FALSE(!master->catalog_manager()->IsInitialized())) {
    const Status not_ready =
        Status::ServiceUnavailable("catalog manager has not been initialized");
    SetupErrorAndRespond(resp->mutable_error(),
                         not_ready,
                         MasterErrorPB::CATALOG_MANAGER_NOT_INITIALIZED,
                         rpc);
    return false;
  }
  return true;
}
// Verifies that the catalog manager reports itself as leader and ready.
//
// Returns true when CheckIsLeaderAndReady() is OK. Otherwise copies the
// failing status into resp->error() with the NOT_THE_LEADER code, responds to
// 'rpc', and returns false -- the caller must not touch 'rpc' again in that
// case.
template<class RespClass>
bool CheckIsLeaderOrRespond(Master* master,
                            RespClass* resp,
                            rpc::RpcContext* rpc) {
  const Status leader_status = master->catalog_manager()->CheckIsLeaderAndReady();
  const bool is_leader = leader_status.ok();
  if (PREDICT_FALSE(!is_leader)) {
    SetupErrorAndRespond(resp->mutable_error(),
                         leader_status,
                         MasterErrorPB::NOT_THE_LEADER,
                         rpc);
    return false;
  }
  return true;
}
// Combined precondition check used by most RPC handlers.
//
// Runs the initialization check first and the leadership check second. The
// second check only runs if the first passed, so at most one error response
// is ever sent. Returns true only when both checks pass; on false the RPC has
// already been responded to.
template<class RespClass>
bool CheckLeaderAndCatalogManagerInitializedOrRespond(Master* master,
                                                      RespClass* resp,
                                                      rpc::RpcContext* rpc) {
  if (!CheckCatalogManagerInitializedOrRespond(master, resp, rpc)) {
    return false;
  }
  return CheckIsLeaderOrRespond(master, resp, rpc);
}
// If 's' is not OK and 'resp' has no application specific error set,
// set the error field of 'resp' to match 's' and set the code to
// UNKNOWN_ERROR.
template<class RespClass>
void CheckRespErrorOrSetUnknown(const Status& s, RespClass* resp) {
if (PREDICT_FALSE(!s.ok() && !resp->has_error())) {
StatusToPB(s, resp->mutable_error()->mutable_status());
resp->mutable_error()->set_code(MasterErrorPB::UNKNOWN_ERROR);
}
}
} // anonymous namespace
// Populates 'error' with the given code and with the protobuf form of 's',
// then completes the RPC.
//
// The RPC is completed via RespondSuccess(): the failure is reported inside
// the response message through 'error' rather than as an RPC-level failure.
static void SetupErrorAndRespond(MasterErrorPB* error,
                                 const Status& s,
                                 MasterErrorPB::Code code,
                                 rpc::RpcContext* rpc) {
  error->set_code(code);
  StatusToPB(s, error->mutable_status());
  // TODO RespondSuccess() is better called 'Respond'.
  rpc->RespondSuccess();
}
// Builds the service implementation on top of 'server'. The server's metric
// entity is passed to the MasterServiceIf base class, and 'server' itself is
// kept in server_ for use by the RPC handlers in this file.
// NOTE(review): nothing here takes ownership of 'server'; presumably it must
// outlive this object -- confirm against the owner.
MasterServiceImpl::MasterServiceImpl(Master* server)
: MasterServiceIf(server->metric_entity()),
server_(server) {
}
// Trivial RPC: responds successfully right away. Neither 'req' nor 'resp' is
// read or modified, and no leader/initialization check is performed.
void MasterServiceImpl::Ping(const PingRequestPB* req,
PingResponsePB* resp,
rpc::RpcContext* rpc) {
rpc->RespondSuccess();
}
void MasterServiceImpl::TSHeartbeat(const TSHeartbeatRequestPB* req,
TSHeartbeatResponsePB* resp,
rpc::RpcContext* rpc) {
// If CatalogManager is not initialized don't even know whether
// or not we will be a leader (so we can't tell whether or not we can
// accept tablet reports).
if (!CheckCatalogManagerInitializedOrRespond(server_, resp, rpc)) {
return;
}
resp->mutable_master_instance()->CopyFrom(server_->instance_pb());
Status s = server_->catalog_manager()->CheckIsLeaderAndReady();
if (!s.ok()) {
// For the time being, ignore heartbeats sent to non-leader distributed
// masters.
//
// TODO KUDU-493 Allow all master processes to receive heartbeat
// information: by having the TabletServers send heartbeats to all
// masters, or by storing heartbeat information in a replicated
// SysTable.
LOG(WARNING) << "Received a heartbeat, but this Master instance is not a leader or a "
<< "single Master: " << s.ToString();
resp->set_leader_master(false);
rpc->RespondSuccess();
return;
}
resp->set_leader_master(true);
shared_ptr<TSDescriptor> ts_desc;
// If the TS is registering, register in the TS manager.
if (req->has_registration()) {
Status s = server_->ts_manager()->RegisterTS(req->common().ts_instance(),
req->registration(),
&ts_desc);
if (!s.ok()) {
LOG(WARNING) << "Unable to register tablet server (" << rpc->requestor_string() << "): "
<< s.ToString();
// TODO: add service-specific errors
rpc->RespondFailure(s);
return;
}
}
// TODO: KUDU-86 if something fails after this point the TS will not be able
// to register again.
// Look up the TS -- if it just registered above, it will be found here.
// This allows the TS to register and tablet-report in the same RPC.
s = server_->ts_manager()->LookupTS(req->common().ts_instance(), &ts_desc);
if (s.IsNotFound()) {
LOG(INFO) << "Got heartbeat from unknown tablet server { "
<< req->common().ts_instance().ShortDebugString()
<< " } as " << rpc->requestor_string()
<< "; Asking this server to re-register.";
resp->set_needs_reregister(true);
resp->set_needs_full_tablet_report(true);
rpc->RespondSuccess();
return;
} else if (!s.ok()) {
LOG(WARNING) << "Unable to look up tablet server for heartbeat request "
<< req->DebugString() << " from " << rpc->requestor_string()
<< "\nStatus: " << s.ToString();
rpc->RespondFailure(s.CloneAndPrepend("Unable to lookup TS"));
return;
}
ts_desc->UpdateHeartbeatTime();
ts_desc->set_num_live_replicas(req->num_live_tablets());
if (req->has_tablet_report()) {
s = server_->catalog_manager()->ProcessTabletReport(
ts_desc.get(), req->tablet_report(), resp->mutable_tablet_report(), rpc);
if (!s.ok()) {
rpc->RespondFailure(s.CloneAndPrepend("Failed to process tablet report"));
return;
}
}
if (!ts_desc->has_tablet_report()) {
resp->set_needs_full_tablet_report(true);
}
rpc->RespondSuccess();
}
void MasterServiceImpl::GetTabletLocations(const GetTabletLocationsRequestPB* req,
GetTabletLocationsResponsePB* resp,
rpc::RpcContext* rpc) {
if (!CheckLeaderAndCatalogManagerInitializedOrRespond(server_, resp, rpc)) {
return;
}
if (PREDICT_FALSE(FLAGS_master_inject_latency_on_tablet_lookups_ms > 0)) {
SleepFor(MonoDelta::FromMilliseconds(FLAGS_master_inject_latency_on_tablet_lookups_ms));
}
TSRegistrationPB reg;
vector<TSDescriptor*> locs;
for (const string& tablet_id : req->tablet_ids()) {
// TODO: once we have catalog data. ACL checks would also go here, probably.
TabletLocationsPB* locs_pb = resp->add_tablet_locations();
Status s = server_->catalog_manager()->GetTabletLocations(tablet_id, locs_pb);
if (!s.ok()) {
resp->mutable_tablet_locations()->RemoveLast();
GetTabletLocationsResponsePB::Error* err = resp->add_errors();
err->set_tablet_id(tablet_id);
StatusToPB(s, err->mutable_status());
}
}
rpc->RespondSuccess();
}
void MasterServiceImpl::CreateTable(const CreateTableRequestPB* req,
CreateTableResponsePB* resp,
rpc::RpcContext* rpc) {
if (!CheckLeaderAndCatalogManagerInitializedOrRespond(server_, resp, rpc)) {
return;
}
Status s = server_->catalog_manager()->CreateTable(req, resp, rpc);
CheckRespErrorOrSetUnknown(s, resp);
rpc->RespondSuccess();
}
void MasterServiceImpl::IsCreateTableDone(const IsCreateTableDoneRequestPB* req,
IsCreateTableDoneResponsePB* resp,
rpc::RpcContext* rpc) {
if (!CheckLeaderAndCatalogManagerInitializedOrRespond(server_, resp, rpc)) {
return;
}
Status s = server_->catalog_manager()->IsCreateTableDone(req, resp);
CheckRespErrorOrSetUnknown(s, resp);
rpc->RespondSuccess();
}
void MasterServiceImpl::DeleteTable(const DeleteTableRequestPB* req,
DeleteTableResponsePB* resp,
rpc::RpcContext* rpc) {
if (!CheckLeaderAndCatalogManagerInitializedOrRespond(server_, resp, rpc)) {
return;
}
Status s = server_->catalog_manager()->DeleteTable(req, resp, rpc);
CheckRespErrorOrSetUnknown(s, resp);
rpc->RespondSuccess();
}
void MasterServiceImpl::AlterTable(const AlterTableRequestPB* req,
AlterTableResponsePB* resp,
rpc::RpcContext* rpc) {
if (!CheckLeaderAndCatalogManagerInitializedOrRespond(server_, resp, rpc)) {
return;
}
Status s = server_->catalog_manager()->AlterTable(req, resp, rpc);
CheckRespErrorOrSetUnknown(s, resp);
rpc->RespondSuccess();
}
void MasterServiceImpl::IsAlterTableDone(const IsAlterTableDoneRequestPB* req,
IsAlterTableDoneResponsePB* resp,
rpc::RpcContext* rpc) {
if (!CheckLeaderAndCatalogManagerInitializedOrRespond(server_, resp, rpc)) {
return;
}
Status s = server_->catalog_manager()->IsAlterTableDone(req, resp, rpc);
CheckRespErrorOrSetUnknown(s, resp);
rpc->RespondSuccess();
}
void MasterServiceImpl::ListTables(const ListTablesRequestPB* req,
ListTablesResponsePB* resp,
rpc::RpcContext* rpc) {
if (!CheckLeaderAndCatalogManagerInitializedOrRespond(server_, resp, rpc)) {
return;
}
Status s = server_->catalog_manager()->ListTables(req, resp);
CheckRespErrorOrSetUnknown(s, resp);
rpc->RespondSuccess();
}
void MasterServiceImpl::GetTableLocations(const GetTableLocationsRequestPB* req,
GetTableLocationsResponsePB* resp,
rpc::RpcContext* rpc) {
if (!CheckLeaderAndCatalogManagerInitializedOrRespond(server_, resp, rpc)) {
return;
}
if (PREDICT_FALSE(FLAGS_master_inject_latency_on_tablet_lookups_ms > 0)) {
SleepFor(MonoDelta::FromMilliseconds(FLAGS_master_inject_latency_on_tablet_lookups_ms));
}
Status s = server_->catalog_manager()->GetTableLocations(req, resp);
CheckRespErrorOrSetUnknown(s, resp);
rpc->RespondSuccess();
}
void MasterServiceImpl::GetTableSchema(const GetTableSchemaRequestPB* req,
GetTableSchemaResponsePB* resp,
rpc::RpcContext* rpc) {
if (!CheckLeaderAndCatalogManagerInitializedOrRespond(server_, resp, rpc)) {
return;
}
Status s = server_->catalog_manager()->GetTableSchema(req, resp);
CheckRespErrorOrSetUnknown(s, resp);
rpc->RespondSuccess();
}
void MasterServiceImpl::ListTabletServers(const ListTabletServersRequestPB* req,
ListTabletServersResponsePB* resp,
rpc::RpcContext* rpc) {
if (!CheckLeaderAndCatalogManagerInitializedOrRespond(server_, resp, rpc)) {
return;
}
vector<std::shared_ptr<TSDescriptor> > descs;
server_->ts_manager()->GetAllDescriptors(&descs);
for (const std::shared_ptr<TSDescriptor>& desc : descs) {
ListTabletServersResponsePB::Entry* entry = resp->add_servers();
desc->GetNodeInstancePB(entry->mutable_instance_id());
desc->GetRegistration(entry->mutable_registration());
entry->set_millis_since_heartbeat(desc->TimeSinceHeartbeat().ToMilliseconds());
}
rpc->RespondSuccess();
}
void MasterServiceImpl::ListMasters(const ListMastersRequestPB* req,
ListMastersResponsePB* resp,
rpc::RpcContext* rpc) {
vector<ServerEntryPB> masters;
Status s = server_->ListMasters(&masters);
if (!s.ok()) {
StatusToPB(s, resp->mutable_error());
resp->mutable_error()->set_code(AppStatusPB::UNKNOWN_ERROR);
} else {
for (const ServerEntryPB& master : masters) {
resp->add_masters()->CopyFrom(master);
}
}
rpc->RespondSuccess();
}
void MasterServiceImpl::GetMasterRegistration(const GetMasterRegistrationRequestPB* req,
GetMasterRegistrationResponsePB* resp,
rpc::RpcContext* rpc) {
// instance_id must always be set in order for status pages to be useful.
resp->mutable_instance_id()->CopyFrom(server_->instance_pb());
if (!CheckCatalogManagerInitializedOrRespond(server_, resp, rpc)) {
return;
}
Status s = server_->GetMasterRegistration(resp->mutable_registration());
CheckRespErrorOrSetUnknown(s, resp);
resp->set_role(server_->catalog_manager()->Role());
rpc->RespondSuccess();
}
} // namespace master
} // namespace kudu