// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements.  See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership.  The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License.  You may obtain a copy of the License at
//
//   http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied.  See the License for the
// specific language governing permissions and limitations
// under the License.

#include <algorithm>
#include <functional>
#include <iostream>
#include <iterator>
#include <map>
#include <memory>
#include <optional>
#include <set>
#include <string>
#include <type_traits>
#include <unordered_map>
#include <unordered_set>
#include <utility>
#include <vector>

#include <gflags/gflags.h>
#include <glog/logging.h>

#include "kudu/client/client.h"
#include "kudu/client/schema.h"
#include "kudu/client/shared_ptr.h" // IWYU pragma: keep
#include "kudu/common/schema.h"
#include "kudu/gutil/map-util.h"
#include "kudu/gutil/strings/split.h"
#include "kudu/gutil/strings/stringpiece.h"
#include "kudu/gutil/strings/substitute.h"
#include "kudu/hms/hive_metastore_types.h"
#include "kudu/hms/hms_catalog.h"
#include "kudu/hms/hms_client.h"
#include "kudu/tools/tool_action.h"
#include "kudu/tools/tool_action_common.h"
#include "kudu/util/net/net_util.h"
#include "kudu/util/slice.h"
#include "kudu/util/status.h"
#include "kudu/util/string_case.h"

DECLARE_bool(force);
DECLARE_bool(hive_metastore_sasl_enabled);
DECLARE_string(columns);
DECLARE_string(hive_metastore_uris);

// Flags defined in this file. Several are consulted by the functions below
// (e.g. FLAGS_dryrun, FLAGS_drop_orphan_hms_tables and
// FLAGS_ignore_other_clusters); each flag's help string describes its intent.
DEFINE_bool(dryrun, false,
    "Print a message for each fix, but do not make modifications to Kudu or the Hive Metastore.");
DEFINE_bool(drop_orphan_hms_tables, false,
    "Drop orphan Hive Metastore tables which refer to non-existent Kudu tables.");
DEFINE_bool(create_missing_hms_tables, true,
    "Create a Hive Metastore table for each Kudu table which is missing one.");
DEFINE_bool(fix_inconsistent_tables, true,
    "Fix tables whose Kudu and Hive Metastore metadata differ. If the table name is "
    "different, the table is renamed in Kudu to match the HMS. If the columns "
    "or other metadata is different the HMS is updated to match Kudu.");
DEFINE_bool(upgrade_hms_tables, true,
    "Upgrade Hive Metastore tables from the legacy Impala metadata format to the "
    "new Kudu metadata format.");
// Consulted while scanning HMS entries in AnalyzeCatalogs(): entries whose
// master addresses share nothing with this cluster's are skipped when set.
DEFINE_bool(ignore_other_clusters, true,
    "Whether to ignore entirely separate Kudu clusters, as indicated by a "
    "different set of master addresses.");

using kudu::client::KuduClient;
using kudu::client::KuduSchema;
using kudu::client::KuduTable;
using kudu::client::KuduTableAlterer;
using kudu::client::sp::shared_ptr;
using kudu::hms::HmsCatalog;
using kudu::hms::HmsClient;
using std::cout;
using std::endl;
using std::make_pair;
using std::nullopt;
using std::ostream;
using std::pair;
using std::set;
using std::string;
using std::unique_ptr;
using std::unordered_map;
using std::vector;
using strings::Split;
using strings::Substitute;

