// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements.  See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership.  The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License.  You may obtain a copy of the License at
//
//   http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied.  See the License for the
// specific language governing permissions and limitations
// under the License.

#include "kudu/tools/ksck.h"

#include <algorithm>
#include <atomic>
#include <cstddef>
#include <functional>
#include <iomanip>
#include <iostream>
#include <iterator>
#include <map>
#include <mutex>
#include <set>
#include <type_traits>
#include <vector>

#include <gflags/gflags.h>
#include <glog/logging.h>
#include <google/protobuf/stubs/port.h>

#include "kudu/common/partition.h"
#include "kudu/consensus/quorum_util.h"
#include "kudu/gutil/basictypes.h"
#include "kudu/gutil/map-util.h"
#include "kudu/gutil/port.h"
#include "kudu/gutil/strings/join.h"
#include "kudu/gutil/strings/split.h"
#include "kudu/gutil/strings/substitute.h"
#include "kudu/tablet/tablet.pb.h"
#include "kudu/tools/color.h"
#include "kudu/tools/ksck_checksum.h"
#include "kudu/util/locks.h"
#include "kudu/util/monotime.h"
#include "kudu/util/string_case.h"

#define PUSH_PREPEND_NOT_OK(s, statuses, msg) do { \
  ::kudu::Status _s = (s); \
  if (PREDICT_FALSE(!_s.ok())) { \
    (statuses).push_back(_s.CloneAndPrepend(msg)); \
  } \
} while (0)

#define STR_FLAGS_CATEGORY_TIME_SOURCE "time_source"
#define STR_FLAGS_CATEGORY_UNUSUAL "unusual"

DEFINE_bool(checksum_scan, false,
            "Perform a checksum scan on data in the cluster.");
DEFINE_int32(fetch_info_concurrency, 20,
             "Number of threads to fetch info concurrently.");
DEFINE_string(flags_categories_to_check, STR_FLAGS_CATEGORY_TIME_SOURCE,
              "Comma-separated list of flag categories to check for divergence "
              "across the cluster; default is "
              STR_FLAGS_CATEGORY_TIME_SOURCE "; available categories are "
              STR_FLAGS_CATEGORY_TIME_SOURCE ", "
              STR_FLAGS_CATEGORY_UNUSUAL ".");

DEFINE_string(ksck_format, "plain_concise",
              "Output format for ksck. Available options are 'plain_concise', "
              "'plain_full', 'json_pretty', and 'json_compact'.\n"
              "'plain_concise' format is plain text, omitting most information "
              "about healthy tablets.\n"
              "'plain_full' is plain text with all results included.\n"
              "'json_pretty' produces pretty-printed json.\n"
              "'json_compact' produces json suitable for parsing by other programs.\n"
              "'json_pretty' and 'json_compact' differ in format, not content.");
DEFINE_bool(consensus, true,
            "Whether to check the consensus state from each tablet against the master.");

using kudu::cluster_summary::HealthCheckResult;
using kudu::cluster_summary::ConsensusConfigType;
using kudu::cluster_summary::ConsensusState;
using kudu::cluster_summary::ConsensusStateMap;
using kudu::cluster_summary::ReplicaSummary;
using kudu::cluster_summary::ServerHealth;
using kudu::cluster_summary::ServerHealthSummary;
using kudu::cluster_summary::TableSummary;
using kudu::cluster_summary::TabletSummary;
using kudu::server::GetFlagsResponsePB;

using std::atomic;
using std::cout;
using std::nullopt;
using std::optional;
using std::ostream;
using std::ostringstream;
using std::pair;
using std::set;
using std::shared_ptr;
using std::string;
using std::vector;
using strings::Substitute;

