// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.
#include <cstdint>
#include <cstdlib>
#include <functional>
#include <memory>
#include <ostream>
#include <string>
#include <thread>
#include <unordered_map>
#include <utility>
#include <vector>
#include <gflags/gflags_declare.h>
#include <glog/logging.h>
#include <glog/stl_logging.h>
#include <gtest/gtest.h>
#include "kudu/client/client.h"
#include "kudu/client/shared_ptr.h" // IWYU pragma: keep
#include "kudu/client/write_op.h"
#include "kudu/common/common.pb.h"
#include "kudu/common/partial_row.h"
#include "kudu/common/wire_protocol-test-util.h"
#include "kudu/common/wire_protocol.h"
#include "kudu/common/wire_protocol.pb.h"
#include "kudu/consensus/consensus.pb.h"
#include "kudu/consensus/consensus.proxy.h"
#include "kudu/consensus/consensus_peers.h"
#include "kudu/consensus/metadata.pb.h"
#include "kudu/consensus/opid.pb.h"
#include "kudu/consensus/opid_util.h"
#include "kudu/consensus/quorum_util.h"
#include "kudu/gutil/basictypes.h"
#include "kudu/gutil/map-util.h"
#include "kudu/gutil/ref_counted.h"
#include "kudu/gutil/strings/strcat.h"
#include "kudu/gutil/strings/substitute.h"
#include "kudu/gutil/strings/util.h"
#include "kudu/integration-tests/cluster_itest_util.h"
#include "kudu/integration-tests/cluster_verifier.h"
#include "kudu/integration-tests/log_verifier.h"
#include "kudu/integration-tests/mini_cluster_fs_inspector.h"
#include "kudu/integration-tests/raft_consensus-itest-base.h"
#include "kudu/integration-tests/test_workload.h"
#include "kudu/master/master.pb.h"
#include "kudu/mini-cluster/external_mini_cluster.h"
#include "kudu/rpc/messenger.h"
#include "kudu/rpc/rpc_controller.h"
#include "kudu/server/server_base.pb.h"
#include "kudu/server/server_base.proxy.h"
#include "kudu/tablet/metadata.pb.h"
#include "kudu/tserver/tablet_server-test-base.h"
#include "kudu/tserver/tserver.pb.h"
#include "kudu/tserver/tserver_service.proxy.h"
#include "kudu/util/atomic.h"
#include "kudu/util/countdown_latch.h"
#include "kudu/util/env.h"
#include "kudu/util/env_util.h"
#include "kudu/util/metrics.h"
#include "kudu/util/monotime.h"
#include "kudu/util/net/dns_resolver.h"
#include "kudu/util/net/net_util.h"
#include "kudu/util/path_util.h"
#include "kudu/util/pb_util.h"
#include "kudu/util/random.h"
#include "kudu/util/scoped_cleanup.h"
#include "kudu/util/status.h"
#include "kudu/util/test_macros.h"
#include "kudu/util/test_util.h"
DECLARE_bool(raft_prepare_replacement_before_eviction);
DECLARE_int32(client_inserts_per_thread);
DECLARE_int32(client_num_batches_per_thread);
DECLARE_int32(consensus_rpc_timeout_ms);
DECLARE_int32(num_client_threads);
DECLARE_int32(num_replicas);
DECLARE_int32(num_tablet_servers);
DECLARE_int32(rpc_timeout);
METRIC_DECLARE_entity(server);
METRIC_DECLARE_entity(tablet);
METRIC_DECLARE_counter(transaction_memory_pressure_rejections);
METRIC_DECLARE_gauge_int64(time_since_last_leader_heartbeat);
METRIC_DECLARE_gauge_int64(failed_elections_since_stable_leader);
METRIC_DECLARE_gauge_uint64(hybrid_clock_timestamp);
using kudu::client::KuduInsert;
using kudu::client::KuduSession;
using kudu::client::KuduTable;
using kudu::client::sp::shared_ptr;
using kudu::cluster::ExternalDaemonOptions;
using kudu::cluster::ExternalTabletServer;
using kudu::cluster::ExternalMiniCluster;
using kudu::cluster::ExternalMiniClusterOptions;
using kudu::consensus::ConsensusRequestPB;
using kudu::consensus::ConsensusResponsePB;
using kudu::consensus::ConsensusServiceProxy;
using kudu::consensus::EXCLUDE_HEALTH_REPORT;
using kudu::consensus::LeaderStepDownRequestPB;
using kudu::consensus::LeaderStepDownResponsePB;
using kudu::consensus::MajoritySize;
using kudu::consensus::MakeOpId;
using kudu::consensus::OpId;
using kudu::consensus::RaftPeerAttrsPB;
using kudu::consensus::RaftPeerPB;
using kudu::consensus::ReplicateMsg;
using kudu::itest::AddServer;
using kudu::itest::DONT_WAIT_FOR_LEADER;
using kudu::itest::GetInt64Metric;
using kudu::itest::LeaderStepDown;
using kudu::itest::RemoveServer;
using kudu::itest::StartElection;
using kudu::itest::TServerDetails;
using kudu::itest::TabletServerMap;
using kudu::itest::WAIT_FOR_LEADER;
using kudu::itest::WaitForNumTabletsOnTS;
using kudu::itest::WaitForOpFromCurrentTerm;
using kudu::itest::WaitForReplicasReportedToMaster;
using kudu::itest::WaitForServersToAgree;
using kudu::itest::WaitUntilCommittedOpIdIndexIs;
using kudu::itest::WaitUntilLeader;
using kudu::itest::WaitUntilTabletInState;
using kudu::itest::WriteSimpleTestRow;
using kudu::master::TabletLocationsPB;
using kudu::master::VOTER_REPLICA;
using kudu::env_util::ListFilesInDir;
using kudu::pb_util::SecureDebugString;
using kudu::pb_util::SecureShortDebugString;
using kudu::rpc::RpcController;
using kudu::server::SetFlagRequestPB;
using kudu::server::SetFlagResponsePB;
using std::string;
using std::thread;
using std::unique_ptr;
using std::unordered_map;
using std::vector;
using strings::Substitute;
namespace kudu {
namespace tserver {
static const int kConsensusRpcTimeoutForTests = 50;
static const int kTestRowKey = 1234;
static const int kTestRowIntVal = 5678;
// Integration test for the raft consensus implementation.
// Uses the whole tablet server stack with ExternalMiniCluster.
class RaftConsensusITest : public RaftConsensusITestBase {
public:
RaftConsensusITest() {
}
// Gets the current timestamp on the given server.
int64_t GetTimestampOnServer(TServerDetails* tserver) const;
// Scan the given replica in a loop until the number of rows
// is 'expected_count'. If the 90-second deadline elapses first,
// the test fails.
void WaitForRowCount(TabletServerServiceProxy* replica_proxy,
int expected_count,
vector<string>* results);
// Add an Insert operation to the given consensus request.
// The row to be inserted is generated based on the OpId with a timestamp
// that is greater than 'base_ts'.
void AddOp(const OpId& id, int64_t base_ts, ConsensusRequestPB* req);
void AddOpWithTypeAndKey(const OpId& id, int64_t base_ts,
RowOperationsPB::Type op_type,
int32_t key,
ConsensusRequestPB* req);
string DumpToString(TServerDetails* leader,
const vector<string>& leader_results,
TServerDetails* replica,
const vector<string>& replica_results);
// Insert 'num_rows' rows starting with row key 'start_row'.
// Each row will have size 'payload_size'. A short (100ms) timeout is
// used. If the flush generates any errors they will be ignored.
void InsertPayloadIgnoreErrors(int start_row, int num_rows, int payload_size);
// Brings chaos to an ExternalTabletServer by introducing random delays. Does this by
// pausing the daemon (SIGSTOP) for a random amount of time.
void DelayInjectorThread(ExternalTabletServer* tablet_server, int timeout_msec);
// Thread which loops until '*finish' becomes true, trying to insert a row
// on the given tablet server identified by 'replica_idx'.
void StubbornlyWriteSameRowThread(int replica_idx, const AtomicBool* finish);
// Stops the current leader of the configuration, triggers a leader election, and then brings
// the old leader back. Before stopping the leader, this pauses the follower nodes one by one
// (with short gaps in between) to increase the chance that operations are left pending.
void StopOrKillLeaderAndElectNewOne();
// Writes 'num_writes' operations to the current leader. Each of the operations
// has a payload of around 128KB. Causes a gtest failure on error.
void Write128KOpsToLeader(int num_writes);
// Ensure that a majority of servers is required for elections and writes.
// This is done by pausing a majority and asserting that writes and elections fail,
// then unpausing the majority and asserting that elections and writes succeed.
// On failure, raises a gtest assertion.
// Note: this assumes all tablet servers listed in 'tablet_servers' are voters.
void AssertMajorityRequiredForElectionsAndWrites(const TabletServerMap& tablet_servers,
const string& leader_uuid);
void CreateClusterForCrashyNodesTests(vector<string> extra_ts_flags = {});
void DoTestCrashyNodes(TestWorkload* workload, int max_rows_to_insert);
// Prepare for a test where a single replica of a 3-server cluster is left
// running as a follower.
void SetupSingleReplicaTest(TServerDetails** replica_ts);
protected:
shared_ptr<KuduTable> table_;
vector<thread> threads_;
};
int64_t RaftConsensusITest::GetTimestampOnServer(TServerDetails* tserver) const {
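// Reads the hybrid_clock_timestamp gauge from the tablet server's embedded metrics endpoint
// (via its bound HTTP address).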
int64_t ret;
ExternalTabletServer* ets = cluster_->tablet_server_by_uuid(tserver->uuid());
CHECK_OK(GetInt64Metric(ets->bound_http_hostport(), &METRIC_ENTITY_server,
nullptr, &METRIC_hybrid_clock_timestamp, "value", &ret));
return ret;
}
void RaftConsensusITest::WaitForRowCount(TabletServerServiceProxy* replica_proxy,
int expected_count,
vector<string>* results) {
LOG(INFO) << "Waiting for row count " << expected_count << "...";
MonoTime start = MonoTime::Now();
MonoTime deadline = start + MonoDelta::FromSeconds(90);
while (true) {
results->clear();
NO_FATALS(ScanReplica(replica_proxy, results));
if (results->size() == expected_count) {
return;
}
SleepFor(MonoDelta::FromMilliseconds(10));
if (MonoTime::Now() >= deadline) {
break;
}
}
FAIL() << "Did not reach expected row count " << expected_count
<< " after " << (MonoTime::Now() - start).ToString()
<< ": rows: " << *results;
}
void RaftConsensusITest::AddOp(const OpId& id, int64_t base_ts, ConsensusRequestPB* req) {
AddOpWithTypeAndKey(id, base_ts, RowOperationsPB::INSERT,
id.index() * 10000 + id.term(), req);
}
void RaftConsensusITest::AddOpWithTypeAndKey(const OpId& id, int64_t base_ts,
RowOperationsPB::Type op_type,
int32_t key,
ConsensusRequestPB* req) {
ReplicateMsg* msg = req->add_ops();
msg->mutable_id()->CopyFrom(id);
// Set a somewhat realistic timestamp such that it is monotonically
// increasing per op and starts off higher than 1. This is required, as some
// test cases test the scenario where the WAL is replayed and no-ops and
// writes are expected to have monotonically increasing timestamps.
msg->set_timestamp(base_ts + id.index() * 10000 + id.term());
msg->set_op_type(consensus::WRITE_OP);
WriteRequestPB* write_req = msg->mutable_write_request();
CHECK_OK(SchemaToPB(schema_, write_req->mutable_schema()));
write_req->set_tablet_id(tablet_id_);
AddTestRowToPB(op_type, schema_, key, id.term(),
SecureShortDebugString(id), write_req->mutable_row_operations());
}
string RaftConsensusITest::DumpToString(TServerDetails* leader,
const vector<string>& leader_results,
TServerDetails* replica,
const vector<string>& replica_results) {
string ret = Substitute("Replica results did not match the leaders."
"\nLeader: $0\nReplica: $1. Results size "
"L: $2 R: $3",
leader->ToString(),
replica->ToString(),
leader_results.size(),
replica_results.size());
StrAppend(&ret, "Leader Results: \n");
for (const string& result : leader_results) {
StrAppend(&ret, result, "\n");
}
StrAppend(&ret, "Replica Results: \n");
for (const string& result : replica_results) {
StrAppend(&ret, result, "\n");
}
return ret;
}
void RaftConsensusITest::InsertPayloadIgnoreErrors(int start_row,
int num_rows,
int payload_size) {
shared_ptr<KuduTable> table;
CHECK_OK(client_->OpenTable(kTableId, &table));
shared_ptr<KuduSession> session = client_->NewSession();
session->SetTimeoutMillis(100);
CHECK_OK(session->SetFlushMode(KuduSession::MANUAL_FLUSH));
string payload(payload_size, 'x');
for (int i = 0; i < num_rows; i++) {
unique_ptr<KuduInsert> insert(table->NewInsert());
KuduPartialRow* row = insert->mutable_row();
CHECK_OK(row->SetInt32(0, i + start_row));
CHECK_OK(row->SetInt32(1, 0));
CHECK_OK(row->SetStringCopy(2, payload));
CHECK_OK(session->Apply(insert.release()));
ignore_result(session->Flush());
}
}
void RaftConsensusITest::DelayInjectorThread(
ExternalTabletServer* tablet_server, int timeout_msec) {
while (inserters_.count() > 0) {
// Adjust the value obtained from the standard Gaussian distribution so that we stall the
// server for longer than the timeout a small (~5%) percentage of the time
// (95% corresponds to 1.64485 in a standard normal (0,1) distribution).
double sleep_time_usec = 1000 *
((random_.Normal(0, 1) * timeout_msec) / 1.64485);
if (sleep_time_usec < 0) sleep_time_usec = 0;
// Additionally, only cause a timeout at all 50% of the time; otherwise just sleep.
double val = (rand() * 1.0) / RAND_MAX;
if (val < 0.5) {
SleepFor(MonoDelta::FromMicroseconds(sleep_time_usec));
continue;
}
ASSERT_OK(tablet_server->Pause());
LOG_IF(INFO, sleep_time_usec > 0.0)
<< "Delay injector thread for TS " << tablet_server->instance_id().permanent_uuid()
<< " SIGSTOPped the ts, sleeping for " << sleep_time_usec << " usec...";
SleepFor(MonoDelta::FromMicroseconds(sleep_time_usec));
ASSERT_OK(tablet_server->Resume());
}
}
void RaftConsensusITest::StubbornlyWriteSameRowThread(int replica_idx, const AtomicBool* finish) {
vector<TServerDetails*> servers;
AppendValuesFromMap(tablet_servers_, &servers);
CHECK_LT(replica_idx, servers.size());
TServerDetails* ts = servers[replica_idx];
// Manually construct an RPC to our target replica. We expect most of the calls
// to fail either with an "already present" or an error because we are writing
// to a follower. That's OK, though - what we care about for this test is
// just that the operations Apply() in the same order everywhere (even though
// in this case the result will just be an error).
WriteRequestPB req;
WriteResponsePB resp;
RpcController rpc;
req.set_tablet_id(tablet_id_);
ASSERT_OK(SchemaToPB(schema_, req.mutable_schema()));
AddTestRowToPB(RowOperationsPB::INSERT, schema_, kTestRowKey, kTestRowIntVal,
"hello world", req.mutable_row_operations());
while (!finish->Load()) {
resp.Clear();
rpc.Reset();
rpc.set_timeout(MonoDelta::FromSeconds(10));
ignore_result(ts->tserver_proxy->Write(req, &resp, &rpc));
VLOG(1) << "Response from server " << replica_idx << ": "
<< SecureShortDebugString(resp);
}
}
void RaftConsensusITest::StopOrKillLeaderAndElectNewOne() {
TServerDetails* leader;
vector<TServerDetails*> followers;
CHECK_OK(GetTabletLeaderAndFollowers(tablet_id_, &leader, &followers));
ExternalTabletServer* leader_ets = cluster_->tablet_server_by_uuid(leader->uuid());
for (const auto* ts : followers) {
ExternalTabletServer* ets = cluster_->tablet_server_by_uuid(ts->uuid());
CHECK_OK(ets->Pause());
SleepFor(MonoDelta::FromMilliseconds(100));
}
// Once all the followers are paused, also pause or kill the current leader. Since we've
// waited a bit, the old leader is likely to have pending operations that must be aborted.
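// Flip a coin: either kill the leader outright (forcing a full tablet bootstrap when it
// is restarted below) or just SIGSTOP it.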
const bool do_kill = rand() % 2 == 0;
if (do_kill) {
leader_ets->Shutdown();
} else {
CHECK_OK(leader_ets->Pause());
}
// Resume the replicas.
for (const auto* ts : followers) {
ExternalTabletServer* ets = cluster_->tablet_server_by_uuid(ts->uuid());
CHECK_OK(ets->Resume());
}
// Get the new leader.
TServerDetails* new_leader;
CHECK_OK(GetLeaderReplicaWithRetries(tablet_id_, &new_leader));
// Bring the old leader back.
if (do_kill) {
CHECK_OK(leader_ets->Restart());
// Wait until we have the same number of followers.
const auto initial_followers = followers.size();
do {
GetOnlyLiveFollowerReplicas(tablet_id_, &followers);
} while (followers.size() < initial_followers);
} else {
CHECK_OK(leader_ets->Resume());
}
}
void RaftConsensusITest::Write128KOpsToLeader(int num_writes) {
TServerDetails* leader = nullptr;
ASSERT_OK(GetLeaderReplicaWithRetries(tablet_id_, &leader));
WriteRequestPB req;
req.set_tablet_id(tablet_id_);
ASSERT_OK(SchemaToPB(schema_, req.mutable_schema()));
RowOperationsPB* data = req.mutable_row_operations();
WriteResponsePB resp;
RpcController rpc;
rpc.set_timeout(MonoDelta::FromMilliseconds(10000));
int key = 0;
// Generate a 128KB dummy payload.
string test_payload(128 * 1024, '0');
for (int i = 0; i < num_writes; i++) {
rpc.Reset();
data->Clear();
AddTestRowToPB(RowOperationsPB::INSERT, schema_, key, key,
test_payload, data);
key++;
ASSERT_OK(leader->tserver_proxy->Write(req, &resp, &rpc));
ASSERT_FALSE(resp.has_error()) << SecureDebugString(resp);
}
}
void RaftConsensusITest::AssertMajorityRequiredForElectionsAndWrites(
const TabletServerMap& tablet_servers, const string& leader_uuid) {
static const auto kTimeout = MonoDelta::FromSeconds(20);
TServerDetails* initial_leader = FindOrDie(tablet_servers, leader_uuid);
// Calculate number of servers to leave unpaused (minority).
// This math is a little unintuitive but works for cluster sizes including 2 and 1.
// Note: We assume all of these TSes are voters.
int config_size = tablet_servers.size();
int minority_to_retain = MajoritySize(config_size) - 1;
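// For example, with 3 voters MajoritySize() is 2, so we retain 1 live replica (the leader)
// and pause the other 2.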
// Only perform this part of the test if we have some servers to pause, else
// the failure assertions will throw.
if (config_size > 1) {
// Pause enough replicas to prevent a majority.
int num_to_pause = config_size - minority_to_retain;
LOG(INFO) << "Pausing " << num_to_pause << " tablet servers in config of size " << config_size;
vector<string> paused_uuids;
for (const TabletServerMap::value_type& entry : tablet_servers) {
if (paused_uuids.size() == num_to_pause) {
continue;
}
const string& replica_uuid = entry.first;
if (replica_uuid == leader_uuid) {
// Always leave this one alone.
continue;
}
ExternalTabletServer* replica_ts = cluster_->tablet_server_by_uuid(replica_uuid);
ASSERT_OK(replica_ts->Pause());
paused_uuids.push_back(replica_uuid);
}
// Ensure writes timeout while only a minority is alive.
Status s = WriteSimpleTestRow(initial_leader, tablet_id_, RowOperationsPB::UPDATE,
kTestRowKey, kTestRowIntVal, "foo",
MonoDelta::FromMilliseconds(100));
ASSERT_TRUE(s.IsTimedOut()) << s.ToString();
// Step down. Scenarios which use this method turn off leader failure
// detection, so the leader role cannot fluctuate among tablet replicas.
ASSERT_OK(LeaderStepDown(initial_leader, tablet_id_, kTimeout));
// Assert that elections time out without a live majority.
// We specify a very short timeout here to keep the tests fast.
ASSERT_OK(StartElection(initial_leader, tablet_id_, kTimeout));
s = WaitUntilLeader(initial_leader, tablet_id_, MonoDelta::FromMilliseconds(100));
ASSERT_TRUE(s.IsTimedOut()) << s.ToString();
LOG(INFO) << "Expected timeout encountered on election with weakened config: " << s.ToString();
// Resume the paused servers.
LOG(INFO) << "Resuming " << num_to_pause << " tablet servers in config of size " << config_size;
for (const string& replica_uuid : paused_uuids) {
ExternalTabletServer* replica_ts = cluster_->tablet_server_by_uuid(replica_uuid);
ASSERT_OK(replica_ts->Resume());
}
}
ASSERT_OK(WaitForServersToAgree(kTimeout, tablet_servers, tablet_id_, 1));
// Now an election should succeed.
ASSERT_OK(StartElection(initial_leader, tablet_id_, kTimeout));
ASSERT_OK(WaitUntilLeader(initial_leader, tablet_id_, kTimeout));
LOG(INFO) << "Successful election with full config of size " << config_size;
// And a write should also succeed.
ASSERT_OK(WriteSimpleTestRow(initial_leader, tablet_id_, RowOperationsPB::UPDATE,
kTestRowKey, kTestRowIntVal, Substitute("qsz=$0", config_size),
kTimeout));
}
void RaftConsensusITest::CreateClusterForCrashyNodesTests(vector<string> extra_ts_flags) {
if (AllowSlowTests()) {
FLAGS_num_tablet_servers = 7;
FLAGS_num_replicas = 7;
}
vector<string> ts_flags;
// Crash 5% of the time just before sending an RPC. With 7 servers,
// this means we crash about 30% of the time before we've fully
// replicated the NO_OP at the start of the term.
ts_flags.emplace_back("--fault_crash_on_leader_request_fraction=0.05");
// Inject latency to encourage the replicas to fall out of sync
// with each other.
ts_flags.emplace_back("--log_inject_latency");
ts_flags.emplace_back("--log_inject_latency_ms_mean=30");
ts_flags.emplace_back("--log_inject_latency_ms_stddev=60");
// Make leader elections faster so we get through more cycles of leaders.
ts_flags.emplace_back("--raft_heartbeat_interval_ms=100");
// Avoid preallocating segments since bootstrap is a little bit
// faster if it doesn't have to scan forward through the preallocated
// log area.
ts_flags.emplace_back("--log_preallocate_segments=false");
ts_flags.insert(ts_flags.end(), extra_ts_flags.begin(), extra_ts_flags.end());
NO_FATALS(CreateCluster("raft_consensus-itest-crashy-nodes-cluster",
std::move(ts_flags)));
}
void RaftConsensusITest::DoTestCrashyNodes(TestWorkload* workload, int max_rows_to_insert) {
int crashes_to_cause = 3;
if (AllowSlowTests()) {
crashes_to_cause = 15;
}
workload->set_num_replicas(FLAGS_num_replicas);
// Set a really high write timeout so that even in the presence of many failures we
// can verify an exact number of rows in the end, thanks to exactly once semantics.
workload->set_write_timeout_millis(60 * 1000 /* 60 seconds */);
workload->set_num_write_threads(10);
workload->set_num_read_threads(2);
workload->Setup();
workload->Start();
int num_crashes = 0;
while (num_crashes < crashes_to_cause &&
workload->rows_inserted() < max_rows_to_insert) {
num_crashes += RestartAnyCrashedTabletServers();
SleepFor(MonoDelta::FromMilliseconds(10));
}
// Writers are likely still running. To have some chance of completing all writes,
// restart the tablet servers without their fault-injection flags; otherwise they'll keep
// crashing and the writes can never complete.
for (int i = 0; i < cluster_->num_tablet_servers(); i++) {
ExternalTabletServer* ts = cluster_->tablet_server(i);
vector<string>* flags = ts->mutable_flags();
bool removed_flag = false;
for (auto it = flags->begin(); it != flags->end(); ++it) {
if (HasPrefixString(*it, "--fault_crash")) {
flags->erase(it);
removed_flag = true;
break;
}
}
ASSERT_TRUE(removed_flag) << "could not remove flag from TS " << i
<< "\nFlags:\n" << *flags;
ts->Shutdown();
ASSERT_OK(ts->Restart());
}
workload->StopAndJoin();
// Ensure that the replicas converge.
ClusterVerifier v(cluster_.get());
NO_FATALS(v.CheckCluster());
NO_FATALS(v.CheckRowCount(workload->table_name(),
ClusterVerifier::EXACTLY,
workload->rows_inserted() - workload->rows_deleted()));
}
void RaftConsensusITest::SetupSingleReplicaTest(TServerDetails** replica_ts) {
const vector<string> kTsFlags = {
"--enable_leader_failure_detection=false",
};
const vector<string> kMasterFlags = {
"--catalog_manager_wait_for_new_tablets_to_elect_leader=false"
};
FLAGS_num_replicas = 3;
FLAGS_num_tablet_servers = 3;
NO_FATALS(BuildAndStart(kTsFlags, kMasterFlags));
// Kill all the servers but one.
vector<TServerDetails*> tservers;
AppendValuesFromMap(tablet_servers_, &tservers);
ASSERT_EQ(3, tservers.size());
// Elect server 2 as leader and wait for log index 1 to propagate to all servers.
ASSERT_OK(StartElection(tservers[2], tablet_id_, MonoDelta::FromSeconds(10)));
ASSERT_OK(WaitForServersToAgree(MonoDelta::FromSeconds(10), tablet_servers_, tablet_id_, 1));
cluster_->tablet_server_by_uuid(tservers[1]->uuid())->Shutdown();
cluster_->tablet_server_by_uuid(tservers[2]->uuid())->Shutdown();
*replica_ts = tservers[0];
LOG(INFO) << "================================== Cluster setup complete.";
}
// Test that we can retrieve the permanent UUID of a server running
// the consensus service via RPC.
TEST_F(RaftConsensusITest, TestGetPermanentUuid) {
NO_FATALS(BuildAndStart());
RaftPeerPB peer;
TServerDetails* leader = nullptr;
ASSERT_OK(GetLeaderReplicaWithRetries(tablet_id_, &leader));
peer.mutable_last_known_addr()->CopyFrom(leader->registration.rpc_addresses(0));
const string expected_uuid = leader->instance_id.permanent_uuid();
rpc::MessengerBuilder builder("test builder");
builder.set_num_reactors(1);
std::shared_ptr<rpc::Messenger> messenger;
ASSERT_OK(builder.Build(&messenger));
auto resolver = std::make_shared<DnsResolver>();
ASSERT_OK(consensus::SetPermanentUuidForRemotePeer(
messenger, resolver.get(), &peer));
ASSERT_EQ(expected_uuid, peer.permanent_uuid());
}
// TODO allow the scan to define an operation id, fetch the last id
// from the leader and then use that id to make the replica wait
// until it is done. This will avoid the sleeps below.
TEST_F(RaftConsensusITest, TestInsertAndMutateThroughConsensus) {
NO_FATALS(BuildAndStart());
int num_iters = AllowSlowTests() ? 10 : 1;
for (int i = 0; i < num_iters; i++) {
InsertTestRowsRemoteThread(i * FLAGS_client_inserts_per_thread,
FLAGS_client_inserts_per_thread,
FLAGS_client_num_batches_per_thread);
}
NO_FATALS(AssertAllReplicasAgree(FLAGS_client_inserts_per_thread * num_iters));
}
TEST_F(RaftConsensusITest, TestFailedOp) {
NO_FATALS(BuildAndStart());
// Wait until we have a stable leader.
ASSERT_OK(WaitForServersToAgree(MonoDelta::FromSeconds(10), tablet_servers_,
tablet_id_, 1));
WriteRequestPB req;
req.set_tablet_id(tablet_id_);
ASSERT_OK(SchemaToPB(schema_, req.mutable_schema()));
RowOperationsPB* data = req.mutable_row_operations();
data->set_rows("some gibberish!");
WriteResponsePB resp;
RpcController controller;
controller.set_timeout(MonoDelta::FromSeconds(FLAGS_rpc_timeout));
TServerDetails* leader = nullptr;
ASSERT_OK(GetLeaderReplicaWithRetries(tablet_id_, &leader));
ASSERT_OK(DCHECK_NOTNULL(leader->tserver_proxy.get())->Write(req, &resp, &controller));
ASSERT_TRUE(resp.has_error());
// Add a proper row so that we can verify that all of the replicas continue
// to process ops after a failure. Additionally, this allows us to wait for
// all of the replicas to finish processing ops before shutting down,
// avoiding a potential stall as we currently can't abort ops (see KUDU-341).
data->Clear();
AddTestRowToPB(RowOperationsPB::INSERT, schema_, 0, 0, "original0", data);
controller.Reset();
controller.set_timeout(MonoDelta::FromSeconds(FLAGS_rpc_timeout));
ASSERT_OK(DCHECK_NOTNULL(leader->tserver_proxy.get())->Write(req, &resp, &controller));
SCOPED_TRACE(SecureShortDebugString(resp));
ASSERT_FALSE(resp.has_error());
NO_FATALS(AssertAllReplicasAgree(1));
}
// Inserts rows through consensus and also starts delay-injecting threads (one per replica)
// that pause the tablet server processes for a while. This is meant to test that
// consensus still works even with timeouts and repeated requests.
TEST_F(RaftConsensusITest, MultiThreadedMutateAndInsertThroughConsensus) {
NO_FATALS(BuildAndStart());
if (500 == FLAGS_client_inserts_per_thread && AllowSlowTests()) {
FLAGS_client_inserts_per_thread = FLAGS_client_inserts_per_thread * 10;
FLAGS_client_num_batches_per_thread = FLAGS_client_num_batches_per_thread * 10;
}
int num_threads = FLAGS_num_client_threads;
for (int i = 0; i < num_threads; i++) {
threads_.emplace_back([this, i]() {
this->InsertTestRowsRemoteThread(i * FLAGS_client_inserts_per_thread,
FLAGS_client_inserts_per_thread,
FLAGS_client_num_batches_per_thread);
});
}
for (int i = 0; i < FLAGS_num_replicas; i++) {
auto* ts = cluster_->tablet_server(i);
threads_.emplace_back([this, ts]() {
this->DelayInjectorThread(ts, kConsensusRpcTimeoutForTests);
});
}
for (auto& t : threads_) {
t.join();
}
NO_FATALS(AssertAllReplicasAgree(FLAGS_client_inserts_per_thread * FLAGS_num_client_threads));
}
TEST_F(RaftConsensusITest, TestInsertOnNonLeader) {
NO_FATALS(BuildAndStart());
// Wait for the initial leader election to complete.
ASSERT_OK(WaitForServersToAgree(MonoDelta::FromSeconds(10), tablet_servers_,
tablet_id_, 1));
// Manually construct a write RPC to a replica and make sure it responds
// with the correct error code.
WriteRequestPB req;
WriteResponsePB resp;
RpcController rpc;
req.set_tablet_id(tablet_id_);
ASSERT_OK(SchemaToPB(schema_, req.mutable_schema()));
AddTestRowToPB(RowOperationsPB::INSERT, schema_, kTestRowKey, kTestRowIntVal,
"hello world via RPC", req.mutable_row_operations());
// Get the leader.
vector<TServerDetails*> followers;
GetOnlyLiveFollowerReplicas(tablet_id_, &followers);
ASSERT_OK(followers[0]->tserver_proxy->Write(req, &resp, &rpc));
SCOPED_TRACE(SecureDebugString(resp));
ASSERT_TRUE(resp.has_error());
Status s = StatusFromPB(resp.error().status());
EXPECT_TRUE(s.IsIllegalState());
ASSERT_STR_CONTAINS(s.ToString(),
"is not leader of this config: current role FOLLOWER");
// TODO(unknown): need to change the error code to be something like REPLICA_NOT_LEADER
// so that the client can properly handle this case! plumbing this is a little difficult
// so not addressing at the moment.
NO_FATALS(AssertAllReplicasAgree(0));
}
// Test that when a follower is stopped for a long time, the log cache
// properly evicts operations, but still allows the follower to catch
// up when it comes back.
//
// Also asserts that the other replicas retain logs for the stopped
// follower to catch up from.
TEST_F(RaftConsensusITest, TestCatchupAfterOpsEvicted) {
const vector<string> kTsFlags = {
"--log_cache_size_limit_mb=1",
"--consensus_max_batch_size_bytes=500000",
// Use short and synchronous rolls so that we can test log segment retention.
"--log_segment_size_mb=1",
"--log_async_preallocate_segments=false",
// Run the maintenance manager frequently and flush quickly,
// so that we don't have to wait long for GC.
"--maintenance_manager_polling_interval_ms=100",
"--flush_threshold_secs=3",
// We write 128KB cells in this test, so bump the limit.
"--max_cell_size_bytes=1000000",
// And disable WAL compression so the 128KB cells don't get compressed away.
"--log_compression_codec=no_compression"
};
NO_FATALS(BuildAndStart(kTsFlags));
TServerDetails* replica = (*tablet_replicas_.begin()).second;
ASSERT_TRUE(replica != nullptr);
ExternalTabletServer* replica_ets = cluster_->tablet_server_by_uuid(replica->uuid());
// Pause a replica.
ASSERT_OK(replica_ets->Pause());
LOG(INFO) << "Paused one of the replicas, starting to write.";
// Insert 5MB worth of data.
const int kNumWrites = 40;
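// 40 ops of ~128KB each is about 5MB, comfortably exceeding the 1MB log cache limit set above.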
NO_FATALS(Write128KOpsToLeader(kNumWrites));
// Sleep a bit to give the maintenance manager time to GC logs, if it were
// going to.
SleepFor(MonoDelta::FromSeconds(1));
// Check that the leader and non-paused follower have not GCed any logs (since
// the third peer needs them to catch up).
for (int i = 0; i < cluster_->num_tablet_servers(); i++) {
int num_wals = inspect_->CountFilesInWALDirForTS(i, tablet_id_, "wal-*");
if (cluster_->tablet_server(i) == replica_ets) {
ASSERT_EQ(1, num_wals) << "Replica should have only one segment";
} else {
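// ~5MB of data written into 1MB segments yields 6 WAL files (5 rolled segments plus the
// open one).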
ASSERT_EQ(6, num_wals)
<< "Other nodes should retain segments for the frozen replica to catch up";
}
}
// Now unpause the replica; it should eventually catch back up.
ASSERT_OK(replica_ets->Resume());
NO_FATALS(AssertAllReplicasAgree(kNumWrites));
// Once the follower has caught up, all replicas should eventually GC the earlier
// log segments that they were retaining.
ASSERT_EVENTUALLY([&]() {
for (int i = 0; i < cluster_->num_tablet_servers(); i++) {
SCOPED_TRACE(Substitute("TS $0", i));
int num_wals = inspect_->CountFilesInWALDirForTS(i, tablet_id_, "wal-*");
ASSERT_EQ(1, num_wals);
}
});
}
// Test that the leader doesn't crash if one of its followers has
// fallen behind so far that the logs necessary to catch it up
// have been GCed.
//
// In a real cluster, this will eventually cause the follower to be
// evicted/replaced. In any case, the leader should not crash.
//
// We also ensure that, when the leader stops writing to the follower,
// the follower won't disturb the other nodes when it attempts to elect
// itself.
//
// This is a regression test for KUDU-775 and KUDU-562.
TEST_F(RaftConsensusITest, TestFollowerFallsBehindLeaderGC) {
vector<string> ts_flags = {
// Disable follower eviction to maintain the original intent of this test.
"--evict_failed_followers=false",
};
AddFlagsForLogRolls(&ts_flags); // For CauseFollowerToFallBehindLogGC().
NO_FATALS(BuildAndStart(ts_flags));
string leader_uuid;
int64_t orig_term;
string follower_uuid;
NO_FATALS(CauseFollowerToFallBehindLogGC(
tablet_servers_, &leader_uuid, &orig_term, &follower_uuid));
SCOPED_TRACE(Substitute("leader: $0 follower: $1", leader_uuid, follower_uuid));
// Wait for remaining majority to agree.
TabletServerMap active_tablet_servers = tablet_servers_;
ASSERT_EQ(3, active_tablet_servers.size());
ASSERT_EQ(1, active_tablet_servers.erase(follower_uuid));
ASSERT_OK(WaitForServersToAgree(MonoDelta::FromSeconds(30), active_tablet_servers, tablet_id_,
1));
if (AllowSlowTests()) {
// Sleep long enough that the "abandoned" server's leader election interval
// will trigger several times. Then, verify that the term has not increased
// on any of the servers.
// This ensures that the other servers properly reject the pre-election requests
// from the abandoned node, and that the abandoned node doesn't bump its term
// either, since that would cause spurious leader elections upon the node coming back
// to life.
SleepFor(MonoDelta::FromSeconds(5));
for (int i = 0; i < cluster_->num_tablet_servers(); i++) {
ExternalTabletServer* ts = cluster_->tablet_server(i);
SCOPED_TRACE(ts->uuid());
int64_t term_from_metric = -1;
ASSERT_OK(GetTermMetricValue(ts, &term_from_metric));
ASSERT_EQ(term_from_metric, orig_term);
}
OpId op_id;
TServerDetails* leader = tablet_servers_[leader_uuid];
ASSERT_OK(GetLastOpIdForReplica(tablet_id_, leader, consensus::RECEIVED_OPID,
MonoDelta::FromSeconds(10), &op_id));
ASSERT_EQ(orig_term, op_id.term())
<< "expected the leader to have not advanced terms but has op " << op_id;
}
}
// This test starts several tablet servers, and configures them with
// fault injection so that the leaders frequently crash just before
// sending RPCs to followers.
//
// This can result in various scenarios where leaders crash right after
// being elected and never succeed in replicating their first operation.
// For example, KUDU-783 reproduces from this test approximately 5% of the
// time on a slow-test debug build.
TEST_F(RaftConsensusITest, InsertUniqueKeysWithCrashyNodes) {
NO_FATALS(CreateClusterForCrashyNodesTests());
TestWorkload workload(cluster_.get());
workload.set_write_batch_size(1);
NO_FATALS(DoTestCrashyNodes(&workload, 100));
}
// The same crashy nodes test as above but inserts many duplicate keys.
// This emulates cases where there are many duplicate keys which, due to two-phase
// locking, may cause deadlocks and other anomalies that cannot be observed when
// keys are unique.
TEST_F(RaftConsensusITest, InsertDuplicateKeysWithCrashyNodes) {
NO_FATALS(CreateClusterForCrashyNodesTests());
TestWorkload workload(cluster_.get());
workload.set_write_pattern(TestWorkload::INSERT_WITH_MANY_DUP_KEYS);
// Increase the number of rows per batch to get a higher chance of key collision.
workload.set_write_batch_size(3);
NO_FATALS(DoTestCrashyNodes(&workload, 300));
}
// The same crashy nodes test as above but the keys will be deleted after insertion.
TEST_F(RaftConsensusITest, InsertAndDeleteWithCrashyNodes) {
vector<string> extra_ts_flags = {
"--flush_threshold_mb=0",
"--flush_threshold_secs=1",
"--maintenance_manager_polling_interval_ms=10",
"--heartbeat_interval_ms=10",
"--update_tablet_stats_interval_ms=10",
};
NO_FATALS(CreateClusterForCrashyNodesTests(std::move(extra_ts_flags)));
// If AllowSlowTests() is true, exercise deleting data that has been flushed to DRSs.
// Otherwise, exercise deleting data that is still in the MRS.
int32_t write_interval_millis = 0;
int32_t write_batch_size = 5;
if (AllowSlowTests()) {
// Wait for MRS to be flushed.
write_interval_millis = 1000;
// Decrease the number of rows per batch to generate more DRSs.
write_batch_size = 1;
}
TestWorkload workload(cluster_.get());
workload.set_write_pattern(TestWorkload::INSERT_RANDOM_ROWS_WITH_DELETE);
workload.set_write_interval_millis(write_interval_millis);
workload.set_write_batch_size(write_batch_size);
NO_FATALS(DoTestCrashyNodes(&workload, 100));
}
TEST_F(RaftConsensusITest, MultiThreadedInsertWithFailovers) {
int kNumElections = FLAGS_num_replicas;
if (AllowSlowTests()) {
FLAGS_num_tablet_servers = 7;
FLAGS_num_replicas = 7;
kNumElections = 3 * FLAGS_num_replicas;
}
// Reset the consensus RPC timeout to the default value, or elections might fail frequently.
FLAGS_consensus_rpc_timeout_ms = 1000;
// Start the cluster (a 7-node configuration under slow tests: since we can't bring killed
// leaders back, a higher replica count lets us kill more leaders).
NO_FATALS(BuildAndStart());
OverrideFlagForSlowTests(
"client_inserts_per_thread",
Substitute("$0", (FLAGS_client_inserts_per_thread * 100)));
OverrideFlagForSlowTests(
"client_num_batches_per_thread",
Substitute("$0", (FLAGS_client_num_batches_per_thread * 100)));
int num_threads = FLAGS_num_client_threads;
int total_num_rows = num_threads * FLAGS_client_inserts_per_thread;
// We create kNumElections - 1 latches; every time one of them fires we kill or pause the
// current leader, so under slow tests the same node ends up being killed more than once.
vector<unique_ptr<CountDownLatch>> latches;
latches.reserve(kNumElections);
for (int i = 1; i < kNumElections; i++) {
latches.emplace_back(new CountDownLatch((i * total_num_rows) / kNumElections));
}
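// The insert threads count these latches down as they make progress; latch i fires after
// (i * total_num_rows) / kNumElections rows, and each time one fires we force another
// leader change below.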
for (int i = 0; i < num_threads; i++) {
threads_.emplace_back([this, i, &latches]() {
this->InsertTestRowsRemoteThread(i * FLAGS_client_inserts_per_thread,
FLAGS_client_inserts_per_thread,
FLAGS_client_num_batches_per_thread, latches);
});
}
for (const auto& latch : latches) {
NO_FATALS(cluster_->AssertNoCrashes());
latch->Wait();
StopOrKillLeaderAndElectNewOne();
}
for (auto& t : threads_) {
t.join();
}
NO_FATALS(AssertAllReplicasAgree(FLAGS_client_inserts_per_thread * FLAGS_num_client_threads));
}
// Regression test for KUDU-597, an issue where we could mis-order operations on
// a machine if the following sequence occurred:
// 1) Replica is a FOLLOWER
// 2) A client request hits the machine
// 3) It receives some operations from the current leader
// 4) It gets elected LEADER
// In this scenario, it would incorrectly sequence the client request's PREPARE phase
// before the operations received in step (3), even though the correct behavior would be
// to either reject them or sequence them after those operations, because the operation
// index is higher.
//
// The test works by setting up three replicas and manually hammering them with write
// requests targeting a single row. If the bug exists, then OpOrderVerifier
// will trigger an assertion because the prepare order and the op indexes will become
// misaligned.
TEST_F(RaftConsensusITest, TestKUDU_597) {
FLAGS_num_replicas = 3;
FLAGS_num_tablet_servers = 3;
NO_FATALS(BuildAndStart());
AtomicBool finish(false);
for (int i = 0; i < FLAGS_num_tablet_servers; i++) {
threads_.emplace_back([this, i, &finish]() {
this->StubbornlyWriteSameRowThread(i, &finish);
});
}
SCOPED_CLEANUP({
finish.Store(true);
for (auto& t : threads_) {
t.join();
}
});
const int num_loops = AllowSlowTests() ? 10 : 1;
for (int i = 0; i < num_loops; i++) {
StopOrKillLeaderAndElectNewOne();
SleepFor(MonoDelta::FromSeconds(1));
ASSERT_OK(CheckTabletServersAreAlive(FLAGS_num_tablet_servers));
}
}
// Regression test for KUDU-1775: when a replica is restarted, and the first
// request it receives from a leader results in an LMP mismatch error, the
// replica should still respond with the correct 'last_committed_idx'.
TEST_F(RaftConsensusITest, TestLMPMismatchOnRestartedReplica) {
TServerDetails* replica_ts;
NO_FATALS(SetupSingleReplicaTest(&replica_ts));
auto* replica_ets = cluster_->tablet_server_by_uuid(replica_ts->uuid());
ConsensusServiceProxy* c_proxy = CHECK_NOTNULL(replica_ts->consensus_proxy.get());
ConsensusRequestPB req;
ConsensusResponsePB resp;
RpcController rpc;
req.set_tablet_id(tablet_id_);
req.set_dest_uuid(replica_ts->uuid());
req.set_caller_uuid("fake_caller");
req.set_caller_term(2);
req.set_all_replicated_index(0);
req.mutable_preceding_id()->CopyFrom(MakeOpId(1, 1));
ASSERT_OK(c_proxy->UpdateConsensus(req, &resp, &rpc));
ASSERT_FALSE(resp.has_error()) << SecureDebugString(resp);
int64_t base_ts = GetTimestampOnServer(replica_ts);
// Send operations 2.1 through 2.3, committing through 2.2.
AddOp(MakeOpId(2, 1), base_ts, &req);
AddOp(MakeOpId(2, 2), base_ts, &req);
AddOp(MakeOpId(2, 3), base_ts, &req);
req.set_committed_index(2);
rpc.Reset();
ASSERT_OK(c_proxy->UpdateConsensus(req, &resp, &rpc));
ASSERT_FALSE(resp.has_error()) << SecureDebugString(resp);
// The COMMIT messages end up in the WAL asynchronously, so loop reading the
// tablet server's WAL until it shows up.
int replica_idx = cluster_->tablet_server_index_by_uuid(replica_ets->uuid());
ASSERT_EVENTUALLY([&]() {
LogVerifier lv(cluster_.get());
OpId commit;
ASSERT_OK(lv.ScanForHighestCommittedOpIdInLog(replica_idx, tablet_id_, &commit));
ASSERT_EQ("2.2", OpIdToString(commit));
});
// Restart the replica.
replica_ets->Shutdown();
ASSERT_OK(replica_ets->Restart());
// Send an operation 3.4 with preceding OpId 3.3.
// We expect an LMP mismatch, since the replica has operation 2.3.
// We use 'ASSERT_EVENTUALLY' here because the replica
// may need a few retries while it's in BOOTSTRAPPING state.
req.set_caller_term(3);
req.mutable_preceding_id()->CopyFrom(MakeOpId(3, 3));
req.clear_ops();
AddOp(MakeOpId(3, 4), base_ts, &req);
ASSERT_EVENTUALLY([&]() {
rpc.Reset();
ASSERT_OK(c_proxy->UpdateConsensus(req, &resp, &rpc));
ASSERT_EQ(resp.status().error().code(),
consensus::ConsensusErrorPB::PRECEDING_ENTRY_DIDNT_MATCH)
<< SecureDebugString(resp);
});
SCOPED_TRACE(SecureDebugString(resp));
EXPECT_EQ(2, resp.status().last_committed_idx());
EXPECT_EQ("0.0", OpIdToString(resp.status().last_received_current_leader()));
// Even though the replica previously received operations through 2.3, the LMP mismatch
// above causes us to truncate operation 2.3, so 2.2 remains.
EXPECT_EQ("2.2", OpIdToString(resp.status().last_received()));
}
// Test a scenario where a replica has pending operations with lock
// dependencies on each other:
// 2.2: UPSERT row 1
// 2.3: UPSERT row 1
// 2.4: UPSERT row 1
// ...and a new leader tries to abort 2.4 in order to replace it with a new
// operation. Because the operations have a lock dependency, operation 2.4
// will be 'stuck' in the Prepare queue. This verifies that we can abort an
// operation even if it's stuck in the queue.
TEST_F(RaftConsensusITest, TestReplaceOperationStuckInPrepareQueue) {
TServerDetails* replica_ts;
NO_FATALS(SetupSingleReplicaTest(&replica_ts));
ConsensusServiceProxy* c_proxy = CHECK_NOTNULL(replica_ts->consensus_proxy.get());
ConsensusRequestPB req;
ConsensusResponsePB resp;
RpcController rpc;
req.set_tablet_id(tablet_id_);
req.set_dest_uuid(replica_ts->uuid());
req.set_caller_uuid("fake_caller");
req.set_caller_term(2);
req.set_all_replicated_index(0);
req.mutable_preceding_id()->CopyFrom(MakeOpId(1, 1));
int64_t base_ts = GetTimestampOnServer(replica_ts);
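// Three consecutive UPSERTs against the same row (key 1) create the lock dependency
// described above, so op 2.4 ends up stuck in the prepare queue behind 2.2 and 2.3.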
AddOpWithTypeAndKey(MakeOpId(2, 2), base_ts, RowOperationsPB::UPSERT, 1, &req);
AddOpWithTypeAndKey(MakeOpId(2, 3), base_ts, RowOperationsPB::UPSERT, 1, &req);
AddOpWithTypeAndKey(MakeOpId(2, 4), base_ts, RowOperationsPB::UPSERT, 1, &req);
req.set_committed_index(2);
rpc.Reset();
ASSERT_OK(c_proxy->UpdateConsensus(req, &resp, &rpc));
ASSERT_FALSE(resp.has_error()) << SecureDebugString(resp);
// Replace operation 2.4 with 3.4, and add 3.5 (an upsert of a new key).
req.set_caller_term(3);
req.mutable_preceding_id()->CopyFrom(MakeOpId(2, 3));
req.clear_ops();
AddOpWithTypeAndKey(MakeOpId(3, 4), base_ts, RowOperationsPB::UPSERT, 1, &req);
AddOpWithTypeAndKey(MakeOpId(3, 5), base_ts, RowOperationsPB::UPSERT, 2, &req);
rpc.Reset();
rpc.set_timeout(MonoDelta::FromSeconds(5));
ASSERT_OK(c_proxy->UpdateConsensus(req, &resp, &rpc));
ASSERT_FALSE(resp.has_error()) << SecureDebugString(resp);
// Commit all ops.
req.clear_ops();
req.set_committed_index(5);
req.mutable_preceding_id()->CopyFrom(MakeOpId(3, 5));
rpc.Reset();
ASSERT_OK(c_proxy->UpdateConsensus(req, &resp, &rpc));
ASSERT_FALSE(resp.has_error()) << SecureDebugString(resp);
// Ensure we can read the data.
// We need to ASSERT_EVENTUALLY here because otherwise it's possible to read the old value
// of row '1', if the operation is still in flight.
ASSERT_EVENTUALLY([&]() {
vector<string> results;
NO_FATALS(WaitForRowCount(replica_ts->tserver_proxy.get(), 2, &results));
ASSERT_EQ("(int32 key=1, int32 int_val=3, string string_val=\"term: 3 index: 4\")",
results[0]);
ASSERT_EQ("(int32 key=2, int32 int_val=3, string string_val=\"term: 3 index: 5\")",
results[1]);
});
}
// Regression test for KUDU-644:
// Triggers some complicated scenarios on the replica involving aborting and
// replacing ops.
TEST_F(RaftConsensusITest, TestReplicaBehaviorViaRPC) {
TServerDetails* replica_ts;
NO_FATALS(SetupSingleReplicaTest(&replica_ts));
// Check that the 'term' metric is correctly exposed.
{
int64_t term_from_metric = -1;
ASSERT_OK(GetTermMetricValue(cluster_->tablet_server_by_uuid(replica_ts->uuid()),
&term_from_metric));
ASSERT_EQ(term_from_metric, 1);
}
ConsensusServiceProxy* c_proxy = CHECK_NOTNULL(replica_ts->consensus_proxy.get());
ConsensusRequestPB req;
ConsensusResponsePB resp;
RpcController rpc;
// Send a simple request with no ops.
req.set_tablet_id(tablet_id_);
req.set_dest_uuid(replica_ts->uuid());
req.set_caller_uuid("fake_caller");
req.set_caller_term(2);
req.set_all_replicated_index(0);
req.set_committed_index(1);
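// OpId 1.1 is the no-op replicated when the initial leader was elected in
// SetupSingleReplicaTest(), so it serves as both the preceding id and the committed index.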
req.mutable_preceding_id()->CopyFrom(MakeOpId(1, 1));
ASSERT_OK(c_proxy->UpdateConsensus(req, &resp, &rpc));
ASSERT_FALSE(resp.has_error()) << SecureDebugString(resp);
// Send some operations, but don't advance the commit index.
// They should not commit.
int64_t base_ts = GetTimestampOnServer(replica_ts);
AddOp(MakeOpId(2, 2), base_ts, &req);
AddOp(MakeOpId(2, 3), base_ts, &req);
AddOp(MakeOpId(2, 4), base_ts, &req);
rpc.Reset();
ASSERT_OK(c_proxy->UpdateConsensus(req, &resp, &rpc));
ASSERT_FALSE(resp.has_error()) << SecureDebugString(resp);
// We shouldn't read anything yet, because the ops should be pending.
{
vector<string> results;
NO_FATALS(ScanReplica(replica_ts->tserver_proxy.get(), &results));
ASSERT_EQ(0, results.size()) << results;
}
// Send op 2.6, but set preceding OpId to 2.4. This is an invalid
// request, and the replica should reject it.
req.mutable_preceding_id()->CopyFrom(MakeOpId(2, 4));
req.clear_ops();
AddOp(MakeOpId(2, 6), base_ts, &req);
rpc.Reset();
ASSERT_OK(c_proxy->UpdateConsensus(req, &resp, &rpc));
ASSERT_TRUE(resp.has_error()) << SecureDebugString(resp);
ASSERT_EQ(resp.error().status().message(),
"New operation's index does not follow the previous op's index. "
"Current: 2.6. Previous: 2.4");
resp.Clear();
req.clear_ops();
// Send ops 3.5 and 2.6; the replica should reject the request
// because the terms are out of order.
req.mutable_preceding_id()->CopyFrom(MakeOpId(2, 4));
AddOp(MakeOpId(3, 5), base_ts, &req);
AddOp(MakeOpId(2, 6), base_ts, &req);
rpc.Reset();
ASSERT_OK(c_proxy->UpdateConsensus(req, &resp, &rpc));
ASSERT_TRUE(resp.has_error()) << SecureDebugString(resp);
ASSERT_EQ(resp.error().status().message(),
"New operation's term is not >= than the previous op's term."
" Current: 2.6. Previous: 3.5");
// Regression test for KUDU-639: if we send a valid request, but the
// current commit index is higher than the data we're sending, we shouldn't
// commit anything higher than the last op sent by the leader.
//
// To test, we re-send operation 2.3, with the correct preceding ID 2.2,
// but we set the committed index to 2.4. This should only commit
// 2.2 and 2.3.
resp.Clear();
req.clear_ops();
req.mutable_preceding_id()->CopyFrom(MakeOpId(2, 2));
AddOp(MakeOpId(2, 3), base_ts, &req);
req.set_committed_index(4);
rpc.Reset();
ASSERT_OK(c_proxy->UpdateConsensus(req, &resp, &rpc));
ASSERT_FALSE(resp.has_error()) << SecureDebugString(resp);
// Verify only 2.2 and 2.3 are committed.
{
vector<string> results;
NO_FATALS(WaitForRowCount(replica_ts->tserver_proxy.get(), 2, &results));
ASSERT_STR_CONTAINS(results[0], "term: 2 index: 2");
ASSERT_STR_CONTAINS(results[1], "term: 2 index: 3");
}
resp.Clear();
req.clear_ops();
// Now send some more ops, and commit the earlier ones.
req.set_committed_index(4);
req.mutable_preceding_id()->CopyFrom(MakeOpId(2, 4));
AddOp(MakeOpId(2, 5), base_ts, &req);
AddOp(MakeOpId(2, 6), base_ts, &req);
rpc.Reset();
ASSERT_OK(c_proxy->UpdateConsensus(req, &resp, &rpc));
ASSERT_FALSE(resp.has_error()) << SecureDebugString(resp);
// Verify they are committed.
{
vector<string> results;
NO_FATALS(WaitForRowCount(replica_ts->tserver_proxy.get(), 3, &results));
ASSERT_STR_CONTAINS(results[0], "term: 2 index: 2");
ASSERT_STR_CONTAINS(results[1], "term: 2 index: 3");
ASSERT_STR_CONTAINS(results[2], "term: 2 index: 4");
}
// At this point, we still have two operations which aren't committed. If we
// try to perform a snapshot-consistent scan, we should time out rather than
// hanging the RPC service thread.
{
ScanRequestPB req;
ScanResponsePB resp;
RpcController rpc;
rpc.set_timeout(MonoDelta::FromMilliseconds(100));
NewScanRequestPB* scan = req.mutable_new_scan_request();
scan->set_tablet_id(tablet_id_);
scan->set_read_mode(READ_AT_SNAPSHOT);
ASSERT_OK(SchemaToColumnPBs(schema_, scan->mutable_projected_columns()));
// Send the call. We expect to get a timeout passed back from the server side
// (i.e. not an RPC timeout)
req.set_batch_size_bytes(0);
SCOPED_TRACE(SecureDebugString(req));
ASSERT_OK(replica_ts->tserver_proxy->Scan(req, &resp, &rpc));
SCOPED_TRACE(SecureDebugString(resp));
string err_str = StatusFromPB(resp.error().status()).ToString();
ASSERT_STR_CONTAINS(err_str, "Timed out waiting for ts:");
ASSERT_STR_CONTAINS(err_str, "to be safe");
}
resp.Clear();
req.clear_ops();
int leader_term = 2;
const int kNumTerms = AllowSlowTests() ? 10000 : 100;
while (leader_term < kNumTerms) {
leader_term++;
// Now pretend to be a new leader (at each successively higher term) and replace the
// earlier ops without committing the new replacements.
req.set_caller_term(leader_term);
req.set_caller_uuid("new_leader");
req.mutable_preceding_id()->CopyFrom(MakeOpId(2, 4));
req.clear_ops();
AddOp(MakeOpId(leader_term, 5), base_ts, &req);
AddOp(MakeOpId(leader_term, 6), base_ts, &req);
rpc.Reset();
ASSERT_OK(c_proxy->UpdateConsensus(req, &resp, &rpc));
ASSERT_FALSE(resp.has_error()) << "Req: " << SecureShortDebugString(req)
<< " Resp: " << SecureDebugString(resp);
}
// Send an empty request from the newest term which should commit
// the earlier ops.
{
req.mutable_preceding_id()->CopyFrom(MakeOpId(leader_term, 6));
req.set_committed_index(6);
req.clear_ops();
rpc.Reset();
ASSERT_OK(c_proxy->UpdateConsensus(req, &resp, &rpc));
ASSERT_FALSE(resp.has_error()) << SecureDebugString(resp);
}
// Verify the new rows are committed.
{
vector<string> results;
NO_FATALS(WaitForRowCount(replica_ts->tserver_proxy.get(), 5, &results));
SCOPED_TRACE(results);
ASSERT_STR_CONTAINS(results[3], Substitute("term: $0 index: 5", leader_term));
ASSERT_STR_CONTAINS(results[4], Substitute("term: $0 index: 6", leader_term));
}
}
// Basic test of adding and removing servers from a configuration.
TEST_F(RaftConsensusITest, TestAddRemoveServer) {
const MonoDelta kTimeout = MonoDelta::FromSeconds(10);
const vector<string> kTsFlags = {
"--enable_leader_failure_detection=false",
};
const vector<string> kMasterFlags = {
"--master_add_server_when_underreplicated=false",
"--catalog_manager_wait_for_new_tablets_to_elect_leader=false",
};
FLAGS_num_tablet_servers = 3;
FLAGS_num_replicas = 3;
NO_FATALS(BuildAndStart(kTsFlags, kMasterFlags));
vector<TServerDetails*> tservers;
AppendValuesFromMap(tablet_servers_, &tservers);
ASSERT_EQ(FLAGS_num_tablet_servers, tservers.size());
// Elect server 0 as leader and wait for log index 1 to propagate to all servers.
TServerDetails* leader_tserver = tservers[0];
const string& leader_uuid = tservers[0]->uuid();
ASSERT_OK(StartElection(leader_tserver, tablet_id_, kTimeout));
ASSERT_OK(WaitUntilCommittedOpIdIndexIs(1, leader_tserver, tablet_id_, kTimeout));
// Make sure the server rejects removal of itself from the configuration.
Status s = RemoveServer(leader_tserver, tablet_id_, leader_tserver, kTimeout);
ASSERT_TRUE(s.IsInvalidArgument()) << "Should not be able to remove self from config: "
<< s.ToString();
// Insert the row that we will update throughout the test.
ASSERT_OK(WriteSimpleTestRow(leader_tserver, tablet_id_, RowOperationsPB::INSERT,
kTestRowKey, kTestRowIntVal, "initial insert", kTimeout));
// Kill the master, so we can change the config without interference.
cluster_->master()->Shutdown();
TabletServerMap active_tablet_servers = tablet_servers_;
// Do majority correctness check for 3 servers.
NO_FATALS(AssertMajorityRequiredForElectionsAndWrites(active_tablet_servers, leader_uuid));
OpId opid;
ASSERT_OK(GetLastOpIdForReplica(tablet_id_, leader_tserver, consensus::RECEIVED_OPID, kTimeout,
&opid));
int64_t cur_log_index = opid.index();
// Go from 3 tablet servers down to 1 in the configuration.
vector<int> remove_list = { 2, 1 };
for (int to_remove_idx : remove_list) {
int num_servers = active_tablet_servers.size();
LOG(INFO) << "Remove: Going from " << num_servers << " to " << num_servers - 1 << " replicas";
TServerDetails* tserver_to_remove = tservers[to_remove_idx];
LOG(INFO) << "Removing tserver with uuid " << tserver_to_remove->uuid();
ASSERT_OK(RemoveServer(leader_tserver, tablet_id_, tserver_to_remove, kTimeout));
ASSERT_EQ(1, active_tablet_servers.erase(tserver_to_remove->uuid()));
ASSERT_OK(WaitForServersToAgree(kTimeout, active_tablet_servers, tablet_id_, ++cur_log_index));
// Do majority correctness check for each incremental decrease.
NO_FATALS(AssertMajorityRequiredForElectionsAndWrites(active_tablet_servers, leader_uuid));
ASSERT_OK(GetLastOpIdForReplica(tablet_id_, leader_tserver, consensus::RECEIVED_OPID, kTimeout,
&opid));
cur_log_index = opid.index();
}
// Add the tablet servers back, in reverse order, going from 1 to 3 servers in the configuration.
vector<int> add_list = { 1, 2 };
for (int to_add_idx : add_list) {
int num_servers = active_tablet_servers.size();
LOG(INFO) << "Add: Going from " << num_servers << " to " << num_servers + 1 << " replicas";
TServerDetails* tserver_to_add = tservers[to_add_idx];
LOG(INFO) << "Adding tserver with uuid " << tserver_to_add->uuid();
ASSERT_OK(AddServer(leader_tserver, tablet_id_, tserver_to_add,
RaftPeerPB::VOTER, kTimeout));
InsertOrDie(&active_tablet_servers, tserver_to_add->uuid(), tserver_to_add);
ASSERT_OK(WaitForServersToAgree(kTimeout, active_tablet_servers, tablet_id_, ++cur_log_index));
// Do majority correctness check for each incremental increase.
NO_FATALS(AssertMajorityRequiredForElectionsAndWrites(active_tablet_servers, leader_uuid));
ASSERT_OK(GetLastOpIdForReplica(tablet_id_, leader_tserver, consensus::RECEIVED_OPID, kTimeout,
&opid));
cur_log_index = opid.index();
}
}
// Regression test for KUDU-1169: a crash when a Config Change operation is replaced
// by a later leader.
TEST_F(RaftConsensusITest, TestReplaceChangeConfigOperation) {
const vector<string> kTsFlags = {
"--enable_leader_failure_detection=false",
};
const vector<string> kMasterFlags = {
"--master_add_server_when_underreplicated=false",
"--catalog_manager_wait_for_new_tablets_to_elect_leader=false",
};
FLAGS_num_tablet_servers = 3;
FLAGS_num_replicas = 3;
NO_FATALS(BuildAndStart(kTsFlags, kMasterFlags));
vector<TServerDetails*> tservers;
AppendValuesFromMap(tablet_servers_, &tservers);
ASSERT_EQ(FLAGS_num_tablet_servers, tservers.size());
// Elect server 0 as leader and wait for log index 1 to propagate to all servers.
TServerDetails* leader_tserver = tservers[0];
TabletServerMap original_followers = tablet_servers_;
ASSERT_EQ(1, original_followers.erase(leader_tserver->uuid()));
ASSERT_OK(StartElection(leader_tserver, tablet_id_, MonoDelta::FromSeconds(10)));
ASSERT_OK(WaitForServersToAgree(MonoDelta::FromSeconds(10), tablet_servers_, tablet_id_, 1));
// Shut down servers 1 and 2, so that server 0 can't replicate anything.
cluster_->tablet_server_by_uuid(tservers[1]->uuid())->Shutdown();
cluster_->tablet_server_by_uuid(tservers[2]->uuid())->Shutdown();
// Now try to replicate a ChangeConfig operation. This should get stuck and time out
// because the server can't replicate any operations.
Status s = RemoveServer(leader_tserver, tablet_id_, tservers[1],
MonoDelta::FromSeconds(1), -1);
ASSERT_TRUE(s.IsTimedOut()) << s.ToString();
// Pause the leader, and restart the other servers.
ASSERT_OK(cluster_->tablet_server_by_uuid(tservers[0]->uuid())->Pause());
ASSERT_OK(cluster_->tablet_server_by_uuid(tservers[1]->uuid())->Restart());
ASSERT_OK(cluster_->tablet_server_by_uuid(tservers[2]->uuid())->Restart());
ASSERT_OK(WaitForServersToAgree(MonoDelta::FromSeconds(10), original_followers, tablet_id_, 1));
// Elect one of the other servers.
ASSERT_OK(StartElection(tservers[1], tablet_id_, MonoDelta::FromSeconds(10)));
ASSERT_OK(WaitUntilLeader(tservers[1], tablet_id_, MonoDelta::FromSeconds(10)));
leader_tserver = tservers[1];
// Resume the original leader. Its change-config operation will now be aborted
// since it was never replicated to the majority, and the new leader will have
// replaced the operation.
ASSERT_OK(cluster_->tablet_server_by_uuid(tservers[0]->uuid())->Resume());
// Insert some data and verify that it propagates to all servers.
NO_FATALS(InsertTestRowsRemoteThread(0, 10, 1));
NO_FATALS(AssertAllReplicasAgree(10));
// Try another config change.
// This acts as a regression test for KUDU-1338, in which aborting the original
// config change didn't properly unset the 'pending' configuration.
ASSERT_OK(RemoveServer(leader_tserver, tablet_id_, tservers[2],
MonoDelta::FromSeconds(5), -1));
NO_FATALS(InsertTestRowsRemoteThread(10, 10, 1));
}
// Test the atomic CAS arguments to ChangeConfig() add server and remove server.
TEST_F(RaftConsensusITest, TestAtomicAddRemoveServer) {
const vector<string> kTsFlags = {
"--enable_leader_failure_detection=false",
};
const vector<string> kMasterFlags = {
"--master_add_server_when_underreplicated=false",
"--catalog_manager_wait_for_new_tablets_to_elect_leader=false",
};
FLAGS_num_tablet_servers = 3;
FLAGS_num_replicas = 3;
NO_FATALS(BuildAndStart(kTsFlags, kMasterFlags));
vector<TServerDetails*> tservers;
AppendValuesFromMap(tablet_servers_, &tservers);
ASSERT_EQ(FLAGS_num_tablet_servers, tservers.size());
// Elect server 0 as leader and wait for log index 1 to propagate to all servers.
TServerDetails* leader_tserver = tservers[0];
ASSERT_OK(StartElection(leader_tserver, tablet_id_, MonoDelta::FromSeconds(10)));
ASSERT_OK(WaitUntilCommittedOpIdIndexIs(1, leader_tserver, tablet_id_,
MonoDelta::FromSeconds(10)));
int64_t cur_log_index = 1;
TabletServerMap active_tablet_servers = tablet_servers_;
TServerDetails* follower_ts = tservers[2];
// The initial committed config should have opid_index == -1.
// The server should reject a config change request whose CAS opid_index is
// anything other than that value.
TabletServerErrorPB::Code error_code;
Status s = RemoveServer(leader_tserver, tablet_id_, follower_ts,
MonoDelta::FromSeconds(10), 7, &error_code);
ASSERT_EQ(TabletServerErrorPB::CAS_FAILED, error_code);
ASSERT_STR_CONTAINS(s.ToString(), "of 7 but the committed config has opid_index of -1");
// Specifying the correct committed opid index should work.
int64_t committed_opid_index = -1;
ASSERT_OK(RemoveServer(leader_tserver, tablet_id_, follower_ts,
MonoDelta::FromSeconds(10), committed_opid_index));
ASSERT_EQ(1, active_tablet_servers.erase(follower_ts->uuid()));
ASSERT_OK(WaitForServersToAgree(MonoDelta::FromSeconds(10),
active_tablet_servers, tablet_id_, ++cur_log_index));
// Now, add the server back. Again, specifying something other than the
// latest committed_opid_index should fail.
s = AddServer(leader_tserver, tablet_id_, follower_ts, RaftPeerPB::VOTER,
MonoDelta::FromSeconds(10), {}, -1, &error_code);
ASSERT_EQ(TabletServerErrorPB::CAS_FAILED, error_code);
ASSERT_STR_CONTAINS(s.ToString(), "of -1 but the committed config has opid_index of 2");
// Specifying the correct committed opid index should work.
// The previous config change op is the latest entry in the log.
committed_opid_index = cur_log_index;
ASSERT_OK(AddServer(leader_tserver, tablet_id_, follower_ts, RaftPeerPB::VOTER,
MonoDelta::FromSeconds(10), {}, committed_opid_index));
InsertOrDie(&active_tablet_servers, follower_ts->uuid(), follower_ts);
ASSERT_OK(WaitForServersToAgree(MonoDelta::FromSeconds(10),
active_tablet_servers, tablet_id_, ++cur_log_index));
}
// Writes test rows in ascending order to a single tablet server.
// Essentially a poor-man's version of TestWorkload that only operates on a
// single tablet. Does not batch, does not tolerate timeouts, and does not
// interact with the Master. 'rows_inserted' is used to determine the row key
// and is incremented prior to each insert attempt. Since any write failure
// results in a crash, 'rows_inserted' holds an accurate count of inserted rows
// at the end of the run, provided no crash occurred.
// Crashes on any failure, so 'write_timeout' should be high.
static void DoWriteTestRows(const TServerDetails* leader_tserver,
const string& tablet_id,
const MonoDelta& write_timeout,
AtomicInt<int32_t>* rows_inserted,
const AtomicBool* finish) {
while (!finish->Load()) {
int row_key = rows_inserted->Increment();
CHECK_OK(WriteSimpleTestRow(leader_tserver, tablet_id, RowOperationsPB::INSERT,
row_key, row_key, Substitute("key=$0", row_key),
write_timeout));
}
}
// Test that config change works while running a workload.
TEST_F(RaftConsensusITest, TestConfigChangeUnderLoad) {
const auto kTimeout = MonoDelta::FromSeconds(10);
const vector<string> kTsFlags = {
"--enable_leader_failure_detection=false",
};
const vector<string> kMasterFlags = {
"--master_add_server_when_underreplicated=false",
"--catalog_manager_wait_for_new_tablets_to_elect_leader=false",
};
FLAGS_num_tablet_servers = 3;
FLAGS_num_replicas = 3;
NO_FATALS(BuildAndStart(kTsFlags, kMasterFlags));
vector<TServerDetails*> tservers;
AppendValuesFromMap(tablet_servers_, &tservers);
ASSERT_EQ(FLAGS_num_tablet_servers, tservers.size());
// Elect server 0 as leader and wait for log index 1 to propagate to all servers.
TServerDetails* leader_tserver = tservers[0];
ASSERT_OK(StartElection(leader_tserver, tablet_id_, kTimeout));
ASSERT_OK(WaitForServersToAgree(kTimeout, tablet_servers_, tablet_id_, 1));
ASSERT_OK(WaitForOpFromCurrentTerm(leader_tserver, tablet_id_,
consensus::COMMITTED_OPID, kTimeout));
// Start a write workload.
LOG(INFO) << "Starting write workload...";
vector<thread> threads;
threads.reserve(FLAGS_num_client_threads);
AtomicInt<int32_t> rows_inserted(0);
AtomicBool finish(false);
for (auto i = 0; i < FLAGS_num_client_threads; i++) {
threads.emplace_back([this, leader_tserver, kTimeout, &rows_inserted, &finish]() {
DoWriteTestRows(leader_tserver, this->tablet_id_, kTimeout, &rows_inserted, &finish);
});
}
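// Use a scoped cleanup to join the writer threads even if an assertion below
// exits the test early.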
auto thread_join_func = [&]() {
for (auto& t : threads) {
t.join();
}
};
auto thread_joiner = MakeScopedCleanup(thread_join_func);
// The scenario modifies the Raft configuration while adding and removing
// tablet replicas. To successfully modify the configuration, the leader has
// to commit at least one operation on its current term.
ASSERT_OK(WaitForOpFromCurrentTerm(leader_tserver, tablet_id_,
consensus::COMMITTED_OPID, kTimeout));
TabletServerMap active_tablet_servers = tablet_servers_;
LOG(INFO) << "Removing servers...";
// Go from 3 tablet servers down to 1 in the configuration.
vector<int> remove_list = { 2, 1 };
for (int to_remove_idx : remove_list) {
int num_servers = active_tablet_servers.size();
LOG(INFO) << "Remove: Going from " << num_servers << " to " << num_servers - 1 << " replicas";
TServerDetails* tserver_to_remove = tservers[to_remove_idx];
LOG(INFO) << "Removing tserver with uuid " << tserver_to_remove->uuid();
ASSERT_OK(WaitForLeaderWithCommittedOp(tablet_id_, kTimeout, &leader_tserver));
ASSERT_OK(RemoveServer(
leader_tserver, tablet_id_, tserver_to_remove, kTimeout));
ASSERT_EQ(1, active_tablet_servers.erase(tserver_to_remove->uuid()));
ASSERT_OK(WaitUntilCommittedConfigNumVotersIs(active_tablet_servers.size(),
leader_tserver, tablet_id_,
kTimeout));
}
LOG(INFO) << "Adding servers...";
// Add the tablet servers back, in reverse order, going from 1 to 3 servers in the configuration.
vector<int> add_list = { 1, 2 };
for (int to_add_idx : add_list) {
int num_servers = active_tablet_servers.size();
LOG(INFO) << "Add: Going from " << num_servers << " to " << num_servers + 1 << " replicas";
TServerDetails* tserver_to_add = tservers[to_add_idx];
LOG(INFO) << "Adding tserver with uuid " << tserver_to_add->uuid();
ASSERT_OK(WaitForLeaderWithCommittedOp(tablet_id_, kTimeout, &leader_tserver));
ASSERT_OK(AddServer(leader_tserver, tablet_id_, tserver_to_add,
RaftPeerPB::VOTER, kTimeout));
InsertOrDie(&active_tablet_servers, tserver_to_add->uuid(), tserver_to_add);
ASSERT_OK(WaitUntilCommittedConfigNumVotersIs(active_tablet_servers.size(),
leader_tserver, tablet_id_,
kTimeout));
}
LOG(INFO) << "Joining writer threads...";
finish.Store(true);
thread_join_func();
thread_joiner.cancel();
LOG(INFO) << "Waiting for replicas to agree...";
// Wait for all servers to replicate everything up through the last write op.
// Since the writers don't batch, there should be at least one log entry per
// inserted row, plus the initial leader's no-op, plus 2 entries for the
// removed servers and 2 for the added servers: at least #rows + 5 in total.
int min_log_index = rows_inserted.Load() + 5;
ASSERT_OK(WaitForServersToAgree(
kTimeout, active_tablet_servers, tablet_id_, min_log_index));
LOG(INFO) << "Number of rows inserted: " << rows_inserted.Load();
NO_FATALS(AssertAllReplicasAgree(rows_inserted.Load()));
}
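// Test that the master is notified of Raft config changes and updates its
// tablet locations accordingly, both when a replica is added and when one is
// removed.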
TEST_F(RaftConsensusITest, TestMasterNotifiedOnConfigChange) {
MonoDelta timeout = MonoDelta::FromSeconds(30);
const vector<string> kMasterFlags = {
"--master_add_server_when_underreplicated=false",
"--allow_unsafe_replication_factor=true",
// If running with the 3-4-3 replication scheme, the catalog manager removes
// excess replicas, so it's necessary to disable that default behavior
// since we want the newly added replica to stay.
"--catalog_manager_evict_excess_replicas=false",
};
FLAGS_num_tablet_servers = 3;
FLAGS_num_replicas = 2;
NO_FATALS(BuildAndStart({}, kMasterFlags));
LOG(INFO) << "Finding tablet leader and waiting for things to start...";
string tablet_id = tablet_replicas_.begin()->first;
// Determine the list of tablet servers currently in the config.
TabletServerMap active_tablet_servers;
for (itest::TabletReplicaMap::const_iterator iter = tablet_replicas_.find(tablet_id);
iter != tablet_replicas_.end(); ++iter) {
InsertOrDie(&active_tablet_servers, iter->second->uuid(), iter->second);
}
// Determine the server to add to the config.
string uuid_to_add;
for (const TabletServerMap::value_type& entry : tablet_servers_) {
if (!ContainsKey(active_tablet_servers, entry.second->uuid())) {
uuid_to_add = entry.second->uuid();
}
}
ASSERT_FALSE(uuid_to_add.empty());
// Get a baseline config reported to the master.
LOG(INFO) << "Waiting for Master to see the current replicas...";
master::GetTabletLocationsResponsePB tablet_locations;
bool has_leader;
ASSERT_OK(WaitForReplicasReportedToMaster(cluster_->master_proxy(),
2, tablet_id, timeout,
WAIT_FOR_LEADER, VOTER_REPLICA,
&has_leader, &tablet_locations));
LOG(INFO) << "Tablet locations:\n" << SecureDebugString(tablet_locations);
// Wait for initial NO_OP to be committed by the leader.
TServerDetails* leader_ts;
ASSERT_OK(FindTabletLeader(tablet_servers_, tablet_id, timeout, &leader_ts));
ASSERT_OK(WaitUntilCommittedOpIdIndexIs(1, leader_ts, tablet_id, timeout));
// Change the config.
TServerDetails* tserver_to_add = tablet_servers_[uuid_to_add];
LOG(INFO) << "Adding tserver with uuid " << tserver_to_add->uuid();
ASSERT_OK(AddServer(leader_ts, tablet_id_, tserver_to_add, RaftPeerPB::VOTER,
timeout));
ASSERT_OK(WaitForServersToAgree(timeout, tablet_servers_, tablet_id_, 2));
// Wait for the master to be notified of the config change.
// It should continue to have the same leader, even without waiting.
LOG(INFO) << "Waiting for Master to see config change...";
ASSERT_OK(WaitForReplicasReportedToMaster(cluster_->master_proxy(),
3, tablet_id, timeout,
DONT_WAIT_FOR_LEADER, VOTER_REPLICA,
&has_leader, &tablet_locations));
ASSERT_TRUE(has_leader) << SecureDebugString(tablet_locations);
LOG(INFO) << "Tablet locations:\n" << SecureDebugString(tablet_locations);
// Change the config again.
LOG(INFO) << "Removing tserver with uuid " << tserver_to_add->uuid();
ASSERT_OK(RemoveServer(leader_ts, tablet_id_, tserver_to_add, timeout));
active_tablet_servers = tablet_servers_;
ASSERT_EQ(1, active_tablet_servers.erase(tserver_to_add->uuid()));
ASSERT_OK(WaitForServersToAgree(timeout, active_tablet_servers, tablet_id_, 3));
// Wait for the master to be notified of the removal.
LOG(INFO) << "Waiting for Master to see config change...";
ASSERT_OK(WaitForReplicasReportedToMaster(cluster_->master_proxy(),
2, tablet_id, timeout,
DONT_WAIT_FOR_LEADER, VOTER_REPLICA,
&has_leader, &tablet_locations));
ASSERT_TRUE(has_leader) << SecureDebugString(tablet_locations);
LOG(INFO) << "Tablet locations:\n" << SecureDebugString(tablet_locations);
}
// Test that even with memory pressure, a replica will still commit pending
// operations that the leader has committed.
TEST_F(RaftConsensusITest, TestEarlyCommitDespiteMemoryPressure) {
// Enough operations to put us over our memory limit (defined below).
const int kNumOps = 10000;
// Set up a 3-node configuration with only one live follower so that we can
// manipulate it directly via RPC.
const vector<string> kTsFlags = {
// Very low memory limit to ease testing.
// When using tcmalloc, we set it to 30MB, since we can get accurate process memory
// usage statistics. Otherwise, set to only 4MB, since we'll only be throttling based
// on our tracked memory.
// Since part of the point of the test is to have memory pressure, don't
// insist the block cache capacity is small compared to the memory pressure
// threshold.
"--force_block_cache_capacity",
#ifdef TCMALLOC_ENABLED
"--memory_limit_hard_bytes=30000000",
#else
"--memory_limit_hard_bytes=4194304",
#endif
"--enable_leader_failure_detection=false",
// Don't let op memory tracking get in the way.
"--tablet_transaction_memory_limit_mb=-1",
};
// If failure detection were on, a follower could be elected as leader after
// we kill the leader below.
const vector<string> kMasterFlags = {
"--catalog_manager_wait_for_new_tablets_to_elect_leader=false",
};
NO_FATALS(BuildAndStart(kTsFlags, kMasterFlags));
// Elect server 2 as leader, then kill it and server 1, leaving behind
// server 0 as the sole follower.
vector<TServerDetails*> tservers;
AppendValuesFromMap(tablet_servers_, &tservers);
ASSERT_EQ(3, tservers.size());
ASSERT_OK(StartElection(tservers[2], tablet_id_, MonoDelta::FromSeconds(10)));
ASSERT_OK(WaitForServersToAgree(MonoDelta::FromSeconds(10), tablet_servers_, tablet_id_, 1));
TServerDetails *replica_ts = tservers[0];
cluster_->tablet_server_by_uuid(tservers[1]->uuid())->Shutdown();
cluster_->tablet_server_by_uuid(tservers[2]->uuid())->Shutdown();
// Pretend to be the leader and send a request to replicate some operations.
ConsensusRequestPB req;
ConsensusResponsePB resp;
RpcController rpc;
req.set_dest_uuid(replica_ts->uuid());
req.set_tablet_id(tablet_id_);
req.set_caller_uuid(tservers[2]->instance_id.permanent_uuid());
req.set_caller_term(1);
req.set_committed_index(1);
req.set_all_replicated_index(0);
req.mutable_preceding_id()->CopyFrom(MakeOpId(1, 1));
int64_t base_ts = GetTimestampOnServer(replica_ts);
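// Append kNumOps operations, with indexes 2 through kNumOps + 1, to the
// request as a single batch.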
for (int i = 0; i < kNumOps; i++) {
AddOp(MakeOpId(1, 2 + i), base_ts, &req);
}
OpId last_opid = MakeOpId(1, 2 + kNumOps - 1);
ASSERT_OK(replica_ts->consensus_proxy->UpdateConsensus(req, &resp, &rpc));
// At the time that the follower received our request it was still under the
// tiny memory limit defined above, so the request should have succeeded.
ASSERT_FALSE(resp.has_error()) << SecureDebugString(resp);
ASSERT_TRUE(resp.has_status());
ASSERT_TRUE(resp.status().has_last_committed_idx());
ASSERT_EQ(last_opid.index(), resp.status().last_received().index());
ASSERT_EQ(1, resp.status().last_committed_idx());
// But no operations have been applied yet; there should be no data.
vector<string> rows;
WaitForRowCount(replica_ts->tserver_proxy.get(), 0, &rows);
// Try again, but this time:
// 1. Replicate just one new operation.
// 2. Tell the follower that the previous set of operations were committed.
req.mutable_preceding_id()->CopyFrom(last_opid);
req.set_committed_index(last_opid.index());
req.mutable_ops()->Clear();
AddOp(MakeOpId(1, last_opid.index() + 1), base_ts, &req);
rpc.Reset();
Status s = replica_ts->consensus_proxy->UpdateConsensus(req, &resp, &rpc);
// Our memory limit was truly tiny, so we should be over it by now...
ASSERT_TRUE(s.IsRemoteError());
ASSERT_STR_CONTAINS(s.ToString(), "Soft memory limit exceeded");
// ...but despite rejecting the request, we should have committed the
// previous set of operations. That is, we should be able to see those rows.
WaitForRowCount(replica_ts->tserver_proxy.get(), kNumOps, &rows);
}
// Test that we can create (vivify) a new tablet via tablet copy.
TEST_F(RaftConsensusITest, TestAutoCreateReplica) {
const vector<string> kTsFlags = {
"--enable_leader_failure_detection=false",
"--log_cache_size_limit_mb=1",
"--log_segment_size_mb=1",
"--log_async_preallocate_segments=false",
"--flush_threshold_mb=1",
"--maintenance_manager_polling_interval_ms=300",
};
const vector<string> kMasterFlags = {
"--catalog_manager_wait_for_new_tablets_to_elect_leader=false",
"--allow_unsafe_replication_factor=true",
// If running with the 3-4-3 replication scheme, the catalog manager removes
// excess replicas, so it's necessary to disable that default behavior
// since we want the newly added replica to stay.
"--catalog_manager_evict_excess_replicas=false",
};
FLAGS_num_tablet_servers = 3;
FLAGS_num_replicas = 2;
NO_FATALS(BuildAndStart(kTsFlags, kMasterFlags));
// 50K is enough to cause flushes & log rolls.
const int num_rows_to_write = AllowSlowTests() ? 150000 : 50000;
vector<TServerDetails*> tservers;
AppendValuesFromMap(tablet_servers_, &tservers);
ASSERT_EQ(FLAGS_num_tablet_servers, tservers.size());
TabletServerMap active_tablet_servers;
itest::TabletReplicaMap::const_iterator iter = tablet_replicas_.find(tablet_id_);
TServerDetails* leader = iter->second;
TServerDetails* follower = (++iter)->second;
InsertOrDie(&active_tablet_servers, leader->uuid(), leader);
InsertOrDie(&active_tablet_servers, follower->uuid(), follower);
TServerDetails* new_node = nullptr;
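// Find the tablet server that does not yet host a replica of the tablet; it
// is added as a voter below and should receive its replica via tablet copy.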
for (TServerDetails* ts : tservers) {
if (!ContainsKey(active_tablet_servers, ts->uuid())) {
new_node = ts;
break;
}
}
ASSERT_TRUE(new_node != nullptr);
// Elect the leader (still only a consensus config size of 2).
ASSERT_OK(StartElection(leader, tablet_id_, MonoDelta::FromSeconds(10)));
ASSERT_OK(WaitForServersToAgree(MonoDelta::FromSeconds(30), active_tablet_servers,
tablet_id_, 1));
TestWorkload workload(cluster_.get());
workload.set_table_name(kTableId);
workload.set_num_replicas(FLAGS_num_replicas);
workload.set_num_write_threads(10);
workload.set_num_read_threads(2);
workload.set_write_batch_size(100);
workload.Setup();
LOG(INFO) << "Starting write workload...";
workload.Start();
while (true) {
int rows_inserted = workload.rows_inserted();
if (rows_inserted >= num_rows_to_write) {
break;
}
LOG(INFO) << "Only inserted " << rows_inserted << " rows so far, sleeping for 100ms";
SleepFor(MonoDelta::FromMilliseconds(100));
}
LOG(INFO) << "Adding tserver with uuid " << new_node->uuid() << " as VOTER...";
ASSERT_OK(AddServer(leader, tablet_id_, new_node, RaftPeerPB::VOTER,
MonoDelta::FromSeconds(10)));
InsertOrDie(&active_tablet_servers, new_node->uuid(), new_node);
ASSERT_OK(WaitUntilCommittedConfigNumVotersIs(active_tablet_servers.size(),
leader, tablet_id_,
MonoDelta::FromSeconds(10)));
workload.StopAndJoin();
int num_batches = workload.batches_completed();
LOG(INFO) << "Waiting for replicas to agree...";
// Wait for all servers to replicate everything up through the last write op.
// The workload batches its writes, so there should be at least one log entry
// per completed batch, plus the initial leader's no-op, plus 1 for the config
// change that added the new replica: at least #batches + 2 in total.
int min_log_index = num_batches + 2;
ASSERT_OK(WaitForServersToAgree(MonoDelta::FromSeconds(120),
active_tablet_servers, tablet_id_,
min_log_index));
int rows_inserted = workload.rows_inserted();
LOG(INFO) << "Number of rows inserted: " << rows_inserted;
NO_FATALS(AssertAllReplicasAgree(rows_inserted));
}
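// Test that the leader's memory usage stays bounded when both followers are
// dead: rather than queueing ops indefinitely, the leader should start
// rejecting new ops once it comes under memory pressure.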
TEST_F(RaftConsensusITest, TestMemoryRemainsConstantDespiteTwoDeadFollowers) {
const int64_t kMinRejections = 100;
const MonoDelta kMaxWaitTime = MonoDelta::FromSeconds(60);
NO_FATALS(BuildAndStart({
// Start the cluster with a low per-tablet op memory limit, so that the
// test can complete faster.
"--tablet_transaction_memory_limit_mb=2",
// Make the validator of 'RPC vs op memory size' happy.
"--rpc_max_message_size=2097152",
}));
// Kill both followers.
TServerDetails* details;
ASSERT_OK(GetLeaderReplicaWithRetries(tablet_id_, &details));
int num_shutdown = 0;
int leader_ts_idx = -1;
for (int i = 0; i < cluster_->num_tablet_servers(); i++) {
ExternalTabletServer* ts = cluster_->tablet_server(i);
if (ts->instance_id().permanent_uuid() != details->uuid()) {
ts->Shutdown();
num_shutdown++;
} else {
leader_ts_idx = i;
}
}
ASSERT_EQ(2, num_shutdown);
ASSERT_NE(-1, leader_ts_idx);
// Because the majority of the cluster is dead and because of this workload's
// timeout behavior, more and more wedged ops will accumulate in the leader.
// To prevent memory usage from skyrocketing, the leader will eventually
// reject new ops. That's what we're testing for here.
TestWorkload workload(cluster_.get());
workload.set_table_name(kTableId);
workload.set_timeout_allowed(true);
workload.set_write_timeout_millis(150);
workload.set_payload_bytes(30000);
workload.Setup();
workload.Start();
// Run until the leader has rejected several ops.
MonoTime deadline = MonoTime::Now() + kMaxWaitTime;
while (true) {
int64_t num_rejections = 0;
ASSERT_OK(GetInt64Metric(
cluster_->tablet_server(leader_ts_idx)->bound_http_hostport(),
&METRIC_ENTITY_tablet,
nullptr,
&METRIC_transaction_memory_pressure_rejections,
"value",
&num_rejections));
if (num_rejections >= kMinRejections) {
break;
} else if (deadline < MonoTime::Now()) {
FAIL() << "Ran for " << kMaxWaitTime.ToString() << ", deadline expired";
}
SleepFor(MonoDelta::FromMilliseconds(200));
}
}
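// Injects about one second of latency into every WAL append on the given
// server by enabling the log latency injection flags via its generic service
// proxy.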
static void EnableLogLatency(server::GenericServiceProxy* proxy) {
const unordered_map<string, string> kFlags = {
{ "log_inject_latency", "true" },
{ "log_inject_latency_ms_mean", "1000" },
};
for (const auto& e : kFlags) {
SetFlagRequestPB req;
req.set_flag(e.first);
req.set_value(e.second);
SetFlagResponsePB resp;
RpcController rpc;
ASSERT_OK(proxy->SetFlag(req, &resp, &rpc));
SCOPED_TRACE(SecureDebugString(resp));
ASSERT_EQ(SetFlagResponsePB::SUCCESS, resp.result());
}
}
// Run a regular workload with a leader that's writing to its WAL slowly.
TEST_F(RaftConsensusITest, TestSlowLeader) {
if (!AllowSlowTests()) return;
NO_FATALS(BuildAndStart());
TServerDetails* leader;
ASSERT_OK(GetLeaderReplicaWithRetries(tablet_id_, &leader));
NO_FATALS(EnableLogLatency(leader->generic_proxy.get()));
TestWorkload workload(cluster_.get());
workload.set_table_name(kTableId);
workload.set_num_read_threads(2);
workload.Setup();
workload.Start();
SleepFor(MonoDelta::FromSeconds(60));
}
// Test write batches just below the maximum limit.
TEST_F(RaftConsensusITest, TestLargeBatches) {
const vector<string> kTsFlags = {
// We write 128KB cells in this test, so bump the limit, and disable compression.
"--max_cell_size_bytes=1000000",
"--log_segment_size_mb=1",
"--log_compression_codec=no_compression",
"--log_min_segments_to_retain=100", // disable GC of logs.
};
NO_FATALS(BuildAndStart(kTsFlags));
const int64_t kBatchSize = 40; // Write 40 * 128kb = 5MB per batch.
const int64_t kNumBatchesToWrite = 100;
TestWorkload workload(cluster_.get());
workload.set_table_name(kTableId);
workload.set_payload_bytes(128 * 1024); // Write ops of size 128KB.
workload.set_write_batch_size(kBatchSize);
workload.set_num_write_threads(1);
workload.Setup();
workload.Start();
LOG(INFO) << "Waiting until we've written enough data...";
while (workload.rows_inserted() < kBatchSize * kNumBatchesToWrite) {
SleepFor(MonoDelta::FromMilliseconds(100));
}
workload.StopAndJoin();
// Verify replication.
ClusterVerifier v(cluster_.get());
NO_FATALS(v.CheckCluster());
NO_FATALS(v.CheckRowCount(workload.table_name(),
ClusterVerifier::EXACTLY,
workload.rows_inserted()));
int num_wals = inspect_->CountFilesInWALDirForTS(0, tablet_id_, "wal-*");
int num_batches = workload.rows_inserted() / kBatchSize;
// The number of WALs should be similar to 'num_batches'. We can't make
// an exact assertion because async preallocation may take a small amount
// of time, in which case it's possible to put more than one batch in a
// single WAL.
ASSERT_GE(num_wals, num_batches / 2);
ASSERT_LE(num_wals, num_batches + 2);
}
// Regression test for KUDU-1469, a case in which a leader and follower could get "stuck"
// in a tight RPC loop, in which the leader would repeatedly send a batch of ops that the
// follower already had, the follower would fully de-dupe them, and yet the leader would
// never advance to the next batch.
//
// The 'perfect storm' reproduced here consists of:
// - the commit index has fallen far behind due to a slow log on the leader
// and one of the three replicas being inaccessible
// - the other replica elects itself
// - before the old leader notices it has been ousted, it writes at least one more
// operation to its local log.
// - before the replica can replicate anything to the old leader, it receives
// more writes, such that the first batch's preceding_op_id is ahead of
// the old leader's last written op
//
// See the detailed comments below for more details.
TEST_F(RaftConsensusITest, TestCommitIndexFarBehindAfterLeaderElection) {
const MonoDelta kTimeout = MonoDelta::FromSeconds(10);
if (!AllowSlowTests()) return;
// Set the batch size low so that, after the new leader takes
// over below, the ops required to catch up from the committed index
// to the newly replicated index don't fit into a single batch.
NO_FATALS(BuildAndStart({"--consensus_max_batch_size_bytes=50000"}));
// Get the leader and the two replica tablet servers.
// These will have the following roles in this test:
// 1) 'first_leader_ts' is the initial leader.
// 2) 'second_leader_ts' will be forced to be elected as the second leader
// 3) 'only_vote_ts' will simulate a heavily overloaded (or corrupted) TS
// which is far enough behind (or failed) such that it only participates
// by voting.
TServerDetails* leader;
ASSERT_OK(GetLeaderReplicaWithRetries(tablet_id_, &leader));
ExternalTabletServer* first_leader_ts = cluster_->tablet_server_by_uuid(leader->uuid());
ExternalTabletServer* second_leader_ts = nullptr;
ExternalTabletServer* only_vote_ts = nullptr;
for (int i = 0; i < cluster_->num_tablet_servers(); i++) {
ExternalTabletServer* ts = cluster_->tablet_server(i);
if (ts->instance_id().permanent_uuid() != leader->uuid()) {
if (second_leader_ts == nullptr) {
second_leader_ts = ts;
} else {
only_vote_ts = ts;
}
}
}
// The 'only_vote' tablet server doesn't participate in replication.
ASSERT_OK(cluster_->SetFlag(only_vote_ts, "follower_reject_update_consensus_requests", "true"));
// Inject a long delay in the log of the first leader, and write 10 operations.
// This delay ensures that it will replicate them to both itself and its follower,
// but due to its log sync not completing, it won't know that it is safe to advance its
// commit index until long after it has lost its leadership.
ASSERT_OK(cluster_->SetFlag(first_leader_ts, "log_inject_latency_ms_mean", "6000"));
ASSERT_OK(cluster_->SetFlag(first_leader_ts, "log_inject_latency", "true"));
InsertPayloadIgnoreErrors(0, 10, 10000);
// Write one more operation to the leader, but disable consensus on the follower so that
// it doesn't get replicated.
ASSERT_OK(cluster_->SetFlag(
second_leader_ts, "follower_reject_update_consensus_requests", "true"));
InsertPayloadIgnoreErrors(10, 1, 10000);
// Pause the initial leader and wait for the replica to elect itself. The third TS participates
// here by voting.
ASSERT_OK(first_leader_ts->Pause());
ASSERT_OK(WaitUntilLeader(tablet_servers_[second_leader_ts->uuid()], tablet_id_, kTimeout));
// The voter TS has done its duty. Shut it down to avoid log spam where it tries to run
// elections.
only_vote_ts->Shutdown();
// Perform one insert on the new leader. The new leader has not yet replicated its NO_OP to
// the old leader, since the old leader is still paused.
NO_FATALS(CreateClient(&client_));
InsertPayloadIgnoreErrors(13, 1, 10000);
// Now we expect to have the following logs:
//
// first_leader_ts second_leader_ts
// ------------------- ------------
// 1.1 NO_OP 1.1 NO_OP
// 1.2 WRITE_OP 1.2 WRITE_OP
// ................................
// 1.11 WRITE_OP 1.11 WRITE_OP
// 1.12 WRITE_OP 2.12 NO_OP
// 2.13 WRITE_OP
//
// Both servers should have a committed_idx of 1.1 since the log was delayed.
// Now, when we resume the original leader, we expect them to recover properly.
// Previously this triggered KUDU-1469.
ASSERT_OK(first_leader_ts->Resume());
TabletServerMap active_tservers = tablet_servers_;
active_tservers.erase(only_vote_ts->uuid());
ASSERT_OK(WaitForServersToAgree(MonoDelta::FromSeconds(60),
active_tservers,
tablet_id_, 13));
}
// Run a regular workload with one follower that's writing to its WAL slowly.
TEST_F(RaftConsensusITest, TestSlowFollower) {
if (!AllowSlowTests()) return;
NO_FATALS(BuildAndStart());
TServerDetails* leader;
ASSERT_OK(GetLeaderReplicaWithRetries(tablet_id_, &leader));
int num_reconfigured = 0;
for (int i = 0; i < cluster_->num_tablet_servers(); i++) {
ExternalTabletServer* ts = cluster_->tablet_server(i);
if (ts->instance_id().permanent_uuid() != leader->uuid()) {
TServerDetails* follower;
follower = GetReplicaWithUuidOrNull(tablet_id_, ts->instance_id().permanent_uuid());
ASSERT_TRUE(follower);
NO_FATALS(EnableLogLatency(follower->generic_proxy.get()));
num_reconfigured++;
break;
}
}
ASSERT_EQ(1, num_reconfigured);
TestWorkload workload(cluster_.get());
workload.set_table_name(kTableId);
workload.set_num_read_threads(2);
workload.Setup();
workload.Start();
SleepFor(MonoDelta::FromSeconds(60));
}
// Run a special workload that constantly updates a single row on a cluster
// where every replica is writing to its WAL slowly.
TEST_F(RaftConsensusITest, TestHammerOneRow) {
if (!AllowSlowTests()) return;
NO_FATALS(BuildAndStart());
for (int i = 0; i < cluster_->num_tablet_servers(); i++) {
const ExternalTabletServer* ts = cluster_->tablet_server(i);
const TServerDetails* replica = GetReplicaWithUuidOrNull(
tablet_id_, ts->instance_id().permanent_uuid());
ASSERT_NE(nullptr, replica);
NO_FATALS(EnableLogLatency(replica->generic_proxy.get()));
}
TestWorkload workload(cluster_.get());
workload.set_table_name(kTableId);
workload.set_write_pattern(TestWorkload::UPDATE_ONE_ROW);
workload.set_write_timeout_millis(60000);
workload.set_num_write_threads(20);
workload.Setup();
workload.Start();
SleepFor(MonoDelta::FromSeconds(60));
workload.StopAndJoin();
// Ensure that the replicas converge.
ClusterVerifier v(cluster_.get());
NO_FATALS(v.CheckCluster());
}
// Test that followers that fall behind the leader's log GC threshold are
// evicted from the config.
TEST_F(RaftConsensusITest, TestEvictAbandonedFollowers) {
const vector<string> master_flags = {
"--master_add_server_when_underreplicated=false",
// This test is specific for the 3-2-3 replica management scheme.
"--raft_prepare_replacement_before_eviction=false",
};
vector<string> ts_flags = {
// This test is specific for the 3-2-3 replica management scheme.
"--raft_prepare_replacement_before_eviction=false",
};
AddFlagsForLogRolls(&ts_flags); // For CauseFollowerToFallBehindLogGC().
NO_FATALS(BuildAndStart(ts_flags, master_flags));
MonoDelta timeout = MonoDelta::FromSeconds(30);
TabletServerMap active_tablet_servers = tablet_servers_;
ASSERT_EQ(3, active_tablet_servers.size());
string leader_uuid;
int64_t orig_term;
string follower_uuid;
NO_FATALS(CauseFollowerToFallBehindLogGC(
tablet_servers_, &leader_uuid, &orig_term, &follower_uuid));
// Wait for the abandoned follower to be evicted.
ASSERT_OK(WaitUntilCommittedConfigNumVotersIs(2, tablet_servers_[leader_uuid],
tablet_id_, timeout));
ASSERT_EQ(1, active_tablet_servers.erase(follower_uuid));
ASSERT_OK(WaitForServersToAgree(timeout, active_tablet_servers, tablet_id_, 2));
}
// Test that, after followers are evicted from the config, the master re-adds a new
// replica for that follower and it eventually catches back up.
TEST_F(RaftConsensusITest, TestMasterReplacesEvictedFollowers) {
const MonoDelta kTimeout = MonoDelta::FromSeconds(30);
const int kReplicasNum = 3;
FLAGS_num_replicas = kReplicasNum;
// Need an extra tablet server to place the replacement replica in case of the
// 3-4-3 replication scheme.
if (FLAGS_raft_prepare_replacement_before_eviction) {
FLAGS_num_tablet_servers = kReplicasNum + 1;
}
vector<string> ts_flags;
AddFlagsForLogRolls(&ts_flags); // For CauseFollowerToFallBehindLogGC().
NO_FATALS(BuildAndStart(ts_flags));
TabletServerMap active_tablet_servers = tablet_servers_;
ASSERT_EQ(FLAGS_num_tablet_servers, active_tablet_servers.size());
if (FLAGS_raft_prepare_replacement_before_eviction) {
auto no_replica_uuids = GetServersWithoutReplica(tablet_id_);
ASSERT_EQ(1, no_replica_uuids.size());
ASSERT_EQ(1, active_tablet_servers.erase(no_replica_uuids.front()));
}
string leader_uuid;
int64_t orig_term;
string follower_uuid;
NO_FATALS(CauseFollowerToFallBehindLogGC(
active_tablet_servers, &leader_uuid, &orig_term, &follower_uuid));
if (FLAGS_raft_prepare_replacement_before_eviction) {
// Wait for the new replacement replica to be added and the replica that
// fell behind to be evicted. Since the 3-4-3 replication scheme adds a new
// non-voter replica before evicting the failed voter replica, the tablet
// server which used to host the failed replica will no longer be hosting
// any replica after the replacement completes.
ASSERT_EQ(kReplicasNum + 1, tablet_servers_.size());
ASSERT_EVENTUALLY([&] {
NO_FATALS(WaitForReplicasAndUpdateLocations(kTableId));
const auto uuids_no_replica = GetServersWithoutReplica(tablet_id_);
ASSERT_EQ(1, uuids_no_replica.size());
ASSERT_EQ(follower_uuid, uuids_no_replica.front());
});
} else {
// The follower will be evicted. Now wait for the master to cause it to be
// copied.
ASSERT_OK(WaitForServersToAgree(kTimeout, active_tablet_servers, tablet_id_, 2));
}
ClusterVerifier v(cluster_.get());
NO_FATALS(v.CheckCluster());
NO_FATALS(v.CheckRowCount(kTableId, ClusterVerifier::AT_LEAST, 1));
}
// Test that a ChangeConfig() request is rejected unless the leader has
// replicated one of its own log entries during the current term.
// This is required for correctness of Raft config change. For details,
// see https://groups.google.com/forum/#!topic/raft-dev/t4xj6dJTP6E
TEST_F(RaftConsensusITest, TestChangeConfigRejectedUnlessNoopReplicated) {
const vector<string> kTsFlags = {
"--enable_leader_failure_detection=false",
};
const vector<string> kMasterFlags = {
"--catalog_manager_wait_for_new_tablets_to_elect_leader=false",
};
NO_FATALS(BuildAndStart(kTsFlags, kMasterFlags));
MonoDelta timeout = MonoDelta::FromSeconds(30);
int kLeaderIndex = 0;
TServerDetails* leader_ts = tablet_servers_[cluster_->tablet_server(kLeaderIndex)->uuid()];
// Prevent followers from accepting UpdateConsensus requests from the leader,
// even though they will vote. This will allow us to get the distributed
// system into a state where there is a valid leader (based on winning an
// election) but that leader will be unable to commit any entries from its
// own term, making it illegal to accept ChangeConfig() requests.
for (int i = 1; i <= 2; i++) {
ASSERT_OK(cluster_->SetFlag(cluster_->tablet_server(i),
"follower_reject_update_consensus_requests", "true"));
}
// Elect the leader.
ASSERT_OK(StartElection(leader_ts, tablet_id_, timeout));
ASSERT_OK(WaitUntilLeader(leader_ts, tablet_id_, timeout));
// Now attempt to do a config change. It should be rejected because there
// have not been any ops (notably the initial NO_OP) from the leader's term
// that have been committed yet.
Status s = RemoveServer(leader_ts, tablet_id_,
tablet_servers_[cluster_->tablet_server(1)->uuid()],
timeout);
ASSERT_TRUE(!s.ok()) << s.ToString();
ASSERT_STR_CONTAINS(s.ToString(), "Leader has not yet committed an operation in its own term");
}
class RaftConsensusParamReplicationModesITest :
public RaftConsensusITest,
public ::testing::WithParamInterface<bool> {
};
INSTANTIATE_TEST_CASE_P(, RaftConsensusParamReplicationModesITest,
::testing::Bool());
// Regression test for KUDU-1735, a crash in the case where a pending
// config change operation is aborted during tablet deletion when that config
// change was in fact already persisted to disk.
TEST_P(RaftConsensusParamReplicationModesITest, Test_KUDU_1735) {
const MonoDelta kTimeout = MonoDelta::FromSeconds(10);
const bool is_3_4_3 = GetParam();
const vector<string> kTsFlags = {
// This scenario uses 'manual election' mode and assumes no leader change.
"--enable_leader_failure_detection=false",
// The test runs in both 3-2-3 and 3-4-3 replication modes.
Substitute("--raft_prepare_replacement_before_eviction=$0", is_3_4_3),
};
const vector<string> kMasterFlags = {
// This scenario uses 'manual election' mode.
"--catalog_manager_wait_for_new_tablets_to_elect_leader=false",
// The test runs in both 3-2-3 and 3-4-3 replication modes.
Substitute("--raft_prepare_replacement_before_eviction=$0", is_3_4_3),
};
NO_FATALS(BuildAndStart(kTsFlags, kMasterFlags));
ASSERT_EQ(3, tablet_servers_.size());
TServerDetails* leader_tserver =
tablet_servers_[cluster_->tablet_server(0)->uuid()];
TServerDetails* evicted_tserver =
tablet_servers_[cluster_->tablet_server(1)->uuid()];
// Elect leader replica.
ASSERT_OK(StartElection(leader_tserver, tablet_id_, kTimeout));
ASSERT_OK(WaitUntilLeader(leader_tserver, tablet_id_, kTimeout));
// Wait for at least one operation to be committed in current term by the
// leader replica. Attempting to perform a config change (RemoveServer()
// in this scenario) prior to committing at least one operation in current
// term leads to an error since Kudu's Raft implementation enforces that
// invariant.
consensus::OpId opid;
ASSERT_OK(WaitForOpFromCurrentTerm(
leader_tserver, tablet_id_, consensus::COMMITTED_OPID, kTimeout, &opid));
// Wait for the committed index to propagate to all involved tablet servers.
ASSERT_OK(WaitForServersToAgree(
kTimeout, tablet_servers_, tablet_id_, opid.index(),
consensus::COMMITTED_OPID));
// Make follower tablet servers crash before writing a commit message.
for (const auto& e : tablet_servers_) {
const auto& server_uuid = e.first;
if (server_uuid == leader_tserver->uuid()) {
continue;
}
auto* ts = cluster_->tablet_server_by_uuid(server_uuid);
ASSERT_OK(cluster_->SetFlag(ts, "fault_crash_before_append_commit", "1.0"));
}
// Run a config change. This will cause the other servers to crash with
// pending config change operations due to the above fault injection.
auto num_crashed_servers = 0;
ASSERT_OK(RemoveServer(leader_tserver, tablet_id_, evicted_tserver, kTimeout));
for (const auto& e : tablet_servers_) {
const auto& server_uuid = e.first;
if (server_uuid == leader_tserver->uuid()) {
// The tablet server with the leader replica will not crash.
continue;
}
// One of the remaining followers will crash while trying to commit the
// config change corresponding to the eviction of the other follower.
//
// As for the other (gone-and-back) follower, the behavior depends on
// the replica management scheme, being a bit more subtle in the case of
// the 3-4-3 scheme.
//
// In case of the 3-2-3 scheme, the gone-and-back server will crash while
// trying to add the COMMIT entry into the WAL. That change corresponds to
// the op which adds the server as a voting member of the resulting Raft
// configuration.
//
// In the 3-4-3 case, the catalog manager sends DeleteTablet() and ADD_PEER
// Raft configuration change requests upon detecting a change in the
// tablet's configuration reported by leader replica (the latter request is
// to keep the target replication factor). Those requests are scheduled
// to be sent asynchronously around the same time. Due to the concurrency of
// sending and processing those two requests, there are two possible
// outcomes:
// 1. If the tablet server completes processing the DeleteTablet() request
// before receiving the ADD_PEER config change request, the ADD_PEER
// config change to add a new non-voter will remain pending since
// only one voter (the leader replica of the tablet) is alive. Since
// the configuration change cannot be committed, no commit message is
// written to the WAL in this case. So, the tablet server will not hit
// the injected crash.
// 2. If the DeleteTablet() request is delayed and the leader replica
// re-discovers the evicted replica as a new peer with LMP_MISMATCH
// status after processing the ADD_PEER configuration change,
// the leader replica will send the missing updates to the lingering
// one. The lingering replica will try to replay the first missing
// update (REMOVE_PEER) and will crash upon adding corresponding
// record into its WAL.
auto* ts = cluster_->tablet_server_by_uuid(server_uuid);
auto s = ts->WaitForInjectedCrash(MonoDelta::FromSeconds(5));
if (server_uuid == evicted_tserver->uuid() && is_3_4_3 && !s.ok()) {
ASSERT_TRUE(s.IsTimedOut());
continue;
}
ASSERT_TRUE(s.ok()) << s.ToString();
++num_crashed_servers;
}
if (!is_3_4_3) {
ASSERT_EQ(2, num_crashed_servers);
} else {
ASSERT_GE(num_crashed_servers, 1);
ASSERT_LE(num_crashed_servers, 2);
vector<decltype(leader_tserver)> servers = { leader_tserver };
if (num_crashed_servers == 1) {
servers.push_back(evicted_tserver);
}
// In case of the 3-4-3 scheme, make sure the configuration change to
// add the removed server back as a non-voter hasn't been committed.
ASSERT_EVENTUALLY([&] {
for (const auto* server : servers) {
consensus::ConsensusStatePB cstate;
ASSERT_OK(itest::GetConsensusState(
server, tablet_id_, kTimeout, EXCLUDE_HEALTH_REPORT, &cstate));
ASSERT_TRUE(cstate.has_pending_config());
}
});
}
// Delete the table, so that when we restart the crashed servers, they'll get RPCs to
// delete tablets while config changes are pending.
ASSERT_OK(client_->DeleteTable(kTableId));
// Restart tablet servers with follower replicas and wait for them to delete
// their replicas.
for (const auto& e : tablet_servers_) {
const auto& server_uuid = e.first;
if (server_uuid == leader_tserver->uuid()) {
// The leader should not crash, no need to restart it.
continue;
}
auto* ts = cluster_->tablet_server_by_uuid(server_uuid);
ts->Shutdown();
ASSERT_OK(ts->Restart());
}
for (const auto& e : tablet_servers_) {
ASSERT_OK(WaitForNumTabletsOnTS(e.second, 0, kTimeout, nullptr));
}
}
// Test that if for some reason none of the ops can be prepared, the error is
// returned through UpdateConsensus().
TEST_F(RaftConsensusITest, TestUpdateConsensusErrorNonePrepared) {
const int kNumOps = 10;
vector<string> kTsFlags = {
"--enable_leader_failure_detection=false",
};
const vector<string> kMasterFlags = {
"--catalog_manager_wait_for_new_tablets_to_elect_leader=false",
};
NO_FATALS(BuildAndStart(kTsFlags, kMasterFlags));
vector<TServerDetails*> tservers;
AppendValuesFromMap(tablet_servers_, &tservers);
ASSERT_EQ(3, tservers.size());
// Shutdown the other servers so they don't get chatty.
cluster_->tablet_server_by_uuid(tservers[1]->uuid())->Shutdown();
cluster_->tablet_server_by_uuid(tservers[2]->uuid())->Shutdown();
// Configure the first server to fail all on prepare.
TServerDetails *replica_ts = tservers[0];
ASSERT_OK(cluster_->SetFlag(cluster_->tablet_server_by_uuid(replica_ts->uuid()),
"follower_fail_all_prepare", "true"));
// Pretend to be the leader and send a request that should return an error.
ConsensusRequestPB req;
ConsensusResponsePB resp;
RpcController rpc;
req.set_dest_uuid(replica_ts->uuid());
req.set_tablet_id(tablet_id_);
req.set_caller_uuid(tservers[2]->instance_id.permanent_uuid());
req.set_caller_term(0);
req.set_committed_index(0);
req.set_all_replicated_index(0);
req.mutable_preceding_id()->CopyFrom(MakeOpId(0, 0));
int64_t base_ts = GetTimestampOnServer(replica_ts);
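// Append kNumOps operations, with indexes 1 through kNumOps, as a single
// batch; all of them will fail to prepare because of the flag set above.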
for (int i = 0; i < kNumOps; i++) {
AddOp(MakeOpId(0, 1 + i), base_ts, &req);
}
ASSERT_OK(replica_ts->consensus_proxy->UpdateConsensus(req, &resp, &rpc));
LOG(INFO) << SecureShortDebugString(resp);
ASSERT_TRUE(resp.status().has_error());
ASSERT_EQ(consensus::ConsensusErrorPB::CANNOT_PREPARE, resp.status().error().code());
ASSERT_STR_CONTAINS(SecureShortDebugString(resp), "Could not prepare a single op");
}
// Test that, if the raft metadata on a replica is corrupt, then the server
// doesn't crash, but instead marks the tablet as failed.
TEST_F(RaftConsensusITest, TestCorruptReplicaMetadata) {
const MonoDelta kTimeout = MonoDelta::FromSeconds(30);
// Start cluster and wait until we have a stable leader.
// Switch off tombstoning of evicted replicas to observe the failed tablet state.
const vector<string> kTserverFlags = {
// Ensure we are safe to evict.
"--consensus_rpc_timeout_ms=10000",
};
const vector<string> kMasterFlags = {
// This is to spot the FAILED status of the corresponding tablet.
"--master_tombstone_evicted_tablet_replicas=false",
};
FLAGS_num_tablet_servers = FLAGS_num_replicas + 1;
NO_FATALS(BuildAndStart(kTserverFlags, kMasterFlags));
const auto uuids_no_replica = GetServersWithoutReplica(tablet_id_);
ASSERT_EQ(1, uuids_no_replica.size());
const auto uuids_with_replica = GetServersWithReplica(tablet_id_);
ASSERT_EQ(FLAGS_num_replicas, uuids_with_replica.size());
// Wait until the tablet servers hosting the replica agree on the log index.
TabletServerMap tablet_servers;
for (const auto& e : tablet_replicas_) {
if (e.first == tablet_id_) {
tablet_servers.emplace(e.second->uuid(), e.second);
}
}
ASSERT_OK(WaitForServersToAgree(MonoDelta::FromSeconds(10), tablet_servers,
tablet_id_, 1));
// Shut down one of the tablet servers, and then muck
// with its consensus metadata to corrupt it.
const auto ts_idx = cluster_->tablet_server_index_by_uuid(uuids_with_replica.front());
auto* ts = cluster_->tablet_server(ts_idx);
ts->Shutdown();
consensus::ConsensusMetadataPB cmeta_pb;
ASSERT_OK(inspect_->ReadConsensusMetadataOnTS(ts_idx, tablet_id_, &cmeta_pb));
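// A negative term is invalid, so the replica should fail to load this
// consensus metadata when it restarts.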
cmeta_pb.set_current_term(-2);
ASSERT_OK(inspect_->WriteConsensusMetadataOnTS(ts_idx, tablet_id_, cmeta_pb));
ASSERT_OK(ts->Restart());
// The server should come up with a 'FAILED' status because of the corrupt
// metadata.
ASSERT_OK(WaitUntilTabletInState(tablet_servers_[ts->uuid()],
tablet_id_,
tablet::FAILED,
kTimeout));
// A new good copy should get created automatically.
ASSERT_OK(WaitUntilTabletInState(tablet_servers_[uuids_no_replica.front()],
tablet_id_,
tablet::RUNNING,
kTimeout));
}
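// Returns the current value of the 'failed_elections_since_stable_leader'
// metric for the given tablet, fetched via the tablet server's HTTP metrics
// endpoint.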
int64_t GetFailedElectionsSinceStableLeader(const ExternalTabletServer* ets,
const std::string& tablet_id) {
int64_t ret;
CHECK_OK(GetInt64Metric(
ets->bound_http_hostport(),
&METRIC_ENTITY_tablet,
tablet_id.c_str(),
&METRIC_failed_elections_since_stable_leader,
"value",
&ret));
return ret;
}
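// Returns the current value of the 'time_since_last_leader_heartbeat' metric
// for the given tablet, fetched via the tablet server's HTTP metrics endpoint.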
int64_t GetTimeSinceLastLeaderHeartbeat(const ExternalTabletServer* ets,
const std::string& tablet_id) {
int64_t ret;
CHECK_OK(GetInt64Metric(
ets->bound_http_hostport(),
&METRIC_ENTITY_tablet,
tablet_id.c_str(),
&METRIC_time_since_last_leader_heartbeat,
"value",
&ret));
return ret;
}
// Test for election-related metrics with leader failure detection disabled.
// This avoids inadvertent leader elections that might race with the checks
// of the metrics' behavior.
TEST_F(RaftConsensusITest, TestElectionMetricsFailureDetectionDisabled) {
constexpr auto kNumReplicas = 3;
constexpr auto kNumTservers = 3;
const vector<string> kTsFlags = {
// Make leader elections faster so we can test
// failed_elections_since_stable_leader faster.
"--raft_heartbeat_interval_ms=100",
// For stability reasons, this scenario uses the 'manual election' mode
// and assumes no leader change before restarting tablet servers later on.
"--enable_leader_failure_detection=false",
};
const vector<string> kMasterFlags = {
// Corresponding flag for the tserver's '--enable_leader_failure_detection'.
"--catalog_manager_wait_for_new_tablets_to_elect_leader=false",
};
const auto kTimeout = MonoDelta::FromSeconds(30);
FLAGS_num_replicas = kNumReplicas;
FLAGS_num_tablet_servers = kNumTservers;
NO_FATALS(BuildAndStart(kTsFlags, kMasterFlags));
vector<TServerDetails*> tservers;
AppendValuesFromMap(tablet_servers_, &tservers);
const auto* leader = tablet_servers_.begin()->second;
const auto& leader_uuid = leader->uuid();
ASSERT_EVENTUALLY([&]() {
const auto kLeaderTimeout = MonoDelta::FromSeconds(10);
ASSERT_OK(StartElection(leader, tablet_id_, kLeaderTimeout));
TServerDetails* elected_leader;
ASSERT_OK(WaitForLeaderWithCommittedOp(
tablet_id_, kLeaderTimeout, &elected_leader));
ASSERT_EQ(leader->uuid(), elected_leader->uuid());
});
ASSERT_OK(WaitForServersToAgree(kTimeout, tablet_servers_, tablet_id_, 1));
auto* leader_srv = cluster_->tablet_server_by_uuid(leader_uuid);
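// Pick an arbitrary follower: the tablet server whose index immediately
// follows the leader's.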
auto* follower_srv = cluster_->tablet_server(
(cluster_->tablet_server_index_by_uuid(leader_uuid) + 1) % kNumTservers);
// Leader should always report 0 since last leader heartbeat.
EXPECT_EQ(0, GetTimeSinceLastLeaderHeartbeat(leader_srv, tablet_id_));
EXPECT_EQ(0, GetFailedElectionsSinceStableLeader(leader_srv, tablet_id_));
// Let's shut down all tablet servers except our chosen follower to make sure
// we don't have a leader.
for (auto i = 0; i < kNumTservers; ++i) {
if (cluster_->tablet_server(i) != follower_srv) {
cluster_->tablet_server(i)->Shutdown();
}
}
// Get two measurements with 500 ms sleep between them and see if the
// difference between them is at least 500ms.
const int64_t time_before_wait = GetTimeSinceLastLeaderHeartbeat(follower_srv,
tablet_id_);
SleepFor(MonoDelta::FromMilliseconds(500));
const int64_t time_after_wait = GetTimeSinceLastLeaderHeartbeat(follower_srv,
tablet_id_);
ASSERT_TRUE(time_after_wait >= time_before_wait + 500)
<< "time_before_wait: " << time_before_wait << "; "
<< "time_after_wait: " << time_after_wait;
// The follower doesn't start elections on itself because of
// --enable_leader_failure_detection=false flag, so the metric should read 0.
ASSERT_EQ(0, GetFailedElectionsSinceStableLeader(follower_srv, tablet_id_));
}
// Test for election-related metrics with the leader failure detection enabled.
TEST_F(RaftConsensusITest, TestElectionMetricsFailureDetectionEnabled) {
constexpr auto kNumReplicas = 3;
constexpr auto kNumTservers = 3;
const vector<string> kTsFlags = {
// Make leader elections faster so we can test
// failed_elections_since_stable_leader faster.
"--raft_heartbeat_interval_ms=100",
// For the stability of the test, make leader election less likely in case
// of intermittent network failures and latency spikes. Otherwise, in case
// of on-going re-elections the metric value would not stabilize for a long
// time.
"--leader_failure_max_missed_heartbeat_periods=20",
};
const auto kTimeout = MonoDelta::FromSeconds(30);
FLAGS_num_replicas = kNumReplicas;
FLAGS_num_tablet_servers = kNumTservers;
NO_FATALS(BuildAndStart(kTsFlags));
ASSERT_OK(WaitForServersToAgree(kTimeout, tablet_servers_, tablet_id_, 1));
// Verify failed_elections_since_stable_leader is reset to 0 in the majority
// of all replicas after the cluster is in stable state, i.e. the leader
// is elected and functional.
ASSERT_EVENTUALLY([&]() {
for (auto i = 0; i < kNumTservers; ++i) {
ASSERT_EQ(0, GetFailedElectionsSinceStableLeader(
cluster_->tablet_server(i), tablet_id_));
}
});
auto* srv_ts = tablet_servers_.begin()->second;
auto* srv_ext = cluster_->tablet_server_by_uuid(srv_ts->uuid());
for (auto i = 0; i < kNumTservers; ++i) {
auto* s = cluster_->tablet_server(i);
if (s == srv_ext) {
continue;
}
s->Shutdown();
}
// If the server which is left alive hosts the leader replica for the tablet,
// make it step down. Otherwise, it's a no-op.
ignore_result(LeaderStepDown(srv_ts, tablet_id_, kTimeout));
// Verify failed_elections_since_stable_leader is advanced eventually.
ASSERT_EVENTUALLY([&]() {
ASSERT_GT(GetFailedElectionsSinceStableLeader(srv_ext, tablet_id_), 0);
});
// Restart the rest of tablet servers.
for (auto i = 0; i < kNumTservers; ++i) {
auto* s = cluster_->tablet_server(i);
if (s == srv_ext) {
continue;
}
ASSERT_OK(s->Restart());
}
ASSERT_OK(WaitForServersToAgree(kTimeout, tablet_servers_, tablet_id_, 1));
// Verify failed_elections_since_stable_leader is reset to 0 eventually.
ASSERT_EVENTUALLY([&]() {
for (auto i = 0; i < kNumTservers; ++i) {
ASSERT_EQ(0, GetFailedElectionsSinceStableLeader(
cluster_->tablet_server(i), tablet_id_));
}
});
}
// Test that an IOError when writing to the write-ahead log is a fatal error.
// First, we test that failed replicates are fatal. Then, we test that failed
// commits are fatal.
TEST_F(RaftConsensusITest, TestLogIOErrorIsFatal) {
const vector<string> kTsFlags = {
"--enable_leader_failure_detection=false",
// Disable core dumps since we will inject FATAL errors, and dumping
// core can take a long time.
"--disable_core_dumps",
};
const vector<string> kMasterFlags = {
"--catalog_manager_wait_for_new_tablets_to_elect_leader=false",
};
FLAGS_num_replicas = 3;
FLAGS_num_tablet_servers = 3;
NO_FATALS(BuildAndStart(kTsFlags, kMasterFlags));
vector<TServerDetails*> tservers;
AppendValuesFromMap(tablet_servers_, &tservers);
ASSERT_EQ(3, tservers.size());
vector<ExternalTabletServer*> ext_tservers;
for (auto* details : tservers) {
ext_tservers.push_back(cluster_->tablet_server_by_uuid(details->uuid()));
}
// Test failed replicates.
// Elect server 2 as leader and wait for log index 1 to propagate to all servers.
ASSERT_OK(StartElection(tservers[2], tablet_id_, MonoDelta::FromSeconds(10)));
ASSERT_OK(WaitForServersToAgree(MonoDelta::FromSeconds(10), tablet_servers_, tablet_id_, 1));
// Inject an IOError the next time servers 1 and 2 write to their WAL.
// Then, cause server 0 to start and win a leader election.
// This will cause servers 1 and 2 to crash.
for (int i = 1; i <= 2; i++) {
ASSERT_OK(cluster_->SetFlag(ext_tservers[i],
"log_inject_io_error_on_append_fraction", "1.0"));
}
ASSERT_OK(StartElection(tservers[0], tablet_id_, MonoDelta::FromSeconds(10)));
for (int i = 1; i <= 2; i++) {
ASSERT_OK(ext_tservers[i]->WaitForFatal(MonoDelta::FromSeconds(10)));
}
// Now we know followers crash when they write to their log.
// Let's verify the same for the leader (server 0).
ASSERT_OK(cluster_->SetFlag(ext_tservers[0],
"log_inject_io_error_on_append_fraction", "1.0"));
// Attempt to write to the leader, but with a short timeout.
TestWorkload workload(cluster_.get());
workload.set_table_name(kTableId);
workload.set_timeout_allowed(true);
workload.set_write_timeout_millis(100);
workload.set_num_write_threads(1);
workload.set_write_batch_size(1);
workload.Setup();
workload.Start();
// Leader should crash as well.
ASSERT_OK(ext_tservers[0]->WaitForFatal(MonoDelta::FromSeconds(10)));
workload.StopAndJoin();
LOG(INFO) << "Everything crashed!";
// Test failed commits.
cluster_->Shutdown();
ASSERT_OK(cluster_->Restart());
NO_FATALS(WaitForTSAndReplicas());
tservers.clear();
AppendValuesFromMap(tablet_servers_, &tservers);
ASSERT_EQ(3, tservers.size());
// Elect server 0 as leader, wait until writes are going through.
ASSERT_OK(StartElection(tservers[0], tablet_id_, MonoDelta::FromSeconds(10)));
workload.Start();
int64_t prev_inserted = workload.rows_inserted();
while (workload.rows_inserted() == prev_inserted) {
SleepFor(MonoDelta::FromMilliseconds(10));
}
workload.StopAndJoin();
ASSERT_OK(WaitForServersToAgree(MonoDelta::FromSeconds(10), tablet_servers_, tablet_id_, 1));
// Now shutdown servers 1 and 2 so that writes cannot commit. Write to the
// leader, set flags so that commits crash the server, then bring the
// followers back up.
for (int i = 1; i <= 2; i++) {
ext_tservers[i]->Shutdown();
}
OpId prev_opid, cur_opid;
ASSERT_OK(GetLastOpIdForReplica(tablet_id_, tservers[0], consensus::RECEIVED_OPID,
MonoDelta::FromSeconds(10), &prev_opid));
VLOG(1) << "Previous OpId on server 0: " << OpIdToString(prev_opid);
workload.Start();
// Wait until we've got (uncommitted) entries into the leader's log.
do {
ASSERT_OK(GetLastOpIdForReplica(tablet_id_, tservers[0], consensus::RECEIVED_OPID,
MonoDelta::FromSeconds(10), &cur_opid));
VLOG(1) << "Current OpId on server 0: " << OpIdToString(cur_opid);
} while (consensus::OpIdEquals(prev_opid, cur_opid));
workload.StopAndJoin();
ASSERT_OK(cluster_->SetFlag(ext_tservers[0],
"log_inject_io_error_on_append_fraction", "1.0"));
for (int i = 1; i <= 2; i++) {
ASSERT_OK(ext_tservers[i]->Restart());
}
// Leader will crash.
ASSERT_OK(ext_tservers[0]->WaitForFatal(MonoDelta::FromSeconds(10)));
}
// KUDU-1613: Test that when we reset and restart a tablet server, with a new
// uuid but with the same host and port, replicas that were hosted by the
// previous incarnation are correctly detected as failed and eventually
// re-replicated.
TEST_P(RaftConsensusParamReplicationModesITest, TestRestartWithDifferentUUID) {
// Start a cluster and insert data.
const bool kPrepareReplacementBeforeEviction = GetParam();
ExternalMiniClusterOptions opts;
opts.num_tablet_servers = kPrepareReplacementBeforeEviction ? 4 : 3;
opts.extra_tserver_flags = {
// Set a low timeout. If we can't re-replicate in a reasonable amount of
// time, it means we're not evicting at all.
"--follower_unavailable_considered_failed_sec=10",
Substitute("--raft_prepare_replacement_before_eviction=$0",
kPrepareReplacementBeforeEviction),
};
opts.extra_master_flags = {
Substitute("--raft_prepare_replacement_before_eviction=$0",
kPrepareReplacementBeforeEviction),
};
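// With replica replacement before eviction (the 3-4-3 scheme), an extra tablet
// server is needed to host the replacement replica.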
cluster_.reset(new ExternalMiniCluster(std::move(opts)));
ASSERT_OK(cluster_->Start());
// Write some data. Spreading the writes across several tablets makes it more
// likely that every tablet server ends up hosting at least one replica.
TestWorkload writes(cluster_.get());
writes.set_num_tablets(5);
writes.set_timeout_allowed(true);
writes.Setup();
writes.Start();
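// Wait until more than 1000 additional rows have been inserted, as a sign
// that writes are flowing.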
const auto wait_for_some_inserts = [&] {
const auto rows_init = writes.rows_inserted();
ASSERT_EVENTUALLY([&] {
ASSERT_GT(writes.rows_inserted() - rows_init, 1000);
});
};
wait_for_some_inserts();
// Completely shut down one of the tablet servers, keeping its ports so we
// can take over with another tablet server.
ExternalTabletServer* ts = cluster_->tablet_server(0);
vector<HostPort> master_hostports;
for (int i = 0; i < cluster_->num_masters(); i++) {
master_hostports.push_back(cluster_->master(i)->bound_rpc_hostport());
}
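// Remember the masters' RPC addresses so the replacement tablet server can
// register with the same masters.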
ExternalDaemonOptions ts_opts = ts->opts();
LOG(INFO) << Substitute("Storing port $0 for use by new tablet server",
ts->bound_rpc_hostport().ToString());
ts_opts.rpc_bind_address = ts->bound_rpc_hostport();
ts->Shutdown();
// With one server down, we should be able to accept writes.
wait_for_some_inserts();
writes.StopAndJoin();
// Start up a new server with a new UUID bound to the old RPC port.
ts_opts.wal_dir = Substitute("$0-new", ts_opts.wal_dir);
ts_opts.data_dirs = { Substitute("$0-new", ts_opts.data_dirs[0]) };
ASSERT_OK(env_->CreateDir(ts_opts.wal_dir));
ASSERT_OK(env_->CreateDir(ts_opts.data_dirs[0]));
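// Fresh, empty WAL and data directories mean the new process formats a new
// filesystem layout and generates a new UUID, while binding to the old
// server's RPC address.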
scoped_refptr<ExternalTabletServer> new_ts =
new ExternalTabletServer(ts_opts, master_hostports);
ASSERT_OK(new_ts->Start());
// Eventually the replicas hosted by the old server should be detected as
// failed and re-replicated, with the new server as the target of a tablet
// copy: WAL segments should appear in its (initially empty) WAL directory.
ASSERT_EVENTUALLY([&] {
vector<string> files_in_wal_dir;
ASSERT_OK(ListFilesInDir(env_, JoinPathSegments(ts_opts.wal_dir, "wals"), &files_in_wal_dir));
ASSERT_FALSE(files_in_wal_dir.empty());
});
}

// Requesting a graceful leadership transfer to a follower that cannot catch up
// should eventually fail: the transfer never happens and the leader resumes
// normal operation.
TEST_F(RaftConsensusITest, TestLeaderTransferWhenFollowerFallsBehindLeaderGC) {
if (!AllowSlowTests()) {
LOG(WARNING) << "test is skipped; set KUDU_ALLOW_SLOW_TESTS=1 to run";
return;
}
const auto kTimeout = MonoDelta::FromSeconds(30);
vector<string> ts_flags = {
// Disable follower eviction.
"--evict_failed_followers=false",
};
vector<string> master_flags = {
// Prevent the master from evicting unrecoverably failed followers.
"--catalog_manager_evict_excess_replicas=false",
};
AddFlagsForLogRolls(&ts_flags); // For CauseFollowerToFallBehindLogGC().
NO_FATALS(BuildAndStart(ts_flags, master_flags));
string leader_uuid;
int64_t orig_term;
string follower_uuid;
NO_FATALS(CauseFollowerToFallBehindLogGC(
tablet_servers_, &leader_uuid, &orig_term, &follower_uuid));
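// The lagging follower is now missing log entries that the leader has already
// GC'd, so it cannot be caught up through normal log replication.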
SCOPED_TRACE(Substitute("leader: $0 follower: $1", leader_uuid, follower_uuid));
// Wait for remaining majority to agree.
TabletServerMap active_tablet_servers = tablet_servers_;
ASSERT_EQ(3, active_tablet_servers.size());
ASSERT_EQ(1, active_tablet_servers.erase(follower_uuid));
ASSERT_OK(WaitForServersToAgree(kTimeout, active_tablet_servers, tablet_id_, 1));
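// The leader and the up-to-date follower still form a majority, so the tablet
// continues to accept and commit writes.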
// Try to transfer leadership to the follower that has fallen behind log GC.
// The leader role might fluctuate among the tablet's replicas; in such rare
// cases the LeaderStepDown() request is retried, courtesy of ASSERT_EVENTUALLY().
ASSERT_EVENTUALLY([&] {
TServerDetails* leader_ts;
ASSERT_OK(FindTabletLeader(
tablet_servers_, tablet_id_, kTimeout, &leader_ts));
LeaderStepDownRequestPB req;
req.set_dest_uuid(leader_ts->uuid());
req.set_tablet_id(tablet_id_);
req.set_mode(consensus::LeaderStepDownMode::GRACEFUL);
req.set_new_leader_uuid(follower_uuid);
// The request should succeed.
ConsensusServiceProxy* c_proxy = CHECK_NOTNULL(leader_ts->consensus_proxy.get());
RpcController ctl;
LeaderStepDownResponsePB resp;
ASSERT_OK(c_proxy->LeaderStepDown(req, &resp, &ctl));
ASSERT_FALSE(resp.has_error()) << SecureDebugString(resp);
});
// However, the leader will not be able to transfer leadership to the lagging
// follower, and eventually will resume normal operation. We check this by
// waiting for a write to succeed.
ASSERT_EVENTUALLY([&] {
WriteRequestPB w_req;
w_req.set_tablet_id(tablet_id_);
ASSERT_OK(SchemaToPB(schema_, w_req.mutable_schema()));
AddTestRowToPB(RowOperationsPB::INSERT, schema_, kTestRowKey, kTestRowIntVal,
"hello world", w_req.mutable_row_operations());
TServerDetails* leader_ts;
ASSERT_OK(FindTabletLeader(
tablet_servers_, tablet_id_, kTimeout, &leader_ts));
RpcController ctl;
ctl.set_timeout(kTimeout);
WriteResponsePB w_resp;
ASSERT_OK(leader_ts->tserver_proxy->Write(w_req, &w_resp, &ctl));
});
}
} // namespace tserver
} // namespace kudu