namespace kudu {
namespace tools {

// Renames the Kudu table 'name' to 'new_name' in the Kudu catalog only; the
// alteration is explicitly not propagated to external catalogs (the HMS).
//
// Returns the status of the alter operation.
Status RenameTableInKuduCatalog(KuduClient* kudu_client,
                                const string& name,
                                const string& new_name) {
  unique_ptr<KuduTableAlterer> table_alterer(kudu_client->NewTableAlterer(name));
  table_alterer->RenameTo(new_name);
  // Keep the Hive Metastore untouched.
  table_alterer->modify_external_catalogs(false);
  return table_alterer->Alter();
}

// Sets the owner of the Kudu table 'name' to 'owner' in the Kudu catalog only;
// the alteration is explicitly not propagated to external catalogs (the HMS).
//
// Returns the status of the alter operation.
Status ChangeOwnerInKuduCatalog(KuduClient* kudu_client,
                                const string& name,
                                const string& owner) {
  unique_ptr<KuduTableAlterer> table_alterer(kudu_client->NewTableAlterer(name));
  table_alterer->SetOwner(owner);
  // Keep the Hive Metastore untouched.
  table_alterer->modify_external_catalogs(false);
  return table_alterer->Alter();
}

// Sets the comment of the Kudu table 'name' to 'comment' in the Kudu catalog
// only; the alteration is explicitly not propagated to external catalogs (the
// HMS).
//
// Returns the status of the alter operation.
Status ChangeTableCommentInKuduCatalog(KuduClient* kudu_client,
                                       const string& name,
                                       const string& comment) {
  unique_ptr<KuduTableAlterer> table_alterer(kudu_client->NewTableAlterer(name));
  table_alterer->SetComment(comment);
  // Keep the Hive Metastore untouched.
  table_alterer->modify_external_catalogs(false);
  return table_alterer->Alter();
}

// Sets up the clients shared by the HMS tool actions.
//
// On success:
//   - 'kudu_client' holds a Kudu client built from the master addresses
//     parsed out of 'context'.
//   - 'master_addrs' holds the master addresses as reported by that client.
//   - 'hms_catalog' holds a started HmsCatalog.
//
// If --hive_metastore_uris is empty, both it and --hive_metastore_sasl_enabled
// are overwritten with the values reported by the Kudu client. A
// ConfigurationError is returned if the client reports no HMS URIs either.
Status Init(const RunnerContext& context,
            shared_ptr<KuduClient>* kudu_client,
            unique_ptr<HmsCatalog>* hms_catalog,
            string* master_addrs) {
  string master_addrs_flag;
  RETURN_NOT_OK(ParseMasterAddressesStr(context, &master_addrs_flag));

  // Create a Kudu Client.
  const vector<string> master_addresses(Split(master_addrs_flag, ","));
  RETURN_NOT_OK(CreateKuduClient(master_addresses, kudu_client));

  // Get the configured master addresses from the leader master. It's critical
  // that the check and fix tools use the exact same master address
  // configuration that the masters do, otherwise the HMS table entries will
  // disagree on the master addresses property.
  *master_addrs = (*kudu_client)->GetMasterAddresses();

  if (FLAGS_hive_metastore_uris.empty()) {
    string hive_metastore_uris = (*kudu_client)->GetHiveMetastoreUris();
    if (hive_metastore_uris.empty()) {
      return Status::ConfigurationError(
          "Could not fetch the Hive Metastore locations from the Kudu master "
          "since it is not configured with the Hive Metastore integration. "
          "Run the tool with --hive_metastore_uris and pass in the location(s) "
          "of the Hive Metastore.");
    }
    bool hive_metastore_sasl_enabled = (*kudu_client)->GetHiveMetastoreSaslEnabled();

    // Override the flag values.
    FLAGS_hive_metastore_uris = hive_metastore_uris;
    FLAGS_hive_metastore_sasl_enabled = hive_metastore_sasl_enabled;
  }

  // Create HMS catalog. It must be given the master addresses reported by the
  // cluster (not the raw command-line value) so that it agrees with the
  // addresses the rest of the tool compares HMS entries against.
  hms_catalog->reset(new hms::HmsCatalog(*master_addrs));
  return (*hms_catalog)->Start();
}

// Downgrades every Kudu table entry in the HMS that uses the Kudu storage
// handler to the legacy Impala table format.
//
// Returns the first error encountered, either while initializing the clients,
// listing the HMS tables, or downgrading a table.
Status HmsDowngrade(const RunnerContext& context) {
  shared_ptr<KuduClient> kudu_client;
  unique_ptr<HmsCatalog> hms_catalog;
  string master_addrs;
  // Bail out on failure: 'hms_catalog' is only safe to dereference once Init()
  // has succeeded.
  RETURN_NOT_OK(Init(context, &kudu_client, &hms_catalog, &master_addrs));

  // 1. Identify all Kudu tables in the HMS entries.
  vector<hive::Table> hms_tables;
  RETURN_NOT_OK(hms_catalog->GetKuduTables(&hms_tables));

  // 2. Downgrades all Kudu tables to legacy table format.
  for (auto& hms_table : hms_tables) {
    if (hms_table.parameters[HmsClient::kStorageHandlerKey] == HmsClient::kKuduStorageHandler) {
      RETURN_NOT_OK(hms_catalog->DowngradeToLegacyImpalaTable(
          Substitute("$0.$1", hms_table.dbName, hms_table.tableName)));
    }
  }

  return Status::OK();
}

// Given a Kudu table and a HMS table, checks if their metadata is in sync.
bool IsSynced(const set<string>& master_addresses,
              const KuduTable& kudu_table,
              const hive::Table& hms_table) {
  DCHECK(!master_addresses.empty());
  auto schema = KuduSchema::ToSchema(kudu_table.schema());
  hive::Table hms_table_copy(hms_table);
  const string* hms_masters_field = FindOrNull(hms_table.parameters,
                                               HmsClient::kKuduMasterAddrsKey);
  if (!hms_masters_field ||
      master_addresses != static_cast<set<string>>(Split(*hms_masters_field, ","))) {
    return false;
  }
  Status s = HmsCatalog::PopulateTable(kudu_table.id(), kudu_table.name(), kudu_table.owner(),
                                       schema, kudu_table.comment(),
                                       kudu_table.client()->cluster_id(), *hms_masters_field,
                                       hms_table.tableType, &hms_table_copy);
  return s.ok() && hms_table_copy == hms_table;
}

// Writes one row per entry of 'kudu_tables' to 'out' in data table format.
// Each row holds the table's name, its ID, and 'master_addrs'.
//
// Returns the status of printing the table.
Status PrintKuduTables(const string& master_addrs,
                       const vector<shared_ptr<KuduTable>>& kudu_tables,
                       ostream& out) {
  vector<string> header;
  header.emplace_back("Kudu table");
  header.emplace_back("Kudu table ID");
  header.emplace_back("Kudu master addresses");
  DataTable data_table(std::move(header));

  for (const auto& t : kudu_tables) {
    vector<string> row;
    row.emplace_back(t->name());
    row.emplace_back(t->id());
    row.emplace_back(master_addrs);
    data_table.AddRow(std::move(row));
  }
  return data_table.PrintTo(out);
}

// Writes one row per entry of 'tables' to 'out' in data table format. Each
// entry pairs an optional Kudu table with an optional HMS table; the columns
// belonging to an absent side are left empty.
//
// Note: HMS parameters are read with operator[], so a parameter missing from
// an HMS table is inserted into it with an empty value.
//
// Returns the status of printing the table.
Status PrintTables(const string& master_addrs,
                   vector<pair<shared_ptr<KuduTable>, hive::Table*>> tables,
                   ostream& out) {
  // Column counts; these must agree with the header and the fields appended
  // per row below.
  constexpr size_t kNumKuduColumns = 5;
  constexpr size_t kNumColumns = 14;

  DataTable data_table({
      "Kudu table",
      "Kudu table ID",
      "Kudu cluster ID",
      "Kudu owner",
      "Kudu master addresses",
      "HMS database",
      "HMS table",
      "HMS table type",
      "HMS owner",
      Substitute("HMS $0", HmsClient::kKuduTableNameKey),
      Substitute("HMS $0", HmsClient::kKuduTableIdKey),
      Substitute("HMS $0", HmsClient::kKuduClusterIdKey),
      Substitute("HMS $0", HmsClient::kKuduMasterAddrsKey),
      Substitute("HMS $0", HmsClient::kStorageHandlerKey),
  });

  for (auto& entry : tables) {
    vector<string> row;

    // Kudu half of the row.
    if (!entry.first) {
      row.resize(kNumKuduColumns);
    } else {
      const KuduTable& kudu = *entry.first;
      row.emplace_back(kudu.name());
      row.emplace_back(kudu.id());
      row.emplace_back(kudu.client()->cluster_id());
      row.emplace_back(kudu.owner());
      row.emplace_back(master_addrs);
    }

    // HMS half of the row.
    if (!entry.second) {
      row.resize(kNumColumns);
    } else {
      hive::Table& hms = *entry.second;
      row.emplace_back(hms.dbName);
      row.emplace_back(hms.tableName);
      row.emplace_back(hms.tableType);
      row.emplace_back(hms.owner);
      row.emplace_back(hms.parameters[HmsClient::kKuduTableNameKey]);
      row.emplace_back(hms.parameters[HmsClient::kKuduTableIdKey]);
      row.emplace_back(hms.parameters[HmsClient::kKuduClusterIdKey]);
      row.emplace_back(hms.parameters[HmsClient::kKuduMasterAddrsKey]);
      row.emplace_back(hms.parameters[HmsClient::kStorageHandlerKey]);
    }

    data_table.AddRow(std::move(row));
  }
  return data_table.PrintTo(out);
}

// Writes the HMS tables in 'tables' to 'out' in data table format, one column
// per non-empty, comma-separated entry of --columns (matched
// case-insensitively).
//
// Recognized columns are "database", "table", "type", "owner", "comment", and
// the HMS parameter keys for the Kudu table name, table ID, cluster ID, master
// addresses and storage handler. Any other column name yields InvalidArgument.
//
// Note: HMS parameters are read with operator[], so a parameter missing from
// a table in the local copy 'tables' is inserted with an empty value.
Status PrintHMSTables(vector<hive::Table> tables, ostream& out) {
  DataTable data_table({});
  for (const auto& column : strings::Split(FLAGS_columns, ",", strings::SkipEmpty())) {
    const string column_name = column.ToString();
    vector<string> values;

    // Appends the value of the HMS parameter 'key' for every table.
    const auto collect_param = [&tables, &values](const string& key) {
      for (auto& t : tables) {
        values.emplace_back(t.parameters[key]);
      }
    };

    if (iequals(column_name, "database")) {
      for (auto& t : tables) {
        values.emplace_back(t.dbName);
      }
    } else if (iequals(column_name, "table")) {
      for (auto& t : tables) {
        values.emplace_back(t.tableName);
      }
    } else if (iequals(column_name, "type")) {
      for (auto& t : tables) {
        values.emplace_back(t.tableType);
      }
    } else if (iequals(column_name, "owner")) {
      for (auto& t : tables) {
        values.emplace_back(t.owner);
      }
    } else if (iequals(column_name, "comment")) {
      collect_param(HmsClient::kTableCommentKey);
    } else if (iequals(column_name, HmsClient::kKuduTableNameKey)) {
      collect_param(HmsClient::kKuduTableNameKey);
    } else if (iequals(column_name, HmsClient::kKuduTableIdKey)) {
      collect_param(HmsClient::kKuduTableIdKey);
    } else if (iequals(column_name, HmsClient::kKuduClusterIdKey)) {
      collect_param(HmsClient::kKuduClusterIdKey);
    } else if (iequals(column_name, HmsClient::kKuduMasterAddrsKey)) {
      collect_param(HmsClient::kKuduMasterAddrsKey);
    } else if (iequals(column_name, HmsClient::kStorageHandlerKey)) {
      collect_param(HmsClient::kStorageHandlerKey);
    } else {
      return Status::InvalidArgument("unknown column (--columns)", column);
    }
    data_table.AddColumn(column.ToString(), std::move(values));
  }
  return data_table.PrintTo(out);
}

// The outcome of comparing the Kudu catalog against the HMS catalog: each
// member collects the tables exhibiting one particular kind of inconsistency.
struct CatalogReport {
  // HMS entries for Kudu tables (legacy entries included) for which no
  // matching table exists in the Kudu catalog.
  vector<hive::Table> orphan_hms_tables;

