blob: ff7e84c870a7e53c6973579815efd9e7a5d2b24a [file] [log] [blame]
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.
#include <algorithm>
#include <atomic>
#include <cstdint>
#include <cstdio>
#include <cstdlib>
#include <deque>
#include <iterator>
#include <memory>
#include <ostream>
#include <string>
#include <thread>
#include <unordered_map>
#include <unordered_set>
#include <utility>
#include <vector>
#include <gflags/gflags_declare.h>
#include <glog/logging.h>
#include <gtest/gtest.h>
#include "kudu/client/client-test-util.h"
#include "kudu/client/client.h"
#include "kudu/client/schema.h"
#include "kudu/client/shared_ptr.h"
#include "kudu/common/common.pb.h"
#include "kudu/common/wire_protocol.pb.h"
#include "kudu/consensus/consensus.pb.h"
#include "kudu/consensus/consensus.proxy.h"
#include "kudu/consensus/metadata.pb.h"
#include "kudu/consensus/opid.pb.h"
#include "kudu/consensus/quorum_util.h"
#include "kudu/gutil/gscoped_ptr.h"
#include "kudu/gutil/map-util.h"
#include "kudu/gutil/strings/split.h"
#include "kudu/gutil/strings/substitute.h"
#include "kudu/integration-tests/cluster_itest_util.h"
#include "kudu/integration-tests/cluster_verifier.h"
#include "kudu/integration-tests/test_workload.h"
#include "kudu/integration-tests/ts_itest-base.h"
#include "kudu/master/master.pb.h"
#include "kudu/mini-cluster/external_mini_cluster.h"
#include "kudu/rpc/rpc_controller.h"
#include "kudu/tablet/metadata.pb.h"
#include "kudu/tablet/tablet.pb.h"
#include "kudu/tools/tool_test_util.h"
#include "kudu/tserver/tablet_server-test-base.h"
#include "kudu/tserver/tserver.pb.h"
#include "kudu/util/countdown_latch.h"
#include "kudu/util/monotime.h"
#include "kudu/util/net/net_util.h"
#include "kudu/util/net/sockaddr.h"
#include "kudu/util/pb_util.h"
#include "kudu/util/random.h"
#include "kudu/util/scoped_cleanup.h"
#include "kudu/util/status.h"
#include "kudu/util/test_macros.h"
#include "kudu/util/test_util.h"
DECLARE_int32(num_replicas);
DECLARE_int32(num_tablet_servers);
using kudu::client::KuduClient;
using kudu::client::KuduClientBuilder;
using kudu::client::KuduColumnSchema;
using kudu::client::KuduSchema;
using kudu::client::KuduSchemaBuilder;
using kudu::client::KuduTableAlterer;
using kudu::client::KuduTableCreator;
using kudu::client::sp::shared_ptr;
using kudu::cluster::ExternalTabletServer;
using kudu::consensus::COMMITTED_OPID;
using kudu::consensus::ConsensusStatePB;
using kudu::consensus::EXCLUDE_HEALTH_REPORT;
using kudu::consensus::OpId;
using kudu::itest::FindTabletFollowers;
using kudu::itest::FindTabletLeader;
using kudu::itest::GetConsensusState;
using kudu::itest::StartElection;
using kudu::itest::WaitUntilLeader;
using kudu::itest::TabletServerMap;
using kudu::itest::TServerDetails;
using kudu::itest::WAIT_FOR_LEADER;
using kudu::itest::WaitForReplicasReportedToMaster;
using kudu::itest::WaitForServersToAgree;
using kudu::itest::WaitUntilCommittedConfigNumVotersIs;
using kudu::itest::WaitUntilCommittedOpIdIndexIs;
using kudu::itest::WaitUntilTabletInState;
using kudu::itest::WaitUntilTabletRunning;
using kudu::master::VOTER_REPLICA;
using kudu::pb_util::SecureDebugString;
using kudu::tserver::ListTabletsResponsePB;
using std::atomic;
using std::back_inserter;
using std::copy;
using std::deque;
using std::endl;
using std::ostringstream;
using std::string;
using std::thread;
using std::tuple;
using std::unique_ptr;
using std::vector;
using strings::Split;
using strings::Substitute;
namespace kudu {
class Schema;
namespace tools {
// Helper to format info when a tool action fails.
static string ToolRunInfo(const Status& s, const string& out, const string& err) {
ostringstream str;
str << s.ToString() << endl;
str << "stdout: " << out << endl;
str << "stderr: " << err << endl;
return str.str();
}
// Helper macro for tool tests. Use as follows:
//
// ASSERT_TOOL_OK("cluster", "ksck", master_addrs);
//
// The failure Status result of RunKuduTool is usually useless, so this macro
// also logs the stdout and stderr in case of failure, for easier diagnosis.
// TODO(wdberkeley): Add a macro to retrieve stdout or stderr, or a macro for
// when one of those should match a string.
#define ASSERT_TOOL_OK(...) do { \
const vector<string>& _args{__VA_ARGS__}; \
string _out, _err; \
const Status& _s = RunKuduTool(_args, &_out, &_err); \
if (_s.ok()) { \
SUCCEED(); \
} else { \
FAIL() << ToolRunInfo(_s, _out, _err); \
} \
} while (0);
class AdminCliTest : public tserver::TabletServerIntegrationTestBase {
};
// Test config change while running a workload.
// 1. Instantiate external mini cluster with 3 TS.
// 2. Create table with 2 replicas.
// 3. Invoke CLI to trigger a config change.
// 4. Wait until the new server bootstraps.
// 5. Profit!
TEST_F(AdminCliTest, TestChangeConfig) {
const vector<string> kMasterFlags = {
"--catalog_manager_wait_for_new_tablets_to_elect_leader=false",
"--allow_unsafe_replication_factor=true",
// If running with the 3-4-3 replication scheme, the catalog manager removes
// excess replicas, so it's necessary to disable that default behavior
// since this test manages replicas on its own.
"--catalog_manager_evict_excess_replicas=false",
};
const vector<string> kTserverFlags = {
"--enable_leader_failure_detection=false",
};
FLAGS_num_tablet_servers = 3;
FLAGS_num_replicas = 2;
NO_FATALS(BuildAndStart(kTserverFlags, kMasterFlags));
vector<TServerDetails*> tservers;
AppendValuesFromMap(tablet_servers_, &tservers);
ASSERT_EQ(FLAGS_num_tablet_servers, tservers.size());
TabletServerMap active_tablet_servers;
TabletServerMap::const_iterator iter = tablet_replicas_.find(tablet_id_);
TServerDetails* leader = iter->second;
TServerDetails* follower = (++iter)->second;
InsertOrDie(&active_tablet_servers, leader->uuid(), leader);
InsertOrDie(&active_tablet_servers, follower->uuid(), follower);
TServerDetails* new_node = nullptr;
for (TServerDetails* ts : tservers) {
if (!ContainsKey(active_tablet_servers, ts->uuid())) {
new_node = ts;
break;
}
}
ASSERT_NE(nullptr, new_node);
// Elect the leader (still only a consensus config size of 2).
ASSERT_OK(StartElection(leader, tablet_id_, MonoDelta::FromSeconds(10)));
ASSERT_OK(WaitForServersToAgree(MonoDelta::FromSeconds(30), active_tablet_servers,
tablet_id_, 1));
TestWorkload workload(cluster_.get());
workload.set_table_name(kTableId);
workload.set_timeout_allowed(true);
workload.set_write_timeout_millis(10000);
workload.set_num_replicas(FLAGS_num_replicas);
workload.set_num_write_threads(1);
workload.set_write_batch_size(1);
workload.Setup();
workload.Start();
// Wait until the Master knows about the leader tserver.
TServerDetails* master_observed_leader;
ASSERT_OK(GetLeaderReplicaWithRetries(tablet_id_, &master_observed_leader));
ASSERT_EQ(leader->uuid(), master_observed_leader->uuid());
LOG(INFO) << "Adding replica at tserver with UUID "
<< new_node->uuid() << " as VOTER...";
ASSERT_TOOL_OK(
"tablet",
"change_config",
"add_replica",
cluster_->master()->bound_rpc_addr().ToString(),
tablet_id_,
new_node->uuid(),
"VOTER"
);
InsertOrDie(&active_tablet_servers, new_node->uuid(), new_node);
ASSERT_OK(WaitUntilCommittedConfigNumVotersIs(active_tablet_servers.size(),
leader, tablet_id_,
MonoDelta::FromSeconds(10)));
workload.StopAndJoin();
int num_batches = workload.batches_completed();
LOG(INFO) << "Waiting for replicas to agree...";
// Wait for all servers to replicate everything up through the last write op.
// Since we don't batch, there should be at least # rows inserted log entries,
// plus the initial leader's no-op, plus 1 for
// the added replica for a total == #rows + 2.
int min_log_index = num_batches + 2;
ASSERT_OK(WaitForServersToAgree(MonoDelta::FromSeconds(30),
active_tablet_servers, tablet_id_,
min_log_index));
int rows_inserted = workload.rows_inserted();
LOG(INFO) << "Number of rows inserted: " << rows_inserted;
ClusterVerifier v(cluster_.get());
NO_FATALS(v.CheckCluster());
NO_FATALS(v.CheckRowCount(kTableId, ClusterVerifier::AT_LEAST, rows_inserted));
// Now remove the server.
LOG(INFO) << "Removing replica at tserver with UUID "
<< new_node->uuid() << " from the config...";
ASSERT_TOOL_OK(
"tablet",
"change_config",
"remove_replica",
cluster_->master()->bound_rpc_addr().ToString(),
tablet_id_,
new_node->uuid()
);
ASSERT_EQ(1, active_tablet_servers.erase(new_node->uuid()));
ASSERT_OK(WaitUntilCommittedConfigNumVotersIs(active_tablet_servers.size(),
leader, tablet_id_,
MonoDelta::FromSeconds(10)));
}
enum class Kudu1097 {
Disable,
Enable,
};
enum class DownTS {
None,
TabletPeer,
// Regression case for KUDU-2331.
UninvolvedTS,
};
class MoveTabletParamTest :
public AdminCliTest,
public ::testing::WithParamInterface<tuple<Kudu1097, DownTS>> {
};
TEST_P(MoveTabletParamTest, Test) {
const MonoDelta timeout = MonoDelta::FromSeconds(30);
const auto& param = GetParam();
const auto enable_kudu_1097 = std::get<0>(param);
const auto downTS = std::get<1>(param);
FLAGS_num_tablet_servers = 5;
FLAGS_num_replicas = 3;
vector<string> ts_flags, master_flags;
ts_flags = master_flags = {
Substitute("--raft_prepare_replacement_before_eviction=$0",
enable_kudu_1097 == Kudu1097::Enable) };
NO_FATALS(BuildAndStart(ts_flags, master_flags));
vector<string> tservers;
AppendKeysFromMap(tablet_servers_, &tservers);
ASSERT_EQ(FLAGS_num_tablet_servers, tservers.size());
deque<string> active_tservers;
for (auto iter = tablet_replicas_.find(tablet_id_); iter != tablet_replicas_.cend(); ++iter) {
active_tservers.push_back(iter->second->uuid());
}
ASSERT_EQ(FLAGS_num_replicas, active_tservers.size());
deque<string> inactive_tservers;
std::sort(tservers.begin(), tservers.end());
std::sort(active_tservers.begin(), active_tservers.end());
std::set_difference(tservers.cbegin(), tservers.cend(),
active_tservers.cbegin(), active_tservers.cend(),
std::back_inserter(inactive_tservers));
ASSERT_EQ(FLAGS_num_tablet_servers - FLAGS_num_replicas, inactive_tservers.size());
// The workload is light (1 thread, 1 op batches) so that new replicas
// bootstrap and converge quickly.
TestWorkload workload(cluster_.get());
workload.set_table_name(kTableId);
workload.set_num_replicas(FLAGS_num_replicas);
workload.set_num_write_threads(1);
workload.set_write_batch_size(1);
workload.set_write_timeout_millis(timeout.ToMilliseconds());
workload.Setup();
workload.Start();
if (downTS == DownTS::TabletPeer) {
// To test that the move fails if any peer is down, when downTS is
// 'TabletPeer', shut down a server besides 'add' that hosts a replica.
NO_FATALS(cluster_->tablet_server_by_uuid(active_tservers.back())->Shutdown());
} else if (downTS == DownTS::UninvolvedTS) {
// Regression case for KUDU-2331, where move_replica would fail if any tablet
// server is down, even if that tablet server was not involved in the move.
NO_FATALS(cluster_->tablet_server_by_uuid(inactive_tservers.back())->Shutdown());
}
// If we're not bringing down a tablet server, do 3 moves.
// Assuming no ad hoc leadership changes, 3 guarantees the leader is moved at least once.
int num_moves = AllowSlowTests() && (downTS == DownTS::None) ? 3 : 1;
for (int i = 0; i < num_moves; i++) {
const string remove = active_tservers.front();
const string add = inactive_tservers.front();
vector<string> tool_command = {
"tablet",
"change_config",
"move_replica",
cluster_->master()->bound_rpc_addr().ToString(),
tablet_id_,
remove,
add,
};
string stdout, stderr;
Status s = RunKuduTool(tool_command, &stdout, &stderr);
if (downTS == DownTS::TabletPeer) {
ASSERT_TRUE(s.IsRuntimeError());
workload.StopAndJoin();
return;
}
ASSERT_TRUE(s.ok()) << ToolRunInfo(s, stdout, stderr);
active_tservers.pop_front();
active_tservers.push_back(add);
inactive_tservers.pop_front();
inactive_tservers.push_back(remove);
// Allow the added server time to catch up so it applies the newest configuration.
// If we don't wait, the initial ksck of move_tablet can fail with consensus conflict.
TabletServerMap active_tservers_map;
for (const string& uuid : active_tservers) {
InsertOrDie(&active_tservers_map, uuid, tablet_servers_[uuid]);
}
ASSERT_OK(WaitUntilCommittedConfigNumVotersIs(/*num_voters=*/ FLAGS_num_replicas,
active_tservers_map[add],
tablet_id_, timeout));
NO_FATALS(WaitUntilCommittedConfigNumMembersIs(/*num_members=*/ FLAGS_num_replicas,
active_tservers_map[add],
tablet_id_, timeout));
}
workload.StopAndJoin();
NO_FATALS(cluster_->AssertNoCrashes());
// If a tablet server is down, we need to skip the ClusterVerifier.
if (downTS == DownTS::None) {
ClusterVerifier v(cluster_.get());
NO_FATALS(v.CheckCluster());
}
}
INSTANTIATE_TEST_CASE_P(EnableKudu1097AndDownTS, MoveTabletParamTest,
::testing::Combine(::testing::Values(Kudu1097::Disable,
Kudu1097::Enable),
::testing::Values(DownTS::None,
DownTS::TabletPeer,
DownTS::UninvolvedTS)));
Status RunUnsafeChangeConfig(const string& tablet_id,
const string& dst_host,
const vector<string>& peer_uuid_list) {
vector<string> command_args = {
"remote_replica",
"unsafe_change_config",
dst_host,
tablet_id
};
copy(peer_uuid_list.begin(), peer_uuid_list.end(), back_inserter(command_args));
return RunKuduTool(command_args);
}
// Test unsafe config change when there is one follower survivor in the cluster.
// 1. Instantiate external mini cluster with 1 tablet having 3 replicas and 5 TS.
// 2. Shut down leader and follower1.
// 3. Trigger unsafe config change on follower2 having follower2 in the config.
// 4. Wait until the new config is populated on follower2(new leader) and master.
// 5. Bring up leader and follower1 and verify replicas are deleted.
// 6. Verify that new config doesn't contain old leader and follower1.
TEST_F(AdminCliTest, TestUnsafeChangeConfigOnSingleFollower) {
const MonoDelta kTimeout = MonoDelta::FromSeconds(30);
FLAGS_num_tablet_servers = 5;
FLAGS_num_replicas = 3;
// tserver_unresponsive_timeout_ms is useful so that master considers
// the live tservers for tablet re-replication.
NO_FATALS(BuildAndStart());
LOG(INFO) << "Finding tablet leader and waiting for things to start...";
string tablet_id = tablet_replicas_.begin()->first;
// Determine the list of tablet servers currently in the config.
TabletServerMap active_tablet_servers;
auto iter = tablet_replicas_.equal_range(tablet_id);
for (auto it = iter.first; it != iter.second; ++it) {
InsertOrDie(&active_tablet_servers, it->second->uuid(), it->second);
}
// Get a baseline config reported to the master.
LOG(INFO) << "Waiting for Master to see the current replicas...";
master::TabletLocationsPB tablet_locations;
bool has_leader;
ASSERT_OK(WaitForReplicasReportedToMaster(cluster_->master_proxy(),
3, tablet_id, kTimeout,
WAIT_FOR_LEADER,
VOTER_REPLICA,
&has_leader, &tablet_locations));
LOG(INFO) << "Tablet locations:\n" << SecureDebugString(tablet_locations);
ASSERT_TRUE(has_leader) << SecureDebugString(tablet_locations);
// Wait for initial NO_OP to be committed by the leader.
TServerDetails* leader_ts;
vector<TServerDetails*> followers;
ASSERT_OK(FindTabletLeader(active_tablet_servers, tablet_id, kTimeout, &leader_ts));
ASSERT_OK(FindTabletFollowers(active_tablet_servers, tablet_id, kTimeout, &followers));
OpId opid;
ASSERT_OK(WaitForOpFromCurrentTerm(leader_ts, tablet_id, COMMITTED_OPID, kTimeout, &opid));
// Shut down master so it doesn't interfere while we shut down the leader and
// one of the other followers.
cluster_->master()->Shutdown();
cluster_->tablet_server_by_uuid(leader_ts->uuid())->Shutdown();
cluster_->tablet_server_by_uuid(followers[1]->uuid())->Shutdown();
LOG(INFO) << "Forcing unsafe config change on remaining follower " << followers[0]->uuid();
const string& follower0_addr =
cluster_->tablet_server_by_uuid(followers[0]->uuid())->bound_rpc_addr().ToString();
ASSERT_OK(RunUnsafeChangeConfig(tablet_id, follower0_addr, { followers[0]->uuid() }));
ASSERT_OK(WaitUntilLeader(followers[0], tablet_id, kTimeout));
ASSERT_OK(WaitUntilCommittedConfigNumVotersIs(1, followers[0], tablet_id, kTimeout));
LOG(INFO) << "Restarting master...";
// Restart master so it can re-replicate the tablet to remaining tablet servers.
ASSERT_OK(cluster_->master()->Restart());
// Wait for master to re-replicate.
ASSERT_OK(WaitUntilCommittedConfigNumVotersIs(3, followers[0], tablet_id, kTimeout));
ASSERT_OK(WaitForReplicasReportedToMaster(cluster_->master_proxy(),
3, tablet_id, kTimeout,
WAIT_FOR_LEADER, VOTER_REPLICA,
&has_leader, &tablet_locations));
ASSERT_OK(WaitForOpFromCurrentTerm(followers[0], tablet_id, COMMITTED_OPID, kTimeout, &opid));
active_tablet_servers.clear();
std::unordered_set<string> replica_uuids;
for (const auto& loc : tablet_locations.replicas()) {
const string& uuid = loc.ts_info().permanent_uuid();
InsertOrDie(&active_tablet_servers, uuid, tablet_servers_[uuid]);
}
ASSERT_OK(WaitForServersToAgree(kTimeout, active_tablet_servers, tablet_id, opid.index()));
// Verify that two new servers are part of new config and old
// servers are gone.
for (const master::TabletLocationsPB_ReplicaPB& replica :
tablet_locations.replicas()) {
ASSERT_NE(replica.ts_info().permanent_uuid(), followers[1]->uuid());
ASSERT_NE(replica.ts_info().permanent_uuid(), leader_ts->uuid());
}
// Also verify that when we bring back followers[1] and leader,
// we should see the tablet in TOMBSTONED state on these servers.
ASSERT_OK(cluster_->tablet_server_by_uuid(leader_ts->uuid())->Restart());
ASSERT_OK(cluster_->tablet_server_by_uuid(followers[1]->uuid())->Restart());
ASSERT_OK(WaitUntilTabletInState(leader_ts, tablet_id, tablet::STOPPED, kTimeout));
ASSERT_OK(WaitUntilTabletInState(followers[1], tablet_id, tablet::STOPPED, kTimeout));
}
// Test unsafe config change when there is one leader survivor in the cluster.
// 1. Instantiate external mini cluster with 1 tablet having 3 replicas and 5 TS.
// 2. Shut down both followers.
// 3. Trigger unsafe config change on leader having leader in the config.
// 4. Wait until the new config is populated on leader and master.
// 5. Verify that new config does not contain old followers.
TEST_F(AdminCliTest, TestUnsafeChangeConfigOnSingleLeader) {
const MonoDelta kTimeout = MonoDelta::FromSeconds(30);
FLAGS_num_tablet_servers = 5;
FLAGS_num_replicas = 3;
NO_FATALS(BuildAndStart());
// Determine the list of tablet servers currently in the config.
TabletServerMap active_tablet_servers;
auto iter = tablet_replicas_.equal_range(tablet_id_);
for (auto it = iter.first; it != iter.second; ++it) {
InsertOrDie(&active_tablet_servers, it->second->uuid(), it->second);
}
// Get a baseline config reported to the master.
LOG(INFO) << "Waiting for Master to see the current replicas...";
master::TabletLocationsPB tablet_locations;
bool has_leader;
ASSERT_OK(WaitForReplicasReportedToMaster(cluster_->master_proxy(),
3, tablet_id_, kTimeout,
WAIT_FOR_LEADER, VOTER_REPLICA,
&has_leader, &tablet_locations));
LOG(INFO) << "Tablet locations:\n" << SecureDebugString(tablet_locations);
ASSERT_TRUE(has_leader) << SecureDebugString(tablet_locations);
// Wait for initial NO_OP to be committed by the leader.
TServerDetails* leader_ts;
ASSERT_OK(FindTabletLeader(active_tablet_servers, tablet_id_, kTimeout, &leader_ts));
ASSERT_OK(WaitUntilCommittedOpIdIndexIs(1, leader_ts, tablet_id_, kTimeout));
vector<TServerDetails*> followers;
ASSERT_OK(FindTabletFollowers(active_tablet_servers, tablet_id_, kTimeout, &followers));
// Shut down servers follower1 and follower2,
// so that we can force new config on remaining leader.
cluster_->tablet_server_by_uuid(followers[0]->uuid())->Shutdown();
cluster_->tablet_server_by_uuid(followers[1]->uuid())->Shutdown();
// Restart master to cleanup cache of dead servers from its list of candidate
// servers to trigger placement of new replicas on healthy servers.
cluster_->master()->Shutdown();
ASSERT_OK(cluster_->master()->Restart());
LOG(INFO) << "Forcing unsafe config change on tserver " << leader_ts->uuid();
const string& leader_addr = Substitute("$0:$1",
leader_ts->registration.rpc_addresses(0).host(),
leader_ts->registration.rpc_addresses(0).port());
const vector<string> peer_uuid_list = { leader_ts->uuid() };
ASSERT_OK(RunUnsafeChangeConfig(tablet_id_, leader_addr, peer_uuid_list));
// Check that new config is populated to a new follower.
vector<TServerDetails*> all_tservers;
TServerDetails *new_follower = nullptr;
AppendValuesFromMap(tablet_servers_, &all_tservers);
for (const auto& ts :all_tservers) {
if (!ContainsKey(active_tablet_servers, ts->uuid())) {
new_follower = ts;
break;
}
}
ASSERT_TRUE(new_follower != nullptr);
// Master may try to add the servers which are down until tserver_unresponsive_timeout_ms,
// so it is safer to wait until consensus metadata has 3 voters on new_follower.
ASSERT_OK(WaitUntilCommittedConfigNumVotersIs(3, new_follower, tablet_id_, kTimeout));
// Wait for the master to be notified of the config change.
LOG(INFO) << "Waiting for Master to see new config...";
ASSERT_OK(WaitForReplicasReportedToMaster(cluster_->master_proxy(),
3, tablet_id_, kTimeout,
WAIT_FOR_LEADER, VOTER_REPLICA,
&has_leader, &tablet_locations));
LOG(INFO) << "Tablet locations:\n" << SecureDebugString(tablet_locations);
for (const master::TabletLocationsPB_ReplicaPB& replica :
tablet_locations.replicas()) {
ASSERT_NE(replica.ts_info().permanent_uuid(), followers[0]->uuid());
ASSERT_NE(replica.ts_info().permanent_uuid(), followers[1]->uuid());
}
}
// Test unsafe config change when the unsafe config contains 2 nodes.
// 1. Instantiate external minicluster with 1 tablet having 3 replicas and 5 TS.
// 2. Shut down leader.
// 3. Trigger unsafe config change on follower1 having follower1 and follower2 in the config.
// 4. Wait until the new config is populated on new_leader and master.
// 5. Verify that new config does not contain old leader.
TEST_F(AdminCliTest, TestUnsafeChangeConfigForConfigWithTwoNodes) {
const MonoDelta kTimeout = MonoDelta::FromSeconds(30);
FLAGS_num_tablet_servers = 4;
FLAGS_num_replicas = 3;
NO_FATALS(BuildAndStart());
// Determine the list of tablet servers currently in the config.
TabletServerMap active_tablet_servers;
auto iter = tablet_replicas_.equal_range(tablet_id_);
for (auto it = iter.first; it != iter.second; ++it) {
InsertOrDie(&active_tablet_servers, it->second->uuid(), it->second);
}
// Get a baseline config reported to the master.
LOG(INFO) << "Waiting for Master to see the current replicas...";
master::TabletLocationsPB tablet_locations;
bool has_leader;
ASSERT_OK(WaitForReplicasReportedToMaster(cluster_->master_proxy(),
3, tablet_id_, kTimeout,
WAIT_FOR_LEADER, VOTER_REPLICA,
&has_leader, &tablet_locations));
LOG(INFO) << "Tablet locations:\n" << SecureDebugString(tablet_locations);
ASSERT_TRUE(has_leader) << SecureDebugString(tablet_locations);
// Wait for initial NO_OP to be committed by the leader.
TServerDetails* leader_ts;
ASSERT_OK(FindTabletLeader(active_tablet_servers, tablet_id_, kTimeout, &leader_ts));
ASSERT_OK(WaitUntilCommittedOpIdIndexIs(1, leader_ts, tablet_id_, kTimeout));
vector<TServerDetails*> followers;
ASSERT_OK(FindTabletFollowers(active_tablet_servers, tablet_id_, kTimeout, &followers));
// Shut down leader and prepare 2-node config.
cluster_->tablet_server_by_uuid(leader_ts->uuid())->Shutdown();
// Restart master to cleanup cache of dead servers from its list of candidate
// servers to trigger placement of new replicas on healthy servers.
cluster_->master()->Shutdown();
ASSERT_OK(cluster_->master()->Restart());
LOG(INFO) << "Forcing unsafe config change on tserver " << followers[1]->uuid();
const string& follower1_addr = Substitute("$0:$1",
followers[1]->registration.rpc_addresses(0).host(),
followers[1]->registration.rpc_addresses(0).port());
const vector<string> peer_uuid_list = { followers[0]->uuid(),
followers[1]->uuid(), };
ASSERT_OK(RunUnsafeChangeConfig(tablet_id_, follower1_addr, peer_uuid_list));
// Find a remaining node which will be picked for re-replication.
vector<TServerDetails*> all_tservers;
AppendValuesFromMap(tablet_servers_, &all_tservers);
TServerDetails* new_node = nullptr;
for (TServerDetails* ts : all_tservers) {
if (!ContainsKey(active_tablet_servers, ts->uuid())) {
new_node = ts;
break;
}
}
ASSERT_TRUE(new_node != nullptr);
// Master may try to add the servers which are down until tserver_unresponsive_timeout_ms,
// so it is safer to wait until consensus metadata has 3 voters on follower1.
ASSERT_OK(WaitUntilCommittedConfigNumVotersIs(3, new_node, tablet_id_, kTimeout));
// Wait for the master to be notified of the config change.
LOG(INFO) << "Waiting for Master to see new config...";
ASSERT_OK(WaitForReplicasReportedToMaster(cluster_->master_proxy(),
3, tablet_id_, kTimeout,
WAIT_FOR_LEADER, VOTER_REPLICA,
&has_leader, &tablet_locations));
LOG(INFO) << "Tablet locations:\n" << SecureDebugString(tablet_locations);
for (const master::TabletLocationsPB_ReplicaPB& replica :
tablet_locations.replicas()) {
ASSERT_NE(replica.ts_info().permanent_uuid(), leader_ts->uuid());
}
}
// Test unsafe config change on a 5-replica tablet when the unsafe config contains 2 nodes.
// 1. Instantiate external minicluster with 1 tablet having 5 replicas and 8 TS.
// 2. Shut down leader and 2 followers.
// 3. Trigger unsafe config change on a surviving follower with those
// 2 surviving followers in the new config.
// 4. Wait until the new config is populated on new_leader and master.
// 5. Verify that new config does not contain old leader and old followers.
TEST_F(AdminCliTest, TestUnsafeChangeConfigWithFiveReplicaConfig) {
const MonoDelta kTimeout = MonoDelta::FromSeconds(30);
// Retire the dead servers early with these settings.
FLAGS_num_tablet_servers = 8;
FLAGS_num_replicas = 5;
NO_FATALS(BuildAndStart());
vector<TServerDetails*> tservers;
vector<ExternalTabletServer*> external_tservers;
AppendValuesFromMap(tablet_servers_, &tservers);
for (TServerDetails* ts : tservers) {
external_tservers.push_back(cluster_->tablet_server_by_uuid(ts->uuid()));
}
// Determine the list of tablet servers currently in the config.
TabletServerMap active_tablet_servers;
auto iter = tablet_replicas_.equal_range(tablet_id_);
for (auto it = iter.first; it != iter.second; ++it) {
InsertOrDie(&active_tablet_servers, it->second->uuid(), it->second);
}
// Get a baseline config reported to the master.
LOG(INFO) << "Waiting for Master to see the current replicas...";
master::TabletLocationsPB tablet_locations;
bool has_leader;
ASSERT_OK(WaitForReplicasReportedToMaster(cluster_->master_proxy(),
5, tablet_id_, kTimeout,
WAIT_FOR_LEADER, VOTER_REPLICA,
&has_leader, &tablet_locations));
LOG(INFO) << "Tablet locations:\n" << SecureDebugString(tablet_locations);
ASSERT_TRUE(has_leader) << SecureDebugString(tablet_locations);
// Wait for initial NO_OP to be committed by the leader.
TServerDetails* leader_ts;
ASSERT_OK(FindTabletLeader(active_tablet_servers, tablet_id_, kTimeout, &leader_ts));
ASSERT_OK(WaitUntilCommittedOpIdIndexIs(1, leader_ts, tablet_id_, kTimeout));
vector<TServerDetails*> followers;
ASSERT_OK(FindTabletFollowers(active_tablet_servers, tablet_id_, kTimeout, &followers));
ASSERT_EQ(followers.size(), 4);
cluster_->tablet_server_by_uuid(followers[2]->uuid())->Shutdown();
cluster_->tablet_server_by_uuid(followers[3]->uuid())->Shutdown();
cluster_->tablet_server_by_uuid(leader_ts->uuid())->Shutdown();
// Restart master to cleanup cache of dead servers from its list of candidate
// servers to trigger placement of new replicas on healthy servers.
cluster_->master()->Shutdown();
ASSERT_OK(cluster_->master()->Restart());
LOG(INFO) << "Forcing unsafe config change on tserver " << followers[1]->uuid();
const string& follower1_addr = Substitute("$0:$1",
followers[1]->registration.rpc_addresses(0).host(),
followers[1]->registration.rpc_addresses(0).port());
const vector<string> peer_uuid_list = { followers[0]->uuid(),
followers[1]->uuid(), };
ASSERT_OK(RunUnsafeChangeConfig(tablet_id_, follower1_addr, peer_uuid_list));
// Find a remaining node which will be picked for re-replication.
vector<TServerDetails*> all_tservers;
AppendValuesFromMap(tablet_servers_, &all_tservers);
TServerDetails* new_node = nullptr;
for (TServerDetails* ts : all_tservers) {
if (!ContainsKey(active_tablet_servers, ts->uuid())) {
new_node = ts;
break;
}
}
ASSERT_TRUE(new_node != nullptr);
// Master may try to add the servers which are down until tserver_unresponsive_timeout_ms,
// so it is safer to wait until consensus metadata has 5 voters back on new_node.
ASSERT_OK(WaitUntilCommittedConfigNumVotersIs(5, new_node, tablet_id_, kTimeout));
// Wait for the master to be notified of the config change.
LOG(INFO) << "Waiting for Master to see new config...";
ASSERT_OK(WaitForReplicasReportedToMaster(cluster_->master_proxy(),
5, tablet_id_, kTimeout,
WAIT_FOR_LEADER, VOTER_REPLICA,
&has_leader, &tablet_locations));
LOG(INFO) << "Tablet locations:\n" << SecureDebugString(tablet_locations);
for (const master::TabletLocationsPB_ReplicaPB& replica :
tablet_locations.replicas()) {
ASSERT_NE(replica.ts_info().permanent_uuid(), leader_ts->uuid());
ASSERT_NE(replica.ts_info().permanent_uuid(), followers[2]->uuid());
ASSERT_NE(replica.ts_info().permanent_uuid(), followers[3]->uuid());
}
}
// Test unsafe config change when there is a pending config on a surviving leader.
// 1. Instantiate external minicluster with 1 tablet having 3 replicas and 5 TS.
// 2. Shut down both the followers.
// 3. Trigger a regular config change on the leader which remains pending on leader.
// 4. Trigger unsafe config change on the surviving leader.
// 5. Wait until the new config is populated on leader and master.
// 6. Verify that new config does not contain old followers and a standby node
// has populated the new config.
TEST_F(AdminCliTest, TestUnsafeChangeConfigLeaderWithPendingConfig) {
const MonoDelta kTimeout = MonoDelta::FromSeconds(30);
FLAGS_num_tablet_servers = 5;
FLAGS_num_replicas = 3;
NO_FATALS(BuildAndStart());
// Determine the list of tablet servers currently in the config.
TabletServerMap active_tablet_servers;
auto iter = tablet_replicas_.equal_range(tablet_id_);
for (auto it = iter.first; it != iter.second; ++it) {
InsertOrDie(&active_tablet_servers, it->second->uuid(), it->second);
}
// Get a baseline config reported to the master.
LOG(INFO) << "Waiting for Master to see the current replicas...";
master::TabletLocationsPB tablet_locations;
bool has_leader;
ASSERT_OK(WaitForReplicasReportedToMaster(cluster_->master_proxy(),
3, tablet_id_, kTimeout,
WAIT_FOR_LEADER, VOTER_REPLICA,
&has_leader, &tablet_locations));
LOG(INFO) << "Tablet locations:\n" << SecureDebugString(tablet_locations);
ASSERT_TRUE(has_leader) << SecureDebugString(tablet_locations);
// Wait for initial NO_OP to be committed by the leader.
TServerDetails* leader_ts;
ASSERT_OK(FindTabletLeader(active_tablet_servers, tablet_id_, kTimeout, &leader_ts));
ASSERT_OK(WaitUntilCommittedOpIdIndexIs(1, leader_ts, tablet_id_, kTimeout));
vector<TServerDetails*> followers;
ASSERT_OK(FindTabletFollowers(active_tablet_servers, tablet_id_, kTimeout, &followers));
ASSERT_EQ(followers.size(), 2);
// Shut down servers follower1 and follower2,
// so that leader can't replicate future config change ops.
cluster_->tablet_server_by_uuid(followers[0]->uuid())->Shutdown();
cluster_->tablet_server_by_uuid(followers[1]->uuid())->Shutdown();
// Now try to replicate a ChangeConfig operation. This should get stuck and time out
// because the server can't replicate any operations.
Status s = RemoveServer(leader_ts, tablet_id_, followers[1],
MonoDelta::FromSeconds(2), -1);
ASSERT_TRUE(s.IsTimedOut());
LOG(INFO) << "Change Config Op timed out, Sending a Replace config "
<< "command when change config op is pending on the leader.";
const string& leader_addr = Substitute("$0:$1",
leader_ts->registration.rpc_addresses(0).host(),
leader_ts->registration.rpc_addresses(0).port());
const vector<string> peer_uuid_list = { leader_ts->uuid() };
ASSERT_OK(RunUnsafeChangeConfig(tablet_id_, leader_addr, peer_uuid_list));
// Restart master to cleanup cache of dead servers from its list of candidate
// servers to trigger placement of new replicas on healthy servers.
cluster_->master()->Shutdown();
ASSERT_OK(cluster_->master()->Restart());
// Find a remaining node which will be picked for re-replication.
vector<TServerDetails*> all_tservers;
AppendValuesFromMap(tablet_servers_, &all_tservers);
TServerDetails* new_node = nullptr;
for (TServerDetails* ts : all_tservers) {
if (!ContainsKey(active_tablet_servers, ts->uuid())) {
new_node = ts;
break;
}
}
ASSERT_TRUE(new_node != nullptr);
// Master may try to add the servers which are down until tserver_unresponsive_timeout_ms,
// so it is safer to wait until consensus metadata has 3 voters on new_node.
ASSERT_OK(WaitUntilCommittedConfigNumVotersIs(3, new_node, tablet_id_, kTimeout));
// Wait for the master to be notified of the config change.
LOG(INFO) << "Waiting for Master to see new config...";
ASSERT_OK(WaitForReplicasReportedToMaster(cluster_->master_proxy(),
3, tablet_id_, kTimeout,
WAIT_FOR_LEADER, VOTER_REPLICA,
&has_leader, &tablet_locations));
LOG(INFO) << "Tablet locations:\n" << SecureDebugString(tablet_locations);
for (const master::TabletLocationsPB_ReplicaPB& replica :
tablet_locations.replicas()) {
ASSERT_NE(replica.ts_info().permanent_uuid(), followers[0]->uuid());
ASSERT_NE(replica.ts_info().permanent_uuid(), followers[1]->uuid());
}
}
// Test unsafe config change when there is a pending config on a surviving follower.
// 1. Instantiate external minicluster with 1 tablet having 3 replicas and 5 TS.
// 2. Shut down both the followers.
// 3. Trigger a regular config change on the leader which remains pending on leader.
// 4. Trigger a leader_step_down command such that leader is forced to become follower.
// 5. Trigger unsafe config change on the follower.
// 6. Wait until the new config is populated on leader and master.
// 7. Verify that new config does not contain old followers and a standby node
// has populated the new config.
TEST_F(AdminCliTest, TestUnsafeChangeConfigFollowerWithPendingConfig) {
const MonoDelta kTimeout = MonoDelta::FromSeconds(30);
FLAGS_num_tablet_servers = 5;
FLAGS_num_replicas = 3;
NO_FATALS(BuildAndStart());
// Determine the list of tablet servers currently in the config.
TabletServerMap active_tablet_servers;
auto iter = tablet_replicas_.equal_range(tablet_id_);
for (auto it = iter.first; it != iter.second; ++it) {
InsertOrDie(&active_tablet_servers, it->second->uuid(), it->second);
}
// Get a baseline config reported to the master.
LOG(INFO) << "Waiting for Master to see the current replicas...";
master::TabletLocationsPB tablet_locations;
bool has_leader;
ASSERT_OK(WaitForReplicasReportedToMaster(cluster_->master_proxy(),
3, tablet_id_, kTimeout,
WAIT_FOR_LEADER, VOTER_REPLICA,
&has_leader, &tablet_locations));
LOG(INFO) << "Tablet locations:\n" << SecureDebugString(tablet_locations);
ASSERT_TRUE(has_leader) << SecureDebugString(tablet_locations);
// Wait for initial NO_OP to be committed by the leader.
TServerDetails* leader_ts;
ASSERT_OK(FindTabletLeader(active_tablet_servers, tablet_id_, kTimeout, &leader_ts));
ASSERT_OK(WaitUntilCommittedOpIdIndexIs(1, leader_ts, tablet_id_, kTimeout));
vector<TServerDetails*> followers;
ASSERT_OK(FindTabletFollowers(active_tablet_servers, tablet_id_, kTimeout, &followers));
// Shut down servers follower1 and follower2,
// so that leader can't replicate future config change ops.
cluster_->tablet_server_by_uuid(followers[0]->uuid())->Shutdown();
cluster_->tablet_server_by_uuid(followers[1]->uuid())->Shutdown();
// Restart master to cleanup cache of dead servers from its
// list of candidate servers to place the new replicas.
cluster_->master()->Shutdown();
ASSERT_OK(cluster_->master()->Restart());
// Now try to replicate a ChangeConfig operation. This should get stuck and time out
// because the server can't replicate any operations.
Status s = RemoveServer(leader_ts, tablet_id_, followers[1],
MonoDelta::FromSeconds(2), -1);
ASSERT_TRUE(s.IsTimedOut());
// Force leader to step down, best effort command since the leadership
// could change anytime during cluster lifetime.
string stderr;
s = RunKuduTool({
"tablet",
"leader_step_down",
cluster_->master()->bound_rpc_addr().ToString(),
tablet_id_
}, nullptr, &stderr);
bool not_currently_leader = stderr.find(
Status::IllegalState("").CodeAsString()) != string::npos;
ASSERT_TRUE(s.ok() || not_currently_leader) << "stderr: " << stderr;
LOG(INFO) << "Change Config Op timed out, Sending a Replace config "
<< "command when change config op is pending on the leader.";
const string& leader_addr = Substitute("$0:$1",
leader_ts->registration.rpc_addresses(0).host(),
leader_ts->registration.rpc_addresses(0).port());
const vector<string> peer_uuid_list = { leader_ts->uuid() };
ASSERT_OK(RunUnsafeChangeConfig(tablet_id_, leader_addr, peer_uuid_list));
// Find a remaining node which will be picked for re-replication.
vector<TServerDetails*> all_tservers;
AppendValuesFromMap(tablet_servers_, &all_tservers);
TServerDetails* new_node = nullptr;
for (TServerDetails* ts : all_tservers) {
if (!ContainsKey(active_tablet_servers, ts->uuid())) {
new_node = ts;
break;
}
}
ASSERT_TRUE(new_node != nullptr);
// Master may try to add the servers which are down until tserver_unresponsive_timeout_ms,
// so it is safer to wait until consensus metadata has 3 voters on new_node.
ASSERT_OK(WaitUntilCommittedConfigNumVotersIs(3, new_node, tablet_id_, kTimeout));
// Wait for the master to be notified of the config change.
LOG(INFO) << "Waiting for Master to see new config...";
ASSERT_OK(WaitForReplicasReportedToMaster(cluster_->master_proxy(),
3, tablet_id_, kTimeout,
WAIT_FOR_LEADER, VOTER_REPLICA,
&has_leader, &tablet_locations));
LOG(INFO) << "Tablet locations:\n" << SecureDebugString(tablet_locations);
for (const master::TabletLocationsPB_ReplicaPB& replica :
tablet_locations.replicas()) {
ASSERT_NE(replica.ts_info().permanent_uuid(), followers[1]->uuid());
ASSERT_NE(replica.ts_info().permanent_uuid(), followers[0]->uuid());
}
}
// Test unsafe config change when there are back to back pending configs on leader logs.
// 1. Instantiate external minicluster with 1 tablet having 3 replicas and 5 TS.
// 2. Shut down both the followers.
// 3. Trigger a regular config change on the leader which remains pending on leader.
// 4. Set a fault crash flag to trigger upon next commit of config change.
// 5. Trigger unsafe config change on the surviving leader which should trigger
// the fault while the old config change is being committed.
// 6. Shutdown and restart the leader and verify that tablet bootstrapped on leader.
// 7. Verify that a new node has populated the new config with 3 voters.
TEST_F(AdminCliTest, TestUnsafeChangeConfigWithPendingConfigsOnWAL) {
const MonoDelta kTimeout = MonoDelta::FromSeconds(30);
FLAGS_num_tablet_servers = 5;
FLAGS_num_replicas = 3;
NO_FATALS(BuildAndStart());
// Determine the list of tablet servers currently in the config.
TabletServerMap active_tablet_servers;
auto iter = tablet_replicas_.equal_range(tablet_id_);
for (auto it = iter.first; it != iter.second; ++it) {
InsertOrDie(&active_tablet_servers, it->second->uuid(), it->second);
}
// Get a baseline config reported to the master.
LOG(INFO) << "Waiting for Master to see the current replicas...";
master::TabletLocationsPB tablet_locations;
bool has_leader;
ASSERT_OK(WaitForReplicasReportedToMaster(cluster_->master_proxy(),
3, tablet_id_, kTimeout,
WAIT_FOR_LEADER, VOTER_REPLICA,
&has_leader, &tablet_locations));
LOG(INFO) << "Tablet locations:\n" << SecureDebugString(tablet_locations);
ASSERT_TRUE(has_leader) << SecureDebugString(tablet_locations);
// Wait for initial NO_OP to be committed by the leader.
TServerDetails* leader_ts;
ASSERT_OK(FindTabletLeader(active_tablet_servers, tablet_id_, kTimeout, &leader_ts));
ASSERT_OK(WaitUntilCommittedOpIdIndexIs(1, leader_ts, tablet_id_, kTimeout));
vector<TServerDetails*> followers;
ASSERT_OK(FindTabletFollowers(active_tablet_servers, tablet_id_, kTimeout, &followers));
// Shut down servers follower1 and follower2,
// so that leader can't replicate future config change ops.
cluster_->tablet_server_by_uuid(followers[0]->uuid())->Shutdown();
cluster_->tablet_server_by_uuid(followers[1]->uuid())->Shutdown();
// Now try to replicate a ChangeConfig operation. This should get stuck and time out
// because the server can't replicate any operations.
Status s = RemoveServer(leader_ts, tablet_id_, followers[1],
MonoDelta::FromSeconds(2), -1);
ASSERT_TRUE(s.IsTimedOut());
LOG(INFO) << "Change Config Op timed out, Sending a Replace config "
<< "command when change config op is pending on the leader.";
const string& leader_addr = Substitute("$0:$1",
leader_ts->registration.rpc_addresses(0).host(),
leader_ts->registration.rpc_addresses(0).port());
const vector<string> peer_uuid_list = { leader_ts->uuid() };
ASSERT_OK(RunUnsafeChangeConfig(tablet_id_, leader_addr, peer_uuid_list));
// Inject the crash via fault_crash_before_cmeta_flush flag.
// Tablet will find 2 pending configs back to back during bootstrap,
// one from ChangeConfig (RemoveServer) and another from UnsafeChangeConfig.
ASSERT_OK(cluster_->SetFlag(
cluster_->tablet_server_by_uuid(leader_ts->uuid()),
"fault_crash_before_cmeta_flush", "1.0"));
// Find a remaining node which will be picked for re-replication.
vector<TServerDetails*> all_tservers;
AppendValuesFromMap(tablet_servers_, &all_tservers);
TServerDetails* new_node = nullptr;
for (TServerDetails* ts : all_tservers) {
if (!ContainsKey(active_tablet_servers, ts->uuid())) {
new_node = ts;
break;
}
}
ASSERT_TRUE(new_node != nullptr);
// Restart master to cleanup cache of dead servers from its list of candidate
// servers to trigger placement of new replicas on healthy servers.
cluster_->master()->Shutdown();
ASSERT_OK(cluster_->master()->Restart());
ASSERT_OK(cluster_->tablet_server_by_uuid(
leader_ts->uuid())->WaitForInjectedCrash(kTimeout));
cluster_->tablet_server_by_uuid(leader_ts->uuid())->Shutdown();
ASSERT_OK(cluster_->tablet_server_by_uuid(
leader_ts->uuid())->Restart());
ASSERT_OK(WaitForNumTabletsOnTS(leader_ts, 1, kTimeout, nullptr));
ASSERT_OK(WaitUntilCommittedConfigNumVotersIs(3, new_node, tablet_id_, kTimeout));
// Wait for the master to be notified of the config change.
ASSERT_OK(WaitForReplicasReportedToMaster(cluster_->master_proxy(),
3, tablet_id_, kTimeout,
WAIT_FOR_LEADER, VOTER_REPLICA,
&has_leader, &tablet_locations));
LOG(INFO) << "Tablet locations:\n" << SecureDebugString(tablet_locations);
for (const master::TabletLocationsPB_ReplicaPB& replica :
tablet_locations.replicas()) {
ASSERT_NE(replica.ts_info().permanent_uuid(), followers[0]->uuid());
ASSERT_NE(replica.ts_info().permanent_uuid(), followers[1]->uuid());
}
}
// Test unsafe config change on a 5-replica tablet when the mulitple pending configs
// on the surviving node.
// 1. Instantiate external minicluster with 1 tablet having 5 replicas and 9 TS.
// 2. Shut down all the followers.
// 3. Trigger unsafe config changes on the surviving leader with those
// dead followers in the new config.
// 4. Wait until the new config is populated on the master and the new leader.
// 5. Verify that new config does not contain old followers.
TEST_F(AdminCliTest, TestUnsafeChangeConfigWithMultiplePendingConfigs) {
const MonoDelta kTimeout = MonoDelta::FromSeconds(30);
FLAGS_num_tablet_servers = 9;
FLAGS_num_replicas = 5;
// Retire the dead servers early with these settings.
NO_FATALS(BuildAndStart());
vector<TServerDetails*> tservers;
vector<ExternalTabletServer*> external_tservers;
AppendValuesFromMap(tablet_servers_, &tservers);
for (TServerDetails* ts : tservers) {
external_tservers.push_back(cluster_->tablet_server_by_uuid(ts->uuid()));
}
// Determine the list of tablet servers currently in the config.
TabletServerMap active_tablet_servers;
auto iter = tablet_replicas_.equal_range(tablet_id_);
for (auto it = iter.first; it != iter.second; ++it) {
InsertOrDie(&active_tablet_servers, it->second->uuid(), it->second);
}
// Get a baseline config reported to the master.
LOG(INFO) << "Waiting for Master to see the current replicas...";
master::TabletLocationsPB tablet_locations;
bool has_leader;
ASSERT_OK(WaitForReplicasReportedToMaster(cluster_->master_proxy(),
5, tablet_id_, kTimeout,
WAIT_FOR_LEADER, VOTER_REPLICA,
&has_leader, &tablet_locations));
LOG(INFO) << "Tablet locations:\n" << SecureDebugString(tablet_locations);
ASSERT_TRUE(has_leader) << SecureDebugString(tablet_locations);
TServerDetails* leader_ts;
ASSERT_OK(FindTabletLeader(active_tablet_servers, tablet_id_, kTimeout, &leader_ts));
vector<TServerDetails*> followers;
for (const auto& elem : active_tablet_servers) {
if (elem.first == leader_ts->uuid()) {
continue;
}
followers.push_back(elem.second);
cluster_->tablet_server_by_uuid(elem.first)->Shutdown();
}
ASSERT_EQ(4, followers.size());
// Shutdown master to cleanup cache of dead servers from its list of candidate
// servers to trigger placement of new replicas on healthy servers when we restart later.
cluster_->master()->Shutdown();
const string& leader_addr = Substitute("$0:$1",
leader_ts->registration.rpc_addresses(0).host(),
leader_ts->registration.rpc_addresses(0).port());
// This should keep the multiple pending configs on the node since we are
// adding all the dead followers to the new config, and then eventually we write
// just one surviving node to the config.
// New config write sequences are: {ABCDE}, {ABCD}, {ABC}, {AB}, {A},
// A being the leader node where config is written and rest of the nodes are
// dead followers.
for (int num_replicas = followers.size(); num_replicas >= 0; num_replicas--) {
vector<string> peer_uuid_list;
peer_uuid_list.push_back(leader_ts->uuid());
for (int i = 0; i < num_replicas; i++) {
peer_uuid_list.push_back(followers[i]->uuid());
}
ASSERT_OK(RunUnsafeChangeConfig(tablet_id_, leader_addr, peer_uuid_list));
}
ASSERT_OK(WaitUntilCommittedConfigNumVotersIs(1, leader_ts, tablet_id_, kTimeout));
ASSERT_OK(cluster_->master()->Restart());
// Find a remaining node which will be picked for re-replication.
vector<TServerDetails*> all_tservers;
AppendValuesFromMap(tablet_servers_, &all_tservers);
TServerDetails* new_node = nullptr;
for (TServerDetails* ts : all_tservers) {
if (!ContainsKey(active_tablet_servers, ts->uuid())) {
new_node = ts;
break;
}
}
ASSERT_TRUE(new_node != nullptr);
// Master may try to add the servers which are down until tserver_unresponsive_timeout_ms,
// so it is safer to wait until consensus metadata has 5 voters on new_node.
ASSERT_OK(WaitUntilCommittedConfigNumVotersIs(5, new_node, tablet_id_, kTimeout));
// Wait for the master to be notified of the config change.
LOG(INFO) << "Waiting for Master to see new config...";
ASSERT_OK(WaitForReplicasReportedToMaster(cluster_->master_proxy(),
5, tablet_id_, kTimeout,
WAIT_FOR_LEADER, VOTER_REPLICA,
&has_leader, &tablet_locations));
LOG(INFO) << "Tablet locations:\n" << SecureDebugString(tablet_locations);
for (const master::TabletLocationsPB_ReplicaPB& replica :
tablet_locations.replicas()) {
// Verify that old followers aren't part of new config.
for (const auto& old_follower : followers) {
ASSERT_NE(replica.ts_info().permanent_uuid(), old_follower->uuid());
}
}
}
Status GetTermFromConsensus(const vector<TServerDetails*>& tservers,
const string& tablet_id,
int64_t *current_term) {
ConsensusStatePB cstate;
for (auto& ts : tservers) {
RETURN_NOT_OK(
GetConsensusState(ts, tablet_id, MonoDelta::FromSeconds(10), EXCLUDE_HEALTH_REPORT,
&cstate));
if (!cstate.leader_uuid().empty() &&
IsRaftConfigMember(cstate.leader_uuid(), cstate.committed_config()) &&
cstate.has_current_term()) {
*current_term = cstate.current_term();
return Status::OK();
}
}
return Status::NotFound(Substitute(
"No leader replica found for tablet $0", tablet_id));
}
TEST_F(AdminCliTest, TestLeaderStepDown) {
FLAGS_num_tablet_servers = 3;
FLAGS_num_replicas = 3;
NO_FATALS(BuildAndStart());
vector<TServerDetails*> tservers;
AppendValuesFromMap(tablet_servers_, &tservers);
ASSERT_EQ(FLAGS_num_tablet_servers, tservers.size());
for (auto& ts : tservers) {
ASSERT_OK(WaitUntilTabletRunning(ts,
tablet_id_,
MonoDelta::FromSeconds(10)));
}
int64_t current_term;
ASSERT_OK(GetTermFromConsensus(tservers, tablet_id_,
&current_term));
// The leader for the given tablet may change anytime, resulting in
// the command returning an error code. Hence checking for term advancement
// only if the leader_step_down succeeds. It is also unsafe to check
// the term advancement without honoring status of the command since
// there may not have been another election in the meanwhile.
string stderr;
Status s = RunKuduTool({
"tablet",
"leader_step_down",
cluster_->master()->bound_rpc_addr().ToString(),
tablet_id_
}, nullptr, &stderr);
bool not_currently_leader = stderr.find(
Status::IllegalState("").CodeAsString()) != string::npos;
ASSERT_TRUE(s.ok() || not_currently_leader) << "stderr: " << stderr;
if (s.ok()) {
int64_t new_term;
ASSERT_EVENTUALLY([&]() {
ASSERT_OK(GetTermFromConsensus(tservers, tablet_id_,
&new_term));
ASSERT_GT(new_term, current_term);
});
}
}
TEST_F(AdminCliTest, TestLeaderStepDownWhenNotPresent) {
FLAGS_num_tablet_servers = 3;
FLAGS_num_replicas = 3;
NO_FATALS(BuildAndStart(
{ "--enable_leader_failure_detection=false" },
{ "--catalog_manager_wait_for_new_tablets_to_elect_leader=false" }));
vector<TServerDetails*> tservers;
AppendValuesFromMap(tablet_servers_, &tservers);
ASSERT_EQ(FLAGS_num_tablet_servers, tservers.size());
for (auto& ts : tservers) {
ASSERT_OK(WaitUntilTabletRunning(ts,
tablet_id_,
MonoDelta::FromSeconds(10)));
}
int64_t current_term;
ASSERT_TRUE(GetTermFromConsensus(tservers, tablet_id_,
&current_term).IsNotFound());
string stdout;
ASSERT_OK(RunKuduTool({
"tablet",
"leader_step_down",
cluster_->master()->bound_rpc_addr().ToString(),
tablet_id_
}, &stdout));
ASSERT_STR_CONTAINS(stdout,
Substitute("No leader replica found for tablet $0",
tablet_id_));
}
TEST_F(AdminCliTest, TestDeleteTable) {
FLAGS_num_tablet_servers = 1;
FLAGS_num_replicas = 1;
NO_FATALS(BuildAndStart());
string master_address = cluster_->master()->bound_rpc_addr().ToString();
shared_ptr<KuduClient> client;
ASSERT_OK(KuduClientBuilder()
.add_master_server_addr(master_address)
.Build(&client));
ASSERT_TOOL_OK(
"table",
"delete",
master_address,
kTableId
);
vector<string> tables;
ASSERT_OK(client->ListTables(&tables));
ASSERT_TRUE(tables.empty());
}
TEST_F(AdminCliTest, TestListTables) {
FLAGS_num_tablet_servers = 1;
FLAGS_num_replicas = 1;
NO_FATALS(BuildAndStart());
string stdout;
ASSERT_OK(RunKuduTool({
"table",
"list",
cluster_->master()->bound_rpc_addr().ToString()
}, &stdout));
vector<string> stdout_lines = Split(stdout, ",", strings::SkipEmpty());
ASSERT_EQ(1, stdout_lines.size());
ASSERT_EQ(Substitute("$0\n", kTableId), stdout_lines[0]);
}
TEST_F(AdminCliTest, TestListTablesDetail) {
FLAGS_num_tablet_servers = 3;
FLAGS_num_replicas = 3;
NO_FATALS(BuildAndStart());
// Add another table to test multiple tables output.
const string kAnotherTableId = "TestAnotherTable";
KuduSchema client_schema(client::KuduSchemaFromSchema(schema_));
gscoped_ptr<KuduTableCreator> table_creator(client_->NewTableCreator());
ASSERT_OK(table_creator->table_name(kAnotherTableId)
.schema(&client_schema)
.set_range_partition_columns({ "key" })
.num_replicas(FLAGS_num_replicas)
.Create());
// Grab list of tablet_ids from any tserver.
vector<TServerDetails*> tservers;
vector<string> tablet_ids;
AppendValuesFromMap(tablet_servers_, &tservers);
ListRunningTabletIds(tservers.front(),
MonoDelta::FromSeconds(30), &tablet_ids);
string stdout;
ASSERT_OK(RunKuduTool({
"table",
"list",
"--list_tablets",
cluster_->master()->bound_rpc_addr().ToString()
}, &stdout));
vector<string> stdout_lines = Split(stdout, "\n", strings::SkipEmpty());
// Verify multiple tables along with their tablets and replica-uuids.
ASSERT_EQ(10, stdout_lines.size());
ASSERT_STR_CONTAINS(stdout, kTableId);
ASSERT_STR_CONTAINS(stdout, kAnotherTableId);
ASSERT_STR_CONTAINS(stdout, tablet_ids.front());
ASSERT_STR_CONTAINS(stdout, tablet_ids.back());
for (auto& ts : tservers) {
ASSERT_STR_CONTAINS(stdout, ts->uuid());
ASSERT_STR_CONTAINS(stdout, ts->uuid());
}
}
TEST_F(AdminCliTest, RebalancerReportOnly) {
static const char kReferenceOutput[] =
R"***(Per-server replica distribution summary:
Statistic | Value
-----------------------+----------
Minimum Replica Count | 0
Maximum Replica Count | 1
Average Replica Count | 0.600000
Per-table replica distribution summary:
Replica Skew | Value
--------------+----------
Minimum | 1
Maximum | 1
Average | 1.000000)***";
FLAGS_num_tablet_servers = 5;
NO_FATALS(BuildAndStart());
string out;
string err;
Status s = RunKuduTool({
"cluster",
"rebalance",
cluster_->master()->bound_rpc_addr().ToString(),
"--report_only",
}, &out, &err);
ASSERT_TRUE(s.ok()) << ToolRunInfo(s, out, err);
// The rebalancer should report on tablet replica distribution. The output
// should match the reference report: the distribution of the replicas
// is 100% repeatable given the number of tables created by the test,
// the replication factor and the number of tablet servers.
ASSERT_STR_CONTAINS(out, kReferenceOutput);
// The actual rebalancing should not run.
ASSERT_STR_NOT_CONTAINS(out, "rebalancing is complete:")
<< ToolRunInfo(s, out, err);
}
// Make sure the rebalancer doesn't start if a tablet server is down.
class RebalanceStartCriteriaTest :
public AdminCliTest,
public ::testing::WithParamInterface<Kudu1097> {
};
INSTANTIATE_TEST_CASE_P(, RebalanceStartCriteriaTest,
::testing::Values(Kudu1097::Disable, Kudu1097::Enable));
TEST_P(RebalanceStartCriteriaTest, TabletServerIsDown) {
const bool is_343_scheme = (GetParam() == Kudu1097::Enable);
const vector<string> kMasterFlags = {
Substitute("--raft_prepare_replacement_before_eviction=$0", is_343_scheme),
};
const vector<string> kTserverFlags = {
Substitute("--raft_prepare_replacement_before_eviction=$0", is_343_scheme),
};
FLAGS_num_tablet_servers = 5;
NO_FATALS(BuildAndStart(kTserverFlags, kMasterFlags));
// Shutdown one of the tablet servers.
HostPort ts_host_port;
{
auto* ts = cluster_->tablet_server(0);
ASSERT_NE(nullptr, ts);
ts_host_port = ts->bound_rpc_hostport();
ts->Shutdown();
}
string out;
string err;
Status s = RunKuduTool({
"cluster",
"rebalance",
cluster_->master()->bound_rpc_addr().ToString()
}, &out, &err);
ASSERT_TRUE(s.IsRuntimeError()) << ToolRunInfo(s, out, err);
const auto err_msg_pattern = Substitute(
"Illegal state: tablet server .* \\($0\\): "
"unacceptable health status UNAVAILABLE",
ts_host_port.ToString());
ASSERT_STR_MATCHES(err, err_msg_pattern);
}
// Create tables with unbalanced replica distribution: useful in
// rebalancer-related tests.
static Status CreateUnbalancedTables(
cluster::ExternalMiniCluster* cluster,
client::KuduClient* client,
const Schema& table_schema,
const string& table_name_pattern,
int num_tables,
int rep_factor,
int tserver_idx_from,
int tserver_num,
int tserver_unresponsive_ms) {
// Keep running only some tablet servers and shut down the rest.
for (auto i = tserver_idx_from; i < tserver_num; ++i) {
cluster->tablet_server(i)->Shutdown();
}
// Wait for the catalog manager to understand that not all tablet servers
// are available.
SleepFor(MonoDelta::FromMilliseconds(5 * tserver_unresponsive_ms / 4));
// Create tables with their tablet replicas landing only on the tablet servers
// which are up and running.
KuduSchema client_schema(client::KuduSchemaFromSchema(table_schema));
for (auto i = 0; i < num_tables; ++i) {
const string table_name = Substitute(table_name_pattern, i);
unique_ptr<KuduTableCreator> table_creator(client->NewTableCreator());
RETURN_NOT_OK(table_creator->table_name(table_name)
.schema(&client_schema)
.add_hash_partitions({ "key" }, 3)
.num_replicas(rep_factor)
.Create());
RETURN_NOT_OK(RunKuduTool({
"perf",
"loadgen",
cluster->master()->bound_rpc_addr().ToString(),
Substitute("--table_name=$0", table_name),
Substitute("--table_num_replicas=$0", rep_factor),
"--string_fixed=unbalanced_tables_test",
}));
}
for (auto i = tserver_idx_from; i < tserver_num; ++i) {
RETURN_NOT_OK(cluster->tablet_server(i)->Restart());
}
return Status::OK();
}
// A test to verify that rebalancing works for both 3-4-3 and 3-2-3 replica
// management schemes. During replica movement, a light workload is run against
// every table being rebalanced. This test covers different replication factors.
class RebalanceParamTest :
public AdminCliTest,
public ::testing::WithParamInterface<tuple<int, Kudu1097>> {
};
INSTANTIATE_TEST_CASE_P(, RebalanceParamTest,
::testing::Combine(::testing::Values(1, 2, 3, 5),
::testing::Values(Kudu1097::Disable, Kudu1097::Enable)));
TEST_P(RebalanceParamTest, Rebalance) {
if (!AllowSlowTests()) {
LOG(WARNING) << "test is skipped; set KUDU_ALLOW_SLOW_TESTS=1 to run";
return;
}
const auto& param = GetParam();
const auto kRepFactor = std::get<0>(param);
const auto is_343_scheme = (std::get<1>(param) == Kudu1097::Enable);
constexpr auto kNumTservers = 7;
constexpr auto kNumTables = 5;
const string table_name_pattern = "rebalance_test_table_$0";
constexpr auto kTserverUnresponsiveMs = 3000;
const auto timeout = MonoDelta::FromSeconds(30);
const vector<string> kMasterFlags = {
"--allow_unsafe_replication_factor",
Substitute("--raft_prepare_replacement_before_eviction=$0", is_343_scheme),
Substitute("--tserver_unresponsive_timeout_ms=$0", kTserverUnresponsiveMs),
};
const vector<string> kTserverFlags = {
Substitute("--raft_prepare_replacement_before_eviction=$0", is_343_scheme),
};
FLAGS_num_tablet_servers = kNumTservers;
FLAGS_num_replicas = kRepFactor;
NO_FATALS(BuildAndStart(kTserverFlags, kMasterFlags));
ASSERT_OK(CreateUnbalancedTables(
cluster_.get(), client_.get(), schema_, table_name_pattern, kNumTables,
kRepFactor, kRepFactor + 1, kNumTservers, kTserverUnresponsiveMs));
// Workloads aren't run for 3-2-3 replica movement with RF = 1 because
// the tablet is unavailable during the move until the target voter replica
// is up and running. That might take some time, and to avoid flakiness or
// setting longer timeouts, RF=1 replicas are moved with no concurrent
// workload running.
//
// TODO(aserbin): clarify why even with 3-4-3 it's a bit flaky now.
vector<unique_ptr<TestWorkload>> workloads;
//if (kRepFactor > 1 || is_343_scheme) {
if (kRepFactor > 1) {
for (auto i = 0; i < kNumTables; ++i) {
const string table_name = Substitute(table_name_pattern, i);
// The workload is light (1 thread, 1 op batches) so that new replicas
// bootstrap and converge quickly.
unique_ptr<TestWorkload> workload(new TestWorkload(cluster_.get()));
workload->set_table_name(table_name);
workload->set_num_replicas(kRepFactor);
workload->set_num_write_threads(1);
workload->set_write_batch_size(1);
workload->set_write_timeout_millis(timeout.ToMilliseconds());
workload->set_already_present_allowed(true);
workload->Setup();
workload->Start();
workloads.emplace_back(std::move(workload));
}
}
const vector<string> tool_args = {
"cluster",
"rebalance",
cluster_->master()->bound_rpc_addr().ToString(),
"--move_single_replicas=enabled",
};
{
string out;
string err;
const Status s = RunKuduTool(tool_args, &out, &err);
ASSERT_TRUE(s.ok()) << ToolRunInfo(s, out, err);
ASSERT_STR_CONTAINS(out, "rebalancing is complete: cluster is balanced")
<< "stderr: " << err;
}
// Next run should report the cluster as balanced and no replica movement
// should be attempted.
{
string out;
string err;
const Status s = RunKuduTool(tool_args, &out, &err);
ASSERT_TRUE(s.ok()) << ToolRunInfo(s, out, err);
ASSERT_STR_CONTAINS(out,
"rebalancing is complete: cluster is balanced (moved 0 replicas)")
<< "stderr: " << err;
}
for (auto& workload : workloads) {
workload->StopAndJoin();
}
NO_FATALS(cluster_->AssertNoCrashes());
NO_FATALS(ClusterVerifier(cluster_.get()).CheckCluster());
// Now add a new tablet server into the cluster and make sure the rebalancer
// will re-distribute replicas.
ASSERT_OK(cluster_->AddTabletServer());
{
string out;
string err;
const Status s = RunKuduTool(tool_args, &out, &err);
ASSERT_TRUE(s.ok()) << ToolRunInfo(s, out, err);
ASSERT_STR_CONTAINS(out, "rebalancing is complete: cluster is balanced")
<< "stderr: " << err;
// The cluster was un-balanced, so many replicas should have been moved.
ASSERT_STR_NOT_CONTAINS(out, "(moved 0 replicas)");
}
NO_FATALS(cluster_->AssertNoCrashes());
NO_FATALS(ClusterVerifier(cluster_.get()).CheckCluster());
}
// Common base for the rebalancer-related test below.
class RebalancingTest :
public tserver::TabletServerIntegrationTestBase,
public ::testing::WithParamInterface<Kudu1097> {
public:
RebalancingTest(int num_tables = 10,
int rep_factor = 3,
int num_tservers = 8,
int tserver_unresponsive_ms = 3000,
const string& table_name_pattern = "rebalance_test_table_$0")
: TabletServerIntegrationTestBase(),
is_343_scheme_(GetParam() == Kudu1097::Enable),
num_tables_(num_tables),
rep_factor_(rep_factor),
num_tservers_(num_tservers),
tserver_unresponsive_ms_(tserver_unresponsive_ms),
table_name_pattern_(table_name_pattern) {
master_flags_ = {
Substitute("--raft_prepare_replacement_before_eviction=$0", is_343_scheme_),
Substitute("--tserver_unresponsive_timeout_ms=$0", tserver_unresponsive_ms_),
};
tserver_flags_ = {
Substitute("--raft_prepare_replacement_before_eviction=$0", is_343_scheme_),
};
}
protected:
static const char* const kExitOnSignalStr;
void Prepare(const vector<string>& extra_tserver_flags = {},
const vector<string>& extra_master_flags = {}) {
copy(extra_tserver_flags.begin(), extra_tserver_flags.end(),
back_inserter(tserver_flags_));
copy(extra_master_flags.begin(), extra_master_flags.end(),
back_inserter(master_flags_));
FLAGS_num_tablet_servers = num_tservers_;
FLAGS_num_replicas = rep_factor_;
NO_FATALS(BuildAndStart(tserver_flags_, master_flags_));
ASSERT_OK(CreateUnbalancedTables(
cluster_.get(), client_.get(), schema_, table_name_pattern_,
num_tables_, rep_factor_, rep_factor_ + 1, num_tservers_,
tserver_unresponsive_ms_));
}
// When the rebalancer starts moving replicas, ksck detects corruption
// (that's why RuntimeError), seeing affected tables as non-healthy
// with data state of corresponding tablets as TABLET_DATA_COPYING. If using
// this method, it's a good idea to inject some latency into tablet copying
// to be able to spot the TABLET_DATA_COPYING state, see the
// '--tablet_copy_download_file_inject_latency_ms' flag for tservers.
bool IsRebalancingInProgress() {
string out;
const auto s = RunKuduTool({
"cluster",
"ksck",
cluster_->master()->bound_rpc_addr().ToString(),
}, &out);
if (s.IsRuntimeError() &&
out.find("Data state: TABLET_DATA_COPYING") != string::npos) {
return true;
}
return false;
}
const bool is_343_scheme_;
const int num_tables_;
const int rep_factor_;
const int num_tservers_;
const int tserver_unresponsive_ms_;
const string table_name_pattern_;
vector<string> tserver_flags_;
vector<string> master_flags_;
};
const char* const RebalancingTest::kExitOnSignalStr = "kudu: process exited on signal";
// Make sure the rebalancer is able to do its job if running concurrently
// with DDL activity on the cluster.
class DDLDuringRebalancingTest : public RebalancingTest {
public:
DDLDuringRebalancingTest()
: RebalancingTest(20 /* num_tables */) {
}
};
INSTANTIATE_TEST_CASE_P(, DDLDuringRebalancingTest,
::testing::Values(Kudu1097::Disable, Kudu1097::Enable));
TEST_P(DDLDuringRebalancingTest, TablesCreatedAndDeletedDuringRebalancing) {
if (!AllowSlowTests()) {
LOG(WARNING) << "test is skipped; set KUDU_ALLOW_SLOW_TESTS=1 to run";
return;
}
NO_FATALS(Prepare());
// The latch that controls the lifecycle of the concurrent DDL activity.
CountDownLatch run_latch(1);
thread creator([&]() {
KuduSchema client_schema(client::KuduSchemaFromSchema(schema_));
for (auto idx = 0; ; ++idx) {
if (run_latch.WaitFor(MonoDelta::FromMilliseconds(500))) {
break;
}
const string table_name = Substitute("rebalancer_extra_table_$0", idx++);
unique_ptr<KuduTableCreator> table_creator(client_->NewTableCreator());
CHECK_OK(table_creator->table_name(table_name)
.schema(&client_schema)
.add_hash_partitions({ "key" }, 3)
.num_replicas(rep_factor_)
.Create());
}
});
auto creator_cleanup = MakeScopedCleanup([&]() {
run_latch.CountDown();
creator.join();
});
thread deleter([&]() {
for (auto idx = 0; idx < num_tables_; ++idx) {
if (run_latch.WaitFor(MonoDelta::FromMilliseconds(500))) {
break;
}
CHECK_OK(client_->DeleteTable(Substitute(table_name_pattern_, idx++)));
}
});
auto deleter_cleanup = MakeScopedCleanup([&]() {
run_latch.CountDown();
deleter.join();
});
thread alterer([&]() {
const string kTableName = "rebalancer_dynamic_table";
const string kNewTableName = "rebalancer_dynamic_table_new_name";
while (true) {
// Create table.
{
KuduSchema schema;
KuduSchemaBuilder builder;
builder.AddColumn("key")->Type(KuduColumnSchema::INT64)->
NotNull()->
PrimaryKey();
builder.AddColumn("a")->Type(KuduColumnSchema::INT64);
CHECK_OK(builder.Build(&schema));
unique_ptr<KuduTableCreator> table_creator(client_->NewTableCreator());
CHECK_OK(table_creator->table_name(kTableName)
.schema(&schema)
.set_range_partition_columns({})
.num_replicas(rep_factor_)
.Create());
}
if (run_latch.WaitFor(MonoDelta::FromMilliseconds(100))) {
break;
}
// Drop a column.
{
unique_ptr<KuduTableAlterer> alt(client_->NewTableAlterer(kTableName));
alt->DropColumn("a");
CHECK_OK(alt->Alter());
}
if (run_latch.WaitFor(MonoDelta::FromMilliseconds(100))) {
break;
}
// Add back the column with different type.
{
unique_ptr<KuduTableAlterer> alt(client_->NewTableAlterer(kTableName));
alt->AddColumn("a")->Type(KuduColumnSchema::STRING);
CHECK_OK(alt->Alter());
}
if (run_latch.WaitFor(MonoDelta::FromMilliseconds(100))) {
break;
}
// Rename the table.
{
unique_ptr<KuduTableAlterer> alt(client_->NewTableAlterer(kTableName));
alt->RenameTo(kNewTableName);
CHECK_OK(alt->Alter());
}
if (run_latch.WaitFor(MonoDelta::FromMilliseconds(100))) {
break;
}
// Drop the renamed table.
CHECK_OK(client_->DeleteTable(kNewTableName));
if (run_latch.WaitFor(MonoDelta::FromMilliseconds(100))) {
break;
}
}
});
auto alterer_cleanup = MakeScopedCleanup([&]() {
run_latch.CountDown();
alterer.join();
});
thread timer([&]() {
SleepFor(MonoDelta::FromSeconds(30));
run_latch.CountDown();
});
auto timer_cleanup = MakeScopedCleanup([&]() {
timer.join();
});
const vector<string> tool_args = {
"cluster",
"rebalance",
cluster_->master()->bound_rpc_addr().ToString(),
};
// Run the rebalancer concurrently with the DDL operations. The second run
// of the rebalancer (the second run starts after joining the timer thread)
// is necessary to balance the cluster after the DDL activity stops: that's
// the easiest way to make sure the rebalancer will take into account
// all DDL changes that happened.
//
// The signal to terminate the DDL activity (done via run_latch.CountDown())
// is sent from a separate timer thread instead of doing SleepFor() after
// the first run of the rebalancer followed by run_latch.CountDown().
// That's to avoid dependency on the rebalancer behavior if it spots on-going
// DDL activity and continues running over and over again.
for (auto i = 0; i < 2; ++i) {
if (i == 1) {
timer.join();
timer_cleanup.cancel();
// Wait for all the DDL activity to complete.
alterer.join();
alterer_cleanup.cancel();
deleter.join();
deleter_cleanup.cancel();
creator.join();
creator_cleanup.cancel();
}
string out;
string err;
const auto s = RunKuduTool(tool_args, &out, &err);
ASSERT_TRUE(s.ok()) << ToolRunInfo(s, out, err);
ASSERT_STR_CONTAINS(out, "rebalancing is complete: cluster is balanced")
<< "stderr: " << err;
}
// Next (3rd) run should report the cluster as balanced and
// no replica movement should be attempted.
{
string out;
string err;
const auto s = RunKuduTool(tool_args, &out, &err);
ASSERT_TRUE(s.ok()) << ToolRunInfo(s, out, err);
ASSERT_STR_CONTAINS(out,
"rebalancing is complete: cluster is balanced (moved 0 replicas)")
<< "stderr: " << err;
}
NO_FATALS(cluster_->AssertNoCrashes());
NO_FATALS(ClusterVerifier(cluster_.get()).CheckCluster());
}
// Make sure it's safe to run multiple rebalancers concurrently. The rebalancers
// might report errors, but they should not get stuck and the cluster should
// remain in good shape (i.e. no crashes, no data inconsistencies). Re-running a
// single rebalancer session again should bring the cluster to a balanced state.
class ConcurrentRebalancersTest : public RebalancingTest {
public:
ConcurrentRebalancersTest()
: RebalancingTest(10 /* num_tables */) {
}
};
INSTANTIATE_TEST_CASE_P(, ConcurrentRebalancersTest,
::testing::Values(Kudu1097::Disable, Kudu1097::Enable));
TEST_P(ConcurrentRebalancersTest, TwoConcurrentRebalancers) {
if (!AllowSlowTests()) {
LOG(WARNING) << "test is skipped; set KUDU_ALLOW_SLOW_TESTS=1 to run";
return;
}
NO_FATALS(Prepare());
const vector<string> tool_args = {
"cluster",
"rebalance",
cluster_->master()->bound_rpc_addr().ToString(),
};
const auto runner_func = [&]() {
string out;
string err;
const auto s = RunKuduTool(tool_args, &out, &err);
if (!s.ok()) {
// One might expect a bad status returned: e.g., due to some race so
// the rebalancer didn't able to make progress for more than
// --max_staleness_interval_sec, etc.
LOG(INFO) << "rebalancer run info: " << ToolRunInfo(s, out, err);
}
// Should not exit on a signal: not expecting SIGSEGV, SIGABRT, etc.
return s.ToString().find(kExitOnSignalStr) == string::npos;
};
CountDownLatch start_synchronizer(1);
vector<thread> concurrent_runners;
for (auto i = 0; i < 5; ++i) {
concurrent_runners.emplace_back([&]() {
start_synchronizer.Wait();
CHECK(runner_func());
});
}
// Run rebalancers concurrently and wait for their completion.
start_synchronizer.CountDown();
for (auto& runner : concurrent_runners) {
runner.join();
}
NO_FATALS(cluster_->AssertNoCrashes());
NO_FATALS(ClusterVerifier(cluster_.get()).CheckCluster());
{
string out;
string err;
const auto s = RunKuduTool(tool_args, &out, &err);
ASSERT_TRUE(s.ok()) << ToolRunInfo(s, out, err);
ASSERT_STR_CONTAINS(out, "rebalancing is complete: cluster is balanced")
<< "stderr: " << err;
}
// Next run should report the cluster as balanced and no replica movement
// should be attempted: at least one run of the rebalancer prior to this
// should succeed, so next run is about running the tool against already
// balanced cluster.
{
string out;
string err;
const auto s = RunKuduTool(tool_args, &out, &err);
ASSERT_TRUE(s.ok()) << ToolRunInfo(s, out, err);
ASSERT_STR_CONTAINS(out,
"rebalancing is complete: cluster is balanced (moved 0 replicas)")
<< "stderr: " << err;
}
NO_FATALS(cluster_->AssertNoCrashes());
NO_FATALS(ClusterVerifier(cluster_.get()).CheckCluster());
}
// The rebalancer should stop and exit upon detecting a tablet server that
// went down. That's a simple and effective way of preventing concurrent replica
// movement by the rebalancer and the automatic re-replication (the catalog
// manager tries to move replicas from the unreachable tablet server).
class TserverGoesDownDuringRebalancingTest : public RebalancingTest {
public:
TserverGoesDownDuringRebalancingTest() :
RebalancingTest(5 /* num_tables */) {
}
};
INSTANTIATE_TEST_CASE_P(, TserverGoesDownDuringRebalancingTest,
::testing::Values(Kudu1097::Disable, Kudu1097::Enable));
TEST_P(TserverGoesDownDuringRebalancingTest, TserverDown) {
if (!AllowSlowTests()) {
LOG(WARNING) << "test is skipped; set KUDU_ALLOW_SLOW_TESTS=1 to run";
return;
}
const vector<string> kTserverExtraFlags = {
// Slow down tablet copy to make rebalancing step running longer
// and become observable via tablet data states output by ksck.
"--tablet_copy_download_file_inject_latency_ms=1500",
"--follower_unavailable_considered_failed_sec=30",
};
NO_FATALS(Prepare(kTserverExtraFlags));
// Pre-condition: 'kudu cluster ksck' should be happy with the cluster state
// shortly after initial setup.
ASSERT_EVENTUALLY([&]() {
ASSERT_TOOL_OK(
"cluster",
"ksck",
cluster_->master()->bound_rpc_addr().ToString()
)
});
Random r(SeedRandom());
const uint32_t shutdown_tserver_idx = r.Next() % num_tservers_;
atomic<bool> run(true);
// The thread that shuts down the selected tablet server.
thread stopper([&]() {
while (run && !IsRebalancingInProgress()) {
SleepFor(MonoDelta::FromMilliseconds(10));
}
// All right, it's time to stop the selected tablet server.
cluster_->tablet_server(shutdown_tserver_idx)->Shutdown();
});
auto stopper_cleanup = MakeScopedCleanup([&]() {
run = false;
stopper.join();
});
{
string out;
string err;
const auto s = RunKuduTool({
"cluster",
"rebalance",
cluster_->master()->bound_rpc_addr().ToString(),
// Limiting the number of replicas to move. This is to make the rebalancer
// run longer, making sure the rebalancing is in progress when the tablet
// server goes down.
"--max_moves_per_server=1",
}, &out, &err);
ASSERT_TRUE(s.IsRuntimeError()) << ToolRunInfo(s, out, err);
// The rebalancer tool should not crash.
ASSERT_STR_NOT_CONTAINS(s.ToString(), kExitOnSignalStr);
ASSERT_STR_MATCHES(
err, "Illegal state: tablet server .* \\(.*\\): "
"unacceptable health status UNAVAILABLE");
}
run = false;
stopper.join();
stopper_cleanup.cancel();
ASSERT_OK(cluster_->tablet_server(shutdown_tserver_idx)->Restart());
NO_FATALS(cluster_->AssertNoCrashes());
}
// The rebalancer should continue working and complete rebalancing successfully
// if a new tablet server is added while the cluster is being rebalanced.
class TserverAddedDuringRebalancingTest : public RebalancingTest {
public:
TserverAddedDuringRebalancingTest()
: RebalancingTest(10 /* num_tables */) {
}
};
INSTANTIATE_TEST_CASE_P(, TserverAddedDuringRebalancingTest,
::testing::Values(Kudu1097::Disable, Kudu1097::Enable));
TEST_P(TserverAddedDuringRebalancingTest, TserverStarts) {
if (!AllowSlowTests()) {
LOG(WARNING) << "test is skipped; set KUDU_ALLOW_SLOW_TESTS=1 to run";
return;
}
const vector<string> kTserverExtraFlags = {
// Slow down tablet copy to make rebalancing step running longer
// and become observable via tablet data states output by ksck.
"--tablet_copy_download_file_inject_latency_ms=1500",
"--follower_unavailable_considered_failed_sec=30",
};
NO_FATALS(Prepare(kTserverExtraFlags));
const vector<string> tool_args = {
"cluster",
"rebalance",
cluster_->master()->bound_rpc_addr().ToString(),
};
atomic<bool> run(true);
thread runner([&]() {
while (run) {
string out;
string err;
const auto s = RunKuduTool(tool_args, &out, &err);
CHECK(s.ok()) << ToolRunInfo(s, out, err);
}
});
auto runner_cleanup = MakeScopedCleanup([&]() {
run = false;
runner.join();
});
while (!IsRebalancingInProgress()) {
SleepFor(MonoDelta::FromMilliseconds(10));
}
// It's time to sneak in and add new tablet server.
ASSERT_OK(cluster_->AddTabletServer());
run = false;
runner.join();
runner_cleanup.cancel();
// The rebalancer should not fail, and eventually, after a new tablet server
// is added, the cluster should become balanced.
ASSERT_EVENTUALLY([&]() {
string out;
string err;
const auto s = RunKuduTool(tool_args, &out, &err);
ASSERT_TRUE(s.ok()) << ToolRunInfo(s, out, err);
ASSERT_STR_CONTAINS(out,
"rebalancing is complete: cluster is balanced (moved 0 replicas)")
<< "stderr: " << err;
});
NO_FATALS(cluster_->AssertNoCrashes());
NO_FATALS(ClusterVerifier(cluster_.get()).CheckCluster());
}
// Run rebalancer in 'election storms' environment and make sure the rebalancer
// does not exit prematurely or exhibit any other unexpected behavior.
class RebalancingDuringElectionStormTest : public RebalancingTest {
};
INSTANTIATE_TEST_CASE_P(, RebalancingDuringElectionStormTest,
::testing::Values(Kudu1097::Disable, Kudu1097::Enable));
TEST_P(RebalancingDuringElectionStormTest, RoundRobin) {
if (!AllowSlowTests()) {
LOG(WARNING) << "test is skipped; set KUDU_ALLOW_SLOW_TESTS=1 to run";
return;
}
NO_FATALS(Prepare());
atomic<bool> elector_run(true);
#if defined(ADDRESS_SANITIZER) || defined(THREAD_SANITIZER)
// The timeout is a time-to-run for the stormy elector thread as well.
// Making longer timeout for workload in case of TSAN/ASAN is not needed:
// having everything generated written is not required.
const auto timeout = MonoDelta::FromSeconds(5);
#else
const auto timeout = MonoDelta::FromSeconds(10);
#endif
const auto start_time = MonoTime::Now();
thread elector([&]() {
// Mininum viable divider for modulo ('%') to allow the result to grow by
// the rules below.
auto max_sleep_ms = 2.0;
while (elector_run && MonoTime::Now() < start_time + timeout) {
for (const auto& e : tablet_servers_) {
const auto& ts_uuid = e.first;
const auto* ts = e.second;
vector<ListTabletsResponsePB::StatusAndSchemaPB> tablets;
auto const s = itest::ListTablets(ts, timeout, &tablets);
if (!s.ok()) {
LOG(WARNING) << ts_uuid << ": failed to get tablet list :"
<< s.ToString();
continue;
}
consensus::ConsensusServiceProxy proxy(
cluster_->messenger(),
cluster_->tablet_server_by_uuid(ts_uuid)->bound_rpc_addr(),
"tserver " + ts_uuid);
for (const auto& tablet : tablets) {
const auto& tablet_id = tablet.tablet_status().tablet_id();
consensus::RunLeaderElectionRequestPB req;
req.set_tablet_id(tablet_id);
req.set_dest_uuid(ts_uuid);
rpc::RpcController rpc;
rpc.set_timeout(timeout);
consensus::RunLeaderElectionResponsePB resp;
WARN_NOT_OK(proxy.RunLeaderElection(req, &resp, &rpc),
Substitute("failed to start election for tablet $0",
tablet_id));
}
if (!elector_run || start_time + timeout <= MonoTime::Now()) {
break;
}
auto sleep_ms = rand() % static_cast<int>(max_sleep_ms);
SleepFor(MonoDelta::FromMilliseconds(sleep_ms));
max_sleep_ms = std::min(max_sleep_ms * 1.1, 2000.0);
}
}
});
auto elector_cleanup = MakeScopedCleanup([&]() {
elector_run = false;
elector.join();
});
#if !defined(ADDRESS_SANITIZER) && !defined(THREAD_SANITIZER)
vector<unique_ptr<TestWorkload>> workloads;
for (auto i = 0; i < num_tables_; ++i) {
const string table_name = Substitute(table_name_pattern_, i);
// The workload is light (1 thread, 1 op batches) and lenient to failures.
unique_ptr<TestWorkload> workload(new TestWorkload(cluster_.get()));
workload->set_table_name(table_name);
workload->set_num_replicas(rep_factor_);
workload->set_num_write_threads(1);
workload->set_write_batch_size(1);
workload->set_write_timeout_millis(timeout.ToMilliseconds());
workload->set_already_present_allowed(true);
workload->set_remote_error_allowed(true);
workload->set_timeout_allowed(true);
workload->Setup();
workload->Start();
workloads.emplace_back(std::move(workload));
}
#endif
const vector<string> tool_args = {
"cluster",
"rebalance",
cluster_->master()->bound_rpc_addr().ToString(),
};
while (MonoTime::Now() < start_time + timeout) {
// Rebalancer should not report any errors even if it's an election storm
// unless a tablet server is reported as unavailable by ksck: the latter
// usually happens because GetConsensusState requests are dropped due to
// backpressure.
string out;
string err;
const Status s = RunKuduTool(tool_args, &out, &err);
if (s.ok()) {
ASSERT_STR_CONTAINS(out, "rebalancing is complete: cluster is balanced")
<< ToolRunInfo(s, out, err);
} else {
ASSERT_STR_CONTAINS(err, "unacceptable health status UNAVAILABLE")
<< ToolRunInfo(s, out, err);
}
}
elector_run = false;
elector.join();
elector_cleanup.cancel();
#if !defined(ADDRESS_SANITIZER) && !defined(THREAD_SANITIZER)
for (auto& workload : workloads) {
workload->StopAndJoin();
}
#endif
// There might be some re-replication started as a result of election storm,
// etc. Eventually, the system should heal itself and 'kudu cluster ksck'
// should report no issues.
ASSERT_EVENTUALLY([&]() {
ASSERT_TOOL_OK(
"cluster",
"ksck",
cluster_->master()->bound_rpc_addr().ToString()
)
});
// The rebalancer should successfully rebalance the cluster after ksck
// reported 'all is well'.
{
string out;
string err;
const Status s = RunKuduTool(tool_args, &out, &err);
ASSERT_TRUE(s.ok()) << ToolRunInfo(s, out, err);
ASSERT_STR_CONTAINS(out, "rebalancing is complete: cluster is balanced")
<< ToolRunInfo(s, out, err);
}
NO_FATALS(cluster_->AssertNoCrashes());
}
// A test to verify how the rebalancer handles replicas of single-replica
// tablets in case of various values of the '--move_single_replicas' flag
// and replica management schemes.
class RebalancerAndSingleReplicaTablets :
public AdminCliTest,
public ::testing::WithParamInterface<tuple<string, Kudu1097>> {
};
INSTANTIATE_TEST_CASE_P(, RebalancerAndSingleReplicaTablets,
::testing::Combine(::testing::Values("auto", "enabled", "disabled"),
::testing::Values(Kudu1097::Disable, Kudu1097::Enable)));
TEST_P(RebalancerAndSingleReplicaTablets, SingleReplicasStayOrMove) {
if (!AllowSlowTests()) {
LOG(WARNING) << "test is skipped; set KUDU_ALLOW_SLOW_TESTS=1 to run";
return;
}
constexpr auto kRepFactor = 1;
constexpr auto kNumTservers = 2 * (kRepFactor + 1);
constexpr auto kNumTables = kNumTservers;
constexpr auto kTserverUnresponsiveMs = 3000;
const auto& param = GetParam();
const auto& move_single_replica = std::get<0>(param);
const auto is_343_scheme = (std::get<1>(param) == Kudu1097::Enable);
const string table_name_pattern = "rebalance_test_table_$0";
const vector<string> master_flags = {
Substitute("--raft_prepare_replacement_before_eviction=$0", is_343_scheme),
Substitute("--tserver_unresponsive_timeout_ms=$0", kTserverUnresponsiveMs),
};
const vector<string> tserver_flags = {
Substitute("--raft_prepare_replacement_before_eviction=$0", is_343_scheme),
};
FLAGS_num_tablet_servers = kNumTservers;
FLAGS_num_replicas = kRepFactor;
NO_FATALS(BuildAndStart(tserver_flags, master_flags));
// Keep running only (kRepFactor + 1) tablet servers and shut down the rest.
for (auto i = kRepFactor + 1; i < kNumTservers; ++i) {
cluster_->tablet_server(i)->Shutdown();
}
// Wait for the catalog manager to understand that only (kRepFactor + 1)
// tablet servers are available.
SleepFor(MonoDelta::FromMilliseconds(5 * kTserverUnresponsiveMs / 4));
// Create few tables with their tablet replicas landing only on those
// (kRepFactor + 1) running tablet servers.
KuduSchema client_schema(client::KuduSchemaFromSchema(schema_));
for (auto i = 0; i < kNumTables; ++i) {
const string table_name = Substitute(table_name_pattern, i);
unique_ptr<KuduTableCreator> table_creator(client_->NewTableCreator());
ASSERT_OK(table_creator->table_name(table_name)
.schema(&client_schema)
.add_hash_partitions({ "key" }, 3)
.num_replicas(kRepFactor)
.Create());
ASSERT_TOOL_OK(
"perf",
"loadgen",
cluster_->master()->bound_rpc_addr().ToString(),
Substitute("--table_name=$0", table_name),
// Don't need much data in there.
"--num_threads=1",
"--num_rows_per_thread=1",
);
}
for (auto i = kRepFactor + 1; i < kNumTservers; ++i) {
ASSERT_OK(cluster_->tablet_server(i)->Restart());
}
string out;
string err;
const Status s = RunKuduTool({
"cluster",
"rebalance",
cluster_->master()->bound_rpc_addr().ToString(),
Substitute("--move_single_replicas=$0", move_single_replica),
}, &out, &err);
ASSERT_TRUE(s.ok()) << ToolRunInfo(s, out, err);
ASSERT_STR_CONTAINS(out, "rebalancing is complete: cluster is balanced")
<< "stderr: " << err;
if (move_single_replica == "enabled" ||
(move_single_replica == "auto" && is_343_scheme)) {
// Should move appropriate replicas of single-replica tablets.
ASSERT_STR_NOT_CONTAINS(out,
"rebalancing is complete: cluster is balanced (moved 0 replicas)")
<< "stderr: " << err;
ASSERT_STR_NOT_CONTAINS(err, "has single replica, skipping");
} else {
ASSERT_STR_CONTAINS(out,
"rebalancing is complete: cluster is balanced (moved 0 replicas)")
<< "stderr: " << err;
ASSERT_STR_MATCHES(err, "tablet .* of table '.*' (.*) has single replica, skipping");
}
NO_FATALS(cluster_->AssertNoCrashes());
ClusterVerifier v(cluster_.get());
NO_FATALS(v.CheckCluster());
}
} // namespace tools
} // namespace kudu