blob: f531a46a3ddbb432cc4666a1f3a669e7dec17057 [file] [log] [blame]
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.
#include "kudu/hms/hms_client.h"
#include <algorithm>
#include <exception>
#include <initializer_list>
#include <map>
#include <memory>
#include <string>
#include <type_traits>
#include <vector>
#include <gflags/gflags.h>
#include <glog/logging.h>
#include <thrift/TApplicationException.h>
#include <thrift/Thrift.h>
#include <thrift/protocol/TJSONProtocol.h>
#include <thrift/protocol/TProtocol.h> // IWYU pragma: keep
#include <thrift/transport/TBufferTransports.h>
#include <thrift/transport/TTransport.h> // IWYU pragma: keep
#include <thrift/transport/TTransportException.h>
#include "kudu/gutil/macros.h"
#include "kudu/gutil/map-util.h"
#include "kudu/gutil/strings/split.h"
#include "kudu/gutil/strings/strip.h"
#include "kudu/gutil/strings/substitute.h"
#include "kudu/hms/ThriftHiveMetastore.h"
#include "kudu/hms/hive_metastore_types.h"
#include "kudu/thrift/client.h"
#include "kudu/thrift/sasl_client_transport.h"
#include "kudu/util/flag_tags.h"
#include "kudu/util/status.h"
#include "kudu/util/stopwatch.h"
#include "kudu/util/string_case.h"
// Escape hatch for Hive deployments where checking
// 'hive.metastore.disallow.incompatible.col.type.changes' is unnecessary or
// impossible (see HIVE-24987). When set, Start() skips that configuration
// check entirely.
DEFINE_bool(disable_hms_incompatible_column_type_check, false,
            "Whether or not to disable the check for the "
            "'hive.metastore.disallow.incompatible.col.type.changes' "
            "configuration in the Hive Metastore. This check is important to "
            "ensure Hive will allow dropping columns, but may not be necessary "
            "in some versions of Hive. See HIVE-24987 for more details.");