namespace kudu {
namespace tools {

DEFINE_validator(flags_categories_to_check,
                 [](const char* /* flag_name */, const string& value) {
  vector<string> categories;
  SplitStringUsing(value, ",", &categories);
  for (const auto& cat : categories) {
    if (cat.empty() || StringToFlagsCategory(cat, nullptr).ok()) {
      continue;
    }
    LOG(ERROR) << Substitute("unknown flag category: '$0' "
                             "(expecting comma-separated list built out of "
                             STR_FLAGS_CATEGORY_TIME_SOURCE ", "
                             STR_FLAGS_CATEGORY_UNUSUAL ")", cat);
    return false;
  }
  return true;
});

namespace {
void BuildConsensusStateForConfigMember(const consensus::ConsensusStatePB& cstate,
                                        ConsensusState* ksck_cstate) {
  CHECK(ksck_cstate);
  ksck_cstate->term = cstate.current_term();
  ksck_cstate->type = cstate.has_pending_config() ?
                      ConsensusConfigType::PENDING :
                      ConsensusConfigType::COMMITTED;
  const auto& config = cstate.has_pending_config() ?
                       cstate.pending_config() :
                       cstate.committed_config();
  if (config.has_opid_index()) {
    ksck_cstate->opid_index = config.opid_index();
  }
  // Test for emptiness rather than mere presence, since Kudu nodes set
  // leader_uuid to "" explicitly when they do not know about a leader.
  if (!cstate.leader_uuid().empty()) {
    ksck_cstate->leader_uuid = cstate.leader_uuid();
  }
  const auto& peers = config.peers();
  for (const auto& pb : peers) {
    if (pb.member_type() == consensus::RaftPeerPB::NON_VOTER) {
      InsertOrDie(&ksck_cstate->non_voter_uuids, pb.permanent_uuid());
    } else {
      InsertOrDie(&ksck_cstate->voter_uuids, pb.permanent_uuid());
    }
  }
}

bool IsNotAuthorizedMethodAccess(const Status& s) {
  return s.IsRemoteError() &&
         s.ToString().find("Not authorized: unauthorized access to method") != string::npos;
}

// Return whether the format of the ksck results is non-JSON.
bool IsNonJSONFormat() {
  return iequals(FLAGS_ksck_format, "plain_full") ||
         iequals(FLAGS_ksck_format, "plain_concise");
}

} // anonymous namespace

tablet::TabletStatePB KsckTabletServer::ReplicaState(const std::string& tablet_id) const {
  CHECK_EQ(state_, KsckFetchState::FETCHED);
  if (!ContainsKey(tablet_status_map_, tablet_id)) {
    return tablet::UNKNOWN;
  }
  return tablet_status_map_.at(tablet_id).state();
}

std::ostream& operator<<(std::ostream& lhs, KsckFetchState state) {
  switch (state) {
    case KsckFetchState::UNINITIALIZED:
      lhs << "UNINITIALIZED";
      break;
    case KsckFetchState::FETCH_FAILED:
      lhs << "FETCH_FAILED";
      break;
    case KsckFetchState::FETCHED:
      lhs << "FETCHED";
      break;
    default:
      LOG(FATAL) << "unknown KsckFetchState";
  }
  return lhs;
}

const FlagsFetchFilter& GetFlagsCategoryFilter(FlagsCategory category) {
  // NOTE: using double braces for std::array aggregate initialization.
  static const std::array<FlagsFetchFilter, FlagsCategory::MAX + 1> kFilters { {
    {
      // FlagsCategory::TIME_SOURCE
      { "time_source", "builtin_ntp_servers", },
      {}
    },
    {
      // FlagsCategory::UNUSUAL
      {},
      { "experimental", "hidden", "unsafe" }
    },
  } };
  DCHECK_GE(category, FlagsCategory::MIN);
  DCHECK_LE(category, FlagsCategory::MAX);
  return kFilters[category];
}

const char* FlagsCategoryToString(FlagsCategory category) {
  static constexpr const char* const kCategoryTimeSource =
      STR_FLAGS_CATEGORY_TIME_SOURCE;
  static constexpr const char* const kCategoryUnusual =
      STR_FLAGS_CATEGORY_UNUSUAL;
  switch (category) {
    case FlagsCategory::TIME_SOURCE:
      return kCategoryTimeSource;
    case FlagsCategory::UNUSUAL:
      return kCategoryUnusual;
  }
  return "unknown";
}

Status StringToFlagsCategory(const string& str, FlagsCategory* category) {
  if (iequals(str, STR_FLAGS_CATEGORY_TIME_SOURCE)) {
    if (category) {
      *category = FlagsCategory::TIME_SOURCE;
    }
    return Status::OK();
  }
  if (iequals(str, STR_FLAGS_CATEGORY_UNUSUAL)) {
    if (category) {
      *category = FlagsCategory::UNUSUAL;
    }
    return Status::OK();
  }
  return Status::InvalidArgument(Substitute("$0: unknown flags category", str));
}

Status StringToFlagsCategories(const std::string& str,
                               vector<FlagsCategory>* categories) {
  DCHECK(categories);
  vector<string> categories_str(strings::Split(str, ",", strings::SkipEmpty()));
  for (const auto& str : categories_str) {
    FlagsCategory cat;
    RETURN_NOT_OK(StringToFlagsCategory(str, &cat));
    categories->push_back(cat);
  }
  std::sort(categories->begin(), categories->end());
  categories->erase(std::unique(categories->begin(), categories->end()),
                    categories->end());
  return Status::OK();
}

Ksck::Ksck(shared_ptr<KsckCluster> cluster, ostream* out)
    : cluster_(std::move(cluster)),
      out_(out == nullptr ? &std::cout : out) {
  CHECK_OK(ThreadPoolBuilder("ksck-fetch")
               .set_max_threads(FLAGS_fetch_info_concurrency)
               .set_idle_timeout(MonoDelta::FromMilliseconds(10))
               .Build(&pool_));
}

Status Ksck::CheckMasterHealth() {
  const auto num_masters = cluster_->masters().size();
  if (num_masters == 0) {
    return Status::NotFound("no masters found");
  }

  atomic<size_t> bad_masters(0);
  atomic<size_t> unauthorized_masters(0);

  vector<ServerHealthSummary> master_summaries;
  simple_spinlock master_summaries_lock;

  vector<FlagsCategory> flags_categories_to_fetch = { FlagsCategory::UNUSUAL };
  RETURN_NOT_OK(StringToFlagsCategories(FLAGS_flags_categories_to_check,
                                        &flags_categories_to_fetch));
  for (const auto& master : cluster_->masters()) {
    RETURN_NOT_OK(pool_->Submit([&]() {
      ServerHealthSummary sh;
      Status s = master->FetchInfo().AndThen([&]() {
        return master->FetchConsensusState();
      });
      sh.uuid = master->uuid();
      sh.address = master->address();
      sh.version = master->version();
      sh.status = s;
      if (!s.ok()) {
        if (IsNotAuthorizedMethodAccess(s)) {
          sh.health = ServerHealth::UNAUTHORIZED;
          ++unauthorized_masters;
        } else {
          sh.health = ServerHealth::UNAVAILABLE;
        }
        ++bad_masters;
      }

      {
        std::lock_guard<simple_spinlock> lock(master_summaries_lock);
        master_summaries.emplace_back(std::move(sh));
      }

      // Fetch the flags information in every requested category.
      // Flag retrieval is not supported by older versions; failure is tracked
      // in CheckTabletServer{Unusual,Diverged}Flags().
      ignore_result(master->FetchFlags(flags_categories_to_fetch));
    }));
  }
  pool_->Wait();

  results_.cluster_status.master_summaries.swap(master_summaries);

  // Return a NotAuthorized status if any master has auth errors, since this
  // indicates ksck may not be able to gather full and accurate info.
  if (unauthorized_masters > 0) {
    return Status::NotAuthorized(
        Substitute("failed to gather info from $0 of $1 "
                   "masters due to lack of admin privileges",
                   unauthorized_masters.load(), num_masters));
  }
  if (bad_masters > 0) {
    return Status::NetworkError(
        Substitute("failed to gather info from all masters: $0 of $1 had errors",
                   bad_masters.load(), num_masters));
  }
  return Status::OK();
}

Status Ksck::CheckMasterConsensus() {
  // Reset this instance's view of master consensus conflict, in case this
  // instance is being used to repeatedly check for master consensus conflict.
  results_.cluster_status.master_consensus_conflict = false;
  if (!FLAGS_consensus) {
    return Status::OK();
  }
  ConsensusStateMap master_cstates;
  for (const KsckCluster::MasterList::value_type& master : cluster_->masters()) {
    if (master->cstate()) {
      ConsensusState ksck_cstate;
      BuildConsensusStateForConfigMember(*master->cstate(), &ksck_cstate);
      InsertOrDie(&master_cstates, master->uuid(), ksck_cstate);
    } else {
      results_.cluster_status.master_consensus_conflict = true;
    }
  }
  if (master_cstates.empty()) {
    return Status::NotFound("no master consensus state available");
  }
  // There's no "reference" cstate for masters, so pick an arbitrary master
  // cstate to compare with.
  const ConsensusState& base = master_cstates.begin()->second;
  for (const auto& entry : master_cstates) {
    if (!base.Matches(entry.second)) {
      results_.cluster_status.master_consensus_conflict = true;
      break;
    }
  }
  results_.cluster_status.master_consensus_state_map.swap(master_cstates);
  vector<string> uuids;
  std::transform(cluster_->masters().begin(), cluster_->masters().end(),
                 std::back_inserter(uuids),
                 [](const shared_ptr<KsckMaster>& master) { return master->uuid(); });
  results_.cluster_status.master_uuids.swap(uuids);

  if (results_.cluster_status.master_consensus_conflict) {
    return Status::Corruption("there are master consensus conflicts");
  }
  return Status::OK();
}

void Ksck::AddFlagsToFlagMaps(const GetFlagsResponsePB& flags,
                              const string& server_address,
                              KsckFlagToServersMap* flags_to_servers_map,
                              KsckFlagTagsMap* flag_tags_map) {
  DCHECK(flags_to_servers_map);
  for (const auto& f : flags.flags()) {
    const pair<string, string> key(f.name(), f.value());
    if (!InsertIfNotPresent(flags_to_servers_map, key, { server_address })) {
      FindOrDieNoPrint(*flags_to_servers_map, key).push_back(server_address);
    }
    if (flag_tags_map != nullptr) {
      InsertIfNotPresent(flag_tags_map, f.name(), JoinStrings(f.tags(), ","));
    }
  }
}

Status Ksck::CheckMasterUnusualFlags() {
  size_t bad_servers = 0;
  for (const auto& master : cluster_->masters()) {
    const auto& unusual_flags = master->flags(FlagsCategory::UNUSUAL);
    if (!unusual_flags) {
      ++bad_servers;
      continue;
    }
    AddFlagsToFlagMaps(*unusual_flags,
                       master->address(),
                       &results_.master_unusual_flag_to_servers_map,
                       &results_.master_unusual_flag_tags_map);
  }

  if (!results_.master_unusual_flag_to_servers_map.empty()) {
    results_.warning_messages.push_back(Status::ConfigurationError(
        "Some masters have unsafe, experimental, or hidden flags set"));
  }

  if (bad_servers > 0) {
    return Status::Incomplete(Substitute(
        "$0 of $1 masters were not available to retrieve unusual flags",
        bad_servers, cluster_->masters().size()));
  }
  return Status::OK();
}

Status Ksck::CheckMasterDivergedFlags() {
  vector<FlagsCategory> flags_categories;
  RETURN_NOT_OK(StringToFlagsCategories(FLAGS_flags_categories_to_check,
                                        &flags_categories));
  for (const auto cat : flags_categories) {
    KsckFlagToServersMap servers_by_flag;
    size_t bad_servers = 0;
    for (const auto& master : cluster_->masters()) {
      const auto& flags = master->flags(cat);
      if (!flags) {
        ++bad_servers;
        continue;
      }
      AddFlagsToFlagMaps(*flags, master->address(), &servers_by_flag);
      AddFlagsToFlagMaps(*flags,
                         master->address(),
                         &results_.master_checked_flag_to_servers_map);
    }

    for (const auto& e : servers_by_flag) {
      if (e.second.size() + bad_servers == cluster_->masters().size()) {
        continue;
      }
      results_.warning_messages.push_back(Status::ConfigurationError(
          Substitute("Different masters have different settings for same "
                     "flags of checked category '$0'",
                     FlagsCategoryToString(cat))));
      break;
    }
    if (bad_servers > 0) {
      return Status::Incomplete(Substitute(
          "$0 of $1 masters were not available to retrieve $2 category flags",
          bad_servers, cluster_->masters().size(), FlagsCategoryToString(cat)));
    }
  }
  return Status::OK();
}

Status Ksck::CheckClusterRunning() {
  VLOG(1) << "Connecting to the leader master";
  return cluster_->Connect();
}

Status Ksck::FetchTableAndTabletInfo() {
  return cluster_->FetchTableAndTabletInfo();
}

Status Ksck::FetchInfoFromTabletServers() {
  const auto servers_count = cluster_->tablet_servers().size();
  VLOG(1) << Substitute("List of $0 tablet servers retrieved", servers_count);

  if (servers_count == 0) {
    return Status::OK();
  }

  atomic<size_t> bad_servers(0);
  atomic<size_t> unauthorized_servers(0);
  VLOG(1) << "Fetching info from all " << servers_count << " tablet servers";

  vector<ServerHealthSummary> tablet_server_summaries;
  simple_spinlock tablet_server_summaries_lock;

  vector<FlagsCategory> flags_categories_to_fetch = { FlagsCategory::UNUSUAL };
  RETURN_NOT_OK(StringToFlagsCategories(FLAGS_flags_categories_to_check,
                                        &flags_categories_to_fetch));
  for (const auto& entry : cluster_->tablet_servers()) {
    const auto& ts = entry.second;
    RETURN_NOT_OK(pool_->Submit([&]() {
      VLOG(1) << "Going to connect to tablet server: " << ts->uuid();
      ServerHealth health;
      Status s = ts->FetchInfo(&health).AndThen([&ts, &health]() {
          if (FLAGS_consensus) {
            return ts->FetchConsensusState(&health);
          }
          return Status::OK();
      });
      ServerHealthSummary summary;
      summary.uuid = ts->uuid();
      summary.address = ts->address();
      summary.ts_location = ts->location();
      summary.version = ts->version();
      summary.quiescing_info = ts->quiescing_info();
      summary.status = s;
      if (!s.ok()) {
        if (IsNotAuthorizedMethodAccess(s)) {
          health = ServerHealth::UNAUTHORIZED;
          ++unauthorized_servers;
        }
        ++bad_servers;
      }
      summary.health = health;

      {
        std::lock_guard<simple_spinlock> lock(tablet_server_summaries_lock);
        tablet_server_summaries.push_back(std::move(summary));
      }

      // Fetch the flags information in every requested category.
      // Flag retrieval is not supported by older versions; failure is tracked
      // in CheckTabletServer{Unusual,Diverged}Flags().
      ignore_result(ts->FetchFlags(flags_categories_to_fetch));
    }));
  }
  pool_->Wait();

  results_.cluster_status.tserver_summaries.swap(tablet_server_summaries);

  // Return a NotAuthorized status if any tablet server has auth errors, since
  // this indicates ksck may not be able to gather full and accurate info.
  if (unauthorized_servers > 0) {
    return Status::NotAuthorized(
        Substitute("failed to gather info from $0 of $1 tablet servers due "
                   "to lack of admin privileges",
                   unauthorized_servers.load(), servers_count));
  }
  if (bad_servers > 0) {
    return Status::NetworkError(
        Substitute("failed to gather info for all tablet servers: $0 of $1 had errors",
                   bad_servers.load(), servers_count));
  }
  return Status::OK();
}

void Ksck::set_print_sections(const std::vector<std::string>& sections) {
  print_sections_flags_ = PrintSections::NONE;
  for (const auto& section : sections) {
    std::string section_upper;
    ToUpperCase(section, &section_upper);
    if (section_upper == "*") {
      print_sections_flags_ = PrintSections::ALL_SECTIONS;
      break;
    }
    if (section_upper == "MASTER_SUMMARIES") {
      print_sections_flags_ |= PrintSections::MASTER_SUMMARIES;
    }
    if (section_upper == "TSERVER_STATES") {
      print_sections_flags_ |= PrintSections::TSERVER_STATES;
    }
    if (section_upper == "TSERVER_SUMMARIES") {
      print_sections_flags_ |= PrintSections::TSERVER_SUMMARIES;
    }
    if (section_upper == "VERSION_SUMMARIES") {
      print_sections_flags_ |= PrintSections::VERSION_SUMMARIES;
    }
    if (section_upper == "TABLET_SUMMARIES") {
      print_sections_flags_ |= PrintSections::TABLET_SUMMARIES;
    }
    if (section_upper == "TABLE_SUMMARIES") {
      print_sections_flags_ |= PrintSections::TABLE_SUMMARIES;
    }
    if (section_upper == "SYSTEM_TABLE_SUMMARIES") {
      print_sections_flags_ |= PrintSections::SYSTEM_TABLE_SUMMARIES;
    }
    if (section_upper == "CHECKSUM_RESULTS") {
      print_sections_flags_ |= PrintSections::CHECKSUM_RESULTS;
    }
    if (section_upper == "TOTAL_COUNT") {
      print_sections_flags_ |= PrintSections::TOTAL_COUNT;
    }
  }
}

Status Ksck::Run() {
  PUSH_PREPEND_NOT_OK(CheckMasterHealth(), results_.error_messages,
                      "error fetching info from masters");
  PUSH_PREPEND_NOT_OK(CheckMasterConsensus(), results_.error_messages,
                      "master consensus error");
  PUSH_PREPEND_NOT_OK(CheckMasterUnusualFlags(), results_.warning_messages,
                      "master unusual flags check error");
  PUSH_PREPEND_NOT_OK(CheckMasterDivergedFlags(), results_.warning_messages,
                      "master diverged flags check error");

  // CheckClusterRunning and FetchTableAndTabletInfo must succeed for
  // subsequent checks to be runnable.
  const char* const liveness_prefix = "leader master liveness check error";
  Status s = CheckClusterRunning();
  PUSH_PREPEND_NOT_OK(s, results_.error_messages, liveness_prefix);
  RETURN_NOT_OK_PREPEND(s, liveness_prefix);
  const char* const fetch_prefix = "error fetching the cluster metadata "
                                   "from the leader master";
  s = FetchTableAndTabletInfo();
  PUSH_PREPEND_NOT_OK(s, results_.error_messages, fetch_prefix);
  RETURN_NOT_OK_PREPEND(s, fetch_prefix);

  // In getting table and tablet info, we should have also received info about
  // the tablet servers, including any special states they might be in.
  results_.ts_states = cluster_->ts_states();

  PUSH_PREPEND_NOT_OK(FetchInfoFromTabletServers(), results_.error_messages,
                      "error fetching info from tablet servers");
  PUSH_PREPEND_NOT_OK(CheckTabletServerUnusualFlags(), results_.warning_messages,
                      "tserver unusual flags check error");
  PUSH_PREPEND_NOT_OK(CheckTabletServerDivergedFlags(), results_.warning_messages,
                      "tserver diverged flags check error");
  PUSH_PREPEND_NOT_OK(CheckServerVersions(), results_.warning_messages,
                      "version check error");

  PUSH_PREPEND_NOT_OK(CheckDivergedFlags(), results_.warning_messages,
                      "diverged flags (both masters and tservers) check error");

  PUSH_PREPEND_NOT_OK(CheckTablesConsistency(), results_.error_messages,
                      "table consistency check error");

  if (FLAGS_checksum_scan) {
    // Copy the filters because they are passed by-value.
    auto table_filters_for_checksum_opts = cluster_->table_filters();
    auto tablet_id_filters_for_checksum_opts = cluster_->tablet_id_filters();
    PUSH_PREPEND_NOT_OK(
        ChecksumData(KsckChecksumOptions(std::move(table_filters_for_checksum_opts),
                                         std::move(tablet_id_filters_for_checksum_opts))),
        results_.error_messages, "checksum scan error");
  }

  // Use a special-case error if there are auth errors. This makes it harder
  // for admins to miss that ksck isn't working right because they forgot to
  // (e.g.) sudo -u kudu when running ksck!
  if (std::any_of(std::begin(results_.error_messages),
                  std::end(results_.error_messages),
                  [](const Status& s) { return s.IsNotAuthorized(); })) {
    return Status::NotAuthorized("re-run ksck with administrator privileges");
  }
  if (!results_.error_messages.empty()) {
    return Status::RuntimeError("ksck discovered errors");
  }
  return Status::OK();
}

Status Ksck::CheckTabletServerUnusualFlags() {
  int bad_servers = 0;
  for (const auto& uuid_and_ts : cluster_->tablet_servers()) {
    const auto& tserver = uuid_and_ts.second;
    const auto& unusual_flags = tserver->flags(FlagsCategory::UNUSUAL);
    if (!unusual_flags) {
      ++bad_servers;
      continue;
    }
    AddFlagsToFlagMaps(*unusual_flags,
                       tserver->address(),
                       &results_.tserver_unusual_flag_to_servers_map,
                       &results_.tserver_unusual_flag_tags_map);
  }

  if (!results_.tserver_unusual_flag_to_servers_map.empty()) {
    results_.warning_messages.push_back(Status::ConfigurationError(
        "Some tablet servers have unsafe, experimental, or hidden flags set"));
  }

  if (bad_servers > 0) {
    return Status::Incomplete(Substitute(
        "$0 of $1 tservers were not available to retrieve unusual flags",
        bad_servers, cluster_->tablet_servers().size()));
  }
  return Status::OK();
}

Status Ksck::CheckTabletServerDivergedFlags() {
  vector<FlagsCategory> flags_categories;
  RETURN_NOT_OK(StringToFlagsCategories(FLAGS_flags_categories_to_check,
                                        &flags_categories));
  for (const auto cat : flags_categories) {
    KsckFlagToServersMap servers_by_flag;
    size_t bad_servers = 0;
    for (const auto& uuid_and_ts : cluster_->tablet_servers()) {
      const auto& tserver = uuid_and_ts.second;
      const auto& flags = tserver->flags(cat);
      if (!flags) {
        ++bad_servers;
        continue;
      }
      AddFlagsToFlagMaps(*flags, tserver->address(), &servers_by_flag);
      AddFlagsToFlagMaps(*flags,
                         tserver->address(),
                         &results_.tserver_checked_flag_to_servers_map);
    }
    for (const auto& e : servers_by_flag) {
      if (e.second.size() + bad_servers == cluster_->tablet_servers().size()) {
        continue;
      }
      results_.warning_messages.push_back(Status::ConfigurationError(
          Substitute("Different tablet servers have different settings "
                     "for same flags of checked category '$0'",
                     FlagsCategoryToString(cat))));
      break;
    }
    if (bad_servers > 0) {
      return Status::Incomplete(Substitute(
          "$0 of $1 tservers were not available to retrieve $2 category flags",
          bad_servers,
          cluster_->tablet_servers().size(),
          FlagsCategoryToString(cat)));
    }
  }
  return Status::OK();
}

Status Ksck::CheckDivergedFlags() {
  set<KsckFlag> masters_flags;
  for (const auto& elem : results_.master_checked_flag_to_servers_map) {
    InsertOrDieNoPrint(&masters_flags, elem.first);
  }
  set<KsckFlag> tservers_flags;
  for (const auto& elem : results_.tserver_checked_flag_to_servers_map) {
    InsertOrDieNoPrint(&tservers_flags, elem.first);
  }

  vector<KsckFlag> symm_diff;
  std::set_symmetric_difference(masters_flags.begin(),
                                masters_flags.end(),
                                tservers_flags.begin(),
                                tservers_flags.end(),
                                back_inserter(symm_diff));
  if (!symm_diff.empty()) {
    for (const auto& f : symm_diff) {
      {
        const auto* e = FindOrNull(results_.master_checked_flag_to_servers_map, f);
        if (e) {
          InsertOrDieNoPrint(&results_.master_diverged_flag_to_servers_map, f, *e);
          continue;
        }
      }
      {
        const auto* e = FindOrNull(results_.tserver_checked_flag_to_servers_map, f);
        if (e) {
          InsertOrDieNoPrint(&results_.tserver_diverged_flag_to_servers_map, f, *e);
          continue;
        }
      }
      // The flag/value pair must be either of masters' or tservers'.
      LOG(DFATAL) << "found neither masters' or tservers' flag: " << f.first;
    }

    results_.warning_messages.push_back(Status::ConfigurationError(
        "Same flags have different values between masters and tablet servers "
        "for at least one checked flag category"));
  }
  return Status::OK();
}

Status Ksck::CheckServerVersions() {
  results_.version_summaries.clear();
  for (const auto& s : results_.cluster_status.master_summaries) {
    if (!s.version) continue;
    const auto& server = Substitute("master@$0", s.address);
    auto& servers = LookupOrInsert(&results_.version_summaries, *s.version, {});
    servers.push_back(server);
  }
  for (const auto& s : results_.cluster_status.tserver_summaries) {
    if (!s.version) continue;
    const auto& server = Substitute("tserver@$0", s.address);
    auto& servers = LookupOrInsert(&results_.version_summaries, *s.version, {});
    servers.push_back(server);
  }
  if (results_.version_summaries.size() > 1) {
    // This status seemed to fit best even though a version mismatch isn't an
    // error. In any case, ksck only prints the message for warnings.
    return Status::ConfigurationError(
        Substitute("not all servers are running the same version: "
                   "$0 different versions were seen",
                   results_.version_summaries.size()));
  }
  return Status::OK();
}

Status Ksck::PrintResults() {
  PrintMode mode;
  if (FLAGS_ksck_format == "plain_concise") {
    mode = PrintMode::PLAIN_CONCISE;
  } else if (FLAGS_ksck_format == "plain_full") {
    mode = PrintMode::PLAIN_FULL;
  } else if (FLAGS_ksck_format == "json_pretty") {
    mode = PrintMode::JSON_PRETTY;
  } else if (FLAGS_ksck_format == "json_compact") {
    mode = PrintMode::JSON_COMPACT;
  } else {
    return Status::InvalidArgument("unknown ksck format (--ksck_format)",
                                   FLAGS_ksck_format);
  }
  return results_.PrintTo(mode, print_sections_flags_, *out_);
}

Status Ksck::RunAndPrintResults() {
  Status s = Run();
  RETURN_NOT_OK_PREPEND(PrintResults(), "error printing results");
  return s;
}

Status Ksck::CheckTablesConsistency() {
  int bad_tables_count = 0;
  auto& cluster_status = results_.cluster_status;
  if (cluster_->txn_sys_table()) {
    if (!VerifyTable(cluster_->txn_sys_table(), &cluster_status.system_table_summaries)) {
      bad_tables_count++;
    }
  }
  for (const shared_ptr<KsckTable> &table : cluster_->tables()) {
    if (!VerifyTable(table, &cluster_status.table_summaries)) {
      bad_tables_count++;
    }
  }

  if (bad_tables_count > 0) {
      return Status::Corruption(
          Substitute("$0 out of $1 table(s) are not healthy", bad_tables_count,
                     cluster_status.table_summaries.size() +
                         cluster_status.system_table_summaries.size()));
  }
  return Status::OK();
}

Status Ksck::ChecksumData(const KsckChecksumOptions& opts) {
  KsckChecksummer checksummer(cluster_.get());
  auto* checksum_results = &results_.checksum_results;
  // Don't ruin JSON output with ad hoc progress updates.
  auto* out_for_progress_updates = IsNonJSONFormat() ? out_ : nullptr;
  return checksummer.ChecksumData(opts,
                                  checksum_results,
                                  out_for_progress_updates);
}

bool Ksck::VerifyTable(const shared_ptr<KsckTable>& table, vector<TableSummary>* table_summaries) {
  if (table->tablets().empty()) {
    VLOG(1) << Substitute("Skipping table $0 as it has no matching tablets",
                          table->name());
    return true;
  }

  TableSummary ts;
  ts.id = table->id();
  ts.name = table->name();
  ts.replication_factor = table->num_replicas();
  VLOG(1) << Substitute("Verifying $0 tablet(s) for table $1 configured with num_replicas = $2",
                        table->tablets().size(), table->name(), table->num_replicas());
  for (const auto& tablet : table->tablets()) {
    auto tablet_result = VerifyTablet(tablet, table->num_replicas());
    switch (tablet_result) {
      case HealthCheckResult::HEALTHY:
        ts.healthy_tablets++;
        break;
      case HealthCheckResult::RECOVERING:
        ts.recovering_tablets++;
        break;
      case HealthCheckResult::UNDER_REPLICATED:
        ts.underreplicated_tablets++;
        break;
      case HealthCheckResult::CONSENSUS_MISMATCH:
        ts.consensus_mismatch_tablets++;
        break;
      case HealthCheckResult::UNAVAILABLE:
        ts.unavailable_tablets++;
        break;
    }
  }
  bool all_healthy = ts.healthy_tablets == ts.TotalTablets();
  if (ts.TotalTablets() > 0) {
    table_summaries->emplace_back(std::move(ts));
  }
  return all_healthy;
}

HealthCheckResult Ksck::VerifyTablet(const shared_ptr<KsckTablet>& tablet,
                                     int table_num_replicas) {
  const string tablet_str = Substitute("Tablet $0 of table '$1'",
                                 tablet->id(), tablet->table()->name());

  auto leader_it = std::find_if(tablet->replicas().cbegin(), tablet->replicas().cend(),
      [](const shared_ptr<KsckTabletReplica>& r) { return r->is_leader(); });
  optional<string> leader_uuid;
  if (leader_it != tablet->replicas().cend()) {
    leader_uuid = (*leader_it)->ts_uuid();
  }
  vector<string> voter_uuids_from_master;
  vector<string> non_voter_uuids_from_master;
  for (const auto& replica : tablet->replicas()) {
    if (replica->is_voter()) {
      voter_uuids_from_master.push_back(replica->ts_uuid());
    } else {
      non_voter_uuids_from_master.push_back(replica->ts_uuid());
    }
  }
  ConsensusState master_config(ConsensusConfigType::MASTER,
                               nullopt,
                               nullopt,
                               leader_uuid,
                               voter_uuids_from_master,
                               non_voter_uuids_from_master);

  int leaders_count = 0;
  int running_voters_count = 0;
  int copying_replicas_count = 0;
  int conflicting_states = 0;
  int num_voters = 0;
  vector<ReplicaSummary> replicas;
  for (const shared_ptr<KsckTabletReplica>& replica : tablet->replicas()) {
    replicas.emplace_back();
    auto* repl_info = &replicas.back();
    repl_info->ts_uuid = replica->ts_uuid();
    VLOG(1) << Substitute("A replica of tablet $0 is on live tablet server $1",
                          tablet->id(), replica->ts_uuid());

    // Check for agreement on tablet assignment and state between the master
    // and the tablet server.
    auto ts = FindPointeeOrNull(cluster_->tablet_servers(), replica->ts_uuid());
    if (ts) {
      repl_info->ts_address = ts->address();
    }
    if (ts && ts->is_healthy()) {
      repl_info->ts_healthy = true;
      repl_info->state = ts->ReplicaState(tablet->id());
      if (ContainsKey(ts->tablet_status_map(), tablet->id())) {
        repl_info->status_pb = ts->tablet_status_map().at(tablet->id());
      }

      // Organize consensus info for each replica.
      pair<string, string> tablet_key = std::make_pair(ts->uuid(), tablet->id());
      if (ContainsKey(ts->tablet_consensus_state_map(), tablet_key)) {
        const auto& cstate = FindOrDieNoPrint(ts->tablet_consensus_state_map(), tablet_key);
        ConsensusState ksck_cstate;
        BuildConsensusStateForConfigMember(cstate, &ksck_cstate);
        repl_info->consensus_state = std::move(ksck_cstate);
      }
    }

    repl_info->is_leader = replica->is_leader();
    repl_info->is_voter = replica->is_voter();
    num_voters += replica->is_voter() ? 1 : 0;
    if (replica->is_leader()) {
      leaders_count++;
    }
    if (repl_info->state == tablet::RUNNING && replica->is_voter()) {
      running_voters_count++;
    } else if (repl_info->status_pb &&
               repl_info->status_pb->tablet_data_state() == tablet::TABLET_DATA_COPYING) {
      copying_replicas_count++;
    }
  }
  // Compare the master's and peers' consensus configs.
  for (const auto& r : replicas) {
    if (r.consensus_state && !r.consensus_state->Matches(master_config)) {
      conflicting_states++;
    }
  }

  // Determine the overall health state of the tablet.
  HealthCheckResult result = HealthCheckResult::HEALTHY;
  string status;
  int majority_size = consensus::MajoritySize(num_voters);
  if (copying_replicas_count > 0) {
    result = HealthCheckResult::RECOVERING;
    status = Substitute("$0 is $1: $2 on-going tablet copies",
                        tablet_str,
                        Color(AnsiCode::YELLOW, "recovering"),
                        copying_replicas_count);
  } else if (running_voters_count < majority_size) {
    result = HealthCheckResult::UNAVAILABLE;
    status = Substitute("$0 is $1: $2 replica(s) not RUNNING",
                        tablet_str,
                        Color(AnsiCode::RED, "unavailable"),
                        num_voters - running_voters_count);
  } else if (running_voters_count < num_voters) {
    result = HealthCheckResult::UNDER_REPLICATED;
    status = Substitute("$0 is $1: $2 replica(s) not RUNNING",
                        tablet_str,
                        Color(AnsiCode::YELLOW, "under-replicated"),
                        num_voters - running_voters_count);
  } else if (check_replica_count_ && num_voters < table_num_replicas) {
    result = HealthCheckResult::UNDER_REPLICATED;
    status = Substitute("$0 is $1: configuration has $2 replicas vs desired $3",
                        tablet_str,
                        Color(AnsiCode::YELLOW, "under-replicated"),
                        num_voters,
                        table_num_replicas);
  } else if (leaders_count != 1) {
    result = HealthCheckResult::UNAVAILABLE;
    status = Substitute("$0 is $1: expected one LEADER replica",
                        tablet_str, Color(AnsiCode::RED, "unavailable"));
  } else if (conflicting_states > 0) {
    result = HealthCheckResult::CONSENSUS_MISMATCH;
    status = Substitute("$0 is $1: $2 replicas' active configs disagree with the "
                        "leader master's",
                        tablet_str,
                        Color(AnsiCode::YELLOW, "conflicted"),
                        conflicting_states);
  } else {
    status = Substitute("$0 is $1.",
                        tablet_str,
                        Color(AnsiCode::GREEN, "healthy"));
  }

  TabletSummary tablet_summary;
  tablet_summary.id = tablet->id();
  tablet_summary.table_id = tablet->table()->id();
  tablet_summary.table_name = tablet->table()->name();
  tablet_summary.result = result;
  tablet_summary.status = status;
  tablet_summary.master_cstate = std::move(master_config);
  tablet_summary.replicas.swap(replicas);

  // Add printable representation of the key for the start of the range.
  const auto& range_key_begin = tablet->partition().begin().range_key();
  ostringstream ss_range_key_begin;
  for (size_t i = 0; i < range_key_begin.size(); ++i) {
    ss_range_key_begin << std::hex << std::setw(2) << std::setfill('0')
                       << static_cast<uint16_t>(range_key_begin[i]);
  }
  tablet_summary.range_key_begin = ss_range_key_begin.str();
  VLOG(1) << Substitute("range start key for tablet $0: '$1'",
      tablet_summary.id, tablet_summary.range_key_begin);
  results_.cluster_status.tablet_summaries.push_back(std::move(tablet_summary));
  return result;
}

} // namespace tools
} // namespace kudu
