blob: 0daf78a89e34df0550700be8fc4733ce61c2b703 [file] [log] [blame]
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.
#include "kudu/consensus/raft_consensus.h"
#include <algorithm>
#include <cmath>
#include <cstdint>
#include <functional>
#include <iterator>
#include <memory>
#include <mutex>
#include <ostream>
#include <unordered_map>
#include <unordered_set>
#include <boost/optional/optional.hpp>
#include <gflags/gflags.h>
#include <google/protobuf/util/message_differencer.h>
#include "kudu/common/timestamp.h"
#include "kudu/common/wire_protocol.h"
#include "kudu/consensus/consensus.pb.h"
#include "kudu/consensus/consensus_meta.h"
#include "kudu/consensus/consensus_meta_manager.h"
#include "kudu/consensus/consensus_peers.h"
#include "kudu/consensus/leader_election.h"
#include "kudu/consensus/log.h"
#include "kudu/consensus/metadata.pb.h"
#include "kudu/consensus/opid_util.h"
#include "kudu/consensus/peer_manager.h"
#include "kudu/consensus/pending_rounds.h"
#include "kudu/consensus/quorum_util.h"
#include "kudu/consensus/time_manager.h"
#include "kudu/gutil/macros.h"
#include "kudu/gutil/map-util.h"
#include "kudu/gutil/port.h"
#include "kudu/gutil/stl_util.h"
#include "kudu/gutil/stringprintf.h"
#include "kudu/gutil/strings/stringpiece.h"
#include "kudu/gutil/strings/substitute.h"
#include "kudu/gutil/walltime.h"
#include "kudu/rpc/periodic.h"
#include "kudu/util/async_util.h"
#include "kudu/util/debug/trace_event.h"
#include "kudu/util/flag_tags.h"
#include "kudu/util/logging.h"
#include "kudu/util/metrics.h"
#include "kudu/util/pb_util.h"
#include "kudu/util/process_memory.h"
#include "kudu/util/random.h"
#include "kudu/util/random_util.h"
#include "kudu/util/scoped_cleanup.h"
#include "kudu/util/status.h"
#include "kudu/util/thread_restrictions.h"
#include "kudu/util/threadpool.h"
#include "kudu/util/trace.h"
#include "kudu/util/url-coding.h"
// ---------------------------------------------------------------------------
// Runtime-configurable flags controlling heartbeat cadence, failure
// detection, elections, and replica eviction behavior.
// ---------------------------------------------------------------------------
DEFINE_int32(raft_heartbeat_interval_ms, 500,
             "The heartbeat interval for Raft replication. The leader produces heartbeats "
             "to followers at this interval. The followers expect a heartbeat at this interval "
             "and consider a leader to have failed if it misses several in a row.");
TAG_FLAG(raft_heartbeat_interval_ms, advanced);

// The effective leader-failure timeout is the product of this flag and
// raft_heartbeat_interval_ms (see the flag description below).
DEFINE_double(leader_failure_max_missed_heartbeat_periods, 3.0,
              "Maximum heartbeat periods that the leader can fail to heartbeat in before we "
              "consider the leader to be failed. The total failure timeout in milliseconds is "
              "raft_heartbeat_interval_ms times leader_failure_max_missed_heartbeat_periods. "
              "The value passed to this flag may be fractional.");
TAG_FLAG(leader_failure_max_missed_heartbeat_periods, advanced);

DEFINE_int32(leader_failure_exp_backoff_max_delta_ms, 20 * 1000,
             "Maximum time to sleep in between leader election retries, in addition to the "
             "regular timeout. When leader election fails the interval in between retries "
             "increases exponentially, up to this value.");
TAG_FLAG(leader_failure_exp_backoff_max_delta_ms, experimental);

DEFINE_bool(enable_leader_failure_detection, true,
            "Whether to enable failure detection of tablet leaders. If enabled, attempts will be "
            "made to elect a follower as a new leader when the leader is detected to have failed.");
TAG_FLAG(enable_leader_failure_detection, unsafe);

DEFINE_bool(evict_failed_followers, true,
            "Whether to evict followers from the Raft config that have fallen "
            "too far behind the leader's log to catch up normally or have been "
            "unreachable by the leader for longer than "
            "follower_unavailable_considered_failed_sec");
TAG_FLAG(evict_failed_followers, advanced);

// Test-only fault-injection flags; both are tagged 'unsafe'.
DEFINE_bool(follower_reject_update_consensus_requests, false,
            "Whether a follower will return an error for all UpdateConsensus() requests. "
            "Warning! This is only intended for testing.");
TAG_FLAG(follower_reject_update_consensus_requests, unsafe);

DEFINE_bool(follower_fail_all_prepare, false,
            "Whether a follower will fail preparing all ops. "
            "Warning! This is only intended for testing.");
TAG_FLAG(follower_fail_all_prepare, unsafe);

DEFINE_bool(raft_enable_pre_election, true,
            "When enabled, candidates will call a pre-election before "
            "running a real leader election.");
TAG_FLAG(raft_enable_pre_election, experimental);
TAG_FLAG(raft_enable_pre_election, runtime);

DEFINE_bool(raft_enable_tombstoned_voting, true,
            "When enabled, tombstoned tablets may vote in elections.");
TAG_FLAG(raft_enable_tombstoned_voting, experimental);
TAG_FLAG(raft_enable_tombstoned_voting, runtime);

// Enable improved re-replication (KUDU-1097).
DEFINE_bool(raft_prepare_replacement_before_eviction, true,
            "When enabled, failed replicas will only be evicted after a "
            "replacement has been prepared for them.");
TAG_FLAG(raft_prepare_replacement_before_eviction, advanced);
TAG_FLAG(raft_prepare_replacement_before_eviction, experimental);

// Defined elsewhere; referenced when deciding whether to reject requests
// under memory pressure.
DECLARE_int32(memory_limit_warn_threshold_percentage);
// Metrics
// ---------
// Per-tablet metrics registered in Start() against the tablet's MetricEntity.
METRIC_DEFINE_counter(tablet, follower_memory_pressure_rejections,
                      "Follower Memory Pressure Rejections",
                      kudu::MetricUnit::kRequests,
                      "Number of RPC requests rejected due to "
                      "memory pressure while FOLLOWER.",
                      kudu::MetricLevel::kWarn);
METRIC_DEFINE_gauge_int64(tablet, raft_term,
                          "Current Raft Consensus Term",
                          kudu::MetricUnit::kUnits,
                          "Current Term of the Raft Consensus algorithm. This number increments "
                          "each time a leader election is started.",
                          kudu::MetricLevel::kDebug);
METRIC_DEFINE_gauge_int64(tablet, failed_elections_since_stable_leader,
                          "Failed Elections Since Stable Leader",
                          kudu::MetricUnit::kUnits,
                          "Number of failed elections on this node since there was a stable "
                          "leader. This number increments on each failed election and resets on "
                          "each successful one.",
                          kudu::MetricLevel::kWarn);
METRIC_DEFINE_gauge_int64(tablet, time_since_last_leader_heartbeat,
                          "Time Since Last Leader Heartbeat",
                          kudu::MetricUnit::kMilliseconds,
                          "The time elapsed since the last heartbeat from the leader "
                          "in milliseconds. This metric is identically zero on a leader replica.",
                          kudu::MetricLevel::kDebug);

