// blob: 5b6f359ec363cf1bfe3610cfd0ee4a49e3649497 [file] [log] [blame]
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.
#include "kudu/consensus/consensus_queue.h"
#include <atomic>
#include <cstdint>
#include <deque>
#include <memory>
#include <ostream>
#include <string>
#include <unordered_map>
#include <vector>
#include <boost/optional/optional.hpp>
#include <gflags/gflags.h>
#include <glog/logging.h>
#include <gtest/gtest.h>
#include <gtest/gtest_prod.h>
#include "kudu/clock/clock.h"
#include "kudu/clock/hybrid_clock.h"
#include "kudu/common/common.pb.h"
#include "kudu/common/schema.h"
#include "kudu/common/timestamp.h"
#include "kudu/common/wire_protocol-test-util.h"
#include "kudu/common/wire_protocol.h"
#include "kudu/consensus/consensus-test-util.h"
#include "kudu/consensus/consensus.pb.h"
#include "kudu/consensus/log-test-base.h"
#include "kudu/consensus/log.h"
#include "kudu/consensus/log_anchor_registry.h"
#include "kudu/consensus/log_util.h"
#include "kudu/consensus/metadata.pb.h"
#include "kudu/consensus/opid.pb.h"
#include "kudu/consensus/opid_util.h"
#include "kudu/consensus/ref_counted_replicate.h"
#include "kudu/consensus/time_manager.h"
#include "kudu/fs/fs_manager.h"
#include "kudu/gutil/port.h"
#include "kudu/gutil/ref_counted.h"
#include "kudu/util/async_util.h"
#include "kudu/util/metrics.h"
#include "kudu/util/monotime.h"
#include "kudu/util/pb_util.h"
#include "kudu/util/slice.h"
#include "kudu/util/status.h"
#include "kudu/util/test_macros.h"
#include "kudu/util/test_util.h"
#include "kudu/util/threadpool.h"
DECLARE_int32(consensus_max_batch_size_bytes);
DECLARE_int32(follower_unavailable_considered_failed_sec);
DECLARE_double(consensus_fail_log_read_ops);
using kudu::consensus::HealthReportPB;
using std::atomic;
using std::deque;
using std::string;
using std::unique_ptr;
using std::vector;
namespace kudu {
namespace consensus {
// Identifiers shared by every test in this file. Both the characters and the
// pointers themselves are const so that a test cannot accidentally re-point
// one of these "constants" at a different string.
static const char* const kLeaderUuid = "peer-0";
static const char* const kPeerUuid = "peer-1";
static const char* const kTestTablet = "test-tablet";
class ConsensusQueueTest : public KuduTest {
public:
ConsensusQueueTest()
: schema_(GetSimpleTestSchema()),
metric_entity_server_(METRIC_ENTITY_server.Instantiate(
&metric_registry_, "consensus-queue-test::server")),
metric_entity_tablet_(METRIC_ENTITY_tablet.Instantiate(
&metric_registry_, "consensus-queue-test::tablet")),
registry_(new log::LogAnchorRegistry),
quiescing_(false),
allow_status_msg_for_failed_peer_(false) {
}
virtual void SetUp() OVERRIDE {
KuduTest::SetUp();
fs_manager_.reset(new FsManager(env_, FsManagerOpts(GetTestPath("fs_root"))));
ASSERT_OK(fs_manager_->CreateInitialFileSystemLayout());
ASSERT_OK(fs_manager_->Open());
CHECK_OK(log::Log::Open(log::LogOptions(),
fs_manager_.get(),
/*file_cache*/nullptr,
kTestTablet,
schema_,
/*schema_version*/0,
/*metric_entity*/nullptr,
&log_));
clock_.reset(new clock::HybridClock(metric_entity_server_));
ASSERT_OK(clock_->Init());
ASSERT_OK(ThreadPoolBuilder("raft").Build(&raft_pool_));
time_manager_.reset(new TimeManager(clock_.get(), Timestamp::kMin));
CloseAndReopenQueue(MinimumOpId(), MinimumOpId());
}
void CloseAndReopenQueue(const OpId& replicated_opid, const OpId& committed_opid) {
queue_.reset(new PeerMessageQueue(
metric_entity_tablet_,
log_.get(),
time_manager_.get(),
FakeRaftPeerPB(kLeaderUuid),
kTestTablet,
raft_pool_->NewToken(ThreadPool::ExecutionMode::SERIAL),
&quiescing_,
replicated_opid,
committed_opid,
&allow_status_msg_for_failed_peer_));
}
virtual void TearDown() OVERRIDE {
log_->WaitUntilAllFlushed();
queue_->Close();
}
Status AppendReplicateMsg(int term, int index, int payload_size) {
return queue_->AppendOperation(
make_scoped_refptr_replicate(CreateDummyReplicate(term,
index,
clock_->Now(),
payload_size).release()));
}
RaftPeerPB MakePeer(const std::string& peer_uuid,
RaftPeerPB::MemberType member_type) {
RaftPeerPB peer_pb;
*peer_pb.mutable_permanent_uuid() = peer_uuid;
peer_pb.set_member_type(member_type);
return peer_pb;
}
// Updates the peer's watermark in the queue so that it matches
// the operation we want, since the queue always assumes that
// when a peer gets tracked it's always tracked starting at the
// last operation in the queue
void UpdatePeerWatermarkToOp(ConsensusRequestPB* request,
ConsensusResponsePB* response,
const OpId& last_received,
const OpId& last_received_current_leader,
int last_committed_idx,
bool* send_more_immediately) {
queue_->TrackPeer(MakePeer(kPeerUuid, RaftPeerPB::VOTER));
response->set_responder_uuid(kPeerUuid);
// Ask for a request. The queue assumes the peer is up-to-date so
// this should contain no operations.
vector<ReplicateRefPtr> refs;
bool needs_tablet_copy;
ASSERT_OK(queue_->RequestForPeer(kPeerUuid, request, &refs, &needs_tablet_copy));
ASSERT_FALSE(needs_tablet_copy);
ASSERT_EQ(request->ops_size(), 0);
// Refuse saying that the log matching property check failed and
// that our last operation is actually 'last_received'.
RefuseWithLogPropertyMismatch(response, last_received, last_received_current_leader);
response->mutable_status()->set_last_committed_idx(last_committed_idx);
*send_more_immediately = queue_->ResponseFromPeer(response->responder_uuid(), *response);
request->Clear();
response->mutable_status()->Clear();
}
// Like the above but uses the last received index as the committed index.
void UpdatePeerWatermarkToOp(ConsensusRequestPB* request,
ConsensusResponsePB* response,
const OpId& last_received,
const OpId& last_received_current_leader,
bool* send_more_immediately) {
return UpdatePeerWatermarkToOp(request, response, last_received,
last_received_current_leader,
last_received.index(), send_more_immediately);
}
void RefuseWithLogPropertyMismatch(ConsensusResponsePB* response,
const OpId& last_received,
const OpId& last_received_current_leader) {
ConsensusStatusPB* status = response->mutable_status();
status->mutable_last_received()->CopyFrom(last_received);
status->mutable_last_received_current_leader()->CopyFrom(last_received_current_leader);
ConsensusErrorPB* error = status->mutable_error();
error->set_code(ConsensusErrorPB::PRECEDING_ENTRY_DIDNT_MATCH);
StatusToPB(Status::IllegalState("LMP failed."), error->mutable_status());
}
void WaitForLocalPeerToAckIndex(int index) {
while (true) {
const auto leader = queue_->GetTrackedPeerForTests(kLeaderUuid);
if (leader.last_received.index() >= index) {
break;
}
SleepFor(MonoDelta::FromMilliseconds(10));
}
}
// Sets the last received op on the response, as well as the last committed index.
void SetLastReceivedAndLastCommitted(ConsensusResponsePB* response,
const OpId& last_received,
const OpId& last_received_current_leader,
int last_committed_idx) {
*response->mutable_status()->mutable_last_received() = last_received;
*response->mutable_status()->mutable_last_received_current_leader() =
last_received_current_leader;
response->mutable_status()->set_last_committed_idx(last_committed_idx);
}
// Like the above but uses the same last_received for current term.
void SetLastReceivedAndLastCommitted(ConsensusResponsePB* response,
const OpId& last_received,
int last_committed_idx) {
SetLastReceivedAndLastCommitted(response, last_received, last_received, last_committed_idx);
}
// Like the above but just sets the last committed index to have the same index
// as the last received op.
void SetLastReceivedAndLastCommitted(ConsensusResponsePB* response,
const OpId& last_received) {
SetLastReceivedAndLastCommitted(response, last_received, last_received.index());
}
protected:
const Schema schema_;
MetricRegistry metric_registry_;
scoped_refptr<MetricEntity> metric_entity_server_;
scoped_refptr<MetricEntity> metric_entity_tablet_;
unique_ptr<FsManager> fs_manager_;
scoped_refptr<log::Log> log_;
unique_ptr<ThreadPool> raft_pool_;
unique_ptr<TimeManager> time_manager_;
unique_ptr<PeerMessageQueue> queue_;
scoped_refptr<log::LogAnchorRegistry> registry_;
unique_ptr<clock::Clock> clock_;
atomic<bool> quiescing_;
bool allow_status_msg_for_failed_peer_;
};
// A PeerMessageQueueObserver that only records which peers it was asked to
// have start an election; every other notification is ignored.
class SimpleObserver : public PeerMessageQueueObserver {
 public:
  SimpleObserver() = default;

