| // Licensed to the Apache Software Foundation (ASF) under one |
| // or more contributor license agreements. See the NOTICE file |
| // distributed with this work for additional information |
| // regarding copyright ownership. The ASF licenses this file |
| // to you under the Apache License, Version 2.0 (the |
| // "License"); you may not use this file except in compliance |
| // with the License. You may obtain a copy of the License at |
| // |
| // http://www.apache.org/licenses/LICENSE-2.0 |
| // |
| // Unless required by applicable law or agreed to in writing, |
| // software distributed under the License is distributed on an |
| // "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY |
| // KIND, either express or implied. See the License for the |
| // specific language governing permissions and limitations |
| // under the License. |
| |
| #include "kudu/hms/hms_client.h" |
| |
| #include <algorithm> |
| #include <exception> |
| #include <initializer_list> |
| #include <map> |
| #include <memory> |
| #include <string> |
| #include <vector> |
| |
| #include <gflags/gflags.h> |
| #include <glog/logging.h> |
| #include <thrift/TApplicationException.h> |
| #include <thrift/Thrift.h> |
| #include <thrift/protocol/TJSONProtocol.h> |
| #include <thrift/protocol/TProtocol.h> |
| #include <thrift/transport/TBufferTransports.h> |
| #include <thrift/transport/TTransport.h> |
| #include <thrift/transport/TTransportException.h> |
| |
| #include "kudu/gutil/macros.h" |
| #include "kudu/gutil/map-util.h" |
| #include "kudu/gutil/strings/split.h" |
| #include "kudu/gutil/strings/strip.h" |
| #include "kudu/gutil/strings/substitute.h" |
| #include "kudu/hms/ThriftHiveMetastore.h" |
| #include "kudu/hms/hive_metastore_types.h" |
| #include "kudu/thrift/client.h" |
| #include "kudu/thrift/sasl_client_transport.h" |
| #include "kudu/util/flag_tags.h" |
| #include "kudu/util/status.h" |
| #include "kudu/util/stopwatch.h" |
| #include "kudu/util/string_case.h" |
| |
| DEFINE_bool(disable_hms_incompatible_column_type_check, false, |
| "Whether or not to disable the check for the " |
| "'hive.metastore.disallow.incompatible.col.type.changes' " |
| "configuration in the Hive Metastore. This check is important to " |
| "ensure Hive will allow dropping columns, but may not be necessary " |
| "in some versions of Hive. See HIVE-24987 for more details."); |
| TAG_FLAG(disable_hms_incompatible_column_type_check, advanced); |
| TAG_FLAG(disable_hms_incompatible_column_type_check, hidden); |
| |
| using apache::thrift::TApplicationException; |
| using apache::thrift::TException; |
| using apache::thrift::protocol::TJSONProtocol; |
| using apache::thrift::transport::TMemoryBuffer; |
| using apache::thrift::transport::TTransportException; |
| using kudu::iequals; |
| using kudu::thrift::ClientOptions; |
| using kudu::thrift::CreateClientProtocol; |
| using kudu::thrift::SaslException; |
| using std::shared_ptr; |
| using std::string; |
| using std::vector; |
| using strings::Substitute; |
| |
| namespace kudu { |
| namespace hms { |
| |
| // The entire set of Hive-specific exceptions is defined in |
| // hive_metastore.thrift. We do not try to handle all of them - TException acts |
| // as a catch all, as well as default for network errors. |
| #define HMS_RET_NOT_OK(call, msg) \ |
| try { \ |
| (call); \ |
| } catch (const hive::AlreadyExistsException& e) { \ |
| return Status::AlreadyPresent((msg), e.what()); \ |
| } catch (const hive::UnknownDBException& e) { \ |
| return Status::NotFound((msg), e.what()); \ |
| } catch (const hive::UnknownTableException& e) { \ |
| return Status::NotFound((msg), e.what()); \ |
| } catch (const hive::NoSuchObjectException& e) { \ |
| return Status::NotFound((msg), e.what()); \ |
| } catch (const hive::InvalidObjectException& e) { \ |
| return Status::InvalidArgument((msg), e.what()); \ |
| } catch (const hive::InvalidOperationException& e) { \ |
| return Status::IllegalState((msg), e.what()); \ |
| } catch (const hive::MetaException& e) { \ |
| return Status::RemoteError((msg), e.what()); \ |
| } catch (const SaslException& e) { \ |
| return e.status().CloneAndPrepend((msg)); \ |
| } catch (const TApplicationException& e) { \ |
| switch (e.getType()) { \ |
| case TApplicationException::UNKNOWN_METHOD: \ |
| return Status::NotSupported((msg), e.what()); \ |
| default: return Status::RemoteError((msg), e.what()); \ |
| } \ |
| } catch (const TTransportException& e) { \ |
| switch (e.getType()) { \ |
| case TTransportException::TIMED_OUT: return Status::TimedOut((msg), e.what()); \ |
| case TTransportException::BAD_ARGS: return Status::InvalidArgument((msg), e.what()); \ |
| case TTransportException::CORRUPTED_DATA: return Status::Corruption((msg), e.what()); \ |
| default: return Status::NetworkError((msg), e.what()); \ |
| } \ |
| } catch (const TException& e) { \ |
| return Status::IOError((msg), e.what()); \ |
| } catch (const std::exception& e) { \ |
| return Status::RuntimeError((msg), e.what()); \ |
| } |
| |
| const char* const HmsClient::kLegacyKuduStorageHandler = |
| "com.cloudera.kudu.hive.KuduStorageHandler"; |
| const char* const HmsClient::kLegacyTablePrefix = "impala::"; |
| const char* const HmsClient::kKuduTableIdKey = "kudu.table_id"; |
| const char* const HmsClient::kKuduClusterIdKey = "kudu.cluster_id"; |
| const char* const HmsClient::kKuduTableNameKey = "kudu.table_name"; |
| const char* const HmsClient::kKuduMasterAddrsKey = "kudu.master_addresses"; |
| const char* const HmsClient::kKuduMasterEventKey = "kudu.master_event"; |
| const char* const HmsClient::kKuduCheckIdKey = "kudu.check_id"; |
| const char* const HmsClient::kKuduStorageHandler = |
| "org.apache.hadoop.hive.kudu.KuduStorageHandler"; |
| |
| const char* const HmsClient::kTransactionalEventListeners = |
| "hive.metastore.transactional.event.listeners"; |
| const char* const HmsClient::kDisallowIncompatibleColTypeChanges = |
| "hive.metastore.disallow.incompatible.col.type.changes"; |
| const char* const HmsClient::kDbNotificationListener = |
| "org.apache.hive.hcatalog.listener.DbNotificationListener"; |
| const char* const HmsClient::kExternalTableKey = "EXTERNAL"; |
| const char* const HmsClient::kExternalPurgeKey = "external.table.purge"; |
| const char* const HmsClient::kStorageHandlerKey = "storage_handler"; |
| const char* const HmsClient::kTableCommentKey = "comment"; |
| const char* const HmsClient::kKuduMetastorePlugin = |
| "org.apache.kudu.hive.metastore.KuduMetastorePlugin"; |
| const char* const HmsClient::kHiveFilterFieldParams = "hive_filter_field_params__"; |
| const char* const HmsClient::kNotificationAddThriftObjects = |
| "hive.metastore.notifications.add.thrift.objects"; |
| |
| const char* const HmsClient::kManagedTable = "MANAGED_TABLE"; |
| const char* const HmsClient::kExternalTable = "EXTERNAL_TABLE"; |
| |
| const uint16_t HmsClient::kDefaultHmsPort = 9083; |
| |
| const int kSlowExecutionWarningThresholdMs = 1000; |
| |
| const char* const HmsClient::kServiceName = "Hive Metastore"; |
| |
| HmsClient::HmsClient(const HostPort& address, const ClientOptions& options) |
| : verify_kudu_sync_config_(options.verify_service_config), |
| client_(hive::ThriftHiveMetastoreClient(CreateClientProtocol(address, options))) { |
| } |
| |
| HmsClient::~HmsClient() { |
| WARN_NOT_OK(Stop(), "failed to shutdown HMS client"); |
| } |
| |
| Status HmsClient::Start() { |
| SCOPED_LOG_SLOW_EXECUTION(WARNING, kSlowExecutionWarningThresholdMs, "starting HMS client"); |
| HMS_RET_NOT_OK(client_.getOutputProtocol()->getTransport()->open(), |
| "failed to open Hive Metastore connection"); |
| |
| if (!verify_kudu_sync_config_) { |
| return Status::OK(); |
| } |
| |
| // Immediately after connecting to the HMS, check that it is configured with |
| // the required event listeners. |
| string event_listener_config; |
| HMS_RET_NOT_OK(client_.get_config_value(event_listener_config, kTransactionalEventListeners, ""), |
| Substitute("failed to get Hive Metastore $0 configuration", |
| kTransactionalEventListeners)); |
| |
| // Parse the set of listeners from the configuration string. |
| vector<string> listeners = strings::Split(event_listener_config, ",", strings::SkipWhitespace()); |
| for (auto& listener : listeners) { |
| StripWhiteSpace(&listener); |
| } |
| |
| for (const auto& required_listener : { kDbNotificationListener, kKuduMetastorePlugin }) { |
| if (std::find(listeners.begin(), listeners.end(), required_listener) == listeners.end()) { |
| return Status::IllegalState( |
| Substitute("Hive Metastore configuration is missing required " |
| "transactional event listener ($0): $1", |
| kTransactionalEventListeners, required_listener)); |
| } |
| } |
| |
| // Also check that the HMS is configured to allow changing the type of |
| // columns, which is required to support dropping columns. File-based Hive |
| // tables handle columns by offset, and removing a column simply shifts all |
| // remaining columns down by one. The actual data is not changed, but instead |
| // reassigned to the next column. If the HMS is configured to check column |
| // type changes it will validate that the existing data matches the shifted |
| // column layout. This is overly strict for Kudu, since we handle DDL |
| // operations properly. |
| // |
| // See org.apache.hadoop.hive.metastore.MetaStoreUtils.throwExceptionIfIncompatibleColTypeChange. |
| if (!FLAGS_disable_hms_incompatible_column_type_check) { |
| string disallow_incompatible_column_type_changes; |
| HMS_RET_NOT_OK(client_.get_config_value(disallow_incompatible_column_type_changes, |
| kDisallowIncompatibleColTypeChanges, |
| "false"), |
| Substitute("failed to get Hive Metastore $0 configuration", |
| kDisallowIncompatibleColTypeChanges)); |
| |
| if (iequals(disallow_incompatible_column_type_changes, "true")) { |
| return Status::IllegalState(Substitute( |
| "Hive Metastore configuration is invalid: $0 must be set to false", |
| kDisallowIncompatibleColTypeChanges)); |
| } |
| } |
| |
| // Check that the HMS is configured to add the entire thrift Table/Partition |
| // objects to the HMS notifications, which is required to properly parse the |
| // HMS notification log. This is specific to the HMS version shipped in |
| // Cloudera's CDH. |
| string thrift_objects_config; |
| HMS_RET_NOT_OK(client_.get_config_value(thrift_objects_config, |
| kNotificationAddThriftObjects, |
| "true"), |
| Substitute("failed to get Hive Metastore $0 configuration", |
| kNotificationAddThriftObjects)); |
| if (iequals(thrift_objects_config, "false")) { |
| return Status::IllegalState(Substitute( |
| "Hive Metastore configuration is invalid: $0 must be set to true", |
| kNotificationAddThriftObjects)); |
| } |
| |
| return Status::OK(); |
| } |
| |
| Status HmsClient::Stop() { |
| SCOPED_LOG_SLOW_EXECUTION(WARNING, kSlowExecutionWarningThresholdMs, "stopping HMS client"); |
| HMS_RET_NOT_OK(client_.getInputProtocol()->getTransport()->close(), |
| "failed to close Hive Metastore connection"); |
| return Status::OK(); |
| } |
| |
| bool HmsClient::IsConnected() { |
| return client_.getInputProtocol()->getTransport()->isOpen(); |
| } |
| |
| Status HmsClient::CreateDatabase(const hive::Database& database) { |
| SCOPED_LOG_SLOW_EXECUTION(WARNING, kSlowExecutionWarningThresholdMs, "create HMS database"); |
| HMS_RET_NOT_OK(client_.create_database(database), "failed to create Hive Metastore database"); |
| return Status::OK(); |
| } |
| |
| Status HmsClient::DropDatabase(const string& database_name, Cascade cascade) { |
| SCOPED_LOG_SLOW_EXECUTION(WARNING, kSlowExecutionWarningThresholdMs, "drop HMS database"); |
| HMS_RET_NOT_OK(client_.drop_database(database_name, true, cascade == Cascade::kTrue), |
| "failed to drop Hive Metastore database"); |
| return Status::OK(); |
| } |
| |
| Status HmsClient::GetAllDatabases(vector<string>* databases) { |
| DCHECK(databases); |
| SCOPED_LOG_SLOW_EXECUTION(WARNING, kSlowExecutionWarningThresholdMs, "get all HMS databases"); |
| HMS_RET_NOT_OK(client_.get_all_databases(*databases), |
| "failed to get Hive Metastore databases"); |
| return Status::OK(); |
| } |
| |
| Status HmsClient::GetDatabase(const string& pattern, hive::Database* database) { |
| DCHECK(database); |
| SCOPED_LOG_SLOW_EXECUTION(WARNING, kSlowExecutionWarningThresholdMs, "get HMS database"); |
| HMS_RET_NOT_OK(client_.get_database(*database, pattern), |
| "failed to get Hive Metastore database"); |
| return Status::OK(); |
| } |
| |
| Status HmsClient::CreateTable(const hive::Table& table, const hive::EnvironmentContext& env_ctx) { |
| SCOPED_LOG_SLOW_EXECUTION(WARNING, kSlowExecutionWarningThresholdMs, "create HMS table"); |
| HMS_RET_NOT_OK(client_.create_table_with_environment_context(table, env_ctx), |
| "failed to create Hive MetaStore table"); |
| return Status::OK(); |
| } |
| |
| Status HmsClient::AlterTable(const std::string& database_name, |
| const std::string& table_name, |
| const hive::Table& table, |
| const hive::EnvironmentContext& env_ctx) { |
| SCOPED_LOG_SLOW_EXECUTION(WARNING, kSlowExecutionWarningThresholdMs, "alter HMS table"); |
| HMS_RET_NOT_OK(client_.alter_table_with_environment_context(database_name, table_name, |
| table, env_ctx), |
| "failed to alter Hive MetaStore table"); |
| return Status::OK(); |
| } |
| |
| Status HmsClient::DropTable(const string& database_name, |
| const string& table_name, |
| const hive::EnvironmentContext& env_ctx) { |
| SCOPED_LOG_SLOW_EXECUTION(WARNING, kSlowExecutionWarningThresholdMs, "drop HMS table"); |
| HMS_RET_NOT_OK(client_.drop_table_with_environment_context(database_name, table_name, |
| true, env_ctx), |
| "failed to drop Hive Metastore table"); |
| return Status::OK(); |
| } |
| |
| Status HmsClient::GetTableNames(const string& database_name, |
| vector<string>* table_names) { |
| DCHECK(table_names); |
| SCOPED_LOG_SLOW_EXECUTION(WARNING, kSlowExecutionWarningThresholdMs, "get all HMS tables"); |
| HMS_RET_NOT_OK(client_.get_all_tables(*table_names, database_name), |
| "failed to get Hive Metastore tables"); |
| return Status::OK(); |
| } |
| |
| Status HmsClient::GetTableNames(const std::string& database_name, |
| const std::string& filter, |
| std::vector<std::string>* table_names) { |
| DCHECK(table_names); |
| SCOPED_LOG_SLOW_EXECUTION(WARNING, kSlowExecutionWarningThresholdMs, "get filtered HMS tables"); |
| HMS_RET_NOT_OK(client_.get_table_names_by_filter(*table_names, database_name, |
| filter, /*max_tables*/ -1), |
| "failed to get filtered Hive Metastore tables"); |
| return Status::OK(); |
| } |
| |
| Status HmsClient::GetTable(const string& database_name, |
| const string& table_name, |
| hive::Table* table) { |
| DCHECK(table); |
| SCOPED_LOG_SLOW_EXECUTION(WARNING, kSlowExecutionWarningThresholdMs, "get HMS table"); |
| HMS_RET_NOT_OK(client_.get_table(*table, database_name, table_name), |
| "failed to get Hive Metastore table"); |
| return Status::OK(); |
| } |
| |
| Status HmsClient::GetTables(const string& database_name, |
| const vector<string>& table_names, |
| vector<hive::Table>* tables) { |
| DCHECK(tables); |
| SCOPED_LOG_SLOW_EXECUTION(WARNING, kSlowExecutionWarningThresholdMs, "get HMS tables"); |
| HMS_RET_NOT_OK(client_.get_table_objects_by_name(*tables, database_name, table_names), |
| "failed to get Hive Metastore tables"); |
| return Status::OK(); |
| } |
| |
| Status HmsClient::GetCurrentNotificationEventId(int64_t* event_id) { |
| DCHECK(event_id); |
| SCOPED_LOG_SLOW_EXECUTION(WARNING, kSlowExecutionWarningThresholdMs, |
| "get HMS current notification event ID"); |
| hive::CurrentNotificationEventId response; |
| HMS_RET_NOT_OK(client_.get_current_notificationEventId(response), |
| "failed to get Hive Metastore current event ID"); |
| *event_id = response.eventId; |
| return Status::OK(); |
| } |
| |
| Status HmsClient::GetNotificationEvents(int64_t last_event_id, |
| int32_t max_events, |
| vector<hive::NotificationEvent>* events) { |
| DCHECK(events); |
| SCOPED_LOG_SLOW_EXECUTION(WARNING, kSlowExecutionWarningThresholdMs, |
| "get HMS notification events"); |
| hive::NotificationEventRequest request; |
| request.lastEvent = last_event_id; |
| request.__set_maxEvents(max_events); |
| hive::NotificationEventResponse response; |
| HMS_RET_NOT_OK(client_.get_next_notification(response, request), |
| "failed to get Hive Metastore next notification"); |
| events->swap(response.events); |
| return Status::OK(); |
| } |
| |
| Status HmsClient::GetUuid(string* uuid) { |
| DCHECK(uuid); |
| SCOPED_LOG_SLOW_EXECUTION(WARNING, kSlowExecutionWarningThresholdMs, |
| "get HMS database UUID"); |
| HMS_RET_NOT_OK(client_.get_metastore_db_uuid(*uuid), |
| "failed to get HMS DB UUID"); |
| return Status::OK(); |
| } |
| |
| Status HmsClient::AddPartitions(const string& database_name, |
| const string& table_name, |
| vector<hive::Partition> partitions) { |
| SCOPED_LOG_SLOW_EXECUTION(WARNING, kSlowExecutionWarningThresholdMs, "add HMS table partitions"); |
| hive::AddPartitionsRequest request; |
| hive::AddPartitionsResult response; |
| |
| request.dbName = database_name; |
| request.tblName = table_name; |
| request.parts = std::move(partitions); |
| |
| HMS_RET_NOT_OK(client_.add_partitions_req(response, request), |
| "failed to add Hive Metastore table partitions"); |
| return Status::OK(); |
| } |
| |
| Status HmsClient::GetPartitions(const string& database_name, |
| const string& table_name, |
| vector<hive::Partition>* partitions) { |
| SCOPED_LOG_SLOW_EXECUTION(WARNING, kSlowExecutionWarningThresholdMs, "get HMS table partitions"); |
| HMS_RET_NOT_OK(client_.get_partitions(*partitions, database_name, table_name, -1), |
| "failed to get Hive Metastore table partitions"); |
| return Status::OK(); |
| } |
| |
| |
| Status HmsClient::DeserializeJsonTable(Slice json, hive::Table* table) { |
| auto membuffer(std::make_shared<TMemoryBuffer>(json.size())); |
| membuffer->write(json.data(), json.size()); |
| TJSONProtocol protocol(membuffer); |
| HMS_RET_NOT_OK(table->read(&protocol), "failed to deserialize JSON table"); |
| return Status::OK(); |
| } |
| |
| bool HmsClient::IsKuduTable(const hive::Table& table) { |
| const string* storage_handler = |
| FindOrNull(table.parameters, hms::HmsClient::kStorageHandlerKey); |
| if (!storage_handler) { |
| return false; |
| } |
| return *storage_handler == hms::HmsClient::kKuduStorageHandler; |
| } |
| |
| bool HmsClient::IsSynchronized(const hive::Table& table) { |
| const string externalPurge = |
| FindWithDefault(table.parameters, hms::HmsClient::kExternalPurgeKey, "false"); |
| return table.tableType == hms::HmsClient::kManagedTable || |
| (table.tableType == hms::HmsClient::kExternalTable && |
| iequals(externalPurge, "true")); |
| } |
| |
| } // namespace hms |
| } // namespace kudu |