  // Kudu catalog tables for which no matching HMS entry exists.
  vector<shared_ptr<KuduTable>> missing_hms_tables;

  // Kudu catalog tables whose name is not Hive-compatible and which no
  // existing legacy Hive table refers to (if one did, the table would be
  // listed in 'inconsistent_tables' instead).
  //
  // The fix tool is unable to repair these automatically.
  vector<shared_ptr<KuduTable>> invalid_name_tables;

  // Legacy Impala/Kudu tables, i.e. those whose storage handler is
  // com.cloudera.kudu.hive.KuduStorageHandler.
  vector<pair<shared_ptr<KuduTable>, hive::Table>> legacy_hms_tables;

  // Kudu tables that have more than one HMS entry; the entries may be legacy
  // or not.
  //
  // The fix tool is unable to repair these automatically.
  vector<pair<shared_ptr<KuduTable>, hive::Table>> duplicate_hms_tables;

  // Tables for which the Kudu catalog entry and the HMS entry disagree.
  vector<pair<shared_ptr<KuduTable>, hive::Table>> inconsistent_tables;

  // Returns true if no inconsistency of any kind has been recorded.
  bool empty() const {
    const bool has_findings = !orphan_hms_tables.empty()
        || !legacy_hms_tables.empty()
        || !duplicate_hms_tables.empty()
        || !inconsistent_tables.empty()
        || !missing_hms_tables.empty()
        || !invalid_name_tables.empty();
    return !has_findings;
  }
};

// Retrieves the entire Kudu catalog, as well as all Kudu tables in the HMS
// catalog, and compares them to find inconsistencies.
//
// Inconsistencies are bucketed into different groups, corresponding to how they
// can be repaired.
//
// Parameters:
//   master_addrs:       comma-separated master addresses of this cluster; HMS
//                       entries are compared against them.
//   hms_catalog:        used to list the Kudu table entries in the HMS.
//   kudu_client:        used to list and open the tables in the Kudu catalog.
//   report:             on success, its members are replaced with the findings.
//   kudu_catalog_count: if non-null, receives the number of Kudu tables listed.
//   hms_catalog_count:  if non-null, receives the number of HMS entries listed.
//
// Returns an error if listing or opening tables fails; 'report' is only
// written on success.
Status AnalyzeCatalogs(const string& master_addrs,
                       HmsCatalog* hms_catalog,
                       KuduClient* kudu_client,
                       CatalogReport* report,
                       int* kudu_catalog_count = nullptr,
                       int* hms_catalog_count = nullptr) {
  // Step 1: retrieve all Kudu tables, and aggregate them by ID and by name. The
  // by-ID map will be used to match the HMS Kudu table entries. The by-name map
  // will be used to match against legacy Impala/Kudu HMS table entries.
  unordered_map<string, shared_ptr<KuduTable>> kudu_tables_by_id;
  unordered_map<string, shared_ptr<KuduTable>> kudu_tables_by_name;
  {
    vector<string> kudu_table_names;
    RETURN_NOT_OK(kudu_client->ListTables(&kudu_table_names));
    if (kudu_catalog_count) {
      *kudu_catalog_count = kudu_table_names.size();
    }
    for (const string& kudu_table_name : kudu_table_names) {
      shared_ptr<KuduTable> kudu_table;
      // TODO(dan): When the error is NotFound, prepend an admonishment about not
      // running this tool when the catalog is in-flux.
      RETURN_NOT_OK(kudu_client->OpenTable(kudu_table_name, &kudu_table));
      kudu_tables_by_id.emplace(kudu_table->id(), kudu_table);
      kudu_tables_by_name.emplace(kudu_table->name(), std::move(kudu_table));
    }
  }

  // Step 2: retrieve all Kudu table entries in the HMS, filter all orphaned
  // entries which reference non-existent Kudu tables, and group the rest by
  // table type and table ID.
  const set<string> kudu_master_addrs = Split(master_addrs, ",");
  vector<hive::Table> orphan_hms_tables;
  unordered_map<string, vector<hive::Table>> synchronized_hms_tables_by_id;
  unordered_map<string, vector<hive::Table>> external_hms_tables_by_id;
  {
    vector<hive::Table> hms_tables;
    RETURN_NOT_OK(hms_catalog->GetKuduTables(&hms_tables));
    if (hms_catalog_count) {
      *hms_catalog_count = hms_tables.size();
    }
    for (hive::Table& hms_table : hms_tables) {
      // If the addresses in the HMS entry don't overlap at all with the
      // expected addresses, the entry is likely from another Kudu cluster.
      // Ignore it if appropriate.
      if (FLAGS_ignore_other_clusters) {
        const string* hms_masters_field = FindOrNull(hms_table.parameters,
                                                    HmsClient::kKuduMasterAddrsKey);
        vector<string> master_intersection;
        if (hms_masters_field) {
          const set<string> hms_master_addrs = Split(*hms_masters_field, ",");
          std::set_intersection(hms_master_addrs.begin(), hms_master_addrs.end(),
                                kudu_master_addrs.begin(), kudu_master_addrs.end(),
                                std::back_inserter(master_intersection));
        }
        if (master_intersection.empty()) {
          LOG(INFO) << Substitute("Skipping HMS table $0.$1 with different "
              "masters specified: $2", hms_table.dbName, hms_table.tableName,
              hms_masters_field ? *hms_masters_field : "<none>");
          continue;
        }
      }

      // If this is a non-legacy, synchronized table, we expect a table ID to exist
      // in the HMS entry; look up the Kudu table by ID. Otherwise, look it up
      // by table name.
      shared_ptr<KuduTable>* kudu_table;
      const string& storage_handler = hms_table.parameters[HmsClient::kStorageHandlerKey];
      const string& hms_table_id = hms_table.parameters[HmsClient::kKuduTableIdKey];
      const string& hms_table_name = hms_table.parameters[HmsClient::kKuduTableNameKey];
      if (storage_handler == HmsClient::kKuduStorageHandler &&
          HmsClient::IsSynchronized(hms_table)) {
        kudu_table = FindOrNull(kudu_tables_by_id, hms_table_id);
        // If there is no kudu table that matches the id, or the id doesn't exist,
        // lookup the table by name. This handles synchronized tables created when HMS
        // sync was off or tables with IDs out of sync.
        if (!kudu_table) {
          kudu_table = FindOrNull(kudu_tables_by_name, hms_table_name);
        }
      } else {
        kudu_table = FindOrNull(kudu_tables_by_name, hms_table_name);
      }

      // Note: the references into 'hms_table.parameters' taken above must not
      // be used past this point, since 'hms_table' may be moved from below.
      if (kudu_table) {
        if (HmsClient::IsSynchronized(hms_table)) {
          synchronized_hms_tables_by_id[(*kudu_table)->id()].emplace_back(
              std::move(hms_table));
        } else if (hms_table.tableType == HmsClient::kExternalTable) {
          external_hms_tables_by_id[(*kudu_table)->id()].emplace_back(
              std::move(hms_table));
        }
      } else if (HmsClient::IsSynchronized(hms_table)) {
        // Note: we only consider synchronized HMS table entries "orphans"
        // because unsynchronized external tables don't always point at valid
        // Kudu tables.
        orphan_hms_tables.emplace_back(std::move(hms_table));
      }
    }
  }

  // Step 3: Determine the state of each Kudu table's HMS entry(ies), and bin
  // them appropriately.
  vector<pair<shared_ptr<KuduTable>, hive::Table>> legacy_tables;
  vector<pair<shared_ptr<KuduTable>, hive::Table>> duplicate_tables;
  vector<pair<shared_ptr<KuduTable>, hive::Table>> stale_tables;
  vector<shared_ptr<KuduTable>> missing_tables;
  vector<shared_ptr<KuduTable>> invalid_name_tables;
  for (auto& kudu_table_pair : kudu_tables_by_id) {
    // Work on a copy of the pointer so that the map entry stays valid for the
    // external-table pass below.
    shared_ptr<KuduTable> kudu_table = kudu_table_pair.second;
    // Check all of the synchronized HMS tables.
    vector<hive::Table>* hms_tables = FindOrNull(synchronized_hms_tables_by_id,
                                                 kudu_table_pair.first);
    // If the there are no synchronized HMS table entries, this table is missing
    // HMS tables and might have an invalid table name.
    if (!hms_tables) {
      const string& table_name = kudu_table->name();
      string normalized_table_name(table_name.data(), table_name.size());
      Status s = hms::HmsCatalog::NormalizeTableName(&normalized_table_name);
      if (!s.ok()) {
        invalid_name_tables.emplace_back(std::move(kudu_table));
      } else {
        missing_tables.emplace_back(std::move(kudu_table));
      }
    // If there is a single synchronized HMS table, this table could be unsynced or
    // using the legacy handler.
    } else if (hms_tables->size() == 1) {
      hive::Table& hms_table = (*hms_tables)[0];
      const string& storage_handler = hms_table.parameters[HmsClient::kStorageHandlerKey];
      if (storage_handler == HmsClient::kKuduStorageHandler &&
          !IsSynced(kudu_master_addrs, *kudu_table, hms_table)) {
        stale_tables.emplace_back(make_pair(std::move(kudu_table), std::move(hms_table)));
      } else if (storage_handler == HmsClient::kLegacyKuduStorageHandler) {
        legacy_tables.emplace_back(make_pair(std::move(kudu_table), std::move(hms_table)));
      }
    // Otherwise, there are multiple synchronized HMS tables for a single Kudu table.
    } else {
      for (hive::Table& hms_table : *hms_tables) {
        duplicate_tables.emplace_back(make_pair(kudu_table, std::move(hms_table)));
      }
    }
  }

  // Check all of the external HMS tables to see if they are using the legacy handler.
  for (auto& hms_table_pair : external_hms_tables_by_id) {
    shared_ptr<KuduTable>* kudu_table = FindOrNull(kudu_tables_by_id, hms_table_pair.first);
    if (kudu_table) {
      for (hive::Table& hms_table : hms_table_pair.second) {
        const string& storage_handler = hms_table.parameters[HmsClient::kStorageHandlerKey];
        if (storage_handler == HmsClient::kLegacyKuduStorageHandler) {
          // Copy (rather than move) the Kudu table pointer: several external
          // HMS tables may reference the same Kudu table, and moving would
          // leave every entry after the first paired with an empty pointer.
          legacy_tables.emplace_back(*kudu_table, std::move(hms_table));
        }
      }
    }
  }
  report->orphan_hms_tables.swap(orphan_hms_tables);
  report->missing_hms_tables.swap(missing_tables);
  report->invalid_name_tables.swap(invalid_name_tables);
  report->inconsistent_tables.swap(stale_tables);
  report->legacy_hms_tables.swap(legacy_tables);
  report->duplicate_hms_tables.swap(duplicate_tables);
  return Status::OK();
}

// Compares the Kudu and HMS catalogs and prints every inconsistency found to
// stdout, grouped by kind and followed by a suggestion on how to repair it.
//
// Returns OK if the catalogs are consistent, IllegalState if any inconsistency
// was found, or the error from initialization, analysis or printing.
Status CheckHmsMetadata(const RunnerContext& context) {
  // TODO(dan): check that the critical HMS configuration flags
  // (--hive_metastore_uris, --hive_metastore_sasl_enabled) match on all masters.
  shared_ptr<KuduClient> client;
  unique_ptr<HmsCatalog> catalog;
  string master_addrs;
  RETURN_NOT_OK(Init(context, &client, &catalog, &master_addrs));

  CatalogReport report;
  RETURN_NOT_OK(AnalyzeCatalogs(master_addrs, catalog.get(), client.get(), &report));
  if (report.empty()) {
    return Status::OK();
  }

  // Rows handed to PrintTables(): either side of a pair may be absent.
  using TableRows = vector<pair<shared_ptr<KuduTable>, hive::Table*>>;

  // Kudu tables whose names Hive can't represent.
  const vector<shared_ptr<KuduTable>>& bad_name_tables = report.invalid_name_tables;
  if (!bad_name_tables.empty()) {
    cout << "Found Kudu table(s) with Hive-incompatible names:" << endl;
    RETURN_NOT_OK(PrintKuduTables(master_addrs, bad_name_tables, cout));
    cout << endl;
    cout << "Suggestion: rename the Kudu table(s) to be Hive-compatible, then run the fix tool:"
         << endl;
    for (const auto& kudu_table : bad_name_tables) {
      cout << "\t$ kudu table rename_table --modify_external_catalogs=false "
           << master_addrs << " " << kudu_table->name() << " <database-name>.<table-name>" << endl;
    }
    cout << endl;
  }

  // Kudu tables with more than one HMS entry.
  if (!report.duplicate_hms_tables.empty()) {
    TableRows rows;
    for (auto& duplicate : report.duplicate_hms_tables) {
      rows.emplace_back(duplicate.first, &duplicate.second);
    }
    cout << "Found Kudu table(s) with multiple corresponding Hive Metastore tables:" << endl;
    RETURN_NOT_OK(PrintTables(master_addrs, std::move(rows), cout));
    cout << endl;
    cout << "Suggestion: using Impala or the Hive Beeline shell, alter one of the duplicate"
         << endl;
    cout << "Hive Metastore tables to be an external table referencing the base Kudu table."
         << endl;
    cout << endl;
  }

  // Everything the fix tool is able to repair.
  const bool has_orphans = !report.orphan_hms_tables.empty();
  const bool has_fixable = has_orphans
      || !report.missing_hms_tables.empty()
      || !report.legacy_hms_tables.empty()
      || !report.inconsistent_tables.empty();
  if (has_fixable) {
    TableRows rows;
    for (const auto& missing : report.missing_hms_tables) {
      rows.emplace_back(missing, nullptr);
    }
    for (auto& legacy : report.legacy_hms_tables) {
      rows.emplace_back(legacy.first, &legacy.second);
    }
    for (auto& inconsistent : report.inconsistent_tables) {
      rows.emplace_back(inconsistent.first, &inconsistent.second);
    }
    for (auto& orphan : report.orphan_hms_tables) {
      rows.emplace_back(shared_ptr<KuduTable>(), &orphan);
    }
    cout << "Found table(s) with missing or inconsisentent metadata in the Kudu "
            "catalog or Hive Metastore:" << endl;
    RETURN_NOT_OK(PrintTables(master_addrs, std::move(rows), cout));

    cout << endl;
    if (has_orphans) {
      // Orphans are only dropped when explicitly requested.
      cout << "Suggestion: use the fix tool to repair the Kudu and Hive Metastore catalog metadata"
           << endl;
      cout << "and drop Hive Metastore table(s) which reference non-existent Kudu tables:" << endl;
      cout << "\t$ kudu hms fix --drop_orphan_hms_tables " << master_addrs << endl;
    } else {
      cout << "Suggestion: use the fix tool to repair the Kudu and Hive Metastore catalog metadata:"
           << endl;
      cout << "\t$ kudu hms fix " << master_addrs << endl;
    }
  }

  // TODO(dan): add a link to the HMS guide on kudu.apache.org to this message.
  return Status::IllegalState("found inconsistencies in the Kudu and HMS catalogs");
}

// Returns a human-readable identifier for 'table' of the form
// "<name> [id=<id>]".
string TableIdent(const KuduTable& table) {
  const auto& table_name = table.name();
  const auto& table_id = table.id();
  return Substitute("$0 [id=$1]", table_name, table_id);
}

// Analyzes the Kudu and HMS catalogs and attempts to fix any
// automatically-fixable issues.
//
// Which classes of issues are fixed is controlled by the
// --drop_orphan_hms_tables, --create_missing_hms_tables, --upgrade_hms_tables
// and --fix_inconsistent_tables flags. With --dryrun every intended fix is
// logged but no modification is made.
//
// Error handling: unexpected errors (e.g. networking errors) are fatal and
// result in returning early. Expected application failures such as a rename
// failing due to duplicate table being present are logged and execution
// continues.
//
// Returns OK in dry-run mode or when no expected failure was logged, and
// RuntimeError if at least one expected failure was logged.
Status FixHmsMetadata(const RunnerContext& context) {
  shared_ptr<KuduClient> kudu_client;
  unique_ptr<HmsCatalog> hms_catalog;
  string master_addrs;
  RETURN_NOT_OK(Init(context, &kudu_client, &hms_catalog, &master_addrs));

  CatalogReport report;
  int kudu_catalog_count = 0;
  int hms_catalog_count = 0;
  RETURN_NOT_OK(AnalyzeCatalogs(master_addrs, hms_catalog.get(), kudu_client.get(), &report,
                                &kudu_catalog_count, &hms_catalog_count));
  if (FLAGS_dryrun && kudu_catalog_count == 0) {
    LOG(INFO) << "NOTE: There are zero kudu tables listed. If the cluster indeed has kudu tables "
                 "please re-run the command with right credentials." << endl;
  }
  // Cleared whenever an expected (non-fatal) failure is logged.
  bool success = true;

  if (FLAGS_drop_orphan_hms_tables) {
    for (hive::Table& hms_table : report.orphan_hms_tables) {
      string table_name = Substitute("$0.$1", hms_table.dbName, hms_table.tableName);
      const string& master_addrs_param = hms_table.parameters[HmsClient::kKuduMasterAddrsKey];

      // Normalize the master addresses to allow for an equality check that ignores
      // missing default ports, duplicate addresses, and address order.
      UnorderedHostPortSet param_set;
      MasterAddressesToSet(master_addrs_param, &param_set);
      UnorderedHostPortSet cluster_set;
      MasterAddressesToSet(master_addrs, &cluster_set);

      if (param_set != cluster_set && !FLAGS_force) {
        LOG(INFO) << "Skipping drop of orphan HMS table " << table_name
                  << " with master addresses parameter " << master_addrs_param
                  << " because it does not match the --" << kMasterAddressesArg << " argument"
                  << " (use --force to skip this check)";
        continue;
      }

      if (FLAGS_dryrun) {
        LOG(INFO) << "[dryrun] Dropping orphan HMS table " << table_name;
      } else {
        const string& table_id = hms_table.parameters[HmsClient::kKuduTableIdKey];
        const string& storage_handler = hms_table.parameters[HmsClient::kStorageHandlerKey];
        // All errors are fatal here, since we've already checked that the table exists in the HMS.
        if (storage_handler == HmsClient::kKuduStorageHandler) {
          RETURN_NOT_OK_PREPEND(hms_catalog->DropTable(table_id, table_name),
              Substitute("failed to drop orphan HMS table $0", table_name));
        } else {
          RETURN_NOT_OK_PREPEND(hms_catalog->DropLegacyTable(table_name),
              Substitute("failed to drop legacy orphan HMS table $0", table_name));
        }
      }
    }
  }

  if (FLAGS_create_missing_hms_tables) {
    for (const auto& kudu_table : report.missing_hms_tables) {
      const string& table_id = kudu_table->id();
      const string& cluster_id = kudu_table->client()->cluster_id();
      const string& table_name = kudu_table->name();
      auto schema = KuduSchema::ToSchema(kudu_table->schema());
      string normalized_table_name(table_name.data(), table_name.size());
      CHECK_OK(hms::HmsCatalog::NormalizeTableName(&normalized_table_name));

      if (FLAGS_dryrun) {
        LOG(INFO) << "[dryrun] Creating HMS table for Kudu table " << TableIdent(*kudu_table);
      } else {
        Status s = hms_catalog->CreateTable(table_id, table_name, cluster_id,
            kudu_table->owner(), schema, kudu_table->comment());
        if (s.IsAlreadyPresent()) {
          LOG(ERROR) << "Failed to create HMS table for Kudu table "
                     << TableIdent(*kudu_table)
                     << " because another table already exists in the HMS with that name";
          success = false;
          continue;
        }
        if (s.IsInvalidArgument()) {
          // This most likely means the database doesn't exist, but it is ambiguous.
          LOG(ERROR) << "Failed to create HMS table for Kudu table "
                     << TableIdent(*kudu_table)
                     << " (database does not exist?): " << s.message().ToString();
          success = false;
          continue;
        }
        // All other errors are unexpected.
        RETURN_NOT_OK_PREPEND(s,
            Substitute("failed to create HMS table for Kudu table $0", TableIdent(*kudu_table)));
      }

      if (normalized_table_name != table_name) {
        if (FLAGS_dryrun) {
          LOG(INFO) << "[dryrun] Renaming Kudu table " << TableIdent(*kudu_table)
                    << " to lowercased Hive-compatible name: " << normalized_table_name;
        } else {
          // All errors are fatal. We never expect to get an 'AlreadyPresent'
          // error, since the catalog manager validates that no two
          // Hive-compatible table names differ only by case.
          //
          // Note that if an error occurs we do not roll-back the HMS table
          // creation step, since a subsequent run of the tool will recognize
          // the table as an inconsistent table (Kudu and HMS table names do not
          // match), and automatically fix it.
          RETURN_NOT_OK_PREPEND(
              RenameTableInKuduCatalog(kudu_client.get(), table_name, normalized_table_name),
              Substitute("failed to rename Kudu table $0 to lowercased Hive compatible name $1",
                         TableIdent(*kudu_table), normalized_table_name));
        }
      }
    }
  }

  if (FLAGS_upgrade_hms_tables) {
    for (const auto& table_pair : report.legacy_hms_tables) {
      const KuduTable& kudu_table = *table_pair.first;
      const hive::Table& hms_table = table_pair.second;
      string hms_table_name = Substitute("$0.$1", hms_table.dbName, hms_table.tableName);

      if (FLAGS_dryrun) {
        LOG(INFO) << "[dryrun] Upgrading legacy Impala HMS metadata for table "
                  << hms_table_name;
      } else {
        RETURN_NOT_OK_PREPEND(hms_catalog->UpgradeLegacyImpalaTable(
                  kudu_table.id(), kudu_table.client()->cluster_id(), hms_table.dbName,
                  hms_table.tableName, KuduSchema::ToSchema(kudu_table.schema()),
                  kudu_table.comment()),
            Substitute("failed to upgrade legacy Impala HMS metadata for table $0",
              hms_table_name));
      }

      if (HmsClient::IsSynchronized(hms_table) && kudu_table.name() != hms_table_name) {
        if (FLAGS_dryrun) {
          LOG(INFO) << "[dryrun] Renaming Kudu table " << TableIdent(kudu_table)
                    << " to " << hms_table_name;
        } else {
          Status s = RenameTableInKuduCatalog(kudu_client.get(), kudu_table.name(), hms_table_name);
          if (s.IsAlreadyPresent()) {
            LOG(ERROR) << "Failed to rename Kudu table " << TableIdent(kudu_table)
                       << " to match the Hive Metastore name " << hms_table_name
                       << ", because a Kudu table with name " << hms_table_name
                       << " already exists";
            LOG(INFO) << "Suggestion: rename the conflicting table name manually:\n"
                      << "\t$ kudu table rename_table --modify_external_catalogs=false "
                      << master_addrs << " " << hms_table_name << " <database-name>.<table-name>";
            success = false;
            continue;
          }

          // All other errors are fatal. Note that if an error occurs we do not
          // roll-back the HMS legacy upgrade step, since a subsequent run of
          // the tool will recognize the table as an inconsistent table (Kudu
          // and HMS table names do not match), and automatically fix it.
          RETURN_NOT_OK_PREPEND(s,
              Substitute("failed to rename Kudu table $0 to $1",
                TableIdent(kudu_table), hms_table_name));
        }
      }
    }
  }

  if (FLAGS_fix_inconsistent_tables) {
    for (const auto& table_pair : report.inconsistent_tables) {
      const KuduTable& kudu_table = *table_pair.first;
      const hive::Table& hms_table = table_pair.second;
      string hms_table_name = Substitute("$0.$1", hms_table.dbName, hms_table.tableName);
      // Name to use when addressing the table in the Kudu catalog. It starts
      // as the name recorded in the KuduTable handle and is switched to the
      // HMS name once the rename below has succeeded, because 'kudu_table' was
      // obtained before the rename and presumably keeps reporting the old name.
      string kudu_table_name = kudu_table.name();
      string owner = kudu_table.owner();
      string comment = kudu_table.comment();

      if (hms_table_name != kudu_table_name) {
        // Update the Kudu table name to match the HMS table name.
        if (FLAGS_dryrun) {
          LOG(INFO) << "[dryrun] Renaming Kudu table " << TableIdent(kudu_table)
                    << " to " << hms_table_name;
        } else {
          Status s = RenameTableInKuduCatalog(kudu_client.get(), kudu_table_name, hms_table_name);
          if (s.IsAlreadyPresent()) {
            LOG(ERROR) << "Failed to rename Kudu table " << TableIdent(kudu_table)
                       << " to match HMS table " << hms_table_name
                       << ", because a Kudu table with name " << hms_table_name
                       << " already exists";
            success = false;
            continue;
          }
          RETURN_NOT_OK_PREPEND(s,
              Substitute("failed to rename Kudu table $0 to $1",
                TableIdent(kudu_table), hms_table_name));
          // Subsequent alterations must target the table under its new name.
          kudu_table_name = hms_table_name;
        }
      }

      // If the HMS table has an owner and Kudu does not, update the Kudu table owner to match
      // the HMS table owner. Otherwise the metadata step below will ensure the Kudu owner
      // is updated in the HMS.
      if (hms_table.owner != owner && owner.empty()) {
        if (FLAGS_dryrun) {
          LOG(INFO) << "[dryrun] Changing owner of " << TableIdent(kudu_table)
                    << " to " << hms_table.owner << " in Kudu catalog.";
        } else {
          RETURN_NOT_OK_PREPEND(
              ChangeOwnerInKuduCatalog(kudu_client.get(), kudu_table_name, hms_table.owner),
              Substitute("failed to change owner of $0 to $1 in Kudu catalog",
                TableIdent(kudu_table), hms_table.owner));
          owner = hms_table.owner;
        }
      }

      // If the HMS table has a table comment and Kudu does not, update the Kudu table comment
      // to match the HMS table comment. Otherwise the metadata step below will ensure the Kudu
      // comment is updated in the HMS.
      const string hms_table_comment =
          FindWithDefault(hms_table.parameters, hms::HmsClient::kTableCommentKey, "");
      if (hms_table_comment != comment && comment.empty()) {
        if (FLAGS_dryrun) {
          LOG(INFO) << "[dryrun] Changing table comment for " << TableIdent(kudu_table)
                    << " to " << hms_table_comment << " in Kudu catalog.";
        } else {
          RETURN_NOT_OK_PREPEND(ChangeTableCommentInKuduCatalog(kudu_client.get(),
              kudu_table_name, hms_table_comment),
              Substitute("failed to change table comment of $0 to $1 in Kudu catalog",
                  TableIdent(kudu_table), hms_table_comment));
          comment = hms_table_comment;
        }
      }

      // Update the HMS table metadata to match Kudu.
      if (FLAGS_dryrun) {
        LOG(INFO) << "[dryrun] Refreshing HMS table metadata for Kudu table "
                  << TableIdent(kudu_table);
      } else {
        auto schema = KuduSchema::ToSchema(kudu_table.schema());
        RETURN_NOT_OK_PREPEND(
            // Disable table ID checking to support fixing tables with unsynchronized IDs.
            hms_catalog->AlterTable(kudu_table.id(), hms_table_name, hms_table_name,
                                    kudu_table.client()->cluster_id(), owner, schema,
                                    comment, /* check_id */ false),
            Substitute("failed to refresh HMS table metadata for Kudu table $0",
              TableIdent(kudu_table)));
      }
    }
  }
  LOG(INFO) << Substitute("Number of Kudu tables found in Kudu master catalog: $0",
                          kudu_catalog_count) << endl;
  LOG(INFO) << Substitute("Number of Kudu tables found in HMS catalog: $0", hms_catalog_count)
            << endl;
  if (FLAGS_dryrun || success) {
    return Status::OK();
  }
  return Status::RuntimeError("Failed to fix some catalog metadata inconsistencies");
}

// Fetches the Kudu table entries from the HMS catalog and prints them to
// stdout. Any failure to initialize the clients or to fetch the entries is
// returned to the caller.
Status List(const RunnerContext& context) {
  shared_ptr<KuduClient> client;
  unique_ptr<HmsCatalog> catalog;
  string addrs;
  RETURN_NOT_OK(Init(context, &client, &catalog, &addrs));

  // Collect the HMS entries, then hand them to the printer.
  vector<hive::Table> entries;
  RETURN_NOT_OK(catalog->GetKuduTables(&entries));
  return PrintHMSTables(entries, cout);
}

// Checks whether the Kudu cluster has tables whose names collide once they are
// normalized with HmsCatalog::NormalizeTableName.
//
// Returns OK when no two tables share a normalized name. Otherwise prints the
// conflicting table names together with a rename suggestion to stdout and
// returns IllegalState. Errors from connecting to the cluster or listing its
// tables are returned as-is.
Status Precheck(const RunnerContext& context) {
  string master_addrs;
  RETURN_NOT_OK(ParseMasterAddressesStr(context, &master_addrs));
  shared_ptr<KuduClient> client;
  const vector<string> master_addresses(Split(master_addrs, ","));
  RETURN_NOT_OK(CreateKuduClient(master_addresses, &client));
  vector<string> tables;
  RETURN_NOT_OK(client->ListTables(&tables));

  // Map of normalized table name to table names.
  bool conflicting_tables = false;
  unordered_map<string, vector<string>> normalized_tables;
  for (string& table : tables) {
    string normalized_table_name = table;
    Status s = hms::HmsCatalog::NormalizeTableName(&normalized_table_name);
    if (!s.ok()) {
      // This is not a Hive-compatible table name, so there can't be a conflict
      // among normalized names (see CatalogManager::NormalizeTableName).
      continue;
    }
    // Named distinctly from the outer 'tables' vector to avoid shadowing it.
    vector<string>& same_name_tables = normalized_tables[normalized_table_name];
    same_name_tables.emplace_back(std::move(table));
    conflicting_tables |= same_name_tables.size() > 1;
  }

  if (!conflicting_tables) {
    return Status::OK();
  }

  // Only groups with more than one member are reported.
  DataTable data_table({ "conflicting table names" });
  for (auto& entry : normalized_tables) {
    if (entry.second.size() > 1) {
      for (string& kudu_table : entry.second) {
        data_table.AddRow({ std::move(kudu_table) });
      }
    }
  }

  // End the introductory message with a line break so that the table starts
  // on its own line.
  cout << "Found Kudu tables whose names are not unique according to Hive's case-insensitive"
       << endl
       << "identifier requirements. These conflicting tables will cause master startup to" << endl
       << "fail when the Hive Metastore integration is enabled:" << endl;
  RETURN_NOT_OK(data_table.PrintTo(cout));
  cout << endl
       << "Suggestion: rename the conflicting tables to case-insensitive unique names:" << endl
       << "\t$ kudu table rename_table " << master_addrs
       << " <conflicting_table_name> <new_table_name>" << endl;

  return Status::IllegalState("found tables in Kudu with case-conflicting names");
}

// Builds the "hms" tool mode, which groups the "check", "downgrade", "fix",
// "list" and "precheck" actions together with their optional parameters.
unique_ptr<Mode> BuildHmsMode() {
  // Help text shared by every action that accepts --hive_metastore_uris.
  const string kHmsUrisDesc =
    "Address of the Hive Metastore instance(s). If not set, the configuration "
    "from the Kudu master is used, so this flag should not be overridden in typical "
    "situations. The provided port must be for the HMS Thrift service. "
    "If a port is not provided, defaults to 9083. If the HMS is deployed in an "
    "HA configuration, multiple comma-separated addresses should be supplied. "
    "The configured value must match the Hive hive.metastore.uris configuration.";

  // Help text shared by every action that accepts --hive_metastore_sasl_enabled.
  const string kHmsSaslEnabledDesc =
    "Configures whether Thrift connections to the Hive Metastore use SASL "
    "(Kerberos) security. Only takes effect when --hive_metastore_uris is set, "
    "otherwise the configuration from the Kudu master is used. The configured "
    "value must match the hive.metastore.sasl.enabled option in the Hive "
    "Metastore configuration.";

  unique_ptr<Action> hms_check =
      ClusterActionBuilder("check", &CheckHmsMetadata)
      .Description("Check metadata consistency between Kudu and the Hive Metastore catalogs")
      .AddOptionalParameter("hive_metastore_sasl_enabled", nullopt, kHmsSaslEnabledDesc)
      .AddOptionalParameter("hive_metastore_uris", nullopt, kHmsUrisDesc)
      .AddOptionalParameter("ignore_other_clusters")
      .Build();

  unique_ptr<Action> hms_downgrade =
      ClusterActionBuilder("downgrade", &HmsDowngrade)
      .Description("Downgrade the metadata to legacy format for Kudu and the Hive Metastores")
      .AddOptionalParameter("hive_metastore_sasl_enabled", nullopt, kHmsSaslEnabledDesc)
      .AddOptionalParameter("hive_metastore_uris", nullopt, kHmsUrisDesc)
      .Build();

  unique_ptr<Action> hms_fix =
      ClusterActionBuilder("fix", &FixHmsMetadata)
      .Description("Fix automatically-repairable metadata inconsistencies in the "
                   "Kudu and Hive Metastore catalogs")
      .AddOptionalParameter("dryrun")
      .AddOptionalParameter("drop_orphan_hms_tables")
      .AddOptionalParameter("create_missing_hms_tables")
      .AddOptionalParameter("fix_inconsistent_tables")
      .AddOptionalParameter("upgrade_hms_tables")
      .AddOptionalParameter("hive_metastore_sasl_enabled", nullopt, kHmsSaslEnabledDesc)
      .AddOptionalParameter("hive_metastore_uris", nullopt, kHmsUrisDesc)
      .AddOptionalParameter("ignore_other_clusters")
      .Build();

  // The second argument of the "columns" parameter is passed in the position
  // where the other parameters pass nullopt; the third is its description.
  unique_ptr<Action> hms_list =
      ClusterActionBuilder("list", &List)
      .Description("List the Kudu table HMS entries")
      .AddOptionalParameter("columns",
                            Substitute("database,table,type,$0",
                                              HmsClient::kKuduTableNameKey),
                            Substitute("Comma-separated list of HMS entry fields to "
                                       "include in output.\nPossible values: database, "
                                       "table, type, owner, comment, $0, $1, $2, $3, $4",
                                       HmsClient::kKuduTableNameKey,
                                       HmsClient::kKuduTableIdKey,
                                       HmsClient::kKuduClusterIdKey,
                                       HmsClient::kKuduMasterAddrsKey,
                                       HmsClient::kStorageHandlerKey))
      .AddOptionalParameter("format")
      .Build();

  unique_ptr<Action> hms_precheck =
      ClusterActionBuilder("precheck", &Precheck)
      .Description("Check that the Kudu cluster is prepared to enable "
                   "the Hive Metastore integration")
      .Build();

  return ModeBuilder("hms").Description("Operate on remote Hive Metastores")
                           .AddAction(std::move(hms_check))
                           .AddAction(std::move(hms_downgrade))
                           .AddAction(std::move(hms_fix))
                           .AddAction(std::move(hms_list))
                           .AddAction(std::move(hms_precheck))
                           .Build();
}

} // namespace tools
} // namespace kudu
