blob: fba0520605bc729515955a2784fe3bc56df1b45c [file] [log] [blame]
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.
#include <boost/optional.hpp>
#include <gflags/gflags.h>
#include <gtest/gtest.h>
#include <string>
#include <unordered_map>
#include "kudu/client/client-test-util.h"
#include "kudu/client/client.h"
#include "kudu/common/wire_protocol-test-util.h"
#include "kudu/fs/fs_manager.h"
#include "kudu/gutil/stl_util.h"
#include "kudu/gutil/strings/substitute.h"
#include "kudu/integration-tests/cluster_itest_util.h"
#include "kudu/integration-tests/cluster_verifier.h"
#include "kudu/integration-tests/external_mini_cluster.h"
#include "kudu/integration-tests/external_mini_cluster_fs_inspector.h"
#include "kudu/integration-tests/test_workload.h"
#include "kudu/tablet/tablet_bootstrap.h"
#include "kudu/tablet/tablet_metadata.h"
#include "kudu/tserver/tablet_copy_client.h"
#include "kudu/util/metrics.h"
#include "kudu/util/pstack_watcher.h"
#include "kudu/util/test_util.h"
// Tuning knobs for TestDeleteLeaderDuringTabletCopyStressTest.
DEFINE_int32(test_delete_leader_num_iters, 3,
             "Number of iterations to run in TestDeleteLeaderDuringTabletCopyStressTest.");
// NOTE: help text previously said "Number of writer threads" — a copy-paste
// of the flag below. This flag is the per-iteration minimum row count.
DEFINE_int32(test_delete_leader_min_rows_per_iter, 20,
             "Minimum number of rows to insert per iteration "
             "in TestDeleteLeaderDuringTabletCopyStressTest.");
DEFINE_int32(test_delete_leader_payload_bytes, 16 * 1024,
             "Payload byte size in TestDeleteLeaderDuringTabletCopyStressTest.");
DEFINE_int32(test_delete_leader_num_writer_threads, 1,
             "Number of writer threads in TestDeleteLeaderDuringTabletCopyStressTest.");
using kudu::client::KuduClient;
using kudu::client::KuduClientBuilder;
using kudu::client::KuduSchema;
using kudu::client::KuduSchemaFromSchema;
using kudu::client::KuduTableCreator;
using kudu::client::sp::shared_ptr;
using kudu::consensus::CONSENSUS_CONFIG_COMMITTED;
using kudu::itest::TServerDetails;
using kudu::tablet::TABLET_DATA_DELETED;
using kudu::tablet::TABLET_DATA_TOMBSTONED;
using kudu::tserver::ListTabletsResponsePB;
using kudu::tserver::TabletCopyClient;
using std::string;
using std::unordered_map;
using std::vector;
using strings::Substitute;
METRIC_DECLARE_entity(server);
METRIC_DECLARE_histogram(handler_latency_kudu_consensus_ConsensusService_UpdateConsensus);
METRIC_DECLARE_counter(glog_info_messages);
METRIC_DECLARE_counter(glog_warning_messages);
METRIC_DECLARE_counter(glog_error_messages);
namespace kudu {
// Test fixture for tablet copy integration tests. Owns an external
// mini-cluster plus the helpers (inspector, client, tserver map) that the
// individual tests use against it.
class TabletCopyITest : public KuduTest {
 public:
  virtual void TearDown() OVERRIDE {
    // On a fatal test failure, dump the stacks of every live tablet server
    // to aid post-mortem debugging before shutting the cluster down.
    // Guard on cluster_: a test may fail fatally before StartCluster() ran.
    if (cluster_ && HasFatalFailure()) {
      LOG(INFO) << "Found fatal failure";
      // Use the cluster's actual size rather than a hard-coded count:
      // tests in this file start clusters with 2, 3, or 5 tablet servers.
      for (int i = 0; i < cluster_->num_tablet_servers(); i++) {
        if (!cluster_->tablet_server(i)->IsProcessAlive()) {
          LOG(INFO) << "Tablet server " << i << " is not running. Cannot dump its stacks.";
          continue;
        }
        LOG(INFO) << "Attempting to dump stacks of TS " << i
                  << " with UUID " << cluster_->tablet_server(i)->uuid()
                  << " and pid " << cluster_->tablet_server(i)->pid();
        WARN_NOT_OK(PstackWatcher::DumpPidStacks(cluster_->tablet_server(i)->pid()),
                    "Couldn't dump stacks");
      }
    }
    if (cluster_) cluster_->Shutdown();
    KuduTest::TearDown();
    // The TServerDetails values are heap-allocated by
    // itest::CreateTabletServerMap() and owned by this fixture.
    STLDeleteValues(&ts_map_);
  }

 protected:
  // Starts an ExternalMiniCluster with the given extra flags and
  // 'num_tablet_servers' tablet servers, then initializes inspect_,
  // ts_map_ and client_ against it.
  void StartCluster(const vector<string>& extra_tserver_flags = vector<string>(),
                    const vector<string>& extra_master_flags = vector<string>(),
                    int num_tablet_servers = 3);

