// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.
//
// The catalog manager handles the current list of tables
// and tablets in the cluster, as well as their current locations.
// Since most operations in the master go through these data
// structures, locking is carefully managed here to prevent unnecessary
// contention and deadlocks:
//
// - each structure has an internal spinlock used for operations that
// are purely in-memory (e.g. the current status of replicas)
// - data that is persisted on disk is stored in separate PersistentTable(t)Info
// structs. These are managed using copy-on-write so that writers may block
// while writing them back to disk without impacting concurrent readers.
//
// Usage rules:
// - You may obtain READ locks in any order. READ locks should never block,
// since they only conflict with COMMIT which is a purely in-memory operation.
// Thus they are deadlock-free.
// - If you need a WRITE lock on both a table and one or more of its tablets,
// acquire the lock on the table first, and acquire the locks on the tablets
// in tablet ID order, or let ScopedTabletInfoCommitter do the locking. This
// strict ordering prevents deadlocks. Along the same lines, COMMIT must
// happen in reverse (i.e. the tablet lock must be committed before the table
// lock). The only exceptions to this are when there's only one thread in
// operation, such as during master failover.
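//
// An illustrative sketch of the WRITE-lock ordering described above (the
// variable names are hypothetical; this is not an excerpt of the code below):
//
// TableMetadataLock table_lock(table.get(), TableMetadataLock::WRITE);
// TabletMetadataLock tablet_a_lock(tablet_a.get(), TabletMetadataLock::WRITE); // smaller tablet ID
// TabletMetadataLock tablet_b_lock(tablet_b.get(), TabletMetadataLock::WRITE); // larger tablet ID
// <mutate table and tablet state>
// tablet_a_lock.Commit(); // COMMIT the tablet locks first,
// tablet_b_lock.Commit();
// table_lock.Commit(); // then the table lock.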
#include "kudu/master/catalog_manager.h"
#include <algorithm>
#include <boost/optional.hpp>
#include <condition_variable>
#include <glog/logging.h>
#include <map>
#include <memory>
#include <mutex>
#include <set>
#include <string>
#include <utility>
#include <vector>
#include "kudu/cfile/type_encodings.h"
#include "kudu/common/partial_row.h"
#include "kudu/common/partition.h"
#include "kudu/common/row_operations.h"
#include "kudu/common/wire_protocol.h"
#include "kudu/consensus/consensus.proxy.h"
#include "kudu/consensus/quorum_util.h"
#include "kudu/gutil/atomicops.h"
#include "kudu/gutil/macros.h"
#include "kudu/gutil/map-util.h"
#include "kudu/gutil/mathlimits.h"
#include "kudu/gutil/ref_counted.h"
#include "kudu/gutil/stl_util.h"
#include "kudu/gutil/strings/escaping.h"
#include "kudu/gutil/strings/join.h"
#include "kudu/gutil/strings/substitute.h"
#include "kudu/gutil/sysinfo.h"
#include "kudu/gutil/walltime.h"
#include "kudu/master/master.h"
#include "kudu/master/master.pb.h"
#include "kudu/master/sys_catalog.h"
#include "kudu/master/ts_descriptor.h"
#include "kudu/master/ts_manager.h"
#include "kudu/rpc/messenger.h"
#include "kudu/rpc/rpc_context.h"
#include "kudu/tserver/tserver_admin.proxy.h"
#include "kudu/util/debug/trace_event.h"
#include "kudu/util/flag_tags.h"
#include "kudu/util/logging.h"
#include "kudu/util/monotime.h"
#include "kudu/util/random_util.h"
#include "kudu/util/scoped_cleanup.h"
#include "kudu/util/stopwatch.h"
#include "kudu/util/thread.h"
#include "kudu/util/threadpool.h"
#include "kudu/util/thread_restrictions.h"
#include "kudu/util/trace.h"
DEFINE_int32(master_ts_rpc_timeout_ms, 30 * 1000, // 30 sec
"Timeout used for the master->TS async rpc calls.");
TAG_FLAG(master_ts_rpc_timeout_ms, advanced);
DEFINE_int32(tablet_creation_timeout_ms, 30 * 1000, // 30 sec
"Timeout used by the master when attempting to create tablet "
"replicas during table creation.");
TAG_FLAG(tablet_creation_timeout_ms, advanced);
DEFINE_bool(catalog_manager_wait_for_new_tablets_to_elect_leader, true,
"Whether the catalog manager should wait for a newly created tablet to "
"elect a leader before considering it successfully created. "
"This is disabled in some tests where we explicitly manage leader "
"election.");
TAG_FLAG(catalog_manager_wait_for_new_tablets_to_elect_leader, hidden);
DEFINE_int32(unresponsive_ts_rpc_timeout_ms, 60 * 60 * 1000, // 1 hour
"After this amount of time, the master will stop attempting to contact "
"a tablet server in order to perform operations such as deleting a tablet.");
TAG_FLAG(unresponsive_ts_rpc_timeout_ms, advanced);
DEFINE_int32(default_num_replicas, 3,
"Default number of replicas for tables that do not have the num_replicas set.");
TAG_FLAG(default_num_replicas, advanced);
DEFINE_int32(catalog_manager_bg_task_wait_ms, 1000,
"Amount of time the catalog manager background task thread waits "
"between runs");
TAG_FLAG(catalog_manager_bg_task_wait_ms, hidden);
DEFINE_int32(max_create_tablets_per_ts, 20,
"The number of tablets per TS that can be requested for a new table.");
TAG_FLAG(max_create_tablets_per_ts, advanced);
DEFINE_int32(master_failover_catchup_timeout_ms, 30 * 1000, // 30 sec
"Amount of time to give a newly-elected leader master to load"
" the previous master's metadata and become active. If this time"
" is exceeded, the node crashes.");
TAG_FLAG(master_failover_catchup_timeout_ms, advanced);
TAG_FLAG(master_failover_catchup_timeout_ms, experimental);
DEFINE_bool(master_tombstone_evicted_tablet_replicas, true,
"Whether the master should tombstone (delete) tablet replicas that "
"are no longer part of the latest reported raft config.");
TAG_FLAG(master_tombstone_evicted_tablet_replicas, hidden);
DEFINE_bool(master_add_server_when_underreplicated, true,
"Whether the master should attempt to add a new server to a tablet "
"config when it detects that the tablet is under-replicated.");
TAG_FLAG(master_add_server_when_underreplicated, hidden);
DEFINE_bool(catalog_manager_check_ts_count_for_create_table, true,
"Whether the master should ensure that there are enough live tablet "
"servers to satisfy the provided replication count before allowing "
"a table to be created.");
TAG_FLAG(catalog_manager_check_ts_count_for_create_table, hidden);
DEFINE_int32(table_locations_ttl_ms, 60 * 60 * 1000, // 1 hour
"Maximum time in milliseconds which clients may cache table locations. "
"New range partitions may not be visible to existing client instances "
"until after waiting for the ttl period.");
TAG_FLAG(table_locations_ttl_ms, advanced);
DEFINE_bool(catalog_manager_fail_ts_rpcs, false,
"Whether all master->TS async calls should fail. Only for testing!");
TAG_FLAG(catalog_manager_fail_ts_rpcs, hidden);
TAG_FLAG(catalog_manager_fail_ts_rpcs, runtime);
DEFINE_bool(catalog_manager_delete_orphaned_tablets, false,
"Whether the master should delete tablets reported by tablet "
"servers for which there are no corresponding records in the "
"master's metadata. Use this option with care; it may cause "
"permanent tablet data loss under specific (and rare) cases of "
"master failures!");
TAG_FLAG(catalog_manager_delete_orphaned_tablets, advanced);
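// As a usage sketch: these are standard gflags, so any of the defaults above
// can be overridden on the master's command line (assuming the master binary
// is invoked as kudu-master), e.g.:
//
// kudu-master --default_num_replicas=5 --table_locations_ttl_ms=300000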
using std::map;
using std::pair;
using std::set;
using std::shared_ptr;
using std::string;
using std::unique_ptr;
using std::vector;
namespace kudu {
namespace master {
using base::subtle::NoBarrier_Load;
using base::subtle::NoBarrier_CompareAndSwap;
using cfile::TypeEncodingInfo;
using consensus::kMinimumTerm;
using consensus::CONSENSUS_CONFIG_COMMITTED;
using consensus::Consensus;
using consensus::ConsensusServiceProxy;
using consensus::ConsensusStatePB;
using consensus::GetConsensusRole;
using consensus::OpId;
using consensus::RaftPeerPB;
using consensus::StartRemoteBootstrapRequestPB;
using rpc::RpcContext;
using strings::Substitute;
using tablet::TABLET_DATA_DELETED;
using tablet::TABLET_DATA_TOMBSTONED;
using tablet::TabletDataState;
using tablet::TabletPeer;
using tablet::TabletStatePB;
using tserver::TabletServerErrorPB;
////////////////////////////////////////////////////////////
// Table Loader
////////////////////////////////////////////////////////////
class TableLoader : public TableVisitor {
public:
explicit TableLoader(CatalogManager *catalog_manager)
: catalog_manager_(catalog_manager) {
}
virtual Status VisitTable(const std::string& table_id,
const SysTablesEntryPB& metadata) OVERRIDE {
CHECK(!ContainsKey(catalog_manager_->table_ids_map_, table_id))
<< "Table already exists: " << table_id;
// Set up the table info.
TableInfo *table = new TableInfo(table_id);
TableMetadataLock l(table, TableMetadataLock::WRITE);
l.mutable_data()->pb.CopyFrom(metadata);
// Add the table to the IDs map and to the name map (if the table is not deleted).
catalog_manager_->table_ids_map_[table->id()] = table;
if (!l.data().is_deleted()) {
catalog_manager_->table_names_map_[l.data().name()] = table;
}
l.Commit();
LOG(INFO) << "Loaded metadata for table " << table->ToString();
VLOG(1) << "Metadata for table " << table->ToString() << ": " << metadata.ShortDebugString();
return Status::OK();
}
private:
CatalogManager *catalog_manager_;
DISALLOW_COPY_AND_ASSIGN(TableLoader);
};
////////////////////////////////////////////////////////////
// Tablet Loader
////////////////////////////////////////////////////////////
class TabletLoader : public TabletVisitor {
public:
explicit TabletLoader(CatalogManager *catalog_manager)
: catalog_manager_(catalog_manager) {
}
virtual Status VisitTablet(const std::string& table_id,
const std::string& tablet_id,
const SysTabletsEntryPB& metadata) OVERRIDE {
// Lookup the table.
scoped_refptr<TableInfo> table(FindPtrOrNull(
catalog_manager_->table_ids_map_, table_id));
if (table == nullptr) {
// Tables and tablets are always created/deleted in one operation, so
// this shouldn't be possible.
LOG(ERROR) << "Missing Table " << table_id << " required by tablet " << tablet_id;
LOG(ERROR) << "Metadata: " << metadata.DebugString();
return Status::Corruption("Missing table for tablet: ", tablet_id);
}
// Set up the tablet info.
TabletInfo* tablet = new TabletInfo(table, tablet_id);
TabletMetadataLock l(tablet, TabletMetadataLock::WRITE);
l.mutable_data()->pb.CopyFrom(metadata);
// Add the tablet to the tablet map.
catalog_manager_->tablet_map_[tablet->tablet_id()] = tablet;
// Add the tablet to its table (if the tablet is not deleted).
bool is_deleted = l.mutable_data()->is_deleted();
l.Commit();
if (!is_deleted) {
table->AddTablet(tablet);
}
LOG(INFO) << "Loaded metadata for tablet " << tablet_id
<< " (table " << table->ToString() << ")";
VLOG(2) << "Metadata for tablet " << tablet_id << ": " << metadata.ShortDebugString();
return Status::OK();
}
private:
CatalogManager *catalog_manager_;
DISALLOW_COPY_AND_ASSIGN(TabletLoader);
};
////////////////////////////////////////////////////////////
// Background Tasks
////////////////////////////////////////////////////////////
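// Background task thread for the catalog manager. Run() loops until Shutdown()
// is called: while this master is the leader, it extracts tablets that are not
// yet running and drives their assignment via ProcessPendingAssignments(), then
// sleeps for up to FLAGS_catalog_manager_bg_task_wait_ms or until it is woken
// via Wake()/WakeIfHasPendingUpdates().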
class CatalogManagerBgTasks {
public:
explicit CatalogManagerBgTasks(CatalogManager *catalog_manager)
: closing_(false),
pending_updates_(false),
cond_(&lock_),
thread_(nullptr),
catalog_manager_(catalog_manager) {
}
~CatalogManagerBgTasks() {}
Status Init();
void Shutdown();
void Wake() {
MutexLock lock(lock_);
pending_updates_ = true;
cond_.Broadcast();
}
void Wait(int msec) {
MutexLock lock(lock_);
if (closing_) return;
if (!pending_updates_) {
cond_.TimedWait(MonoDelta::FromMilliseconds(msec));
}
pending_updates_ = false;
}
void WakeIfHasPendingUpdates() {
MutexLock lock(lock_);
if (pending_updates_) {
cond_.Broadcast();
}
}
private:
void Run();
private:
Atomic32 closing_;
bool pending_updates_;
mutable Mutex lock_;
ConditionVariable cond_;
scoped_refptr<kudu::Thread> thread_;
CatalogManager *catalog_manager_;
};
Status CatalogManagerBgTasks::Init() {
RETURN_NOT_OK(kudu::Thread::Create("catalog manager", "bgtasks",
&CatalogManagerBgTasks::Run, this, &thread_));
return Status::OK();
}
void CatalogManagerBgTasks::Shutdown() {
if (Acquire_CompareAndSwap(&closing_, false, true) != false) {
VLOG(2) << "CatalogManagerBgTasks already shut down";
return;
}
Wake();
if (thread_ != nullptr) {
CHECK_OK(ThreadJoiner(thread_.get()).Join());
}
}
void CatalogManagerBgTasks::Run() {
while (!NoBarrier_Load(&closing_)) {
{
CatalogManager::ScopedLeaderSharedLock l(catalog_manager_);
if (!l.catalog_status().ok()) {
LOG(WARNING) << "Catalog manager background task thread going to sleep: "
<< l.catalog_status().ToString();
} else if (l.leader_status().ok()) {
std::vector<scoped_refptr<TabletInfo>> to_process;
// Get list of tablets not yet running.
catalog_manager_->ExtractTabletsToProcess(&to_process);
if (!to_process.empty()) {
// Transition tablet assignment state from preparing to creating, send
// and schedule creation / deletion RPC messages, etc.
Status s = catalog_manager_->ProcessPendingAssignments(to_process);
if (!s.ok()) {
// If there is an error (e.g., we are not the leader) abort this task
// and wait until we're woken up again.
//
// TODO Add tests for this in the revision that makes
// create/alter fault tolerant.
LOG(ERROR) << "Error processing pending assignments, aborting the current task: "
<< s.ToString();
}
}
}
}
// Wait for a notification or a timeout expiration.
// - CreateTable will call Wake() to notify about the tablets to add
// - HandleReportedTablet/ProcessPendingAssignments will call WakeIfHasPendingUpdates()
// to notify about tablets creation.
Wait(FLAGS_catalog_manager_bg_task_wait_ms);
}
VLOG(1) << "Catalog manager background task thread shutting down";
}
////////////////////////////////////////////////////////////
// CatalogManager
////////////////////////////////////////////////////////////
namespace {
// Tracks TabletInfo mutations, committing them at end of scope unless aborted.
//
// Can be used in one of two ways:
// 1. To track already-locked TabletInfos and commit them on end of scope:
// {
// ScopedTabletInfoCommitter c(ScopedTabletInfoCommitter::LOCKED);
// c.AddTablets({ one, two, three });
// <Perform mutations>
// } // Mutations are committed
//
// 2. To aggregate unlocked TabletInfos, lock them safely, and commit them on end of scope:
// {
// ScopedTabletInfoCommitter c(ScopedTabletInfoCommitter::UNLOCKED);
// c.AddTablets({ five, two, three });
// c.AddTablets({ four, one });
// c.LockTabletsForWriting();
// <Perform mutations>
// } // Mutations are committed
//
// The acquisition or release of multiple tablet locks is done in tablet ID
// order, as required by the locking rules (see the top of the file).
class ScopedTabletInfoCommitter {
private:
// Compares TabletInfos using their underlying tablet IDs.
struct TabletInfoCompare {
bool operator() (const scoped_refptr<TabletInfo>& left,
const scoped_refptr<TabletInfo>& right) const {
return left->tablet_id() < right->tablet_id();
}
};
// Must be defined before begin()/end() below.
typedef set<scoped_refptr<TabletInfo>, TabletInfoCompare> TabletSet;
public:
// Whether tablets added to this committer have been locked already or
// should be locked by the committer itself.
enum State {
LOCKED,
UNLOCKED,
};
explicit ScopedTabletInfoCommitter(State state)
: state_(state),
aborted_(false) {
}
// Acquire write locks for all of the tablets previously added.
void LockTabletsForWriting() {
DCHECK_EQ(UNLOCKED, state_);
for (const auto& t : tablets_) {
t->mutable_metadata()->StartMutation();
}
state_ = LOCKED;
}
// Release all write locks, discarding any mutated tablet data.
void Abort() {
DCHECK(!aborted_);
if (state_ == LOCKED) {
for (const auto & t : tablets_) {
t->mutable_metadata()->AbortMutation();
}
}
aborted_ = true;
}
~ScopedTabletInfoCommitter() {
Commit();
}
// Release all write locks, committing any mutated tablet data.
void Commit() {
if (PREDICT_TRUE(!aborted_ && state_ == LOCKED)) {
for (const auto& t : tablets_) {
t->mutable_metadata()->CommitMutation();
}
state_ = UNLOCKED;
}
}
// Add new tablets to be tracked.
void AddTablets(const vector<scoped_refptr<TabletInfo>>& new_tablets) {
DCHECK(!aborted_);
tablets_.insert(new_tablets.begin(), new_tablets.end());
}
// These methods allow the class to be used in range-based for loops.
TabletSet::const_iterator begin() const {
return tablets_.begin();
}
TabletSet::const_iterator end() const {
return tablets_.end();
}
private:
TabletSet tablets_;
State state_;
bool aborted_;
};
string RequestorString(RpcContext* rpc) {
if (rpc) {
return rpc->requestor_string();
} else {
return "internal request";
}
}
// If 's' indicates that the node is no longer the leader, set up a
// Status::ServiceUnavailable error on the response, with NOT_THE_LEADER as
// the error code.
template<class RespClass>
void CheckIfNoLongerLeaderAndSetupError(Status s, RespClass* resp) {
// TODO (KUDU-591): This is a bit of a hack, as right now
// there's no way to propagate why a write to a consensus configuration has
// failed. However, since we use Status::IllegalState()/IsAborted() to
// indicate the situation where a write was issued on a node
// that is no longer the leader, this suffices until we
// distinguish this cause of write failure more explicitly.
if (s.IsIllegalState() || s.IsAborted()) {
Status new_status = Status::ServiceUnavailable(
"operation requested can only be executed on a leader master, but this"
" master is no longer the leader", s.ToString());
SetupError(resp->mutable_error(), MasterErrorPB::NOT_THE_LEADER, new_status);
}
}
template<class RespClass>
Status CheckIfTableDeletedOrNotRunning(TableMetadataLock* lock, RespClass* resp) {
if (lock->data().is_deleted()) {
Status s = Status::NotFound("The table was deleted", lock->data().pb.state_msg());
SetupError(resp->mutable_error(), MasterErrorPB::TABLE_NOT_FOUND, s);
return s;
}
if (!lock->data().is_running()) {
Status s = Status::ServiceUnavailable("The table is not running");
SetupError(resp->mutable_error(), MasterErrorPB::TABLE_NOT_FOUND, s);
return s;
}
return Status::OK();
}
} // anonymous namespace
CatalogManager::CatalogManager(Master *master)
: master_(master),
rng_(GetRandomSeed32()),
state_(kConstructed),
leader_ready_term_(-1),
leader_lock_(RWMutex::Priority::PREFER_WRITING) {
CHECK_OK(ThreadPoolBuilder("leader-initialization")
// Presently, this thread pool must contain only a single thread
// (to correctly serialize invocations of ElectedAsLeaderCb upon
// closely timed consecutive elections).
.set_max_threads(1)
.Build(&leader_election_pool_));
}
CatalogManager::~CatalogManager() {
Shutdown();
}
Status CatalogManager::Init(bool is_first_run) {
{
std::lock_guard<simple_spinlock> l(state_lock_);
CHECK_EQ(kConstructed, state_);
state_ = kStarting;
}
RETURN_NOT_OK_PREPEND(InitSysCatalogAsync(is_first_run),
"Failed to initialize sys tables async");
// WaitUntilRunning() must run outside of the lock so as to prevent
// deadlock. This is safe as WaitUntilRunning waits for another
// thread to finish its work and doesn't itself depend on any state
// within CatalogManager.
RETURN_NOT_OK_PREPEND(sys_catalog_->WaitUntilRunning(),
"Failed waiting for the catalog tablet to run");
std::lock_guard<LockType> l(lock_);
background_tasks_.reset(new CatalogManagerBgTasks(this));
RETURN_NOT_OK_PREPEND(background_tasks_->Init(),
"Failed to initialize catalog manager background tasks");
{
std::lock_guard<simple_spinlock> l(state_lock_);
CHECK_EQ(kStarting, state_);
state_ = kRunning;
}
return Status::OK();
}
Status CatalogManager::ElectedAsLeaderCb() {
return leader_election_pool_->SubmitClosure(
Bind(&CatalogManager::VisitTablesAndTabletsTask, Unretained(this)));
}
Status CatalogManager::WaitUntilCaughtUpAsLeader(const MonoDelta& timeout) {
string uuid = master_->fs_manager()->uuid();
Consensus* consensus = sys_catalog_->tablet_peer()->consensus();
ConsensusStatePB cstate = consensus->ConsensusState(CONSENSUS_CONFIG_COMMITTED);
if (!cstate.has_leader_uuid() || cstate.leader_uuid() != uuid) {
return Status::IllegalState(
Substitute("Node $0 not leader. Consensus state: $1",
uuid, cstate.ShortDebugString()));
}
// Wait for all transactions to be committed.
RETURN_NOT_OK(sys_catalog_->tablet_peer()->transaction_tracker()->WaitForAllToFinish(timeout));
return Status::OK();
}
void CatalogManager::VisitTablesAndTabletsTask() {
Consensus* consensus = sys_catalog_->tablet_peer()->consensus();
int64_t term = consensus->ConsensusState(CONSENSUS_CONFIG_COMMITTED).current_term();
{
std::lock_guard<simple_spinlock> l(state_lock_);
if (leader_ready_term_ == term) {
// The term hasn't changed since the last time this master was the
// leader. It's not possible for another master to be leader for the same
// term, so there hasn't been any actual leadership change and thus
// there's no reason to reload the on-disk metadata.
VLOG(2) << Substitute("Term $0 hasn't changed, ignoring dirty callback",
term);
return;
}
}
Status s = WaitUntilCaughtUpAsLeader(
MonoDelta::FromMilliseconds(FLAGS_master_failover_catchup_timeout_ms));
if (!s.ok()) {
WARN_NOT_OK(s, "Failed waiting for node to catch up after master election");
// TODO: Abdicate on timeout instead of crashing.
if (s.IsTimedOut()) {
LOG(FATAL) << "Shutting down due to unavailability of other masters after"
<< " election. TODO: Abdicate instead.";
}
return;
}
int64_t term_after_wait = consensus->ConsensusState(CONSENSUS_CONFIG_COMMITTED).current_term();
if (term_after_wait != term) {
// If we got elected leader again while waiting to catch up then we will
// get another callback to visit the tables and tablets, so bail.
LOG(INFO) << "Term change from " << term << " to " << term_after_wait
<< " while waiting for master leader catchup. Not loading sys catalog metadata";
return;
}
LOG(INFO) << "Loading table and tablet metadata into memory...";
LOG_SLOW_EXECUTION(WARNING, 1000, LogPrefix() + "Loading metadata into memory") {
CHECK_OK(VisitTablesAndTablets());
}
std::lock_guard<simple_spinlock> l(state_lock_);
leader_ready_term_ = term;
}
Status CatalogManager::VisitTablesAndTablets() {
// Block new catalog operations, and wait for existing operations to finish.
std::lock_guard<RWMutex> leader_lock_guard(leader_lock_);
// This lock is held for the entirety of the function because the calls to
// VisitTables and VisitTablets mutate global maps.
std::lock_guard<LockType> lock(lock_);
// Abort any outstanding tasks. All TableInfos are orphaned below, so
// it's important to end their tasks now; otherwise Shutdown() will
// destroy master state used by these tasks.
vector<scoped_refptr<TableInfo>> tables;
AppendValuesFromMap(table_ids_map_, &tables);
AbortAndWaitForAllTasks(tables);
// Clear the existing state.
table_names_map_.clear();
table_ids_map_.clear();
tablet_map_.clear();
// Visit tables and tablets, load them into memory.
TableLoader table_loader(this);
RETURN_NOT_OK_PREPEND(sys_catalog_->VisitTables(&table_loader),
"Failed while visiting tables in sys catalog");
TabletLoader tablet_loader(this);
RETURN_NOT_OK_PREPEND(sys_catalog_->VisitTablets(&tablet_loader),
"Failed while visiting tablets in sys catalog");
return Status::OK();
}
Status CatalogManager::InitSysCatalogAsync(bool is_first_run) {
std::lock_guard<LockType> l(lock_);
sys_catalog_.reset(new SysCatalogTable(master_,
master_->metric_registry(),
Bind(&CatalogManager::ElectedAsLeaderCb,
Unretained(this))));
if (is_first_run) {
RETURN_NOT_OK(sys_catalog_->CreateNew(master_->fs_manager()));
} else {
RETURN_NOT_OK(sys_catalog_->Load(master_->fs_manager()));
}
return Status::OK();
}
bool CatalogManager::IsInitialized() const {
std::lock_guard<simple_spinlock> l(state_lock_);
return state_ == kRunning;
}
RaftPeerPB::Role CatalogManager::Role() const {
CHECK(IsInitialized());
return sys_catalog_->tablet_peer_->consensus()->role();
}
void CatalogManager::Shutdown() {
{
std::lock_guard<simple_spinlock> l(state_lock_);
if (state_ == kClosing) {
VLOG(2) << "CatalogManager already shut down";
return;
}
state_ = kClosing;
}
// Shutdown the Catalog Manager background thread
if (background_tasks_) {
background_tasks_->Shutdown();
}
// Mark all outstanding table tasks as aborted and wait for them to fail.
//
// There may be an outstanding table visitor thread modifying the table map,
// so we must make a copy of it before we iterate. It's OK if the visitor
// adds more entries to the map even after we finish; it won't start any new
// tasks for those entries.
vector<scoped_refptr<TableInfo>> copy;
{
shared_lock<LockType> l(lock_);
AppendValuesFromMap(table_ids_map_, &copy);
}
AbortAndWaitForAllTasks(copy);
// Wait for any outstanding table visitors to finish.
//
// Must be done before shutting down the catalog, otherwise its tablet peer
// may be destroyed while still in use by a table visitor.
leader_election_pool_->Shutdown();
// Shut down the underlying storage for tables and tablets.
if (sys_catalog_) {
sys_catalog_->Shutdown();
}
}
static void SetupError(MasterErrorPB* error,
MasterErrorPB::Code code,
const Status& s) {
StatusToPB(s, error->mutable_status());
error->set_code(code);
}
Status CatalogManager::CheckOnline() const {
if (PREDICT_FALSE(!IsInitialized())) {
return Status::ServiceUnavailable("CatalogManager is not running");
}
return Status::OK();
}
// Create a new table.
// See README file in this directory for a description of the design.
Status CatalogManager::CreateTable(const CreateTableRequestPB* orig_req,
CreateTableResponsePB* resp,
rpc::RpcContext* rpc) {
leader_lock_.AssertAcquiredForReading();
RETURN_NOT_OK(CheckOnline());
Status s;
// Copy the request, so we can fill in some defaults.
CreateTableRequestPB req = *orig_req;
LOG(INFO) << "CreateTable from " << RequestorString(rpc)
<< ":\n" << req.DebugString();
// a. Validate the user request.
Schema client_schema;
RETURN_NOT_OK(SchemaFromPB(req.schema(), &client_schema));
if (client_schema.has_column_ids()) {
s = Status::InvalidArgument("User requests should not have Column IDs");
SetupError(resp->mutable_error(), MasterErrorPB::INVALID_SCHEMA, s);
return s;
}
if (PREDICT_FALSE(client_schema.num_key_columns() <= 0)) {
s = Status::InvalidArgument("Must specify at least one key column");
SetupError(resp->mutable_error(), MasterErrorPB::INVALID_SCHEMA, s);
return s;
}
for (int i = 0; i < client_schema.num_key_columns(); i++) {
if (!IsTypeAllowableInKey(client_schema.column(i).type_info())) {
Status s = Status::InvalidArgument(
"Key column may not have type of BOOL, FLOAT, or DOUBLE");
SetupError(resp->mutable_error(), MasterErrorPB::INVALID_SCHEMA, s);
return s;
}
}
Schema schema = client_schema.CopyWithColumnIds();
// If the client did not set a partition schema in the create table request,
// the default partition schema (no hash bucket components and a range
// partitioned on the primary key columns) will be used.
PartitionSchema partition_schema;
s = PartitionSchema::FromPB(req.partition_schema(), schema, &partition_schema);
if (!s.ok()) {
SetupError(resp->mutable_error(), MasterErrorPB::INVALID_SCHEMA, s);
return s;
}
// Decode split rows.
vector<KuduPartialRow> split_rows;
vector<pair<KuduPartialRow, KuduPartialRow>> range_bounds;
RowOperationsPBDecoder decoder(req.mutable_split_rows_range_bounds(),
&client_schema, &schema, nullptr);
vector<DecodedRowOperation> ops;
RETURN_NOT_OK(decoder.DecodeOperations(&ops));
for (int i = 0; i < ops.size(); i++) {
const DecodedRowOperation& op = ops[i];
switch (op.type) {
case RowOperationsPB::SPLIT_ROW: {
split_rows.push_back(*op.split_row);
break;
}
case RowOperationsPB::RANGE_LOWER_BOUND: {
i += 1;
if (i >= ops.size() || ops[i].type != RowOperationsPB::RANGE_UPPER_BOUND) {
Status s = Status::InvalidArgument("Missing upper range bound in create table request");
SetupError(resp->mutable_error(), MasterErrorPB::UNKNOWN_ERROR, s);
return s;
}
range_bounds.emplace_back(*op.split_row, *ops[i].split_row);
break;
}
default: return Status::InvalidArgument(
Substitute("Illegal row operation type in create table request: $0", op.type));
}
}
// Create partitions based on specified partition schema and split rows.
vector<Partition> partitions;
RETURN_NOT_OK(partition_schema.CreatePartitions(split_rows, range_bounds, schema, &partitions));
// If they didn't specify a num_replicas, set it based on the default.
if (!req.has_num_replicas()) {
req.set_num_replicas(FLAGS_default_num_replicas);
}
// Verify that the total number of tablets is reasonable, relative to the number
// of live tablet servers.
TSDescriptorVector ts_descs;
master_->ts_manager()->GetAllLiveDescriptors(&ts_descs);
int num_live_tservers = ts_descs.size();
int max_tablets = FLAGS_max_create_tablets_per_ts * num_live_tservers;
if (req.num_replicas() > 1 && max_tablets > 0 && partitions.size() > max_tablets) {
s = Status::InvalidArgument(Substitute("The requested number of tablets is over the "
"permitted maximum ($0)", max_tablets));
SetupError(resp->mutable_error(), MasterErrorPB::TOO_MANY_TABLETS, s);
return s;
}
// Verify that the number of replicas isn't larger than the number of live tablet
// servers.
if (FLAGS_catalog_manager_check_ts_count_for_create_table &&
req.num_replicas() > num_live_tservers) {
s = Status::InvalidArgument(Substitute(
"Not enough live tablet servers to create a table with the requested replication "
"factor $0. $1 tablet servers are alive.", req.num_replicas(), num_live_tservers));
SetupError(resp->mutable_error(), MasterErrorPB::REPLICATION_FACTOR_TOO_HIGH, s);
return s;
}
scoped_refptr<TableInfo> table;
{
std::lock_guard<LockType> l(lock_);
TRACE("Acquired catalog manager lock");
// b. Verify that the table does not exist.
table = FindPtrOrNull(table_names_map_, req.name());
if (table != nullptr) {
s = Status::AlreadyPresent(Substitute("Table $0 already exists with id $1",
req.name(), table->id()));
SetupError(resp->mutable_error(), MasterErrorPB::TABLE_ALREADY_PRESENT, s);
return s;
}
// c. Reserve the table name if possible.
if (!InsertIfNotPresent(&reserved_table_names_, req.name())) {
s = Status::ServiceUnavailable(Substitute(
"New table name $0 is already reserved", req.name()));
SetupError(resp->mutable_error(), MasterErrorPB::TABLE_NOT_FOUND, s);
return s;
}
}
// Ensure that we drop the name reservation upon return.
auto cleanup = MakeScopedCleanup([&] () {
std::lock_guard<LockType> l(lock_);
CHECK_EQ(1, reserved_table_names_.erase(req.name()));
});
// d. Create the in-memory representation of the new table and its tablets.
// It's not yet in any global maps; that will happen in step g below.
table = CreateTableInfo(req, schema, partition_schema);
vector<TabletInfo*> tablets;
vector<scoped_refptr<TabletInfo>> tablet_refs;
for (const Partition& partition : partitions) {
PartitionPB partition_pb;
partition.ToPB(&partition_pb);
scoped_refptr<TabletInfo> t = CreateTabletInfo(table.get(), partition_pb);
tablets.push_back(t.get());
tablet_refs.emplace_back(std::move(t));
}
TRACE("Created new table and tablet info");
// NOTE: the table and tablets are already locked for write at this point,
// since the CreateTableInfo/CreateTabletInfo functions leave them in that state.
// They will get committed at the end of this function.
// Sanity check: the table and tablets should all be in "preparing" state.
CHECK_EQ(SysTablesEntryPB::PREPARING, table->metadata().dirty().pb.state());
for (const TabletInfo *tablet : tablets) {
CHECK_EQ(SysTabletsEntryPB::PREPARING, tablet->metadata().dirty().pb.state());
}
table->mutable_metadata()->mutable_dirty()->pb.set_state(SysTablesEntryPB::RUNNING);
// e. Write table and tablets to sys-catalog.
SysCatalogTable::Actions actions;
actions.table_to_add = table.get();
actions.tablets_to_add = tablets;
s = sys_catalog_->Write(actions);
if (!s.ok()) {
s = s.CloneAndPrepend(Substitute("An error occurred while writing to sys-catalog: $0",
s.ToString()));
LOG(WARNING) << s.ToString();
CheckIfNoLongerLeaderAndSetupError(s, resp);
return s;
}
TRACE("Wrote table and tablets to system table");
// f. Commit the in-memory state.
table->mutable_metadata()->CommitMutation();
for (TabletInfo *tablet : tablets) {
tablet->mutable_metadata()->CommitMutation();
}
table->AddTablets(tablets);
// g. Make the new table and tablets visible in the catalog.
{
std::lock_guard<LockType> l(lock_);
table_ids_map_[table->id()] = table;
table_names_map_[req.name()] = table;
for (const auto& tablet : tablet_refs) {
InsertOrDie(&tablet_map_, tablet->tablet_id(), std::move(tablet));
}
}
TRACE("Inserted table and tablets into CatalogManager maps");
resp->set_table_id(table->id());
VLOG(1) << "Created table " << table->ToString();
background_tasks_->Wake();
return Status::OK();
}
Status CatalogManager::IsCreateTableDone(const IsCreateTableDoneRequestPB* req,
IsCreateTableDoneResponsePB* resp) {
leader_lock_.AssertAcquiredForReading();
RETURN_NOT_OK(CheckOnline());
scoped_refptr<TableInfo> table;
// 1. Look up the table and verify that it exists.
TRACE("Looking up table");
RETURN_NOT_OK(FindTable(req->table(), &table));
if (table == nullptr) {
Status s = Status::NotFound("The table does not exist", req->table().DebugString());
SetupError(resp->mutable_error(), MasterErrorPB::TABLE_NOT_FOUND, s);
return s;
}
TRACE("Locking table");
TableMetadataLock l(table.get(), TableMetadataLock::READ);
RETURN_NOT_OK(CheckIfTableDeletedOrNotRunning(&l, resp));
// 2. Check whether the create is still in progress.
TRACE("Verify if the table creation is in progress for $0", table->ToString());
resp->set_done(!table->IsCreateInProgress());
return Status::OK();
}
TableInfo *CatalogManager::CreateTableInfo(const CreateTableRequestPB& req,
const Schema& schema,
const PartitionSchema& partition_schema) {
DCHECK(schema.has_column_ids());
TableInfo* table = new TableInfo(GenerateId());
table->mutable_metadata()->StartMutation();
SysTablesEntryPB *metadata = &table->mutable_metadata()->mutable_dirty()->pb;
metadata->set_state(SysTablesEntryPB::PREPARING);
metadata->set_name(req.name());
metadata->set_version(0);
metadata->set_next_column_id(ColumnId(schema.max_col_id() + 1));
metadata->set_num_replicas(req.num_replicas());
// Use the Schema object passed in, since it has the column IDs already assigned,
// whereas the user request PB does not.
CHECK_OK(SchemaToPB(schema, metadata->mutable_schema()));
partition_schema.ToPB(metadata->mutable_partition_schema());
return table;
}
scoped_refptr<TabletInfo> CatalogManager::CreateTabletInfo(TableInfo* table,
const PartitionPB& partition) {
scoped_refptr<TabletInfo> tablet(new TabletInfo(table, GenerateId()));
tablet->mutable_metadata()->StartMutation();
SysTabletsEntryPB *metadata = &tablet->mutable_metadata()->mutable_dirty()->pb;
metadata->set_state(SysTabletsEntryPB::PREPARING);
metadata->mutable_partition()->CopyFrom(partition);
metadata->set_table_id(table->id());
return tablet;
}
Status CatalogManager::FindTable(const TableIdentifierPB& table_identifier,
scoped_refptr<TableInfo> *table_info) {
shared_lock<LockType> l(lock_);
if (table_identifier.has_table_id()) {
*table_info = FindPtrOrNull(table_ids_map_, table_identifier.table_id());
} else if (table_identifier.has_table_name()) {
*table_info = FindPtrOrNull(table_names_map_, table_identifier.table_name());
} else {
return Status::InvalidArgument("Missing Table ID or Table Name");
}
return Status::OK();
}
Status CatalogManager::DeleteTable(const DeleteTableRequestPB* req,
DeleteTableResponsePB* resp,
rpc::RpcContext* rpc) {
leader_lock_.AssertAcquiredForReading();
RETURN_NOT_OK(CheckOnline());
LOG(INFO) << "Servicing DeleteTable request from " << RequestorString(rpc)
<< ": " << req->ShortDebugString();
// 1. Look up the table, lock it, and mark it as removed.
TRACE("Looking up table");
scoped_refptr<TableInfo> table;
RETURN_NOT_OK(FindTable(req->table(), &table));
if (table == nullptr) {
Status s = Status::NotFound("The table does not exist", req->table().DebugString());
SetupError(resp->mutable_error(), MasterErrorPB::TABLE_NOT_FOUND, s);
return s;
}
TRACE("Locking table");
TableMetadataLock l(table.get(), TableMetadataLock::WRITE);
if (l.data().is_deleted()) {
Status s = Status::NotFound("The table was deleted", l.data().pb.state_msg());
SetupError(resp->mutable_error(), MasterErrorPB::TABLE_NOT_FOUND, s);
return s;
}
TRACE("Modifying in-memory table state")
string deletion_msg = "Table deleted at " + LocalTimeAsString();
l.mutable_data()->set_state(SysTablesEntryPB::REMOVED, deletion_msg);
// 2. Look up the tablets, lock them, and mark them as deleted.
{
ScopedTabletInfoCommitter committer(ScopedTabletInfoCommitter::UNLOCKED);
TRACE("Locking tablets");
vector<scoped_refptr<TabletInfo>> tablets;
table->GetAllTablets(&tablets);
committer.AddTablets(tablets);
committer.LockTabletsForWriting();
vector<TabletInfo*> tablets_raw;
for (const auto& t : committer) {
t->mutable_metadata()->mutable_dirty()->set_state(
SysTabletsEntryPB::DELETED, deletion_msg);
tablets_raw.push_back(t.get());
}
// 3. Update sys-catalog with the removed table and tablet state.
TRACE("Removing table and tablets from system table");
SysCatalogTable::Actions actions;
actions.table_to_update = table.get();
actions.tablets_to_update = tablets_raw;
Status s = sys_catalog_->Write(actions);
if (!s.ok()) {
s = s.CloneAndPrepend(Substitute("An error occurred while updating sys tables: $0",
s.ToString()));
LOG(WARNING) << s.ToString();
CheckIfNoLongerLeaderAndSetupError(s, resp);
committer.Abort();
return s;
}
// The operation has been written to sys-catalog; now it must succeed.
// 4. Remove the table from the by-name map.
{
TRACE("Removing table from by-name map");
std::lock_guard<LockType> l_map(lock_);
if (table_names_map_.erase(l.data().name()) != 1) {
PANIC_RPC(rpc, "Could not remove table from map, name=" + l.data().name());
}
}
// 5. Commit the dirty tablet state (on end of scope).
}
// 6. Commit the dirty table state.
TRACE("Committing in-memory state");
l.Commit();
// 7. Abort any extant tasks belonging to the table.
TRACE("Aborting table tasks");
table->AbortTasks();
// 8. Send a DeleteTablet() request to each tablet replica in the table.
SendDeleteTableRequest(table, deletion_msg);
LOG(INFO) << "Successfully deleted table " << table->ToString()
<< " per request from " << RequestorString(rpc);
return Status::OK();
}
Status CatalogManager::ApplyAlterSchemaSteps(const SysTablesEntryPB& current_pb,
vector<AlterTableRequestPB::Step> steps,
Schema* new_schema,
ColumnId* next_col_id) {
const SchemaPB& current_schema_pb = current_pb.schema();
Schema cur_schema;
RETURN_NOT_OK(SchemaFromPB(current_schema_pb, &cur_schema));
SchemaBuilder builder(cur_schema);
if (current_pb.has_next_column_id()) {
builder.set_next_column_id(ColumnId(current_pb.next_column_id()));
}
for (const auto& step : steps) {
switch (step.type()) {
case AlterTableRequestPB::ADD_COLUMN: {
if (!step.has_add_column()) {
return Status::InvalidArgument("ADD_COLUMN missing column info");
}
// Verify that encoding is appropriate for the new column's
// type
ColumnSchemaPB new_col_pb = step.add_column().schema();
if (new_col_pb.has_id()) {
return Status::InvalidArgument("column $0: client should not specify column ID",
new_col_pb.ShortDebugString());
}
ColumnSchema new_col = ColumnSchemaFromPB(new_col_pb);
const TypeEncodingInfo *dummy;
RETURN_NOT_OK(TypeEncodingInfo::Get(new_col.type_info(),
new_col.attributes().encoding,
&dummy));
// can't accept a NOT NULL column without read default
if (!new_col.is_nullable() && !new_col.has_read_default()) {
return Status::InvalidArgument(
Substitute("column `$0`: NOT NULL columns must have a default", new_col.name()));
}
RETURN_NOT_OK(builder.AddColumn(new_col, false));
break;
}
case AlterTableRequestPB::DROP_COLUMN: {
if (!step.has_drop_column()) {
return Status::InvalidArgument("DROP_COLUMN missing column info");
}
if (cur_schema.is_key_column(step.drop_column().name())) {
return Status::InvalidArgument("cannot remove a key column");
}
RETURN_NOT_OK(builder.RemoveColumn(step.drop_column().name()));
break;
}
case AlterTableRequestPB::RENAME_COLUMN: {
if (!step.has_rename_column()) {
return Status::InvalidArgument("RENAME_COLUMN missing column info");
}
// TODO: In theory we can rename a key
if (cur_schema.is_key_column(step.rename_column().old_name())) {
return Status::InvalidArgument("cannot rename a key column");
}
RETURN_NOT_OK(builder.RenameColumn(
step.rename_column().old_name(),
step.rename_column().new_name()));
break;
}
// TODO: EDIT_COLUMN
default: {
return Status::InvalidArgument("Invalid alter schema step type", step.DebugString());
}
}
}
*new_schema = builder.Build();
*next_col_id = builder.next_column_id();
return Status::OK();
}
Status CatalogManager::ApplyAlterPartitioningSteps(
const TableMetadataLock& l,
TableInfo* table,
const Schema& client_schema,
vector<AlterTableRequestPB::Step> steps,
vector<scoped_refptr<TabletInfo>>* tablets_to_add,
vector<scoped_refptr<TabletInfo>>* tablets_to_drop) {
Schema schema;
RETURN_NOT_OK(SchemaFromPB(l.data().pb.schema(), &schema));
PartitionSchema partition_schema;
RETURN_NOT_OK(PartitionSchema::FromPB(l.data().pb.partition_schema(), schema, &partition_schema));
map<string, TabletInfo*> existing_tablets = table->tablet_map();
map<string, scoped_refptr<TabletInfo>> new_tablets;
for (const auto& step : steps) {
vector<DecodedRowOperation> ops;
if (step.type() == AlterTableRequestPB::ADD_RANGE_PARTITION) {
RowOperationsPBDecoder decoder(&step.add_range_partition().range_bounds(),
&client_schema, &schema, nullptr);
RETURN_NOT_OK(decoder.DecodeOperations(&ops));
} else {
CHECK_EQ(step.type(), AlterTableRequestPB::DROP_RANGE_PARTITION);
RowOperationsPBDecoder decoder(&step.drop_range_partition().range_bounds(),
&client_schema, &schema, nullptr);
RETURN_NOT_OK(decoder.DecodeOperations(&ops));
}
if (ops.size() != 2) {
return Status::InvalidArgument("expected two row operations for alter range partition step",
step.ShortDebugString());
}
if (ops[0].type != RowOperationsPB::RANGE_LOWER_BOUND ||
ops[1].type != RowOperationsPB::RANGE_UPPER_BOUND) {
return Status::InvalidArgument(
"expected a lower bound and upper bound row op for alter range partition step",
strings::Substitute("$0, $1", ops[0].ToString(schema), ops[1].ToString(schema)));
}
vector<Partition> partitions;
RETURN_NOT_OK(partition_schema.CreatePartitions({}, {{ *ops[0].split_row, *ops[1].split_row }},
schema, &partitions));
switch (step.type()) {
case AlterTableRequestPB::ADD_RANGE_PARTITION: {
for (const Partition& partition : partitions) {
const string& lower_bound = partition.partition_key_start();
const string& upper_bound = partition.partition_key_end();
// Check that the new tablet doesn't overlap with the existing tablets.
// Iter points at the tablet directly *after* the lower bound (or to
// existing_tablets.end(), if such a tablet does not exist).
auto existing_iter = existing_tablets.upper_bound(lower_bound);
if (existing_iter != existing_tablets.end()) {
TabletMetadataLock metadata(existing_iter->second, TabletMetadataLock::READ);
if (metadata.data().pb.partition().partition_key_start() < upper_bound) {
return Status::NotFound("New partition conflicts with existing partition",
step.ShortDebugString());
}
}
if (existing_iter != existing_tablets.begin()) {
TabletMetadataLock metadata(std::prev(existing_iter)->second, TabletMetadataLock::READ);
if (metadata.data().pb.partition().partition_key_end() > lower_bound) {
return Status::NotFound("New partition conflicts with existing partition",
step.ShortDebugString());
}
}
// Check that the new tablet doesn't overlap with any other new tablets.
auto new_iter = new_tablets.upper_bound(lower_bound);
if (new_iter != new_tablets.end()) {
const auto& metadata = new_iter->second->mutable_metadata()->dirty();
if (metadata.pb.partition().partition_key_start() < upper_bound) {
return Status::NotFound("New partition conflicts with another new partition",
step.ShortDebugString());
}
}
if (new_iter != new_tablets.begin()) {
const auto& metadata = std::prev(new_iter)->second->mutable_metadata()->dirty();
if (metadata.pb.partition().partition_key_end() > lower_bound) {
return Status::NotFound("New partition conflicts with another new partition",
step.ShortDebugString());
}
}
PartitionPB partition_pb;
partition.ToPB(&partition_pb);
new_tablets.emplace(lower_bound, CreateTabletInfo(table, partition_pb));
}
break;
}
case AlterTableRequestPB::DROP_RANGE_PARTITION: {
for (const Partition& partition : partitions) {
const string& lower_bound = partition.partition_key_start();
const string& upper_bound = partition.partition_key_end();
// Iter points to the tablet if it exists, or the next tablet, or the end.
auto existing_iter = existing_tablets.lower_bound(lower_bound);
auto new_iter = new_tablets.lower_bound(lower_bound);
bool found_existing = false;
bool found_new = false;
if (existing_iter != existing_tablets.end()) {
TabletMetadataLock metadata(existing_iter->second, TabletMetadataLock::READ);
const auto& partition = metadata.data().pb.partition();
found_existing = partition.partition_key_start() == lower_bound ||
partition.partition_key_end() == upper_bound;
}
if (new_iter != new_tablets.end()) {
const auto& partition = new_iter->second->mutable_metadata()->dirty().pb.partition();
found_new = partition.partition_key_start() == lower_bound ||
partition.partition_key_end() == upper_bound;
}
DCHECK(!found_existing || !found_new);
if (found_existing) {
tablets_to_drop->emplace_back(existing_iter->second);
existing_tablets.erase(existing_iter);
} else if (found_new) {
new_tablets.erase(new_iter);
} else {
return Status::NotFound("No tablet found for drop partition step",
step.ShortDebugString());
}
}
break;
}
default: {
return Status::InvalidArgument("Unknown alter table partitioning step",
step.ShortDebugString());
}
}
}
for (auto& tablet : new_tablets) {
tablets_to_add->emplace_back(std::move(tablet.second));
}
return Status::OK();
}
Status CatalogManager::AlterTable(const AlterTableRequestPB* req,
AlterTableResponsePB* resp,
rpc::RpcContext* rpc) {
leader_lock_.AssertAcquiredForReading();
RETURN_NOT_OK(CheckOnline());
LOG(INFO) << "Servicing AlterTable request from " << RequestorString(rpc)
<< ": " << req->ShortDebugString();
// 1. Group the steps into schema altering steps and partition altering steps.
vector<AlterTableRequestPB::Step> alter_schema_steps;
vector<AlterTableRequestPB::Step> alter_partitioning_steps;
for (const auto& step : req->alter_schema_steps()) {
switch (step.type()) {
case AlterTableRequestPB::ADD_COLUMN:
case AlterTableRequestPB::DROP_COLUMN:
case AlterTableRequestPB::RENAME_COLUMN: {
alter_schema_steps.emplace_back(step);
break;
}
case AlterTableRequestPB::ADD_RANGE_PARTITION:
case AlterTableRequestPB::DROP_RANGE_PARTITION: {
alter_partitioning_steps.emplace_back(step);
break;
}
case AlterTableRequestPB::ALTER_COLUMN:
case AlterTableRequestPB::UNKNOWN: {
return Status::InvalidArgument("Invalid alter step type", step.ShortDebugString());
}
}
}
// 2. Look up the table, verify that it exists, and lock it for modification.
TRACE("Looking up table");
scoped_refptr<TableInfo> table;
RETURN_NOT_OK(FindTable(req->table(), &table));
if (table == nullptr) {
Status s = Status::NotFound("The table does not exist", req->table().DebugString());
SetupError(resp->mutable_error(), MasterErrorPB::TABLE_NOT_FOUND, s);
return s;
}
TRACE("Locking table");
TableMetadataLock l(table.get(), TableMetadataLock::WRITE);
if (l.data().is_deleted()) {
Status s = Status::NotFound("The table was deleted", l.data().pb.state_msg());
SetupError(resp->mutable_error(), MasterErrorPB::TABLE_NOT_FOUND, s);
return s;
}
// 3. Having locked the table, look it up again, in case we raced with another
// AlterTable() that renamed our table.
{
scoped_refptr<TableInfo> table_again;
CHECK_OK(FindTable(req->table(), &table_again));
if (table_again == nullptr) {
Status s = Status::NotFound("The table does not exist", req->table().DebugString());
SetupError(resp->mutable_error(), MasterErrorPB::TABLE_NOT_FOUND, s);
return s;
}
}
string table_name = l.data().name();
// 4. Calculate new schema for the on-disk state, not persisted yet.
Schema new_schema;
ColumnId next_col_id = ColumnId(l.data().pb.next_column_id());
if (!alter_schema_steps.empty()) {
TRACE("Apply alter schema");
Status s = ApplyAlterSchemaSteps(l.data().pb, alter_schema_steps, &new_schema, &next_col_id);
if (!s.ok()) {
SetupError(resp->mutable_error(), MasterErrorPB::INVALID_SCHEMA, s);
return s;
}
DCHECK_NE(next_col_id, 0);
DCHECK_EQ(new_schema.find_column_by_id(next_col_id),
static_cast<int>(Schema::kColumnNotFound));
}
// 5. Try to acquire the new table name.
if (req->has_new_table_name()) {
std::lock_guard<LockType> catalog_lock(lock_);
TRACE("Acquired catalog manager lock");
// Verify that the table does not exist.
scoped_refptr<TableInfo> other_table = FindPtrOrNull(table_names_map_, req->new_table_name());
if (other_table != nullptr) {
Status s = Status::AlreadyPresent(Substitute("Table $0 already exists with id $1",
req->new_table_name(), other_table->id()));
SetupError(resp->mutable_error(), MasterErrorPB::TABLE_ALREADY_PRESENT, s);
return s;
}
// Reserve the new table name if possible.
if (!InsertIfNotPresent(&reserved_table_names_, req->new_table_name())) {
Status s = Status::ServiceUnavailable(Substitute(
"Table name $0 is already reserved", req->new_table_name()));
SetupError(resp->mutable_error(), MasterErrorPB::TABLE_NOT_FOUND, s);
return s;
}
l.mutable_data()->pb.set_name(req->new_table_name());
}
// Ensure that we drop our reservation upon return.
auto cleanup = MakeScopedCleanup([&] () {
if (req->has_new_table_name()) {
std::lock_guard<LockType> l(lock_);
CHECK_EQ(1, reserved_table_names_.erase(req->new_table_name()));
}
});
// 6. Alter table partitioning.
vector<scoped_refptr<TabletInfo>> tablets_to_add;
vector<scoped_refptr<TabletInfo>> tablets_to_drop;
if (!alter_partitioning_steps.empty()) {
TRACE("Apply alter partitioning");
Schema client_schema;
RETURN_NOT_OK(SchemaFromPB(req->schema(), &client_schema));
Status s = ApplyAlterPartitioningSteps(l, table.get(), client_schema, alter_partitioning_steps,
&tablets_to_add, &tablets_to_drop);
if (!s.ok()) {
SetupError(resp->mutable_error(), MasterErrorPB::UNKNOWN_ERROR, s);
return s;
}
}
// Set to true if columns are altered, added or dropped.
bool has_schema_changes = !alter_schema_steps.empty();
// Set to true if there are schema changes, or the table is renamed.
bool has_metadata_changes = has_schema_changes || req->has_new_table_name();
// Set to true if there are partitioning changes.
bool has_partitioning_changes = !alter_partitioning_steps.empty();
// Set to true if metadata changes need to be applied to existing tablets.
bool has_metadata_changes_for_existing_tablets =
has_metadata_changes && table->num_tablets() > tablets_to_drop.size();
// Skip empty requests...
if (!has_metadata_changes && !has_partitioning_changes) {
return Status::OK();
}
// 7. Serialize the schema and increment the version number.
if (has_metadata_changes_for_existing_tablets && !l.data().pb.has_fully_applied_schema()) {
l.mutable_data()->pb.mutable_fully_applied_schema()->CopyFrom(l.data().pb.schema());
}
if (has_schema_changes) {
CHECK_OK(SchemaToPB(new_schema, l.mutable_data()->pb.mutable_schema()));
}
if (has_metadata_changes) {
l.mutable_data()->pb.set_version(l.mutable_data()->pb.version() + 1);
l.mutable_data()->pb.set_next_column_id(next_col_id);
}
if (!tablets_to_add.empty() || has_metadata_changes_for_existing_tablets) {
// If some tablet schemas need to be updated or there are any new tablets,
// set the table state to ALTERING, so that IsAlterTableDone RPCs will wait
// for the schema updates and tablets to be running.
l.mutable_data()->set_state(SysTablesEntryPB::ALTERING,
Substitute("Alter Table version=$0 ts=$1",
l.mutable_data()->pb.version(),
LocalTimeAsString()));
}
// 8. Update sys-catalog with the new table schema and tablets to add/drop.
TRACE("Updating metadata on disk");
string deletion_msg = "Partition dropped at " + LocalTimeAsString();
SysCatalogTable::Actions actions;
if (!tablets_to_add.empty() || has_metadata_changes) {
// If anything modified the table's persistent metadata, then sync it to the sys catalog.
actions.table_to_update = table.get();
}
for (const auto& tablet : tablets_to_add) {
actions.tablets_to_add.push_back(tablet.get());
}
ScopedTabletInfoCommitter tablets_to_add_committer(ScopedTabletInfoCommitter::LOCKED);
ScopedTabletInfoCommitter tablets_to_drop_committer(ScopedTabletInfoCommitter::UNLOCKED);
tablets_to_add_committer.AddTablets(tablets_to_add);
tablets_to_drop_committer.AddTablets(tablets_to_drop);
tablets_to_drop_committer.LockTabletsForWriting();
for (auto& tablet : tablets_to_drop) {
tablet->mutable_metadata()->mutable_dirty()->set_state(SysTabletsEntryPB::DELETED,
deletion_msg);
actions.tablets_to_update.push_back(tablet.get());
}
Status s = sys_catalog_->Write(actions);
if (!s.ok()) {
s = s.CloneAndPrepend(
Substitute("An error occurred while updating sys-catalog tables entry: $0",
s.ToString()));
LOG(WARNING) << s.ToString();
CheckIfNoLongerLeaderAndSetupError(s, resp);
tablets_to_add_committer.Abort();
tablets_to_drop_committer.Abort();
return s;
}
// 9. Commit the in-memory state.
{
TRACE("Committing alterations to in-memory state");
// Commit new tablet in-memory state. This doesn't require taking the global
// lock since the new tablets are not yet visible, because they haven't been
// added to the table or tablet index.
tablets_to_add_committer.Commit();
// Take the global catalog manager lock in order to modify the global table
// and tablets indices.
std::lock_guard<LockType> lock(lock_);
if (req->has_new_table_name()) {
if (table_names_map_.erase(table_name) != 1) {
PANIC_RPC(rpc, Substitute(
"Could not remove table (name $0) from map", table_name));
}
InsertOrDie(&table_names_map_, req->new_table_name(), table);
}
// Insert new tablets into the global tablet map. After this, the tablets
// will be visible in GetTabletLocations RPCs.
for (const auto& tablet : tablets_to_add) {
InsertOrDie(&tablet_map_, tablet->tablet_id(), std::move(tablet));
}
}
// Add the new tablets to and remove the dropped tablets from the table. This
// makes the tablets visible to GetTableLocations RPCs. This doesn't need to
// happen under the global lock, since:
// * clients cannot know the new tablet IDs yet, so GetTabletLocations RPCs
// for them are impossible.
// * the new tablets cannot heartbeat yet, since they don't get created
// until further down.
table->AddRemoveTablets(tablets_to_add, tablets_to_drop);
// Commit state change for dropped tablets. This comes after removing the
// tablets from their associated tables so that if a GetTableLocations or
// GetTabletLocations returns a deleted tablet, the retry will never include
// the tablet again.
tablets_to_drop_committer.Commit();
if (!tablets_to_add.empty() || has_metadata_changes) {
l.Commit();
} else {
l.Unlock();
}
SendAlterTableRequest(table);
for (const auto& tablet : tablets_to_drop) {
TabletMetadataLock l(tablet.get(), TabletMetadataLock::READ);
SendDeleteTabletRequest(tablet, l, deletion_msg);
}
background_tasks_->Wake();
return Status::OK();
}
Status CatalogManager::IsAlterTableDone(const IsAlterTableDoneRequestPB* req,
IsAlterTableDoneResponsePB* resp,
rpc::RpcContext* rpc) {
leader_lock_.AssertAcquiredForReading();
RETURN_NOT_OK(CheckOnline());
scoped_refptr<TableInfo> table;
// 1. Look up the table and verify that it exists.
TRACE("Looking up table");
RETURN_NOT_OK(FindTable(req->table(), &table));
if (table == nullptr) {
Status s = Status::NotFound("The table does not exist", req->table().DebugString());
SetupError(resp->mutable_error(), MasterErrorPB::TABLE_NOT_FOUND, s);
return s;
}
TRACE("Locking table");
TableMetadataLock l(table.get(), TableMetadataLock::READ);
RETURN_NOT_OK(CheckIfTableDeletedOrNotRunning(&l, resp));
// 2. Check whether the alter is still in progress.
TRACE("Verify if there is an alter operation in progress for $0", table->ToString());
resp->set_schema_version(l.data().pb.version());
resp->set_done(l.data().pb.state() != SysTablesEntryPB::ALTERING);
return Status::OK();
}
Status CatalogManager::GetTableSchema(const GetTableSchemaRequestPB* req,
GetTableSchemaResponsePB* resp) {
leader_lock_.AssertAcquiredForReading();
RETURN_NOT_OK(CheckOnline());
scoped_refptr<TableInfo> table;
// 1. Look up the table and verify that it exists.
TRACE("Looking up table");
RETURN_NOT_OK(FindTable(req->table(), &table));
if (table == nullptr) {
Status s = Status::NotFound("The table does not exist", req->table().DebugString());
SetupError(resp->mutable_error(), MasterErrorPB::TABLE_NOT_FOUND, s);
return s;
}
TRACE("Locking table");
TableMetadataLock l(table.get(), TableMetadataLock::READ);
RETURN_NOT_OK(CheckIfTableDeletedOrNotRunning(&l, resp));
if (l.data().pb.has_fully_applied_schema()) {
// An AlterTable is in progress; fully_applied_schema is the last
// schema that has reached every TS.
CHECK_EQ(SysTablesEntryPB::ALTERING, l.data().pb.state());
resp->mutable_schema()->CopyFrom(l.data().pb.fully_applied_schema());
} else {
// There's no AlterTable in progress, so the regular schema is "fully applied".
resp->mutable_schema()->CopyFrom(l.data().pb.schema());
}
resp->set_num_replicas(l.data().pb.num_replicas());
resp->set_table_id(table->id());
resp->mutable_partition_schema()->CopyFrom(l.data().pb.partition_schema());
resp->set_create_table_done(!table->IsCreateInProgress());
resp->set_table_name(l.data().pb.name());
return Status::OK();
}
Status CatalogManager::ListTables(const ListTablesRequestPB* req,
ListTablesResponsePB* resp) {
leader_lock_.AssertAcquiredForReading();
RETURN_NOT_OK(CheckOnline());
shared_lock<LockType> l(lock_);
for (const TableInfoMap::value_type& entry : table_names_map_) {
TableMetadataLock ltm(entry.second.get(), TableMetadataLock::READ);
if (!ltm.data().is_running()) continue; // implies !is_deleted() too
if (req->has_name_filter()) {
size_t found = ltm.data().name().find(req->name_filter());
if (found == string::npos) {
continue;
}
}
ListTablesResponsePB::TableInfo *table = resp->add_tables();
table->set_id(entry.second->id());
table->set_name(ltm.data().name());
}
return Status::OK();
}
bool CatalogManager::GetTableInfo(const string& table_id, scoped_refptr<TableInfo> *table) {
shared_lock<LockType> l(lock_);
*table = FindPtrOrNull(table_ids_map_, table_id);
return *table != nullptr;
}
void CatalogManager::GetAllTables(std::vector<scoped_refptr<TableInfo> > *tables) {
tables->clear();
shared_lock<LockType> l(lock_);
for (const TableInfoMap::value_type& e : table_ids_map_) {
tables->push_back(e.second);
}
}
bool CatalogManager::TableNameExists(const string& table_name) {
shared_lock<LockType> l(lock_);
return table_names_map_.find(table_name) != table_names_map_.end();
}
void CatalogManager::NotifyTabletDeleteSuccess(const string& permanent_uuid,
const string& tablet_id) {
// TODO: Clean up the stale deleted tablet data once all relevant tablet
// servers have responded that they have removed the remnants of the deleted
// tablet.
}
Status CatalogManager::ProcessTabletReport(TSDescriptor* ts_desc,
const TabletReportPB& report,
TabletReportUpdatesPB *report_update,
RpcContext* rpc) {
TRACE_EVENT2("master", "ProcessTabletReport",
"requestor", rpc->requestor_string(),
"num_tablets", report.updated_tablets_size());
leader_lock_.AssertAcquiredForReading();
if (VLOG_IS_ON(2)) {
VLOG(2) << "Received tablet report from " <<
RequestorString(rpc) << ": " << report.DebugString();
}
// TODO: on a full tablet report, we may want to iterate over the tablets we think
// the server should have, compare vs the ones being reported, and somehow mark
// any that have been "lost" (eg somehow the tablet metadata got corrupted or something).
for (const ReportedTabletPB& reported : report.updated_tablets()) {
ReportedTabletUpdatesPB *tablet_report = report_update->add_tablets();
tablet_report->set_tablet_id(reported.tablet_id());
RETURN_NOT_OK_PREPEND(HandleReportedTablet(ts_desc, reported, tablet_report),
Substitute("Error handling $0", reported.ShortDebugString()));
}
if (report.updated_tablets_size() > 0) {
background_tasks_->WakeIfHasPendingUpdates();
}
return Status::OK();
}
namespace {
// Return true if receiving 'report' for a tablet in CREATING state should
// transition it to the RUNNING state.
bool ShouldTransitionTabletToRunning(const ReportedTabletPB& report) {
if (report.state() != tablet::RUNNING) return false;
// In many tests, we disable leader election, so newly created tablets
// will never elect a leader on their own. In this case, we transition
// to RUNNING as soon as we get a single report.
if (!FLAGS_catalog_manager_wait_for_new_tablets_to_elect_leader) {
return true;
}
// Otherwise, we only transition to RUNNING once a leader is elected.
return report.committed_consensus_state().has_leader_uuid();
}
} // anonymous namespace
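// Processes a single reported tablet from a tablet server heartbeat:
// unknown or deleted tablets are answered with a DeleteTablet request, stale
// schema versions trigger an AlterTable retry, and reported committed
// consensus state changes are validated and persisted to the sys catalog.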
Status CatalogManager::HandleReportedTablet(TSDescriptor* ts_desc,
const ReportedTabletPB& report,
ReportedTabletUpdatesPB *report_updates) {
TRACE_EVENT1("master", "HandleReportedTablet",
"tablet_id", report.tablet_id());
scoped_refptr<TabletInfo> tablet;
{
shared_lock<LockType> l(lock_);
tablet = FindPtrOrNull(tablet_map_, report.tablet_id());
}
if (!tablet) {
// It'd be unsafe to ask the tserver to delete this tablet without first
// replicating something to our followers (i.e. to guarantee that we're the
// leader). For example, if we were a rogue master, we might be deleting a
// tablet created by a new master accidentally. But masters retain metadata
// for deleted tablets forever, so a tablet can only be truly unknown in
// the event of a serious misconfiguration, such as a tserver heartbeating
// to the wrong cluster. Therefore, it should be reasonable to ignore it
// and wait for an operator to fix the situation.
if (FLAGS_catalog_manager_delete_orphaned_tablets) {
LOG(INFO) << "Deleting unknown tablet " << report.tablet_id();
SendDeleteReplicaRequest(report.tablet_id(), TABLET_DATA_DELETED,
boost::none, nullptr, ts_desc->permanent_uuid(),
"Report from unknown tablet");
} else {
LOG(WARNING) << "Ignoring report from unknown tablet: "
<< report.tablet_id();
}
return Status::OK();
}
DCHECK(tablet->table()); // guaranteed by TabletLoader
VLOG(3) << "tablet report: " << report.ShortDebugString();
// TODO: we don't actually need to do the COW here until we see we're going
// to change the state. Can we change CowedObject to lazily do the copy?
TableMetadataLock table_lock(tablet->table().get(), TableMetadataLock::READ);
TabletMetadataLock tablet_lock(tablet.get(), TabletMetadataLock::WRITE);
// If the TS is reporting a tablet which has been deleted, or a tablet from
// a table which has been deleted, send it an RPC to delete it.
if (tablet_lock.data().is_deleted() ||
table_lock.data().is_deleted()) {
report_updates->set_state_msg(tablet_lock.data().pb.state_msg());
const string msg = tablet_lock.data().pb.state_msg();
LOG(INFO) << "Got report from deleted tablet " << tablet->ToString()
<< " (" << msg << "): Sending delete request for this tablet";
// TODO: Cancel tablet creation, instead of deleting, in cases where
// that might be possible (tablet creation timeout & replacement).
SendDeleteReplicaRequest(tablet->tablet_id(), TABLET_DATA_DELETED,
boost::none, tablet->table(),
ts_desc->permanent_uuid(), msg);
return Status::OK();
}
if (!table_lock.data().is_running()) {
LOG(INFO) << "Got report from tablet " << tablet->tablet_id()
<< " for non-running table " << tablet->table()->ToString() << ": "
<< tablet_lock.data().pb.state_msg();
report_updates->set_state_msg(tablet_lock.data().pb.state_msg());
return Status::OK();
}
// Check if the tablet requires an "alter table" call
bool tablet_needs_alter = false;
if (report.has_schema_version() &&
table_lock.data().pb.version() != report.schema_version()) {
if (report.schema_version() > table_lock.data().pb.version()) {
LOG(ERROR) << "TS " << ts_desc->permanent_uuid()
<< " has reported a schema version greater than the current one"
<< " for tablet " << tablet->ToString()
<< ". Expected version " << table_lock.data().pb.version()
<< " got " << report.schema_version()
<< " (corruption)";
} else {
LOG(INFO) << "TS " << ts_desc->permanent_uuid()
<< " does not have the latest schema for tablet " << tablet->ToString()
<< ". Expected version " << table_lock.data().pb.version()
<< " got " << report.schema_version();
}
// It's possible that the tablet being reported is a laggy replica, and in fact
// the leader has already received an AlterTable RPC. That's OK, though --
// it'll safely ignore it if we send another.
tablet_needs_alter = true;
}
if (report.has_error()) {
Status s = StatusFromPB(report.error());
DCHECK(!s.ok());
DCHECK_EQ(report.state(), tablet::FAILED);
LOG(WARNING) << "Tablet " << tablet->ToString() << " has failed on TS "
<< ts_desc->permanent_uuid() << ": " << s.ToString();
return Status::OK();
}
// The report will not have a committed_consensus_state if it is in the
// middle of starting up, such as during tablet bootstrap.
if (report.has_committed_consensus_state()) {
const ConsensusStatePB& prev_cstate = tablet_lock.data().pb.committed_consensus_state();
ConsensusStatePB cstate = report.committed_consensus_state();
// Check if we got a report from a tablet that is no longer part of the raft
// config. If so, tombstone it. We only tombstone replicas that include a
// committed raft config in their report that has an opid_index strictly
// less than the latest reported committed config, and (obviously) who are
// not members of the latest config. This prevents us from spuriously
// deleting replicas that have just been added to a pending config and are
// in the process of catching up to the log entry where they were added to
// the config.
if (FLAGS_master_tombstone_evicted_tablet_replicas &&
cstate.config().opid_index() < prev_cstate.config().opid_index() &&
!IsRaftConfigMember(ts_desc->permanent_uuid(), prev_cstate.config())) {
SendDeleteReplicaRequest(report.tablet_id(), TABLET_DATA_TOMBSTONED,
prev_cstate.config().opid_index(),
tablet->table(), ts_desc->permanent_uuid(),
Substitute("Replica from old config with index $0 (latest is $1)",
cstate.config().opid_index(),
prev_cstate.config().opid_index()));
return Status::OK();
}
// If the tablet was not RUNNING, and we have a leader elected, mark it as RUNNING.
// We need to wait for a leader before marking a tablet as RUNNING, or else we
// could incorrectly consider a tablet created when only a minority of its replicas
// were successful. In that case, the tablet would be stuck in this bad state
// forever.
if (!tablet_lock.data().is_running() && ShouldTransitionTabletToRunning(report)) {
DCHECK_EQ(SysTabletsEntryPB::CREATING, tablet_lock.data().pb.state())
<< "Tablet in unexpected state: " << tablet->ToString()
<< ": " << tablet_lock.data().pb.ShortDebugString();
// Mark the tablet as running
// TODO: we could batch the IO onto a background thread, or at least
// across multiple tablets in the same report.
VLOG(1) << "Tablet " << tablet->ToString() << " is now online";
tablet_lock.mutable_data()->set_state(SysTabletsEntryPB::RUNNING,
"Tablet reported with an active leader");
}
// The Master only accepts committed consensus configurations, since it needs the
// committed opid_index in order to cache only the most up-to-date config.
if (PREDICT_FALSE(!cstate.config().has_opid_index())) {
LOG(DFATAL) << "Missing opid_index in reported config:\n" << report.DebugString();
return Status::InvalidArgument("Missing opid_index in reported config");
}
bool modified_cstate = false;
if (cstate.config().opid_index() > prev_cstate.config().opid_index() ||
(cstate.has_leader_uuid() &&
(!prev_cstate.has_leader_uuid() || cstate.current_term() > prev_cstate.current_term()))) {
// When a config change is reported to the master, it may not include the
// leader because the follower doing the reporting may not know who the
// leader is yet (it may have just started up). If the reported config
// has the same term as the previous config, and the leader was
// previously known for the current term, then retain knowledge of that
// leader even if it wasn't reported in the latest config.
if (cstate.current_term() == prev_cstate.current_term()) {
if (!cstate.has_leader_uuid() && prev_cstate.has_leader_uuid()) {
cstate.set_leader_uuid(prev_cstate.leader_uuid());
modified_cstate = true;
// Sanity check to detect consensus divergence bugs.
} else if (cstate.has_leader_uuid() && prev_cstate.has_leader_uuid() &&
cstate.leader_uuid() != prev_cstate.leader_uuid()) {
string msg = Substitute("Previously reported cstate for tablet $0 gave "
"a different leader for term $1 than the current cstate. "
"Previous cstate: $2. Current cstate: $3.",
tablet->ToString(), cstate.current_term(),
prev_cstate.ShortDebugString(), cstate.ShortDebugString());
LOG(DFATAL) << msg;
return Status::InvalidArgument(msg);
}
}
// If a replica is reporting a new consensus configuration, update the
// master's copy of that configuration.
LOG(INFO) << "Tablet: " << tablet->tablet_id() << " reported consensus state change."
<< " New consensus state: " << cstate.ShortDebugString();
// If we need to change the report, copy the whole thing on the stack
// rather than const-casting.
const ReportedTabletPB* final_report = &report;
ReportedTabletPB updated_report;
if (modified_cstate) {
updated_report = report;
*updated_report.mutable_committed_consensus_state() = cstate;
final_report = &updated_report;
}
VLOG(2) << "Updating consensus configuration for tablet "
<< final_report->tablet_id()
<< " from config reported by " << ts_desc->permanent_uuid()
<< " to that committed in log index "
<< final_report->committed_consensus_state().config().opid_index()
<< " with leader state from term "
<< final_report->committed_consensus_state().current_term();
RETURN_NOT_OK(HandleRaftConfigChanged(*final_report, tablet,
&tablet_lock, &table_lock));
}
}
table_lock.Unlock();
// We update the tablet each time it is reported.
// This shouldn't be very frequent, and should only happen when something has in fact changed.
SysCatalogTable::Actions actions;
actions.tablets_to_update.push_back(tablet.get());
Status s = sys_catalog_->Write(actions);
if (!s.ok()) {
LOG(WARNING) << "Error updating tablets: " << s.ToString() << ". Tablet report was: "
<< report.ShortDebugString();
return s;
}
tablet_lock.Commit();
// Need to defer the AlterTable command to after we've committed the new tablet data,
// since the tablet report may also be updating the raft config, and the Alter Table
// request needs to know who the most recent leader is.
if (tablet_needs_alter) {
SendAlterTabletRequest(tablet);
} else if (report.has_schema_version()) {
HandleTabletSchemaVersionReport(tablet.get(), report.schema_version());
}
return Status::OK();
}
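// Applies a newly reported committed consensus state to the tablet's metadata
// (the caller persists and commits the change), tombstones replicas that were
// evicted from the config, and kicks off an AddServer task if the config is
// under-replicated.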
Status CatalogManager::HandleRaftConfigChanged(
const ReportedTabletPB& report,
const scoped_refptr<TabletInfo>& tablet,
TabletMetadataLock* tablet_lock,
TableMetadataLock* table_lock) {
DCHECK(tablet_lock->is_write_locked());
ConsensusStatePB prev_cstate = tablet_lock->mutable_data()->pb.committed_consensus_state();
const ConsensusStatePB& cstate = report.committed_consensus_state();
*tablet_lock->mutable_data()->pb.mutable_committed_consensus_state() = cstate;
if (FLAGS_master_tombstone_evicted_tablet_replicas) {
unordered_set<string> current_member_uuids;
for (const consensus::RaftPeerPB& peer : cstate.config().peers()) {
InsertOrDie(&current_member_uuids, peer.permanent_uuid());
}
// Send a DeleteTablet() request to peers that are not in the new config.
for (const consensus::RaftPeerPB& prev_peer : prev_cstate.config().peers()) {
const string& peer_uuid = prev_peer.permanent_uuid();
if (!ContainsKey(current_member_uuids, peer_uuid)) {
SendDeleteReplicaRequest(report.tablet_id(), TABLET_DATA_TOMBSTONED,
prev_cstate.config().opid_index(), tablet->table(), peer_uuid,
Substitute("TS $0 not found in new config with opid_index $1",
peer_uuid, cstate.config().opid_index()));
}
}
}
// If the config is under-replicated, add a server to the config.
if (FLAGS_master_add_server_when_underreplicated &&
CountVoters(cstate.config()) < table_lock->data().pb.num_replicas()) {
SendAddServerRequest(tablet, cstate);
}
return Status::OK();
}
Status CatalogManager::GetTabletPeer(const string& tablet_id,
scoped_refptr<TabletPeer>* tablet_peer) const {
// Note: CatalogManager has only one table, 'sys_catalog', with only
// one tablet.
shared_lock<LockType> l(lock_);
if (!sys_catalog_) {
return Status::ServiceUnavailable("Systable not yet initialized");
}
if (sys_catalog_->tablet_id() == tablet_id) {
*tablet_peer = sys_catalog_->tablet_peer();
} else {
return Status::NotFound(Substitute("no SysTable exists with tablet_id $0 in CatalogManager",
tablet_id));
}
return Status::OK();
}
const NodeInstancePB& CatalogManager::NodeInstance() const {
return master_->instance_pb();
}
Status CatalogManager::StartRemoteBootstrap(
const StartRemoteBootstrapRequestPB& req,
boost::optional<kudu::tserver::TabletServerErrorPB::Code>* error_code) {
return Status::NotSupported("Remote bootstrap not yet implemented for the master tablet");
}
// Interface used by RetryingTSRpcTask to pick the tablet server to
// send the next RPC to.
class TSPicker {
public:
TSPicker() {}
virtual ~TSPicker() {}
// Sets *ts_uuid to the uuid of the tserver to contact for the next RPC.
virtual Status PickReplica(string* ts_uuid) = 0;
private:
DISALLOW_COPY_AND_ASSIGN(TSPicker);
};
// Implementation of TSPicker which sends to a specific tablet server,
// identified by its UUID.
class PickSpecificUUID : public TSPicker {
public:
explicit PickSpecificUUID(string ts_uuid)
: ts_uuid_(std::move(ts_uuid)) {}
virtual Status PickReplica(string* ts_uuid) OVERRIDE {
// Just a straight passthrough.
*ts_uuid = ts_uuid_;
return Status::OK();
}
private:
const string ts_uuid_;
DISALLOW_COPY_AND_ASSIGN(PickSpecificUUID);
};
// Implementation of TSPicker which locates the current leader replica,
// and sends the RPC to that server.
class PickLeaderReplica : public TSPicker {
public:
explicit PickLeaderReplica(const scoped_refptr<TabletInfo>& tablet) :
tablet_(tablet) {
}
virtual Status PickReplica(string* ts_uuid) OVERRIDE {
TabletMetadataLock l(tablet_.get(), TabletMetadataLock::READ);
string err_msg;
if (!l.data().pb.has_committed_consensus_state()) {
// The tablet is still in the PREPARING state and has no replicas.
err_msg = Substitute("Tablet $0 has no consensus state",
tablet_->tablet_id());
} else if (!l.data().pb.committed_consensus_state().has_leader_uuid()) {
// The tablet may be in the midst of a leader election.
err_msg = Substitute("Tablet $0 consensus state has no leader",
tablet_->tablet_id());
} else {
*ts_uuid = l.data().pb.committed_consensus_state().leader_uuid();
return Status::OK();
}
return Status::NotFound("No leader found", err_msg);
}
private:
const scoped_refptr<TabletInfo> tablet_;
};
// A background task which continuously retries sending an RPC to a tablet server.
//
// The target tablet server is refreshed before each RPC by consulting the provided
// TSPicker implementation.
class RetryingTSRpcTask : public MonitoredTask {
public:
RetryingTSRpcTask(Master *master,
gscoped_ptr<TSPicker> replica_picker,
const scoped_refptr<TableInfo>& table)
: master_(master),
replica_picker_(std::move(replica_picker)),
table_(table),
start_ts_(MonoTime::Now(MonoTime::FINE)),
attempt_(0),
state_(kStateRunning) {
deadline_ = start_ts_;
deadline_.AddDelta(MonoDelta::FromMilliseconds(FLAGS_unresponsive_ts_rpc_timeout_ms));
}
// Send the subclass RPC request.
Status Run() {
if (PREDICT_FALSE(FLAGS_catalog_manager_fail_ts_rpcs)) {
MarkFailed();
UnregisterAsyncTask(); // May delete this.
return Status::RuntimeError("Async RPCs configured to fail");
}
Status s = ResetTSProxy();
if (!s.ok()) {
LOG(WARNING) << "Unable to reset TS proxy: " << s.ToString();
MarkFailed();
UnregisterAsyncTask(); // May delete this.
return s.CloneAndPrepend("Failed to reset TS proxy");
}
// Calculate and set the timeout deadline.
MonoTime timeout = MonoTime::Now(MonoTime::FINE);
timeout.AddDelta(MonoDelta::FromMilliseconds(FLAGS_master_ts_rpc_timeout_ms));
const MonoTime& deadline = MonoTime::Earliest(timeout, deadline_);
rpc_.set_deadline(deadline);
if (!SendRequest(++attempt_)) {
if (!RescheduleWithBackoffDelay()) {
UnregisterAsyncTask(); // May call 'delete this'.
}
}
return Status::OK();
}
// Abort this task.
virtual void Abort() OVERRIDE {
MarkAborted();
}
virtual State state() const OVERRIDE {
return static_cast<State>(NoBarrier_Load(&state_));
}
virtual MonoTime start_timestamp() const OVERRIDE { return start_ts_; }
virtual MonoTime completion_timestamp() const OVERRIDE { return end_ts_; }
protected:
// Send an RPC request and register a callback.
// The implementation must return true if the callback was registered, and
// false if an error occurred and no callback will occur.
virtual bool SendRequest(int attempt) = 0;
// Handle the response from the RPC request. On success, MarkComplete() must
// be called to mutate the state_ variable. If retry is desired, then
// no state change is made. Retries will automatically be attempted as long
// as the state is kStateRunning and deadline_ has not yet passed.
//
// Runs on the reactor thread, so must not block or perform any IO.
virtual void HandleResponse(int attempt) = 0;
// Return the id of the tablet that is the subject of the async request.
virtual string tablet_id() const = 0;
// Overridable log prefix with reasonable default.
virtual string LogPrefix() const {
return Substitute("$0: ", description());
}
// Transition from running -> complete.
void MarkComplete() {
NoBarrier_CompareAndSwap(&state_, kStateRunning, kStateComplete);
}
// Transition from running -> aborted.
void MarkAborted() {
NoBarrier_CompareAndSwap(&state_, kStateRunning, kStateAborted);
}
// Transition from running -> failed.
void MarkFailed() {
NoBarrier_CompareAndSwap(&state_, kStateRunning, kStateFailed);
}
// Callback meant to be invoked from asynchronous RPC service proxy calls.
//
// Runs on a reactor thread, so should not block or do any IO.
void RpcCallback() {
if (!rpc_.status().ok()) {
LOG(WARNING) << "TS " << target_ts_desc_->permanent_uuid() << ": "
<< type_name() << " RPC failed for tablet "
<< tablet_id() << ": " << rpc_.status().ToString();
} else if (state() != kStateAborted) {
HandleResponse(attempt_); // Modifies state_.
}
// Schedule a retry if the RPC call was not successful.
if (RescheduleWithBackoffDelay()) {
return;
}
UnregisterAsyncTask(); // May call 'delete this'.
}
Master * const master_;
const gscoped_ptr<TSPicker> replica_picker_;
const scoped_refptr<TableInfo> table_;
MonoTime start_ts_;
MonoTime end_ts_;
MonoTime deadline_;
int attempt_;
rpc::RpcController rpc_;
TSDescriptor* target_ts_desc_;
shared_ptr<tserver::TabletServerAdminServiceProxy> ts_proxy_;
shared_ptr<consensus::ConsensusServiceProxy> consensus_proxy_;
private:
// Reschedules the current task after a backoff delay.
// Returns false if the task was not rescheduled due to reaching the maximum
// timeout or because the task is no longer in a running state.
// Returns true if rescheduling the task was successful.
bool RescheduleWithBackoffDelay() {
if (state() != kStateRunning) return false;
MonoTime now = MonoTime::Now(MonoTime::FINE);
// We assume it might take 10ms to process the request in the best case;
// fail if we have less than that amount of time remaining.
int64_t millis_remaining = deadline_.GetDeltaSince(now).ToMilliseconds() - 10;
// Exponential backoff with jitter.
int64_t base_delay_ms;
if (attempt_ <= 12) {
base_delay_ms = 1 << (attempt_ + 3); // 1st retry delayed 2^4 ms, 2nd 2^5, etc.
} else {
base_delay_ms = 60 * 1000; // cap at 1 minute
}
int64_t jitter_ms = rand() % 50; // Add up to 50ms of additional random delay.
int64_t delay_millis = std::min<int64_t>(base_delay_ms + jitter_ms, millis_remaining);
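// With this scheme the first retry waits ~16ms (2^4) plus jitter, the second
// ~32ms, and so on, capped at one minute per attempt and bounded by the time
// remaining before 'deadline_'.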
if (delay_millis <= 0) {
LOG(WARNING) << "Request timed out: " << description();
MarkFailed();
} else {
MonoTime new_start_time = now;
new_start_time.AddDelta(MonoDelta::FromMilliseconds(delay_millis));
LOG(INFO) << "Scheduling retry of " << description() << " with a delay"
<< " of " << delay_millis << "ms (attempt = " << attempt_ << ")...";
master_->messenger()->ScheduleOnReactor(
boost::bind(&RetryingTSRpcTask::RunDelayedTask, this, _1),
MonoDelta::FromMilliseconds(delay_millis));
return true;
}
return false;
}
// Callback for Reactor delayed task mechanism. Called either when it is time
// to execute the delayed task (with status == OK) or when the task
// is cancelled, i.e. when the scheduling timer is shut down (status != OK).
void RunDelayedTask(const Status& status) {
if (!status.ok()) {
LOG(WARNING) << "Async tablet task " << description() << " failed or was cancelled: "
<< status.ToString();
UnregisterAsyncTask(); // May delete this.
return;
}
string desc = description(); // Save in case we need to log after deletion.
Status s = Run(); // May delete this.
if (!s.ok()) {
LOG(WARNING) << "Async tablet task " << desc << " failed: " << s.ToString();
}
}
// Clean up request and release resources. May call 'delete this'.
void UnregisterAsyncTask() {
end_ts_ = MonoTime::Now(MonoTime::FINE);
if (table_ != nullptr) {
table_->RemoveTask(this);
} else {
// This is a floating task (since the table does not exist)
// created in response to a tablet report.
Release(); // May call "delete this";
}
}
Status ResetTSProxy() {
// TODO: if there is no replica available, should we still keep the task running?
string ts_uuid;
RETURN_NOT_OK(replica_picker_->PickReplica(&ts_uuid));
shared_ptr<TSDescriptor> ts_desc;
if (!master_->ts_manager()->LookupTSByUUID(ts_uuid, &ts_desc)) {
return Status::NotFound(Substitute("Could not find TS for UUID $0",
ts_uuid));
}
// This assumes that TSDescriptors are never deleted by the master,
// so the task need not take ownership of the returned pointer.
target_ts_desc_ = ts_desc.get();
shared_ptr<tserver::TabletServerAdminServiceProxy> ts_proxy;
RETURN_NOT_OK(target_ts_desc_->GetTSAdminProxy(master_->messenger(), &ts_proxy));
ts_proxy_.swap(ts_proxy);
shared_ptr<consensus::ConsensusServiceProxy> consensus_proxy;
RETURN_NOT_OK(target_ts_desc_->GetConsensusProxy(master_->messenger(), &consensus_proxy));
consensus_proxy_.swap(consensus_proxy);
rpc_.Reset();
return Status::OK();
}
// Use state() and MarkX() accessors.
AtomicWord state_;
};
// RetryingTSRpcTask subclass which always retries the same tablet server,
// identified by its UUID.
class RetrySpecificTSRpcTask : public RetryingTSRpcTask {
public:
RetrySpecificTSRpcTask(Master* master,
const string& permanent_uuid,
const scoped_refptr<TableInfo>& table)
: RetryingTSRpcTask(master,
gscoped_ptr<TSPicker>(new PickSpecificUUID(permanent_uuid)),
table),
permanent_uuid_(permanent_uuid) {
}
protected:
const string permanent_uuid_;
};
// Fire off the async create tablet.
// This requires that the new tablet info is locked for write, and the
// consensus configuration information has been filled into the 'dirty' data.
class AsyncCreateReplica : public RetrySpecificTSRpcTask {
public:
// The tablet lock must be acquired for reading before making this call.
AsyncCreateReplica(Master *master,
const string& permanent_uuid,
const scoped_refptr<TabletInfo>& tablet,
const TabletMetadataLock& tablet_lock)
: RetrySpecificTSRpcTask(master, permanent_uuid, tablet->table()),
tablet_id_(tablet->tablet_id()) {
deadline_ = start_ts_;
deadline_.AddDelta(MonoDelta::FromMilliseconds(FLAGS_tablet_creation_timeout_ms));
TableMetadataLock table_lock(tablet->table().get(), TableMetadataLock::READ);
req_.set_dest_uuid(permanent_uuid);
req_.set_table_id(tablet->table()->id());
req_.set_tablet_id(tablet->tablet_id());
req_.mutable_partition()->CopyFrom(tablet_lock.data().pb.partition());
req_.set_table_name(table_lock.data().pb.name());
req_.mutable_schema()->CopyFrom(table_lock.data().pb.schema());
req_.mutable_partition_schema()->CopyFrom(
table_lock.data().pb.partition_schema());
req_.mutable_config()->CopyFrom(
tablet_lock.data().pb.committed_consensus_state().config());
}
virtual string type_name() const OVERRIDE { return "Create Tablet"; }
virtual string description() const OVERRIDE {
return "CreateTablet RPC for tablet " + tablet_id_ + " on TS " + permanent_uuid_;
}
protected:
virtual string tablet_id() const OVERRIDE { return tablet_id_; }
virtual void HandleResponse(int attempt) OVERRIDE {
if (!resp_.has_error()) {
MarkComplete();
} else {
Status s = StatusFromPB(resp_.error().status());
if (s.IsAlreadyPresent()) {
LOG(INFO) << "CreateTablet RPC for tablet " << tablet_id_
<< " on TS " << permanent_uuid_ << " returned already present: "
<< s.ToString();
MarkComplete();
} else {
LOG(WARNING) << "CreateTablet RPC for tablet " << tablet_id_
<< " on TS " << permanent_uuid_ << " failed: " << s.ToString();
}
}
}
virtual bool SendRequest(int attempt) OVERRIDE {
VLOG(1) << "Send create tablet request to " << permanent_uuid_ << ":\n"
<< " (attempt " << attempt << "):\n"
<< req_.DebugString();
ts_proxy_->CreateTabletAsync(req_, &resp_, &rpc_,
boost::bind(&AsyncCreateReplica::RpcCallback, this));
return true;
}
private:
const string tablet_id_;
tserver::CreateTabletRequestPB req_;
tserver::CreateTabletResponsePB resp_;
};
// Send a DeleteTablet() RPC request.
class AsyncDeleteReplica : public RetrySpecificTSRpcTask {
public:
AsyncDeleteReplica(
Master* master, const string& permanent_uuid,
const scoped_refptr<TableInfo>& table, std::string tablet_id,
TabletDataState delete_type,
boost::optional<int64_t> cas_config_opid_index_less_or_equal,
string reason)
: RetrySpecificTSRpcTask(master, permanent_uuid, table),
tablet_id_(std::move(tablet_id)),
delete_type_(delete_type),
cas_config_opid_index_less_or_equal_(
std::move(cas_config_opid_index_less_or_equal)),
reason_(std::move(reason)) {}
virtual string type_name() const OVERRIDE { return "Delete Tablet"; }
virtual string description() const OVERRIDE {
return tablet_id_ + " Delete Tablet RPC for TS=" + permanent_uuid_;
}
protected:
virtual string tablet_id() const OVERRIDE { return tablet_id_; }
virtual void HandleResponse(int attempt) OVERRIDE {
if (resp_.has_error()) {
Status status = StatusFromPB(resp_.error().status());
// Do not retry on a fatal error
TabletServerErrorPB::Code code = resp_.error().code();
switch (code) {
case TabletServerErrorPB::TABLET_NOT_FOUND:
LOG(WARNING) << "TS " << permanent_uuid_ << ": delete failed for tablet " << tablet_id_
<< " because the tablet was not found. No further retry: "
<< status.ToString();
MarkComplete();
break;
case TabletServerErrorPB::CAS_FAILED:
LOG(WARNING) << "TS " << permanent_uuid_ << ": delete failed for tablet " << tablet_id_
<< " due to a CAS failure. No further retry: " << status.ToString();
MarkComplete();
break;
default:
LOG(WARNING) << "TS " << permanent_uuid_ << ": delete failed for tablet " << tablet_id_
<< " with error code " << TabletServerErrorPB::Code_Name(code)
<< ": " << status.ToString();
break;
}
} else {
master_->catalog_manager()->NotifyTabletDeleteSuccess(permanent_uuid_, tablet_id_);
if (table_) {
LOG(INFO) << "TS " << permanent_uuid_ << ": tablet " << tablet_id_
<< " (table " << table_->ToString() << ") successfully deleted";
} else {
LOG(WARNING) << "TS " << permanent_uuid_ << ": tablet " << tablet_id_
<< " did not belong to a known table, but was successfully deleted";
}
MarkComplete();
VLOG(1) << "TS " << permanent_uuid_ << ": delete complete on tablet " << tablet_id_;
}
}
virtual bool SendRequest(int attempt) OVERRIDE {
tserver::DeleteTabletRequestPB req;
req.set_dest_uuid(permanent_uuid_);
req.set_tablet_id(tablet_id_);
req.set_reason(reason_);
req.set_delete_type(delete_type_);
if (cas_config_opid_index_less_or_equal_) {
req.set_cas_config_opid_index_less_or_equal(*cas_config_opid_index_less_or_equal_);
}
VLOG(1) << "Send delete tablet request to " << permanent_uuid_
<< " (attempt " << attempt << "):\n"
<< req.DebugString();
ts_proxy_->DeleteTabletAsync(req, &resp_, &rpc_,
boost::bind(&AsyncDeleteReplica::RpcCallback, this));
return true;
}
const std::string tablet_id_;
const TabletDataState delete_type_;
const boost::optional<int64_t> cas_config_opid_index_less_or_equal_;
const std::string reason_;
tserver::DeleteTabletResponsePB resp_;
};
// Sends the "Alter Table" request with the latest table schema to the leader
// replica for the tablet.
// Keeps retrying until we get an "ok" response, which means either:
// - the alter completed, or
// - the tablet already has a newer schema version
// (which may happen in case of concurrent alters, or in case a previous attempt timed
// out but was actually applied).
class AsyncAlterTable : public RetryingTSRpcTask {
public:
AsyncAlterTable(Master *master,
const scoped_refptr<TabletInfo>& tablet)
: RetryingTSRpcTask(master,
gscoped_ptr<TSPicker>(new PickLeaderReplica(tablet)),
tablet->table()),
tablet_(tablet) {
}
virtual string type_name() const OVERRIDE { return "Alter Table"; }
virtual string description() const OVERRIDE {
return tablet_->ToString() + " Alter Table RPC";
}
private:
virtual string tablet_id() const OVERRIDE { return tablet_->tablet_id(); }
string permanent_uuid() const {
return target_ts_desc_->permanent_uuid();
}
virtual void HandleResponse(int attempt) OVERRIDE {
if (resp_.has_error()) {
Status status = StatusFromPB(resp_.error().status());
// Do not retry on a fatal error
switch (resp_.error().code()) {
case TabletServerErrorPB::TABLET_NOT_FOUND:
case TabletServerErrorPB::MISMATCHED_SCHEMA:
case TabletServerErrorPB::TABLET_HAS_A_NEWER_SCHEMA:
LOG(WARNING) << "TS " << permanent_uuid() << ": alter failed for tablet "
<< tablet_->ToString() << " no further retry: " << status.ToString();
MarkComplete();
break;
default:
LOG(WARNING) << "TS " << permanent_uuid() << ": alter failed for tablet "
<< tablet_->ToString() << ": " << status.ToString();
break;
}
} else {
MarkComplete();
VLOG(1) << "TS " << permanent_uuid() << ": alter complete on tablet " << tablet_->ToString();
}
if (state() != kStateComplete) {
VLOG(1) << "Still waiting for other tablets to finish ALTER";
}
}
virtual bool SendRequest(int attempt) OVERRIDE {
TableMetadataLock l(tablet_->table().get(), TableMetadataLock::READ);
tserver::AlterSchemaRequestPB req;
req.set_dest_uuid(permanent_uuid());
req.set_tablet_id(tablet_->tablet_id());
req.set_new_table_name(l.data().pb.name());
req.set_schema_version(l.data().pb.version());
req.mutable_schema()->CopyFrom(l.data().pb.schema());
l.Unlock();
VLOG(1) << "Send alter table request to " << permanent_uuid()
<< " (attempt " << attempt << "):\n"
<< req.DebugString();
ts_proxy_->AlterSchemaAsync(req, &resp_, &rpc_,
boost::bind(&AsyncAlterTable::RpcCallback, this));
return true;
}
scoped_refptr<TabletInfo> tablet_;
tserver::AlterSchemaResponsePB resp_;
};
namespace {
// Select a random TS not in the 'exclude_uuids' list.
// Will not select tablet servers that have not heartbeated recently.
// Returns true iff it was possible to select a replica.
bool SelectRandomTSForReplica(const TSDescriptorVector& ts_descs,
const unordered_set<string>& exclude_uuids,
shared_ptr<TSDescriptor>* selection) {
TSDescriptorVector tablet_servers;
for (const shared_ptr<TSDescriptor>& ts : ts_descs) {
if (!ContainsKey(exclude_uuids, ts->permanent_uuid())) {
tablet_servers.push_back(ts);
}
}
if (tablet_servers.empty()) {
return false;
}
*selection = tablet_servers[rand() % tablet_servers.size()];
return true;
}
} // anonymous namespace
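// Task that sends an ADD_SERVER ChangeConfig() request to the leader of an
// under-replicated tablet's config, proposing a randomly selected live tablet
// server (not already in the config) as a new VOTER.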
class AsyncAddServerTask : public RetryingTSRpcTask {
public:
AsyncAddServerTask(Master *master,
const scoped_refptr<TabletInfo>& tablet,
const ConsensusStatePB& cstate)
: RetryingTSRpcTask(master,
gscoped_ptr<TSPicker>(new PickLeaderReplica(tablet)),
tablet->table()),
tablet_(tablet),
cstate_(cstate) {
deadline_ = MonoTime::Max(); // Never time out.
}
virtual string type_name() const OVERRIDE { return "AddServer ChangeConfig"; }
virtual string description() const OVERRIDE {
return Substitute("AddServer ChangeConfig RPC for tablet $0 on peer $1 "
"with cas_config_opid_index $2",
tablet_->tablet_id(), permanent_uuid(), cstate_.config().opid_index());
}
protected:
virtual bool SendRequest(int attempt) OVERRIDE;
virtual void HandleResponse(int attempt) OVERRIDE;
private:
virtual string tablet_id() const OVERRIDE { return tablet_->tablet_id(); }
string permanent_uuid() const {
return target_ts_desc_->permanent_uuid();
}
const scoped_refptr<TabletInfo> tablet_;
const ConsensusStatePB cstate_;
consensus::ChangeConfigRequestPB req_;
consensus::ChangeConfigResponsePB resp_;
};
bool AsyncAddServerTask::SendRequest(int attempt) {
// Bail if we're retrying in vain.
int64_t latest_index;
{
TabletMetadataLock tablet_lock(tablet_.get(), TabletMetadataLock::READ);
latest_index = tablet_lock.data().pb.committed_consensus_state().config().opid_index();
}
if (latest_index > cstate_.config().opid_index()) {
LOG_WITH_PREFIX(INFO) << "Latest config has opid_index of " << latest_index
<< " while this task has opid_index of "
<< cstate_.config().opid_index() << ". Aborting task.";
MarkAborted();
return false;
}
// Select the replica we wish to add to the config.
// Do not include current members of the config.
unordered_set<string> replica_uuids;
for (const RaftPeerPB& peer : cstate_.config().peers()) {
InsertOrDie(&replica_uuids, peer.permanent_uuid());
}
TSDescriptorVector ts_descs;
master_->ts_manager()->GetAllLiveDescriptors(&ts_descs);
shared_ptr<TSDescriptor> replacement_replica;
if (PREDICT_FALSE(!SelectRandomTSForReplica(ts_descs, replica_uuids, &replacement_replica))) {
KLOG_EVERY_N(WARNING, 100) << LogPrefix() << "No candidate replacement replica found "
<< "for tablet " << tablet_->ToString();
return false;
}
req_.set_dest_uuid(permanent_uuid());
req_.set_tablet_id(tablet_->tablet_id());
req_.set_type(consensus::ADD_SERVER);
req_.set_cas_config_opid_index(cstate_.config().opid_index());
RaftPeerPB* peer = req_.mutable_server();
peer->set_permanent_uuid(replacement_replica->permanent_uuid());
TSRegistrationPB peer_reg;
replacement_replica->GetRegistration(&peer_reg);
if (peer_reg.rpc_addresses_size() == 0) {
KLOG_EVERY_N(WARNING, 100) << LogPrefix() << "Candidate replacement "
<< replacement_replica->permanent_uuid()
<< " has no registered rpc address: "
<< peer_reg.ShortDebugString();
return false;
}
*peer->mutable_last_known_addr() = peer_reg.rpc_addresses(0);
peer->set_member_type(RaftPeerPB::VOTER);
VLOG(1) << "Sending AddServer ChangeConfig request to " << permanent_uuid() << ":\n"
<< req_.DebugString();
consensus_proxy_->ChangeConfigAsync(req_, &resp_, &rpc_,
boost::bind(&AsyncAddServerTask::RpcCallback, this));
return true;
}
void AsyncAddServerTask::HandleResponse(int attempt) {
if (!resp_.has_error()) {
MarkComplete();
LOG_WITH_PREFIX(INFO) << "Change config succeeded";
return;
}
Status status = StatusFromPB(resp_.error().status());
// Do not retry on a CAS error, otherwise retry forever or until cancelled.
switch (resp_.error().code()) {
case TabletServerErrorPB::CAS_FAILED:
LOG_WITH_PREFIX(WARNING) << "ChangeConfig() failed with leader " << permanent_uuid()
<< " due to CAS failure. No further retry: "
<< status.ToString();
MarkFailed();
break;
default:
LOG_WITH_PREFIX(INFO) << "ChangeConfig() failed with leader " << permanent_uuid()
<< " due to error "
<< TabletServerErrorPB::Code_Name(resp_.error().code())
<< ". This operation will be retried. Error detail: "
<< status.ToString();
break;
}
}
void CatalogManager::SendAlterTableRequest(const scoped_refptr<TableInfo>& table) {
vector<scoped_refptr<TabletInfo> > tablets;
table->GetAllTablets(&tablets);
for (const scoped_refptr<TabletInfo>& tablet : tablets) {
SendAlterTabletRequest(tablet);
}
}
void CatalogManager::SendAlterTabletRequest(const scoped_refptr<TabletInfo>& tablet) {
auto call = new AsyncAlterTable(master_, tablet);
tablet->table()->AddTask(call);
WARN_NOT_OK(call->Run(), "Failed to send alter table request");
}
void CatalogManager::SendDeleteTableRequest(const scoped_refptr<TableInfo>& table,
const string& deletion_msg) {
vector<scoped_refptr<TabletInfo> > tablets;
table->GetAllTablets(&tablets);
for (const scoped_refptr<TabletInfo>& tablet : tablets) {
TabletMetadataLock l(tablet.get(), TabletMetadataLock::READ);
SendDeleteTabletRequest(tablet, l, deletion_msg);
}
}
void CatalogManager::SendDeleteTabletRequest(const scoped_refptr<TabletInfo>& tablet,
const TabletMetadataLock& tablet_lock,
const string& deletion_msg) {
if (!tablet_lock.data().pb.has_committed_consensus_state()) {
// We could end up here if we're deleting a tablet that never made it to
// the CREATING state. That would mean no replicas were ever assigned, so
// there's nothing to delete.
LOG(INFO) << "Not sending DeleteTablet requests; no consensus state for tablet "
<< tablet->tablet_id();
return;
}
const ConsensusStatePB& cstate = tablet_lock.data().pb.committed_consensus_state();
LOG(INFO) << "Sending DeleteTablet for " << cstate.config().peers().size()
<< " replicas of tablet " << tablet->tablet_id();
for (const auto& peer : cstate.config().peers()) {
SendDeleteReplicaRequest(tablet->tablet_id(), TABLET_DATA_DELETED,
boost::none, tablet->table(),
peer.permanent_uuid(), deletion_msg);
}
}
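// Creates and runs an AsyncDeleteReplica task for a single replica. If 'table'
// is null (e.g. the tablet is unknown to the master), the task is kept alive
// with an extra reference rather than being tracked by a table.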
void CatalogManager::SendDeleteReplicaRequest(
const std::string& tablet_id,
TabletDataState delete_type,
const boost::optional<int64_t>& cas_config_opid_index_less_or_equal,
const scoped_refptr<TableInfo>& table,
const string& ts_uuid,
const string& reason) {
LOG_WITH_PREFIX(INFO) << Substitute("Deleting tablet $0 on peer $1 "
"with delete type $2 ($3)",
tablet_id, ts_uuid,
TabletDataState_Name(delete_type),
reason);
AsyncDeleteReplica* call =
new AsyncDeleteReplica(master_, ts_uuid, table,
tablet_id, delete_type, cas_config_opid_index_less_or_equal,
reason);
if (table != nullptr) {
table->AddTask(call);
} else {
// This is a floating task (since the table does not exist)
// created in response to a tablet report.
call->AddRef();
}
WARN_NOT_OK(call->Run(), "Failed to send delete tablet request");
}
void CatalogManager::SendAddServerRequest(const scoped_refptr<TabletInfo>& tablet,
const ConsensusStatePB& cstate) {
auto task = new AsyncAddServerTask(master_, tablet, cstate);
tablet->table()->AddTask(task);
WARN_NOT_OK(task->Run(), "Failed to send new AddServer request");
// We can't access 'task' here because it may delete itself inside Run() in the
// case that the tablet has no known leader.
LOG(INFO) << "Started AddServer task for tablet " << tablet->tablet_id();
}
void CatalogManager::ExtractTabletsToProcess(
vector<scoped_refptr<TabletInfo>>* tablets_to_process) {
shared_lock<LockType> l(lock_);
// TODO: At the moment we loop through all the tablets. We could keep a set
// of tablets waiting for "assignment", or just a counter, to avoid taking
// the lock and looping through the tablets if everything is "stable".
// 'tablets_to_process' elements must be partially ordered in the same way as
// table->GetAllTablets(); see the locking rules at the top of the file.
for (const auto& table_entry : table_ids_map_) {
scoped_refptr<TableInfo> table = table_entry.second;
TableMetadataLock table_lock(table.get(), TableMetadataLock::READ);
if (table_lock.data().is_deleted()) {
continue;
}
vector<scoped_refptr<TabletInfo>> tablets;
table->GetAllTablets(&tablets);
for (const auto& tablet : tablets) {
TabletMetadataLock tablet_lock(tablet.get(), TabletMetadataLock::READ);
if (tablet_lock.data().is_deleted() ||
tablet_lock.data().is_running()) {
continue;
}
tablets_to_process->push_back(tablet);
}
}
}
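// Accumulates the catalog mutations and CreateTablet RPCs produced while
// walking the pending tablets, so that ProcessPendingAssignments() can
// persist and dispatch them in one batch.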
struct DeferredAssignmentActions {
vector<TabletInfo*> tablets_to_add;
vector<TabletInfo*> tablets_to_update;
vector<TabletInfo*> needs_create_rpc;
};
void CatalogManager::HandleAssignPreparingTablet(TabletInfo* tablet,
DeferredAssignmentActions* deferred) {
// The tablet was just created (probably by a CreateTable RPC).
// Update the state to "creating" to be ready for the creation request.
tablet->mutable_metadata()->mutable_dirty()->set_state(
SysTabletsEntryPB::CREATING, "Sending initial creation of tablet");
deferred->tablets_to_update.push_back(tablet);
deferred->needs_create_rpc.push_back(tablet);
VLOG(1) << "Assign new tablet " << tablet->ToString();
}
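// Handles a tablet stuck in the CREATING state: if the creation timeout has
// expired, marks the old tablet REPLACED and schedules a replacement tablet
// (returned via 'new_tablets') for creation.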
void CatalogManager::HandleAssignCreatingTablet(TabletInfo* tablet,
DeferredAssignmentActions* deferred,
vector<scoped_refptr<TabletInfo> >* new_tablets) {
MonoDelta time_since_updated =
MonoTime::Now(MonoTime::FINE).GetDeltaSince(tablet->last_create_tablet_time());
int64_t remaining_timeout_ms =
FLAGS_tablet_creation_timeout_ms - time_since_updated.ToMilliseconds();
// Skip the tablet if the assignment timeout is not yet expired
if (remaining_timeout_ms > 0) {
VLOG(2) << "Tablet " << tablet->ToString() << " still being created. "
<< remaining_timeout_ms << "ms remain until timeout.";
return;
}
const PersistentTabletInfo& old_info = tablet->metadata().state();
// The tablet creation request was already sent, but we didn't receive an answer
// within the timeout, so the tablet will be replaced by a new one.
scoped_refptr<TabletInfo> replacement = CreateTabletInfo(tablet->table().get(),
old_info.pb.partition());
LOG(WARNING) << "Tablet " << tablet->ToString() << " was not created within "
<< "the allowed timeout. Replacing with a new tablet "
<< replacement->tablet_id();
// Mark old tablet as replaced.
tablet->mutable_metadata()->mutable_dirty()->set_state(
SysTabletsEntryPB::REPLACED,
Substitute("Replaced by $0 at $1",
replacement->tablet_id(), LocalTimeAsString()));
// Mark new tablet as being created.
replacement->mutable_metadata()->mutable_dirty()->set_state(
SysTabletsEntryPB::CREATING,
Substitute("Replacement for $0", tablet->tablet_id()));
deferred->tablets_to_update.push_back(tablet);
deferred->tablets_to_add.push_back(replacement.get());
deferred->needs_create_rpc.push_back(replacement.get());
VLOG(1) << "Replaced tablet " << tablet->tablet_id()
<< " with " << replacement->tablet_id()
<< " (Table " << tablet->table()->ToString() << ")";
new_tablets->emplace_back(std::move(replacement));
}
// TODO: we could batch the IO onto a background thread,
// but this follows the behavior of the current HandleReportedTablet().
Status CatalogManager::HandleTabletSchemaVersionReport(TabletInfo *tablet, uint32_t version) {
// Update the schema version if it's the latest
tablet->set_reported_schema_version(version);
// Check whether the alter has completed, i.e. all tablets have reported the latest schema version.
TableInfo *table = tablet->table().get();
TableMetadataLock l(table, TableMetadataLock::WRITE);
if (l.data().is_deleted() || l.data().pb.state() != SysTablesEntryPB::ALTERING) {
return Status::OK();
}
uint32_t current_version = l.data().pb.version();
if (table->IsAlterInProgress(current_version)) {
return Status::OK();
}
// Update the state from altering to running and remove the last fully
// applied schema (if it exists).
l.mutable_data()->pb.clear_fully_applied_schema();
l.mutable_data()->set_state(SysTablesEntryPB::RUNNING,
Substitute("Current schema version=$0", current_version));
SysCatalogTable::Actions actions;
actions.table_to_update = table;
Status s = sys_catalog_->Write(actions);
if (!s.ok()) {
LOG(WARNING) << "An error occurred while updating sys-tables: " << s.ToString();
return s;
}
l.Commit();
LOG(INFO) << table->ToString() << " - Alter table completed version=" << current_version;
return Status::OK();
}
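// Walks the given non-running tablets, collects the required state changes and
// replacement tablets into DeferredAssignmentActions, selects replicas for
// tablets that need them, persists everything to the sys catalog in one write,
// and finally sends the corresponding DeleteTablet/CreateTablet RPCs.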
Status CatalogManager::ProcessPendingAssignments(
const std::vector<scoped_refptr<TabletInfo> >& tablets) {
VLOG(1) << "Processing pending assignments";
// Take write locks on all tablets to be processed, and ensure that they are
// unlocked at the end of this scope.
ScopedTabletInfoCommitter unlocker_in(ScopedTabletInfoCommitter::UNLOCKED);
unlocker_in.AddTablets(tablets);
unlocker_in.LockTabletsForWriting();
// Any tablets created by the helper functions will also be created in a
// locked state, so we must ensure they are unlocked before we return to
// avoid deadlocks.
ScopedTabletInfoCommitter unlocker_out(ScopedTabletInfoCommitter::LOCKED);
DeferredAssignmentActions deferred;
// Iterate over each of the tablets and handle it, whatever state
// it may be in. The actions required for the tablet are collected
// into 'deferred'.
for (const scoped_refptr<TabletInfo>& tablet : tablets) {
SysTabletsEntryPB::State t_state = tablet->metadata().state().pb.state();
switch (t_state) {
case SysTabletsEntryPB::PREPARING:
HandleAssignPreparingTablet(tablet.get(), &deferred);
break;
case SysTabletsEntryPB::CREATING:
{
vector<scoped_refptr<TabletInfo>> new_tablets;
HandleAssignCreatingTablet(tablet.get(), &deferred, &new_tablets);
unlocker_out.AddTablets(new_tablets);
break;
}
default:
VLOG(2) << "Nothing to do for tablet " << tablet->tablet_id() << ": state = "
<< SysTabletsEntryPB_State_Name(t_state);
break;
}
}
// Nothing to do
if (deferred.tablets_to_add.empty() &&
deferred.tablets_to_update.empty() &&
deferred.needs_create_rpc.empty()) {
return Status::OK();
}
// For those tablets which need to be created in this round, assign replicas.
TSDescriptorVector ts_descs;
master_->ts_manager()->GetAllLiveDescriptors(&ts_descs);
Status s;
for (TabletInfo *tablet : deferred.needs_create_rpc) {
// NOTE: if we fail to select replicas on the first pass (due to
// insufficient Tablet Servers being online), we will still try
// again unless the tablet/table creation is cancelled.
s = SelectReplicasForTablet(ts_descs, tablet);
if (!s.ok()) {
s = s.CloneAndPrepend(Substitute(
"An error occurred while selecting replicas for tablet $0",
tablet->tablet_id()));
break;
}
}
// Update the sys catalog with the new set of tablets/metadata.
if (s.ok()) {
SysCatalogTable::Actions actions;
actions.tablets_to_add = deferred.tablets_to_add;
actions.tablets_to_update = deferred.tablets_to_update;
s = sys_catalog_->Write(actions);
if (!s.ok()) {
s = s.CloneAndPrepend("An error occurred while persisting the updated tablet metadata");
}
}
if (!s.ok()) {
LOG(WARNING) << "Aborting the current task due to error: " << s.ToString();
// If there was an error, abort any mutations started by the
// current task.
unlocker_out.Abort();
unlocker_in.Abort();
return s;
}
// Expose tablet metadata changes before the new tablets themselves.
unlocker_out.Commit();
unlocker_in.Commit();
{
std::lock_guard<LockType> l(lock_);
for (const auto& new_tablet : unlocker_out) {
new_tablet->table()->AddTablet(new_tablet.get());
tablet_map_[new_tablet->tablet_id()] = new_tablet;
}
}
// Send DeleteTablet requests to tablet servers serving deleted tablets.
// This is asynchronous / non-blocking.
for (TabletInfo* tablet : deferred.tablets_to_update) {
TabletMetadataLock l(tablet, TabletMetadataLock::READ);
if (l.data().is_deleted()) {
SendDeleteTabletRequest(tablet, l, l.data().pb.state_msg());
}
}
// Send the CreateTablet() requests to the servers. This is asynchronous / non-blocking.
for (TabletInfo* tablet : deferred.needs_create_rpc) {
TabletMetadataLock l(tablet, TabletMetadataLock::READ);
SendCreateTabletRequest(tablet, l);
}
return Status::OK();
}
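// Fills in the tablet's committed consensus state with a fresh Raft config
// whose voters are chosen from 'ts_descs' via SelectReplicas(). Fails if fewer
// live tablet servers are available than the table's replication factor.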
Status CatalogManager::SelectReplicasForTablet(const TSDescriptorVector& ts_descs,
TabletInfo* tablet) {
TableMetadataLock table_guard(tablet->table().get(), TableMetadataLock::READ);
if (!table_guard.data().pb.IsInitialized()) {
return Status::InvalidArgument(
Substitute("TableInfo for tablet $0 is not initialized (aborted CreateTable attempt?)",
tablet->tablet_id()));
}
int nreplicas = table_guard.data().pb.num_replicas();
if (ts_descs.size() < nreplicas) {
return Status::InvalidArgument(
Substitute("Not enough tablet servers are online for table '$0'. Need at least $1 "
"replicas, but only $2 tablet servers are available",
table_guard.data().name(), nreplicas, ts_descs.size()));
}
// Select the set of replicas for the tablet.
ConsensusStatePB* cstate = tablet->mutable_metadata()->mutable_dirty()
->pb.mutable_committed_consensus_state();
cstate->set_current_term(kMinimumTerm);
consensus::RaftConfigPB *config = cstate->mutable_config();
config->set_opid_index(consensus::kInvalidOpIdIndex);
SelectReplicas(ts_descs, nreplicas, config);
return Status::OK();
}
void CatalogManager::SendCreateTabletRequest(const scoped_refptr<TabletInfo>& tablet,
const TabletMetadataLock& tablet_lock) {
const consensus::RaftConfigPB& config =
tablet_lock.data().pb.committed_consensus_state().config();
tablet->set_last_create_tablet_time(MonoTime::Now(MonoTime::FINE));
for (const RaftPeerPB& peer : config.peers()) {
AsyncCreateReplica* task = new AsyncCreateReplica(master_,
peer.permanent_uuid(),
tablet, tablet_lock);
tablet->table()->AddTask(task);
WARN_NOT_OK(task->Run(), "Failed to send new tablet request");
}
}
shared_ptr<TSDescriptor> CatalogManager::PickBetterReplicaLocation(
const TSDescriptorVector& two_choices) {
DCHECK_EQ(two_choices.size(), 2);
const auto& a = two_choices[0];
const auto& b = two_choices[1];
// When creating replicas, we consider two aspects of load:
// (1) how many tablet replicas are already on the server, and
// (2) how often we've chosen this server recently.
//
// The first factor will attempt to put more replicas on servers that
// are under-loaded (eg because they have newly joined an existing cluster, or have
// been reformatted and re-joined).
//
// The second factor will ensure that we take into account the recent selection
// decisions even if those replicas are still in the process of being created (and thus
// not yet reported by the server). This is important because, while creating a table,
// we batch the selection process before sending any creation commands to the
// servers themselves.
//
// TODO: in the future we may want to factor in other items such as available disk space,
// actual request load, etc.
double load_a = a->RecentReplicaCreations() + a->num_live_replicas();
double load_b = b->RecentReplicaCreations() + b->num_live_replicas();
if (load_a < load_b) {
return a;
} else if (load_b < load_a) {
return b;
} else {
// If the load is the same, we can just pick randomly.
return two_choices[rng_.Uniform(2)];
}
}
shared_ptr<TSDescriptor> CatalogManager::SelectReplica(
const TSDescriptorVector& ts_descs,
const set<shared_ptr<TSDescriptor>>& excluded) {
// The replica selection algorithm follows the idea from
// "Power of Two Choices in Randomized Load Balancing"[1]. For each replica,
// we randomly select two tablet servers, and then assign the replica to the
// less-loaded one of the two. This has some nice properties:
//
// 1) because the initial selection of two servers is random, we get good
// spreading of replicas across the cluster. In contrast if we sorted by
// load and always picked under-loaded servers first, we'd end up causing
// all tablets of a new table to be placed on an empty server. This wouldn't
// give good load balancing of that table.
//
// 2) because we pick the less-loaded of two random choices, we do end up with a
// weighting towards filling up the underloaded one over time, without
// the extreme scenario above.
//
// 3) because we don't follow any sequential pattern, every server is equally
// likely to replicate its tablets to every other server. In contrast, a
// round-robin design would enforce that each server only replicates to its
// adjacent nodes in the TS sort order, limiting recovery bandwidth (see
// KUDU-1317).
//
// [1] http://www.eecs.harvard.edu/~michaelm/postscripts/mythesis.pdf
// Pick two random servers, excluding those we've already picked.
// If we've only got one server left, 'two_choices' will actually
// just contain one element.
vector<shared_ptr<TSDescriptor> > two_choices;
rng_.ReservoirSample(ts_descs, 2, excluded, &two_choices);
if (two_choices.size() == 2) {
// Pick the better of the two.
return PickBetterReplicaLocation(two_choices);
}
// If we couldn't randomly sample two servers, it's because we only had one
// more non-excluded choice left.
CHECK_EQ(1, ts_descs.size() - excluded.size())
<< "ts_descs: " << ts_descs.size() << " already_sel: " << excluded.size();
return two_choices[0];
}
void CatalogManager::SelectReplicas(const TSDescriptorVector& ts_descs,
int nreplicas,
consensus::RaftConfigPB *config) {
DCHECK_EQ(0, config->peers_size()) << "RaftConfig not empty: " << config->ShortDebugString();
DCHECK_LE(nreplicas, ts_descs.size());
// Keep track of servers we've already selected, so that we don't attempt to
// put two replicas on the same host.
set<shared_ptr<TSDescriptor> > already_selected;
for (int i = 0; i < nreplicas; ++i) {
shared_ptr<TSDescriptor> ts = SelectReplica(ts_descs, already_selected);
InsertOrDie(&already_selected, ts);
// Increment the number of pending replicas so that we take this selection into
// account when assigning replicas for other tablets of the same table. This
// value decays back to 0 over time.
ts->IncrementRecentReplicaCreations();
TSRegistrationPB reg;
ts->GetRegistration(&reg);
RaftPeerPB *peer = config->add_peers();
peer->set_member_type(RaftPeerPB::VOTER);
peer->set_permanent_uuid(ts->permanent_uuid());
// TODO: This is temporary; we will use only UUIDs.
for (const HostPortPB& addr : reg.rpc_addresses()) {
peer->mutable_last_known_addr()->CopyFrom(addr);
}
}
}
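// Populates 'locs_pb' with the locations of the given tablet's replicas, using
// each peer's registered RPC addresses when the tserver is known and falling
// back to the last known address stored in the Raft config otherwise. Fails if
// the tablet is deleted or not yet running.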
Status CatalogManager::BuildLocationsForTablet(const scoped_refptr<TabletInfo>& tablet,
TabletLocationsPB* locs_pb) {
TabletMetadataLock l_tablet(tablet.get(), TabletMetadataLock::READ);
if (PREDICT_FALSE(l_tablet.data().is_deleted())) {
return Status::NotFound("Tablet deleted", l_tablet.data().pb.state_msg());
}
if (PREDICT_FALSE(!l_tablet.data().is_running())) {
return Status::ServiceUnavailable("Tablet not running");
}
// Guaranteed because the tablet is RUNNING.
DCHECK(l_tablet.data().pb.has_committed_consensus_state());
const ConsensusStatePB& cstate = l_tablet.data().pb.committed_consensus_state();
for (const consensus::RaftPeerPB& peer : cstate.config().peers()) {
// TODO: GetConsensusRole() iterates over all of the peers, making this an
// O(n^2) loop. If replication counts get high, it should be optimized.
TabletLocationsPB_ReplicaPB* replica_pb = locs_pb->add_replicas();
replica_pb->set_role(GetConsensusRole(peer.permanent_uuid(), cstate));
TSInfoPB* tsinfo_pb = replica_pb->mutable_ts_info();
tsinfo_pb->set_permanent_uuid(peer.permanent_uuid());
shared_ptr<TSDescriptor> ts_desc;
if (master_->ts_manager()->LookupTSByUUID(peer.permanent_uuid(), &ts_desc)) {
TSRegistrationPB reg;
ts_desc->GetRegistration(&reg);
tsinfo_pb->mutable_rpc_addresses()->Swap(reg.mutable_rpc_addresses());
} else {
// If we've never received a heartbeat from the tserver, we'll fall back
// to the last known RPC address in the RaftPeerPB.
//
// TODO: We should track these RPC addresses in the master table itself.
tsinfo_pb->add_rpc_addresses()->CopyFrom(peer.last_known_addr());
}
}
locs_pb->mutable_partition()->CopyFrom(tablet->metadata().state().pb.partition());
locs_pb->set_tablet_id(tablet->tablet_id());
// No longer used; always set to false.
locs_pb->set_deprecated_stale(false);
return Status::OK();
}
Status CatalogManager::GetTabletLocations(const std::string& tablet_id,
TabletLocationsPB* locs_pb) {
leader_lock_.AssertAcquiredForReading();
RETURN_NOT_OK(CheckOnline());
locs_pb->mutable_replicas()->Clear();
scoped_refptr<TabletInfo> tablet_info;
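// Look up the tablet under the catalog lock, then build the locations
// without holding it.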
{
shared_lock<LockType> l(lock_);
if (!FindCopy(tablet_map_, tablet_id, &tablet_info)) {
return Status::NotFound(Substitute("Unknown tablet $0", tablet_id));
}
}
return BuildLocationsForTablet(tablet_info, locs_pb);
}
Status CatalogManager::GetTableLocations(const GetTableLocationsRequestPB* req,
GetTableLocationsResponsePB* resp) {
leader_lock_.AssertAcquiredForReading();
RETURN_NOT_OK(CheckOnline());
// If the start key is greater than the end key, report an error instead of
// swapping the two, since there is probably something wrong app-side.
if (req->has_partition_key_start() && req->has_partition_key_end()
&& req->partition_key_start() > req->partition_key_end()) {
return Status::InvalidArgument("start partition key is greater than the end partition key");
}
if (req->max_returned_locations() <= 0) {
return Status::InvalidArgument("max_returned_locations must be greater than 0");
}
scoped_refptr<TableInfo> table;
RETURN_NOT_OK(FindTable(req->table(), &table));
if (table == nullptr) {
Status s = Status::NotFound("The table does not exist");
SetupError(resp->mutable_error(), MasterErrorPB::TABLE_NOT_FOUND, s);
return s;
}
TableMetadataLock l(table.get(), TableMetadataLock::READ);
RETURN_NOT_OK(CheckIfTableDeletedOrNotRunning(&l, resp));
vector<scoped_refptr<TabletInfo> > tablets_in_range;
table->GetTabletsInRange(req, &tablets_in_range);
TSRegistrationPB reg;
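// Build a locations entry for each tablet in the requested range. A deleted
// or not-yet-running tablet clears the response and reports a
// TABLET_NOT_RUNNING error instead.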
for (const scoped_refptr<TabletInfo>& tablet : tablets_in_range) {
Status s = BuildLocationsForTablet(tablet, resp->add_tablet_locations());
if (s.ok()) {
continue;
} else if (s.IsNotFound()) {
// The tablet has been deleted; force the client to retry. This is a
// transient state that only happens while a concurrent "drop range
// partition" alter table operation is in flight.
resp->Clear();
resp->mutable_error()->set_code(MasterErrorPB_Code::MasterErrorPB_Code_TABLET_NOT_RUNNING);
StatusToPB(Status::ServiceUnavailable("Tablet not running"),
resp->mutable_error()->mutable_status());
} else if (s.IsServiceUnavailable()) {
// The tablet is not yet running; fail the request.
resp->Clear();
resp->mutable_error()->set_code(MasterErrorPB_Code::MasterErrorPB_Code_TABLET_NOT_RUNNING);
StatusToPB(s, resp->mutable_error()->mutable_status());
break;
} else {
LOG(FATAL) << "Unexpected error while building tablet locations: " << s.ToString();
}
}
resp->set_ttl_millis(FLAGS_table_locations_ttl_ms);
return Status::OK();
}
void CatalogManager::DumpState(std::ostream* out) const {
TableInfoMap ids_copy, names_copy;
TabletInfoMap tablets_copy;
// Copy the internal state so that, if the output stream blocks,
// we don't end up holding the lock for a long time.
{
shared_lock<LockType> l(lock_);
ids_copy = table_ids_map_;
names_copy = table_names_map_;
tablets_copy = tablet_map_;
}
*out << "Tables:\n";
for (const TableInfoMap::value_type& e : ids_copy) {
TableInfo* t = e.second.get();
TableMetadataLock l(t, TableMetadataLock::READ);
const string& name = l.data().name();
*out << t->id() << ":\n";
*out << " name: \"" << strings::CHexEscape(name) << "\"\n";
// Erase from the map, so later we can check that we don't have
// any orphaned tables in the by-name map that aren't in the
// by-id map.
if (names_copy.erase(name) != 1) {
*out << " [not present in by-name map]\n";
}
*out << " metadata: " << l.data().pb.ShortDebugString() << "\n";
*out << " tablets:\n";
vector<scoped_refptr<TabletInfo> > table_tablets;
t->GetAllTablets(&table_tablets);
for (const scoped_refptr<TabletInfo>& tablet : table_tablets) {
TabletMetadataLock l_tablet(tablet.get(), TabletMetadataLock::READ);
*out << " " << tablet->tablet_id() << ": "
<< l_tablet.data().pb.ShortDebugString() << "\n";
if (tablets_copy.erase(tablet->tablet_id()) != 1) {
*out << " [ERROR: not present in CM tablet map!]\n";
}
}
}
if (!tablets_copy.empty()) {
*out << "Orphaned tablets (not referenced by any table):\n";
for (const TabletInfoMap::value_type& entry : tablets_copy) {
const scoped_refptr<TabletInfo>& tablet = entry.second;
TabletMetadataLock l_tablet(tablet.get(), TabletMetadataLock::READ);
*out << " " << tablet->tablet_id() << ": "
<< l_tablet.data().pb.ShortDebugString() << "\n";
}
}
if (!names_copy.empty()) {
*out << "Orphaned tables (in by-name map, but not id map):\n";
for (const TableInfoMap::value_type& e : names_copy) {
*out << e.second->id() << ":\n";
*out << " name: \"" << CHexEscape(e.first) << "\"\n";
}
}
}
std::string CatalogManager::LogPrefix() const {
return Substitute("T $0 P $1: ",
sys_catalog_->tablet_peer()->tablet_id(),
sys_catalog_->tablet_peer()->permanent_uuid());
}
void CatalogManager::AbortAndWaitForAllTasks(
const vector<scoped_refptr<TableInfo>>& tables) {
for (const auto& t : tables) {
t->AbortTasks();
}
for (const auto& t : tables) {
t->WaitTasksCompletion();
}
}
////////////////////////////////////////////////////////////
// CatalogManager::ScopedLeaderSharedLock
////////////////////////////////////////////////////////////
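// A shared lock taken by RPC handlers to verify that this master is
// initialized and is the current leader before serving a request.
//
// Typical usage (sketch; 'server_' stands in for however the caller reaches
// the Master instance):
//
//   CatalogManager::ScopedLeaderSharedLock l(server_->catalog_manager());
//   if (!l.CheckIsInitializedAndIsLeaderOrRespond(resp, rpc)) {
//     return;
//   }
//   // ... safe to call leader-only CatalogManager methods here ...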
CatalogManager::ScopedLeaderSharedLock::ScopedLeaderSharedLock(
CatalogManager* catalog)
: catalog_(DCHECK_NOTNULL(catalog)),
leader_shared_lock_(catalog->leader_lock_, std::try_to_lock) {
// Check if the catalog manager is running.
std::lock_guard<simple_spinlock> l(catalog_->state_lock_);
if (PREDICT_FALSE(catalog_->state_ != kRunning)) {
catalog_status_ = Status::ServiceUnavailable(
Substitute("Catalog manager is not initialized. State: $0",
catalog_->state_));
return;
}
// Check if the catalog manager is the leader.
Consensus* consensus = catalog_->sys_catalog_->tablet_peer_->consensus();
ConsensusStatePB cstate = consensus->ConsensusState(CONSENSUS_CONFIG_COMMITTED);
string uuid = catalog_->master_->fs_manager()->uuid();
if (PREDICT_FALSE(!cstate.has_leader_uuid() || cstate.leader_uuid() != uuid)) {
leader_status_ = Status::IllegalState(
Substitute("Not the leader. Local UUID: $0, Consensus state: $1",
uuid, cstate.ShortDebugString()));
return;
}
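// Check that the leader has completed its leader-only initialization for
// the current term and that the shared leader lock was actually acquired
// above.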
if (PREDICT_FALSE(catalog_->leader_ready_term_ != cstate.current_term() ||
!leader_shared_lock_.owns_lock())) {
leader_status_ = Status::ServiceUnavailable(
"Leader not yet ready to serve requests");
return;
}
}
template<typename RespClass>
bool CatalogManager::ScopedLeaderSharedLock::CheckIsInitializedOrRespond(
RespClass* resp, RpcContext* rpc) {
if (PREDICT_FALSE(!catalog_status_.ok())) {
StatusToPB(catalog_status_, resp->mutable_error()->mutable_status());
resp->mutable_error()->set_code(
MasterErrorPB::CATALOG_MANAGER_NOT_INITIALIZED);
rpc->RespondSuccess();
return false;
}
return true;
}
template<typename RespClass>
bool CatalogManager::ScopedLeaderSharedLock::CheckIsInitializedAndIsLeaderOrRespond(
RespClass* resp, RpcContext* rpc) {
const Status& s = first_failed_status();
if (PREDICT_TRUE(s.ok())) {
return true;
}
StatusToPB(s, resp->mutable_error()->mutable_status());
resp->mutable_error()->set_code(MasterErrorPB::NOT_THE_LEADER);
rpc->RespondSuccess();
return false;
}
// Explicit instantiations for callers outside this compilation unit.
#define INITTED_OR_RESPOND(RespClass) \
template bool \
CatalogManager::ScopedLeaderSharedLock::CheckIsInitializedOrRespond( \
RespClass* resp, RpcContext* rpc)
#define INITTED_AND_LEADER_OR_RESPOND(RespClass) \
template bool \
CatalogManager::ScopedLeaderSharedLock::CheckIsInitializedAndIsLeaderOrRespond( \
RespClass* resp, RpcContext* rpc)
INITTED_OR_RESPOND(GetMasterRegistrationResponsePB);
INITTED_OR_RESPOND(TSHeartbeatResponsePB);
INITTED_AND_LEADER_OR_RESPOND(AlterTableResponsePB);
INITTED_AND_LEADER_OR_RESPOND(CreateTableResponsePB);
INITTED_AND_LEADER_OR_RESPOND(DeleteTableResponsePB);
INITTED_AND_LEADER_OR_RESPOND(IsAlterTableDoneResponsePB);
INITTED_AND_LEADER_OR_RESPOND(IsCreateTableDoneResponsePB);
INITTED_AND_LEADER_OR_RESPOND(ListTablesResponsePB);
INITTED_AND_LEADER_OR_RESPOND(ListTabletServersResponsePB);
INITTED_AND_LEADER_OR_RESPOND(GetTableLocationsResponsePB);
INITTED_AND_LEADER_OR_RESPOND(GetTableSchemaResponsePB);
INITTED_AND_LEADER_OR_RESPOND(GetTabletLocationsResponsePB);
#undef INITTED_OR_RESPOND
#undef INITTED_AND_LEADER_OR_RESPOND
////////////////////////////////////////////////////////////
// TabletInfo
////////////////////////////////////////////////////////////
TabletInfo::TabletInfo(const scoped_refptr<TableInfo>& table,
std::string tablet_id)
: tablet_id_(std::move(tablet_id)),
table_(table),
last_create_tablet_time_(MonoTime::Now(MonoTime::FINE)),
reported_schema_version_(0) {}
TabletInfo::~TabletInfo() {
}
void TabletInfo::set_last_create_tablet_time(const MonoTime& ts) {
std::lock_guard<simple_spinlock> l(lock_);
last_create_tablet_time_ = ts;
}
MonoTime TabletInfo::last_create_tablet_time() const {
std::lock_guard<simple_spinlock> l(lock_);
return last_create_tablet_time_;
}
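// Records the latest schema version reported for this tablet. Returns true
// if 'version' is newer than the previously reported version (and was
// therefore stored), false otherwise.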
bool TabletInfo::set_reported_schema_version(uint32_t version) {
std::lock_guard<simple_spinlock> l(lock_);
if (version > reported_schema_version_) {
reported_schema_version_ = version;
return true;
}
return false;
}
uint32_t TabletInfo::reported_schema_version() const {
std::lock_guard<simple_spinlock> l(lock_);
return reported_schema_version_;
}
std::string TabletInfo::ToString() const {
return Substitute("$0 (table $1)", tablet_id_,
(table_ != nullptr ? table_->ToString() : "MISSING"));
}
void PersistentTabletInfo::set_state(SysTabletsEntryPB::State state, const string& msg) {
pb.set_state(state);
pb.set_state_msg(msg);
}
////////////////////////////////////////////////////////////
// TableInfo
////////////////////////////////////////////////////////////
TableInfo::TableInfo(std::string table_id) : table_id_(std::move(table_id)) {}
TableInfo::~TableInfo() {
}
std::string TableInfo::ToString() const {
TableMetadataLock l(this, TableMetadataLock::READ);
return Substitute("$0 [id=$1]", l.data().pb.name(), table_id_);
}
bool TableInfo::RemoveTablet(const std::string& partition_key_start) {
std::lock_guard<simple_spinlock> l(lock_);
return EraseKeyReturnValuePtr(&tablet_map_, partition_key_start) != nullptr;
}
void TableInfo::AddTablet(TabletInfo *tablet) {
std::lock_guard<simple_spinlock> l(lock_);
AddTabletUnlocked(tablet);
}
void TableInfo::AddTablets(const vector<TabletInfo*>& tablets) {
std::lock_guard<simple_spinlock> l(lock_);
for (TabletInfo *tablet : tablets) {
AddTabletUnlocked(tablet);
}
}
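// Atomically (with respect to this TableInfo's lock) drops the given tablets
// from the partition map and adds the new ones.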
void TableInfo::AddRemoveTablets(const vector<scoped_refptr<TabletInfo>>& tablets_to_add,
const vector<scoped_refptr<TabletInfo>>& tablets_to_drop) {
std::lock_guard<simple_spinlock> l(lock_);
for (const auto& tablet : tablets_to_drop) {
const auto& lower_bound = tablet->metadata().state().pb.partition().partition_key_start();
CHECK(EraseKeyReturnValuePtr(&tablet_map_, lower_bound) != nullptr);
}
for (const auto& tablet : tablets_to_add) {
AddTabletUnlocked(tablet.get());
}
}
void TableInfo::AddTabletUnlocked(TabletInfo* tablet) {
TabletInfo* old = nullptr;
if (UpdateReturnCopy(&tablet_map_,
tablet->metadata().state().pb.partition().partition_key_start(),
tablet, &old)) {
VLOG(1) << "Replaced tablet " << old->tablet_id() << " with " << tablet->tablet_id();
// TODO: can we assert that the replaced tablet is not in Running state?
// May be a little tricky since we don't know whether to look at its committed or
// uncommitted state.
}
}
void TableInfo::GetTabletsInRange(const GetTableLocationsRequestPB* req,
vector<scoped_refptr<TabletInfo> > *ret) const {
std::lock_guard<simple_spinlock> l(lock_);
int max_returned_locations = req->max_returned_locations();
TableInfo::TabletInfoMap::const_iterator it, it_end;
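// Seek to the tablet that covers the start key: upper_bound() returns the
// first tablet whose start key is strictly greater than the requested start
// key, so stepping back one entry (when possible) lands on the tablet
// containing it.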
if (req->has_partition_key_start()) {
it = tablet_map_.upper_bound(req->partition_key_start());
if (it != tablet_map_.begin()) {
--it;
}
} else {
it = tablet_map_.begin();
}
if (req->has_partition_key_end()) {
it_end = tablet_map_.upper_bound(req->partition_key_end());
} else {
it_end = tablet_map_.end();
}
int count = 0;
for (; it != it_end && count < max_returned_locations; ++it) {
ret->push_back(make_scoped_refptr(it->second));
count++;
}
}
bool TableInfo::IsAlterInProgress(uint32_t version) const {
std::lock_guard<simple_spinlock> l(lock_);
for (const TableInfo::TabletInfoMap::value_type& e : tablet_map_) {
if (e.second->reported_schema_version() < version) {
VLOG(3) << "Table " << table_id_ << " ALTER in progress due to tablet "
<< e.second->ToString() << " because reported schema "
<< e.second->reported_schema_version() << " < expected " << version;
return true;
}
}
return false;
}
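// A table's creation is considered in progress as long as any of its tablets
// is not yet running.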
bool TableInfo::IsCreateInProgress() const {
std::lock_guard<simple_spinlock> l(lock_);
for (const TableInfo::TabletInfoMap::value_type& e : tablet_map_) {
TabletMetadataLock tablet_lock(e.second, TabletMetadataLock::READ);
if (!tablet_lock.data().is_running()) {
return true;
}
}
return false;
}
void TableInfo::AddTask(MonitoredTask* task) {
task->AddRef();
{
std::lock_guard<simple_spinlock> l(lock_);
pending_tasks_.insert(task);
}
}
void TableInfo::RemoveTask(MonitoredTask* task) {
{
std::lock_guard<simple_spinlock> l(lock_);
pending_tasks_.erase(task);
}
// Done outside the lock so that if Release() drops the last ref to this
// TableInfo, RemoveTask() won't unlock a freed lock.
task->Release();
}
void TableInfo::AbortTasks() {
std::lock_guard<simple_spinlock> l(lock_);
for (MonitoredTask* task : pending_tasks_) {
task->Abort();
}
}
void TableInfo::WaitTasksCompletion() {
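// Poll until all pending tasks have finished, backing off from 5 ms by
// roughly 25% per iteration, capped at 10 seconds.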
int wait_time = 5;
while (1) {
{
std::lock_guard<simple_spinlock> l(lock_);
if (pending_tasks_.empty()) {
break;
}
}
base::SleepForMilliseconds(wait_time);
wait_time = std::min(wait_time * 5 / 4, 10000);
}
}
void TableInfo::GetTaskList(std::vector<scoped_refptr<MonitoredTask> > *ret) {
std::lock_guard<simple_spinlock> l(lock_);
for (MonitoredTask* task : pending_tasks_) {
ret->push_back(make_scoped_refptr(task));
}
}
void TableInfo::GetAllTablets(vector<scoped_refptr<TabletInfo> > *ret) const {
ret->clear();
std::lock_guard<simple_spinlock> l(lock_);
for (const auto& e : tablet_map_) {
ret->push_back(make_scoped_refptr(e.second));
}
}
void PersistentTableInfo::set_state(SysTablesEntryPB::State state, const string& msg) {
pb.set_state(state);
pb.set_state_msg(msg);
}
} // namespace master
} // namespace kudu