// blob: e874d5dfb1b891fb311494375cd648a35298b698 [file] [log] [blame]
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.
#include <gmock/gmock.h>
#include <gtest/gtest.h>
#include "kudu/common/schema.h"
#include "kudu/common/wire_protocol-test-util.h"
#include "kudu/consensus/consensus_peers.h"
#include "kudu/consensus/consensus-test-util.h"
#include "kudu/consensus/log.h"
#include "kudu/consensus/peer_manager.h"
#include "kudu/fs/fs_manager.h"
#include "kudu/gutil/stl_util.h"
#include "kudu/server/logical_clock.h"
#include "kudu/util/async_util.h"
#include "kudu/util/mem_tracker.h"
#include "kudu/util/metrics.h"
#include "kudu/util/test_macros.h"
#include "kudu/util/test_util.h"
DECLARE_bool(enable_leader_failure_detection);
METRIC_DECLARE_entity(tablet);
using std::shared_ptr;
using std::string;
namespace kudu {
namespace consensus {
using log::Log;
using log::LogOptions;
using ::testing::_;
using ::testing::AnyNumber;
using ::testing::AtLeast;
using ::testing::Eq;
using ::testing::InSequence;
using ::testing::Invoke;
using ::testing::Mock;
using ::testing::Property;
using ::testing::Return;
// Tablet id used throughout this file: for the consensus options, the log,
// the consensus metadata and the requests built by the tests.
// Declared 'const char* const' so the pointer cannot be reassigned and the
// symbol has internal linkage.
const char* const kTestTablet = "TestTablet";
// UUID passed to FakeRaftPeerPB() when constructing the mocked message queue.
const char* const kLocalPeerUuid = "peer-0";
// A simple map to collect the results of a sequence of transactions.
// Maps an OpId to a Status; keys are ordered by OpIdCompareFunctor.
// NOTE(review): not referenced anywhere in the visible part of this file.
typedef std::map<OpId, Status, OpIdCompareFunctor> StatusesMap;
// gmock stand-in for PeerMessageQueue that lets tests set expectations on the
// calls made to the queue.
class MockQueue : public PeerMessageQueue {
public:
// Constructs the base queue with the given metric entity and log, a
// FakeRaftPeerPB built from kLocalPeerUuid, and kTestTablet as the tablet id.
explicit MockQueue(const scoped_refptr<MetricEntity>& metric_entity, log::Log* log)
: PeerMessageQueue(metric_entity, log, FakeRaftPeerPB(kLocalPeerUuid), kTestTablet) {}
MOCK_METHOD1(Init, void(const OpId& locally_replicated_index));
MOCK_METHOD3(SetLeaderMode, void(const OpId& committed_opid,
int64_t current_term,
const RaftConfigPB& active_config));
MOCK_METHOD0(SetNonLeaderMode, void());
// AppendOperations() is overridden by hand and only forwards to the mocked
// AppendOperationsMock(); tests attach their expectations and default actions
// to AppendOperationsMock().
virtual Status AppendOperations(const vector<ReplicateRefPtr>& msgs,
const StatusCallback& callback) OVERRIDE {
return AppendOperationsMock(msgs, callback);
}
MOCK_METHOD2(AppendOperationsMock, Status(const vector<ReplicateRefPtr>& msgs,
const StatusCallback& callback));
MOCK_METHOD1(TrackPeer, void(const string&));
MOCK_METHOD1(UntrackPeer, void(const string&));
MOCK_METHOD4(RequestForPeer, Status(const std::string& uuid,
ConsensusRequestPB* request,
std::vector<ReplicateRefPtr>* msg_refs,
bool* needs_remote_bootstrap));
MOCK_METHOD3(ResponseFromPeer, void(const std::string& peer_uuid,
const ConsensusResponsePB& response,
bool* more_pending));
MOCK_METHOD0(Close, void());
};
// gmock stand-in for PeerManager. UpdateRaftConfig(), SignalRequest() and
// Close() are mocked so tests can set expectations on them.
class MockPeerManager : public PeerManager {
public:
// The base class is constructed with empty strings and null pointers for all
// of its arguments.
MockPeerManager() : PeerManager("", "", nullptr, nullptr, nullptr, nullptr) {}
MOCK_METHOD1(UpdateRaftConfig, Status(const consensus::RaftConfigPB& config));
MOCK_METHOD1(SignalRequest, void(bool force_if_queue_empty));
MOCK_METHOD0(Close, void());
};
// RaftConsensus subclass that mocks three of its methods
// (AppendNewRoundToQueueUnlocked, StartConsensusOnlyRoundUnlocked and
// NonTxRoundReplicationFinished) so tests can count and match invocations.
// Each mocked method has a matching "...Concrete" helper that calls the
// RaftConsensus implementation.
class RaftConsensusSpy : public RaftConsensus {
public:
// NOTE(review): this typedef is not referenced in the visible part of this file.
typedef Callback<Status(const scoped_refptr<ConsensusRound>& round)> AppendCallback;
// Forwards every argument to the RaftConsensus constructor (moving the
// gscoped_ptr ones), then installs default actions for two of the mocks.
RaftConsensusSpy(const ConsensusOptions& options,
gscoped_ptr<ConsensusMetadata> cmeta,
gscoped_ptr<PeerProxyFactory> proxy_factory,
gscoped_ptr<PeerMessageQueue> queue,
gscoped_ptr<PeerManager> peer_manager,
gscoped_ptr<ThreadPool> thread_pool,
const scoped_refptr<MetricEntity>& metric_entity,
const std::string& peer_uuid,
const scoped_refptr<server::Clock>& clock,
ReplicaTransactionFactory* txn_factory,
const scoped_refptr<log::Log>& log,
const shared_ptr<MemTracker>& parent_mem_tracker,
const Callback<void(const std::string& reason)>& mark_dirty_clbk)
: RaftConsensus(options,
std::move(cmeta),
std::move(proxy_factory),
std::move(queue),
std::move(peer_manager),
std::move(thread_pool),
metric_entity,
peer_uuid,
clock,
txn_factory,
log,
parent_mem_tracker,
mark_dirty_clbk) {
// These "aliases" allow us to count invocations and assert on them.
// By default the mocked methods delegate to the "...Concrete" helpers below.
// No default is installed here for AppendNewRoundToQueueUnlocked.
ON_CALL(*this, StartConsensusOnlyRoundUnlocked(_))
.WillByDefault(Invoke(this,
&RaftConsensusSpy::StartNonLeaderConsensusRoundUnlockedConcrete));
ON_CALL(*this, NonTxRoundReplicationFinished(_, _, _))
.WillByDefault(Invoke(this, &RaftConsensusSpy::NonTxRoundReplicationFinishedConcrete));
}
MOCK_METHOD1(AppendNewRoundToQueueUnlocked, Status(const scoped_refptr<ConsensusRound>& round));
// Calls RaftConsensus::AppendNewRoundToQueueUnlocked() and returns its status.
Status AppendNewRoundToQueueUnlockedConcrete(const scoped_refptr<ConsensusRound>& round) {
return RaftConsensus::AppendNewRoundToQueueUnlocked(round);
}
MOCK_METHOD1(StartConsensusOnlyRoundUnlocked, Status(const ReplicateRefPtr& msg));
// Calls RaftConsensus::StartConsensusOnlyRoundUnlocked() and returns its status.
// Note that the helper's name ("StartNonLeaderConsensusRound...") differs from
// the name of the method it wraps.
Status StartNonLeaderConsensusRoundUnlockedConcrete(const ReplicateRefPtr& msg) {
return RaftConsensus::StartConsensusOnlyRoundUnlocked(msg);
}
MOCK_METHOD3(NonTxRoundReplicationFinished, void(ConsensusRound* round,
const StatusCallback& client_cb,
const Status& status));
// Logs the round's OpId and the given status, then calls
// RaftConsensus::NonTxRoundReplicationFinished().
void NonTxRoundReplicationFinishedConcrete(ConsensusRound* round,
const StatusCallback& client_cb,
const Status& status) {
LOG(INFO) << "Committing round with opid " << round->id()
<< " given Status " << status.ToString();
RaftConsensus::NonTxRoundReplicationFinished(round, client_cb, status);
}
private:
DISALLOW_COPY_AND_ASSIGN(RaftConsensusSpy);
};
// No-op callback. The fixture binds it as the 'mark_dirty_clbk' argument of
// RaftConsensusSpy; the string argument is intentionally ignored.
void DoNothing(const string& /* s */) {}
class RaftConsensusTest : public KuduTest {
public:
RaftConsensusTest()
: clock_(server::LogicalClock::CreateStartingAt(Timestamp(0))),
metric_entity_(METRIC_ENTITY_tablet.Instantiate(&metric_registry_, "raft-consensus-test")),
schema_(GetSimpleTestSchema()) {
FLAGS_enable_leader_failure_detection = false;
options_.tablet_id = kTestTablet;
}
virtual void SetUp() OVERRIDE {
LogOptions options;
string test_path = GetTestPath("test-peer-root");
// TODO mock the Log too, since we're gonna mock the queue
// monitors and pretty much everything else.
fs_manager_.reset(new FsManager(env_.get(), test_path));
CHECK_OK(fs_manager_->CreateInitialFileSystemLayout());
CHECK_OK(fs_manager_->Open());
CHECK_OK(Log::Open(LogOptions(),
fs_manager_.get(),
kTestTablet,
schema_,
0, // schema_version
NULL,
&log_));
queue_ = new MockQueue(metric_entity_, log_.get());
peer_manager_ = new MockPeerManager;
txn_factory_.reset(new MockTransactionFactory);
ON_CALL(*queue_, AppendOperationsMock(_, _))
.WillByDefault(Invoke(this, &RaftConsensusTest::AppendToLog));
}
void SetUpConsensus(int64_t initial_term = consensus::kMinimumTerm, int num_peers = 1) {
config_ = BuildRaftConfigPBForTests(num_peers);
config_.set_opid_index(kInvalidOpIdIndex);
gscoped_ptr<PeerProxyFactory> proxy_factory(new LocalTestPeerProxyFactory(nullptr));
string peer_uuid = config_.peers(num_peers - 1).permanent_uuid();
gscoped_ptr<ConsensusMetadata> cmeta;
CHECK_OK(ConsensusMetadata::Create(fs_manager_.get(), kTestTablet, peer_uuid,
config_, initial_term, &cmeta));
gscoped_ptr<ThreadPool> thread_pool;
CHECK_OK(ThreadPoolBuilder("raft-pool") .Build(&thread_pool));
consensus_.reset(new RaftConsensusSpy(options_,
std::move(cmeta),
std::move(proxy_factory),
gscoped_ptr<PeerMessageQueue>(queue_),
gscoped_ptr<PeerManager>(peer_manager_),
std::move(thread_pool),
metric_entity_,
peer_uuid,
clock_,
txn_factory_.get(),
log_.get(),
MemTracker::GetRootTracker(),
Bind(&DoNothing)));
ON_CALL(*consensus_.get(), AppendNewRoundToQueueUnlocked(_))
.WillByDefault(Invoke(this, &RaftConsensusTest::MockAppendNewRound));
}
Status AppendToLog(const vector<ReplicateRefPtr>& msgs,
const StatusCallback& callback) {
return log_->AsyncAppendReplicates(msgs,
Bind(LogAppendCallback, callback));
}
static void LogAppendCallback(const StatusCallback& callback,
const Status& s) {
CHECK_OK(s);
callback.Run(s);
}
Status MockAppendNewRound(const scoped_refptr<ConsensusRound>& round) {
rounds_.push_back(round);
RETURN_NOT_OK(consensus_->AppendNewRoundToQueueUnlockedConcrete(round));
LOG(INFO) << "Round append: " << round->id() << ", ReplicateMsg: "
<< round->replicate_msg()->ShortDebugString();
return Status::OK();
}
void SetUpGeneralExpectations() {
EXPECT_CALL(*peer_manager_, SignalRequest(_))
.Times(AnyNumber());
EXPECT_CALL(*peer_manager_, Close())
.Times(AtLeast(1));
EXPECT_CALL(*queue_, Close())
.Times(1);
EXPECT_CALL(*consensus_.get(), AppendNewRoundToQueueUnlocked(_))
.Times(AnyNumber());
}
// Create a ConsensusRequestPB suitable to send to a peer.
ConsensusRequestPB MakeConsensusRequest(int64_t caller_term,
const string& caller_uuid,
const OpId& preceding_opid);
// Add a single no-op with the given OpId to a ConsensusRequestPB.
void AddNoOpToConsensusRequest(ConsensusRequestPB* request, const OpId& noop_opid);
scoped_refptr<ConsensusRound> AppendNoOpRound() {
ReplicateRefPtr replicate_ptr(make_scoped_refptr_replicate(new ReplicateMsg));
replicate_ptr->get()->set_op_type(NO_OP);
replicate_ptr->get()->set_timestamp(clock_->Now().ToUint64());
scoped_refptr<ConsensusRound> round(new ConsensusRound(consensus_.get(), replicate_ptr));
round->SetConsensusReplicatedCallback(
Bind(&RaftConsensusSpy::NonTxRoundReplicationFinished,
Unretained(consensus_.get()), Unretained(round.get()), Bind(&DoNothingStatusCB)));
CHECK_OK(consensus_->Replicate(round));
LOG(INFO) << "Appended NO_OP round with opid " << round->id();
return round;
}
void DumpRounds() {
LOG(INFO) << "Dumping rounds...";
for (const scoped_refptr<ConsensusRound>& round : rounds_) {
LOG(INFO) << "Round: OpId " << round->id() << ", ReplicateMsg: "
<< round->replicate_msg()->ShortDebugString();
}
}
protected:
ConsensusOptions options_;
RaftConfigPB config_;
OpId initial_id_;
gscoped_ptr<FsManager> fs_manager_;
scoped_refptr<Log> log_;
gscoped_ptr<PeerProxyFactory> proxy_factory_;
scoped_refptr<server::Clock> clock_;
MetricRegistry metric_registry_;
scoped_refptr<MetricEntity> metric_entity_;
const Schema schema_;
scoped_refptr<RaftConsensusSpy> consensus_;
vector<scoped_refptr<ConsensusRound> > rounds_;
// Mocks.
// NOTE: both 'queue_' and 'peer_manager_' belong to 'consensus_' and may be deleted before
// the test is.
MockQueue* queue_;
MockPeerManager* peer_manager_;
gscoped_ptr<MockTransactionFactory> txn_factory_;
};
// Builds a request for the test tablet as sent by 'caller_uuid' in term
// 'caller_term', with 'preceding_opid' as its preceding id. No ops are added.
ConsensusRequestPB RaftConsensusTest::MakeConsensusRequest(int64_t caller_term,
                                                           const string& caller_uuid,
                                                           const OpId& preceding_opid) {
  ConsensusRequestPB req;
  req.set_tablet_id(kTestTablet);
  req.set_caller_uuid(caller_uuid);
  req.set_caller_term(caller_term);
  req.mutable_preceding_id()->CopyFrom(preceding_opid);
  return req;
}
// Appends one NO_OP ReplicateMsg with id 'noop_opid' to 'request', stamped
// with the current value of the fixture's clock.
void RaftConsensusTest::AddNoOpToConsensusRequest(ConsensusRequestPB* request,
                                                  const OpId& noop_opid) {
  ReplicateMsg* op = request->add_ops();
  op->set_op_type(NO_OP);
  op->mutable_id()->CopyFrom(noop_opid);
  op->set_timestamp(clock_->Now().ToUint64());
  // Calling the mutable accessor marks the (empty) noop_request as present.
  op->mutable_noop_request();
}
// Verifies that, while the term stays the same, the committed index follows
// the majority-replicated index reported to consensus.
TEST_F(RaftConsensusTest, TestCommittedIndexWhenInSameTerm) {
  // Number of rounds the test appends itself; the emulated election appends
  // one more round on top of these.
  const int kNumAppendedRounds = 10;

  SetUpConsensus();
  SetUpGeneralExpectations();
  EXPECT_CALL(*peer_manager_, UpdateRaftConfig(_))
      .Times(1)
      .WillOnce(Return(Status::OK()));
  EXPECT_CALL(*queue_, Init(_))
      .Times(1);
  EXPECT_CALL(*queue_, SetLeaderMode(_, _, _))
      .Times(1);
  EXPECT_CALL(*consensus_.get(), AppendNewRoundToQueueUnlocked(_))
      .Times(kNumAppendedRounds + 1);
  EXPECT_CALL(*queue_, AppendOperationsMock(_, _))
      .Times(kNumAppendedRounds + 1)
      .WillRepeatedly(Return(Status::OK()));

  ConsensusBootstrapInfo info;
  ASSERT_OK(consensus_->Start(info));
  ASSERT_OK(consensus_->EmulateElection());

  // The election created the first no-op round; report it as majority
  // replicated and check that it becomes the committed index.
  OpId committed_index;
  ASSERT_FALSE(rounds_.empty()) << "rounds_ is empty!";
  consensus_->UpdateMajorityReplicated(rounds_[0]->id(), &committed_index);
  ASSERT_OPID_EQ(rounds_[0]->id(), committed_index);

  // Each further round is in the leader's term, so reporting it as majority
  // replicated must move the committed index to that round.
  for (int n = 0; n < kNumAppendedRounds; ++n) {
    scoped_refptr<ConsensusRound> noop_round = AppendNoOpRound();
    consensus_->UpdateMajorityReplicated(noop_round->id(), &committed_index);
    ASSERT_OPID_EQ(noop_round->id(), committed_index);
  }
}
// Tests that, when terms change, the commit index only advances when the majority
// replicated index is in the current term.
TEST_F(RaftConsensusTest, TestCommittedIndexWhenTermsChange) {
  SetUpConsensus();
  SetUpGeneralExpectations();
  // Two elections are emulated below, hence two config updates.
  EXPECT_CALL(*peer_manager_, UpdateRaftConfig(_))
      .Times(2)
      .WillRepeatedly(Return(Status::OK()));
  EXPECT_CALL(*queue_, Init(_))
      .Times(1);
  // Three rounds in total: one per election plus the explicit no-op round.
  EXPECT_CALL(*consensus_.get(), AppendNewRoundToQueueUnlocked(_))
      .Times(3);
  EXPECT_CALL(*queue_, AppendOperationsMock(_, _))
      .Times(3).WillRepeatedly(Return(Status::OK()));
  ConsensusBootstrapInfo info;
  ASSERT_OK(consensus_->Start(info));
  ASSERT_OK(consensus_->EmulateElection());
  OpId committed_index;
  // Guard the index below so a missing round fails the test instead of
  // reading past the end of 'rounds_'.
  ASSERT_FALSE(rounds_.empty()) << "rounds_ is empty!";
  consensus_->UpdateMajorityReplicated(rounds_[0]->id(), &committed_index);
  ASSERT_OPID_EQ(rounds_[0]->id(), committed_index);
  // Append another round in the current term (besides the original config round).
  scoped_refptr<ConsensusRound> round = AppendNoOpRound();
  // Now emulate an election, the same guy will be leader but the term
  // will change.
  ASSERT_OK(consensus_->EmulateElection());
  // Now tell consensus that 'round' has been majority replicated, this _shouldn't_
  // advance the committed index, since that belongs to a previous term.
  OpId new_committed_index;
  consensus_->UpdateMajorityReplicated(round->id(), &new_committed_index);
  ASSERT_OPID_EQ(committed_index, new_committed_index);
  // The round appended by the second election is the third recorded round.
  ASSERT_GE(rounds_.size(), 3U) << "expected at least 3 rounds";
  const scoped_refptr<ConsensusRound>& last_config_round = rounds_[2];
  // Now notify that the last change config was committed, this should advance the
  // commit index to the id of the last change config.
  consensus_->UpdateMajorityReplicated(last_config_round->id(), &committed_index);
  DumpRounds();
  ASSERT_OPID_EQ(last_config_round->id(), committed_index);
}
// Asserts that a ConsensusRound has an OpId set in its ReplicateMsg.
// Matches when the round's id() reports IsInitialized(). Used on the
// ConsensusRound* argument of NonTxRoundReplicationFinished().
MATCHER(HasOpId, "") { return arg->id().IsInitialized(); }
// These matchers assert that a Status object is of a certain type:
// IsOk matches when Status::ok() is true, IsAborted when Status::IsAborted() is true.
MATCHER(IsOk, "") { return arg.ok(); }
MATCHER(IsAborted, "") { return arg.IsAborted(); }
// Tests that consensus is able to handle pending operations. It tests this in two ways:
// - It tests that consensus does the right thing with pending transactions from the WAL.
// - It tests that when a follower gets promoted to leader it does the right thing
//   with the pending operations.
TEST_F(RaftConsensusTest, TestPendingTransactions) {
  SetUpConsensus(10);
  // Emulate a stateful system by having a bunch of operations in flight when consensus starts.
  // Specifically we emulate we're on term 10 with ops 10.100 through 10.109: the ops up to the
  // last known committed operation, 10.104, should be committed immediately, and the 5 ops
  // after it should be pending but not yet committed.
  ConsensusBootstrapInfo info;
  info.last_id.set_term(10);
  for (int i = 0; i < 10; i++) {
    // NOTE(review): the raw pointer is handed to 'info'; presumably 'info' owns
    // the orphaned replicates from here on -- confirm against ConsensusBootstrapInfo.
    auto replicate = new ReplicateMsg();
    replicate->set_op_type(NO_OP);
    info.last_id.set_index(100 + i);
    replicate->mutable_id()->CopyFrom(info.last_id);
    info.orphaned_replicates.push_back(replicate);
  }
  info.last_committed_id.set_term(10);
  info.last_committed_id.set_index(104);
  {
    InSequence dummy;
    // On start we expect 10 NO_OPs to be enqueued, with 5 of those having
    // their commit continuation called immediately.
    EXPECT_CALL(*consensus_.get(), StartConsensusOnlyRoundUnlocked(_))
        .Times(10);
    // Queue gets initted when the peer starts.
    EXPECT_CALL(*queue_, Init(_))
        .Times(1);
  }
  ASSERT_OK(consensus_->Start(info));
  ASSERT_TRUE(testing::Mock::VerifyAndClearExpectations(queue_));
  ASSERT_TRUE(testing::Mock::VerifyAndClearExpectations(txn_factory_.get()));
  ASSERT_TRUE(testing::Mock::VerifyAndClearExpectations(peer_manager_));
  ASSERT_TRUE(testing::Mock::VerifyAndClearExpectations(consensus_.get()));
  // Now we test what this peer does with the pending operations once it's elected leader.
  {
    InSequence dummy;
    // Peer manager gets updated with the new set of peers to send stuff to.
    EXPECT_CALL(*peer_manager_, UpdateRaftConfig(_))
        .Times(1).WillOnce(Return(Status::OK()));
    // Exactly one new round is appended for the election: the no-op pushed to
    // assert leadership.
    EXPECT_CALL(*consensus_.get(), AppendNewRoundToQueueUnlocked(_))
        .Times(1);
    EXPECT_CALL(*queue_, AppendOperationsMock(_, _))
        .Times(1).WillRepeatedly(Return(Status::OK()));
  }
  // Emulate an election, this will make this peer become leader and trigger the
  // above set expectations.
  ASSERT_OK(consensus_->EmulateElection());
  ASSERT_TRUE(testing::Mock::VerifyAndClearExpectations(queue_));
  ASSERT_TRUE(testing::Mock::VerifyAndClearExpectations(txn_factory_.get()));
  ASSERT_TRUE(testing::Mock::VerifyAndClearExpectations(peer_manager_));
  // Commit the 5 no-ops from the previous term, along with the one pushed to
  // assert leadership.
  EXPECT_CALL(*consensus_.get(), NonTxRoundReplicationFinished(HasOpId(), _, IsOk()))
      .Times(6);
  EXPECT_CALL(*peer_manager_, SignalRequest(_))
      .Times(AnyNumber());
  // In the end peer manager and the queue get closed.
  EXPECT_CALL(*peer_manager_, Close())
      .Times(AtLeast(1));
  EXPECT_CALL(*queue_, Close())
      .Times(1);
  // Now tell consensus all original orphaned replicates were majority replicated.
  // This should not advance the committed index because we haven't replicated
  // anything in the current term.
  OpId committed_index;
  consensus_->UpdateMajorityReplicated(info.orphaned_replicates.back()->id(),
                                       &committed_index);
  // Should still be the last committed in the WAL.
  ASSERT_OPID_EQ(committed_index, info.last_committed_id);
  // Now mark the last operation (the no-op round) as committed.
  // This should advance the committed index, since that round is in our current term,
  // and we should be able to commit all previous rounds.
  OpId cc_round_id = info.orphaned_replicates.back()->id();
  cc_round_id.set_term(11);
  cc_round_id.set_index(cc_round_id.index() + 1);
  consensus_->UpdateMajorityReplicated(cc_round_id,
                                       &committed_index);
  ASSERT_OPID_EQ(committed_index, cc_round_id);
}
// Matches a round whose OpId has exactly the given term and index. Every
// evaluation logs the expected and the actual OpId.
MATCHER_P2(RoundHasOpId, term, index, "") {
  const auto& actual = arg->id();
  LOG(INFO) << "expected: " << MakeOpId(term, index) << ", actual: " << actual;
  return actual.term() == term && actual.index() == index;
}
// Tests the case where a leader is elected and pushes a sequence of
// operations of which some never get committed. Eventually a new leader in a higher
// term pushes operations that overwrite some of the original indexes.
TEST_F(RaftConsensusTest, TestAbortOperations) {
  SetUpConsensus(1, 2);
  // SignalRequest()/appended rounds any number of times, PeerManager::Close()
  // at least once, queue Close() exactly once.
  SetUpGeneralExpectations();
  EXPECT_CALL(*queue_, Init(_))
      .Times(1);
  EXPECT_CALL(*peer_manager_, UpdateRaftConfig(_))
      .Times(1)
      .WillRepeatedly(Return(Status::OK()));
  // We'll append to the queue 12 times, the initial noop txn + 10 initial ops while leader
  // and the new leader's update, when we're overwriting operations.
  EXPECT_CALL(*queue_, AppendOperationsMock(_, _))
      .Times(12);
  // .. but those will be overwritten later by another leader, whose request
  // carries 5 ops (2.5 and 3.6-3.9). Four replica rounds are expected to start;
  // presumably 2.5 is already pending locally and is not started again.
  EXPECT_CALL(*consensus_.get(), StartConsensusOnlyRoundUnlocked(_))
      .Times(4);
  ConsensusBootstrapInfo info;
  ASSERT_OK(consensus_->Start(info));
  ASSERT_OK(consensus_->EmulateElection());
  // Append 10 rounds: 2.2 - 2.11
  for (int i = 0; i < 10; i++) {
    AppendNoOpRound();
  }
  // Expectations for what gets committed and what gets aborted:
  // (note: the aborts may be triggered before the commits)
  // 5 OK's for the 2.1-2.5 ops.
  // 6 Aborts for the 2.6-2.11 ops.
  // 1 OK for the 3.6 op.
  for (int index = 1; index < 6; index++) {
    EXPECT_CALL(*consensus_.get(),
                NonTxRoundReplicationFinished(RoundHasOpId(2, index), _, IsOk())).Times(1);
  }
  for (int index = 6; index < 12; index++) {
    EXPECT_CALL(*consensus_.get(),
                NonTxRoundReplicationFinished(RoundHasOpId(2, index), _, IsAborted())).Times(1);
  }
  EXPECT_CALL(*consensus_.get(),
              NonTxRoundReplicationFinished(RoundHasOpId(3, 6), _, IsOk())).Times(1);
  // Nothing's committed so far, so now just send an Update() message
  // emulating another guy got elected leader and is overwriting a suffix
  // of the previous messages.
  // In particular this request has:
  // - Op 2.5 from the previous leader's term
  // - Ops 3.6-3.9 from the new leader's term
  // - A new committed index of 3.6
  const string new_leader_uuid = "peer-0";
  ConsensusRequestPB request = MakeConsensusRequest(3, new_leader_uuid, MakeOpId(2, 4));
  // Op 2.5 is sent without a timestamp or noop payload, so it is built by hand.
  ReplicateMsg* prev_term_op = request.add_ops();
  prev_term_op->mutable_id()->CopyFrom(MakeOpId(2, 5));
  prev_term_op->set_op_type(NO_OP);
  AddNoOpToConsensusRequest(&request, MakeOpId(3, 6));
  // Overwrite another 3 of the original rounds for a total of 4 overwrites.
  for (int index = 7; index < 10; index++) {
    ReplicateMsg* overwrite_op = request.add_ops();
    overwrite_op->mutable_id()->CopyFrom(MakeOpId(3, index));
    overwrite_op->set_op_type(NO_OP);
    overwrite_op->set_timestamp(clock_->Now().ToUint64());
  }
  request.mutable_committed_index()->CopyFrom(MakeOpId(3, 6));
  ConsensusResponsePB response;
  ASSERT_OK(consensus_->Update(&request, &response));
  ASSERT_FALSE(response.has_error());
  ASSERT_TRUE(Mock::VerifyAndClearExpectations(consensus_.get()));
  // Now we expect to commit ops 3.7 - 3.9.
  for (int index = 7; index < 10; index++) {
    EXPECT_CALL(*consensus_.get(),
                NonTxRoundReplicationFinished(RoundHasOpId(3, index), _, IsOk())).Times(1);
  }
  // Second update: no ops, only a committed index advanced to 3.9.
  request.mutable_ops()->Clear();
  request.mutable_preceding_id()->CopyFrom(MakeOpId(3, 9));
  request.mutable_committed_index()->CopyFrom(MakeOpId(3, 9));
  ASSERT_OK(consensus_->Update(&request, &response));
  ASSERT_FALSE(response.has_error());
}
// Checks that, without calling Start(), the last received OpId reported by
// consensus is initialized and equal to MinimumOpId().
TEST_F(RaftConsensusTest, TestReceivedIdIsInittedBeforeStart) {
  SetUpConsensus();
  OpId last_received;
  ASSERT_OK(consensus_->GetLastOpId(RECEIVED_OPID, &last_received));
  ASSERT_TRUE(last_received.IsInitialized());
  ASSERT_OPID_EQ(last_received, MinimumOpId());
}
// Ensure that followers reset their "last_received_current_leader"
// ConsensusStatusPB field when a new term is encountered. This is a
// correctness test for the logic on the follower side that allows the
// leader-side queue to determine which op to send next in various scenarios.
TEST_F(RaftConsensusTest, TestResetRcvdFromCurrentLeaderOnNewTerm) {
  SetUpConsensus(kMinimumTerm, 3);
  SetUpGeneralExpectations();
  ConsensusBootstrapInfo info;
  ASSERT_OK(consensus_->Start(info));

  ConsensusResponsePB resp;
  // Index of the last op sent to the follower so far.
  int64_t last_index = 0;

  // --- Term 1, leader is peer 0 ---
  // A bare heartbeat: the follower should adopt the caller's term and report
  // that nothing has been received yet.
  int64_t leader_term = 1;
  string leader_uuid = config_.peers(0).permanent_uuid();
  OpId prev_opid = MinimumOpId();
  ConsensusRequestPB req = MakeConsensusRequest(leader_term, leader_uuid, prev_opid);
  resp.Clear();
  ASSERT_OK(consensus_->Update(&req, &resp));
  ASSERT_FALSE(resp.status().has_error()) << resp.ShortDebugString();
  ASSERT_EQ(leader_term, resp.responder_term());
  ASSERT_OPID_EQ(resp.status().last_received(), MinimumOpId());
  ASSERT_OPID_EQ(resp.status().last_received_current_leader(), MinimumOpId());

  // Same request, now carrying one no-op: both "last received" fields should
  // point at it.
  OpId noop_id = MakeOpId(leader_term, ++last_index);
  AddNoOpToConsensusRequest(&req, noop_id);
  resp.Clear();
  ASSERT_OK(consensus_->Update(&req, &resp));
  ASSERT_FALSE(resp.status().has_error()) << resp.ShortDebugString();
  ASSERT_OPID_EQ(resp.status().last_received(), noop_id);
  ASSERT_OPID_EQ(resp.status().last_received_current_leader(), noop_id);

  // --- Term 2, leader is peer 1 ---
  // Heartbeat from a new leader: the log's last received op is unchanged, but
  // nothing has been received from the current leader (MinimumOpId).
  leader_term = 2;
  leader_uuid = config_.peers(1).permanent_uuid();
  prev_opid = noop_id;
  req = MakeConsensusRequest(leader_term, leader_uuid, prev_opid);
  resp.Clear();
  ASSERT_OK(consensus_->Update(&req, &resp));
  ASSERT_FALSE(resp.status().has_error()) << resp.ShortDebugString();
  ASSERT_EQ(leader_term, resp.responder_term());
  ASSERT_OPID_EQ(resp.status().last_received(), prev_opid);
  ASSERT_OPID_EQ(resp.status().last_received_current_leader(), MinimumOpId());

  // The new leader appends a no-op.
  noop_id = MakeOpId(leader_term, ++last_index);
  AddNoOpToConsensusRequest(&req, noop_id);
  resp.Clear();
  ASSERT_OK(consensus_->Update(&req, &resp));
  ASSERT_FALSE(resp.status().has_error()) << resp.ShortDebugString();
  ASSERT_OPID_EQ(resp.status().last_received(), noop_id);
  ASSERT_OPID_EQ(resp.status().last_received_current_leader(), noop_id);

  // --- Term 3, leader is peer 0 again ---
  // The heartbeat names a preceding op the follower does not have yet. The
  // term should still rev, but the response must carry an LMP mismatch.
  leader_term = 3;
  leader_uuid = config_.peers(0).permanent_uuid();
  prev_opid = MakeOpId(leader_term, last_index + 1);  // Not replicated yet.
  req = MakeConsensusRequest(leader_term, leader_uuid, prev_opid);
  resp.Clear();
  ASSERT_OK(consensus_->Update(&req, &resp));
  ASSERT_EQ(leader_term, resp.responder_term());
  ASSERT_OPID_EQ(resp.status().last_received(), noop_id);  // Not preceding this time.
  ASSERT_OPID_EQ(resp.status().last_received_current_leader(), MinimumOpId());
  ASSERT_TRUE(resp.status().has_error()) << resp.ShortDebugString();
  ASSERT_EQ(ConsensusErrorPB::PRECEDING_ENTRY_DIDNT_MATCH, resp.status().error().code());

  // Retry with the preceding id moved back to the term-2 op and a no-op attached.
  prev_opid = MakeOpId(2, last_index);
  noop_id = MakeOpId(leader_term, ++last_index);
  req = MakeConsensusRequest(leader_term, leader_uuid, prev_opid);
  AddNoOpToConsensusRequest(&req, noop_id);
  resp.Clear();
  ASSERT_OK(consensus_->Update(&req, &resp));
  ASSERT_FALSE(resp.status().has_error()) << resp.ShortDebugString();
  ASSERT_OPID_EQ(resp.status().last_received(), noop_id) << resp.ShortDebugString();
  ASSERT_OPID_EQ(resp.status().last_received_current_leader(), noop_id)
      << resp.ShortDebugString();

  // --- Term 4, leader is peer 1 ---
  // Happy case: a new leader sends a no-op right away. The response should be
  // OK with all last_received* fields equal to the new no-op.
  leader_term = 4;
  leader_uuid = config_.peers(1).permanent_uuid();
  prev_opid = noop_id;
  noop_id = MakeOpId(leader_term, ++last_index);
  req = MakeConsensusRequest(leader_term, leader_uuid, prev_opid);
  AddNoOpToConsensusRequest(&req, noop_id);
  resp.Clear();
  ASSERT_OK(consensus_->Update(&req, &resp));
  ASSERT_FALSE(resp.status().has_error()) << resp.ShortDebugString();
  ASSERT_EQ(leader_term, resp.responder_term());
  ASSERT_OPID_EQ(resp.status().last_received(), noop_id);
  ASSERT_OPID_EQ(resp.status().last_received_current_leader(), noop_id);
}
} // namespace consensus
} // namespace kudu