blob: d6267b500c8201313f83638c5785c7f889f38962 [file] [log] [blame]
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.
#include "kudu/tools/master_rebuilder.h"
#include <cstdint>
#include <functional>
#include <map>
#include <memory>
#include <ostream>
#include <string>
#include <vector>
#include <gflags/gflags.h>
#include <glog/logging.h>
#include "kudu/common/common.pb.h"
#include "kudu/common/partition.h"
#include "kudu/common/schema.h"
#include "kudu/common/wire_protocol.h"
#include "kudu/consensus/metadata.pb.h"
#include "kudu/consensus/opid_util.h"
#include "kudu/consensus/raft_consensus.h"
#include "kudu/gutil/map-util.h"
#include "kudu/gutil/ref_counted.h"
#include "kudu/gutil/strings/substitute.h"
#include "kudu/master/catalog_manager.h"
#include "kudu/master/master.h"
#include "kudu/master/master.pb.h"
#include "kudu/master/master_options.h"
#include "kudu/master/sys_catalog.h"
#include "kudu/tablet/metadata.pb.h"
#include "kudu/tablet/tablet.pb.h"
#include "kudu/tablet/tablet_replica.h"
#include "kudu/tools/tool_action_common.h"
#include "kudu/tserver/tablet_server.h"
#include "kudu/tserver/tserver_service.proxy.h"
#include "kudu/util/cow_object.h"
#include "kudu/util/monotime.h"
#include "kudu/util/oid_generator.h"
#include "kudu/util/scoped_cleanup.h"
#include "kudu/util/status.h"
DECLARE_int32(default_num_replicas);
DEFINE_uint32(default_schema_version, 0, "The table schema version assigned to tables if one "
"cannot be determined automatically. When the tablet server version is < 1.16, a "
"viable value can be determined manually using "
"'kudu pbc dump tablet-meta/<tablet-id>' on each server and taking the max value "
"found across all tablets. In tablet server versions >= 1.16, Kudu will determine "
"the proper value for each tablet automatically.");
using kudu::master::Master;
using kudu::master::MasterOptions;
using kudu::master::SysCatalogTable;
using kudu::master::SysTablesEntryPB;
using kudu::master::SysTabletsEntryPB;
using kudu::master::TableInfo;
using kudu::master::TableMetadataLock;
using kudu::master::TabletInfo;
using kudu::master::TabletMetadataGroupLock;
using kudu::master::TabletMetadataLock;
using kudu::tserver::ListTabletsResponsePB;
using std::string;
using std::vector;
using strings::Substitute;
namespace kudu {
namespace tools {
namespace {
Status NoOpCb() {
return Status::OK();
}
} // anonymous namespace
MasterRebuilder::MasterRebuilder(vector<string> tserver_addrs)
: state_(State::NOT_DONE),
tserver_addrs_(std::move(tserver_addrs)) {
}
const RebuildReport& MasterRebuilder::GetRebuildReport() const {
CHECK_EQ(State::DONE, state_);
return rebuild_report_;
}
Status MasterRebuilder::RebuildMaster() {
CHECK_EQ(State::NOT_DONE, state_);
int bad_tservers = 0;
for (const auto& tserver_addr : tserver_addrs_) {
std::unique_ptr<tserver::TabletServerServiceProxy> proxy;
vector<ListTabletsResponsePB::StatusAndSchemaPB> replicas;
Status s = BuildProxy(tserver_addr, tserver::TabletServer::kDefaultPort, &proxy).AndThen([&]() {
return GetReplicas(proxy.get(), &replicas);
});
rebuild_report_.tservers.emplace_back(tserver_addr, s);
if (!s.ok()) {
LOG(WARNING) << Substitute("Failed to gather metadata from tablet server $0: $1",
tserver_addr, s.ToString());
bad_tservers++;
continue;
}
for (const auto& replica : replicas) {
const auto& tablet_status_pb = replica.tablet_status();
const auto& state_pb = tablet_status_pb.state();
const auto& state_str = TabletStatePB_Name(state_pb);
const auto& tablet_id = tablet_status_pb.tablet_id();
const auto& table_name = tablet_status_pb.table_name();
switch (state_pb) {
case tablet::STOPPING:
case tablet::STOPPED:
case tablet::SHUTDOWN:
case tablet::FAILED:
LOG(INFO) << Substitute("Skipping replica of tablet $0 of table $1 on tablet "
"server $2 in state $3", tablet_id, table_name, tserver_addr,
state_str);
continue;
default:
break;
}
Status s = CheckTableAndTabletConsistency(replica);
if (!s.ok()) {
LOG(WARNING) << Substitute("Failed to process metadata for replica of tablet $0 "
"of table $1 on tablet server $2 in state $3: $4",
tablet_id, table_name, tserver_addr, state_str, s.ToString());
}
InsertOrDieNoPrint(&rebuild_report_.replicas,
std::make_tuple(table_name, tablet_id, tserver_addr), s);
}
}
// Check how many tablet servers we got metadata from. We can still continue
// as long as one reported. If not all tablet servers returned info, our
// reconstructed syscatalog might be missing tables and tablets, or it might
// have everything.
if (bad_tservers > 0) {
LOG(WARNING) << Substitute("Failed to gather metadata from all tablet servers: "
"$0 of $1 tablet server(s) had errors",
bad_tservers, tserver_addrs_.size());
}
if (bad_tservers == tserver_addrs_.size()) {
return Status::ServiceUnavailable("unable to gather any tablet server metadata");
}
// Now that we've assembled all the metadata, we can write to a syscatalog table.
RETURN_NOT_OK(WriteSysCatalog());
state_ = State::DONE;
return Status::OK();
}
Status MasterRebuilder::CheckTableAndTabletConsistency(
const ListTabletsResponsePB::StatusAndSchemaPB& replica) {
const string& table_name = replica.tablet_status().table_name();
const string& tablet_id = replica.tablet_status().tablet_id();
if (!ContainsKey(tables_by_name_, table_name)) {
CreateTable(replica);
} else {
RETURN_NOT_OK(CheckTableConsistency(replica));
}
if (!ContainsKey(tablets_by_id_, tablet_id)) {
CreateTablet(replica);
}
return CheckTabletConsistency(replica);
}
void MasterRebuilder::CreateTable(const ListTabletsResponsePB::StatusAndSchemaPB& replica) {
scoped_refptr<TableInfo> table(new TableInfo(oid_generator_.Next()));
table->mutable_metadata()->StartMutation();
SysTablesEntryPB* metadata = &table->mutable_metadata()->mutable_dirty()->pb;
const string& table_name = replica.tablet_status().table_name();
metadata->set_name(table_name);
if (!replica.has_schema_version()) {
metadata->set_version(FLAGS_default_schema_version);
} else {
metadata->set_version(replica.schema_version());
}
// We can't tell the replication factor from ListTablets.
// We'll guess the default replication factor because it's safe and almost
// always correct.
// TODO(awong): there's probably a better heuristic based on the number of
// replicas reported by the tablet servers.
metadata->set_num_replicas(FLAGS_default_num_replicas);
metadata->mutable_schema()->CopyFrom(replica.schema());
metadata->mutable_partition_schema()->CopyFrom(replica.partition_schema());
int32_t max_column_id = 0;
for (const auto& column : replica.schema().columns()) {
if (column.has_id() && max_column_id < column.id()) {
max_column_id = column.id();
}
}
metadata->set_next_column_id(max_column_id + 1);
metadata->set_state(SysTablesEntryPB::RUNNING);
metadata->set_state_msg("reconstructed by MasterRebuilder");
table->mutable_metadata()->CommitMutation();
InsertOrDie(&tables_by_name_, table_name, table);
}
Status MasterRebuilder::CheckTableConsistency(
const ListTabletsResponsePB::StatusAndSchemaPB& replica) {
const string& tablet_id = replica.tablet_status().tablet_id();
const string& table_name = replica.tablet_status().table_name();
scoped_refptr<TableInfo> table = FindOrDie(tables_by_name_, table_name);
TableMetadataLock l_table(table.get(), LockMode::WRITE);
SysTablesEntryPB* metadata = &table->mutable_metadata()->mutable_dirty()->pb;
// Update next_column_id if needed.
Schema schema_from_replica;
RETURN_NOT_OK(SchemaFromPB(replica.schema(), &schema_from_replica));
if (schema_from_replica.max_col_id() + 1 > metadata->next_column_id()) {
metadata->set_next_column_id(schema_from_replica.max_col_id() + 1);
}
// Update schema_version if needed.
if (replica.has_schema_version() &&
replica.schema_version() > metadata->version()) {
metadata->set_version(replica.schema_version());
}
// Check the schemas match.
Schema schema_from_table;
RETURN_NOT_OK(SchemaFromPB(metadata->schema(), &schema_from_table));
if (schema_from_table != schema_from_replica) {
LOG(WARNING) << Substitute("Schema mismatch for tablet $0 of table $1", tablet_id, table_name);
LOG(WARNING) << Substitute("Table schema: $0", schema_from_table.ToString());
LOG(WARNING) << Substitute("Mismatched schema: $0", schema_from_replica.ToString());
return Status::Corruption("inconsistent replica: schema mismatch");
}
// Check the partition schemas match.
PartitionSchema pschema_from_table;
PartitionSchema pschema_from_replica;
RETURN_NOT_OK(PartitionSchema::FromPB(metadata->partition_schema(),
schema_from_table,
&pschema_from_table));
RETURN_NOT_OK(PartitionSchema::FromPB(replica.partition_schema(),
schema_from_replica,
&pschema_from_replica));
if (pschema_from_table != pschema_from_replica) {
LOG(WARNING) << Substitute("Partition schema mismatch for tablet $0 of table $1",
tablet_id, table_name);
LOG(WARNING) << Substitute("First seen partition schema: $0",
pschema_from_table.DebugString(schema_from_table));
LOG(WARNING) << Substitute("Mismatched partition schema $0",
pschema_from_replica.DebugString(schema_from_replica));
return Status::Corruption("inconsistent replica: partition schema mismatch");
}
return Status::OK();
}
void MasterRebuilder::CreateTablet(const ListTabletsResponsePB::StatusAndSchemaPB& replica) {
const string& table_name = replica.tablet_status().table_name();
const string& tablet_id = replica.tablet_status().tablet_id();
scoped_refptr<TableInfo> table = FindOrDie(tables_by_name_, table_name);
scoped_refptr<TabletInfo> tablet(new TabletInfo(table, tablet_id));
tablet->mutable_metadata()->StartMutation();
SysTabletsEntryPB* metadata = &tablet->mutable_metadata()->mutable_dirty()->pb;
metadata->mutable_partition()->CopyFrom(replica.tablet_status().partition());
// Setting the term to the minimum and an invalid opid ensures that, when a
// master loads the reconstructed syscatalog table and receives tablet
// reports, it will adopt the consensus state from the reports.
metadata->mutable_consensus_state()->set_current_term(consensus::kMinimumTerm);
metadata->mutable_consensus_state()->mutable_committed_config()
->set_opid_index(consensus::kInvalidOpIdIndex);
metadata->set_state(SysTabletsEntryPB::RUNNING);
metadata->set_state_msg("reconstructed by MasterRebuilder");
metadata->set_table_id(table->id());
tablet->mutable_metadata()->CommitMutation();
InsertOrDie(&tablets_by_id_, tablet_id, tablet);
TableMetadataLock l_table(table.get(), LockMode::WRITE);
TabletMetadataLock l_tablet(tablet.get(), LockMode::READ);
table->AddRemoveTablets({ tablet }, {});
}
Status MasterRebuilder::CheckTabletConsistency(
const ListTabletsResponsePB::StatusAndSchemaPB& replica) {
const string& tablet_id = replica.tablet_status().tablet_id();
const string& table_name = replica.tablet_status().table_name();
scoped_refptr<TabletInfo> tablet = FindOrDie(tablets_by_id_, tablet_id);
TabletMetadataLock l_tablet(tablet.get(), LockMode::READ);
const SysTabletsEntryPB& metadata = tablet->metadata().state().pb;
// Check the partitions match.
// We do not check the schemas and partition schemas match because they are
// already checked against the table's.
Partition partition_from_tablet;
Partition partition_from_replica;
Partition::FromPB(metadata.partition(), &partition_from_tablet);
Partition::FromPB(replica.tablet_status().partition(), &partition_from_replica);
if (partition_from_tablet != partition_from_replica) {
LOG(WARNING) << Substitute("Partition mismatch for tablet $0 of table $1",
tablet_id, table_name);
LOG(WARNING) << Substitute("First seen partition: $0",
metadata.partition().DebugString());
LOG(WARNING) << Substitute("Mismatched partition $0",
replica.tablet_status().partition().DebugString());;
return Status::Corruption("inconsistent replica: partition mismatch");
}
return Status::OK();
}
Status MasterRebuilder::WriteSysCatalog() {
// Start up the master and syscatalog.
MasterOptions opts;
Master master(opts);
RETURN_NOT_OK(master.Init());
SysCatalogTable sys_catalog(&master, &NoOpCb);
SCOPED_CLEANUP({
sys_catalog.Shutdown();
master.Shutdown();
});
// TODO(awong): we could be smarter about cleaning up on failure, either by
// initially placing data into some temp directories and moving at the end,
// or by deleting all blocks, WALs, and metadata on error.
Status s = sys_catalog.CreateNew(master.fs_manager());
if (s.IsAlreadyPresent()) {
// Adjust the error message to be clearer when running on non-empty dirs.
s = s.CloneAndPrepend("the specified fs directories must be empty");
}
RETURN_NOT_OK(s);
// Table-by-table, organize the metadata and write it to the syscatalog.
vector<scoped_refptr<TabletInfo>> tablets;
const auto kLeaderTimeout = MonoDelta::FromSeconds(10);
RETURN_NOT_OK(sys_catalog.tablet_replica()->consensus()->WaitUntilLeader(kLeaderTimeout));
for (const auto& table_entry : tables_by_name_) {
const auto& table = table_entry.second;
table->GetAllTablets(&tablets);
TableMetadataLock l_table(table.get(), LockMode::WRITE);
TabletMetadataGroupLock l_tablets(LockMode::RELEASED);
l_tablets.AddMutableInfos(tablets);
l_tablets.Lock(LockMode::WRITE);
SysCatalogTable::Actions actions;
actions.table_to_add = table;
actions.tablets_to_add = tablets;
RETURN_NOT_OK_PREPEND(sys_catalog.Write(actions),
Substitute("unable to write metadata for table $0 to sys_catalog",
table_entry.first));
}
return Status::OK();
}
} // namespace tools
} // namespace kudu