// Names pulled into this translation unit for brevity.
using boost::optional;
using google::protobuf::util::MessageDifferencer;
using kudu::pb_util::SecureShortDebugString;
using kudu::rpc::PeriodicTimer;
using kudu::tserver::TabletServerErrorPB;
using std::shared_ptr;
using std::string;
using std::unique_ptr;
using std::unordered_set;
using std::weak_ptr;
using strings::Substitute;
namespace kudu {
namespace consensus {
// Constructs a RaftConsensus instance in the 'kNew' state. No I/O happens
// here; persistent consensus metadata is loaded later in Init().
RaftConsensus::RaftConsensus(
    ConsensusOptions options,
    RaftPeerPB local_peer_pb,
    scoped_refptr<ConsensusMetadataManager> cmeta_manager,
    ServerContext server_ctx)
    : options_(std::move(options)),
      local_peer_pb_(std::move(local_peer_pb)),
      cmeta_manager_(DCHECK_NOTNULL(std::move(cmeta_manager))),
      server_ctx_(std::move(server_ctx)),
      state_(kNew),
      rng_(GetRandomSeed32()),
      leader_is_ready_(false),
      leader_transfer_in_progress_(false),
      // MonoTime::Min() means "not withholding votes" (see BecomeReplicaUnlocked).
      withhold_votes_until_(MonoTime::Min()),
      last_received_cur_leader_(MinimumOpId()),
      failed_elections_since_stable_leader_(0),
      shutdown_(false),
      update_calls_for_tests_(0) {
  // The local peer must carry a permanent UUID to participate in consensus.
  DCHECK(local_peer_pb_.has_permanent_uuid());
}
// Loads the persistent consensus metadata (term, vote, Raft config) for this
// tablet and transitions the instance from 'kNew' to 'kInitialized'.
// Called exactly once, before Start().
Status RaftConsensus::Init() {
  LockGuard l(lock_);
  DCHECK_EQ(kNew, state_) << State_Name(state_);
  RETURN_NOT_OK(cmeta_manager_->Load(options_.tablet_id, &cmeta_));
  SetStateUnlocked(kInitialized);
  return Status::OK();
}
// Stops all consensus activity (timers, queue, peers) before members are
// torn down.
RaftConsensus::~RaftConsensus() {
  Shutdown();
}
// Factory method: builds a RaftConsensus instance and initializes it from
// the on-disk consensus metadata. On success the fully-initialized instance
// is handed to 'consensus_out'; on failure the partially-built instance is
// discarded and an error is returned.
Status RaftConsensus::Create(ConsensusOptions options,
                             RaftPeerPB local_peer_pb,
                             scoped_refptr<ConsensusMetadataManager> cmeta_manager,
                             ServerContext server_ctx,
                             shared_ptr<RaftConsensus>* consensus_out) {
  auto ret = RaftConsensus::make_shared(std::move(options),
                                        std::move(local_peer_pb),
                                        std::move(cmeta_manager),
                                        std::move(server_ctx));
  RETURN_NOT_OK_PREPEND(ret->Init(), "Unable to initialize Raft consensus");
  *consensus_out = std::move(ret);
  return Status::OK();
}
// Starts consensus after tablet bootstrap: registers metrics, wires up the
// message queue / peer manager / pending-rounds tracker, creates the failure
// detection and leader-transfer timers, replays orphaned replicates from the
// WAL, and assumes non-leader duties. Transitions 'kInitialized' -> 'kRunning'.
// For a brand-new (term 0) or single-voter config, arranges a prompt election.
Status RaftConsensus::Start(const ConsensusBootstrapInfo& info,
                            unique_ptr<PeerProxyFactory> peer_proxy_factory,
                            scoped_refptr<log::Log> log,
                            unique_ptr<TimeManager> time_manager,
                            ConsensusRoundHandler* round_handler,
                            const scoped_refptr<MetricEntity>& metric_entity,
                            MarkDirtyCallback cb) {
  DCHECK(metric_entity);

  peer_proxy_factory_ = DCHECK_NOTNULL(std::move(peer_proxy_factory));
  log_ = DCHECK_NOTNULL(std::move(log));
  time_manager_ = DCHECK_NOTNULL(std::move(time_manager));
  round_handler_ = DCHECK_NOTNULL(round_handler);
  mark_dirty_clbk_ = std::move(cb);

  // Register per-tablet consensus metrics.
  term_metric_ = metric_entity->FindOrCreateGauge(&METRIC_raft_term,
                                                  CurrentTerm(),
                                                  MergeType::kMax);
  follower_memory_pressure_rejections_ =
      metric_entity->FindOrCreateCounter(&METRIC_follower_memory_pressure_rejections);
  num_failed_elections_metric_ =
      metric_entity->FindOrCreateGauge(&METRIC_failed_elections_since_stable_leader,
                                       failed_elections_since_stable_leader_);
  // The function gauge captures a bare 'this'; AutoDetach guarantees it is
  // detached before this object is destroyed.
  METRIC_time_since_last_leader_heartbeat.InstantiateFunctionGauge(
      metric_entity, [this]() { return this->GetMillisSinceLastLeaderHeartbeat(); },
      MergeType::kMax)
      ->AutoDetach(&metric_detacher_);

  // A single Raft thread pool token is shared between RaftConsensus and
  // PeerManager. Because PeerManager is owned by RaftConsensus, it receives a
  // raw pointer to the token, to emphasize that RaftConsensus is responsible
  // for destroying the token.
  ThreadPool* raft_pool = server_ctx_.raft_pool;
  raft_pool_token_ = raft_pool->NewToken(ThreadPool::ExecutionMode::CONCURRENT);

  // The message queue that keeps track of which operations need to be replicated
  // where.
  //
  // Note: the message queue receives a dedicated Raft thread pool token so that
  // its submissions don't block other submissions by RaftConsensus (such as
  // heartbeat processing).
  //
  // TODO(adar): the token is SERIAL to match the previous single-thread
  // observer pool behavior, but CONCURRENT may be safe here.
  unique_ptr<PeerMessageQueue> queue(new PeerMessageQueue(
      metric_entity,
      log_,
      time_manager_.get(),
      local_peer_pb_,
      options_.tablet_id,
      raft_pool->NewToken(ThreadPool::ExecutionMode::SERIAL),
      server_ctx_.quiescing,
      info.last_id,
      info.last_committed_id,
      server_ctx_.allow_status_msg_for_failed_peer));

  // A manager for the set of peers that actually send the operations both remotely
  // and to the local wal.
  unique_ptr<PeerManager> peer_manager(new PeerManager(options_.tablet_id,
                                                       peer_uuid(),
                                                       peer_proxy_factory_.get(),
                                                       queue.get(),
                                                       raft_pool_token_.get(),
                                                       log_));

  unique_ptr<PendingRounds> pending(new PendingRounds(
      LogPrefixThreadSafe(), time_manager_.get()));

  // Capture a weak_ptr reference into the functor so it can safely handle
  // outliving the consensus instance.
  weak_ptr<RaftConsensus> w = shared_from_this();
  // Both timers are one-shot: they fire once and must be re-armed explicitly.
  PeriodicTimer::Options opts;
  opts.one_shot = true;
  failure_detector_ = PeriodicTimer::Create(
      peer_proxy_factory_->messenger(),
      [w]() {
        if (auto consensus = w.lock()) {
          consensus->ReportFailureDetected();
        }
      },
      MinimumElectionTimeout(),
      opts);
  transfer_period_timer_ = PeriodicTimer::Create(
      peer_proxy_factory_->messenger(),
      [w]() {
        if (auto consensus = w.lock()) {
          consensus->EndLeaderTransferPeriod();
        }
      },
      MinimumElectionTimeout(),
      opts);

  {
    ThreadRestrictions::AssertWaitAllowed();
    LockGuard l(lock_);
    CHECK_EQ(kInitialized, state_) << LogPrefixUnlocked() << "Illegal state for Start(): "
                                   << State_Name(state_);
    queue_ = std::move(queue);
    peer_manager_ = std::move(peer_manager);
    pending_ = std::move(pending);

    ClearLeaderUnlocked();

    // Our last persisted term can be higher than the last persisted operation
    // (i.e. if we called an election) but reverse should never happen.
    if (info.last_id.term() > CurrentTermUnlocked()) {
      return Status::Corruption(Substitute("Unable to start RaftConsensus: "
          "The last op in the WAL with id $0 has a term ($1) that is greater "
          "than the latest recorded term, which is $2",
          OpIdToString(info.last_id),
          info.last_id.term(),
          CurrentTermUnlocked()));
    }

    // Append any uncommitted replicate messages found during log replay to the queue.
    LOG_WITH_PREFIX_UNLOCKED(INFO) << "Replica starting. Triggering "
                                   << info.orphaned_replicates.size()
                                   << " pending ops. Active config: "
                                   << SecureShortDebugString(cmeta_->ActiveConfig());
    for (ReplicateMsg* replicate : info.orphaned_replicates) {
      ReplicateRefPtr replicate_ptr = make_scoped_refptr_replicate(new ReplicateMsg(*replicate));
      RETURN_NOT_OK(StartFollowerOpUnlocked(replicate_ptr));
    }

    // Set the initial committed opid for the PendingRounds only after
    // appending any uncommitted replicate messages to the queue.
    pending_->SetInitialCommittedOpId(info.last_committed_id);

    // If this is the first term expire the FD immediately so that we have a
    // fast first election, otherwise we just let the timer expire normally.
    boost::optional<MonoDelta> fd_initial_delta;
    if (CurrentTermUnlocked() == 0) {
      // The failure detector is initialized to a low value to trigger an early
      // election (unless someone else requested a vote from us first, which
      // resets the election timer).
      //
      // We do it this way instead of immediately running an election to get a
      // higher likelihood of enough servers being available when the first one
      // attempts an election to avoid multiple election cycles on startup,
      // while keeping that "waiting period" random.
      if (PREDICT_TRUE(FLAGS_enable_leader_failure_detection)) {
        LOG_WITH_PREFIX_UNLOCKED(INFO) << "Consensus starting up: Expiring failure detector timer "
            "to make a prompt election more likely";
        fd_initial_delta = MonoDelta::FromMilliseconds(
            rng_.Uniform(FLAGS_raft_heartbeat_interval_ms));
      }
    }

    // Now assume non-leader replica duties.
    RETURN_NOT_OK(BecomeReplicaUnlocked(fd_initial_delta));

    SetStateUnlocked(kRunning);
  }

  // There is no other voter to elect us, so trigger our own election.
  if (IsSingleVoterConfig() && FLAGS_enable_leader_failure_detection) {
    LOG_WITH_PREFIX(INFO) << "Only one voter in the Raft config. Triggering election immediately";
    WARN_NOT_OK_EVERY_N_SECS(StartElection(NORMAL_ELECTION, INITIAL_SINGLE_NODE_ELECTION),
                             "Couldn't start leader election", 10);
  }

  // Report become visible to the Master.
  MarkDirty("RaftConsensus started");

  return Status::OK();
}
// Returns true once Start() has completed and Shutdown() has not begun.
// 'state_' is read without holding 'lock_' (it is an atomic member; see the
// snapshot note in CheckLeadershipAndBindTerm).
bool RaftConsensus::IsRunning() const {
  return state_ == kRunning;
}
// Test-only helper: assumes leadership immediately, without running a real
// election -- bumps the term, records ourselves as leader, and performs the
// normal become-leader steps.
Status RaftConsensus::EmulateElectionForTests() {
  TRACE_EVENT2("consensus", "RaftConsensus::EmulateElectionForTests",
               "peer", peer_uuid(),
               "tablet", options_.tablet_id);
  ThreadRestrictions::AssertWaitAllowed();
  LockGuard l(lock_);
  RETURN_NOT_OK(CheckRunningUnlocked());
  LOG_WITH_PREFIX_UNLOCKED(INFO) << "Emulating election...";
  // Assume leadership of new term.
  RETURN_NOT_OK(HandleTermAdvanceUnlocked(CurrentTermUnlocked() + 1));
  SetLeaderUuidUnlocked(peer_uuid());
  return BecomeLeaderUnlocked();
}
namespace {

// Returns a human-readable name for the given election mode, for logging.
const char* ModeString(RaftConsensus::ElectionMode mode) {
  switch (mode) {
    case RaftConsensus::NORMAL_ELECTION: return "leader election";
    case RaftConsensus::PRE_ELECTION: return "pre-election";
    case RaftConsensus::ELECT_EVEN_IF_LEADER_IS_ALIVE: return "forced leader election";
  }
  __builtin_unreachable(); // silence gcc warnings
}

// Returns a human-readable explanation of why an election was started.
// 'leader_uuid' is the last known leader, if any; it is only consulted for
// the election-timeout case.
string ReasonString(RaftConsensus::ElectionReason reason, StringPiece leader_uuid) {
  switch (reason) {
    case RaftConsensus::INITIAL_SINGLE_NODE_ELECTION:
      return "initial election of a single-replica configuration";
    case RaftConsensus::EXTERNAL_REQUEST:
      return "received explicit request";
    case RaftConsensus::ELECTION_TIMEOUT_EXPIRED:
      return leader_uuid.empty()
          ? "no leader contacted us within the election timeout"
          : Substitute("detected failure of leader $0", leader_uuid);
  }
  __builtin_unreachable(); // silence gcc warnings
}

} // anonymous namespace
// Starts a (pre-)election of the given mode. Under 'lock_' this validates
// that we are a voter and not already the leader, snoozes the failure
// detector with exponential backoff, advances the term and votes for
// ourselves (skipped for pre-elections), and builds the VoteRequestPB and
// VoteCounter. The LeaderElection itself is run AFTER releasing the lock,
// since it issues RPCs. Returns OK if the election was started or was
// unnecessary; the outcome is delivered later via ElectionCallback().
Status RaftConsensus::StartElection(ElectionMode mode, ElectionReason reason) {
  // A quiescing server should not try to become a leader.
  if (server_ctx_.quiescing && server_ctx_.quiescing->load()) {
    return Status::IllegalState("leader elections are disabled");
  }
  const char* const mode_str = ModeString(mode);

  TRACE_EVENT2("consensus", "RaftConsensus::StartElection",
               "peer", LogPrefixThreadSafe(),
               "mode", mode_str);
  scoped_refptr<LeaderElection> election;
  {
    ThreadRestrictions::AssertWaitAllowed();
    LockGuard l(lock_);
    RETURN_NOT_OK(CheckRunningUnlocked());

    RaftPeerPB::Role active_role = cmeta_->active_role();
    if (active_role == RaftPeerPB::LEADER) {
      LOG_WITH_PREFIX_UNLOCKED(INFO) << Substitute(
          "Not starting $0 -- already a leader", mode_str);
      return Status::OK();
    }
    if (PREDICT_FALSE(!consensus::IsVoterRole(active_role))) {
      // A non-voter should not start leader elections. The leader failure
      // detector should be re-enabled once the non-voter replica is promoted
      // to voter replica.
      return Status::IllegalState("only voting members can start elections",
          SecureShortDebugString(cmeta_->ActiveConfig()));
    }
    if (PREDICT_FALSE(active_role == RaftPeerPB::NON_PARTICIPANT)) {
      SnoozeFailureDetector();
      return Status::IllegalState("Not starting election: node is currently "
                                  "a non-participant in the Raft config",
                                  SecureShortDebugString(cmeta_->ActiveConfig()));
    }
    LOG_WITH_PREFIX_UNLOCKED(INFO)
        << "Starting " << mode_str
        << " (" << ReasonString(reason, GetLeaderUuidUnlocked()) << ")";

    // Snooze to avoid the election timer firing again as much as possible.
    // We do not disable the election timer while running an election, so that
    // if the election times out, we will try again.
    MonoDelta timeout = LeaderElectionExpBackoffDeltaUnlocked();
    SnoozeFailureDetector(string("starting election"), timeout);

    // Increment the term and vote for ourselves, unless it's a pre-election.
    if (mode != PRE_ELECTION) {
      // TODO(mpercy): Consider using a separate Mutex for voting, which must sync to disk.

      // We skip flushing the term to disk because setting the vote just below also
      // flushes to disk, and the double fsync doesn't buy us anything.
      RETURN_NOT_OK(HandleTermAdvanceUnlocked(CurrentTermUnlocked() + 1,
                                              SKIP_FLUSH_TO_DISK));
      RETURN_NOT_OK(SetVotedForCurrentTermUnlocked(peer_uuid()));
    }

    RaftConfigPB active_config = cmeta_->ActiveConfig();
    LOG_WITH_PREFIX_UNLOCKED(INFO) << "Starting " << mode_str << " with config: "
                                   << SecureShortDebugString(active_config);

    // Initialize the VoteCounter.
    int num_voters = CountVoters(active_config);
    int majority_size = MajoritySize(num_voters);
    VoteCounter counter(num_voters, majority_size);

    // Vote for ourselves.
    bool duplicate;
    RETURN_NOT_OK(counter.RegisterVote(peer_uuid(), VOTE_GRANTED, &duplicate));
    CHECK(!duplicate) << LogPrefixUnlocked()
                      << "Inexplicable duplicate self-vote for term "
                      << CurrentTermUnlocked();

    VoteRequestPB request;
    request.set_ignore_live_leader(mode == ELECT_EVEN_IF_LEADER_IS_ALIVE);
    request.set_candidate_uuid(peer_uuid());
    if (mode == PRE_ELECTION) {
      // In a pre-election, we haven't bumped our own term yet, so we need to be
      // asking for votes for the next term.
      request.set_is_pre_election(true);
      request.set_candidate_term(CurrentTermUnlocked() + 1);
    } else {
      request.set_candidate_term(CurrentTermUnlocked());
    }
    request.set_tablet_id(options_.tablet_id);
    *request.mutable_candidate_status()->mutable_last_received() =
        queue_->GetLastOpIdInLog();

    auto self = shared_from_this();
    election.reset(new LeaderElection(
        std::move(active_config),
        // The RaftConsensus ref passed below ensures that this raw pointer
        // remains safe to use for the entirety of LeaderElection's life.
        peer_proxy_factory_.get(),
        std::move(request), std::move(counter), timeout,
        [self, reason](const ElectionResult& result) {
          self->ElectionCallback(reason, result);
        }));
  }

  // Start the election outside the lock.
  election->Run();

  return Status::OK();
}
// Test-only helper: polls every 10ms until this replica reports the LEADER
// role, or returns Status::TimedOut once 'timeout' has elapsed.
Status RaftConsensus::WaitUntilLeaderForTests(const MonoDelta& timeout) {
  const MonoTime deadline = MonoTime::Now() + timeout;
  for (;;) {
    if (role() == consensus::RaftPeerPB::LEADER) {
      return Status::OK();
    }
    if (MonoTime::Now() >= deadline) {
      return Status::TimedOut(Substitute("Peer $0 is not leader of tablet $1 after $2. Role: $3",
                                         peer_uuid(), options_.tablet_id, timeout.ToString(),
                                         role()));
    }
    SleepFor(MonoDelta::FromMilliseconds(10));
  }
}
// Handles an explicit request for this leader to step down. If we are not
// the leader, fills 'resp' with a NOT_THE_LEADER error (but returns OK so
// the tablet service preserves the error code). Otherwise advances the term,
// which causes us to relinquish leadership, and snoozes our own failure
// detector so another replica wins the next election.
Status RaftConsensus::StepDown(LeaderStepDownResponsePB* resp) {
  TRACE_EVENT0("consensus", "RaftConsensus::StepDown");
  ThreadRestrictions::AssertWaitAllowed();
  LockGuard l(lock_);
  // Sanity check: queue leader mode and cmeta role must agree.
  DCHECK((queue_->IsInLeaderMode() && cmeta_->active_role() == RaftPeerPB::LEADER) ||
         (!queue_->IsInLeaderMode() && cmeta_->active_role() != RaftPeerPB::LEADER));
  RETURN_NOT_OK(CheckRunningUnlocked());
  if (cmeta_->active_role() != RaftPeerPB::LEADER) {
    LOG_WITH_PREFIX_UNLOCKED(INFO) << "Rejecting request to step down while not leader";
    resp->mutable_error()->set_code(TabletServerErrorPB::NOT_THE_LEADER);
    StatusToPB(Status::IllegalState("Not currently leader"),
               resp->mutable_error()->mutable_status());
    // We return OK so that the tablet service won't overwrite the error code.
    return Status::OK();
  }
  LOG_WITH_PREFIX_UNLOCKED(INFO) << "Received request to step down";
  // Advancing the term makes us stop acting as leader for the old term.
  RETURN_NOT_OK(HandleTermAdvanceUnlocked(CurrentTermUnlocked() + 1,
                                          SKIP_FLUSH_TO_DISK));
  // Snooze the failure detector for an extra leader failure timeout.
  // This should ensure that a different replica is elected leader after this one steps down.
  SnoozeFailureDetector(string("explicit stepdown request"), MonoDelta::FromMilliseconds(
      2 * MinimumElectionTimeout().ToMilliseconds()));
  return Status::OK();
}
// Handles a request to transfer leadership away from this replica,
// optionally to the specific voter named by 'new_leader_uuid'.
//
// If we are not the leader, fills 'resp' with a NOT_THE_LEADER error but
// returns OK so the tablet service preserves the error code. Transferring to
// ourselves is a no-op. A designated successor must be a voter in the active
// config, otherwise InvalidArgument is returned. On success, begins the
// leader transfer period (see BeginLeaderTransferPeriodUnlocked()).
Status RaftConsensus::TransferLeadership(const boost::optional<string>& new_leader_uuid,
                                         LeaderStepDownResponsePB* resp) {
  TRACE_EVENT0("consensus", "RaftConsensus::TransferLeadership");
  ThreadRestrictions::AssertWaitAllowed();
  LockGuard l(lock_);
  LOG_WITH_PREFIX_UNLOCKED(INFO) << "Received request to transfer leadership"
                                 << (new_leader_uuid ?
                                     Substitute(" to TS $0", *new_leader_uuid) :
                                     "");
  // Sanity check: queue leader mode and cmeta role must agree.
  DCHECK((queue_->IsInLeaderMode() && cmeta_->active_role() == RaftPeerPB::LEADER) ||
         (!queue_->IsInLeaderMode() && cmeta_->active_role() != RaftPeerPB::LEADER));
  RETURN_NOT_OK(CheckRunningUnlocked());
  if (cmeta_->active_role() != RaftPeerPB::LEADER) {
    // Fixed typo in the log message: "transer" -> "transfer".
    LOG_WITH_PREFIX_UNLOCKED(INFO) << "Rejecting request to transfer leadership while not leader";
    resp->mutable_error()->set_code(TabletServerErrorPB::NOT_THE_LEADER);
    StatusToPB(Status::IllegalState("not currently leader"),
               resp->mutable_error()->mutable_status());
    // We return OK so that the tablet service won't overwrite the error code.
    return Status::OK();
  }
  if (new_leader_uuid) {
    if (*new_leader_uuid == peer_uuid()) {
      // Short-circuit as we are transferring leadership to ourselves and we
      // already checked that we are leader.
      return Status::OK();
    }
    if (!IsRaftConfigVoter(*new_leader_uuid, cmeta_->ActiveConfig())) {
      const string msg = Substitute("tablet server $0 is not a voter in the active config",
                                    *new_leader_uuid);
      LOG_WITH_PREFIX_UNLOCKED(INFO) << "Rejecting request to transfer leadership "
                                     << "because " << msg;
      return Status::InvalidArgument(msg);
    }
  }
  return BeginLeaderTransferPeriodUnlocked(new_leader_uuid);
}
// Begins the leader transfer period: starts watching for 'successor_uuid'
// (or any caught-up voter, if not set) to become eligible, and arms the
// one-shot timer that ends the transfer period if no successor emerges.
// Requires 'lock_' to be held. Returns ServiceUnavailable if a transfer is
// already in progress.
Status RaftConsensus::BeginLeaderTransferPeriodUnlocked(
    const boost::optional<string>& successor_uuid) {
  DCHECK(lock_.is_locked());
  // Atomically claim the transfer slot; if another transfer already set the
  // flag, do not restart the watch or the timer.
  if (std::atomic_exchange(&leader_transfer_in_progress_, true)) {
    return Status::ServiceUnavailable(
        Substitute("leadership transfer for $0 already in progress",
                   options_.tablet_id));
  }
  queue_->BeginWatchForSuccessor(successor_uuid);
  transfer_period_timer_->Start();
  return Status::OK();
}
// Ends the leader transfer period: stops the timer, stops watching for a
// successor, and clears the in-progress flag (making new transfers
// admissible again). Also invoked by the transfer period timer itself.
void RaftConsensus::EndLeaderTransferPeriod() {
  transfer_period_timer_->Stop();
  queue_->EndWatchForSuccessor();
  leader_transfer_in_progress_ = false;
}
// Creates a new consensus round for 'replicate_msg', to be driven through
// replication by this instance; 'replicated_cb' is invoked once the round's
// replication outcome is known.
scoped_refptr<ConsensusRound> RaftConsensus::NewRound(
    unique_ptr<ReplicateMsg> replicate_msg,
    ConsensusReplicatedCallback replicated_cb) {
  scoped_refptr<ConsensusRound> round(
      new ConsensusRound(this, std::move(replicate_msg), std::move(replicated_cb)));
  return round;
}
// Task run on the Raft thread pool when the failure detector fires: starts a
// pre-election (or a normal election if pre-elections are disabled).
void RaftConsensus::ReportFailureDetectedTask() {
  Status s = StartElection(FLAGS_raft_enable_pre_election ?
      PRE_ELECTION : NORMAL_ELECTION, ELECTION_TIMEOUT_EXPIRED);
  if (PREDICT_FALSE(!s.ok())) {
    WARN_NOT_OK_EVERY_N_SECS(
        s, LogPrefixThreadSafe() + "failed to trigger leader election", 10);
    // Normally the failure detector would be enabled at the end of the election,
    // but since the election failed to start, we must reenable explicitly.
    EnableFailureDetector();
  }
}
// Callback invoked by the failure detection timer when no leader heartbeat
// arrived in time. Defers the actual election to the Raft thread pool, since
// we are running on a timer thread here.
void RaftConsensus::ReportFailureDetected() {
  // We're running on a timer thread; start an election on a different thread.
  //
  // There's no need to reenable the failure detector; if this fails, it's a
  // sign that RaftConsensus has stopped and we no longer need failure detection.
  auto self = shared_from_this();
  Status s = raft_pool_token_->Submit([self]() { self->ReportFailureDetectedTask(); });
  if (PREDICT_FALSE(!s.ok())) {
    static const char* msg = "failed to submit failure detected task";
    // Submission may only fail with ServiceUnavailable (pool shutting down);
    // anything else indicates a bug.
    CHECK(s.IsServiceUnavailable()) << LogPrefixThreadSafe() << msg;
    WARN_NOT_OK(s, LogPrefixThreadSafe() + msg);
  }
}
// Transitions this replica into leader mode for the current term: disables
// the failure detector, withholds votes, registers as a queue observer,
// refreshes peers, and replicates a NO_OP round to assert leadership in the
// new term (per Raft, an entry from the current term must be replicated
// before the leader can safely serve). Requires 'lock_' to be held.
Status RaftConsensus::BecomeLeaderUnlocked() {
  DCHECK(lock_.is_locked());

  TRACE_EVENT2("consensus", "RaftConsensus::BecomeLeaderUnlocked",
               "peer", peer_uuid(),
               "tablet", options_.tablet_id);
  LOG_WITH_PREFIX_UNLOCKED(INFO) << "Becoming Leader. State: " << ToStringUnlocked();

  // Disable FD while we are leader.
  DisableFailureDetector();

  // Don't vote for anyone if we're a leader.
  withhold_votes_until_ = MonoTime::Max();

  // Leadership never starts in a transfer period.
  EndLeaderTransferPeriod();

  queue_->RegisterObserver(this);
  bool was_leader = queue_->IsInLeaderMode();
  RETURN_NOT_OK(RefreshConsensusQueueAndPeersUnlocked());
  if (!was_leader && server_ctx_.num_leaders) {
    server_ctx_.num_leaders->Increment();
  }

  // Initiate a NO_OP op that is sent at the beginning of every term
  // change in raft.
  auto replicate = new ReplicateMsg;
  replicate->set_op_type(NO_OP);
  replicate->mutable_noop_request(); // Define the no-op request field.
  CHECK_OK(time_manager_->AssignTimestamp(replicate));

  scoped_refptr<ConsensusRound> round(
      new ConsensusRound(this, make_scoped_refptr(new RefCountedReplicate(replicate))));
  // The callback captures a raw pointer to the round; the round object keeps
  // itself alive through the queue until replication finishes.
  auto* round_raw = round.get();
  round->SetConsensusReplicatedCallback(
      [this, round_raw](const Status& s) {
        this->NonTxRoundReplicationFinished(round_raw, &DoNothingStatusCB, s);
      });

  last_leader_communication_time_micros_ = 0;
  RETURN_NOT_OK(AppendNewRoundToQueueUnlocked(round));

  // Only accept writes after the leadership-asserting NO_OP is scheduled
  // (see CheckLeadershipAndBindTerm()).
  leader_is_ready_ = true;

  return Status::OK();
}
// Transitions this replica into follower/learner mode: clears the recorded
// leader, arms or disarms the failure detector (with an optional custom
// initial delay 'fd_delta'), re-enables voting, and switches the queue out
// of leader mode. Requires 'lock_' to be held.
Status RaftConsensus::BecomeReplicaUnlocked(boost::optional<MonoDelta> fd_delta) {
  DCHECK(lock_.is_locked());

  LOG_WITH_PREFIX_UNLOCKED(INFO) << "Becoming Follower/Learner. State: "
                                 << ToStringUnlocked();
  ClearLeaderUnlocked();

  // Enable/disable leader failure detection if becoming VOTER/NON_VOTER replica
  // correspondingly.
  UpdateFailureDetectorState(std::move(fd_delta));

  // Now that we're a replica, we can allow voting for other nodes.
  withhold_votes_until_ = MonoTime::Min();

  // Deregister ourselves from the queue. We no longer need to track what gets
  // replicated since we're stepping down.
  queue_->UnRegisterObserver(this);
  bool was_leader = queue_->IsInLeaderMode();
  queue_->SetNonLeaderMode(cmeta_->ActiveConfig());
  if (was_leader && server_ctx_.num_leaders) {
    server_ctx_.num_leaders->IncrementBy(-1);
  }

  peer_manager_->Close();

  return Status::OK();
}
// Appends 'round' to the replication queue as leader. 'update_lock_' is
// taken before 'lock_' and held across the append; the bound term is checked
// against the current term so an op prepared under an older leadership is
// rejected. Peers are signaled only after both locks are released.
Status RaftConsensus::Replicate(const scoped_refptr<ConsensusRound>& round) {
  std::lock_guard<simple_spinlock> lock(update_lock_);
  {
    ThreadRestrictions::AssertWaitAllowed();
    LockGuard l(lock_);
    RETURN_NOT_OK(CheckSafeToReplicateUnlocked(*round->replicate_msg()));
    RETURN_NOT_OK(round->CheckBoundTerm(CurrentTermUnlocked()));
    RETURN_NOT_OK(AppendNewRoundToQueueUnlocked(round));
  }

  peer_manager_->SignalRequest();
  return Status::OK();
}
// Verifies that this replica may accept 'round' as leader and binds the
// round to the current term, so a later leadership change causes the op to
// be rejected in Replicate(). Performed lock-free via atomic/snapshot reads;
// the read ordering below is load-bearing (see inline comments). Returns
// IllegalState if not running or not leader, and ServiceUnavailable while a
// leader transfer is in progress or before leadership has been asserted.
Status RaftConsensus::CheckLeadershipAndBindTerm(const scoped_refptr<ConsensusRound>& round) {
#ifndef NDEBUG
  const auto& msg = *round->replicate_msg();
  DCHECK(!msg.has_id()) << "should not have an ID yet: "
                        << SecureShortDebugString(msg);
#endif
  // Get a snapshot of an atomic member for consistency between the condition
  // and the error message, if any.
  const State state = state_;
  if (PREDICT_FALSE(state != kRunning)) {
    return Status::IllegalState("RaftConsensus is not running",
                                Substitute("state $0", State_Name(state)));
  }

  // The order of reading role_and_term and leader_is_ready is essential to
  // deal with situations when leader_is_ready and role_and_term are read
  // in different Raft terms.
  //   * It's safe to bind to a stale term even if we've asserted leadership
  //     in a newer term: the stale op will be rejected on followers anyway
  //     because Raft guarantees that a majority of replicas have accepted
  //     the new term.
  //   * It's safe for the term to be stale if we haven't asserted leadership
  //     for a newer term because we'll exit out below.
  //   * It's unsafe to bind to a newer term if we've asserted leadership
  //     for a stale term, hence this ordering.
  const auto role_and_term = cmeta_->GetRoleAndTerm();
  const bool leader_is_ready = leader_is_ready_;
  const auto role = role_and_term.first;
  switch (role) {
    case RaftPeerPB::LEADER:
      if (leader_transfer_in_progress_) {
        return Status::ServiceUnavailable("leader transfer in progress");
      }
      if (!leader_is_ready) {
        // Leader replica should not accept write operations before scheduling
        // the replication of a NO_OP to assert its leadership in the current
        // term. Otherwise there might be a race, so the very first accepted
        // write operation might have timestamp lower than the timestamp
        // of the NO_OP. That would trigger an assertion in MVCC.
        return Status::ServiceUnavailable("leader is not yet ready");
      }
      break;
    default:
      return Status::IllegalState(
          Substitute("replica $0 is not leader of this config: current role $1",
                     peer_uuid(), RaftPeerPB::Role_Name(role)));
  }
  const auto term = role_and_term.second;
  round->BindToTerm(term);
  return Status::OK();
}
// Assigns the next OpId to 'round', registers it as a pending operation, and
// appends it to the replication queue. Requires 'lock_' to be held.
Status RaftConsensus::AppendNewRoundToQueueUnlocked(const scoped_refptr<ConsensusRound>& round) {
  DCHECK(lock_.is_locked());
  *round->replicate_msg()->mutable_id() = queue_->GetNextOpId();
  RETURN_NOT_OK(AddPendingOperationUnlocked(round));

  // The only reasons for a bad status would be if the log itself were shut down,
  // or if we had an actual IO error, which we currently don't handle.
  CHECK_OK_PREPEND(queue_->AppendOperation(round->replicate_scoped_refptr()),
                   Substitute("$0: could not append to queue", LogPrefixUnlocked()));
  return Status::OK();
}
// Registers 'round' with the pending-rounds tracker. For CHANGE_CONFIG_OP
// rounds, additionally stamps the new config with the round's OpId index,
// rejects it if another config change is already pending (unless it is an
// unsafe config change), and propagates it to the consensus metadata as the
// pending config -- except during startup replay when the committed config
// is already newer. Requires 'lock_' to be held.
Status RaftConsensus::AddPendingOperationUnlocked(const scoped_refptr<ConsensusRound>& round) {
  DCHECK(lock_.is_locked());
  DCHECK(pending_);

  // If we are adding a pending config change, we need to propagate it to the
  // metadata.
  if (PREDICT_FALSE(round->replicate_msg()->op_type() == CHANGE_CONFIG_OP)) {
    // Fill in the opid for the proposed new configuration. This has to be done
    // here rather than when it's first created because we didn't yet have an
    // OpId assigned at creation time.
    ChangeConfigRecordPB* change_record = round->replicate_msg()->mutable_change_config_record();
    change_record->mutable_new_config()->set_opid_index(round->replicate_msg()->id().index());
    DCHECK(change_record->IsInitialized())
        << "change_config_record missing required fields: "
        << change_record->InitializationErrorString();

    const RaftConfigPB& new_config = change_record->new_config();

    if (!new_config.unsafe_config_change()) {
      // Only one config change may be pending at a time.
      Status s = CheckNoConfigChangePendingUnlocked();
      if (PREDICT_FALSE(!s.ok())) {
        s = s.CloneAndAppend(Substitute("\n  New config: $0", SecureShortDebugString(new_config)));
        LOG_WITH_PREFIX_UNLOCKED(INFO) << s.ToString();
        return s;
      }
    }
    // Check if the pending Raft config has an OpId less than the committed
    // config. If so, this is a replay at startup in which the COMMIT
    // messages were delayed.
    int64_t committed_config_opid_index = cmeta_->GetConfigOpIdIndex(COMMITTED_CONFIG);
    if (round->replicate_msg()->id().index() > committed_config_opid_index) {
      RETURN_NOT_OK(SetPendingConfigUnlocked(new_config));
      if (cmeta_->active_role() == RaftPeerPB::LEADER) {
        RETURN_NOT_OK(RefreshConsensusQueueAndPeersUnlocked());
      }
    } else {
      LOG_WITH_PREFIX_UNLOCKED(INFO)
          << "Ignoring setting pending config change with OpId "
          << round->replicate_msg()->id() << " because the committed config has OpId index "
          << committed_config_opid_index << ". The config change we are ignoring is: "
          << "Old config: { " << SecureShortDebugString(change_record->old_config()) << " }. "
          << "New config: { " << SecureShortDebugString(new_config) << " }";
    }
  }

  return pending_->AddPendingOperation(round);
}
void RaftConsensus::NotifyCommitIndex(int64_t commit_index) {
  TRACE_EVENT2("consensus", "RaftConsensus::NotifyCommitIndex",
               "tablet", options_.tablet_id,
               "commit_index", commit_index);
  ThreadRestrictions::AssertWaitAllowed();
  LockGuard l(lock_);

  // Commit notifications are still honored while stopping: a replica that has
  // already initiated a Prepare()/Replicate() may eventually commit even if
  // its state changed after the initial Append()/Update().
  const State cur_state = state_;
  if (PREDICT_FALSE(cur_state != kRunning && cur_state != kStopping)) {
    LOG_WITH_PREFIX_UNLOCKED(WARNING) << "Unable to update committed index: "
                                      << "Replica not in running state: "
                                      << State_Name(cur_state);
    return;
  }

  pending_->AdvanceCommittedIndex(commit_index);

  // As the leader, nudge the peers now that the commit index moved.
  if (cmeta_->active_role() == RaftPeerPB::LEADER) {
    peer_manager_->SignalRequest(false);
  }
}
void RaftConsensus::NotifyTermChange(int64_t term) {
  TRACE_EVENT2("consensus", "RaftConsensus::NotifyTermChange",
               "tablet", options_.tablet_id,
               "term", term);
  ThreadRestrictions::AssertWaitAllowed();
  LockGuard l(lock_);

  // A term change is only actionable while the replica is running.
  const Status s = CheckRunningUnlocked();
  if (PREDICT_FALSE(!s.ok())) {
    LOG_WITH_PREFIX_UNLOCKED(WARNING) << "Unable to handle notification of new term "
                                      << "(" << term << "): " << s.ToString();
    return;
  }
  WARN_NOT_OK(HandleTermAdvanceUnlocked(term), "Couldn't advance consensus term.");
}
void RaftConsensus::NotifyFailedFollower(const string& uuid,
int64_t term,
const string& reason) {
// Common info used in all of the log messages within this method.
string fail_msg = Substitute("Processing failure of peer $0 in term $1 ($2): ",
uuid, term, reason);
if (!FLAGS_evict_failed_followers) {
LOG(INFO) << LogPrefixThreadSafe() << fail_msg
<< "Eviction of failed followers is disabled. Doing nothing.";
return;
}
RaftConfigPB committed_config;
{
ThreadRestrictions::AssertWaitAllowed();
LockGuard l(lock_);
int64_t current_term = CurrentTermUnlocked();
if (current_term != term) {
LOG_WITH_PREFIX_UNLOCKED(INFO) << fail_msg << "Notified about a follower failure in "
<< "previous term " << term << ", but a leader election "
<< "likely occurred since the failure was detected. "
<< "Doing nothing.";
return;
}
if (cmeta_->has_pending_config()) {
LOG_WITH_PREFIX_UNLOCKED(INFO) << fail_msg << "There is already a config change operation "
<< "in progress. Unable to evict follower until it completes. "
<< "Doing nothing.";
return;
}
committed_config = cmeta_->CommittedConfig();
}
// Run config change on thread pool after dropping lock.
auto self = shared_from_this();
WARN_NOT_OK(raft_pool_token_->Submit(
[self, uuid, committed_config, reason]() {
self->TryRemoveFollowerTask(uuid, committed_config, reason);
}),
LogPrefixThreadSafe() + "Unable to start TryRemoveFollowerTask");
}
void RaftConsensus::NotifyPeerToPromote(const string& peer_uuid) {
  // Promotion involves a config change, so it must run on the raft thread
  // pool rather than on the notifying thread.
  auto self = shared_from_this();
  WARN_NOT_OK(raft_pool_token_->Submit(
      [self, peer_uuid]() {
        self->TryPromoteNonVoterTask(peer_uuid);
      }),
      LogPrefixThreadSafe() + "Unable to start TryPromoteNonVoterTask");
}
void RaftConsensus::NotifyPeerToStartElection(const string& peer_uuid) {
  const auto& log_prefix = LogPrefixThreadSafe();
  LOG(INFO) << log_prefix << ": Instructing follower " << peer_uuid << " to start an election";
  // Hand the actual work off to the raft thread pool.
  auto self = shared_from_this();
  WARN_NOT_OK(raft_pool_token_->Submit(
      [self, peer_uuid]() {
        self->TryStartElectionOnPeerTask(peer_uuid);
      }),
      log_prefix + "Unable to start TryStartElectionOnPeerTask");
}
// Invoked when a peer's health status changes; marks the consensus state
// dirty so that listeners registered for the dirty callback get notified.
void RaftConsensus::NotifyPeerHealthChange() {
  MarkDirty("Peer health change");
}
void RaftConsensus::TryRemoveFollowerTask(const string& uuid,
                                          const RaftConfigPB& committed_config,
                                          const string& reason) {
  // Build a REMOVE_PEER config change, CAS-guarded on the committed config's
  // opid index so a concurrent config change makes this request fail
  // harmlessly instead of clobbering it.
  ChangeConfigRequestPB req;
  req.set_tablet_id(options_.tablet_id);
  req.set_type(REMOVE_PEER);
  req.mutable_server()->set_permanent_uuid(uuid);
  req.set_cas_config_opid_index(committed_config.opid_index());
  LOG(INFO) << LogPrefixThreadSafe() << "Attempting to remove follower "
            << uuid << " from the Raft config. Reason: " << reason;
  boost::optional<TabletServerErrorPB::Code> error_code;
  WARN_NOT_OK(ChangeConfig(req, &DoNothingStatusCB, &error_code),
              LogPrefixThreadSafe() + "Unable to remove follower " + uuid);
}
void RaftConsensus::TryPromoteNonVoterTask(const string& peer_uuid) {
string msg = Substitute("attempt to promote peer $0: ", peer_uuid);
int64_t current_committed_config_index;
{
ThreadRestrictions::AssertWaitAllowed();
LockGuard l(lock_);
if (cmeta_->has_pending_config()) {
LOG_WITH_PREFIX_UNLOCKED(INFO) << msg << "there is already a config change operation "
<< "in progress. Unable to promote follower until it "
<< "completes. Doing nothing.";
return;
}
// Check if the peer is still part of the current committed config.
RaftConfigPB committed_config = cmeta_->CommittedConfig();
current_committed_config_index = committed_config.opid_index();
RaftPeerPB* peer_pb;
Status s = GetRaftConfigMember(&committed_config, peer_uuid, &peer_pb);
if (!s.ok()) {
LOG_WITH_PREFIX_UNLOCKED(INFO) << msg << "can't find peer in the "
<< "current committed config: "
<< committed_config.ShortDebugString()
<< ". Doing nothing.";
return;
}
// Also check if the peer it still a NON_VOTER waiting for promotion.
if (peer_pb->member_type() != RaftPeerPB::NON_VOTER || !peer_pb->attrs().promote()) {
LOG_WITH_PREFIX_UNLOCKED(INFO) << msg << "peer is either no longer a NON_VOTER "
<< "or not marked for promotion anymore. Current "
<< "config: " << committed_config.ShortDebugString()
<< ". Doing nothing.";
return;
}
}
ChangeConfigRequestPB req;
req.set_tablet_id(options_.tablet_id);
req.set_type(MODIFY_PEER);
req.mutable_server()->set_permanent_uuid(peer_uuid);
req.mutable_server()->set_member_type(RaftPeerPB::VOTER);
req.mutable_server()->mutable_attrs()->set_promote(false);
req.set_cas_config_opid_index(current_committed_config_index);
LOG(INFO) << LogPrefixThreadSafe() << "attempting to promote NON_VOTER "
<< peer_uuid << " to VOTER";
boost::optional<TabletServerErrorPB::Code> error_code;
WARN_NOT_OK(ChangeConfig(req, &DoNothingStatusCB, &error_code),
LogPrefixThreadSafe() + Substitute("Unable to promote non-voter $0", peer_uuid));
}
// Signals 'peer_uuid' to start an election, after double-checking under the
// lock that the peer is still a voter in the active config.
void RaftConsensus::TryStartElectionOnPeerTask(const string& peer_uuid) {
  ThreadRestrictions::AssertWaitAllowed();
  LockGuard l(lock_);
  // Double-check that the peer is a voter in the active config.
  if (!IsRaftConfigVoter(peer_uuid, cmeta_->ActiveConfig())) {
    // Fixed missing space: the message used to read "...peer <uuid>to start...".
    LOG_WITH_PREFIX_UNLOCKED(INFO) << "Not signalling peer " << peer_uuid
                                   << " to start an election: it's not a voter "
                                   << "in the active config.";
    return;
  }
  LOG_WITH_PREFIX_UNLOCKED(INFO) << "Signalling peer " << peer_uuid
                                 << " to start an election";
  WARN_NOT_OK(peer_manager_->StartElection(peer_uuid),
              Substitute("unable to start election on peer $0", peer_uuid));
}
Status RaftConsensus::Update(const ConsensusRequestPB* request,
                             ConsensusResponsePB* response) {
  ++update_calls_for_tests_;
  // Test-only hook: optionally reject every UpdateConsensus request.
  if (PREDICT_FALSE(FLAGS_follower_reject_update_consensus_requests)) {
    return Status::IllegalState("Rejected: --follower_reject_update_consensus_requests "
                                "is set to true.");
  }

  response->set_responder_uuid(peer_uuid());
  VLOG_WITH_PREFIX(2) << "Replica received request: " << SecureShortDebugString(*request);

  // Hold update_lock_ for the whole call; see the lock's declaration for the
  // rationale.
  std::lock_guard<simple_spinlock> lock(update_lock_);
  const Status s = UpdateReplica(request, response);
  if (PREDICT_FALSE(VLOG_IS_ON(1)) && request->ops().empty()) {
    VLOG_WITH_PREFIX(1)
        << Substitute("Replica replied to status only request. Replica: $0. Response: $1",
                      ToString(), SecureShortDebugString(*response));
  }
  return s;
}
// Returns true for operation types that originate from consensus itself
// (no-ops and config changes) rather than from clients.
static bool IsConsensusOnlyOperation(OperationType op_type) {
  switch (op_type) {
    case NO_OP:
    case CHANGE_CONFIG_OP:
      return true;
    default:
      return false;
  }
}
// Starts a follower-side op for the given replicate message: consensus-only
// operations take a dedicated path, everything else goes through the round
// handler and is then registered as pending.
Status RaftConsensus::StartFollowerOpUnlocked(const ReplicateRefPtr& msg) {
  DCHECK(lock_.is_locked());

  if (IsConsensusOnlyOperation(msg->get()->op_type())) {
    return StartConsensusOnlyRoundUnlocked(msg);
  }

  // Test-only hook: optionally fail every follower prepare.
  if (PREDICT_FALSE(FLAGS_follower_fail_all_prepare)) {
    return Status::IllegalState("Rejected: --follower_fail_all_prepare "
                                "is set to true.");
  }

  VLOG_WITH_PREFIX_UNLOCKED(1) << "Starting op: "
                               << SecureShortDebugString(msg->get()->id());
  scoped_refptr<ConsensusRound> round(new ConsensusRound(this, msg));
  RETURN_NOT_OK(round_handler_->StartFollowerOp(round));
  // Pass the existing scoped_refptr directly: the previous raw-pointer local
  // forced an implicit scoped_refptr construction (an unnecessary ref-count
  // round trip) at this call.
  return AddPendingOperationUnlocked(round);
}
bool RaftConsensus::IsSingleVoterConfig() const {
  ThreadRestrictions::AssertWaitAllowed();
  LockGuard l(lock_);
  // True iff the committed config has exactly one voter and that voter is
  // this replica.
  if (cmeta_->CountVotersInConfig(COMMITTED_CONFIG) != 1) {
    return false;
  }
  return cmeta_->IsVoterInConfig(peer_uuid(), COMMITTED_CONFIG);
}
string RaftConsensus::LeaderRequest::OpsRangeString() const {
  // Renders "[first_term.first_index-last_term.last_index]", or "[]" when
  // the request carries no messages.
  if (messages.empty()) {
    return "[]";
  }
  const OpId& first_op = messages.front()->get()->id();
  const OpId& last_op = messages.back()->get()->id();
  return Substitute("[$0.$1-$2.$3]",
                    first_op.term(), first_op.index(),
                    last_op.term(), last_op.index());
}
// Strips from 'rpc_req' any operations this replica has already committed or
// replicated, advancing the leader's preceding id past the duplicates. The
// surviving messages (owned copies of the request's ops) are collected in
// 'deduplicated_req', making repeated/overlapping leader requests idempotent.
void RaftConsensus::DeduplicateLeaderRequestUnlocked(const ConsensusRequestPB* rpc_req,
                                                     LeaderRequest* deduplicated_req) {
  DCHECK(lock_.is_locked());
  // TODO(todd): use queue committed index?
  int64_t last_committed_index = pending_->GetCommittedIndex();

  // The leader's preceding id.
  deduplicated_req->preceding_opid = &rpc_req->preceding_id();

  // Ops at or below this index are candidates for deduplication; starts at
  // the last opid in our local log and may be lowered on a term mismatch.
  int64_t dedup_up_to_index = queue_->GetLastOpIdInLog().index();

  // Index (into rpc_req->ops()) of the first non-duplicate message; stays -1
  // if every op in the request was a duplicate.
  deduplicated_req->first_message_idx = -1;

  // In this loop we discard duplicates and advance the leader's preceding id
  // accordingly.
  for (int i = 0; i < rpc_req->ops_size(); i++) {
    const ReplicateMsg* leader_msg = &rpc_req->ops(i);

    // Already committed: definitely a duplicate, skip it.
    if (leader_msg->id().index() <= last_committed_index) {
      VLOG_WITH_PREFIX_UNLOCKED(2) << "Skipping op id " << leader_msg->id()
                                   << " (already committed)";
      deduplicated_req->preceding_opid = &leader_msg->id();
      continue;
    }

    if (leader_msg->id().index() <= dedup_up_to_index) {
      // If the index is uncommitted and below our match index, then it must be in the
      // pendings set.
      scoped_refptr<ConsensusRound> round =
          pending_->GetPendingOpByIndexOrNull(leader_msg->id().index());
      DCHECK(round) << "Could not find op with index " << leader_msg->id().index()
                    << " in pending set. committed= " << last_committed_index
                    << " dedup=" << dedup_up_to_index;

      // If the OpIds match, i.e. if they have the same term and id, then this is just
      // duplicate, we skip...
      if (OpIdEquals(round->replicate_msg()->id(), leader_msg->id())) {
        VLOG_WITH_PREFIX_UNLOCKED(2) << "Skipping op id " << leader_msg->id()
                                     << " (already replicated)";
        deduplicated_req->preceding_opid = &leader_msg->id();
        continue;
      }

      // ... otherwise we must adjust our match index, i.e. all messages from now on
      // are "new"
      dedup_up_to_index = leader_msg->id().index();
    }

    if (deduplicated_req->first_message_idx == - 1) {
      deduplicated_req->first_message_idx = i;
    }
    // Take an owned copy of the message: the deduplicated request's ops must
    // outlive the RPC request.
    deduplicated_req->messages.emplace_back(
        make_scoped_refptr_replicate(new ReplicateMsg(*leader_msg)));
  }

  if (deduplicated_req->messages.size() != rpc_req->ops_size()) {
    LOG_WITH_PREFIX_UNLOCKED(INFO) << "Deduplicated request from leader. Original: "
                                   << rpc_req->preceding_id() << "->" << OpsRangeString(*rpc_req)
                                   << " Dedup: " << *deduplicated_req->preceding_opid << "->"
                                   << deduplicated_req->OpsRangeString();
  }
}
Status RaftConsensus::HandleLeaderRequestTermUnlocked(const ConsensusRequestPB* request,
                                                      ConsensusResponsePB* response) {
  DCHECK(lock_.is_locked());
  // Terms can't change while lock_ is held, so sampling once is safe.
  const int64_t caller_term = request->caller_term();
  const int64_t local_term = CurrentTermUnlocked();

  // Common case: the caller's term matches ours.
  if (PREDICT_TRUE(caller_term == local_term)) {
    return Status::OK();
  }

  if (caller_term < local_term) {
    // Stale leader: record an INVALID_TERM error in the response but return
    // OK so a normal reply is sent.
    string msg = Substitute("Rejecting Update request from peer $0 for earlier term $1. "
                            "Current term is $2. Ops: $3",
                            request->caller_uuid(),
                            caller_term,
                            local_term,
                            OpsRangeString(*request));
    LOG_WITH_PREFIX_UNLOCKED(INFO) << msg;
    FillConsensusResponseError(response,
                               ConsensusErrorPB::INVALID_TERM,
                               Status::IllegalState(msg));
    return Status::OK();
  }

  // The caller is ahead of us: step up to its term.
  return HandleTermAdvanceUnlocked(caller_term);
}
Status RaftConsensus::EnforceLogMatchingPropertyMatchesUnlocked(const LeaderRequest& req,
                                                                ConsensusResponsePB* response) {
  DCHECK(lock_.is_locked());
  bool term_mismatch;
  // Nothing to do when the leader's preceding op is already known locally.
  if (pending_->IsOpCommittedOrPending(*req.preceding_opid, &term_mismatch)) {
    return Status::OK();
  }

  // The preceding op doesn't match: record a PRECEDING_ENTRY_DIDNT_MATCH
  // error in the response.
  string error_msg = Substitute(
      "Log matching property violated."
      " Preceding OpId in replica: $0. Preceding OpId from leader: $1. ($2 mismatch)",
      SecureShortDebugString(queue_->GetLastOpIdInLog()),
      SecureShortDebugString(*req.preceding_opid),
      term_mismatch ? "term" : "index");
  FillConsensusResponseError(response,
                             ConsensusErrorPB::PRECEDING_ENTRY_DIDNT_MATCH,
                             Status::IllegalState(error_msg));

  // Skip the log line for the very first message from the leader of a new
  // tablet, where a mismatch is expected and harmless.
  if (!OpIdEquals(MakeOpId(1, 1), *req.preceding_opid)) {
    LOG_WITH_PREFIX_UNLOCKED(INFO) << "Refusing update from remote peer "
                                   << req.leader_uuid << ": " << error_msg;
  }

  // If the terms mismatch we abort down to the index before the leader's preceding,
  // since we know that is the last opid that has a chance of not being overwritten.
  // Aborting preemptively here avoids us reporting a last received index that is
  // possibly higher than the leader's causing an avoidable cache miss on the leader's
  // queue.
  //
  // TODO: this isn't just an optimization! if we comment this out, we get
  // failures on raft_consensus-itest a couple percent of the time! Should investigate
  // why this is actually critical to do here, as opposed to just on requests that
  // append some ops.
  if (term_mismatch) {
    TruncateAndAbortOpsAfterUnlocked(req.preceding_opid->index() - 1);
  }

  return Status::OK();
}
// Aborts all pending ops with index greater than 'truncate_after_index' and
// then truncates the queue's ops after that same index. Pending ops are
// aborted before the queue truncation (order preserved from the original —
// presumably the abort must observe the entries before they disappear;
// TODO confirm before reordering).
void RaftConsensus::TruncateAndAbortOpsAfterUnlocked(int64_t truncate_after_index) {
  DCHECK(lock_.is_locked());
  pending_->AbortOpsAfter(truncate_after_index);
  queue_->TruncateOpsAfter(truncate_after_index);
}
// Validates a request from the leader before any local state is mutated:
// checks protocol-version compatibility, deduplicates the ops, verifies the
// deduped ops are in sequence, handles term checks/advancement, and enforces
// the log matching property. On successful return (with no error recorded in
// 'response'), the caller may be treated as the effective leader.
Status RaftConsensus::CheckLeaderRequestUnlocked(const ConsensusRequestPB* request,
                                                 ConsensusResponsePB* response,
                                                 LeaderRequest* deduped_req) {
  DCHECK(lock_.is_locked());
  // A request that still uses the deprecated committed-index field (or lacks
  // the all-replicated index) comes from an incompatible, older leader.
  if (PREDICT_FALSE(request->has_deprecated_committed_index() ||
                    !request->has_all_replicated_index())) {
    return Status::InvalidArgument("Leader appears to be running an earlier version "
                                   "of Kudu. Please shut down and upgrade all servers "
                                   "before restarting.");
  }

  DeduplicateLeaderRequestUnlocked(request, deduped_req);

  // This is an additional check for KUDU-639 that makes sure the message's index
  // and term are in the right sequence in the request, after we've deduplicated
  // them. We do this before we change any of the internal state.
  //
  // TODO move this to raft_consensus-state or whatever we transform that into.
  // We should be able to do this check for each append, but right now the way
  // we initialize raft_consensus-state is preventing us from doing so.
  const OpId* prev = deduped_req->preceding_opid;
  for (const ReplicateRefPtr& message : deduped_req->messages) {
    Status s = PendingRounds::CheckOpInSequence(*prev, message->get()->id());
    if (PREDICT_FALSE(!s.ok())) {
      LOG(ERROR) << "Leader request contained out-of-sequence messages. Status: "
                 << s.ToString() << ". Leader Request: " << SecureShortDebugString(*request);
      return s;
    }
    prev = &message->get()->id();
  }

  RETURN_NOT_OK(HandleLeaderRequestTermUnlocked(request, response));

  // A term error (e.g. stale leader) was recorded in the response; return OK
  // so the caller sends a normal reply carrying that error.
  if (response->status().has_error()) {
    return Status::OK();
  }

  RETURN_NOT_OK(EnforceLogMatchingPropertyMatchesUnlocked(*deduped_req, response));

  // Same pattern: a log-matching error is reported via the response.
  if (response->status().has_error()) {
    return Status::OK();
  }

  // If the first of the messages to apply is not in our log, either it follows the last
  // received message or it replaces some in-flight.
  if (!deduped_req->messages.empty()) {
    bool term_mismatch;
    CHECK(!pending_->IsOpCommittedOrPending(deduped_req->messages[0]->get()->id(), &term_mismatch));

    // If the index is in our log but the terms are not the same abort down to the leader's
    // preceding id.
    if (term_mismatch) {
      TruncateAndAbortOpsAfterUnlocked(deduped_req->preceding_opid->index());
    }
  }

  // If all of the above logic was successful then we can consider this to be
  // the effective leader of the configuration. If they are not currently marked as
  // the leader locally, mark them as leader now.
  const string& caller_uuid = request->caller_uuid();
  // Two leaders in one term would violate Raft's election safety invariant.
  if (PREDICT_FALSE(HasLeaderUnlocked() &&
                    GetLeaderUuidUnlocked() != caller_uuid)) {
    LOG_WITH_PREFIX_UNLOCKED(FATAL) << "Unexpected new leader in same term! "
                                    << "Existing leader UUID: " << GetLeaderUuidUnlocked() << ", "
                                    << "new leader UUID: " << caller_uuid;
  }
  if (PREDICT_FALSE(!HasLeaderUnlocked())) {
    SetLeaderUuidUnlocked(request->caller_uuid());
  }

  return Status::OK();
}
// Follower-side handler for a leader's UpdateConsensus request. Performs the
// five steps documented in the large comment below: dedup, early commit,
// prepare enqueue, WAL enqueue, commit advancement, then waits (outside the
// lock) for the local WAL writes to become durable before returning.
Status RaftConsensus::UpdateReplica(const ConsensusRequestPB* request,
                                    ConsensusResponsePB* response) {
  TRACE_EVENT2("consensus", "RaftConsensus::UpdateReplica",
               "peer", peer_uuid(),
               "tablet", options_.tablet_id);
  // Used in step 5 below to wait for the local log appends to finish.
  Synchronizer log_synchronizer;
  StatusCallback sync_status_cb = log_synchronizer.AsStatusCallback();

  // The ordering of the following operations is crucial, read on for details.
  //
  // The main requirements explained in more detail below are:
  //
  //   1) We must enqueue the prepares before we write to our local log.
  //   2) If we were able to enqueue a prepare then we must be able to log it.
  //   3) If we fail to enqueue a prepare, we must not attempt to enqueue any
  //      later-indexed prepare or apply.
  //
  // See below for detailed rationale.
  //
  // The steps are:
  //
  // 0 - Split/Dedup
  //
  // We split the operations into replicates and commits and make sure that we don't
  // don't do anything on operations we've already received in a previous call.
  // This essentially makes this method idempotent.
  //
  // 1 - We mark as many pending ops as committed as we can.
  //
  // We may have some pending ops that, according to the leader, are now
  // committed. We Apply them early, because:
  // - Soon (step 2) we may reject the call due to excessive memory pressure. One
  //   way to relieve the pressure is by flushing the MRS, and applying these
  //   ops may unblock an in-flight Flush().
  // - The Apply and subsequent Prepares (step 2) can take place concurrently.
  //
  // 2 - We enqueue the Prepare of the ops.
  //
  // The actual prepares are enqueued in order but happen asynchronously so we don't
  // have decoding/acquiring locks on the critical path.
  //
  // We need to do this now for a number of reasons:
  // - Prepares, by themselves, are inconsequential, i.e. they do not mutate the
  //   state machine so, were we to crash afterwards, having the prepares in-flight
  //   won't hurt.
  // - Prepares depend on factors external to consensus (the op drivers and
  //   the TabletReplica) so if for some reason they cannot be enqueued we must know
  //   before we try write them to the WAL. Once enqueued, we assume that prepare will
  //   always succeed on a replica op (because the leader already prepared them
  //   successfully, and thus we know they are valid).
  // - The prepares corresponding to every operation that was logged must be in-flight
  //   first. This because should we need to abort certain ops (say a new leader
  //   says they are not committed) we need to have those prepares in-flight so that
  //   the ops can be continued (in the abort path).
  // - Failure to enqueue prepares is OK, we can continue and let the leader know that
  //   we only went so far. The leader will re-send the remaining messages.
  // - Prepares represent new ops, and ops consume memory. Thus, if the
  //   overall memory pressure on the server is too high, we will reject the prepares.
  //
  // 3 - We enqueue the writes to the WAL.
  //
  // We enqueue writes to the WAL, but only the operations that were successfully
  // enqueued for prepare (for the reasons introduced above). This means that even
  // if a prepare fails to enqueue, if any of the previous prepares were successfully
  // submitted they must be written to the WAL.
  // If writing to the WAL fails, we're in an inconsistent state and we crash. In this
  // case, no one will ever know of the ops we previously prepared so those are
  // inconsequential.
  //
  // 4 - We mark the ops as committed.
  //
  // For each op which has been committed by the leader, we update the
  // op state to reflect that. If the logging has already succeeded for that
  // op, this will trigger the Apply phase. Otherwise, Apply will be triggered
  // when the logging completes. In both cases the Apply phase executes asynchronously.
  // This must, of course, happen after the prepares have been triggered as the same batch
  // can both replicate/prepare and commit/apply an operation.
  //
  // Currently, if a prepare failed to enqueue we still trigger all applies for operations
  // with an id lower than it (if we have them). This is important now as the leader will
  // not re-send those commit messages. This will be moot when we move to the commit
  // commitIndex way of doing things as we can simply ignore the applies as we know
  // they will be triggered with the next successful batch.
  //
  // 5 - We wait for the writes to be durable.
  //
  // Before replying to the leader we wait for the writes to be durable. We then
  // just update the last replicated watermark and respond.
  //
  // TODO - These failure scenarios need to be exercised in an unit
  //        test. Moreover we need to add more fault injection spots (well that
  //        and actually use the) for each of these steps.
  //        This will be done in a follow up patch.
  TRACE("Updating replica for $0 ops", request->ops_size());

  // The deduplicated request.
  LeaderRequest deduped_req;
  auto& messages = deduped_req.messages;
  // Scope for lock_: held through steps 0-4 below, released before waiting
  // on the WAL in step 5.
  {
    ThreadRestrictions::AssertWaitAllowed();
    LockGuard l(lock_);
    RETURN_NOT_OK(CheckRunningUnlocked());
    if (!cmeta_->IsMemberInConfig(peer_uuid(), ACTIVE_CONFIG)) {
      LOG_WITH_PREFIX_UNLOCKED(INFO) << "Allowing update even though not a member of the config";
    }

    deduped_req.leader_uuid = request->caller_uuid();

    RETURN_NOT_OK(CheckLeaderRequestUnlocked(request, response, &deduped_req));
    if (response->status().has_error()) {
      // We had an error, like an invalid term, we still fill the response.
      FillConsensusResponseOKUnlocked(response);
      return Status::OK();
    }

    // As soon as we decide to accept the message:
    // * snooze the failure detector
    // * prohibit voting for anyone for the minimum election timeout
    // We are guaranteed to be acting as a FOLLOWER at this point by the above
    // sanity check.
    SnoozeFailureDetector();
    WithholdVotes();

    last_leader_communication_time_micros_ = GetMonoTimeMicros();

    // Reset the 'failed_elections_since_stable_leader' metric now that we've
    // accepted an update from the established leader. This is done in addition
    // to the reset of the value in SetLeaderUuidUnlocked() because there is
    // a potential race between resetting the failed elections count in
    // SetLeaderUuidUnlocked() and incrementing after a failed election
    // if another replica was elected leader in an election concurrent with
    // the one called by this replica.
    failed_elections_since_stable_leader_ = 0;
    num_failed_elections_metric_->set_value(failed_elections_since_stable_leader_);

    // We update the lag metrics here in addition to after appending to the queue so the
    // metrics get updated even when the operation is rejected.
    queue_->UpdateLastIndexAppendedToLeader(request->last_idx_appended_to_leader());

    // 1 - Early commit pending (and committed) ops

    // What should we commit?
    // 1. As many pending ops as we can, except...
    // 2. ...if we commit beyond the preceding index, we'd regress KUDU-639, and...
    // 3. ...the leader's committed index is always our upper bound.
    const int64_t early_apply_up_to = std::min({
        pending_->GetLastPendingOpOpId().index(),
        deduped_req.preceding_opid->index(),
        request->committed_index()});

    VLOG_WITH_PREFIX_UNLOCKED(1) << "Early marking committed up to " << early_apply_up_to
                                 << ", Last pending opid index: "
                                 << pending_->GetLastPendingOpOpId().index()
                                 << ", preceding opid index: "
                                 << deduped_req.preceding_opid->index()
                                 << ", requested index: " << request->committed_index();
    TRACE("Early marking committed up to index $0", early_apply_up_to);
    CHECK_OK(pending_->AdvanceCommittedIndex(early_apply_up_to));

    // 2 - Enqueue the prepares

    TRACE("Triggering prepare for $0 ops", messages.size());

    if (PREDICT_TRUE(!messages.empty())) {
      // This request contains at least one message, and is likely to increase
      // our memory pressure.
      double capacity_pct;
      if (process_memory::SoftLimitExceeded(&capacity_pct)) {
        if (follower_memory_pressure_rejections_) follower_memory_pressure_rejections_->Increment();
        string msg = StringPrintf(
            "Soft memory limit exceeded (at %.2f%% of capacity)",
            capacity_pct);
        // Log at WARNING only once the configured warn threshold is crossed.
        if (capacity_pct >= FLAGS_memory_limit_warn_threshold_percentage) {
          KLOG_EVERY_N_SECS(WARNING, 1) << "Rejecting consensus request: " << msg
                                        << THROTTLE_MSG;
        } else {
          KLOG_EVERY_N_SECS(INFO, 1) << "Rejecting consensus request: " << msg
                                     << THROTTLE_MSG;
        }
        return Status::ServiceUnavailable(msg);
      }
    }

    // Enqueue prepares in order, stopping at the first failure (requirement
    // 3 in the comment above).
    Status prepare_status;
    auto iter = messages.begin();
    while (iter != messages.end()) {
      prepare_status = StartFollowerOpUnlocked(*iter);
      if (PREDICT_FALSE(!prepare_status.ok())) {
        break;
      }
      // TODO(dralves) Without leader leases this shouldn't be allowed to fail.
      // Once we have that functionality we'll have to revisit this.
      CHECK_OK(time_manager_->MessageReceivedFromLeader(*(*iter)->get()));
      ++iter;
    }

    // If we stopped before reaching the end we failed to prepare some message(s) and need
    // to perform cleanup, namely trimming deduped_req.messages to only contain the messages
    // that were actually prepared, and deleting the other ones since we've taken ownership
    // when we first deduped.
    if (iter != messages.end()) {
      LOG_WITH_PREFIX_UNLOCKED(WARNING) << Substitute(
          "Could not prepare op '$0' and following $1 ops. "
          "Status for this op: $2",
          (*iter)->get()->id().ShortDebugString(),
          std::distance(iter, messages.end()) - 1,
          prepare_status.ToString());
      iter = messages.erase(iter, messages.end());

      // If this is empty, it means we couldn't prepare a single de-duped message. There is nothing
      // else we can do. The leader will detect this and retry later.
      if (messages.empty()) {
        string msg = Substitute("Rejecting Update request from peer $0 for term $1. "
                                "Could not prepare a single op due to: $2",
                                request->caller_uuid(),
                                request->caller_term(),
                                prepare_status.ToString());
        LOG_WITH_PREFIX_UNLOCKED(INFO) << msg;
        FillConsensusResponseError(response, ConsensusErrorPB::CANNOT_PREPARE,
                                   Status::IllegalState(msg));
        FillConsensusResponseOKUnlocked(response);
        return Status::OK();
      }
    }

    // All ops that are going to be prepared were started, advance the safe timestamp.
    // TODO(dralves) This is only correct because the queue only sets safe time when the request is
    // an empty heartbeat. If we actually start setting this on a consensus request along with
    // actual messages we need to be careful to ignore it if any of the messages fails to prepare.
    if (request->has_safe_timestamp()) {
      time_manager_->AdvanceSafeTime(Timestamp(request->safe_timestamp()));
    }

    OpId last_from_leader;
    // 3 - Enqueue the writes.
    // Now that we've triggered the prepares enqueue the operations to be written
    // to the WAL.
    if (PREDICT_TRUE(!messages.empty())) {
      last_from_leader = messages.back()->get()->id();
      // Trigger the log append asap, if fsync() is on this might take a while
      // and we can't reply until this is done.
      //
      // Since we've prepared, we need to be able to append (or we risk trying to apply
      // later something that wasn't logged). We crash if we can't.
      CHECK_OK(queue_->AppendOperations(messages, sync_status_cb));
    } else {
      last_from_leader = *deduped_req.preceding_opid;
    }

    // 4 - Mark ops as committed

    // Choose the last operation to be applied. This will either be 'committed_index', if
    // no prepare enqueuing failed, or the minimum between 'committed_index' and the id of
    // the last successfully enqueued prepare, if some prepare failed to enqueue.
    int64_t apply_up_to;
    if (last_from_leader.index() < request->committed_index()) {
      // we should never apply anything later than what we received in this request
      apply_up_to = last_from_leader.index();

      VLOG_WITH_PREFIX_UNLOCKED(2) << "Received commit index "
          << request->committed_index() << " from the leader but only"
          << " marked up to " << apply_up_to << " as committed.";
    } else {
      apply_up_to = request->committed_index();
    }

    VLOG_WITH_PREFIX_UNLOCKED(1) << "Marking committed up to " << apply_up_to;
    TRACE("Marking committed up to $0", apply_up_to);
    CHECK_OK(pending_->AdvanceCommittedIndex(apply_up_to));
    queue_->UpdateFollowerWatermarks(apply_up_to, request->all_replicated_index());

    // If any messages failed to be started locally, then we already have removed them
    // from 'deduped_req' at this point. So, 'last_from_leader' is the last one that
    // we might apply.
    last_received_cur_leader_ = last_from_leader;

    // Fill the response with the current state. We will not mutate anymore state until
    // we actually reply to the leader, we'll just wait for the messages to be durable.
    FillConsensusResponseOKUnlocked(response);
  }
  // Release the lock while we wait for the log append to finish so that commits can go through.
  // We'll re-acquire it before we update the state again.

  // Update the last replicated op id
  if (!messages.empty()) {
    // 5 - We wait for the writes to be durable.

    // Note that this is safe because dist consensus now only supports a single outstanding
    // request at a time and this way we can allow commits to proceed while we wait.
    TRACE("Waiting on the replicates to finish logging");
    TRACE_EVENT0("consensus", "Wait for log");
    Status s;
    do {
      s = log_synchronizer.WaitFor(
          MonoDelta::FromMilliseconds(FLAGS_raft_heartbeat_interval_ms));
      // If just waiting for our log append to finish lets snooze the timer.
      // We don't want to fire leader election nor accept vote requests because
      // we're still processing the Raft message from the leader,
      // waiting on our own log.
      if (s.IsTimedOut()) {
        SnoozeFailureDetector();
        WithholdVotes();
      }
    } while (s.IsTimedOut());
    RETURN_NOT_OK(s);
    TRACE("finished");
  }

  VLOG_WITH_PREFIX(2) << "Replica updated. " << ToString()
                      << ". Request: " << SecureShortDebugString(*request);

  TRACE("UpdateReplicas() finished");
  return Status::OK();
}
void RaftConsensus::FillConsensusResponseOKUnlocked(ConsensusResponsePB* response) {
  DCHECK(lock_.is_locked());
  TRACE("Filling consensus response to leader.");
  // Report our current term and log/commit watermarks back to the leader.
  response->set_responder_term(CurrentTermUnlocked());
  auto* status = response->mutable_status();
  *status->mutable_last_received() = queue_->GetLastOpIdInLog();
  *status->mutable_last_received_current_leader() = last_received_cur_leader_;
  status->set_last_committed_idx(queue_->GetCommittedIndex());
  // Let the leader know when this server is quiescing.
  if (PREDICT_TRUE(server_ctx_.quiescing) && server_ctx_.quiescing->load()) {
    response->set_server_quiescing(true);
  }
}
void RaftConsensus::FillConsensusResponseError(ConsensusResponsePB* response,
                                               ConsensusErrorPB::Code error_code,
                                               const Status& status) {
  // Record both the machine-readable error code and the full Status in the
  // response's error field.
  ConsensusErrorPB* error = response->mutable_status()->mutable_error();
  error->set_code(error_code);
  StatusToPB(status, error->mutable_status());
}
// Handles a vote request (either a real election or a pre-election) from a
// candidate replica. A denied vote is still returned as Status::OK with the
// denial recorded in 'response'; a non-OK Status means this replica could
// not process the request at all (e.g. it is shut down).
Status RaftConsensus::RequestVote(const VoteRequestPB* request,
                                  TabletVotingState tablet_voting_state,
                                  VoteResponsePB* response) {
  TRACE_EVENT2("consensus", "RaftConsensus::RequestVote",
               "peer", peer_uuid(),
               "tablet", options_.tablet_id);
  response->set_responder_uuid(peer_uuid());

  // If we've heard recently from the leader, then we should ignore the request.
  // It might be from a "disruptive" server. This could happen in a few cases:
  //
  // 1) Network partitions
  // If the leader can talk to a majority of the nodes, but is partitioned from a
  // bad node, the bad node's failure detector will trigger. If the bad node is
  // able to reach other nodes in the cluster, it will continuously trigger elections.
  //
  // 2) An abandoned node
  // It's possible that a node has fallen behind the log GC mark of the leader. In that
  // case, the leader will stop sending it requests. Eventually, the configuration
  // will change to eject the abandoned node, but until that point, we don't want the
  // abandoned follower to disturb the other nodes.
  //
  // 3) Other dynamic scenarios with a stale former leader
  // This is a generalization of the case 1. It's possible that a stale former
  // leader detects it's not a leader anymore at some point, but a majority
  // of replicas has elected a new leader already.
  //
  // See also https://ramcloud.stanford.edu/~ongaro/thesis.pdf
  // section 4.2.3.
  if (PREDICT_TRUE(!request->ignore_live_leader()) &&
      MonoTime::Now() < withhold_votes_until_) {
    return RequestVoteRespondLeaderIsAlive(request, response);
  }

  // We must acquire the update lock in order to ensure that this vote action
  // takes place between requests.
  // Lock ordering: update_lock_ must be acquired before lock_.
  std::unique_lock<simple_spinlock> update_guard(update_lock_, std::defer_lock);
  if (PREDICT_TRUE(FLAGS_enable_leader_failure_detection)) {
    // With failure detection enabled the caller retries automatically, so a
    // quick "busy" answer (below) is acceptable if the lock is contended.
    update_guard.try_lock();
  } else {
    // If failure detection is not enabled, then we can't just reject the vote,
    // because there will be no automatic retry later. So, block for the lock.
    update_guard.lock();
  }
  if (!update_guard.owns_lock()) {
    // There is another vote or update concurrent with the vote. In that case, that
    // other request is likely to reset the timer, and we'll end up just voting
    // "NO" after waiting. To avoid starving RPC handlers and causing cascading
    // timeouts, just vote a quick NO.
    return RequestVoteRespondIsBusy(request, response);
  }

  // Acquire the replica state lock so we can read / modify the consensus state.
  ThreadRestrictions::AssertWaitAllowed();
  LockGuard l(lock_);

  // Ensure our lifecycle state is compatible with voting.
  // If RaftConsensus is running, we use the latest OpId from the WAL to vote.
  // Otherwise, we must be voting while tombstoned.
  OpId local_last_logged_opid;
  const State state = state_;
  switch (state) {
    case kShutdown:
      return Status::IllegalState("cannot vote while shut down");
    case kRunning:
      // Note: it is (theoretically) possible for 'tombstone_last_logged_opid'
      // to be passed in and by the time we reach here the state is kRunning.
      // That may occur when a vote request comes in at the end of a tablet
      // copy and then tablet bootstrap completes quickly. In that case, we
      // ignore the passed-in value and use the latest OpId from our queue.
      local_last_logged_opid = queue_->GetLastOpIdInLog();
      break;
    default:
      // "Tombstoned voting": the replica is not running but may still vote
      // based on the last-logged OpId recorded before it was tombstoned.
      if (!tablet_voting_state.tombstone_last_logged_opid_) {
        return Status::IllegalState("must be running to vote when last-logged opid is not known");
      }
      if (!FLAGS_raft_enable_tombstoned_voting) {
        return Status::IllegalState("must be running to vote when tombstoned voting is disabled");
      }
      local_last_logged_opid = *(tablet_voting_state.tombstone_last_logged_opid_);
      if (tablet_voting_state.data_state_ == tablet::TABLET_DATA_COPYING) {
        LOG_WITH_PREFIX_UNLOCKED(INFO) << "voting while copying based on last-logged opid "
                                       << local_last_logged_opid;
      } else if (tablet_voting_state.data_state_ == tablet::TABLET_DATA_TOMBSTONED) {
        LOG_WITH_PREFIX_UNLOCKED(INFO) << "voting while tombstoned based on last-logged opid "
                                       << local_last_logged_opid;
      }
      break;
  }
  DCHECK(local_last_logged_opid.IsInitialized());

  // If the node is not in the configuration, allow the vote (this is required by Raft)
  // but log an informational message anyway.
  if (!cmeta_->IsMemberInConfig(request->candidate_uuid(), ACTIVE_CONFIG)) {
    LOG_WITH_PREFIX_UNLOCKED(INFO) << "Handling vote request from an unknown peer "
                                   << request->candidate_uuid();
  }

  // Recheck the heard-from-the-leader condition. There is a slight chance
  // that a heartbeat from the leader replica registers after the first check
  // in the very beginning of this method and before lock_ is acquired. This
  // extra check costs a little, but it helps in avoiding extra election rounds
  // and disruptive transfers of the replica leadership.
  if (PREDICT_TRUE(!request->ignore_live_leader()) &&
      MonoTime::Now() < withhold_votes_until_) {
    return RequestVoteRespondLeaderIsAlive(request, response);
  }

  // Candidate is running behind.
  if (request->candidate_term() < CurrentTermUnlocked()) {
    return RequestVoteRespondInvalidTerm(request, response);
  }

  // We already voted this term.
  if (request->candidate_term() == CurrentTermUnlocked() &&
      HasVotedCurrentTermUnlocked()) {
    // Already voted for the same candidate in the current term.
    if (GetVotedForCurrentTermUnlocked() == request->candidate_uuid()) {
      return RequestVoteRespondVoteAlreadyGranted(request, response);
    }
    // Voted for someone else in current term.
    return RequestVoteRespondAlreadyVotedForOther(request, response);
  }

  // Candidate must have last-logged OpId at least as large as our own to get
  // our vote.
  bool vote_yes = !OpIdLessThan(request->candidate_status().last_received(),
                                local_last_logged_opid);

  // Record the term advancement if necessary. We don't do so in the case of
  // pre-elections because it's possible that the node who called the pre-election
  // has actually now successfully become leader of the prior term, in which case
  // bumping our term here would disrupt it.
  if (!request->is_pre_election() &&
      request->candidate_term() > CurrentTermUnlocked()) {
    // If we are going to vote for this peer, then we will flush the consensus metadata
    // to disk below when we record the vote, and we can skip flushing the term advancement
    // to disk here.
    auto flush = vote_yes ? SKIP_FLUSH_TO_DISK : FLUSH_TO_DISK;
    RETURN_NOT_OK_PREPEND(HandleTermAdvanceUnlocked(request->candidate_term(), flush),
        Substitute("Could not step down in RequestVote. Current term: $0, candidate term: $1",
                   CurrentTermUnlocked(), request->candidate_term()));
  }

  if (!vote_yes) {
    return RequestVoteRespondLastOpIdTooOld(local_last_logged_opid, request, response);
  }

  // Passed all our checks. Vote granted.
  return RequestVoteRespondVoteGranted(request, response);
}
Status RaftConsensus::ChangeConfig(const ChangeConfigRequestPB& req,
                                   StatusCallback client_cb,
                                   boost::optional<TabletServerErrorPB::Code>* error_code) {
  TRACE_EVENT2("consensus", "RaftConsensus::ChangeConfig",
               "peer", peer_uuid(),
               "tablet", options_.tablet_id);
  // This is a thin adapter: wrap the single change into a one-element
  // bulk request and let BulkChangeConfig() do the real work.
  BulkChangeConfigRequestPB bulk_req;
  *bulk_req.mutable_tablet_id() = req.tablet_id();
  if (req.has_cas_config_opid_index()) {
    bulk_req.set_cas_config_opid_index(req.cas_config_opid_index());
  }
  if (req.has_dest_uuid()) {
    *bulk_req.mutable_dest_uuid() = req.dest_uuid();
  }
  auto* single_change = bulk_req.add_config_changes();
  if (req.has_type()) {
    single_change->set_type(req.type());
  }
  if (req.has_server()) {
    *single_change->mutable_peer() = req.server();
  }
  return BulkChangeConfig(bulk_req, std::move(client_cb), error_code);
}
// Applies one or more config changes as a single Raft config transition.
// Validates each change against the committed config, builds the resulting
// config, and replicates a CHANGE_CONFIG_OP, invoking 'client_cb' when
// replication completes. '*error_code' may be set on validation failures so
// the RPC layer can return a precise error to the client.
Status RaftConsensus::BulkChangeConfig(const BulkChangeConfigRequestPB& req,
                                       StatusCallback client_cb,
                                       boost::optional<TabletServerErrorPB::Code>* error_code) {
  TRACE_EVENT2("consensus", "RaftConsensus::BulkChangeConfig",
               "peer", peer_uuid(),
               "tablet", options_.tablet_id);
  {
    ThreadRestrictions::AssertWaitAllowed();
    LockGuard l(lock_);
    RETURN_NOT_OK(CheckRunningUnlocked());
    RETURN_NOT_OK(CheckActiveLeaderUnlocked());
    RETURN_NOT_OK(CheckNoConfigChangePendingUnlocked());

    // We are required by Raft to reject config change operations until we have
    // committed at least one operation in our current term as leader.
    // See https://groups.google.com/forum/#!topic/raft-dev/t4xj6dJTP6E
    if (!queue_->IsCommittedIndexInCurrentTerm()) {
      return Status::IllegalState("Leader has not yet committed an operation in its own term");
    }
    const RaftConfigPB committed_config = cmeta_->CommittedConfig();

    // Support atomic ChangeConfig requests: the caller may require that the
    // change apply only to a specific version of the committed config.
    if (req.has_cas_config_opid_index()) {
      if (committed_config.opid_index() != req.cas_config_opid_index()) {
        *error_code = TabletServerErrorPB::CAS_FAILED;
        return Status::IllegalState(Substitute("Request specified cas_config_opid_index "
                                               "of $0 but the committed config has opid_index "
                                               "of $1",
                                               req.cas_config_opid_index(),
                                               committed_config.opid_index()));
      }
    }

    // 'new_config' will be modified in-place and validated before being used
    // as the new Raft configuration.
    RaftConfigPB new_config = committed_config;

    // Enforce the "one by one" config change rules, even with the bulk API.
    // Keep track of total voters added, including non-voters promoted to
    // voters, and removed, including voters demoted to non-voters.
    int num_voters_modified = 0;

    // A record of the peers being modified so that we can enforce only one
    // change per peer per request.
    unordered_set<string> peers_modified;
    for (const auto& item : req.config_changes()) {
      // Each change must carry a type and a peer identified by permanent UUID.
      if (PREDICT_FALSE(!item.has_type())) {
        *error_code = TabletServerErrorPB::INVALID_CONFIG;
        return Status::InvalidArgument("Must specify 'type' argument",
                                       SecureShortDebugString(req));
      }
      if (PREDICT_FALSE(!item.has_peer())) {
        *error_code = TabletServerErrorPB::INVALID_CONFIG;
        return Status::InvalidArgument("Must specify 'peer' argument",
                                       SecureShortDebugString(req));
      }
      ChangeConfigType type = item.type();
      const RaftPeerPB& peer = item.peer();
      if (PREDICT_FALSE(!peer.has_permanent_uuid())) {
        return Status::InvalidArgument("peer must have permanent_uuid specified",
                                       SecureShortDebugString(req));
      }
      if (!InsertIfNotPresent(&peers_modified, peer.permanent_uuid())) {
        return Status::InvalidArgument(
            Substitute("only one change allowed per peer: peer $0 appears more "
                       "than once in the config change request",
                       peer.permanent_uuid()),
            SecureShortDebugString(req));
      }
      const string& server_uuid = peer.permanent_uuid();
      switch (type) {
        case ADD_PEER:
          // Ensure the peer we are adding is not already a member of the configuration.
          if (IsRaftConfigMember(server_uuid, committed_config)) {
            return Status::InvalidArgument(
                Substitute("Server with UUID $0 is already a member of the config. RaftConfig: $1",
                           server_uuid, SecureShortDebugString(committed_config)));
          }
          if (!peer.has_member_type()) {
            return Status::InvalidArgument("peer must have member_type specified",
                                           SecureShortDebugString(req));
          }
          if (!peer.has_last_known_addr()) {
            return Status::InvalidArgument("peer must have last_known_addr specified",
                                           SecureShortDebugString(req));
          }
          if (peer.member_type() == RaftPeerPB::VOTER) {
            num_voters_modified++;
          }
          *new_config.add_peers() = peer;
          break;
        case REMOVE_PEER:
          // A leader cannot remove itself; it must first step down.
          if (server_uuid == peer_uuid()) {
            return Status::InvalidArgument(
                Substitute("Cannot remove peer $0 from the config because it is the leader. "
                           "Force another leader to be elected to remove this peer. "
                           "Consensus state: $1",
                           server_uuid,
                           SecureShortDebugString(cmeta_->ToConsensusStatePB())));
          }
          if (!RemoveFromRaftConfig(&new_config, server_uuid)) {
            return Status::NotFound(
                Substitute("Server with UUID $0 not a member of the config. RaftConfig: $1",
                           server_uuid, SecureShortDebugString(committed_config)));
          }
          if (IsRaftConfigVoter(server_uuid, committed_config)) {
            num_voters_modified++;
          }
          break;
        case MODIFY_PEER: {
          RaftPeerPB* modified_peer;
          RETURN_NOT_OK(GetRaftConfigMember(&new_config, server_uuid, &modified_peer));
          // Keep a copy of the original so we can detect no-op modifications below.
          const RaftPeerPB orig_peer(*modified_peer);
          // Override 'member_type' and items within 'attrs' only if they are
          // explicitly passed in the request. At least one field must be
          // modified to be a valid request.
          if (peer.has_member_type() && peer.member_type() != modified_peer->member_type()) {
            if (modified_peer->member_type() == RaftPeerPB::VOTER ||
                peer.member_type() == RaftPeerPB::VOTER) {
              // This is a 'member_type' change involving a VOTER, i.e. a
              // promotion or demotion.
              num_voters_modified++;
            }
            // A leader must be forced to step down before demoting it.
            if (server_uuid == peer_uuid()) {
              return Status::InvalidArgument(
                  Substitute("Cannot modify member type of peer $0 because it is the leader. "
                             "Cause another leader to be elected to modify this peer. "
                             "Consensus state: $1",
                             server_uuid,
                             SecureShortDebugString(cmeta_->ToConsensusStatePB())));
            }
            modified_peer->set_member_type(peer.member_type());
          }
          if (peer.attrs().has_promote()) {
            modified_peer->mutable_attrs()->set_promote(peer.attrs().promote());
          }
          if (peer.attrs().has_replace()) {
            modified_peer->mutable_attrs()->set_replace(peer.attrs().replace());
          }
          // Ensure that MODIFY_PEER actually modified something.
          if (MessageDifferencer::Equals(orig_peer, *modified_peer)) {
            return Status::InvalidArgument("must modify a field when calling MODIFY_PEER");
          }
          break;
        }
        default:
          return Status::NotSupported(Substitute(
              "$0: unsupported type of configuration change",
              ChangeConfigType_Name(type)));
      }
    }

    // Don't allow no-op config changes to be committed.
    if (MessageDifferencer::Equals(committed_config, new_config)) {
      return Status::InvalidArgument("requested configuration change does not "
                                     "actually modify the config",
                                     SecureShortDebugString(req));
    }

    // Ensure this wasn't an illegal bulk change: Raft's one-by-one rule means
    // at most one peer's VOTER status may change per config transition.
    if (num_voters_modified > 1) {
      return Status::InvalidArgument("it is not safe to modify the VOTER status "
                                     "of more than one peer at a time",
                                     SecureShortDebugString(req));
    }

    // We'll assign a new opid_index to this config change.
    new_config.clear_opid_index();

    RETURN_NOT_OK(ReplicateConfigChangeUnlocked(
        committed_config, std::move(new_config),
        [this, client_cb](const Status& s) {
          this->MarkDirtyOnSuccess("Config change replication complete",
                                   client_cb, s);
        }));
  } // Release lock before signaling request.
  peer_manager_->SignalRequest();
  return Status::OK();
}
// Forces a new config onto this replica without the usual Raft safety checks.
// Intended for manual recovery tooling. The new config is applied by
// synthesizing a consensus request that appears to come from a new leader
// with a bumped term and feeding it through the regular Update() path, which
// also forces this replica to step down if it happens to be the leader.
Status RaftConsensus::UnsafeChangeConfig(
    const UnsafeChangeConfigRequestPB& req,
    boost::optional<tserver::TabletServerErrorPB::Code>* error_code) {
  if (PREDICT_FALSE(!req.has_new_config())) {
    *error_code = TabletServerErrorPB::INVALID_CONFIG;
    return Status::InvalidArgument("Request must contain 'new_config' argument "
                                   "to UnsafeChangeConfig()", SecureShortDebugString(req));
  }
  if (PREDICT_FALSE(!req.has_caller_id())) {
    *error_code = TabletServerErrorPB::INVALID_CONFIG;
    return Status::InvalidArgument("Must specify 'caller_id' argument to UnsafeChangeConfig()",
                                   SecureShortDebugString(req));
  }

  // Grab the committed config and current term on this node.
  int64_t current_term;
  RaftConfigPB committed_config;
  int64_t all_replicated_index;
  int64_t last_committed_index;
  OpId preceding_opid;
  uint64_t msg_timestamp;
  {
    // Take the snapshot of the replica state and queue state so that
    // we can stick them in the consensus update request later.
    ThreadRestrictions::AssertWaitAllowed();
    LockGuard l(lock_);
    current_term = CurrentTermUnlocked();
    committed_config = cmeta_->CommittedConfig();
    if (cmeta_->has_pending_config()) {
      LOG_WITH_PREFIX_UNLOCKED(WARNING)
          << "Replica has a pending config, but the new config "
          << "will be unsafely changed anyway. "
          << "Currently pending config on the node: "
          << SecureShortDebugString(cmeta_->PendingConfig());
    }
    all_replicated_index = queue_->GetAllReplicatedIndex();
    last_committed_index = queue_->GetCommittedIndex();
    preceding_opid = queue_->GetLastOpIdInLog();
    msg_timestamp = time_manager_->GetSerialTimestamp().value();
  }

  // Validate that passed replica uuids are part of the committed config
  // on this node. This allows a manual recovery tool to only have to specify
  // the uuid of each replica in the new config without having to know the
  // addresses of each server (since we can get the address information from
  // the committed config). Additionally, only a subset of the committed config
  // is required for typical cluster repair scenarios.
  std::unordered_set<string> retained_peer_uuids;
  const RaftConfigPB& config = req.new_config();
  for (const RaftPeerPB& new_peer : config.peers()) {
    const string& peer_uuid = new_peer.permanent_uuid();
    retained_peer_uuids.insert(peer_uuid);
    if (!IsRaftConfigMember(peer_uuid, committed_config)) {
      *error_code = TabletServerErrorPB::INVALID_CONFIG;
      return Status::InvalidArgument(Substitute("Peer with uuid $0 is not in the committed "
                                                "config on this replica, rejecting the "
                                                "unsafe config change request for tablet $1. "
                                                "Committed config: $2",
                                                peer_uuid, req.tablet_id(),
                                                SecureShortDebugString(committed_config)));
    }
  }

  // Build the new config by starting from the committed one and dropping
  // every peer the caller did not retain.
  RaftConfigPB new_config = committed_config;
  for (const auto& peer : committed_config.peers()) {
    const string& peer_uuid = peer.permanent_uuid();
    if (!ContainsKey(retained_peer_uuids, peer_uuid)) {
      CHECK(RemoveFromRaftConfig(&new_config, peer_uuid));
    }
  }

  // Check that local peer is part of the new config and is a VOTER.
  // Although it is valid for a local replica to not have itself
  // in the committed config, it is rare and a replica without itself
  // in the latest config is definitely not caught up with the latest leader's log.
  if (!IsRaftConfigVoter(peer_uuid(), new_config)) {
    *error_code = TabletServerErrorPB::INVALID_CONFIG;
    return Status::InvalidArgument(Substitute("Local replica uuid $0 is not "
                                              "a VOTER in the new config, "
                                              "rejecting the unsafe config "
                                              "change request for tablet $1. "
                                              "Rejected config: $2" ,
                                              peer_uuid(), req.tablet_id(),
                                              SecureShortDebugString(new_config)));
  }
  new_config.set_unsafe_config_change(true);
  // The config change op is appended right after the last op in our log.
  int64_t replicate_opid_index = preceding_opid.index() + 1;
  new_config.set_opid_index(replicate_opid_index);

  // Sanity check the new config. 'type' is irrelevant here.
  Status s = VerifyRaftConfig(new_config);
  if (!s.ok()) {
    *error_code = TabletServerErrorPB::INVALID_CONFIG;
    return Status::InvalidArgument(Substitute("The resulting new config for tablet $0 "
                                              "from passed parameters has failed raft "
                                              "config sanity check: $1",
                                              req.tablet_id(), s.ToString()));
  }

  // Prepare the consensus request as if the request is being generated
  // from a different leader.
  ConsensusRequestPB consensus_req;
  consensus_req.set_caller_uuid(req.caller_id());
  // Bumping up the term for the consensus request being generated.
  // This makes this request appear to come from a new leader that
  // the local replica doesn't know about yet. If the local replica
  // happens to be the leader, this will cause it to step down.
  const int64_t new_term = current_term + 1;
  consensus_req.set_caller_term(new_term);
  consensus_req.mutable_preceding_id()->CopyFrom(preceding_opid);
  consensus_req.set_committed_index(last_committed_index);
  consensus_req.set_all_replicated_index(all_replicated_index);

  // Prepare the replicate msg to be replicated.
  ReplicateMsg* replicate = consensus_req.add_ops();
  ChangeConfigRecordPB* cc_req = replicate->mutable_change_config_record();
  cc_req->set_tablet_id(req.tablet_id());
  *cc_req->mutable_old_config() = committed_config;
  *cc_req->mutable_new_config() = new_config;
  OpId* id = replicate->mutable_id();
  // Bumping up both the term and the opid_index from what's found in the log.
  id->set_term(new_term);
  id->set_index(replicate_opid_index);
  replicate->set_op_type(CHANGE_CONFIG_OP);
  replicate->set_timestamp(msg_timestamp);

  VLOG_WITH_PREFIX(3) << "UnsafeChangeConfig: Generated consensus request: "
                      << SecureShortDebugString(consensus_req);
  LOG_WITH_PREFIX(WARNING)
      << "PROCEEDING WITH UNSAFE CONFIG CHANGE ON THIS SERVER, "
      << "COMMITTED CONFIG: " << SecureShortDebugString(committed_config)
      << "NEW CONFIG: " << SecureShortDebugString(new_config);

  // Feed the fabricated request through the normal update path; surface any
  // error reported in the response as the returned Status.
  ConsensusResponsePB consensus_resp;
  return Update(&consensus_req, &consensus_resp).AndThen([&consensus_resp]{
    return consensus_resp.has_error()
        ? StatusFromPB(consensus_resp.error().status()) : Status::OK();
  });
}
// Transitions the replica to kStopped: closes the peer manager and queue,
// cancels pending ops, and disables the failure detector. Idempotent:
// returns immediately if the replica is already stopping or stopped.
void RaftConsensus::Stop() {
  TRACE_EVENT2("consensus", "RaftConsensus::Shutdown",
               "peer", peer_uuid(),
               "tablet", options_.tablet_id);
  {
    ThreadRestrictions::AssertWaitAllowed();
    LockGuard l(lock_);
    const State state = state_;
    if (state == kStopping || state == kStopped || state == kShutdown) {
      return;
    }
    // Transition to kStopping state.
    SetStateUnlocked(kStopping);
    LOG_WITH_PREFIX_UNLOCKED(INFO) << "Raft consensus shutting down.";
  }

  // NOTE(review): lock_ is deliberately not held while closing the peer
  // manager and the queue below.

  // Close the peer manager.
  if (peer_manager_) peer_manager_->Close();

  // We must close the queue after we close the peers.
  if (queue_) {
    // If we were leader, decrement the number of leaders there are now.
    if (queue_->IsInLeaderMode() && server_ctx_.num_leaders) {
      server_ctx_.num_leaders->IncrementBy(-1);
    }
    queue_->Close();
  }

  {
    ThreadRestrictions::AssertWaitAllowed();
    LockGuard l(lock_);
    if (pending_) {
      CHECK_OK(pending_->CancelPendingOps());
    }
    SetStateUnlocked(kStopped);

    // Clear leader status on Stop(), in case this replica was the leader. If
    // we don't do this, the log messages still show this node as the leader.
    // No need to sync it since it's not persistent state.
    if (cmeta_) {
      ClearLeaderUnlocked();
    }
    LOG_WITH_PREFIX_UNLOCKED(INFO) << "Raft consensus is shut down!";
  }

  // If we were the leader, stop withholding votes. The CAS only resets a
  // MonoTime::Max() deadline (presumably set while leading); a finite
  // deadline set by a live-leader heartbeat is left intact.
  auto max_time = MonoTime::Max();
  withhold_votes_until_.compare_exchange_strong(max_time, MonoTime::Min());

  // Shut down things that might acquire locks during destruction.
  if (raft_pool_token_) raft_pool_token_->Shutdown();
  if (failure_detector_) DisableFailureDetector();
}
void RaftConsensus::Shutdown() {
  // Fast path when already shut down: skip all locking. This matters when
  // the destructor runs on a reactor thread (an election callback holding
  // the last outstanding reference), where taking locks would violate
  // ThreadRestrictions assertions.
  if (shutdown_) return;

  Stop();
  {
    LockGuard guard(lock_);
    SetStateUnlocked(kShutdown);
  }
  shutdown_ = true;
}
Status RaftConsensus::StartConsensusOnlyRoundUnlocked(const ReplicateRefPtr& msg) {
  DCHECK(lock_.is_locked());
  // Only consensus-internal operations (e.g. NO_OP, config changes) may be
  // started through this path.
  const OperationType op_type = msg->get()->op_type();
  CHECK(IsConsensusOnlyOperation(op_type))
      << "Expected a consensus-only op type, got " << OperationType_Name(op_type)
      << ": " << SecureShortDebugString(*msg->get());
  VLOG_WITH_PREFIX_UNLOCKED(1) << "Starting consensus round: "
                               << SecureShortDebugString(msg->get()->id());
  auto mark_dirty_cb = [this](const Status& s) {
    this->MarkDirtyOnSuccess("Replicated consensus-only round",
                             &DoNothingStatusCB, s);
  };
  scoped_refptr<ConsensusRound> round(new ConsensusRound(this, msg));
  ConsensusRound* raw_round = round.get();
  round->SetConsensusReplicatedCallback(
      [this, raw_round, mark_dirty_cb](const Status& s) {
        this->NonTxRoundReplicationFinished(raw_round, mark_dirty_cb, s);
      });
  return AddPendingOperationUnlocked(round);
}
Status RaftConsensus::AdvanceTermForTests(int64_t new_term) {
  // Test-only helper: force the term forward as if a higher term had been
  // observed. Crashes if the replica is not running.
  ThreadRestrictions::AssertWaitAllowed();
  LockGuard guard(lock_);
  CHECK_OK(CheckRunningUnlocked());
  return HandleTermAdvanceUnlocked(new_term);
}
string RaftConsensus::GetRequestVoteLogPrefixThreadSafe(const VoteRequestPB& request) const {
  // Thread-safe variant: does not require lock_ to be held.
  const char* pre = request.is_pre_election() ? "pre-" : "";
  return Substitute("$0Leader $1election vote request",
                    LogPrefixThreadSafe(), pre);
}
string RaftConsensus::GetRequestVoteLogPrefixUnlocked(const VoteRequestPB& request) const {
  DCHECK(lock_.is_locked());
  // Same shape as the thread-safe variant, but uses the locked prefix.
  const char* pre = request.is_pre_election() ? "pre-" : "";
  return Substitute("$0Leader $1election vote request",
                    LogPrefixUnlocked(), pre);
}
void RaftConsensus::FillVoteResponseVoteGranted(VoteResponsePB* response) {
  // Populate a "yes" vote, together with the term it was granted in.
  response->set_vote_granted(true);
  response->set_responder_term(CurrentTermUnlocked());
}
void RaftConsensus::FillVoteResponseVoteDenied(
    ConsensusErrorPB::Code error_code,
    VoteResponsePB* response,
    ResponderTermPolicy responder_term_policy) {
  // Populate a "no" vote with the given error code. The responder's term is
  // included only when the supplied policy requests it.
  response->mutable_consensus_error()->set_code(error_code);
  response->set_vote_granted(false);
  if (responder_term_policy == ResponderTermPolicy::SET) {
    response->set_responder_term(CurrentTermUnlocked());
  }
}
Status RaftConsensus::RequestVoteRespondInvalidTerm(const VoteRequestPB* request,
                                                    VoteResponsePB* response) {
  // The candidate's term is behind ours: deny the vote.
  FillVoteResponseVoteDenied(ConsensusErrorPB::INVALID_TERM, response);
  const string msg = Substitute("$0: Denying vote to candidate $1 for earlier term $2. "
                                "Current term is $3.",
                                GetRequestVoteLogPrefixUnlocked(*request),
                                request->candidate_uuid(),
                                request->candidate_term(),
                                CurrentTermUnlocked());
  StatusToPB(Status::InvalidArgument(msg), response->mutable_consensus_error()->mutable_status());
  LOG(INFO) << msg;
  return Status::OK();
}
Status RaftConsensus::RequestVoteRespondVoteAlreadyGranted(const VoteRequestPB* request,
                                                           VoteResponsePB* response) {
  // We already said "yes" to this candidate this term; repeat the answer.
  FillVoteResponseVoteGranted(response);
  const string msg = Substitute("$0: Already granted yes vote for candidate $1 in term $2. "
                                "Re-sending same reply.",
                                GetRequestVoteLogPrefixUnlocked(*request),
                                request->candidate_uuid(),
                                request->candidate_term());
  LOG(INFO) << msg;
  return Status::OK();
}
Status RaftConsensus::RequestVoteRespondAlreadyVotedForOther(const VoteRequestPB* request,
                                                             VoteResponsePB* response) {
  // Our vote for this term has already gone to a different candidate.
  FillVoteResponseVoteDenied(ConsensusErrorPB::ALREADY_VOTED, response);
  const string msg = Substitute("$0: Denying vote to candidate $1 in current term $2: "
                                "Already voted for candidate $3 in this term.",
                                GetRequestVoteLogPrefixUnlocked(*request),
                                request->candidate_uuid(),
                                CurrentTermUnlocked(),
                                GetVotedForCurrentTermUnlocked());
  StatusToPB(Status::InvalidArgument(msg), response->mutable_consensus_error()->mutable_status());
  LOG(INFO) << msg;
  return Status::OK();
}
Status RaftConsensus::RequestVoteRespondLastOpIdTooOld(const OpId& local_last_logged_opid,
                                                       const VoteRequestPB* request,
                                                       VoteResponsePB* response) {
  // Raft log-matching safety: the candidate's log is behind ours, so deny.
  FillVoteResponseVoteDenied(ConsensusErrorPB::LAST_OPID_TOO_OLD, response);
  const string msg = Substitute("$0: Denying vote to candidate $1 for term $2 because "
                                "replica has last-logged OpId of $3, which is greater than that of the "
                                "candidate, which has last-logged OpId of $4.",
                                GetRequestVoteLogPrefixUnlocked(*request),
                                request->candidate_uuid(),
                                request->candidate_term(),
                                SecureShortDebugString(local_last_logged_opid),
                                SecureShortDebugString(request->candidate_status().last_received()));
  StatusToPB(Status::InvalidArgument(msg), response->mutable_consensus_error()->mutable_status());
  LOG(INFO) << msg;
  return Status::OK();
}
Status RaftConsensus::RequestVoteRespondLeaderIsAlive(const VoteRequestPB* request,
                                                      VoteResponsePB* response) {
  // We recently heard from a live leader (or are the leader): deny the vote
  // to shield the cluster from a disruptive candidate. The responder term is
  // intentionally left unset.
  FillVoteResponseVoteDenied(ConsensusErrorPB::LEADER_IS_ALIVE, response,
                             ResponderTermPolicy::DO_NOT_SET);
  const string msg = Substitute("$0: Denying vote to candidate $1 for term $2 because "
                                "replica is either leader or believes a valid leader to "
                                "be alive.",
                                GetRequestVoteLogPrefixThreadSafe(*request),
                                request->candidate_uuid(),
                                request->candidate_term());
  StatusToPB(Status::InvalidArgument(msg),
             response->mutable_consensus_error()->mutable_status());
  VLOG(1) << msg;
  return Status::OK();
}
Status RaftConsensus::RequestVoteRespondIsBusy(
    const VoteRequestPB* request, VoteResponsePB* response) {
  // A concurrent update or vote holds the update lock: answer a quick NO
  // rather than blocking an RPC handler. Don't set the term in the response:
  // the requestor doesn't need it to process the NO vote in this case.
  FillVoteResponseVoteDenied(ConsensusErrorPB::CONSENSUS_BUSY, response,
                             ResponderTermPolicy::DO_NOT_SET);
  const string msg = Substitute("$0: Denying vote to candidate $1 for term $2 because "
                                "replica is already servicing an update from "
                                "a current leader or another vote",
                                GetRequestVoteLogPrefixThreadSafe(*request),
                                request->candidate_uuid(),
                                request->candidate_term());
  StatusToPB(Status::ServiceUnavailable(msg),
             response->mutable_consensus_error()->mutable_status());
  VLOG(1) << msg;
  return Status::OK();
}
// Grants a "yes" vote to the candidate. For real elections the vote is
// persisted to disk before responding; pre-election votes are not persisted.
Status RaftConsensus::RequestVoteRespondVoteGranted(const VoteRequestPB* request,
                                                    VoteResponsePB* response) {
  // We know our vote will be "yes", so avoid triggering an election while we
  // persist our vote to disk. We use an exponential backoff to avoid too much
  // split-vote contention when nodes display high latencies.
  MonoDelta backoff = LeaderElectionExpBackoffDeltaUnlocked();
  SnoozeFailureDetector(string("vote granted"), backoff);

  if (!request->is_pre_election()) {
    // Persist our vote to disk.
    RETURN_NOT_OK(SetVotedForCurrentTermUnlocked(request->candidate_uuid()));
  }

  FillVoteResponseVoteGranted(response);

  // Give peer time to become leader. Snooze one more time after persisting our
  // vote. When disk latency is high, this should help reduce churn.
  SnoozeFailureDetector(/*reason_for_log=*/boost::none, backoff);

  LOG(INFO) << Substitute("$0: Granting yes vote for candidate $1 in term $2.",
                          GetRequestVoteLogPrefixUnlocked(*request),
                          request->candidate_uuid(),
                          CurrentTermUnlocked());
  return Status::OK();
}
// Returns this replica's current role (LEADER, FOLLOWER, etc.) in the
// active config. Takes lock_, so callers must be allowed to wait.
RaftPeerPB::Role RaftConsensus::role() const {
  ThreadRestrictions::AssertWaitAllowed();
  LockGuard l(lock_);
  return cmeta_->active_role();
}
RaftConsensus::RoleAndMemberType RaftConsensus::GetRoleAndMemberType() const {
  ThreadRestrictions::AssertWaitAllowed();
  const auto& my_uuid = peer_uuid();
  // Default to UNKNOWN if the local peer isn't found in the active config.
  auto member_type = RaftPeerPB::UNKNOWN_MEMBER_TYPE;
  LockGuard guard(lock_);
  for (const auto& peer : cmeta_->ActiveConfig().peers()) {
    if (peer.permanent_uuid() != my_uuid) {
      continue;
    }
    member_type = peer.member_type();
    break;
  }
  return std::make_pair(cmeta_->active_role(), member_type);
}
// Returns the current Raft term, guarded by lock_.
int64_t RaftConsensus::CurrentTerm() const {
  LockGuard l(lock_);
  return CurrentTermUnlocked();
}
// Transitions the lifecycle state machine to 'new_state', CHECK-failing on
// any disallowed transition. Caller must hold lock_.
void RaftConsensus::SetStateUnlocked(State new_state) {
  DCHECK(lock_.is_locked());
  switch (new_state) {
    case kInitialized:
      // Only a freshly constructed instance may be initialized.
      CHECK_EQ(kNew, state_);
      break;
    case kRunning:
      CHECK_EQ(kInitialized, state_);
      break;
    case kStopping:
      // Stopping may begin from any state except the terminal ones.
      CHECK(state_ != kStopped && state_ != kShutdown) << "State = " << State_Name(state_);
      break;
    case kStopped:
      CHECK_EQ(kStopping, state_);
      break;
    case kShutdown:
      // kShutdown -> kShutdown is allowed (Shutdown() is idempotent).
      CHECK(state_ == kStopped || state_ == kShutdown) << "State = " << State_Name(state_);
      break;
    default:
      LOG(FATAL) << "Disallowed transition to state = " << State_Name(new_state);
      break;
  }
  state_ = new_state;
}
const char* RaftConsensus::State_Name(State state) {
  // Map each lifecycle state to a human-readable name for logging.
  if (state == kNew)         return "New";
  if (state == kInitialized) return "Initialized";
  if (state == kRunning)     return "Running";
  if (state == kStopping)    return "Stopping";
  if (state == kStopped)     return "Stopped";
  if (state == kShutdown)    return "Shut down";
  LOG(DFATAL) << "Unknown State value: " << state;
  return "Unknown";
}
MonoDelta RaftConsensus::MinimumElectionTimeout() {
  // The minimum election timeout is the number of heartbeat periods a
  // follower may miss before considering the leader failed.
  const int32_t timeout_ms = FLAGS_leader_failure_max_missed_heartbeat_periods *
      FLAGS_raft_heartbeat_interval_ms;
  return MonoDelta::FromMilliseconds(timeout_ms);
}
void RaftConsensus::SetLeaderUuidUnlocked(const string& uuid) {
  DCHECK(lock_.is_locked());
  // Recognizing a (new) leader resets the failed-election counter and its
  // associated metric.
  failed_elections_since_stable_leader_ = 0;
  num_failed_elections_metric_->set_value(0);
  cmeta_->set_leader_uuid(uuid);
  MarkDirty(Substitute("New leader $0", uuid));
}
// Creates and appends a CHANGE_CONFIG_OP round recording the transition from
// 'old_config' to 'new_config'. 'client_cb' is invoked when replication of
// the round finishes. Caller must hold lock_.
Status RaftConsensus::ReplicateConfigChangeUnlocked(
    RaftConfigPB old_config,
    RaftConfigPB new_config,
    StatusCallback client_cb) {
  DCHECK(lock_.is_locked());
  // Ownership of the raw ReplicateMsg is taken by the ref-counted
  // RefCountedReplicate wrapper below.
  auto cc_replicate = new ReplicateMsg();
  cc_replicate->set_op_type(CHANGE_CONFIG_OP);
  ChangeConfigRecordPB* cc_req = cc_replicate->mutable_change_config_record();
  cc_req->set_tablet_id(options_.tablet_id);
  *cc_req->mutable_old_config() = std::move(old_config);
  *cc_req->mutable_new_config() = std::move(new_config);
  CHECK_OK(time_manager_->AssignTimestamp(cc_replicate));

  scoped_refptr<ConsensusRound> round(
      new ConsensusRound(this, make_scoped_refptr(new RefCountedReplicate(cc_replicate))));
  // The callback captures a raw pointer to the round rather than the
  // scoped_refptr, likely to avoid a reference cycle between the round and
  // its own callback.
  auto* round_raw = round.get();
  round->SetConsensusReplicatedCallback(
      [this, round_raw, client_cb](const Status& s) {
        this->NonTxRoundReplicationFinished(round_raw, client_cb, s);
      });

  return AppendNewRoundToQueueUnlocked(round);
}
// Rebuilds the consensus queue and remote peers from the active config.
// Must only be called while this replica is the active LEADER and lock_ is
// held.
Status RaftConsensus::RefreshConsensusQueueAndPeersUnlocked() {
  DCHECK(lock_.is_locked());
  DCHECK_EQ(RaftPeerPB::LEADER, cmeta_->active_role());
  const RaftConfigPB& active_config = cmeta_->ActiveConfig();

  // Change the peers so that we're able to replicate messages remotely and
  // locally. The peer manager must be closed before updating the active config
  // in the queue -- when the queue is in LEADER mode, it checks that all
  // registered peers are a part of the active config.
  peer_manager_->Close();
  // TODO(todd): should use queue committed index here? in that case do
  // we need to pass it in at all?
  queue_->SetLeaderMode(pending_->GetCommittedIndex(),
                        CurrentTermUnlocked(),
                        active_config);
  return peer_manager_->UpdateRaftConfig(active_config);
}
// Permanent UUID of the local peer. Immutable after construction, so no
// locking is needed.
const string& RaftConsensus::peer_uuid() const {
return local_peer_pb_.permanent_uuid();
}
// ID of the tablet this consensus instance serves. Immutable after
// construction, so no locking is needed.
const string& RaftConsensus::tablet_id() const {
return options_.tablet_id;
}
// Snapshot the current consensus state into 'cstate'. When health reporting
// is requested and this replica believes it is the leader, per-peer health
// reports from the queue are merged into the committed config of the
// snapshot. Returns IllegalState if the replica is shut down.
Status RaftConsensus::ConsensusState(ConsensusStatePB* cstate,
                                     IncludeHealthReport report_health) const {
  ThreadRestrictions::AssertWaitAllowed();
  UniqueLock l(lock_);
  if (state_ == kShutdown) {
    return Status::IllegalState("Tablet replica is shutdown");
  }
  ConsensusStatePB result = cmeta_->ToConsensusStatePB();
  // Health reports are only meaningful (and only available) on the replica
  // currently acting as leader.
  const bool attach_health_reports =
      report_health == INCLUDE_HEALTH_REPORT &&
      cmeta_->active_role() == RaftPeerPB::LEADER;
  if (attach_health_reports) {
    auto reports = queue_->ReportHealthOfPeers();
    // The queue is not needed past this point, so release the consensus lock
    // while merging the reports into the local snapshot.
    l.unlock();
    // Attach a health report to each committed-config peer we know about.
    for (RaftPeerPB& peer : *result.mutable_committed_config()->mutable_peers()) {
      const HealthReportPB* report = FindOrNull(reports, peer.permanent_uuid());
      if (report) {
        *peer.mutable_health_report() = *report;
      }
    }
  }
  *cstate = std::move(result);
  return Status::OK();
}
// Return a copy of the committed Raft config from the consensus metadata.
RaftConfigPB RaftConsensus::CommittedConfig() const {
ThreadRestrictions::AssertWaitAllowed();
LockGuard l(lock_);
return cmeta_->CommittedConfig();
}
// Render a human-readable HTML summary of consensus state and the queue to
// 'out' for the debug web UI.
// NOTE(review): 'state_' and 'cmeta_' are read here without holding 'lock_',
// unlike most accessors in this file -- presumably acceptable for a
// best-effort debug page; confirm GetRoleAndTerm() is safe to call unlocked.
void RaftConsensus::DumpStatusHtml(std::ostream& out) const {
if (state_ != kRunning) {
out << "Tablet " << EscapeForHtmlToString(tablet_id())
<< " not running" << std::endl;
return;
}
const RaftPeerPB::Role role = cmeta_->GetRoleAndTerm().first;
out << "<h1>Raft Consensus State</h1>" << std::endl;
out << "<h2>State</h2>" << std::endl;
out << "<pre>" << EscapeForHtmlToString(ToString()) << "</pre>" << std::endl;
out << "<h2>Queue</h2>" << std::endl;
out << "<pre>" << EscapeForHtmlToString(queue_->ToString()) << "</pre>" << std::endl;
// Dump the queues on a leader.
if (role == RaftPeerPB::LEADER) {
out << "<h2>Queue overview</h2>" << std::endl;
out << "<pre>" << EscapeForHtmlToString(queue_->ToString()) << "</pre>" << std::endl;
out << "<hr/>" << std::endl;
out << "<h2>Queue details</h2>" << std::endl;
queue_->DumpToHtml(out);
}
}
// Entry point invoked by LeaderElection when an election decides; hops off
// the reactor thread onto the raft thread pool where the real work
// (DoElectionCallback) may block.
void RaftConsensus::ElectionCallback(ElectionReason reason, const ElectionResult& result) {
// We're running on a reactor thread; service the callback on another thread.
//
// There's no need to reenable the failure detector; if this fails, it's a
// sign that RaftConsensus has stopped and we no longer need failure detection.
// Keep 'this' alive until the deferred callback has run.
auto self = shared_from_this();
Status s = raft_pool_token_->Submit([=]() { self->DoElectionCallback(reason, result); });
if (!s.ok()) {
// Submit may only fail with ServiceUnavailable during shutdown; anything
// else is a programming error.
static const char* msg = "unable to run election callback";
CHECK(s.IsServiceUnavailable()) << LogPrefixThreadSafe() << msg;
WARN_NOT_OK(s, LogPrefixThreadSafe() + msg);
}
}
// Process the outcome of a (pre-)election: handle loss (term bump, backoff),
// discard stale or irrelevant results, and on a win either start a real
// election (after a won pre-election) or assume leadership.
void RaftConsensus::DoElectionCallback(ElectionReason reason, const ElectionResult& result) {
const int64_t election_term = result.vote_request.candidate_term();
const bool was_pre_election = result.vote_request.is_pre_election();
const char* election_type = was_pre_election ? "pre-election" : "election";
ThreadRestrictions::AssertWaitAllowed();
UniqueLock l(lock_);
Status s = CheckRunningUnlocked();
if (PREDICT_FALSE(!s.ok())) {
LOG_WITH_PREFIX_UNLOCKED(INFO) << "Received " << election_type << " callback for term "
<< election_term << " while not running: "
<< s.ToString();
return;
}
// If this election was not triggered by the failure detector, the fd may
// still be enabled and needs to be snoozed, both if we win and lose:
// - When we win because we're about to disable it and become leader.
// - When we lose or otherwise we can fall into a cycle, where everyone keeps
// triggering elections but no election ever completes because by the time they
// finish another one is triggered already.
//
// This is a no-op if the failure detector is disabled..
MonoDelta snooze_delta = LeaderElectionExpBackoffDeltaUnlocked();
SnoozeFailureDetector(string("election complete"), snooze_delta);
auto enable_fd = MakeScopedCleanup([&]() {
// The failure detector was disabled if it triggered this election. Reenable
// it when exiting this function.
if (reason == ELECTION_TIMEOUT_EXPIRED) {
EnableFailureDetector(snooze_delta);
}
});
if (result.decision == VOTE_DENIED) {
failed_elections_since_stable_leader_++;
num_failed_elections_metric_->set_value(failed_elections_since_stable_leader_);
// If we called an election and one of the voters had a higher term than we did,
// we should bump our term before we potentially try again. This is particularly
// important with pre-elections to avoid getting "stuck" in a case like:
// Peer A: has ops through 1.10, term = 2, voted in term 2 for peer C
// Peer B: has ops through 1.15, term = 1
// In this case, Peer B will reject peer A's pre-elections for term 3 because
// the local log is longer. Peer A will reject B's pre-elections for term 2
// because it already voted in term 2. The check below ensures that peer B
// will bump to term 2 when it gets the vote rejection, such that its
// next pre-election (for term 3) would succeed.
if (result.highest_voter_term > CurrentTermUnlocked()) {
HandleTermAdvanceUnlocked(result.highest_voter_term);
}
LOG_WITH_PREFIX_UNLOCKED(INFO)
<< "Leader " << election_type << " lost for term " << election_term
<< ". Reason: "
<< (!result.message.empty() ? result.message : "None given");
return;
}
// In a pre-election, we collected votes for the _next_ term.
// So, we need to adjust our expectations of what the current term should be.
int64_t election_started_in_term = election_term;
if (was_pre_election) {
election_started_in_term--;
}
// Ignore wins from elections started in an older term: the result is stale.
if (election_started_in_term != CurrentTermUnlocked()) {
LOG_WITH_PREFIX_UNLOCKED(INFO)
<< "Leader " << election_type << " decision vote started in "
<< "defunct term " << election_started_in_term << ": won";
return;
}
// We may have been removed from the config (or demoted) while the election
// was in flight; in that case we must not assume leadership.
if (!cmeta_->IsVoterInConfig(peer_uuid(), ACTIVE_CONFIG)) {
LOG_WITH_PREFIX_UNLOCKED(WARNING) << "Leader " << election_type
<< " decision while not in active config. "
<< "Result: Term " << election_term
<< ": won. RaftConfig: "
<< SecureShortDebugString(cmeta_->ActiveConfig());
return;
}
// At this point we're either already leader, we're going to become leader,
// or we're going to run a real election.
//
// In all cases, do not reenable the failure detector.
enable_fd.cancel();
if (cmeta_->active_role() == RaftPeerPB::LEADER) {
// If this was a pre-election, it's possible to see the following interleaving:
//
// 1. Term N (follower): send a real election for term N.
// 2. An externally-triggered election is started.
// 3. Term N (follower): send a pre-election for term N+1.
// 4. Election callback for real election from term N completes.
// Peer is now leader for term N.
// 5. Pre-election callback from term N+1 completes, even though
// we are currently a leader of term N.
// In this case, we should just ignore the pre-election, since we're
// happily the leader of the prior term.
if (was_pre_election) return;
LOG_WITH_PREFIX_UNLOCKED(DFATAL)
<< "Leader " << election_type << " callback while already leader! "
<< "Result: Term " << election_term << ": won";
return;
}
LOG_WITH_PREFIX_UNLOCKED(INFO) << "Leader " << election_type << " won for term " << election_term;
if (was_pre_election) {
// We just won the pre-election. So, we need to call a real election.
// StartElection takes 'lock_' itself, so drop it first.
l.unlock();
WARN_NOT_OK_EVERY_N_SECS(StartElection(NORMAL_ELECTION, reason),
"Couldn't start leader election after successful pre-election", 10);
} else {
// We won a real election. Convert role to LEADER.
SetLeaderUuidUnlocked(peer_uuid());
// TODO(todd): BecomeLeaderUnlocked() can fail due to state checks during shutdown.
// It races with the above state check.
// This could be a problem during tablet deletion.
CHECK_OK(BecomeLeaderUnlocked());
}
}
// Thread-safe wrapper around GetLastOpIdUnlocked().
boost::optional<OpId> RaftConsensus::GetLastOpId(OpIdType type) {
ThreadRestrictions::AssertWaitAllowed();
LockGuard l(lock_);
return GetLastOpIdUnlocked(type);
}
// Return the last OpId of the requested type, or boost::none if this
// instance has not been (fully) started or 'type' is invalid.
boost::optional<OpId> RaftConsensus::GetLastOpIdUnlocked(OpIdType type) {
  // 'queue_' and 'pending_' are only set up by a successful Init()/Start();
  // bail out early if either is missing.
  if (!queue_ || !pending_) {
    return boost::none;
  }
  if (type == RECEIVED_OPID) {
    return queue_->GetLastOpIdInLog();
  }
  if (type == COMMITTED_OPID) {
    return MakeOpId(pending_->GetTermWithLastCommittedOp(),
                    pending_->GetCommittedIndex());
  }
  LOG(DFATAL) << LogPrefixUnlocked() << "Invalid OpIdType " << type;
  return boost::none;
}
// Report which log indexes must be retained: everything above the committed
// index (for durability) and everything not yet replicated to all peers.
log::RetentionIndexes RaftConsensus::GetRetentionIndexes() {
// Grab the watermarks from the queue. It's OK to fetch these two watermarks
// separately -- the worst case is we see a relatively "out of date" watermark
// which just means we'll retain slightly more than necessary in this invocation
// of log GC.
return log::RetentionIndexes(queue_->GetCommittedIndex(), // for durability
queue_->GetAllReplicatedIndex()); // for peers
}
// Asynchronously invoke the dirty-state callback (e.g. to trigger a tserver
// heartbeat) on the raft thread pool; failure to submit is only logged.
void RaftConsensus::MarkDirty(const string& reason) {
WARN_NOT_OK(raft_pool_token_->Submit([=]() { this->mark_dirty_clbk_(reason); }),
LogPrefixThreadSafe() + "Unable to run MarkDirty callback");
}
// Helper callback: mark dirty only when 'status' is OK, then always forward
// 'status' to the client's callback.
void RaftConsensus::MarkDirtyOnSuccess(const string& reason,
const StatusCallback& client_cb,
const Status& status) {
if (PREDICT_TRUE(status.ok())) {
MarkDirty(reason);
}
client_cb(status);
}
// Completion callback for consensus-only (non-transaction) rounds such as
// config changes and no-ops: applies config-change side effects, notifies the
// round handler, enqueues the COMMIT message to the WAL, and invokes the
// client's callback with 'status'.
void RaftConsensus::NonTxRoundReplicationFinished(ConsensusRound* round,
const StatusCallback& client_cb,
const Status& status) {
// NOTE: lock_ is held here because this is triggered by
// PendingRounds::AbortOpsAfter() and AdvanceCommittedIndex().
DCHECK(lock_.is_locked());
OperationType op_type = round->replicate_msg()->op_type();
const string& op_type_str = OperationType_Name(op_type);
CHECK(IsConsensusOnlyOperation(op_type)) << "Unexpected op type: " << op_type_str;
if (op_type == CHANGE_CONFIG_OP) {
CompleteConfigChangeRoundUnlocked(round, status);
// Fall through to the generic handling.
}
// TODO(mpercy): May need some refactoring to unlock 'lock_' before invoking
// the client callback.
if (!status.ok()) {
// Replication failed (e.g. the round was aborted); report and bail without
// writing a COMMIT message.
LOG_WITH_PREFIX_UNLOCKED(INFO) << op_type_str << " replication failed: "
<< status.ToString();
client_cb(status);
return;
}
VLOG_WITH_PREFIX_UNLOCKED(1) << "Committing " << op_type_str << " with op id "
<< round->id();
round_handler_->FinishConsensusOnlyRound(round);
CommitMsg commit_msg;
commit_msg.set_op_type(round->replicate_msg()->op_type());
*commit_msg.mutable_commited_op_id() = round->id();
// The COMMIT append is asynchronous; a failure to durably write it is fatal.
CHECK_OK(log_->AsyncAppendCommit(
commit_msg, [](const Status& s) {
CrashIfNotOkStatusCB("Enqueued commit operation failed to write to WAL", s);
}));
client_cb(status);
}
// Finalize a CHANGE_CONFIG_OP round: on failure, abort the matching pending
// config (if any); on success, commit the new config unless it is older than
// the already-committed config (a startup replay). Requires 'lock_'.
void RaftConsensus::CompleteConfigChangeRoundUnlocked(ConsensusRound* round, const Status& status) {
DCHECK(lock_.is_locked());
const OpId& op_id = round->replicate_msg()->id();
if (!status.ok()) {
// If the config change being aborted is the current pending one, abort it.
if (cmeta_->has_pending_config() &&
cmeta_->GetConfigOpIdIndex(PENDING_CONFIG) == op_id.index()) {
LOG_WITH_PREFIX_UNLOCKED(INFO) << "Aborting config change with OpId "
<< op_id << ": " << status.ToString();
cmeta_->clear_pending_config();
// Disable leader failure detection if transitioning from VOTER to
// NON_VOTER and vice versa.
UpdateFailureDetectorState();
} else {
LOG_WITH_PREFIX_UNLOCKED(INFO)
<< "Skipping abort of non-pending config change with OpId "
<< op_id << ": " << status.ToString();
}
// It's possible to abort a config change which isn't the pending one in the following
// sequence:
// - replicate a config change
// - it gets committed, so we write the new config to disk as the Committed configuration
// - we crash before the COMMIT message hits the WAL
// - we restart the server, and the config change is added as a pending round again,
// but isn't set as Pending because it's already committed.
// - we delete the tablet before committing it
// See KUDU-1735.
return;
}
// Commit the successful config change.
DCHECK(round->replicate_msg()->change_config_record().has_old_config());
DCHECK(round->replicate_msg()->change_config_record().has_new_config());
const RaftConfigPB& old_config = round->replicate_msg()->change_config_record().old_config();
const RaftConfigPB& new_config = round->replicate_msg()->change_config_record().new_config();
DCHECK(old_config.has_opid_index());
DCHECK(new_config.has_opid_index());
// Check if the pending Raft config has an OpId less than the committed
// config. If so, this is a replay at startup in which the COMMIT
// messages were delayed.
int64_t committed_config_opid_index = cmeta_->GetConfigOpIdIndex(COMMITTED_CONFIG);
if (new_config.opid_index() > committed_config_opid_index) {
LOG_WITH_PREFIX_UNLOCKED(INFO)
<< "Committing config change with OpId "
<< op_id << ": "
<< DiffRaftConfigs(old_config, new_config)
<< ". New config: { " << SecureShortDebugString(new_config) << " }";
CHECK_OK(SetCommittedConfigUnlocked(new_config));
} else {
LOG_WITH_PREFIX_UNLOCKED(INFO)
<< "Ignoring commit of config change with OpId "
<< op_id << " because the committed config has OpId index "
<< committed_config_opid_index << ". The config change we are ignoring is: "
<< "Old config: { " << SecureShortDebugString(old_config) << " }. "
<< "New config: { " << SecureShortDebugString(new_config) << " }";
}
}
// Start the leader failure detector with an optional initial snooze 'delta'.
// A no-op when leader failure detection is disabled by flag.
void RaftConsensus::EnableFailureDetector(boost::optional<MonoDelta> delta) {
if (PREDICT_TRUE(FLAGS_enable_leader_failure_detection)) {
failure_detector_->Start(std::move(delta));
}
}
// Stop the leader failure detector. A no-op when leader failure detection is
// disabled by flag.
void RaftConsensus::DisableFailureDetector() {
if (PREDICT_TRUE(FLAGS_enable_leader_failure_detection)) {
failure_detector_->Stop();
}
}
// Enable or disable the leader failure detector according to the local
// peer's current role: only a voter that is not the leader should watch for
// leader failure. 'delta' optionally overrides the initial snooze period.
// Requires 'lock_' to be held.
void RaftConsensus::UpdateFailureDetectorState(boost::optional<MonoDelta> delta) {
  DCHECK(lock_.is_locked());
  const auto& local_uuid = peer_uuid();
  const bool is_current_leader = local_uuid == cmeta_->leader_uuid();
  const bool is_voter = cmeta_->IsVoterInConfig(local_uuid, ACTIVE_CONFIG);
  if (is_current_leader || !is_voter) {
    // The leader, non-voters, and non-participants must not start leader
    // elections.
    DisableFailureDetector();
  } else {
    // A non-leader voter should run the failure detector.
    EnableFailureDetector(std::move(delta));
  }
}
// Push back the failure detector's next trigger by 'delta' (defaulting to
// the minimum election timeout), optionally logging 'reason_for_log'.
// A no-op if the detector is absent or disabled by flag.
void RaftConsensus::SnoozeFailureDetector(boost::optional<string> reason_for_log,
boost::optional<MonoDelta> delta) {
if (PREDICT_TRUE(failure_detector_ && FLAGS_enable_leader_failure_detection)) {
if (reason_for_log) {
VLOG(1) << LogPrefixThreadSafe()
<< Substitute("Snoozing failure detection for $0 ($1)",
delta ? delta->ToString() : "election timeout",
*reason_for_log);
}
if (!delta) {
delta = MinimumElectionTimeout();
}
failure_detector_->Snooze(std::move(delta));
}
}
// Withhold votes from candidates until at least one election timeout from
// now, using a CAS loop on the atomic 'withhold_votes_until_' so concurrent
// callers never move the deadline backwards past Max().
void RaftConsensus::WithholdVotes() {
  MonoTime prev = withhold_votes_until_;
  MonoTime next;
  do {
    if (prev == MonoTime::Max()) {
      // Maximum withholding time already. It might be the case if replica
      // has become a leader already.
      break;
    }
    // Recompute the deadline on every CAS attempt so it stays relative to
    // the current time. (The original code also computed this once before
    // the loop, but that value was always overwritten here before use.)
    next = MonoTime::Now() + MinimumElectionTimeout();
  } while (!withhold_votes_until_.compare_exchange_weak(prev, next));
}
// Compute a randomized, exponentially backed-off election timeout based on
// how many elections have failed since the last stable leader. The result
// lies in [min_timeout, min(min_timeout * 1.5^(failures+1), flag max)].
// Requires 'lock_' to be held (reads the failure counter and rng_).
MonoDelta RaftConsensus::LeaderElectionExpBackoffDeltaUnlocked() {
DCHECK(lock_.is_locked());
// Compute a backoff factor based on how many leader elections have
// failed since a stable leader was last seen.
double backoff_factor = pow(1.5, failed_elections_since_stable_leader_ + 1);
double min_timeout = MinimumElectionTimeout().ToMilliseconds();
double max_timeout = std::min<double>(
min_timeout * backoff_factor,
FLAGS_leader_failure_exp_backoff_max_delta_ms);
// Randomize the timeout between the minimum and the calculated value.
// We do this after the above capping to the max. Otherwise, after a
// churny period, we'd end up highly likely to backoff exactly the max
// amount.
double timeout = min_timeout + (max_timeout - min_timeout) * rng_.NextDoubleFraction();
DCHECK_GE(timeout, min_timeout);
return MonoDelta::FromMilliseconds(timeout);
}
// Advance the current term to 'new_term', stepping down first if we are the
// leader. Returns IllegalState if 'new_term' is not strictly greater than
// the current term. 'flush' controls whether cmeta is synced to disk.
// Requires 'lock_' to be held.
Status RaftConsensus::HandleTermAdvanceUnlocked(ConsensusTerm new_term,
FlushToDisk flush) {
DCHECK(lock_.is_locked());
if (new_term <= CurrentTermUnlocked()) {
return Status::IllegalState(Substitute("Can't advance term to: $0 current term: $1 is higher.",
new_term, CurrentTermUnlocked()));
}
if (cmeta_->active_role() == RaftPeerPB::LEADER) {
LOG_WITH_PREFIX_UNLOCKED(INFO) << "Stepping down as leader of term "
<< CurrentTermUnlocked();
RETURN_NOT_OK(BecomeReplicaUnlocked());
}
LOG_WITH_PREFIX_UNLOCKED(INFO) << "Advancing to term " << new_term;
RETURN_NOT_OK(SetCurrentTermUnlocked(new_term, flush));
if (term_metric_) {
term_metric_->set_value(new_term);
}
// Anything received so far came from a leader of an older term.
last_received_cur_leader_ = MinimumOpId();
return Status::OK();
}
// Verify that 'msg' (which must not yet carry an OpId) can be replicated:
// the replica must be running and acting as the leader. Requires 'lock_'.
Status RaftConsensus::CheckSafeToReplicateUnlocked(const ReplicateMsg& msg) const {
DCHECK(lock_.is_locked());
DCHECK(!msg.has_id()) << "Should not have an ID yet: " << SecureShortDebugString(msg);
RETURN_NOT_OK(CheckRunningUnlocked());
return CheckActiveLeaderUnlocked();
}
// Return OK iff this instance is in the kRunning state; IllegalState
// otherwise. Requires 'lock_' to be held.
Status RaftConsensus::CheckRunningUnlocked() const {
DCHECK(lock_.is_locked());
if (PREDICT_FALSE(state_ != kRunning)) {
return Status::IllegalState("RaftConsensus is not running",
Substitute("State = $0", State_Name(state_)));
}
return Status::OK();
}
// Return OK iff the local peer is currently acting as leader and no leader
// transfer is in progress; ServiceUnavailable or IllegalState otherwise.
// Requires 'lock_' to be held.
Status RaftConsensus::CheckActiveLeaderUnlocked() const {
DCHECK(lock_.is_locked());
RaftPeerPB::Role role = cmeta_->active_role();
switch (role) {
case RaftPeerPB::LEADER:
// Check for the consistency of the information in the consensus metadata
// and the state of the consensus queue.
DCHECK(queue_->IsInLeaderMode());
if (leader_transfer_in_progress_) {
return Status::ServiceUnavailable("leader transfer in progress");
}
return Status::OK();
default:
// Check for the consistency of the information in the consensus metadata
// and the state of the consensus queue.
DCHECK(!queue_->IsInLeaderMode());
return Status::IllegalState(Substitute("Replica $0 is not leader of this config. Role: $1. "
"Consensus state: $2",
peer_uuid(),
RaftPeerPB::Role_Name(role),
SecureShortDebugString(cmeta_->ToConsensusStatePB())));
}
}
// Return OK iff there is no config change currently in flight; IllegalState
// (with both configs in the message) otherwise. Requires 'lock_'.
Status RaftConsensus::CheckNoConfigChangePendingUnlocked() const {
DCHECK(lock_.is_locked());
if (cmeta_->has_pending_config()) {
return Status::IllegalState(
Substitute("RaftConfig change currently pending. Only one is allowed at a time.\n"
" Committed config: $0.\n Pending config: $1",
SecureShortDebugString(cmeta_->CommittedConfig()),
SecureShortDebugString(cmeta_->PendingConfig())));
}
return Status::OK();
}
// Validate and install 'new_config' as the pending Raft config, then refresh
// the failure detector for the possibly-changed local role. Stacking a
// pending config on top of another is fatal unless the new config is flagged
// as an unsafe (forced) config change. Requires 'lock_' to be held.
Status RaftConsensus::SetPendingConfigUnlocked(const RaftConfigPB& new_config) {
DCHECK(lock_.is_locked());
RETURN_NOT_OK_PREPEND(VerifyRaftConfig(new_config),
"Invalid config to set as pending");
if (!new_config.unsafe_config_change()) {
CHECK(!cmeta_->has_pending_config())
<< "Attempt to set pending config while another is already pending! "
<< "Existing pending config: " << SecureShortDebugString(cmeta_->PendingConfig()) << "; "
<< "Attempted new pending config: " << SecureShortDebugString(new_config);
} else if (cmeta_->has_pending_config()) {
LOG_WITH_PREFIX_UNLOCKED(INFO)
<< "Allowing unsafe config change even though there is a pending config! "
<< "Existing pending config: " << SecureShortDebugString(cmeta_->PendingConfig()) << "; "
<< "New pending config: " << SecureShortDebugString(new_config);
}
cmeta_->set_pending_config(new_config);
// The local peer's role may have changed (e.g. VOTER <-> NON_VOTER).
UpdateFailureDetectorState();
return Status::OK();
}
// Validate 'config_to_commit', check it against the pending config (which
// must match exactly unless the pending config is an unsafe change), then
// persist it as the committed config and clear the pending one.
// Requires 'lock_' to be held.
Status RaftConsensus::SetCommittedConfigUnlocked(const RaftConfigPB& config_to_commit) {
TRACE_EVENT0("consensus", "RaftConsensus::SetCommittedConfigUnlocked");
DCHECK(lock_.is_locked());
DCHECK(config_to_commit.IsInitialized());
RETURN_NOT_OK_PREPEND(VerifyRaftConfig(config_to_commit),
"Invalid config to set as committed");
// Compare committed with pending configuration, ensure that they are the same.
// In the event of an unsafe config change triggered by an administrator,
// it is possible that the config being committed may not match the pending config
// because unsafe config change allows multiple pending configs to exist.
// Therefore we only need to validate that 'config_to_commit' matches the pending config
// if the pending config does not have its 'unsafe_config_change' flag set.
if (cmeta_->has_pending_config()) {
RaftConfigPB pending_config = cmeta_->PendingConfig();
if (!pending_config.unsafe_config_change()) {
// Quorums must be exactly equal, even w.r.t. peer ordering.
CHECK_EQ(pending_config.SerializeAsString(),
config_to_commit.SerializeAsString())
<< Substitute("New committed config must equal pending config, but does not. "
"Pending config: $0, committed config: $1",
SecureShortDebugString(pending_config),
SecureShortDebugString(config_to_commit));
}
}
cmeta_->set_committed_config(config_to_commit);
cmeta_->clear_pending_config();
// A committed config must be durable before we act on it.
CHECK_OK(cmeta_->Flush());
return Status::OK();
}
// Set the current term to 'new_term' (must be strictly greater than the
// current term), clearing the vote record and the known leader. When 'flush'
// is FLUSH_TO_DISK the consensus metadata is synced. Requires 'lock_'.
Status RaftConsensus::SetCurrentTermUnlocked(int64_t new_term,
FlushToDisk flush) {
TRACE_EVENT1("consensus", "RaftConsensus::SetCurrentTermUnlocked",
"term", new_term);
DCHECK(lock_.is_locked());
if (PREDICT_FALSE(new_term <= CurrentTermUnlocked())) {
return Status::IllegalState(
Substitute("Cannot change term to a term that is lower than or equal to the current one. "
"Current: $0, Proposed: $1", CurrentTermUnlocked(), new_term));
}
cmeta_->set_current_term(new_term);
// Per Raft, a vote belongs to a single term; a new term means a fresh vote.
cmeta_->clear_voted_for();
if (flush == FLUSH_TO_DISK) {
CHECK_OK(cmeta_->Flush());
}
ClearLeaderUnlocked();
return Status::OK();
}
// Current Raft term from the consensus metadata. Requires 'lock_'.
int64_t RaftConsensus::CurrentTermUnlocked() const {
DCHECK(lock_.is_locked());
return cmeta_->current_term();
}
// UUID of the currently-known leader ("" if none). Requires 'lock_'.
string RaftConsensus::GetLeaderUuidUnlocked() const {
DCHECK(lock_.is_locked());
return cmeta_->leader_uuid();
}
// Whether a leader is currently known (non-empty leader UUID).
// Requires 'lock_'.
bool RaftConsensus::HasLeaderUnlocked() const {
DCHECK(lock_.is_locked());
return !GetLeaderUuidUnlocked().empty();
}
// Forget the currently-known leader and mark it not ready.
// Requires 'lock_'.
void RaftConsensus::ClearLeaderUnlocked() {
DCHECK(lock_.is_locked());
leader_is_ready_ = false;
cmeta_->set_leader_uuid("");
}
// Whether this peer has already cast a vote in the current term.
// Requires 'lock_'.
bool RaftConsensus::HasVotedCurrentTermUnlocked() const {
DCHECK(lock_.is_locked());
return cmeta_->has_voted_for();
}
// Durably record a vote for 'uuid' in the current term; the flush must
// succeed so the vote survives a crash (Raft safety requirement).
// Requires 'lock_'.
Status RaftConsensus::SetVotedForCurrentTermUnlocked(const string& uuid) {
TRACE_EVENT1("consensus", "RaftConsensus::SetVotedForCurrentTermUnlocked",
"uuid", uuid);
DCHECK(lock_.is_locked());
cmeta_->set_voted_for(uuid);
CHECK_OK(cmeta_->Flush());
return Status::OK();
}
// UUID this peer voted for in the current term; only valid if a vote was
// cast (see HasVotedCurrentTermUnlocked). Requires 'lock_'.
const string& RaftConsensus::GetVotedForCurrentTermUnlocked() const {
DCHECK(lock_.is_locked());
DCHECK(cmeta_->has_voted_for());
return cmeta_->voted_for();
}
// Options this instance was constructed with; immutable, so no lock needed.
const ConsensusOptions& RaftConsensus::GetOptions() const {
return options_;
}
// Thread-safe wrapper around LogPrefixUnlocked().
string RaftConsensus::LogPrefix() const {
ThreadRestrictions::AssertWaitAllowed();
LockGuard l(lock_);
return LogPrefixUnlocked();
}
// Log prefix of the form "T <tablet> P <peer>[ term <n> <role>]: ".
// Requires 'lock_'.
string RaftConsensus::LogPrefixUnlocked() const {
DCHECK(lock_.is_locked());
// 'cmeta_' may not be set if initialization failed.
string cmeta_info;
if (cmeta_) {
cmeta_info = Substitute(" [term $0 $1]",
cmeta_->current_term(),
RaftPeerPB::Role_Name(cmeta_->active_role()));
}
return Substitute("T $0 P $1$2: ", options_.tablet_id, peer_uuid(), cmeta_info);
}
// Lock-free log prefix ("T <tablet> P <peer>: ") built only from immutable
// fields; usable from any thread, including when 'lock_' is held elsewhere.
string RaftConsensus::LogPrefixThreadSafe() const {
return Substitute("T $0 P $1: ",
options_.tablet_id,
peer_uuid());
}
// Thread-safe wrapper around ToStringUnlocked().
string RaftConsensus::ToString() const {
ThreadRestrictions::AssertWaitAllowed();
LockGuard l(lock_);
return ToStringUnlocked();
}
// One-line human-readable summary of replica identity, state and role.
// Requires 'lock_'.
string RaftConsensus::ToStringUnlocked() const {
DCHECK(lock_.is_locked());
return Substitute("Replica: $0, State: $1, Role: $2",
peer_uuid(), State_Name(state_), RaftPeerPB::Role_Name(cmeta_->active_role()));
}
// On-disk footprint of the consensus metadata file, in bytes.
int64_t RaftConsensus::MetadataOnDiskSize() const {
return cmeta_->on_disk_size();
}
// Test-only accessor: raw pointer to the consensus metadata (not owned by
// the caller).
ConsensusMetadata* RaftConsensus::consensus_metadata_for_tests() const {
return cmeta_.get();
}
// Milliseconds since the last communication from a leader, or 0 if a leader
// has never communicated with this replica.
int64_t RaftConsensus::GetMillisSinceLastLeaderHeartbeat() const {
return last_leader_communication_time_micros_ == 0 ?
0 : (GetMonoTimeMicros() - last_leader_communication_time_micros_) / 1000;
}
////////////////////////////////////////////////////////////////////////
// ConsensusBootstrapInfo
////////////////////////////////////////////////////////////////////////
// Start with minimum OpIds so a freshly-bootstrapped replica reports no
// received or committed operations.
ConsensusBootstrapInfo::ConsensusBootstrapInfo()
: last_id(MinimumOpId()),
last_committed_id(MinimumOpId()) {
}
// 'orphaned_replicates' holds raw owning pointers; delete them all here.
ConsensusBootstrapInfo::~ConsensusBootstrapInfo() {
STLDeleteElements(&orphaned_replicates);
}
////////////////////////////////////////////////////////////////////////
// ConsensusRound
////////////////////////////////////////////////////////////////////////
// Construct a round taking ownership of 'replicate_msg' (wrapped in a
// ref-counted holder) with an explicit replication-finished callback.
// bound_term_ of -1 means the round is not bound to any term yet.
ConsensusRound::ConsensusRound(RaftConsensus* consensus,
unique_ptr<ReplicateMsg> replicate_msg,
ConsensusReplicatedCallback replicated_cb)
: consensus_(consensus),
replicate_msg_(new RefCountedReplicate(replicate_msg.release())),
replicated_cb_(std::move(replicated_cb)),
bound_term_(-1) {}
// Construct a round sharing an already-ref-counted replicate message; the
// replication callback may be set later. bound_term_ of -1 means unbound.
ConsensusRound::ConsensusRound(RaftConsensus* consensus,
ReplicateRefPtr replicate_msg)
: consensus_(consensus),
replicate_msg_(std::move(replicate_msg)),
bound_term_(-1) {
DCHECK(replicate_msg_);
}
// Invoke the replication-finished callback with 'status', if one was set;
// a no-op otherwise.
void ConsensusRound::NotifyReplicationFinished(const Status& status) {
if (PREDICT_FALSE(!replicated_cb_)) return;
replicated_cb_(status);
}
// Verify that this round, if bound to a term (bound_term_ != -1), is being
// replicated in that same term. Returns Status::Aborted on a mismatch so
// the caller can abandon ops submitted under a deposed leader.
Status ConsensusRound::CheckBoundTerm(int64_t current_term) const {
  if (bound_term_ == -1 || bound_term_ == current_term) {
    return Status::OK();
  }
  return Status::Aborted(
      strings::Substitute(
          "Op submitted in term $0 cannot be replicated in term $1",
          bound_term_, current_term));
}
} // namespace consensus
} // namespace kudu