| // Licensed to the Apache Software Foundation (ASF) under one |
| // or more contributor license agreements. See the NOTICE file |
| // distributed with this work for additional information |
| // regarding copyright ownership. The ASF licenses this file |
| // to you under the Apache License, Version 2.0 (the |
| // "License"); you may not use this file except in compliance |
| // with the License. You may obtain a copy of the License at |
| // |
| // http://www.apache.org/licenses/LICENSE-2.0 |
| // |
| // Unless required by applicable law or agreed to in writing, |
| // software distributed under the License is distributed on an |
| // "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY |
| // KIND, either express or implied. See the License for the |
| // specific language governing permissions and limitations |
| // under the License. |
| |
| #include <boost/optional.hpp> |
| #include <gflags/gflags.h> |
| #include <glog/logging.h> |
| #include <glog/stl_logging.h> |
| #include <gtest/gtest.h> |
| #include <unordered_map> |
| #include <unordered_set> |
| |
| #include "kudu/client/client-test-util.h" |
| #include "kudu/client/client.h" |
| #include "kudu/client/write_op.h" |
| #include "kudu/common/schema.h" |
| #include "kudu/common/wire_protocol-test-util.h" |
| #include "kudu/common/wire_protocol.h" |
| #include "kudu/consensus/consensus.pb.h" |
| #include "kudu/consensus/consensus_peers.h" |
| #include "kudu/consensus/metadata.pb.h" |
| #include "kudu/consensus/quorum_util.h" |
| #include "kudu/gutil/map-util.h" |
| #include "kudu/gutil/strings/strcat.h" |
| #include "kudu/gutil/strings/util.h" |
| #include "kudu/integration-tests/cluster_verifier.h" |
| #include "kudu/integration-tests/test_workload.h" |
| #include "kudu/integration-tests/ts_itest-base.h" |
| #include "kudu/server/server_base.pb.h" |
| #include "kudu/util/stopwatch.h" |
| #include "kudu/util/test_util.h" |
| |
| DEFINE_int32(num_client_threads, 8, |
| "Number of client threads to launch"); |
| DEFINE_int64(client_inserts_per_thread, 50, |
| "Number of rows inserted by each client thread"); |
| DEFINE_int64(client_num_batches_per_thread, 5, |
| "In how many batches to group the rows, for each client"); |
| DECLARE_int32(consensus_rpc_timeout_ms); |
| |
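| // These macros declare external references to metric objects defined |
| // elsewhere in the server code, so the test can look up their values |
| // (e.g. via GetInt64Metric() below). |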
| METRIC_DECLARE_entity(tablet); |
| METRIC_DECLARE_counter(transaction_memory_pressure_rejections); |
| METRIC_DECLARE_gauge_int64(raft_term); |
| |
| namespace kudu { |
| namespace tserver { |
| |
| using client::KuduInsert; |
| using client::KuduSession; |
| using client::KuduTable; |
| using client::sp::shared_ptr; |
| using consensus::ConsensusRequestPB; |
| using consensus::ConsensusResponsePB; |
| using consensus::ConsensusServiceProxy; |
| using consensus::MajoritySize; |
| using consensus::MakeOpId; |
| using consensus::RaftPeerPB; |
| using consensus::ReplicateMsg; |
| using itest::AddServer; |
| using itest::GetReplicaStatusAndCheckIfLeader; |
| using itest::LeaderStepDown; |
| using itest::RemoveServer; |
| using itest::StartElection; |
| using itest::WaitUntilLeader; |
| using itest::WriteSimpleTestRow; |
| using master::GetTabletLocationsRequestPB; |
| using master::GetTabletLocationsResponsePB; |
| using master::TabletLocationsPB; |
| using rpc::RpcController; |
| using server::SetFlagRequestPB; |
| using server::SetFlagResponsePB; |
| using std::unordered_map; |
| using std::unordered_set; |
| using std::vector; |
| using strings::Substitute; |
| |
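| // A deliberately short consensus RPC timeout (in milliseconds); the delay |
| // injector threads use it to make RPC timeouts likely. |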
| static const int kConsensusRpcTimeoutForTests = 50; |
| |
| static const int kTestRowKey = 1234; |
| static const int kTestRowIntVal = 5678; |
| |
| // Integration test for the raft consensus implementation. |
| // Uses the whole tablet server stack with ExternalMiniCluster. |
| class RaftConsensusITest : public TabletServerIntegrationTestBase { |
| public: |
| RaftConsensusITest() |
| : inserters_(FLAGS_num_client_threads) { |
| } |
| |
| virtual void SetUp() OVERRIDE { |
| TabletServerIntegrationTestBase::SetUp(); |
| FLAGS_consensus_rpc_timeout_ms = kConsensusRpcTimeoutForTests; |
| } |
| |
| void ScanReplica(TabletServerServiceProxy* replica_proxy, |
| vector<string>* results) { |
| |
| ScanRequestPB req; |
| ScanResponsePB resp; |
| RpcController rpc; |
| rpc.set_timeout(MonoDelta::FromSeconds(10)); // Squelch warnings. |
| |
| NewScanRequestPB* scan = req.mutable_new_scan_request(); |
| scan->set_tablet_id(tablet_id_); |
| ASSERT_OK(SchemaToColumnPBs(schema_, scan->mutable_projected_columns())); |
| |
| // Send the call |
| { |
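| // A batch size of 0 means the scanner is opened but no rows are |
| // returned in the initial response. |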
| req.set_batch_size_bytes(0); |
| SCOPED_TRACE(req.DebugString()); |
| ASSERT_OK(replica_proxy->Scan(req, &resp, &rpc)); |
| SCOPED_TRACE(resp.DebugString()); |
| if (resp.has_error()) { |
| ASSERT_OK(StatusFromPB(resp.error().status())); |
| } |
| } |
| |
| if (!resp.has_more_results()) |
| return; |
| |
| // Drain all the rows from the scanner. |
| NO_FATALS(DrainScannerToStrings(resp.scanner_id(), |
| schema_, |
| results, |
| replica_proxy)); |
| |
| std::sort(results->begin(), results->end()); |
| } |
| |
| // Scan the given replica in a loop until the number of rows |
| // is 'expected_count'. Fails the test if the expected count is |
| // not reached before the 90-second deadline below. |
| void WaitForRowCount(TabletServerServiceProxy* replica_proxy, |
| int expected_count, |
| vector<string>* results) { |
| LOG(INFO) << "Waiting for row count " << expected_count << "..."; |
| MonoTime start = MonoTime::Now(MonoTime::FINE); |
| MonoTime deadline = MonoTime::Now(MonoTime::FINE); |
| deadline.AddDelta(MonoDelta::FromSeconds(90)); |
| while (true) { |
| results->clear(); |
| NO_FATALS(ScanReplica(replica_proxy, results)); |
| if (results->size() == expected_count) { |
| return; |
| } |
| SleepFor(MonoDelta::FromMilliseconds(10)); |
| if (!MonoTime::Now(MonoTime::FINE).ComesBefore(deadline)) { |
| break; |
| } |
| } |
| MonoTime end = MonoTime::Now(MonoTime::FINE); |
| LOG(WARNING) << "Didn't reach row count " << expected_count; |
| FAIL() << "Did not reach expected row count " << expected_count |
| << " after " << end.GetDeltaSince(start).ToString() |
| << ": rows: " << *results; |
| } |
| |
| |
| // Add an Insert operation to the given consensus request. |
| // The row to be inserted is generated based on the OpId. |
| void AddOp(const OpId& id, ConsensusRequestPB* req); |
| |
| string DumpToString(TServerDetails* leader, |
| const vector<string>& leader_results, |
| TServerDetails* replica, |
| const vector<string>& replica_results) { |
| string ret = strings::Substitute("Replica results did not match the leader's." |
| "\nLeader: $0\nReplica: $1. Results size " |
| "L: $2 R: $3", |
| leader->ToString(), |
| replica->ToString(), |
| leader_results.size(), |
| replica_results.size()); |
| |
| StrAppend(&ret, "Leader Results: \n"); |
| for (const string& result : leader_results) { |
| StrAppend(&ret, result, "\n"); |
| } |
| |
| StrAppend(&ret, "Replica Results: \n"); |
| for (const string& result : replica_results) { |
| StrAppend(&ret, result, "\n"); |
| } |
| |
| return ret; |
| } |
| |
| // Insert 'num_rows' rows starting with row key 'start_row'. |
| // Each row will have size 'payload_size'. A short (100ms) timeout is |
| // used. If the flush generates any errors they will be ignored. |
| void InsertPayloadIgnoreErrors(int start_row, int num_rows, int payload_size); |
| |
| void InsertTestRowsRemoteThread(uint64_t first_row, |
| uint64_t count, |
| uint64_t num_batches, |
| const vector<CountDownLatch*>& latches) { |
| shared_ptr<KuduTable> table; |
| CHECK_OK(client_->OpenTable(kTableId, &table)); |
| |
| shared_ptr<KuduSession> session = client_->NewSession(); |
| session->SetTimeoutMillis(60000); |
| CHECK_OK(session->SetFlushMode(KuduSession::MANUAL_FLUSH)); |
| |
| for (int i = 0; i < num_batches; i++) { |
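| // Each batch covers a contiguous sub-range of the row keys in |
| // [first_row, first_row + count). |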
| uint64_t first_row_in_batch = first_row + (i * count / num_batches); |
| uint64_t last_row_in_batch = first_row_in_batch + count / num_batches; |
| |
| for (int j = first_row_in_batch; j < last_row_in_batch; j++) { |
| gscoped_ptr<KuduInsert> insert(table->NewInsert()); |
| KuduPartialRow* row = insert->mutable_row(); |
| CHECK_OK(row->SetInt32(0, j)); |
| CHECK_OK(row->SetInt32(1, j * 2)); |
| CHECK_OK(row->SetStringCopy(2, Slice(StringPrintf("hello %d", j)))); |
| CHECK_OK(session->Apply(insert.release())); |
| } |
| |
| // We don't handle write idempotency yet (i.e. making sure that when a |
| // leader fails, writes sent to it that were eventually committed by the |
| // new leader but never acked to the client are not retried), so some |
| // errors are expected. They are OK as long as they are |
| // Status::AlreadyPresent(). |
| |
| int inserted = last_row_in_batch - first_row_in_batch; |
| |
| Status s = session->Flush(); |
| if (PREDICT_FALSE(!s.ok())) { |
| std::vector<client::KuduError*> errors; |
| ElementDeleter d(&errors); |
| bool overflow; |
| session->GetPendingErrors(&errors, &overflow); |
| CHECK(!overflow); |
| for (const client::KuduError* e : errors) { |
| // Per the comment above, AlreadyPresent errors are expected; anything |
| // else is a genuine failure. |
| if (!e->status().IsAlreadyPresent()) { |
| LOG(ERROR) << "Unexpected error: " << e->status().ToString(); |
| FAIL() << "Found unexpected errors while inserting."; |
| } |
| } |
| inserted -= errors.size(); |
| } |
| |
| for (CountDownLatch* latch : latches) { |
| latch->CountDown(inserted); |
| } |
| } |
| |
| inserters_.CountDown(); |
| } |
| |
| // Brings chaos to an ExternalTabletServer by introducing random delays. |
| // Does this by pausing the daemon for a random amount of time. |
| void DelayInjectorThread(ExternalTabletServer* tablet_server, int timeout_msec) { |
| while (inserters_.count() > 0) { |
| |
| // Scale the value drawn from the normal distribution so that we hold the |
| // server paused for longer than the timeout a small (~5%) percentage of |
| // the time (95% of a standard normal distribution lies below 1.64485). |
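| // For example, with timeout_msec = 50 the sleep time has a standard |
| // deviation of ~30ms (50 / 1.64485), so roughly 5% of the draws exceed |
| // the 50ms consensus RPC timeout. |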
| double sleep_time_usec = 1000 * |
| ((random_.Normal(0, 1) * timeout_msec) / 1.64485); |
| |
| if (sleep_time_usec < 0) sleep_time_usec = 0; |
| |
| // Additionally, only cause timeouts 50% of the time; otherwise just sleep. |
| double val = (rand() * 1.0) / RAND_MAX; |
| if (val < 0.5) { |
| SleepFor(MonoDelta::FromMicroseconds(sleep_time_usec)); |
| continue; |
| } |
| |
| ASSERT_OK(tablet_server->Pause()); |
| LOG_IF(INFO, sleep_time_usec > 0.0) |
| << "Delay injector thread for TS " << tablet_server->instance_id().permanent_uuid() |
| << " SIGSTOPped the ts, sleeping for " << sleep_time_usec << " usec..."; |
| SleepFor(MonoDelta::FromMicroseconds(sleep_time_usec)); |
| ASSERT_OK(tablet_server->Resume()); |
| } |
| } |
| |
| // Thread which loops until '*finish' becomes true, trying to insert a row |
| // on the given tablet server identified by 'replica_idx'. |
| void StubbornlyWriteSameRowThread(int replica_idx, const AtomicBool* finish); |
| |
| // Stops the current leader of the configuration, runs a leader election, |
| // and then brings the old leader back. Before stopping the leader this |
| // pauses all follower nodes at regular intervals so that we get an |
| // increased chance of operations being pending when the leader goes down. |
| void StopOrKillLeaderAndElectNewOne() { |
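| // Randomly choose between killing the leader process outright and merely |
| // SIGSTOPping it. |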
| bool kill = rand() % 2 == 0; |
| |
| TServerDetails* old_leader; |
| CHECK_OK(GetLeaderReplicaWithRetries(tablet_id_, &old_leader)); |
| ExternalTabletServer* old_leader_ets = cluster_->tablet_server_by_uuid(old_leader->uuid()); |
| |
| vector<TServerDetails*> followers; |
| GetOnlyLiveFollowerReplicas(tablet_id_, &followers); |
| |
| for (TServerDetails* ts : followers) { |
| ExternalTabletServer* ets = cluster_->tablet_server_by_uuid(ts->uuid()); |
| CHECK_OK(ets->Pause()); |
| SleepFor(MonoDelta::FromMilliseconds(100)); |
| } |
| |
| // When all followers are paused, also pause or kill the current leader. Since |
| // we've waited a bit, the old leader is likely to have operations that must |
| // be aborted. |
| if (kill) { |
| old_leader_ets->Shutdown(); |
| } else { |
| CHECK_OK(old_leader_ets->Pause()); |
| } |
| |
| // Resume the replicas. |
| for (TServerDetails* ts : followers) { |
| ExternalTabletServer* ets = cluster_->tablet_server_by_uuid(ts->uuid()); |
| CHECK_OK(ets->Resume()); |
| } |
| |
| // Get the new leader. |
| TServerDetails* new_leader; |
| CHECK_OK(GetLeaderReplicaWithRetries(tablet_id_, &new_leader)); |
| |
| // Bring the old leader back. |
| if (kill) { |
| CHECK_OK(old_leader_ets->Restart()); |
| // Wait until we have the same number of followers. |
| int initial_followers = followers.size(); |
| do { |
| GetOnlyLiveFollowerReplicas(tablet_id_, &followers); |
| } while (followers.size() < initial_followers); |
| } else { |
| CHECK_OK(old_leader_ets->Resume()); |
| } |
| } |
| |
| // Writes 'num_writes' operations to the current leader. Each of the operations |
| // has a payload of around 128KB. Causes a gtest failure on error. |
| void Write128KOpsToLeader(int num_writes); |
| |
| // Ensure that a majority of servers is required for elections and writes. |
| // This is done by pausing a majority and asserting that writes and elections fail, |
| // then unpausing the majority and asserting that elections and writes succeed. |
| // On failure, raises a gtest assertion. |
| // Note: This test assumes all tablet servers listed in tablet_servers are voters. |
| void AssertMajorityRequiredForElectionsAndWrites(const TabletServerMap& tablet_servers, |
| const string& leader_uuid); |
| |
| // Return the replicas of the specified 'tablet_id', as seen by the Master. |
| Status GetTabletLocations(const string& tablet_id, const MonoDelta& timeout, |
| master::TabletLocationsPB* tablet_locations); |
| |
| enum WaitForLeader { |
| NO_WAIT_FOR_LEADER = 0, |
| WAIT_FOR_LEADER = 1 |
| }; |
| |
| // Wait for the specified number of replicas to be reported by the master for |
| // the given tablet. Fails with an assertion if the timeout expires. |
| void WaitForReplicasReportedToMaster(int num_replicas, const string& tablet_id, |
| const MonoDelta& timeout, |
| WaitForLeader wait_for_leader, |
| bool* has_leader, |
| master::TabletLocationsPB* tablet_locations); |
| |
| static const bool WITH_NOTIFICATION_LATENCY = true; |
| static const bool WITHOUT_NOTIFICATION_LATENCY = false; |
| void DoTestChurnyElections(bool with_latency); |
| |
| protected: |
| // Flags needed for CauseFollowerToFallBehindLogGC() to work well. |
| void AddFlagsForLogRolls(vector<string>* extra_tserver_flags); |
| |
| // Pause one of the followers and write enough data to the remaining replicas |
| // to cause log GC, then resume the paused follower. On success, |
| // 'leader_uuid' will be set to the UUID of the leader, 'orig_term' will be |
| // set to the term of the leader before un-pausing the follower, and |
| // 'fell_behind_uuid' will be set to the UUID of the follower that was paused |
| // and caused to fall behind. These can be used for verification purposes. |
| // |
| // Certain flags should be set. You can add the required flags with |
| // AddFlagsForLogRolls() before starting the cluster. |
| void CauseFollowerToFallBehindLogGC(string* leader_uuid, |
| int64_t* orig_term, |
| string* fell_behind_uuid); |
| |
| shared_ptr<KuduTable> table_; |
| std::vector<scoped_refptr<kudu::Thread> > threads_; |
| CountDownLatch inserters_; |
| }; |
| |
| void RaftConsensusITest::AddFlagsForLogRolls(vector<string>* extra_tserver_flags) { |
| // We configure a small log segment size so that we roll frequently, |
| // configure a small cache size so that we evict data from the cache, and |
| // retain as few segments as possible. We also turn off async segment |
| // allocation -- this ensures that we roll many segments of logs (with async |
| // allocation, it's possible that the preallocation is slow and we wouldn't |
| // roll deterministically). |
| extra_tserver_flags->push_back("--log_cache_size_limit_mb=1"); |
| extra_tserver_flags->push_back("--log_segment_size_mb=1"); |
| extra_tserver_flags->push_back("--log_async_preallocate_segments=false"); |
| extra_tserver_flags->push_back("--log_min_segments_to_retain=1"); |
| extra_tserver_flags->push_back("--log_min_seconds_to_retain=0"); |
| extra_tserver_flags->push_back("--maintenance_manager_polling_interval_ms=100"); |
| } |
| |
| // Test that we can retrieve the permanent uuid of a server running the |
| // consensus service via RPC. |
| TEST_F(RaftConsensusITest, TestGetPermanentUuid) { |
| BuildAndStart(vector<string>()); |
| |
| RaftPeerPB peer; |
| TServerDetails* leader = nullptr; |
| ASSERT_OK(GetLeaderReplicaWithRetries(tablet_id_, &leader)); |
| peer.mutable_last_known_addr()->CopyFrom(leader->registration.rpc_addresses(0)); |
| const string expected_uuid = leader->instance_id.permanent_uuid(); |
| |
| rpc::MessengerBuilder builder("test builder"); |
| builder.set_num_reactors(1); |
| std::shared_ptr<rpc::Messenger> messenger; |
| ASSERT_OK(builder.Build(&messenger)); |
| |
| ASSERT_OK(consensus::SetPermanentUuidForRemotePeer(messenger, &peer)); |
| ASSERT_EQ(expected_uuid, peer.permanent_uuid()); |
| } |
| |
| // TODO allow the scan to define an operation id, fetch the last id |
| // from the leader and then use that id to make the replica wait |
| // until it is done. This will avoid the sleeps below. |
| TEST_F(RaftConsensusITest, TestInsertAndMutateThroughConsensus) { |
| BuildAndStart(vector<string>()); |
| |
| int num_iters = AllowSlowTests() ? 10 : 1; |
| |
| for (int i = 0; i < num_iters; i++) { |
| InsertTestRowsRemoteThread(i * FLAGS_client_inserts_per_thread, |
| FLAGS_client_inserts_per_thread, |
| FLAGS_client_num_batches_per_thread, |
| vector<CountDownLatch*>()); |
| } |
| ASSERT_ALL_REPLICAS_AGREE(FLAGS_client_inserts_per_thread * num_iters); |
| } |
| |
| TEST_F(RaftConsensusITest, TestFailedTransaction) { |
| BuildAndStart(vector<string>()); |
| |
| // Wait until we have a stable leader. |
| ASSERT_OK(WaitForServersToAgree(MonoDelta::FromSeconds(10), tablet_servers_, |
| tablet_id_, 1)); |
| |
| WriteRequestPB req; |
| req.set_tablet_id(tablet_id_); |
| ASSERT_OK(SchemaToPB(schema_, req.mutable_schema())); |
| |
| RowOperationsPB* data = req.mutable_row_operations(); |
| data->set_rows("some gibberish!"); |
| |
| WriteResponsePB resp; |
| RpcController controller; |
| controller.set_timeout(MonoDelta::FromSeconds(FLAGS_rpc_timeout)); |
| |
| TServerDetails* leader = nullptr; |
| ASSERT_OK(GetLeaderReplicaWithRetries(tablet_id_, &leader)); |
| |
| ASSERT_OK(DCHECK_NOTNULL(leader->tserver_proxy.get())->Write(req, &resp, &controller)); |
| ASSERT_TRUE(resp.has_error()); |
| |
| // Add a proper row so that we can verify that all of the replicas continue |
| // to process transactions after a failure. Additionally, this allows us to wait |
| // for all of the replicas to finish processing transactions before shutting down, |
| // avoiding a potential stall as we currently can't abort transactions (see KUDU-341). |
| data->Clear(); |
| AddTestRowToPB(RowOperationsPB::INSERT, schema_, 0, 0, "original0", data); |
| |
| controller.Reset(); |
| controller.set_timeout(MonoDelta::FromSeconds(FLAGS_rpc_timeout)); |
| |
| ASSERT_OK(DCHECK_NOTNULL(leader->tserver_proxy.get())->Write(req, &resp, &controller)); |
| SCOPED_TRACE(resp.ShortDebugString()); |
| ASSERT_FALSE(resp.has_error()); |
| |
| ASSERT_ALL_REPLICAS_AGREE(1); |
| } |
| |
| // Inserts rows through consensus and also starts one delay-injecting thread |
| // that steals consensus peer locks for a while. This is meant to test that |
| // consensus still makes progress even with timeouts and repeated requests. |
| TEST_F(RaftConsensusITest, MultiThreadedMutateAndInsertThroughConsensus) { |
| BuildAndStart(vector<string>()); |
| |
| if (AllowSlowTests() && 500 == FLAGS_client_inserts_per_thread) { |
| FLAGS_client_inserts_per_thread = FLAGS_client_inserts_per_thread * 10; |
| FLAGS_client_num_batches_per_thread = FLAGS_client_num_batches_per_thread * 10; |
| } |
| |
| int num_threads = FLAGS_num_client_threads; |
| for (int i = 0; i < num_threads; i++) { |
| scoped_refptr<kudu::Thread> new_thread; |
| CHECK_OK(kudu::Thread::Create("test", strings::Substitute("ts-test$0", i), |
| &RaftConsensusITest::InsertTestRowsRemoteThread, |
| this, i * FLAGS_client_inserts_per_thread, |
| FLAGS_client_inserts_per_thread, |
| FLAGS_client_num_batches_per_thread, |
| vector<CountDownLatch*>(), |
| &new_thread)); |
| threads_.push_back(new_thread); |
| } |
| for (int i = 0; i < FLAGS_num_replicas; i++) { |
| scoped_refptr<kudu::Thread> new_thread; |
| CHECK_OK(kudu::Thread::Create("test", strings::Substitute("chaos-test$0", i), |
| &RaftConsensusITest::DelayInjectorThread, |
| this, cluster_->tablet_server(i), |
| kConsensusRpcTimeoutForTests, |
| &new_thread)); |
| threads_.push_back(new_thread); |
| } |
| for (scoped_refptr<kudu::Thread> thr : threads_) { |
| CHECK_OK(ThreadJoiner(thr.get()).Join()); |
| } |
| |
| ASSERT_ALL_REPLICAS_AGREE(FLAGS_client_inserts_per_thread * FLAGS_num_client_threads); |
| } |
| |
| TEST_F(RaftConsensusITest, TestInsertOnNonLeader) { |
| BuildAndStart(vector<string>()); |
| |
| // Wait for the initial leader election to complete. |
| ASSERT_OK(WaitForServersToAgree(MonoDelta::FromSeconds(10), tablet_servers_, |
| tablet_id_, 1)); |
| |
| // Manually construct a write RPC to a replica and make sure it responds |
| // with the correct error code. |
| WriteRequestPB req; |
| WriteResponsePB resp; |
| RpcController rpc; |
| req.set_tablet_id(tablet_id_); |
| ASSERT_OK(SchemaToPB(schema_, req.mutable_schema())); |
| AddTestRowToPB(RowOperationsPB::INSERT, schema_, kTestRowKey, kTestRowIntVal, |
| "hello world via RPC", req.mutable_row_operations()); |
| |
| // Get the followers. |
| vector<TServerDetails*> followers; |
| GetOnlyLiveFollowerReplicas(tablet_id_, &followers); |
| |
| ASSERT_OK(followers[0]->tserver_proxy->Write(req, &resp, &rpc)); |
| SCOPED_TRACE(resp.DebugString()); |
| ASSERT_TRUE(resp.has_error()); |
| Status s = StatusFromPB(resp.error().status()); |
| EXPECT_TRUE(s.IsIllegalState()); |
| ASSERT_STR_CONTAINS(s.ToString(), "is not leader of this config. Role: FOLLOWER"); |
| // TODO: need to change the error code to be something like REPLICA_NOT_LEADER |
| // so that the client can properly handle this case! plumbing this is a little difficult |
| // so not addressing at the moment. |
| ASSERT_ALL_REPLICAS_AGREE(0); |
| } |
| |
| TEST_F(RaftConsensusITest, TestRunLeaderElection) { |
| // Reset the consensus RPC timeout to the default value, or elections might fail often. |
| FLAGS_consensus_rpc_timeout_ms = 1000; |
| |
| BuildAndStart(vector<string>()); |
| |
| int num_iters = AllowSlowTests() ? 10 : 1; |
| |
| InsertTestRowsRemoteThread(0, |
| FLAGS_client_inserts_per_thread * num_iters, |
| FLAGS_client_num_batches_per_thread, |
| vector<CountDownLatch*>()); |
| |
| ASSERT_ALL_REPLICAS_AGREE(FLAGS_client_inserts_per_thread * num_iters); |
| |
| // Select the last follower to be the new leader. |
| vector<TServerDetails*> followers; |
| GetOnlyLiveFollowerReplicas(tablet_id_, &followers); |
| |
| // Now shut down the current leader. |
| TServerDetails* leader = DCHECK_NOTNULL(GetLeaderReplicaOrNull(tablet_id_)); |
| ExternalTabletServer* leader_ets = cluster_->tablet_server_by_uuid(leader->uuid()); |
| leader_ets->Shutdown(); |
| |
| TServerDetails* replica = followers.back(); |
| CHECK_NE(leader->instance_id.permanent_uuid(), replica->instance_id.permanent_uuid()); |
| |
| // Make the new replica leader. |
| ASSERT_OK(StartElection(replica, tablet_id_, MonoDelta::FromSeconds(10))); |
| |
| // Insert a bunch more rows. |
| InsertTestRowsRemoteThread(FLAGS_client_inserts_per_thread * num_iters, |
| FLAGS_client_inserts_per_thread * num_iters, |
| FLAGS_client_num_batches_per_thread, |
| vector<CountDownLatch*>()); |
| |
| // Restart the original replica and make sure they all agree. |
| ASSERT_OK(leader_ets->Restart()); |
| |
| ASSERT_ALL_REPLICAS_AGREE(FLAGS_client_inserts_per_thread * num_iters * 2); |
| } |
| |
| void RaftConsensusITest::Write128KOpsToLeader(int num_writes) { |
| TServerDetails* leader = nullptr; |
| ASSERT_OK(GetLeaderReplicaWithRetries(tablet_id_, &leader)); |
| |
| WriteRequestPB req; |
| req.set_tablet_id(tablet_id_); |
| ASSERT_OK(SchemaToPB(schema_, req.mutable_schema())); |
| RowOperationsPB* data = req.mutable_row_operations(); |
| WriteResponsePB resp; |
| RpcController rpc; |
| rpc.set_timeout(MonoDelta::FromMilliseconds(10000)); |
| int key = 0; |
| |
| // Generate a 128KB dummy payload. |
| string test_payload(128 * 1024, '0'); |
| for (int i = 0; i < num_writes; i++) { |
| rpc.Reset(); |
| data->Clear(); |
| AddTestRowToPB(RowOperationsPB::INSERT, schema_, key, key, |
| test_payload, data); |
| key++; |
| ASSERT_OK(leader->tserver_proxy->Write(req, &resp, &rpc)); |
| |
| ASSERT_FALSE(resp.has_error()) << resp.DebugString(); |
| } |
| } |
| |
| // Test that when a follower is stopped for a long time, the log cache |
| // properly evicts operations, but still allows the follower to catch |
| // up when it comes back. |
| TEST_F(RaftConsensusITest, TestCatchupAfterOpsEvicted) { |
| vector<string> extra_flags; |
| extra_flags.push_back("--log_cache_size_limit_mb=1"); |
| extra_flags.push_back("--consensus_max_batch_size_bytes=500000"); |
| BuildAndStart(extra_flags); |
| TServerDetails* replica = (*tablet_replicas_.begin()).second; |
| ASSERT_TRUE(replica != nullptr); |
| ExternalTabletServer* replica_ets = cluster_->tablet_server_by_uuid(replica->uuid()); |
| |
| // Pause a replica. |
| ASSERT_OK(replica_ets->Pause()); |
| LOG(INFO) << "Paused one of the replicas, starting to write."; |
| |
| // Insert 3MB worth of data. |
| const int kNumWrites = 25; |
| NO_FATALS(Write128KOpsToLeader(kNumWrites)); |
| |
| // Now unpause the replica; the lagging replica should eventually catch back up. |
| ASSERT_OK(replica_ets->Resume()); |
| |
| ASSERT_ALL_REPLICAS_AGREE(kNumWrites); |
| } |
| |
| void RaftConsensusITest::CauseFollowerToFallBehindLogGC(string* leader_uuid, |
| int64_t* orig_term, |
| string* fell_behind_uuid) { |
| MonoDelta kTimeout = MonoDelta::FromSeconds(10); |
| // Wait for all of the replicas to have acknowledged the elected |
| // leader and logged the first NO_OP. |
| ASSERT_OK(WaitForServersToAgree(kTimeout, tablet_servers_, tablet_id_, 1)); |
| |
| // Pause one server. This might be the leader, but pausing it will cause |
| // a leader election to happen. |
| TServerDetails* replica = (*tablet_replicas_.begin()).second; |
| ExternalTabletServer* replica_ets = cluster_->tablet_server_by_uuid(replica->uuid()); |
| ASSERT_OK(replica_ets->Pause()); |
| |
| // Find a leader. In case we paused the leader above, this will wait until |
| // we have elected a new one. |
| TServerDetails* leader = nullptr; |
| while (true) { |
| Status s = GetLeaderReplicaWithRetries(tablet_id_, &leader); |
| if (s.ok() && leader != nullptr && leader != replica) { |
| break; |
| } |
| SleepFor(MonoDelta::FromMilliseconds(10)); |
| } |
| *leader_uuid = leader->uuid(); |
| int leader_index = cluster_->tablet_server_index_by_uuid(*leader_uuid); |
| |
| TestWorkload workload(cluster_.get()); |
| workload.set_table_name(kTableId); |
| workload.set_timeout_allowed(true); |
| workload.set_payload_bytes(128 * 1024); // Write ops of size 128KB. |
| workload.set_write_batch_size(1); |
| workload.set_num_write_threads(4); |
| workload.Setup(); |
| workload.Start(); |
| |
| LOG(INFO) << "Waiting until we've written at least 4MB..."; |
| while (workload.rows_inserted() < 8 * 4) { |
| SleepFor(MonoDelta::FromMilliseconds(10)); |
| } |
| workload.StopAndJoin(); |
| |
| LOG(INFO) << "Waiting for log GC on " << leader->uuid(); |
| // Some WAL segments must exist, but WAL segment 1 must not exist. |
| ASSERT_OK(inspect_->WaitForFilePatternInTabletWalDirOnTs( |
| leader_index, tablet_id_, { "wal-" }, { "wal-000000001" })); |
| |
| LOG(INFO) << "Log GC complete on " << leader->uuid(); |
| |
| // Then wait another couple of seconds to be sure that it has bothered to try |
| // to write to the paused peer. |
| // TODO: would be nice to be able to poll the leader with an RPC like |
| // GetLeaderStatus() which could tell us whether it has made any requests |
| // since the log GC. |
| SleepFor(MonoDelta::FromSeconds(2)); |
| |
| // Make a note of whatever the current term of the cluster is, |
| // before we resume the follower. |
| { |
| OpId op_id; |
| ASSERT_OK(GetLastOpIdForReplica(tablet_id_, leader, consensus::RECEIVED_OPID, kTimeout, |
| &op_id)); |
| *orig_term = op_id.term(); |
| LOG(INFO) << "Servers converged with original term " << *orig_term; |
| } |
| |
| // Resume the follower. |
| LOG(INFO) << "Resuming " << replica->uuid(); |
| ASSERT_OK(replica_ets->Resume()); |
| |
| // Ensure that none of the tablet servers crashed. |
| for (int i = 0; i < cluster_->num_tablet_servers(); i++) { |
| // Make sure it didn't crash. |
| ASSERT_TRUE(cluster_->tablet_server(i)->IsProcessAlive()) |
| << "Tablet server " << i << " crashed"; |
| } |
| *fell_behind_uuid = replica->uuid(); |
| } |
| |
| // Test that the leader doesn't crash if one of its followers has |
| // fallen behind so far that the logs necessary to catch it up |
| // have been GCed. |
| // |
| // In a real cluster, this will eventually cause the follower to be |
| // evicted/replaced. In any case, the leader should not crash. |
| // |
| // We also ensure that, when the leader stops writing to the follower, |
| // the follower won't disturb the other nodes when it attempts to elect |
| // itself. |
| // |
| // This is a regression test for KUDU-775 and KUDU-562. |
| TEST_F(RaftConsensusITest, TestFollowerFallsBehindLeaderGC) { |
| // Disable follower eviction to maintain the original intent of this test. |
| vector<string> extra_flags = { "--evict_failed_followers=false" }; |
| AddFlagsForLogRolls(&extra_flags); // For CauseFollowerToFallBehindLogGC(). |
| BuildAndStart(extra_flags); |
| |
| string leader_uuid; |
| int64_t orig_term; |
| string follower_uuid; |
| NO_FATALS(CauseFollowerToFallBehindLogGC(&leader_uuid, &orig_term, &follower_uuid)); |
| |
| // Wait for remaining majority to agree. |
| TabletServerMap active_tablet_servers = tablet_servers_; |
| ASSERT_EQ(3, active_tablet_servers.size()); |
| ASSERT_EQ(1, active_tablet_servers.erase(follower_uuid)); |
| ASSERT_OK(WaitForServersToAgree(MonoDelta::FromSeconds(30), active_tablet_servers, tablet_id_, |
| 1)); |
| |
| if (AllowSlowTests()) { |
| // Sleep long enough that the "abandoned" server's leader election interval |
| // will trigger several times. Then, verify that the term has not increased. |
| // This ensures that the other servers properly ignore the election requests |
| // from the abandoned node. |
| // TODO: would be nicer to use an RPC to check the current term of the |
| // abandoned replica, and wait until it has incremented a couple of times. |
| SleepFor(MonoDelta::FromSeconds(5)); |
| OpId op_id; |
| TServerDetails* leader = tablet_servers_[leader_uuid]; |
| ASSERT_OK(GetLastOpIdForReplica(tablet_id_, leader, consensus::RECEIVED_OPID, |
| MonoDelta::FromSeconds(10), &op_id)); |
| ASSERT_EQ(orig_term, op_id.term()) |
| << "expected the leader to have not advanced terms but has op " << op_id; |
| } |
| } |
| |
| // This test starts several tablet servers, and configures them with |
| // fault injection so that the leaders frequently crash just before |
| // sending RPCs to followers. |
| // |
| // This can result in various scenarios where leaders crash right after |
| // being elected and never succeed in replicating their first operation. |
| // For example, KUDU-783 reproduces from this test approximately 5% of the |
| // time on a slow-test debug build. |
| TEST_F(RaftConsensusITest, InsertWithCrashyNodes) { |
| int kCrashesToCause = 3; |
| if (AllowSlowTests()) { |
| FLAGS_num_tablet_servers = 7; |
| FLAGS_num_replicas = 7; |
| kCrashesToCause = 15; |
| } |
| |
| vector<string> ts_flags, master_flags; |
| |
| // Crash 5% of the time just before sending an RPC. With 7 servers, |
| // this means we crash about 30% of the time before we've fully |
| // replicated the NO_OP at the start of the term. |
| ts_flags.push_back("--fault_crash_on_leader_request_fraction=0.05"); |
| |
| // Inject latency to encourage the replicas to fall out of sync |
| // with each other. |
| ts_flags.push_back("--log_inject_latency"); |
| ts_flags.push_back("--log_inject_latency_ms_mean=30"); |
| ts_flags.push_back("--log_inject_latency_ms_stddev=60"); |
| |
| // Make leader elections faster so we get through more cycles of |
| // leaders. |
| ts_flags.push_back("--raft_heartbeat_interval_ms=100"); |
| ts_flags.push_back("--leader_failure_monitor_check_mean_ms=50"); |
| ts_flags.push_back("--leader_failure_monitor_check_stddev_ms=25"); |
| |
| // Avoid preallocating segments since bootstrap is a little bit |
| // faster if it doesn't have to scan forward through the preallocated |
| // log area. |
| ts_flags.push_back("--log_preallocate_segments=false"); |
| |
| CreateCluster("raft_consensus-itest-cluster", ts_flags, master_flags); |
| |
| TestWorkload workload(cluster_.get()); |
| workload.set_num_replicas(FLAGS_num_replicas); |
| workload.set_timeout_allowed(true); |
| workload.set_write_timeout_millis(10000); |
| workload.set_num_write_threads(10); |
| workload.set_write_batch_size(1); |
| workload.Setup(); |
| workload.Start(); |
| |
| int num_crashes = 0; |
| while (num_crashes < kCrashesToCause && |
| workload.rows_inserted() < 100) { |
| num_crashes += RestartAnyCrashedTabletServers(); |
| SleepFor(MonoDelta::FromMilliseconds(10)); |
| } |
| |
| workload.StopAndJoin(); |
| |
| // After we stop the writes, we can still get crashes because heartbeats could |
| // trigger the fault path. So, disable the faults and restart one more time. |
| for (int i = 0; i < cluster_->num_tablet_servers(); i++) { |
| ExternalTabletServer* ts = cluster_->tablet_server(i); |
| vector<string>* flags = ts->mutable_flags(); |
| bool removed_flag = false; |
| for (auto it = flags->begin(); it != flags->end(); ++it) { |
| if (HasPrefixString(*it, "--fault_crash")) { |
| flags->erase(it); |
| removed_flag = true; |
| break; |
| } |
| } |
| ASSERT_TRUE(removed_flag) << "could not remove flag from TS " << i |
| << "\nFlags:\n" << *flags; |
| ts->Shutdown(); |
| CHECK_OK(ts->Restart()); |
| } |
| |
| // Ensure that the replicas converge. |
| // We don't know exactly how many rows got inserted, since the writer |
| // probably saw many errors which left inserts in indeterminate state. |
| // But, we should have at least as many as we got confirmation for. |
| ClusterVerifier v(cluster_.get()); |
| NO_FATALS(v.CheckCluster()); |
| NO_FATALS(v.CheckRowCount(workload.table_name(), ClusterVerifier::AT_LEAST, |
| workload.rows_inserted())); |
| } |
| |
| // This test sets all of the election timers to be very short, resulting |
| // in a lot of churn. We expect to make some progress and not diverge or |
| // crash, despite the frequent re-elections and races. |
| TEST_F(RaftConsensusITest, TestChurnyElections) { |
| DoTestChurnyElections(WITHOUT_NOTIFICATION_LATENCY); |
| } |
| |
| // The same test, except inject artificial latency when propagating notifications |
| // from the queue back to consensus. This can reproduce bugs like KUDU-1078 which |
| // normally only appear under high load. TODO: Re-enable once we get to the |
| // bottom of KUDU-1078. |
| TEST_F(RaftConsensusITest, DISABLED_TestChurnyElections_WithNotificationLatency) { |
| DoTestChurnyElections(WITH_NOTIFICATION_LATENCY); |
| } |
| |
| void RaftConsensusITest::DoTestChurnyElections(bool with_latency) { |
| vector<string> ts_flags, master_flags; |
| |
| #ifdef THREAD_SANITIZER |
| // On TSAN builds, we need to be a little bit less churny in order to make |
| // any progress at all. |
| ts_flags.push_back("--raft_heartbeat_interval_ms=5"); |
| #else |
| ts_flags.push_back("--raft_heartbeat_interval_ms=1"); |
| #endif |
| ts_flags.push_back("--leader_failure_monitor_check_mean_ms=1"); |
| ts_flags.push_back("--leader_failure_monitor_check_stddev_ms=1"); |
| ts_flags.push_back("--never_fsync"); |
| if (with_latency) { |
| ts_flags.push_back("--consensus_inject_latency_ms_in_notifications=50"); |
| } |
| |
| CreateCluster("raft_consensus-itest-cluster", ts_flags, master_flags); |
| |
| TestWorkload workload(cluster_.get()); |
| workload.set_num_replicas(FLAGS_num_replicas); |
| workload.set_timeout_allowed(true); |
| workload.set_write_timeout_millis(100); |
| workload.set_num_write_threads(2); |
| workload.set_write_batch_size(1); |
| workload.Setup(); |
| workload.Start(); |
| |
| // Run for either a prescribed number of writes, or 30 seconds, |
| // whichever comes first. This prevents test timeouts on slower |
| // build machines, TSAN builds, etc. |
| Stopwatch sw; |
| sw.start(); |
| const int kNumWrites = AllowSlowTests() ? 10000 : 1000; |
| while (workload.rows_inserted() < kNumWrites && |
| sw.elapsed().wall_seconds() < 30) { |
| SleepFor(MonoDelta::FromMilliseconds(10)); |
| NO_FATALS(AssertNoTabletServersCrashed()); |
| } |
| workload.StopAndJoin(); |
| ASSERT_GT(workload.rows_inserted(), 0) << "No rows inserted"; |
| |
| // Ensure that the replicas converge. |
| // We don't know exactly how many rows got inserted, since the writer |
| // probably saw many errors which left inserts in indeterminate state. |
| // But, we should have at least as many as we got confirmation for. |
| ClusterVerifier v(cluster_.get()); |
| NO_FATALS(v.CheckCluster()); |
| NO_FATALS(v.CheckRowCount(workload.table_name(), ClusterVerifier::AT_LEAST, |
| workload.rows_inserted())); |
| NO_FATALS(AssertNoTabletServersCrashed()); |
| } |
| |
| TEST_F(RaftConsensusITest, MultiThreadedInsertWithFailovers) { |
| int kNumElections = FLAGS_num_replicas; |
| |
| if (AllowSlowTests()) { |
| FLAGS_num_tablet_servers = 7; |
| FLAGS_num_replicas = 7; |
| kNumElections = 3 * FLAGS_num_replicas; |
| } |
| |
| // Reset the consensus RPC timeout to the default value, or elections might fail often. |
| FLAGS_consensus_rpc_timeout_ms = 1000; |
| |
| // In slow-test mode, start a 7-node configuration cluster (since we can't |
| // bring leaders back, we start with a higher replica count so that we can |
| // kill more leaders). |
| |
| vector<string> flags; |
| BuildAndStart(flags); |
| |
| OverrideFlagForSlowTests( |
| "client_inserts_per_thread", |
| strings::Substitute("$0", (FLAGS_client_inserts_per_thread * 100))); |
| OverrideFlagForSlowTests( |
| "client_num_batches_per_thread", |
| strings::Substitute("$0", (FLAGS_client_num_batches_per_thread * 100))); |
| |
| int num_threads = FLAGS_num_client_threads; |
| int64_t total_num_rows = num_threads * FLAGS_client_inserts_per_thread; |
| |
| // Create kNumElections - 1 latches, each of which fires after an |
| // additional fraction of the total rows has been inserted, so that we |
| // kill leaders (possibly the same node more than once) at regular |
| // points during the workload. |
| vector<CountDownLatch*> latches; |
| for (int i = 1; i < kNumElections; i++) { |
| latches.push_back(new CountDownLatch((i * total_num_rows) / kNumElections)); |
| } |
| |
| for (int i = 0; i < num_threads; i++) { |
| scoped_refptr<kudu::Thread> new_thread; |
| CHECK_OK(kudu::Thread::Create("test", strings::Substitute("ts-test$0", i), |
| &RaftConsensusITest::InsertTestRowsRemoteThread, |
| this, i * FLAGS_client_inserts_per_thread, |
| FLAGS_client_inserts_per_thread, |
| FLAGS_client_num_batches_per_thread, |
| latches, |
| &new_thread)); |
| threads_.push_back(new_thread); |
| } |
| |
| for (CountDownLatch* latch : latches) { |
| latch->Wait(); |
| StopOrKillLeaderAndElectNewOne(); |
| } |
| |
| for (scoped_refptr<kudu::Thread> thr : threads_) { |
| CHECK_OK(ThreadJoiner(thr.get()).Join()); |
| } |
| |
| ASSERT_ALL_REPLICAS_AGREE(FLAGS_client_inserts_per_thread * FLAGS_num_client_threads); |
| STLDeleteElements(&latches); |
| } |
| |
| // Test automatic leader election by killing leaders. |
| TEST_F(RaftConsensusITest, TestAutomaticLeaderElection) { |
| if (AllowSlowTests()) { |
| FLAGS_num_tablet_servers = 5; |
| FLAGS_num_replicas = 5; |
| } |
| BuildAndStart(vector<string>()); |
| |
| TServerDetails* leader; |
| ASSERT_OK(GetLeaderReplicaWithRetries(tablet_id_, &leader)); |
| |
| unordered_set<TServerDetails*> killed_leaders; |
| |
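| // For example, with 5 replicas we kill 2 leaders, leaving 3 live nodes, |
| // which is still a majority. |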
| const int kNumLeadersToKill = FLAGS_num_replicas / 2; |
| const int kFinalNumReplicas = FLAGS_num_replicas / 2 + 1; |
| |
| for (int leaders_killed = 0; leaders_killed < kFinalNumReplicas; leaders_killed++) { |
| LOG(INFO) << Substitute("Writing data to leader of $0-node config ($1 alive)...", |
| FLAGS_num_replicas, FLAGS_num_replicas - leaders_killed); |
| |
| InsertTestRowsRemoteThread(leaders_killed * FLAGS_client_inserts_per_thread, |
| FLAGS_client_inserts_per_thread, |
| FLAGS_client_num_batches_per_thread, |
| vector<CountDownLatch*>()); |
| |
| // At this point, the writes are flushed but the commit index may not be |
| // propagated to all replicas. We kill the leader anyway. |
| if (leaders_killed < kNumLeadersToKill) { |
| LOG(INFO) << "Killing current leader " << leader->instance_id.permanent_uuid() << "..."; |
| cluster_->tablet_server_by_uuid(leader->uuid())->Shutdown(); |
| InsertOrDie(&killed_leaders, leader); |
| |
| LOG(INFO) << "Waiting for new guy to be elected leader."; |
| ASSERT_OK(GetLeaderReplicaWithRetries(tablet_id_, &leader)); |
| } |
| } |
| |
| // Restart every node that was killed, and wait for the nodes to converge |
| for (TServerDetails* killed_node : killed_leaders) { |
| CHECK_OK(cluster_->tablet_server_by_uuid(killed_node->uuid())->Restart()); |
| } |
| // Verify the data on the remaining replicas. |
| ASSERT_ALL_REPLICAS_AGREE(FLAGS_client_inserts_per_thread * kFinalNumReplicas); |
| } |
| |
| // Single-replica leader election test. |
| TEST_F(RaftConsensusITest, TestAutomaticLeaderElectionOneReplica) { |
| FLAGS_num_tablet_servers = 1; |
| FLAGS_num_replicas = 1; |
| NO_FATALS(BuildAndStart()); |
| // Ensure that single-node Raft configs elect themselves as leader |
| // immediately upon Consensus startup. |
| ASSERT_OK(GetReplicaStatusAndCheckIfLeader(tablet_servers_[cluster_->tablet_server(0)->uuid()], |
| tablet_id_, MonoDelta::FromMilliseconds(500))); |
| } |
| |
| void RaftConsensusITest::StubbornlyWriteSameRowThread(int replica_idx, const AtomicBool* finish) { |
| vector<TServerDetails*> servers; |
| AppendValuesFromMap(tablet_servers_, &servers); |
| CHECK_LT(replica_idx, servers.size()); |
| TServerDetails* ts = servers[replica_idx]; |
| |
| // Manually construct an RPC to our target replica. We expect most of the calls |
| // to fail either with an "already present" or an error because we are writing |
| // to a follower. That's OK, though - what we care about for this test is |
| // just that the operations Apply() in the same order everywhere (even though |
| // in this case the result will just be an error). |
| WriteRequestPB req; |
| WriteResponsePB resp; |
| RpcController rpc; |
| req.set_tablet_id(tablet_id_); |
| ASSERT_OK(SchemaToPB(schema_, req.mutable_schema())); |
| AddTestRowToPB(RowOperationsPB::INSERT, schema_, kTestRowKey, kTestRowIntVal, |
| "hello world", req.mutable_row_operations()); |
| |
| while (!finish->Load()) { |
| resp.Clear(); |
| rpc.Reset(); |
| rpc.set_timeout(MonoDelta::FromSeconds(10)); |
| ignore_result(ts->tserver_proxy->Write(req, &resp, &rpc)); |
| VLOG(1) << "Response from server " << replica_idx << ": " |
| << resp.ShortDebugString(); |
| } |
| } |
| |
| // Regression test for KUDU-597, an issue where we could mis-order operations on |
| // a machine if the following sequence occurred: |
| // 1) Replica is a FOLLOWER |
| // 2) A client request hits the machine |
| // 3) It receives some operations from the current leader |
| // 4) It gets elected LEADER |
| // In this scenario, it would incorrectly sequence the client request's PREPARE phase |
| // before the operations received in step (3), even though the correct behavior would be |
| // to either reject them or sequence them after those operations, because the operation |
| // index is higher. |
| // |
| // The test works by setting up three replicas and manually hammering them with write |
| // requests targeting a single row. If the bug exists, then TransactionOrderVerifier |
| // will trigger an assertion because the prepare order and the op indexes will become |
| // misaligned. |
| TEST_F(RaftConsensusITest, TestKUDU_597) { |
| FLAGS_num_replicas = 3; |
| FLAGS_num_tablet_servers = 3; |
| BuildAndStart(vector<string>()); |
| |
| AtomicBool finish(false); |
| for (int i = 0; i < FLAGS_num_tablet_servers; i++) { |
| scoped_refptr<kudu::Thread> new_thread; |
| CHECK_OK(kudu::Thread::Create("test", strings::Substitute("ts-test$0", i), |
| &RaftConsensusITest::StubbornlyWriteSameRowThread, |
| this, i, &finish, &new_thread)); |
| threads_.push_back(new_thread); |
| } |
| |
| const int num_loops = AllowSlowTests() ? 10 : 1; |
| for (int i = 0; i < num_loops; i++) { |
| StopOrKillLeaderAndElectNewOne(); |
| SleepFor(MonoDelta::FromSeconds(1)); |
| ASSERT_OK(CheckTabletServersAreAlive(FLAGS_num_tablet_servers)); |
| } |
| |
| finish.Store(true); |
| for (scoped_refptr<kudu::Thread> thr : threads_) { |
| CHECK_OK(ThreadJoiner(thr.get()).Join()); |
| } |
| } |
| |
| void RaftConsensusITest::AddOp(const OpId& id, ConsensusRequestPB* req) { |
| ReplicateMsg* msg = req->add_ops(); |
| msg->mutable_id()->CopyFrom(id); |
| msg->set_timestamp(id.index()); |
| msg->set_op_type(consensus::WRITE_OP); |
| WriteRequestPB* write_req = msg->mutable_write_request(); |
| CHECK_OK(SchemaToPB(schema_, write_req->mutable_schema())); |
| write_req->set_tablet_id(tablet_id_); |
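| // Derive the row key from both the index and the term, so that an op |
| // replaced under a later term inserts a different row. |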
| int key = id.index() * 10000 + id.term(); |
| AddTestRowToPB(RowOperationsPB::INSERT, schema_, key, id.term(), |
| id.ShortDebugString(), write_req->mutable_row_operations()); |
| } |
| |
| // Regression test for KUDU-644: |
| // Triggers some complicated scenarios on the replica involving aborting and |
| // replacing transactions. |
| TEST_F(RaftConsensusITest, TestReplicaBehaviorViaRPC) { |
| FLAGS_num_replicas = 3; |
| FLAGS_num_tablet_servers = 3; |
| vector<string> ts_flags, master_flags; |
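| // Disable automatic (failure-detection-driven) elections on the tablet |
| // servers and keep the master from waiting for a leader, so the test |
| // controls exactly which replica becomes leader. |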
| ts_flags.push_back("--enable_leader_failure_detection=false"); |
| master_flags.push_back("--catalog_manager_wait_for_new_tablets_to_elect_leader=false"); |
| BuildAndStart(ts_flags, master_flags); |
| |
| // Kill all the servers but one. |
| TServerDetails *replica_ts; |
| vector<TServerDetails*> tservers; |
| AppendValuesFromMap(tablet_servers_, &tservers); |
| ASSERT_EQ(3, tservers.size()); |
| |
| // Elect server 2 as leader and wait for log index 1 to propagate to all servers. |
| ASSERT_OK(StartElection(tservers[2], tablet_id_, MonoDelta::FromSeconds(10))); |
| ASSERT_OK(WaitForServersToAgree(MonoDelta::FromSeconds(10), tablet_servers_, tablet_id_, 1)); |
| |
| replica_ts = tservers[0]; |
| cluster_->tablet_server_by_uuid(tservers[1]->uuid())->Shutdown(); |
| cluster_->tablet_server_by_uuid(tservers[2]->uuid())->Shutdown(); |
| |
| LOG(INFO) << "================================== Cluster setup complete."; |
| |
| // Check that the 'term' metric is correctly exposed. |
| { |
| int64_t term_from_metric = -1; |
| ASSERT_OK(cluster_->tablet_server_by_uuid(replica_ts->uuid())->GetInt64Metric( |
| &METRIC_ENTITY_tablet, |
| nullptr, |
| &METRIC_raft_term, |
| "value", |
| &term_from_metric)); |
| ASSERT_EQ(term_from_metric, 1); |
| } |
| |
| ConsensusServiceProxy* c_proxy = CHECK_NOTNULL(replica_ts->consensus_proxy.get()); |
| |
| ConsensusRequestPB req; |
| ConsensusResponsePB resp; |
| RpcController rpc; |
| |
| // Send a simple request with no ops. |
| req.set_tablet_id(tablet_id_); |
| req.set_dest_uuid(replica_ts->uuid()); |
| req.set_caller_uuid("fake_caller"); |
| req.set_caller_term(2); |
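| // The replica already logged the election NO_OP as op 1.1; point at it |
| // as the preceding op and mark it committed. |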
| req.mutable_committed_index()->CopyFrom(MakeOpId(1, 1)); |
| req.mutable_preceding_id()->CopyFrom(MakeOpId(1, 1)); |
| |
| ASSERT_OK(c_proxy->UpdateConsensus(req, &resp, &rpc)); |
| ASSERT_FALSE(resp.has_error()) << resp.DebugString(); |
| |
| // Send some operations, but don't advance the commit index. |
| // They should not commit. |
| AddOp(MakeOpId(2, 2), &req); |
| AddOp(MakeOpId(2, 3), &req); |
| AddOp(MakeOpId(2, 4), &req); |
| rpc.Reset(); |
| ASSERT_OK(c_proxy->UpdateConsensus(req, &resp, &rpc)); |
| ASSERT_FALSE(resp.has_error()) << resp.DebugString(); |
| |
| // We shouldn't read anything yet, because the ops should be pending. |
| { |
| vector<string> results; |
| NO_FATALS(ScanReplica(replica_ts->tserver_proxy.get(), &results)); |
| ASSERT_EQ(0, results.size()) << results; |
| } |
| |
| // Send op 2.6, but set preceding OpId to 2.4. This is an invalid |
| // request, and the replica should reject it. |
| req.mutable_preceding_id()->CopyFrom(MakeOpId(2, 4)); |
| req.clear_ops(); |
| AddOp(MakeOpId(2, 6), &req); |
| rpc.Reset(); |
| ASSERT_OK(c_proxy->UpdateConsensus(req, &resp, &rpc)); |
| ASSERT_TRUE(resp.has_error()) << resp.DebugString(); |
| ASSERT_EQ(resp.error().status().message(), |
| "New operation's index does not follow the previous op's index. " |
| "Current: 2.6. Previous: 2.4"); |
| |
| resp.Clear(); |
| req.clear_ops(); |
| // Send ops 3.5 and 2.6, then commit up to index 6; the replica |
| // should reject the request because of the out-of-order terms. |
| req.mutable_preceding_id()->CopyFrom(MakeOpId(2, 4)); |
| AddOp(MakeOpId(3, 5), &req); |
| AddOp(MakeOpId(2, 6), &req); |
| rpc.Reset(); |
| ASSERT_OK(c_proxy->UpdateConsensus(req, &resp, &rpc)); |
| ASSERT_TRUE(resp.has_error()) << resp.DebugString(); |
| ASSERT_EQ(resp.error().status().message(), |
| "New operation's term is not >= than the previous op's term." |
| " Current: 2.6. Previous: 3.5"); |
| |
| // Regression test for KUDU-639: if we send a valid request, but the |
| // current commit index is higher than the data we're sending, we shouldn't |
| // commit anything higher than the last op sent by the leader. |
| // |
| // To test, we re-send operation 2.3, with the correct preceding ID 2.2, |
| // but we set the committed index to 2.4. This should only commit |
| // 2.2 and 2.3. |
| resp.Clear(); |
| req.clear_ops(); |
| req.mutable_preceding_id()->CopyFrom(MakeOpId(2, 2)); |
| AddOp(MakeOpId(2, 3), &req); |
| req.mutable_committed_index()->CopyFrom(MakeOpId(2, 4)); |
| rpc.Reset(); |
| ASSERT_OK(c_proxy->UpdateConsensus(req, &resp, &rpc)); |
| ASSERT_FALSE(resp.has_error()) << resp.DebugString(); |
| // Verify only 2.2 and 2.3 are committed. |
| { |
| vector<string> results; |
| NO_FATALS(WaitForRowCount(replica_ts->tserver_proxy.get(), 2, &results)); |
| ASSERT_STR_CONTAINS(results[0], "term: 2 index: 2"); |
| ASSERT_STR_CONTAINS(results[1], "term: 2 index: 3"); |
| } |
| |
| resp.Clear(); |
| req.clear_ops(); |
| // Now send some more ops, and commit the earlier ones. |
| req.mutable_committed_index()->CopyFrom(MakeOpId(2, 4)); |
| req.mutable_preceding_id()->CopyFrom(MakeOpId(2, 4)); |
| AddOp(MakeOpId(2, 5), &req); |
| AddOp(MakeOpId(2, 6), &req); |
| rpc.Reset(); |
| ASSERT_OK(c_proxy->UpdateConsensus(req, &resp, &rpc)); |
| ASSERT_FALSE(resp.has_error()) << resp.DebugString(); |
| |
| // Verify they are committed. |
| { |
| vector<string> results; |
| NO_FATALS(WaitForRowCount(replica_ts->tserver_proxy.get(), 3, &results)); |
| ASSERT_STR_CONTAINS(results[0], "term: 2 index: 2"); |
| ASSERT_STR_CONTAINS(results[1], "term: 2 index: 3"); |
| ASSERT_STR_CONTAINS(results[2], "term: 2 index: 4"); |
| } |
| |
| // At this point, we still have two operations which aren't committed. If we |
| // try to perform a snapshot-consistent scan, we should time out rather than |
| // hanging the RPC service thread. |
| { |
| ScanRequestPB req; |
| ScanResponsePB resp; |
| RpcController rpc; |
| rpc.set_timeout(MonoDelta::FromMilliseconds(100)); |
| NewScanRequestPB* scan = req.mutable_new_scan_request(); |
| scan->set_tablet_id(tablet_id_); |
| scan->set_read_mode(READ_AT_SNAPSHOT); |
| ASSERT_OK(SchemaToColumnPBs(schema_, scan->mutable_projected_columns())); |
| |
| // Send the call. We expect to get a timeout passed back from the server side |
| // (i.e. not an RPC timeout) |
| req.set_batch_size_bytes(0); |
| SCOPED_TRACE(req.DebugString()); |
| ASSERT_OK(replica_ts->tserver_proxy->Scan(req, &resp, &rpc)); |
| SCOPED_TRACE(resp.DebugString()); |
| string err_str = StatusFromPB(resp.error().status()).ToString(); |
| ASSERT_STR_CONTAINS(err_str, "Timed out waiting for all transactions"); |
| ASSERT_STR_CONTAINS(err_str, "to commit"); |
| } |
| |
| resp.Clear(); |
| req.clear_ops(); |
| int leader_term = 2; |
| const int kNumTerms = AllowSlowTests() ? 10000 : 100; |
| while (leader_term < kNumTerms) { |
| leader_term++; |
| // Now pretend to be a new leader (with a new, higher term) and replace |
| // the earlier ops without committing the new replacements. |
| req.set_caller_term(leader_term); |
| req.set_caller_uuid("new_leader"); |
| req.mutable_preceding_id()->CopyFrom(MakeOpId(2, 4)); |
| req.clear_ops(); |
| AddOp(MakeOpId(leader_term, 5), &req); |
| AddOp(MakeOpId(leader_term, 6), &req); |
| rpc.Reset(); |
| ASSERT_OK(c_proxy->UpdateConsensus(req, &resp, &rpc)); |
| ASSERT_FALSE(resp.has_error()) << "Req: " << req.ShortDebugString() |
| << " Resp: " << resp.DebugString(); |
| } |
| |
| // Send an empty request from the newest term which should commit |
| // the earlier ops. |
| { |
| req.mutable_preceding_id()->CopyFrom(MakeOpId(leader_term, 6)); |
| req.mutable_committed_index()->CopyFrom(MakeOpId(leader_term, 6)); |
| req.clear_ops(); |
| rpc.Reset(); |
| ASSERT_OK(c_proxy->UpdateConsensus(req, &resp, &rpc)); |
| ASSERT_FALSE(resp.has_error()) << resp.DebugString(); |
| } |
| |
| // Verify the new rows are committed. |
| { |
| vector<string> results; |
| NO_FATALS(WaitForRowCount(replica_ts->tserver_proxy.get(), 5, &results)); |
| SCOPED_TRACE(results); |
| ASSERT_STR_CONTAINS(results[3], Substitute("term: $0 index: 5", leader_term)); |
| ASSERT_STR_CONTAINS(results[4], Substitute("term: $0 index: 6", leader_term)); |
| } |
| } |
| |
| TEST_F(RaftConsensusITest, TestLeaderStepDown) { |
| FLAGS_num_replicas = 3; |
| FLAGS_num_tablet_servers = 3; |
| |
| vector<string> ts_flags, master_flags; |
| ts_flags.push_back("--enable_leader_failure_detection=false"); |
| master_flags.push_back("--catalog_manager_wait_for_new_tablets_to_elect_leader=false"); |
| BuildAndStart(ts_flags, master_flags); |
| |
| vector<TServerDetails*> tservers; |
| AppendValuesFromMap(tablet_servers_, &tservers); |
| |
| // Start with no leader. |
| Status s = GetReplicaStatusAndCheckIfLeader(tservers[0], tablet_id_, MonoDelta::FromSeconds(10)); |
| ASSERT_TRUE(s.IsIllegalState()) << "TS #0 should not be leader yet: " << s.ToString(); |
| |
| // Become leader. |
| ASSERT_OK(StartElection(tservers[0], tablet_id_, MonoDelta::FromSeconds(10))); |
| ASSERT_OK(WaitUntilLeader(tservers[0], tablet_id_, MonoDelta::FromSeconds(10))); |
| ASSERT_OK(WriteSimpleTestRow(tservers[0], tablet_id_, RowOperationsPB::INSERT, |
| kTestRowKey, kTestRowIntVal, "foo", MonoDelta::FromSeconds(10))); |
| ASSERT_OK(WaitForServersToAgree(MonoDelta::FromSeconds(10), tablet_servers_, tablet_id_, 2)); |
| |
| // Step down and test that a 2nd stepdown returns the expected result. |
| ASSERT_OK(LeaderStepDown(tservers[0], tablet_id_, MonoDelta::FromSeconds(10))); |
| TabletServerErrorPB error; |
| s = LeaderStepDown(tservers[0], tablet_id_, MonoDelta::FromSeconds(10), &error); |
| ASSERT_TRUE(s.IsIllegalState()) << "TS #0 should not be leader anymore: " << s.ToString(); |
| ASSERT_EQ(TabletServerErrorPB::NOT_THE_LEADER, error.code()) << error.ShortDebugString(); |
| |
| s = WriteSimpleTestRow(tservers[0], tablet_id_, RowOperationsPB::INSERT, |
| kTestRowKey, kTestRowIntVal, "foo", MonoDelta::FromSeconds(10)); |
| ASSERT_TRUE(s.IsIllegalState()) << "TS #0 should not accept writes as follower: " |
| << s.ToString(); |
| } |
| |
| void RaftConsensusITest::AssertMajorityRequiredForElectionsAndWrites( |
| const TabletServerMap& tablet_servers, const string& leader_uuid) { |
| |
| TServerDetails* initial_leader = FindOrDie(tablet_servers, leader_uuid); |
| |
| // Calculate number of servers to leave unpaused (minority). |
| // This math is a little unintuitive but works for cluster sizes including 2 and 1. |
| // Note: We assume all of these TSes are voters. |
| int config_size = tablet_servers.size(); |
| int minority_to_retain = MajoritySize(config_size) - 1; |
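| // For example, in a config of 5 voters MajoritySize() is 3, so we retain |
| // 2 unpaused servers and pause the remaining 3 to break the majority. |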
| |
| // Only perform this part of the test if we have some servers to pause, else |
| // the failure assertions will throw. |
| if (config_size > 1) { |
| // Pause enough replicas to prevent a majority. |
| int num_to_pause = config_size - minority_to_retain; |
| LOG(INFO) << "Pausing " << num_to_pause << " tablet servers in config of size " << config_size; |
| vector<string> paused_uuids; |
| for (const TabletServerMap::value_type& entry : tablet_servers) { |
| if (paused_uuids.size() == num_to_pause) { |
| continue; |
| } |
| const string& replica_uuid = entry.first; |
| if (replica_uuid == leader_uuid) { |
| // Always leave this one alone. |
| continue; |
| } |
| ExternalTabletServer* replica_ts = cluster_->tablet_server_by_uuid(replica_uuid); |
| ASSERT_OK(replica_ts->Pause()); |
| paused_uuids.push_back(replica_uuid); |
| } |
| |
| // Ensure writes timeout while only a minority is alive. |
| Status s = WriteSimpleTestRow(initial_leader, tablet_id_, RowOperationsPB::UPDATE, |
| kTestRowKey, kTestRowIntVal, "foo", |
| MonoDelta::FromMilliseconds(100)); |
| ASSERT_TRUE(s.IsTimedOut()) << s.ToString(); |
| |
| // Step down. |
| ASSERT_OK(LeaderStepDown(initial_leader, tablet_id_, MonoDelta::FromSeconds(10))); |
| |
| // Assert that elections time out without a live majority. |
| // We specify a very short timeout here to keep the tests fast. |
| ASSERT_OK(StartElection(initial_leader, tablet_id_, MonoDelta::FromSeconds(10))); |
| s = WaitUntilLeader(initial_leader, tablet_id_, MonoDelta::FromMilliseconds(100)); |
| ASSERT_TRUE(s.IsTimedOut()) << s.ToString(); |
| LOG(INFO) << "Expected timeout encountered on election with weakened config: " << s.ToString(); |
| |
| // Resume the paused servers. |
| LOG(INFO) << "Resuming " << num_to_pause << " tablet servers in config of size " << config_size; |
| for (const string& replica_uuid : paused_uuids) { |
| ExternalTabletServer* replica_ts = cluster_->tablet_server_by_uuid(replica_uuid); |
| ASSERT_OK(replica_ts->Resume()); |
| } |
| } |
| |
| ASSERT_OK(WaitForServersToAgree(MonoDelta::FromSeconds(20), tablet_servers, tablet_id_, 1)); |
| |
| // Now an election should succeed. |
| ASSERT_OK(StartElection(initial_leader, tablet_id_, MonoDelta::FromSeconds(10))); |
| ASSERT_OK(WaitUntilLeader(initial_leader, tablet_id_, MonoDelta::FromSeconds(10))); |
| LOG(INFO) << "Successful election with full config of size " << config_size; |
| |
| // And a write should also succeed. |
| ASSERT_OK(WriteSimpleTestRow(initial_leader, tablet_id_, RowOperationsPB::UPDATE, |
| kTestRowKey, kTestRowIntVal, Substitute("qsz=$0", config_size), |
| MonoDelta::FromSeconds(10))); |
| } |
| |
| // Return the replicas of the specified 'tablet_id', as seen by the Master. |
| Status RaftConsensusITest::GetTabletLocations(const string& tablet_id, const MonoDelta& timeout, |
| master::TabletLocationsPB* tablet_locations) { |
| RpcController rpc; |
| rpc.set_timeout(timeout); |
| GetTabletLocationsRequestPB req; |
| *req.add_tablet_ids() = tablet_id; |
| GetTabletLocationsResponsePB resp; |
| RETURN_NOT_OK(cluster_->master_proxy()->GetTabletLocations(req, &resp, &rpc)); |
| if (resp.has_error()) { |
| return StatusFromPB(resp.error().status()); |
| } |
| if (resp.errors_size() > 0) { |
| CHECK_EQ(1, resp.errors_size()) << resp.ShortDebugString(); |
| CHECK_EQ(tablet_id, resp.errors(0).tablet_id()) << resp.ShortDebugString(); |
| return StatusFromPB(resp.errors(0).status()); |
| } |
| CHECK_EQ(1, resp.tablet_locations_size()) << resp.ShortDebugString(); |
| *tablet_locations = resp.tablet_locations(0); |
| return Status::OK(); |
| } |
| |
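| // Polls the Master until it reports 'num_replicas' replicas for 'tablet_id' |
| // (and, if WAIT_FOR_LEADER is passed, until one of them is marked as the |
| // leader), or until 'timeout' expires. On return, 'has_leader' and |
| // 'tablet_locations' reflect the most recent response from the Master. |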
| void RaftConsensusITest::WaitForReplicasReportedToMaster( |
| int num_replicas, const string& tablet_id, |
| const MonoDelta& timeout, |
| WaitForLeader wait_for_leader, |
| bool* has_leader, |
| master::TabletLocationsPB* tablet_locations) { |
| MonoTime deadline(MonoTime::Now(MonoTime::FINE)); |
| deadline.AddDelta(timeout); |
| while (true) { |
| ASSERT_OK(GetTabletLocations(tablet_id, timeout, tablet_locations)); |
| *has_leader = false; |
| if (tablet_locations->replicas_size() == num_replicas) { |
| for (const master::TabletLocationsPB_ReplicaPB& replica : |
| tablet_locations->replicas()) { |
| if (replica.role() == RaftPeerPB::LEADER) { |
| *has_leader = true; |
| } |
| } |
| if (wait_for_leader == NO_WAIT_FOR_LEADER || |
| (wait_for_leader == WAIT_FOR_LEADER && *has_leader)) { |
| break; |
| } |
| } |
| if (deadline.ComesBefore(MonoTime::Now(MonoTime::FINE))) break; |
| SleepFor(MonoDelta::FromMilliseconds(20)); |
| } |
| ASSERT_EQ(num_replicas, tablet_locations->replicas_size()) << tablet_locations->DebugString(); |
| if (wait_for_leader == WAIT_FOR_LEADER) { |
| ASSERT_TRUE(*has_leader) << tablet_locations->DebugString(); |
| } |
| } |
| |
| // Basic test of adding and removing servers from a configuration. |
| TEST_F(RaftConsensusITest, TestAddRemoveServer) { |
| const MonoDelta kTimeout = MonoDelta::FromSeconds(10); |
| FLAGS_num_tablet_servers = 3; |
| FLAGS_num_replicas = 3; |
| vector<string> ts_flags = { "--enable_leader_failure_detection=false" }; |
| vector<string> master_flags = { "--master_add_server_when_underreplicated=false" }; |
| master_flags.push_back("--catalog_manager_wait_for_new_tablets_to_elect_leader=false"); |
| NO_FATALS(BuildAndStart(ts_flags, master_flags)); |
| |
| vector<TServerDetails*> tservers; |
| AppendValuesFromMap(tablet_servers_, &tservers); |
| ASSERT_EQ(FLAGS_num_tablet_servers, tservers.size()); |
| |
| // Elect server 0 as leader and wait for log index 1 to propagate to all servers. |
| TServerDetails* leader_tserver = tservers[0]; |
| const string& leader_uuid = tservers[0]->uuid(); |
| ASSERT_OK(StartElection(leader_tserver, tablet_id_, kTimeout)); |
| ASSERT_OK(WaitForServersToAgree(kTimeout, tablet_servers_, tablet_id_, 1)); |
| |
| // Make sure the server rejects removal of itself from the configuration. |
| Status s = RemoveServer(leader_tserver, tablet_id_, leader_tserver, boost::none, kTimeout); |
| ASSERT_TRUE(s.IsInvalidArgument()) << "Should not be able to remove self from config: " |
| << s.ToString(); |
| |
| // Insert the row that we will update throughout the test. |
| ASSERT_OK(WriteSimpleTestRow(leader_tserver, tablet_id_, RowOperationsPB::INSERT, |
| kTestRowKey, kTestRowIntVal, "initial insert", kTimeout)); |
| |
| // Kill the master, so we can change the config without interference. |
| cluster_->master()->Shutdown(); |
| |
| TabletServerMap active_tablet_servers = tablet_servers_; |
| |
| // Do majority correctness check for 3 servers. |
| NO_FATALS(AssertMajorityRequiredForElectionsAndWrites(active_tablet_servers, leader_uuid)); |
| OpId opid; |
| ASSERT_OK(GetLastOpIdForReplica(tablet_id_, leader_tserver, consensus::RECEIVED_OPID, kTimeout, |
| &opid)); |
| int64_t cur_log_index = opid.index(); |
| |
| // Go from 3 tablet servers down to 1 in the configuration. |
| vector<int> remove_list = { 2, 1 }; |
| for (int to_remove_idx : remove_list) { |
| int num_servers = active_tablet_servers.size(); |
| LOG(INFO) << "Remove: Going from " << num_servers << " to " << num_servers - 1 << " replicas"; |
| |
| TServerDetails* tserver_to_remove = tservers[to_remove_idx]; |
| LOG(INFO) << "Removing tserver with uuid " << tserver_to_remove->uuid(); |
| ASSERT_OK(RemoveServer(leader_tserver, tablet_id_, tserver_to_remove, boost::none, kTimeout)); |
| ASSERT_EQ(1, active_tablet_servers.erase(tserver_to_remove->uuid())); |
| ASSERT_OK(WaitForServersToAgree(kTimeout, active_tablet_servers, tablet_id_, ++cur_log_index)); |
| |
| // Do majority correctness check for each incremental decrease. |
| NO_FATALS(AssertMajorityRequiredForElectionsAndWrites(active_tablet_servers, leader_uuid)); |
| ASSERT_OK(GetLastOpIdForReplica(tablet_id_, leader_tserver, consensus::RECEIVED_OPID, kTimeout, |
| &opid)); |
| cur_log_index = opid.index(); |
| } |
| |
| // Add the tablet servers back, in reverse order, going from 1 to 3 servers in the configuration. |
| vector<int> add_list = { 1, 2 }; |
| for (int to_add_idx : add_list) { |
| int num_servers = active_tablet_servers.size(); |
| LOG(INFO) << "Add: Going from " << num_servers << " to " << num_servers + 1 << " replicas"; |
| |
| TServerDetails* tserver_to_add = tservers[to_add_idx]; |
| LOG(INFO) << "Adding tserver with uuid " << tserver_to_add->uuid(); |
| ASSERT_OK(AddServer(leader_tserver, tablet_id_, tserver_to_add, RaftPeerPB::VOTER, boost::none, |
| kTimeout)); |
| InsertOrDie(&active_tablet_servers, tserver_to_add->uuid(), tserver_to_add); |
| ASSERT_OK(WaitForServersToAgree(kTimeout, active_tablet_servers, tablet_id_, ++cur_log_index)); |
| |
| // Do majority correctness check for each incremental increase. |
| NO_FATALS(AssertMajorityRequiredForElectionsAndWrites(active_tablet_servers, leader_uuid)); |
| ASSERT_OK(GetLastOpIdForReplica(tablet_id_, leader_tserver, consensus::RECEIVED_OPID, kTimeout, |
| &opid)); |
| cur_log_index = opid.index(); |
| } |
| } |
| |
| // Regression test for KUDU-1169: a crash when a Config Change operation is replaced |
| // by a later leader. |
| TEST_F(RaftConsensusITest, TestReplaceChangeConfigOperation) { |
| FLAGS_num_tablet_servers = 3; |
| FLAGS_num_replicas = 3; |
| vector<string> ts_flags = { "--enable_leader_failure_detection=false" }; |
| vector<string> master_flags = { "--master_add_server_when_underreplicated=false" }; |
| master_flags.push_back("--catalog_manager_wait_for_new_tablets_to_elect_leader=false"); |
| NO_FATALS(BuildAndStart(ts_flags, master_flags)); |
| |
| vector<TServerDetails*> tservers; |
| AppendValuesFromMap(tablet_servers_, &tservers); |
| ASSERT_EQ(FLAGS_num_tablet_servers, tservers.size()); |
| |
| // Elect server 0 as leader and wait for log index 1 to propagate to all servers. |
| TServerDetails* leader_tserver = tservers[0]; |
| |
| TabletServerMap original_followers = tablet_servers_; |
| ASSERT_EQ(1, original_followers.erase(leader_tserver->uuid())); |
| |
| ASSERT_OK(StartElection(leader_tserver, tablet_id_, MonoDelta::FromSeconds(10))); |
| ASSERT_OK(WaitForServersToAgree(MonoDelta::FromSeconds(10), tablet_servers_, tablet_id_, 1)); |
| |
| // Shut down servers 1 and 2, so that server 0 can't replicate anything. |
| cluster_->tablet_server_by_uuid(tservers[1]->uuid())->Shutdown(); |
| cluster_->tablet_server_by_uuid(tservers[2]->uuid())->Shutdown(); |
| |
| // Now try to replicate a ChangeConfig operation. This should get stuck and time out |
| // because the server can't replicate any operations. |
| TabletServerErrorPB::Code error_code; |
| Status s = RemoveServer(leader_tserver, tablet_id_, tservers[1], |
| -1, MonoDelta::FromSeconds(1), |
| &error_code); |
| ASSERT_TRUE(s.IsTimedOut()); |
| |
| // Pause the leader, and restart the other servers. |
| cluster_->tablet_server_by_uuid(tservers[0]->uuid())->Pause(); |
| ASSERT_OK(cluster_->tablet_server_by_uuid(tservers[1]->uuid())->Restart()); |
| ASSERT_OK(cluster_->tablet_server_by_uuid(tservers[2]->uuid())->Restart()); |
| |
| ASSERT_OK(WaitForServersToAgree(MonoDelta::FromSeconds(10), original_followers, tablet_id_, 1)); |
| |
| // Elect one of the other servers. |
| ASSERT_OK(StartElection(tservers[1], tablet_id_, MonoDelta::FromSeconds(10))); |
| ASSERT_OK(WaitUntilLeader(tservers[1], tablet_id_, MonoDelta::FromSeconds(10))); |
| leader_tserver = tservers[1]; |
| |
| // Resume the original leader. Its change-config operation will now be aborted |
| // since it was never replicated to the majority, and the new leader will have |
| // replaced the operation. |
| cluster_->tablet_server_by_uuid(tservers[0]->uuid())->Resume(); |
| |
| // Insert some data and verify that it propagates to all servers. |
| NO_FATALS(InsertTestRowsRemoteThread(0, 10, 1, vector<CountDownLatch*>())); |
| ASSERT_ALL_REPLICAS_AGREE(10); |
| |
| // Try another config change. |
| // This acts as a regression test for KUDU-1338, in which aborting the original |
| // config change didn't properly unset the 'pending' configuration. |
| ASSERT_OK(RemoveServer(leader_tserver, tablet_id_, tservers[2], |
| -1, MonoDelta::FromSeconds(5), |
| &error_code)); |
| NO_FATALS(InsertTestRowsRemoteThread(10, 10, 1, vector<CountDownLatch*>())); |
| } |
| |
| // Test the atomic CAS arguments to ChangeConfig() add server and remove server. |
| TEST_F(RaftConsensusITest, TestAtomicAddRemoveServer) { |
| FLAGS_num_tablet_servers = 3; |
| FLAGS_num_replicas = 3; |
| vector<string> ts_flags = { "--enable_leader_failure_detection=false" }; |
| vector<string> master_flags = { "--master_add_server_when_underreplicated=false" }; |
| master_flags.push_back("--catalog_manager_wait_for_new_tablets_to_elect_leader=false"); |
| NO_FATALS(BuildAndStart(ts_flags, master_flags)); |
| |
| vector<TServerDetails*> tservers; |
| AppendValuesFromMap(tablet_servers_, &tservers); |
| ASSERT_EQ(FLAGS_num_tablet_servers, tservers.size()); |
| |
| // Elect server 0 as leader and wait for log index 1 to propagate to all servers. |
| TServerDetails* leader_tserver = tservers[0]; |
| ASSERT_OK(StartElection(leader_tserver, tablet_id_, MonoDelta::FromSeconds(10))); |
| ASSERT_OK(WaitForServersToAgree(MonoDelta::FromSeconds(10), tablet_servers_, tablet_id_, 1)); |
| int64_t cur_log_index = 1; |
| |
| TabletServerMap active_tablet_servers = tablet_servers_; |
| |
| TServerDetails* follower_ts = tservers[2]; |
| |
| // Initial committed config should have opid_index == -1. |
| // Server should reject request to change config from opid other than this. |
| int64_t invalid_committed_opid_index = 7; |
| TabletServerErrorPB::Code error_code; |
| Status s = RemoveServer(leader_tserver, tablet_id_, follower_ts, |
| invalid_committed_opid_index, MonoDelta::FromSeconds(10), |
| &error_code); |
| ASSERT_EQ(TabletServerErrorPB::CAS_FAILED, error_code); |
| ASSERT_STR_CONTAINS(s.ToString(), "of 7 but the committed config has opid_index of -1"); |
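| |
| // Conceptually the CAS argument is a compare-and-swap keyed on the |
| // opid_index of the currently committed config. A rough sketch of the |
| // server-side check (hypothetical pseudocode, not the actual implementation): |
| // |
| //   if (req.has_cas_config_opid_index() && |
| //       req.cas_config_opid_index() != committed_config.opid_index()) { |
| //     return CAS_FAILED;  // The caller's view of the config is stale. |
| //   } |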
| |
| // Specifying the correct committed opid index should work. |
| int64_t committed_opid_index = -1; |
| ASSERT_OK(RemoveServer(leader_tserver, tablet_id_, follower_ts, |
| committed_opid_index, MonoDelta::FromSeconds(10))); |
| |
| ASSERT_EQ(1, active_tablet_servers.erase(follower_ts->uuid())); |
| ASSERT_OK(WaitForServersToAgree(MonoDelta::FromSeconds(10), |
| active_tablet_servers, tablet_id_, ++cur_log_index)); |
| |
| // Now, add the server back. Again, specifying something other than the |
| // latest committed_opid_index should fail. |
| invalid_committed_opid_index = -1; // The old one is no longer valid. |
| s = AddServer(leader_tserver, tablet_id_, follower_ts, RaftPeerPB::VOTER, |
| invalid_committed_opid_index, MonoDelta::FromSeconds(10), |
| &error_code); |
| ASSERT_EQ(TabletServerErrorPB::CAS_FAILED, error_code); |
| ASSERT_STR_CONTAINS(s.ToString(), "of -1 but the committed config has opid_index of 2"); |
| |
| // Specifying the correct committed opid index should work. |
| // The previous config change op is the latest entry in the log. |
| committed_opid_index = cur_log_index; |
| ASSERT_OK(AddServer(leader_tserver, tablet_id_, follower_ts, RaftPeerPB::VOTER, |
| committed_opid_index, MonoDelta::FromSeconds(10))); |
| |
| InsertOrDie(&active_tablet_servers, follower_ts->uuid(), follower_ts); |
| ASSERT_OK(WaitForServersToAgree(MonoDelta::FromSeconds(10), |
| active_tablet_servers, tablet_id_, ++cur_log_index)); |
| } |
| |
| // Ensure that we can elect a server that is in the "pending" configuration. |
| // This is required by the Raft protocol. See Diego Ongaro's PhD thesis, section |
| // 4.1, where it states that "it is the caller’s configuration that is used in |
| // reaching consensus, both for voting and for log replication". |
| // |
| // This test also covers the case where a node comes back from the dead and |
| // finds a leader that was not in its configuration when it went down. That |
| // should also work, i.e. the revived node should accept writes from the new |
| // leader. |
| TEST_F(RaftConsensusITest, TestElectPendingVoter) { |
| // Test plan: |
| // 1. Disable failure detection to avoid non-deterministic behavior. |
| // 2. Start with a configuration size of 5, all servers synced. |
| // 3. Remove one server from the configuration, wait until committed. |
| // 4. Pause the 3 remaining non-leaders (SIGSTOP). |
| // 5. Run a config change to add back the previously-removed server. |
| // Ensure that, while the op cannot be committed yet due to lack of a |
| // majority in the new config (only 2 out of 5 servers are alive), the op |
| // has been replicated to both the local leader and the new member. |
| // 6. Force the existing leader to step down. |
| // 7. Resume one of the paused nodes so that a majority (of the 5-node |
| // configuration, but not the original 4-node configuration) will be available. |
| // 8. Start a leader election on the new (pending) node. It should win. |
| // 9. Unpause the two remaining stopped nodes. |
| // 10. Wait for all nodes to sync to the new leader's log. |
| FLAGS_num_tablet_servers = 5; |
| FLAGS_num_replicas = 5; |
| vector<string> ts_flags, master_flags; |
| ts_flags.push_back("--enable_leader_failure_detection=false"); |
| master_flags.push_back("--catalog_manager_wait_for_new_tablets_to_elect_leader=false"); |
| BuildAndStart(ts_flags, master_flags); |
| |
| vector<TServerDetails*> tservers; |
| AppendValuesFromMap(tablet_servers_, &tservers); |
| ASSERT_EQ(FLAGS_num_tablet_servers, tservers.size()); |
| |
| // Elect server 0 as leader and wait for log index 1 to propagate to all servers. |
| TServerDetails* initial_leader = tservers[0]; |
| ASSERT_OK(StartElection(initial_leader, tablet_id_, MonoDelta::FromSeconds(10))); |
| ASSERT_OK(WaitForServersToAgree(MonoDelta::FromSeconds(10), tablet_servers_, tablet_id_, 1)); |
| |
| // The server we will remove and then bring back. |
| TServerDetails* final_leader = tservers[4]; |
| |
| // Kill the master, so we can change the config without interference. |
| cluster_->master()->Shutdown(); |
| |
| // Now remove server 4 from the configuration. |
| TabletServerMap active_tablet_servers = tablet_servers_; |
| LOG(INFO) << "Removing tserver with uuid " << final_leader->uuid(); |
| ASSERT_OK(RemoveServer(initial_leader, tablet_id_, final_leader, boost::none, |
| MonoDelta::FromSeconds(10))); |
| ASSERT_EQ(1, active_tablet_servers.erase(final_leader->uuid())); |
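| // Index 1 was the initial election no-op; the RemoveServer config change |
| // should be index 2. |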
| int64_t cur_log_index = 2; |
| ASSERT_OK(WaitForServersToAgree(MonoDelta::FromSeconds(10), |
| active_tablet_servers, tablet_id_, cur_log_index)); |
| |
| // Pause tablet servers 1 through 3, so they won't see the operation to add |
| // server 4 back. |
| LOG(INFO) << "Pausing 3 replicas..."; |
| for (int i = 1; i <= 3; i++) { |
| ExternalTabletServer* replica_ts = cluster_->tablet_server_by_uuid(tservers[i]->uuid()); |
| ASSERT_OK(replica_ts->Pause()); |
| } |
| |
| // Now add server 4 back to the peers. |
| // This operation will time out on the client side. |
| LOG(INFO) << "Adding back Peer " << final_leader->uuid() << " and expecting timeout..."; |
| Status s = AddServer(initial_leader, tablet_id_, final_leader, RaftPeerPB::VOTER, boost::none, |
| MonoDelta::FromMilliseconds(100)); |
| ASSERT_TRUE(s.IsTimedOut()) << "Expected AddServer() to time out. Result: " << s.ToString(); |
| LOG(INFO) << "Timeout achieved."; |
| active_tablet_servers = tablet_servers_; // Reset to the unpaused servers. |
| for (int i = 1; i <= 3; i++) { |
| ASSERT_EQ(1, active_tablet_servers.erase(tservers[i]->uuid())); |
| } |
| // Only wait for TS 0 and 4 to agree that the new change config op has been |
| // replicated. |
| ASSERT_OK(WaitForServersToAgree(MonoDelta::FromSeconds(10), |
| active_tablet_servers, tablet_id_, ++cur_log_index)); |
| |
| // Now that TS 4 is electable (and pending), have TS 0 step down. |
| LOG(INFO) << "Forcing Peer " << initial_leader->uuid() << " to step down..."; |
| ASSERT_OK(LeaderStepDown(initial_leader, tablet_id_, MonoDelta::FromSeconds(10))); |
| |
| // Resume TS 1 so we have a majority of 3 to elect a new leader. |
| LOG(INFO) << "Resuming Peer " << tservers[1]->uuid() << " ..."; |
| ASSERT_OK(cluster_->tablet_server_by_uuid(tservers[1]->uuid())->Resume()); |
| InsertOrDie(&active_tablet_servers, tservers[1]->uuid(), tservers[1]); |
| |
| // Now try to get TS 4 elected. It should succeed and push a NO_OP. |
| LOG(INFO) << "Trying to elect Peer " << tservers[4]->uuid() << " ..."; |
| ASSERT_OK(StartElection(final_leader, tablet_id_, MonoDelta::FromSeconds(10))); |
| ASSERT_OK(WaitForServersToAgree(MonoDelta::FromSeconds(10), |
| active_tablet_servers, tablet_id_, ++cur_log_index)); |
| |
| // Resume the remaining paused nodes. |
| LOG(INFO) << "Resuming remaining nodes..."; |
| ASSERT_OK(cluster_->tablet_server_by_uuid(tservers[2]->uuid())->Resume()); |
| ASSERT_OK(cluster_->tablet_server_by_uuid(tservers[3]->uuid())->Resume()); |
| active_tablet_servers = tablet_servers_; |
| |
| // Do one last operation on the new leader: an insert. |
| ASSERT_OK(WriteSimpleTestRow(final_leader, tablet_id_, RowOperationsPB::INSERT, |
| kTestRowKey, kTestRowIntVal, "Ob-La-Di, Ob-La-Da", |
| MonoDelta::FromSeconds(10))); |
| |
| // Wait for all servers to replicate everything up through the last write op. |
| ASSERT_OK(WaitForServersToAgree(MonoDelta::FromSeconds(10), |
| active_tablet_servers, tablet_id_, ++cur_log_index)); |
| } |
| |
| // Writes test rows in ascending order to a single tablet server. |
| // Essentially a poor man's version of TestWorkload that only operates on a |
| // single tablet. Does not batch, does not tolerate timeouts, and does not |
| // interact with the Master. 'rows_inserted' is incremented prior to each |
| // insert and determines the row key; since any write failure crashes the |
| // process (via CHECK_OK), 'rows_inserted' holds an accurate count as long |
| // as the run completes. Because any failure is fatal, 'write_timeout' |
| // should be generous. |
| void DoWriteTestRows(const TServerDetails* leader_tserver, |
| const string& tablet_id, |
| const MonoDelta& write_timeout, |
| AtomicInt<int32_t>* rows_inserted, |
| const AtomicBool* finish) { |
| |
| while (!finish->Load()) { |
| int row_key = rows_inserted->Increment(); |
| CHECK_OK(WriteSimpleTestRow(leader_tserver, tablet_id, RowOperationsPB::INSERT, |
| row_key, row_key, Substitute("key=$0", row_key), |
| write_timeout)); |
| } |
| } |
| |
| // Test that config change works while running a workload. |
| TEST_F(RaftConsensusITest, TestConfigChangeUnderLoad) { |
| FLAGS_num_tablet_servers = 3; |
| FLAGS_num_replicas = 3; |
| vector<string> ts_flags = { "--enable_leader_failure_detection=false" }; |
| vector<string> master_flags = { "--master_add_server_when_underreplicated=false" }; |
| master_flags.push_back("--catalog_manager_wait_for_new_tablets_to_elect_leader=false"); |
| BuildAndStart(ts_flags, master_flags); |
| |
| vector<TServerDetails*> tservers; |
| AppendValuesFromMap(tablet_servers_, &tservers); |
| ASSERT_EQ(FLAGS_num_tablet_servers, tservers.size()); |
| |
| // Elect server 0 as leader and wait for log index 1 to propagate to all servers. |
| TServerDetails* leader_tserver = tservers[0]; |
| ASSERT_OK(StartElection(leader_tserver, tablet_id_, MonoDelta::FromSeconds(10))); |
| ASSERT_OK(WaitForServersToAgree(MonoDelta::FromSeconds(10), tablet_servers_, tablet_id_, 1)); |
| |
| TabletServerMap active_tablet_servers = tablet_servers_; |
| |
| // Start a write workload. |
| LOG(INFO) << "Starting write workload..."; |
| vector<scoped_refptr<Thread> > threads; |
| AtomicInt<int32_t> rows_inserted(0); |
| AtomicBool finish(false); |
| int num_threads = FLAGS_num_client_threads; |
| for (int i = 0; i < num_threads; i++) { |
| scoped_refptr<Thread> thread; |
| ASSERT_OK(Thread::Create(CURRENT_TEST_NAME(), Substitute("row-writer-$0", i), |
| &DoWriteTestRows, |
| leader_tserver, tablet_id_, MonoDelta::FromSeconds(10), |
| &rows_inserted, &finish, |
| &thread)); |
| threads.push_back(thread); |
| } |
| |
| LOG(INFO) << "Removing servers..."; |
| // Go from 3 tablet servers down to 1 in the configuration. |
| vector<int> remove_list = { 2, 1 }; |
| for (int to_remove_idx : remove_list) { |
| int num_servers = active_tablet_servers.size(); |
| LOG(INFO) << "Remove: Going from " << num_servers << " to " << num_servers - 1 << " replicas"; |
| |
| TServerDetails* tserver_to_remove = tservers[to_remove_idx]; |
| LOG(INFO) << "Removing tserver with uuid " << tserver_to_remove->uuid(); |
| ASSERT_OK(RemoveServer(leader_tserver, tablet_id_, tserver_to_remove, boost::none, |
| MonoDelta::FromSeconds(10))); |
| ASSERT_EQ(1, active_tablet_servers.erase(tserver_to_remove->uuid())); |
| ASSERT_OK(WaitUntilCommittedConfigNumVotersIs(active_tablet_servers.size(), |
| leader_tserver, tablet_id_, |
| MonoDelta::FromSeconds(10))); |
| } |
| |
| LOG(INFO) << "Adding servers..."; |
| // Add the tablet servers back, in reverse order, going from 1 to 3 servers in the configuration. |
| vector<int> add_list = { 1, 2 }; |
| for (int to_add_idx : add_list) { |
| int num_servers = active_tablet_servers.size(); |
| LOG(INFO) << "Add: Going from " << num_servers << " to " << num_servers + 1 << " replicas"; |
| |
| TServerDetails* tserver_to_add = tservers[to_add_idx]; |
| LOG(INFO) << "Adding tserver with uuid " << tserver_to_add->uuid(); |
| ASSERT_OK(AddServer(leader_tserver, tablet_id_, tserver_to_add, RaftPeerPB::VOTER, boost::none, |
| MonoDelta::FromSeconds(10))); |
| InsertOrDie(&active_tablet_servers, tserver_to_add->uuid(), tserver_to_add); |
| ASSERT_OK(WaitUntilCommittedConfigNumVotersIs(active_tablet_servers.size(), |
| leader_tserver, tablet_id_, |
| MonoDelta::FromSeconds(10))); |
| } |
| |
| LOG(INFO) << "Joining writer threads..."; |
| finish.Store(true); |
| for (const scoped_refptr<Thread>& thread : threads) { |
| ASSERT_OK(ThreadJoiner(thread.get()).Join()); |
| } |
| |
| LOG(INFO) << "Waiting for replicas to agree..."; |
| // Wait for all servers to replicate everything up through the last write op. |
| // Since we don't batch, there should be at least one log entry per inserted |
| // row, plus the initial leader's no-op, plus 2 config-change ops for the |
| // removed servers and 2 more for the re-added servers, for a total of at |
| // least rows_inserted + 5. |
| int min_log_index = rows_inserted.Load() + 5; |
| ASSERT_OK(WaitForServersToAgree(MonoDelta::FromSeconds(10), |
| active_tablet_servers, tablet_id_, |
| min_log_index)); |
| |
| LOG(INFO) << "Number of rows inserted: " << rows_inserted.Load(); |
| ASSERT_ALL_REPLICAS_AGREE(rows_inserted.Load()); |
| } |
| |
| TEST_F(RaftConsensusITest, TestMasterNotifiedOnConfigChange) { |
| MonoDelta timeout = MonoDelta::FromSeconds(30); |
| FLAGS_num_tablet_servers = 3; |
| FLAGS_num_replicas = 2; |
| vector<string> ts_flags; |
| vector<string> master_flags = { "--master_add_server_when_underreplicated=false" }; |
| NO_FATALS(BuildAndStart(ts_flags, master_flags)); |
| |
| LOG(INFO) << "Finding tablet leader and waiting for things to start..."; |
| string tablet_id = tablet_replicas_.begin()->first; |
| |
| // Determine the list of tablet servers currently in the config. |
| TabletServerMap active_tablet_servers; |
| for (itest::TabletReplicaMap::const_iterator iter = tablet_replicas_.find(tablet_id); |
| iter != tablet_replicas_.end(); ++iter) { |
| InsertOrDie(&active_tablet_servers, iter->second->uuid(), iter->second); |
| } |
| |
| // Determine the server to add to the config. |
| string uuid_to_add; |
| for (const TabletServerMap::value_type& entry : tablet_servers_) { |
| if (!ContainsKey(active_tablet_servers, entry.second->uuid())) { |
| uuid_to_add = entry.second->uuid(); |
| } |
| } |
| ASSERT_FALSE(uuid_to_add.empty()); |
| |
| // Get a baseline config reported to the master. |
| LOG(INFO) << "Waiting for Master to see the current replicas..."; |
| master::TabletLocationsPB tablet_locations; |
| bool has_leader; |
| NO_FATALS(WaitForReplicasReportedToMaster(2, tablet_id, timeout, WAIT_FOR_LEADER, |
| &has_leader, &tablet_locations)); |
| LOG(INFO) << "Tablet locations:\n" << tablet_locations.DebugString(); |
| |
| // Wait for initial NO_OP to be committed by the leader. |
| TServerDetails* leader_ts; |
| ASSERT_OK(FindTabletLeader(tablet_servers_, tablet_id, timeout, &leader_ts)); |
| ASSERT_OK(itest::WaitUntilCommittedOpIdIndexIs(1, leader_ts, tablet_id, timeout)); |
| |
| // Change the config. |
| TServerDetails* tserver_to_add = tablet_servers_[uuid_to_add]; |
| LOG(INFO) << "Adding tserver with uuid " << tserver_to_add->uuid(); |
| ASSERT_OK(AddServer(leader_ts, tablet_id_, tserver_to_add, RaftPeerPB::VOTER, boost::none, |
| timeout)); |
| ASSERT_OK(WaitForServersToAgree(timeout, tablet_servers_, tablet_id_, 2)); |
| |
| // Wait for the master to be notified of the config change. |
| // It should continue to have the same leader, even without waiting. |
| LOG(INFO) << "Waiting for Master to see config change..."; |
| NO_FATALS(WaitForReplicasReportedToMaster(3, tablet_id, timeout, NO_WAIT_FOR_LEADER, |
| &has_leader, &tablet_locations)); |
| ASSERT_TRUE(has_leader) << tablet_locations.DebugString(); |
| LOG(INFO) << "Tablet locations:\n" << tablet_locations.DebugString(); |
| |
| // Change the config again. |
| LOG(INFO) << "Removing tserver with uuid " << tserver_to_add->uuid(); |
| ASSERT_OK(RemoveServer(leader_ts, tablet_id_, tserver_to_add, boost::none, timeout)); |
| active_tablet_servers = tablet_servers_; |
| ASSERT_EQ(1, active_tablet_servers.erase(tserver_to_add->uuid())); |
| ASSERT_OK(WaitForServersToAgree(timeout, active_tablet_servers, tablet_id_, 3)); |
| |
| // Wait for the master to be notified of the removal. |
| LOG(INFO) << "Waiting for Master to see config change..."; |
| NO_FATALS(WaitForReplicasReportedToMaster(2, tablet_id, timeout, NO_WAIT_FOR_LEADER, |
| &has_leader, &tablet_locations)); |
| ASSERT_TRUE(has_leader) << tablet_locations.DebugString(); |
| LOG(INFO) << "Tablet locations:\n" << tablet_locations.DebugString(); |
| } |
| |
| // Test that even with memory pressure, a replica will still commit pending |
| // operations that the leader has committed. |
| TEST_F(RaftConsensusITest, TestEarlyCommitDespiteMemoryPressure) { |
| // Enough operations to put us over our memory limit (defined below). |
| const int kNumOps = 10000; |
| |
| // Set up a 3-node configuration with only one live follower so that we can |
| // manipulate it directly via RPC. |
| vector<string> ts_flags, master_flags; |
| |
| // If failure detection were on, a follower could be elected as leader after |
| // we kill the leader below. |
| ts_flags.push_back("--enable_leader_failure_detection=false"); |
| master_flags.push_back("--catalog_manager_wait_for_new_tablets_to_elect_leader=false"); |
| |
| // Very low memory limit (4 MiB) to ease testing. |
| ts_flags.push_back("--memory_limit_hard_bytes=4194304"); |
| |
| // Don't let transaction memory tracking get in the way. |
| ts_flags.push_back("--tablet_transaction_memory_limit_mb=-1"); |
| |
| BuildAndStart(ts_flags, master_flags); |
| |
| // Elect server 2 as leader, then kill it and server 1, leaving behind |
| // server 0 as the sole follower. |
| vector<TServerDetails*> tservers; |
| AppendValuesFromMap(tablet_servers_, &tservers); |
| ASSERT_EQ(3, tservers.size()); |
| ASSERT_OK(StartElection(tservers[2], tablet_id_, MonoDelta::FromSeconds(10))); |
| ASSERT_OK(WaitForServersToAgree(MonoDelta::FromSeconds(10), tablet_servers_, tablet_id_, 1)); |
| TServerDetails *replica_ts = tservers[0]; |
| cluster_->tablet_server_by_uuid(tservers[1]->uuid())->Shutdown(); |
| cluster_->tablet_server_by_uuid(tservers[2]->uuid())->Shutdown(); |
| |
| // Pretend to be the leader and send a request to replicate some operations. |
| ConsensusRequestPB req; |
| ConsensusResponsePB resp; |
| RpcController rpc; |
| req.set_dest_uuid(replica_ts->uuid()); |
| req.set_tablet_id(tablet_id_); |
| req.set_caller_uuid(tservers[2]->instance_id.permanent_uuid()); |
| req.set_caller_term(1); |
| req.mutable_committed_index()->CopyFrom(MakeOpId(1, 1)); |
| req.mutable_preceding_id()->CopyFrom(MakeOpId(1, 1)); |
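| // Append kNumOps writes as ops (1, 2) through (1, kNumOps + 1), all in term 1. |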
| for (int i = 0; i < kNumOps; i++) { |
| AddOp(MakeOpId(1, 2 + i), &req); |
| } |
| OpId last_opid = MakeOpId(1, 2 + kNumOps - 1); |
| ASSERT_OK(replica_ts->consensus_proxy->UpdateConsensus(req, &resp, &rpc)); |
| |
| // At the time that the follower received our request it was still under the |
| // tiny memory limit defined above, so the request should have succeeded. |
| ASSERT_FALSE(resp.has_error()) << resp.DebugString(); |
| ASSERT_TRUE(resp.has_status()); |
| ASSERT_TRUE(resp.status().has_last_committed_idx()); |
| ASSERT_EQ(last_opid.index(), resp.status().last_received().index()); |
| ASSERT_EQ(1, resp.status().last_committed_idx()); |
| |
| // But no operations have been applied yet; there should be no data. |
| vector<string> rows; |
| NO_FATALS(WaitForRowCount(replica_ts->tserver_proxy.get(), 0, &rows)); |
| |
| // Try again, but this time: |
| // 1. Replicate just one new operation. |
| // 2. Tell the follower that the previous set of operations were committed. |
| req.mutable_preceding_id()->CopyFrom(last_opid); |
| req.mutable_committed_index()->CopyFrom(last_opid); |
| req.mutable_ops()->Clear(); |
| AddOp(MakeOpId(1, last_opid.index() + 1), &req); |
| rpc.Reset(); |
| Status s = replica_ts->consensus_proxy->UpdateConsensus(req, &resp, &rpc); |
| |
| // Our memory limit was truly tiny, so we should be over it by now... |
| ASSERT_TRUE(s.IsRemoteError()); |
| ASSERT_STR_CONTAINS(s.ToString(), "Soft memory limit exceeded"); |
| |
| // ...but despite rejecting the request, we should have committed the |
| // previous set of operations. That is, we should be able to see those rows. |
| NO_FATALS(WaitForRowCount(replica_ts->tserver_proxy.get(), kNumOps, &rows)); |
| } |
| |
| // Test that we can create (vivify) a new tablet via tablet copy. |
| TEST_F(RaftConsensusITest, TestAutoCreateReplica) { |
| FLAGS_num_tablet_servers = 3; |
| FLAGS_num_replicas = 2; |
| vector<string> ts_flags, master_flags; |
| ts_flags.push_back("--enable_leader_failure_detection=false"); |
| ts_flags.push_back("--log_cache_size_limit_mb=1"); |
| ts_flags.push_back("--log_segment_size_mb=1"); |
| ts_flags.push_back("--log_async_preallocate_segments=false"); |
| ts_flags.push_back("--flush_threshold_mb=1"); |
| ts_flags.push_back("--maintenance_manager_polling_interval_ms=300"); |
| master_flags.push_back("--catalog_manager_wait_for_new_tablets_to_elect_leader=false"); |
| BuildAndStart(ts_flags, master_flags); |
| |
| // 50K is enough to cause flushes & log rolls. |
| int num_rows_to_write = 50000; |
| if (AllowSlowTests()) { |
| num_rows_to_write = 150000; |
| } |
| |
| vector<TServerDetails*> tservers; |
| AppendValuesFromMap(tablet_servers_, &tservers); |
| ASSERT_EQ(FLAGS_num_tablet_servers, tservers.size()); |
| |
| TabletServerMap active_tablet_servers; |
| itest::TabletReplicaMap::const_iterator iter = tablet_replicas_.find(tablet_id_); |
| TServerDetails* leader = iter->second; |
| TServerDetails* follower = (++iter)->second; |
| InsertOrDie(&active_tablet_servers, leader->uuid(), leader); |
| InsertOrDie(&active_tablet_servers, follower->uuid(), follower); |
| |
| TServerDetails* new_node = nullptr; |
| for (TServerDetails* ts : tservers) { |
| if (!ContainsKey(active_tablet_servers, ts->uuid())) { |
| new_node = ts; |
| break; |
| } |
| } |
| ASSERT_TRUE(new_node != nullptr); |
| |
| // Elect the leader (still only a consensus config size of 2). |
| ASSERT_OK(StartElection(leader, tablet_id_, MonoDelta::FromSeconds(10))); |
| ASSERT_OK(WaitForServersToAgree(MonoDelta::FromSeconds(30), active_tablet_servers, |
| tablet_id_, 1)); |
| |
| TestWorkload workload(cluster_.get()); |
| workload.set_table_name(kTableId); |
| workload.set_num_replicas(FLAGS_num_replicas); |
| workload.set_num_write_threads(10); |
| workload.set_write_batch_size(100); |
| workload.Setup(); |
| |
| LOG(INFO) << "Starting write workload..."; |
| workload.Start(); |
| |
| while (true) { |
| int rows_inserted = workload.rows_inserted(); |
| if (rows_inserted >= num_rows_to_write) { |
| break; |
| } |
| LOG(INFO) << "Only inserted " << rows_inserted << " rows so far, sleeping for 100ms"; |
| SleepFor(MonoDelta::FromMilliseconds(100)); |
| } |
| |
| LOG(INFO) << "Adding tserver with uuid " << new_node->uuid() << " as VOTER..."; |
| ASSERT_OK(AddServer(leader, tablet_id_, new_node, RaftPeerPB::VOTER, boost::none, |
| MonoDelta::FromSeconds(10))); |
| InsertOrDie(&active_tablet_servers, new_node->uuid(), new_node); |
| ASSERT_OK(WaitUntilCommittedConfigNumVotersIs(active_tablet_servers.size(), |
| leader, tablet_id_, |
| MonoDelta::FromSeconds(10))); |
| |
| workload.StopAndJoin(); |
| int num_batches = workload.batches_completed(); |
| |
| LOG(INFO) << "Waiting for replicas to agree..."; |
| // Wait for all servers to replicate everything up through the last write op. |
| // Since the workload batches its writes, there should be at least one log |
| // entry per completed batch, plus the initial leader's no-op, plus 1 |
| // config-change op for the added replica, for a total of at least |
| // num_batches + 2. |
| int min_log_index = num_batches + 2; |
| ASSERT_OK(WaitForServersToAgree(MonoDelta::FromSeconds(120), |
| active_tablet_servers, tablet_id_, |
| min_log_index)); |
| |
| int rows_inserted = workload.rows_inserted(); |
| LOG(INFO) << "Number of rows inserted: " << rows_inserted; |
| ASSERT_ALL_REPLICAS_AGREE(rows_inserted); |
| } |
| |
| TEST_F(RaftConsensusITest, TestMemoryRemainsConstantDespiteTwoDeadFollowers) { |
| const int64_t kMinRejections = 100; |
| const MonoDelta kMaxWaitTime = MonoDelta::FromSeconds(60); |
| |
| // Start the cluster with a low per-tablet transaction memory limit, so that |
| // the test can complete faster. |
| vector<string> flags; |
| flags.push_back("--tablet_transaction_memory_limit_mb=2"); |
| BuildAndStart(flags); |
| |
| // Kill both followers. |
| TServerDetails* details; |
| ASSERT_OK(GetLeaderReplicaWithRetries(tablet_id_, &details)); |
| int num_shutdown = 0; |
| int leader_ts_idx = -1; |
| for (int i = 0; i < cluster_->num_tablet_servers(); i++) { |
| ExternalTabletServer* ts = cluster_->tablet_server(i); |
| if (ts->instance_id().permanent_uuid() != details->uuid()) { |
| ts->Shutdown(); |
| num_shutdown++; |
| } else { |
| leader_ts_idx = i; |
| } |
| } |
| ASSERT_EQ(2, num_shutdown); |
| ASSERT_NE(-1, leader_ts_idx); |
| |
| // Because the majority of the cluster is dead and because of this workload's |
| // timeout behavior, more and more wedged transactions will accumulate in the |
| // leader. To prevent memory usage from skyrocketing, the leader will |
| // eventually reject new transactions. That's what we're testing for here. |
| TestWorkload workload(cluster_.get()); |
| workload.set_table_name(kTableId); |
| workload.set_timeout_allowed(true); |
| workload.set_write_timeout_millis(50); |
| workload.Setup(); |
| workload.Start(); |
| |
| // Run until the leader has rejected several transactions. |
| MonoTime deadline = MonoTime::Now(MonoTime::FINE); |
| deadline.AddDelta(kMaxWaitTime); |
| while (true) { |
| int64_t num_rejections = 0; |
| ASSERT_OK(cluster_->tablet_server(leader_ts_idx)->GetInt64Metric( |
| &METRIC_ENTITY_tablet, |
| nullptr, |
| &METRIC_transaction_memory_pressure_rejections, |
| "value", |
| &num_rejections)); |
| if (num_rejections >= kMinRejections) { |
| break; |
| } else if (deadline.ComesBefore(MonoTime::Now(MonoTime::FINE))) { |
| FAIL() << "Ran for " << kMaxWaitTime.ToString() << ", deadline expired"; |
| } |
| SleepFor(MonoDelta::FromMilliseconds(200)); |
| } |
| } |
| |
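| // Sets the 'log_inject_latency*' flags on the given server via the generic |
| // SetFlag() RPC, so that every WAL append is delayed by ~1000ms on average. |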
| static void EnableLogLatency(server::GenericServiceProxy* proxy) { |
| typedef unordered_map<string, string> FlagMap; |
| FlagMap flags; |
| InsertOrDie(&flags, "log_inject_latency", "true"); |
| InsertOrDie(&flags, "log_inject_latency_ms_mean", "1000"); |
| for (const FlagMap::value_type& e : flags) { |
| SetFlagRequestPB req; |
| SetFlagResponsePB resp; |
| RpcController rpc; |
| req.set_flag(e.first); |
| req.set_value(e.second); |
| ASSERT_OK(proxy->SetFlag(req, &resp, &rpc)); |
| } |
| } |
| |
| // Run a regular workload with a leader that's writing to its WAL slowly. |
| TEST_F(RaftConsensusITest, TestSlowLeader) { |
| if (!AllowSlowTests()) return; |
| BuildAndStart(vector<string>()); |
| |
| TServerDetails* leader; |
| ASSERT_OK(GetLeaderReplicaWithRetries(tablet_id_, &leader)); |
| NO_FATALS(EnableLogLatency(leader->generic_proxy.get())); |
| |
| TestWorkload workload(cluster_.get()); |
| workload.set_table_name(kTableId); |
| workload.Setup(); |
| workload.Start(); |
| SleepFor(MonoDelta::FromSeconds(60)); |
| } |
| |
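| // Inserts 'num_rows' rows of 'payload_size' bytes each, with keys starting at |
| // 'start_row', flushing after every row. With the short 100ms session timeout, |
| // some flushes are expected to time out while servers are paused or slow; |
| // such errors are deliberately ignored. |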
| void RaftConsensusITest::InsertPayloadIgnoreErrors(int start_row, int num_rows, int payload_size) { |
| shared_ptr<KuduTable> table; |
| CHECK_OK(client_->OpenTable(kTableId, &table)); |
| shared_ptr<KuduSession> session = client_->NewSession(); |
| session->SetTimeoutMillis(100); |
| CHECK_OK(session->SetFlushMode(KuduSession::MANUAL_FLUSH)); |
| string payload(payload_size, 'x'); |
| for (int i = 0; i < num_rows; i++) { |
| gscoped_ptr<KuduInsert> insert(table->NewInsert()); |
| KuduPartialRow* row = insert->mutable_row(); |
| CHECK_OK(row->SetInt32(0, i + start_row)); |
| CHECK_OK(row->SetInt32(1, 0)); |
| CHECK_OK(row->SetStringCopy(2, payload)); |
| CHECK_OK(session->Apply(insert.release())); |
| ignore_result(session->Flush()); |
| } |
| } |
| |
| // Regression test for KUDU-1469, a case in which a leader and follower could get "stuck" |
| // in a tight RPC loop, in which the leader would repeatedly send a batch of ops that the |
| // follower already had, the follower would fully de-dupe them, and yet the leader would |
| // never advance to the next batch. |
| // |
| // The 'perfect storm' reproduced here consists of: |
| // - the commit index has fallen far behind due to a slow log on the leader |
| // and one of the three replicas being inaccessible |
| // - the other replica elects itself |
| // - before the old leader notices it has been ousted, it writes at least one more |
| // operation to its local log. |
| // - before the replica can replicate anything to the old leader, it receives |
| // more writes, such that the first batch's preceding_op_id is ahead of |
| // the old leader's last written op. |
| // |
| // See the detailed comments below for more details. |
| TEST_F(RaftConsensusITest, TestCommitIndexFarBehindAfterLeaderElection) { |
| const MonoDelta kTimeout = MonoDelta::FromSeconds(10); |
| |
| if (!AllowSlowTests()) return; |
| |
| // Set the batch size low so that, after the new leader takes |
| // over below, the ops required to catch up from the committed index |
| // to the newly replicated index don't fit into a single batch. |
| BuildAndStart({"--consensus_max_batch_size_bytes=50000"}); |
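| // (Each test row written below carries a ~10KB payload, so a 50KB batch |
| // holds only a handful of ops.) |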
| |
| // Get the leader and the two replica tablet servers. |
| // These will have the following roles in this test: |
| // 1) 'first_leader_ts' is the initial leader. |
| // 2) 'second_leader_ts' will be forced to be elected as the second leader |
| // 3) 'only_vote_ts' will simulate a heavily overloaded (or corrupted) TS |
| // which is far enough behind (or failed) such that it only participates |
| // by voting. |
| TServerDetails* leader; |
| ASSERT_OK(GetLeaderReplicaWithRetries(tablet_id_, &leader)); |
| ExternalTabletServer* first_leader_ts = cluster_->tablet_server_by_uuid(leader->uuid()); |
| ExternalTabletServer* second_leader_ts = nullptr; |
| ExternalTabletServer* only_vote_ts = nullptr; |
| for (int i = 0; i < cluster_->num_tablet_servers(); i++) { |
| ExternalTabletServer* ts = cluster_->tablet_server(i); |
| if (ts->instance_id().permanent_uuid() != leader->uuid()) { |
| if (second_leader_ts == nullptr) { |
| second_leader_ts = ts; |
| } else { |
| only_vote_ts = ts; |
| } |
| } |
| } |
| |
| // The 'only_vote' tablet server doesn't participate in replication. |
| ASSERT_OK(cluster_->SetFlag(only_vote_ts, "follower_reject_update_consensus_requests", "true")); |
| |
| // Inject a long delay in the log of the first leader, and write 10 operations. |
| // This delay ensures that it will replicate them to both itself and its follower, |
| // but due to its log sync not completing, it won't know that it is safe to advance its |
| // commit index until long after it has lost its leadership. |
| ASSERT_OK(cluster_->SetFlag(first_leader_ts, "log_inject_latency_ms_mean", "6000")); |
| ASSERT_OK(cluster_->SetFlag(first_leader_ts, "log_inject_latency", "true")); |
| InsertPayloadIgnoreErrors(0, 10, 10000); |
| |
| // Write one more operation to the leader, but disable consensus on the follower so that |
| // it doesn't get replicated. |
| ASSERT_OK(cluster_->SetFlag( |
| second_leader_ts, "follower_reject_update_consensus_requests", "true")); |
| InsertPayloadIgnoreErrors(10, 1, 10000); |
| |
| // Pause the initial leader and wait for the replica to elect itself. The third TS participates |
| // here by voting. |
| first_leader_ts->Pause(); |
| ASSERT_OK(WaitUntilLeader(tablet_servers_[second_leader_ts->uuid()], tablet_id_, kTimeout)); |
| |
| // The voter TS has done its duty. Shut it down to avoid log spam where it tries to run |
| // elections. |
| only_vote_ts->Shutdown(); |
| |
| // Perform one insert on the new leader. The new leader has not yet replicated its NO_OP to |
| // the old leader, since the old leader is still paused. |
| NO_FATALS(CreateClient(&client_)); |
| InsertPayloadIgnoreErrors(13, 1, 10000); |
| |
| // Now we expect to have the following logs: |
| // |
| // first_leader_ts second_leader_ts |
| // ------------------- ------------ |
| // 1.1 NO_OP 1.1 NO_OP |
| // 1.2 WRITE_OP 1.2 WRITE_OP |
| // ................................ |
| // 1.11 WRITE_OP 1.11 WRITE_OP |
| // 1.12 WRITE_OP 2.12 NO_OP |
| // 2.13 WRITE_OP |
| // |
| // Both servers should have a committed_idx of 1.1 since the log was delayed. |
| |
| // Now, when we resume the original leader, we expect the two servers to |
| // converge on the new leader's log. Previously this triggered KUDU-1469. |
| first_leader_ts->Resume(); |
| |
| TabletServerMap active_tservers = tablet_servers_; |
| active_tservers.erase(only_vote_ts->uuid()); |
| ASSERT_OK(WaitForServersToAgree(MonoDelta::FromSeconds(30), |
| active_tservers, |
| tablet_id_, 13)); |
| } |
| |
| // Run a regular workload with one follower that's writing to its WAL slowly. |
| TEST_F(RaftConsensusITest, TestSlowFollower) { |
| if (!AllowSlowTests()) return; |
| BuildAndStart(vector<string>()); |
| |
| TServerDetails* leader; |
| ASSERT_OK(GetLeaderReplicaWithRetries(tablet_id_, &leader)); |
| int num_reconfigured = 0; |
| for (int i = 0; i < cluster_->num_tablet_servers(); i++) { |
| ExternalTabletServer* ts = cluster_->tablet_server(i); |
| if (ts->instance_id().permanent_uuid() != leader->uuid()) { |
| TServerDetails* follower = |
| GetReplicaWithUuidOrNull(tablet_id_, ts->instance_id().permanent_uuid()); |
| ASSERT_TRUE(follower); |
| NO_FATALS(EnableLogLatency(follower->generic_proxy.get())); |
| num_reconfigured++; |
| break; |
| } |
| } |
| ASSERT_EQ(1, num_reconfigured); |
| |
| TestWorkload workload(cluster_.get()); |
| workload.set_table_name(kTableId); |
| workload.Setup(); |
| workload.Start(); |
| SleepFor(MonoDelta::FromSeconds(60)); |
| } |
| |
| // Run a special workload that constantly updates a single row on a cluster |
| // where every replica is writing to its WAL slowly. |
| TEST_F(RaftConsensusITest, TestHammerOneRow) { |
| if (!AllowSlowTests()) return; |
| BuildAndStart(vector<string>()); |
| |
| for (int i = 0; i < cluster_->num_tablet_servers(); i++) { |
| ExternalTabletServer* ts = cluster_->tablet_server(i); |
| TServerDetails* follower = |
| GetReplicaWithUuidOrNull(tablet_id_, ts->instance_id().permanent_uuid()); |
| ASSERT_TRUE(follower); |
| NO_FATALS(EnableLogLatency(follower->generic_proxy.get())); |
| } |
| |
| TestWorkload workload(cluster_.get()); |
| workload.set_table_name(kTableId); |
| workload.set_write_pattern(TestWorkload::UPDATE_ONE_ROW); |
| workload.set_num_write_threads(20); |
| workload.Setup(); |
| workload.Start(); |
| SleepFor(MonoDelta::FromSeconds(60)); |
| } |
| |
| // Test that followers that fall behind the leader's log GC threshold are |
| // evicted from the config. |
| TEST_F(RaftConsensusITest, TestEvictAbandonedFollowers) { |
| vector<string> ts_flags; |
| AddFlagsForLogRolls(&ts_flags); // For CauseFollowerToFallBehindLogGC(). |
| vector<string> master_flags = { "--master_add_server_when_underreplicated=false" }; |
| NO_FATALS(BuildAndStart(ts_flags, master_flags)); |
| |
| MonoDelta timeout = MonoDelta::FromSeconds(30); |
| TabletServerMap active_tablet_servers = tablet_servers_; |
| ASSERT_EQ(3, active_tablet_servers.size()); |
| |
| string leader_uuid; |
| int64_t orig_term; |
| string follower_uuid; |
| NO_FATALS(CauseFollowerToFallBehindLogGC(&leader_uuid, &orig_term, &follower_uuid)); |
| |
| // Wait for the abandoned follower to be evicted. |
| ASSERT_OK(WaitUntilCommittedConfigNumVotersIs(2, tablet_servers_[leader_uuid], |
| tablet_id_, timeout)); |
| ASSERT_EQ(1, active_tablet_servers.erase(follower_uuid)); |
| ASSERT_OK(WaitForServersToAgree(timeout, active_tablet_servers, tablet_id_, 2)); |
| } |
| |
| // Test that, after a follower falls behind the leader's log GC threshold and |
| // is evicted from the config, the Master causes a replacement copy to be made. |
| TEST_F(RaftConsensusITest, TestMasterReplacesEvictedFollowers) { |
| vector<string> extra_flags; |
| AddFlagsForLogRolls(&extra_flags); // For CauseFollowerToFallBehindLogGC(). |
| BuildAndStart(extra_flags); |
| |
| MonoDelta timeout = MonoDelta::FromSeconds(30); |
| |
| string leader_uuid; |
| int64_t orig_term; |
| string follower_uuid; |
| NO_FATALS(CauseFollowerToFallBehindLogGC(&leader_uuid, &orig_term, &follower_uuid)); |
| |
| // The follower will be evicted. Now wait for the master to cause it to be |
| // copied. |
| ASSERT_OK(WaitForServersToAgree(timeout, tablet_servers_, tablet_id_, 2)); |
| |
| ClusterVerifier v(cluster_.get()); |
| NO_FATALS(v.CheckCluster()); |
| NO_FATALS(v.CheckRowCount(kTableId, ClusterVerifier::AT_LEAST, 1)); |
| } |
| |
| // Test that a ChangeConfig() request is rejected unless the leader has |
| // replicated one of its own log entries during the current term. |
| // This is required for correctness of Raft config change. For details, |
| // see https://groups.google.com/forum/#!topic/raft-dev/t4xj6dJTP6E |
| TEST_F(RaftConsensusITest, TestChangeConfigRejectedUnlessNoopReplicated) { |
| vector<string> ts_flags = { "--enable_leader_failure_detection=false" }; |
| vector<string> master_flags = { "--catalog_manager_wait_for_new_tablets_to_elect_leader=false" }; |
| BuildAndStart(ts_flags, master_flags); |
| |
| MonoDelta timeout = MonoDelta::FromSeconds(30); |
| |
| int kLeaderIndex = 0; |
| TServerDetails* leader_ts = tablet_servers_[cluster_->tablet_server(kLeaderIndex)->uuid()]; |
| |
| // Prevent followers from accepting UpdateConsensus requests from the leader, |
| // even though they will vote. This will allow us to get the distributed |
| // system into a state where there is a valid leader (based on winning an |
| // election) but that leader will be unable to commit any entries from its |
| // own term, making it illegal to accept ChangeConfig() requests. |
| for (int i = 1; i <= 2; i++) { |
| ASSERT_OK(cluster_->SetFlag(cluster_->tablet_server(i), |
| "follower_reject_update_consensus_requests", "true")); |
| } |
| |
| // Elect the leader. |
| ASSERT_OK(StartElection(leader_ts, tablet_id_, timeout)); |
| ASSERT_OK(WaitUntilLeader(leader_ts, tablet_id_, timeout)); |
| |
| // Now attempt to do a config change. It should be rejected because there |
| // have not been any ops (notably the initial NO_OP) from the leader's term |
| // that have been committed yet. |
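| // A rough sketch of the guard being exercised (hypothetical pseudocode, not |
| // the actual implementation): |
| // |
| //   if (last_committed_opid.term() != current_term) { |
| //     return Status::IllegalState("Latest committed op is not from this term"); |
| //   } |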
| Status s = itest::RemoveServer(leader_ts, tablet_id_, |
| tablet_servers_[cluster_->tablet_server(1)->uuid()], |
| boost::none, timeout); |
| ASSERT_FALSE(s.ok()) << s.ToString(); |
| ASSERT_STR_CONTAINS(s.ToString(), "Latest committed op is not from this term"); |
| } |
| |
| // Test that, if for some reason none of the transactions can be prepared, |
| // the error comes back through UpdateConsensus(). |
| TEST_F(RaftConsensusITest, TestUpdateConsensusErrorNonePrepared) { |
| const int kNumOps = 10; |
| |
| vector<string> ts_flags, master_flags; |
| ts_flags.push_back("--enable_leader_failure_detection=false"); |
| master_flags.push_back("--catalog_manager_wait_for_new_tablets_to_elect_leader=false"); |
| BuildAndStart(ts_flags, master_flags); |
| |
| vector<TServerDetails*> tservers; |
| AppendValuesFromMap(tablet_servers_, &tservers); |
| ASSERT_EQ(3, tservers.size()); |
| |
| // Shut down the other servers so they don't get chatty. |
| cluster_->tablet_server_by_uuid(tservers[1]->uuid())->Shutdown(); |
| cluster_->tablet_server_by_uuid(tservers[2]->uuid())->Shutdown(); |
| |
| // Configure the first server to fail all transactions on prepare. |
| TServerDetails *replica_ts = tservers[0]; |
| ASSERT_OK(cluster_->SetFlag(cluster_->tablet_server(0), |
| "follower_fail_all_prepare", "true")); |
| |
| // Pretend to be the leader and send a request that should return an error. |
| ConsensusRequestPB req; |
| ConsensusResponsePB resp; |
| RpcController rpc; |
| req.set_dest_uuid(replica_ts->uuid()); |
| req.set_tablet_id(tablet_id_); |
| req.set_caller_uuid(tservers[2]->instance_id.permanent_uuid()); |
| req.set_caller_term(0); |
| req.mutable_committed_index()->CopyFrom(MakeOpId(0, 0)); |
| req.mutable_preceding_id()->CopyFrom(MakeOpId(0, 0)); |
| for (int i = 0; i < kNumOps; i++) { |
| AddOp(MakeOpId(0, 1 + i), &req); |
| } |
| |
| ASSERT_OK(replica_ts->consensus_proxy->UpdateConsensus(req, &resp, &rpc)); |
| LOG(INFO) << resp.ShortDebugString(); |
| ASSERT_TRUE(resp.status().has_error()); |
| ASSERT_EQ(consensus::ConsensusErrorPB::CANNOT_PREPARE, resp.status().error().code()); |
| ASSERT_STR_CONTAINS(resp.ShortDebugString(), "Could not prepare a single transaction"); |
| } |
| |
| // Test that, if the raft metadata on a replica is corrupt, the server |
| // doesn't crash, but instead just marks the replica as FAILED. |
| TEST_F(RaftConsensusITest, TestCorruptReplicaMetadata) { |
| // Start cluster and wait until we have a stable leader. |
| BuildAndStart({}, {}); |
| ASSERT_OK(WaitForServersToAgree(MonoDelta::FromSeconds(10), tablet_servers_, |
| tablet_id_, 1)); |
| |
| // Shut down one of the tablet servers, and then muck |
| // with its consensus metadata to corrupt it. |
| auto* ts = cluster_->tablet_server(0); |
| ts->Shutdown(); |
| consensus::ConsensusMetadataPB cmeta_pb; |
| ASSERT_OK(inspect_->ReadConsensusMetadataOnTS(0, tablet_id_, &cmeta_pb)); |
| cmeta_pb.set_current_term(cmeta_pb.current_term() - 1); |
| ASSERT_OK(inspect_->WriteConsensusMetadataOnTS(0, tablet_id_, cmeta_pb)); |
| |
| ASSERT_OK(ts->Restart()); |
| |
| // The server should come up with a 'FAILED' status because of the corrupt |
| // metadata. |
| ASSERT_OK(WaitUntilTabletInState(tablet_servers_[ts->uuid()], |
| tablet_id_, |
| tablet::FAILED, |
| MonoDelta::FromSeconds(30))); |
| |
| // Currently, the tablet server does not automatically delete FAILED replicas. |
| // So, manually delete the bad replica in order to recover. |
| ASSERT_OK(itest::DeleteTablet(tablet_servers_[ts->uuid()], tablet_id_, |
| tablet::TABLET_DATA_TOMBSTONED, boost::none, |
| MonoDelta::FromSeconds(30))); |
| |
| // A new good copy should get created. |
| ASSERT_OK(WaitUntilTabletInState(tablet_servers_[ts->uuid()], |
| tablet_id_, |
| tablet::RUNNING, |
| MonoDelta::FromSeconds(30))); |
| } |
| |
| // Test that an IOError when writing to the write-ahead log is a fatal error. |
| // First, we test that failed replicates are fatal. Then, we test that failed |
| // commits are fatal. |
| TEST_F(RaftConsensusITest, TestLogIOErrorIsFatal) { |
| FLAGS_num_replicas = 3; |
| FLAGS_num_tablet_servers = 3; |
| vector<string> ts_flags, master_flags; |
| ts_flags = {"--enable_leader_failure_detection=false", |
| // Disable core dumps since we will inject FATAL errors, and dumping |
| // core can take a long time. |
| "--disable_core_dumps"}; |
| master_flags.push_back("--catalog_manager_wait_for_new_tablets_to_elect_leader=false"); |
| NO_FATALS(BuildAndStart(ts_flags, master_flags)); |
| |
| vector<TServerDetails*> tservers; |
| AppendValuesFromMap(tablet_servers_, &tservers); |
| ASSERT_EQ(3, tservers.size()); |
| vector<ExternalTabletServer*> ext_tservers; |
| for (auto* details : tservers) { |
| ext_tservers.push_back(cluster_->tablet_server_by_uuid(details->uuid())); |
| } |
| |
| // Test failed replicates. |
| |
| // Elect server 2 as leader and wait for log index 1 to propagate to all servers. |
| ASSERT_OK(StartElection(tservers[2], tablet_id_, MonoDelta::FromSeconds(10))); |
| ASSERT_OK(WaitForServersToAgree(MonoDelta::FromSeconds(10), tablet_servers_, tablet_id_, 1)); |
| |
| // Inject an IOError the next time servers 1 and 2 write to their WAL. |
| // Then, cause server 0 to start and win a leader election. |
| // Replicating the resulting entries will cause servers 1 and 2 to crash. |
| for (int i = 1; i <= 2; i++) { |
| ASSERT_OK(cluster_->SetFlag(ext_tservers[i], |
| "log_inject_io_error_on_append_fraction", "1.0")); |
| } |
| ASSERT_OK(StartElection(tservers[0], tablet_id_, MonoDelta::FromSeconds(10))); |
| for (int i = 1; i <= 2; i++) { |
| ASSERT_OK(ext_tservers[i]->WaitForCrash(MonoDelta::FromSeconds(10))); |
| } |
| |
| // Now we know followers crash when they write to their log. |
| // Let's verify the same for the leader (server 0). |
| ASSERT_OK(cluster_->SetFlag(ext_tservers[0], |
| "log_inject_io_error_on_append_fraction", "1.0")); |
| |
| // Attempt to write to the leader, but with a short timeout. |
| TestWorkload workload(cluster_.get()); |
| workload.set_table_name(kTableId); |
| workload.set_timeout_allowed(true); |
| workload.set_write_timeout_millis(100); |
| workload.set_num_write_threads(1); |
| workload.set_write_batch_size(1); |
| workload.Setup(); |
| workload.Start(); |
| |
| // Leader should crash as well. |
| ASSERT_OK(ext_tservers[0]->WaitForCrash(MonoDelta::FromSeconds(10))); |
| workload.StopAndJoin(); |
| |
| LOG(INFO) << "Everything crashed!"; |
| |
| // Test failed commits. |
| |
| cluster_->Shutdown(); |
| ASSERT_OK(cluster_->Restart()); |
| NO_FATALS(WaitForTSAndReplicas()); |
| tservers.clear(); |
| AppendValuesFromMap(tablet_servers_, &tservers); |
| ASSERT_EQ(3, tservers.size()); |
| |
| // Elect server 0 as leader, wait until writes are going through. |
| ASSERT_OK(StartElection(tservers[0], tablet_id_, MonoDelta::FromSeconds(10))); |
| workload.Start(); |
| int64_t prev_inserted = workload.rows_inserted(); |
| while (workload.rows_inserted() == prev_inserted) { |
| SleepFor(MonoDelta::FromMilliseconds(10)); |
| } |
| workload.StopAndJoin(); |
| ASSERT_OK(WaitForServersToAgree(MonoDelta::FromSeconds(10), tablet_servers_, tablet_id_, 1)); |
| |
| // Now shut down servers 1 and 2 so that writes cannot commit. Write to the |
| // leader, set flags so that commits crash the server, then bring the |
| // followers back up. |
| for (int i = 1; i <= 2; i++) { |
| ext_tservers[i]->Shutdown(); |
| } |
| |
| OpId prev_opid, cur_opid; |
| ASSERT_OK(GetLastOpIdForReplica(tablet_id_, tservers[0], consensus::RECEIVED_OPID, |
| MonoDelta::FromSeconds(10), &prev_opid)); |
| VLOG(1) << "Previous OpId on server 0: " << OpIdToString(prev_opid); |
| workload.Start(); |
| // Wait until we've got (uncommitted) entries into the leader's log. |
| do { |
| ASSERT_OK(GetLastOpIdForReplica(tablet_id_, tservers[0], consensus::RECEIVED_OPID, |
| MonoDelta::FromSeconds(10), &cur_opid)); |
| VLOG(1) << "Current OpId on server 0: " << OpIdToString(cur_opid); |
| } while (consensus::OpIdEquals(prev_opid, cur_opid)); |
| workload.StopAndJoin(); |
| ASSERT_OK(cluster_->SetFlag(ext_tservers[0], |
| "log_inject_io_error_on_append_fraction", "1.0")); |
| for (int i = 1; i <= 2; i++) { |
| ASSERT_OK(ext_tservers[i]->Restart()); |
| } |
| // Leader will crash. |
| ASSERT_OK(ext_tservers[0]->WaitForCrash(MonoDelta::FromSeconds(10))); |
| } |
| |
| } // namespace tserver |
| } // namespace kudu |
| |