// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.
#include "kudu/hms/hms_client.h"
#include <algorithm>
#include <cstdint>
#include <map>
#include <optional>
#include <string>
#include <utility>
#include <vector>
#include <glog/stl_logging.h>
#include <gtest/gtest.h>
#include "kudu/gutil/strings/substitute.h"
#include "kudu/hms/hive_metastore_types.h"
#include "kudu/hms/mini_hms.h"
#include "kudu/rpc/sasl_common.h"
#include "kudu/security/test/mini_kdc.h"
#include "kudu/thrift/client.h"
#include "kudu/util/monotime.h"
#include "kudu/util/net/net_util.h"
#include "kudu/util/net/sockaddr.h"
#include "kudu/util/net/socket.h"
#include "kudu/util/status.h"
#include "kudu/util/test_macros.h"
#include "kudu/util/test_util.h"

using kudu::rpc::SaslProtection;
using std::make_pair;
using std::optional;
using std::string;
using std::vector;
using strings::Substitute;

namespace kudu {
namespace hms {
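
// Test fixture parameterized on the SASL protection type used when talking to
// the HMS; std::nullopt runs the tests without Kerberos.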
class HmsClientTest : public KuduTest,
public ::testing::WithParamInterface<optional<SaslProtection::Type>> {
public:
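  // Creates a Kudu table entry in the HMS, populating the parameters and
  // storage descriptor fields that identify it as Kudu-backed.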
static Status CreateTable(HmsClient* client,
const string& database_name,
const string& table_name,
const string& table_id,
const string& cluster_id) {
hive::Table table;
table.dbName = database_name;
table.tableName = table_name;
table.tableType = HmsClient::kManagedTable;
table.__set_parameters({
make_pair(HmsClient::kKuduTableIdKey, table_id),
make_pair(HmsClient::kKuduTableNameKey, table_name),
make_pair(HmsClient::kKuduClusterIdKey, cluster_id),
make_pair(HmsClient::kKuduMasterAddrsKey, string("TODO")),
make_pair(HmsClient::kStorageHandlerKey, HmsClient::kKuduStorageHandler)
});
table.sd.inputFormat = HmsClient::kKuduInputFormat;
table.sd.outputFormat = HmsClient::kKuduOutputFormat;
table.sd.serdeInfo.serializationLib = HmsClient::kKuduSerDeLib;
hive::EnvironmentContext env_ctx;
env_ctx.__set_properties({ make_pair(HmsClient::kKuduMasterEventKey, "true") });
return client->CreateTable(table, env_ctx);
}
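
  // Drops a table, passing the expected Kudu table ID through the environment
  // context so the HMS can validate it against the stored entry.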
Status DropTable(HmsClient* client,
const string& database_name,
const string& table_name,
const string& table_id) {
hive::EnvironmentContext env_ctx;
env_ctx.__set_properties({ make_pair(HmsClient::kKuduTableIdKey, table_id) });
return client->DropTable(database_name, table_name, env_ctx);
}
};
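
// Instantiate the test suite once without Kerberos (std::nullopt) and once per
// enabled SASL protection level.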
INSTANTIATE_TEST_SUITE_P(ProtectionTypes,
HmsClientTest,
::testing::Values(std::nullopt
, SaslProtection::kIntegrity
// On macOS, krb5 has issues repeatedly spinning up new KDCs ('unable to reach
// any KDC in realm KRBTEST.COM, tried 1 KDC'). Integrity protection gives us
// good coverage, so we disable the other variants.
#ifndef __APPLE__
, SaslProtection::kAuthentication
, SaslProtection::kPrivacy
#endif
));
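
// End-to-end coverage of the core HMS operations: database and table CRUD,
// filtered table listings, and the notification log.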
TEST_P(HmsClientTest, TestHmsOperations) {
const auto protection = GetParam();
MiniKdc kdc;
MiniHms hms;
thrift::ClientOptions hms_client_opts;
if (protection) {
ASSERT_OK(kdc.Start());
string spn = "hive/127.0.0.1";
string ktpath;
ASSERT_OK(kdc.CreateServiceKeytab(spn, &ktpath));
ASSERT_OK(rpc::SaslInit());
hms.EnableKerberos(kdc.GetEnvVars()["KRB5_CONFIG"],
spn,
ktpath,
*protection);
ASSERT_OK(kdc.CreateUserPrincipal("alice"));
ASSERT_OK(kdc.Kinit("alice"));
ASSERT_OK(kdc.SetKrb5Environment());
hms_client_opts.enable_kerberos = true;
hms_client_opts.service_principal = "hive";
}
// Set the `KUDU_HMS_SYNC_ENABLED` environment variable in the
// HMS environment to manually enable HMS synchronization checks.
// This means we don't need to stand up a Kudu Cluster for this test.
hms.AddEnvVar("KUDU_HMS_SYNC_ENABLED", "1");
ASSERT_OK(hms.Start());
HmsClient client(hms.address(), hms_client_opts);
ASSERT_FALSE(client.IsConnected());
ASSERT_OK(client.Start());
ASSERT_TRUE(client.IsConnected());
// Create a database.
string database_name = "my_db";
hive::Database db;
db.name = database_name;
ASSERT_OK(client.CreateDatabase(db));
ASSERT_TRUE(client.CreateDatabase(db).IsAlreadyPresent());
// Get all databases.
vector<string> databases;
ASSERT_OK(client.GetAllDatabases(&databases));
std::sort(databases.begin(), databases.end());
EXPECT_EQ(vector<string>({ "default", database_name }), databases) << "Databases: " << databases;
// Get a specific database.
hive::Database my_db;
ASSERT_OK(client.GetDatabase(database_name, &my_db));
EXPECT_EQ(database_name, my_db.name) << "my_db: " << my_db;
string table_name = "my_table";
string table_id = "table-id";
string cluster_id = "cluster-id";
// Check that the HMS rejects Kudu tables without a table ID.
ASSERT_STR_CONTAINS(CreateTable(&client, database_name, table_name, "", cluster_id).ToString(),
"Kudu table entry must contain a table ID");
// Create a table.
ASSERT_OK(CreateTable(&client, database_name, table_name, table_id, cluster_id));
ASSERT_TRUE(CreateTable(&client, database_name, table_name,
table_id, cluster_id).IsAlreadyPresent());
// Retrieve a table.
hive::Table my_table;
ASSERT_OK(client.GetTable(database_name, table_name, &my_table));
EXPECT_EQ(database_name, my_table.dbName) << "my_table: " << my_table;
EXPECT_EQ(table_name, my_table.tableName);
EXPECT_EQ(table_id, my_table.parameters[HmsClient::kKuduTableIdKey]);
EXPECT_EQ(cluster_id, my_table.parameters[HmsClient::kKuduClusterIdKey]);
EXPECT_EQ(HmsClient::kKuduStorageHandler, my_table.parameters[HmsClient::kStorageHandlerKey]);
EXPECT_EQ(HmsClient::kManagedTable, my_table.tableType);
EXPECT_EQ(HmsClient::kKuduInputFormat, my_table.sd.inputFormat);
EXPECT_EQ(HmsClient::kKuduOutputFormat, my_table.sd.outputFormat);
EXPECT_EQ(HmsClient::kKuduSerDeLib, my_table.sd.serdeInfo.serializationLib);
string new_table_name = "my_altered_table";
// Renaming the table with an incorrect table ID should fail.
hive::Table altered_table(my_table);
altered_table.tableName = new_table_name;
altered_table.parameters[HmsClient::kKuduTableIdKey] = "bogus-table-id";
ASSERT_TRUE(client.AlterTable(database_name, table_name, altered_table).IsRemoteError());
// Rename the table.
altered_table.parameters[HmsClient::kKuduTableIdKey] = table_id;
ASSERT_OK(client.AlterTable(database_name, table_name, altered_table));
// Original table is gone.
ASSERT_TRUE(client.AlterTable(database_name, table_name, altered_table).IsIllegalState());
// Check that the altered table's properties are intact.
hive::Table renamed_table;
ASSERT_OK(client.GetTable(database_name, new_table_name, &renamed_table));
EXPECT_EQ(database_name, renamed_table.dbName);
EXPECT_EQ(new_table_name, renamed_table.tableName);
EXPECT_EQ(table_id, renamed_table.parameters[HmsClient::kKuduTableIdKey]);
EXPECT_EQ(HmsClient::kKuduStorageHandler,
renamed_table.parameters[HmsClient::kStorageHandlerKey]);
EXPECT_EQ(HmsClient::kManagedTable, renamed_table.tableType);
EXPECT_EQ(HmsClient::kKuduInputFormat, renamed_table.sd.inputFormat);
EXPECT_EQ(HmsClient::kKuduOutputFormat, renamed_table.sd.outputFormat);
EXPECT_EQ(HmsClient::kKuduSerDeLib, renamed_table.sd.serdeInfo.serializationLib);
// Create a table with an uppercase name.
string uppercase_table_name = "my_UPPERCASE_Table";
ASSERT_OK(CreateTable(&client, database_name, uppercase_table_name,
"uppercase-table-id", cluster_id));
// Create a table with an illegal utf-8 name.
ASSERT_TRUE(CreateTable(&client, database_name, "☃ sculptures 😉",
table_id, cluster_id).IsInvalidArgument());
// Create a non-Kudu table.
hive::Table non_kudu_table;
non_kudu_table.dbName = database_name;
non_kudu_table.tableName = "non_kudu_table";
non_kudu_table.parameters[HmsClient::kStorageHandlerKey] = "bogus.storage.Handler";
ASSERT_OK(client.CreateTable(non_kudu_table));
// Get all table names.
vector<string> table_names;
ASSERT_OK(client.GetTableNames(database_name, &table_names));
std::sort(table_names.begin(), table_names.end());
EXPECT_EQ(vector<string>({ new_table_name, "my_uppercase_table", "non_kudu_table" }), table_names)
<< "table names: " << table_names;
// Get filtered table names.
// NOTE: LIKE filters are used instead of = filters due to HIVE-21614
table_names.clear();
string filter = Substitute(
"$0$1 LIKE \"$2\"", HmsClient::kHiveFilterFieldParams,
HmsClient::kStorageHandlerKey, HmsClient::kKuduStorageHandler);
ASSERT_OK(client.GetTableNames(database_name, filter, &table_names));
std::sort(table_names.begin(), table_names.end());
EXPECT_EQ(vector<string>({ new_table_name, "my_uppercase_table" }), table_names)
<< "table names: " << table_names;
// Get multiple tables.
vector<hive::Table> tables;
ASSERT_OK(client.GetTables(database_name, table_names, &tables));
ASSERT_EQ(2, tables.size());
EXPECT_EQ(new_table_name, tables[0].tableName);
EXPECT_EQ("my_uppercase_table", tables[1].tableName);
// Check that the HMS rejects Kudu table drops with a bogus table ID.
ASSERT_TRUE(DropTable(&client, database_name, new_table_name, "bogus-table-id").IsRemoteError());
// Check that the HMS rejects non-existent table drops.
ASSERT_TRUE(DropTable(&client, database_name, "foo-bar", "bogus-table-id").IsNotFound());
// Drop a table.
ASSERT_OK(DropTable(&client, database_name, new_table_name, table_id));
// Drop the database.
ASSERT_TRUE(client.DropDatabase(database_name).IsIllegalState());
// TODO(HIVE-17008)
// ASSERT_OK(client.DropDatabase(database_name, Cascade::kTrue));
// TODO(HIVE-17008)
// ASSERT_TRUE(client.DropDatabase(database_name).IsNotFound());
int64_t event_id;
ASSERT_OK(client.GetCurrentNotificationEventId(&event_id));
// Retrieve the notification log and spot-check that the results look sensible.
vector<hive::NotificationEvent> events;
ASSERT_OK(client.GetNotificationEvents(-1, 100, &events));
ASSERT_EQ(6, events.size());
EXPECT_EQ("CREATE_DATABASE", events[0].eventType);
EXPECT_EQ("CREATE_TABLE", events[1].eventType);
EXPECT_EQ("ALTER_TABLE", events[2].eventType);
EXPECT_EQ("CREATE_TABLE", events[3].eventType);
EXPECT_EQ("CREATE_TABLE", events[4].eventType);
EXPECT_EQ("DROP_TABLE", events[5].eventType);
// TODO(HIVE-17008)
//EXPECT_EQ("DROP_TABLE", events[6].eventType);
//EXPECT_EQ("DROP_DATABASE", events[7].eventType);
// Retrieve a specific notification log.
events.clear();
ASSERT_OK(client.GetNotificationEvents(2, 1, &events));
ASSERT_EQ(1, events.size()) << "events: " << events;
EXPECT_EQ("ALTER_TABLE", events[0].eventType);
ASSERT_OK(client.Stop());
}
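
// Ensure the client can send and receive very large Thrift objects by adding
// 1000 partitions to a table and reading them all back.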
TEST_P(HmsClientTest, TestLargeObjects) {
const auto protection = GetParam();
MiniKdc kdc;
MiniHms hms;
thrift::ClientOptions hms_client_opts;
if (protection) {
ASSERT_OK(kdc.Start());
// Try a non-standard service principal to ensure it works correctly.
string spn = "hive_alternate_sp/127.0.0.1";
string ktpath;
ASSERT_OK(kdc.CreateServiceKeytab(spn, &ktpath));
ASSERT_OK(rpc::SaslInit());
hms.EnableKerberos(kdc.GetEnvVars()["KRB5_CONFIG"],
spn,
ktpath,
*protection);
ASSERT_OK(kdc.CreateUserPrincipal("alice"));
ASSERT_OK(kdc.Kinit("alice"));
ASSERT_OK(kdc.SetKrb5Environment());
hms_client_opts.enable_kerberos = true;
hms_client_opts.service_principal = "hive_alternate_sp";
}
ASSERT_OK(hms.Start());
HmsClient client(hms.address(), hms_client_opts);
ASSERT_OK(client.Start());
string database_name = "default";
string table_name = "big_table";
hive::Table table;
table.dbName = database_name;
table.tableName = table_name;
table.tableType = HmsClient::kManagedTable;
hive::FieldSchema partition_key;
partition_key.name = "c1";
partition_key.type = "int";
table.partitionKeys.emplace_back(std::move(partition_key));
table.sd.inputFormat = HmsClient::kKuduInputFormat;
table.sd.outputFormat = HmsClient::kKuduOutputFormat;
table.sd.serdeInfo.serializationLib = HmsClient::kKuduSerDeLib;
ASSERT_OK(client.CreateTable(table));
// Add a bunch of partitions to the table. This ensures we can send and
// receive really large thrift objects. We have to add the partitions in small
// batches, otherwise Derby chokes.
const int kBatches = 40;
const int kPartitionsPerBatch = 25;
for (int batch_idx = 0; batch_idx < kBatches; batch_idx++) {
vector<hive::Partition> partitions;
for (int partition_idx = 0; partition_idx < kPartitionsPerBatch; partition_idx++) {
hive::Partition partition;
partition.dbName = database_name;
partition.tableName = table_name;
partition.values = { std::to_string(batch_idx * kPartitionsPerBatch + partition_idx) };
partitions.emplace_back(std::move(partition));
}
ASSERT_OK(client.AddPartitions(database_name, table_name, std::move(partitions)));
}
ASSERT_OK(client.GetTable(database_name, table_name, &table));
vector<hive::Partition> partitions;
ASSERT_OK(client.GetPartitions(database_name, table_name, &partitions));
ASSERT_EQ(kBatches * kPartitionsPerBatch, partitions.size());
}
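
// Exercise client error handling when the HMS is stopped, restarted, and
// paused.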
TEST_F(HmsClientTest, TestHmsFaultHandling) {
MiniHms hms;
ASSERT_OK(hms.Start());
thrift::ClientOptions options;
  options.recv_timeout = MonoDelta::FromMilliseconds(500);
  options.send_timeout = MonoDelta::FromMilliseconds(500);
HmsClient client(hms.address(), options);
ASSERT_OK(client.Start());
// Get a specific database.
hive::Database my_db;
ASSERT_OK(client.GetDatabase("default", &my_db));
// Shutdown the HMS.
ASSERT_OK(hms.Stop());
ASSERT_TRUE(client.GetDatabase("default", &my_db).IsNetworkError());
ASSERT_OK(client.Stop());
// Restart the HMS and ensure the client can connect.
ASSERT_OK(hms.Start());
ASSERT_OK(client.Start());
ASSERT_OK(client.GetDatabase("default", &my_db));
// Pause the HMS and ensure the client times-out appropriately.
ASSERT_OK(hms.Pause());
ASSERT_TRUE(client.GetDatabase("default", &my_db).IsTimedOut());
// Unpause the HMS and ensure the client can continue.
ASSERT_OK(hms.Resume());
ASSERT_OK(client.GetDatabase("default", &my_db));
}

// Test connecting the HMS client to TCP sockets in various invalid states.
TEST_F(HmsClientTest, TestHmsConnect) {
Sockaddr loopback;
ASSERT_OK(loopback.ParseString("127.0.0.1", 0));
thrift::ClientOptions options;
  options.recv_timeout = MonoDelta::FromMilliseconds(100);
  options.send_timeout = MonoDelta::FromMilliseconds(100);
options.conn_timeout = MonoDelta::FromMilliseconds(100);
// This test will attempt to connect and transfer data upon starting the
// client.
options.verify_service_config = true;
auto start_client = [&options] (Sockaddr addr) {
HmsClient client(HostPort(addr), options);
return client.Start();
};
// Listening, but not accepting socket.
Sockaddr listening;
Socket listening_socket;
ASSERT_OK(listening_socket.Init(loopback.family(), 0));
ASSERT_OK(listening_socket.BindAndListen(loopback, 1));
  ASSERT_OK(listening_socket.GetSocketAddress(&listening));
ASSERT_TRUE(start_client(listening).IsTimedOut());
// Bound, but not listening socket.
Sockaddr bound;
Socket bound_socket;
ASSERT_OK(bound_socket.Init(loopback.family(), 0));
ASSERT_OK(bound_socket.Bind(loopback));
  ASSERT_OK(bound_socket.GetSocketAddress(&bound));
ASSERT_TRUE(start_client(bound).IsNetworkError());
// Unbound socket.
Sockaddr unbound;
Socket unbound_socket;
ASSERT_OK(unbound_socket.Init(loopback.family(), 0));
ASSERT_OK(unbound_socket.Bind(loopback));
  ASSERT_OK(unbound_socket.GetSocketAddress(&unbound));
ASSERT_OK(unbound_socket.Close());
ASSERT_TRUE(start_client(unbound).IsNetworkError());
}
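
// The keys "1" and "2" in the JSON below are Thrift field IDs, mapping to the
// table name and database name of hive::Table respectively.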
TEST_F(HmsClientTest, TestDeserializeJsonTable) {
string json = R"#({"1":{"str":"table_name"},"2":{"str":"database_name"}})#";
hive::Table table;
ASSERT_OK(HmsClient::DeserializeJsonTable(json, &table));
ASSERT_EQ("table_name", table.tableName);
ASSERT_EQ("database_name", table.dbName);
}
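
// The HMS normalizes database and table names to lower case; verify that
// lookups succeed regardless of the case used by the caller.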
TEST_F(HmsClientTest, TestCaseSensitivity) {
MiniKdc kdc;
MiniHms hms;
// Set the `KUDU_HMS_SYNC_ENABLED` environment variable in the
// HMS environment to manually enable HMS synchronization checks.
// This means we don't need to stand up a Kudu Cluster for this test.
hms.AddEnvVar("KUDU_HMS_SYNC_ENABLED", "1");
ASSERT_OK(hms.Start());
HmsClient client(hms.address(), thrift::ClientOptions());
ASSERT_OK(client.Start());
// Create a database.
hive::Database db;
db.name = "my_db";
ASSERT_OK(client.CreateDatabase(db));
// Create a table.
ASSERT_OK(CreateTable(&client, "my_db", "Foo", "abc123", "Bar"));
hive::Table table;
ASSERT_OK(client.GetTable("my_db", "Foo", &table));
ASSERT_EQ(table.tableName, "foo");
ASSERT_OK(client.GetTable("my_db", "foo", &table));
ASSERT_EQ(table.tableName, "foo");
ASSERT_OK(client.GetTable("MY_DB", "FOO", &table));
ASSERT_EQ(table.dbName, "my_db");
ASSERT_EQ(table.tableName, "foo");
}
} // namespace hms
} // namespace kudu