  gscoped_ptr<ExternalMiniCluster> cluster_;
  gscoped_ptr<itest::ExternalMiniClusterFsInspector> inspect_;
  shared_ptr<KuduClient> client_;
  unordered_map<string, TServerDetails*> ts_map_;
};
void TabletCopyITest::StartCluster(const vector<string>& extra_tserver_flags,
const vector<string>& extra_master_flags,
int num_tablet_servers) {
ExternalMiniClusterOptions opts;
opts.num_tablet_servers = num_tablet_servers;
opts.extra_tserver_flags = extra_tserver_flags;
opts.extra_tserver_flags.push_back("--never_fsync"); // fsync causes flakiness on EC2.
opts.extra_master_flags = extra_master_flags;
cluster_.reset(new ExternalMiniCluster(opts));
ASSERT_OK(cluster_->Start());
inspect_.reset(new itest::ExternalMiniClusterFsInspector(cluster_.get()));
ASSERT_OK(itest::CreateTabletServerMap(cluster_->master_proxy().get(),
cluster_->messenger(),
&ts_map_));
KuduClientBuilder builder;
ASSERT_OK(cluster_->CreateClient(builder, &client_));
}
// If a rogue (a.k.a. zombie) leader tries to replace a tombstoned
// tablet via Tablet Copy, make sure its term isn't older than the latest term
// we observed. If it is older, make sure we reject the request, to avoid allowing old
// leaders to create a parallel universe. This is possible because config
// change could cause nodes to move around. The term check is reasonable
// because only one node can be elected leader for a given term.
//
// A leader can "go rogue" due to a VM pause, CTRL-z, partition, etc.
TEST_F(TabletCopyITest, TestRejectRogueLeader) {
  // This test pauses for at least 10 seconds. Only run in slow-test mode.
  if (!AllowSlowTests()) {
    LOG(INFO) << "Skipping test in fast-test mode.";
    return;
  }
  // Disable failure detection / automatic elections so the test fully
  // controls which replica is leader and when terms advance.
  vector<string> ts_flags, master_flags;
  ts_flags.push_back("--enable_leader_failure_detection=false");
  master_flags.push_back("--catalog_manager_wait_for_new_tablets_to_elect_leader=false");
  NO_FATALS(StartCluster(ts_flags, master_flags));

  const MonoDelta timeout = MonoDelta::FromSeconds(30);
  const int kTsIndex = 0; // We'll test with the first TS.
  TServerDetails* ts = ts_map_[cluster_->tablet_server(kTsIndex)->uuid()];

  TestWorkload workload(cluster_.get());
  workload.Setup();

  // Figure out the tablet id of the created tablet.
  vector<ListTabletsResponsePB::StatusAndSchemaPB> tablets;
  ASSERT_OK(WaitForNumTabletsOnTS(ts, 1, timeout, &tablets));
  string tablet_id = tablets[0].tablet_status().tablet_id();

  // Wait until all replicas are up and running.
  for (int i = 0; i < cluster_->num_tablet_servers(); i++) {
    ASSERT_OK(itest::WaitUntilTabletRunning(ts_map_[cluster_->tablet_server(i)->uuid()],
                                            tablet_id, timeout));
  }

  // Elect a leader for term 1, then run some data through the cluster.
  int zombie_leader_index = 1;
  string zombie_leader_uuid = cluster_->tablet_server(zombie_leader_index)->uuid();
  ASSERT_OK(itest::StartElection(ts_map_[zombie_leader_uuid], tablet_id, timeout));
  workload.Start();
  while (workload.rows_inserted() < 100) {
    SleepFor(MonoDelta::FromMilliseconds(10));
  }
  workload.StopAndJoin();
  ASSERT_OK(WaitForServersToAgree(timeout, ts_map_, tablet_id, workload.batches_completed()));

  // Come out of the blue and try to initiate Tablet Copy from a running server while
  // specifying an old term. That running server should reject the request.
  // We are essentially masquerading as a rogue leader here.
  Status s = itest::StartTabletCopy(ts, tablet_id, zombie_leader_uuid,
                                    HostPort(cluster_->tablet_server(1)->bound_rpc_addr()),
                                    0, // Say I'm from term 0.
                                    timeout);
  ASSERT_TRUE(s.IsInvalidArgument());
  ASSERT_STR_CONTAINS(s.ToString(), "term 0 lower than last logged term 1");

  // Now pause the actual leader so we can bring him back as a zombie later.
  ASSERT_OK(cluster_->tablet_server(zombie_leader_index)->Pause());

  // Trigger TS 2 to become leader of term 2.
  int new_leader_index = 2;
  string new_leader_uuid = cluster_->tablet_server(new_leader_index)->uuid();
  ASSERT_OK(itest::StartElection(ts_map_[new_leader_uuid], tablet_id, timeout));
  ASSERT_OK(itest::WaitUntilLeader(ts_map_[new_leader_uuid], tablet_id, timeout));

  // Exclude the paused zombie from the set of servers expected to converge.
  unordered_map<string, TServerDetails*> active_ts_map = ts_map_;
  ASSERT_EQ(1, active_ts_map.erase(zombie_leader_uuid));

  // Wait for the NO_OP entry from the term 2 election to propagate to the
  // remaining nodes' logs so that we are guaranteed to reject the rogue
  // leader's tablet copy request when we bring it back online.
  int log_index = workload.batches_completed() + 2; // 2 terms == 2 additional NO_OP entries.
  ASSERT_OK(WaitForServersToAgree(timeout, active_ts_map, tablet_id, log_index));
  // TODO: Write more rows to the new leader once KUDU-1034 is fixed.

  // Now kill the new leader and tombstone the replica on TS 0.
  cluster_->tablet_server(new_leader_index)->Shutdown();
  ASSERT_OK(itest::DeleteTablet(ts, tablet_id, TABLET_DATA_TOMBSTONED, boost::none, timeout));

  // Zombies!!! Resume the rogue zombie leader.
  // He should attempt to tablet copy TS 0 but fail.
  ASSERT_OK(cluster_->tablet_server(zombie_leader_index)->Resume());

  // Loop for a few seconds to ensure that the tablet doesn't transition to READY.
  MonoTime deadline = MonoTime::Now();
  deadline.AddDelta(MonoDelta::FromSeconds(5));
  while (MonoTime::Now().ComesBefore(deadline)) {
    ASSERT_OK(itest::ListTablets(ts, timeout, &tablets));
    ASSERT_EQ(1, tablets.size());
    ASSERT_EQ(TABLET_DATA_TOMBSTONED, tablets[0].tablet_status().tablet_data_state());
    SleepFor(MonoDelta::FromMilliseconds(10));
  }

  // Force the rogue leader to step down.
  // Then, send a tablet copy start request from a "fake" leader that
  // sends an up-to-date term in the RB request but the actual term stored
  // in the copy source's consensus metadata would still be old.
  LOG(INFO) << "Forcing rogue leader T " << tablet_id << " P " << zombie_leader_uuid
            << " to step down...";
  ASSERT_OK(itest::LeaderStepDown(ts_map_[zombie_leader_uuid], tablet_id, timeout));
  ExternalTabletServer* zombie_ets = cluster_->tablet_server(zombie_leader_index);
  // It's not necessarily part of the API but this could return failure due to
  // rejecting the remote. We intend to make that part async though, so ignoring
  // this return value in this test.
  ignore_result(itest::StartTabletCopy(ts, tablet_id, zombie_leader_uuid,
                                       HostPort(zombie_ets->bound_rpc_addr()),
                                       2, // Say I'm from term 2.
                                       timeout));

  // Wait another few seconds to be sure the tablet copy is rejected.
  deadline = MonoTime::Now();
  deadline.AddDelta(MonoDelta::FromSeconds(5));
  while (MonoTime::Now().ComesBefore(deadline)) {
    ASSERT_OK(itest::ListTablets(ts, timeout, &tablets));
    ASSERT_EQ(1, tablets.size());
    ASSERT_EQ(TABLET_DATA_TOMBSTONED, tablets[0].tablet_status().tablet_data_state());
    SleepFor(MonoDelta::FromMilliseconds(10));
  }
}
// Start tablet copy session and delete the tablet in the middle.
// It should actually be possible to complete copying in such a case, because
// when a Tablet Copy session is started on the "source" server, all of
// the relevant files are either read or opened, meaning that an in-progress
// Tablet Copy can complete even after a tablet is officially "deleted" on
// the source server. This is also a regression test for KUDU-1009.
TEST_F(TabletCopyITest, TestDeleteTabletDuringTabletCopy) {
  MonoDelta timeout = MonoDelta::FromSeconds(10);
  const int kTsIndex = 0; // We'll test with the first TS.
  NO_FATALS(StartCluster());

  // Populate a tablet with some data.
  TestWorkload workload(cluster_.get());
  workload.Setup();
  workload.Start();
  while (workload.rows_inserted() < 1000) {
    SleepFor(MonoDelta::FromMilliseconds(10));
  }

  // Figure out the tablet id of the created tablet.
  vector<ListTabletsResponsePB::StatusAndSchemaPB> tablets;
  TServerDetails* ts = ts_map_[cluster_->tablet_server(kTsIndex)->uuid()];
  ASSERT_OK(WaitForNumTabletsOnTS(ts, 1, timeout, &tablets));
  string tablet_id = tablets[0].tablet_status().tablet_id();

  // Ensure all the servers agree before we proceed.
  workload.StopAndJoin();
  ASSERT_OK(WaitForServersToAgree(timeout, ts_map_, tablet_id, workload.batches_completed()));

  // Set up an FsManager to use with the TabletCopyClient. The copy target is
  // a fake local filesystem layout, not a real tablet server.
  FsManagerOpts opts;
  string testbase = GetTestPath("fake-ts");
  ASSERT_OK(env_->CreateDir(testbase));
  opts.wal_path = JoinPathSegments(testbase, "wals");
  opts.data_paths.push_back(JoinPathSegments(testbase, "data-0"));
  gscoped_ptr<FsManager> fs_manager(new FsManager(env_.get(), opts));
  ASSERT_OK(fs_manager->CreateInitialFileSystemLayout());
  ASSERT_OK(fs_manager->Open());

  {
    // Start up a TabletCopyClient and open a tablet copy session.
    TabletCopyClient tc_client(tablet_id, fs_manager.get(),
                               cluster_->messenger());
    scoped_refptr<tablet::TabletMetadata> meta;
    ASSERT_OK(tc_client.Start(cluster_->tablet_server(kTsIndex)->bound_rpc_hostport(),
                              &meta));

    // Tombstone the tablet on the remote!
    ASSERT_OK(itest::DeleteTablet(ts, tablet_id,
                                  TABLET_DATA_TOMBSTONED, boost::none, timeout));

    // Now finish copying! The copy should succeed even though the source
    // tablet was tombstoned mid-session (regression check for KUDU-1009).
    tablet::TabletStatusListener listener(meta);
    ASSERT_OK(tc_client.FetchAll(&listener));
    ASSERT_OK(tc_client.Finish());

    // Run destructor, which closes the remote session.
  }

  SleepFor(MonoDelta::FromMilliseconds(50)); // Give a little time for a crash (KUDU-1009).
  ASSERT_TRUE(cluster_->tablet_server(kTsIndex)->IsProcessAlive());
}
// This test ensures that a leader can Tablet Copy on top of a tombstoned replica
// that has a higher term recorded in the replica's consensus metadata if the
// replica's last-logged opid has the same term (or less) as the leader serving
// as the tablet copy source. When a tablet is tombstoned, its last-logged
// opid is stored in a field its on-disk superblock.
TEST_F(TabletCopyITest, TestTabletCopyFollowerWithHigherTerm) {
  // Disable automatic elections so the test controls leadership and terms.
  vector<string> ts_flags, master_flags;
  ts_flags.push_back("--enable_leader_failure_detection=false");
  master_flags.push_back("--catalog_manager_wait_for_new_tablets_to_elect_leader=false");
  const int kNumTabletServers = 2;
  NO_FATALS(StartCluster(ts_flags, master_flags, kNumTabletServers));

  const MonoDelta timeout = MonoDelta::FromSeconds(30);
  const int kFollowerIndex = 0;
  TServerDetails* follower_ts = ts_map_[cluster_->tablet_server(kFollowerIndex)->uuid()];

  TestWorkload workload(cluster_.get());
  workload.set_num_replicas(2);
  workload.Setup();

  // Figure out the tablet id of the created tablet.
  vector<ListTabletsResponsePB::StatusAndSchemaPB> tablets;
  ASSERT_OK(WaitForNumTabletsOnTS(follower_ts, 1, timeout, &tablets));
  string tablet_id = tablets[0].tablet_status().tablet_id();

  // Wait until all replicas are up and running.
  for (int i = 0; i < cluster_->num_tablet_servers(); i++) {
    ASSERT_OK(itest::WaitUntilTabletRunning(ts_map_[cluster_->tablet_server(i)->uuid()],
                                            tablet_id, timeout));
  }

  // Elect a leader for term 1, then run some data through the cluster.
  const int kLeaderIndex = 1;
  TServerDetails* leader_ts = ts_map_[cluster_->tablet_server(kLeaderIndex)->uuid()];
  ASSERT_OK(itest::StartElection(leader_ts, tablet_id, timeout));
  workload.Start();
  while (workload.rows_inserted() < 100) {
    SleepFor(MonoDelta::FromMilliseconds(10));
  }
  workload.StopAndJoin();
  ASSERT_OK(WaitForServersToAgree(timeout, ts_map_, tablet_id, workload.batches_completed()));

  // Pause the leader and increment the term on the follower by starting an
  // election on the follower. The election will fail asynchronously but we
  // just wait until we see that its term has incremented.
  ASSERT_OK(cluster_->tablet_server(kLeaderIndex)->Pause());
  ASSERT_OK(itest::StartElection(follower_ts, tablet_id, timeout));
  int64_t term = 0;
  // Poll (up to ~10s) for the follower's term to reach 2.
  for (int i = 0; i < 1000; i++) {
    consensus::ConsensusStatePB cstate;
    ASSERT_OK(itest::GetConsensusState(follower_ts, tablet_id, CONSENSUS_CONFIG_COMMITTED,
                                       timeout, &cstate));
    term = cstate.current_term();
    if (term == 2) break;
    SleepFor(MonoDelta::FromMilliseconds(10));
  }
  ASSERT_EQ(2, term);

  // Now tombstone the follower.
  ASSERT_OK(itest::DeleteTablet(follower_ts, tablet_id, TABLET_DATA_TOMBSTONED, boost::none,
                                timeout));

  // Restart the follower's TS so that the leader's TS won't get its queued
  // vote request messages. This is a hack but seems to work.
  cluster_->tablet_server(kFollowerIndex)->Shutdown();
  ASSERT_OK(cluster_->tablet_server(kFollowerIndex)->Restart());

  // Now wake the leader. It should detect that the follower needs to be
  // copied and proceed to bring it back up to date.
  ASSERT_OK(cluster_->tablet_server(kLeaderIndex)->Resume());

  // Wait for the follower to come back up.
  ASSERT_OK(WaitForServersToAgree(timeout, ts_map_, tablet_id, workload.batches_completed()));
}
// Test that multiple concurrent tablet copys do not cause problems.
// This is a regression test for KUDU-951, in which concurrent sessions on
// multiple tablets between the same tablet copy client host and source host
// could corrupt each other.
TEST_F(TabletCopyITest, TestConcurrentTabletCopys) {
  // This test writes a lot of data; only run in slow-test mode.
  if (!AllowSlowTests()) {
    LOG(INFO) << "Skipping test in fast-test mode.";
    return;
  }
  // Use small log segments with aggressive flushing and retention so each
  // tablet accumulates plenty of on-disk state to copy.
  vector<string> ts_flags, master_flags;
  ts_flags.push_back("--enable_leader_failure_detection=false");
  ts_flags.push_back("--log_cache_size_limit_mb=1");
  ts_flags.push_back("--log_segment_size_mb=1");
  ts_flags.push_back("--log_async_preallocate_segments=false");
  ts_flags.push_back("--log_min_segments_to_retain=100");
  ts_flags.push_back("--flush_threshold_mb=0"); // Constantly flush.
  ts_flags.push_back("--maintenance_manager_polling_interval_ms=10");
  master_flags.push_back("--catalog_manager_wait_for_new_tablets_to_elect_leader=false");
  NO_FATALS(StartCluster(ts_flags, master_flags));

  const MonoDelta timeout = MonoDelta::FromSeconds(60);

  // Create a table with several tablets. These will all be simultaneously
  // copied to a single target node from the same leader host.
  const int kNumTablets = 10;
  KuduSchema client_schema(KuduSchemaFromSchema(GetSimpleTestSchema()));
  vector<const KuduPartialRow*> splits;
  // Evenly partition the int32 key space into kNumTablets ranges.
  for (int i = 0; i < kNumTablets - 1; i++) {
    KuduPartialRow* row = client_schema.NewRow();
    ASSERT_OK(row->SetInt32(0, numeric_limits<int32_t>::max() / kNumTablets * (i + 1)));
    splits.push_back(row);
  }
  gscoped_ptr<KuduTableCreator> table_creator(client_->NewTableCreator());
  ASSERT_OK(table_creator->table_name(TestWorkload::kDefaultTableName)
            .split_rows(splits)
            .schema(&client_schema)
            .set_range_partition_columns({ "key" })
            .num_replicas(3)
            .Create());

  const int kTsIndex = 0; // We'll test with the first TS.
  TServerDetails* target_ts = ts_map_[cluster_->tablet_server(kTsIndex)->uuid()];

  // Figure out the tablet ids of the created tablets.
  vector<ListTabletsResponsePB::StatusAndSchemaPB> tablets;
  ASSERT_OK(WaitForNumTabletsOnTS(target_ts, kNumTablets, timeout, &tablets));
  vector<string> tablet_ids;
  for (const ListTabletsResponsePB::StatusAndSchemaPB& t : tablets) {
    tablet_ids.push_back(t.tablet_status().tablet_id());
  }

  // Wait until all replicas are up and running.
  for (int i = 0; i < cluster_->num_tablet_servers(); i++) {
    for (const string& tablet_id : tablet_ids) {
      ASSERT_OK(itest::WaitUntilTabletRunning(ts_map_[cluster_->tablet_server(i)->uuid()],
                                              tablet_id, timeout));
    }
  }

  // Elect leaders on each tablet for term 1. All leaders will be on TS 1.
  const int kLeaderIndex = 1;
  const string kLeaderUuid = cluster_->tablet_server(kLeaderIndex)->uuid();
  for (const string& tablet_id : tablet_ids) {
    ASSERT_OK(itest::StartElection(ts_map_[kLeaderUuid], tablet_id, timeout));
  }

  // Write rows across all tablets.
  TestWorkload workload(cluster_.get());
  workload.set_write_timeout_millis(10000);
  workload.set_timeout_allowed(true);
  workload.set_write_batch_size(10);
  workload.set_num_write_threads(10);
  workload.Setup();
  workload.Start();
  while (workload.rows_inserted() < 20000) {
    SleepFor(MonoDelta::FromMilliseconds(10));
  }
  workload.StopAndJoin();

  for (const string& tablet_id : tablet_ids) {
    ASSERT_OK(WaitForServersToAgree(timeout, ts_map_, tablet_id, 1));
  }

  // Now pause the leader so we can tombstone the tablets.
  ASSERT_OK(cluster_->tablet_server(kLeaderIndex)->Pause());

  for (const string& tablet_id : tablet_ids) {
    LOG(INFO) << "Tombstoning tablet " << tablet_id << " on TS " << target_ts->uuid();
    ASSERT_OK(itest::DeleteTablet(target_ts, tablet_id, TABLET_DATA_TOMBSTONED, boost::none,
                                  MonoDelta::FromSeconds(10)));
  }

  // Unpause the leader TS and wait for it to initiate Tablet Copy and replace the tombstoned
  // tablets, in parallel.
  ASSERT_OK(cluster_->tablet_server(kLeaderIndex)->Resume());
  for (const string& tablet_id : tablet_ids) {
    ASSERT_OK(itest::WaitUntilTabletRunning(target_ts, tablet_id, timeout));
  }

  // Verify cluster-wide consistency and that no rows were lost (KUDU-951).
  ClusterVerifier v(cluster_.get());
  NO_FATALS(v.CheckCluster());
  NO_FATALS(v.CheckRowCount(workload.table_name(), ClusterVerifier::AT_LEAST,
                            workload.rows_inserted()));
}
// Test that repeatedly runs a load, tombstones a follower, then tombstones the
// leader while the follower is copying. Regression test for
// KUDU-1047.
TEST_F(TabletCopyITest, TestDeleteLeaderDuringTabletCopyStressTest) {
  // This test takes a while due to failure detection.
  if (!AllowSlowTests()) {
    LOG(INFO) << "Skipping test in fast-test mode.";
    return;
  }

  const MonoDelta timeout = MonoDelta::FromSeconds(60);
  // 5 tablet servers / 5 replicas so the config survives losing two replicas
  // per iteration (the tombstoned follower and the tombstoned leader).
  NO_FATALS(StartCluster(vector<string>(), vector<string>(), 5));

  TestWorkload workload(cluster_.get());
  workload.set_num_replicas(5);
  // Workload shape is tunable via the test_delete_leader_* gflags above.
  workload.set_payload_bytes(FLAGS_test_delete_leader_payload_bytes);
  workload.set_num_write_threads(FLAGS_test_delete_leader_num_writer_threads);
  workload.set_write_batch_size(1);
  workload.set_write_timeout_millis(10000);
  // Writes may time out or hit NOT_FOUND while replicas are being deleted
  // and copied; tolerate both rather than failing the test.
  workload.set_timeout_allowed(true);
  workload.set_not_found_allowed(true);
  workload.Setup();

  // Figure out the tablet id.
  const int kTsIndex = 0;
  TServerDetails* ts = ts_map_[cluster_->tablet_server(kTsIndex)->uuid()];
  vector<ListTabletsResponsePB::StatusAndSchemaPB> tablets;
  ASSERT_OK(WaitForNumTabletsOnTS(ts, 1, timeout, &tablets));
  string tablet_id = tablets[0].tablet_status().tablet_id();

  // Wait until all replicas are up and running.
  for (int i = 0; i < cluster_->num_tablet_servers(); i++) {
    ASSERT_OK(itest::WaitUntilTabletRunning(ts_map_[cluster_->tablet_server(i)->uuid()],
                                            tablet_id, timeout));
  }

  int leader_index = -1;
  int follower_index = -1;
  TServerDetails* leader_ts = nullptr;
  TServerDetails* follower_ts = nullptr;

  for (int i = 0; i < FLAGS_test_delete_leader_num_iters; i++) {
    LOG(INFO) << "Iteration " << (i + 1);
    int rows_previously_inserted = workload.rows_inserted();

    // Find out who's leader.
    ASSERT_OK(FindTabletLeader(ts_map_, tablet_id, timeout, &leader_ts));
    leader_index = cluster_->tablet_server_index_by_uuid(leader_ts->uuid());

    // Select an arbitrary follower.
    follower_index = (leader_index + 1) % cluster_->num_tablet_servers();
    follower_ts = ts_map_[cluster_->tablet_server(follower_index)->uuid()];

    // Spin up the workload.
    workload.Start();
    while (workload.rows_inserted() < rows_previously_inserted +
                                      FLAGS_test_delete_leader_min_rows_per_iter) {
      SleepFor(MonoDelta::FromMilliseconds(10));
    }

    // Tombstone the follower.
    LOG(INFO) << "Tombstoning follower tablet " << tablet_id << " on TS " << follower_ts->uuid();
    ASSERT_OK(itest::DeleteTablet(follower_ts, tablet_id, TABLET_DATA_TOMBSTONED, boost::none,
                                  timeout));

    // Wait for tablet copy to start.
    ASSERT_OK(inspect_->WaitForTabletDataStateOnTS(
        follower_index, tablet_id,
        { tablet::TABLET_DATA_COPYING, tablet::TABLET_DATA_READY },
        timeout));

    // Tombstone the leader while the follower is (or was just) being copied
    // from it — the KUDU-1047 scenario.
    LOG(INFO) << "Tombstoning leader tablet " << tablet_id << " on TS " << leader_ts->uuid();
    ASSERT_OK(itest::DeleteTablet(leader_ts, tablet_id, TABLET_DATA_TOMBSTONED, boost::none,
                                  timeout));

    // Quiesce and rebuild to full strength. This involves electing a new
    // leader from the remaining three, which requires a unanimous vote, and
    // that leader then copying the old leader.
    workload.StopAndJoin();
    ASSERT_OK(WaitForServersToAgree(timeout, ts_map_, tablet_id, 1));
  }

  // Final consistency / row-count verification across the whole cluster.
  ClusterVerifier v(cluster_.get());
  NO_FATALS(v.CheckCluster());
  NO_FATALS(v.CheckRowCount(workload.table_name(), ClusterVerifier::AT_LEAST,
                            workload.rows_inserted()));
}
namespace {
// Returns the cumulative number of UpdateConsensus RPCs handled by 'ets',
// read from the server-level handler-latency histogram's total_count.
// The tablet id is accepted for call-site readability but is unused because
// the metric is aggregated per server, not per tablet; the parameter name is
// commented out to avoid an unused-parameter warning.
int64_t CountUpdateConsensusCalls(ExternalTabletServer* ets, const string& /*tablet_id*/) {
  int64_t ret;
  CHECK_OK(ets->GetInt64Metric(
      &METRIC_ENTITY_server,
      "kudu.tabletserver",
      &METRIC_handler_latency_kudu_consensus_ConsensusService_UpdateConsensus,
      "total_count",
      &ret));
  return ret;
}
int64_t CountLogMessages(ExternalTabletServer* ets) {
int64_t total = 0;
int64_t count;
CHECK_OK(ets->GetInt64Metric(
&METRIC_ENTITY_server,
"kudu.tabletserver",
&METRIC_glog_info_messages,
"value",
&count));
total += count;
CHECK_OK(ets->GetInt64Metric(
&METRIC_ENTITY_server,
"kudu.tabletserver",
&METRIC_glog_warning_messages,
"value",
&count));
total += count;
CHECK_OK(ets->GetInt64Metric(
&METRIC_ENTITY_server,
"kudu.tabletserver",
&METRIC_glog_error_messages,
"value",
&count));
total += count;
return total;
}
} // anonymous namespace
// Test that if tablet copy is disabled by a flag, we don't get into
// tight loops after a tablet is deleted. This is a regression test for situation
// similar to the bug described in KUDU-821: we were previously handling a missing
// tablet within consensus in such a way that we'd immediately send another RPC.
TEST_F(TabletCopyITest, TestDisableTabletCopy_NoTightLoopWhenTabletDeleted) {
  MonoDelta timeout = MonoDelta::FromSeconds(10);
  // Disable tablet copy entirely so the leader cannot repair the deleted
  // replica; the test then checks it doesn't retry in a tight loop.
  vector<string> ts_flags, master_flags;
  ts_flags.push_back("--enable_leader_failure_detection=false");
  ts_flags.push_back("--enable_tablet_copy=false");
  master_flags.push_back("--catalog_manager_wait_for_new_tablets_to_elect_leader=false");
  NO_FATALS(StartCluster(ts_flags, master_flags));

  TestWorkload workload(cluster_.get());
  workload.set_write_batch_size(1);
  workload.Setup();

  // Figure out the tablet id of the created tablet.
  vector<ListTabletsResponsePB::StatusAndSchemaPB> tablets;
  ExternalTabletServer* replica_ets = cluster_->tablet_server(1);
  TServerDetails* replica_ts = ts_map_[replica_ets->uuid()];
  ASSERT_OK(WaitForNumTabletsOnTS(replica_ts, 1, timeout, &tablets));
  string tablet_id = tablets[0].tablet_status().tablet_id();

  // Wait until all replicas are up and running.
  for (int i = 0; i < cluster_->num_tablet_servers(); i++) {
    ASSERT_OK(itest::WaitUntilTabletRunning(ts_map_[cluster_->tablet_server(i)->uuid()],
                                            tablet_id, timeout));
  }

  // Elect a leader (TS 0)
  ExternalTabletServer* leader_ts = cluster_->tablet_server(0);
  ASSERT_OK(itest::StartElection(ts_map_[leader_ts->uuid()], tablet_id, timeout));

  // Start writing, wait for some rows to be inserted.
  workload.Start();
  while (workload.rows_inserted() < 100) {
    SleepFor(MonoDelta::FromMilliseconds(10));
  }

  // Tombstone the tablet on one of the servers (TS 1)
  ASSERT_OK(itest::DeleteTablet(replica_ts, tablet_id, TABLET_DATA_TOMBSTONED, boost::none,
                                timeout));

  // Ensure that, if we sleep for a second while still doing writes to the leader:
  // a) we don't spew logs on the leader side
  // b) we don't get hit with a lot of UpdateConsensus calls on the replica.
  int64_t num_update_rpcs_initial = CountUpdateConsensusCalls(replica_ets, tablet_id);
  int64_t num_logs_initial = CountLogMessages(leader_ts);
  SleepFor(MonoDelta::FromSeconds(1));
  int64_t num_update_rpcs_after_sleep = CountUpdateConsensusCalls(replica_ets, tablet_id);
  int64_t num_logs_after_sleep = CountLogMessages(leader_ts);

  // Calculate rate per second of RPCs and log messages
  // (the sleep above was exactly one second, so deltas are per-second rates).
  int64_t update_rpcs_per_second = num_update_rpcs_after_sleep - num_update_rpcs_initial;
  EXPECT_LT(update_rpcs_per_second, 20);
  int64_t num_logs_per_second = num_logs_after_sleep - num_logs_initial;
  EXPECT_LT(num_logs_per_second, 20);
}
// Test that if a Tablet Copy is taking a long time but the client peer is still responsive,
// the leader won't mark it as failed.
TEST_F(TabletCopyITest, TestSlowCopyDoesntFail) {
  MonoDelta timeout = MonoDelta::FromSeconds(30);
  vector<string> ts_flags, master_flags;
  ts_flags.push_back("--enable_leader_failure_detection=false");
  // Inject 5s of latency per downloaded file so the copy far outlasts the
  // 2s follower-failure threshold below.
  // NOTE(review): "dowload" is presumably the flag's actual (misspelled)
  // name as declared elsewhere — verify before "fixing" the spelling here.
  ts_flags.push_back("--tablet_copy_dowload_file_inject_latency_ms=5000");
  ts_flags.push_back("--follower_unavailable_considered_failed_sec=2");
  master_flags.push_back("--catalog_manager_wait_for_new_tablets_to_elect_leader=false");
  NO_FATALS(StartCluster(ts_flags, master_flags));

  TestWorkload workload(cluster_.get());
  workload.set_write_batch_size(1);
  workload.Setup();

  // Figure out the tablet id of the created tablet.
  vector<ListTabletsResponsePB::StatusAndSchemaPB> tablets;
  ExternalTabletServer* replica_ets = cluster_->tablet_server(1);
  TServerDetails* replica_ts = ts_map_[replica_ets->uuid()];
  ASSERT_OK(WaitForNumTabletsOnTS(replica_ts, 1, timeout, &tablets));
  string tablet_id = tablets[0].tablet_status().tablet_id();

  // Wait until all replicas are up and running.
  for (int i = 0; i < cluster_->num_tablet_servers(); i++) {
    ASSERT_OK(itest::WaitUntilTabletRunning(ts_map_[cluster_->tablet_server(i)->uuid()],
                                            tablet_id, timeout));
  }

  // Elect a leader (TS 0)
  ExternalTabletServer* leader_ts = cluster_->tablet_server(0);
  ASSERT_OK(itest::StartElection(ts_map_[leader_ts->uuid()], tablet_id, timeout));

  // Start writing, wait for some rows to be inserted.
  workload.Start();
  while (workload.rows_inserted() < 100) {
    SleepFor(MonoDelta::FromMilliseconds(10));
  }

  // Tombstone the follower.
  LOG(INFO) << "Tombstoning follower tablet " << tablet_id << " on TS " << replica_ts->uuid();
  ASSERT_OK(itest::DeleteTablet(replica_ts, tablet_id, TABLET_DATA_TOMBSTONED, boost::none,
                                timeout));

  // Wait for tablet copy to start.
  ASSERT_OK(inspect_->WaitForTabletDataStateOnTS(1, tablet_id,
                                                 { tablet::TABLET_DATA_COPYING }, timeout));

  // If the slow copy completes and the servers converge, the leader did not
  // evict the slow-but-responsive copying peer.
  workload.StopAndJoin();
  ASSERT_OK(WaitForServersToAgree(timeout, ts_map_, tablet_id, 1));

  ClusterVerifier v(cluster_.get());
  NO_FATALS(v.CheckCluster());
  NO_FATALS(v.CheckRowCount(workload.table_name(), ClusterVerifier::AT_LEAST,
                            workload.rows_inserted()));
}
// Attempting to start Tablet Copy on a tablet that was deleted with
// TABLET_DATA_DELETED should fail. This behavior helps avoid thrashing when
// a follower tablet is deleted and the leader notices before it has processed
// its own DeleteTablet RPC, thinking that it needs to bring its follower back.
TEST_F(TabletCopyITest, TestTabletCopyingDeletedTabletFails) {
  // Delete the leader with TABLET_DATA_DELETED.
  // Attempt to manually copy a replica to the leader from a follower.
  // Should get an error saying it's illegal.
  MonoDelta kTimeout = MonoDelta::FromSeconds(30);
  NO_FATALS(StartCluster({"--enable_leader_failure_detection=false"},
                         {"--catalog_manager_wait_for_new_tablets_to_elect_leader=false"}));

  TestWorkload workload(cluster_.get());
  workload.set_num_replicas(3);
  workload.Setup();

  TServerDetails* leader = ts_map_[cluster_->tablet_server(0)->uuid()];

  // Figure out the tablet id of the created tablet.
  vector<ListTabletsResponsePB::StatusAndSchemaPB> tablets;
  ASSERT_OK(WaitForNumTabletsOnTS(leader, 1, kTimeout, &tablets));
  string tablet_id = tablets[0].tablet_status().tablet_id();

  // Wait until all replicas are up and running.
  for (int i = 0; i < cluster_->num_tablet_servers(); i++) {
    ASSERT_OK(itest::WaitUntilTabletRunning(ts_map_[cluster_->tablet_server(i)->uuid()],
                                            tablet_id, kTimeout));
  }

  // Elect a leader for term 1, then run some data through the cluster.
  ASSERT_OK(itest::StartElection(leader, tablet_id, kTimeout));
  ASSERT_OK(WaitForServersToAgree(kTimeout, ts_map_, tablet_id, 1));

  // Now delete the leader with TABLET_DATA_DELETED. We should not be able to
  // bring back the leader after that until restarting the process.
  ASSERT_OK(itest::DeleteTablet(leader, tablet_id, TABLET_DATA_DELETED, boost::none, kTimeout));

  // A manual tablet copy onto the deleted replica must be rejected as an
  // illegal data-state transition.
  Status s = itest::StartTabletCopy(leader, tablet_id,
                                    cluster_->tablet_server(1)->uuid(),
                                    HostPort(cluster_->tablet_server(1)->bound_rpc_addr()),
                                    1, // We are in term 1.
                                    kTimeout);
  ASSERT_TRUE(s.IsIllegalState()) << s.ToString();
  ASSERT_STR_CONTAINS(s.ToString(), "Cannot transition from state TABLET_DATA_DELETED");

  // Restart the server so that it won't remember the tablet was permanently
  // deleted and we can tablet copy the server again.
  cluster_->tablet_server(0)->Shutdown();
  ASSERT_OK(cluster_->tablet_server(0)->Restart());

  // After the restart, the same copy request should succeed.
  ASSERT_OK(itest::StartTabletCopy(leader, tablet_id,
                                   cluster_->tablet_server(1)->uuid(),
                                   HostPort(cluster_->tablet_server(1)->bound_rpc_addr()),
                                   1, // We are in term 1.
                                   kTimeout));
  ASSERT_OK(WaitForServersToAgree(kTimeout, ts_map_, tablet_id, 1));
}
} // namespace kudu