// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.

#include <cstdint> // IWYU pragma: keep
#include <functional>
#include <memory>
#include <ostream> // IWYU pragma: keep
#include <set>
#include <string>
#include <vector>

#include <glog/logging.h>
#include <gtest/gtest.h>

#include "kudu/client/client-test-util.h" // IWYU pragma: keep
#include "kudu/client/client.h"
#include "kudu/client/schema.h"
#include "kudu/client/shared_ptr.h" // IWYU pragma: keep
#include "kudu/common/common.pb.h"
#include "kudu/gutil/port.h"
#include "kudu/gutil/strings/split.h"
#include "kudu/gutil/strings/strip.h" // IWYU pragma: keep
#include "kudu/gutil/strings/substitute.h"
#include "kudu/integration-tests/cluster_itest_util.h"
#include "kudu/master/sys_catalog.h" // IWYU pragma: keep
#include "kudu/mini-cluster/external_mini_cluster.h"
#include "kudu/util/metrics.h"
#include "kudu/util/monotime.h"
#include "kudu/util/net/net_util.h" // IWYU pragma: keep
#include "kudu/util/random.h"
#include "kudu/util/status.h"
#include "kudu/util/subprocess.h" // IWYU pragma: keep
#include "kudu/util/test_macros.h"
#include "kudu/util/test_util.h"
METRIC_DECLARE_entity(server);
METRIC_DECLARE_histogram(handler_latency_kudu_consensus_ConsensusService_GetNodeInstance);

using kudu::client::sp::shared_ptr;
using kudu::cluster::ExternalDaemon;
using kudu::cluster::ExternalMaster;
using kudu::cluster::ExternalMiniCluster;
using kudu::cluster::ExternalMiniClusterOptions;
using kudu::cluster::ScopedResumeExternalDaemon;
using kudu::itest::GetInt64Metric;
using std::set;
using std::string;
using std::unique_ptr;
using std::vector;
using strings::Split;
using strings::Substitute;

namespace kudu {

// Note: this test needs to be in the client namespace in order for
// KuduClient::Data class methods to be visible via the FRIEND_TEST macro.
namespace client {

const int kNumTabletServerReplicas = 3;

// Parameterized based on HmsMode.
class MasterFailoverTest : public KuduTest,
                           public ::testing::WithParamInterface<HmsMode> {
 public:
  enum CreateTableMode {
    kWaitForCreate = 0,
    kNoWaitForCreate = 1
  };

  MasterFailoverTest() {
    opts_.num_masters = 3;
    opts_.num_tablet_servers = kNumTabletServerReplicas;
    opts_.hms_mode = GetParam();

    // Reduce various timeouts below so as to make the detection of leader
    // master failures (specifically, failures as a result of long pauses)
    // more rapid.

    // Set the max missed heartbeat periods to 1.0 (down from 3.0).
    opts_.extra_master_flags.emplace_back("--leader_failure_max_missed_heartbeat_periods=1.0");

    // Set the TS->master heartbeat timeout to 1 second (down from 15 seconds).
    opts_.extra_tserver_flags.emplace_back("--heartbeat_rpc_timeout_ms=1000");

    // Allow one TS heartbeat failure before retrying with back-off (down from 3).
    opts_.extra_tserver_flags.emplace_back("--heartbeat_max_failures_before_backoff=1");

    // Wait for 500 ms after 'max_consecutive_failed_heartbeats' before
    // trying again (down from 1 second).
    opts_.extra_tserver_flags.emplace_back("--heartbeat_interval_ms=500");
  }

  virtual void SetUp() OVERRIDE {
    KuduTest::SetUp();
    NO_FATALS(RestartCluster());
  }

  virtual void TearDown() OVERRIDE {
    if (cluster_) {
      cluster_->Shutdown();
    }
    KuduTest::TearDown();
  }

  void RestartCluster() {
    if (cluster_) {
      cluster_->Shutdown();
      cluster_.reset();
    }
    cluster_.reset(new ExternalMiniCluster(opts_));
    ASSERT_OK(cluster_->Start());

    KuduClientBuilder builder;
    // Create and alter table operation timeouts can be extended via their
    // builders, but there's no such option for DeleteTable, so we extend
    // the global operation timeout.
    builder.default_admin_operation_timeout(MonoDelta::FromSeconds(90));
    ASSERT_OK(cluster_->CreateClient(&builder, &client_));
  }

  Status CreateTable(const std::string& table_name, CreateTableMode mode) {
    KuduSchema schema;
    KuduSchemaBuilder b;
    b.AddColumn("key")->Type(KuduColumnSchema::INT32)->NotNull()->PrimaryKey();
    b.AddColumn("int_val")->Type(KuduColumnSchema::INT32)->NotNull();
    b.AddColumn("string_val")->Type(KuduColumnSchema::STRING)->NotNull();
    CHECK_OK(b.Build(&schema));
    unique_ptr<KuduTableCreator> table_creator(client_->NewTableCreator());
    return table_creator->table_name(table_name)
        .schema(&schema)
        .set_range_partition_columns({ "key" })
        .wait(mode == kWaitForCreate)
        .Create();
  }

  Status RenameTable(const std::string& table_name_orig, const std::string& table_name_new) {
    unique_ptr<KuduTableAlterer> table_alterer(client_->NewTableAlterer(table_name_orig));
    return table_alterer
        ->RenameTo(table_name_new)
        ->wait(true)
        ->Alter();
  }

 protected:
  ExternalMiniClusterOptions opts_;
  unique_ptr<ExternalMiniCluster> cluster_;
  shared_ptr<KuduClient> client_;
};

// Run the test with the HMS integration enabled and disabled.
INSTANTIATE_TEST_CASE_P(HmsConfigurations, MasterFailoverTest, ::testing::ValuesIn(
    vector<HmsMode> { HmsMode::NONE, HmsMode::ENABLE_METASTORE_INTEGRATION }
));

// Test that a synchronous CreateTable (issue the CreateTable call and then
// wait until the table has been created) works even when the original
// leader master has been paused.
TEST_P(MasterFailoverTest, TestCreateTableSync) {
  const char* kTableName = "default.test_create_table_sync";

  if (!AllowSlowTests()) {
    LOG(INFO) << "This test can only be run in slow mode.";
    return;
  }

  LOG(INFO) << "Pausing leader master";
  int leader_idx;
  ASSERT_OK(cluster_->GetLeaderMasterIndex(&leader_idx));
  ASSERT_OK(cluster_->master(leader_idx)->Pause());
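  // This RAII guard resumes the paused master when it goes out of scope, so
  // the cluster is left healthy even if an assertion below fails early.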
  ScopedResumeExternalDaemon resume_daemon(cluster_->master(leader_idx));

  // As Pause() is asynchronous, the following sequence of events is possible:
  //
  // 1. Pause() is issued.
  // 2. CreateTable() is issued.
  // 3. Leader master receives CreateTable() and creates the table.
  // 4. Leader master is paused before the CreateTable() response is sent.
  // 5. Client times out, finds the new master, and retries CreateTable().
  // 6. The retry fails because the table was already created in step 3.
  Status s = CreateTable(kTableName, kWaitForCreate);
  ASSERT_TRUE(s.ok() || s.IsAlreadyPresent()) << s.ToString();

  shared_ptr<KuduTable> table;
  ASSERT_OK(client_->OpenTable(kTableName, &table));
  ASSERT_EQ(0, CountTableRows(table.get()));
}

// Test that we can issue a CreateTable call, pause the leader master
// immediately afterward, and then verify that the table has been created on
// the newly elected leader master.
TEST_P(MasterFailoverTest, TestPauseAfterCreateTableIssued) {
  const char* kTableName = "default.test_pause_after_create_table_issued";

  if (!AllowSlowTests()) {
    LOG(INFO) << "This test can only be run in slow mode.";
    return;
  }

  ASSERT_OK(CreateTable(kTableName, kNoWaitForCreate));

  LOG(INFO) << "Pausing leader master";
  int leader_idx;
  ASSERT_OK(cluster_->GetLeaderMasterIndex(&leader_idx));
  ASSERT_OK(cluster_->master(leader_idx)->Pause());
  ScopedResumeExternalDaemon resume_daemon(cluster_->master(leader_idx));

  AssertEventually([&]() {
    bool in_progress;
    ASSERT_OK(client_->IsCreateTableInProgress(kTableName, &in_progress));
    ASSERT_FALSE(in_progress);
  }, MonoDelta::FromSeconds(90));
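  // Bail out early if the AssertEventually() above recorded a fatal failure.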
  NO_PENDING_FATALS();

  shared_ptr<KuduTable> table;
  ASSERT_OK(client_->OpenTable(kTableName, &table));
  ASSERT_EQ(0, CountTableRows(table.get()));
}

// Test the scenario where we create a table, pause the leader master,
// and then issue the DeleteTable call: DeleteTable should go to the newly
// elected leader master and succeed.
TEST_P(MasterFailoverTest, TestDeleteTableSync) {
  const char* kTableName = "default.test_delete_table_sync";

  if (!AllowSlowTests()) {
    LOG(INFO) << "This test can only be run in slow mode.";
    return;
  }

  ASSERT_OK(CreateTable(kTableName, kWaitForCreate));

  LOG(INFO) << "Pausing leader master";
  int leader_idx;
  ASSERT_OK(cluster_->GetLeaderMasterIndex(&leader_idx));
  ASSERT_OK(cluster_->master(leader_idx)->Pause());
  ScopedResumeExternalDaemon resume_daemon(cluster_->master(leader_idx));

  // It's possible for DeleteTable() to delete the table and still return
  // NotFound. See TestCreateTableSync for details.
  Status s = client_->DeleteTable(kTableName);
  ASSERT_TRUE(s.ok() || s.IsNotFound()) << s.ToString();

  shared_ptr<KuduTable> table;
  s = client_->OpenTable(kTableName, &table);
  ASSERT_TRUE(s.IsNotFound()) << s.ToString();
}

// Test the scenario where we create a table, pause the leader master,
// and then issue an AlterTable call renaming the table: AlterTable
// should go to the newly elected leader master and succeed, renaming
// the table.
//
// TODO(unknown): Add an equivalent async test. Add a test for adding and/or
// renaming a column in a table.
TEST_P(MasterFailoverTest, TestRenameTableSync) {
  const char* kTableNameOrig = "default.test_alter_table_sync";
  const char* kTableNameNew = "default.test_alter_table_sync_renamed";

  if (!AllowSlowTests()) {
    LOG(INFO) << "This test can only be run in slow mode.";
    return;
  }

  ASSERT_OK(CreateTable(kTableNameOrig, kWaitForCreate));

  LOG(INFO) << "Pausing leader master";
  int leader_idx;
  ASSERT_OK(cluster_->GetLeaderMasterIndex(&leader_idx));
  ASSERT_OK(cluster_->master(leader_idx)->Pause());
  ScopedResumeExternalDaemon resume_daemon(cluster_->master(leader_idx));

  // It's possible for AlterTable() to rename the table and still return
  // NotFound. See TestCreateTableSync for details.
  Status s = RenameTable(kTableNameOrig, kTableNameNew);
  ASSERT_TRUE(s.ok() || s.IsNotFound()) << s.ToString();

  shared_ptr<KuduTable> table;
  ASSERT_OK(client_->OpenTable(kTableNameNew, &table));
  s = client_->OpenTable(kTableNameOrig, &table);
  ASSERT_TRUE(s.IsNotFound()) << s.ToString();
}

TEST_P(MasterFailoverTest, TestKUDU1374) {
  const char* kTableName = "default.test_kudu_1374";

  // Wait at least one additional heartbeat interval after creating the table.
  // The idea is to guarantee that all tservers have sent a tablet report with
  // the new (dirty) tablet and are now sending empty incremental tablet
  // reports.
  ASSERT_OK(CreateTable(kTableName, kWaitForCreate));
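  // 1000 ms spans two of the 500 ms heartbeat intervals configured in the
  // test constructor.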
  SleepFor(MonoDelta::FromMilliseconds(1000));

  // Force all asynchronous RPCs sent by the leader master to fail. This
  // means the subsequent AlterTable() will be hidden from the tservers,
  // at least while this master is leader.
  int leader_idx;
  ASSERT_OK(cluster_->GetLeaderMasterIndex(&leader_idx));
  ExternalDaemon* leader_master = cluster_->master(leader_idx);
  ASSERT_OK(cluster_->SetFlag(leader_master,
                              "catalog_manager_fail_ts_rpcs", "true"));
  unique_ptr<KuduTableAlterer> alter(client_->NewTableAlterer(kTableName));
  alter->AddColumn("new_col")->Type(KuduColumnSchema::INT32);
  ASSERT_OK(alter
            ->wait(false)
            ->Alter());

  leader_master->Shutdown();

  // Wait for the AlterTable() to finish. Progress is as follows:
  // 1. TS notices the change in leadership and sends a full tablet report.
  // 2. Leader master notices that the reported tablet isn't fully altered
  //    and sends the TS an AlterSchema() RPC.
  // 3. TS updates the tablet's schema. This also dirties the tablet.
  // 4. TS sends an incremental tablet report containing the dirty tablet.
  // 5. Leader master sees that all tablets in the table now have the new
  //    schema and ends the AlterTable() operation.
  // 6. The next IsAlterTableInProgress() call returns false.
  AssertEventually([&]() {
    bool in_progress;
    ASSERT_OK(client_->IsAlterTableInProgress(kTableName, &in_progress));
    ASSERT_FALSE(in_progress);
  }, MonoDelta::FromSeconds(90));
  NO_PENDING_FATALS();
}

TEST_P(MasterFailoverTest, TestMasterUUIDResolution) {
  // After a fresh start, the masters should have received RPCs asking for
  // their UUIDs.
  for (int i = 0; i < cluster_->num_masters(); i++) {
    int64_t num_get_node_instances;
    ASSERT_OK(GetInt64Metric(
        cluster_->master(i)->bound_http_hostport(),
        &METRIC_ENTITY_server, "kudu.master",
        &METRIC_handler_latency_kudu_consensus_ConsensusService_GetNodeInstance,
        "total_count", &num_get_node_instances));

    // Client-side timeouts may increase the number of calls beyond the raw
    // number of masters.
    ASSERT_GE(num_get_node_instances, cluster_->num_masters());
  }

  // Restart the masters. They should reuse one another's UUIDs from the cached
  // consensus metadata instead of sending RPCs to discover them. See KUDU-526.
  cluster_->Shutdown();
  ASSERT_OK(cluster_->Restart());

  // To determine whether the cached UUIDs were used, let's look at the number
  // of GetNodeInstance() calls each master serviced. It should be zero.
  for (int i = 0; i < cluster_->num_masters(); i++) {
    ExternalMaster* master = cluster_->master(i);
    int64_t num_get_node_instances;
    ASSERT_OK(GetInt64Metric(
        master->bound_http_hostport(),
        &METRIC_ENTITY_server, "kudu.master",
        &METRIC_handler_latency_kudu_consensus_ConsensusService_GetNodeInstance,
        "total_count", &num_get_node_instances));
    EXPECT_EQ(0, num_get_node_instances) <<
        Substitute(
            "Following restart, master $0 has serviced $1 GetNodeInstance() calls",
            master->bound_rpc_hostport().ToString(),
            num_get_node_instances);
  }
}

TEST_P(MasterFailoverTest, TestMasterPermanentFailure) {
  const string kBinPath = cluster_->GetBinaryPath("kudu");
  Random r(SeedRandom());

  // Repeat the test for each master.
  for (int i = 0; i < cluster_->num_masters(); i++) {
    ExternalMaster* failed_master = cluster_->master(i);

    // "Fail" a master and blow away its state completely.
    failed_master->Shutdown();
    ASSERT_OK(failed_master->DeleteFromDisk());

    // Pick another master at random to serve as a basis for recovery.
    //
    // This isn't completely safe; see KUDU-1556 for details.
    ExternalMaster* other_master;
    do {
      other_master = cluster_->master(r.Uniform(cluster_->num_masters()));
    } while (other_master->uuid() == failed_master->uuid());

    // Find the UUID of the failed master using the other master's cmeta file.
    string uuid;
    {
      vector<string> args = {
        kBinPath,
        "local_replica",
        "cmeta",
        "print_replica_uuids",
        "--fs_wal_dir=" + other_master->wal_dir(),
        "--fs_data_dirs=" + other_master->data_dir(),
        master::SysCatalogTable::kSysCatalogTabletId
      };
      string output;
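      // Subprocess::Call()'s second argument is the child's stdin (empty
      // here); its stdout, the space-separated replica UUIDs, is captured
      // into 'output'.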
      ASSERT_OK(Subprocess::Call(args, "", &output));
      StripWhiteSpace(&output);
      LOG(INFO) << "UUIDS: " << output;
      set<string> uuids = Split(output, " ");

      // Isolate the failed master's UUID by eliminating the UUIDs of the
      // healthy masters from the set.
      for (int j = 0; j < cluster_->num_masters(); j++) {
        if (j == i) continue;
        uuids.erase(cluster_->master(j)->uuid());
      }
      ASSERT_EQ(1, uuids.size());
      uuid = *uuids.begin();
    }

    // Format a new filesystem with the same UUID as the failed master.
    {
      vector<string> args = {
        kBinPath,
        "fs",
        "format",
        "--fs_wal_dir=" + failed_master->wal_dir(),
        "--fs_data_dirs=" + failed_master->data_dir(),
        "--uuid=" + uuid
      };
      ASSERT_OK(Subprocess::Call(args));
    }

    // Copy the master tablet from the other master.
    {
      vector<string> args = {
        kBinPath,
        "local_replica",
        "copy_from_remote",
        "--fs_wal_dir=" + failed_master->wal_dir(),
        "--fs_data_dirs=" + failed_master->data_dir(),
        master::SysCatalogTable::kSysCatalogTabletId,
        other_master->bound_rpc_hostport().ToString()
      };
      ASSERT_OK(Subprocess::Call(args));
    }

    // Bring up the new master.
    //
    // Technically this reuses the failed master's data directory, but that
    // directory was emptied when we "failed" the master, so this still
    // qualifies as a "new" master for all intents and purposes.
    ASSERT_OK(failed_master->Start());

    // Do some operations.
    string table_name = Substitute("default.table_$0", i);
    ASSERT_OK(CreateTable(table_name, kWaitForCreate));

    shared_ptr<KuduTable> table;
    ASSERT_OK(client_->OpenTable(table_name, &table));
    ASSERT_EQ(0, CountTableRows(table.get()));

    // Repeat these operations with each of the masters paused.
    //
    // Only in slow mode.
    if (AllowSlowTests()) {
      for (int j = 0; j < cluster_->num_masters(); j++) {
        ASSERT_OK(cluster_->master(j)->Pause());
        ScopedResumeExternalDaemon resume_daemon(cluster_->master(j));
        string table_name = Substitute("default.table_$0_$1", i, j);

        // See TestCreateTableSync to understand why we must check for
        // IsAlreadyPresent as well.
        Status s = CreateTable(table_name, kWaitForCreate);
        ASSERT_TRUE(s.ok() || s.IsAlreadyPresent()) << s.ToString();

        ASSERT_OK(client_->OpenTable(table_name, &table));
        ASSERT_EQ(0, CountTableRows(table.get()));
      }
    }
  }
}

} // namespace client
} // namespace kudu