// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
//   http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.
#include "kudu/tools/ksck.h"
#include <algorithm>
#include <boost/optional.hpp>
#include <glog/logging.h>
#include <iostream>
#include <map>
#include <mutex>
#include "kudu/consensus/quorum_util.h"
#include "kudu/gutil/map-util.h"
#include "kudu/gutil/ref_counted.h"
#include "kudu/gutil/strings/join.h"
#include "kudu/gutil/strings/human_readable.h"
#include "kudu/gutil/strings/substitute.h"
#include "kudu/gutil/strings/util.h"
#include "kudu/tools/color.h"
#include "kudu/util/atomic.h"
#include "kudu/util/blocking_queue.h"
#include "kudu/util/locks.h"
#include "kudu/util/monotime.h"
#include "kudu/util/threadpool.h"
DEFINE_int32(checksum_timeout_sec, 3600,
             "Maximum total seconds to wait for a checksum scan to complete "
             "before timing out.");
DEFINE_int32(checksum_scan_concurrency, 4,
             "Number of concurrent checksum scans to execute per tablet server.");
DEFINE_bool(checksum_snapshot, true, "Should the checksum scanner use a snapshot scan");
// The default, ChecksumOptions::kCurrentTimestamp (0), makes Ksck::ChecksumData()
// use the current timestamp of a healthy tablet server instead.
DEFINE_uint64(checksum_snapshot_timestamp, kudu::tools::ChecksumOptions::kCurrentTimestamp,
              "timestamp to use for snapshot checksum scans, defaults to 0, which "
              "uses the current timestamp of a tablet server involved in the scan");
DEFINE_int32(fetch_replica_info_concurrency, 20,
             "Number of concurrent tablet servers to fetch replica info from.");
