| // Licensed to the Apache Software Foundation (ASF) under one |
| // or more contributor license agreements. See the NOTICE file |
| // distributed with this work for additional information |
| // regarding copyright ownership. The ASF licenses this file |
| // to you under the Apache License, Version 2.0 (the |
| // "License"); you may not use this file except in compliance |
| // with the License. You may obtain a copy of the License at |
| // |
| // http://www.apache.org/licenses/LICENSE-2.0 |
| // |
| // Unless required by applicable law or agreed to in writing, |
| // software distributed under the License is distributed on an |
| // "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY |
| // KIND, either express or implied. See the License for the |
| // specific language governing permissions and limitations |
| // under the License. |
| |
| #include <gmock/gmock.h> |
| #include <gtest/gtest.h> |
| |
| #include "kudu/common/schema.h" |
| #include "kudu/common/wire_protocol-test-util.h" |
| #include "kudu/consensus/consensus_peers.h" |
| #include "kudu/consensus/consensus-test-util.h" |
| #include "kudu/consensus/log.h" |
| #include "kudu/consensus/peer_manager.h" |
| #include "kudu/fs/fs_manager.h" |
| #include "kudu/gutil/stl_util.h" |
| #include "kudu/server/logical_clock.h" |
| #include "kudu/util/async_util.h" |
| #include "kudu/util/mem_tracker.h" |
| #include "kudu/util/metrics.h" |
| #include "kudu/util/test_macros.h" |
| #include "kudu/util/test_util.h" |
| |
| DECLARE_bool(enable_leader_failure_detection); |
| |
| METRIC_DECLARE_entity(tablet); |
| |
| using std::shared_ptr; |
| using std::string; |
| |
| namespace kudu { |
| namespace consensus { |
| |
| using log::Log; |
| using log::LogOptions; |
| using ::testing::_; |
| using ::testing::AnyNumber; |
| using ::testing::AtLeast; |
| using ::testing::Eq; |
| using ::testing::InSequence; |
| using ::testing::Invoke; |
| using ::testing::Mock; |
| using ::testing::Property; |
| using ::testing::Return; |
| |
| const char* kTestTablet = "TestTablet"; |
| const char* kLocalPeerUuid = "peer-0"; |
| |
| // A simple map to collect the results of a sequence of transactions. |
| typedef std::map<OpId, Status, OpIdCompareFunctor> StatusesMap; |
| |
| class MockQueue : public PeerMessageQueue { |
| public: |
| explicit MockQueue(const scoped_refptr<MetricEntity>& metric_entity, log::Log* log) |
| : PeerMessageQueue(metric_entity, log, FakeRaftPeerPB(kLocalPeerUuid), kTestTablet) {} |
| MOCK_METHOD1(Init, void(const OpId& locally_replicated_index)); |
| MOCK_METHOD3(SetLeaderMode, void(const OpId& committed_opid, |
| int64_t current_term, |
| const RaftConfigPB& active_config)); |
| MOCK_METHOD0(SetNonLeaderMode, void()); |
| virtual Status AppendOperations(const vector<ReplicateRefPtr>& msgs, |
| const StatusCallback& callback) OVERRIDE { |
| return AppendOperationsMock(msgs, callback); |
| } |
| MOCK_METHOD2(AppendOperationsMock, Status(const vector<ReplicateRefPtr>& msgs, |
| const StatusCallback& callback)); |
| MOCK_METHOD1(TrackPeer, void(const string&)); |
| MOCK_METHOD1(UntrackPeer, void(const string&)); |
| MOCK_METHOD4(RequestForPeer, Status(const std::string& uuid, |
| ConsensusRequestPB* request, |
| std::vector<ReplicateRefPtr>* msg_refs, |
| bool* needs_remote_bootstrap)); |
| MOCK_METHOD3(ResponseFromPeer, void(const std::string& peer_uuid, |
| const ConsensusResponsePB& response, |
| bool* more_pending)); |
| MOCK_METHOD0(Close, void()); |
| }; |
| |
| class MockPeerManager : public PeerManager { |
| public: |
| MockPeerManager() : PeerManager("", "", nullptr, nullptr, nullptr, nullptr) {} |
| MOCK_METHOD1(UpdateRaftConfig, Status(const consensus::RaftConfigPB& config)); |
| MOCK_METHOD1(SignalRequest, void(bool force_if_queue_empty)); |
| MOCK_METHOD0(Close, void()); |
| }; |
| |
| class RaftConsensusSpy : public RaftConsensus { |
| public: |
| typedef Callback<Status(const scoped_refptr<ConsensusRound>& round)> AppendCallback; |
| |
| RaftConsensusSpy(const ConsensusOptions& options, |
| gscoped_ptr<ConsensusMetadata> cmeta, |
| gscoped_ptr<PeerProxyFactory> proxy_factory, |
| gscoped_ptr<PeerMessageQueue> queue, |
| gscoped_ptr<PeerManager> peer_manager, |
| gscoped_ptr<ThreadPool> thread_pool, |
| const scoped_refptr<MetricEntity>& metric_entity, |
| const std::string& peer_uuid, |
| const scoped_refptr<server::Clock>& clock, |
| ReplicaTransactionFactory* txn_factory, |
| const scoped_refptr<log::Log>& log, |
| const shared_ptr<MemTracker>& parent_mem_tracker, |
| const Callback<void(const std::string& reason)>& mark_dirty_clbk) |
| : RaftConsensus(options, |
| std::move(cmeta), |
| std::move(proxy_factory), |
| std::move(queue), |
| std::move(peer_manager), |
| std::move(thread_pool), |
| metric_entity, |
| peer_uuid, |
| clock, |
| txn_factory, |
| log, |
| parent_mem_tracker, |
| mark_dirty_clbk) { |
| // These "aliases" allow us to count invocations and assert on them. |
| ON_CALL(*this, StartConsensusOnlyRoundUnlocked(_)) |
| .WillByDefault(Invoke(this, |
| &RaftConsensusSpy::StartNonLeaderConsensusRoundUnlockedConcrete)); |
| ON_CALL(*this, NonTxRoundReplicationFinished(_, _, _)) |
| .WillByDefault(Invoke(this, &RaftConsensusSpy::NonTxRoundReplicationFinishedConcrete)); |
| } |
| |
| MOCK_METHOD1(AppendNewRoundToQueueUnlocked, Status(const scoped_refptr<ConsensusRound>& round)); |
| Status AppendNewRoundToQueueUnlockedConcrete(const scoped_refptr<ConsensusRound>& round) { |
| return RaftConsensus::AppendNewRoundToQueueUnlocked(round); |
| } |
| |
| MOCK_METHOD1(StartConsensusOnlyRoundUnlocked, Status(const ReplicateRefPtr& msg)); |
| Status StartNonLeaderConsensusRoundUnlockedConcrete(const ReplicateRefPtr& msg) { |
| return RaftConsensus::StartConsensusOnlyRoundUnlocked(msg); |
| } |
| |
| MOCK_METHOD3(NonTxRoundReplicationFinished, void(ConsensusRound* round, |
| const StatusCallback& client_cb, |
| const Status& status)); |
| void NonTxRoundReplicationFinishedConcrete(ConsensusRound* round, |
| const StatusCallback& client_cb, |
| const Status& status) { |
| LOG(INFO) << "Committing round with opid " << round->id() |
| << " given Status " << status.ToString(); |
| RaftConsensus::NonTxRoundReplicationFinished(round, client_cb, status); |
| } |
| |
| private: |
| DISALLOW_COPY_AND_ASSIGN(RaftConsensusSpy); |
| }; |
| |
| void DoNothing(const string& s) { |
| } |
| |
| class RaftConsensusTest : public KuduTest { |
| public: |
| RaftConsensusTest() |
| : clock_(server::LogicalClock::CreateStartingAt(Timestamp(0))), |
| metric_entity_(METRIC_ENTITY_tablet.Instantiate(&metric_registry_, "raft-consensus-test")), |
| schema_(GetSimpleTestSchema()) { |
| FLAGS_enable_leader_failure_detection = false; |
| options_.tablet_id = kTestTablet; |
| } |
| |
| virtual void SetUp() OVERRIDE { |
| LogOptions options; |
| string test_path = GetTestPath("test-peer-root"); |
| |
| // TODO mock the Log too, since we're gonna mock the queue |
| // monitors and pretty much everything else. |
| fs_manager_.reset(new FsManager(env_.get(), test_path)); |
| CHECK_OK(fs_manager_->CreateInitialFileSystemLayout()); |
| CHECK_OK(fs_manager_->Open()); |
| CHECK_OK(Log::Open(LogOptions(), |
| fs_manager_.get(), |
| kTestTablet, |
| schema_, |
| 0, // schema_version |
| NULL, |
| &log_)); |
| |
| queue_ = new MockQueue(metric_entity_, log_.get()); |
| peer_manager_ = new MockPeerManager; |
| txn_factory_.reset(new MockTransactionFactory); |
| |
| ON_CALL(*queue_, AppendOperationsMock(_, _)) |
| .WillByDefault(Invoke(this, &RaftConsensusTest::AppendToLog)); |
| } |
| |
| void SetUpConsensus(int64_t initial_term = consensus::kMinimumTerm, int num_peers = 1) { |
| config_ = BuildRaftConfigPBForTests(num_peers); |
| config_.set_opid_index(kInvalidOpIdIndex); |
| |
| gscoped_ptr<PeerProxyFactory> proxy_factory(new LocalTestPeerProxyFactory(nullptr)); |
| |
| string peer_uuid = config_.peers(num_peers - 1).permanent_uuid(); |
| |
| gscoped_ptr<ConsensusMetadata> cmeta; |
| CHECK_OK(ConsensusMetadata::Create(fs_manager_.get(), kTestTablet, peer_uuid, |
| config_, initial_term, &cmeta)); |
| |
| gscoped_ptr<ThreadPool> thread_pool; |
| CHECK_OK(ThreadPoolBuilder("raft-pool") .Build(&thread_pool)); |
| |
| consensus_.reset(new RaftConsensusSpy(options_, |
| std::move(cmeta), |
| std::move(proxy_factory), |
| gscoped_ptr<PeerMessageQueue>(queue_), |
| gscoped_ptr<PeerManager>(peer_manager_), |
| std::move(thread_pool), |
| metric_entity_, |
| peer_uuid, |
| clock_, |
| txn_factory_.get(), |
| log_.get(), |
| MemTracker::GetRootTracker(), |
| Bind(&DoNothing))); |
| |
| ON_CALL(*consensus_.get(), AppendNewRoundToQueueUnlocked(_)) |
| .WillByDefault(Invoke(this, &RaftConsensusTest::MockAppendNewRound)); |
| } |
| |
| Status AppendToLog(const vector<ReplicateRefPtr>& msgs, |
| const StatusCallback& callback) { |
| return log_->AsyncAppendReplicates(msgs, |
| Bind(LogAppendCallback, callback)); |
| } |
| |
| static void LogAppendCallback(const StatusCallback& callback, |
| const Status& s) { |
| CHECK_OK(s); |
| callback.Run(s); |
| } |
| |
| Status MockAppendNewRound(const scoped_refptr<ConsensusRound>& round) { |
| rounds_.push_back(round); |
| RETURN_NOT_OK(consensus_->AppendNewRoundToQueueUnlockedConcrete(round)); |
| LOG(INFO) << "Round append: " << round->id() << ", ReplicateMsg: " |
| << round->replicate_msg()->ShortDebugString(); |
| return Status::OK(); |
| } |
| |
| void SetUpGeneralExpectations() { |
| EXPECT_CALL(*peer_manager_, SignalRequest(_)) |
| .Times(AnyNumber()); |
| EXPECT_CALL(*peer_manager_, Close()) |
| .Times(AtLeast(1)); |
| EXPECT_CALL(*queue_, Close()) |
| .Times(1); |
| EXPECT_CALL(*consensus_.get(), AppendNewRoundToQueueUnlocked(_)) |
| .Times(AnyNumber()); |
| } |
| |
| // Create a ConsensusRequestPB suitable to send to a peer. |
| ConsensusRequestPB MakeConsensusRequest(int64_t caller_term, |
| const string& caller_uuid, |
| const OpId& preceding_opid); |
| |
| // Add a single no-op with the given OpId to a ConsensusRequestPB. |
| void AddNoOpToConsensusRequest(ConsensusRequestPB* request, const OpId& noop_opid); |
| |
| scoped_refptr<ConsensusRound> AppendNoOpRound() { |
| ReplicateRefPtr replicate_ptr(make_scoped_refptr_replicate(new ReplicateMsg)); |
| replicate_ptr->get()->set_op_type(NO_OP); |
| replicate_ptr->get()->set_timestamp(clock_->Now().ToUint64()); |
| scoped_refptr<ConsensusRound> round(new ConsensusRound(consensus_.get(), replicate_ptr)); |
| round->SetConsensusReplicatedCallback( |
| Bind(&RaftConsensusSpy::NonTxRoundReplicationFinished, |
| Unretained(consensus_.get()), Unretained(round.get()), Bind(&DoNothingStatusCB))); |
| |
| CHECK_OK(consensus_->Replicate(round)); |
| LOG(INFO) << "Appended NO_OP round with opid " << round->id(); |
| return round; |
| } |
| |
| void DumpRounds() { |
| LOG(INFO) << "Dumping rounds..."; |
| for (const scoped_refptr<ConsensusRound>& round : rounds_) { |
| LOG(INFO) << "Round: OpId " << round->id() << ", ReplicateMsg: " |
| << round->replicate_msg()->ShortDebugString(); |
| } |
| } |
| |
| protected: |
| ConsensusOptions options_; |
| RaftConfigPB config_; |
| OpId initial_id_; |
| gscoped_ptr<FsManager> fs_manager_; |
| scoped_refptr<Log> log_; |
| gscoped_ptr<PeerProxyFactory> proxy_factory_; |
| scoped_refptr<server::Clock> clock_; |
| MetricRegistry metric_registry_; |
| scoped_refptr<MetricEntity> metric_entity_; |
| const Schema schema_; |
| scoped_refptr<RaftConsensusSpy> consensus_; |
| |
| vector<scoped_refptr<ConsensusRound> > rounds_; |
| |
| // Mocks. |
| // NOTE: both 'queue_' and 'peer_manager_' belong to 'consensus_' and may be deleted before |
| // the test is. |
| MockQueue* queue_; |
| MockPeerManager* peer_manager_; |
| gscoped_ptr<MockTransactionFactory> txn_factory_; |
| }; |
| |
| ConsensusRequestPB RaftConsensusTest::MakeConsensusRequest(int64_t caller_term, |
| const string& caller_uuid, |
| const OpId& preceding_opid) { |
| ConsensusRequestPB request; |
| request.set_caller_term(caller_term); |
| request.set_caller_uuid(caller_uuid); |
| request.set_tablet_id(kTestTablet); |
| *request.mutable_preceding_id() = preceding_opid; |
| return request; |
| } |
| |
| void RaftConsensusTest::AddNoOpToConsensusRequest(ConsensusRequestPB* request, |
| const OpId& noop_opid) { |
| ReplicateMsg* noop_msg = request->add_ops(); |
| *noop_msg->mutable_id() = noop_opid; |
| noop_msg->set_op_type(NO_OP); |
| noop_msg->set_timestamp(clock_->Now().ToUint64()); |
| noop_msg->mutable_noop_request(); |
| } |
| |
| // Tests that the committed index moves along with the majority replicated |
| // index when the terms are the same. |
| TEST_F(RaftConsensusTest, TestCommittedIndexWhenInSameTerm) { |
| SetUpConsensus(); |
| SetUpGeneralExpectations(); |
| EXPECT_CALL(*peer_manager_, UpdateRaftConfig(_)) |
| .Times(1) |
| .WillOnce(Return(Status::OK())); |
| EXPECT_CALL(*queue_, Init(_)) |
| .Times(1); |
| EXPECT_CALL(*queue_, SetLeaderMode(_, _, _)) |
| .Times(1); |
| EXPECT_CALL(*consensus_.get(), AppendNewRoundToQueueUnlocked(_)) |
| .Times(11); |
| EXPECT_CALL(*queue_, AppendOperationsMock(_, _)) |
| .Times(11).WillRepeatedly(Return(Status::OK())); |
| |
| ConsensusBootstrapInfo info; |
| ASSERT_OK(consensus_->Start(info)); |
| ASSERT_OK(consensus_->EmulateElection()); |
| |
| // Commit the first noop round, created on EmulateElection(); |
| OpId committed_index; |
| ASSERT_FALSE(rounds_.empty()) << "rounds_ is empty!"; |
| consensus_->UpdateMajorityReplicated(rounds_[0]->id(), &committed_index); |
| |
| ASSERT_OPID_EQ(rounds_[0]->id(), committed_index); |
| |
| // Append 10 rounds |
| for (int i = 0; i < 10; i++) { |
| scoped_refptr<ConsensusRound> round = AppendNoOpRound(); |
| // queue reports majority replicated index in the leader's term |
| // committed index should move accordingly. |
| consensus_->UpdateMajorityReplicated(round->id(), &committed_index); |
| ASSERT_OPID_EQ(round->id(), committed_index); |
| } |
| } |
| |
| // Tests that, when terms change, the commit index only advances when the majority |
| // replicated index is in the current term. |
| TEST_F(RaftConsensusTest, TestCommittedIndexWhenTermsChange) { |
| SetUpConsensus(); |
| SetUpGeneralExpectations(); |
| EXPECT_CALL(*peer_manager_, UpdateRaftConfig(_)) |
| .Times(2) |
| .WillRepeatedly(Return(Status::OK())); |
| EXPECT_CALL(*queue_, Init(_)) |
| .Times(1); |
| EXPECT_CALL(*consensus_.get(), AppendNewRoundToQueueUnlocked(_)) |
| .Times(3); |
| EXPECT_CALL(*queue_, AppendOperationsMock(_, _)) |
| .Times(3).WillRepeatedly(Return(Status::OK()));; |
| |
| ConsensusBootstrapInfo info; |
| ASSERT_OK(consensus_->Start(info)); |
| ASSERT_OK(consensus_->EmulateElection()); |
| |
| OpId committed_index; |
| consensus_->UpdateMajorityReplicated(rounds_[0]->id(), &committed_index); |
| ASSERT_OPID_EQ(rounds_[0]->id(), committed_index); |
| |
| // Append another round in the current term (besides the original config round). |
| scoped_refptr<ConsensusRound> round = AppendNoOpRound(); |
| |
| // Now emulate an election, the same guy will be leader but the term |
| // will change. |
| ASSERT_OK(consensus_->EmulateElection()); |
| |
| // Now tell consensus that 'round' has been majority replicated, this _shouldn't_ |
| // advance the committed index, since that belongs to a previous term. |
| OpId new_committed_index; |
| consensus_->UpdateMajorityReplicated(round->id(), &new_committed_index); |
| ASSERT_OPID_EQ(committed_index, new_committed_index); |
| |
| const scoped_refptr<ConsensusRound>& last_config_round = rounds_[2]; |
| |
| // Now notify that the last change config was committed, this should advance the |
| // commit index to the id of the last change config. |
| consensus_->UpdateMajorityReplicated(last_config_round->id(), &committed_index); |
| |
| DumpRounds(); |
| ASSERT_OPID_EQ(last_config_round->id(), committed_index); |
| } |
| |
| // Asserts that a ConsensusRound has an OpId set in its ReplicateMsg. |
| MATCHER(HasOpId, "") { return arg->id().IsInitialized(); } |
| |
| // These matchers assert that a Status object is of a certain type. |
| MATCHER(IsOk, "") { return arg.ok(); } |
| MATCHER(IsAborted, "") { return arg.IsAborted(); } |
| |
| // Tests that consensus is able to handle pending operations. It tests this in two ways: |
| // - It tests that consensus does the right thing with pending transactions from the the WAL. |
| // - It tests that when a follower gets promoted to leader it does the right thing |
| // with the pending operations. |
| TEST_F(RaftConsensusTest, TestPendingTransactions) { |
| SetUpConsensus(10); |
| |
| // Emulate a stateful system by having a bunch of operations in flight when consensus starts. |
| // Specifically we emulate we're on term 10, with 5 operations before the last known |
| // committed operation, 10.104, which should be committed immediately, and 5 operations after the |
| // last known committed operation, which should be pending but not yet committed. |
| ConsensusBootstrapInfo info; |
| info.last_id.set_term(10); |
| for (int i = 0; i < 10; i++) { |
| auto replicate = new ReplicateMsg(); |
| replicate->set_op_type(NO_OP); |
| info.last_id.set_index(100 + i); |
| replicate->mutable_id()->CopyFrom(info.last_id); |
| info.orphaned_replicates.push_back(replicate); |
| } |
| |
| info.last_committed_id.set_term(10); |
| info.last_committed_id.set_index(104); |
| |
| { |
| InSequence dummy; |
| // On start we expect 10 NO_OPs to be enqueues, with 5 of those having |
| // their commit continuation called immediately. |
| EXPECT_CALL(*consensus_.get(), StartConsensusOnlyRoundUnlocked(_)) |
| .Times(10); |
| |
| // Queue gets initted when the peer starts. |
| EXPECT_CALL(*queue_, Init(_)) |
| .Times(1); |
| } |
| |
| ASSERT_OK(consensus_->Start(info)); |
| |
| ASSERT_TRUE(testing::Mock::VerifyAndClearExpectations(queue_)); |
| ASSERT_TRUE(testing::Mock::VerifyAndClearExpectations(txn_factory_.get())); |
| ASSERT_TRUE(testing::Mock::VerifyAndClearExpectations(peer_manager_)); |
| ASSERT_TRUE(testing::Mock::VerifyAndClearExpectations(consensus_.get())); |
| |
| // Now we test what this peer does with the pending operations once it's elected leader. |
| { |
| InSequence dummy; |
| // Peer manager gets updated with the new set of peers to send stuff to. |
| EXPECT_CALL(*peer_manager_, UpdateRaftConfig(_)) |
| .Times(1).WillOnce(Return(Status::OK())); |
| // The no-op should be appended to the queue. |
| // One more op will be appended for the election. |
| EXPECT_CALL(*consensus_.get(), AppendNewRoundToQueueUnlocked(_)) |
| .Times(1); |
| EXPECT_CALL(*queue_, AppendOperationsMock(_, _)) |
| .Times(1).WillRepeatedly(Return(Status::OK()));; |
| } |
| |
| // Emulate an election, this will make this peer become leader and trigger the |
| // above set expectations. |
| ASSERT_OK(consensus_->EmulateElection()); |
| |
| ASSERT_TRUE(testing::Mock::VerifyAndClearExpectations(queue_)); |
| ASSERT_TRUE(testing::Mock::VerifyAndClearExpectations(txn_factory_.get())); |
| ASSERT_TRUE(testing::Mock::VerifyAndClearExpectations(peer_manager_)); |
| |
| // Commit the 5 no-ops from the previous term, along with the one pushed to |
| // assert leadership. |
| EXPECT_CALL(*consensus_.get(), NonTxRoundReplicationFinished(HasOpId(), _, IsOk())) |
| .Times(6); |
| EXPECT_CALL(*peer_manager_, SignalRequest(_)) |
| .Times(AnyNumber()); |
| // In the end peer manager and the queue get closed. |
| EXPECT_CALL(*peer_manager_, Close()) |
| .Times(AtLeast(1)); |
| EXPECT_CALL(*queue_, Close()) |
| .Times(1); |
| |
| // Now tell consensus all original orphaned replicates were majority replicated. |
| // This should not advance the committed index because we haven't replicated |
| // anything in the current term. |
| OpId committed_index; |
| consensus_->UpdateMajorityReplicated(info.orphaned_replicates.back()->id(), |
| &committed_index); |
| // Should still be the last committed in the the wal. |
| ASSERT_OPID_EQ(committed_index, info.last_committed_id); |
| |
| // Now mark the last operation (the no-op round) as committed. |
| // This should advance the committed index, since that round in on our current term, |
| // and we should be able to commit all previous rounds. |
| OpId cc_round_id = info.orphaned_replicates.back()->id(); |
| cc_round_id.set_term(11); |
| cc_round_id.set_index(cc_round_id.index() + 1); |
| consensus_->UpdateMajorityReplicated(cc_round_id, |
| &committed_index); |
| |
| ASSERT_OPID_EQ(committed_index, cc_round_id); |
| } |
| |
| MATCHER_P2(RoundHasOpId, term, index, "") { |
| LOG(INFO) << "expected: " << MakeOpId(term, index) << ", actual: " << arg->id(); |
| return arg->id().term() == term && arg->id().index() == index; |
| } |
| |
| // Tests the case where a a leader is elected and pushed a sequence of |
| // operations of which some never get committed. Eventually a new leader in a higher |
| // term pushes operations that overwrite some of the original indexes. |
| TEST_F(RaftConsensusTest, TestAbortOperations) { |
| SetUpConsensus(1, 2); |
| |
| EXPECT_CALL(*consensus_.get(), AppendNewRoundToQueueUnlocked(_)) |
| .Times(AnyNumber()); |
| |
| EXPECT_CALL(*peer_manager_, SignalRequest(_)) |
| .Times(AnyNumber()); |
| EXPECT_CALL(*peer_manager_, Close()) |
| .Times(AtLeast(1)); |
| EXPECT_CALL(*queue_, Close()) |
| .Times(1); |
| EXPECT_CALL(*queue_, Init(_)) |
| .Times(1); |
| EXPECT_CALL(*peer_manager_, UpdateRaftConfig(_)) |
| .Times(1) |
| .WillRepeatedly(Return(Status::OK())); |
| |
| // We'll append to the queue 12 times, the initial noop txn + 10 initial ops while leader |
| // and the new leader's update, when we're overwriting operations. |
| EXPECT_CALL(*queue_, AppendOperationsMock(_, _)) |
| .Times(12); |
| |
| // .. but those will be overwritten later by another |
| // leader, which will push and commit 5 ops. |
| // Only these five should start as replica rounds. |
| EXPECT_CALL(*consensus_.get(), StartConsensusOnlyRoundUnlocked(_)) |
| .Times(4); |
| |
| ConsensusBootstrapInfo info; |
| ASSERT_OK(consensus_->Start(info)); |
| ASSERT_OK(consensus_->EmulateElection()); |
| |
| // Append 10 rounds: 2.2 - 2.11 |
| for (int i = 0; i < 10; i++) { |
| AppendNoOpRound(); |
| } |
| |
| // Expectations for what gets committed and what gets aborted: |
| // (note: the aborts may be triggered before the commits) |
| // 5 OK's for the 2.1-2.5 ops. |
| // 6 Aborts for the 2.6-2.11 ops. |
| // 1 OK for the 3.6 op. |
| for (int index = 1; index < 6; index++) { |
| EXPECT_CALL(*consensus_.get(), |
| NonTxRoundReplicationFinished(RoundHasOpId(2, index), _, IsOk())).Times(1); |
| } |
| for (int index = 6; index < 12; index++) { |
| EXPECT_CALL(*consensus_.get(), |
| NonTxRoundReplicationFinished(RoundHasOpId(2, index), _, IsAborted())).Times(1); |
| } |
| EXPECT_CALL(*consensus_.get(), |
| NonTxRoundReplicationFinished(RoundHasOpId(3, 6), _, IsOk())).Times(1); |
| |
| // Nothing's committed so far, so now just send an Update() message |
| // emulating another guy got elected leader and is overwriting a suffix |
| // of the previous messages. |
| // In particular this request has: |
| // - Op 2.5 from the previous leader's term |
| // - Ops 3.6-3.9 from the new leader's term |
| // - A new committed index of 3.6 |
| ConsensusRequestPB request; |
| request.set_caller_term(3); |
| const string PEER_0_UUID = "peer-0"; |
| request.set_caller_uuid(PEER_0_UUID); |
| request.set_tablet_id(kTestTablet); |
| request.mutable_preceding_id()->CopyFrom(MakeOpId(2, 4)); |
| |
| ReplicateMsg* replicate = request.add_ops(); |
| replicate->mutable_id()->CopyFrom(MakeOpId(2, 5)); |
| replicate->set_op_type(NO_OP); |
| |
| ReplicateMsg* noop_msg = request.add_ops(); |
| noop_msg->mutable_id()->CopyFrom(MakeOpId(3, 6)); |
| noop_msg->set_op_type(NO_OP); |
| noop_msg->set_timestamp(clock_->Now().ToUint64()); |
| noop_msg->mutable_noop_request(); |
| |
| // Overwrite another 3 of the original rounds for a total of 4 overwrites. |
| for (int i = 7; i < 10; i++) { |
| ReplicateMsg* replicate = request.add_ops(); |
| replicate->mutable_id()->CopyFrom(MakeOpId(3, i)); |
| replicate->set_op_type(NO_OP); |
| replicate->set_timestamp(clock_->Now().ToUint64()); |
| } |
| |
| request.mutable_committed_index()->CopyFrom(MakeOpId(3, 6)); |
| |
| ConsensusResponsePB response; |
| ASSERT_OK(consensus_->Update(&request, &response)); |
| ASSERT_FALSE(response.has_error()); |
| |
| ASSERT_TRUE(Mock::VerifyAndClearExpectations(consensus_.get())); |
| |
| // Now we expect to commit ops 3.7 - 3.9. |
| for (int index = 7; index < 10; index++) { |
| EXPECT_CALL(*consensus_.get(), |
| NonTxRoundReplicationFinished(RoundHasOpId(3, index), _, IsOk())).Times(1); |
| } |
| |
| request.mutable_ops()->Clear(); |
| request.mutable_preceding_id()->CopyFrom(MakeOpId(3, 9)); |
| request.mutable_committed_index()->CopyFrom(MakeOpId(3, 9)); |
| |
| ASSERT_OK(consensus_->Update(&request, &response)); |
| ASSERT_FALSE(response.has_error()); |
| } |
| |
| TEST_F(RaftConsensusTest, TestReceivedIdIsInittedBeforeStart) { |
| SetUpConsensus(); |
| OpId opid; |
| ASSERT_OK(consensus_->GetLastOpId(RECEIVED_OPID, &opid)); |
| ASSERT_TRUE(opid.IsInitialized()); |
| ASSERT_OPID_EQ(opid, MinimumOpId()); |
| } |
| |
| // Ensure that followers reset their "last_received_current_leader" |
| // ConsensusStatusPB field when a new term is encountered. This is a |
| // correctness test for the logic on the follower side that allows the |
| // leader-side queue to determine which op to send next in various scenarios. |
| TEST_F(RaftConsensusTest, TestResetRcvdFromCurrentLeaderOnNewTerm) { |
| SetUpConsensus(kMinimumTerm, 3); |
| SetUpGeneralExpectations(); |
| ConsensusBootstrapInfo info; |
| ASSERT_OK(consensus_->Start(info)); |
| |
| ConsensusRequestPB request; |
| ConsensusResponsePB response; |
| int64_t caller_term = 0; |
| int64_t log_index = 0; |
| |
| caller_term = 1; |
| string caller_uuid = config_.peers(0).permanent_uuid(); |
| OpId preceding_opid = MinimumOpId(); |
| |
| // Heartbeat. This will cause the term to increment on the follower. |
| request = MakeConsensusRequest(caller_term, caller_uuid, preceding_opid); |
| response.Clear(); |
| ASSERT_OK(consensus_->Update(&request, &response)); |
| ASSERT_FALSE(response.status().has_error()) << response.ShortDebugString(); |
| ASSERT_EQ(caller_term, response.responder_term()); |
| ASSERT_OPID_EQ(response.status().last_received(), MinimumOpId()); |
| ASSERT_OPID_EQ(response.status().last_received_current_leader(), MinimumOpId()); |
| |
| // Replicate a no-op. |
| OpId noop_opid = MakeOpId(caller_term, ++log_index); |
| AddNoOpToConsensusRequest(&request, noop_opid); |
| response.Clear(); |
| ASSERT_OK(consensus_->Update(&request, &response)); |
| ASSERT_FALSE(response.status().has_error()) << response.ShortDebugString(); |
| ASSERT_OPID_EQ(response.status().last_received(), noop_opid); |
| ASSERT_OPID_EQ(response.status().last_received_current_leader(), noop_opid); |
| |
| // New leader heartbeat. Term increase to 2. |
| // Expect current term replicated to be nothing (MinimumOpId) but log |
| // replicated to be everything sent so far. |
| caller_term = 2; |
| caller_uuid = config_.peers(1).permanent_uuid(); |
| preceding_opid = noop_opid; |
| request = MakeConsensusRequest(caller_term, caller_uuid, preceding_opid); |
| response.Clear(); |
| ASSERT_OK(consensus_->Update(&request, &response)); |
| ASSERT_FALSE(response.status().has_error()) << response.ShortDebugString(); |
| ASSERT_EQ(caller_term, response.responder_term()); |
| ASSERT_OPID_EQ(response.status().last_received(), preceding_opid); |
| ASSERT_OPID_EQ(response.status().last_received_current_leader(), MinimumOpId()); |
| |
| // Append a no-op. |
| noop_opid = MakeOpId(caller_term, ++log_index); |
| AddNoOpToConsensusRequest(&request, noop_opid); |
| response.Clear(); |
| ASSERT_OK(consensus_->Update(&request, &response)); |
| ASSERT_FALSE(response.status().has_error()) << response.ShortDebugString(); |
| ASSERT_OPID_EQ(response.status().last_received(), noop_opid); |
| ASSERT_OPID_EQ(response.status().last_received_current_leader(), noop_opid); |
| |
| // New leader heartbeat. The term should rev but we should get an LMP mismatch. |
| caller_term = 3; |
| caller_uuid = config_.peers(0).permanent_uuid(); |
| preceding_opid = MakeOpId(caller_term, log_index + 1); // Not replicated yet. |
| request = MakeConsensusRequest(caller_term, caller_uuid, preceding_opid); |
| response.Clear(); |
| ASSERT_OK(consensus_->Update(&request, &response)); |
| ASSERT_EQ(caller_term, response.responder_term()); |
| ASSERT_OPID_EQ(response.status().last_received(), noop_opid); // Not preceding this time. |
| ASSERT_OPID_EQ(response.status().last_received_current_leader(), MinimumOpId()); |
| ASSERT_TRUE(response.status().has_error()) << response.ShortDebugString(); |
| ASSERT_EQ(ConsensusErrorPB::PRECEDING_ENTRY_DIDNT_MATCH, response.status().error().code()); |
| |
| // Decrement preceding and append a no-op. |
| preceding_opid = MakeOpId(2, log_index); |
| noop_opid = MakeOpId(caller_term, ++log_index); |
| request = MakeConsensusRequest(caller_term, caller_uuid, preceding_opid); |
| AddNoOpToConsensusRequest(&request, noop_opid); |
| response.Clear(); |
| ASSERT_OK(consensus_->Update(&request, &response)); |
| ASSERT_FALSE(response.status().has_error()) << response.ShortDebugString(); |
| ASSERT_OPID_EQ(response.status().last_received(), noop_opid) << response.ShortDebugString(); |
| ASSERT_OPID_EQ(response.status().last_received_current_leader(), noop_opid) |
| << response.ShortDebugString(); |
| |
| // Happy case. New leader with new no-op to append right off the bat. |
| // Response should be OK with all last_received* fields equal to the new no-op. |
| caller_term = 4; |
| caller_uuid = config_.peers(1).permanent_uuid(); |
| preceding_opid = noop_opid; |
| noop_opid = MakeOpId(caller_term, ++log_index); |
| request = MakeConsensusRequest(caller_term, caller_uuid, preceding_opid); |
| AddNoOpToConsensusRequest(&request, noop_opid); |
| response.Clear(); |
| ASSERT_OK(consensus_->Update(&request, &response)); |
| ASSERT_FALSE(response.status().has_error()) << response.ShortDebugString(); |
| ASSERT_EQ(caller_term, response.responder_term()); |
| ASSERT_OPID_EQ(response.status().last_received(), noop_opid); |
| ASSERT_OPID_EQ(response.status().last_received_current_leader(), noop_opid); |
| } |
| |
| } // namespace consensus |
| } // namespace kudu |