blob: b4d6dd11e4aa8e96d2a26fe801b1d5fb55bcd56d [file] [log] [blame]
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
//   http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.
#pragma once
#include <cstddef>
#include <cstdint>
#include <functional>
#include <optional>
#include <set>
#include <string>
#include <vector>
#include <gtest/gtest_prod.h>
#include "kudu/common/schema.h"
#include "kudu/consensus/metadata.pb.h"
#include "kudu/gutil/macros.h"
#include "kudu/gutil/port.h"
#include "kudu/gutil/ref_counted.h"
#include "kudu/master/catalog_manager.h"
#include "kudu/tablet/tablet_replica.h"
#include "kudu/util/status.h"
namespace kudu {
class Counter;
class FsManager;
class MetricRegistry;
class RowBlockRow;
namespace consensus {
class ConsensusMetadata;
class ConsensusMetadataManager;
namespace tablet {
class TabletMetadata;
namespace tserver {
class WriteRequestPB;
} // namespace tserver
namespace master {
class Master;
class SysCertAuthorityEntryPB;
class SysClusterIdEntryPB;
class SysTServerStateEntryPB;
class SysTablesEntryPB;
class SysTabletsEntryPB;
class SysTskEntryPB;
struct MasterOptions;
// The SysCatalogTable has two separate visitors because the tables
// data must be loaded into memory before the tablets data.
// Abstract callback interface for consumers of table entries stored in the
// system catalog. A pointer to an implementation is what
// SysCatalogTable::VisitTables() accepts.
class TableVisitor {
virtual ~TableVisitor() = default;
// Handles a single table entry: 'table_id' identifies the table and
// 'metadata' carries its SysTablesEntryPB.
// NOTE(review): presumably invoked once per table row found by the scan,
// with a non-OK Status stopping it -- confirm in the implementation.
virtual Status VisitTable(const std::string& table_id,
const SysTablesEntryPB& metadata) = 0;
// Abstract callback interface for consumers of tablet entries stored in the
// system catalog. A pointer to an implementation is what
// SysCatalogTable::VisitTablets() accepts.
class TabletVisitor {
virtual ~TabletVisitor() = default;
// Handles a single tablet entry: 'table_id' and 'tablet_id' identify the
// tablet and 'metadata' carries its SysTabletsEntryPB.
// NOTE(review): presumably invoked once per tablet row found by the scan,
// with a non-OK Status stopping it -- confirm in the implementation.
virtual Status VisitTablet(const std::string& table_id,
const std::string& tablet_id,
const SysTabletsEntryPB& metadata) = 0;
// Visitor for TSK-related (Token Signing Key) entries. Only the public part
// of each key is stored in the system catalog table. That information is
// preserved to allow any master to verify tokens that might have been signed
// by the current or a former master leader.
// A pointer to an implementation is what SysCatalogTable::VisitTskEntries()
// accepts.
class TskEntryVisitor {
virtual ~TskEntryVisitor() = default;
// Handles a single TSK entry: 'entry_id' is the entry's identifier and
// 'metadata' carries its SysTskEntryPB.
virtual Status Visit(const std::string& entry_id,
const SysTskEntryPB& metadata) = 0;
// Abstract callback interface for consumers of tserver state entries stored
// in the system catalog. A pointer to an implementation is what
// SysCatalogTable::VisitTServerStates() accepts.
class TServerStateVisitor {
virtual ~TServerStateVisitor() = default;
// Handles a single tserver state entry: 'tserver_id' identifies the tablet
// server and 'metadata' carries its SysTServerStateEntryPB.
virtual Status Visit(const std::string& tserver_id,
const SysTServerStateEntryPB& metadata) = 0;
// TableVisitor implementation that exposes the result of a visit as a vector
// of TableInfo references.
class TableInfoLoader : public TableVisitor {
// NOTE(review): presumably clears 'tables' so the loader can be reused --
// confirm in the implementation.
void Reset();
// TableVisitor override, called with the ID and the metadata of a table.
// NOTE(review): presumably builds a TableInfo from the arguments and appends
// it to 'tables' -- confirm in the implementation.
Status VisitTable(const std::string& table_id,
const SysTablesEntryPB& metadata) override;
// TableInfo objects held by this loader.
std::vector<scoped_refptr<TableInfo>> tables;
// TabletVisitor implementation that exposes the result of a visit as a vector
// of TabletInfo references.
class TabletInfoLoader : public TabletVisitor {
// NOTE(review): presumably clears 'tablets' so the loader can be reused --
// confirm in the implementation.
void Reset();
// TabletVisitor override. The table ID parameter is left unnamed in this
// declaration; only 'tablet_id' and 'metadata' are named.
// NOTE(review): presumably builds a TabletInfo from the arguments and
// appends it to 'tablets' -- confirm in the implementation.
Status VisitTablet(const std::string& /*table_id*/,
const std::string& tablet_id,
const SysTabletsEntryPB& metadata) override;
// TabletInfo objects held by this loader.
std::vector<scoped_refptr<TabletInfo>> tablets;
// SysCatalogTable is a Kudu table that keeps track of the following
// system information:
//   * cluster id
//   * table metadata
//   * tablet metadata
//   * root CA (certificate authority) certificate of the Kudu IPKI
//   * Kudu IPKI root CA cert's private key
//   * TSK (Token Signing Key) entries
//   * Latest handled Hive Metastore notification log event ID
//   * tserver state (e.g. maintenance mode)
//
// The essential properties of the SysCatalogTable are:
//   * SysCatalogTable has only one tablet.
//   * SysCatalogTable is managed by the master and not exposed to the user
//     as a "normal table"; instead, there are Master APIs to query the table.
//
// It has the schema:
//   (entry_type INT8, entry_id STRING) -> metadata STRING
//
//   * entry_type is one of CatalogEntryType. It indicates whether an entry is
//     a table, tablet, or some other piece of metadata. It is the first part
//     of the compound key to allow efficient scans of entries of only a single
//     type (e.g. only scan all of the tables, or only scan all of the tablets).
//   * entry_id is the ID of the entry type (e.g. table ID or tablet ID for
//     table and tablet entries respectively).
//   * metadata is a string containing a protobuf message specific to the
//     entry_type. These are defined in master.proto under "Sys Tables
//     Metadata".
class SysCatalogTable {
// Magic ID of the system tablet.
static const char* const kSysCatalogTabletId;
// Root certificate authority (CA) entry identifier in the system table.
// There should be no more than one entry of this type in the system table.
static const char* const kSysCertAuthorityEntryId;
// The row ID of the latest notification log entry in the sys catalog table.
static const char* const kLatestNotificationLogEntryIdRowId;
// The row ID of the cluster ID entry in the sys catalog table.
static const char* const kClusterIdRowId;
// Signature of the callback handed to the SysCatalogTable constructor: takes
// no arguments and reports its outcome as a Status.
using ElectedLeaderCallback = std::function<Status()>;
// Type tag of a system catalog entry; the 'entry_type' column of the table
// schema holds one of these values.
// NOTE(review): no enumerators with values below 3 are visible here, although
// the class comment mentions table and tablet entries -- confirm against the
// complete file.
enum CatalogEntryType {
CERT_AUTHORITY_INFO = 3, // Kudu's root certificate authority entry.
TSK_ENTRY = 4, // Token Signing Key entry.
HMS_NOTIFICATION_LOG = 5, // HMS notification log latest event ID.
TSERVER_STATE = 6, // TServer state.
CLUSTER_ID = 7 // Unique Cluster ID.
// NOTE(review): the enumerators of SysCatalogOperation are not visible in
// this excerpt; judging by the name they presumably identify the kind of
// operation applied to a catalog entry -- confirm against the complete file.
enum SysCatalogOperation {
// 'leader_cb_' is invoked whenever this node is elected as a leader
// of the consensus configuration for this tablet, including for local
// standalone master consensus configurations. It is used to initialize leader
// state, submit any leader-specific tasks and so forth.
//
// NOTE: Since 'leader_cb_' is invoked synchronously and can block
// the consensus configuration's progress, any long running tasks (e.g.,
// scanning tablets) should be performed asynchronously (by, e.g., submitting
// them to a separate threadpool).
SysCatalogTable(Master* master, ElectedLeaderCallback leader_cb);
// Allow for orderly shutdown of TabletReplica, etc.
void Shutdown();
// Load the Metadata from disk, and initialize the TabletReplica for the
// sys-table.
Status Load(FsManager *fs_manager);
// Create the new Metadata and initialize the TabletReplica for the sys-table.
Status CreateNew(FsManager *fs_manager);
// Perform a series of table/tablet actions in one WriteOp.
// The struct is a plain bundle of what to persist: one optional table per
// kind of change, a list of tablets per kind of change, and an optional HMS
// notification log event ID.
struct Actions {
Actions() = default;
// Table to add, update, and delete, respectively.
// NOTE(review): presumably a null reference means "no such action" --
// confirm in Write().
scoped_refptr<TableInfo> table_to_add;
scoped_refptr<TableInfo> table_to_update;
scoped_refptr<TableInfo> table_to_delete;
// Tablets to add, update, and delete, respectively.
std::vector<scoped_refptr<TabletInfo>> tablets_to_add;
std::vector<scoped_refptr<TabletInfo>> tablets_to_update;
std::vector<scoped_refptr<TabletInfo>> tablets_to_delete;
// HMS notification log event ID to persist, if any.
std::optional<int64_t> hms_notification_log_event_id;
// How actions are persisted into the system catalog table when calling the
// Write() method below.
// NOTE(review): the enumerators themselves are not visible in this excerpt;
// Write() below refers to WriteMode::ATOMIC and WriteMode::CHUNKED, which
// the two descriptions that follow presumably belong to, in that order.
enum class WriteMode {
// Write all the actions in a single atomic update to the system
// catalog tablet.
// Chunk the actions into pieces of maximum size corresponding to the
// maximum RPC size limit. This is to allow the leader replica of the system
// tablet to propagate the update to followers via Raft UpdateConsensus RPC.
// Persist the specified actions into the system tablet. Set the 'mode' to
// WriteMode::CHUNKED to split requests larger than the maximum RPC size
// into chunks if non-atomic update is acceptable. In case of chunked mode,
// no atomicity is guaranteed while persisting the specified actions.
// The default mode is WriteMode::ATOMIC.
Status Write(Actions actions, WriteMode mode = WriteMode::ATOMIC);
// Scan of the table-related entries, handing them to 'visitor'.
Status VisitTables(TableVisitor* visitor);
// Scan of the tablet-related entries, handing them to 'visitor'.
Status VisitTablets(TabletVisitor* visitor);
// Scan for TSK-related entries in the system table.
Status VisitTskEntries(TskEntryVisitor* visitor);
// Scan for tserver state entries in the system table.
Status VisitTServerStates(TServerStateVisitor* visitor);
// Get the latest processed HMS notification log event ID. The result is
// returned via the 'event_id' output parameter.
Status GetLatestNotificationLogEventId(int64_t* event_id) WARN_UNUSED_RESULT;
// Get the cluster ID from the system table. The result is returned via the
// 'entry' output parameter.
Status GetClusterIdEntry(SysClusterIdEntryPB* entry) WARN_UNUSED_RESULT;
// Retrieve the CA entry (private key and certificate) from the system table.
// The result is returned via the 'entry' output parameter.
Status GetCertAuthorityEntry(SysCertAuthorityEntryPB* entry);
// Add cluster ID entry into the system table.
// There should be no more than one cluster ID in the system table.
Status AddClusterIdEntry(const SysClusterIdEntryPB& entry);
// Add root CA entry (private key and certificate) into the system table.
// There should be no more than one CA entry in the system table.
Status AddCertAuthorityEntry(const SysCertAuthorityEntryPB& entry);
// Add TSK (Token Signing Key) entry into the system table.
Status AddTskEntry(const SysTskEntryPB& entry);
// Remove TSK (Token Signing Key) entries with the specified entry identifiers
// (as in 'entry_id' column) from the system table. The container of the
// entry identifiers must not be empty.
Status RemoveTskEntries(const std::set<std::string>& entry_ids);
// Add a tserver state entry to the system table, keyed by 'tserver_id'.
Status WriteTServerState(const std::string& tserver_id,
const SysTServerStateEntryPB& entry);
// Remove a tserver state entry from the system table.
Status RemoveTServerState(const std::string& tserver_id);
// Return the underlying TabletReplica instance hosting the metadata.
// This should be used with caution -- typically the various methods
// above should be used rather than directly accessing the replica.
// The reference returned is to the 'tablet_replica_' member itself.
const scoped_refptr<tablet::TabletReplica>& tablet_replica() const {
return tablet_replica_;
// Grant access to the named test and to CatalogManager.
FRIEND_TEST(MasterTest, TestMasterMetadataConsistentDespiteFailures);
friend class CatalogManager;
// Name of the system catalog table: always "sys.catalog".
const char *table_name() const { return "sys.catalog"; }
// ID of the system catalog table: always the empty string.
const char *table_id() const { return ""; }
// Return the schema of the table.
// NOTE: This is the "server-side" schema, so it must have the column IDs.
Schema BuildTableSchema();
// Returns 'Status::OK()' if the WriteOp completed.
// NOTE(review): judging by the name the call is synchronous, i.e. it returns
// only once the write described by 'req' has finished -- confirm in the
// implementation.
Status SyncWrite(const tserver::WriteRequestPB& req);
// Notification hook taking the ID of a tablet and a textual reason.
// NOTE(review): presumably registered as the state-change callback of the
// system tablet's replica -- confirm in the implementation.
void SysCatalogStateChanged(const std::string& tablet_id, const std::string& reason);
// Sets up the system tablet from the given tablet metadata.
// NOTE(review): presumably shared by Load() and CreateNew() -- confirm.
Status SetupTablet(const scoped_refptr<tablet::TabletMetadata>& metadata);
// Use the master options to generate a new consensus configuration.
// In addition, resolve all UUIDs of this consensus configuration.
// The result is returned via the 'committed_config' output parameter.
Status CreateDistributedConfig(const MasterOptions& options,
consensus::RaftConfigPB* committed_config);
// ID of the system tablet, as reported by the underlying tablet replica.
// Dereferences 'tablet_replica_', so the replica must have been set.
std::string tablet_id() const {
return tablet_replica_->tablet_id();
// Conventional "T xxx P xxxx..." prefix for logging.
std::string LogPrefix() const;
// Waits for the tablet to reach 'RUNNING' state.
// Contrary to tablet servers, in master we actually wait for the master tablet
// to become online synchronously, this allows us to fail fast if something fails
// and shouldn't induce the all-workers-blocked-waiting-for-tablets problem
// that we've seen in tablet servers since the master only has to boot a few
// tablets.
Status WaitUntilRunning();
// Extracts the entry identifier and the entry data of type T from 'row' into
// the 'entry_id' and 'entry_data' output parameters.
// NOTE(review): T is presumably one of the Sys*EntryPB protobuf messages
// parsed from the 'metadata' column -- confirm in the implementation.
template<typename T>
Status GetEntryFromRow(const RowBlockRow& row,
std::string* entry_id, T* entry_data) const;
// Applies the given functor, which receives an entry identifier and an entry
// of type T, to system catalog rows.
// NOTE(review): presumably only rows whose type equals 'entry_type' are
// processed, and a non-OK Status from the functor stops the processing --
// confirm in the implementation.
template<typename T, CatalogEntryType entry_type>
Status ProcessRows(std::function<Status(const std::string&, const T&)>) const;
// Tablet related private methods.
// Initializes the RaftPeerPB for the local peer.
// Crashes due to an invariant check if the rpc server is not running.
void InitLocalRaftPeerPB();
// Add an operation to a write adding/updating/deleting a table or tablet.
// Each of the three methods below appends to the request pointed to by 'req'
// for the given 'table'.
void ReqAddTable(tserver::WriteRequestPB* req,
const scoped_refptr<TableInfo>& table);
void ReqUpdateTable(tserver::WriteRequestPB* req,
const scoped_refptr<TableInfo>& table);
void ReqDeleteTable(tserver::WriteRequestPB* req,
const scoped_refptr<TableInfo>& table);
// These three methods below generate WriteRequestPB to persist the
// information on the tablet metadata updates in the system catalog. The size
// of the generated write request is limited by the 'max_size' parameter.
// The information on the tablets' metadata updates is provided with the
// 'tablets' parameter. The 'excess_tablets' output parameter is populated
// with elements which didn't fit into the result request due to the sizing
// limitations. The result request is returned via the 'req' output parameter.
// Note the best effort behavior of these methods: the resulting request
// cannot be empty, and the very first row might be over the limit already.
// However, that will be caught later by SyncWrite() before actually writing
// the generated data into the system tablet.
// Note that 'tablets' is taken by value in all three methods.
void ReqAddTablets(size_t max_size,
std::vector<scoped_refptr<TabletInfo>> tablets,
std::vector<scoped_refptr<TabletInfo>>* excess_tablets,
tserver::WriteRequestPB* req);
void ReqUpdateTablets(size_t max_size,
std::vector<scoped_refptr<TabletInfo>> tablets,
std::vector<scoped_refptr<TabletInfo>>* excess_tablets,
tserver::WriteRequestPB* req);
void ReqDeleteTablets(size_t max_size,
std::vector<scoped_refptr<TabletInfo>> tablets,
std::vector<scoped_refptr<TabletInfo>>* excess_tablets,
tserver::WriteRequestPB* req);
// Generator function used by the ChunkedWrite() method below. The three
// methods above (ReqAddTablets, ReqUpdateTablets, and ReqDeleteTablets)
// are the generators with corresponding signature.
// NOTE(review): the signature visible here takes only a SysCatalogTable
// reference and a WriteRequestPB pointer, whereas the Req*Tablets methods
// additionally take 'max_size', 'tablets' and 'excess_tablets' -- the
// declaration looks incomplete in this excerpt; confirm against the complete
// file.
typedef std::function<void(SysCatalogTable&,
tserver::WriteRequestPB*)> Generator;
// A method for chunked write into the system tablet.
// 'generator' produces the write requests, 'max_chunk_size' limits the size
// of a single chunk, 'tablets_info' carries the tablets whose metadata is to
// be persisted, and 'req' is the request to work with.
// NOTE(review): presumably the generator is invoked repeatedly, each produced
// request being written before the remaining tablets are processed --
// confirm in the implementation.
Status ChunkedWrite(const Generator& generator,
size_t max_chunk_size,
std::vector<scoped_refptr<TabletInfo>> tablets_info,
tserver::WriteRequestPB* req);
// Overwrite (upsert) the latest event ID in the table with the provided ID.
// The corresponding operation is added to the request pointed to by 'req'.
void ReqSetNotificationLogEventId(tserver::WriteRequestPB* req, int64_t event_id);
// Conversions between a TSK sequence number and the identifier of the
// corresponding entry (the 'entry_id' column) in the system table.
static std::string TskSeqNumberToEntryId(int64_t seq_number);
static int64_t TskEntryIdToSeqNumber(const std::string& entry_id);
// For a single master config, verifies the on-disk Raft config and populates the
// 'last_known_addr' in the on-disk Raft config, if master address is specified.
// Pointer 'cmeta' must outlive the call to this function.
Status VerifyAndPopulateSingleMasterConfig(consensus::ConsensusMetadata* cmeta);
// Special string injected into SyncWrite() random failures (if enabled).
// Only useful for tests.
static const char* const kInjectedFailureStatusMsg;
// Table schema, without IDs, used to send messages to the TabletReplica.
Schema schema_;
// NOTE(review): presumably the key-only projection of 'schema_' -- confirm
// where it is initialized.
Schema key_schema_;
MetricRegistry* metric_registry_;
// The replica hosting the system tablet; exposed via tablet_replica().
scoped_refptr<tablet::TabletReplica> tablet_replica_;
// NOTE(review): presumably the 'master' pointer passed to the constructor --
// confirm.
Master* master_;
const scoped_refptr<consensus::ConsensusMetadataManager> cmeta_manager_;
// Callback invoked when this node is elected leader of the system tablet's
// consensus configuration (see the comment on the constructor).
ElectedLeaderCallback leader_cb_;
// RaftPeerPB describing the local peer; see InitLocalRaftPeerPB().
consensus::RaftPeerPB local_peer_pb_;
// NOTE(review): judging by the name, counts write requests exceeding the
// size limit -- confirm where it is incremented.
scoped_refptr<Counter> oversized_write_requests_;
} // namespace master
} // namespace kudu