| // Licensed to the Apache Software Foundation (ASF) under one |
| // or more contributor license agreements. See the NOTICE file |
| // distributed with this work for additional information |
| // regarding copyright ownership. The ASF licenses this file |
| // to you under the Apache License, Version 2.0 (the |
| // "License"); you may not use this file except in compliance |
| // with the License. You may obtain a copy of the License at |
| // |
| // http://www.apache.org/licenses/LICENSE-2.0 |
| // |
| // Unless required by applicable law or agreed to in writing, |
| // software distributed under the License is distributed on an |
| // "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY |
| // KIND, either express or implied. See the License for the |
| // specific language governing permissions and limitations |
| // under the License. |
| |
| #include <algorithm> |
| #include <functional> |
| #include <iostream> |
| #include <iterator> |
| #include <map> |
| #include <memory> |
| #include <optional> |
| #include <set> |
| #include <string> |
| #include <type_traits> |
| #include <unordered_map> |
| #include <unordered_set> |
| #include <utility> |
| #include <vector> |
| |
| #include <gflags/gflags.h> |
| #include <glog/logging.h> |
| |
| #include "kudu/client/client.h" |
| #include "kudu/client/schema.h" |
| #include "kudu/client/shared_ptr.h" // IWYU pragma: keep |
| #include "kudu/common/schema.h" |
| #include "kudu/gutil/map-util.h" |
| #include "kudu/gutil/strings/split.h" |
| #include "kudu/gutil/strings/stringpiece.h" |
| #include "kudu/gutil/strings/substitute.h" |
| #include "kudu/hms/hive_metastore_types.h" |
| #include "kudu/hms/hms_catalog.h" |
| #include "kudu/hms/hms_client.h" |
| #include "kudu/tools/tool_action.h" |
| #include "kudu/tools/tool_action_common.h" |
| #include "kudu/util/net/net_util.h" |
| #include "kudu/util/slice.h" |
| #include "kudu/util/status.h" |
| #include "kudu/util/string_case.h" |
| |
| DECLARE_bool(force); |
| DECLARE_bool(hive_metastore_sasl_enabled); |
| DECLARE_string(columns); |
| DECLARE_string(hive_metastore_uris); |
| |
| DEFINE_bool(dryrun, false, |
| "Print a message for each fix, but do not make modifications to Kudu or the Hive Metastore."); |
| DEFINE_bool(drop_orphan_hms_tables, false, |
| "Drop orphan Hive Metastore tables which refer to non-existent Kudu tables."); |
| DEFINE_bool(create_missing_hms_tables, true, |
| "Create a Hive Metastore table for each Kudu table which is missing one."); |
| DEFINE_bool(fix_inconsistent_tables, true, |
| "Fix tables whose Kudu and Hive Metastore metadata differ. If the table name is " |
| "different, the table is renamed in Kudu to match the HMS. If the columns " |
| "or other metadata is different the HMS is updated to match Kudu."); |
| DEFINE_bool(upgrade_hms_tables, true, |
| "Upgrade Hive Metastore tables from the legacy Impala metadata format to the " |
| "new Kudu metadata format."); |
| DEFINE_bool(ignore_other_clusters, true, |
| "Whether to ignore entirely separate Kudu clusters, as indicated by a " |
| "different set of master addresses."); |
| |
| using kudu::client::KuduClient; |
| using kudu::client::KuduSchema; |
| using kudu::client::KuduTable; |
| using kudu::client::KuduTableAlterer; |
| using kudu::client::sp::shared_ptr; |
| using kudu::hms::HmsCatalog; |
| using kudu::hms::HmsClient; |
| using std::cout; |
| using std::endl; |
| using std::make_pair; |
| using std::nullopt; |
| using std::ostream; |
| using std::pair; |
| using std::set; |
| using std::string; |
| using std::unique_ptr; |
| using std::unordered_map; |
| using std::vector; |
| using strings::Split; |
| using strings::Substitute; |
| |
| namespace kudu { |
| namespace tools { |
| |
| // Only alter the table in Kudu but not in the Hive Metastore. |
| Status RenameTableInKuduCatalog(KuduClient* kudu_client, |
| const string& name, |
| const string& new_name) { |
| unique_ptr<KuduTableAlterer> alterer(kudu_client->NewTableAlterer(name)); |
| return alterer->RenameTo(new_name) |
| ->modify_external_catalogs(false) |
| ->Alter(); |
| } |
| |
| // Only alter the table in Kudu but not in the Hive Metastore. |
| Status ChangeOwnerInKuduCatalog(KuduClient* kudu_client, |
| const string& name, |
| const string& owner) { |
| unique_ptr<KuduTableAlterer> alterer(kudu_client->NewTableAlterer(name)); |
| return alterer->SetOwner(owner) |
| ->modify_external_catalogs(false) |
| ->Alter(); |
| } |
| |
| // Only alter the table in Kudu but not in the Hive Metastore. |
| Status ChangeTableCommentInKuduCatalog(KuduClient* kudu_client, |
| const string& name, |
| const string& comment) { |
| unique_ptr<KuduTableAlterer> alterer(kudu_client->NewTableAlterer(name)); |
| return alterer->SetComment(comment) |
| ->modify_external_catalogs(false) |
| ->Alter(); |
| } |
| |
| Status Init(const RunnerContext& context, |
| shared_ptr<KuduClient>* kudu_client, |
| unique_ptr<HmsCatalog>* hms_catalog, |
| string* master_addrs) { |
| string master_addrs_flag; |
| RETURN_NOT_OK(ParseMasterAddressesStr(context, &master_addrs_flag)); |
| |
| // Create a Kudu Client. |
| const vector<string> master_addresses(Split(master_addrs_flag, ",")); |
| RETURN_NOT_OK(CreateKuduClient(master_addresses, kudu_client)); |
| |
| // Get the configured master addresses from the leader master. It's critical |
| // that the check and fix tools use the exact same master address |
| // configuration that the masters do, otherwise the HMS table entries will |
| // disagree on the master addresses property. |
| *master_addrs = (*kudu_client)->GetMasterAddresses(); |
| |
| if (FLAGS_hive_metastore_uris.empty()) { |
| string hive_metastore_uris = (*kudu_client)->GetHiveMetastoreUris(); |
| if (hive_metastore_uris.empty()) { |
| return Status::ConfigurationError( |
| "Could not fetch the Hive Metastore locations from the Kudu master " |
| "since it is not configured with the Hive Metastore integration. " |
| "Run the tool with --hive_metastore_uris and pass in the location(s) " |
| "of the Hive Metastore."); |
| } |
| bool hive_metastore_sasl_enabled = (*kudu_client)->GetHiveMetastoreSaslEnabled(); |
| |
| // Override the flag values. |
| FLAGS_hive_metastore_uris = hive_metastore_uris; |
| FLAGS_hive_metastore_sasl_enabled = hive_metastore_sasl_enabled; |
| } |
| |
| // Create HMS catalog. |
| hms_catalog->reset(new hms::HmsCatalog(master_addrs_flag)); |
| return (*hms_catalog)->Start(); |
| } |
| |
| Status HmsDowngrade(const RunnerContext& context) { |
| shared_ptr<KuduClient> kudu_client; |
| unique_ptr<HmsCatalog> hms_catalog; |
| string master_addrs; |
| Init(context, &kudu_client, &hms_catalog, &master_addrs); |
| |
| // 1. Identify all Kudu tables in the HMS entries. |
| vector<hive::Table> hms_tables; |
| RETURN_NOT_OK(hms_catalog->GetKuduTables(&hms_tables)); |
| |
| // 2. Downgrades all Kudu tables to legacy table format. |
| for (auto& hms_table : hms_tables) { |
| if (hms_table.parameters[HmsClient::kStorageHandlerKey] == HmsClient::kKuduStorageHandler) { |
| RETURN_NOT_OK(hms_catalog->DowngradeToLegacyImpalaTable( |
| Substitute("$0.$1", hms_table.dbName, hms_table.tableName))); |
| } |
| } |
| |
| return Status::OK(); |
| } |
| |
| // Given a Kudu table and a HMS table, checks if their metadata is in sync. |
| bool IsSynced(const set<string>& master_addresses, |
| const KuduTable& kudu_table, |
| const hive::Table& hms_table) { |
| DCHECK(!master_addresses.empty()); |
| auto schema = KuduSchema::ToSchema(kudu_table.schema()); |
| hive::Table hms_table_copy(hms_table); |
| const string* hms_masters_field = FindOrNull(hms_table.parameters, |
| HmsClient::kKuduMasterAddrsKey); |
| if (!hms_masters_field || |
| master_addresses != static_cast<set<string>>(Split(*hms_masters_field, ","))) { |
| return false; |
| } |
| Status s = HmsCatalog::PopulateTable(kudu_table.id(), kudu_table.name(), kudu_table.owner(), |
| schema, kudu_table.comment(), |
| kudu_table.client()->cluster_id(), *hms_masters_field, |
| hms_table.tableType, &hms_table_copy); |
| return s.ok() && hms_table_copy == hms_table; |
| } |
| |
| // Prints catalog information about Kudu tables in data table format to 'out'. |
| Status PrintKuduTables(const string& master_addrs, |
| const vector<shared_ptr<KuduTable>>& kudu_tables, |
| ostream& out) { |
| DataTable table({ |
| "Kudu table", |
| "Kudu table ID", |
| "Kudu master addresses", |
| }); |
| for (const auto& kudu_table : kudu_tables) { |
| table.AddRow({ |
| kudu_table->name(), |
| kudu_table->id(), |
| master_addrs, |
| }); |
| } |
| return table.PrintTo(out); |
| } |
| |
| // Prints catalog information about Kudu and HMS tables in data table format to 'out'. |
| Status PrintTables(const string& master_addrs, |
| vector<pair<shared_ptr<KuduTable>, hive::Table*>> tables, |
| ostream& out) { |
| DataTable table({ |
| "Kudu table", |
| "Kudu table ID", |
| "Kudu cluster ID", |
| "Kudu owner", |
| "Kudu master addresses", |
| "HMS database", |
| "HMS table", |
| "HMS table type", |
| "HMS owner", |
| Substitute("HMS $0", HmsClient::kKuduTableNameKey), |
| Substitute("HMS $0", HmsClient::kKuduTableIdKey), |
| Substitute("HMS $0", HmsClient::kKuduClusterIdKey), |
| Substitute("HMS $0", HmsClient::kKuduMasterAddrsKey), |
| Substitute("HMS $0", HmsClient::kStorageHandlerKey), |
| }); |
| for (auto& pair : tables) { |
| vector<string> row; |
| if (pair.first) { |
| // Note: If adding or removing fields, be sure to adjust `row.resize` below. |
| const KuduTable& kudu_table = *pair.first; |
| row.emplace_back(kudu_table.name()); |
| row.emplace_back(kudu_table.id()); |
| row.emplace_back(kudu_table.client()->cluster_id()); |
| row.emplace_back(kudu_table.owner()); |
| row.emplace_back(master_addrs); |
| } else { |
| row.resize(5); |
| } |
| if (pair.second) { |
| // Note: If adding or removing fields, be sure to adjust `row.resize` below. |
| hive::Table& hms_table = *pair.second; |
| row.emplace_back(hms_table.dbName); |
| row.emplace_back(hms_table.tableName); |
| row.emplace_back(hms_table.tableType); |
| row.emplace_back(hms_table.owner); |
| row.emplace_back(hms_table.parameters[HmsClient::kKuduTableNameKey]); |
| row.emplace_back(hms_table.parameters[HmsClient::kKuduTableIdKey]); |
| row.emplace_back(hms_table.parameters[HmsClient::kKuduClusterIdKey]); |
| row.emplace_back(hms_table.parameters[HmsClient::kKuduMasterAddrsKey]); |
| row.emplace_back(hms_table.parameters[HmsClient::kStorageHandlerKey]); |
| } else { |
| row.resize(14); |
| } |
| table.AddRow(std::move(row)); |
| } |
| return table.PrintTo(out); |
| } |
| |
| // Prints catalog information about Kudu HMS tables in data table format to 'out'. |
| Status PrintHMSTables(vector<hive::Table> tables, ostream& out) { |
| DataTable table({}); |
| for (const auto& column : strings::Split(FLAGS_columns, ",", strings::SkipEmpty())) { |
| vector<string> values; |
| if (iequals(column.ToString(), "database")) { |
| for (auto& hms_table : tables) { |
| values.emplace_back(hms_table.dbName); |
| } |
| } else if (iequals(column.ToString(), "table")) { |
| for (auto& hms_table : tables) { |
| values.emplace_back(hms_table.tableName); |
| } |
| } else if (iequals(column.ToString(), "type")) { |
| for (auto& hms_table : tables) { |
| values.emplace_back(hms_table.tableType); |
| } |
| } else if (iequals(column.ToString(), "owner")) { |
| for (auto& hms_table : tables) { |
| values.emplace_back(hms_table.owner); |
| } |
| } else if (iequals(column.ToString(), "comment")) { |
| for (auto& hms_table : tables) { |
| values.emplace_back(hms_table.parameters[HmsClient::kTableCommentKey]); |
| } |
| } else if (iequals(column.ToString(), HmsClient::kKuduTableNameKey)) { |
| for (auto& hms_table : tables) { |
| values.emplace_back(hms_table.parameters[HmsClient::kKuduTableNameKey]); |
| } |
| } else if (iequals(column.ToString(), HmsClient::kKuduTableIdKey)) { |
| for (auto& hms_table : tables) { |
| values.emplace_back(hms_table.parameters[HmsClient::kKuduTableIdKey]); |
| } |
| } else if (iequals(column.ToString(), HmsClient::kKuduClusterIdKey)) { |
| for (auto& hms_table : tables) { |
| values.emplace_back(hms_table.parameters[HmsClient::kKuduClusterIdKey]); |
| } |
| } else if (iequals(column.ToString(), HmsClient::kKuduMasterAddrsKey)) { |
| for (auto& hms_table : tables) { |
| values.emplace_back(hms_table.parameters[HmsClient::kKuduMasterAddrsKey]); |
| } |
| } else if (iequals(column.ToString(), HmsClient::kStorageHandlerKey)) { |
| for (auto& hms_table : tables) { |
| values.emplace_back(hms_table.parameters[HmsClient::kStorageHandlerKey]); |
| } |
| } else { |
| return Status::InvalidArgument("unknown column (--columns)", column); |
| } |
| table.AddColumn(column.ToString(), std::move(values)); |
| } |
| return table.PrintTo(out); |
| } |
| |
| // A report of inconsistent tables in Kudu and the HMS catalogs. |
| struct CatalogReport { |
| // Kudu tables in the HMS catalog which have no corresponding table in the |
| // Kudu catalog (including legacy tables). |
| vector<hive::Table> orphan_hms_tables; |
| |
| // Tables in the Kudu catalog which have no corresponding table in the HMS catalog. |
| vector<shared_ptr<KuduTable>> missing_hms_tables; |
| |
| // Tables in the Kudu catalog which have a Hive-incompatible name, and which |
| // are not referenced by an existing legacy Hive table (otherwise they would |
| // fall in to 'inconsistent_tables'). |
| // |
| // These tables can not be automatically corrected by the fix tool. |
| vector<shared_ptr<KuduTable>> invalid_name_tables; |
| |
| // Legacy Imapala/Kudu tables (storage handler is com.cloudera.kudu.hive.KuduStorageHandler). |
| vector<pair<shared_ptr<KuduTable>, hive::Table>> legacy_hms_tables; |
| |
| // Kudu tables with multiple HMS table entries. The entries may or may not be legacy. |
| // |
| // These tables can not be automatically corrected by the fix tool. |
| vector<pair<shared_ptr<KuduTable>, hive::Table>> duplicate_hms_tables; |
| |
| // Tables whose Kudu catalog table and HMS table are inconsistent. |
| vector<pair<shared_ptr<KuduTable>, hive::Table>> inconsistent_tables; |
| |
| // Returns true if the report is empty. |
| bool empty() const { |
| return orphan_hms_tables.empty() |
| && legacy_hms_tables.empty() |
| && duplicate_hms_tables.empty() |
| && inconsistent_tables.empty() |
| && missing_hms_tables.empty() |
| && invalid_name_tables.empty(); |
| } |
| }; |
| |
| // Retrieves the entire Kudu catalog, as well as all Kudu tables in the HMS |
| // catalog, and compares them to find inconsistencies. |
| // |
| // Inconsistencies are bucketed into different groups, corresponding to how they |
| // can be repaired. |
| Status AnalyzeCatalogs(const string& master_addrs, |
| HmsCatalog* hms_catalog, |
| KuduClient* kudu_client, |
| CatalogReport* report, |
| int* kudu_catalog_count = nullptr, |
| int* hms_catalog_count = nullptr) { |
| // Step 1: retrieve all Kudu tables, and aggregate them by ID and by name. The |
| // by-ID map will be used to match the HMS Kudu table entries. The by-name map |
| // will be used to match against legacy Impala/Kudu HMS table entries. |
| unordered_map<string, shared_ptr<KuduTable>> kudu_tables_by_id; |
| unordered_map<string, shared_ptr<KuduTable>> kudu_tables_by_name; |
| { |
| vector<string> kudu_table_names; |
| RETURN_NOT_OK(kudu_client->ListTables(&kudu_table_names)); |
| if (kudu_catalog_count) { |
| *kudu_catalog_count = kudu_table_names.size(); |
| } |
| for (const string& kudu_table_name : kudu_table_names) { |
| shared_ptr<KuduTable> kudu_table; |
| // TODO(dan): When the error is NotFound, prepend an admonishment about not |
| // running this tool when the catalog is in-flux. |
| RETURN_NOT_OK(kudu_client->OpenTable(kudu_table_name, &kudu_table)); |
| kudu_tables_by_id.emplace(kudu_table->id(), kudu_table); |
| kudu_tables_by_name.emplace(kudu_table->name(), std::move(kudu_table)); |
| } |
| } |
| |
| // Step 2: retrieve all Kudu table entries in the HMS, filter all orphaned |
| // entries which reference non-existent Kudu tables, and group the rest by |
| // table type and table ID. |
| const set<string> kudu_master_addrs = Split(master_addrs, ","); |
| vector<hive::Table> orphan_hms_tables; |
| unordered_map<string, vector<hive::Table>> synchronized_hms_tables_by_id; |
| unordered_map<string, vector<hive::Table>> external_hms_tables_by_id; |
| { |
| vector<hive::Table> hms_tables; |
| RETURN_NOT_OK(hms_catalog->GetKuduTables(&hms_tables)); |
| if (hms_catalog_count) { |
| *hms_catalog_count = hms_tables.size(); |
| } |
| for (hive::Table& hms_table : hms_tables) { |
| // If the addresses in the HMS entry don't overlap at all with the |
| // expected addresses, the entry is likely from another Kudu cluster. |
| // Ignore it if appropriate. |
| if (FLAGS_ignore_other_clusters) { |
| const string* hms_masters_field = FindOrNull(hms_table.parameters, |
| HmsClient::kKuduMasterAddrsKey); |
| vector<string> master_intersection; |
| if (hms_masters_field) { |
| const set<string> hms_master_addrs = Split(*hms_masters_field, ","); |
| std::set_intersection(hms_master_addrs.begin(), hms_master_addrs.end(), |
| kudu_master_addrs.begin(), kudu_master_addrs.end(), |
| std::back_inserter(master_intersection)); |
| } |
| if (master_intersection.empty()) { |
| LOG(INFO) << Substitute("Skipping HMS table $0.$1 with different " |
| "masters specified: $2", hms_table.dbName, hms_table.tableName, |
| hms_masters_field ? *hms_masters_field : "<none>"); |
| continue; |
| } |
| } |
| |
| // If this is a non-legacy, synchronized table, we expect a table ID to exist |
| // in the HMS entry; look up the Kudu table by ID. Otherwise, look it up |
| // by table name. |
| shared_ptr<KuduTable>* kudu_table; |
| const string& storage_handler = hms_table.parameters[HmsClient::kStorageHandlerKey]; |
| const string& hms_table_id = hms_table.parameters[HmsClient::kKuduTableIdKey]; |
| const string& hms_table_name = hms_table.parameters[HmsClient::kKuduTableNameKey]; |
| if (storage_handler == HmsClient::kKuduStorageHandler && |
| HmsClient::IsSynchronized(hms_table)) { |
| kudu_table = FindOrNull(kudu_tables_by_id, hms_table_id); |
| // If there is no kudu table that matches the id, or the id doesn't exist, |
| // lookup the table by name. This handles synchronized tables created when HMS |
| // sync was off or tables with IDs out of sync. |
| if (!kudu_table) { |
| kudu_table = FindOrNull(kudu_tables_by_name, hms_table_name); |
| } |
| } else { |
| kudu_table = FindOrNull(kudu_tables_by_name, hms_table_name); |
| } |
| |
| if (kudu_table) { |
| if (HmsClient::IsSynchronized(hms_table)) { |
| synchronized_hms_tables_by_id[(*kudu_table)->id()].emplace_back( |
| std::move(hms_table)); |
| } else if (hms_table.tableType == HmsClient::kExternalTable) { |
| external_hms_tables_by_id[(*kudu_table)->id()].emplace_back( |
| std::move(hms_table)); |
| } |
| } else if (HmsClient::IsSynchronized(hms_table)) { |
| // Note: we only consider synchronized HMS table entries "orphans" |
| // because unsynchronized external tables don't always point at valid |
| // Kudu tables. |
| orphan_hms_tables.emplace_back(std::move(hms_table)); |
| } |
| } |
| } |
| |
| // Step 3: Determine the state of each Kudu table's HMS entry(ies), and bin |
| // them appropriately. |
| vector<pair<shared_ptr<KuduTable>, hive::Table>> legacy_tables; |
| vector<pair<shared_ptr<KuduTable>, hive::Table>> duplicate_tables; |
| vector<pair<shared_ptr<KuduTable>, hive::Table>> stale_tables; |
| vector<shared_ptr<KuduTable>> missing_tables; |
| vector<shared_ptr<KuduTable>> invalid_name_tables; |
| for (auto& kudu_table_pair : kudu_tables_by_id) { |
| shared_ptr<KuduTable> kudu_table = kudu_table_pair.second; |
| // Check all of the synchronized HMS tables. |
| vector<hive::Table>* hms_tables = FindOrNull(synchronized_hms_tables_by_id, |
| kudu_table_pair.first); |
| // If the there are no synchronized HMS table entries, this table is missing |
| // HMS tables and might have an invalid table name. |
| if (!hms_tables) { |
| const string& table_name = kudu_table->name(); |
| string normalized_table_name(table_name.data(), table_name.size()); |
| Status s = hms::HmsCatalog::NormalizeTableName(&normalized_table_name); |
| if (!s.ok()) { |
| invalid_name_tables.emplace_back(std::move(kudu_table)); |
| } else { |
| missing_tables.emplace_back(std::move(kudu_table)); |
| } |
| // If there is a single synchronized HMS table, this table could be unsynced or |
| // using the legacy handler. |
| } else if (hms_tables->size() == 1) { |
| hive::Table& hms_table = (*hms_tables)[0]; |
| const string& storage_handler = hms_table.parameters[HmsClient::kStorageHandlerKey]; |
| if (storage_handler == HmsClient::kKuduStorageHandler && |
| !IsSynced(kudu_master_addrs, *kudu_table, hms_table)) { |
| stale_tables.emplace_back(make_pair(std::move(kudu_table), std::move(hms_table))); |
| } else if (storage_handler == HmsClient::kLegacyKuduStorageHandler) { |
| legacy_tables.emplace_back(make_pair(std::move(kudu_table), std::move(hms_table))); |
| } |
| // Otherwise, there are multiple synchronized HMS tables for a single Kudu table. |
| } else { |
| for (hive::Table& hms_table : *hms_tables) { |
| duplicate_tables.emplace_back(make_pair(kudu_table, std::move(hms_table))); |
| } |
| } |
| } |
| |
| // Check all of the external HMS tables to see if they are using the legacy handler. |
| for (auto& hms_table_pair : external_hms_tables_by_id) { |
| shared_ptr<KuduTable>* kudu_table = FindOrNull(kudu_tables_by_id, hms_table_pair.first); |
| if (kudu_table) { |
| for (hive::Table &hms_table : hms_table_pair.second) { |
| const string &storage_handler = hms_table.parameters[HmsClient::kStorageHandlerKey]; |
| if (storage_handler == HmsClient::kLegacyKuduStorageHandler) { |
| legacy_tables.emplace_back(make_pair(std::move(*kudu_table), std::move(hms_table))); |
| } |
| } |
| } |
| } |
| report->orphan_hms_tables.swap(orphan_hms_tables); |
| report->missing_hms_tables.swap(missing_tables); |
| report->invalid_name_tables.swap(invalid_name_tables); |
| report->inconsistent_tables.swap(stale_tables); |
| report->legacy_hms_tables.swap(legacy_tables); |
| report->duplicate_hms_tables.swap(duplicate_tables); |
| return Status::OK(); |
| } |
| |
| Status CheckHmsMetadata(const RunnerContext& context) { |
| // TODO(dan): check that the critical HMS configuration flags |
| // (--hive_metastore_uris, --hive_metastore_sasl_enabled) match on all masters. |
| shared_ptr<KuduClient> kudu_client; |
| unique_ptr<HmsCatalog> hms_catalog; |
| string master_addrs; |
| RETURN_NOT_OK(Init(context, &kudu_client, &hms_catalog, &master_addrs)); |
| |
| CatalogReport report; |
| RETURN_NOT_OK(AnalyzeCatalogs(master_addrs, hms_catalog.get(), kudu_client.get(), &report)); |
| |
| if (report.empty()) { |
| return Status::OK(); |
| } |
| |
| if (!report.invalid_name_tables.empty()) { |
| cout << "Found Kudu table(s) with Hive-incompatible names:" << endl; |
| RETURN_NOT_OK(PrintKuduTables(master_addrs, report.invalid_name_tables, cout)); |
| cout << endl |
| << "Suggestion: rename the Kudu table(s) to be Hive-compatible, then run the fix tool:" |
| << endl; |
| for (const auto& table : report.invalid_name_tables) { |
| cout << "\t$ kudu table rename_table --modify_external_catalogs=false " |
| << master_addrs << " " << table->name() << " <database-name>.<table-name>" << endl; |
| } |
| cout << endl; |
| } |
| |
| if (!report.duplicate_hms_tables.empty()) { |
| vector<pair<shared_ptr<KuduTable>, hive::Table*>> tables; |
| for (auto& table : report.duplicate_hms_tables) { |
| tables.emplace_back(table.first, &table.second); |
| } |
| cout << "Found Kudu table(s) with multiple corresponding Hive Metastore tables:" << endl; |
| RETURN_NOT_OK(PrintTables(master_addrs, std::move(tables), cout)); |
| cout << endl |
| << "Suggestion: using Impala or the Hive Beeline shell, alter one of the duplicate" |
| << endl |
| << "Hive Metastore tables to be an external table referencing the base Kudu table." |
| << endl |
| << endl; |
| } |
| |
| if (!report.orphan_hms_tables.empty() || !report.missing_hms_tables.empty() |
| || !report.legacy_hms_tables.empty() || !report.inconsistent_tables.empty()) { |
| vector<pair<shared_ptr<KuduTable>, hive::Table*>> tables; |
| for (const auto& kudu_table : report.missing_hms_tables) { |
| tables.emplace_back(kudu_table, nullptr); |
| } |
| for (auto& table : report.legacy_hms_tables) { |
| tables.emplace_back(table.first, &table.second); |
| } |
| for (auto& table : report.inconsistent_tables) { |
| tables.emplace_back(table.first, &table.second); |
| } |
| for (auto& hms_table : report.orphan_hms_tables) { |
| tables.emplace_back(shared_ptr<KuduTable>(), &hms_table); |
| } |
| cout << "Found table(s) with missing or inconsisentent metadata in the Kudu " |
| "catalog or Hive Metastore:" << endl; |
| RETURN_NOT_OK(PrintTables(master_addrs, std::move(tables), cout)); |
| |
| if (report.orphan_hms_tables.empty()) { |
| cout << endl |
| << "Suggestion: use the fix tool to repair the Kudu and Hive Metastore catalog metadata:" |
| << endl |
| << "\t$ kudu hms fix " << master_addrs << endl; |
| } else { |
| cout << endl |
| << "Suggestion: use the fix tool to repair the Kudu and Hive Metastore catalog metadata" |
| << endl |
| << "and drop Hive Metastore table(s) which reference non-existent Kudu tables:" << endl |
| << "\t$ kudu hms fix --drop_orphan_hms_tables " << master_addrs << endl; |
| } |
| } |
| |
| // TODO(dan): add a link to the HMS guide on kudu.apache.org to this message. |
| return Status::IllegalState("found inconsistencies in the Kudu and HMS catalogs"); |
| } |
| |
| // Pretty-prints the table name and ID. |
| string TableIdent(const KuduTable& table) { |
| return Substitute("$0 [id=$1]", table.name(), table.id()); |
| } |
| |
| // Analyzes the Kudu and HMS catalogs and attempts to fix any |
| // automatically-fixable issues. |
| // |
| // Error handling: unexpected errors (e.g. networking errors) are fatal and |
| // result in returning early. Expected application failures such as a rename |
| // failing due to duplicate table being present are logged and execution |
| // continues. |
| Status FixHmsMetadata(const RunnerContext& context) { |
| shared_ptr<KuduClient> kudu_client; |
| unique_ptr<HmsCatalog> hms_catalog; |
| string master_addrs; |
| RETURN_NOT_OK(Init(context, &kudu_client, &hms_catalog, &master_addrs)); |
| |
| CatalogReport report; |
| int kudu_catalog_count = 0; |
| int hms_catalog_count = 0; |
| RETURN_NOT_OK(AnalyzeCatalogs(master_addrs, hms_catalog.get(), kudu_client.get(), &report, |
| &kudu_catalog_count, &hms_catalog_count)); |
| if (FLAGS_dryrun && kudu_catalog_count == 0) { |
| LOG(INFO) << "NOTE: There are zero kudu tables listed. If the cluster indeed has kudu tables " |
| "please re-run the command with right credentials." << endl; |
| } |
| bool success = true; |
| |
| if (FLAGS_drop_orphan_hms_tables) { |
| for (hive::Table& hms_table : report.orphan_hms_tables) { |
| string table_name = Substitute("$0.$1", hms_table.dbName, hms_table.tableName); |
| const string& master_addrs_param = hms_table.parameters[HmsClient::kKuduMasterAddrsKey]; |
| |
| // Normalize the master addresses to allow for an equality check that ignores |
| // missing default ports, duplicate addresses, and address order. |
| UnorderedHostPortSet param_set; |
| MasterAddressesToSet(master_addrs_param, ¶m_set); |
| UnorderedHostPortSet cluster_set; |
| MasterAddressesToSet(master_addrs, &cluster_set); |
| |
| if (param_set != cluster_set && !FLAGS_force) { |
| LOG(INFO) << "Skipping drop of orphan HMS table " << table_name |
| << " with master addresses parameter " << master_addrs_param |
| << " because it does not match the --" << kMasterAddressesArg << " argument" |
| << " (use --force to skip this check)"; |
| continue; |
| } |
| |
| if (FLAGS_dryrun) { |
| LOG(INFO) << "[dryrun] Dropping orphan HMS table " << table_name; |
| } else { |
| const string& table_id = hms_table.parameters[HmsClient::kKuduTableIdKey]; |
| const string& storage_handler = hms_table.parameters[HmsClient::kStorageHandlerKey]; |
| // All errors are fatal here, since we've already checked that the table exists in the HMS. |
| if (storage_handler == HmsClient::kKuduStorageHandler) { |
| RETURN_NOT_OK_PREPEND(hms_catalog->DropTable(table_id, table_name), |
| Substitute("failed to drop orphan HMS table $0", table_name)); |
| } else { |
| RETURN_NOT_OK_PREPEND(hms_catalog->DropLegacyTable(table_name), |
| Substitute("failed to drop legacy orphan HMS table $0", table_name)); |
| } |
| } |
| } |
| } |
| |
| if (FLAGS_create_missing_hms_tables) { |
| for (const auto& kudu_table : report.missing_hms_tables) { |
| const string& table_id = kudu_table->id(); |
| const string& cluster_id = kudu_table->client()->cluster_id(); |
| const string& table_name = kudu_table->name(); |
| auto schema = KuduSchema::ToSchema(kudu_table->schema()); |
| string normalized_table_name(table_name.data(), table_name.size()); |
| CHECK_OK(hms::HmsCatalog::NormalizeTableName(&normalized_table_name)); |
| |
| if (FLAGS_dryrun) { |
| LOG(INFO) << "[dryrun] Creating HMS table for Kudu table " << TableIdent(*kudu_table); |
| } else { |
| Status s = hms_catalog->CreateTable(table_id, table_name, cluster_id, |
| kudu_table->owner(), schema, kudu_table->comment()); |
| if (s.IsAlreadyPresent()) { |
| LOG(ERROR) << "Failed to create HMS table for Kudu table " |
| << TableIdent(*kudu_table) |
| << " because another table already exists in the HMS with that name"; |
| success = false; |
| continue; |
| } |
| if (s.IsInvalidArgument()) { |
| // This most likely means the database doesn't exist, but it is ambiguous. |
| LOG(ERROR) << "Failed to create HMS table for Kudu table " |
| << TableIdent(*kudu_table) |
| << " (database does not exist?): " << s.message().ToString(); |
| success = false; |
| continue; |
| } |
| // All other errors are unexpected. |
| RETURN_NOT_OK_PREPEND(s, |
| Substitute("failed to create HMS table for Kudu table $0", TableIdent(*kudu_table))); |
| } |
| |
| if (normalized_table_name != table_name) { |
| if (FLAGS_dryrun) { |
| LOG(INFO) << "[dryrun] Renaming Kudu table " << TableIdent(*kudu_table) |
| << " to lowercased Hive-compatible name: " << normalized_table_name; |
| } else { |
| // All errors are fatal. We never expect to get an 'AlreadyPresent' |
| // error, since the catalog manager validates that no two |
| // Hive-compatible table names differ only by case. |
| // |
| // Note that if an error occurs we do not roll-back the HMS table |
| // creation step, since a subsequent run of the tool will recognize |
| // the table as an inconsistent table (Kudu and HMS table names do not |
| // match), and automatically fix it. |
| RETURN_NOT_OK_PREPEND( |
| RenameTableInKuduCatalog(kudu_client.get(), table_name, normalized_table_name), |
| Substitute("failed to rename Kudu table $0 to lowercased Hive compatible name $1", |
| TableIdent(*kudu_table), normalized_table_name)); |
| } |
| } |
| } |
| } |
| |
| if (FLAGS_upgrade_hms_tables) { |
| for (const auto& table_pair : report.legacy_hms_tables) { |
| const KuduTable& kudu_table = *table_pair.first; |
| const hive::Table& hms_table = table_pair.second; |
| string hms_table_name = Substitute("$0.$1", hms_table.dbName, hms_table.tableName); |
| |
| if (FLAGS_dryrun) { |
| LOG(INFO) << "[dryrun] Upgrading legacy Impala HMS metadata for table " |
| << hms_table_name; |
| } else { |
| RETURN_NOT_OK_PREPEND(hms_catalog->UpgradeLegacyImpalaTable( |
| kudu_table.id(), kudu_table.client()->cluster_id(), hms_table.dbName, |
| hms_table.tableName, KuduSchema::ToSchema(kudu_table.schema()), |
| kudu_table.comment()), |
| Substitute("failed to upgrade legacy Impala HMS metadata for table $0", |
| hms_table_name)); |
| } |
| |
| if (HmsClient::IsSynchronized(hms_table) && kudu_table.name() != hms_table_name) { |
| if (FLAGS_dryrun) { |
| LOG(INFO) << "[dryrun] Renaming Kudu table " << TableIdent(kudu_table) |
| << " to " << hms_table_name; |
| } else { |
| Status s = RenameTableInKuduCatalog(kudu_client.get(), kudu_table.name(), hms_table_name); |
| if (s.IsAlreadyPresent()) { |
| LOG(ERROR) << "Failed to rename Kudu table " << TableIdent(kudu_table) |
| << " to match the Hive Metastore name " << hms_table_name |
| << ", because a Kudu table with name" << hms_table_name |
| << " already exists"; |
| LOG(INFO) << "Suggestion: rename the conflicting table name manually:\n" |
| << "\t$ kudu table rename_table --modify_external_catalogs=false " |
| << master_addrs << " " << hms_table_name << " <database-name>.<table-name>'"; |
| success = false; |
| continue; |
| } |
| |
| // All other errors are fatal. Note that if an error occurs we do not |
| // roll-back the HMS legacy upgrade step, since a subsequent run of |
| // the tool will recognize the table as an inconsistent table (Kudu |
| // and HMS table names do not match), and automatically fix it. |
| RETURN_NOT_OK_PREPEND(s, |
| Substitute("failed to rename Kudu table $0 to $1", |
| TableIdent(kudu_table), hms_table_name)); |
| } |
| } |
| } |
| } |
| |
| if (FLAGS_fix_inconsistent_tables) { |
| for (const auto& table_pair : report.inconsistent_tables) { |
| const KuduTable& kudu_table = *table_pair.first; |
| const hive::Table& hms_table = table_pair.second; |
| string hms_table_name = Substitute("$0.$1", hms_table.dbName, hms_table.tableName); |
| string owner = kudu_table.owner(); |
| string comment = kudu_table.comment(); |
| |
| if (hms_table_name != kudu_table.name()) { |
| // Update the Kudu table name to match the HMS table name. |
| if (FLAGS_dryrun) { |
| LOG(INFO) << "[dryrun] Renaming Kudu table " << TableIdent(kudu_table) |
| << " to " << hms_table_name; |
| } else { |
| Status s = RenameTableInKuduCatalog(kudu_client.get(), kudu_table.name(), hms_table_name); |
| if (s.IsAlreadyPresent()) { |
| LOG(ERROR) << "Failed to rename Kudu table " << TableIdent(kudu_table) |
| << " to match HMS table " << hms_table_name |
| << ", because a Kudu table with name " << hms_table_name |
| << " already exists"; |
| success = false; |
| continue; |
| } |
| RETURN_NOT_OK_PREPEND(s, |
| Substitute("failed to rename Kudu table $0 to $1", |
| TableIdent(kudu_table), hms_table_name)); |
| } |
| } |
| |
| // If the HMS table has an owner and Kudu does not, update the Kudu table owner to match |
| // the HMS table owner. Otherwise the metadata step below will ensure the Kudu owner |
| // is updated in the HMS. |
| if (hms_table.owner != owner && owner.empty()) { |
| if (FLAGS_dryrun) { |
| LOG(INFO) << "[dryrun] Changing owner of " << TableIdent(kudu_table) |
| << " to " << hms_table.owner << " in Kudu catalog."; |
| } else { |
| RETURN_NOT_OK_PREPEND( |
| ChangeOwnerInKuduCatalog(kudu_client.get(), kudu_table.name(), hms_table.owner), |
| Substitute("failed to change owner of $0 to $1 in Kudu catalog", |
| TableIdent(kudu_table), hms_table.owner)); |
| owner = hms_table.owner; |
| } |
| } |
| |
| // If the HMS table has a table comment and Kudu does not, update the Kudu table comment |
| // to match the HMS table comment. Otherwise the metadata step below will ensure the Kudu |
| // comment is updated in the HMS. |
| const string hms_table_comment = |
| FindWithDefault(hms_table.parameters, hms::HmsClient::kTableCommentKey, ""); |
| if (hms_table_comment != comment && comment.empty()) { |
| if (FLAGS_dryrun) { |
| LOG(INFO) << "[dryrun] Changing table comment for " << TableIdent(kudu_table) |
| << " to " << hms_table_comment << " in Kudu catalog."; |
| } else { |
| RETURN_NOT_OK_PREPEND(ChangeTableCommentInKuduCatalog(kudu_client.get(), |
| kudu_table.name(), hms_table_comment), |
| Substitute("failed to change table comment of $0 to $1 in Kudu catalog", |
| TableIdent(kudu_table), hms_table_comment)); |
| comment = hms_table_comment; |
| } |
| } |
| |
| // Update the HMS table metadata to match Kudu. |
| if (FLAGS_dryrun) { |
| LOG(INFO) << "[dryrun] Refreshing HMS table metadata for Kudu table " |
| << TableIdent(kudu_table); |
| } else { |
| auto schema = KuduSchema::ToSchema(kudu_table.schema()); |
| RETURN_NOT_OK_PREPEND( |
| // Disable table ID checking to support fixing tables with unsynchronized IDs. |
| hms_catalog->AlterTable(kudu_table.id(), hms_table_name, hms_table_name, |
| kudu_table.client()->cluster_id(), owner, schema, |
| comment, /* check_id */ false), |
| Substitute("failed to refresh HMS table metadata for Kudu table $0", |
| TableIdent(kudu_table))); |
| } |
| } |
| } |
| LOG(INFO) << Substitute("Number of Kudu tables found in Kudu master catalog: $0", |
| kudu_catalog_count) << endl; |
| LOG(INFO) << Substitute("Number of Kudu tables found in HMS catalog: $0", hms_catalog_count) |
| << endl; |
| if (FLAGS_dryrun || success) { |
| return Status::OK(); |
| } |
| return Status::RuntimeError("Failed to fix some catalog metadata inconsistencies"); |
| } |
| |
| Status List(const RunnerContext& context) { |
| shared_ptr<KuduClient> kudu_client; |
| unique_ptr<HmsCatalog> hms_catalog; |
| string master_addrs; |
| RETURN_NOT_OK(Init(context, &kudu_client, &hms_catalog, &master_addrs)); |
| |
| vector<hive::Table> hms_tables; |
| RETURN_NOT_OK(hms_catalog->GetKuduTables(&hms_tables)); |
| |
| return PrintHMSTables(hms_tables, cout); |
| } |
| |
| Status Precheck(const RunnerContext& context) { |
| string master_addrs; |
| RETURN_NOT_OK(ParseMasterAddressesStr(context, &master_addrs)); |
| shared_ptr<KuduClient> client; |
| const vector<string> master_addresses(Split(master_addrs, ",")); |
| RETURN_NOT_OK(CreateKuduClient(master_addresses, &client)); |
| vector<string> tables; |
| RETURN_NOT_OK(client->ListTables(&tables)); |
| |
| // Map of normalized table name to table names. |
| bool conflicting_tables = false; |
| unordered_map<string, vector<string>> normalized_tables; |
| for (string& table : tables) { |
| string normalized_table_name(table.data(), table.size()); |
| Status s = hms::HmsCatalog::NormalizeTableName(&normalized_table_name); |
| if (!s.ok()) { |
| // This is not a Hive-compatible table name, so there can't be a conflict |
| // among normalized names (see CatalogManager::NormalizeTableName). |
| continue; |
| } |
| vector<string>& tables = normalized_tables[normalized_table_name]; |
| tables.emplace_back(std::move(table)); |
| conflicting_tables |= tables.size() > 1; |
| } |
| |
| if (!conflicting_tables) { |
| return Status::OK(); |
| } |
| |
| DataTable data_table({ "conflicting table names" }); |
| for (auto& table : normalized_tables) { |
| if (table.second.size() > 1) { |
| for (string& kudu_table : table.second) { |
| data_table.AddRow({ std::move(kudu_table) }); |
| } |
| } |
| } |
| |
| cout << "Found Kudu tables whose names are not unique according to Hive's case-insensitive" |
| << endl |
| << "identifier requirements. These conflicting tables will cause master startup to" << endl |
| << "fail when the Hive Metastore integration is enabled:"; |
| RETURN_NOT_OK(data_table.PrintTo(cout)); |
| cout << endl |
| << "Suggestion: rename the conflicting tables to case-insensitive unique names:" << endl |
| << "\t$ kudu table rename_table " << master_addrs |
| << " <conflicting_table_name> <new_table_name>" << endl; |
| |
| return Status::IllegalState("found tables in Kudu with case-conflicting names"); |
| } |
| |
| unique_ptr<Mode> BuildHmsMode() { |
| const string kHmsUrisDesc = |
| "Address of the Hive Metastore instance(s). If not set, the configuration " |
| "from the Kudu master is used, so this flag should not be overriden in typical " |
| "situations. The provided port must be for the HMS Thrift service. " |
| "If a port is not provided, defaults to 9083. If the HMS is deployed in an " |
| "HA configuration, multiple comma-separated addresses should be supplied. " |
| "The configured value must match the Hive hive.metastore.uris configuration."; |
| |
| const string kHmsSaslEnabledDesc = |
| "Configures whether Thrift connections to the Hive Metastore use SASL " |
| "(Kerberos) security. Only takes effect when --hive_metastore_uris is set, " |
| "otherwise the configuration from the Kudu master is used. The configured " |
| "value must match the the hive.metastore.sasl.enabled option in the Hive " |
| "Metastore configuration."; |
| |
| unique_ptr<Action> hms_check = |
| ClusterActionBuilder("check", &CheckHmsMetadata) |
| .Description("Check metadata consistency between Kudu and the Hive Metastore catalogs") |
| .AddOptionalParameter("hive_metastore_sasl_enabled", nullopt, kHmsSaslEnabledDesc) |
| .AddOptionalParameter("hive_metastore_uris", nullopt, kHmsUrisDesc) |
| .AddOptionalParameter("ignore_other_clusters") |
| .Build(); |
| |
| unique_ptr<Action> hms_downgrade = |
| ClusterActionBuilder("downgrade", &HmsDowngrade) |
| .Description("Downgrade the metadata to legacy format for Kudu and the Hive Metastores") |
| .AddOptionalParameter("hive_metastore_sasl_enabled", nullopt, kHmsSaslEnabledDesc) |
| .AddOptionalParameter("hive_metastore_uris", nullopt, kHmsUrisDesc) |
| .Build(); |
| |
| unique_ptr<Action> hms_fix = |
| ClusterActionBuilder("fix", &FixHmsMetadata) |
| .Description("Fix automatically-repairable metadata inconsistencies in the " |
| "Kudu and Hive Metastore catalogs") |
| .AddOptionalParameter("dryrun") |
| .AddOptionalParameter("drop_orphan_hms_tables") |
| .AddOptionalParameter("create_missing_hms_tables") |
| .AddOptionalParameter("fix_inconsistent_tables") |
| .AddOptionalParameter("upgrade_hms_tables") |
| .AddOptionalParameter("hive_metastore_sasl_enabled", nullopt, kHmsSaslEnabledDesc) |
| .AddOptionalParameter("hive_metastore_uris", nullopt, kHmsUrisDesc) |
| .AddOptionalParameter("ignore_other_clusters") |
| .Build(); |
| |
| unique_ptr<Action> hms_list = |
| ClusterActionBuilder("list", &List) |
| .Description("List the Kudu table HMS entries") |
| .AddOptionalParameter("columns", |
| Substitute("database,table,type,$0", |
| HmsClient::kKuduTableNameKey), |
| Substitute("Comma-separated list of HMS entry fields to " |
| "include in output.\nPossible values: database, " |
| "table, type, owner, comment, $0, $1, $2, $3, $4", |
| HmsClient::kKuduTableNameKey, |
| HmsClient::kKuduTableIdKey, |
| HmsClient::kKuduClusterIdKey, |
| HmsClient::kKuduMasterAddrsKey, |
| HmsClient::kStorageHandlerKey)) |
| .AddOptionalParameter("format") |
| .Build(); |
| |
| unique_ptr<Action> hms_precheck = |
| ClusterActionBuilder("precheck", &Precheck) |
| .Description("Check that the Kudu cluster is prepared to enable " |
| "the Hive Metastore integration") |
| .Build(); |
| |
| return ModeBuilder("hms").Description("Operate on remote Hive Metastores") |
| .AddAction(std::move(hms_check)) |
| .AddAction(std::move(hms_downgrade)) |
| .AddAction(std::move(hms_fix)) |
| .AddAction(std::move(hms_list)) |
| .AddAction(std::move(hms_precheck)) |
| .Build(); |
| } |
| |
| } // namespace tools |
| } // namespace kudu |