  // Remembers 'peer_uuid' as the target of an election request.
  void NotifyPeerToStartElection(const string& peer_uuid) override {
    peers_to_start_election_.push_back(peer_uuid);
  }

  // The remaining notifications are deliberately no-ops.
  void NotifyCommitIndex(int64_t /*commit_index*/) override {
  }
  void NotifyTermChange(int64_t /*term*/) override {
  }
  void NotifyFailedFollower(const string& /*peer_uuid*/,
                            int64_t /*term*/,
                            const string& /*reason*/) override {
  }
  void NotifyPeerToPromote(const string& /*peer_uuid*/) override {
  }
  void NotifyPeerHealthChange() override {
  }

 private:
  FRIEND_TEST(ConsensusQueueTest, TestTransferLeadershipWhenAppropriate);

  // UUIDs passed to NotifyPeerToStartElection(), in the order received.
  deque<string> peers_to_start_election_;
};
// Test that the leader consensus queue will only attempt to trigger elections
// when appropriate.
TEST_F(ConsensusQueueTest, TestTransferLeadershipWhenAppropriate) {
  SimpleObserver observer;
  queue_->RegisterObserver(&observer);
  RaftConfigPB config = BuildRaftConfigPBForTests(/*num_voters*/2);
  queue_->SetLeaderMode(kMinimumOpIdIndex, kMinimumTerm, config);
  RaftPeerPB follower = MakePeer(kPeerUuid, RaftPeerPB::VOTER);
  queue_->TrackPeer(follower);

  AppendReplicateMessagesToQueue(queue_.get(), clock_.get(), 1, 10);
  WaitForLocalPeerToAckIndex(10);

  ConsensusResponsePB peer_response;
  peer_response.set_responder_term(1);
  peer_response.set_responder_uuid(kPeerUuid);
  SetLastReceivedAndLastCommitted(&peer_response, MakeOpId(1, 9), MinimumOpId().index());

  // Number of election notifications expected so far. It has the container's
  // own size type so that the ASSERT_EQ below compares like with like instead
  // of mixing a signed counter with an unsigned size.
  deque<string>::size_type elections_so_far = 0;

  // Simulates receiving the peer's response and checks that, upon receiving
  // it, the PeerMessageQueue either did or didn't notify that the peer should
  // start an election.
  auto verify_elections = [&] (bool election_happened) {
    ASSERT_TRUE(queue_->ResponseFromPeer(kPeerUuid, peer_response));
    // Notifications are communicated via the Raft threadpool, so wait for any
    // such notifying tasks to finish.
    raft_pool_->Wait();
    if (election_happened) {
      elections_so_far++;
    }
    ASSERT_EQ(elections_so_far, observer.peers_to_start_election_.size());
  };

  // We haven't begun watching for a successor yet and our conditions aren't
  // met for this peer to become a leader.
  NO_FATALS(verify_elections(/*election_happened*/false));

  // Even after waiting for a successor, this peer isn't ready yet.
  queue_->BeginWatchForSuccessor(boost::none);
  NO_FATALS(verify_elections(/*election_happened*/false));

  // Once the peer says it's gotten the last-appended op, we should be good to
  // transfer leadership to it.
  SetLastReceivedAndLastCommitted(&peer_response, MakeOpId(1, 10), MinimumOpId().index());
  NO_FATALS(verify_elections(/*election_happened*/true));

  // After we've triggered our election, we shouldn't trigger another.
  NO_FATALS(verify_elections(/*election_happened*/false));

  // And if we try to step down but specify a different peer, we also won't try
  // electing the peer in-hand.
  queue_->BeginWatchForSuccessor(boost::make_optional<string>("different-peer"));
  NO_FATALS(verify_elections(/*election_happened*/false));

  // Even if we begin quiescing, because we're looking for a specific
  // successor, we shouldn't see an election.
  quiescing_ = true;
  NO_FATALS(verify_elections(/*election_happened*/false));

  // If we stop watching for that successor and we're quiescing, we'll trigger
  // elections.
  queue_->EndWatchForSuccessor();
  for (int i = 0; i < 3; i++) {
    NO_FATALS(verify_elections(/*election_happened*/true));
  }

  // If the peer weren't a voter, we would also not trigger elections.
  config.mutable_peers(1)->set_member_type(RaftPeerPB::NON_VOTER);
  queue_->SetLeaderMode(10, 1, config);
  NO_FATALS(verify_elections(/*election_happened*/false));

  // Now undo that.
  config.mutable_peers(1)->set_member_type(RaftPeerPB::VOTER);
  queue_->SetLeaderMode(10, 1, config);
  NO_FATALS(verify_elections(/*election_happened*/true));

  // If the peer reported itself as quiescing, we also wouldn't trigger an
  // election.
  peer_response.set_server_quiescing(true);
  NO_FATALS(verify_elections(/*election_happened*/false));

  // If the peer stops reporting its server as quiescing, elections will start
  // up again. Both an unset field and an explicit 'false' are exercised.
  peer_response.clear_server_quiescing();
  NO_FATALS(verify_elections(/*election_happened*/true));
  peer_response.set_server_quiescing(false);
  NO_FATALS(verify_elections(/*election_happened*/true));
}
// Test that verifies status-only request messages are sent to the peer even if it's in
// FAILED_UNRECOVERABLE state.
TEST_F(ConsensusQueueTest, TestStatusMessagesToFailedUnrecoverablePeer) {
  // The queue was given the address of this flag at construction time, so
  // flipping it here affects the already-built queue.
  allow_status_msg_for_failed_peer_ = true;
  RaftConfigPB config = BuildRaftConfigPBForTests(/*num_voters*/2);
  queue_->SetLeaderMode(kMinimumOpIdIndex, kMinimumTerm, config);
  AppendReplicateMessagesToQueue(queue_.get(), clock_.get(), 1, 10);
  WaitForLocalPeerToAckIndex(10);

  RaftPeerPB follower = MakePeer(kPeerUuid, RaftPeerPB::VOTER);
  queue_->TrackPeer(follower);

  ConsensusRequestPB request;
  ConsensusResponsePB response;
  response.set_responder_uuid(kPeerUuid);

  // Send request to a new peer.
  vector<ReplicateRefPtr> refs;
  bool needs_tablet_copy;
  ASSERT_OK(queue_->RequestForPeer(kPeerUuid, &request, &refs, &needs_tablet_copy));
  ASSERT_FALSE(needs_tablet_copy);
  ASSERT_EQ(0, request.ops_size());

  SetLastReceivedAndLastCommitted(&response, MinimumOpId());
  bool send_more_immediately = queue_->ResponseFromPeer(response.responder_uuid(), response);
  // The assertion fails when the queue reports nothing further to send, so
  // the message describes that situation.
  ASSERT_TRUE(send_more_immediately) << "Queue didn't have anymore requests pending";

  // Inject failure to read log messages. This will put the peer in FAILED_UNRECOVERABLE state.
  FLAGS_consensus_fail_log_read_ops = 1.0;
  Status s = queue_->RequestForPeer(kPeerUuid, &request, &refs, &needs_tablet_copy);
  ASSERT_TRUE(s.IsNotFound());
  ASSERT_EQ("INJECTED FAILURE", s.message().ToString());

  auto health_map = queue_->ReportHealthOfPeers();
  ASSERT_NE(health_map.count(kPeerUuid), 0);
  auto actual_health = health_map[kPeerUuid].overall_health();
  ASSERT_EQ(HealthReportPB::FAILED_UNRECOVERABLE, actual_health);

  // Verify status-only message can be sent.
  ASSERT_OK(queue_->RequestForPeer(kPeerUuid, &request, &refs, &needs_tablet_copy));
  ASSERT_FALSE(needs_tablet_copy);
  ASSERT_EQ(0, request.ops_size());
}
// Tests that the queue is able to track a peer when it starts tracking a peer
// after the initial message in the queue. In particular this creates a queue
// with several messages and then starts to track a peer whose watermark
// falls in the middle of the current messages in the queue.
TEST_F(ConsensusQueueTest, TestStartTrackingAfterStart) {
  queue_->SetLeaderMode(kMinimumOpIdIndex, kMinimumTerm, BuildRaftConfigPBForTests(2));
  AppendReplicateMessagesToQueue(queue_.get(), clock_.get(), 1, 100);

  ConsensusRequestPB request;
  ConsensusResponsePB response;
  response.set_responder_uuid(kPeerUuid);
  bool send_more_immediately = false;

  // Peer already has some messages, last one being 7.50
  OpId last_received = MakeOpId(7, 50);
  OpId last_received_current_leader = MinimumOpId();

  // The helper asserts internally; NO_FATALS makes such a failure stop this
  // test too instead of letting it continue with half-initialized state.
  NO_FATALS(UpdatePeerWatermarkToOp(&request, &response, last_received,
                                    last_received_current_leader, &send_more_immediately));
  ASSERT_TRUE(send_more_immediately);

  // Getting a new request should get all operations after 7.50
  vector<ReplicateRefPtr> refs;
  bool needs_tablet_copy;
  ASSERT_OK(queue_->RequestForPeer(kPeerUuid, &request, &refs, &needs_tablet_copy));
  ASSERT_FALSE(needs_tablet_copy);
  ASSERT_EQ(50, request.ops_size());

  SetLastReceivedAndLastCommitted(&response, request.ops(49).id());
  send_more_immediately = queue_->ResponseFromPeer(response.responder_uuid(), response);
  ASSERT_FALSE(send_more_immediately) << "Queue still had requests pending";

  // If we ask for a new request, it should come back empty.
  ASSERT_OK(queue_->RequestForPeer(kPeerUuid, &request, &refs, &needs_tablet_copy));
  ASSERT_FALSE(needs_tablet_copy);
  ASSERT_EQ(0, request.ops_size());

  // Extract the ops from the request to avoid double free.
  request.mutable_ops()->ExtractSubrange(0, request.ops_size(), nullptr);
}
// Tests that the peer gets the messages in pages, with the size of a page
// being 'consensus_max_batch_size_bytes'.
TEST_F(ConsensusQueueTest, TestGetPagedMessages) {
  queue_->SetLeaderMode(kMinimumOpIdIndex, kMinimumTerm, BuildRaftConfigPBForTests(2));

  // We're going to add 100 messages to the queue and make each page fetch 9 of
  // those, for a total of 12 pages: 11 full ones and a last page with a single
  // op. The page counts are derived from the two constants so that the loop
  // bound and the final assertion cannot drift apart from them.
  const int kNumOps = 100;
  const int kOpsPerRequest = 9;
  const int kNumFullPages = kNumOps / kOpsPerRequest;
  const int kOpsInLastPage = kNumOps % kOpsPerRequest;

  // Helper to estimate request size so that we can set the max batch size appropriately.
  // Note: This estimator must be precise, as it is used to set the max batch size.
  // In order for the estimate to be correct, all members of the request protobuf must be set.
  // If not all fields are set, this will set the batch size to be too small to hold the expected
  // number of ops.
  ConsensusRequestPB page_size_estimator;
  page_size_estimator.set_caller_term(14);
  page_size_estimator.set_committed_index(0);
  page_size_estimator.set_all_replicated_index(0);
  page_size_estimator.set_last_idx_appended_to_leader(0);
  page_size_estimator.mutable_preceding_id()->CopyFrom(MinimumOpId());
  for (int i = 0; i < kOpsPerRequest; i++) {
    page_size_estimator.mutable_ops()->AddAllocated(
        CreateDummyReplicate(0, 0, clock_->Now(), 0).release());
  }

  // Save the current flag state.
  google::FlagSaver saver;
  FLAGS_consensus_max_batch_size_bytes =
      static_cast<int32_t>(page_size_estimator.ByteSizeLong());

  ConsensusRequestPB request;
  ConsensusResponsePB response;
  response.set_responder_uuid(kPeerUuid);
  bool send_more_immediately = false;

  // The helper asserts internally; NO_FATALS makes such a failure stop this
  // test too.
  NO_FATALS(UpdatePeerWatermarkToOp(&request, &response, MinimumOpId(), MinimumOpId(),
                                    &send_more_immediately));
  ASSERT_TRUE(send_more_immediately);

  // Append the messages after the queue is tracked. Otherwise the ops might
  // get evicted from the cache immediately and the requests below would
  // result in async log reads instead of cache hits.
  AppendReplicateMessagesToQueue(queue_.get(), clock_.get(), 1, kNumOps);

  OpId last;
  for (int i = 0; i < kNumFullPages; i++) {
    VLOG(1) << "Making request " << i;
    vector<ReplicateRefPtr> refs;
    bool needs_tablet_copy;
    ASSERT_OK(queue_->RequestForPeer(kPeerUuid, &request, &refs, &needs_tablet_copy));
    ASSERT_FALSE(needs_tablet_copy);
    ASSERT_EQ(kOpsPerRequest, request.ops_size());
    last = request.ops(request.ops_size() - 1).id();
    SetLastReceivedAndLastCommitted(&response, last);
    VLOG(1) << "Faking received up through " << last;
    send_more_immediately = queue_->ResponseFromPeer(response.responder_uuid(), response);
    ASSERT_TRUE(send_more_immediately);
  }

  // The final, partial page.
  vector<ReplicateRefPtr> refs;
  bool needs_tablet_copy;
  ASSERT_OK(queue_->RequestForPeer(kPeerUuid, &request, &refs, &needs_tablet_copy));
  ASSERT_FALSE(needs_tablet_copy);
  ASSERT_EQ(kOpsInLastPage, request.ops_size());
  last = request.ops(request.ops_size() - 1).id();
  SetLastReceivedAndLastCommitted(&response, last);
  send_more_immediately = queue_->ResponseFromPeer(response.responder_uuid(), response);
  ASSERT_FALSE(send_more_immediately);

  // Extract the ops from the request to avoid double free.
  request.mutable_ops()->ExtractSubrange(0, request.ops_size(), nullptr);
}
// Checks the majority-replicated and all-replicated watermarks while a remote
// peer starts being tracked partway through the queue and then acks the
// queue's operations over two rounds.
TEST_F(ConsensusQueueTest, TestPeersDontAckBeyondWatermarks) {
  queue_->SetLeaderMode(kMinimumOpIdIndex, kMinimumTerm, BuildRaftConfigPBForTests(3));
  AppendReplicateMessagesToQueue(queue_.get(), clock_.get(), 1, 100);

  // Wait for the local peer to append all messages.
  WaitForLocalPeerToAckIndex(100);
  ASSERT_EQ(queue_->GetMajorityReplicatedIndexForTests(), 0);
  // Since we're tracking a single peer still this should have moved the all
  // replicated watermark to the last op appended to the local log.
  ASSERT_EQ(queue_->GetAllReplicatedIndex(), 100);

  // Start to track the peer after the queue has some messages in it
  // at a point that is halfway through the current messages in the queue.
  OpId first_msg = MakeOpId(7, 50);
  ConsensusRequestPB request;
  ConsensusResponsePB response;
  response.set_responder_uuid(kPeerUuid);
  bool send_more_immediately = false;

  // The helper asserts internally; NO_FATALS makes such a failure stop this
  // test too.
  NO_FATALS(UpdatePeerWatermarkToOp(&request, &response, first_msg, MinimumOpId(),
                                    &send_more_immediately));
  ASSERT_TRUE(send_more_immediately);

  // Tracking a new peer should have moved the all replicated watermark back.
  ASSERT_EQ(queue_->GetAllReplicatedIndex(), 0);
  ASSERT_EQ(queue_->GetMajorityReplicatedIndexForTests(), 0);

  vector<ReplicateRefPtr> refs;
  bool needs_tablet_copy;
  ASSERT_OK(queue_->RequestForPeer(kPeerUuid, &request, &refs, &needs_tablet_copy));
  ASSERT_FALSE(needs_tablet_copy);
  ASSERT_EQ(50, request.ops_size());

  AppendReplicateMessagesToQueue(queue_.get(), clock_.get(), 101, 100);

  SetLastReceivedAndLastCommitted(&response, request.ops(49).id());
  response.set_responder_term(28);
  send_more_immediately = queue_->ResponseFromPeer(response.responder_uuid(), response);
  ASSERT_TRUE(send_more_immediately) << "Queue didn't have anymore requests pending";
  ASSERT_EQ(queue_->GetMajorityReplicatedIndexForTests(), 100);
  ASSERT_EQ(queue_->GetAllReplicatedIndex(), 100);

  // If we ask for a new request, it should come back with the rest of the messages.
  ASSERT_OK(queue_->RequestForPeer(kPeerUuid, &request, &refs, &needs_tablet_copy));
  ASSERT_FALSE(needs_tablet_copy);
  ASSERT_EQ(100, request.ops_size());

  OpId expected = request.ops(99).id();
  SetLastReceivedAndLastCommitted(&response, expected);
  response.set_responder_term(expected.term());
  send_more_immediately = queue_->ResponseFromPeer(response.responder_uuid(), response);
  // This assertion fails when the queue claims there is still more to send,
  // so the message describes that situation.
  ASSERT_FALSE(send_more_immediately) << "Queue still had requests pending";

  WaitForLocalPeerToAckIndex(expected.index());
  ASSERT_EQ(queue_->GetMajorityReplicatedIndexForTests(), expected.index());
  ASSERT_EQ(queue_->GetAllReplicatedIndex(), expected.index());

  // Extract the ops from the request to avoid double free.
  request.mutable_ops()->ExtractSubrange(0, request.ops_size(), nullptr);
}
// Walks a 5-voter config through a series of peer acks and checks the
// majority-replicated, committed and all-replicated indexes after each one.
TEST_F(ConsensusQueueTest, TestQueueAdvancesCommittedIndex) {
  queue_->SetLeaderMode(kMinimumOpIdIndex, kMinimumTerm, BuildRaftConfigPBForTests(5));

  // Track 4 additional peers (in addition to the local peer).
  queue_->TrackPeer(MakePeer("peer-1", RaftPeerPB::VOTER));
  queue_->TrackPeer(MakePeer("peer-2", RaftPeerPB::VOTER));
  queue_->TrackPeer(MakePeer("peer-3", RaftPeerPB::VOTER));
  queue_->TrackPeer(MakePeer("peer-4", RaftPeerPB::VOTER));

  // Append 10 messages to the queue.
  // This should add messages 0.1 -> 0.7, 1.8 -> 1.10 to the queue.
  AppendReplicateMessagesToQueue(queue_.get(), clock_.get(), 1, 10);
  WaitForLocalPeerToAckIndex(10);

  // Only the local log has ACKed so far, so nothing is committed yet.
  ASSERT_EQ(queue_->GetCommittedIndex(), 0);

  // NOTE: We don't need to get operations from the queue. The queue
  // only cares about what the peer reported as received, not what was sent.
  ConsensusResponsePB ack;
  ack.set_responder_term(1);

  // Delivers the current contents of 'ack' to the queue on behalf of
  // 'peer_uuid' and returns what ResponseFromPeer() returned.
  const auto deliver_ack_from = [&](const string& peer_uuid) {
    ack.set_responder_uuid(peer_uuid);
    return queue_->ResponseFromPeer(ack.responder_uuid(), ack);
  };

  // Ack the first five operations for peer-1.
  OpId acked_opid = MakeOpId(0, 5);
  SetLastReceivedAndLastCommitted(&ack, acked_opid, MinimumOpId().index());
  ASSERT_TRUE(deliver_ack_from("peer-1"));

  // Committed index should be the same.
  ASSERT_EQ(queue_->GetCommittedIndex(), 0);

  // Ack the first five operations for peer-2.
  ASSERT_TRUE(deliver_ack_from("peer-2"));

  // A majority has now replicated up to 0.5: local, 'peer-1', and 'peer-2'.
  ASSERT_EQ(queue_->GetMajorityReplicatedIndexForTests(), 5);
  // However, this leader has appended operations in term 1, so we can't
  // advance the committed index yet.
  ASSERT_EQ(queue_->GetCommittedIndex(), 0);
  // Moreover, 'peer-3' and 'peer-4' have not acked yet, so the "all-replicated"
  // index also cannot advance.
  ASSERT_EQ(queue_->GetAllReplicatedIndex(), 0);

  // Ack all operations for peer-3.
  acked_opid = MakeOpId(1, 10);
  SetLastReceivedAndLastCommitted(&ack, acked_opid, MinimumOpId().index());
  // peer-3 now has all operations, and the commit index hasn't advanced.
  EXPECT_FALSE(deliver_ack_from("peer-3"));

  // Watermarks should remain the same as above: we still have not majority-replicated
  // anything in the current term, so committed index cannot advance.
  ASSERT_EQ(queue_->GetMajorityReplicatedIndexForTests(), 5);
  ASSERT_EQ(queue_->GetCommittedIndex(), 0);
  ASSERT_EQ(queue_->GetAllReplicatedIndex(), 0);

  // Ack the remaining operations for peer-4.
  EXPECT_TRUE(deliver_ack_from("peer-4"));

  // Now that a majority of peers have replicated an operation in the queue's
  // term the committed index should advance.
  ASSERT_EQ(queue_->GetMajorityReplicatedIndexForTests(), 10);
  ASSERT_EQ(queue_->GetCommittedIndex(), 10);
  ASSERT_EQ(queue_->GetAllReplicatedIndex(), 5);
}
// Ensure that the acks for a non-voter don't count toward the majority.
TEST_F(ConsensusQueueTest, TestNonVoterAcksDontCountTowardMajority) {
  const auto kOtherVoterPeer = "peer-1";
  const auto kNonVoterPeer = "non-voter-peer-0";
  const int kNumMessages = 10;
  const int64_t kNoneCommittedIndex = 0;
  const int64_t kCurrentTerm = 1;

  // 1. Add a non-voter to the config where there are 2 voters.
  queue_->SetLeaderMode(kMinimumOpIdIndex, kMinimumTerm,
                        BuildRaftConfigPBForTests(/*num_voters=*/ 2,
                                                  /*num_non_voters=*/ 1));

  // Track 2 additional peers (in addition to the local peer).
  queue_->TrackPeer(MakePeer(kOtherVoterPeer, RaftPeerPB::VOTER));
  queue_->TrackPeer(MakePeer(kNonVoterPeer, RaftPeerPB::NON_VOTER));

  // 2. Add some writes. Only the local leader immediately acks them, which is
  //    not enough to commit in a 2-voter + 1 non-voter config.
  //
  // Append 10 messages to the queue.
  // This should add messages 0.1 -> 0.7, 1.8 -> 1.10 to the queue.
  AppendReplicateMessagesToQueue(queue_.get(), clock_.get(),
                                 /*first=*/ 1, /*count=*/ kNumMessages);
  WaitForLocalPeerToAckIndex(kNumMessages);

  // Since only the local log has acked at this point, the committed_index
  // should be 0.
  ASSERT_EQ(kNoneCommittedIndex, queue_->GetCommittedIndex());

  // 3. Ack the operations from the NON_VOTER peer. The writes will not have
  //    been committed yet, because the 2nd VOTER has not yet acked them.
  ConsensusResponsePB ack;
  ack.set_responder_uuid(kNonVoterPeer);
  ack.set_responder_term(kCurrentTerm);
  SetLastReceivedAndLastCommitted(&ack,
                                  /*last_received=*/ MakeOpId(kCurrentTerm, kNumMessages),
                                  /*last_committed_idx=*/ kNoneCommittedIndex);
  bool more_pending = queue_->ResponseFromPeer(ack.responder_uuid(), ack);
  ASSERT_FALSE(more_pending);

  // Committed index should be the same.
  ASSERT_EQ(kNoneCommittedIndex, queue_->GetCommittedIndex());

  // 4. Send an identical ack from the 2nd VOTER peer. This should cause the
  //    operation to be committed.
  ack.set_responder_uuid(kOtherVoterPeer);
  more_pending = queue_->ResponseFromPeer(ack.responder_uuid(), ack);
  ASSERT_TRUE(more_pending); // The committed index has increased.

  // The committed index should include the full set of ops now.
  ASSERT_EQ(kNumMessages, queue_->GetCommittedIndex());

  // Once the voter also reports the new committed index there is nothing
  // further to send to it.
  SetLastReceivedAndLastCommitted(&ack,
                                  /*last_received=*/ MakeOpId(kCurrentTerm, kNumMessages),
                                  /*last_committed_idx=*/ kNumMessages);
  more_pending = queue_->ResponseFromPeer(ack.responder_uuid(), ack);
  ASSERT_FALSE(more_pending);
}
// Appends a run of operations directly to the log, reopens the queue on top
// of it, and then tracks a peer whose first required operation precedes the
// first operation held by the queue.
TEST_F(ConsensusQueueTest, TestQueueLoadsOperationsForPeer) {
  const int kOpsToAppend = 100;
  OpId next_opid = MakeOpId(1, 1);
  for (int num_appended = 1; num_appended <= kOpsToAppend; num_appended++) {
    ASSERT_OK(log::AppendNoOpToLogSync(clock_.get(), log_.get(), &next_opid));
    // Roll the log every 10 ops.
    const bool roll_now = (num_appended % 10 == 0);
    if (roll_now) {
      ASSERT_OK(log_->AllocateSegmentAndRollOverForTests());
    }
  }
  ASSERT_OK(log_->WaitUntilAllFlushed());
  ASSERT_OPID_EQ(MakeOpId(1, kOpsToAppend + 1), next_opid);

  // Reset the queue so that the last operation in the log can be passed in as
  // both the replicated and the committed op.
  const OpId last_logged_opid = MakeOpId(next_opid.term(), next_opid.index() - 1);
  CloseAndReopenQueue(last_logged_opid, last_logged_opid);
  queue_->SetLeaderMode(last_logged_opid.index(),
                        last_logged_opid.term(),
                        BuildRaftConfigPBForTests(3));

  ConsensusRequestPB request;
  ConsensusResponsePB response;
  response.set_responder_uuid(kPeerUuid);
  bool send_more_immediately = false;

  // The peer will actually be behind the first operation in the queue.
  // In this case about 50 operations before.
  const OpId peers_last_op = MakeOpId(1, 50);

  // Now we start tracking the peer, this negotiation round should let
  // the queue know how far along the peer is.
  NO_FATALS(UpdatePeerWatermarkToOp(&request,
                                    &response,
                                    peers_last_op,
                                    MinimumOpId(),
                                    &send_more_immediately));

  // The queue should reply that there are more messages for the peer.
  ASSERT_TRUE(send_more_immediately);

  // When we get another request for the peer the queue should load
  // the missing operations.
  vector<ReplicateRefPtr> refs;
  bool needs_tablet_copy;
  ASSERT_OK(queue_->RequestForPeer(kPeerUuid, &request, &refs, &needs_tablet_copy));
  ASSERT_FALSE(needs_tablet_copy);
  ASSERT_EQ(request.ops_size(), 50);

  // The messages still belong to the queue so we have to release them.
  request.mutable_ops()->ExtractSubrange(0, request.ops_size(), nullptr);
}
// This tests that the queue is able to handle operation overwriting, i.e. when a
// newly tracked peer reports the last received operations as some operation that
// doesn't exist in the leader's log. In particular it tests the case where a
// new leader starts at term 2 with only a part of the operations of the previous
// leader having been committed.
TEST_F(ConsensusQueueTest, TestQueueHandlesOperationOverwriting) {
  constexpr int kOpsPerSegment = 3;

  // Write ops 1..10 in term 1 straight to the log, rolling over to a new
  // segment after every third op.
  OpId next_opid = MakeOpId(1, 1);
  for (int idx = 1; idx <= 10; ++idx) {
    ASSERT_OK(log::AppendNoOpToLogSync(clock_.get(), log_.get(), &next_opid));
    if (idx % kOpsPerSegment != 0) continue;
    ASSERT_OK(log_->AllocateSegmentAndRollOverForTests());
  }

  // Write ops 11..20 in term 2, rolling segments on the same cadence.
  next_opid = MakeOpId(2, 11);
  for (int idx = 11; idx <= 20; ++idx) {
    ASSERT_OK(log::AppendNoOpToLogSync(clock_.get(), log_.get(), &next_opid));
    if (idx % kOpsPerSegment != 0) continue;
    ASSERT_OK(log_->AllocateSegmentAndRollOverForTests());
  }

  // The last op in the log is taken to be the one right before 'next_opid'.
  const OpId last_logged = MakeOpId(next_opid.term(), next_opid.index() - 1);
  const int64_t kCommittedIndex = 15;

  // Reopen the queue so that it picks up the new committed index (15), then
  // switch it to leader mode in the term of the last logged op.
  CloseAndReopenQueue(last_logged, MakeOpId(2, kCommittedIndex));
  queue_->SetLeaderMode(kCommittedIndex,
                        last_logged.term(),
                        BuildRaftConfigPBForTests(3));

  // The tracked peer plays the role of an old leader that holds more term 1
  // operations than the new leader does. The queue is expected to notice that
  // the peer's last received op is not in its log and to resend starting from
  // the peer's committed index.
  ConsensusRequestPB request;
  ConsensusResponsePB response;
  vector<ReplicateRefPtr> refs;
  response.set_responder_uuid(kPeerUuid);
  queue_->TrackPeer(MakePeer(kPeerUuid, RaftPeerPB::VOTER));

  // First request: the queue starts out assuming the peer is up to date, so
  // no operations are expected in it.
  bool needs_tablet_copy;
  ASSERT_OK(queue_->RequestForPeer(kPeerUuid, &request, &refs, &needs_tablet_copy));
  ASSERT_FALSE(needs_tablet_copy);
  ASSERT_EQ(0, request.ops_size());
  ASSERT_OPID_EQ(request.preceding_id(), MakeOpId(2, 20));
  ASSERT_EQ(kCommittedIndex, request.committed_index());

  // The old leader was in term 1 but bumps its term to 2 upon our request.
  response.set_responder_term(2);

  // Emulate an old leader that has 25 operations in term 1 (15 more than the
  // new leader has from that term), none of the extra ones committed, and
  // whose last known committed index is 5. It rejects the request with a
  // log-matching failure.
  ConsensusStatusPB* peer_status = response.mutable_status();
  peer_status->mutable_last_received()->CopyFrom(MakeOpId(1, 25));
  peer_status->mutable_last_received_current_leader()->CopyFrom(MinimumOpId());
  peer_status->set_last_committed_idx(5);
  ConsensusErrorPB* lmp_error = peer_status->mutable_error();
  lmp_error->set_code(ConsensusErrorPB::PRECEDING_ENTRY_DIDNT_MATCH);
  StatusToPB(Status::IllegalState("LMP failed."), lmp_error->mutable_status());

  bool send_more_immediately =
      queue_->ResponseFromPeer(response.responder_uuid(), response);
  request.Clear();

  // The queue should signal that more operations are pending for the peer.
  ASSERT_TRUE(send_more_immediately);

  // There has been no successful exchange with the remote peer yet, so the
  // all-replicated watermark is expected to still be 0.
  ASSERT_EQ(0, queue_->GetAllReplicatedIndex());

  // Even after the local peer acks a newly appended op (2.21), the
  // all-replicated watermark is expected to stay where it was.
  ASSERT_OK(queue_->AppendOperation(make_scoped_refptr(new RefCountedReplicate(
      CreateDummyReplicate(2, 21, clock_->Now(), 0).release()))));
  WaitForLocalPeerToAckIndex(21);
  ASSERT_EQ(0, queue_->GetAllReplicatedIndex());

  // The next request for the remote peer should carry every op that follows
  // the peer's last known committed index: 6..21, i.e. 16 ops.
  ASSERT_OK(queue_->RequestForPeer(kPeerUuid, &request, &refs, &needs_tablet_copy));
  ASSERT_FALSE(needs_tablet_copy);
  ASSERT_OPID_EQ(MakeOpId(1, 5), request.preceding_id());
  ASSERT_EQ(16, request.ops_size());

  // A successful response acknowledging 2.21 should move the watermark.
  response.mutable_status()->clear_error();
  SetLastReceivedAndLastCommitted(&response, MakeOpId(2, 21), 5);
  send_more_immediately = queue_->ResponseFromPeer(response.responder_uuid(), response);
  ASSERT_TRUE(send_more_immediately);
  ASSERT_EQ(21, queue_->GetAllReplicatedIndex());

  // The ops in the request still belong to the queue: detach them so that the
  // request does not free them when it goes out of scope.
  request.mutable_ops()->ExtractSubrange(0, request.ops_size(), nullptr);
}
// Test for a bug where we wouldn't move any watermark back, when overwriting
// operations, which would cause a check failure on the write immediately
// following the overwriting write.
TEST_F(ConsensusQueueTest, TestQueueMovesWatermarksBackward) {
  queue_->SetNonLeaderMode(BuildRaftConfigPBForTests(3));

  // Append a bunch of messages and update as if they were also appended to
  // the leader.
  queue_->UpdateLastIndexAppendedToLeader(10);
  AppendReplicateMessagesToQueue(queue_.get(), clock_.get(), 1, 10);
  // Surface a flush failure as a test failure instead of silently ignoring it.
  ASSERT_OK(log_->WaitUntilAllFlushed());

  // Now rewrite some of the operations and wait for the log to append.
  // ASSERT_OK (rather than CHECK_OK) is used so that a bad status fails only
  // this test case instead of aborting the whole test binary.
  Synchronizer synch;
  ASSERT_OK(queue_->AppendOperations(
      { make_scoped_refptr(new RefCountedReplicate(
          CreateDummyReplicate(2, 5, clock_->Now(), 0).release())) },
      synch.AsStatusCallback()));

  // Wait for the operation to be in the log.
  ASSERT_OK(synch.Wait());

  // Having appended index 5, the follower is still 5 ops behind the leader.
  ASSERT_EQ(5, queue_->metrics_.num_ops_behind_leader->value());

  // Without the fix the following append would trigger a check failure
  // in log cache.
  synch.Reset();
  ASSERT_OK(queue_->AppendOperations(
      { make_scoped_refptr(new RefCountedReplicate(
          CreateDummyReplicate(2, 6, clock_->Now(), 0).release())) },
      synch.AsStatusCallback()));

  // Wait for the operation to be in the log.
  ASSERT_OK(synch.Wait());

  // Having appended index 6, the follower is still 4 ops behind the leader.
  ASSERT_EQ(4, queue_->metrics_.num_ops_behind_leader->value());

  // The replication watermark on a follower should not advance by virtue of
  // appending entries to the log.
  ASSERT_EQ(0, queue_->GetAllReplicatedIndex());
}
// Tests that we're advancing the watermarks properly and only when the peer
// has a prefix of our log. This also tests for a specific bug that we had. Here's
// the scenario:
// Peer would report:
// - last received 75.49
// - last committed 72.31
//
// Queue has messages:
// 72.31-72.45
// 73.46-73.51
// 76.52-76.53
//
// The queue has more messages than the peer, but the peer has messages
// that the queue doesn't and which will be overwritten.
//
// In the first round of negotiation the peer would report LMP mismatch.
// In the second round the queue would try to send it messages starting at 75.49
// but since that message didn't exist in the queue's log it would instead send
// messages starting at 72.31. However, because the batches were big it was only
// able to send a few messages (e.g. up to 72.40).
//
// Since in this last exchange everything went ok (the peer still doesn't know
// that messages will be overwritten later), the queue would mark the exchange
// as successful and the peer's last received would be taken into account when
// calculating watermarks, which was incorrect.
TEST_F(ConsensusQueueTest, TestOnlyAdvancesWatermarkWhenPeerHasAPrefixOfOurLog) {
  constexpr int kPayloadBytes = 1024;

  // Cap the batch size so that the ops appended below get paged out to the
  // peer over several requests.
  FLAGS_consensus_max_batch_size_bytes = 10 * kPayloadBytes;

  const int kStartCommittedIndex = 30;
  CloseAndReopenQueue(MakeOpId(72, 30), MakeOpId(82, 30));
  queue_->SetLeaderMode(kStartCommittedIndex, 76, BuildRaftConfigPBForTests(3));

  ConsensusRequestPB request;
  ConsensusResponsePB response;
  vector<ReplicateRefPtr> refs;
  bool send_more_immediately;

  // Starting point: the majority-replicated watermark is expected to sit at
  // the committed index and the all-replicated watermark at 0.
  int64_t want_majority_replicated = kStartCommittedIndex;
  int64_t want_all_replicated = 0;
  ASSERT_EQ(queue_->GetMajorityReplicatedIndexForTests(), want_majority_replicated);
  ASSERT_EQ(queue_->GetAllReplicatedIndex(), want_all_replicated);

  // The peer reports last received 75.49 and committed index 31.
  UpdatePeerWatermarkToOp(&request, &response, MakeOpId(75, 49), MinimumOpId(), 31,
                          &send_more_immediately);
  ASSERT_TRUE(send_more_immediately);

  // Fill the queue: indexes 31..45 in term 72, 46..51 in term 73 and 52..53
  // in term 76.
  for (int idx = 31; idx <= 53; ++idx) {
    const int term = (idx <= 45) ? 72 : ((idx <= 51) ? 73 : 76);
    AppendReplicateMsg(term, idx, kPayloadBytes);
  }
  WaitForLocalPeerToAckIndex(53);

  // First page: ops should start right after the peer's committed index,
  // 9 of them in total (72.32 through index 40).
  bool needs_tablet_copy;
  ASSERT_OK(queue_->RequestForPeer(kPeerUuid, &request, &refs, &needs_tablet_copy));
  ASSERT_FALSE(needs_tablet_copy);
  ASSERT_EQ(9, request.ops_size());
  ASSERT_OPID_EQ(request.ops(0).id(), MakeOpId(72, 32));

  // The response carries 75.49 together with the id of the last op of the
  // page that was just sent.
  const OpId& first_page_last = request.ops(request.ops_size() - 1).id();
  SetLastReceivedAndLastCommitted(&response, MakeOpId(75, 49), first_page_last, 31);
  send_more_immediately = queue_->ResponseFromPeer(response.responder_uuid(), response);
  ASSERT_TRUE(send_more_immediately);

  // Ops up to index 40 have been sent and acked: both watermarks are expected
  // to be there now.
  want_majority_replicated = 40;
  want_all_replicated = 40;
  ASSERT_EQ(queue_->GetMajorityReplicatedIndexForTests(), want_majority_replicated);
  ASSERT_EQ(queue_->GetAllReplicatedIndex(), want_all_replicated);

  // Second page: detach the previous ops (they still belong to the queue) and
  // fetch the next 9, starting at 72.41.
  request.mutable_ops()->ExtractSubrange(0, request.ops_size(), nullptr);
  ASSERT_OK(queue_->RequestForPeer(kPeerUuid, &request, &refs, &needs_tablet_copy));
  ASSERT_FALSE(needs_tablet_copy);
  ASSERT_EQ(9, request.ops_size());
  ASSERT_OPID_EQ(request.ops(0).id(), MakeOpId(72, 41));

  const OpId& second_page_last = request.ops(request.ops_size() - 1).id();
  SetLastReceivedAndLastCommitted(&response, MakeOpId(75, 49), second_page_last, 31);
  send_more_immediately = queue_->ResponseFromPeer(response.responder_uuid(), response);

  // Ops up to index 49 (73.49) have been sent and acked.
  want_majority_replicated = 49;
  want_all_replicated = 49;
  ASSERT_EQ(queue_->GetMajorityReplicatedIndexForTests(), want_majority_replicated);
  ASSERT_EQ(queue_->GetAllReplicatedIndex(), want_all_replicated);

  // Last page: the remaining 4 ops, starting at 73.50. These are the ones
  // meant to overwrite the peer's divergent operations.
  request.mutable_ops()->ExtractSubrange(0, request.ops_size(), nullptr);
  ASSERT_OK(queue_->RequestForPeer(kPeerUuid, &request, &refs, &needs_tablet_copy));
  ASSERT_FALSE(needs_tablet_copy);
  ASSERT_EQ(4, request.ops_size());
  ASSERT_OPID_EQ(request.ops(0).id(), MakeOpId(73, 50));

  // Once the peer acks 76.53 both watermarks should be at the end of the log.
  want_majority_replicated = 53;
  want_all_replicated = 53;
  SetLastReceivedAndLastCommitted(&response, MakeOpId(76, 53), 31);
  send_more_immediately = queue_->ResponseFromPeer(response.responder_uuid(), response);
  ASSERT_EQ(queue_->GetMajorityReplicatedIndexForTests(), want_majority_replicated);
  ASSERT_EQ(queue_->GetAllReplicatedIndex(), want_all_replicated);

  // Detach the ops before the request is destroyed.
  request.mutable_ops()->ExtractSubrange(0, request.ops_size(), nullptr);
}
// Test that Tablet Copy is triggered when a "tablet not found" error occurs.
TEST_F(ConsensusQueueTest, TestTriggerTabletCopyIfTabletNotFound) {
  // Leader queue holding 100 ops, with one tracked voter peer.
  queue_->SetLeaderMode(kMinimumOpIdIndex, kMinimumTerm, BuildRaftConfigPBForTests(3));
  AppendReplicateMessagesToQueue(queue_.get(), clock_.get(), 1, 100);

  ConsensusRequestPB request;
  ConsensusResponsePB response;
  response.set_responder_uuid(kPeerUuid);
  queue_->TrackPeer(MakePeer(kPeerUuid, RaftPeerPB::VOTER));

  // The first request for the freshly tracked peer must not ask for a copy.
  vector<ReplicateRefPtr> refs;
  bool needs_tablet_copy;
  ASSERT_OK(queue_->RequestForPeer(kPeerUuid, &request, &refs, &needs_tablet_copy));
  ASSERT_FALSE(needs_tablet_copy);

  // Simulate the peer answering that it does not host the tablet.
  queue_->UpdatePeerStatus(kPeerUuid,
                           PeerStatus::TABLET_NOT_FOUND,
                           Status::NotFound("No such tablet"));

  // The following request should tell us to initiate Tablet Copy.
  request.Clear();
  ASSERT_OK(queue_->RequestForPeer(kPeerUuid, &request, &refs, &needs_tablet_copy));
  ASSERT_TRUE(needs_tablet_copy);

  // The generated Tablet Copy request should be fully initialized and should
  // name the test tablet, with the leader as the peer to copy from.
  StartTabletCopyRequestPB copy_request;
  ASSERT_OK(queue_->GetTabletCopyRequestForPeer(kPeerUuid, &copy_request));
  ASSERT_TRUE(copy_request.IsInitialized()) << pb_util::SecureShortDebugString(copy_request);
  ASSERT_EQ(kTestTablet, copy_request.tablet_id());
  ASSERT_EQ(kLeaderUuid, copy_request.copy_peer_uuid());

  const auto want_addr =
      pb_util::SecureShortDebugString(FakeRaftPeerPB(kLeaderUuid).last_known_addr());
  const auto got_addr = pb_util::SecureShortDebugString(copy_request.copy_peer_addr());
  ASSERT_EQ(want_addr, got_addr);
}
// Checks the committed index and the op-count metrics of a queue running in
// non-leader (follower) mode.
TEST_F(ConsensusQueueTest, TestFollowerCommittedIndexAndMetrics) {
  constexpr int kNumReplicatedOps = 10;
  constexpr int kLaterLeaderIndex = 15;

  queue_->SetNonLeaderMode(BuildRaftConfigPBForTests(3));

  // Emulate a follower that was sent a request to replicate 10 messages.
  queue_->UpdateLastIndexAppendedToLeader(kNumReplicatedOps);
  AppendReplicateMessagesToQueue(queue_.get(), clock_.get(), 1, kNumReplicatedOps);
  WaitForLocalPeerToAckIndex(kNumReplicatedOps);

  // UpdateFollowerWatermarks() has not been called yet, so the committed
  // index is expected to still be 0.
  ASSERT_EQ(0, queue_->GetCommittedIndex());

  // Move the committed index forward. Outside of tests the consensus
  // implementation does this when the leader sends a new committed index.
  queue_->UpdateFollowerWatermarks(/*committed_index=*/ kNumReplicatedOps,
                                   /*all_replicated_index=*/ kNumReplicatedOps);
  ASSERT_EQ(kNumReplicatedOps, queue_->GetCommittedIndex());

  // With everything committed, all three op-count metrics should read 0.
  ASSERT_EQ(0, queue_->metrics_.num_majority_done_ops->value());
  ASSERT_EQ(0, queue_->metrics_.num_in_progress_ops->value());
  ASSERT_EQ(0, queue_->metrics_.num_ops_behind_leader->value());

  // Emulate the leader appending up to index 15: the follower should now be
  // reported as 5 ops behind.
  queue_->UpdateLastIndexAppendedToLeader(kLaterLeaderIndex);
  ASSERT_EQ(kLaterLeaderIndex - kNumReplicatedOps,
            queue_->metrics_.num_ops_behind_leader->value());
}
// Unit test for the PeerMessageQueue::PeerHealthStatus() method.
TEST(ConsensusQueueUnitTest, PeerHealthStatus) {
  // Exchange statuses that this test expects to be reported as UNKNOWN while
  // the peer has been heard from recently and WAL catch-up is possible.
  static constexpr PeerStatus kPeerStatusesForUnknown[] = {
    PeerStatus::NEW,
    PeerStatus::REMOTE_ERROR,
    PeerStatus::RPC_LAYER_ERROR,
    PeerStatus::TABLET_NOT_FOUND,
    PeerStatus::INVALID_TERM,
    PeerStatus::CANNOT_PREPARE,
    PeerStatus::LMP_MISMATCH,
  };

  RaftPeerPB raft_peer;
  PeerMessageQueue::TrackedPeer tracked(raft_peer);

  // Phase 1: freshly constructed peer, last communication time untouched.
  EXPECT_EQ(HealthReportPB::UNKNOWN, PeerMessageQueue::PeerHealthStatus(tracked));
  for (const auto exchange_status : kPeerStatusesForUnknown) {
    tracked.last_exchange_status = exchange_status;
    EXPECT_EQ(HealthReportPB::UNKNOWN, PeerMessageQueue::PeerHealthStatus(tracked));
  }

  tracked.last_exchange_status = PeerStatus::TABLET_FAILED;
  EXPECT_EQ(HealthReportPB::FAILED_UNRECOVERABLE, PeerMessageQueue::PeerHealthStatus(tracked));

  tracked.last_exchange_status = PeerStatus::OK;
  EXPECT_EQ(HealthReportPB::HEALTHY, PeerMessageQueue::PeerHealthStatus(tracked));

  // Toggling 'wal_catchup_possible' flips an otherwise healthy peer to
  // FAILED_UNRECOVERABLE and back.
  tracked.wal_catchup_possible = false;
  EXPECT_EQ(HealthReportPB::FAILED_UNRECOVERABLE, PeerMessageQueue::PeerHealthStatus(tracked));

  tracked.wal_catchup_possible = true;
  EXPECT_EQ(HealthReportPB::HEALTHY, PeerMessageQueue::PeerHealthStatus(tracked));

  // Phase 2: push the last communication time back by one second more than
  // the --follower_unavailable_considered_failed_sec interval.
  tracked.last_communication_time -=
      MonoDelta::FromSeconds(FLAGS_follower_unavailable_considered_failed_sec + 1);
  EXPECT_EQ(HealthReportPB::FAILED, PeerMessageQueue::PeerHealthStatus(tracked));
  for (const auto exchange_status : kPeerStatusesForUnknown) {
    tracked.last_exchange_status = exchange_status;
    EXPECT_EQ(HealthReportPB::FAILED, PeerMessageQueue::PeerHealthStatus(tracked));
  }

  tracked.last_exchange_status = PeerStatus::TABLET_FAILED;
  EXPECT_EQ(HealthReportPB::FAILED_UNRECOVERABLE, PeerMessageQueue::PeerHealthStatus(tracked));

  tracked.last_exchange_status = PeerStatus::OK;
  EXPECT_EQ(HealthReportPB::FAILED, PeerMessageQueue::PeerHealthStatus(tracked));

  // Phase 3: with WAL catch-up impossible, every exchange status is expected
  // to be reported as FAILED_UNRECOVERABLE.
  tracked.wal_catchup_possible = false;
  EXPECT_EQ(HealthReportPB::FAILED_UNRECOVERABLE, PeerMessageQueue::PeerHealthStatus(tracked));
  for (const auto exchange_status : kPeerStatusesForUnknown) {
    tracked.last_exchange_status = exchange_status;
    EXPECT_EQ(HealthReportPB::FAILED_UNRECOVERABLE, PeerMessageQueue::PeerHealthStatus(tracked));
  }

  tracked.last_exchange_status = PeerStatus::OK;
  EXPECT_EQ(HealthReportPB::FAILED_UNRECOVERABLE, PeerMessageQueue::PeerHealthStatus(tracked));

  tracked.last_exchange_status = PeerStatus::TABLET_FAILED;
  EXPECT_EQ(HealthReportPB::FAILED_UNRECOVERABLE, PeerMessageQueue::PeerHealthStatus(tracked));
}
} // namespace consensus
} // namespace kudu