blob: 62cc0d6f1d98738749f1572ab77a415778add19c [file] [log] [blame]
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.
#include <algorithm>
#include <fstream>
#include <functional>
#include <initializer_list>
#include <map>
#include <memory>
#include <optional>
#include <string>
#include <type_traits>
#include <utility>
#include <vector>
#include <gflags/gflags_declare.h>
#include <glog/stl_logging.h>
#include <gtest/gtest.h>
#include "kudu/client/client.h"
#include "kudu/client/schema.h"
#include "kudu/client/shared_ptr.h" // IWYU pragma: keep
#include "kudu/common/common.pb.h"
#include "kudu/common/table_util.h"
#include "kudu/gutil/map-util.h"
#include "kudu/gutil/strings/substitute.h"
#include "kudu/hms/hive_metastore_types.h"
#include "kudu/hms/hms_catalog.h"
#include "kudu/hms/hms_client.h"
#include "kudu/hms/mini_hms.h"
#include "kudu/integration-tests/external_mini_cluster-itest-base.h"
#include "kudu/integration-tests/hms_itest-base.h"
#include "kudu/mini-cluster/external_mini_cluster.h"
#include "kudu/mini-cluster/mini_cluster.h"
#include "kudu/security/test/mini_kdc.h"
#include "kudu/thrift/client.h"
#include "kudu/transactions/txn_system_client.h"
#include "kudu/util/monotime.h"
#include "kudu/util/status.h"
#include "kudu/util/test_macros.h"
#include "kudu/util/test_util.h"
DECLARE_string(hive_metastore_uris);
using kudu::client::KuduTable;
using kudu::client::KuduTableAlterer;
using kudu::client::sp::shared_ptr;
using kudu::cluster::ExternalMiniClusterOptions;
using kudu::hms::HmsClient;
using kudu::transactions::TxnSystemClient;
using std::nullopt;
using std::optional;
using std::string;
using std::unique_ptr;
using std::vector;
using strings::Substitute;
namespace kudu {
// Test Master <-> HMS catalog synchronization.
class MasterHmsTest : public ExternalMiniClusterITestBase {
public:
void SetUp() override {
ExternalMiniClusterITestBase::SetUp();
ExternalMiniClusterOptions opts;
opts.hms_mode = GetHmsMode();
opts.num_masters = 1;
opts.num_tablet_servers = 1;
opts.enable_kerberos = EnableKerberos();
opts.principal = Principal();
// Tune down the notification log poll period in order to speed up catalog convergence.
opts.extra_master_flags.emplace_back("--hive_metastore_notification_log_poll_period_seconds=1");
SetFlags(&opts);
StartClusterWithOpts(std::move(opts));
thrift::ClientOptions hms_opts;
hms_opts.enable_kerberos = EnableKerberos();
hms_opts.service_principal = "hive";
ASSERT_OK(harness_.RestartHmsClient(cluster_, hms_opts));
}
void TearDown() override {
if (harness_.hms_client()) {
ASSERT_OK(harness_.hms_client()->Stop());
}
ExternalMiniClusterITestBase::TearDown();
}
Status StartHms() {
return harness_.StartHms(cluster_);
}
Status StopHms() {
return harness_.StopHms(cluster_);
}
Status CreateDatabase(const string& database_name) {
return harness_.CreateDatabase(database_name);
}
Status CreateKuduTable(const string& database_name,
const string& table_name,
MonoDelta timeout = {}) {
return HmsITestHarness::CreateKuduTable(database_name, table_name, client_, timeout);
}
Status CreateHmsTable(const string& database_name,
const string& table_name,
const string& table_type = hms::HmsClient::kManagedTable,
const optional<string>& kudu_table_name = nullopt) {
return harness_.CreateHmsTable(database_name, table_name, table_type, kudu_table_name);
}
Status RenameHmsTable(const string& database_name,
const string& old_table_name,
const string& new_table_name) {
return harness_.RenameHmsTable(database_name, old_table_name, new_table_name);
}
Status ChangeHmsOwner(const string& database_name,
const string& table_name,
const string& new_table_owner) {
return harness_.ChangeHmsOwner(database_name, table_name, new_table_owner);
}
Status ChangeHmsTableComment(const string& database_name,
const string& table_name,
const string& new_table_comment) {
return harness_.ChangeHmsTableComment(database_name, table_name, new_table_comment);
}
Status AlterHmsTableDropColumns(const string& database_name,
const string& table_name) {
return harness_.AlterHmsTableDropColumns(database_name, table_name);
}
Status AlterHmsTableExternalPurge(const string& database_name,
const string& table_name) {
return harness_.AlterHmsTableExternalPurge(database_name, table_name);
}
void CheckTable(const string& database_name,
const string& table_name,
const optional<string>& user,
const string& table_type = hms::HmsClient::kManagedTable) {
harness_.CheckTable(database_name, table_name, user, cluster_, client_, table_type);
}
void CheckTableDoesNotExist(const string& database_name,
const string& table_name) {
harness_.CheckTableDoesNotExist(database_name, table_name, client_);
}
protected:
HmsITestHarness harness_;
private:
virtual HmsMode GetHmsMode() {
return HmsMode::ENABLE_METASTORE_INTEGRATION;
}
virtual bool EnableKerberos() {
return false;
}
virtual string Principal() {
return "kudu";
}
virtual void SetFlags(ExternalMiniClusterOptions* opts) const {}
};
TEST_F(MasterHmsTest, TestCreateTable) {
const char* hms_database_name = "create_db";
const char* hms_table_name = "table";
string table_name = Substitute("$0.$1", hms_database_name, hms_table_name);
// Attempt to create the table before the database is created.
Status s = CreateKuduTable(hms_database_name, hms_table_name);
ASSERT_TRUE(s.IsInvalidArgument()) << s.ToString();
ASSERT_OK(CreateDatabase(hms_database_name));
// Create a table entry with the name.
ASSERT_OK(CreateHmsTable(hms_database_name, hms_table_name));
// Attempt to create a Kudu table with the same name.
s = CreateKuduTable(hms_database_name, hms_table_name);
ASSERT_TRUE(s.IsAlreadyPresent()) << s.ToString();
ASSERT_STR_CONTAINS(s.ToString(), "table already exists");
// Attempt to create a Kudu table to an invalid table name.
s = CreateKuduTable(hms_database_name, "☃");
ASSERT_TRUE(s.IsInvalidArgument()) << s.ToString();
ASSERT_STR_CONTAINS(s.ToString(), "create_db.☃");
// Drop the HMS entry and create the table through Kudu.
ASSERT_OK(harness_.hms_client()->DropTable(hms_database_name, hms_table_name));
ASSERT_OK(CreateKuduTable(hms_database_name, hms_table_name));
NO_FATALS(CheckTable(hms_database_name, hms_table_name, /*user=*/nullopt));
// Create an external table in the HMS and validate it's not created Kudu side.
const char* external_table_name = "externalTable";
ASSERT_OK(CreateHmsTable(hms_database_name, external_table_name, HmsClient::kExternalTable));
shared_ptr<KuduTable> table;
s = client_->OpenTable(Substitute("$0.$1", hms_database_name, external_table_name), &table);
ASSERT_TRUE(s.IsNotFound()) << s.ToString();
// Shutdown the HMS and try to create a table.
ASSERT_OK(StopHms());
s = CreateKuduTable(hms_database_name, "foo");
ASSERT_TRUE(s.IsNetworkError()) << s.ToString();
// Start the HMS and try again.
ASSERT_OK(StartHms());
ASSERT_EVENTUALLY([&] {
// HmsCatalog throttles reconnections, so it's necessary to wait out the backoff.
ASSERT_OK(CreateKuduTable(hms_database_name, "foo"));
});
NO_FATALS(CheckTable(hms_database_name, "foo", /*user=*/nullopt));
}
TEST_F(MasterHmsTest, TestRenameTable) {
// Create the database and Kudu table.
ASSERT_OK(CreateDatabase("db"));
ASSERT_OK(CreateKuduTable("db", "a"));
NO_FATALS(CheckTable("db", "a", /*user=*/nullopt));
// Create a non-Kudu ('external') HMS table entry.
ASSERT_OK(CreateHmsTable("db", "b", HmsClient::kExternalTable));
// Attempt to rename the Kudu table to the same name.
unique_ptr<KuduTableAlterer> table_alterer(client_->NewTableAlterer("db.a"));
Status s = table_alterer->RenameTo("db.a")->Alter();
ASSERT_TRUE(s.IsAlreadyPresent()) << s.ToString();
ASSERT_STR_CONTAINS(s.ToString(), "a already exists");
// Attempt to rename the Kudu table to the external table name.
table_alterer.reset(client_->NewTableAlterer("db.a"));
s = table_alterer->RenameTo("db.b")->Alter();
ASSERT_TRUE(s.IsIllegalState()) << s.ToString();
ASSERT_STR_CONTAINS(s.ToString(), "b already exists");
// Attempt to rename the Kudu table to an invalid database/table name pair.
table_alterer.reset(client_->NewTableAlterer("db.a"));
s = table_alterer->RenameTo("foo")->Alter();
ASSERT_TRUE(s.IsInvalidArgument()) << s.ToString();
ASSERT_STR_CONTAINS(s.ToString(), kInvalidHiveTableError);
// Attempt to rename the Kudu table to a non-existent database.
table_alterer.reset(client_->NewTableAlterer("db.a"));
s = table_alterer->RenameTo("non_existent_database.table")->Alter();
ASSERT_TRUE(s.IsIllegalState()) << s.ToString();
// TODO(HIVE-18852): match on the error message.
// Attempt to rename the Kudu table to an invalid table name.
table_alterer.reset(client_->NewTableAlterer("db.a"));
s = table_alterer->RenameTo("db.☃")->Alter();
ASSERT_TRUE(s.IsInvalidArgument()) << s.ToString();
ASSERT_STR_CONTAINS(s.ToString(), "db.☃");
// Shutdown the HMS and try to rename the table.
ASSERT_OK(StopHms());
table_alterer.reset(client_->NewTableAlterer("db.a")->RenameTo("db.c"));
s = table_alterer->Alter();
ASSERT_TRUE(s.IsNetworkError()) << s.ToString();
// Start the HMS and rename the table through Kudu.
ASSERT_OK(StartHms());
ASSERT_EVENTUALLY([&] {
// HmsCatalog throttles reconnections, so it's necessary to wait out the backoff.
ASSERT_OK(table_alterer->Alter());
});
NO_FATALS(CheckTable("db", "c", /*user=*/nullopt));
NO_FATALS(CheckTableDoesNotExist("db", "a"));
// Rename the table through the HMS, and ensure the rename is handled in Kudu.
ASSERT_OK(RenameHmsTable("db", "c", "d"));
ASSERT_EVENTUALLY([&] {
NO_FATALS(CheckTable("db", "d", /*user=*/nullopt));
});
// Check that the two tables still exist.
vector<string> tables;
ASSERT_OK(harness_.hms_client()->GetTableNames("db", &tables));
std::sort(tables.begin(), tables.end());
ASSERT_EQ(tables, vector<string>({ "b", "d" })) << tables;
// Rename the external table through the HMS and ensure Kudu allows it.
ASSERT_OK(RenameHmsTable("db", "b", "e"));
// Regression test for HIVE-19569
// If HIVE-19569 is in effect the rename across databases will result in DROP
// TABLE and CREATE TABLE events, which will cause the notification log
// listener to drop the table. The alter will succeed (see KUDU-2475), but the
// subsequent checks will fail, since the table will be deleted.
ASSERT_OK(CreateDatabase("db1"));
ASSERT_OK(CreateDatabase("db2"));
ASSERT_OK(CreateKuduTable("db1", "t1"));
NO_FATALS(CheckTable("db1", "t1", /*user=*/ nullopt));
table_alterer.reset(client_->NewTableAlterer("db1.t1"));
ASSERT_OK(table_alterer->RenameTo("db2.t2")->Alter());
NO_FATALS(CheckTable("db2", "t2", /*user=*/ nullopt));
NO_FATALS(CheckTableDoesNotExist("db1", "t1"));
NO_FATALS(CheckTableDoesNotExist("db1", "t2"));
}
TEST_F(MasterHmsTest, TestAlterTableOwner) {
// Create the Kudu table.
ASSERT_OK(CreateKuduTable("default", "userTable"));
NO_FATALS(CheckTable("default", "userTable", /*user=*/ nullopt));
// Change the owner through the HMS, and ensure the owner is handled in Kudu.
const string user_a = "user_a";
ASSERT_OK(ChangeHmsOwner("default", "userTable", user_a));
ASSERT_EVENTUALLY([&] {
NO_FATALS(CheckTable("default", "userTable", user_a));
});
// Change the owner through Kudu, and ensure the owner is reflected in HMS.
const string user_b = "user_b";
unique_ptr<KuduTableAlterer> table_alterer(client_->NewTableAlterer("default.userTable"));
ASSERT_OK(table_alterer->SetOwner(user_b)->Alter());
ASSERT_EVENTUALLY([&] {
NO_FATALS(CheckTable("default", "userTable", user_b));
});
}
TEST_F(MasterHmsTest, TestAlterTableComment) {
// Create the Kudu table.
ASSERT_OK(CreateKuduTable("default", "commentTable"));
NO_FATALS(CheckTable("default", "commentTable", /*user=*/ nullopt));
// Change the table comment through the HMS, and ensure the comment is handled in Kudu.
const string comment_a = "comment a";
ASSERT_OK(ChangeHmsTableComment("default", "commentTable", comment_a));
ASSERT_EVENTUALLY([&] {
NO_FATALS(CheckTable("default", "commentTable", /*user=*/nullopt));
});
// Change the table comment through Kudu, and ensure the comment is reflected in HMS.
const string comment_b = "comment b";
unique_ptr<KuduTableAlterer> table_alterer(client_->NewTableAlterer("default.commentTable"));
ASSERT_OK(table_alterer->SetComment(comment_b)->Alter());
ASSERT_EVENTUALLY([&] {
NO_FATALS(CheckTable("default", "commentTable", /*user=*/ nullopt));
});
}
TEST_F(MasterHmsTest, TestAlterTable) {
// Create the Kudu table.
ASSERT_OK(CreateKuduTable("default", "a"));
NO_FATALS(CheckTable("default", "a", /*user=*/ nullopt));
// Alter the HMS table entry in a destructive way (remove all columns).
ASSERT_OK(AlterHmsTableDropColumns("default", "a"));
// Drop a column in Kudu. This should correct the entire set of columns in the HMS.
unique_ptr<KuduTableAlterer> table_alterer(client_->NewTableAlterer("default.a"));
ASSERT_OK(table_alterer->DropColumn("int8_val")->Alter());
NO_FATALS(CheckTable("default", "a", /*user=*/ nullopt));
// Alter a column comment in Kudu. This should be reflected in the HMS.
unique_ptr<KuduTableAlterer> comment_alterer(client_->NewTableAlterer("default.a"));
comment_alterer->AlterColumn("key")->Comment("");
comment_alterer->AlterColumn("int16_val")->Comment("A new comment");
ASSERT_OK(comment_alterer->Alter());
NO_FATALS(CheckTable("default", "a", /*user=*/ nullopt));
// Create the Kudu table and alter the HMS entry to be an external table
// with `external.table.purge = true`, then alter it in the HMS and ensure
// the Kudu table is actually altered.
ASSERT_OK(CreateKuduTable("default", "b"));
ASSERT_OK(AlterHmsTableExternalPurge("default", "b"));
NO_FATALS(CheckTable("default", "b", /*user=*/ nullopt, hms::HmsClient::kExternalTable));
unique_ptr<KuduTableAlterer> comment_alterer_b(client_->NewTableAlterer("default.b"));
comment_alterer_b->AlterColumn("int16_val")->Comment("Another comment");
ASSERT_OK(comment_alterer_b->Alter());
NO_FATALS(CheckTable("default", "b", /*user=*/ nullopt, hms::HmsClient::kExternalTable));
// Shutdown the HMS and try to alter the table.
ASSERT_OK(StopHms());
table_alterer.reset(client_->NewTableAlterer("default.a")->DropColumn("int16_val"));
Status s = table_alterer->Alter();
ASSERT_TRUE(s.IsNetworkError()) << s.ToString();
// Start the HMS and try again.
ASSERT_OK(StartHms());
ASSERT_EVENTUALLY([&] {
// HmsCatalog throttles reconnections, so it's necessary to wait out the backoff.
ASSERT_OK(table_alterer->Alter());
});
NO_FATALS(CheckTable("default", "a", /*user=*/ nullopt));
// Only alter the table in Kudu, the corresponding table in the HMS will not be altered.
table_alterer.reset(client_->NewTableAlterer("default.a")->RenameTo("default.c")
->modify_external_catalogs(false));
ASSERT_OK(table_alterer->Alter());
bool exists;
ASSERT_OK(client_->TableExists("default.c", &exists));
ASSERT_TRUE(exists);
hive::Table hms_table;
ASSERT_OK(harness_.hms_client()->GetTable("default", "a", &hms_table));
}
TEST_F(MasterHmsTest, TestDeleteTable) {
// Create a Kudu table, then drop it from Kudu and ensure the HMS entry is removed.
ASSERT_OK(CreateKuduTable("default", "a"));
NO_FATALS(CheckTable("default", "a", /*user=*/ nullopt));
hive::Table hms_table;
ASSERT_OK(harness_.hms_client()->GetTable("default", "a", &hms_table));
ASSERT_OK(client_->DeleteTable("default.a"));
NO_FATALS(CheckTableDoesNotExist("default", "a"));
// Create the Kudu table, then drop it from the HMS, and ensure the Kudu table is deleted.
ASSERT_OK(CreateKuduTable("default", "b"));
NO_FATALS(CheckTable("default", "b", /*user=*/ nullopt));
hive::Table hms_table_b;
ASSERT_OK(harness_.hms_client()->GetTable("default", "b", &hms_table_b));
shared_ptr<KuduTable> table;
ASSERT_OK(client_->OpenTable("default.b", &table));
ASSERT_OK(harness_.hms_client()->DropTable("default", "b"));
ASSERT_EVENTUALLY([&] {
NO_FATALS(CheckTableDoesNotExist("default", "b"));
});
// Create the Kudu table and alter the HMS entry to be an external table
// with `external.table.purge = true`, then drop it from the HMS, and ensure
// the Kudu table is deleted.
ASSERT_OK(CreateKuduTable("default", "b"));
ASSERT_OK(AlterHmsTableExternalPurge("default", "b"));
NO_FATALS(CheckTable("default", "b", /*user=*/ nullopt, hms::HmsClient::kExternalTable));
shared_ptr<KuduTable> table2;
ASSERT_OK(client_->OpenTable("default.b", &table2));
ASSERT_OK(harness_.hms_client()->DropTable("default", "b"));
ASSERT_EVENTUALLY([&] {
NO_FATALS(CheckTableDoesNotExist("default", "b"));
});
// Ensure that dropping a table while the HMS is unreachable fails.
ASSERT_OK(CreateKuduTable("default", "c"));
NO_FATALS(CheckTable("default", "c", /*user=*/ nullopt));
ASSERT_OK(StopHms());
Status s = client_->DeleteTable("default.c");
ASSERT_TRUE(s.IsNetworkError()) << s.ToString();
ASSERT_OK(StartHms());
NO_FATALS(CheckTable("default", "c", /*user=*/ nullopt));
ASSERT_EVENTUALLY([&] {
// HmsCatalog throttles reconnections, so it's necessary to wait out the backoff.
ASSERT_OK(client_->DeleteTable("default.c"));
});
NO_FATALS(CheckTableDoesNotExist("default", "c"));
// Create a Kudu table, then only drop it from Kudu. Ensure the HMS
// entry is not removed.
ASSERT_OK(CreateKuduTable("default", "d"));
NO_FATALS(CheckTable("default", "d", /*user=*/ nullopt));
hive::Table hms_table_d;
ASSERT_OK(harness_.hms_client()->GetTable("default", "d", &hms_table_d));
ASSERT_OK(client_->DeleteTableInCatalogs("default.d", false));
s = client_->OpenTable(Substitute("$0.$1", "default", "d"), &table);
ASSERT_TRUE(s.IsNotFound()) << s.ToString();
ASSERT_OK(harness_.hms_client()->GetTable("default", "d", &hms_table_d));
// Create and drop a non-Kudu ('external') HMS table entry and ensure Kudu allows it.
ASSERT_OK(CreateHmsTable("default", "externalTable", HmsClient::kExternalTable));
ASSERT_OK(harness_.hms_client()->DropTable("default", "externalTable"));
}
// Test to verify that the soft-deletion of tables is not supported when
// HMS is enabled.
TEST_F(MasterHmsTest, TableSoftDeleteNotSupportedWithHmsEnabled) {
// TODO(kedeng) : change the test case when state sync to HMS
// Create a Kudu table, then soft delete it from Kudu.
ASSERT_OK(CreateKuduTable("default", "a"));
NO_FATALS(CheckTable("default", "a", /*user=*/ nullopt));
hive::Table hms_table;
// Set --hive_metastore_uris to make HmsCatalog::IsEnabled() returns 'true'.
const auto& hms_uris = cluster_->hms()->uris();
ASSERT_TRUE(!hms_uris.empty());
FLAGS_hive_metastore_uris = hms_uris;
ASSERT_TRUE(hms::HmsCatalog::IsEnabled());
Status s = client_->SoftDeleteTable("default.a", 6000);
ASSERT_TRUE(s.IsNotSupported()) << s.ToString();
// The RecallTable is not supported, so the table id is ineffective.
s = client_->RecallTable("default.a");
ASSERT_TRUE(s.IsNotSupported()) << s.ToString();
ASSERT_STR_CONTAINS(s.ToString(), "RecallDeletedTable is not supported");
ASSERT_OK(harness_.hms_client()->GetTable("default", "a", &hms_table));
// The table is remain in the Kudu cluster.
shared_ptr<KuduTable> table;
ASSERT_OK(client_->OpenTable("default.a", &table));
}
// This test makes sure that with the HMS integration enabled, both alter and
// drop/delete for a table work as expected after the soft-delete feature for
// tables has been introduced. There might be other scenarios elsewhere testing
// for that implicitly, but this scenario is run explicitly to check for the
// backward compatibility in that context.
TEST_F(MasterHmsTest, AlterAndDeleteTableWhenHmsEnabled) {
// Create the database and Kudu table.
ASSERT_OK(CreateDatabase("db"));
ASSERT_OK(CreateKuduTable("db", "a"));
NO_FATALS(CheckTable("db", "a", /*user=*/nullopt));
const auto& hms_uris = cluster_->hms()->uris();
ASSERT_TRUE(!hms_uris.empty());
// Set --hive_metastore_uris to make HmsCatalog::IsEnabled() returns 'true'.
FLAGS_hive_metastore_uris = hms_uris;
ASSERT_TRUE(hms::HmsCatalog::IsEnabled());
unique_ptr<KuduTableAlterer> table_alterer(client_->NewTableAlterer("db.a"));
ASSERT_OK(table_alterer->RenameTo("db.b")->Alter());
ASSERT_OK(client_->DeleteTable("db.b"));
NO_FATALS(CheckTableDoesNotExist("db", "b"));
}
TEST_F(MasterHmsTest, TestNotificationLogListener) {
// Create a Kudu table.
ASSERT_OK(CreateKuduTable("default", "a"));
NO_FATALS(CheckTable("default", "a", /*user=*/ nullopt));
// Rename the table in the HMS, and ensure that the notification log listener
// detects the rename and updates the Kudu catalog accordingly.
ASSERT_OK(RenameHmsTable("default", "a", "b"));
ASSERT_EVENTUALLY([&] {
NO_FATALS({
CheckTable("default", "b", /*user=*/ nullopt);
CheckTableDoesNotExist("default", "a");
});
});
// Drop the table in the HMS, and ensure that the notification log listener
// detects the drop and updates the Kudu catalog accordingly.
ASSERT_OK(harness_.hms_client()->DropTable("default", "b"));
ASSERT_EVENTUALLY([&] {
NO_FATALS(CheckTableDoesNotExist("default", "b"));
});
// Rename a table from A to B to A, and ensure that Kudu doesn't continue
// applying notification log events in a self-perpetuating loop.
ASSERT_OK(CreateKuduTable("default", "a"));
unique_ptr<KuduTableAlterer> table_alterer;
table_alterer.reset(client_->NewTableAlterer("default.a")->RenameTo("default.b"));
ASSERT_OK(table_alterer->Alter());
NO_FATALS(CheckTable("default", "b", /*user=*/ nullopt));
table_alterer.reset(client_->NewTableAlterer("default.b")->RenameTo("default.a"));
ASSERT_OK(table_alterer->Alter());
NO_FATALS(CheckTable("default", "a", /*user=*/ nullopt));
// Ensure that Kudu can rename a table just after it's been renamed through the HMS.
ASSERT_OK(RenameHmsTable("default", "a", "b"));
table_alterer.reset(client_->NewTableAlterer("default.b")->RenameTo("default.c"));
ASSERT_OK(table_alterer->Alter());
// Ensure that Kudu can drop a table just after it's been renamed through the HMS.
ASSERT_OK(RenameHmsTable("default", "c", "a"));
ASSERT_OK(client_->DeleteTable("default.a"));
// Test concurrent drops from the HMS and Kudu.
// Scenario 1: drop from the HMS first.
ASSERT_OK(CreateKuduTable("default", "a"));
ASSERT_OK(harness_.hms_client()->DropTable("default", "a"));
Status s = client_->DeleteTable("default.a");
ASSERT_TRUE(s.IsNotFound()) << s.ToString();
NO_FATALS(CheckTableDoesNotExist("default", "a"));
// Scenario 2: drop from Kudu first.
ASSERT_OK(CreateKuduTable("default", "a"));
ASSERT_OK(client_->DeleteTable("default.a"));
s = harness_.hms_client()->DropTable("default", "a");
ASSERT_TRUE(s.IsNotFound()) << s.ToString();
NO_FATALS(CheckTableDoesNotExist("default", "a"));
// Create the Kudu table and alter the HMS entry to be an external table
// with `external.table.purge = true`.
ASSERT_OK(CreateKuduTable("default", "d"));
ASSERT_OK(AlterHmsTableExternalPurge("default", "d"));
NO_FATALS(CheckTable("default", "d", /*user=*/ nullopt, hms::HmsClient::kExternalTable));
// Rename the table in the HMS, and ensure that the notification log listener
// detects the rename and updates the Kudu catalog accordingly.
ASSERT_OK(RenameHmsTable("default", "d", "e"));
ASSERT_EVENTUALLY([&] {
NO_FATALS({
CheckTable("default", "e", /*user=*/ nullopt, hms::HmsClient::kExternalTable);
CheckTableDoesNotExist("default", "d");
});
});
// Drop the table in the HMS, and ensure that the notification log listener
// detects the drop and updates the Kudu catalog accordingly.
ASSERT_OK(harness_.hms_client()->DropTable("default", "e"));
ASSERT_EVENTUALLY([&] {
NO_FATALS(CheckTableDoesNotExist("default", "e"));
});
}
TEST_F(MasterHmsTest, TestIgnoreExternalTables) {
// Set up a managed table, and a couple of external tables to point at it.
ASSERT_OK(CreateKuduTable("default", "managed"));
const string kManagedTableName = "default.managed";
ASSERT_OK(CreateHmsTable("default", "ext1", HmsClient::kExternalTable, kManagedTableName));
ASSERT_OK(CreateHmsTable("default", "ext2", HmsClient::kExternalTable, kManagedTableName));
// Drop a table in the HMS and check that it didn't affect the underlying
// Kudu table.
ASSERT_OK(harness_.hms_client()->DropTable("default", "ext1"));
shared_ptr<KuduTable> table;
ASSERT_OK(client_->OpenTable(kManagedTableName, &table));
// Do the same, but rename the HMS table.
hive::Table ext;
ASSERT_OK(harness_.hms_client()->GetTable("default", "ext2", &ext));
ext.tableName = "ext3";
ASSERT_OK(harness_.hms_client()->AlterTable("default", "ext2", ext));
ASSERT_OK(client_->OpenTable(kManagedTableName, &table));
Status s = harness_.hms_client()->GetTable("default", "ext2", &ext);
ASSERT_TRUE(s.IsNotFound()) << s.ToString();
// Alter the table in Kudu, the external tables in the HMS will not be
// altered.
unique_ptr<KuduTableAlterer> table_alterer(
client_->NewTableAlterer(kManagedTableName)->RenameTo("default.other"));
ASSERT_OK(table_alterer->Alter());
ASSERT_OK(harness_.hms_client()->GetTable("default", "ext3", &ext));
ASSERT_EQ(kManagedTableName, ext.parameters[HmsClient::kKuduTableNameKey]);
}
TEST_F(MasterHmsTest, TestIgnoreOtherClusterIds) {
// Set up a few tables.
ASSERT_OK(CreateKuduTable("default", "same"));
ASSERT_OK(CreateKuduTable("default", "other"));
ASSERT_OK(CreateKuduTable("default", "noid"));
shared_ptr<KuduTable> kudu_table;
hive::Table hive_table;
// Alter the cluster id for the `other` table so that it no longer matches the cluster.
// This event and any future events should be ignored.
ASSERT_OK(harness_.hms_client()->GetTable("default", "other", &hive_table));
hive_table.parameters[HmsClient::kKuduClusterIdKey] = "other_cluster_id";
ASSERT_OK(harness_.hms_client()->AlterTable("default", "other", hive_table));
// Remove the cluster id for the `noid` table to represent a table created before
// cluster id support was added.
ASSERT_OK(harness_.hms_client()->GetTable("default", "noid", &hive_table));
EraseKeyReturnValuePtr(&hive_table.parameters, HmsClient::kKuduClusterIdKey);
ASSERT_OK(harness_.hms_client()->AlterTable("default", "noid", hive_table));
// Alter the tables in the HMS, the `same` and `noid` tables should be updated in Kudu.
ASSERT_OK(harness_.hms_client()->GetTable("default", "same", &hive_table));
hive_table.tableName = "same1";
ASSERT_OK(harness_.hms_client()->AlterTable("default", "same", hive_table));
ASSERT_OK(harness_.hms_client()->GetTable("default", "other", &hive_table));
hive_table.tableName = "other1";
ASSERT_OK(harness_.hms_client()->AlterTable("default", "other", hive_table));
ASSERT_OK(harness_.hms_client()->GetTable("default", "noid", &hive_table));
hive_table.tableName = "noid1";
ASSERT_OK(harness_.hms_client()->AlterTable("default", "noid", hive_table));
ASSERT_EVENTUALLY([&] {
ASSERT_OK(client_->OpenTable("default.same1", &kudu_table));
ASSERT_OK(client_->OpenTable("default.other", &kudu_table));
ASSERT_OK(client_->OpenTable("default.noid1", &kudu_table));
Status s = client_->OpenTable("default.other1", &kudu_table);
ASSERT_TRUE(s.IsNotFound()) << s.ToString();
});
// Alter the tables back.
ASSERT_OK(harness_.hms_client()->GetTable("default", "same1", &hive_table));
hive_table.tableName = "same";
ASSERT_OK(harness_.hms_client()->AlterTable("default", "same1", hive_table));
ASSERT_OK(harness_.hms_client()->GetTable("default", "other1", &hive_table));
hive_table.tableName = "other";
ASSERT_OK(harness_.hms_client()->AlterTable("default", "other1", hive_table));
ASSERT_OK(harness_.hms_client()->GetTable("default", "noid1", &hive_table));
hive_table.tableName = "noid";
ASSERT_OK(harness_.hms_client()->AlterTable("default", "noid1", hive_table));
// Drop the tables in the HMS, the `same` and `noid` tables should be dropped in Kudu.
ASSERT_OK(harness_.hms_client()->DropTable("default", "same"));
ASSERT_OK(harness_.hms_client()->DropTable("default", "other"));
ASSERT_OK(harness_.hms_client()->DropTable("default", "noid"));
ASSERT_EVENTUALLY([&] {
ASSERT_OK(client_->OpenTable("default.other", &kudu_table));
Status s = client_->OpenTable("default.same", &kudu_table);
ASSERT_TRUE(s.IsNotFound()) << s.ToString();
s = client_->OpenTable("default.noid", &kudu_table);
ASSERT_TRUE(s.IsNotFound()) << s.ToString();
});
}
TEST_F(MasterHmsTest, TestUppercaseIdentifiers) {
ASSERT_OK(CreateKuduTable("default", "MyTable"));
NO_FATALS(CheckTable("default", "MyTable", /*user=*/nullopt));
NO_FATALS(CheckTable("default", "mytable", /*user=*/nullopt));
NO_FATALS(CheckTable("default", "MYTABLE", /*user=*/nullopt));
// Kudu table schema lookups should be case-insensitive.
for (const auto& name : { "default.MyTable",
"default.mytable",
"DEFAULT.MYTABLE",
"default.mYtABLE" }) {
shared_ptr<KuduTable> table;
ASSERT_OK(client_->OpenTable(name, &table));
// The client uses the requested name as the table object's name.
ASSERT_EQ(name, table->name());
}
// Listing tables shows the normalized case.
vector<string> tables;
ASSERT_OK(client_->ListTables(&tables));
ASSERT_EQ(tables, vector<string>({ "default.mytable" }));
// Rename the table to the same normalized name, but with a different case.
unique_ptr<KuduTableAlterer> table_alterer;
table_alterer.reset(client_->NewTableAlterer("default.mytable"));
Status s = table_alterer->RenameTo("DEFAULT.MYTABLE")->Alter();
ASSERT_TRUE(s.IsAlreadyPresent()) << s.ToString();
// Rename the table to something different.
table_alterer.reset(client_->NewTableAlterer("DEFAULT.MYTABLE"));
ASSERT_OK(table_alterer->RenameTo("default.T_1/1")->Alter());
NO_FATALS(CheckTable("default", "T_1/1", /*user=*/nullopt));
NO_FATALS(CheckTable("default", "t_1/1", /*user=*/nullopt));
NO_FATALS(CheckTable("DEFAULT", "T_1/1", /*user=*/nullopt));
// Rename the table through the HMS.
ASSERT_OK(RenameHmsTable("default", "T_1/1", "AbC"));
ASSERT_EVENTUALLY([&] {
NO_FATALS(CheckTable("default", "AbC", /*user=*/nullopt));
});
// Listing tables shows the normalized case.
tables.clear();
ASSERT_OK(client_->ListTables(&tables));
ASSERT_EQ(tables, vector<string>({ "default.abc" }));
// Drop the table.
ASSERT_OK(client_->DeleteTable("DEFAULT.abc"));
NO_FATALS(CheckTableDoesNotExist("default", "AbC"));
NO_FATALS(CheckTableDoesNotExist("default", "abc"));
}
// Test that we can create a transaction status table that doesn't get
// synchronized to the HMS.
TEST_F(MasterHmsTest, TestTransactionStatusTableDoesntSync) {
// Create a transaction status table.
unique_ptr<TxnSystemClient> txn_sys_client;
ASSERT_OK(TxnSystemClient::Create(cluster_->master_rpc_addrs(),
cluster_->service_principal(),
&txn_sys_client));
ASSERT_OK(txn_sys_client->CreateTxnStatusTable(100));
// We shouldn't see the table in the HMS catalog.
vector<string> tables;
ASSERT_OK(harness_.hms_client()->GetTableNames("kudu_system", &tables));
ASSERT_TRUE(tables.empty());
// We should still be able to add partitions.
ASSERT_OK(txn_sys_client->AddTxnStatusTableRange(100, 200));
}
class MasterHmsUpgradeTest : public MasterHmsTest {
public:
HmsMode GetHmsMode() override {
return HmsMode::ENABLE_HIVE_METASTORE;
}
};
TEST_F(MasterHmsUpgradeTest, TestConflictingNormalizedNames) {
ASSERT_OK(CreateKuduTable("default", "MyTable"));
ASSERT_OK(CreateKuduTable("default", "mytable"));
// Shutdown the masters and turn on the HMS integration. The masters should
// fail to startup because of conflicting normalized table names.
cluster_->ShutdownNodes(cluster::ClusterNodes::MASTERS_ONLY);
cluster_->EnableMetastoreIntegration();
// Typically, restarting the cluster will fail because the master will
// immediately try to elect itself leader, and CHECK fail upon seeing the
// conflicting table names. However, in TSAN or otherwise slow situations the
// master may be able to register the tablet server before attempting to elect
// itself leader, in which case ExternalMiniCluster::Restart() can succeed. In
// this situation a fallback to a leader-only API will deterministically fail.
Status s = cluster_->Restart();
if (!s.ok()) {
ASSERT_TRUE(s.IsTimedOut() || s.IsNetworkError()) << s.ToString();
} else {
vector<string> tables;
s = client_->ListTables(&tables);
ASSERT_TRUE(s.IsTimedOut()) << s.ToString();
}
// Disable the metastore integration and rename one of the conflicting tables.
cluster_->ShutdownNodes(cluster::ClusterNodes::MASTERS_ONLY);
cluster_->DisableMetastoreIntegration();
ASSERT_OK(cluster_->Restart());
unique_ptr<KuduTableAlterer> table_alterer(client_->NewTableAlterer("default.mytable"));
ASSERT_OK(table_alterer->RenameTo("default.mytable-renamed")->Alter());
// Try again to enable the integration.
cluster_->ShutdownNodes(cluster::ClusterNodes::MASTERS_ONLY);
cluster_->EnableMetastoreIntegration();
ASSERT_OK(cluster_->Restart());
vector<string> tables;
client_->ListTables(&tables);
std::sort(tables.begin(), tables.end());
ASSERT_EQ(tables, vector<string>({ "default.MyTable", "default.mytable-renamed" }));
}
// Checks that existing tables with HMS-incompatible names can be renamed post
// upgrade using a Kudu-catalog only alter.
TEST_F(MasterHmsUpgradeTest, TestRenameExistingTables) {
ASSERT_OK(CreateKuduTable("default", "UPPERCASE"));
ASSERT_OK(CreateKuduTable("default", "illegal-chars⁉"));
// Shutdown the masters and turn on the HMS integration
cluster_->ShutdownNodes(cluster::ClusterNodes::MASTERS_ONLY);
cluster_->EnableMetastoreIntegration();
ASSERT_OK(cluster_->Restart());
vector<string> tables;
ASSERT_OK(client_->ListTables(&tables));
std::sort(tables.begin(), tables.end());
ASSERT_EQ(tables, vector<string>({ "default.UPPERCASE", "default.illegal-chars⁉" }));
// Rename the tables using a Kudu catalog only rename.
unique_ptr<KuduTableAlterer> alterer(client_->NewTableAlterer("default.UPPERCASE"));
ASSERT_OK(alterer->RenameTo("default.uppercase")->modify_external_catalogs(false)->Alter());
alterer.reset(client_->NewTableAlterer("default.illegal-chars⁉"));
ASSERT_OK(alterer->RenameTo("default.illegal_chars")->modify_external_catalogs(false)->Alter());
tables.clear();
client_->ListTables(&tables);
std::sort(tables.begin(), tables.end());
ASSERT_EQ(tables, vector<string>({ "default.illegal_chars", "default.uppercase" }));
}
class MasterHmsKerberizedTest : public MasterHmsTest {
public:
bool EnableKerberos() override {
return true;
}
string Principal() override {
return "oryx";
}
};
// Checks that table ownership in a Kerberized cluster is set to the user
// short-name (instead of the full Kerberos principal).
TEST_F(MasterHmsKerberizedTest, TestTableOwnership) {
// Log in as the test user and reset the client to pick up the change in user.
ASSERT_OK(cluster_->kdc()->Kinit("test-user"));
ASSERT_OK(cluster_->CreateClient(nullptr, &client_));
// Create a table as the user and ensure that the ownership is set correctly.
ASSERT_OK(CreateKuduTable("default", "my_table"));
hive::Table table;
ASSERT_OK(harness_.hms_client()->GetTable("default", "my_table", &table));
ASSERT_EQ("test-user", table.owner);
}
class MasterHmsDBTest : public MasterHmsTest {
public:
void SetFlags(ExternalMiniClusterOptions* opts) const override {
opts->extra_master_flags.emplace_back("--logtostderr=false");
}
};
// TODO(achennaka): Skip test until server timeouts seen in ASAN builds are resolved.
TEST_F(MasterHmsDBTest, DISABLED_TestHMSDBErase) {
// Generate 2 HMS events. With just one event created the published
// event id is not 0 after the database drop.
ASSERT_OK(CreateKuduTable("default", "a"));
unique_ptr<KuduTableAlterer> table_alterer;
table_alterer.reset(client_->NewTableAlterer("default.a")->RenameTo("default.b"));
ASSERT_OK(table_alterer->Alter());
// Ensure the table is present in HMS database.
NO_FATALS(CheckTable("default", "b", /*user=*/ nullopt));
// Simulate accidental HMS database loss.
ASSERT_OK(StopHms());
ASSERT_OK(cluster_->hms()->DeleteDatabaseDir());
ASSERT_OK(StartHms());
// Now everytime we poll HMS for new events, we should be logging an error message.
constexpr const char* const str = "The event ID 2 last seen by Kudu master is greater "
"than 0 currently reported by HMS. Has the HMS database "
"been reset (backup&restore, etc.)?";
ASSERT_EVENTUALLY([&] {
string line;
bool log_found = false;
std::ifstream in(strings::Substitute("$0/kudu-master.ERROR", cluster_->master()->log_dir()));
while (std::getline(in, line)) {
if (line.find(str) != std::string::npos) {
log_found = true;
break;
}
}
ASSERT_TRUE(log_found);
});
}
} // namespace kudu