| // Licensed to the Apache Software Foundation (ASF) under one |
| // or more contributor license agreements. See the NOTICE file |
| // distributed with this work for additional information |
| // regarding copyright ownership. The ASF licenses this file |
| // to you under the Apache License, Version 2.0 (the |
| // "License"); you may not use this file except in compliance |
| // with the License. You may obtain a copy of the License at |
| // |
| // http://www.apache.org/licenses/LICENSE-2.0 |
| // |
| // Unless required by applicable law or agreed to in writing, |
| // software distributed under the License is distributed on an |
| // "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY |
| // KIND, either express or implied. See the License for the |
| // specific language governing permissions and limitations |
| // under the License. |
| |
| #include "kudu/hms/hms_client.h" |
| |
| #include <algorithm> |
| #include <cstdint> |
| #include <map> |
| #include <optional> |
| #include <string> |
| #include <utility> |
| #include <vector> |
| |
| #include <glog/stl_logging.h> |
| #include <gtest/gtest.h> |
| |
| #include "kudu/gutil/strings/substitute.h" |
| #include "kudu/hms/hive_metastore_types.h" |
| #include "kudu/hms/mini_hms.h" |
| #include "kudu/rpc/sasl_common.h" |
| #include "kudu/security/test/mini_kdc.h" |
| #include "kudu/thrift/client.h" |
| #include "kudu/util/monotime.h" |
| #include "kudu/util/net/net_util.h" |
| #include "kudu/util/net/sockaddr.h" |
| #include "kudu/util/net/socket.h" |
| #include "kudu/util/status.h" |
| #include "kudu/util/test_macros.h" |
| #include "kudu/util/test_util.h" |
| |
| using std::optional; |
| using kudu::rpc::SaslProtection; |
| using std::make_pair; |
| using std::string; |
| using std::vector; |
| using strings::Substitute; |
| |
| namespace kudu { |
| namespace hms { |
| |
| class HmsClientTest : public KuduTest, |
| public ::testing::WithParamInterface<optional<SaslProtection::Type>> { |
| public: |
| |
| static Status CreateTable(HmsClient* client, |
| const string& database_name, |
| const string& table_name, |
| const string& table_id, |
| const string& cluster_id) { |
| hive::Table table; |
| table.dbName = database_name; |
| table.tableName = table_name; |
| table.tableType = HmsClient::kManagedTable; |
| table.__set_parameters({ |
| make_pair(HmsClient::kKuduTableIdKey, table_id), |
| make_pair(HmsClient::kKuduTableNameKey, table_name), |
| make_pair(HmsClient::kKuduClusterIdKey, cluster_id), |
| make_pair(HmsClient::kKuduMasterAddrsKey, string("TODO")), |
| make_pair(HmsClient::kStorageHandlerKey, HmsClient::kKuduStorageHandler) |
| }); |
| |
| hive::EnvironmentContext env_ctx; |
| env_ctx.__set_properties({ make_pair(HmsClient::kKuduMasterEventKey, "true") }); |
| |
| return client->CreateTable(table, env_ctx); |
| } |
| |
| Status DropTable(HmsClient* client, |
| const string& database_name, |
| const string& table_name, |
| const string& table_id) { |
| hive::EnvironmentContext env_ctx; |
| env_ctx.__set_properties({ make_pair(HmsClient::kKuduTableIdKey, table_id) }); |
| return client->DropTable(database_name, table_name, env_ctx); |
| } |
| }; |
| |
// Instantiate every TEST_P in this suite once per SASL protection level.
// std::nullopt exercises the non-Kerberized code path (see the `if
// (protection)` guards in the tests).
INSTANTIATE_TEST_SUITE_P(ProtectionTypes,
                         HmsClientTest,
                         ::testing::Values(std::nullopt
                                           , SaslProtection::kIntegrity
// On macos, krb5 has issues repeatedly spinning up new KDCs ('unable to reach
// any KDC in realm KRBTEST.COM, tried 1 KDC'). Integrity protection gives us
// good coverage, so we disable the other variants.
#ifndef __APPLE__
                                           , SaslProtection::kAuthentication
                                           , SaslProtection::kPrivacy
#endif
                                           ));
| |
// End-to-end exercise of HmsClient against a live MiniHms: database and table
// CRUD, Kudu-specific parameter validation, filtered listings, and the
// notification event log. NOTE: later assertions (notably the event-log
// checks) depend on the exact sequence of operations performed earlier in
// this test.
TEST_P(HmsClientTest, TestHmsOperations) {
  const auto protection = GetParam();

  MiniKdc kdc;
  MiniHms hms;
  thrift::ClientOptions hms_client_opts;

  // When a SASL protection level was supplied, run the entire test over a
  // Kerberized connection.
  if (protection) {
    ASSERT_OK(kdc.Start());

    string spn = "hive/127.0.0.1";
    string ktpath;
    ASSERT_OK(kdc.CreateServiceKeytab(spn, &ktpath));

    ASSERT_OK(rpc::SaslInit());
    hms.EnableKerberos(kdc.GetEnvVars()["KRB5_CONFIG"],
                       spn,
                       ktpath,
                       *protection);

    ASSERT_OK(kdc.CreateUserPrincipal("alice"));
    ASSERT_OK(kdc.Kinit("alice"));
    ASSERT_OK(kdc.SetKrb5Environment());
    hms_client_opts.enable_kerberos = true;
    hms_client_opts.service_principal = "hive";
  }

  // Set the `KUDU_HMS_SYNC_ENABLED` environment variable in the
  // HMS environment to manually enable HMS synchronization checks.
  // This means we don't need to stand up a Kudu Cluster for this test.
  hms.AddEnvVar("KUDU_HMS_SYNC_ENABLED", "1");

  ASSERT_OK(hms.Start());

  HmsClient client(hms.address(), hms_client_opts);
  ASSERT_FALSE(client.IsConnected());
  ASSERT_OK(client.Start());
  ASSERT_TRUE(client.IsConnected());

  // Create a database.
  string database_name = "my_db";
  hive::Database db;
  db.name = database_name;
  ASSERT_OK(client.CreateDatabase(db));
  // Creating the same database twice must surface as AlreadyPresent.
  ASSERT_TRUE(client.CreateDatabase(db).IsAlreadyPresent());

  // Get all databases.
  vector<string> databases;
  ASSERT_OK(client.GetAllDatabases(&databases));
  std::sort(databases.begin(), databases.end());
  EXPECT_EQ(vector<string>({ "default", database_name }), databases) << "Databases: " << databases;

  // Get a specific database.
  hive::Database my_db;
  ASSERT_OK(client.GetDatabase(database_name, &my_db));
  EXPECT_EQ(database_name, my_db.name) << "my_db: " << my_db;

  string table_name = "my_table";
  string table_id = "table-id";
  string cluster_id = "cluster-id";

  // Check that the HMS rejects Kudu tables without a table ID.
  ASSERT_STR_CONTAINS(CreateTable(&client, database_name, table_name, "", cluster_id).ToString(),
                      "Kudu table entry must contain a table ID");

  // Create a table.
  ASSERT_OK(CreateTable(&client, database_name, table_name, table_id, cluster_id));
  ASSERT_TRUE(CreateTable(&client, database_name, table_name,
                          table_id, cluster_id).IsAlreadyPresent());

  // Retrieve a table, and verify the Kudu-specific parameters round-tripped.
  hive::Table my_table;
  ASSERT_OK(client.GetTable(database_name, table_name, &my_table));
  EXPECT_EQ(database_name, my_table.dbName) << "my_table: " << my_table;
  EXPECT_EQ(table_name, my_table.tableName);
  EXPECT_EQ(table_id, my_table.parameters[HmsClient::kKuduTableIdKey]);
  EXPECT_EQ(cluster_id, my_table.parameters[HmsClient::kKuduClusterIdKey]);
  EXPECT_EQ(HmsClient::kKuduStorageHandler, my_table.parameters[HmsClient::kStorageHandlerKey]);
  EXPECT_EQ(HmsClient::kManagedTable, my_table.tableType);

  string new_table_name = "my_altered_table";

  // Renaming the table with an incorrect table ID should fail.
  hive::Table altered_table(my_table);
  altered_table.tableName = new_table_name;
  altered_table.parameters[HmsClient::kKuduTableIdKey] = "bogus-table-id";
  ASSERT_TRUE(client.AlterTable(database_name, table_name, altered_table).IsRemoteError());

  // Rename the table.
  altered_table.parameters[HmsClient::kKuduTableIdKey] = table_id;
  ASSERT_OK(client.AlterTable(database_name, table_name, altered_table));

  // Original table is gone, so altering under the old name is IllegalState.
  ASSERT_TRUE(client.AlterTable(database_name, table_name, altered_table).IsIllegalState());

  // Check that the altered table's properties are intact.
  hive::Table renamed_table;
  ASSERT_OK(client.GetTable(database_name, new_table_name, &renamed_table));
  EXPECT_EQ(database_name, renamed_table.dbName);
  EXPECT_EQ(new_table_name, renamed_table.tableName);
  EXPECT_EQ(table_id, renamed_table.parameters[HmsClient::kKuduTableIdKey]);
  EXPECT_EQ(HmsClient::kKuduStorageHandler,
            renamed_table.parameters[HmsClient::kStorageHandlerKey]);
  EXPECT_EQ(HmsClient::kManagedTable, renamed_table.tableType);

  // Create a table with an uppercase name.
  string uppercase_table_name = "my_UPPERCASE_Table";
  ASSERT_OK(CreateTable(&client, database_name, uppercase_table_name,
                        "uppercase-table-id", cluster_id));

  // Create a table with an illegal utf-8 name.
  ASSERT_TRUE(CreateTable(&client, database_name, "☃ sculptures 😉",
                          table_id, cluster_id).IsInvalidArgument());

  // Create a non-Kudu table.
  hive::Table non_kudu_table;
  non_kudu_table.dbName = database_name;
  non_kudu_table.tableName = "non_kudu_table";
  non_kudu_table.parameters[HmsClient::kStorageHandlerKey] = "bogus.storage.Handler";
  ASSERT_OK(client.CreateTable(non_kudu_table));

  // Get all table names. Note the expected lower-cased form of the
  // uppercase-named table — the HMS stores table names lower-cased.
  vector<string> table_names;
  ASSERT_OK(client.GetTableNames(database_name, &table_names));
  std::sort(table_names.begin(), table_names.end());
  EXPECT_EQ(vector<string>({ new_table_name, "my_uppercase_table", "non_kudu_table" }), table_names)
      << "table names: " << table_names;

  // Get filtered table names: only Kudu-handled tables should match, which
  // excludes 'non_kudu_table'.
  // NOTE: LIKE filters are used instead of = filters due to HIVE-21614
  table_names.clear();
  string filter = Substitute(
      "$0$1 LIKE \"$2\"", HmsClient::kHiveFilterFieldParams,
      HmsClient::kStorageHandlerKey, HmsClient::kKuduStorageHandler);
  ASSERT_OK(client.GetTableNames(database_name, filter, &table_names));
  std::sort(table_names.begin(), table_names.end());
  EXPECT_EQ(vector<string>({ new_table_name, "my_uppercase_table" }), table_names)
      << "table names: " << table_names;

  // Get multiple tables.
  vector<hive::Table> tables;
  ASSERT_OK(client.GetTables(database_name, table_names, &tables));
  ASSERT_EQ(2, tables.size());
  EXPECT_EQ(new_table_name, tables[0].tableName);
  EXPECT_EQ("my_uppercase_table", tables[1].tableName);

  // Check that the HMS rejects Kudu table drops with a bogus table ID.
  ASSERT_TRUE(DropTable(&client, database_name, new_table_name, "bogus-table-id").IsRemoteError());
  // Check that the HMS rejects non-existent table drops.
  ASSERT_TRUE(DropTable(&client, database_name, "foo-bar", "bogus-table-id").IsNotFound());

  // Drop a table.
  ASSERT_OK(DropTable(&client, database_name, new_table_name, table_id));

  // Drop the database. Non-cascading drop of a non-empty database fails.
  ASSERT_TRUE(client.DropDatabase(database_name).IsIllegalState());
  // TODO(HIVE-17008)
  // ASSERT_OK(client.DropDatabase(database_name, Cascade::kTrue));
  // TODO(HIVE-17008)
  // ASSERT_TRUE(client.DropDatabase(database_name).IsNotFound());

  int64_t event_id;
  ASSERT_OK(client.GetCurrentNotificationEventId(&event_id));

  // Retrieve the notification log and spot-check that the results look sensible.
  // The six events correspond, in order, to: create my_db; create my_table;
  // rename to my_altered_table; create my_UPPERCASE_Table; create
  // non_kudu_table; drop my_altered_table.
  vector<hive::NotificationEvent> events;
  ASSERT_OK(client.GetNotificationEvents(-1, 100, &events));

  ASSERT_EQ(6, events.size());
  EXPECT_EQ("CREATE_DATABASE", events[0].eventType);
  EXPECT_EQ("CREATE_TABLE", events[1].eventType);
  EXPECT_EQ("ALTER_TABLE", events[2].eventType);
  EXPECT_EQ("CREATE_TABLE", events[3].eventType);
  EXPECT_EQ("CREATE_TABLE", events[4].eventType);
  EXPECT_EQ("DROP_TABLE", events[5].eventType);
  // TODO(HIVE-17008)
  //EXPECT_EQ("DROP_TABLE", events[6].eventType);
  //EXPECT_EQ("DROP_DATABASE", events[7].eventType);

  // Retrieve a specific notification log entry.
  events.clear();
  ASSERT_OK(client.GetNotificationEvents(2, 1, &events));
  ASSERT_EQ(1, events.size()) << "events: " << events;
  EXPECT_EQ("ALTER_TABLE", events[0].eventType);
  ASSERT_OK(client.Stop());
}
| |
| TEST_P(HmsClientTest, TestLargeObjects) { |
| const auto protection = GetParam(); |
| |
| MiniKdc kdc; |
| MiniHms hms; |
| thrift::ClientOptions hms_client_opts; |
| |
| if (protection) { |
| ASSERT_OK(kdc.Start()); |
| |
| // Try a non-standard service principal to ensure it works correctly. |
| string spn = "hive_alternate_sp/127.0.0.1"; |
| string ktpath; |
| ASSERT_OK(kdc.CreateServiceKeytab(spn, &ktpath)); |
| |
| ASSERT_OK(rpc::SaslInit()); |
| hms.EnableKerberos(kdc.GetEnvVars()["KRB5_CONFIG"], |
| spn, |
| ktpath, |
| *protection); |
| |
| ASSERT_OK(kdc.CreateUserPrincipal("alice")); |
| ASSERT_OK(kdc.Kinit("alice")); |
| ASSERT_OK(kdc.SetKrb5Environment()); |
| hms_client_opts.enable_kerberos = true; |
| hms_client_opts.service_principal = "hive_alternate_sp"; |
| } |
| |
| ASSERT_OK(hms.Start()); |
| |
| HmsClient client(hms.address(), hms_client_opts); |
| ASSERT_OK(client.Start()); |
| |
| string database_name = "default"; |
| string table_name = "big_table"; |
| |
| hive::Table table; |
| table.dbName = database_name; |
| table.tableName = table_name; |
| table.tableType = HmsClient::kManagedTable; |
| hive::FieldSchema partition_key; |
| partition_key.name = "c1"; |
| partition_key.type = "int"; |
| table.partitionKeys.emplace_back(std::move(partition_key)); |
| |
| ASSERT_OK(client.CreateTable(table)); |
| |
| // Add a bunch of partitions to the table. This ensures we can send and |
| // receive really large thrift objects. We have to add the partitions in small |
| // batches, otherwise Derby chokes. |
| const int kBatches = 40; |
| const int kPartitionsPerBatch = 25; |
| |
| for (int batch_idx = 0; batch_idx < kBatches; batch_idx++) { |
| vector<hive::Partition> partitions; |
| for (int partition_idx = 0; partition_idx < kPartitionsPerBatch; partition_idx++) { |
| hive::Partition partition; |
| partition.dbName = database_name; |
| partition.tableName = table_name; |
| partition.values = { std::to_string(batch_idx * kPartitionsPerBatch + partition_idx) }; |
| partitions.emplace_back(std::move(partition)); |
| } |
| |
| ASSERT_OK(client.AddPartitions(database_name, table_name, std::move(partitions))); |
| } |
| |
| ASSERT_OK(client.GetTable(database_name, table_name, &table)); |
| |
| vector<hive::Partition> partitions; |
| ASSERT_OK(client.GetPartitions(database_name, table_name, &partitions)); |
| ASSERT_EQ(kBatches * kPartitionsPerBatch, partitions.size()); |
| } |
| |
| TEST_F(HmsClientTest, TestHmsFaultHandling) { |
| MiniHms hms; |
| ASSERT_OK(hms.Start()); |
| |
| thrift::ClientOptions options; |
| options.recv_timeout = MonoDelta::FromMilliseconds(500), |
| options.send_timeout = MonoDelta::FromMilliseconds(500); |
| HmsClient client(hms.address(), options); |
| ASSERT_OK(client.Start()); |
| |
| // Get a specific database. |
| hive::Database my_db; |
| ASSERT_OK(client.GetDatabase("default", &my_db)); |
| |
| // Shutdown the HMS. |
| ASSERT_OK(hms.Stop()); |
| ASSERT_TRUE(client.GetDatabase("default", &my_db).IsNetworkError()); |
| ASSERT_OK(client.Stop()); |
| |
| // Restart the HMS and ensure the client can connect. |
| ASSERT_OK(hms.Start()); |
| ASSERT_OK(client.Start()); |
| ASSERT_OK(client.GetDatabase("default", &my_db)); |
| |
| // Pause the HMS and ensure the client times-out appropriately. |
| ASSERT_OK(hms.Pause()); |
| ASSERT_TRUE(client.GetDatabase("default", &my_db).IsTimedOut()); |
| |
| // Unpause the HMS and ensure the client can continue. |
| ASSERT_OK(hms.Resume()); |
| ASSERT_OK(client.GetDatabase("default", &my_db)); |
| } |
| |
| // Test connecting the HMS client to TCP sockets in various invalid states. |
| TEST_F(HmsClientTest, TestHmsConnect) { |
| Sockaddr loopback; |
| ASSERT_OK(loopback.ParseString("127.0.0.1", 0)); |
| |
| thrift::ClientOptions options; |
| options.recv_timeout = MonoDelta::FromMilliseconds(100), |
| options.send_timeout = MonoDelta::FromMilliseconds(100); |
| options.conn_timeout = MonoDelta::FromMilliseconds(100); |
| |
| // This test will attempt to connect and transfer data upon starting the |
| // client. |
| options.verify_service_config = true; |
| |
| auto start_client = [&options] (Sockaddr addr) { |
| HmsClient client(HostPort(addr), options); |
| return client.Start(); |
| }; |
| |
| // Listening, but not accepting socket. |
| Sockaddr listening; |
| Socket listening_socket; |
| ASSERT_OK(listening_socket.Init(loopback.family(), 0)); |
| ASSERT_OK(listening_socket.BindAndListen(loopback, 1)); |
| listening_socket.GetSocketAddress(&listening); |
| ASSERT_TRUE(start_client(listening).IsTimedOut()); |
| |
| // Bound, but not listening socket. |
| Sockaddr bound; |
| Socket bound_socket; |
| ASSERT_OK(bound_socket.Init(loopback.family(), 0)); |
| ASSERT_OK(bound_socket.Bind(loopback)); |
| bound_socket.GetSocketAddress(&bound); |
| ASSERT_TRUE(start_client(bound).IsNetworkError()); |
| |
| // Unbound socket. |
| Sockaddr unbound; |
| Socket unbound_socket; |
| ASSERT_OK(unbound_socket.Init(loopback.family(), 0)); |
| ASSERT_OK(unbound_socket.Bind(loopback)); |
| unbound_socket.GetSocketAddress(&unbound); |
| ASSERT_OK(unbound_socket.Close()); |
| ASSERT_TRUE(start_client(unbound).IsNetworkError()); |
| } |
| |
| TEST_F(HmsClientTest, TestDeserializeJsonTable) { |
| string json = R"#({"1":{"str":"table_name"},"2":{"str":"database_name"}})#"; |
| hive::Table table; |
| ASSERT_OK(HmsClient::DeserializeJsonTable(json, &table)); |
| ASSERT_EQ("table_name", table.tableName); |
| ASSERT_EQ("database_name", table.dbName); |
| } |
| |
| TEST_F(HmsClientTest, TestCaseSensitivity) { |
| MiniKdc kdc; |
| MiniHms hms; |
| |
| // Set the `KUDU_HMS_SYNC_ENABLED` environment variable in the |
| // HMS environment to manually enable HMS synchronization checks. |
| // This means we don't need to stand up a Kudu Cluster for this test. |
| hms.AddEnvVar("KUDU_HMS_SYNC_ENABLED", "1"); |
| |
| ASSERT_OK(hms.Start()); |
| |
| HmsClient client(hms.address(), thrift::ClientOptions()); |
| ASSERT_OK(client.Start()); |
| |
| // Create a database. |
| hive::Database db; |
| db.name = "my_db"; |
| ASSERT_OK(client.CreateDatabase(db)); |
| |
| // Create a table. |
| ASSERT_OK(CreateTable(&client, "my_db", "Foo", "abc123", "Bar")); |
| |
| hive::Table table; |
| ASSERT_OK(client.GetTable("my_db", "Foo", &table)); |
| ASSERT_EQ(table.tableName, "foo"); |
| |
| ASSERT_OK(client.GetTable("my_db", "foo", &table)); |
| ASSERT_EQ(table.tableName, "foo"); |
| |
| ASSERT_OK(client.GetTable("MY_DB", "FOO", &table)); |
| ASSERT_EQ(table.dbName, "my_db"); |
| ASSERT_EQ(table.tableName, "foo"); |
| } |
| |
| } // namespace hms |
| } // namespace kudu |