// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.
#include <algorithm>
#include <cstdint>
#include <functional>
#include <memory>
#include <ostream>
#include <string>
#include <unordered_map>
#include <unordered_set>
#include <vector>
#include <boost/optional/optional.hpp>
#include <gflags/gflags_declare.h>
#include <glog/logging.h>
#include <gtest/gtest.h>
#include "kudu/clock/logical_clock.h"
#include "kudu/common/common.pb.h"
#include "kudu/common/schema.h"
#include "kudu/common/timestamp.h"
#include "kudu/common/wire_protocol-test-util.h"
#include "kudu/common/wire_protocol.pb.h"
#include "kudu/consensus/consensus-test-util.h"
#include "kudu/consensus/consensus.pb.h"
#include "kudu/consensus/consensus_meta.h"
#include "kudu/consensus/consensus_meta_manager.h"
#include "kudu/consensus/consensus_peers.h"
#include "kudu/consensus/consensus_queue.h"
#include "kudu/consensus/log.h"
#include "kudu/consensus/log.pb.h"
#include "kudu/consensus/log_index.h"
#include "kudu/consensus/log_reader.h"
#include "kudu/consensus/log_util.h"
#include "kudu/consensus/metadata.pb.h"
#include "kudu/consensus/opid.pb.h"
#include "kudu/consensus/opid_util.h"
#include "kudu/consensus/quorum_util.h"
#include "kudu/consensus/raft_consensus.h"
#include "kudu/consensus/time_manager.h"
#include "kudu/fs/fs_manager.h"
#include "kudu/gutil/casts.h"
#include "kudu/gutil/map-util.h"
#include "kudu/gutil/ref_counted.h"
#include "kudu/gutil/strings/strcat.h"
#include "kudu/gutil/strings/substitute.h"
#include "kudu/tablet/metadata.pb.h"
#include "kudu/util/async_util.h"
#include "kudu/util/mem_tracker.h"
#include "kudu/util/metrics.h"
#include "kudu/util/monotime.h"
#include "kudu/util/pb_util.h"
#include "kudu/util/status.h"
#include "kudu/util/status_callback.h"
#include "kudu/util/test_macros.h"
#include "kudu/util/test_util.h"
#include "kudu/util/threadpool.h"
DECLARE_int32(raft_heartbeat_interval_ms);
DECLARE_bool(enable_leader_failure_detection);
METRIC_DECLARE_entity(tablet);
using kudu::log::Log;
using kudu::log::LogEntryPB;
using kudu::log::LogOptions;
using kudu::log::LogReader;
using kudu::pb_util::SecureShortDebugString;
using std::shared_ptr;
using std::string;
using std::unique_ptr;
using std::vector;
using strings::Substitute;
using strings::SubstituteAndAppend;
namespace kudu {
namespace consensus {
const char* kTestTablet = "TestTablet";
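// No-op callback, passed to RaftConsensus::Start() in StartPeers() below.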
void DoNothing(const string& s) {
}
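// Polls 'raft' until it reports the LEADER role, returning Status::TimedOut
// if it does not become leader within 15 seconds.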
Status WaitUntilLeaderForTests(RaftConsensus* raft) {
MonoTime deadline = MonoTime::Now() + MonoDelta::FromSeconds(15);
while (MonoTime::Now() < deadline) {
if (raft->role() == RaftPeerPB::LEADER) {
return Status::OK();
}
SleepFor(MonoDelta::FromMilliseconds(10));
}
return Status::TimedOut("Timed out waiting to become leader");
}
// Test suite for tests that focus on multiple peer interaction, but
// without integrating with other components, such as ops.
class RaftConsensusQuorumTest : public KuduTest {
public:
typedef vector<unique_ptr<LogEntryPB>> LogEntries;
RaftConsensusQuorumTest()
: clock_(Timestamp(1)),
metric_entity_(METRIC_ENTITY_tablet.Instantiate(&metric_registry_, "raft-test")),
schema_(GetSimpleTestSchema()) {
options_.tablet_id = kTestTablet;
FLAGS_enable_leader_failure_detection = false;
CHECK_OK(ThreadPoolBuilder("raft").Build(&raft_pool_));
}
// Builds an initial configuration of 'num' elements.
// All of the peers start as followers.
void BuildInitialRaftConfigPB(int num) {
config_ = BuildRaftConfigPB(num);
config_.set_opid_index(kInvalidOpIdIndex);
peers_.reset(new TestPeerMapManager(config_));
}
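// Creates an FsManager and a write-ahead log for each of the 'num' peers,
// each rooted in its own test directory and charged to its own MemTracker.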
Status BuildFsManagersAndLogs(int num) {
// Build the fsmanagers and logs
for (int i = 0; i < num; i++) {
shared_ptr<MemTracker> parent_mem_tracker =
MemTracker::CreateTracker(-1, Substitute("peer-$0", i));
parent_mem_trackers_.push_back(parent_mem_tracker);
string test_path = GetTestPath(Substitute("peer-$0-root", i));
FsManagerOpts opts;
opts.parent_mem_tracker = parent_mem_tracker;
opts.wal_root = test_path;
opts.data_roots = { test_path };
unique_ptr<FsManager> fs_manager(new FsManager(env_, opts));
RETURN_NOT_OK(fs_manager->CreateInitialFileSystemLayout());
RETURN_NOT_OK(fs_manager->Open());
scoped_refptr<Log> log;
RETURN_NOT_OK(Log::Open(LogOptions(),
fs_manager.get(),
/*file_cache*/nullptr,
kTestTablet,
schema_,
0, // schema_version
/*metric_entity*/nullptr,
&log));
logs_.emplace_back(std::move(log));
fs_managers_.push_back(std::move(fs_manager));
}
return Status::OK();
}
// Builds a configuration of 'num' voters.
RaftConfigPB BuildRaftConfigPB(int num) {
RaftConfigPB raft_config;
for (int i = 0; i < num; i++) {
RaftPeerPB* peer_pb = raft_config.add_peers();
peer_pb->set_member_type(RaftPeerPB::VOTER);
peer_pb->set_permanent_uuid(fs_managers_[i]->uuid());
HostPortPB* hp = peer_pb->mutable_last_known_addr();
hp->set_host(Substitute("peer-$0.fake-domain-for-tests", i));
hp->set_port(0);
}
return raft_config;
}
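// Creates a RaftConsensus instance for each peer in 'config_', persisting the
// initial consensus metadata (the configuration at kMinimumTerm) for each one.
// The peers are registered with 'peers_' but not yet started.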
Status BuildPeers() {
CHECK_EQ(config_.peers_size(), fs_managers_.size());
for (int i = 0; i < config_.peers_size(); i++) {
FsManager* fs = fs_managers_[i].get();
scoped_refptr<ConsensusMetadataManager> cmeta_manager(
new ConsensusMetadataManager(fs));
RETURN_NOT_OK(cmeta_manager->Create(kTestTablet, config_, kMinimumTerm));
RaftPeerPB* local_peer_pb;
RETURN_NOT_OK(GetRaftConfigMember(&config_, fs->uuid(), &local_peer_pb));
shared_ptr<RaftConsensus> peer;
ServerContext ctx({ /*quiescing*/nullptr,
/*num_leaders*/nullptr,
raft_pool_.get() });
RETURN_NOT_OK(RaftConsensus::Create(options_,
config_.peers(i),
std::move(cmeta_manager),
std::move(ctx),
&peer));
peers_->AddPeer(config_.peers(i).permanent_uuid(), peer);
}
return Status::OK();
}
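// Starts every peer built by BuildPeers(), wiring each one to the others
// through local in-process proxies and giving each its own op factory and
// time manager.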
Status StartPeers() {
ConsensusBootstrapInfo boot_info;
TestPeerMap all_peers = peers_->GetPeerMapCopy();
for (int i = 0; i < config_.peers_size(); i++) {
shared_ptr<RaftConsensus> peer;
RETURN_NOT_OK(peers_->GetPeerByIdx(i, &peer));
unique_ptr<PeerProxyFactory> proxy_factory(
new LocalTestPeerProxyFactory(peers_.get()));
unique_ptr<TimeManager> time_manager(
new TimeManager(&clock_, Timestamp::kMin));
unique_ptr<TestOpFactory> op_factory(
new TestOpFactory(logs_[i].get()));
op_factory->SetConsensus(peer.get());
op_factories_.emplace_back(std::move(op_factory));
RETURN_NOT_OK(peer->Start(
boot_info,
std::move(proxy_factory),
logs_[i],
std::move(time_manager),
op_factories_.back().get(),
metric_entity_,
&DoNothing));
}
return Status::OK();
}
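// Builds the filesystems, logs, Raft configuration, and peers for a
// 'num'-node config, without starting the peers.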
Status BuildConfig(int num) {
RETURN_NOT_OK(BuildFsManagersAndLogs(num));
BuildInitialRaftConfigPB(num);
RETURN_NOT_OK(BuildPeers());
return Status::OK();
}
Status BuildAndStartConfig(int num) {
RETURN_NOT_OK(BuildConfig(num));
RETURN_NOT_OK(StartPeers());
// Automatically elect the last node in the list.
const int kLeaderIdx = num - 1;
shared_ptr<RaftConsensus> leader;
RETURN_NOT_OK(peers_->GetPeerByIdx(kLeaderIdx, &leader));
RETURN_NOT_OK(leader->EmulateElectionForTests());
return Status::OK();
}
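// Returns the proxy that the peer at 'leader_idx' uses to communicate with
// the peer at 'peer_idx'. Tests use this to inject communication faults on
// the leader side.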
LocalTestPeerProxy* GetLeaderProxyToPeer(int peer_idx, int leader_idx) {
shared_ptr<RaftConsensus> follower;
CHECK_OK(peers_->GetPeerByIdx(peer_idx, &follower));
shared_ptr<RaftConsensus> leader;
CHECK_OK(peers_->GetPeerByIdx(leader_idx, &leader));
for (LocalTestPeerProxy* proxy : down_cast<LocalTestPeerProxyFactory*>(
leader->peer_proxy_factory_.get())->GetProxies()) {
if (proxy->GetTarget() == follower->peer_uuid()) {
return proxy;
}
}
CHECK(false) << "Proxy not found";
return nullptr;
}
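// Replicates a NO_OP message through the peer at 'peer_idx' and registers a
// Synchronizer for the resulting round so that tests can wait for the
// replication to finish via WaitForReplicate()/TimedWaitForReplicate().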
Status AppendDummyMessage(int peer_idx,
scoped_refptr<ConsensusRound>* round) {
unique_ptr<ReplicateMsg> msg(new ReplicateMsg());
msg->set_op_type(NO_OP);
msg->mutable_noop_request();
msg->set_timestamp(clock_.Now().ToUint64());
shared_ptr<RaftConsensus> peer;
CHECK_OK(peers_->GetPeerByIdx(peer_idx, &peer));
// Use a latch in place of an op callback.
unique_ptr<Synchronizer> sync(new Synchronizer());
*round = peer->NewRound(std::move(msg), sync->AsStatusCallback());
EmplaceOrDie(&syncs_, round->get(), std::move(sync));
RETURN_NOT_OK_PREPEND(peer->Replicate(round->get()),
Substitute("Unable to replicate to peer $0", peer_idx));
return Status::OK();
}
static void FireSharedSynchronizer(const shared_ptr<Synchronizer>& sync, const Status& s) {
sync->StatusCB(s);
}
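// Appends a COMMIT message for 'round' to the local log of the peer at
// 'peer_idx'. If 'commit_sync' is non-null, it is set to a Synchronizer that
// fires once the asynchronous append completes.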
Status CommitDummyMessage(int peer_idx,
ConsensusRound* round,
shared_ptr<Synchronizer>* commit_sync = nullptr) {
StatusCallback commit_callback;
if (commit_sync != nullptr) {
shared_ptr<Synchronizer> sync(std::make_shared<Synchronizer>());
commit_callback = [sync](const Status& s) { FireSharedSynchronizer(sync, s); };
*commit_sync = std::move(sync);
} else {
commit_callback = &DoNothingStatusCB;
}
CommitMsg msg;
msg.set_op_type(NO_OP);
msg.mutable_commited_op_id()->CopyFrom(round->id());
CHECK_OK(logs_[peer_idx]->AsyncAppendCommit(msg, commit_callback));
return Status::OK();
}
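// Waits for the Synchronizer registered in AppendDummyMessage() to fire for
// 'round', i.e. for the round's replication to finish.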
Status WaitForReplicate(ConsensusRound* round) {
return FindOrDie(syncs_, round)->Wait();
}
Status TimedWaitForReplicate(ConsensusRound* round, const MonoDelta& delta) {
return FindOrDie(syncs_, round)->WaitFor(delta);
}
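// Busy-waits until the log of the peer at 'peer_idx' contains an op id at
// least as high as 'to_wait_for'.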
void WaitForReplicateIfNotAlreadyPresent(const OpId& to_wait_for, int peer_idx) {
shared_ptr<RaftConsensus> peer;
CHECK_OK(peers_->GetPeerByIdx(peer_idx, &peer));
while (true) {
if (OpIdCompare(peer->queue_->GetLastOpIdInLog(), to_wait_for) >= 0) {
return;
}
SleepFor(MonoDelta::FromMilliseconds(1));
}
}
// Waits for an operation to be (database) committed in the replica at index
// 'peer_idx'. If the operation was already committed this returns immediately.
void WaitForCommitIfNotAlreadyPresent(int64_t to_wait_for,
int peer_idx,
int leader_idx) {
MonoDelta timeout(MonoDelta::FromSeconds(10));
MonoTime start(MonoTime::Now());
shared_ptr<RaftConsensus> peer;
CHECK_OK(peers_->GetPeerByIdx(peer_idx, &peer));
int backoff_exp = 0;
const int kMaxBackoffExp = 8;
OpId committed = MinimumOpId();
while (true) {
boost::optional<OpId> opt_committed = peer->GetLastOpId(COMMITTED_OPID);
if (opt_committed) {
committed = *opt_committed;
if (committed.index() >= to_wait_for) {
return;
}
}
if (MonoTime::Now() > (start + timeout)) {
break;
}
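// Back off exponentially: 1ms, 2ms, 4ms, ..., capped at 2^8 = 256ms per iteration.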
SleepFor(MonoDelta::FromMilliseconds(1LL << backoff_exp));
backoff_exp = std::min(backoff_exp + 1, kMaxBackoffExp);
}
LOG(ERROR) << "Max timeout reached (" << timeout.ToString() << ") while waiting for commit of "
<< "op " << to_wait_for << " on replica. Last committed op on replica: "
<< committed.index() << ". Dumping state and quitting.";
vector<string> lines;
shared_ptr<RaftConsensus> leader;
CHECK_OK(peers_->GetPeerByIdx(leader_idx, &leader));
for (const string& line : lines) {
LOG(ERROR) << line;
}
// Gather the replica and leader operations for printing
LogEntries replica_ops;
GatherLogEntries(peer_idx, logs_[peer_idx], &replica_ops);
LogEntries leader_ops;
GatherLogEntries(leader_idx, logs_[leader_idx], &leader_ops);
SCOPED_TRACE(PrintOnError(replica_ops, Substitute("local peer ($0)", peer->peer_uuid())));
SCOPED_TRACE(PrintOnError(leader_ops, Substitute("leader (peer-$0)", leader_idx)));
FAIL() << "Replica did not commit.";
}
// Used in ReplicateSequenceOfMessages() to specify whether
// we should wait for all replicas to have replicated the
// sequence or just a majority.
enum ReplicateWaitMode {
WAIT_FOR_ALL_REPLICAS,
WAIT_FOR_MAJORITY
};
// Used in ReplicateSequenceOfMessages() to specify whether
// we should also commit the messages in the sequence.
enum CommitMode {
DONT_COMMIT,
COMMIT_ONE_BY_ONE
};
// Replicates a sequence of 'seq_size' messages to the peer at 'leader_idx',
// which is expected to be the leader.
// Optionally waits for the messages to be replicated to the followers.
// 'last_op_id' is set to the id of the last replicated operation.
// The operations are committed only if 'commit_mode' is COMMIT_ONE_BY_ONE.
void ReplicateSequenceOfMessages(int seq_size,
int leader_idx,
ReplicateWaitMode wait_mode,
CommitMode commit_mode,
OpId* last_op_id,
vector<scoped_refptr<ConsensusRound>>* rounds,
shared_ptr<Synchronizer>* commit_sync = nullptr) {
for (int i = 0; i < seq_size; i++) {
scoped_refptr<ConsensusRound> round;
ASSERT_OK(AppendDummyMessage(leader_idx, &round));
ASSERT_OK(WaitForReplicate(round.get()));
last_op_id->CopyFrom(round->id());
if (commit_mode == COMMIT_ONE_BY_ONE) {
CommitDummyMessage(leader_idx, round.get(), commit_sync);
}
rounds->push_back(round);
}
if (wait_mode == WAIT_FOR_ALL_REPLICAS) {
shared_ptr<RaftConsensus> leader;
CHECK_OK(peers_->GetPeerByIdx(leader_idx, &leader));
TestPeerMap all_peers = peers_->GetPeerMapCopy();
int i = 0;
for (const TestPeerMap::value_type& entry : all_peers) {
if (entry.second->peer_uuid() != leader->peer_uuid()) {
WaitForReplicateIfNotAlreadyPresent(*last_op_id, i);
}
i++;
}
}
}
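// Flushes and closes the log of the peer at 'idx', then reads every entry
// back from its segments into 'entries'.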
void GatherLogEntries(int idx, const scoped_refptr<Log>& log,
LogEntries* entries) {
ASSERT_OK(log->WaitUntilAllFlushed());
log->Close();
shared_ptr<LogReader> log_reader;
ASSERT_OK(log::LogReader::Open(fs_managers_[idx].get(),
/*index*/nullptr,
kTestTablet,
metric_entity_.get(),
/*file_cache*/nullptr,
&log_reader));
log::SegmentSequence segments;
log_reader->GetSegmentsSnapshot(&segments);
LogEntries ret;
for (const log::SegmentSequence::value_type& entry : segments) {
ASSERT_OK(entry->ReadEntries(&ret));
}
*entries = std::move(ret);
}
// Verifies that the replicas' logs match the leader's. This deletes the
// peers (so we're sure that no further writes occur) and closes the logs,
// so it must be the very last thing run in a test.
void VerifyLogs(int leader_idx, int first_replica_idx, int last_replica_idx) {
// Wait for in-flight ops to be done. We're destroying the
// peers next and leader ops won't be able to commit anymore.
for (const auto& factory : op_factories_) {
factory->WaitDone();
}
// Shut down all the peers.
TestPeerMap all_peers = peers_->GetPeerMapCopy();
for (const TestPeerMap::value_type& entry : all_peers) {
entry.second->Shutdown();
}
LogEntries leader_entries;
GatherLogEntries(leader_idx, logs_[leader_idx], &leader_entries);
shared_ptr<RaftConsensus> leader;
CHECK_OK(peers_->GetPeerByIdx(leader_idx, &leader));
for (int replica_idx = first_replica_idx; replica_idx < last_replica_idx; replica_idx++) {
LogEntries replica_entries;
GatherLogEntries(replica_idx, logs_[replica_idx], &replica_entries);
shared_ptr<RaftConsensus> replica;
CHECK_OK(peers_->GetPeerByIdx(replica_idx, &replica));
VerifyReplica(leader_entries,
replica_entries,
leader->peer_uuid(),
replica->peer_uuid());
}
}
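// Collects the op ids of all REPLICATE entries in 'entries', in log order.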
void ExtractReplicateIds(const LogEntries& entries,
vector<OpId>* ids) {
ids->reserve(entries.size() / 2);
for (const auto& entry : entries) {
if (entry->has_replicate()) {
ids->push_back(entry->replicate().id());
}
}
}
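// Asserts that the leader and the replica logged the same REPLICATE op ids
// in the same order.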
void VerifyReplicateOrderMatches(const LogEntries& leader_entries,
const LogEntries& replica_entries) {
vector<OpId> leader_ids, replica_ids;
ExtractReplicateIds(leader_entries, &leader_ids);
ExtractReplicateIds(replica_entries, &replica_ids);
ASSERT_EQ(leader_ids.size(), replica_ids.size());
for (int i = 0; i < leader_ids.size(); i++) {
ASSERT_EQ(SecureShortDebugString(leader_ids[i]),
SecureShortDebugString(replica_ids[i]));
}
}
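// Asserts that no REPLICATE op id appears twice and that every COMMIT refers
// to a REPLICATE that appears earlier in the same log.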
void VerifyNoCommitsBeforeReplicates(const LogEntries& entries) {
std::unordered_set<OpId, OpIdHashFunctor, OpIdEqualsFunctor> replication_ops;
for (const auto& entry : entries) {
if (entry->has_replicate()) {
ASSERT_TRUE(InsertIfNotPresent(&replication_ops, entry->replicate().id()))
<< "REPLICATE op id showed up twice: " << SecureShortDebugString(*entry);
} else if (entry->has_commit()) {
ASSERT_EQ(1, replication_ops.erase(entry->commit().commited_op_id()))
<< "COMMIT came before associated REPLICATE: " << SecureShortDebugString(*entry);
}
}
}
void VerifyReplica(const LogEntries& leader_entries,
const LogEntries& replica_entries,
const string& leader_name,
const string& replica_name) {
SCOPED_TRACE(PrintOnError(leader_entries, Substitute("Leader: $0", leader_name)));
SCOPED_TRACE(PrintOnError(replica_entries, Substitute("Replica: $0", replica_name)));
// Check that the REPLICATE messages come in the same order on both nodes.
VerifyReplicateOrderMatches(leader_entries, replica_entries);
// Check that no COMMIT precedes its related REPLICATE on both the replica
// and leader.
VerifyNoCommitsBeforeReplicates(replica_entries);
VerifyNoCommitsBeforeReplicates(leader_entries);
}
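// Formats 'replica_entries' into a human-readable dump, used via
// SCOPED_TRACE when a verification fails.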
string PrintOnError(const LogEntries& replica_entries,
const string& replica_id) {
string ret = "";
SubstituteAndAppend(&ret, "$1 log entries for replica $0:\n",
replica_id, replica_entries.size());
for (const auto& replica_entry : replica_entries) {
StrAppend(&ret, "Replica log entry: ", SecureShortDebugString(*replica_entry), "\n");
}
return ret;
}
// Read the ConsensusMetadata for the given peer from disk.
scoped_refptr<ConsensusMetadata> ReadConsensusMetadataFromDisk(int peer_index) {
FsManager* fs = fs_managers_[peer_index].get();
scoped_refptr<ConsensusMetadata> cmeta;
CHECK_OK(ConsensusMetadata::Load(fs, kTestTablet, fs->uuid(), &cmeta));
return cmeta;
}
// Assert that the durable term == term and that the peer that got the vote == voted_for.
void AssertDurableTermAndVote(int peer_index, int64_t term, const std::string& voted_for) {
scoped_refptr<ConsensusMetadata> cmeta = ReadConsensusMetadataFromDisk(peer_index);
ASSERT_EQ(term, cmeta->current_term());
ASSERT_EQ(voted_for, cmeta->voted_for());
}
// Assert that the durable term == term and that the peer has not yet voted.
void AssertDurableTermWithoutVote(int peer_index, int64_t term) {
scoped_refptr<ConsensusMetadata> cmeta = ReadConsensusMetadataFromDisk(peer_index);
ASSERT_EQ(term, cmeta->current_term());
ASSERT_FALSE(cmeta->has_voted_for());
}
~RaftConsensusQuorumTest() {
peers_->Clear();
}
protected:
ConsensusOptions options_;
RaftConfigPB config_;
OpId initial_id_;
vector<shared_ptr<MemTracker>> parent_mem_trackers_;
vector<unique_ptr<FsManager>> fs_managers_;
vector<scoped_refptr<Log>> logs_;
unique_ptr<ThreadPool> raft_pool_;
unique_ptr<TestPeerMapManager> peers_;
vector<unique_ptr<TestOpFactory>> op_factories_;
clock::LogicalClock clock_;
MetricRegistry metric_registry_;
scoped_refptr<MetricEntity> metric_entity_;
const Schema schema_;
std::unordered_map<ConsensusRound*, unique_ptr<Synchronizer>> syncs_;
};
// Tests Replicate/Commit a single message through the leader.
TEST_F(RaftConsensusQuorumTest, TestFollowersReplicateAndCommitMessage) {
// Constants with the indexes of peers with certain roles,
// since peers don't change roles in this test.
const int kFollower0Idx = 0;
const int kFollower1Idx = 1;
const int kLeaderIdx = 2;
ASSERT_OK(BuildAndStartConfig(3));
OpId last_op_id;
vector<scoped_refptr<ConsensusRound>> rounds;
shared_ptr<Synchronizer> commit_sync;
NO_FATALS(ReplicateSequenceOfMessages(
1, kLeaderIdx, WAIT_FOR_ALL_REPLICAS, DONT_COMMIT,
&last_op_id, &rounds, &commit_sync));
// Commit the operation
ASSERT_OK(CommitDummyMessage(kLeaderIdx, rounds[0].get(), &commit_sync));
// Wait for everyone to commit the operations.
// We need to make sure the CommitMsg lands on the leader's log or the
// verification will fail. Since CommitMsgs are appended to the replication
// queue, there is a scenario where they land in the followers' logs before
// landing on the leader's log. However, we know that they are durable
// on the leader when the commit callback is triggered.
// We thus wait for the commit callback to trigger, ensuring durability
// on the leader, and then for the commits to be present on the replicas.
ASSERT_OK(commit_sync->Wait());
WaitForCommitIfNotAlreadyPresent(last_op_id.index(), kFollower0Idx, kLeaderIdx);
WaitForCommitIfNotAlreadyPresent(last_op_id.index(), kFollower1Idx, kLeaderIdx);
VerifyLogs(2, 0, 1);
}
// Tests Replicate/Commit a sequence of messages through the leader.
// First a sequence of replicates and then a sequence of commits.
TEST_F(RaftConsensusQuorumTest, TestFollowersReplicateAndCommitSequence) {
// Constants with the indexes of peers with certain roles,
// since peers don't change roles in this test.
const int kFollower0Idx = 0;
const int kFollower1Idx = 1;
const int kLeaderIdx = 2;
int seq_size = AllowSlowTests() ? 1000 : 100;
ASSERT_OK(BuildAndStartConfig(3));
OpId last_op_id;
vector<scoped_refptr<ConsensusRound>> rounds;
shared_ptr<Synchronizer> commit_sync;
NO_FATALS(ReplicateSequenceOfMessages(
seq_size, kLeaderIdx, WAIT_FOR_ALL_REPLICAS, DONT_COMMIT,
&last_op_id, &rounds, &commit_sync));
// Commit the operations, but wait for the replicates to finish first
for (const scoped_refptr<ConsensusRound>& round : rounds) {
ASSERT_OK(CommitDummyMessage(kLeaderIdx, round.get(), &commit_sync));
}
// See comment at the end of TestFollowersReplicateAndCommitMessage
// for an explanation on this waiting sequence.
ASSERT_OK(commit_sync->Wait());
WaitForCommitIfNotAlreadyPresent(last_op_id.index(), kFollower0Idx, kLeaderIdx);
WaitForCommitIfNotAlreadyPresent(last_op_id.index(), kFollower1Idx, kLeaderIdx);
VerifyLogs(2, 0, 1);
}
TEST_F(RaftConsensusQuorumTest, TestConsensusContinuesIfAMinorityFallsBehind) {
// Constants with the indexes of peers with certain roles,
// since peers don't change roles in this test.
const int kFollower0Idx = 0;
const int kFollower1Idx = 1;
const int kLeaderIdx = 2;
ASSERT_OK(BuildAndStartConfig(3));
OpId last_replicate;
vector<scoped_refptr<ConsensusRound>> rounds;
{
// Lock one of the replicas down by obtaining its state lock
// and never letting it go.
shared_ptr<RaftConsensus> follower0;
CHECK_OK(peers_->GetPeerByIdx(kFollower0Idx, &follower0));
RaftConsensus::LockGuard l(follower0->lock_);
// If the locked replica stopped consensus from making progress, we would
// hang here while waiting for operations to be replicated to a majority.
NO_FATALS(ReplicateSequenceOfMessages(
10, kLeaderIdx, WAIT_FOR_MAJORITY, COMMIT_ONE_BY_ONE,
&last_replicate, &rounds));
// Follower 1 should be fine (if we were waiting on follower 0's replicate,
// this would hang here). We know it must have replicated the op, but make
// sure by waiting for it explicitly.
WaitForReplicateIfNotAlreadyPresent(last_replicate, kFollower1Idx);
WaitForCommitIfNotAlreadyPresent(last_replicate.index(), kFollower1Idx, kLeaderIdx);
}
// After we let the lock go, the remaining follower should get up to date.
WaitForReplicateIfNotAlreadyPresent(last_replicate, kFollower0Idx);
WaitForCommitIfNotAlreadyPresent(last_replicate.index(), kFollower0Idx, kLeaderIdx);
VerifyLogs(2, 0, 1);
}
TEST_F(RaftConsensusQuorumTest, TestConsensusStopsIfAMajorityFallsBehind) {
// Constants with the indexes of peers with certain roles,
// since peers don't change roles in this test.
const int kFollower0Idx = 0;
const int kFollower1Idx = 1;
const int kLeaderIdx = 2;
ASSERT_OK(BuildAndStartConfig(3));
OpId last_op_id;
scoped_refptr<ConsensusRound> round;
{
// Lock two of the replicas down by obtaining their state locks
// and never letting them go.
shared_ptr<RaftConsensus> follower0;
CHECK_OK(peers_->GetPeerByIdx(kFollower0Idx, &follower0));
RaftConsensus::LockGuard l_0(follower0->lock_);
shared_ptr<RaftConsensus> follower1;
CHECK_OK(peers_->GetPeerByIdx(kFollower1Idx, &follower1));
RaftConsensus::LockGuard l_1(follower1->lock_);
// Append a single message to the queue
ASSERT_OK(AppendDummyMessage(kLeaderIdx, &round));
last_op_id.CopyFrom(round->id());
// This should time out.
Status status = TimedWaitForReplicate(round.get(), MonoDelta::FromMilliseconds(500));
ASSERT_TRUE(status.IsTimedOut());
}
// After we release the locks, the operation should replicate to all replicas
// and we can commit it.
ASSERT_OK(WaitForReplicate(round.get()));
CommitDummyMessage(kLeaderIdx, round.get());
// Assert that everything was ok
WaitForReplicateIfNotAlreadyPresent(last_op_id, kFollower0Idx);
WaitForReplicateIfNotAlreadyPresent(last_op_id, kFollower1Idx);
WaitForCommitIfNotAlreadyPresent(last_op_id.index(), kFollower0Idx, kLeaderIdx);
WaitForCommitIfNotAlreadyPresent(last_op_id.index(), kFollower1Idx, kLeaderIdx);
VerifyLogs(2, 0, 1);
}
// If a communication error happens, the leader will resend the request to the
// peers. This tests that the peers handle repeated requests.
TEST_F(RaftConsensusQuorumTest, TestReplicasHandleCommunicationErrors) {
// Constants with the indexes of peers with certain roles,
// since peers don't change roles in this test.
const int kFollower0Idx = 0;
const int kFollower1Idx = 1;
const int kLeaderIdx = 2;
ASSERT_OK(BuildAndStartConfig(3));
OpId last_op_id;
// Append a dummy message, with faults injected on the first attempt
// to send the message.
scoped_refptr<ConsensusRound> round;
GetLeaderProxyToPeer(kFollower0Idx, kLeaderIdx)->InjectCommFaultLeaderSide();
GetLeaderProxyToPeer(kFollower1Idx, kLeaderIdx)->InjectCommFaultLeaderSide();
ASSERT_OK(AppendDummyMessage(kLeaderIdx, &round));
// We should successfully replicate it due to retries.
ASSERT_OK(WaitForReplicate(round.get()));
GetLeaderProxyToPeer(kFollower0Idx, kLeaderIdx)->InjectCommFaultLeaderSide();
GetLeaderProxyToPeer(kFollower1Idx, kLeaderIdx)->InjectCommFaultLeaderSide();
ASSERT_OK(CommitDummyMessage(kLeaderIdx, round.get()));
// The commit should eventually reach both followers as well.
last_op_id = round->id();
WaitForCommitIfNotAlreadyPresent(last_op_id.index(), kFollower0Idx, kLeaderIdx);
WaitForCommitIfNotAlreadyPresent(last_op_id.index(), kFollower1Idx, kLeaderIdx);
// Append a sequence of messages, and keep injecting errors into the
// replica proxies.
vector<scoped_refptr<ConsensusRound>> rounds;
shared_ptr<Synchronizer> commit_sync;
for (int i = 0; i < 100; i++) {
scoped_refptr<ConsensusRound> round;
ASSERT_OK(AppendDummyMessage(kLeaderIdx, &round));
ConsensusRound* round_ptr = round.get();
last_op_id.CopyFrom(round->id());
rounds.push_back(round);
// inject comm faults
if (i % 2 == 0) {
GetLeaderProxyToPeer(kFollower0Idx, kLeaderIdx)->InjectCommFaultLeaderSide();
} else {
GetLeaderProxyToPeer(kFollower1Idx, kLeaderIdx)->InjectCommFaultLeaderSide();
}
ASSERT_OK(WaitForReplicate(round_ptr));
ASSERT_OK(CommitDummyMessage(kLeaderIdx, round_ptr, &commit_sync));
}
// Assert last operation was correctly replicated and committed.
WaitForReplicateIfNotAlreadyPresent(last_op_id, kFollower0Idx);
WaitForReplicateIfNotAlreadyPresent(last_op_id, kFollower1Idx);
// See comment at the end of TestFollowersReplicateAndCommitMessage
// for an explanation on this waiting sequence.
ASSERT_OK(commit_sync->Wait());
WaitForCommitIfNotAlreadyPresent(last_op_id.index(), kFollower0Idx, kLeaderIdx);
WaitForCommitIfNotAlreadyPresent(last_op_id.index(), kFollower1Idx, kLeaderIdx);
VerifyLogs(2, 0, 1);
}
// In this test we exercise the ability of the leader to send heartbeats
// to replicas by pushing nothing after the configuration round and still
// expecting the replicas' Update() method to be called.
TEST_F(RaftConsensusQuorumTest, TestLeaderHeartbeats) {
// Constants with the indexes of peers with certain roles,
// since peers don't change roles in this test.
const int kFollower0Idx = 0;
const int kFollower1Idx = 1;
const int kLeaderIdx = 2;
ASSERT_OK(BuildConfig(3));
shared_ptr<RaftConsensus> follower0;
CHECK_OK(peers_->GetPeerByIdx(kFollower0Idx, &follower0));
shared_ptr<RaftConsensus> follower1;
CHECK_OK(peers_->GetPeerByIdx(kFollower1Idx, &follower1));
ASSERT_OK(StartPeers());
shared_ptr<RaftConsensus> leader;
CHECK_OK(peers_->GetPeerByIdx(kLeaderIdx, &leader));
ASSERT_OK(leader->EmulateElectionForTests());
// Wait for the config round to get committed and count the number of
// update calls; calls after that will be heartbeats.
OpId config_round;
config_round.set_term(1);
config_round.set_index(1);
WaitForCommitIfNotAlreadyPresent(config_round.index(), kFollower0Idx, kLeaderIdx);
WaitForCommitIfNotAlreadyPresent(config_round.index(), kFollower1Idx, kLeaderIdx);
int repl0_init_count = follower0->update_calls_for_tests();
int repl1_init_count = follower1->update_calls_for_tests();
// Now wait for about 4 times the heartbeat period; the counters
// should have increased between 3 and 8 times.
//
// Why the variance? Heartbeat timing is jittered such that the period
// between heartbeats can be anywhere from 3/4 to 5/4 the interval.
SleepFor(MonoDelta::FromMilliseconds(FLAGS_raft_heartbeat_interval_ms * 4LL));
int repl0_final_count = follower0->update_calls_for_tests();
int repl1_final_count = follower1->update_calls_for_tests();
ASSERT_GE(repl0_final_count - repl0_init_count, 3);
ASSERT_LE(repl0_final_count - repl0_init_count, 8);
ASSERT_GE(repl1_final_count - repl1_init_count, 3);
ASSERT_LE(repl1_final_count - repl1_init_count, 8);
VerifyLogs(2, 0, 1);
}
// After creating the initial configuration, this test writes a small sequence
// of messages to the initial leader. It then shuts down the current
// leader, makes another peer become leader and writes a sequence of
// messages to it. The new leader and the follower should agree on the
// sequence of messages.
TEST_F(RaftConsensusQuorumTest, TestLeaderElectionWithQuiescedQuorum) {
const int kInitialNumPeers = 5;
ASSERT_OK(BuildAndStartConfig(kInitialNumPeers));
OpId last_op_id;
shared_ptr<Synchronizer> last_commit_sync;
vector<scoped_refptr<ConsensusRound>> rounds;
// Loop twice, successively shutting down the previous leader.
for (int current_config_size = kInitialNumPeers;
current_config_size >= kInitialNumPeers - 1;
current_config_size--) {
NO_FATALS(ReplicateSequenceOfMessages(
10, current_config_size - 1, WAIT_FOR_ALL_REPLICAS, COMMIT_ONE_BY_ONE,
&last_op_id, &rounds, &last_commit_sync));
// Make sure the last operation is committed everywhere
ASSERT_OK(last_commit_sync->Wait());
for (int i = 0; i < current_config_size - 1; i++) {
WaitForCommitIfNotAlreadyPresent(last_op_id.index(), i, current_config_size - 1);
}
// Now shutdown the current leader.
LOG(INFO) << "Shutting down current leader with index " << (current_config_size - 1);
shared_ptr<RaftConsensus> current_leader;
CHECK_OK(peers_->GetPeerByIdx(current_config_size - 1, &current_leader));
current_leader->Shutdown();
peers_->RemovePeer(current_leader->peer_uuid());
// ... and make the peer before it become leader.
shared_ptr<RaftConsensus> new_leader;
CHECK_OK(peers_->GetPeerByIdx(current_config_size - 2, &new_leader));
// This will force an election in which we expect to make the last
// non-shutdown peer in the list become leader.
int64_t flush_count_before =
new_leader->consensus_metadata_for_tests()->flush_count_for_tests();
LOG(INFO) << "Running election for future leader with index " << (current_config_size - 1);
ASSERT_OK(new_leader->StartElection(RaftConsensus::ELECT_EVEN_IF_LEADER_IS_ALIVE,
RaftConsensus::EXTERNAL_REQUEST));
ASSERT_OK(WaitUntilLeaderForTests(new_leader.get()));
LOG(INFO) << "Election won";
int64_t flush_count_after =
new_leader->consensus_metadata_for_tests()->flush_count_for_tests();
ASSERT_EQ(flush_count_after, flush_count_before + 1)
<< "Expected only one consensus metadata flush for a leader election";
// ... replicating a set of messages to the new leader should now be possible.
NO_FATALS(ReplicateSequenceOfMessages(
10, current_config_size - 2, WAIT_FOR_MAJORITY, COMMIT_ONE_BY_ONE,
&last_op_id, &rounds, &last_commit_sync));
// Make sure the last operation is committed everywhere
ASSERT_OK(last_commit_sync->Wait());
for (int i = 0; i < current_config_size - 2; i++) {
WaitForCommitIfNotAlreadyPresent(last_op_id.index(), i, current_config_size - 2);
}
}
// We can only verify the logs of the peers that were not killed, due to the
// old leaders being out-of-date now.
VerifyLogs(2, 0, 1);
}
TEST_F(RaftConsensusQuorumTest, TestReplicasEnforceTheLogMatchingProperty) {
ASSERT_OK(BuildAndStartConfig(3));
OpId last_op_id;
shared_ptr<Synchronizer> last_commit_sync;
vector<scoped_refptr<ConsensusRound>> rounds;
NO_FATALS(ReplicateSequenceOfMessages(
10, 2, WAIT_FOR_ALL_REPLICAS, COMMIT_ONE_BY_ONE,
&last_op_id, &rounds, &last_commit_sync));
// Make sure the last operation is committed everywhere
ASSERT_OK(last_commit_sync->Wait());
WaitForCommitIfNotAlreadyPresent(last_op_id.index(), 0, 2);
WaitForCommitIfNotAlreadyPresent(last_op_id.index(), 1, 2);
// Now replicas should only accept operations with
// 'last_id' as the preceding id.
ConsensusRequestPB req;
ConsensusResponsePB resp;
shared_ptr<RaftConsensus> leader;
CHECK_OK(peers_->GetPeerByIdx(2, &leader));
shared_ptr<RaftConsensus> follower;
CHECK_OK(peers_->GetPeerByIdx(0, &follower));
req.set_caller_uuid(leader->peer_uuid());
req.set_caller_term(last_op_id.term());
req.mutable_preceding_id()->CopyFrom(last_op_id);
req.set_committed_index(last_op_id.index());
req.set_all_replicated_index(0);
// Send a request with the next index.
ReplicateMsg* replicate = req.add_ops();
replicate->set_timestamp(clock_.Now().ToUint64());
OpId* id = replicate->mutable_id();
id->set_term(last_op_id.term());
id->set_index(last_op_id.index() + 1);
replicate->set_op_type(NO_OP);
// Since the req adds the next op, the leader must have also appended it.
req.set_last_idx_appended_to_leader(id->index());
// Appending this message to peer0 should work and update
// its 'last_received' to 'id'.
ASSERT_OK(follower->Update(&req, &resp));
ASSERT_TRUE(OpIdEquals(resp.status().last_received(), *id));
ASSERT_EQ(0, follower->queue_->metrics_.num_ops_behind_leader->value());
// Now skip one message in the same term. The replica should
// complain with the right error message.
req.mutable_preceding_id()->set_index(id->index() + 1);
id->set_index(id->index() + 2);
// Appending this message to peer0 should return Status::OK, but the response
// should contain an error referring to the log matching property.
ASSERT_OK(follower->Update(&req, &resp));
ASSERT_TRUE(resp.has_status());
ASSERT_TRUE(resp.status().has_error());
ASSERT_EQ(ConsensusErrorPB::PRECEDING_ENTRY_DIDNT_MATCH, resp.status().error().code());
ASSERT_STR_CONTAINS(resp.status().error().status().message(),
"Log matching property violated");
}
// Test that RequestVote performs according to "spec".
TEST_F(RaftConsensusQuorumTest, TestRequestVote) {
ASSERT_OK(BuildAndStartConfig(3));
OpId last_op_id;
shared_ptr<Synchronizer> last_commit_sync;
vector<scoped_refptr<ConsensusRound>> rounds;
NO_FATALS(ReplicateSequenceOfMessages(
10, 2, WAIT_FOR_ALL_REPLICAS, COMMIT_ONE_BY_ONE,
&last_op_id, &rounds, &last_commit_sync));
// Make sure the last operation is committed everywhere
ASSERT_OK(last_commit_sync->Wait());
WaitForCommitIfNotAlreadyPresent(last_op_id.index(), 0, 2);
WaitForCommitIfNotAlreadyPresent(last_op_id.index(), 1, 2);
// Ensure last-logged OpId is > (0,0).
ASSERT_TRUE(OpIdLessThan(MinimumOpId(), last_op_id));
const int kPeerIndex = 1;
shared_ptr<RaftConsensus> peer;
CHECK_OK(peers_->GetPeerByIdx(kPeerIndex, &peer));
auto flush_count = [&]() {
return peer->consensus_metadata_for_tests()->flush_count_for_tests();
};
VoteRequestPB request;
request.set_tablet_id(kTestTablet);
request.mutable_candidate_status()->mutable_last_received()->CopyFrom(last_op_id);
// Test that the replica won't vote since it has recently heard from
// a valid leader.
int flush_count_before = flush_count();
VoteResponsePB response;
request.set_candidate_uuid(fs_managers_[0]->uuid());
request.set_candidate_term(last_op_id.term() + 1);
ASSERT_OK(peer->RequestVote(&request,
TabletVotingState(boost::none, tablet::TABLET_DATA_READY),
&response));
ASSERT_FALSE(response.vote_granted());
ASSERT_EQ(ConsensusErrorPB::LEADER_IS_ALIVE, response.consensus_error().code());
ASSERT_EQ(0, flush_count() - flush_count_before)
<< "A rejected vote should not flush metadata";
// Test that replicas only vote yes for a single peer per term.
// Indicate that replicas should vote even if they think another leader is alive.
// This will allow the rest of the requests in the test to go through.
flush_count_before = flush_count();
request.set_ignore_live_leader(true);
ASSERT_OK(peer->RequestVote(&request,
TabletVotingState(boost::none, tablet::TABLET_DATA_READY),
&response));
ASSERT_TRUE(response.vote_granted());
ASSERT_EQ(last_op_id.term() + 1, response.responder_term());
NO_FATALS(AssertDurableTermAndVote(kPeerIndex, last_op_id.term() + 1,
fs_managers_[0]->uuid()));
ASSERT_EQ(1, flush_count() - flush_count_before)
<< "A granted vote should flush only once";
// Ensure we get same response for same term and same UUID.
response.Clear();
flush_count_before = flush_count();
ASSERT_OK(peer->RequestVote(&request,
TabletVotingState(boost::none, tablet::TABLET_DATA_READY),
&response));
ASSERT_TRUE(response.vote_granted());
ASSERT_EQ(0, flush_count() - flush_count_before)
<< "Confirming a previous vote should not flush";
// Ensure we get a "no" for a different candidate UUID for that term.
flush_count_before = flush_count();
response.Clear();
request.set_candidate_uuid(fs_managers_[2]->uuid());
ASSERT_OK(peer->RequestVote(&request,
TabletVotingState(boost::none, tablet::TABLET_DATA_READY),
&response));
ASSERT_FALSE(response.vote_granted());
ASSERT_TRUE(response.has_consensus_error());
ASSERT_EQ(ConsensusErrorPB::ALREADY_VOTED, response.consensus_error().code());
ASSERT_EQ(last_op_id.term() + 1, response.responder_term());
NO_FATALS(AssertDurableTermAndVote(kPeerIndex, last_op_id.term() + 1,
fs_managers_[0]->uuid()));
ASSERT_EQ(0, flush_count() - flush_count_before)
<< "Rejected votes for same term should not flush";
//
// Test that replicas refuse votes for an old term.
//
// Increase the term of our candidate, which will cause the voter replica to
// increase its own term to match.
flush_count_before = flush_count();
request.set_candidate_uuid(fs_managers_[0]->uuid());
request.set_candidate_term(last_op_id.term() + 2);
response.Clear();
ASSERT_OK(peer->RequestVote(&request,
TabletVotingState(boost::none, tablet::TABLET_DATA_READY),
&response));
ASSERT_TRUE(response.vote_granted());
ASSERT_EQ(last_op_id.term() + 2, response.responder_term());
NO_FATALS(AssertDurableTermAndVote(kPeerIndex, last_op_id.term() + 2,
fs_managers_[0]->uuid()));
ASSERT_EQ(1, flush_count() - flush_count_before)
<< "Accepted votes with increased term should flush once";
// Now try the old term.
// Note: Use the peer who "won" the election on the previous term (peer 0),
// although in practice the impl does not store historical vote data.
flush_count_before = flush_count();
request.set_candidate_term(last_op_id.term() + 1);
response.Clear();
ASSERT_OK(peer->RequestVote(&request,
TabletVotingState(boost::none, tablet::TABLET_DATA_READY),
&response));
ASSERT_FALSE(response.vote_granted());
ASSERT_TRUE(response.has_consensus_error());
ASSERT_EQ(ConsensusErrorPB::INVALID_TERM, response.consensus_error().code());
ASSERT_EQ(last_op_id.term() + 2, response.responder_term());
NO_FATALS(AssertDurableTermAndVote(kPeerIndex, last_op_id.term() + 2,
fs_managers_[0]->uuid()));
ASSERT_EQ(0, flush_count() - flush_count_before)
<< "Rejected votes for old terms should not flush";
// Ensure that replicas don't change term or flush any metadata for a pre-election
// request, even when they vote "yes".
flush_count_before = flush_count();
request.set_candidate_term(last_op_id.term() + 3);
request.set_is_pre_election(true);
response.Clear();
ASSERT_OK(peer->RequestVote(&request,
TabletVotingState(boost::none, tablet::TABLET_DATA_READY),
&response));
ASSERT_TRUE(response.vote_granted());
ASSERT_FALSE(response.has_consensus_error());
ASSERT_EQ(last_op_id.term() + 2, response.responder_term());
NO_FATALS(AssertDurableTermAndVote(kPeerIndex, last_op_id.term() + 2,
fs_managers_[0]->uuid()));
ASSERT_EQ(0, flush_count() - flush_count_before)
<< "Pre-elections should not flush";
request.set_is_pre_election(false);
//
// Ensure replicas vote no for an old op index.
//
flush_count_before = flush_count();
request.set_candidate_uuid(fs_managers_[0]->uuid());
request.set_candidate_term(last_op_id.term() + 3);
request.mutable_candidate_status()->mutable_last_received()->CopyFrom(MinimumOpId());
response.Clear();
ASSERT_OK(peer->RequestVote(&request,
TabletVotingState(boost::none, tablet::TABLET_DATA_READY),
&response));
ASSERT_FALSE(response.vote_granted());
ASSERT_TRUE(response.has_consensus_error());
ASSERT_EQ(ConsensusErrorPB::LAST_OPID_TOO_OLD, response.consensus_error().code());
ASSERT_EQ(last_op_id.term() + 3, response.responder_term());
NO_FATALS(AssertDurableTermWithoutVote(kPeerIndex, last_op_id.term() + 3));
ASSERT_EQ(1, flush_count() - flush_count_before)
<< "Rejected votes for old op index but new term should flush once.";
// Send a "heartbeat" to the peer. It should be rejected.
ConsensusRequestPB req;
req.set_caller_term(last_op_id.term());
req.set_caller_uuid(fs_managers_[0]->uuid());
req.set_committed_index(last_op_id.index());
req.set_all_replicated_index(0);
ConsensusResponsePB res;
Status s = peer->Update(&req, &res);
ASSERT_EQ(last_op_id.term() + 3, res.responder_term());
ASSERT_TRUE(res.status().has_error());
ASSERT_EQ(ConsensusErrorPB::INVALID_TERM, res.status().error().code());
LOG(INFO) << "Follower rejected old heartbeat, as expected: " << SecureShortDebugString(res);
}
} // namespace consensus
} // namespace kudu