blob: 83b79000136842547be08cde99496e48276cd4f2 [file] [log] [blame]
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
//   http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.
#include <cstdint>
#include <functional>
#include <initializer_list>
#include <memory>
#include <ostream>
#include <string>
#include <unordered_map>
#include <unordered_set>
#include <utility>
#include <vector>
#include <boost/optional/optional.hpp>
#include <gflags/gflags_declare.h>
#include <glog/logging.h>
#include <gtest/gtest.h>
#include "kudu/common/common.pb.h"
#include "kudu/common/wire_protocol.pb.h"
#include "kudu/consensus/consensus.pb.h"
#include "kudu/consensus/metadata.pb.h"
#include "kudu/consensus/quorum_util.h"
#include "kudu/gutil/map-util.h"
#include "kudu/gutil/strings/substitute.h"
#include "kudu/integration-tests/cluster_itest_util.h"
#include "kudu/integration-tests/external_mini_cluster-itest-base.h"
#include "kudu/integration-tests/mini_cluster_fs_inspector.h"
#include "kudu/integration-tests/raft_consensus-itest-base.h"
#include "kudu/integration-tests/test_workload.h"
#include "kudu/master/master.pb.h"
#include "kudu/mini-cluster/external_mini_cluster.h"
#include "kudu/tserver/tablet_server-test-base.h"
#include "kudu/util/monotime.h"
#include "kudu/util/pb_util.h"
#include "kudu/util/status.h"
#include "kudu/util/test_macros.h"
#include "kudu/util/test_util.h"
using boost::none;
using kudu::consensus::ADD_PEER;
using kudu::consensus::COMMITTED_OPID;
using kudu::consensus::ConsensusStatePB;
using kudu::consensus::GetRaftConfigMember;
using kudu::consensus::MODIFY_PEER;
using kudu::consensus::RaftPeerAttrsPB;
using kudu::consensus::RaftPeerPB;
using kudu::consensus::REMOVE_PEER;
using kudu::consensus::EXCLUDE_HEALTH_REPORT;
using kudu::itest::BulkChangeConfig;
using kudu::itest::GetTableLocations;
using kudu::itest::TServerDetails;
using kudu::itest::GetConsensusState;
using kudu::tserver::RaftConsensusITestBase;
using kudu::master::VOTER_REPLICA;
using kudu::pb_util::SecureShortDebugString;
using std::string;
using std::unordered_set;
using std::vector;
using strings::Substitute;
namespace kudu {
// Test fixture shared by the TEST_F(RaftConfigChangeITest, ...) cases below,
// which drive Raft configuration changes against an external mini cluster.
// NOTE(review): the class body and its closing "};" are not visible in this
// copy of the file; they appear to have been lost.
class RaftConfigChangeITest : public ExternalMiniClusterITestBase {
// Regression test for KUDU-2147. In this test we cause an initial leader to
// heartbeat to a master with a new configuration change immediately after it
// has lost its leadership, reporting a new configuration with no leader. The
// master should update its record of which replica is the leader after a new
// leader is elected in that term.
// Steps followed in this test:
// 1. Inject latency into TS heartbeats to the master and slow down Raft leader
// heartbeats to followers, so that the sequence of events in this test can
// be carefully controlled.
// 2. Evict a follower from the config.
// 3. Immediately start an election on the remaining follower in the config.
// We know that this follower should win the election because in order to
// commit a removal from 3 voters, the removal op must be replicated to both
// of the two remaining voters in the config.
// 4. The master will get the config change heartbeat from the initial leader,
// which will indicate a new term and no current leader.
// 5. The master will then get a heartbeat from the new leader to report that
// it's now the leader of the config.
// 6. Once the master knows who the new leader is, the master will instruct the
// new leader to add a new replica to its config to bring it back up to 3
// voters. That new replica will be the follower that we evicted in step 2.
// 7. If that succeeds then the leader will replicate the eviction, the
// leader's own no-op, and the adding-back-in of the evicted replica to that
// evicted replica's log.
// 8. Once that process completes, all 3 replicas will have identical logs,
// which is what we wait for at the end of the test.
TEST_F(RaftConfigChangeITest, TestKudu2147) {
// NOTE(review): the early "return;" and closing brace of this guard are not
// visible in this copy; as shown, the rest of the test sits inside the if.
if (!AllowSlowTests()) {
// This test injects seconds of latency so can take a while to converge.
LOG(WARNING) << "Skipping test in fast-test mode.";
const MonoDelta kTimeout = MonoDelta::FromSeconds(30);
// Slow down leader heartbeats so that in the explicit election below, the
// second leader does not immediately heartbeat to the initial leader. If
// that occurs, the initial leader will not report to the master that there
// is currently no leader.
// NOTE(review): the statement that starts the cluster with slowed-down
// heartbeats, as described above, is not visible in this copy.
// Create a new table.
// NOTE(review): the workload setup/start and the wait for batches are not
// visible in this copy; as shown, batches_completed() is checked right after
// the TestWorkload is constructed.
TestWorkload workload(cluster_.get());
ASSERT_GE(workload.batches_completed(), 10);
// The table should have replicas on three tservers.
master::GetTableLocationsResponsePB table_locations;
ASSERT_OK(GetTableLocations(cluster_->master_proxy(), TestWorkload::kDefaultTableName,
kTimeout, VOTER_REPLICA, /*table_id=*/none, &table_locations));
ASSERT_EQ(1, table_locations.tablet_locations_size()); // Only 1 tablet.
ASSERT_EQ(3, table_locations.tablet_locations(0).interned_replicas_size()); // 3 replicas.
string tablet_id = table_locations.tablet_locations(0).tablet_id();
// Wait for all 3 replicas to converge before we start the test.
ASSERT_OK(WaitForServersToAgree(kTimeout, ts_map_, tablet_id, 1));
// Retry as needed to counter normal leader election activity.
// NOTE(review): no retry construct is visible in this copy; it appears to
// have been lost along with its closing lines.
// Find initial leader.
TServerDetails* leader;
ASSERT_OK(FindTabletLeader(ts_map_, tablet_id, kTimeout, &leader));
ASSERT_OK(WaitForOpFromCurrentTerm(leader, tablet_id, COMMITTED_OPID, kTimeout));
vector<TServerDetails*> followers;
for (int i = 0; i < cluster_->num_tablet_servers(); i++) {
// Dynamically set the latency injection flag to induce TS -> master
// heartbeat delays. The leader delays its master heartbeats by 2 sec
// each time; followers delay their master heartbeats by even longer.
int ms_delay = cluster_->tablet_server(i)->uuid() == leader->uuid() ? 2000 : 5000;
// NOTE(review): the beginning of the statement that consumes this
// Substitute() result (presumably setting the latency flag on tablet
// server i, per the comment above) is not visible in this copy.
Substitute("$0", ms_delay)));
// Keep track of the followers.
// NOTE(review): the statement that appends this server to 'followers', and
// the closing braces of this if/for, are not visible in this copy; nothing
// shown here ever populates 'followers'.
if (cluster_->tablet_server(i)->uuid() != leader->uuid()) {
ASSERT_EQ(2, followers.size());
// Now that heartbeat injection is enabled, evict one follower and trigger
// an election on another follower immediately thereafter.
ASSERT_OK(itest::RemoveServer(leader, tablet_id, followers[0], kTimeout));
// Immediately start an election on the remaining follower. This will cause
// the initial leader's term to rev and it will have to step down. When it
// sends a tablet report to the master with the new configuration excluding
// the removed replica, it will report an unknown leader in the new term.
ASSERT_OK(itest::StartElection(followers[1], tablet_id, kTimeout));
// Wait until the master re-adds the evicted replica and it is back up and
// running. If the master hit KUDU-2147, this would fail because the master
// would be unable to add the removed server back, and that replica would be
// missing the config change op that removed it from the config.
ASSERT_OK(WaitForServersToAgree(kTimeout, ts_map_, tablet_id, 1));
// Test automatic promotion of a non-voter replica in a 3-4-3 re-replication
// (KUDU-1097) paradigm: a 4th replica is added as NON_VOTER to a 3-voter
// tablet, and the test waits for the committed config to reach 4 voters.
TEST_F(RaftConfigChangeITest, TestNonVoterPromotion) {
const MonoDelta kTimeout = MonoDelta::FromSeconds(30);
// Enable 3-4-3 re-replication.
// NOTE(review): the beginning of the cluster start-up statement that ends
// with the argument below is not visible in this copy.
/*num_tablet_servers=*/ 4));
// NOTE(review): the workload setup/start calls are not visible in this copy.
TestWorkload workload(cluster_.get());
// The table should initially have replicas on three tservers.
master::GetTableLocationsResponsePB table_locations;
ASSERT_OK(GetTableLocations(cluster_->master_proxy(), TestWorkload::kDefaultTableName,
kTimeout, VOTER_REPLICA, /*table_id=*/none, &table_locations));
ASSERT_EQ(1, table_locations.tablet_locations_size()); // Only 1 tablet.
ASSERT_EQ(3, table_locations.tablet_locations(0).interned_replicas_size()); // 3 replicas.
string tablet_id = table_locations.tablet_locations(0).tablet_id();
// Find the TS that does not have a replica.
// NOTE(review): the insertion of 'uuid' into 'initial_replicas', and the
// closing braces of the loops below, are not visible in this copy.
unordered_set<string> initial_replicas;
for (const auto& replica : table_locations.tablet_locations(0).interned_replicas()) {
const auto& uuid = table_locations.ts_infos(replica.ts_info_idx()).permanent_uuid();
TServerDetails* new_replica = nullptr;
for (const auto& entry : ts_map_) {
if (!ContainsKey(initial_replicas, entry.first)) {
new_replica = entry.second;
ASSERT_NE(nullptr, new_replica);
TServerDetails* leader_replica = nullptr;
ASSERT_OK(FindTabletLeader(ts_map_, tablet_id, kTimeout, &leader_replica));
ASSERT_NE(new_replica, leader_replica);
// Add the 4th replica as a NON_VOTER with promote=true.
// NOTE(review): no visible line sets the 'promote' attribute on 'attrs'; as
// shown, 'attrs' is default-constructed, which contradicts the comment above.
RaftPeerAttrsPB attrs;
ASSERT_OK(AddServer(leader_replica, tablet_id, new_replica,
RaftPeerPB::NON_VOTER, kTimeout, attrs));
// Wait for there to be 4 voters in the config.
// NOTE(review): the remaining arguments of this call are not visible in this
// copy.
ASSERT_OK(WaitUntilCommittedConfigNumVotersIs(/*num_voters=*/ 4,
// Functional test for the BulkChangeConfig RPC API.
// Sends a sequence of bulk config changes (steps 1-13 below) to a single
// replica made leader by an explicit election. After each accepted change it
// checks the committed config; after each rejected change it checks the
// returned Status.
TEST_F(RaftConfigChangeITest, TestBulkChangeConfig) {
const MonoDelta kTimeout = MonoDelta::FromSeconds(30);
const int kNumTabletServers = 4;
const int kNumInitialReplicas = 3;
// Create a table.
// NOTE(review): the cluster start-up and the workload setup/start calls are
// not visible in this copy; only the TestWorkload construction remains.
TestWorkload workload(cluster_.get());
master::GetTableLocationsResponsePB table_locations;
ASSERT_OK(GetTableLocations(cluster_->master_proxy(), TestWorkload::kDefaultTableName,
kTimeout, VOTER_REPLICA, /*table_id=*/none, &table_locations));
ASSERT_EQ(1, table_locations.tablet_locations_size()); // Only 1 tablet.
ASSERT_EQ(kNumInitialReplicas, table_locations.tablet_locations(0).interned_replicas_size());
string tablet_id = table_locations.tablet_locations(0).tablet_id();
// Collect the cluster index of every tserver hosting a replica.
// NOTE(review): the insertion of 'idx' into 'replica_indexes', the loop's
// closing brace, and the start of the statement ending in
// "tablet_id, kTimeout));" are not visible in this copy.
unordered_set<int> replica_indexes;
for (const auto& replica : table_locations.tablet_locations(0).interned_replicas()) {
const auto& uuid = table_locations.ts_infos(replica.ts_info_idx()).permanent_uuid();
int idx = cluster_->tablet_server_index_by_uuid(uuid);
tablet_id, kTimeout));
ASSERT_EQ(kNumInitialReplicas, replica_indexes.size());
// Pick an arbitrary replica to make leader. unordered_set iteration order is
// unspecified, so this is not necessarily the lowest index.
const int kLeaderIndex = *replica_indexes.begin();
// Find the one tserver that does not host a replica; step 1 adds it.
// NOTE(review): the closing braces of this loop are not visible in this copy.
int new_replica_index = -1;
for (int i = 0; i < kNumTabletServers; i++) {
if (!ContainsKey(replica_indexes, i)) {
new_replica_index = i;
ASSERT_NE(-1, new_replica_index);
string leader_uuid = cluster_->tablet_server(kLeaderIndex)->uuid();
auto* leader_replica = ts_map_[leader_uuid];
// Start an election on the chosen replica so that it becomes leader; every
// bulk change below is sent to 'leader_replica'.
ASSERT_OK(itest::StartElection(leader_replica, tablet_id, kTimeout));
// NOTE(review): the body of this wait loop and its closing brace are not
// visible in this copy.
while (workload.rows_inserted() < 100) {
// We don't want the master interfering with the rest of the test.
// NOTE(review): the statement that implements this is not visible in this
// copy.
// One requested change: change type, target tserver (by cluster index),
// desired member type, and the 'replace'/'promote' attributes.
// NOTE(review): the struct's closing "};" is not visible in this copy.
struct BulkSpec {
consensus::ChangeConfigType change_type;
int tserver_index;
RaftPeerPB::MemberType member_type;
bool replace;
bool promote;
// Now comes the actual config change testing.
// Sends 'changes' to the leader as one BulkChangeConfig request, optionally
// conditioned on 'cas_config_index', and returns the resulting Status.
// NOTE(review): the lines that fill in change_pb's type, peer UUID, member
// type and attributes, that append it to 'changes_pb', and that close the
// loops/lambda are not visible in this copy.
auto bulk_change = [&](const vector<BulkSpec>& changes,
boost::optional<int64_t> cas_config_index = boost::none) {
vector<consensus::BulkChangeConfigRequestPB::ConfigChangeItemPB> changes_pb;
for (const auto& chg : changes) {
const auto& ts_uuid = cluster_->tablet_server(chg.tserver_index)->uuid();
auto* replica = ts_map_[ts_uuid];
consensus::BulkChangeConfigRequestPB::ConfigChangeItemPB change_pb;
RaftPeerPB* peer = change_pb.mutable_peer();
*peer->mutable_last_known_addr() = replica->registration.rpc_addresses(0);
LOG(INFO) << "submitting config change with changes:";
for (const auto& change_pb : changes_pb) {
LOG(INFO) << SecureShortDebugString(change_pb);
return BulkChangeConfig(leader_replica, tablet_id, changes_pb,
kTimeout, cas_config_index);
// 1) Add a voter. Change config to: V, V, V, V.
ASSERT_OK(bulk_change({ { ADD_PEER, new_replica_index, RaftPeerPB::VOTER,
/*replace=*/ false, /*promote=*/ false } }));
ConsensusStatePB cstate;
ASSERT_OK(WaitUntilNoPendingConfig(leader_replica, tablet_id, kTimeout, &cstate));
ASSERT_EQ(kNumTabletServers, cstate.committed_config().peers_size());
ASSERT_EQ(kNumTabletServers, CountVoters(cstate.committed_config()));
// 2) Simultaneous voter modification and attribute modification.
// Change config to: V, V, N, V+p.
// Note: setting a VOTER's attribute promote=true is meaningless.
int replica1_idx = (kLeaderIndex + 1) % kNumTabletServers;
int replica2_idx = (kLeaderIndex + 2) % kNumTabletServers;
ASSERT_OK(bulk_change({ { MODIFY_PEER, replica1_idx, RaftPeerPB::NON_VOTER,
/*replace=*/ false, /*promote=*/ false },
{ MODIFY_PEER, replica2_idx, RaftPeerPB::VOTER,
/*replace=*/ false, /*promote=*/ true } }));
ASSERT_OK(WaitUntilNoPendingConfig(leader_replica, tablet_id, kTimeout, &cstate));
ASSERT_EQ(kNumTabletServers, cstate.committed_config().peers_size());
ASSERT_EQ(kNumInitialReplicas, CountVoters(cstate.committed_config()));
RaftPeerPB* peer;
// NOTE(review): the start of the next statement (presumably an
// ASSERT_OK(GetRaftConfigMember(...)) lookup that fills 'peer') is not
// visible in this copy.
cluster_->tablet_server(replica2_idx)->uuid(), &peer));
ASSERT_EQ(RaftPeerPB::VOTER, peer->member_type());
ASSERT_TRUE(peer->attrs().promote()) << SecureShortDebugString(*peer);
// 3) Single-attribute modification. Change config to: V, V, N+r, V+p.
// Note: at the time of writing, if the master is disabled this
// configuration will not trigger any actions such as promotion or
// eviction.
ASSERT_OK(bulk_change({ { MODIFY_PEER, replica1_idx, RaftPeerPB::NON_VOTER,
/*replace=*/ true, /*promote=*/ false } }));
ASSERT_OK(WaitUntilNoPendingConfig(leader_replica, tablet_id, kTimeout, &cstate));
ASSERT_EQ(kNumTabletServers, cstate.committed_config().peers_size())
<< SecureShortDebugString(cstate);
ASSERT_EQ(kNumInitialReplicas, CountVoters(cstate.committed_config()))
<< SecureShortDebugString(cstate);
// NOTE(review): the start of the next statement (the lookup that fills
// 'peer' for replica1) is not visible in this copy.
cluster_->tablet_server(replica1_idx)->uuid(), &peer));
ASSERT_EQ(RaftPeerPB::NON_VOTER, peer->member_type());
ASSERT_TRUE(peer->attrs().replace()) << SecureShortDebugString(*peer);
// 4) Deny changing config (illegally) from: { V, V, N, V } to: { V, V, V, N }
// because that would be both a promotion and a demotion in one step.
Status s = bulk_change({ { MODIFY_PEER, replica1_idx, RaftPeerPB::VOTER,
/*replace=*/ false, /*promote=*/ false },
{ MODIFY_PEER, replica2_idx, RaftPeerPB::NON_VOTER,
/*replace=*/ false, /*promote=*/ false } });
ASSERT_TRUE(s.IsInvalidArgument()) << s.ToString();
ASSERT_STR_CONTAINS(s.ToString(), "not safe to modify the VOTER status of "
"more than one peer at a time");
// 5) The caller must not be allowed to make the leader a NON_VOTER.
s = bulk_change({ { MODIFY_PEER, kLeaderIndex, RaftPeerPB::NON_VOTER,
/*replace=*/ false, /*promote=*/ false } });
ASSERT_TRUE(s.IsInvalidArgument()) << s.ToString();
ASSERT_STR_MATCHES(s.ToString(), "Cannot modify member type of peer .* because it is the leader");
// 6) The 'cas_config_index' argument must be respected, if set. Passing an
// index one past the committed config's opid_index must be rejected.
int64_t committed_config_opid_index = cstate.committed_config().opid_index();
s = bulk_change({ { MODIFY_PEER, replica1_idx, RaftPeerPB::NON_VOTER,
/*replace=*/ false, /*promote=*/ true } }, committed_config_opid_index + 1);
ASSERT_TRUE(s.IsIllegalState()) << s.ToString();
ASSERT_STR_MATCHES(s.ToString(), "specified cas_config_opid_index of .* but "
"the committed config has opid_index of .*");
// 7) Evict down to 2 voters. We will evict a voter and a non-voter at once.
// NOTE(review): the second change item of this request (the other
// REMOVE_PEER) appears to be missing from this copy.
ASSERT_OK(bulk_change({ { REMOVE_PEER, replica1_idx, RaftPeerPB::UNKNOWN_MEMBER_TYPE,
/*replace=*/ false, /*promote=*/ false },
/*replace=*/ false, /*promote=*/ false } }));
ASSERT_OK(WaitUntilNoPendingConfig(leader_replica, tablet_id, kTimeout, &cstate));
ASSERT_EQ(2, cstate.committed_config().peers_size());
ASSERT_EQ(2, CountVoters(cstate.committed_config()));
// 8) We should reject adding multiple voters at once.
s = bulk_change({ { ADD_PEER, replica1_idx, RaftPeerPB::VOTER,
/*replace=*/ false, /*promote=*/ false },
{ ADD_PEER, replica2_idx, RaftPeerPB::VOTER,
/*replace=*/ false, /*promote=*/ false } });
ASSERT_TRUE(s.IsInvalidArgument()) << s.ToString();
ASSERT_STR_MATCHES(s.ToString(), "not safe to modify the VOTER status of "
"more than one peer at a time");
// 9) Add them back one at a time so we get to full strength (4 voters) again.
// NOTE(review): the closing brace of this loop is not visible in this copy.
auto to_restore = { replica1_idx, replica2_idx };
for (auto r : to_restore) {
ASSERT_OK(bulk_change({ { ADD_PEER, r, RaftPeerPB::VOTER,
/*replace=*/ false, /*promote=*/ false } }));
ASSERT_OK(WaitUntilNoPendingConfig(leader_replica, tablet_id, kTimeout, &cstate));
ASSERT_EQ(kNumTabletServers, cstate.committed_config().peers_size());
ASSERT_EQ(kNumTabletServers, CountVoters(cstate.committed_config()));
// 10) We should reject removing multiple voters at once.
// NOTE(review): the second change item of this request (the other
// REMOVE_PEER) appears to be missing from this copy.
s = bulk_change({ { REMOVE_PEER, replica1_idx, RaftPeerPB::UNKNOWN_MEMBER_TYPE,
/*replace=*/ false, /*promote=*/ false },
/*replace=*/ false, /*promote=*/ false } });
ASSERT_TRUE(s.IsInvalidArgument()) << s.ToString();
ASSERT_STR_MATCHES(s.ToString(), "not safe to modify the VOTER status of "
"more than one peer at a time");
// 11) Reject no-ops.
s = bulk_change({ { MODIFY_PEER, replica1_idx, RaftPeerPB::VOTER,
/*replace=*/ false, /*promote=*/ false } });
ASSERT_TRUE(s.IsInvalidArgument()) << s.ToString();
ASSERT_STR_MATCHES(s.ToString(), "must modify a field when calling MODIFY_PEER");
// 12) Reject empty bulk change config operations.
s = bulk_change({ });
ASSERT_TRUE(s.IsInvalidArgument()) << s.ToString();
ASSERT_STR_MATCHES(s.ToString(), "requested configuration change does not "
"actually modify the config");
// 13) Reject multiple changes to the same peer in a single request.
s = bulk_change({ { MODIFY_PEER, replica1_idx, RaftPeerPB::VOTER,
/*replace=*/ true, /*promote=*/ false },
{ MODIFY_PEER, replica1_idx, RaftPeerPB::VOTER,
/*replace=*/ false, /*promote=*/ true } });
ASSERT_TRUE(s.IsInvalidArgument()) << s.ToString();
ASSERT_STR_MATCHES(s.ToString(), "only one change allowed per peer");
// KUDU-2800
// Check re-replication during slow tablet replica bootstrap.
// NOTE(review): no access specifier is visible in this class. As shown, its
// members would be private and inaccessible to the TEST_F-derived tests
// below. A "protected:"/"public:" line and the closing braces appear to be
// missing from this copy.
class SlowTabletBootstrapTest : public RaftConsensusITestBase {
const MonoDelta kTimeout = MonoDelta::FromSeconds(30);
// Check that 'added_replica_uuid' was added to the consensus
// and 'removed_replica_uuid' was removed from consensus.
// The check inspects the committed config reported by the current leader,
// which is found via GetLeaderReplicaWithRetries(); 'timeout' is passed to
// GetConsensusState(). This helper uses ASSERT_* macros, so a failure only
// returns from the helper. Callers should wrap it in NO_FATALS() if later
// statements depend on its success.
void ValidateConsensusStateChanged(const string& added_replica_uuid,
const string& removed_replica_uuid,
const MonoDelta& timeout) {
TServerDetails* leader = nullptr;
ASSERT_OK(GetLeaderReplicaWithRetries(tablet_id_, &leader));
consensus::ConsensusStatePB cstate;
// NOTE(review): the remaining arguments of this call are not visible in this
// copy. They are presumably EXCLUDE_HEALTH_REPORT, per the using-declaration
// at the top of the file, and &cstate.
ASSERT_OK(GetConsensusState(leader, tablet_id_, timeout,
ASSERT_TRUE(IsRaftConfigMember(added_replica_uuid, cstate.committed_config()));
ASSERT_FALSE(IsRaftConfigMember(removed_replica_uuid, cstate.committed_config()));
// Create and start cluster.
// Returns 'any_replica_server' - UUID of first server with tablet replica,
// and 'no_replica_server' - UUID of server without tablet replica.
// The cluster has 4 tablet servers, and 'tablet_id_' has 3 replicas (both
// asserted below), so exactly one server has no replica.
void SetUpCluster(string* any_replica_server,
string* no_replica_server) {
// NOTE(review): the flag entries of this initializer and its closing "};" are
// not visible in this copy; the comment inside refers to one of those flags.
vector<string> ts_flags {
// The default value 5 minutes is very long.
// So we set timeout 3 seconds in order to quickly
// remove non-responding replica from consensus
FLAGS_num_tablet_servers = 4;
FLAGS_num_replicas = 3;
NO_FATALS(BuildAndStart(ts_flags, {}));
ASSERT_EQ(4, tablet_servers_.size());
// Extra sanity checks.
vector<string> replica_servers = GetServersWithReplica(tablet_id_);
ASSERT_EQ(3, replica_servers.size());
vector<string> no_replica_servers = GetServersWithoutReplica(tablet_id_);
ASSERT_EQ(1, no_replica_servers.size());
*any_replica_server = replica_servers.front();
*no_replica_server = no_replica_servers.front();
// Slow tablet replica is not evicted while bootstrapping
// as long as there are no data modifications in the consensus.
TEST_F(SlowTabletBootstrapTest, TestSlowBootstrap) {
string any_replica_server, no_replica_server;
NO_FATALS(SetUpCluster(&any_replica_server, &no_replica_server));
// Shutdown any tablet server with tablet's replica.
// NOTE(review): the shutdown/restart of 'ts' described above is not visible
// in this copy; only the lookup remains.
auto ts = cluster_->tablet_server_by_uuid(any_replica_server);
ASSERT_NE(nullptr, ts);
// Membership must be unchanged: the original replica is still a member, and
// the server without a replica has not been added.
ValidateConsensusStateChanged(any_replica_server, no_replica_server, kTimeout);
// If replica restarts after many data modifications,
// it falls behind and is removed from consensus.
TEST_F(SlowTabletBootstrapTest, TestFallBehind) {
string any_replica_server, no_replica_server;
NO_FATALS(SetUpCluster(&any_replica_server, &no_replica_server));
// Shutdown any tablet server with tablet's replica,
// add data, then restart and cause it to fall behind.
// NOTE(review): the beginning of the statement whose arguments appear on the
// next line, and its remaining arguments, are not visible in this copy.
any_replica_server, nullptr, nullptr,
// Expect the server without a replica to have been added, and the replica
// that fell behind to have been removed.
ValidateConsensusStateChanged(no_replica_server, any_replica_server, kTimeout);
// If there are many data modifications during slow replica bootstrap,
// it falls behind and is removed from consensus.
TEST_F(SlowTabletBootstrapTest, TestFallBehindSlowBootstrap) {
string any_replica_server, no_replica_server;
NO_FATALS(SetUpCluster(&any_replica_server, &no_replica_server));
// Shutdown any tablet server with tablet replica.
// NOTE(review): the shutdown call itself is not visible in this copy.
auto ts = cluster_->tablet_server_by_uuid(any_replica_server);
ASSERT_NE(nullptr, ts);
// Inject delay into next tablet replica bootstrap.
// When the replica finishes bootstrapping, it will find that it was left
// behind and was removed from consensus.
// NOTE(review): the statements that inject the delay and restart 'ts' are
// not visible in this copy.
TServerDetails* replica = FindOrDie(tablet_servers_, any_replica_server);
// Find a leader.
TServerDetails* leader = nullptr;
ASSERT_OK(GetLeaderReplicaWithRetries(tablet_id_, &leader));
ASSERT_NE(leader, nullptr);
ASSERT_NE(leader, replica);
int leader_index = cluster_->tablet_server_index_by_uuid(leader->uuid());
TestWorkload workload(cluster_.get());
workload.set_payload_bytes(128 * 1024); // Write ops of size 128KB.
// 8 * 4 = 32 rows of 128KB each is 4MB.
// NOTE(review): the workload setup/start, the body of the loop below and its
// closing brace are not visible in this copy.
LOG(INFO) << "Waiting until we've written at least 4MB...";
while (workload.rows_inserted() < 8 * 4) {
LOG(INFO) << "Waiting for log GC on " << leader->uuid();
// Some WAL segments must exist, but wal segment 1 must not exist.
// NOTE(review): the beginning of the statement that takes the arguments on
// the next line is not visible in this copy.
leader_index, tablet_id_, { "wal-" }, { "wal-000000001" }));
LOG(INFO) << "Log GC complete on " << leader->uuid();
// Expect the server without a replica to have been added, and the lagging
// replica to have been removed.
ValidateConsensusStateChanged(no_replica_server, any_replica_server, kTimeout);
} // namespace kudu