// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.

#include <gtest/gtest.h>
#include "kudu/common/schema.h"
#include "kudu/common/wire_protocol-test-util.h"
#include "kudu/consensus/consensus.pb.h"
#include "kudu/consensus/consensus.proxy.h"
#include "kudu/consensus/consensus-test-util.h"
#include "kudu/consensus/log.h"
#include "kudu/consensus/log_index.h"
#include "kudu/consensus/log_util.h"
#include "kudu/consensus/opid_util.h"
#include "kudu/consensus/peer_manager.h"
#include "kudu/consensus/quorum_util.h"
#include "kudu/consensus/raft_consensus.h"
#include "kudu/consensus/raft_consensus_state.h"
#include "kudu/gutil/stl_util.h"
#include "kudu/gutil/strings/strcat.h"
#include "kudu/gutil/strings/substitute.h"
#include "kudu/consensus/log_reader.h"
#include "kudu/rpc/messenger.h"
#include "kudu/rpc/rpc_context.h"
#include "kudu/server/metadata.h"
#include "kudu/server/logical_clock.h"
#include "kudu/util/auto_release_pool.h"
#include "kudu/util/mem_tracker.h"
#include "kudu/util/metrics.h"
#include "kudu/util/test_macros.h"
#include "kudu/util/test_util.h"
DECLARE_int32(raft_heartbeat_interval_ms);
DECLARE_bool(enable_leader_failure_detection);
METRIC_DECLARE_entity(tablet);
#define REPLICATE_SEQUENCE_OF_MESSAGES(a, b, c, d, e, f, g) \
ASSERT_NO_FATAL_FAILURE(ReplicateSequenceOfMessages(a, b, c, d, e, f, g))
using std::shared_ptr;
namespace kudu {
namespace rpc {
class RpcContext;
}
namespace consensus {
using log::Log;
using log::LogEntryPB;
using log::LogOptions;
using log::LogReader;
using rpc::RpcContext;
using strings::Substitute;
using strings::SubstituteAndAppend;
const char* kTestTablet = "TestTablet";
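
// Do-nothing callback, used where RaftConsensus expects a string callback.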
void DoNothing(const string& s) {
}
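
// Polls 'raft' until it reports the LEADER role, returning Status::TimedOut
// after 15 seconds.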
Status WaitUntilLeaderForTests(RaftConsensus* raft) {
MonoTime deadline = MonoTime::Now(MonoTime::FINE);
deadline.AddDelta(MonoDelta::FromSeconds(15));
while (MonoTime::Now(MonoTime::FINE).ComesBefore(deadline)) {
if (raft->GetActiveRole() == RaftPeerPB::LEADER) {
return Status::OK();
}
SleepFor(MonoDelta::FromMilliseconds(10));
}
return Status::TimedOut("Timed out waiting to become leader");
}

// Test suite for tests that focus on multiple peer interaction, but
// without integrating with other components, such as transactions.
class RaftConsensusQuorumTest : public KuduTest {
public:
RaftConsensusQuorumTest()
: clock_(server::LogicalClock::CreateStartingAt(Timestamp(0))),
metric_entity_(METRIC_ENTITY_tablet.Instantiate(&metric_registry_, "raft-test")),
schema_(GetSimpleTestSchema()) {
options_.tablet_id = kTestTablet;
FLAGS_enable_leader_failure_detection = false;
}
// Builds an initial configuration of 'num' elements.
// All of the peers start as followers.
void BuildInitialRaftConfigPB(int num) {
config_ = BuildRaftConfigPBForTests(num);
config_.set_opid_index(kInvalidOpIdIndex);
peers_.reset(new TestPeerMapManager(config_));
}
Status BuildFsManagersAndLogs() {
// Build an FsManager and Log for each peer in the configuration.
for (int i = 0; i < config_.peers_size(); i++) {
shared_ptr<MemTracker> parent_mem_tracker =
MemTracker::CreateTracker(-1, Substitute("peer-$0", i));
parent_mem_trackers_.push_back(parent_mem_tracker);
string test_path = GetTestPath(Substitute("peer-$0-root", i));
FsManagerOpts opts;
opts.parent_mem_tracker = parent_mem_tracker;
opts.wal_path = test_path;
opts.data_paths = { test_path };
gscoped_ptr<FsManager> fs_manager(new FsManager(env_.get(), opts));
RETURN_NOT_OK(fs_manager->CreateInitialFileSystemLayout());
RETURN_NOT_OK(fs_manager->Open());
scoped_refptr<Log> log;
RETURN_NOT_OK(Log::Open(LogOptions(),
fs_manager.get(),
kTestTablet,
schema_,
0, // schema_version
NULL,
&log));
logs_.push_back(log.get());
fs_managers_.push_back(fs_manager.release());
}
return Status::OK();
}
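
// Builds a RaftConsensus instance for each peer in the configuration, along
// with its consensus metadata, message queue, peer manager and transaction
// factory, and registers it with the peer map.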
void BuildPeers() {
for (int i = 0; i < config_.peers_size(); i++) {
auto proxy_factory = new LocalTestPeerProxyFactory(peers_.get());
auto txn_factory = new TestTransactionFactory(logs_[i].get());
string peer_uuid = Substitute("peer-$0", i);
gscoped_ptr<ConsensusMetadata> cmeta;
CHECK_OK(ConsensusMetadata::Create(fs_managers_[i], kTestTablet, peer_uuid, config_,
kMinimumTerm, &cmeta));
RaftPeerPB local_peer_pb;
CHECK_OK(GetRaftConfigMember(config_, peer_uuid, &local_peer_pb));
gscoped_ptr<PeerMessageQueue> queue(new PeerMessageQueue(metric_entity_,
logs_[i],
local_peer_pb,
kTestTablet));
gscoped_ptr<ThreadPool> thread_pool;
CHECK_OK(ThreadPoolBuilder(Substitute("$0-raft", options_.tablet_id.substr(0, 6)))
.Build(&thread_pool));
gscoped_ptr<PeerManager> peer_manager(
new PeerManager(options_.tablet_id,
config_.peers(i).permanent_uuid(),
proxy_factory,
queue.get(),
thread_pool.get(),
logs_[i]));
scoped_refptr<RaftConsensus> peer(
new RaftConsensus(options_,
std::move(cmeta),
gscoped_ptr<PeerProxyFactory>(proxy_factory),
std::move(queue),
std::move(peer_manager),
std::move(thread_pool),
metric_entity_,
config_.peers(i).permanent_uuid(),
clock_,
txn_factory,
logs_[i],
parent_mem_trackers_[i],
Bind(&DoNothing)));
txn_factory->SetConsensus(peer.get());
txn_factories_.push_back(txn_factory);
peers_->AddPeer(config_.peers(i).permanent_uuid(), peer);
}
}
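
// Starts all the peers with empty bootstrap info.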
Status StartPeers() {
ConsensusBootstrapInfo boot_info;
TestPeerMap all_peers = peers_->GetPeerMapCopy();
for (const TestPeerMap::value_type& entry : all_peers) {
RETURN_NOT_OK(entry.second->Start(boot_info));
}
return Status::OK();
}
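
// Builds a configuration of 'num' peers, including their filesystems, logs
// and consensus instances, but does not start them.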
Status BuildConfig(int num) {
BuildInitialRaftConfigPB(num);
RETURN_NOT_OK(BuildFsManagersAndLogs());
BuildPeers();
return Status::OK();
}
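
// Builds and starts a configuration of 'num' peers, then emulates an election
// so that the last peer in the list becomes leader.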
Status BuildAndStartConfig(int num) {
RETURN_NOT_OK(BuildConfig(num));
RETURN_NOT_OK(StartPeers());
// Automatically elect the last node in the list.
const int kLeaderIdx = num - 1;
scoped_refptr<RaftConsensus> leader;
RETURN_NOT_OK(peers_->GetPeerByIdx(kLeaderIdx, &leader));
RETURN_NOT_OK(leader->EmulateElection());
return Status::OK();
}
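
// Returns the proxy that the leader at 'leader_idx' uses to communicate with
// the peer at 'peer_idx'. CHECK-fails if no such proxy exists.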
LocalTestPeerProxy* GetLeaderProxyToPeer(int peer_idx, int leader_idx) {
scoped_refptr<RaftConsensus> follower;
CHECK_OK(peers_->GetPeerByIdx(peer_idx, &follower));
scoped_refptr<RaftConsensus> leader;
CHECK_OK(peers_->GetPeerByIdx(leader_idx, &leader));
for (LocalTestPeerProxy* proxy : down_cast<LocalTestPeerProxyFactory*>(
leader->peer_proxy_factory_.get())->GetProxies()) {
if (proxy->GetTarget() == follower->peer_uuid()) {
return proxy;
}
}
CHECK(false) << "Proxy not found";
return nullptr;
}
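
// Starts replication of a NO_OP message through the peer at 'peer_idx' and
// registers a Synchronizer that fires when replication completes.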
Status AppendDummyMessage(int peer_idx,
scoped_refptr<ConsensusRound>* round) {
gscoped_ptr<ReplicateMsg> msg(new ReplicateMsg());
msg->set_op_type(NO_OP);
msg->mutable_noop_request();
msg->set_timestamp(clock_->Now().ToUint64());
scoped_refptr<RaftConsensus> peer;
CHECK_OK(peers_->GetPeerByIdx(peer_idx, &peer));
// Use a latch in place of a Transaction callback.
gscoped_ptr<Synchronizer> sync(new Synchronizer());
*round = peer->NewRound(std::move(msg), sync->AsStatusCallback());
InsertOrDie(&syncs_, round->get(), sync.release());
RETURN_NOT_OK_PREPEND(peer->Replicate(round->get()),
Substitute("Unable to replicate to peer $0", peer_idx));
return Status::OK();
}
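
// Adapter allowing a shared Synchronizer to be bound as a StatusCallback.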
static void FireSharedSynchronizer(const shared_ptr<Synchronizer>& sync, const Status& s) {
sync->StatusCB(s);
}
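
// Asynchronously appends a NO_OP commit message for 'round' to the log of the
// peer at 'peer_idx'. If 'commit_sync' is provided, it is set to a Synchronizer
// that fires once the commit message has been appended to that log.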
Status CommitDummyMessage(int peer_idx,
ConsensusRound* round,
shared_ptr<Synchronizer>* commit_sync = nullptr) {
StatusCallback commit_callback;
if (commit_sync != nullptr) {
commit_sync->reset(new Synchronizer());
commit_callback = Bind(&FireSharedSynchronizer, *commit_sync);
} else {
commit_callback = Bind(&DoNothingStatusCB);
}
gscoped_ptr<CommitMsg> msg(new CommitMsg());
msg->set_op_type(NO_OP);
msg->mutable_commited_op_id()->CopyFrom(round->id());
CHECK_OK(logs_[peer_idx]->AsyncAppendCommit(std::move(msg), commit_callback));
return Status::OK();
}
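
// Waits for the replication of 'round' (started via AppendDummyMessage()) to
// complete; the timed variant returns TimedOut if it does not complete within
// 'delta'.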
Status WaitForReplicate(ConsensusRound* round) {
return FindOrDie(syncs_, round)->Wait();
}
Status TimedWaitForReplicate(ConsensusRound* round, const MonoDelta& delta) {
return FindOrDie(syncs_, round)->WaitFor(delta);
}
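
// Polls the replica at 'peer_idx' until its last-received OpId is at least
// 'to_wait_for'.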
void WaitForReplicateIfNotAlreadyPresent(const OpId& to_wait_for, int peer_idx) {
scoped_refptr<RaftConsensus> peer;
CHECK_OK(peers_->GetPeerByIdx(peer_idx, &peer));
ReplicaState* state = peer->GetReplicaStateForTests();
while (true) {
{
ReplicaState::UniqueLock lock;
CHECK_OK(state->LockForRead(&lock));
if (OpIdCompare(state->GetLastReceivedOpIdUnlocked(), to_wait_for) >= 0) {
return;
}
}
SleepFor(MonoDelta::FromMilliseconds(1));
}
}
// Waits for an operation to be (database) committed in the replica at index
// 'peer_idx'. If the operation was already committed, this returns immediately.
void WaitForCommitIfNotAlreadyPresent(const OpId& to_wait_for,
int peer_idx,
int leader_idx) {
MonoDelta timeout(MonoDelta::FromSeconds(10));
MonoTime start(MonoTime::Now(MonoTime::FINE));
scoped_refptr<RaftConsensus> peer;
CHECK_OK(peers_->GetPeerByIdx(peer_idx, &peer));
ReplicaState* state = peer->GetReplicaStateForTests();
int backoff_exp = 0;
const int kMaxBackoffExp = 8;
OpId committed_op_id;
while (true) {
{
ReplicaState::UniqueLock lock;
CHECK_OK(state->LockForRead(&lock));
committed_op_id = state->GetCommittedOpIdUnlocked();
if (OpIdCompare(committed_op_id, to_wait_for) >= 0) {
return;
}
}
MonoDelta elapsed = MonoTime::Now(MonoTime::FINE).GetDeltaSince(start);
if (elapsed.MoreThan(timeout)) {
break;
}
SleepFor(MonoDelta::FromMilliseconds(1 << backoff_exp));
backoff_exp = std::min(backoff_exp + 1, kMaxBackoffExp);
}
LOG(ERROR) << "Max timeout reached (" << timeout.ToString() << ") while waiting for commit of "
<< "op " << to_wait_for << " on replica. Last committed op on replica: "
<< committed_op_id << ". Dumping state and quitting.";
vector<string> lines;
scoped_refptr<RaftConsensus> leader;
CHECK_OK(peers_->GetPeerByIdx(leader_idx, &leader));
for (const string& line : lines) {
LOG(ERROR) << line;
}
// Gather the replica and leader operations for printing
vector<LogEntryPB*> replica_ops;
ElementDeleter repl0_deleter(&replica_ops);
GatherLogEntries(peer_idx, logs_[peer_idx], &replica_ops);
vector<LogEntryPB*> leader_ops;
ElementDeleter leader_deleter(&leader_ops);
GatherLogEntries(leader_idx, logs_[leader_idx], &leader_ops);
SCOPED_TRACE(PrintOnError(replica_ops, Substitute("local peer ($0)", peer->peer_uuid())));
SCOPED_TRACE(PrintOnError(leader_ops, Substitute("leader (peer-$0)", leader_idx)));
FAIL() << "Replica did not commit.";
}
// Used in ReplicateSequenceOfMessages() to specify whether
// we should wait for all replicas to have replicated the
// sequence or just a majority.
enum ReplicateWaitMode {
WAIT_FOR_ALL_REPLICAS,
WAIT_FOR_MAJORITY
};
// Used in ReplicateSequenceOfMessages() to specify whether
// we should also commit the messages in the sequence
enum CommitMode {
DONT_COMMIT,
COMMIT_ONE_BY_ONE
};
// Replicates a sequence of messages to the peer passed as leader.
// Optionally waits for the messages to be replicated to followers.
// 'last_op_id' is set to the id of the last replicated operation.
// The operations are only committed if 'commit_mode' is COMMIT_ONE_BY_ONE.
void ReplicateSequenceOfMessages(int seq_size,
int leader_idx,
ReplicateWaitMode wait_mode,
CommitMode commit_mode,
OpId* last_op_id,
vector<scoped_refptr<ConsensusRound> >* rounds,
shared_ptr<Synchronizer>* commit_sync = nullptr) {
for (int i = 0; i < seq_size; i++) {
scoped_refptr<ConsensusRound> round;
ASSERT_OK(AppendDummyMessage(leader_idx, &round));
ASSERT_OK(WaitForReplicate(round.get()));
last_op_id->CopyFrom(round->id());
if (commit_mode == COMMIT_ONE_BY_ONE) {
CommitDummyMessage(leader_idx, round.get(), commit_sync);
}
rounds->push_back(round);
}
if (wait_mode == WAIT_FOR_ALL_REPLICAS) {
scoped_refptr<RaftConsensus> leader;
CHECK_OK(peers_->GetPeerByIdx(leader_idx, &leader));
TestPeerMap all_peers = peers_->GetPeerMapCopy();
int i = 0;
for (const TestPeerMap::value_type& entry : all_peers) {
if (entry.second->peer_uuid() != leader->peer_uuid()) {
WaitForReplicateIfNotAlreadyPresent(*last_op_id, i);
}
i++;
}
}
}
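
// Flushes and closes the log of the peer at 'idx', then reads back all of its
// entries into 'entries'.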
void GatherLogEntries(int idx, const scoped_refptr<Log>& log, vector<LogEntryPB* >* entries) {
ASSERT_OK(log->WaitUntilAllFlushed());
log->Close();
shared_ptr<LogReader> log_reader;
ASSERT_OK(log::LogReader::Open(fs_managers_[idx],
scoped_refptr<log::LogIndex>(),
kTestTablet,
metric_entity_.get(),
&log_reader));
vector<LogEntryPB*> ret;
ElementDeleter deleter(&ret);
log::SegmentSequence segments;
ASSERT_OK(log_reader->GetSegmentsSnapshot(&segments));
for (const log::SegmentSequence::value_type& entry : segments) {
ASSERT_OK(entry->ReadEntries(&ret));
}
entries->swap(ret);
}
// Verifies that the replicas' logs match the leader's. This deletes the
// peers (so we're sure that no further writes occur) and closes the logs,
// so it must be the very last thing to run in a test.
void VerifyLogs(int leader_idx, int first_replica_idx, int last_replica_idx) {
// Wait for in-flight transactions to be done. We're destroying the
// peers next and leader transactions won't be able to commit anymore.
for (TestTransactionFactory* factory : txn_factories_) {
factory->WaitDone();
}
// Shut down all the peers.
TestPeerMap all_peers = peers_->GetPeerMapCopy();
for (const TestPeerMap::value_type& entry : all_peers) {
entry.second->Shutdown();
}
vector<LogEntryPB*> leader_entries;
ElementDeleter leader_entry_deleter(&leader_entries);
GatherLogEntries(leader_idx, logs_[leader_idx], &leader_entries);
scoped_refptr<RaftConsensus> leader;
CHECK_OK(peers_->GetPeerByIdx(leader_idx, &leader));
for (int replica_idx = first_replica_idx; replica_idx < last_replica_idx; replica_idx++) {
vector<LogEntryPB*> replica_entries;
ElementDeleter replica_entry_deleter(&replica_entries);
GatherLogEntries(replica_idx, logs_[replica_idx], &replica_entries);
scoped_refptr<RaftConsensus> replica;
CHECK_OK(peers_->GetPeerByIdx(replica_idx, &replica));
VerifyReplica(leader_entries,
replica_entries,
leader->peer_uuid(),
replica->peer_uuid());
}
}
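
// Collects the OpIds of all REPLICATE entries in 'entries' into 'ids'.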
void ExtractReplicateIds(const vector<LogEntryPB*>& entries,
vector<OpId>* ids) {
ids->reserve(entries.size() / 2);
for (const LogEntryPB* entry : entries) {
if (entry->has_replicate()) {
ids->push_back(entry->replicate().id());
}
}
}
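
// Asserts that the leader and replica logs contain the same sequence of
// REPLICATE OpIds, in the same order.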
void VerifyReplicateOrderMatches(const vector<LogEntryPB*>& leader_entries,
const vector<LogEntryPB*>& replica_entries) {
vector<OpId> leader_ids, replica_ids;
ExtractReplicateIds(leader_entries, &leader_ids);
ExtractReplicateIds(replica_entries, &replica_ids);
ASSERT_EQ(leader_ids.size(), replica_ids.size());
for (int i = 0; i < leader_ids.size(); i++) {
ASSERT_EQ(leader_ids[i].ShortDebugString(),
replica_ids[i].ShortDebugString());
}
}
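
// Asserts that every COMMIT in 'entries' refers to a REPLICATE that appears
// earlier in the log, and that no REPLICATE OpId appears twice.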
void VerifyNoCommitsBeforeReplicates(const vector<LogEntryPB*>& entries) {
unordered_set<OpId,
OpIdHashFunctor,
OpIdEqualsFunctor> replication_ops;
for (const LogEntryPB* entry : entries) {
if (entry->has_replicate()) {
ASSERT_TRUE(InsertIfNotPresent(&replication_ops, entry->replicate().id()))
<< "REPLICATE op id showed up twice: " << entry->ShortDebugString();
} else if (entry->has_commit()) {
ASSERT_EQ(1, replication_ops.erase(entry->commit().commited_op_id()))
<< "COMMIT came before associated REPLICATE: " << entry->ShortDebugString();
}
}
}
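
// Cross-checks a replica's log entries against the leader's.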
void VerifyReplica(const vector<LogEntryPB*>& leader_entries,
const vector<LogEntryPB*>& replica_entries,
const string& leader_name,
const string& replica_name) {
SCOPED_TRACE(PrintOnError(leader_entries, Substitute("Leader: $0", leader_name)));
SCOPED_TRACE(PrintOnError(replica_entries, Substitute("Replica: $0", replica_name)));
// Check that the REPLICATE messages come in the same order on both nodes.
VerifyReplicateOrderMatches(leader_entries, replica_entries);
// Check that no COMMIT precedes its related REPLICATE on both the replica
// and leader.
VerifyNoCommitsBeforeReplicates(replica_entries);
VerifyNoCommitsBeforeReplicates(leader_entries);
}
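
// Formats 'replica_entries' into a human-readable string, used with
// SCOPED_TRACE to dump log contents on failure.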
string PrintOnError(const vector<LogEntryPB*>& replica_entries,
const string& replica_id) {
string ret = "";
SubstituteAndAppend(&ret, "$1 log entries for replica $0:\n",
replica_id, replica_entries.size());
for (LogEntryPB* replica_entry : replica_entries) {
StrAppend(&ret, "Replica log entry: ", replica_entry->ShortDebugString(), "\n");
}
return ret;
}
// Read the ConsensusMetadata for the given peer from disk.
gscoped_ptr<ConsensusMetadata> ReadConsensusMetadataFromDisk(int peer_index) {
string peer_uuid = Substitute("peer-$0", peer_index);
gscoped_ptr<ConsensusMetadata> cmeta;
CHECK_OK(ConsensusMetadata::Load(fs_managers_[peer_index], kTestTablet, peer_uuid, &cmeta));
return std::move(cmeta);
}
// Assert that the durable term == term and that the peer that got the vote == voted_for.
void AssertDurableTermAndVote(int peer_index, int64_t term, const std::string& voted_for) {
gscoped_ptr<ConsensusMetadata> cmeta = ReadConsensusMetadataFromDisk(peer_index);
ASSERT_EQ(term, cmeta->current_term());
ASSERT_EQ(voted_for, cmeta->voted_for());
}
// Assert that the durable term == term and that the peer has not yet voted.
void AssertDurableTermWithoutVote(int peer_index, int64_t term) {
gscoped_ptr<ConsensusMetadata> cmeta = ReadConsensusMetadataFromDisk(peer_index);
ASSERT_EQ(term, cmeta->current_term());
ASSERT_FALSE(cmeta->has_voted_for());
}
~RaftConsensusQuorumTest() {
peers_->Clear();
STLDeleteElements(&txn_factories_);
// We need to clear the logs before deleting the fs_managers_ or we'll
// get a SIGSEGV when closing the logs.
logs_.clear();
STLDeleteElements(&fs_managers_);
STLDeleteValues(&syncs_);
}
protected:
ConsensusOptions options_;
RaftConfigPB config_;
OpId initial_id_;
vector<shared_ptr<MemTracker> > parent_mem_trackers_;
vector<FsManager*> fs_managers_;
vector<scoped_refptr<Log> > logs_;
gscoped_ptr<TestPeerMapManager> peers_;
vector<TestTransactionFactory*> txn_factories_;
scoped_refptr<server::Clock> clock_;
MetricRegistry metric_registry_;
scoped_refptr<MetricEntity> metric_entity_;
const Schema schema_;
unordered_map<ConsensusRound*, Synchronizer*> syncs_;
};

// Tests Replicate/Commit a single message through the leader.
TEST_F(RaftConsensusQuorumTest, TestFollowersReplicateAndCommitMessage) {
// Constants with the indexes of peers with certain roles,
// since peers don't change roles in this test.
const int kFollower0Idx = 0;
const int kFollower1Idx = 1;
const int kLeaderIdx = 2;
ASSERT_OK(BuildAndStartConfig(3));
OpId last_op_id;
vector<scoped_refptr<ConsensusRound> > rounds;
shared_ptr<Synchronizer> commit_sync;
REPLICATE_SEQUENCE_OF_MESSAGES(1,
kLeaderIdx,
WAIT_FOR_ALL_REPLICAS,
DONT_COMMIT,
&last_op_id,
&rounds,
&commit_sync);
// Commit the operation
ASSERT_OK(CommitDummyMessage(kLeaderIdx, rounds[0].get(), &commit_sync));
// Wait for everyone to commit the operations.
// We need to make sure the CommitMsg lands on the leader's log or the
// verification will fail. Since CommitMsgs are appended to the replication
// queue, there is a scenario where they land in the followers' logs before
// landing on the leader's log. However, we know that they are durable
// on the leader when the commit callback is triggered.
// We thus wait for the commit callback to trigger, ensuring durability
// on the leader, and then for the commits to be present on the replicas.
ASSERT_OK(commit_sync->Wait());
WaitForCommitIfNotAlreadyPresent(last_op_id, kFollower0Idx, kLeaderIdx);
WaitForCommitIfNotAlreadyPresent(last_op_id, kFollower1Idx, kLeaderIdx);
VerifyLogs(2, 0, 1);
}

// Tests Replicate/Commit a sequence of messages through the leader.
// First a sequence of replicates and then a sequence of commits.
TEST_F(RaftConsensusQuorumTest, TestFollowersReplicateAndCommitSequence) {
// Constants with the indexes of peers with certain roles,
// since peers don't change roles in this test.
const int kFollower0Idx = 0;
const int kFollower1Idx = 1;
const int kLeaderIdx = 2;
int seq_size = AllowSlowTests() ? 1000 : 100;
ASSERT_OK(BuildAndStartConfig(3));
OpId last_op_id;
vector<scoped_refptr<ConsensusRound> > rounds;
shared_ptr<Synchronizer> commit_sync;
REPLICATE_SEQUENCE_OF_MESSAGES(seq_size,
kLeaderIdx,
WAIT_FOR_ALL_REPLICAS,
DONT_COMMIT,
&last_op_id,
&rounds,
&commit_sync);
// Commit the operations, but wait for the replicates to finish first
for (const scoped_refptr<ConsensusRound>& round : rounds) {
ASSERT_OK(CommitDummyMessage(kLeaderIdx, round.get(), &commit_sync));
}
// See comment at the end of TestFollowersReplicateAndCommitMessage
// for an explanation on this waiting sequence.
ASSERT_OK(commit_sync->Wait());
WaitForCommitIfNotAlreadyPresent(last_op_id, kFollower0Idx, kLeaderIdx);
WaitForCommitIfNotAlreadyPresent(last_op_id, kFollower1Idx, kLeaderIdx);
VerifyLogs(2, 0, 1);
}

TEST_F(RaftConsensusQuorumTest, TestConsensusContinuesIfAMinorityFallsBehind) {
// Constants with the indexes of peers with certain roles,
// since peers don't change roles in this test.
const int kFollower0Idx = 0;
const int kFollower1Idx = 1;
const int kLeaderIdx = 2;
ASSERT_OK(BuildAndStartConfig(3));
OpId last_replicate;
vector<scoped_refptr<ConsensusRound> > rounds;
{
// lock one of the replicas down by obtaining the state lock
// and never letting it go.
scoped_refptr<RaftConsensus> follower0;
CHECK_OK(peers_->GetPeerByIdx(kFollower0Idx, &follower0));
ReplicaState* follower0_rs = follower0->GetReplicaStateForTests();
ReplicaState::UniqueLock lock;
ASSERT_OK(follower0_rs->LockForRead(&lock));
// If the locked replica stopped consensus, we would hang here while
// waiting for operations to be replicated to a majority.
ASSERT_NO_FATAL_FAILURE(ReplicateSequenceOfMessages(
10,
kLeaderIdx,
WAIT_FOR_MAJORITY,
COMMIT_ONE_BY_ONE,
&last_replicate,
&rounds));
// Follower 1 should be fine (were we to wait for follower 0's replicate,
// this would hang here). We know it must have replicated, but make sure
// by waiting anyway.
WaitForReplicateIfNotAlreadyPresent(last_replicate, kFollower1Idx);
WaitForCommitIfNotAlreadyPresent(last_replicate, kFollower1Idx, kLeaderIdx);
}
// After we let the lock go, the remaining follower should catch up.
WaitForReplicateIfNotAlreadyPresent(last_replicate, kFollower0Idx);
WaitForCommitIfNotAlreadyPresent(last_replicate, kFollower0Idx, kLeaderIdx);
VerifyLogs(2, 0, 1);
}

TEST_F(RaftConsensusQuorumTest, TestConsensusStopsIfAMajorityFallsBehind) {
// Constants with the indexes of peers with certain roles,
// since peers don't change roles in this test.
const int kFollower0Idx = 0;
const int kFollower1Idx = 1;
const int kLeaderIdx = 2;
ASSERT_OK(BuildAndStartConfig(3));
OpId last_op_id;
scoped_refptr<ConsensusRound> round;
{
// lock two of the replicas down by obtaining the state locks
// and never letting them go.
scoped_refptr<RaftConsensus> follower0;
CHECK_OK(peers_->GetPeerByIdx(kFollower0Idx, &follower0));
ReplicaState* follower0_rs = follower0->GetReplicaStateForTests();
ReplicaState::UniqueLock lock0;
ASSERT_OK(follower0_rs->LockForRead(&lock0));
scoped_refptr<RaftConsensus> follower1;
CHECK_OK(peers_->GetPeerByIdx(kFollower1Idx, &follower1));
ReplicaState* follower1_rs = follower1->GetReplicaStateForTests();
ReplicaState::UniqueLock lock1;
ASSERT_OK(follower1_rs->LockForRead(&lock1));
// Append a single message to the queue
ASSERT_OK(AppendDummyMessage(kLeaderIdx, &round));
last_op_id.CopyFrom(round->id());
// This should timeout.
Status status = TimedWaitForReplicate(round.get(), MonoDelta::FromMilliseconds(500));
ASSERT_TRUE(status.IsTimedOut());
}
// After we release the locks, the operation should replicate to all
// replicas and we can commit it.
ASSERT_OK(WaitForReplicate(round.get()));
CommitDummyMessage(kLeaderIdx, round.get());
// Assert that everything was ok
WaitForReplicateIfNotAlreadyPresent(last_op_id, kFollower0Idx);
WaitForReplicateIfNotAlreadyPresent(last_op_id, kFollower1Idx);
WaitForCommitIfNotAlreadyPresent(last_op_id, kFollower0Idx, kLeaderIdx);
WaitForCommitIfNotAlreadyPresent(last_op_id, kFollower1Idx, kLeaderIdx);
VerifyLogs(2, 0, 1);
}

// If a communication error happens, the leader will resend the request to
// the peers. This tests that the peers handle repeated requests.
TEST_F(RaftConsensusQuorumTest, TestReplicasHandleCommunicationErrors) {
// Constants with the indexes of peers with certain roles,
// since peers don't change roles in this test.
const int kFollower0Idx = 0;
const int kFollower1Idx = 1;
const int kLeaderIdx = 2;
ASSERT_OK(BuildAndStartConfig(3));
OpId last_op_id;
// Append a dummy message, with faults injected on the first attempt
// to send the message.
scoped_refptr<ConsensusRound> round;
GetLeaderProxyToPeer(kFollower0Idx, kLeaderIdx)->InjectCommFaultLeaderSide();
GetLeaderProxyToPeer(kFollower1Idx, kLeaderIdx)->InjectCommFaultLeaderSide();
ASSERT_OK(AppendDummyMessage(kLeaderIdx, &round));
// We should successfully replicate it due to retries.
ASSERT_OK(WaitForReplicate(round.get()));
GetLeaderProxyToPeer(kFollower0Idx, kLeaderIdx)->InjectCommFaultLeaderSide();
GetLeaderProxyToPeer(kFollower1Idx, kLeaderIdx)->InjectCommFaultLeaderSide();
ASSERT_OK(CommitDummyMessage(kLeaderIdx, round.get()));
// The commit should eventually reach both followers as well.
last_op_id = round->id();
WaitForCommitIfNotAlreadyPresent(last_op_id, kFollower0Idx, kLeaderIdx);
WaitForCommitIfNotAlreadyPresent(last_op_id, kFollower1Idx, kLeaderIdx);
// Append a sequence of messages, and keep injecting errors into the
// replica proxies.
vector<scoped_refptr<ConsensusRound> > rounds;
shared_ptr<Synchronizer> commit_sync;
for (int i = 0; i < 100; i++) {
scoped_refptr<ConsensusRound> round;
ASSERT_OK(AppendDummyMessage(kLeaderIdx, &round));
ConsensusRound* round_ptr = round.get();
last_op_id.CopyFrom(round->id());
rounds.push_back(round);
// inject comm faults
if (i % 2 == 0) {
GetLeaderProxyToPeer(kFollower0Idx, kLeaderIdx)->InjectCommFaultLeaderSide();
} else {
GetLeaderProxyToPeer(kFollower1Idx, kLeaderIdx)->InjectCommFaultLeaderSide();
}
ASSERT_OK(WaitForReplicate(round_ptr));
ASSERT_OK(CommitDummyMessage(kLeaderIdx, round_ptr, &commit_sync));
}
// Assert last operation was correctly replicated and committed.
WaitForReplicateIfNotAlreadyPresent(last_op_id, kFollower0Idx);
WaitForReplicateIfNotAlreadyPresent(last_op_id, kFollower1Idx);
// See comment at the end of TestFollowersReplicateAndCommitMessage
// for an explanation on this waiting sequence.
ASSERT_OK(commit_sync->Wait());
WaitForCommitIfNotAlreadyPresent(last_op_id, kFollower0Idx, kLeaderIdx);
WaitForCommitIfNotAlreadyPresent(last_op_id, kFollower1Idx, kLeaderIdx);
VerifyLogs(2, 0, 1);
}

// Tests the ability of the leader to send heartbeats to replicas by pushing
// nothing after the configuration round and still expecting the replicas'
// Update() hooks to be called.
TEST_F(RaftConsensusQuorumTest, TestLeaderHeartbeats) {
// Constants with the indexes of peers with certain roles,
// since peers don't change roles in this test.
const int kFollower0Idx = 0;
const int kFollower1Idx = 1;
const int kLeaderIdx = 2;
ASSERT_OK(BuildConfig(3));
scoped_refptr<RaftConsensus> follower0;
CHECK_OK(peers_->GetPeerByIdx(kFollower0Idx, &follower0));
scoped_refptr<RaftConsensus> follower1;
CHECK_OK(peers_->GetPeerByIdx(kFollower1Idx, &follower1));
shared_ptr<CounterHooks> counter_hook_rpl0(
new CounterHooks(follower0->GetFaultHooks()));
shared_ptr<CounterHooks> counter_hook_rpl1(
new CounterHooks(follower1->GetFaultHooks()));
// Replace the default fault hooks on the replicas with counter hooks
// before we start the configuration.
follower0->SetFaultHooks(counter_hook_rpl0);
follower1->SetFaultHooks(counter_hook_rpl1);
ASSERT_OK(StartPeers());
scoped_refptr<RaftConsensus> leader;
CHECK_OK(peers_->GetPeerByIdx(kLeaderIdx, &leader));
ASSERT_OK(leader->EmulateElection());
// Wait for the config round to get committed and count the number of
// update calls; calls after that will be heartbeats.
OpId config_round;
config_round.set_term(1);
config_round.set_index(1);
WaitForCommitIfNotAlreadyPresent(config_round, kFollower0Idx, kLeaderIdx);
WaitForCommitIfNotAlreadyPresent(config_round, kFollower1Idx, kLeaderIdx);
int repl0_init_count = counter_hook_rpl0->num_pre_update_calls();
int repl1_init_count = counter_hook_rpl1->num_pre_update_calls();
// Now wait for about 4 heartbeat periods; the counters should have
// increased 3 or 4 times.
SleepFor(MonoDelta::FromMilliseconds(FLAGS_raft_heartbeat_interval_ms * 4));
int repl0_final_count = counter_hook_rpl0->num_pre_update_calls();
int repl1_final_count = counter_hook_rpl1->num_pre_update_calls();
ASSERT_GE(repl0_final_count - repl0_init_count, 3);
ASSERT_LE(repl0_final_count - repl0_init_count, 4);
ASSERT_GE(repl1_final_count - repl1_init_count, 3);
ASSERT_LE(repl1_final_count - repl1_init_count, 4);
VerifyLogs(2, 0, 1);
}

// After creating the initial configuration, this test writes a small sequence
// of messages to the initial leader. It then shuts down the current
// leader, makes another peer become leader and writes a sequence of
// messages to it. The new leader and the follower should agree on the
// sequence of messages.
TEST_F(RaftConsensusQuorumTest, TestLeaderElectionWithQuiescedQuorum) {
const int kInitialNumPeers = 5;
ASSERT_OK(BuildAndStartConfig(kInitialNumPeers));
OpId last_op_id;
shared_ptr<Synchronizer> last_commit_sync;
vector<scoped_refptr<ConsensusRound> > rounds;
// Loop twice, successively shutting down the previous leader.
for (int current_config_size = kInitialNumPeers;
current_config_size >= kInitialNumPeers - 1;
current_config_size--) {
REPLICATE_SEQUENCE_OF_MESSAGES(10,
current_config_size - 1, // The index of the leader.
WAIT_FOR_ALL_REPLICAS,
COMMIT_ONE_BY_ONE,
&last_op_id,
&rounds,
&last_commit_sync);
// Make sure the last operation is committed everywhere
ASSERT_OK(last_commit_sync->Wait());
for (int i = 0; i < current_config_size - 1; i++) {
WaitForCommitIfNotAlreadyPresent(last_op_id, i, current_config_size - 1);
}
// Now shutdown the current leader.
LOG(INFO) << "Shutting down current leader with index " << (current_config_size - 1);
scoped_refptr<RaftConsensus> current_leader;
CHECK_OK(peers_->GetPeerByIdx(current_config_size - 1, &current_leader));
current_leader->Shutdown();
peers_->RemovePeer(current_leader->peer_uuid());
// ... and make the peer before it become leader.
scoped_refptr<RaftConsensus> new_leader;
CHECK_OK(peers_->GetPeerByIdx(current_config_size - 2, &new_leader));
// This will force an election in which we expect to make the last
// non-shutdown peer in the list become leader.
LOG(INFO) << "Running election for future leader with index " << (current_config_size - 1);
ASSERT_OK(new_leader->StartElection(Consensus::ELECT_EVEN_IF_LEADER_IS_ALIVE));
ASSERT_OK(WaitUntilLeaderForTests(new_leader.get()));
LOG(INFO) << "Election won";
// ... replicating a set of messages to the new leader should now be possible.
REPLICATE_SEQUENCE_OF_MESSAGES(10,
current_config_size - 2, // The index of the new leader.
WAIT_FOR_MAJORITY,
COMMIT_ONE_BY_ONE,
&last_op_id,
&rounds,
&last_commit_sync);
// Make sure the last operation is committed everywhere
ASSERT_OK(last_commit_sync->Wait());
for (int i = 0; i < current_config_size - 2; i++) {
WaitForCommitIfNotAlreadyPresent(last_op_id, i, current_config_size - 2);
}
}
// We can only verify the logs of the peers that were not killed, due to the
// old leaders being out-of-date now.
VerifyLogs(2, 0, 1);
}

TEST_F(RaftConsensusQuorumTest, TestReplicasEnforceTheLogMatchingProperty) {
ASSERT_OK(BuildAndStartConfig(3));
OpId last_op_id;
shared_ptr<Synchronizer> last_commit_sync;
vector<scoped_refptr<ConsensusRound> > rounds;
REPLICATE_SEQUENCE_OF_MESSAGES(10,
2, // The index of the initial leader.
WAIT_FOR_ALL_REPLICAS,
COMMIT_ONE_BY_ONE,
&last_op_id,
&rounds,
&last_commit_sync);
// Make sure the last operation is committed everywhere
ASSERT_OK(last_commit_sync->Wait());
WaitForCommitIfNotAlreadyPresent(last_op_id, 0, 2);
WaitForCommitIfNotAlreadyPresent(last_op_id, 1, 2);
// Now replicas should only accept operations with
// 'last_id' as the preceding id.
ConsensusRequestPB req;
ConsensusResponsePB resp;
scoped_refptr<RaftConsensus> leader;
CHECK_OK(peers_->GetPeerByIdx(2, &leader));
scoped_refptr<RaftConsensus> follower;
CHECK_OK(peers_->GetPeerByIdx(0, &follower));
req.set_caller_uuid(leader->peer_uuid());
req.set_caller_term(last_op_id.term());
req.mutable_preceding_id()->CopyFrom(last_op_id);
req.mutable_committed_index()->CopyFrom(last_op_id);
ReplicateMsg* replicate = req.add_ops();
replicate->set_timestamp(clock_->Now().ToUint64());
OpId* id = replicate->mutable_id();
id->set_term(last_op_id.term());
id->set_index(last_op_id.index() + 1);
replicate->set_op_type(NO_OP);
// Appending this message to peer0 should work and update
// its 'last_received' to 'id'.
ASSERT_OK(follower->Update(&req, &resp));
ASSERT_TRUE(OpIdEquals(resp.status().last_received(), *id));
// Now skip one message in the same term. The replica should
// complain with the right error message.
req.mutable_preceding_id()->set_index(id->index() + 1);
id->set_index(id->index() + 2);
// Appending this message to peer0 should return a Status::OK
// but should contain an error referring to the log matching property.
ASSERT_OK(follower->Update(&req, &resp));
ASSERT_TRUE(resp.has_status());
ASSERT_TRUE(resp.status().has_error());
ASSERT_EQ(resp.status().error().code(), ConsensusErrorPB::PRECEDING_ENTRY_DIDNT_MATCH);
ASSERT_STR_CONTAINS(resp.status().error().status().message(),
"Log matching property violated");
}

// Test that RequestVote performs according to "spec".
TEST_F(RaftConsensusQuorumTest, TestRequestVote) {
ASSERT_OK(BuildAndStartConfig(3));
OpId last_op_id;
shared_ptr<Synchronizer> last_commit_sync;
vector<scoped_refptr<ConsensusRound> > rounds;
REPLICATE_SEQUENCE_OF_MESSAGES(10,
2, // The index of the initial leader.
WAIT_FOR_ALL_REPLICAS,
COMMIT_ONE_BY_ONE,
&last_op_id,
&rounds,
&last_commit_sync);
// Make sure the last operation is committed everywhere
ASSERT_OK(last_commit_sync->Wait());
WaitForCommitIfNotAlreadyPresent(last_op_id, 0, 2);
WaitForCommitIfNotAlreadyPresent(last_op_id, 1, 2);
// Ensure last-logged OpId is > (0,0).
ASSERT_TRUE(OpIdLessThan(MinimumOpId(), last_op_id));
const int kPeerIndex = 1;
scoped_refptr<RaftConsensus> peer;
CHECK_OK(peers_->GetPeerByIdx(kPeerIndex, &peer));
VoteRequestPB request;
request.set_tablet_id(kTestTablet);
request.mutable_candidate_status()->mutable_last_received()->CopyFrom(last_op_id);
// Test that the replica won't vote since it has recently heard from
// a valid leader.
VoteResponsePB response;
request.set_candidate_uuid("peer-0");
request.set_candidate_term(last_op_id.term() + 1);
ASSERT_OK(peer->RequestVote(&request, &response));
ASSERT_FALSE(response.vote_granted());
ASSERT_EQ(ConsensusErrorPB::LEADER_IS_ALIVE, response.consensus_error().code());
// Test that replicas only vote yes for a single peer per term.
// Indicate that replicas should vote even if they think another leader is alive.
// This will allow the rest of the requests in the test to go through.
request.set_ignore_live_leader(true);
ASSERT_OK(peer->RequestVote(&request, &response));
ASSERT_TRUE(response.vote_granted());
ASSERT_EQ(last_op_id.term() + 1, response.responder_term());
ASSERT_NO_FATAL_FAILURE(AssertDurableTermAndVote(kPeerIndex, last_op_id.term() + 1, "peer-0"));
// Ensure we get same response for same term and same UUID.
response.Clear();
ASSERT_OK(peer->RequestVote(&request, &response));
ASSERT_TRUE(response.vote_granted());
// Ensure we get a "no" for a different candidate UUID for that term.
response.Clear();
request.set_candidate_uuid("peer-2");
ASSERT_OK(peer->RequestVote(&request, &response));
ASSERT_FALSE(response.vote_granted());
ASSERT_TRUE(response.has_consensus_error());
ASSERT_EQ(ConsensusErrorPB::ALREADY_VOTED, response.consensus_error().code());
ASSERT_EQ(last_op_id.term() + 1, response.responder_term());
ASSERT_NO_FATAL_FAILURE(AssertDurableTermAndVote(kPeerIndex, last_op_id.term() + 1, "peer-0"));
//
// Test that replicas refuse votes for an old term.
//
// Increase the term of our candidate, which will cause the voter replica to
// increase its own term to match.
request.set_candidate_uuid("peer-0");
request.set_candidate_term(last_op_id.term() + 2);
response.Clear();
ASSERT_OK(peer->RequestVote(&request, &response));
ASSERT_TRUE(response.vote_granted());
ASSERT_EQ(last_op_id.term() + 2, response.responder_term());
ASSERT_NO_FATAL_FAILURE(AssertDurableTermAndVote(kPeerIndex, last_op_id.term() + 2, "peer-0"));
// Now try the old term.
// Note: Use the peer who "won" the election on the previous term (peer-0),
// although in practice the impl does not store historical vote data.
request.set_candidate_term(last_op_id.term() + 1);
response.Clear();
ASSERT_OK(peer->RequestVote(&request, &response));
ASSERT_FALSE(response.vote_granted());
ASSERT_TRUE(response.has_consensus_error());
ASSERT_EQ(ConsensusErrorPB::INVALID_TERM, response.consensus_error().code());
ASSERT_EQ(last_op_id.term() + 2, response.responder_term());
ASSERT_NO_FATAL_FAILURE(AssertDurableTermAndVote(kPeerIndex, last_op_id.term() + 2, "peer-0"));
//
// Ensure replicas vote no for an old op index.
//
request.set_candidate_uuid("peer-0");
request.set_candidate_term(last_op_id.term() + 3);
request.mutable_candidate_status()->mutable_last_received()->CopyFrom(MinimumOpId());
response.Clear();
ASSERT_OK(peer->RequestVote(&request, &response));
ASSERT_FALSE(response.vote_granted());
ASSERT_TRUE(response.has_consensus_error());
ASSERT_EQ(ConsensusErrorPB::LAST_OPID_TOO_OLD, response.consensus_error().code());
ASSERT_EQ(last_op_id.term() + 3, response.responder_term());
ASSERT_NO_FATAL_FAILURE(AssertDurableTermWithoutVote(kPeerIndex, last_op_id.term() + 3));
// Send a "heartbeat" to the peer. It should be rejected.
ConsensusRequestPB req;
req.set_caller_term(last_op_id.term());
req.set_caller_uuid("peer-0");
req.mutable_committed_index()->CopyFrom(last_op_id);
ConsensusResponsePB res;
Status s = peer->Update(&req, &res);
ASSERT_EQ(last_op_id.term() + 3, res.responder_term());
ASSERT_TRUE(res.status().has_error());
ASSERT_EQ(ConsensusErrorPB::INVALID_TERM, res.status().error().code());
LOG(INFO) << "Follower rejected old heartbeat, as expected: " << res.ShortDebugString();
}
} // namespace consensus
} // namespace kudu