blob: 485dbfc6369f45ef35ea218d80470e7709092eba [file] [log] [blame]
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.
#include <cstdint>
#include <functional>
#include <memory>
#include <ostream>
#include <string>
#include <unordered_map>
#include <vector>
#include <boost/optional/optional.hpp>
#include <gflags/gflags_declare.h>
#include <glog/logging.h>
#include <gtest/gtest.h>
#include "kudu/consensus/consensus.pb.h"
#include "kudu/consensus/metadata.pb.h"
#include "kudu/consensus/opid.pb.h"
#include "kudu/consensus/opid_util.h"
#include "kudu/consensus/raft_consensus.h"
#include "kudu/fs/fs_manager.h"
#include "kudu/gutil/ref_counted.h"
#include "kudu/integration-tests/cluster_itest_util.h"
#include "kudu/integration-tests/internal_mini_cluster-itest-base.h"
#include "kudu/integration-tests/test_workload.h"
#include "kudu/mini-cluster/internal_mini_cluster.h"
#include "kudu/tablet/metadata.pb.h"
#include "kudu/tablet/tablet_replica.h"
#include "kudu/tserver/mini_tablet_server.h"
#include "kudu/tserver/tablet_server.h"
#include "kudu/tserver/ts_tablet_manager.h"
#include "kudu/util/env.h"
#include "kudu/util/monotime.h"
#include "kudu/util/status.h"
#include "kudu/util/test_macros.h"
#include "kudu/util/test_util.h"
DECLARE_bool(allow_unsafe_replication_factor);
DECLARE_bool(enable_tablet_copy);
DECLARE_bool(raft_enable_tombstoned_voting);
using kudu::consensus::MakeOpId;
using kudu::consensus::LeaderStepDownResponsePB;
using kudu::consensus::OpId;
using kudu::consensus::RECEIVED_OPID;
using kudu::consensus::RaftConsensus;
using kudu::consensus::RaftPeerPB;
using kudu::itest::DeleteTablet;
using kudu::itest::RequestVote;
using kudu::itest::TServerDetails;
using kudu::itest::WaitForServersToAgree;
using kudu::tablet::TABLET_DATA_TOMBSTONED;
using kudu::tablet::TabletReplica;
using kudu::tablet::TabletStatePB;
using kudu::tserver::TSTabletManager;
using std::string;
using std::vector;
namespace kudu {
class TombstonedVotingIMCITest : public MiniClusterITestBase {
};
// Ensure that a tombstoned replica cannot vote after we call Shutdown() on it.
TEST_F(TombstonedVotingIMCITest, TestNoVoteAfterShutdown) {
// This test waits for several seconds, so only run it in slow mode.
SKIP_IF_SLOW_NOT_ALLOWED();
FLAGS_allow_unsafe_replication_factor = true; // Allow an even replication factor.
FLAGS_enable_tablet_copy = false; // Tablet copy would interfere with this test.
const MonoDelta kTimeout = MonoDelta::FromSeconds(30);
NO_FATALS(StartCluster(/*num_tablet_servers=*/ 2));
TestWorkload workload(cluster_.get());
workload.set_num_replicas(2); // Two servers and replicas makes the test easy to debug.
workload.Setup();
workload.Start();
while (workload.rows_inserted() < 50) {
SleepFor(MonoDelta::FromMilliseconds(50));
}
workload.StopAndJoin();
// Figure out the tablet id to mess with.
vector<string> tablet_ids = cluster_->mini_tablet_server(0)->ListTablets();
ASSERT_EQ(1, tablet_ids.size());
const string& tablet_id = tablet_ids[0];
// Ensure all servers are up to date.
ASSERT_OK(WaitForServersToAgree(kTimeout, ts_map_, tablet_id, workload.batches_completed()));
// Manually tombstone the replica on TS1, start an election on TS0, and wait
// until TS0 gets elected. If TS0 gets elected then TS1 was able to vote
// while tombstoned.
TSTabletManager* ts_tablet_manager = cluster_->mini_tablet_server(1)->server()->tablet_manager();
scoped_refptr<TabletReplica> ts1_replica;
ASSERT_OK(ts_tablet_manager->GetTabletReplica(tablet_id, &ts1_replica));
// Tombstone TS1's replica.
LOG(INFO) << "Tombstoning ts1...";
ASSERT_OK(ts_tablet_manager->DeleteTablet(tablet_id, TABLET_DATA_TOMBSTONED,
boost::none));
ASSERT_EQ(TabletStatePB::STOPPED, ts1_replica->state());
scoped_refptr<TabletReplica> ts0_replica;
ASSERT_OK(cluster_->mini_tablet_server(0)->server()->tablet_manager()->GetTabletReplica(
tablet_id, &ts0_replica));
LeaderStepDownResponsePB resp;
ts0_replica->consensus()->StepDown(&resp); // Ignore result, in case TS1 was the leader.
ASSERT_EQ(RaftPeerPB::FOLLOWER, ts0_replica->consensus()->role());
ASSERT_OK(ts0_replica->consensus()->StartElection(
RaftConsensus::ELECT_EVEN_IF_LEADER_IS_ALIVE, RaftConsensus::EXTERNAL_REQUEST));
// Wait until TS0 is leader.
ASSERT_EVENTUALLY([&] {
ASSERT_EQ(RaftPeerPB::LEADER, ts0_replica->consensus()->role());
});
// Now shut down TS1. This will ensure that TS0 cannot get re-elected.
LOG(INFO) << "Shutting down ts1...";
ts1_replica->Shutdown();
// Start another election and wait for some time to see if it can get elected.
ASSERT_OK(ts0_replica->consensus()->StepDown(&resp));
ASSERT_OK(ts0_replica->consensus()->StartElection(
RaftConsensus::ELECT_EVEN_IF_LEADER_IS_ALIVE, RaftConsensus::EXTERNAL_REQUEST));
// Wait for some time to ensure TS0 cannot get elected.
MonoTime deadline = MonoTime::Now() + MonoDelta::FromSeconds(5);
while (MonoTime::Now() < deadline) {
ASSERT_EQ(RaftPeerPB::FOLLOWER, ts0_replica->consensus()->role());
SleepFor(MonoDelta::FromMilliseconds(50));
}
}
// Test that a tombstoned replica will vote correctly.
// This is implemented by directly exercising the RPC API with different vote request parameters.
TEST_F(TombstonedVotingIMCITest, TestVotingLogic) {
// This test waits for several seconds, so only run it in slow mode.
SKIP_IF_SLOW_NOT_ALLOWED();
FLAGS_allow_unsafe_replication_factor = true; // Allow an even replication factor.
FLAGS_enable_tablet_copy = false; // Tablet copy would interfere with this test.
const MonoDelta kTimeout = MonoDelta::FromSeconds(30);
NO_FATALS(StartCluster(/*num_tablet_servers=*/ 2));
TestWorkload workload(cluster_.get());
workload.set_num_replicas(2); // Two servers and replicas makes the test easy to debug.
workload.Setup();
workload.Start();
while (workload.rows_inserted() < 50) {
SleepFor(MonoDelta::FromMilliseconds(50));
}
workload.StopAndJoin();
// Figure out the tablet id to mess with.
vector<string> tablet_ids = cluster_->mini_tablet_server(0)->ListTablets();
ASSERT_EQ(1, tablet_ids.size());
const string& tablet_id = tablet_ids[0];
// Ensure all servers are up to date.
ASSERT_OK(WaitForServersToAgree(kTimeout, ts_map_, tablet_id, workload.batches_completed()));
// Shut down TS0 so it doesn't interfere with our testing.
cluster_->mini_tablet_server(0)->Shutdown();
// Figure out the last logged opid of TS1.
OpId last_logged_opid;
ASSERT_OK(itest::GetLastOpIdForReplica(tablet_id,
ts_map_[cluster_->mini_tablet_server(1)->uuid()],
RECEIVED_OPID,
kTimeout,
&last_logged_opid));
// Tombstone TS1 (actually, the tablet replica hosted on TS1).
ASSERT_OK(DeleteTablet(ts_map_[cluster_->mini_tablet_server(1)->uuid()],
tablet_id, TABLET_DATA_TOMBSTONED, kTimeout));
// Loop this series of tests twice: the first time without restarting the TS,
// the 2nd time after a restart.
for (int i = 0; i < 2; i++) {
if (i == 1) {
// Restart tablet server #1 on the 2nd loop.
LOG(INFO) << "Restarting TS1...";
cluster_->mini_tablet_server(1)->Shutdown();
ASSERT_OK(cluster_->mini_tablet_server(1)->Restart());
ASSERT_OK(cluster_->mini_tablet_server(1)->WaitStarted());
}
scoped_refptr<TabletReplica> replica;
ASSERT_OK(cluster_->mini_tablet_server(1)->server()->tablet_manager()->GetTabletReplica(
tablet_id, &replica));
ASSERT_EQ(i == 0 ? tablet::STOPPED : tablet::INITIALIZED, replica->state());
int64_t current_term = replica->consensus()->CurrentTerm();
current_term++;
// Ask TS1 for a vote that should be granted (new term, acceptable opid).
// Note: peers are required to vote regardless of whether they recognize the
// candidate's UUID or not, so the ID used here ("A") is not important.
TServerDetails* ts1_ets = ts_map_[cluster_->mini_tablet_server(1)->uuid()];
ASSERT_OK(RequestVote(ts1_ets, tablet_id, "A", current_term, last_logged_opid,
/*ignore_live_leader=*/ true, /*is_pre_election=*/ false, kTimeout));
// Ask TS1 for a vote that should be denied (different candidate, same term).
Status s = RequestVote(ts1_ets, tablet_id, "B", current_term, last_logged_opid,
/*ignore_live_leader=*/ true, /*is_pre_election=*/ false,
kTimeout);
ASSERT_TRUE(s.IsInvalidArgument());
ASSERT_STR_CONTAINS(s.ToString(), "Already voted for candidate A in this term");
// Ask TS1 for a vote that should be denied (old term).
s = RequestVote(ts1_ets, tablet_id, "B", current_term - 1, last_logged_opid,
/*ignore_live_leader=*/ true, /*is_pre_election=*/ false, kTimeout);
ASSERT_TRUE(s.IsInvalidArgument());
ASSERT_STR_MATCHES(s.ToString(), "Denying vote to candidate B for earlier term");
// Increment term.
current_term++;
OpId old_opid = MakeOpId(last_logged_opid.term(), last_logged_opid.index() - 1);
// Ask TS1 for a vote that should be denied (old last-logged opid).
s = RequestVote(ts1_ets, tablet_id, "B", current_term, old_opid,
/*ignore_live_leader=*/ true, /*is_pre_election=*/ false, kTimeout);
ASSERT_TRUE(s.IsInvalidArgument());
ASSERT_STR_MATCHES(s.ToString(),
"Denying vote to candidate B.*greater than that of the candidate");
// Ask for a successful vote for candidate B.
ASSERT_OK(RequestVote(ts1_ets, tablet_id, "B", current_term, last_logged_opid,
/*ignore_live_leader=*/ true, /*is_pre_election=*/ false, kTimeout));
}
}
// Disable tombstoned voting and ensure that an election that would require it fails.
TEST_F(TombstonedVotingIMCITest, TestNoVoteIfTombstonedVotingDisabled) {
// This test waits for several seconds, so only run it in slow mode.
SKIP_IF_SLOW_NOT_ALLOWED();
FLAGS_raft_enable_tombstoned_voting = false; // Disable tombstoned voting.
FLAGS_allow_unsafe_replication_factor = true; // Allow an even replication factor.
FLAGS_enable_tablet_copy = false; // Tablet copy would interfere with this test.
const MonoDelta kTimeout = MonoDelta::FromSeconds(30);
NO_FATALS(StartCluster(/*num_tablet_servers=*/ 2));
TestWorkload workload(cluster_.get());
workload.set_num_replicas(2); // Two servers and replicas makes the test easy to debug.
workload.Setup();
workload.Start();
while (workload.rows_inserted() < 50) {
SleepFor(MonoDelta::FromMilliseconds(50));
}
workload.StopAndJoin();
// Figure out the tablet id to mess with.
vector<string> tablet_ids = cluster_->mini_tablet_server(0)->ListTablets();
ASSERT_EQ(1, tablet_ids.size());
const string& tablet_id = tablet_ids[0];
// Ensure all servers are up to date.
ASSERT_OK(WaitForServersToAgree(kTimeout, ts_map_, tablet_id, workload.batches_completed()));
// Tombstone TS1 and try to get TS0 to vote for it.
TServerDetails* ts1 = ts_map_[cluster_->mini_tablet_server(1)->uuid()];
ASSERT_OK(DeleteTablet(ts1, tablet_id, TABLET_DATA_TOMBSTONED, kTimeout));
scoped_refptr<TabletReplica> ts0_replica;
ASSERT_OK(cluster_->mini_tablet_server(0)->server()->tablet_manager()->GetTabletReplica(
tablet_id, &ts0_replica));
LeaderStepDownResponsePB resp;
ts0_replica->consensus()->StepDown(&resp); // Ignore result, in case TS1 was the leader.
ASSERT_OK(ts0_replica->consensus()->StartElection(
RaftConsensus::ELECT_EVEN_IF_LEADER_IS_ALIVE, RaftConsensus::EXTERNAL_REQUEST));
// Wait for some time to ensure TS0 cannot get elected.
MonoTime deadline = MonoTime::Now() + MonoDelta::FromSeconds(5);
while (MonoTime::Now() < deadline) {
ASSERT_EQ(RaftPeerPB::FOLLOWER, ts0_replica->consensus()->role());
SleepFor(MonoDelta::FromMilliseconds(50));
}
}
// Test that a replica will not vote while tombstoned if it was deleted while
// the last-logged opid was unknown. This may occur if a tablet is tombstoned
// while in a FAILED state.
TEST_F(TombstonedVotingIMCITest, TestNoVoteIfNoLastLoggedOpId) {
SKIP_IF_SLOW_NOT_ALLOWED();
FLAGS_allow_unsafe_replication_factor = true; // Allow an even replication factor.
const MonoDelta kTimeout = MonoDelta::FromSeconds(30);
NO_FATALS(StartCluster(/*num_tablet_servers=*/ 2));
TestWorkload workload(cluster_.get());
workload.set_num_replicas(2); // Two servers and replicas makes the test easy to debug.
workload.Setup();
workload.Start();
while (workload.rows_inserted() < 50) {
SleepFor(MonoDelta::FromMilliseconds(10));
}
workload.StopAndJoin();
tserver::MiniTabletServer* ts0 = cluster_->mini_tablet_server(0);
string ts0_uuid = ts0->uuid();
tserver::MiniTabletServer* ts1 = cluster_->mini_tablet_server(1);
string ts1_uuid = ts0->uuid();
// Determine the tablet id.
vector<string> tablet_ids = ts0->ListTablets();
ASSERT_EQ(1, tablet_ids.size());
const string& tablet_id = tablet_ids[0];
// Ensure all servers are in sync.
ASSERT_OK(WaitForServersToAgree(kTimeout, ts_map_, tablet_id, workload.batches_completed()));
// Shut down each TS, then corrupt the TS0 cmeta.
string ts0_cmeta_path = ts0->server()->fs_manager()->GetConsensusMetadataPath(tablet_id);
for (int i = 0; i < cluster_->num_tablet_servers(); i++) {
cluster_->mini_tablet_server(i)->Shutdown();
}
std::unique_ptr<WritableFile> file;
ASSERT_OK(env_->NewWritableFile(ts0_cmeta_path, &file));
ASSERT_OK(file->Append("\0"));
ASSERT_OK(file->Close());
// Restart each TS so it comes back up on the same ports.
for (int i = 0; i < cluster_->num_tablet_servers(); i++) {
ASSERT_OK(cluster_->mini_tablet_server(i)->Restart());
}
// Wait until the tablet is in FAILED state.
ASSERT_OK(itest::WaitUntilTabletInState(ts_map_[ts0_uuid], tablet_id, TabletStatePB::FAILED,
kTimeout));
scoped_refptr<TabletReplica> replica;
ASSERT_TRUE(ts0->server() != nullptr);
ASSERT_TRUE(ts0->server()->tablet_manager() != nullptr);
ASSERT_TRUE(ts0->server()->tablet_manager()->LookupTablet(tablet_id, &replica));
ASSERT_EQ(tablet::FAILED, replica->state());
// Now tombstone the failed replica on TS0.
ASSERT_OK(DeleteTablet(ts_map_[ts0_uuid], tablet_id, TABLET_DATA_TOMBSTONED, kTimeout));
// Wait until TS1 is running.
ASSERT_EVENTUALLY([&] {
TSTabletManager* tablet_manager = ts1->server()->tablet_manager();
ASSERT_TRUE(tablet_manager->LookupTablet(tablet_id, &replica));
ASSERT_EQ(tablet::RUNNING, replica->state());
});
// Ensure that TS1 cannot become leader because TS0 will not vote.
MonoTime deadline = MonoTime::Now() + MonoDelta::FromSeconds(5);
while (MonoTime::Now() < deadline) {
scoped_refptr<TabletReplica> replica;
TSTabletManager* tablet_manager = ts1->server()->tablet_manager();
ASSERT_TRUE(tablet_manager != nullptr);
ASSERT_TRUE(tablet_manager->LookupTablet(tablet_id, &replica));
std::shared_ptr<RaftConsensus> consensus = replica->shared_consensus();
if (consensus) {
ASSERT_EQ(RaftPeerPB::FOLLOWER, consensus->role());
}
}
}
enum RestartAfterTombstone {
kNoRestart,
kRestart,
};
class TsRecoveryTombstonedIMCITest : public MiniClusterITestBase,
public ::testing::WithParamInterface<RestartAfterTombstone> {
};
INSTANTIATE_TEST_SUITE_P(Restart, TsRecoveryTombstonedIMCITest,
::testing::Values(kNoRestart, kRestart));
// Basic tombstoned voting test.
TEST_P(TsRecoveryTombstonedIMCITest, TestTombstonedVoter) {
const RestartAfterTombstone to_restart = GetParam();
const MonoDelta kTimeout = MonoDelta::FromSeconds(30);
FLAGS_allow_unsafe_replication_factor = true; // Allow an even replication factor.
NO_FATALS(StartCluster(/*num_tablet_servers=*/ 2));
TestWorkload workload(cluster_.get());
workload.set_num_replicas(2); // Two servers and replicas makes the test easy to debug.
workload.Setup();
workload.Start();
while (workload.rows_inserted() < 50) {
SleepFor(MonoDelta::FromMilliseconds(50));
}
workload.StopAndJoin();
// Figure out the tablet id to Tablet Copy.
vector<string> tablet_ids = cluster_->mini_tablet_server(0)->ListTablets();
ASSERT_EQ(1, tablet_ids.size());
const string& tablet_id = tablet_ids[0];
// Ensure all servers are up to date.
ASSERT_OK(WaitForServersToAgree(kTimeout, ts_map_, tablet_id, workload.batches_completed()));
auto live_ts_map = ts_map_;
ASSERT_EQ(1, live_ts_map.erase(cluster_->mini_tablet_server(1)->uuid()));
// Shut down TS 0 then tombstone TS 1. Restart TS 0.
// TS 0 should get a vote from TS 1 and then make a copy on TS 1, bringing
// the cluster back up to full strength.
LOG(INFO) << "shutting down TS " << cluster_->mini_tablet_server(0)->uuid();
cluster_->mini_tablet_server(0)->Shutdown();
LOG(INFO) << "tombstoning replica on TS " << cluster_->mini_tablet_server(1)->uuid();
TServerDetails* ts1 = ts_map_[cluster_->mini_tablet_server(1)->uuid()];
ASSERT_OK(DeleteTablet(ts1, tablet_id, TABLET_DATA_TOMBSTONED, kTimeout));
if (to_restart == kRestart) {
LOG(INFO) << "restarting tombstoned TS " << cluster_->mini_tablet_server(1)->uuid();
cluster_->mini_tablet_server(1)->Shutdown();
ASSERT_OK(cluster_->mini_tablet_server(1)->Restart());
}
LOG(INFO) << "restarting TS " << cluster_->mini_tablet_server(1)->uuid();
ASSERT_OK(cluster_->mini_tablet_server(0)->Restart());
// Wait for the tablet copy to complete.
LOG(INFO) << "waiting for leader election and tablet copy to complete...";
ASSERT_OK(WaitForServersToAgree(kTimeout, live_ts_map, tablet_id, workload.batches_completed()));
LOG(INFO) << "attempting to write a few more rows...";
// Write a little bit more.
int target_rows = workload.rows_inserted() + 100;
workload.Start();
while (workload.rows_inserted() < target_rows) {
SleepFor(MonoDelta::FromMilliseconds(50));
}
workload.StopAndJoin();
// Do a final verification that the servers match.
LOG(INFO) << "waiting for final agreement...";
ASSERT_OK(WaitForServersToAgree(kTimeout, live_ts_map, tablet_id, workload.batches_completed()));
}
} // namespace kudu