namespace kudu {
namespace tools {
using std::cout;
using std::endl;
using std::ostream;
using std::shared_ptr;
using std::string;
using std::unordered_map;
using strings::Substitute;
// The stream to write output to. If this is null, output defaults to cout.
// This is used by tests to capture output.
ostream* g_err_stream = nullptr;
// Returns the stream that ksck output is written to: '*g_err_stream' when a
// caller (e.g. a test) has installed one, otherwise cout.
static ostream& Out() {
  return (g_err_stream ? *g_err_stream : cout);
}
// Writes a YELLOW "WARNING: " prefix to Out() and returns that stream so the
// caller can append the warning text.
static ostream& Warn() {
  return Out() << Color(AnsiCode::YELLOW, "WARNING: ");
}
// Writes a RED "ERROR: " prefix to Out() and returns that stream so the caller
// can append the error text. (Previously this printed "WARNING: ", which made
// errors indistinguishable from Warn() output apart from the color.)
static ostream& Error() {
  return Out() << Color(AnsiCode::RED, "ERROR: ");
}
namespace {

// Returns true if 'str' matches any of the patterns in 'patterns' (as decided by
// MatchPattern()), or if 'patterns' is empty.
bool MatchesAnyPattern(const std::vector<std::string>& patterns, const std::string& str) {
  // Consider no filter a wildcard.
  if (patterns.empty()) return true;
  return std::any_of(patterns.begin(), patterns.end(),
                     [&str](const std::string& p) { return MatchPattern(str, p); });
}

} // anonymous namespace
: timeout(MonoDelta::FromSeconds(FLAGS_checksum_timeout_sec)),
snapshot_timestamp(FLAGS_checksum_snapshot_timestamp) {
ChecksumOptions::ChecksumOptions(MonoDelta timeout, int scan_concurrency,
bool use_snapshot, uint64_t snapshot_timestamp)
: timeout(std::move(timeout)),
snapshot_timestamp(snapshot_timestamp) {}
const uint64_t ChecksumOptions::kCurrentTimestamp = 0;
// Returns the state of the replica of 'tablet_id' hosted on this tablet server.
// Must only be called once info has been fetched (CHECKs state_ == kFetched).
// Returns tablet::UNKNOWN if this server did not report the tablet.
// NOTE(review): in this copy the closing brace of the 'if', the return for the
// case where 'tablet_id' IS present in 'tablet_status_map_', and the function's
// closing brace are missing. As written, control can fall off the end of a
// non-void function. Restore the found-path return.
tablet::TabletStatePB KsckTabletServer::ReplicaState(const std::string& tablet_id) const {
CHECK_EQ(state_, kFetched);
if (!ContainsKey(tablet_status_map_, tablet_id)) {
return tablet::UNKNOWN;
// Nothing to release explicitly; members clean up after themselves.
KsckCluster::~KsckCluster() {
}
// Fetches table and tablet metadata for the cluster.
// NOTE(review): the body of the loop over tables() and the closing braces are
// missing from this copy. As written, the 'return' sits inside the loop and no
// per-table fetch happens. Presumably each iteration should call
// RetrieveTabletsList(table), with the table/tablet-server lists retrieved
// first. Confirm against version history.
Status KsckCluster::FetchTableAndTabletInfo() {
for (const shared_ptr<KsckTable>& table : tables()) {
return Status::OK();
// Gets the list of tablet servers from the Master into 'tablet_servers_'.
Status KsckCluster::RetrieveTabletServers() {
  return master_->RetrieveTabletServers(&tablet_servers_);
}
// Gets the list of tables from the Master into 'tables_'.
Status KsckCluster::RetrieveTablesList() {
  return master_->RetrieveTablesList(&tables_);
}
// Gets the list of tablets of 'table' from the Master.
Status KsckCluster::RetrieveTabletsList(const shared_ptr<KsckTable>& table) {
  return master_->RetrieveTabletsList(table);
}
// Connects to the Master and reports success on Out().
// Returns the status of the connection attempt.
Status Ksck::CheckMasterRunning() {
  VLOG(1) << "Connecting to the Master";
  Status s = cluster_->master()->Connect();
  if (s.ok()) {
    Out() << "Connected to the Master" << endl;
  }
  return s;
}
// Fetches table and tablet metadata by delegating to the cluster.
Status Ksck::FetchTableAndTabletInfo() {
  return cluster_->FetchTableAndTabletInfo();
}
// Fetches info from every tablet server known to the cluster, one pool task per
// server, and prints a summary to Out().
// Returns NotFound if there are no tablet servers, NetworkError if any server
// could not be reached, and OK otherwise.
// NOTE(review): several statements are missing from this copy:
//  - 'pool' is declared but never built, so 'pool->SubmitFunc' dereferences a
//    null pointer. Presumably it should be built with
//    FLAGS_fetch_replica_info_concurrency threads, which is otherwise unused here.
//  - the failure branch inside the lambda never increments 'bad_servers';
//  - nothing waits for the pool tasks before 'bad_servers' is read;
//  - the closing "}));" of the lambda and the closing braces are absent.
Status Ksck::FetchInfoFromTabletServers() {
VLOG(1) << "Getting the Tablet Servers list";
int servers_count = cluster_->tablet_servers().size();
VLOG(1) << Substitute("List of $0 Tablet Servers retrieved", servers_count);
if (servers_count == 0) {
return Status::NotFound("No tablet servers found");
gscoped_ptr<ThreadPool> pool;
// Number of servers whose fetch failed; updated concurrently by pool tasks.
AtomicInt<int32_t> bad_servers(0);
VLOG(1) << "Fetching info from all the Tablet Servers";
for (const KsckMaster::TSMap::value_type& entry : cluster_->tablet_servers()) {
// The lambda captures 'entry' by reference; it refers to an element of
// cluster_->tablet_servers(), which must outlive the task.
CHECK_OK(pool->SubmitFunc([&]() {
Status s = ConnectToTabletServer(entry.second);
if (!s.ok()) {
if (bad_servers.Load() == 0) {
Out() << Substitute("Fetched info from all $0 Tablet Servers", servers_count) << endl;
return Status::OK();
} else {
Warn() << Substitute("Fetched info from $0 Tablet Servers, $1 weren't reachable",
servers_count - bad_servers.Load(), bad_servers.Load()) << endl;
return Status::NetworkError("Not all Tablet Servers are reachable");
// Fetches info from tablet server 'ts'. A failure is reported as a warning on
// Out() and also returned to the caller.
Status Ksck::ConnectToTabletServer(const shared_ptr<KsckTabletServer>& ts) {
  VLOG(1) << "Going to connect to Tablet Server: " << ts->uuid();
  Status s = ts->FetchInfo();
  if (s.ok()) {
    VLOG(1) << "Connected to Tablet Server: " << ts->uuid();
  } else {
    Warn() << Substitute("Unable to connect to Tablet Server $0: $1",
                         ts->ToString(), s.ToString()) << endl;
  }
  return s;
}
// Verifies every table whose name matches 'table_filters_' (via VerifyTable())
// and prints a summary to Out().
// Returns OK if no table matches or all matching tables are healthy, and
// Corruption if any matching table is not.
// NOTE(review): this copy is missing the 'continue' after skipping a
// filtered-out table, the increments of 'tables_checked' and
// 'bad_tables_count', and the closing braces. As written, both counters stay 0.
Status Ksck::CheckTablesConsistency() {
int tables_checked = 0;
int bad_tables_count = 0;
for (const shared_ptr<KsckTable> &table : cluster_->tables()) {
if (!MatchesAnyPattern(table_filters_, table->name())) {
VLOG(1) << "Skipping table " << table->name();
if (!VerifyTable(table)) {
Out() << endl;
if (tables_checked == 0) {
Out() << "The cluster doesn't have any matching tables" << endl;
return Status::OK();
if (bad_tables_count == 0) {
Out() << Substitute("The metadata for $0 table(s) is HEALTHY", tables_checked) << endl;
return Status::OK();
} else {
Warn() << Substitute("$0 out of $1 table(s) are not in a healthy state",
bad_tables_count, tables_checked) << endl;
return Status::Corruption(Substitute("$0 table(s) are bad", bad_tables_count));
// Class to act as a collector of scan results.
// Provides thread-safe accessors to update and read a hash table of results.
// Reference-counted so that in-flight scan callbacks can keep it alive.
// NOTE(review): this copy is missing:
//  - the 'public:' / 'private:' specifiers;
//  - the initializers of 'responses_' (presumably the expected replica count)
//    and 'rows_summed_';
//  - the body of ReportProgress() (presumably adding the deltas to
//    'rows_summed_' / 'disk_bytes_summed_');
//  - the countdown of 'responses_' in ReportResult(). Without it, WaitFor()
//    can only return by timing out.
//  - the closing braces and the final "};".
class ChecksumResultReporter : public RefCountedThreadSafe<ChecksumResultReporter> {
// (status, checksum) reported by a single replica.
typedef std::pair<Status, uint64_t> ResultPair;
// replica_uuid -> result.
typedef std::unordered_map<std::string, ResultPair> ReplicaResultMap;
// tablet_id -> per-replica results.
typedef std::unordered_map<std::string, ReplicaResultMap> TabletResultMap;
// Initialize reporter with the number of replicas being queried.
explicit ChecksumResultReporter(int num_tablet_replicas)
: expected_count_(num_tablet_replicas),
disk_bytes_summed_(0) {
// Accumulates scan progress (rows and on-disk bytes) reported by callbacks.
void ReportProgress(int64_t delta_rows, int64_t delta_bytes) {
// Write an entry to the result map indicating a response from the remote.
// Each (tablet_id, replica_uuid) pair may be reported only once (InsertOrDie).
void ReportResult(const std::string& tablet_id,
const std::string& replica_uuid,
const Status& status,
uint64_t checksum) {
std::lock_guard<simple_spinlock> guard(lock_);
unordered_map<string, ResultPair>& replica_results =
LookupOrInsert(&checksums_, tablet_id, unordered_map<string, ResultPair>());
InsertOrDie(&replica_results, replica_uuid, ResultPair(status, checksum));
// Blocks until either the number of results plus errors reported equals
// num_tablet_replicas (from the constructor), or until the timeout expires,
// whichever comes first. While waiting, prints a progress line to Out() at
// least every 5 seconds.
// Returns false if the timeout expired before all responses came in.
// Otherwise, returns true.
bool WaitFor(const MonoDelta& timeout) const {
MonoTime start = MonoTime::Now();
MonoTime deadline = start + timeout;
bool done = false;
while (!done) {
MonoTime now = MonoTime::Now();
int rem_ms = (deadline - now).ToMilliseconds();
if (rem_ms <= 0) return false;
// Wait in slices of at most 5s so progress can be reported in between.
done = responses_.WaitFor(MonoDelta::FromMilliseconds(std::min(rem_ms, 5000)));
string status = done ? "finished in " : "running for ";
int run_time_sec = (MonoTime::Now() - start).ToSeconds();
Out() << "Checksum " << status << run_time_sec << "s: "
<< responses_.count() << "/" << expected_count_ << " replicas remaining ("
<< HumanReadableNumBytes::ToString(disk_bytes_summed_.Load()) << " from disk, "
<< HumanReadableInt::ToString(rows_summed_.Load()) << " rows summed)"
<< endl;
return true;
// Returns true iff all replicas have reported in.
bool AllReported() const { return responses_.count() == 0; }
// Get reported results (a copy taken under the lock).
TabletResultMap checksums() const {
std::lock_guard<simple_spinlock> guard(lock_);
return checksums_;
friend class RefCountedThreadSafe<ChecksumResultReporter>;
~ChecksumResultReporter() {}
// Report either a success or error response.
// NOTE(review): declared here but no definition is visible in this file.
void HandleResponse(const std::string& tablet_id, const std::string& replica_uuid,
const Status& status, uint64_t checksum);
const int expected_count_;
// Outstanding-response latch; its count() is printed as "replicas remaining".
CountDownLatch responses_;
mutable simple_spinlock lock_; // Protects 'checksums_'.
// checksums_ is an unordered_map of
// { tablet_id : { replica_uuid : (status, checksum) } }.
TabletResultMap checksums_;
AtomicInt<int64_t> rows_summed_;
AtomicInt<int64_t> disk_bytes_summed_;
// Shared work queue for one tablet server. Each element pairs a table's schema
// with the ID of a tablet of that table whose replica on the server still
// needs a checksum scan.
using SharedTabletQueue = shared_ptr<BlockingQueue<std::pair<Schema, std::string>>>;
// A set of callbacks which records the result of a tablet replica's checksum,
// and then checks if the tablet server has any more tablets to checksum. If so,
// a new async checksum scan is started.
// Instances are heap-allocated and delete themselves once their queue is
// drained (see Finished()).
// NOTE(review): this copy is missing the 'public:' / 'private:' specifiers,
// the constructor declarator line ("TabletServerChecksumCallbacks("), the
// initializers of reporter_, tablet_server_, queue_ and options_ from the
// corresponding parameters, and the closing braces / final "};".
class TabletServerChecksumCallbacks : public ChecksumProgressCallbacks {
scoped_refptr<ChecksumResultReporter> reporter,
shared_ptr<KsckTabletServer> tablet_server,
SharedTabletQueue queue,
std::string tablet_id,
ChecksumOptions options) :
tablet_id_(std::move(tablet_id)) {
// Forwards incremental scan progress to the shared reporter.
void Progress(int64_t rows_summed, int64_t disk_bytes_summed) override {
reporter_->ReportProgress(rows_summed, disk_bytes_summed);
// Records the result for 'tablet_id_', then reuses this object to scan the
// next queued tablet on the same server, or deletes itself if none remain.
void Finished(const Status& status, uint64_t checksum) override {
reporter_->ReportResult(tablet_id_, tablet_server_->uuid(), status, checksum);
std::pair<Schema, std::string> table_tablet;
if (queue_->BlockingGet(&table_tablet)) {
const Schema& table_schema = table_tablet.first;
tablet_id_ = table_tablet.second;
tablet_server_->RunTabletChecksumScanAsync(tablet_id_, table_schema, options_, this);
} else {
// Queue exhausted (it was shut down before scans started): release ourselves.
delete this;
const scoped_refptr<ChecksumResultReporter> reporter_;
const shared_ptr<KsckTabletServer> tablet_server_;
const SharedTabletQueue queue_;
const ChecksumOptions options_;
// Tablet currently being scanned; updated when the next tablet is dequeued.
std::string tablet_id_;
// Runs checksum scans over every replica of every tablet matching
// 'table_filters_' and 'tablet_id_filters_'. Prints each replica's checksum (or
// error) grouped by table, and compares checksums across the replicas of each
// tablet.
//
// Returns:
//  - NotFound if no table matches, or if matching tables have tablets but no
//    replica matches;
//  - ServiceUnavailable if the snapshot timestamp was to be chosen
//    automatically but no healthy tablet server supplied one;
//  - TimedOut if not every replica reported within 'opts.timeout';
//  - Corruption on checksum mismatches;
//  - Aborted if any replica reported an error;
//  - OK otherwise.
//
// NOTE(review): this copy is missing closing braces and several statements;
// see the local notes below.
Status Ksck::ChecksumData(const ChecksumOptions& opts) {
// Copy options so that local modifications can be made and passed on.
ChecksumOptions options = opts;
typedef unordered_map<shared_ptr<KsckTablet>, shared_ptr<KsckTable>> TabletTableMap;
TabletTableMap tablet_table_map;
int num_tables = 0;
int num_tablets = 0;
int num_tablet_replicas = 0;
// Collect the tablets to checksum (mapped to their owning table) and count the
// replicas that are expected to report a result.
for (const shared_ptr<KsckTable>& table : cluster_->tables()) {
VLOG(1) << "Table: " << table->name();
if (!MatchesAnyPattern(table_filters_, table->name())) continue;
num_tables += 1;
num_tablets += table->tablets().size();
for (const shared_ptr<KsckTablet>& tablet : table->tablets()) {
VLOG(1) << "Tablet: " << tablet->id();
if (!MatchesAnyPattern(tablet_id_filters_, tablet->id())) continue;
InsertOrDie(&tablet_table_map, tablet, table);
num_tablet_replicas += tablet->replicas().size();
if (num_tables == 0) {
string msg = "No table found.";
if (!table_filters_.empty()) {
msg += " Filter: table_filters=" + JoinStrings(table_filters_, ",");
return Status::NotFound(msg);
if (num_tablets > 0 && num_tablet_replicas == 0) {
// Return an error if the table has tablets, but no replicas. The table may have no
// tablets if all range partitions have been dropped.
string msg = "No tablet replicas found.";
if (!table_filters_.empty() || !tablet_id_filters_.empty()) {
msg += " Filter: ";
if (!table_filters_.empty()) {
msg += "table_filters=" + JoinStrings(table_filters_, ",");
if (!tablet_id_filters_.empty()) {
// NOTE(review): when both filters are set, nothing separates the two parts
// of the message (e.g. "table_filters=atablet_id_filters=b").
msg += "tablet_id_filters=" + JoinStrings(tablet_id_filters_, ",");
return Status::NotFound(msg);
// Map of tablet servers to tablet queue.
typedef unordered_map<shared_ptr<KsckTabletServer>, SharedTabletQueue> TabletServerQueueMap;
TabletServerQueueMap tablet_server_queues;
scoped_refptr<ChecksumResultReporter> reporter(new ChecksumResultReporter(num_tablet_replicas));
// Create a queue of checksum callbacks grouped by the tablet server.
for (const TabletTableMap::value_type& entry : tablet_table_map) {
const shared_ptr<KsckTablet>& tablet = entry.first;
const shared_ptr<KsckTable>& table = entry.second;
for (const shared_ptr<KsckTabletReplica>& replica : tablet->replicas()) {
const shared_ptr<KsckTabletServer>& ts =
FindOrDie(cluster_->tablet_servers(), replica->ts_uuid());
// Each new queue is constructed with 'num_tablet_replicas' (presumably its
// capacity, so the Put() below cannot fail for lack of room; verify).
const SharedTabletQueue& queue =
LookupOrInsertNewSharedPtr(&tablet_server_queues, ts, num_tablet_replicas);
CHECK_EQ(QUEUE_SUCCESS, queue->Put(make_pair(table->schema(), tablet->id())));
if (options.use_snapshot && options.snapshot_timestamp == ChecksumOptions::kCurrentTimestamp) {
// Set the snapshot timestamp to the current timestamp of the first healthy tablet server
// we can find.
// NOTE(review): no 'break' is visible after the assignment below. Without one,
// the last healthy server in iteration order wins, not the first.
for (const auto& ts : tablet_server_queues) {
if (ts.first->is_healthy()) {
options.snapshot_timestamp = ts.first->current_timestamp();
if (options.snapshot_timestamp == ChecksumOptions::kCurrentTimestamp) {
return Status::ServiceUnavailable(
"No tablet servers were available to fetch the current timestamp");
Out() << "Using snapshot timestamp: " << options.snapshot_timestamp << endl;
// Kick off checksum scans in parallel. For each tablet server, we start
// scan_concurrency scans. Each callback then initiates one additional
// scan when it returns if the queue for that TS is not empty.
for (const TabletServerQueueMap::value_type& entry : tablet_server_queues) {
const shared_ptr<KsckTabletServer>& tablet_server = entry.first;
const SharedTabletQueue& queue = entry.second;
queue->Shutdown(); // Ensures that BlockingGet() will not block.
for (int i = 0; i < options.scan_concurrency; i++) {
std::pair<Schema, std::string> table_tablet;
if (queue->BlockingGet(&table_tablet)) {
const Schema& table_schema = table_tablet.first;
const std::string& tablet_id = table_tablet.second;
auto* cbs = new TabletServerChecksumCallbacks(
reporter, tablet_server, queue, tablet_id, options);
// 'cbs' deletes itself when complete.
tablet_server->RunTabletChecksumScanAsync(tablet_id, table_schema, options, cbs);
bool timed_out = !reporter->WaitFor(options.timeout);
// Even if we timed out, print the checksum results that we did get.
ChecksumResultReporter::TabletResultMap checksums = reporter->checksums();
int num_errors = 0;
int num_mismatches = 0;
int num_results = 0;
// Print results table by table. Each replica's checksum is compared against
// the first successful checksum seen for the same tablet.
// NOTE(review): the table banner, per-replica lines and trailing blank line
// below go to 'cout' directly rather than Out(), so they bypass
// 'g_err_stream'.
for (const shared_ptr<KsckTable>& table : cluster_->tables()) {
bool printed_table_name = false;
for (const shared_ptr<KsckTablet>& tablet : table->tablets()) {
if (ContainsKey(checksums, tablet->id())) {
if (!printed_table_name) {
printed_table_name = true;
cout << "-----------------------" << endl;
cout << table->name() << endl;
cout << "-----------------------" << endl;
bool seen_first_replica = false;
uint64_t first_checksum = 0;
for (const ChecksumResultReporter::ReplicaResultMap::value_type& r :
FindOrDie(checksums, tablet->id())) {
const string& replica_uuid = r.first;
shared_ptr<KsckTabletServer> ts = FindOrDie(cluster_->tablet_servers(), replica_uuid);
const ChecksumResultReporter::ResultPair& result = r.second;
const Status& status = result.first;
uint64_t checksum = result.second;
string status_str = (status.ok()) ? Substitute("Checksum: $0", checksum)
: Substitute("Error: $0", status.ToString());
cout << Substitute("T $0 P $1 ($2): $3", tablet->id(), ts->uuid(), ts->address(),
status_str) << endl;
// NOTE(review): none of 'num_errors', 'num_mismatches' or 'num_results' is
// incremented in this copy, so the checks after the loop cannot report
// errors or mismatches. Also, the CHECK_EQ below would fail whenever
// replicas exist and the scan did not time out.
if (!status.ok()) {
} else if (!seen_first_replica) {
seen_first_replica = true;
first_checksum = checksum;
} else if (checksum != first_checksum) {
Error() << ">> Mismatch found in table " << table->name()
<< " tablet " << tablet->id() << endl;
if (printed_table_name) cout << endl;
if (timed_out) {
// NOTE(review): the final Substitute() argument for $2 (presumably
// 'num_tablet_replicas') and the closing "));" are missing here.
return Status::TimedOut(Substitute("Checksum scan did not complete within the timeout of $0: "
"Received results for $1 out of $2 expected replicas",
options.timeout.ToString(), num_results,
CHECK_EQ(num_results, num_tablet_replicas)
<< Substitute("Unexpected error: only got $0 out of $1 replica results",
num_results, num_tablet_replicas);
if (num_mismatches != 0) {
// TODO(KUDU-1020): remove the below note once safe time advancement is fully implemented.
return Status::Corruption(Substitute(
"$0 checksum mismatches were detected. "
"NOTE: if the table is actively being written to, this may generate spurious "
"checksum mismatches.", num_mismatches));
if (num_errors != 0) {
return Status::Aborted(Substitute("$0 errors were detected", num_errors));
return Status::OK();
// Checks every tablet of 'table' whose ID matches 'tablet_id_filters_' via
// VerifyTablet(), then prints a per-table summary to Out().
// Returns true iff the number of tablets counted as CheckResult::OK equals the
// number of tablets checked.
// NOTE(review): this copy is missing:
//  - the closing "});" of the copy_if lambda;
//  - the statement tallying 'tablet_result' into 'result_counts'. As written,
//    'tablet_result' is unused, so only a table with no matching tablets
//    reports healthy.
//  - the table-name and count arguments of the Substitute() calls (each format
//    uses $0..$2 but receives fewer arguments);
//  - the closing braces.
// Also, 'result_counts[...]' (int) is compared with 'tablets.size()' (size_t),
// and 'all_tablets' is a copy of table->tablets().
bool Ksck::VerifyTable(const shared_ptr<KsckTable>& table) {
const auto all_tablets = table->tablets();
vector<shared_ptr<KsckTablet>> tablets;
std::copy_if(all_tablets.begin(), all_tablets.end(), std::back_inserter(tablets),
[&](const shared_ptr<KsckTablet>& t) {
return MatchesAnyPattern(tablet_id_filters_, t->id());
int table_num_replicas = table->num_replicas();
VLOG(1) << Substitute("Verifying $0 tablet(s) for table $1 configured with num_replicas = $2",
tablets.size(), table->name(), table_num_replicas);
// Number of tablets per health result.
map<CheckResult, int> result_counts;
for (const auto& tablet : tablets) {
auto tablet_result = VerifyTablet(tablet, table_num_replicas);
if (result_counts[CheckResult::OK] == tablets.size()) {
Out() << Substitute("Table $0 is $1 ($2 tablet(s) checked)",
Color(AnsiCode::GREEN, "HEALTHY"),
tablets.size()) << endl;
return true;
} else {
if (result_counts[CheckResult::UNAVAILABLE] > 0) {
Out() << Substitute("Table $0 has $1 $2 tablet(s)",
Color(AnsiCode::RED, "unavailable")) << endl;
if (result_counts[CheckResult::UNDER_REPLICATED] > 0) {
Out() << Substitute("Table $0 has $1 $2 tablet(s)",
Color(AnsiCode::YELLOW, "under-replicated")) << endl;
return false;
// Checks the health of one tablet by combining the master's replica list with
// what each hosting tablet server reported. When the tablet is not healthy,
// prints a diagnosis plus per-replica details to Out(). Returns:
//   UNAVAILABLE      - fewer than a majority of replicas are RUNNING, or the
//                      leader count is not exactly one;
//   UNDER_REPLICATED - a majority but not all replicas are RUNNING, or (when
//                      'check_replica_count_' is set) the configuration has
//                      fewer replicas than 'table_num_replicas';
//   OK               - otherwise.
// NOTE(review): this copy is missing:
//  - the closing "};" of struct ReplicaState;
//  - the 'replica_states.emplace_back()' before '.back()'. As written,
//    '.back()' is called on an empty vector, which is undefined behavior.
//  - the increments of 'leaders_count' / 'running_count';
//  - the 'tablet_str' argument of the first three Substitute() calls, plus
//    'num_voters' in the third;
//  - the 'continue' / 'else' structure of the per-replica dump;
//  - the closing braces.
bool-compatible note: every replica is counted as a voter ('num_voters').
Ksck::CheckResult Ksck::VerifyTablet(const shared_ptr<KsckTablet>& tablet, int table_num_replicas) {
const string tablet_str = Substitute("Tablet $0 of table '$1'",
tablet->id(), tablet->table()->name());
// Consolidate the state of each replica into a simple struct for easier analysis.
struct ReplicaState {
// Master's view of the replica (not default-initialized).
KsckTabletReplica* replica;
// Hosting tablet server, or null if the server is unknown to the cluster.
KsckTabletServer* ts = nullptr;
// State reported by a healthy hosting server; UNKNOWN otherwise.
tablet::TabletStatePB state = tablet::UNKNOWN;
// Full status reported by the hosting server, if any.
boost::optional<tablet::TabletStatusPB> status_pb;
vector<ReplicaState> replica_states;
// NOTE(review): 'replica' is taken by value, copying a shared_ptr per iteration.
for (const shared_ptr<KsckTabletReplica> replica : tablet->replicas()) {
auto* repl_state = &replica_states.back();
repl_state->replica = replica.get();
// NOTE(review): logged before the server's liveness has been checked.
VLOG(1) << Substitute("A replica of tablet $0 is on live tablet server $1",
tablet->id(), replica->ts_uuid());
// Check for agreement on tablet assignment and state between the master
// and the tablet server.
auto ts = FindPtrOrNull(cluster_->tablet_servers(), replica->ts_uuid());
repl_state->ts = ts.get();
if (ts && ts->is_healthy()) {
repl_state->state = ts->ReplicaState(tablet->id());
if (ContainsKey(ts->tablet_status_map(), tablet->id())) {
repl_state->status_pb = ts->tablet_status_map().at(tablet->id());
// Summarize the states.
int leaders_count = 0;
int running_count = 0;
for (const auto& r : replica_states) {
if (r.replica->is_leader()) {
if (r.state == tablet::RUNNING) {
// Determine the overall health state of the tablet.
CheckResult result = CheckResult::OK;
int num_voters = replica_states.size();
int majority_size = consensus::MajoritySize(num_voters);
if (running_count < majority_size) {
Out() << Substitute("$0 is $1: $2 replica(s) not RUNNING",
Color(AnsiCode::RED, "unavailable"),
num_voters - running_count) << endl;
result = CheckResult::UNAVAILABLE;
} else if (running_count < num_voters) {
Out() << Substitute("$0 is $1: $2 replica(s) not RUNNING",
Color(AnsiCode::YELLOW, "under-replicated"),
num_voters - running_count) << endl;
result = CheckResult::UNDER_REPLICATED;
} else if (check_replica_count_ && num_voters < table_num_replicas) {
Out() << Substitute("$0 is $1: configuration has $2 replicas vs desired $3",
Color(AnsiCode::YELLOW, "under-replicated"),
table_num_replicas) << endl;
result = CheckResult::UNDER_REPLICATED;
} else if (leaders_count != 1) {
Out() << Substitute("$0 is $1: expected one LEADER replica",
tablet_str, Color(AnsiCode::RED, "unavailable")) << endl;
result = CheckResult::UNAVAILABLE;
// In the case that we found something wrong, dump info on all the replicas
// to make it easy to debug.
if (result != CheckResult::OK) {
for (const ReplicaState& r : replica_states) {
string ts_str = r.ts ? r.ts->ToString() : r.replica->ts_uuid();
const char* leader_str = r.replica->is_leader() ? " [LEADER]" : "";
Out() << " " << ts_str << ": ";
if (!r.ts || !r.ts->is_healthy()) {
Out() << Color(AnsiCode::YELLOW, "TS unavailable") << leader_str << endl;
if (r.state == tablet::RUNNING) {
Out() << Color(AnsiCode::GREEN, "RUNNING") << leader_str << endl;
if (r.status_pb == boost::none) {
Out() << Color(AnsiCode::YELLOW, "missing") << leader_str << endl;
// NOTE(review): the detailed dump below dereferences 'r.status_pb'. It must
// only be reached when 'status_pb' is set, which the missing 'continue'/'else'
// structure is presumably meant to guarantee.
Out() << Color(AnsiCode::YELLOW, "bad state") << leader_str << endl;
Out() << Substitute(
" State: $0\n"
" Data state: $1\n"
" Last status: $2\n",
Color(AnsiCode::BLUE, tablet::TabletStatePB_Name(r.state)),
Color(AnsiCode::BLUE, tablet::TabletDataState_Name(r.status_pb->tablet_data_state())),
Color(AnsiCode::BLUE, r.status_pb->last_status()));
Out() << endl;
return result;
} // namespace tools
} // namespace kudu