TAG_FLAG(disable_hms_incompatible_column_type_check, advanced);
TAG_FLAG(disable_hms_incompatible_column_type_check, hidden);
using apache::thrift::TApplicationException;
using apache::thrift::TException;
using apache::thrift::protocol::TJSONProtocol;
using apache::thrift::transport::TMemoryBuffer;
using apache::thrift::transport::TTransportException;
using kudu::iequals;
using kudu::thrift::ClientOptions;
using kudu::thrift::CreateClientProtocol;
using kudu::thrift::SaslException;
using std::shared_ptr;
using std::string;
using std::vector;
using strings::Substitute;
namespace kudu {
namespace hms {
// The entire set of Hive-specific exceptions is defined in
// hive_metastore.thrift. We do not try to handle all of them - TException acts
// as a catch all, as well as default for network errors.
//
// Wraps a Thrift client call, translating each thrown exception into the
// closest kudu::Status category and returning it with 'msg' prepended.
// Ordering matters: catch clauses run from most specific (Hive exceptions)
// to least specific (std::exception). Note that comments cannot be added
// inside the macro body itself because of the line continuations.
#define HMS_RET_NOT_OK(call, msg) \
  try { \
    (call); \
  } catch (const hive::AlreadyExistsException& e) { \
    return Status::AlreadyPresent((msg), e.what()); \
  } catch (const hive::UnknownDBException& e) { \
    return Status::NotFound((msg), e.what()); \
  } catch (const hive::UnknownTableException& e) { \
    return Status::NotFound((msg), e.what()); \
  } catch (const hive::NoSuchObjectException& e) { \
    return Status::NotFound((msg), e.what()); \
  } catch (const hive::InvalidObjectException& e) { \
    return Status::InvalidArgument((msg), e.what()); \
  } catch (const hive::InvalidOperationException& e) { \
    return Status::IllegalState((msg), e.what()); \
  } catch (const hive::MetaException& e) { \
    return Status::RemoteError((msg), e.what()); \
  } catch (const SaslException& e) { \
    return e.status().CloneAndPrepend((msg)); \
  } catch (const TApplicationException& e) { \
    switch (e.getType()) { \
      case TApplicationException::UNKNOWN_METHOD: \
        return Status::NotSupported((msg), e.what()); \
      default: return Status::RemoteError((msg), e.what()); \
    } \
  } catch (const TTransportException& e) { \
    switch (e.getType()) { \
      case TTransportException::TIMED_OUT: return Status::TimedOut((msg), e.what()); \
      case TTransportException::BAD_ARGS: return Status::InvalidArgument((msg), e.what()); \
      case TTransportException::CORRUPTED_DATA: return Status::Corruption((msg), e.what()); \
      default: return Status::NetworkError((msg), e.what()); \
    } \
  } catch (const TException& e) { \
    return Status::IOError((msg), e.what()); \
  } catch (const std::exception& e) { \
    return Status::RuntimeError((msg), e.what()); \
  }
// Storage handler class names: the legacy (pre-Apache) handler and the
// current Apache Hive Kudu handler.
const char* const HmsClient::kLegacyKuduStorageHandler =
    "com.cloudera.kudu.hive.KuduStorageHandler";
// Prefix historically applied to Impala-created table names.
const char* const HmsClient::kLegacyTablePrefix = "impala::";
// Keys stored in the HMS table parameter map for Kudu tables.
const char* const HmsClient::kKuduTableIdKey = "kudu.table_id";
const char* const HmsClient::kKuduClusterIdKey = "kudu.cluster_id";
const char* const HmsClient::kKuduTableNameKey = "kudu.table_name";
const char* const HmsClient::kKuduMasterAddrsKey = "kudu.master_addresses";
const char* const HmsClient::kKuduMasterEventKey = "kudu.master_event";
const char* const HmsClient::kKuduCheckIdKey = "kudu.check_id";
const char* const HmsClient::kKuduStorageHandler =
    "org.apache.hadoop.hive.kudu.KuduStorageHandler";
// HMS-side configuration keys consulted by Start().
const char* const HmsClient::kTransactionalEventListeners =
    "hive.metastore.transactional.event.listeners";
const char* const HmsClient::kDisallowIncompatibleColTypeChanges =
    "hive.metastore.disallow.incompatible.col.type.changes";
const char* const HmsClient::kDbNotificationListener =
    "org.apache.hive.hcatalog.listener.DbNotificationListener";
// Well-known HMS table parameter keys.
const char* const HmsClient::kExternalTableKey = "EXTERNAL";
const char* const HmsClient::kExternalPurgeKey = "external.table.purge";
const char* const HmsClient::kStorageHandlerKey = "storage_handler";
const char* const HmsClient::kTableCommentKey = "comment";
const char* const HmsClient::kKuduMetastorePlugin =
    "org.apache.kudu.hive.metastore.KuduMetastorePlugin";
const char* const HmsClient::kHiveFilterFieldParams = "hive_filter_field_params__";
const char* const HmsClient::kNotificationAddThriftObjects =
    "hive.metastore.notifications.add.thrift.objects";
// Input/output format and SerDe classes for Kudu-backed Hive tables.
const char* const HmsClient::kKuduInputFormat ="org.apache.hadoop.hive.kudu.KuduInputFormat";
const char* const HmsClient::kKuduOutputFormat ="org.apache.hadoop.hive.kudu.KuduOutputFormat";
const char* const HmsClient::kKuduSerDeLib = "org.apache.hadoop.hive.kudu.KuduSerDe";
// HMS table types.
const char* const HmsClient::kManagedTable = "MANAGED_TABLE";
const char* const HmsClient::kExternalTable = "EXTERNAL_TABLE";
const uint16_t HmsClient::kDefaultHmsPort = 9083;
// Threshold above which SCOPED_LOG_SLOW_EXECUTION emits a warning.
const int kSlowExecutionWarningThresholdMs = 1000;
const char* const HmsClient::kServiceName = "Hive Metastore";
// Constructs a client bound to the HMS at 'address'. The underlying Thrift
// transport is created here but not opened until Start() is called.
HmsClient::HmsClient(const HostPort& address, const ClientOptions& options)
    : verify_kudu_sync_config_(options.verify_service_config),
      client_(hive::ThriftHiveMetastoreClient(CreateClientProtocol(address, options))) {
}
// Best-effort shutdown: a failure to close the connection is logged, not
// propagated, since destructors must not fail.
HmsClient::~HmsClient() {
  WARN_NOT_OK(Stop(), "failed to shutdown HMS client");
}
// Opens the Thrift connection to the HMS and, unless configuration
// verification was disabled via ClientOptions::verify_service_config,
// checks that the HMS is configured compatibly with Kudu's metastore
// integration. Returns IllegalState if a required setting is missing or
// has an incompatible value.
Status HmsClient::Start() {
  SCOPED_LOG_SLOW_EXECUTION(WARNING, kSlowExecutionWarningThresholdMs, "starting HMS client");
  HMS_RET_NOT_OK(client_.getOutputProtocol()->getTransport()->open(),
                 "failed to open Hive Metastore connection");

  // Skip all configuration checks when verification is disabled.
  if (!verify_kudu_sync_config_) {
    return Status::OK();
  }

  // Immediately after connecting to the HMS, check that it is configured with
  // the required event listeners.
  string event_listener_config;
  // The final argument is the default value returned when the configuration
  // key is unset.
  HMS_RET_NOT_OK(client_.get_config_value(event_listener_config, kTransactionalEventListeners, ""),
                 Substitute("failed to get Hive Metastore $0 configuration",
                            kTransactionalEventListeners));

  // Parse the set of listeners from the comma-separated configuration string.
  vector<string> listeners = strings::Split(event_listener_config, ",", strings::SkipWhitespace());
  for (auto& listener : listeners) {
    StripWhiteSpace(&listener);
  }

  // Both the DB notification listener and the Kudu metastore plugin must be
  // present in the configured listener set.
  for (const auto& required_listener : { kDbNotificationListener, kKuduMetastorePlugin }) {
    if (std::find(listeners.begin(), listeners.end(), required_listener) == listeners.end()) {
      return Status::IllegalState(
          Substitute("Hive Metastore configuration is missing required "
                     "transactional event listener ($0): $1",
                     kTransactionalEventListeners, required_listener));
    }
  }

  // Also check that the HMS is configured to allow changing the type of
  // columns, which is required to support dropping columns. File-based Hive
  // tables handle columns by offset, and removing a column simply shifts all
  // remaining columns down by one. The actual data is not changed, but instead
  // reassigned to the next column. If the HMS is configured to check column
  // type changes it will validate that the existing data matches the shifted
  // column layout. This is overly strict for Kudu, since we handle DDL
  // operations properly.
  //
  // See org.apache.hadoop.hive.metastore.MetaStoreUtils.throwExceptionIfIncompatibleColTypeChange.
  if (!FLAGS_disable_hms_incompatible_column_type_check) {
    string disallow_incompatible_column_type_changes;
    HMS_RET_NOT_OK(client_.get_config_value(disallow_incompatible_column_type_changes,
                                            kDisallowIncompatibleColTypeChanges,
                                            "false"),
                   Substitute("failed to get Hive Metastore $0 configuration",
                              kDisallowIncompatibleColTypeChanges));
    // Comparison is case-insensitive since Hive accepts e.g. "TRUE".
    if (iequals(disallow_incompatible_column_type_changes, "true")) {
      return Status::IllegalState(Substitute(
            "Hive Metastore configuration is invalid: $0 must be set to false",
            kDisallowIncompatibleColTypeChanges));
    }
  }

  // Check that the HMS is configured to add the entire thrift Table/Partition
  // objects to the HMS notifications, which is required to properly parse the
  // HMS notification log. This is specific to the HMS version shipped in
  // Cloudera's CDH.
  string thrift_objects_config;
  HMS_RET_NOT_OK(client_.get_config_value(thrift_objects_config,
                                          kNotificationAddThriftObjects,
                                          "true"),
                 Substitute("failed to get Hive Metastore $0 configuration",
                            kNotificationAddThriftObjects));
  if (iequals(thrift_objects_config, "false")) {
    return Status::IllegalState(Substitute(
          "Hive Metastore configuration is invalid: $0 must be set to true",
          kNotificationAddThriftObjects));
  }

  return Status::OK();
}
// Closes the connection to the HMS. Safe to call from the destructor.
// NOTE(review): Start() opens via the output protocol while this closes via
// the input protocol; presumably both share the same underlying transport —
// confirm against CreateClientProtocol.
Status HmsClient::Stop() {
  SCOPED_LOG_SLOW_EXECUTION(WARNING, kSlowExecutionWarningThresholdMs, "stopping HMS client");
  HMS_RET_NOT_OK(client_.getInputProtocol()->getTransport()->close(),
                 "failed to close Hive Metastore connection");
  return Status::OK();
}
// Returns true if the underlying Thrift transport is currently open.
bool HmsClient::IsConnected() {
  return client_.getInputProtocol()->getTransport()->isOpen();
}
// Creates a new database in the HMS. Returns AlreadyPresent if a database
// with the same name already exists.
Status HmsClient::CreateDatabase(const hive::Database& database) {
  SCOPED_LOG_SLOW_EXECUTION(WARNING, kSlowExecutionWarningThresholdMs, "create HMS database");
  HMS_RET_NOT_OK(client_.create_database(database), "failed to create Hive Metastore database");
  return Status::OK();
}
// Drops a database in the HMS. When 'cascade' is Cascade::kTrue the
// contained tables are dropped as well; otherwise the drop fails if the
// database is not empty.
Status HmsClient::DropDatabase(const string& database_name, Cascade cascade) {
  SCOPED_LOG_SLOW_EXECUTION(WARNING, kSlowExecutionWarningThresholdMs, "drop HMS database");
  // The hard-coded 'true' presumably corresponds to the Thrift
  // 'deleteData' parameter — verify against hive_metastore.thrift.
  HMS_RET_NOT_OK(client_.drop_database(database_name, true, cascade == Cascade::kTrue),
                 "failed to drop Hive Metastore database");
  return Status::OK();
}
// Retrieves the names of all databases in the HMS into 'databases'
// (out-param, must be non-null).
Status HmsClient::GetAllDatabases(vector<string>* databases) {
  DCHECK(databases);
  SCOPED_LOG_SLOW_EXECUTION(WARNING, kSlowExecutionWarningThresholdMs, "get all HMS databases");
  HMS_RET_NOT_OK(client_.get_all_databases(*databases),
                 "failed to get Hive Metastore databases");
  return Status::OK();
}
// Retrieves a single database's metadata into 'database' (out-param, must be
// non-null). Returns NotFound if no database matches 'pattern'.
Status HmsClient::GetDatabase(const string& pattern, hive::Database* database) {
  DCHECK(database);
  SCOPED_LOG_SLOW_EXECUTION(WARNING, kSlowExecutionWarningThresholdMs, "get HMS database");
  HMS_RET_NOT_OK(client_.get_database(*database, pattern),
                 "failed to get Hive Metastore database");
  return Status::OK();
}
// Creates a table in the HMS with the supplied environment context. Returns
// AlreadyPresent if a table with the same name already exists.
Status HmsClient::CreateTable(const hive::Table& table, const hive::EnvironmentContext& env_ctx) {
  SCOPED_LOG_SLOW_EXECUTION(WARNING, kSlowExecutionWarningThresholdMs, "create HMS table");
  // Message spelling fixed from "Hive MetaStore" to "Hive Metastore" for
  // consistency with every other error message in this file.
  HMS_RET_NOT_OK(client_.create_table_with_environment_context(table, env_ctx),
                 "failed to create Hive Metastore table");
  return Status::OK();
}
// Alters the existing table ('database_name'.'table_name') to match the new
// definition in 'table', forwarding 'env_ctx' to the HMS.
Status HmsClient::AlterTable(const std::string& database_name,
                             const std::string& table_name,
                             const hive::Table& table,
                             const hive::EnvironmentContext& env_ctx) {
  SCOPED_LOG_SLOW_EXECUTION(WARNING, kSlowExecutionWarningThresholdMs, "alter HMS table");
  // Message spelling fixed from "Hive MetaStore" to "Hive Metastore" for
  // consistency with every other error message in this file.
  HMS_RET_NOT_OK(client_.alter_table_with_environment_context(database_name, table_name,
                                                              table, env_ctx),
                 "failed to alter Hive Metastore table");
  return Status::OK();
}
// Drops the table 'database_name'.'table_name' from the HMS, forwarding
// 'env_ctx' to the HMS. Returns NotFound if the table does not exist.
Status HmsClient::DropTable(const string& database_name,
                            const string& table_name,
                            const hive::EnvironmentContext& env_ctx) {
  SCOPED_LOG_SLOW_EXECUTION(WARNING, kSlowExecutionWarningThresholdMs, "drop HMS table");
  // The hard-coded 'true' presumably corresponds to the Thrift
  // 'deleteData' parameter — verify against hive_metastore.thrift.
  HMS_RET_NOT_OK(client_.drop_table_with_environment_context(database_name, table_name,
                                                             true, env_ctx),
                 "failed to drop Hive Metastore table");
  return Status::OK();
}
// Retrieves the names of all tables in 'database_name' into 'table_names'
// (out-param, must be non-null).
Status HmsClient::GetTableNames(const string& database_name,
                                vector<string>* table_names) {
  DCHECK(table_names);
  SCOPED_LOG_SLOW_EXECUTION(WARNING, kSlowExecutionWarningThresholdMs, "get all HMS tables");
  HMS_RET_NOT_OK(client_.get_all_tables(*table_names, database_name),
                 "failed to get Hive Metastore tables");
  return Status::OK();
}
// Retrieves the names of the tables in 'database_name' matching 'filter'
// into 'table_names' (out-param, must be non-null). The filter syntax is
// defined by the HMS (see kHiveFilterFieldParams for one supported field).
Status HmsClient::GetTableNames(const std::string& database_name,
                                const std::string& filter,
                                std::vector<std::string>* table_names) {
  DCHECK(table_names);
  SCOPED_LOG_SLOW_EXECUTION(WARNING, kSlowExecutionWarningThresholdMs, "get filtered HMS tables");
  // -1 requests an unlimited number of tables.
  HMS_RET_NOT_OK(client_.get_table_names_by_filter(*table_names, database_name,
                                                   filter, /*max_tables*/ -1),
                 "failed to get filtered Hive Metastore tables");
  return Status::OK();
}
// Retrieves the metadata of table 'database_name'.'table_name' into 'table'
// (out-param, must be non-null). Returns NotFound if the table does not exist.
Status HmsClient::GetTable(const string& database_name,
                           const string& table_name,
                           hive::Table* table) {
  DCHECK(table);
  SCOPED_LOG_SLOW_EXECUTION(WARNING, kSlowExecutionWarningThresholdMs, "get HMS table");
  HMS_RET_NOT_OK(client_.get_table(*table, database_name, table_name),
                 "failed to get Hive Metastore table");
  return Status::OK();
}
// Retrieves the metadata of the named tables in 'database_name' into
// 'tables' (out-param, must be non-null) in a single round trip.
Status HmsClient::GetTables(const string& database_name,
                            const vector<string>& table_names,
                            vector<hive::Table>* tables) {
  DCHECK(tables);
  SCOPED_LOG_SLOW_EXECUTION(WARNING, kSlowExecutionWarningThresholdMs, "get HMS tables");
  HMS_RET_NOT_OK(client_.get_table_objects_by_name(*tables, database_name, table_names),
                 "failed to get Hive Metastore tables");
  return Status::OK();
}
// Retrieves the ID of the most recent event in the HMS notification log into
// 'event_id' (out-param, must be non-null).
Status HmsClient::GetCurrentNotificationEventId(int64_t* event_id) {
  DCHECK(event_id);
  SCOPED_LOG_SLOW_EXECUTION(WARNING, kSlowExecutionWarningThresholdMs,
                            "get HMS current notification event ID");
  hive::CurrentNotificationEventId response;
  HMS_RET_NOT_OK(client_.get_current_notificationEventId(response),
                 "failed to get Hive Metastore current event ID");
  *event_id = response.eventId;
  return Status::OK();
}
// Retrieves up to 'max_events' notification log events that occurred after
// 'last_event_id' into 'events' (out-param, must be non-null).
Status HmsClient::GetNotificationEvents(int64_t last_event_id,
                                        int32_t max_events,
                                        vector<hive::NotificationEvent>* events) {
  DCHECK(events);
  SCOPED_LOG_SLOW_EXECUTION(WARNING, kSlowExecutionWarningThresholdMs,
                            "get HMS notification events");
  hive::NotificationEventRequest request;
  request.lastEvent = last_event_id;
  // maxEvents is an optional Thrift field, so it must be set via the setter
  // to mark it present.
  request.__set_maxEvents(max_events);
  hive::NotificationEventResponse response;
  HMS_RET_NOT_OK(client_.get_next_notification(response, request),
                 "failed to get Hive Metastore next notification");
  // Swap rather than copy the (potentially large) event list.
  events->swap(response.events);
  return Status::OK();
}
// Retrieves the HMS database's UUID into 'uuid' (out-param, must be
// non-null). Returns NotSupported if the HMS does not implement this RPC.
Status HmsClient::GetUuid(string* uuid) {
  DCHECK(uuid);
  SCOPED_LOG_SLOW_EXECUTION(WARNING, kSlowExecutionWarningThresholdMs,
                            "get HMS database UUID");
  HMS_RET_NOT_OK(client_.get_metastore_db_uuid(*uuid),
                 "failed to get HMS DB UUID");
  return Status::OK();
}
// Adds the given partitions to table 'database_name'.'table_name'.
// 'partitions' is taken by value so it can be moved into the request.
Status HmsClient::AddPartitions(const string& database_name,
                                const string& table_name,
                                vector<hive::Partition> partitions) {
  SCOPED_LOG_SLOW_EXECUTION(WARNING, kSlowExecutionWarningThresholdMs, "add HMS table partitions");
  hive::AddPartitionsRequest request;
  hive::AddPartitionsResult response;
  request.dbName = database_name;
  request.tblName = table_name;
  request.parts = std::move(partitions);
  HMS_RET_NOT_OK(client_.add_partitions_req(response, request),
                 "failed to add Hive Metastore table partitions");
  return Status::OK();
}
// Retrieves all partitions of table 'database_name'.'table_name' into
// 'partitions' (out-param, must be non-null).
Status HmsClient::GetPartitions(const string& database_name,
                                const string& table_name,
                                vector<hive::Partition>* partitions) {
  // Added for consistency with the other getters in this file, which all
  // DCHECK their out-param.
  DCHECK(partitions);
  SCOPED_LOG_SLOW_EXECUTION(WARNING, kSlowExecutionWarningThresholdMs, "get HMS table partitions");
  // -1 requests an unlimited number of partitions.
  HMS_RET_NOT_OK(client_.get_partitions(*partitions, database_name, table_name, -1),
                 "failed to get Hive Metastore table partitions");
  return Status::OK();
}
// Parses the Thrift-JSON serialized bytes in 'json' into 'table'.
Status HmsClient::DeserializeJsonTable(Slice json, hive::Table* table) {
  // Stage the raw JSON bytes in an in-memory Thrift transport.
  auto buffer = std::make_shared<TMemoryBuffer>(json.size());
  buffer->write(json.data(), json.size());
  // Decode the Thrift struct directly from the buffered JSON.
  TJSONProtocol json_protocol(buffer);
  HMS_RET_NOT_OK(table->read(&json_protocol), "failed to deserialize JSON table");
  return Status::OK();
}
// Returns true if the HMS table entry uses the Kudu storage handler, i.e.
// its parameter map names kKuduStorageHandler under kStorageHandlerKey.
bool HmsClient::IsKuduTable(const hive::Table& table) {
  const string* handler =
      FindOrNull(table.parameters, hms::HmsClient::kStorageHandlerKey);
  return handler != nullptr && *handler == hms::HmsClient::kKuduStorageHandler;
}
// Returns true if the HMS table entry should be kept in sync with Kudu:
// either a managed table, or an external table with 'external.table.purge'
// set (case-insensitively) to "true".
bool HmsClient::IsSynchronized(const hive::Table& table) {
  if (table.tableType == hms::HmsClient::kManagedTable) {
    return true;
  }
  // Absent the purge property, external tables default to unsynchronized.
  const string purge_property =
      FindWithDefault(table.parameters, hms::HmsClient::kExternalPurgeKey, "false");
  return table.tableType == hms::HmsClient::kExternalTable &&
         iequals(purge_property, "true");
}
} // namespace hms
} // namespace kudu