| // Licensed to the Apache Software Foundation (ASF) under one |
| // or more contributor license agreements. See the NOTICE file |
| // distributed with this work for additional information |
| // regarding copyright ownership. The ASF licenses this file |
| // to you under the Apache License, Version 2.0 (the |
| // "License"); you may not use this file except in compliance |
| // with the License. You may obtain a copy of the License at |
| // |
| // http://www.apache.org/licenses/LICENSE-2.0 |
| // |
| // Unless required by applicable law or agreed to in writing, |
| // software distributed under the License is distributed on an |
| // "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY |
| // KIND, either express or implied. See the License for the |
| // specific language governing permissions and limitations |
| // under the License. |
| |
| #include <cstdint> |
| #include <cstdlib> |
| #include <functional> |
| #include <memory> |
| #include <ostream> |
| #include <string> |
| #include <thread> |
| #include <unordered_map> |
| #include <utility> |
| #include <vector> |
| |
| #include <gflags/gflags_declare.h> |
| #include <glog/logging.h> |
| #include <glog/stl_logging.h> |
| #include <gtest/gtest.h> |
| |
| #include "kudu/client/client.h" |
| #include "kudu/client/shared_ptr.h" // IWYU pragma: keep |
| #include "kudu/client/write_op.h" |
| #include "kudu/common/common.pb.h" |
| #include "kudu/common/partial_row.h" |
| #include "kudu/common/row_operations.pb.h" |
| #include "kudu/common/wire_protocol-test-util.h" |
| #include "kudu/common/wire_protocol.h" |
| #include "kudu/common/wire_protocol.pb.h" |
| #include "kudu/consensus/consensus.pb.h" |
| #include "kudu/consensus/consensus.proxy.h" |
| #include "kudu/consensus/consensus_peers.h" |
| #include "kudu/consensus/metadata.pb.h" |
| #include "kudu/consensus/opid.pb.h" |
| #include "kudu/consensus/opid_util.h" |
| #include "kudu/consensus/quorum_util.h" |
| #include "kudu/gutil/basictypes.h" |
| #include "kudu/gutil/map-util.h" |
| #include "kudu/gutil/ref_counted.h" |
| #include "kudu/gutil/strings/strcat.h" |
| #include "kudu/gutil/strings/substitute.h" |
| #include "kudu/gutil/strings/util.h" |
| #include "kudu/integration-tests/cluster_itest_util.h" |
| #include "kudu/integration-tests/cluster_verifier.h" |
| #include "kudu/integration-tests/log_verifier.h" |
| #include "kudu/integration-tests/mini_cluster_fs_inspector.h" |
| #include "kudu/integration-tests/raft_consensus-itest-base.h" |
| #include "kudu/integration-tests/test_workload.h" |
| #include "kudu/master/master.pb.h" |
| #include "kudu/mini-cluster/external_mini_cluster.h" |
| #include "kudu/rpc/messenger.h" |
| #include "kudu/rpc/rpc_controller.h" |
| #include "kudu/server/server_base.pb.h" |
| #include "kudu/server/server_base.proxy.h" |
| #include "kudu/tablet/metadata.pb.h" |
| #include "kudu/tserver/tablet_server-test-base.h" |
| #include "kudu/tserver/tserver.pb.h" |
| #include "kudu/tserver/tserver_service.proxy.h" |
| #include "kudu/util/atomic.h" |
| #include "kudu/util/countdown_latch.h" |
| #include "kudu/util/env.h" |
| #include "kudu/util/env_util.h" |
| #include "kudu/util/metrics.h" |
| #include "kudu/util/monotime.h" |
| #include "kudu/util/net/dns_resolver.h" |
| #include "kudu/util/net/net_util.h" |
| #include "kudu/util/path_util.h" |
| #include "kudu/util/pb_util.h" |
| #include "kudu/util/random.h" |
| #include "kudu/util/scoped_cleanup.h" |
| #include "kudu/util/status.h" |
| #include "kudu/util/test_macros.h" |
| #include "kudu/util/test_util.h" |
| |
| DECLARE_bool(raft_prepare_replacement_before_eviction); |
| DECLARE_int32(client_inserts_per_thread); |
| DECLARE_int32(client_num_batches_per_thread); |
| DECLARE_int32(consensus_rpc_timeout_ms); |
| DECLARE_int32(num_client_threads); |
| DECLARE_int32(num_replicas); |
| DECLARE_int32(num_tablet_servers); |
| DECLARE_int32(rpc_timeout); |
| DECLARE_bool(encrypt_data_at_rest); |
| |
| METRIC_DECLARE_entity(server); |
| METRIC_DECLARE_entity(tablet); |
| METRIC_DECLARE_counter(transaction_memory_pressure_rejections); |
| METRIC_DECLARE_gauge_int64(time_since_last_leader_heartbeat); |
| METRIC_DECLARE_gauge_int64(failed_elections_since_stable_leader); |
| METRIC_DECLARE_gauge_uint64(hybrid_clock_timestamp); |
| |
| using kudu::client::KuduInsert; |
| using kudu::client::KuduSession; |
| using kudu::client::KuduTable; |
| using kudu::client::sp::shared_ptr; |
| using kudu::cluster::ExternalDaemonOptions; |
using kudu::cluster::ExternalMiniCluster;
using kudu::cluster::ExternalMiniClusterOptions;
using kudu::cluster::ExternalTabletServer;
| using kudu::consensus::ConsensusRequestPB; |
| using kudu::consensus::ConsensusResponsePB; |
| using kudu::consensus::ConsensusServiceProxy; |
| using kudu::consensus::EXCLUDE_HEALTH_REPORT; |
| using kudu::consensus::LeaderStepDownRequestPB; |
| using kudu::consensus::LeaderStepDownResponsePB; |
| using kudu::consensus::MajoritySize; |
| using kudu::consensus::MakeOpId; |
| using kudu::consensus::OpId; |
| using kudu::consensus::RaftPeerAttrsPB; |
| using kudu::consensus::RaftPeerPB; |
| using kudu::consensus::ReplicateMsg; |
| using kudu::itest::AddServer; |
| using kudu::itest::DONT_WAIT_FOR_LEADER; |
| using kudu::itest::GetInt64Metric; |
| using kudu::itest::LeaderStepDown; |
| using kudu::itest::RemoveServer; |
| using kudu::itest::StartElection; |
| using kudu::itest::TServerDetails; |
| using kudu::itest::TabletServerMap; |
| using kudu::itest::WAIT_FOR_LEADER; |
| using kudu::itest::WaitForNumTabletsOnTS; |
| using kudu::itest::WaitForOpFromCurrentTerm; |
| using kudu::itest::WaitForReplicasReportedToMaster; |
| using kudu::itest::WaitForServersToAgree; |
| using kudu::itest::WaitUntilCommittedOpIdIndexIs; |
| using kudu::itest::WaitUntilLeader; |
| using kudu::itest::WaitUntilTabletInState; |
| using kudu::itest::WriteSimpleTestRow; |
using kudu::env_util::ListFilesInDir;
using kudu::master::TabletLocationsPB;
using kudu::master::VOTER_REPLICA;
| using kudu::pb_util::SecureDebugString; |
| using kudu::pb_util::SecureShortDebugString; |
| using kudu::rpc::RpcController; |
| using kudu::server::SetFlagRequestPB; |
| using kudu::server::SetFlagResponsePB; |
| using std::string; |
| using std::thread; |
| using std::unique_ptr; |
| using std::unordered_map; |
| using std::vector; |
| using strings::Substitute; |
| |
| namespace kudu { |
| namespace tserver { |
| |
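// A short consensus RPC timeout, in milliseconds; the delay-injector thread uses it to
// size its random pauses so that RPC timeouts between replicas become likely.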
| static const int kConsensusRpcTimeoutForTests = 50; |
| |
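// Key and integer value of the single test row that several tests write repeatedly.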
| static const int kTestRowKey = 1234; |
| static const int kTestRowIntVal = 5678; |
| |
| // Integration test for the raft consensus implementation. |
| // Uses the whole tablet server stack with ExternalMiniCluster. |
| class RaftConsensusITest : public RaftConsensusITestBase { |
| public: |
| RaftConsensusITest() { |
| } |
| |
| // Gets the current timestamp on the given server. |
| int64_t GetTimestampOnServer(TServerDetails* tserver) const; |
| |
// Scan the given replica in a loop until the number of rows
// is 'expected_count'. Fails the test if the expected count
// is not reached within 90 seconds.
| void WaitForRowCount(TabletServerServiceProxy* replica_proxy, |
| int expected_count, |
| vector<string>* results); |
| |
| // Add an Insert operation to the given consensus request. |
| // The row to be inserted is generated based on the OpId with a timestamp |
| // that is greater than 'base_ts'. |
| void AddOp(const OpId& id, int64_t base_ts, ConsensusRequestPB* req); |
| void AddOpWithTypeAndKey(const OpId& id, int64_t base_ts, |
| RowOperationsPB::Type op_type, |
| int32_t key, |
| ConsensusRequestPB* req); |
| |
| string DumpToString(TServerDetails* leader, |
| const vector<string>& leader_results, |
| TServerDetails* replica, |
| const vector<string>& replica_results); |
| |
| // Insert 'num_rows' rows starting with row key 'start_row'. |
// Each row will have size 'payload_size'. A short (100ms) timeout is
// used. Any errors generated by the flush are ignored.
| void InsertPayloadIgnoreErrors(int start_row, int num_rows, int payload_size); |
| |
// Brings chaos to an ExternalTabletServer by introducing random delays,
// pausing the daemon for a random amount of time.
| void DelayInjectorThread(ExternalTabletServer* tablet_server, int timeout_msec); |
| |
| // Thread which loops until '*finish' becomes true, trying to insert a row |
| // on the given tablet server identified by 'replica_idx'. |
| void StubbornlyWriteSameRowThread(int replica_idx, const AtomicBool* finish); |
| |
// Stops the current leader of the configuration, runs a leader election, and then brings it back.
// Before stopping the leader, this pauses all follower nodes at regular intervals so that
// there is an increased chance of operations still being pending.
| void StopOrKillLeaderAndElectNewOne(); |
| |
| // Writes 'num_writes' operations to the current leader. Each of the operations |
| // has a payload of around 128KB. Causes a gtest failure on error. |
| void Write128KOpsToLeader(int num_writes); |
| |
| // Ensure that a majority of servers is required for elections and writes. |
| // This is done by pausing a majority and asserting that writes and elections fail, |
| // then unpausing the majority and asserting that elections and writes succeed. |
// On failure, raises a gtest assertion.
// Note: this method assumes all tablet servers listed in 'tablet_servers' are voters.
| void AssertMajorityRequiredForElectionsAndWrites(const TabletServerMap& tablet_servers, |
| const string& leader_uuid); |
| |
| void CreateClusterForCrashyNodesTests(vector<string> extra_ts_flags = {}); |
| void DoTestCrashyNodes(TestWorkload* workload, int max_rows_to_insert); |
| |
| // Prepare for a test where a single replica of a 3-server cluster is left |
| // running as a follower. |
| void SetupSingleReplicaTest(TServerDetails** replica_ts); |
| |
| protected: |
| shared_ptr<KuduTable> table_; |
| vector<thread> threads_; |
| }; |
| |
| int64_t RaftConsensusITest::GetTimestampOnServer(TServerDetails* tserver) const { |
| int64_t ret; |
| ExternalTabletServer* ets = cluster_->tablet_server_by_uuid(tserver->uuid()); |
| CHECK_OK(GetInt64Metric(ets->bound_http_hostport(), &METRIC_ENTITY_server, |
| nullptr, &METRIC_hybrid_clock_timestamp, "value", &ret)); |
| return ret; |
| } |
| |
| void RaftConsensusITest::WaitForRowCount(TabletServerServiceProxy* replica_proxy, |
| int expected_count, |
| vector<string>* results) { |
| LOG(INFO) << "Waiting for row count " << expected_count << "..."; |
| MonoTime start = MonoTime::Now(); |
| MonoTime deadline = start + MonoDelta::FromSeconds(90); |
| while (true) { |
| results->clear(); |
| NO_FATALS(ScanReplica(replica_proxy, results)); |
| if (results->size() == expected_count) { |
| return; |
| } |
| SleepFor(MonoDelta::FromMilliseconds(10)); |
| if (MonoTime::Now() >= deadline) { |
| break; |
| } |
| } |
| FAIL() << "Did not reach expected row count " << expected_count |
| << " after " << (MonoTime::Now() - start).ToString() |
| << ": rows: " << *results; |
| } |
| |
| void RaftConsensusITest::AddOp(const OpId& id, int64_t base_ts, ConsensusRequestPB* req) { |
| AddOpWithTypeAndKey(id, base_ts, RowOperationsPB::INSERT, |
| id.index() * 10000 + id.term(), req); |
| } |
| |
| void RaftConsensusITest::AddOpWithTypeAndKey(const OpId& id, int64_t base_ts, |
| RowOperationsPB::Type op_type, |
| int32_t key, |
| ConsensusRequestPB* req) { |
| ReplicateMsg* msg = req->add_ops(); |
| msg->mutable_id()->CopyFrom(id); |
| // Set a somewhat realistic timestamp such that it is monotonically |
| // increasing per op and starts off higher than 1. This is required, as some |
| // test cases test the scenario where the WAL is replayed and no-ops and |
| // writes are expected to have monotonically increasing timestamps. |
| msg->set_timestamp(base_ts + id.index() * 10000 + id.term()); |
| msg->set_op_type(consensus::WRITE_OP); |
| WriteRequestPB* write_req = msg->mutable_write_request(); |
| CHECK_OK(SchemaToPB(schema_, write_req->mutable_schema())); |
| write_req->set_tablet_id(tablet_id_); |
| AddTestRowToPB(op_type, schema_, key, id.term(), |
| SecureShortDebugString(id), write_req->mutable_row_operations()); |
| } |
| |
| string RaftConsensusITest::DumpToString(TServerDetails* leader, |
| const vector<string>& leader_results, |
| TServerDetails* replica, |
| const vector<string>& replica_results) { |
string ret = Substitute("Replica results did not match the leader's."
| "\nLeader: $0\nReplica: $1. Results size " |
| "L: $2 R: $3", |
| leader->ToString(), |
| replica->ToString(), |
| leader_results.size(), |
| replica_results.size()); |
| |
| StrAppend(&ret, "Leader Results: \n"); |
| for (const string& result : leader_results) { |
| StrAppend(&ret, result, "\n"); |
| } |
| |
| StrAppend(&ret, "Replica Results: \n"); |
| for (const string& result : replica_results) { |
| StrAppend(&ret, result, "\n"); |
| } |
| |
| return ret; |
| } |
| |
| void RaftConsensusITest::InsertPayloadIgnoreErrors(int start_row, |
| int num_rows, |
| int payload_size) { |
| shared_ptr<KuduTable> table; |
| CHECK_OK(client_->OpenTable(kTableId, &table)); |
| shared_ptr<KuduSession> session = client_->NewSession(); |
| session->SetTimeoutMillis(100); |
| CHECK_OK(session->SetFlushMode(KuduSession::MANUAL_FLUSH)); |
| string payload(payload_size, 'x'); |
| for (int i = 0; i < num_rows; i++) { |
| unique_ptr<KuduInsert> insert(table->NewInsert()); |
| KuduPartialRow* row = insert->mutable_row(); |
| CHECK_OK(row->SetInt32(0, i + start_row)); |
| CHECK_OK(row->SetInt32(1, 0)); |
| CHECK_OK(row->SetStringCopy(2, payload)); |
| CHECK_OK(session->Apply(insert.release())); |
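// Flush after every row. With the short (100ms) session timeout some flushes may time
// out; those errors are intentionally ignored.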
| ignore_result(session->Flush()); |
| } |
| } |
| |
| void RaftConsensusITest::DelayInjectorThread( |
| ExternalTabletServer* tablet_server, int timeout_msec) { |
| |
| while (inserters_.count() > 0) { |
// Adjust the value obtained from the normalized Gaussian distribution so that we steal the lock
// for longer than the timeout a small (~5%) percentage of the time
// (95% corresponds to 1.64485 in a normalized (0,1) Gaussian distribution).
| double sleep_time_usec = 1000 * |
| ((random_.Normal(0, 1) * timeout_msec) / 1.64485); |
| |
| if (sleep_time_usec < 0) sleep_time_usec = 0; |
| |
// Additionally, only cause timeouts at all 50% of the time; otherwise just sleep.
| double val = (rand() * 1.0) / RAND_MAX; |
| if (val < 0.5) { |
| SleepFor(MonoDelta::FromMicroseconds(sleep_time_usec)); |
| continue; |
| } |
| |
| ASSERT_OK(tablet_server->Pause()); |
| LOG_IF(INFO, sleep_time_usec > 0.0) |
| << "Delay injector thread for TS " << tablet_server->instance_id().permanent_uuid() |
| << " SIGSTOPped the ts, sleeping for " << sleep_time_usec << " usec..."; |
| SleepFor(MonoDelta::FromMicroseconds(sleep_time_usec)); |
| ASSERT_OK(tablet_server->Resume()); |
| } |
| } |
| |
| void RaftConsensusITest::StubbornlyWriteSameRowThread(int replica_idx, const AtomicBool* finish) { |
| vector<TServerDetails*> servers; |
| AppendValuesFromMap(tablet_servers_, &servers); |
| CHECK_LT(replica_idx, servers.size()); |
| TServerDetails* ts = servers[replica_idx]; |
| |
// Manually construct an RPC to our target replica. We expect most of the calls
// to fail either with an "already present" error or with an error because we are
// writing to a follower. That's OK, though - what we care about for this test is
// just that the operations go through Apply() in the same order everywhere (even
// though in this case the result will just be an error).
| WriteRequestPB req; |
| WriteResponsePB resp; |
| RpcController rpc; |
| req.set_tablet_id(tablet_id_); |
| ASSERT_OK(SchemaToPB(schema_, req.mutable_schema())); |
| AddTestRowToPB(RowOperationsPB::INSERT, schema_, kTestRowKey, kTestRowIntVal, |
| "hello world", req.mutable_row_operations()); |
| |
| while (!finish->Load()) { |
| resp.Clear(); |
| rpc.Reset(); |
| rpc.set_timeout(MonoDelta::FromSeconds(10)); |
| ignore_result(ts->tserver_proxy->Write(req, &resp, &rpc)); |
| VLOG(1) << "Response from server " << replica_idx << ": " |
| << SecureShortDebugString(resp); |
| } |
| } |
| |
| void RaftConsensusITest::StopOrKillLeaderAndElectNewOne() { |
| TServerDetails* leader; |
| vector<TServerDetails*> followers; |
| CHECK_OK(GetTabletLeaderAndFollowers(tablet_id_, &leader, &followers)); |
| ExternalTabletServer* leader_ets = cluster_->tablet_server_by_uuid(leader->uuid()); |
| |
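// Pause the followers one at a time, sleeping briefly after each pause, so that the
// leader is likely to accumulate pending operations it cannot replicate.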
| for (const auto* ts : followers) { |
| ExternalTabletServer* ets = cluster_->tablet_server_by_uuid(ts->uuid()); |
| CHECK_OK(ets->Pause()); |
| SleepFor(MonoDelta::FromMilliseconds(100)); |
| } |
| |
// Once all the followers are paused, also pause or kill the current leader. Since we've waited
// a bit, the old leader is likely to have pending operations that must be aborted.
| const bool do_kill = rand() % 2 == 0; |
| if (do_kill) { |
| leader_ets->Shutdown(); |
| } else { |
| CHECK_OK(leader_ets->Pause()); |
| } |
| |
| // Resume the replicas. |
| for (const auto* ts : followers) { |
| ExternalTabletServer* ets = cluster_->tablet_server_by_uuid(ts->uuid()); |
| CHECK_OK(ets->Resume()); |
| } |
| |
| // Get the new leader. |
| TServerDetails* new_leader; |
| CHECK_OK(GetLeaderReplicaWithRetries(tablet_id_, &new_leader)); |
| |
| // Bring the old leader back. |
| if (do_kill) { |
| CHECK_OK(leader_ets->Restart()); |
| // Wait until we have the same number of followers. |
| const auto initial_followers = followers.size(); |
| do { |
| GetOnlyLiveFollowerReplicas(tablet_id_, &followers); |
| } while (followers.size() < initial_followers); |
| } else { |
| CHECK_OK(leader_ets->Resume()); |
| } |
| } |
| |
| void RaftConsensusITest::Write128KOpsToLeader(int num_writes) { |
| TServerDetails* leader = nullptr; |
| ASSERT_OK(GetLeaderReplicaWithRetries(tablet_id_, &leader)); |
| |
| WriteRequestPB req; |
| req.set_tablet_id(tablet_id_); |
| ASSERT_OK(SchemaToPB(schema_, req.mutable_schema())); |
| RowOperationsPB* data = req.mutable_row_operations(); |
| WriteResponsePB resp; |
| RpcController rpc; |
| rpc.set_timeout(MonoDelta::FromMilliseconds(10000)); |
| int key = 0; |
| |
// Generate a 128KB dummy payload.
| string test_payload(128 * 1024, '0'); |
| for (int i = 0; i < num_writes; i++) { |
| rpc.Reset(); |
| data->Clear(); |
| AddTestRowToPB(RowOperationsPB::INSERT, schema_, key, key, |
| test_payload, data); |
| key++; |
| ASSERT_OK(leader->tserver_proxy->Write(req, &resp, &rpc)); |
| |
| ASSERT_FALSE(resp.has_error()) << SecureDebugString(resp); |
| } |
| } |
| |
| void RaftConsensusITest::AssertMajorityRequiredForElectionsAndWrites( |
| const TabletServerMap& tablet_servers, const string& leader_uuid) { |
| static const auto kTimeout = MonoDelta::FromSeconds(20); |
| |
| TServerDetails* initial_leader = FindOrDie(tablet_servers, leader_uuid); |
| |
| // Calculate number of servers to leave unpaused (minority). |
| // This math is a little unintuitive but works for cluster sizes including 2 and 1. |
| // Note: We assume all of these TSes are voters. |
| int config_size = tablet_servers.size(); |
| int minority_to_retain = MajoritySize(config_size) - 1; |
| |
| // Only perform this part of the test if we have some servers to pause, else |
| // the failure assertions will throw. |
| if (config_size > 1) { |
| // Pause enough replicas to prevent a majority. |
| int num_to_pause = config_size - minority_to_retain; |
| LOG(INFO) << "Pausing " << num_to_pause << " tablet servers in config of size " << config_size; |
| vector<string> paused_uuids; |
| for (const TabletServerMap::value_type& entry : tablet_servers) { |
| if (paused_uuids.size() == num_to_pause) { |
| continue; |
| } |
| const string& replica_uuid = entry.first; |
| if (replica_uuid == leader_uuid) { |
| // Always leave this one alone. |
| continue; |
| } |
| ExternalTabletServer* replica_ts = cluster_->tablet_server_by_uuid(replica_uuid); |
| ASSERT_OK(replica_ts->Pause()); |
| paused_uuids.push_back(replica_uuid); |
| } |
| |
| // Ensure writes timeout while only a minority is alive. |
| Status s = WriteSimpleTestRow(initial_leader, tablet_id_, RowOperationsPB::UPDATE, |
| kTestRowKey, kTestRowIntVal, "foo", |
| MonoDelta::FromMilliseconds(100)); |
| ASSERT_TRUE(s.IsTimedOut()) << s.ToString(); |
| |
| // Step down. Scenarios which use this method turn off leader failure |
| // detection, so the leader role cannot fluctuate among tablet replicas. |
| ASSERT_OK(LeaderStepDown(initial_leader, tablet_id_, kTimeout)); |
| |
| // Assert that elections time out without a live majority. |
| // We specify a very short timeout here to keep the tests fast. |
| ASSERT_OK(StartElection(initial_leader, tablet_id_, kTimeout)); |
| s = WaitUntilLeader(initial_leader, tablet_id_, MonoDelta::FromMilliseconds(100)); |
| ASSERT_TRUE(s.IsTimedOut()) << s.ToString(); |
| LOG(INFO) << "Expected timeout encountered on election with weakened config: " << s.ToString(); |
| |
| // Resume the paused servers. |
| LOG(INFO) << "Resuming " << num_to_pause << " tablet servers in config of size " << config_size; |
| for (const string& replica_uuid : paused_uuids) { |
| ExternalTabletServer* replica_ts = cluster_->tablet_server_by_uuid(replica_uuid); |
| ASSERT_OK(replica_ts->Resume()); |
| } |
| } |
| |
| ASSERT_OK(WaitForServersToAgree(kTimeout, tablet_servers, tablet_id_, 1)); |
| |
| // Now an election should succeed. |
| ASSERT_OK(StartElection(initial_leader, tablet_id_, kTimeout)); |
| ASSERT_OK(WaitUntilLeader(initial_leader, tablet_id_, kTimeout)); |
| LOG(INFO) << "Successful election with full config of size " << config_size; |
| |
| // And a write should also succeed. |
| ASSERT_OK(WriteSimpleTestRow(initial_leader, tablet_id_, RowOperationsPB::UPDATE, |
| kTestRowKey, kTestRowIntVal, Substitute("qsz=$0", config_size), |
| kTimeout)); |
| } |
| |
| void RaftConsensusITest::CreateClusterForCrashyNodesTests(vector<string> extra_ts_flags) { |
| if (AllowSlowTests()) { |
| FLAGS_num_tablet_servers = 7; |
| FLAGS_num_replicas = 7; |
| } |
| |
| vector<string> ts_flags; |
| |
| // Crash 5% of the time just before sending an RPC. With 7 servers, |
| // this means we crash about 30% of the time before we've fully |
| // replicated the NO_OP at the start of the term. |
| ts_flags.emplace_back("--fault_crash_on_leader_request_fraction=0.05"); |
| |
| // Inject latency to encourage the replicas to fall out of sync |
| // with each other. |
| ts_flags.emplace_back("--log_inject_latency"); |
| ts_flags.emplace_back("--log_inject_latency_ms_mean=30"); |
| ts_flags.emplace_back("--log_inject_latency_ms_stddev=60"); |
| |
| // Make leader elections faster so we get through more cycles of leaders. |
| ts_flags.emplace_back("--raft_heartbeat_interval_ms=100"); |
| |
| // Avoid preallocating segments since bootstrap is a little bit |
| // faster if it doesn't have to scan forward through the preallocated |
| // log area. |
| ts_flags.emplace_back("--log_preallocate_segments=false"); |
| |
| ts_flags.insert(ts_flags.end(), extra_ts_flags.begin(), extra_ts_flags.end()); |
| |
| NO_FATALS(CreateCluster("raft_consensus-itest-crashy-nodes-cluster", |
| std::move(ts_flags))); |
| } |
| |
| void RaftConsensusITest::DoTestCrashyNodes(TestWorkload* workload, int max_rows_to_insert) { |
| int crashes_to_cause = 3; |
| if (AllowSlowTests()) { |
| crashes_to_cause = 15; |
| } |
| |
| workload->set_num_replicas(FLAGS_num_replicas); |
| // Set a really high write timeout so that even in the presence of many failures we |
| // can verify an exact number of rows in the end, thanks to exactly once semantics. |
| workload->set_write_timeout_millis(60 * 1000 /* 60 seconds */); |
| workload->set_num_write_threads(10); |
| workload->set_num_read_threads(2); |
| workload->Setup(); |
| workload->Start(); |
| |
| int num_crashes = 0; |
| while (num_crashes < crashes_to_cause && |
| workload->rows_inserted() < max_rows_to_insert) { |
| num_crashes += RestartAnyCrashedTabletServers(); |
| SleepFor(MonoDelta::FromMilliseconds(10)); |
| } |
| |
// Writers are likely still ongoing. To have some chance of completing all writes,
// restart the tablet servers; otherwise they'll keep crashing and the writes
// can never complete.
| for (int i = 0; i < cluster_->num_tablet_servers(); i++) { |
| ExternalTabletServer* ts = cluster_->tablet_server(i); |
| vector<string>* flags = ts->mutable_flags(); |
| bool removed_flag = false; |
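// Strip the --fault_crash_* flag from this server's flags so that it stops crashing
// after the restart below.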
| for (auto it = flags->begin(); it != flags->end(); ++it) { |
| if (HasPrefixString(*it, "--fault_crash")) { |
| flags->erase(it); |
| removed_flag = true; |
| break; |
| } |
| } |
| ASSERT_TRUE(removed_flag) << "could not remove flag from TS " << i |
| << "\nFlags:\n" << *flags; |
| ts->Shutdown(); |
| ASSERT_OK(ts->Restart()); |
| } |
| |
| workload->StopAndJoin(); |
| |
| // Ensure that the replicas converge. |
| ClusterVerifier v(cluster_.get()); |
| NO_FATALS(v.CheckCluster()); |
| NO_FATALS(v.CheckRowCount(workload->table_name(), |
| ClusterVerifier::EXACTLY, |
| workload->rows_inserted() - workload->rows_deleted())); |
| } |
| |
| void RaftConsensusITest::SetupSingleReplicaTest(TServerDetails** replica_ts) { |
| const vector<string> kTsFlags = { |
| "--enable_leader_failure_detection=false", |
| }; |
| const vector<string> kMasterFlags = { |
| "--catalog_manager_wait_for_new_tablets_to_elect_leader=false" |
| }; |
| |
| FLAGS_num_replicas = 3; |
| FLAGS_num_tablet_servers = 3; |
| NO_FATALS(BuildAndStart(kTsFlags, kMasterFlags)); |
| |
vector<TServerDetails*> tservers;
| AppendValuesFromMap(tablet_servers_, &tservers); |
| ASSERT_EQ(3, tservers.size()); |
| |
| // Elect server 2 as leader and wait for log index 1 to propagate to all servers. |
| ASSERT_OK(StartElection(tservers[2], tablet_id_, MonoDelta::FromSeconds(10))); |
| ASSERT_OK(WaitForServersToAgree(MonoDelta::FromSeconds(10), tablet_servers_, tablet_id_, 1)); |
| |
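// Kill all the servers but one: shut down the other two, leaving tservers[0] as the
// only live replica (still a follower).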
| cluster_->tablet_server_by_uuid(tservers[1]->uuid())->Shutdown(); |
| cluster_->tablet_server_by_uuid(tservers[2]->uuid())->Shutdown(); |
| |
| *replica_ts = tservers[0]; |
| LOG(INFO) << "================================== Cluster setup complete."; |
| } |
| |
| // Test that we can retrieve the permanent uuid of a server running |
| // consensus service via RPC. |
| TEST_F(RaftConsensusITest, TestGetPermanentUuid) { |
| NO_FATALS(BuildAndStart()); |
| |
| RaftPeerPB peer; |
| TServerDetails* leader = nullptr; |
| ASSERT_OK(GetLeaderReplicaWithRetries(tablet_id_, &leader)); |
| peer.mutable_last_known_addr()->CopyFrom(leader->registration.rpc_addresses(0)); |
| const string expected_uuid = leader->instance_id.permanent_uuid(); |
| |
| rpc::MessengerBuilder builder("test builder"); |
| builder.set_num_reactors(1); |
| std::shared_ptr<rpc::Messenger> messenger; |
| ASSERT_OK(builder.Build(&messenger)); |
| |
| auto resolver = std::make_shared<DnsResolver>(); |
| ASSERT_OK(consensus::SetPermanentUuidForRemotePeer( |
| messenger, resolver.get(), &peer)); |
| ASSERT_EQ(expected_uuid, peer.permanent_uuid()); |
| } |
| |
| // TODO allow the scan to define an operation id, fetch the last id |
| // from the leader and then use that id to make the replica wait |
| // until it is done. This will avoid the sleeps below. |
| TEST_F(RaftConsensusITest, TestInsertAndMutateThroughConsensus) { |
| NO_FATALS(BuildAndStart()); |
| |
| int num_iters = AllowSlowTests() ? 10 : 1; |
| |
| for (int i = 0; i < num_iters; i++) { |
| InsertTestRowsRemoteThread(i * FLAGS_client_inserts_per_thread, |
| FLAGS_client_inserts_per_thread, |
| FLAGS_client_num_batches_per_thread); |
| } |
| NO_FATALS(AssertAllReplicasAgree(FLAGS_client_inserts_per_thread * num_iters)); |
| } |
| |
| TEST_F(RaftConsensusITest, TestFailedOp) { |
| NO_FATALS(BuildAndStart()); |
| |
| // Wait until we have a stable leader. |
| ASSERT_OK(WaitForServersToAgree(MonoDelta::FromSeconds(10), tablet_servers_, |
| tablet_id_, 1)); |
| |
| WriteRequestPB req; |
| req.set_tablet_id(tablet_id_); |
| ASSERT_OK(SchemaToPB(schema_, req.mutable_schema())); |
| |
| RowOperationsPB* data = req.mutable_row_operations(); |
| data->set_rows("some gibberish!"); |
| |
| WriteResponsePB resp; |
| RpcController controller; |
| controller.set_timeout(MonoDelta::FromSeconds(FLAGS_rpc_timeout)); |
| |
| TServerDetails* leader = nullptr; |
| ASSERT_OK(GetLeaderReplicaWithRetries(tablet_id_, &leader)); |
| |
| ASSERT_OK(DCHECK_NOTNULL(leader->tserver_proxy.get())->Write(req, &resp, &controller)); |
| ASSERT_TRUE(resp.has_error()); |
| |
| // Add a proper row so that we can verify that all of the replicas continue |
| // to process ops after a failure. Additionally, this allows us to wait for |
| // all of the replicas to finish processing ops before shutting down, |
| // avoiding a potential stall as we currently can't abort ops (see KUDU-341). |
| data->Clear(); |
| AddTestRowToPB(RowOperationsPB::INSERT, schema_, 0, 0, "original0", data); |
| |
| controller.Reset(); |
| controller.set_timeout(MonoDelta::FromSeconds(FLAGS_rpc_timeout)); |
| |
| ASSERT_OK(DCHECK_NOTNULL(leader->tserver_proxy.get())->Write(req, &resp, &controller)); |
| SCOPED_TRACE(SecureShortDebugString(resp)); |
| ASSERT_FALSE(resp.has_error()); |
| |
| NO_FATALS(AssertAllReplicasAgree(1)); |
| } |
| |
| // Inserts rows through consensus and also starts one delay injecting thread |
| // that steals consensus peer locks for a while. This is meant to test that |
| // even with timeouts and repeated requests consensus still works. |
| TEST_F(RaftConsensusITest, MultiThreadedMutateAndInsertThroughConsensus) { |
| NO_FATALS(BuildAndStart()); |
| |
if (500 == FLAGS_client_inserts_per_thread && AllowSlowTests()) {
FLAGS_client_inserts_per_thread = FLAGS_client_inserts_per_thread * 10;
FLAGS_client_num_batches_per_thread = FLAGS_client_num_batches_per_thread * 10;
}
| |
| int num_threads = FLAGS_num_client_threads; |
| for (int i = 0; i < num_threads; i++) { |
| threads_.emplace_back([this, i]() { |
| this->InsertTestRowsRemoteThread(i * FLAGS_client_inserts_per_thread, |
| FLAGS_client_inserts_per_thread, |
| FLAGS_client_num_batches_per_thread); |
| }); |
| } |
| for (int i = 0; i < FLAGS_num_replicas; i++) { |
| auto* ts = cluster_->tablet_server(i); |
| threads_.emplace_back([this, ts]() { |
| this->DelayInjectorThread(ts, kConsensusRpcTimeoutForTests); |
| }); |
| } |
| for (auto& t : threads_) { |
| t.join(); |
| } |
| |
| NO_FATALS(AssertAllReplicasAgree(FLAGS_client_inserts_per_thread * FLAGS_num_client_threads)); |
| } |
| |
| TEST_F(RaftConsensusITest, TestInsertOnNonLeader) { |
| NO_FATALS(BuildAndStart()); |
| |
| // Wait for the initial leader election to complete. |
| ASSERT_OK(WaitForServersToAgree(MonoDelta::FromSeconds(10), tablet_servers_, |
| tablet_id_, 1)); |
| |
| // Manually construct a write RPC to a replica and make sure it responds |
| // with the correct error code. |
| WriteRequestPB req; |
| WriteResponsePB resp; |
| RpcController rpc; |
| req.set_tablet_id(tablet_id_); |
| ASSERT_OK(SchemaToPB(schema_, req.mutable_schema())); |
| AddTestRowToPB(RowOperationsPB::INSERT, schema_, kTestRowKey, kTestRowIntVal, |
| "hello world via RPC", req.mutable_row_operations()); |
| |
// Get the followers.
| vector<TServerDetails*> followers; |
| GetOnlyLiveFollowerReplicas(tablet_id_, &followers); |
| |
| ASSERT_OK(followers[0]->tserver_proxy->Write(req, &resp, &rpc)); |
| SCOPED_TRACE(SecureDebugString(resp)); |
| ASSERT_TRUE(resp.has_error()); |
| Status s = StatusFromPB(resp.error().status()); |
| EXPECT_TRUE(s.IsIllegalState()); |
| ASSERT_STR_CONTAINS(s.ToString(), |
| "is not leader of this config: current role FOLLOWER"); |
| // TODO(unknown): need to change the error code to be something like REPLICA_NOT_LEADER |
| // so that the client can properly handle this case! plumbing this is a little difficult |
| // so not addressing at the moment. |
| NO_FATALS(AssertAllReplicasAgree(0)); |
| } |
| |
| class RaftConsensusParamEncryptionITest : |
| public RaftConsensusITest, |
| public ::testing::WithParamInterface<bool> { |
| }; |
| INSTANTIATE_TEST_SUITE_P(EncryptionEnabled, RaftConsensusParamEncryptionITest, |
| ::testing::Values(false, true)); |
| |
| // Test that when a follower is stopped for a long time, the log cache |
| // properly evicts operations, but still allows the follower to catch |
| // up when it comes back. |
| // |
| // Also asserts that the other replicas retain logs for the stopped |
| // follower to catch up from. |
| TEST_P(RaftConsensusParamEncryptionITest, TestCatchupAfterOpsEvicted) { |
| vector<string> kTsFlags = { |
| "--log_cache_size_limit_mb=1", |
| "--consensus_max_batch_size_bytes=500000", |
| // Use short and synchronous rolls so that we can test log segment retention. |
| "--log_segment_size_mb=1", |
| "--log_async_preallocate_segments=false", |
| // Run the maintenance manager frequently and flush quickly, |
| // so that we don't have to wait long for GC. |
| "--maintenance_manager_polling_interval_ms=100", |
| "--flush_threshold_secs=3", |
| // We write 128KB cells in this test, so bump the limit. |
| "--max_cell_size_bytes=1000000", |
| // And disable WAL compression so the 128KB cells don't get compressed away. |
| "--log_compression_codec=no_compression" |
| }; |
| |
| if (GetParam()) { |
| // We need to enable encryption both in the mini-cluster and in the current |
| // process, as both of them access encrypted files. |
| SetEncryptionFlags(true); |
| kTsFlags.emplace_back("--encrypt_data_at_rest=true"); |
| string server_key; |
| string server_key_iv; |
| string server_key_version; |
| GetEncryptionKey(&server_key, &server_key_iv, &server_key_version); |
| kTsFlags.emplace_back("--test_server_key=" + server_key); |
| kTsFlags.emplace_back("--test_server_key_iv=" + server_key_iv); |
| kTsFlags.emplace_back("--test_server_key_version=" + server_key_version); |
| } |
| |
| NO_FATALS(BuildAndStart(kTsFlags)); |
| TServerDetails* replica = (*tablet_replicas_.begin()).second; |
| ASSERT_TRUE(replica != nullptr); |
| ExternalTabletServer* replica_ets = cluster_->tablet_server_by_uuid(replica->uuid()); |
| |
// Pause a replica.
ASSERT_OK(replica_ets->Pause());
LOG(INFO) << "Paused one of the replicas, starting to write.";
| |
| // Insert 5MB worth of data. |
| const int kNumWrites = 40; |
| NO_FATALS(Write128KOpsToLeader(kNumWrites)); |
| |
| // Sleep a bit to give the maintenance manager time to GC logs, if it were |
| // going to. |
| SleepFor(MonoDelta::FromSeconds(1)); |
| |
| // Check that the leader and non-paused follower have not GCed any logs (since |
| // the third peer needs them to catch up). |
| for (int i = 0; i < cluster_->num_tablet_servers(); i++) { |
| int num_wals = inspect_->CountFilesInWALDirForTS(i, tablet_id_, "wal-*"); |
| if (cluster_->tablet_server(i) == replica_ets) { |
| ASSERT_EQ(1, num_wals) << "Replica should have only one segment"; |
| } else { |
| ASSERT_EQ(6, num_wals) |
| << "Other nodes should retain segments for the frozen replica to catch up"; |
| } |
| } |
| |
// Now unpause the replica; the lagging replica should eventually catch back up.
| ASSERT_OK(replica_ets->Resume()); |
| NO_FATALS(AssertAllReplicasAgree(kNumWrites)); |
| |
| // Once the follower has caught up, all replicas should eventually GC the earlier |
| // log segments that they were retaining. |
| ASSERT_EVENTUALLY([&]() { |
| for (int i = 0; i < cluster_->num_tablet_servers(); i++) { |
| SCOPED_TRACE(Substitute("TS $0", i)); |
| int num_wals = inspect_->CountFilesInWALDirForTS(i, tablet_id_, "wal-*"); |
| ASSERT_EQ(1, num_wals); |
| } |
| }); |
| } |
| |
| // Test that the leader doesn't crash if one of its followers has |
| // fallen behind so far that the logs necessary to catch it up |
| // have been GCed. |
| // |
| // In a real cluster, this will eventually cause the follower to be |
| // evicted/replaced. In any case, the leader should not crash. |
| // |
| // We also ensure that, when the leader stops writing to the follower, |
| // the follower won't disturb the other nodes when it attempts to elect |
| // itself. |
| // |
| // This is a regression test for KUDU-775 and KUDU-562. |
| TEST_P(RaftConsensusParamEncryptionITest, TestFollowerFallsBehindLeaderGC) { |
| vector<string> ts_flags = { |
| // Disable follower eviction to maintain the original intent of this test. |
| "--evict_failed_followers=false", |
| }; |
| if (GetParam()) { |
| // We need to enable encryption both in the mini-cluster and in the current |
| // process, as both of them access encrypted files. |
| ts_flags.emplace_back("--encrypt_data_at_rest=true"); |
| FLAGS_encrypt_data_at_rest = true; |
| } |
| AddFlagsForLogRolls(&ts_flags); // For CauseFollowerToFallBehindLogGC(). |
| NO_FATALS(BuildAndStart(ts_flags)); |
| |
| string leader_uuid; |
| int64_t orig_term; |
| string follower_uuid; |
| NO_FATALS(CauseFollowerToFallBehindLogGC( |
| tablet_servers_, &leader_uuid, &orig_term, &follower_uuid)); |
| SCOPED_TRACE(Substitute("leader: $0 follower: $1", leader_uuid, follower_uuid)); |
| |
| // Wait for remaining majority to agree. |
| TabletServerMap active_tablet_servers = tablet_servers_; |
| ASSERT_EQ(3, active_tablet_servers.size()); |
| ASSERT_EQ(1, active_tablet_servers.erase(follower_uuid)); |
| ASSERT_OK(WaitForServersToAgree(MonoDelta::FromSeconds(30), active_tablet_servers, tablet_id_, |
| 1)); |
| |
| if (AllowSlowTests()) { |
| // Sleep long enough that the "abandoned" server's leader election interval |
| // will trigger several times. Then, verify that the term has not increased |
| // on any of the servers. |
| // This ensures that the other servers properly reject the pre-election requests |
| // from the abandoned node, and that the abandoned node doesn't bump its term |
| // either, since that would cause spurious leader elections upon the node coming back |
| // to life. |
| SleepFor(MonoDelta::FromSeconds(5)); |
| |
| for (int i = 0; i < cluster_->num_tablet_servers(); i++) { |
| ExternalTabletServer* ts = cluster_->tablet_server(i); |
| SCOPED_TRACE(ts->uuid()); |
| int64_t term_from_metric = -1; |
| ASSERT_OK(GetTermMetricValue(ts, &term_from_metric)); |
| ASSERT_EQ(term_from_metric, orig_term); |
| } |
| OpId op_id; |
| TServerDetails* leader = tablet_servers_[leader_uuid]; |
| ASSERT_OK(GetLastOpIdForReplica(tablet_id_, leader, consensus::RECEIVED_OPID, |
| MonoDelta::FromSeconds(10), &op_id)); |
| ASSERT_EQ(orig_term, op_id.term()) |
| << "expected the leader to have not advanced terms but has op " << op_id; |
| } |
| } |
| |
| // This test starts several tablet servers, and configures them with |
| // fault injection so that the leaders frequently crash just before |
| // sending RPCs to followers. |
| // |
| // This can result in various scenarios where leaders crash right after |
| // being elected and never succeed in replicating their first operation. |
| // For example, KUDU-783 reproduces from this test approximately 5% of the |
| // time on a slow-test debug build. |
| TEST_F(RaftConsensusITest, InsertUniqueKeysWithCrashyNodes) { |
| NO_FATALS(CreateClusterForCrashyNodesTests()); |
| |
| TestWorkload workload(cluster_.get()); |
| workload.set_write_batch_size(1); |
| |
| NO_FATALS(DoTestCrashyNodes(&workload, 100)); |
| } |
| |
| // The same crashy nodes test as above but inserts many duplicate keys. |
| // This emulates cases where there are many duplicate keys which, due to two phase |
| // locking, may cause deadlocks and other anomalies that cannot be observed when |
| // keys are unique. |
| TEST_F(RaftConsensusITest, InsertDuplicateKeysWithCrashyNodes) { |
| NO_FATALS(CreateClusterForCrashyNodesTests()); |
| |
| TestWorkload workload(cluster_.get()); |
| workload.set_write_pattern(TestWorkload::INSERT_WITH_MANY_DUP_KEYS); |
| // Increase the number of rows per batch to get a higher chance of key collision. |
| workload.set_write_batch_size(3); |
| |
| NO_FATALS(DoTestCrashyNodes(&workload, 300)); |
| } |
| |
| // The same crashy nodes test as above but the keys will be deleted after insertion. |
| TEST_F(RaftConsensusITest, InsertAndDeleteWithCrashyNodes) { |
| vector<string> extra_ts_flags = { |
| "--flush_threshold_mb=0", |
| "--flush_threshold_secs=1", |
| "--maintenance_manager_polling_interval_ms=10", |
| "--heartbeat_interval_ms=10", |
| "--update_tablet_stats_interval_ms=10", |
| }; |
| NO_FATALS(CreateClusterForCrashyNodesTests(std::move(extra_ts_flags))); |
| |
// If AllowSlowTests() is true, test the scenario of deleting data from DRSs.
// Otherwise, test deleting data from the MRS.
| int32_t write_interval_millis = 0; |
| int32_t write_batch_size = 5; |
| if (AllowSlowTests()) { |
| // Wait for MRS to be flushed. |
| write_interval_millis = 1000; |
| // Decrease the number of rows per batch to generate more DRSs. |
| write_batch_size = 1; |
| } |
| |
| TestWorkload workload(cluster_.get()); |
| workload.set_write_pattern(TestWorkload::INSERT_RANDOM_ROWS_WITH_DELETE); |
| workload.set_write_interval_millis(write_interval_millis); |
| workload.set_write_batch_size(write_batch_size); |
| NO_FATALS(DoTestCrashyNodes(&workload, 100)); |
| } |
| |
| TEST_F(RaftConsensusITest, MultiThreadedInsertWithFailovers) { |
| int kNumElections = FLAGS_num_replicas; |
| |
| if (AllowSlowTests()) { |
| FLAGS_num_tablet_servers = 7; |
| FLAGS_num_replicas = 7; |
| kNumElections = 3 * FLAGS_num_replicas; |
| } |
| |
// Reset the consensus RPC timeout to the default value, or elections might fail often.
| FLAGS_consensus_rpc_timeout_ms = 1000; |
| |
// In slow-test mode, start a 7-node configuration cluster (since we can't bring leaders back,
// we start with a higher replica count so that we can kill more leaders).
| |
| NO_FATALS(BuildAndStart()); |
| |
| OverrideFlagForSlowTests( |
| "client_inserts_per_thread", |
| Substitute("$0", (FLAGS_client_inserts_per_thread * 100))); |
| OverrideFlagForSlowTests( |
| "client_num_batches_per_thread", |
| Substitute("$0", (FLAGS_client_num_batches_per_thread * 100))); |
| |
| int num_threads = FLAGS_num_client_threads; |
| int total_num_rows = num_threads * FLAGS_client_inserts_per_thread; |
| |
// Create kNumElections - 1 latches, spaced evenly across the total number of rows, so that
// we kill a leader (possibly the same node more than once) at several points during the insert.
| vector<unique_ptr<CountDownLatch>> latches; |
| latches.reserve(kNumElections); |
| for (int i = 1; i < kNumElections; i++) { |
| latches.emplace_back(new CountDownLatch((i * total_num_rows) / kNumElections)); |
| } |
| |
| for (int i = 0; i < num_threads; i++) { |
| threads_.emplace_back([this, i, &latches]() { |
| this->InsertTestRowsRemoteThread(i * FLAGS_client_inserts_per_thread, |
| FLAGS_client_inserts_per_thread, |
| FLAGS_client_num_batches_per_thread, latches); |
| }); |
| } |
| |
| for (const auto& latch : latches) { |
| NO_FATALS(cluster_->AssertNoCrashes()); |
| latch->Wait(); |
| StopOrKillLeaderAndElectNewOne(); |
| } |
| |
| for (auto& t : threads_) { |
| t.join(); |
| } |
| |
| NO_FATALS(AssertAllReplicasAgree(FLAGS_client_inserts_per_thread * FLAGS_num_client_threads)); |
| } |
| |
| // Regression test for KUDU-597, an issue where we could mis-order operations on |
| // a machine if the following sequence occurred: |
| // 1) Replica is a FOLLOWER |
| // 2) A client request hits the machine |
| // 3) It receives some operations from the current leader |
| // 4) It gets elected LEADER |
| // In this scenario, it would incorrectly sequence the client request's PREPARE phase |
| // before the operations received in step (3), even though the correct behavior would be |
| // to either reject them or sequence them after those operations, because the operation |
| // index is higher. |
| // |
| // The test works by setting up three replicas and manually hammering them with write |
| // requests targeting a single row. If the bug exists, then OpOrderVerifier |
| // will trigger an assertion because the prepare order and the op indexes will become |
| // misaligned. |
| TEST_F(RaftConsensusITest, TestKUDU_597) { |
| FLAGS_num_replicas = 3; |
| FLAGS_num_tablet_servers = 3; |
| NO_FATALS(BuildAndStart()); |
| |
| AtomicBool finish(false); |
| for (int i = 0; i < FLAGS_num_tablet_servers; i++) { |
| threads_.emplace_back([this, i, &finish]() { |
| this->StubbornlyWriteSameRowThread(i, &finish); |
| }); |
| } |
| SCOPED_CLEANUP({ |
| finish.Store(true); |
| for (auto& t : threads_) { |
| t.join(); |
| } |
| }); |
| |
| const int num_loops = AllowSlowTests() ? 10 : 1; |
| for (int i = 0; i < num_loops; i++) { |
| StopOrKillLeaderAndElectNewOne(); |
| SleepFor(MonoDelta::FromSeconds(1)); |
| ASSERT_OK(CheckTabletServersAreAlive(FLAGS_num_tablet_servers)); |
| } |
| } |
| |
| // Regression test for KUDU-1775: when a replica is restarted, and the first |
| // request it receives from a leader results in a LMP mismatch error, the |
| // replica should still respond with the correct 'last_committed_idx'. |
| TEST_F(RaftConsensusITest, TestLMPMismatchOnRestartedReplica) { |
| TServerDetails* replica_ts; |
| NO_FATALS(SetupSingleReplicaTest(&replica_ts)); |
| auto* replica_ets = cluster_->tablet_server_by_uuid(replica_ts->uuid()); |
| |
| ConsensusServiceProxy* c_proxy = CHECK_NOTNULL(replica_ts->consensus_proxy.get()); |
| ConsensusRequestPB req; |
| ConsensusResponsePB resp; |
| RpcController rpc; |
| |
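// Pose as a leader at term 2 and send an empty update so the replica accepts
// 'fake_caller' as the current leader.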
| req.set_tablet_id(tablet_id_); |
| req.set_dest_uuid(replica_ts->uuid()); |
| req.set_caller_uuid("fake_caller"); |
| req.set_caller_term(2); |
| req.set_all_replicated_index(0); |
| req.mutable_preceding_id()->CopyFrom(MakeOpId(1, 1)); |
| |
| ASSERT_OK(c_proxy->UpdateConsensus(req, &resp, &rpc)); |
| ASSERT_FALSE(resp.has_error()) << SecureDebugString(resp); |
| |
| int64_t base_ts = GetTimestampOnServer(replica_ts); |
| |
| // Send operations 2.1 through 2.3, committing through 2.2. |
| AddOp(MakeOpId(2, 1), base_ts, &req); |
| AddOp(MakeOpId(2, 2), base_ts, &req); |
| AddOp(MakeOpId(2, 3), base_ts, &req); |
| req.set_committed_index(2); |
| rpc.Reset(); |
| ASSERT_OK(c_proxy->UpdateConsensus(req, &resp, &rpc)); |
| ASSERT_FALSE(resp.has_error()) << SecureDebugString(resp); |
| |
| // The COMMIT messages end up in the WAL asynchronously, so loop reading the |
| // tablet server's WAL until it shows up. |
| int replica_idx = cluster_->tablet_server_index_by_uuid(replica_ets->uuid()); |
| ASSERT_EVENTUALLY([&]() { |
| LogVerifier lv(cluster_.get()); |
| OpId commit; |
| ASSERT_OK(lv.ScanForHighestCommittedOpIdInLog(replica_idx, tablet_id_, &commit)); |
| ASSERT_EQ("2.2", OpIdToString(commit)); |
| }); |
| |
| // Restart the replica. |
| replica_ets->Shutdown(); |
| ASSERT_OK(replica_ets->Restart()); |
| |
| // Send an operation 3.4 with preceding OpId 3.3. |
| // We expect an LMP mismatch, since the replica has operation 2.3. |
| // We use 'ASSERT_EVENTUALLY' here because the replica |
| // may need a few retries while it's in BOOTSTRAPPING state. |
| req.set_caller_term(3); |
| req.mutable_preceding_id()->CopyFrom(MakeOpId(3, 3)); |
| req.clear_ops(); |
| AddOp(MakeOpId(3, 4), base_ts, &req); |
| ASSERT_EVENTUALLY([&]() { |
| rpc.Reset(); |
| ASSERT_OK(c_proxy->UpdateConsensus(req, &resp, &rpc)); |
| ASSERT_EQ(resp.status().error().code(), |
| consensus::ConsensusErrorPB::PRECEDING_ENTRY_DIDNT_MATCH) |
| << SecureDebugString(resp); |
| }); |
| SCOPED_TRACE(SecureDebugString(resp)); |
| EXPECT_EQ(2, resp.status().last_committed_idx()); |
| EXPECT_EQ("0.0", OpIdToString(resp.status().last_received_current_leader())); |
| // Even though the replica previously received operations through 2.3, the LMP mismatch |
| // above causes us to truncate operation 2.3, so 2.2 remains. |
| EXPECT_EQ("2.2", OpIdToString(resp.status().last_received())); |
| } |
| |
| // Test a scenario where a replica has pending operations with lock |
| // dependencies on each other: |
| // 2.2: UPSERT row 1 |
| // 2.3: UPSERT row 1 |
| // 2.4: UPSERT row 1 |
| // ...and a new leader tries to abort 2.4 in order to replace it with a new |
| // operation. Because the operations have a lock dependency, operation 2.4 |
| // will be 'stuck' in the Prepare queue. This verifies that we can abort an |
| // operation even if it's stuck in the queue. |
| TEST_F(RaftConsensusITest, TestReplaceOperationStuckInPrepareQueue) { |
| TServerDetails* replica_ts; |
| NO_FATALS(SetupSingleReplicaTest(&replica_ts)); |
| |
| ConsensusServiceProxy* c_proxy = CHECK_NOTNULL(replica_ts->consensus_proxy.get()); |
| ConsensusRequestPB req; |
| ConsensusResponsePB resp; |
| RpcController rpc; |
| |
| req.set_tablet_id(tablet_id_); |
| req.set_dest_uuid(replica_ts->uuid()); |
| req.set_caller_uuid("fake_caller"); |
| req.set_caller_term(2); |
| req.set_all_replicated_index(0); |
| req.mutable_preceding_id()->CopyFrom(MakeOpId(1, 1)); |
| int64_t base_ts = GetTimestampOnServer(replica_ts); |
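// Queue three UPSERTs against the same row (ops 2.2 through 2.4); the later ones block
// on the row lock held by the earlier ones, so 2.4 gets stuck in the prepare queue.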
| AddOpWithTypeAndKey(MakeOpId(2, 2), base_ts, RowOperationsPB::UPSERT, 1, &req); |
| AddOpWithTypeAndKey(MakeOpId(2, 3), base_ts, RowOperationsPB::UPSERT, 1, &req); |
| AddOpWithTypeAndKey(MakeOpId(2, 4), base_ts, RowOperationsPB::UPSERT, 1, &req); |
| req.set_committed_index(2); |
| rpc.Reset(); |
| ASSERT_OK(c_proxy->UpdateConsensus(req, &resp, &rpc)); |
| ASSERT_FALSE(resp.has_error()) << SecureDebugString(resp); |
| |
// Replace operation 2.4 with 3.4 and add 3.5 (an upsert of a new key).
| req.set_caller_term(3); |
| req.mutable_preceding_id()->CopyFrom(MakeOpId(2, 3)); |
| req.clear_ops(); |
| AddOpWithTypeAndKey(MakeOpId(3, 4), base_ts, RowOperationsPB::UPSERT, 1, &req); |
| AddOpWithTypeAndKey(MakeOpId(3, 5), base_ts, RowOperationsPB::UPSERT, 2, &req); |
| rpc.Reset(); |
| rpc.set_timeout(MonoDelta::FromSeconds(5)); |
| ASSERT_OK(c_proxy->UpdateConsensus(req, &resp, &rpc)); |
| ASSERT_FALSE(resp.has_error()) << SecureDebugString(resp); |
| |
| // Commit all ops. |
| req.clear_ops(); |
| req.set_committed_index(5); |
| req.mutable_preceding_id()->CopyFrom(MakeOpId(3, 5)); |
| rpc.Reset(); |
| ASSERT_OK(c_proxy->UpdateConsensus(req, &resp, &rpc)); |
| ASSERT_FALSE(resp.has_error()) << SecureDebugString(resp); |
| |
| // Ensure we can read the data. |
| // We need to ASSERT_EVENTUALLY here because otherwise it's possible to read the old value |
| // of row '1', if the operation is still in flight. |
| ASSERT_EVENTUALLY([&]() { |
| vector<string> results; |
| NO_FATALS(WaitForRowCount(replica_ts->tserver_proxy.get(), 2, &results)); |
| ASSERT_EQ("(int32 key=1, int32 int_val=3, string string_val=\"term: 3 index: 4\")", |
| results[0]); |
| ASSERT_EQ("(int32 key=2, int32 int_val=3, string string_val=\"term: 3 index: 5\")", |
| results[1]); |
| }); |
| } |
| |
| // Regression test for KUDU-644: |
| // Triggers some complicated scenarios on the replica involving aborting and |
| // replacing ops. |
| TEST_F(RaftConsensusITest, TestReplicaBehaviorViaRPC) { |
| TServerDetails* replica_ts; |
| NO_FATALS(SetupSingleReplicaTest(&replica_ts)); |
| |
| // Check that the 'term' metric is correctly exposed. |
| { |
| int64_t term_from_metric = -1; |
| ASSERT_OK(GetTermMetricValue(cluster_->tablet_server_by_uuid(replica_ts->uuid()), |
| &term_from_metric)); |
| ASSERT_EQ(term_from_metric, 1); |
| } |
| |
| ConsensusServiceProxy* c_proxy = CHECK_NOTNULL(replica_ts->consensus_proxy.get()); |
| |
| ConsensusRequestPB req; |
| ConsensusResponsePB resp; |
| RpcController rpc; |
| |
| // Send a simple request with no ops. |
| req.set_tablet_id(tablet_id_); |
| req.set_dest_uuid(replica_ts->uuid()); |
| req.set_caller_uuid("fake_caller"); |
| req.set_caller_term(2); |
| req.set_all_replicated_index(0); |
| req.set_committed_index(1); |
| req.mutable_preceding_id()->CopyFrom(MakeOpId(1, 1)); |
| |
| ASSERT_OK(c_proxy->UpdateConsensus(req, &resp, &rpc)); |
| ASSERT_FALSE(resp.has_error()) << SecureDebugString(resp); |
| |
| // Send some operations, but don't advance the commit index. |
| // They should not commit. |
| int64_t base_ts = GetTimestampOnServer(replica_ts); |
| AddOp(MakeOpId(2, 2), base_ts, &req); |
| AddOp(MakeOpId(2, 3), base_ts, &req); |
| AddOp(MakeOpId(2, 4), base_ts, &req); |
| rpc.Reset(); |
| ASSERT_OK(c_proxy->UpdateConsensus(req, &resp, &rpc)); |
| ASSERT_FALSE(resp.has_error()) << SecureDebugString(resp); |
| |
| // We shouldn't read anything yet, because the ops should be pending. |
| { |
| vector<string> results; |
| NO_FATALS(ScanReplica(replica_ts->tserver_proxy.get(), &results)); |
| ASSERT_EQ(0, results.size()) << results; |
| } |
| |
| // Send op 2.6, but set preceding OpId to 2.4. This is an invalid |
| // request, and the replica should reject it. |
| req.mutable_preceding_id()->CopyFrom(MakeOpId(2, 4)); |
| req.clear_ops(); |
| AddOp(MakeOpId(2, 6), base_ts, &req); |
| rpc.Reset(); |
| ASSERT_OK(c_proxy->UpdateConsensus(req, &resp, &rpc)); |
| ASSERT_TRUE(resp.has_error()) << SecureDebugString(resp); |
| ASSERT_EQ(resp.error().status().message(), |
| "New operation's index does not follow the previous op's index. " |
| "Current: 2.6. Previous: 2.4"); |
| |
| resp.Clear(); |
| req.clear_ops(); |
// Send ops 3.5 and 2.6, then commit up to index 6; the replica
// should fail because of the out-of-order terms.
| req.mutable_preceding_id()->CopyFrom(MakeOpId(2, 4)); |
| AddOp(MakeOpId(3, 5), base_ts, &req); |
| AddOp(MakeOpId(2, 6), base_ts, &req); |
| rpc.Reset(); |
| ASSERT_OK(c_proxy->UpdateConsensus(req, &resp, &rpc)); |
| ASSERT_TRUE(resp.has_error()) << SecureDebugString(resp); |
| ASSERT_EQ(resp.error().status().message(), |
| "New operation's term is not >= than the previous op's term." |
| " Current: 2.6. Previous: 3.5"); |
| |
| // Regression test for KUDU-639: if we send a valid request, but the |
| // current commit index is higher than the data we're sending, we shouldn't |
| // commit anything higher than the last op sent by the leader. |
| // |
| // To test, we re-send operation 2.3, with the correct preceding ID 2.2, |
| // but we set the committed index to 2.4. This should only commit |
| // 2.2 and 2.3. |
| resp.Clear(); |
| req.clear_ops(); |
| req.mutable_preceding_id()->CopyFrom(MakeOpId(2, 2)); |
| AddOp(MakeOpId(2, 3), base_ts, &req); |
| req.set_committed_index(4); |
| rpc.Reset(); |
| ASSERT_OK(c_proxy->UpdateConsensus(req, &resp, &rpc)); |
| ASSERT_FALSE(resp.has_error()) << SecureDebugString(resp); |
| // Verify only 2.2 and 2.3 are committed. |
| { |
| vector<string> results; |
| NO_FATALS(WaitForRowCount(replica_ts->tserver_proxy.get(), 2, &results)); |
| ASSERT_STR_CONTAINS(results[0], "term: 2 index: 2"); |
| ASSERT_STR_CONTAINS(results[1], "term: 2 index: 3"); |
| } |
| |
| resp.Clear(); |
| req.clear_ops(); |
| // Now send some more ops, and commit the earlier ones. |
| req.set_committed_index(4); |
| req.mutable_preceding_id()->CopyFrom(MakeOpId(2, 4)); |
| AddOp(MakeOpId(2, 5), base_ts, &req); |
| AddOp(MakeOpId(2, 6), base_ts, &req); |
| rpc.Reset(); |
| ASSERT_OK(c_proxy->UpdateConsensus(req, &resp, &rpc)); |
| ASSERT_FALSE(resp.has_error()) << SecureDebugString(resp); |
| |
| // Verify they are committed. |
| { |
| vector<string> results; |
| NO_FATALS(WaitForRowCount(replica_ts->tserver_proxy.get(), 3, &results)); |
| ASSERT_STR_CONTAINS(results[0], "term: 2 index: 2"); |
| ASSERT_STR_CONTAINS(results[1], "term: 2 index: 3"); |
| ASSERT_STR_CONTAINS(results[2], "term: 2 index: 4"); |
| } |
| |
| // At this point, we still have two operations which aren't committed. If we |
| // try to perform a snapshot-consistent scan, we should time out rather than |
| // hanging the RPC service thread. |
| { |
| ScanRequestPB req; |
| ScanResponsePB resp; |
| RpcController rpc; |
| rpc.set_timeout(MonoDelta::FromMilliseconds(100)); |
| NewScanRequestPB* scan = req.mutable_new_scan_request(); |
| scan->set_tablet_id(tablet_id_); |
| scan->set_read_mode(READ_AT_SNAPSHOT); |
| ASSERT_OK(SchemaToColumnPBs(schema_, scan->mutable_projected_columns())); |
| |
| // Send the call. We expect to get a timeout passed back from the server side |
| // (i.e. not an RPC timeout) |
| req.set_batch_size_bytes(0); |
| SCOPED_TRACE(SecureDebugString(req)); |
| ASSERT_OK(replica_ts->tserver_proxy->Scan(req, &resp, &rpc)); |
| SCOPED_TRACE(SecureDebugString(resp)); |
| string err_str = StatusFromPB(resp.error().status()).ToString(); |
| ASSERT_STR_CONTAINS(err_str, "Timed out waiting for ts:"); |
| ASSERT_STR_CONTAINS(err_str, "to be safe"); |
| } |
| |
| resp.Clear(); |
| req.clear_ops(); |
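// Repeatedly pose as a new leader with an ever-higher term, replacing the uncommitted
// ops at indexes 5 and 6 without ever committing the replacements.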
| int leader_term = 2; |
| const int kNumTerms = AllowSlowTests() ? 10000 : 100; |
| while (leader_term < kNumTerms) { |
| leader_term++; |
// Now pretend to be a new leader (at a higher term) and replace the earlier ops
// without committing the new replacements.
| req.set_caller_term(leader_term); |
| req.set_caller_uuid("new_leader"); |
| req.mutable_preceding_id()->CopyFrom(MakeOpId(2, 4)); |
| req.clear_ops(); |
| AddOp(MakeOpId(leader_term, 5), base_ts, &req); |
| AddOp(MakeOpId(leader_term, 6), base_ts, &req); |
| rpc.Reset(); |
| ASSERT_OK(c_proxy->UpdateConsensus(req, &resp, &rpc)); |
| ASSERT_FALSE(resp.has_error()) << "Req: " << SecureShortDebugString(req) |
| << " Resp: " << SecureDebugString(resp); |
| } |
| |
| // Send an empty request from the newest term, which should commit |
| // the earlier ops. |
| { |
| req.mutable_preceding_id()->CopyFrom(MakeOpId(leader_term, 6)); |
| req.set_committed_index(6); |
| req.clear_ops(); |
| rpc.Reset(); |
| ASSERT_OK(c_proxy->UpdateConsensus(req, &resp, &rpc)); |
| ASSERT_FALSE(resp.has_error()) << SecureDebugString(resp); |
| } |
| |
| // Verify the new rows are committed. |
| { |
| vector<string> results; |
| NO_FATALS(WaitForRowCount(replica_ts->tserver_proxy.get(), 5, &results)); |
| SCOPED_TRACE(results); |
| ASSERT_STR_CONTAINS(results[3], Substitute("term: $0 index: 5", leader_term)); |
| ASSERT_STR_CONTAINS(results[4], Substitute("term: $0 index: 6", leader_term)); |
| } |
| } |
| |
| // Basic test of adding and removing servers from a configuration. |
| TEST_F(RaftConsensusITest, TestAddRemoveServer) { |
| const MonoDelta kTimeout = MonoDelta::FromSeconds(10); |
| const vector<string> kTsFlags = { |
| "--enable_leader_failure_detection=false", |
| }; |
| const vector<string> kMasterFlags = { |
| "--master_add_server_when_underreplicated=false", |
| "--catalog_manager_wait_for_new_tablets_to_elect_leader=false", |
| }; |
| |
| FLAGS_num_tablet_servers = 3; |
| FLAGS_num_replicas = 3; |
| NO_FATALS(BuildAndStart(kTsFlags, kMasterFlags)); |
| |
| vector<TServerDetails*> tservers; |
| AppendValuesFromMap(tablet_servers_, &tservers); |
| ASSERT_EQ(FLAGS_num_tablet_servers, tservers.size()); |
| |
| // Elect server 0 as leader and wait for log index 1 to propagate to all servers. |
| TServerDetails* leader_tserver = tservers[0]; |
| const string& leader_uuid = tservers[0]->uuid(); |
| ASSERT_OK(StartElection(leader_tserver, tablet_id_, kTimeout)); |
| ASSERT_OK(WaitUntilCommittedOpIdIndexIs(1, leader_tserver, tablet_id_, kTimeout)); |
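| // Committed index 1 corresponds to the NO_OP the new leader replicates at |
| // the beginning of its term. |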
| |
| // Make sure the server rejects removal of itself from the configuration. |
| Status s = RemoveServer(leader_tserver, tablet_id_, leader_tserver, kTimeout); |
| ASSERT_TRUE(s.IsInvalidArgument()) << "Should not be able to remove self from config: " |
| << s.ToString(); |
| |
| // Insert the row that we will update throughout the test. |
| ASSERT_OK(WriteSimpleTestRow(leader_tserver, tablet_id_, RowOperationsPB::INSERT, |
| kTestRowKey, kTestRowIntVal, "initial insert", kTimeout)); |
| |
| // Kill the master, so we can change the config without interference. |
| cluster_->master()->Shutdown(); |
| |
| TabletServerMap active_tablet_servers = tablet_servers_; |
| |
| // Do majority correctness check for 3 servers. |
| NO_FATALS(AssertMajorityRequiredForElectionsAndWrites(active_tablet_servers, leader_uuid)); |
| OpId opid; |
| ASSERT_OK(GetLastOpIdForReplica(tablet_id_, leader_tserver, consensus::RECEIVED_OPID, kTimeout, |
| &opid)); |
| int64_t cur_log_index = opid.index(); |
| |
| // Go from 3 tablet servers down to 1 in the configuration. |
| vector<int> remove_list = { 2, 1 }; |
| for (int to_remove_idx : remove_list) { |
| int num_servers = active_tablet_servers.size(); |
| LOG(INFO) << "Remove: Going from " << num_servers << " to " << num_servers - 1 << " replicas"; |
| |
| TServerDetails* tserver_to_remove = tservers[to_remove_idx]; |
| LOG(INFO) << "Removing tserver with uuid " << tserver_to_remove->uuid(); |
| ASSERT_OK(RemoveServer(leader_tserver, tablet_id_, tserver_to_remove, kTimeout)); |
| ASSERT_EQ(1, active_tablet_servers.erase(tserver_to_remove->uuid())); |
| ASSERT_OK(WaitForServersToAgree(kTimeout, active_tablet_servers, tablet_id_, ++cur_log_index)); |
| |
| // Do majority correctness check for each incremental decrease. |
| NO_FATALS(AssertMajorityRequiredForElectionsAndWrites(active_tablet_servers, leader_uuid)); |
| ASSERT_OK(GetLastOpIdForReplica(tablet_id_, leader_tserver, consensus::RECEIVED_OPID, kTimeout, |
| &opid)); |
| cur_log_index = opid.index(); |
| } |
| |
| // Add the tablet servers back, in reverse order, going from 1 to 3 servers in the configuration. |
| vector<int> add_list = { 1, 2 }; |
| for (int to_add_idx : add_list) { |
| int num_servers = active_tablet_servers.size(); |
| LOG(INFO) << "Add: Going from " << num_servers << " to " << num_servers + 1 << " replicas"; |
| |
| TServerDetails* tserver_to_add = tservers[to_add_idx]; |
| LOG(INFO) << "Adding tserver with uuid " << tserver_to_add->uuid(); |
| ASSERT_OK(AddServer(leader_tserver, tablet_id_, tserver_to_add, |
| RaftPeerPB::VOTER, kTimeout)); |
| InsertOrDie(&active_tablet_servers, tserver_to_add->uuid(), tserver_to_add); |
| ASSERT_OK(WaitForServersToAgree(kTimeout, active_tablet_servers, tablet_id_, ++cur_log_index)); |
| |
| // Do majority correctness check for each incremental increase. |
| NO_FATALS(AssertMajorityRequiredForElectionsAndWrites(active_tablet_servers, leader_uuid)); |
| ASSERT_OK(GetLastOpIdForReplica(tablet_id_, leader_tserver, consensus::RECEIVED_OPID, kTimeout, |
| &opid)); |
| cur_log_index = opid.index(); |
| } |
| } |
| |
| // Regression test for KUDU-1169: a crash when a Config Change operation is replaced |
| // by a later leader. |
| TEST_F(RaftConsensusITest, TestReplaceChangeConfigOperation) { |
| const vector<string> kTsFlags = { |
| "--enable_leader_failure_detection=false", |
| }; |
| const vector<string> kMasterFlags = { |
| "--master_add_server_when_underreplicated=false", |
| "--catalog_manager_wait_for_new_tablets_to_elect_leader=false", |
| }; |
| |
| FLAGS_num_tablet_servers = 3; |
| FLAGS_num_replicas = 3; |
| NO_FATALS(BuildAndStart(kTsFlags, kMasterFlags)); |
| |
| vector<TServerDetails*> tservers; |
| AppendValuesFromMap(tablet_servers_, &tservers); |
| ASSERT_EQ(FLAGS_num_tablet_servers, tservers.size()); |
| |
| // Elect server 0 as leader and wait for log index 1 to propagate to all servers. |
| TServerDetails* leader_tserver = tservers[0]; |
| |
| TabletServerMap original_followers = tablet_servers_; |
| ASSERT_EQ(1, original_followers.erase(leader_tserver->uuid())); |
| |
| ASSERT_OK(StartElection(leader_tserver, tablet_id_, MonoDelta::FromSeconds(10))); |
| ASSERT_OK(WaitForServersToAgree(MonoDelta::FromSeconds(10), tablet_servers_, tablet_id_, 1)); |
| |
| // Shut down servers 1 and 2, so that server 0 can't replicate anything. |
| cluster_->tablet_server_by_uuid(tservers[1]->uuid())->Shutdown(); |
| cluster_->tablet_server_by_uuid(tservers[2]->uuid())->Shutdown(); |
| |
| // Now try to replicate a ChangeConfig operation. This should get stuck and time out |
| // because the server can't replicate any operations. |
| Status s = RemoveServer(leader_tserver, tablet_id_, tservers[1], |
| MonoDelta::FromSeconds(1), -1); |
| ASSERT_TRUE(s.IsTimedOut()) << s.ToString(); |
| |
| // Pause the leader, and restart the other servers. |
| ASSERT_OK(cluster_->tablet_server_by_uuid(tservers[0]->uuid())->Pause()); |
| ASSERT_OK(cluster_->tablet_server_by_uuid(tservers[1]->uuid())->Restart()); |
| ASSERT_OK(cluster_->tablet_server_by_uuid(tservers[2]->uuid())->Restart()); |
| |
| ASSERT_OK(WaitForServersToAgree(MonoDelta::FromSeconds(10), original_followers, tablet_id_, 1)); |
| |
| // Elect one of the other servers. |
| ASSERT_OK(StartElection(tservers[1], tablet_id_, MonoDelta::FromSeconds(10))); |
| ASSERT_OK(WaitUntilLeader(tservers[1], tablet_id_, MonoDelta::FromSeconds(10))); |
| leader_tserver = tservers[1]; |
| |
| // Resume the original leader. Its change-config operation will now be aborted |
| // since it was never replicated to the majority, and the new leader will have |
| // replaced the operation. |
| ASSERT_OK(cluster_->tablet_server_by_uuid(tservers[0]->uuid())->Resume()); |
| |
| // Insert some data and verify that it propagates to all servers. |
| NO_FATALS(InsertTestRowsRemoteThread(0, 10, 1)); |
| NO_FATALS(AssertAllReplicasAgree(10)); |
| |
| // Try another config change. |
| // This acts as a regression test for KUDU-1338, in which aborting the original |
| // config change didn't properly unset the 'pending' configuration. |
| ASSERT_OK(RemoveServer(leader_tserver, tablet_id_, tservers[2], |
| MonoDelta::FromSeconds(5), -1)); |
| NO_FATALS(InsertTestRowsRemoteThread(10, 10, 1)); |
| } |
| |
| // Test the atomic CAS arguments to ChangeConfig() add server and remove server. |
| TEST_F(RaftConsensusITest, TestAtomicAddRemoveServer) { |
| const vector<string> kTsFlags = { |
| "--enable_leader_failure_detection=false", |
| }; |
| const vector<string> kMasterFlags = { |
| "--master_add_server_when_underreplicated=false", |
| "--catalog_manager_wait_for_new_tablets_to_elect_leader=false", |
| }; |
| |
| FLAGS_num_tablet_servers = 3; |
| FLAGS_num_replicas = 3; |
| NO_FATALS(BuildAndStart(kTsFlags, kMasterFlags)); |
| |
| vector<TServerDetails*> tservers; |
| AppendValuesFromMap(tablet_servers_, &tservers); |
| ASSERT_EQ(FLAGS_num_tablet_servers, tservers.size()); |
| |
| // Elect server 0 as leader and wait for log index 1 to propagate to all servers. |
| TServerDetails* leader_tserver = tservers[0]; |
| ASSERT_OK(StartElection(leader_tserver, tablet_id_, MonoDelta::FromSeconds(10))); |
| ASSERT_OK(WaitUntilCommittedOpIdIndexIs(1, leader_tserver, tablet_id_, |
| MonoDelta::FromSeconds(10))); |
| int64_t cur_log_index = 1; |
| |
| TabletServerMap active_tablet_servers = tablet_servers_; |
| |
| TServerDetails* follower_ts = tservers[2]; |
| |
| // Initial committed config should have opid_index == -1. |
| // Server should reject request to change config from opid other than this. |
| TabletServerErrorPB::Code error_code; |
| Status s = RemoveServer(leader_tserver, tablet_id_, follower_ts, |
| MonoDelta::FromSeconds(10), 7, &error_code); |
| ASSERT_EQ(TabletServerErrorPB::CAS_FAILED, error_code); |
| ASSERT_STR_CONTAINS(s.ToString(), "of 7 but the committed config has opid_index of -1"); |
| |
| // Specifying the correct committed opid index should work. |
| int64_t committed_opid_index = -1; |
| ASSERT_OK(RemoveServer(leader_tserver, tablet_id_, follower_ts, |
| MonoDelta::FromSeconds(10), committed_opid_index)); |
| |
| ASSERT_EQ(1, active_tablet_servers.erase(follower_ts->uuid())); |
| ASSERT_OK(WaitForServersToAgree(MonoDelta::FromSeconds(10), |
| active_tablet_servers, tablet_id_, ++cur_log_index)); |
| |
| // Now, add the server back. Again, specifying something other than the |
| // latest committed_opid_index should fail. |
| s = AddServer(leader_tserver, tablet_id_, follower_ts, RaftPeerPB::VOTER, |
| MonoDelta::FromSeconds(10), {}, -1, &error_code); |
| ASSERT_EQ(TabletServerErrorPB::CAS_FAILED, error_code); |
| ASSERT_STR_CONTAINS(s.ToString(), "of -1 but the committed config has opid_index of 2"); |
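| // Note the opid_index progression: the initial config has opid_index -1, |
| // and each successful config change stamps the resulting config with the |
| // log index of the config change op itself (index 2 for the earlier |
| // RemoveServer() call). |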
| |
| // Specifying the correct committed opid index should work. |
| // The previous config change op is the latest entry in the log. |
| committed_opid_index = cur_log_index; |
| ASSERT_OK(AddServer(leader_tserver, tablet_id_, follower_ts, RaftPeerPB::VOTER, |
| MonoDelta::FromSeconds(10), {}, committed_opid_index)); |
| |
| InsertOrDie(&active_tablet_servers, follower_ts->uuid(), follower_ts); |
| ASSERT_OK(WaitForServersToAgree(MonoDelta::FromSeconds(10), |
| active_tablet_servers, tablet_id_, ++cur_log_index)); |
| } |
| |
| // Writes test rows in ascending order to a single tablet server. |
| // Essentially a poor-man's version of TestWorkload that only operates on a |
| // single tablet. Does not batch, does not tolerate timeouts, and does not |
| // interact with the Master. 'rows_inserted' is used to determine the row key |
| // and is incremented prior to each insert attempt; since any write failure |
| // crashes the test, 'rows_inserted' holds an accurate count at the end of a |
| // crash-free run. Because it crashes on any failure, 'write_timeout' should |
| // be high. |
| static void DoWriteTestRows(const TServerDetails* leader_tserver, |
| const string& tablet_id, |
| const MonoDelta& write_timeout, |
| AtomicInt<int32_t>* rows_inserted, |
| const AtomicBool* finish) { |
| while (!finish->Load()) { |
| int row_key = rows_inserted->Increment(); |
| CHECK_OK(WriteSimpleTestRow(leader_tserver, tablet_id, RowOperationsPB::INSERT, |
| row_key, row_key, Substitute("key=$0", row_key), |
| write_timeout)); |
| } |
| } |
| |
| // Test that config change works while running a workload. |
| TEST_F(RaftConsensusITest, TestConfigChangeUnderLoad) { |
| const auto kTimeout = MonoDelta::FromSeconds(10); |
| const vector<string> kTsFlags = { |
| "--enable_leader_failure_detection=false", |
| }; |
| const vector<string> kMasterFlags = { |
| "--master_add_server_when_underreplicated=false", |
| "--catalog_manager_wait_for_new_tablets_to_elect_leader=false", |
| }; |
| |
| FLAGS_num_tablet_servers = 3; |
| FLAGS_num_replicas = 3; |
| NO_FATALS(BuildAndStart(kTsFlags, kMasterFlags)); |
| |
| vector<TServerDetails*> tservers; |
| AppendValuesFromMap(tablet_servers_, &tservers); |
| ASSERT_EQ(FLAGS_num_tablet_servers, tservers.size()); |
| |
| // Elect server 0 as leader and wait for log index 1 to propagate to all servers. |
| TServerDetails* leader_tserver = tservers[0]; |
| ASSERT_OK(StartElection(leader_tserver, tablet_id_, kTimeout)); |
| ASSERT_OK(WaitForServersToAgree(kTimeout, tablet_servers_, tablet_id_, 1)); |
| ASSERT_OK(WaitForOpFromCurrentTerm(leader_tserver, tablet_id_, |
| consensus::COMMITTED_OPID, kTimeout)); |
| |
| // Start a write workload. |
| LOG(INFO) << "Starting write workload..."; |
| vector<thread> threads; |
| threads.reserve(FLAGS_num_client_threads); |
| AtomicInt<int32_t> rows_inserted(0); |
| AtomicBool finish(false); |
| for (auto i = 0; i < FLAGS_num_client_threads; i++) { |
| threads.emplace_back([this, leader_tserver, kTimeout, &rows_inserted, &finish]() { |
| DoWriteTestRows(leader_tserver, this->tablet_id_, kTimeout, &rows_inserted, &finish); |
| }); |
| } |
| auto thread_join_func = [&]() { |
| for (auto& t : threads) { |
| t.join(); |
| } |
| }; |
| // Make sure the writers stop and are joined even if an assertion below |
| // fails early: joining without first setting 'finish' would hang, since |
| // DoWriteTestRows() loops until 'finish' becomes true. |
| auto thread_joiner = MakeScopedCleanup([&]() { |
| finish.Store(true); |
| thread_join_func(); |
| }); |
| |
| // The scenario modifies the Raft configuration while adding and removing |
| // tablet replicas. To successfully modify the configuration, the leader has |
| // to commit at least one operation on its current term. |
| ASSERT_OK(WaitForOpFromCurrentTerm(leader_tserver, tablet_id_, |
| consensus::COMMITTED_OPID, kTimeout)); |
| |
| TabletServerMap active_tablet_servers = tablet_servers_; |
| LOG(INFO) << "Removing servers..."; |
| // Go from 3 tablet servers down to 1 in the configuration. |
| vector<int> remove_list = { 2, 1 }; |
| for (int to_remove_idx : remove_list) { |
| int num_servers = active_tablet_servers.size(); |
| LOG(INFO) << "Remove: Going from " << num_servers << " to " << num_servers - 1 << " replicas"; |
| |
| TServerDetails* tserver_to_remove = tservers[to_remove_idx]; |
| LOG(INFO) << "Removing tserver with uuid " << tserver_to_remove->uuid(); |
| ASSERT_OK(WaitForLeaderWithCommittedOp(tablet_id_, kTimeout, &leader_tserver)); |
| ASSERT_OK(RemoveServer( |
| leader_tserver, tablet_id_, tserver_to_remove, kTimeout)); |
| ASSERT_EQ(1, active_tablet_servers.erase(tserver_to_remove->uuid())); |
| ASSERT_OK(WaitUntilCommittedConfigNumVotersIs(active_tablet_servers.size(), |
| leader_tserver, tablet_id_, |
| kTimeout)); |
| } |
| |
| LOG(INFO) << "Adding servers..."; |
| // Add the tablet servers back, in reverse order, going from 1 to 3 servers in the configuration. |
| vector<int> add_list = { 1, 2 }; |
| for (int to_add_idx : add_list) { |
| int num_servers = active_tablet_servers.size(); |
| LOG(INFO) << "Add: Going from " << num_servers << " to " << num_servers + 1 << " replicas"; |
| |
| TServerDetails* tserver_to_add = tservers[to_add_idx]; |
| LOG(INFO) << "Adding tserver with uuid " << tserver_to_add->uuid(); |
| ASSERT_OK(WaitForLeaderWithCommittedOp(tablet_id_, kTimeout, &leader_tserver)); |
| ASSERT_OK(AddServer(leader_tserver, tablet_id_, tserver_to_add, |
| RaftPeerPB::VOTER, kTimeout)); |
| InsertOrDie(&active_tablet_servers, tserver_to_add->uuid(), tserver_to_add); |
| ASSERT_OK(WaitUntilCommittedConfigNumVotersIs(active_tablet_servers.size(), |
| leader_tserver, tablet_id_, |
| kTimeout)); |
| } |
| |
| LOG(INFO) << "Joining writer threads..."; |
| finish.Store(true); |
| thread_join_func(); |
| thread_joiner.cancel(); |
| |
| LOG(INFO) << "Waiting for replicas to agree..."; |
| // Wait for all servers to replicate everything up through the last write op. |
| // Since we don't batch, there should be at least one log entry per inserted |
| // row, plus the initial leader's no-op, plus 2 for the removed servers and |
| // 2 for the added servers, for a total of at least (# rows inserted + 5). |
| int min_log_index = rows_inserted.Load() + 5; |
| ASSERT_OK(WaitForServersToAgree( |
| kTimeout, active_tablet_servers, tablet_id_, min_log_index)); |
| |
| LOG(INFO) << "Number of rows inserted: " << rows_inserted.Load(); |
| NO_FATALS(AssertAllReplicasAgree(rows_inserted.Load())); |
| } |
| |
| TEST_F(RaftConsensusITest, TestMasterNotifiedOnConfigChange) { |
| MonoDelta timeout = MonoDelta::FromSeconds(30); |
| const vector<string> kMasterFlags = { |
| "--master_add_server_when_underreplicated=false", |
| "--allow_unsafe_replication_factor=true", |
| |
| // If running with the 3-4-3 replication scheme, the catalog manager removes |
| // excess replicas, so it's necessary to disable that default behavior |
| // since we want the newly added replica to stay. |
| "--catalog_manager_evict_excess_replicas=false", |
| }; |
| |
| FLAGS_num_tablet_servers = 3; |
| FLAGS_num_replicas = 2; |
| NO_FATALS(BuildAndStart({}, kMasterFlags)); |
| |
| LOG(INFO) << "Finding tablet leader and waiting for things to start..."; |
| string tablet_id = tablet_replicas_.begin()->first; |
| |
| // Determine the list of tablet servers currently in the config. |
| TabletServerMap active_tablet_servers; |
| for (itest::TabletReplicaMap::const_iterator iter = tablet_replicas_.find(tablet_id); |
| iter != tablet_replicas_.end(); ++iter) { |
| InsertOrDie(&active_tablet_servers, iter->second->uuid(), iter->second); |
| } |
| |
| // Determine the server to add to the config. |
| string uuid_to_add; |
| for (const TabletServerMap::value_type& entry : tablet_servers_) { |
| if (!ContainsKey(active_tablet_servers, entry.second->uuid())) { |
| uuid_to_add = entry.second->uuid(); |
| } |
| } |
| ASSERT_FALSE(uuid_to_add.empty()); |
| |
| // Get a baseline config reported to the master. |
| LOG(INFO) << "Waiting for Master to see the current replicas..."; |
| master::GetTabletLocationsResponsePB tablet_locations; |
| bool has_leader; |
| ASSERT_OK(WaitForReplicasReportedToMaster(cluster_->master_proxy(), |
| 2, tablet_id, timeout, |
| WAIT_FOR_LEADER, VOTER_REPLICA, |
| &has_leader, &tablet_locations)); |
| LOG(INFO) << "Tablet locations:\n" << SecureDebugString(tablet_locations); |
| |
| // Wait for initial NO_OP to be committed by the leader. |
| TServerDetails* leader_ts; |
| ASSERT_OK(FindTabletLeader(tablet_servers_, tablet_id, timeout, &leader_ts)); |
| ASSERT_OK(WaitUntilCommittedOpIdIndexIs(1, leader_ts, tablet_id, timeout)); |
| |
| // Change the config. |
| TServerDetails* tserver_to_add = tablet_servers_[uuid_to_add]; |
| LOG(INFO) << "Adding tserver with uuid " << tserver_to_add->uuid(); |
| ASSERT_OK(AddServer(leader_ts, tablet_id_, tserver_to_add, RaftPeerPB::VOTER, |
| timeout)); |
| ASSERT_OK(WaitForServersToAgree(timeout, tablet_servers_, tablet_id_, 2)); |
| |
| // Wait for the master to be notified of the config change. |
| // It should continue to have the same leader, even without waiting. |
| LOG(INFO) << "Waiting for Master to see config change..."; |
| ASSERT_OK(WaitForReplicasReportedToMaster(cluster_->master_proxy(), |
| 3, tablet_id, timeout, |
| DONT_WAIT_FOR_LEADER, VOTER_REPLICA, |
| &has_leader, &tablet_locations)); |
| ASSERT_TRUE(has_leader) << SecureDebugString(tablet_locations); |
| LOG(INFO) << "Tablet locations:\n" << SecureDebugString(tablet_locations); |
| |
| // Change the config again. |
| LOG(INFO) << "Removing tserver with uuid " << tserver_to_add->uuid(); |
| ASSERT_OK(RemoveServer(leader_ts, tablet_id_, tserver_to_add, timeout)); |
| active_tablet_servers = tablet_servers_; |
| ASSERT_EQ(1, active_tablet_servers.erase(tserver_to_add->uuid())); |
| ASSERT_OK(WaitForServersToAgree(timeout, active_tablet_servers, tablet_id_, 3)); |
| |
| // Wait for the master to be notified of the removal. |
| LOG(INFO) << "Waiting for Master to see config change..."; |
| ASSERT_OK(WaitForReplicasReportedToMaster(cluster_->master_proxy(), |
| 2, tablet_id, timeout, |
| DONT_WAIT_FOR_LEADER, VOTER_REPLICA, |
| &has_leader, &tablet_locations)); |
| ASSERT_TRUE(has_leader) << SecureDebugString(tablet_locations); |
| LOG(INFO) << "Tablet locations:\n" << SecureDebugString(tablet_locations); |
| } |
| |
| // Test that even with memory pressure, a replica will still commit pending |
| // operations that the leader has committed. |
| TEST_F(RaftConsensusITest, TestEarlyCommitDespiteMemoryPressure) { |
| // Enough operations to put us over our memory limit (defined below). |
| const int kNumOps = 10000; |
| |
| // Set up a 3-node configuration with only one live follower so that we can |
| // manipulate it directly via RPC. |
| const vector<string> kTsFlags = { |
| // Very low memory limit to ease testing. |
| // When using tcmalloc, we set it to 30MB, since we can get accurate process memory |
| // usage statistics. Otherwise, set to only 4MB, since we'll only be throttling based |
| // on our tracked memory. |
| // Since part of the point of the test is to have memory pressure, don't |
| // insist the block cache capacity is small compared to the memory pressure |
| // threshold. |
| "--force_block_cache_capacity", |
| #ifdef TCMALLOC_ENABLED |
| "--memory_limit_hard_bytes=30000000", |
| #else |
| "--memory_limit_hard_bytes=4194304", |
| #endif |
| "--enable_leader_failure_detection=false", |
| // Don't let op memory tracking get in the way. |
| "--tablet_transaction_memory_limit_mb=-1", |
| }; |
| |
| // If failure detection were on, a follower could be elected as leader after |
| // we kill the leader below. |
| const vector<string> kMasterFlags = { |
| "--catalog_manager_wait_for_new_tablets_to_elect_leader=false", |
| }; |
| |
| NO_FATALS(BuildAndStart(kTsFlags, kMasterFlags)); |
| |
| // Elect server 2 as leader, then kill it and server 1, leaving behind |
| // server 0 as the sole follower. |
| vector<TServerDetails*> tservers; |
| AppendValuesFromMap(tablet_servers_, &tservers); |
| ASSERT_EQ(3, tservers.size()); |
| ASSERT_OK(StartElection(tservers[2], tablet_id_, MonoDelta::FromSeconds(10))); |
| ASSERT_OK(WaitForServersToAgree(MonoDelta::FromSeconds(10), tablet_servers_, tablet_id_, 1)); |
| TServerDetails *replica_ts = tservers[0]; |
| cluster_->tablet_server_by_uuid(tservers[1]->uuid())->Shutdown(); |
| cluster_->tablet_server_by_uuid(tservers[2]->uuid())->Shutdown(); |
| |
| // Pretend to be the leader and send a request to replicate some operations. |
| ConsensusRequestPB req; |
| ConsensusResponsePB resp; |
| RpcController rpc; |
| req.set_dest_uuid(replica_ts->uuid()); |
| req.set_tablet_id(tablet_id_); |
| req.set_caller_uuid(tservers[2]->instance_id.permanent_uuid()); |
| req.set_caller_term(1); |
| req.set_committed_index(1); |
| req.set_all_replicated_index(0); |
| req.mutable_preceding_id()->CopyFrom(MakeOpId(1, 1)); |
| int64_t base_ts = GetTimestampOnServer(replica_ts); |
| for (int i = 0; i < kNumOps; i++) { |
| AddOp(MakeOpId(1, 2 + i), base_ts, &req); |
| } |
| OpId last_opid = MakeOpId(1, 2 + kNumOps - 1); |
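| // With kNumOps = 10000, the ops sent above span 1.2 through 1.10001, so |
| // 'last_opid' is 1.10001. |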
| ASSERT_OK(replica_ts->consensus_proxy->UpdateConsensus(req, &resp, &rpc)); |
| |
| // At the time that the follower received our request it was still under the |
| // tiny memory limit defined above, so the request should have succeeded. |
| ASSERT_FALSE(resp.has_error()) << SecureDebugString(resp); |
| ASSERT_TRUE(resp.has_status()); |
| ASSERT_TRUE(resp.status().has_last_committed_idx()); |
| ASSERT_EQ(last_opid.index(), resp.status().last_received().index()); |
| ASSERT_EQ(1, resp.status().last_committed_idx()); |
| |
| // But no operations have been applied yet; there should be no data. |
| vector<string> rows; |
| NO_FATALS(WaitForRowCount(replica_ts->tserver_proxy.get(), 0, &rows)); |
| |
| // Try again, but this time: |
| // 1. Replicate just one new operation. |
| // 2. Tell the follower that the previous set of operations were committed. |
| req.mutable_preceding_id()->CopyFrom(last_opid); |
| req.set_committed_index(last_opid.index()); |
| req.mutable_ops()->Clear(); |
| AddOp(MakeOpId(1, last_opid.index() + 1), base_ts, &req); |
| rpc.Reset(); |
| Status s = replica_ts->consensus_proxy->UpdateConsensus(req, &resp, &rpc); |
| |
| // Our memory limit was truly tiny, so we should be over it by now... |
| ASSERT_TRUE(s.IsRemoteError()) << s.ToString(); |
| ASSERT_STR_CONTAINS(s.ToString(), "Soft memory limit exceeded"); |
| |
| // ...but despite rejecting the request, we should have committed the |
| // previous set of operations. That is, we should be able to see those rows. |
| NO_FATALS(WaitForRowCount(replica_ts->tserver_proxy.get(), kNumOps, &rows)); |
| } |
| |
| // Test that we can create (vivify) a new tablet via tablet copy. |
| TEST_F(RaftConsensusITest, TestAutoCreateReplica) { |
| const vector<string> kTsFlags = { |
| "--enable_leader_failure_detection=false", |
| "--log_cache_size_limit_mb=1", |
| "--log_segment_size_mb=1", |
| "--log_async_preallocate_segments=false", |
| "--flush_threshold_mb=1", |
| "--maintenance_manager_polling_interval_ms=300", |
| }; |
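| // The tiny log cache, small segments, and aggressive flushing above are |
| // meant to ensure the replica added mid-workload is seeded via tablet copy |
| // rather than caught up from the leader's retained log. |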
| const vector<string> kMasterFlags = { |
| "--catalog_manager_wait_for_new_tablets_to_elect_leader=false", |
| "--allow_unsafe_replication_factor=true", |
| |
| // If running with the 3-4-3 replication scheme, the catalog manager removes |
| // excess replicas, so it's necessary to disable that default behavior |
| // since we want the newly added replica to stay. |
| "--catalog_manager_evict_excess_replicas=false", |
| }; |
| |
| FLAGS_num_tablet_servers = 3; |
| FLAGS_num_replicas = 2; |
| NO_FATALS(BuildAndStart(kTsFlags, kMasterFlags)); |
| |
| // 50K is enough to cause flushes & log rolls. |
| const int num_rows_to_write = AllowSlowTests() ? 150000 : 50000; |
| |
| vector<TServerDetails*> tservers; |
| AppendValuesFromMap(tablet_servers_, &tservers); |
| ASSERT_EQ(FLAGS_num_tablet_servers, tservers.size()); |
| |
| TabletServerMap active_tablet_servers; |
| itest::TabletReplicaMap::const_iterator iter = tablet_replicas_.find(tablet_id_); |
| TServerDetails* leader = iter->second; |
| TServerDetails* follower = (++iter)->second; |
| InsertOrDie(&active_tablet_servers, leader->uuid(), leader); |
| InsertOrDie(&active_tablet_servers, follower->uuid(), follower); |
| |
| TServerDetails* new_node = nullptr; |
| for (TServerDetails* ts : tservers) { |
| if (!ContainsKey(active_tablet_servers, ts->uuid())) { |
| new_node = ts; |
| break; |
| } |
| } |
| ASSERT_TRUE(new_node != nullptr); |
| |
| // Elect the leader (still only a consensus config size of 2). |
| ASSERT_OK(StartElection(leader, tablet_id_, MonoDelta::FromSeconds(10))); |
| ASSERT_OK(WaitForServersToAgree(MonoDelta::FromSeconds(30), active_tablet_servers, |
| tablet_id_, 1)); |
| |
| TestWorkload workload(cluster_.get()); |
| workload.set_table_name(kTableId); |
| workload.set_num_replicas(FLAGS_num_replicas); |
| workload.set_num_write_threads(10); |
| workload.set_num_read_threads(2); |
| workload.set_write_batch_size(100); |
| workload.Setup(); |
| |
| LOG(INFO) << "Starting write workload..."; |
| workload.Start(); |
| |
| while (true) { |
| int rows_inserted = workload.rows_inserted(); |
| if (rows_inserted >= num_rows_to_write) { |
| break; |
| } |
| LOG(INFO) << "Only inserted " << rows_inserted << " rows so far, sleeping for 100ms"; |
| SleepFor(MonoDelta::FromMilliseconds(100)); |
| } |
| |
| LOG(INFO) << "Adding tserver with uuid " << new_node->uuid() << " as VOTER..."; |
| ASSERT_OK(AddServer(leader, tablet_id_, new_node, RaftPeerPB::VOTER, |
| MonoDelta::FromSeconds(10))); |
| InsertOrDie(&active_tablet_servers, new_node->uuid(), new_node); |
| ASSERT_OK(WaitUntilCommittedConfigNumVotersIs(active_tablet_servers.size(), |
| leader, tablet_id_, |
| MonoDelta::FromSeconds(10))); |
| |
| workload.StopAndJoin(); |
| int num_batches = workload.batches_completed(); |
| |
| LOG(INFO) << "Waiting for replicas to agree..."; |
| // Wait for all servers to replicate everything up through the last write op. |
| // Each completed batch is replicated as a single write op, so there should |
| // be at least one log entry per batch, plus the initial leader's no-op and |
| // 1 for the added replica, for a total of at least (# batches + 2). |
| int min_log_index = num_batches + 2; |
| ASSERT_OK(WaitForServersToAgree(MonoDelta::FromSeconds(120), |
| active_tablet_servers, tablet_id_, |
| min_log_index)); |
| |
| int rows_inserted = workload.rows_inserted(); |
| LOG(INFO) << "Number of rows inserted: " << rows_inserted; |
| NO_FATALS(AssertAllReplicasAgree(rows_inserted)); |
| } |
| |
| TEST_F(RaftConsensusITest, TestMemoryRemainsConstantDespiteTwoDeadFollowers) { |
| const int64_t kMinRejections = 100; |
| const MonoDelta kMaxWaitTime = MonoDelta::FromSeconds(60); |
| |
| NO_FATALS(BuildAndStart({ |
| // Start the cluster with a low per-tablet op memory limit, so that the |
| // test can complete faster. |
| "--tablet_transaction_memory_limit_mb=2", |
| // Make the validator of 'RPC vs op memory size' happy. |
| "--rpc_max_message_size=2097152", |
| })); |
| |
| // Kill both followers. |
| TServerDetails* details; |
| ASSERT_OK(GetLeaderReplicaWithRetries(tablet_id_, &details)); |
| int num_shutdown = 0; |
| int leader_ts_idx = -1; |
| for (int i = 0; i < cluster_->num_tablet_servers(); i++) { |
| ExternalTabletServer* ts = cluster_->tablet_server(i); |
| if (ts->instance_id().permanent_uuid() != details->uuid()) { |
| ts->Shutdown(); |
| num_shutdown++; |
| } else { |
| leader_ts_idx = i; |
| } |
| } |
| ASSERT_EQ(2, num_shutdown); |
| ASSERT_NE(-1, leader_ts_idx); |
| |
| // Because the majority of the cluster is dead and because of this workload's |
| // timeout behavior, more and more wedged ops will accumulate in the leader. |
| // To prevent memory usage from skyrocketing, the leader will eventually |
| // reject new ops. That's what we're testing for here. |
| TestWorkload workload(cluster_.get()); |
| workload.set_table_name(kTableId); |
| workload.set_timeout_allowed(true); |
| workload.set_write_timeout_millis(150); |
| workload.set_payload_bytes(30000); |
| workload.Setup(); |
| workload.Start(); |
| |
| // Run until the leader has rejected several ops. |
| MonoTime deadline = MonoTime::Now() + kMaxWaitTime; |
| while (true) { |
| int64_t num_rejections = 0; |
| ASSERT_OK(GetInt64Metric( |
| cluster_->tablet_server(leader_ts_idx)->bound_http_hostport(), |
| &METRIC_ENTITY_tablet, |
| nullptr, |
| &METRIC_transaction_memory_pressure_rejections, |
| "value", |
| &num_rejections)); |
| if (num_rejections >= kMinRejections) { |
| break; |
| } else if (deadline < MonoTime::Now()) { |
| FAIL() << "Ran for " << kMaxWaitTime.ToString() << ", deadline expired"; |
| } |
| SleepFor(MonoDelta::FromMilliseconds(200)); |
| } |
| } |
| |
| static void EnableLogLatency(server::GenericServiceProxy* proxy) { |
| const unordered_map<string, string> kFlags = { |
| { "log_inject_latency", "true" }, |
| { "log_inject_latency_ms_mean", "1000" }, |
| }; |
| for (const auto& e : kFlags) { |
| SetFlagRequestPB req; |
| req.set_flag(e.first); |
| req.set_value(e.second); |
| SetFlagResponsePB resp; |
| RpcController rpc; |
| ASSERT_OK(proxy->SetFlag(req, &resp, &rpc)); |
| SCOPED_TRACE(SecureDebugString(resp)); |
| ASSERT_EQ(SetFlagResponsePB::SUCCESS, resp.result()); |
| } |
| } |
| |
| // Run a regular workload with a leader that's writing to its WAL slowly. |
| TEST_F(RaftConsensusITest, TestSlowLeader) { |
| SKIP_IF_SLOW_NOT_ALLOWED(); |
| |
| NO_FATALS(BuildAndStart()); |
| |
| TServerDetails* leader; |
| ASSERT_OK(GetLeaderReplicaWithRetries(tablet_id_, &leader)); |
| NO_FATALS(EnableLogLatency(leader->generic_proxy.get())); |
| |
| TestWorkload workload(cluster_.get()); |
| workload.set_table_name(kTableId); |
| workload.set_num_read_threads(2); |
| workload.Setup(); |
| workload.Start(); |
| SleepFor(MonoDelta::FromSeconds(60)); |
| } |
| |
| // Test write batches just below the maximum limit. |
| TEST_F(RaftConsensusITest, TestLargeBatches) { |
| const vector<string> kTsFlags = { |
| // We write 128KB cells in this test, so bump the limit, and disable compression. |
| "--max_cell_size_bytes=1000000", |
| "--log_segment_size_mb=1", |
| "--log_compression_codec=no_compression", |
| "--log_min_segments_to_retain=100", // disable GC of logs. |
| }; |
| |
| NO_FATALS(BuildAndStart(kTsFlags)); |
| |
| const int64_t kBatchSize = 40; // Write 40 * 128KB = 5MB per batch. |
| const int64_t kNumBatchesToWrite = 100; |
| TestWorkload workload(cluster_.get()); |
| workload.set_table_name(kTableId); |
| workload.set_payload_bytes(128 * 1024); // Write ops of size 128KB. |
| workload.set_write_batch_size(kBatchSize); |
| workload.set_num_write_threads(1); |
| workload.Setup(); |
| workload.Start(); |
| LOG(INFO) << "Waiting until we've written enough data..."; |
| while (workload.rows_inserted() < kBatchSize * kNumBatchesToWrite) { |
| SleepFor(MonoDelta::FromMilliseconds(100)); |
| } |
| workload.StopAndJoin(); |
| |
| // Verify replication. |
| ClusterVerifier v(cluster_.get()); |
| NO_FATALS(v.CheckCluster()); |
| NO_FATALS(v.CheckRowCount(workload.table_name(), |
| ClusterVerifier::EXACTLY, |
| workload.rows_inserted())); |
| |
| int num_wals = inspect_->CountFilesInWALDirForTS(0, tablet_id_, "wal-*"); |
| int num_batches = workload.rows_inserted() / kBatchSize; |
| // The number of WALs should be similar to 'num_batches'. We can't make |
| // an exact assertion because async preallocation may take a small amount |
| // of time, in which case it's possible to put more than one batch in a |
| // single WAL. |
| ASSERT_GE(num_wals, num_batches / 2); |
| ASSERT_LE(num_wals, num_batches + 2); |
| } |
| |
| // Regression test for KUDU-1469, a case in which a leader and follower could get "stuck" |
| // in a tight RPC loop, in which the leader would repeatedly send a batch of ops that the |
| // follower already had, the follower would fully de-dupe them, and yet the leader would |
| // never advance to the next batch. |
| // |
| // The 'perfect storm' reproduced here consists of: |
| // - the commit index has fallen far behind due to a slow log on the leader |
| // and one of the three replicas being inaccessible |
| // - the other replica elects itself |
| // - before the old leader notices it has been ousted, it writes at least one more |
| // operation to its local log. |
| // - before the replica can replicate anything to the old leader, it receives |
| // more writes, such that the first batch's preceding_op_id is ahead of |
| // the old leader's last written op. |
| // |
| // See the detailed comments below for more details. |
| TEST_F(RaftConsensusITest, TestCommitIndexFarBehindAfterLeaderElection) { |
| const MonoDelta kTimeout = MonoDelta::FromSeconds(10); |
| |
| SKIP_IF_SLOW_NOT_ALLOWED(); |
| |
| // Set the batch size low so that, after the new leader takes |
| // over below, the ops required to catch up from the committed index |
| // to the newly replicated index don't fit into a single batch. |
| NO_FATALS(BuildAndStart({"--consensus_max_batch_size_bytes=50000"})); |
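| // With the payload sizes used below (10 ops of ~10000 bytes each, ~100KB |
| // total), the pending ops cannot fit in a single 50000-byte batch, so the |
| // catch-up after the leader change necessarily spans multiple batches. |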
| |
| // Get the leader and the two replica tablet servers. |
| // These will have the following roles in this test: |
| // 1) 'first_leader_ts' is the initial leader. |
| // 2) 'second_leader_ts' will be forced to be elected as the second leader |
| // 3) 'only_vote_ts' will simulate a heavily overloaded (or corrupted) TS |
| // which is far enough behind (or failed) such that it only participates |
| // by voting. |
| TServerDetails* leader; |
| ASSERT_OK(GetLeaderReplicaWithRetries(tablet_id_, &leader)); |
| ExternalTabletServer* first_leader_ts = cluster_->tablet_server_by_uuid(leader->uuid()); |
| ExternalTabletServer* second_leader_ts = nullptr; |
| ExternalTabletServer* only_vote_ts = nullptr; |
| for (int i = 0; i < cluster_->num_tablet_servers(); i++) { |
| ExternalTabletServer* ts = cluster_->tablet_server(i); |
| if (ts->instance_id().permanent_uuid() != leader->uuid()) { |
| if (second_leader_ts == nullptr) { |
| second_leader_ts = ts; |
| } else { |
| only_vote_ts = ts; |
| } |
| } |
| } |
| |
| // The 'only_vote' tablet server doesn't participate in replication. |
| ASSERT_OK(cluster_->SetFlag(only_vote_ts, "follower_reject_update_consensus_requests", "true")); |
| |
| // Inject a long delay in the log of the first leader, and write 10 operations. |
| // This delay ensures that it will replicate them to both itself and its follower, |
| // but due to its log sync not completing, it won't know that it is safe to advance its |
| // commit index until long after it has lost its leadership. |
| ASSERT_OK(cluster_->SetFlag(first_leader_ts, "log_inject_latency_ms_mean", "6000")); |
| ASSERT_OK(cluster_->SetFlag(first_leader_ts, "log_inject_latency", "true")); |
| InsertPayloadIgnoreErrors(0, 10, 10000); |
| |
| // Write one more operation to the leader, but disable consensus on the follower so that |
| // it doesn't get replicated. |
| ASSERT_OK(cluster_->SetFlag( |
| second_leader_ts, "follower_reject_update_consensus_requests", "true")); |
| InsertPayloadIgnoreErrors(10, 1, 10000); |
| |
| // Pause the initial leader and wait for the replica to elect itself. The third TS participates |
| // here by voting. |
| ASSERT_OK(first_leader_ts->Pause()); |
| ASSERT_OK(WaitUntilLeader(tablet_servers_[second_leader_ts->uuid()], tablet_id_, kTimeout)); |
| |
| // The voter TS has done its duty. Shut it down to avoid log spam where it tries to run |
| // elections. |
| only_vote_ts->Shutdown(); |
| |
| // Perform one insert on the new leader. The new leader has not yet replicated its NO_OP to |
| // the old leader, since the old leader is still paused. |
| NO_FATALS(CreateClient(&client_)); |
| InsertPayloadIgnoreErrors(13, 1, 10000); |
| |
| // Now we expect to have the following logs: |
| // |
| // first_leader_ts second_leader_ts |
| // ------------------- ------------ |
| // 1.1 NO_OP 1.1 NO_OP |
| // 1.2 WRITE_OP 1.2 WRITE_OP |
| // ................................ |
| // 1.11 WRITE_OP 1.11 WRITE_OP |
| // 1.12 WRITE_OP 2.12 NO_OP |
| // 2.13 WRITE_OP |
| // |
| // Both servers should have a committed_idx of 1.1 since the log was delayed. |
| |
| // Now, when we resume the original leader, we expect the replicas to recover properly. |
| // Previously this triggered KUDU-1469. |
| ASSERT_OK(first_leader_ts->Resume()); |
| |
| TabletServerMap active_tservers = tablet_servers_; |
| active_tservers.erase(only_vote_ts->uuid()); |
| ASSERT_OK(WaitForServersToAgree(MonoDelta::FromSeconds(60), |
| active_tservers, |
| tablet_id_, 13)); |
| } |
| |
| // Run a regular workload with one follower that's writing to its WAL slowly. |
| TEST_F(RaftConsensusITest, TestSlowFollower) { |
| SKIP_IF_SLOW_NOT_ALLOWED(); |
| |
| // Leaving the default --missed_heartbeats_before_rejecting_snapshot_scans=1.5 |
| // makes the scenario prone to flakiness with the default setting of |
| // --raft_heartbeat_interval_ms=500, because the injected WAL latency of |
| // 1000 ms is above 750 ms (500 * 1.5 = 750). We don't want too many scan |
| // requests to be rejected just because the follower replica hasn't heard |
| // from the leader when the safe time has already been advanced beyond the |
| // timestamp of the snapshot scan request. The customized setting for the |
| // --missed_heartbeats_before_rejecting_snapshot_scans flag helps make the |
| // scenario more stable. |
| NO_FATALS(BuildAndStart( |
| {"--missed_heartbeats_before_rejecting_snapshot_scans=3"})); |
| |
| TServerDetails* leader; |
| ASSERT_OK(GetLeaderReplicaWithRetries(tablet_id_, &leader)); |
| int num_reconfigured = 0; |
| for (int i = 0; i < cluster_->num_tablet_servers(); i++) { |
| ExternalTabletServer* ts = cluster_->tablet_server(i); |
| if (ts->instance_id().permanent_uuid() != leader->uuid()) { |
| TServerDetails* follower = |
| GetReplicaWithUuidOrNull(tablet_id_, ts->instance_id().permanent_uuid()); |
| ASSERT_NE(nullptr, follower); |
| NO_FATALS(EnableLogLatency(follower->generic_proxy.get())); |
| num_reconfigured++; |
| break; |
| } |
| } |
| ASSERT_EQ(1, num_reconfigured); |
| |
| TestWorkload workload(cluster_.get()); |
| workload.set_table_name(kTableId); |
| workload.set_num_read_threads(2); |
| workload.Setup(); |
| workload.Start(); |
| SleepFor(MonoDelta::FromSeconds(60)); |
| } |
| |
| // Run a special workload that constantly updates a single row on a cluster |
| // where every replica is writing to its WAL slowly. |
| TEST_F(RaftConsensusITest, TestHammerOneRow) { |
| SKIP_IF_SLOW_NOT_ALLOWED(); |
| |
| NO_FATALS(BuildAndStart()); |
| |
| for (int i = 0; i < cluster_->num_tablet_servers(); i++) { |
| const ExternalTabletServer* ts = cluster_->tablet_server(i); |
| const TServerDetails* replica = GetReplicaWithUuidOrNull( |
| tablet_id_, ts->instance_id().permanent_uuid()); |
| ASSERT_NE(nullptr, replica); |
| NO_FATALS(EnableLogLatency(replica->generic_proxy.get())); |
| } |
| |
| TestWorkload workload(cluster_.get()); |
| workload.set_table_name(kTableId); |
| workload.set_write_pattern(TestWorkload::UPDATE_ONE_ROW); |
| workload.set_write_timeout_millis(60000); |
| workload.set_num_write_threads(20); |
| workload.Setup(); |
| workload.Start(); |
| SleepFor(MonoDelta::FromSeconds(60)); |
| workload.StopAndJoin(); |
| |
| // Ensure that the replicas converge. |
| ClusterVerifier v(cluster_.get()); |
| NO_FATALS(v.CheckCluster()); |
| } |
| |
| // Test that followers that fall behind the leader's log GC threshold are |
| // evicted from the config. |
| TEST_F(RaftConsensusITest, TestEvictAbandonedFollowers) { |
| const vector<string> master_flags = { |
| "--master_add_server_when_underreplicated=false", |
| |
| // This test is specific for the 3-2-3 replica management scheme. |
| "--raft_prepare_replacement_before_eviction=false", |
| }; |
| vector<string> ts_flags = { |
| // This test is specific for the 3-2-3 replica management scheme. |
| "--raft_prepare_replacement_before_eviction=false", |
| }; |
| AddFlagsForLogRolls(&ts_flags); // For CauseFollowerToFallBehindLogGC(). |
| |
| NO_FATALS(BuildAndStart(ts_flags, master_flags)); |
| |
| MonoDelta timeout = MonoDelta::FromSeconds(30); |
| TabletServerMap active_tablet_servers = tablet_servers_; |
| ASSERT_EQ(3, active_tablet_servers.size()); |
| |
| string leader_uuid; |
| int64_t orig_term; |
| string follower_uuid; |
| NO_FATALS(CauseFollowerToFallBehindLogGC( |
| tablet_servers_, &leader_uuid, &orig_term, &follower_uuid)); |
| |
| // Wait for the abandoned follower to be evicted. |
| ASSERT_OK(WaitUntilCommittedConfigNumVotersIs(2, tablet_servers_[leader_uuid], |
| tablet_id_, timeout)); |
| ASSERT_EQ(1, active_tablet_servers.erase(follower_uuid)); |
| ASSERT_OK(WaitForServersToAgree(timeout, active_tablet_servers, tablet_id_, 2)); |
| } |
| |
| // Test that, after a follower is evicted from the config, the master adds a |
| // replacement replica, which eventually catches back up. |
| TEST_F(RaftConsensusITest, TestMasterReplacesEvictedFollowers) { |
| const MonoDelta kTimeout = MonoDelta::FromSeconds(30); |
| const int kReplicasNum = 3; |
| |
| FLAGS_num_replicas = kReplicasNum; |
| // Need an extra tablet server to place the replacement replica in case of the |
| // 3-4-3 replication scheme. |
| if (FLAGS_raft_prepare_replacement_before_eviction) { |
| FLAGS_num_tablet_servers = kReplicasNum + 1; |
| } |
| vector<string> ts_flags; |
| AddFlagsForLogRolls(&ts_flags); // For CauseFollowerToFallBehindLogGC(). |
| NO_FATALS(BuildAndStart(ts_flags)); |
| |
| TabletServerMap active_tablet_servers = tablet_servers_; |
| ASSERT_EQ(FLAGS_num_tablet_servers, active_tablet_servers.size()); |
| if (FLAGS_raft_prepare_replacement_before_eviction) { |
| auto no_replica_uuids = GetServersWithoutReplica(tablet_id_); |
| ASSERT_EQ(1, no_replica_uuids.size()); |
| ASSERT_EQ(1, active_tablet_servers.erase(no_replica_uuids.front())); |
| } |
| |
| string leader_uuid; |
| int64_t orig_term; |
| string follower_uuid; |
| NO_FATALS(CauseFollowerToFallBehindLogGC( |
| active_tablet_servers, &leader_uuid, &orig_term, &follower_uuid)); |
| |
| if (FLAGS_raft_prepare_replacement_before_eviction) { |
| // Wait for the new replacement replica to be added and the replica that |
| // fell behind to be evicted. Since the 3-4-3 replication scheme adds a new |
| // non-voter replica before evicting the failed voter replica, the tablet |
| // server which used to host the failed replica will no longer be hosting |
| // any replica after the replacement completes. |
| ASSERT_EQ(kReplicasNum + 1, tablet_servers_.size()); |
| ASSERT_EVENTUALLY([&] { |
| NO_FATALS(WaitForReplicasAndUpdateLocations(kTableId)); |
| const auto uuids_no_replica = GetServersWithoutReplica(tablet_id_); |
| ASSERT_EQ(1, uuids_no_replica.size()); |
| ASSERT_EQ(follower_uuid, uuids_no_replica.front()); |
| }); |
| } else { |
| // The follower will be evicted. Now wait for the master to cause it to be |
| // copied. |
| ASSERT_OK(WaitForServersToAgree(kTimeout, active_tablet_servers, tablet_id_, 2)); |
| } |
| |
| ClusterVerifier v(cluster_.get()); |
| NO_FATALS(v.CheckCluster()); |
| NO_FATALS(v.CheckRowCount(kTableId, ClusterVerifier::AT_LEAST, 1)); |
| } |
| |
| // Test that a ChangeConfig() request is rejected unless the leader has |
| // replicated one of its own log entries during the current term. |
| // This is required for correctness of Raft config change. For details, |
| // see https://groups.google.com/forum/#!topic/raft-dev/t4xj6dJTP6E |
| TEST_F(RaftConsensusITest, TestChangeConfigRejectedUnlessNoopReplicated) { |
| const vector<string> kTsFlags = { |
| "--enable_leader_failure_detection=false", |
| }; |
| const vector<string> kMasterFlags = { |
| "--catalog_manager_wait_for_new_tablets_to_elect_leader=false", |
| }; |
| |
| NO_FATALS(BuildAndStart(kTsFlags, kMasterFlags)); |
| |
| MonoDelta timeout = MonoDelta::FromSeconds(30); |
| |
| int kLeaderIndex = 0; |
| TServerDetails* leader_ts = tablet_servers_[cluster_->tablet_server(kLeaderIndex)->uuid()]; |
| |
| // Prevent followers from accepting UpdateConsensus requests from the leader, |
| // even though they will vote. This will allow us to get the distributed |
| // system into a state where there is a valid leader (based on winning an |
| // election) but that leader will be unable to commit any entries from its |
| // own term, making it illegal to accept ChangeConfig() requests. |
| for (int i = 1; i <= 2; i++) { |
| ASSERT_OK(cluster_->SetFlag(cluster_->tablet_server(i), |
| "follower_reject_update_consensus_requests", "true")); |
| } |
| |
| // Elect the leader. |
| ASSERT_OK(StartElection(leader_ts, tablet_id_, timeout)); |
| ASSERT_OK(WaitUntilLeader(leader_ts, tablet_id_, timeout)); |
| |
| // Now attempt to do a config change. It should be rejected because there |
| // have not been any ops (notably the initial NO_OP) from the leader's term |
| // that have been committed yet. |
| Status s = RemoveServer(leader_ts, tablet_id_, |
| tablet_servers_[cluster_->tablet_server(1)->uuid()], |
| timeout); |
| ASSERT_TRUE(!s.ok()) << s.ToString(); |
| ASSERT_STR_CONTAINS(s.ToString(), "Leader has not yet committed an operation in its own term"); |
| } |
| |
| class RaftConsensusParamReplicationModesITest : |
| public RaftConsensusITest, |
| public ::testing::WithParamInterface<bool> { |
| }; |
| INSTANTIATE_TEST_SUITE_P(, RaftConsensusParamReplicationModesITest, |
| ::testing::Bool()); |
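| // The boolean parameter selects the replica management scheme under test: |
| // 'false' for 3-2-3 (evict a failed replica before adding a replacement), |
| // 'true' for 3-4-3 (add a replacement non-voter before evicting). |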
| |
| // Regression test for KUDU-1735, a crash in the case where a pending |
| // config change operation is aborted during tablet deletion when that config |
| // change was in fact already persisted to disk. |
| TEST_P(RaftConsensusParamReplicationModesITest, Test_KUDU_1735) { |
| const MonoDelta kTimeout = MonoDelta::FromSeconds(10); |
| const bool is_3_4_3 = GetParam(); |
| const vector<string> kTsFlags = { |
| // This scenario uses 'manual election' mode and assumes no leader change. |
| "--enable_leader_failure_detection=false", |
| // The test runs in both 3-2-3 and 3-4-3 replication modes. |
| Substitute("--raft_prepare_replacement_before_eviction=$0", is_3_4_3), |
| }; |
| const vector<string> kMasterFlags = { |
| // This scenario uses 'manual election' mode. |
| "--catalog_manager_wait_for_new_tablets_to_elect_leader=false", |
| // The test runs in both 3-2-3 and 3-4-3 replication modes. |
| Substitute("--raft_prepare_replacement_before_eviction=$0", is_3_4_3), |
| }; |
| |
| NO_FATALS(BuildAndStart(kTsFlags, kMasterFlags)); |
| |
| ASSERT_EQ(3, tablet_servers_.size()); |
| TServerDetails* leader_tserver = |
| tablet_servers_[cluster_->tablet_server(0)->uuid()]; |
| TServerDetails* evicted_tserver = |
| tablet_servers_[cluster_->tablet_server(1)->uuid()]; |
| // Elect leader replica. |
| ASSERT_OK(StartElection(leader_tserver, tablet_id_, kTimeout)); |
| ASSERT_OK(WaitUntilLeader(leader_tserver, tablet_id_, kTimeout)); |
| |
| // Wait for at least one operation to be committed in current term by the |
| // leader replica. Attempting to perform a config change (RemoveServer() |
| // in this scenario) prior to committing at least one operation in current |
| // term leads to an error since Kudu's Raft implementation enforces that |
| // invariant. |
| consensus::OpId opid; |
| ASSERT_OK(WaitForOpFromCurrentTerm( |
| leader_tserver, tablet_id_, consensus::COMMITTED_OPID, kTimeout, &opid)); |
| // Wait for the committed index to propagate to all involved tablet servers. |
| ASSERT_OK(WaitForServersToAgree( |
| kTimeout, tablet_servers_, tablet_id_, opid.index(), |
| consensus::COMMITTED_OPID)); |
| |
| // Make follower tablet servers crash before writing a commit message. |
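| // The injected fault fires with probability 1.0, i.e. on every attempt. |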
| for (const auto& e : tablet_servers_) { |
| const auto& server_uuid = e.first; |
| if (server_uuid == leader_tserver->uuid()) { |
| continue; |
| } |
| auto* ts = cluster_->tablet_server_by_uuid(server_uuid); |
| ASSERT_OK(cluster_->SetFlag(ts, "fault_crash_before_append_commit", "1.0")); |
| } |
| |
| // Run a config change. This will cause the other servers to crash with |
| // pending config change operations due to the above fault injection. |
| auto num_crashed_servers = 0; |
| ASSERT_OK(RemoveServer(leader_tserver, tablet_id_, evicted_tserver, kTimeout)); |
| for (const auto& e : tablet_servers_) { |
| const auto& server_uuid = e.first; |
| if (server_uuid == leader_tserver->uuid()) { |
| // The tablet server with the leader replica will not crash. |
| continue; |
| } |
| // One of the remaining followers will crash while trying to commit the |
| // config change corresponding to the eviction of the other follower. |
| // |
| // As for the other (gone-and-back) follower, the behavior depends on |
| // the replica management scheme, being a bit more subtle in the case of |
| // the 3-4-3 scheme. |
| // |
| // In case of the 3-2-3 scheme, the gone-and-back server will crash while |
| // trying to add the COMMIT entry into the WAL. That change corresponds to |
| // the op which adds the server as a voting member of the resulting Raft |
| // configuration. |
| // |
| // In the 3-4-3 case, the catalog manager sends DeleteTablet() and ADD_PEER |
| // Raft configuration change requests upon detecting a change in the |
| // tablet's configuration reported by leader replica (the latter request is |
| // to keep the target replication factor). Those requests are scheduled |
| // to be sent asynchronously around the same time. Due to the concurrency of |
| // sending and processing those two requests, there are two possible |
| // outcomes: |
| // 1. If the tablet server completes processing the DeleteTablet() request |
| // before receiving the ADD_PEER config change request, the ADD_PEER |
| // config change to add a new non-voter will remain pending since |
| // only one voter (the leader replica of the tablet) is alive. Since |
| // the configuration change cannot be committed, no commit message is |
| // written to the WAL in this case. So, the tablet server will not hit |
| // the injected crash. |
| // 2. If the DeleteTablet() request is delayed and the leader replica |
| // re-discovers the evicted replica as a new peer with LMP_MISMATCH |
| // status after processing the ADD_PEER configuration change, |
| // the leader replica will send the missing updates to the lingering |
| // one. The lingering replica will try to replay the first missing |
| // update (REMOVE_PEER) and will crash upon adding corresponding |
| // record into its WAL. |
| auto* ts = cluster_->tablet_server_by_uuid(server_uuid); |
| auto s = ts->WaitForInjectedCrash(MonoDelta::FromSeconds(5)); |
| if (server_uuid == evicted_tserver->uuid() && is_3_4_3 && !s.ok()) { |
| ASSERT_TRUE(s.IsTimedOut()) << s.ToString(); |
| continue; |
| } |
| ASSERT_TRUE(s.ok()) << s.ToString(); |
| ++num_crashed_servers; |
| } |
| if (!is_3_4_3) { |
| ASSERT_EQ(2, num_crashed_servers); |
| } else { |
| ASSERT_GE(num_crashed_servers, 1); |
| ASSERT_LE(num_crashed_servers, 2); |
| vector<decltype(leader_tserver)> servers = { leader_tserver }; |
| if (num_crashed_servers == 1) { |
| servers.push_back(evicted_tserver); |
| } |
| // In case of the 3-4-3 scheme, make sure the configuration change to |
| // add the removed server back as a non-voter hasn't been committed. |
| ASSERT_EVENTUALLY([&] { |
| for (const auto* server : servers) { |
| consensus::ConsensusStatePB cstate; |
| ASSERT_OK(itest::GetConsensusState( |
| server, tablet_id_, kTimeout, EXCLUDE_HEALTH_REPORT, &cstate)); |
| ASSERT_TRUE(cstate.has_pending_config()); |
| } |
| }); |
| } |
| |
| // Delete the table, so that when we restart the crashed servers, they'll get RPCs to |
| // delete tablets while config changes are pending. |
| ASSERT_OK(client_->DeleteTable(kTableId)); |
| |
| // Restart tablet servers with follower replicas and wait for them to delete |
| // their replicas. |
| for (const auto& e : tablet_servers_) { |
| const auto& server_uuid = e.first; |
| if (server_uuid == leader_tserver->uuid()) { |
| // The leader should not crash, no need to restart it. |
| continue; |
| } |
| auto* ts = cluster_->tablet_server_by_uuid(server_uuid); |
| ts->Shutdown(); |
| ASSERT_OK(ts->Restart()); |
| } |
| for (const auto& e : tablet_servers_) { |
| ASSERT_OK(WaitForNumTabletsOnTS(e.second, 0, kTimeout, nullptr)); |
| } |
| } |
| |
| // Test that if, for some reason, none of the ops can be prepared, the error |
| // comes back in the UpdateConsensus() response. |
| TEST_F(RaftConsensusITest, TestUpdateConsensusErrorNonePrepared) { |
| const int kNumOps = 10; |
| const vector<string> kTsFlags = { |
| "--enable_leader_failure_detection=false", |
| }; |
| const vector<string> kMasterFlags = { |
| "--catalog_manager_wait_for_new_tablets_to_elect_leader=false", |
| }; |
| |
| NO_FATALS(BuildAndStart(kTsFlags, kMasterFlags)); |
| |
| vector<TServerDetails*> tservers; |
| AppendValuesFromMap(tablet_servers_, &tservers); |
| ASSERT_EQ(3, tservers.size()); |
| |
| // Shutdown the other servers so they don't get chatty. |
| cluster_->tablet_server_by_uuid(tservers[1]->uuid())->Shutdown(); |
| cluster_->tablet_server_by_uuid(tservers[2]->uuid())->Shutdown(); |
| |
| // Configure the first server to fail all on prepare. |
| TServerDetails *replica_ts = tservers[0]; |
| ASSERT_OK(cluster_->SetFlag(cluster_->tablet_server_by_uuid(replica_ts->uuid()), |
| "follower_fail_all_prepare", "true")); |
| |
| // Pretend to be the leader and send a request that should return an error. |
| ConsensusRequestPB req; |
| ConsensusResponsePB resp; |
| RpcController rpc; |
| req.set_dest_uuid(replica_ts->uuid()); |
| req.set_tablet_id(tablet_id_); |
| req.set_caller_uuid(tservers[2]->instance_id.permanent_uuid()); |
| req.set_caller_term(0); |
| req.set_committed_index(0); |
| req.set_all_replicated_index(0); |
| req.mutable_preceding_id()->CopyFrom(MakeOpId(0, 0)); |
| int64_t base_ts = GetTimestampOnServer(replica_ts); |
| for (int i = 0; i < kNumOps; i++) { |
| AddOp(MakeOpId(0, 1 + i), base_ts, &req); |
| } |
| |
| ASSERT_OK(replica_ts->consensus_proxy->UpdateConsensus(req, &resp, &rpc)); |
| LOG(INFO) << SecureShortDebugString(resp); |
| ASSERT_TRUE(resp.status().has_error()); |
| ASSERT_EQ(consensus::ConsensusErrorPB::CANNOT_PREPARE, resp.status().error().code()); |
| ASSERT_STR_CONTAINS(SecureShortDebugString(resp), "Could not prepare a single op"); |
| } |
| |
| // Test that, if the Raft metadata on a replica is corrupt, the server |
| // doesn't crash, but instead marks the tablet as failed. |
| TEST_F(RaftConsensusITest, TestCorruptReplicaMetadata) { |
| const MonoDelta kTimeout = MonoDelta::FromSeconds(30); |
| // Start the cluster and wait until we have a stable leader. |
| const vector<string> kTserverFlags = { |
| // Ensure we are safe to evict. |
| "--consensus_rpc_timeout_ms=10000", |
| }; |
| const vector<string> kMasterFlags = { |
| // Switch off tombstoning of evicted replicas so the FAILED status of the |
| // corrupted tablet replica can be observed. |
| "--master_tombstone_evicted_tablet_replicas=false", |
| }; |
| |
| FLAGS_num_tablet_servers = FLAGS_num_replicas + 1; |
| NO_FATALS(BuildAndStart(kTserverFlags, kMasterFlags)); |
| |
| const auto uuids_no_replica = GetServersWithoutReplica(tablet_id_); |
| ASSERT_EQ(1, uuids_no_replica.size()); |
| const auto uuids_with_replica = GetServersWithReplica(tablet_id_); |
| ASSERT_EQ(FLAGS_num_replicas, uuids_with_replica.size()); |
| |
| // Wait until the tablet servers hosting the replicas agree on the log index. |
| TabletServerMap tablet_servers; |
| for (const auto& e : tablet_replicas_) { |
| if (e.first == tablet_id_) { |
| tablet_servers.emplace(e.second->uuid(), e.second); |
| } |
| } |
| ASSERT_OK(WaitForServersToAgree(MonoDelta::FromSeconds(10), tablet_servers, |
| tablet_id_, 1)); |
| |
| // Shut down one of the tablet servers, and then muck |
| // with its consensus metadata to corrupt it. |
| const auto ts_idx = cluster_->tablet_server_index_by_uuid(uuids_with_replica.front()); |
| auto* ts = cluster_->tablet_server(ts_idx); |
| ts->Shutdown(); |
| consensus::ConsensusMetadataPB cmeta_pb; |
| ASSERT_OK(inspect_->ReadConsensusMetadataOnTS(ts_idx, tablet_id_, &cmeta_pb)); |
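| // A negative term is invalid, so this renders the metadata corrupt. |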
| cmeta_pb.set_current_term(-2); |
| ASSERT_OK(inspect_->WriteConsensusMetadataOnTS(ts_idx, tablet_id_, cmeta_pb)); |
| |
| ASSERT_OK(ts->Restart()); |
| |
| // The server should come up with a 'FAILED' status because of the corrupt |
| // metadata. |
| ASSERT_OK(WaitUntilTabletInState(tablet_servers_[ts->uuid()], |
| tablet_id_, |
| tablet::FAILED, |
| kTimeout)); |
| // A new good copy should get created automatically. |
| ASSERT_OK(WaitUntilTabletInState(tablet_servers_[uuids_no_replica.front()], |
| tablet_id_, |
| tablet::RUNNING, |
| kTimeout)); |
| } |
| |
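| // Helper to read the tablet's 'failed_elections_since_stable_leader' gauge |
| // from the tablet server's embedded webserver. |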
| int64_t GetFailedElectionsSinceStableLeader(const ExternalTabletServer* ets, |
| const std::string& tablet_id) { |
| int64_t ret; |
| CHECK_OK(GetInt64Metric( |
| ets->bound_http_hostport(), |
| &METRIC_ENTITY_tablet, |
| tablet_id.c_str(), |
| &METRIC_failed_elections_since_stable_leader, |
| "value", |
| &ret)); |
| return ret; |
| } |
| |
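| // Helper to read the tablet's 'time_since_last_leader_heartbeat' gauge |
| // from the tablet server's embedded webserver. |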
| int64_t GetTimeSinceLastLeaderHeartbeat(const ExternalTabletServer* ets, |
| const std::string& tablet_id) { |
| int64_t ret; |
| CHECK_OK(GetInt64Metric( |
| ets->bound_http_hostport(), |
| &METRIC_ENTITY_tablet, |
| tablet_id.c_str(), |
| &METRIC_time_since_last_leader_heartbeat, |
| "value", |
| &ret)); |
| return ret; |
| } |
| |
| // Test for election-related metrics with leader failure detection disabled. |
| // This avoids inadvertent leader elections that might race with the test's |
| // assertions on the metrics' behavior. |
| TEST_F(RaftConsensusITest, TestElectionMetricsFailureDetectionDisabled) { |
| constexpr auto kNumReplicas = 3; |
| constexpr auto kNumTservers = 3; |
| const vector<string> kTsFlags = { |
| // Make leader elections faster so we can test |
| // failed_elections_since_stable_leader faster. |
| "--raft_heartbeat_interval_ms=100", |
| |
| // For stability reasons, this scenario uses the 'manual election' mode |
| // and assumes no leader change after the initial election. |
| "--enable_leader_failure_detection=false", |
| }; |
| const vector<string> kMasterFlags = { |
| // Corresponding flag for the tserver's '--enable_leader_failure_detection'. |
| "--catalog_manager_wait_for_new_tablets_to_elect_leader=false", |
| }; |
| const auto kTimeout = MonoDelta::FromSeconds(30); |
| |
| FLAGS_num_replicas = kNumReplicas; |
| FLAGS_num_tablet_servers = kNumTservers; |
| NO_FATALS(BuildAndStart(kTsFlags, kMasterFlags)); |
| |
| vector<TServerDetails*> tservers; |
| AppendValuesFromMap(tablet_servers_, &tservers); |
| |
| const auto* leader = tablet_servers_.begin()->second; |
| const auto& leader_uuid = leader->uuid(); |
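| // Retry the manual election until the chosen server becomes leader and |
| // commits an operation in its own term. |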
| ASSERT_EVENTUALLY([&]() { |
| const auto kLeaderTimeout = MonoDelta::FromSeconds(10); |
| ASSERT_OK(StartElection(leader, tablet_id_, kLeaderTimeout)); |
| TServerDetails* elected_leader; |
| ASSERT_OK(WaitForLeaderWithCommittedOp( |
| tablet_id_, kLeaderTimeout, &elected_leader)); |
| ASSERT_EQ(leader->uuid(), elected_leader->uuid()); |
| }); |
| ASSERT_OK(WaitForServersToAgree(kTimeout, tablet_servers_, tablet_id_, 1)); |
| |
| auto* leader_srv = cluster_->tablet_server_by_uuid(leader_uuid); |
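| // Pick an arbitrary follower: the server right after the leader in index order. |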
| auto* follower_srv = cluster_->tablet_server( |
| (cluster_->tablet_server_index_by_uuid(leader_uuid) + 1) % kNumTservers); |
| |
| // The leader should always report 0 for the time since the last leader heartbeat. |
| EXPECT_EQ(0, GetTimeSinceLastLeaderHeartbeat(leader_srv, tablet_id_)); |
| EXPECT_EQ(0, GetFailedElectionsSinceStableLeader(leader_srv, tablet_id_)); |
| |
| // Let's shut down all tablet servers except our chosen follower to make sure |
| // we don't have a leader. |
| for (auto i = 0; i < kNumTservers; ++i) { |
| if (cluster_->tablet_server(i) != follower_srv) { |
| cluster_->tablet_server(i)->Shutdown(); |
| } |
| } |
| |
| // Take two measurements 500 ms apart and verify that the difference |
| // between them is at least 500 ms. |
| const int64_t time_before_wait = GetTimeSinceLastLeaderHeartbeat(follower_srv, |
| tablet_id_); |
| SleepFor(MonoDelta::FromMilliseconds(500)); |
| const int64_t time_after_wait = GetTimeSinceLastLeaderHeartbeat(follower_srv, |
| tablet_id_); |
| ASSERT_GE(time_after_wait, time_before_wait + 500) |
| << "time_before_wait: " << time_before_wait << "; " |
| << "time_after_wait: " << time_after_wait; |
| |
| // The follower doesn't start elections on itself because of |
| // --enable_leader_failure_detection=false flag, so the metric should read 0. |
| ASSERT_EQ(0, GetFailedElectionsSinceStableLeader(follower_srv, tablet_id_)); |
| } |
| |
| // Test for election-related metrics with the leader failure detection enabled. |
| TEST_F(RaftConsensusITest, TestElectionMetricsFailureDetectionEnabled) { |
| constexpr auto kNumReplicas = 3; |
| constexpr auto kNumTservers = 3; |
| const vector<string> kTsFlags = { |
| // Make leader elections faster so we can test |
| // failed_elections_since_stable_leader faster. |
| "--raft_heartbeat_interval_ms=100", |
| |
| // For the stability of the test, make leader election less likely in case |
| // of intermittent network failures and latency spikes. Otherwise, in case |
| // of on-going re-elections the metric value would not stabilize for a long |
| // time. |
| "--leader_failure_max_missed_heartbeat_periods=20", |
| }; |
| const auto kTimeout = MonoDelta::FromSeconds(30); |
| |
| FLAGS_num_replicas = kNumReplicas; |
| FLAGS_num_tablet_servers = kNumTservers; |
| NO_FATALS(BuildAndStart(kTsFlags)); |
| ASSERT_OK(WaitForServersToAgree(kTimeout, tablet_servers_, tablet_id_, 1)); |
| |
| // Verify failed_elections_since_stable_leader is reset to 0 on all replicas |
| // after the cluster reaches a stable state, i.e. the leader is elected |
| // and functional. |
| ASSERT_EVENTUALLY([&]() { |
| for (auto i = 0; i < kNumTservers; ++i) { |
| ASSERT_EQ(0, GetFailedElectionsSinceStableLeader( |
| cluster_->tablet_server(i), tablet_id_)); |
| } |
| }); |
| |
| auto* srv_ts = tablet_servers_.begin()->second; |
| auto* srv_ext = cluster_->tablet_server_by_uuid(srv_ts->uuid()); |
| for (auto i = 0; i < kNumTservers; ++i) { |
| auto* s = cluster_->tablet_server(i); |
| if (s == srv_ext) { |
| continue; |
| } |
| s->Shutdown(); |
| } |
| |
| // If the server which is left alive hosts the leader replica for the tablet, |
| // make it step down. Otherwise, it's a no-op. |
| ignore_result(LeaderStepDown(srv_ts, tablet_id_, kTimeout)); |
| |
| // Verify failed_elections_since_stable_leader is advanced eventually. |
| ASSERT_EVENTUALLY([&]() { |
| ASSERT_GT(GetFailedElectionsSinceStableLeader(srv_ext, tablet_id_), 0); |
| }); |
| |
| // Restart the rest of the tablet servers. |
| for (auto i = 0; i < kNumTservers; ++i) { |
| auto* s = cluster_->tablet_server(i); |
| if (s == srv_ext) { |
| continue; |
| } |
| ASSERT_OK(s->Restart()); |
| } |
| ASSERT_OK(WaitForServersToAgree(kTimeout, tablet_servers_, tablet_id_, 1)); |
| |
| // Verify failed_elections_since_stable_leader is reset to 0 eventually. |
| ASSERT_EVENTUALLY([&]() { |
| for (auto i = 0; i < kNumTservers; ++i) { |
| ASSERT_EQ(0, GetFailedElectionsSinceStableLeader( |
| cluster_->tablet_server(i), tablet_id_)); |
| } |
| }); |
| } |
| |
| // Test that an IOError when writing to the write-ahead log is a fatal error. |
| // First, we test that failed replicates are fatal. Then, we test that failed |
| // commits are fatal. |
| TEST_F(RaftConsensusITest, TestLogIOErrorIsFatal) { |
| const vector<string> kTsFlags = { |
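| // Let the test control leader elections explicitly. |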
| "--enable_leader_failure_detection=false", |
| // Disable core dumps since we will inject FATAL errors, and dumping |
| // core can take a long time. |
| "--disable_core_dumps", |
| }; |
| const vector<string> kMasterFlags = { |
| "--catalog_manager_wait_for_new_tablets_to_elect_leader=false", |
| }; |
| |
| FLAGS_num_replicas = 3; |
| FLAGS_num_tablet_servers = 3; |
| NO_FATALS(BuildAndStart(kTsFlags, kMasterFlags)); |
| |
| vector<TServerDetails*> tservers; |
| AppendValuesFromMap(tablet_servers_, &tservers); |
| ASSERT_EQ(3, tservers.size()); |
| vector<ExternalTabletServer*> ext_tservers; |
| for (auto* details : tservers) { |
| ext_tservers.push_back(cluster_->tablet_server_by_uuid(details->uuid())); |
| } |
| |
| // Test failed replicates. |
| |
| // Elect server 2 as leader and wait for log index 1 to propagate to all servers. |
| ASSERT_OK(StartElection(tservers[2], tablet_id_, MonoDelta::FromSeconds(10))); |
| ASSERT_OK(WaitForServersToAgree(MonoDelta::FromSeconds(10), tablet_servers_, tablet_id_, 1)); |
| |
| // Inject an IOError the next time servers 1 and 2 write to their WAL. |
| // Then, cause server 0 to start and win a leader election. |
| // This will cause servers 1 and 2 to crash. |
| for (int i = 1; i <= 2; i++) { |
| ASSERT_OK(cluster_->SetFlag(ext_tservers[i], |
| "log_inject_io_error_on_append_fraction", "1.0")); |
| } |
| ASSERT_OK(StartElection(tservers[0], tablet_id_, MonoDelta::FromSeconds(10))); |
| for (int i = 1; i <= 2; i++) { |
| ASSERT_OK(ext_tservers[i]->WaitForFatal(MonoDelta::FromSeconds(10))); |
| } |
| |
| // Now we know followers crash when they write to their log. |
| // Let's verify the same for the leader (server 0). |
| ASSERT_OK(cluster_->SetFlag(ext_tservers[0], |
| "log_inject_io_error_on_append_fraction", "1.0")); |
| |
| // Attempt to write to the leader, but with a short timeout. |
| TestWorkload workload(cluster_.get()); |
| workload.set_table_name(kTableId); |
| workload.set_timeout_allowed(true); |
| workload.set_write_timeout_millis(100); |
| workload.set_num_write_threads(1); |
| workload.set_write_batch_size(1); |
| workload.Setup(); |
| workload.Start(); |
| |
| // The leader should crash as well. |
| ASSERT_OK(ext_tservers[0]->WaitForFatal(MonoDelta::FromSeconds(10))); |
| workload.StopAndJoin(); |
| |
| LOG(INFO) << "Everything crashed!"; |
| |
| // Test failed commits. |
| |
| cluster_->Shutdown(); |
| ASSERT_OK(cluster_->Restart()); |
| NO_FATALS(WaitForTSAndReplicas()); |
| tservers.clear(); |
| AppendValuesFromMap(tablet_servers_, &tservers); |
| ASSERT_EQ(3, tservers.size()); |
| |
| // Elect server 0 as leader and wait until writes are going through. |
| ASSERT_OK(StartElection(tservers[0], tablet_id_, MonoDelta::FromSeconds(10))); |
| workload.Start(); |
| int64_t prev_inserted = workload.rows_inserted(); |
| while (workload.rows_inserted() == prev_inserted) { |
| SleepFor(MonoDelta::FromMilliseconds(10)); |
| } |
| workload.StopAndJoin(); |
| ASSERT_OK(WaitForServersToAgree(MonoDelta::FromSeconds(10), tablet_servers_, tablet_id_, 1)); |
| |
| // Now shut down servers 1 and 2 so that writes cannot commit. Write to the |
| // leader, set flags so that commits crash the server, then bring the |
| // followers back up. |
| for (int i = 1; i <= 2; i++) { |
| ext_tservers[i]->Shutdown(); |
| } |
| |
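| // Record the leader's last received OpId so we can tell when new |
| // (yet uncommitted) entries are appended to its log. |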
| OpId prev_opid, cur_opid; |
| ASSERT_OK(GetLastOpIdForReplica(tablet_id_, tservers[0], consensus::RECEIVED_OPID, |
| MonoDelta::FromSeconds(10), &prev_opid)); |
| VLOG(1) << "Previous OpId on server 0: " << OpIdToString(prev_opid); |
| workload.Start(); |
| // Wait until we've got (uncommitted) entries in the leader's log. |
| do { |
| ASSERT_OK(GetLastOpIdForReplica(tablet_id_, tservers[0], consensus::RECEIVED_OPID, |
| MonoDelta::FromSeconds(10), &cur_opid)); |
| VLOG(1) << "Current OpId on server 0: " << OpIdToString(cur_opid); |
| } while (consensus::OpIdEquals(prev_opid, cur_opid)); |
| workload.StopAndJoin(); |
| ASSERT_OK(cluster_->SetFlag(ext_tservers[0], |
| "log_inject_io_error_on_append_fraction", "1.0")); |
| for (int i = 1; i <= 2; i++) { |
| ASSERT_OK(ext_tservers[i]->Restart()); |
| } |
| // The leader will crash. |
| ASSERT_OK(ext_tservers[0]->WaitForFatal(MonoDelta::FromSeconds(10))); |
| } |
| |
| // KUDU-1613: Test that when we reset and restart a tablet server, with a new |
| // uuid but with the same host and port, replicas that were hosted by the |
| // previous incarnation are correctly detected as failed and eventually |
| // re-replicated. |
| TEST_P(RaftConsensusParamReplicationModesITest, TestRestartWithDifferentUUID) { |
| // Start a cluster and insert data. |
| const bool kPrepareReplacementBeforeEviction = GetParam(); |
| ExternalMiniClusterOptions opts; |
| opts.num_tablet_servers = kPrepareReplacementBeforeEviction ? 4 : 3; |
| opts.extra_tserver_flags = { |
| // Set a low timeout. If we can't re-replicate in a reasonable amount of |
| // time, it means we're not evicting at all. |
| "--follower_unavailable_considered_failed_sec=10", |
| Substitute("--raft_prepare_replacement_before_eviction=$0", |
| kPrepareReplacementBeforeEviction), |
| }; |
| opts.extra_master_flags = { |
| Substitute("--raft_prepare_replacement_before_eviction=$0", |
| kPrepareReplacementBeforeEviction), |
| }; |
| cluster_.reset(new ExternalMiniCluster(std::move(opts))); |
| ASSERT_OK(cluster_->Start()); |
| |
| // Write some data. By writing to many tablets, we make it more likely |
| // that every tablet server hosts some replicas. |
| TestWorkload writes(cluster_.get()); |
| writes.set_num_tablets(5); |
| writes.set_timeout_allowed(true); |
| writes.Setup(); |
| writes.Start(); |
| const auto wait_for_some_inserts = [&] { |
| const auto rows_init = writes.rows_inserted(); |
| ASSERT_EVENTUALLY([&] { |
| ASSERT_GT(writes.rows_inserted() - rows_init, 1000); |
| }); |
| }; |
| wait_for_some_inserts(); |
| |
| // Completely shut down one of the tablet servers, keeping its ports so we |
| // can take over with another tablet server. |
| ExternalTabletServer* ts = cluster_->tablet_server(0); |
| vector<HostPort> master_hostports; |
| for (int i = 0; i < cluster_->num_masters(); i++) { |
| master_hostports.push_back(cluster_->master(i)->bound_rpc_hostport()); |
| } |
| ExternalDaemonOptions ts_opts = ts->opts(); |
| LOG(INFO) << Substitute("Storing port $0 for use by new tablet server", |
| ts->bound_rpc_hostport().ToString()); |
| ts_opts.rpc_bind_address = ts->bound_rpc_hostport(); |
| ts->Shutdown(); |
| |
| // With one server down, we should be able to accept writes. |
| wait_for_some_inserts(); |
| writes.StopAndJoin(); |
| |
| // Start up a new server with a new UUID bound to the old RPC port. |
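| // Fresh WAL and data directories mean the new server generates a new UUID |
| // on startup instead of loading the old instance metadata. |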
| ts_opts.wal_dir = Substitute("$0-new", ts_opts.wal_dir); |
| ts_opts.data_dirs = { Substitute("$0-new", ts_opts.data_dirs[0]) }; |
| ASSERT_OK(env_->CreateDir(ts_opts.wal_dir)); |
| ASSERT_OK(env_->CreateDir(ts_opts.data_dirs[0])); |
| scoped_refptr<ExternalTabletServer> new_ts = |
| new ExternalTabletServer(ts_opts, master_hostports); |
| ASSERT_OK(new_ts->Start()); |
| |
| // Eventually, tablet data should be copied over to the new server. |
| ASSERT_EVENTUALLY([&] { |
| vector<string> files_in_wal_dir; |
| ASSERT_OK(ListFilesInDir(env_, JoinPathSegments(ts_opts.wal_dir, "wals"), &files_in_wal_dir)); |
| ASSERT_FALSE(files_in_wal_dir.empty()); |
| }); |
| } |
| |
| // Requesting a graceful leadership transfer to a follower that cannot |
| // catch up should eventually fail. |
| TEST_F(RaftConsensusITest, TestLeaderTransferWhenFollowerFallsBehindLeaderGC) { |
| SKIP_IF_SLOW_NOT_ALLOWED(); |
| const auto kTimeout = MonoDelta::FromSeconds(30); |
| vector<string> ts_flags = { |
| // Disable follower eviction. |
| "--evict_failed_followers=false", |
| }; |
| vector<string> master_flags = { |
| // Prevent the master from evicting unrecoverably failed followers. |
| "--catalog_manager_evict_excess_replicas=false", |
| }; |
| AddFlagsForLogRolls(&ts_flags); // For CauseFollowerToFallBehindLogGC(). |
| NO_FATALS(BuildAndStart(ts_flags, master_flags)); |
| |
| string leader_uuid; |
| int64_t orig_term; |
| string follower_uuid; |
| NO_FATALS(CauseFollowerToFallBehindLogGC( |
| tablet_servers_, &leader_uuid, &orig_term, &follower_uuid)); |
| SCOPED_TRACE(Substitute("leader: $0 follower: $1", leader_uuid, follower_uuid)); |
| |
| // Wait for the remaining majority to agree. |
| TabletServerMap active_tablet_servers = tablet_servers_; |
| ASSERT_EQ(3, active_tablet_servers.size()); |
| ASSERT_EQ(1, active_tablet_servers.erase(follower_uuid)); |
| ASSERT_OK(WaitForServersToAgree(kTimeout, active_tablet_servers, tablet_id_, 1)); |
| |
| // Try to transfer leadership to the follower that has fallen behind log GC. |
| // The leader role might fluctuate among tablet replicas: in such rare cases, |
| // the LeaderStepDown() request is retried because of ASSERT_EVENTUALLY(). |
| ASSERT_EVENTUALLY([&] { |
| TServerDetails* leader_ts; |
| ASSERT_OK(FindTabletLeader( |
| tablet_servers_, tablet_id_, kTimeout, &leader_ts)); |
| LeaderStepDownRequestPB req; |
| req.set_dest_uuid(leader_ts->uuid()); |
| req.set_tablet_id(tablet_id_); |
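| // GRACEFUL mode asks the leader to try transferring leadership to the |
| // designated successor instead of stepping down immediately. |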
| req.set_mode(consensus::LeaderStepDownMode::GRACEFUL); |
| req.set_new_leader_uuid(follower_uuid); |
| |
| // The request should succeed. |
| ConsensusServiceProxy* c_proxy = CHECK_NOTNULL(leader_ts->consensus_proxy.get()); |
| RpcController ctl; |
| LeaderStepDownResponsePB resp; |
| ASSERT_OK(c_proxy->LeaderStepDown(req, &resp, &ctl)); |
| ASSERT_FALSE(resp.has_error()) << SecureDebugString(resp); |
| }); |
| |
| // However, the leader will not be able to transfer leadership to the lagging |
| // follower, and will eventually resume normal operation. We check this by |
| // waiting for a write to succeed. |
| ASSERT_EVENTUALLY([&] { |
| WriteRequestPB w_req; |
| w_req.set_tablet_id(tablet_id_); |
| ASSERT_OK(SchemaToPB(schema_, w_req.mutable_schema())); |
| AddTestRowToPB(RowOperationsPB::INSERT, schema_, kTestRowKey, kTestRowIntVal, |
| "hello world", w_req.mutable_row_operations()); |
| TServerDetails* leader_ts; |
| ASSERT_OK(FindTabletLeader( |
| tablet_servers_, tablet_id_, kTimeout, &leader_ts)); |
| RpcController ctl; |
| ctl.set_timeout(kTimeout); |
| WriteResponsePB w_resp; |
| ASSERT_OK(leader_ts->tserver_proxy->Write(w_req, &w_resp, &ctl)); |
| }); |
| } |
| |
| } // namespace tserver |
| } // namespace kudu |