// Copyright 2016 PingCAP, Inc.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
// Copyright 2015 The etcd Authors
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
use std::prelude::v1::*;
use std::cmp;
use eraftpb::{Entry, EntryType, HardState, Message, MessageType, Snapshot};
use fxhash::FxHashMap;
use protobuf::RepeatedField;
use sgx_rand::{thread_rng, Rng};
use super::errors::{Error, Result, StorageError};
use super::progress::{Progress, ProgressSet, ProgressState};
use super::raft_log::{self, RaftLog};
use super::read_only::{ReadOnly, ReadOnlyOption, ReadState};
use super::storage::Storage;
use super::Config;
// CAMPAIGN_PRE_ELECTION represents the first phase of a normal election when
// Config.pre_vote is true.
const CAMPAIGN_PRE_ELECTION: &[u8] = b"CampaignPreElection";
// CAMPAIGN_ELECTION represents a normal (time-based) election (the second phase
// of the election when Config.pre_vote is true).
const CAMPAIGN_ELECTION: &[u8] = b"CampaignElection";
// CAMPAIGN_TRANSFER represents the type of leader transfer.
const CAMPAIGN_TRANSFER: &[u8] = b"CampaignTransfer";
/// The role of the node.
#[derive(Debug, PartialEq, Clone, Copy)]
pub enum StateRole {
/// The node is a follower of the leader.
Follower,
/// The node could become a leader.
Candidate,
/// The node is a leader.
Leader,
/// The node could become a candidate, if `prevote` is enabled.
PreCandidate,
}
impl Default for StateRole {
fn default() -> StateRole {
StateRole::Follower
}
}
/// A constant representing an invalid raft node id.
pub const INVALID_ID: u64 = 0;
/// A constant representing an invalid index of the raft log.
pub const INVALID_INDEX: u64 = 0;
/// SoftState provides state that is useful for logging and debugging.
/// The state is volatile and does not need to be persisted to the WAL.
#[derive(Default, PartialEq, Debug)]
pub struct SoftState {
/// The potential leader of the cluster.
pub leader_id: u64,
/// The soft role this node may take.
pub raft_state: StateRole,
}
/// A struct that represents the raft consensus itself. Stores details concerning the current
/// and possible states the system can take.
#[derive(Default)]
pub struct Raft<T: Storage> {
/// The current election term.
pub term: u64,
/// Which peer this raft is voting for.
pub vote: u64,
/// The ID of this node.
pub id: u64,
/// The current read states.
pub read_states: Vec<ReadState>,
/// The persistent log.
pub raft_log: RaftLog<T>,
/// The maximum number of messages that can be in flight.
pub max_inflight: usize,
/// The maximum total size (in bytes) of entries carried by a single message.
pub max_msg_size: u64,
prs: Option<ProgressSet>,
/// The current role of this node.
pub state: StateRole,
/// Whether this is a learner node.
///
/// Learners are not permitted to vote in elections, and are not counted for commit quorums.
/// They do replicate data from the leader.
pub is_learner: bool,
/// The current votes for this node in an election.
///
/// Reset when changing role.
pub votes: FxHashMap<u64, bool>,
/// The list of messages.
pub msgs: Vec<Message>,
/// The leader ID.
pub leader_id: u64,
/// ID of the leader transfer target when its value is not None.
///
/// If this is Some(id), we follow the procedure defined in raft thesis 3.10.
pub lead_transferee: Option<u64>,
/// Only one conf change may be pending (in the log, but not yet
/// applied) at a time. This is enforced via pending_conf_index, which
/// is set to a value >= the log index of the latest pending
/// configuration change (if any). Config changes are only allowed to
/// be proposed if the leader's applied index is greater than this
/// value.
pub pending_conf_index: u64,
/// The queue of read-only requests.
pub read_only: ReadOnly,
/// Number of ticks since it reached the last electionTimeout when it is a
/// leader or candidate.
/// For a follower, the number of ticks since it reached the last
/// electionTimeout or received a valid message from the current leader.
pub election_elapsed: usize,
/// Number of ticks since it reached the last heartbeatTimeout.
/// Only the leader keeps heartbeat_elapsed.
heartbeat_elapsed: usize,
/// Whether to check the quorum
pub check_quorum: bool,
/// Enable the prevote algorithm.
///
/// This enables a pre-election vote round on Candidates prior to disrupting the cluster.
///
/// Enable this if greater cluster stability is preferred over faster elections.
pub pre_vote: bool,
skip_bcast_commit: bool,
heartbeat_timeout: usize,
election_timeout: usize,
// randomized_election_timeout is a random number between
// [min_election_timeout, max_election_timeout - 1]. It gets reset
// when raft changes its state to follower or candidate.
randomized_election_timeout: usize,
min_election_timeout: usize,
max_election_timeout: usize,
/// Tag is only used for logging
tag: String,
}
trait AssertSend: Send {}
impl<T: Storage + Send> AssertSend for Raft<T> {}
fn new_message(to: u64, field_type: MessageType, from: Option<u64>) -> Message {
let mut m = Message::new();
m.set_to(to);
if let Some(id) = from {
m.set_from(id);
}
m.set_msg_type(field_type);
m
}
/// Maps vote and pre_vote message types to their corresponding responses.
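///
/// For example, a vote request maps to a vote response. A minimal sketch
/// (assuming the crate is linked as `raft` and re-exports these items, as
/// upstream raft-rs does):
///
/// ```ignore
/// use raft::eraftpb::MessageType;
///
/// assert_eq!(
///     raft::vote_resp_msg_type(MessageType::MsgRequestVote),
///     MessageType::MsgRequestVoteResponse
/// );
/// ```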
pub fn vote_resp_msg_type(t: MessageType) -> MessageType {
match t {
MessageType::MsgRequestVote => MessageType::MsgRequestVoteResponse,
MessageType::MsgRequestPreVote => MessageType::MsgRequestPreVoteResponse,
_ => panic!("Not a vote message: {:?}", t),
}
}
/// Calculate the quorum of a Raft cluster with the specified total nodes.
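///
/// E.g. a simple majority: `quorum(4) == 3` and `quorum(5) == 3`.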
pub fn quorum(total: usize) -> usize {
total / 2 + 1
}
impl<T: Storage> Raft<T> {
/// Creates a new raft for use on the node.
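///
/// A hedged construction sketch (assumes the crate's `MemStorage` test
/// storage and the `Config`/`Raft` re-exports at the crate root, as in
/// upstream raft-rs; all values are illustrative):
///
/// ```ignore
/// use raft::{storage::MemStorage, Config, Raft, StateRole};
///
/// let config = Config {
///     id: 1,
///     peers: vec![1, 2, 3],
///     election_tick: 10,
///     heartbeat_tick: 3,
///     max_size_per_msg: 1024 * 1024,
///     max_inflight_msgs: 256,
///     ..Default::default()
/// };
/// // A fresh node with empty storage starts as a follower at term 0.
/// let r = Raft::new(&config, MemStorage::new());
/// assert_eq!(r.state, StateRole::Follower);
/// ```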
pub fn new(c: &Config, store: T) -> Raft<T> {
c.validate().expect("configuration is invalid");
let rs = store.initial_state().expect("failed to get initial state from storage");
let conf_state = &rs.conf_state;
let raft_log = RaftLog::new(store, c.tag.clone());
let mut peers: &[u64] = &c.peers;
let mut learners: &[u64] = &c.learners;
if !conf_state.get_nodes().is_empty() || !conf_state.get_learners().is_empty() {
if !peers.is_empty() || !learners.is_empty() {
// TODO: the peers argument is always empty except in
// tests; the argument should be removed and these tests should be
// updated to specify their nodes through a snapshot.
panic!(
"{} cannot specify both new(peers/learners) and ConfState.(Nodes/Learners)",
c.tag
)
}
peers = conf_state.get_nodes();
learners = conf_state.get_learners();
}
let mut r = Raft {
id: c.id,
read_states: Default::default(),
raft_log,
max_inflight: c.max_inflight_msgs,
max_msg_size: c.max_size_per_msg,
prs: Some(ProgressSet::with_capacity(peers.len(), learners.len())),
state: StateRole::Follower,
is_learner: false,
check_quorum: c.check_quorum,
pre_vote: c.pre_vote,
read_only: ReadOnly::new(c.read_only_option),
heartbeat_timeout: c.heartbeat_tick,
election_timeout: c.election_tick,
votes: Default::default(),
msgs: Default::default(),
leader_id: Default::default(),
lead_transferee: None,
term: Default::default(),
election_elapsed: Default::default(),
pending_conf_index: Default::default(),
vote: Default::default(),
heartbeat_elapsed: Default::default(),
randomized_election_timeout: 0,
min_election_timeout: c.min_election_tick(),
max_election_timeout: c.max_election_tick(),
skip_bcast_commit: c.skip_bcast_commit,
tag: c.tag.to_owned(),
};
for p in peers {
let pr = Progress::new(1, r.max_inflight, false);
if let Err(e) = r.mut_prs().insert_voter(*p, pr) {
panic!("{}", e);
}
}
for p in learners {
let pr = Progress::new(1, r.max_inflight, true);
if let Err(e) = r.mut_prs().insert_learner(*p, pr) {
panic!("{}", e);
};
if *p == r.id {
r.is_learner = true;
}
}
if rs.hard_state != HardState::new() {
r.load_state(&rs.hard_state);
}
if c.applied > 0 {
r.raft_log.applied_to(c.applied);
}
let term = r.term;
r.become_follower(term, INVALID_ID);
info!(
"{} newRaft [peers: {:?}, term: {:?}, commit: {}, applied: {}, last_index: {}, \
last_term: {}]",
r.tag,
r.prs().voters().collect::<Vec<_>>(),
r.term,
r.raft_log.committed,
r.raft_log.get_applied(),
r.raft_log.last_index(),
r.raft_log.last_term()
);
r
}
/// Grabs an immutable reference to the store.
#[inline]
pub fn get_store(&self) -> &T {
self.raft_log.get_store()
}
/// Grabs a mutable reference to the store.
#[inline]
pub fn mut_store(&mut self) -> &mut T {
self.raft_log.mut_store()
}
/// Grabs a reference to the snapshot
#[inline]
pub fn get_snap(&self) -> Option<&Snapshot> {
self.raft_log.get_unstable().snapshot.as_ref()
}
/// Returns the number of pending read-only messages.
#[inline]
pub fn pending_read_count(&self) -> usize {
self.read_only.pending_read_count()
}
/// Returns how many read states exist.
#[inline]
pub fn ready_read_count(&self) -> usize {
self.read_states.len()
}
/// Returns a value representing the softstate at the time of calling.
pub fn soft_state(&self) -> SoftState {
SoftState {
leader_id: self.leader_id,
raft_state: self.state,
}
}
/// Returns a value representing the hardstate at the time of calling.
pub fn hard_state(&self) -> HardState {
let mut hs = HardState::new();
hs.set_term(self.term);
hs.set_vote(self.vote);
hs.set_commit(self.raft_log.committed);
hs
}
/// Returns whether the current raft is in lease.
pub fn in_lease(&self) -> bool {
self.state == StateRole::Leader && self.check_quorum
}
fn quorum(&self) -> usize {
quorum(self.prs().voter_ids().len())
}
/// For testing leader lease
#[doc(hidden)]
pub fn set_randomized_election_timeout(&mut self, t: usize) {
assert!(self.min_election_timeout <= t && t < self.max_election_timeout);
self.randomized_election_timeout = t;
}
/// Fetch the length of the election timeout.
pub fn get_election_timeout(&self) -> usize {
self.election_timeout
}
/// Fetch the length of the heartbeat timeout
pub fn get_heartbeat_timeout(&self) -> usize {
self.heartbeat_timeout
}
/// Return the length of the current randomized election timeout.
pub fn get_randomized_election_timeout(&self) -> usize {
self.randomized_election_timeout
}
/// Sets whether to skip broadcasting empty commit messages at runtime.
#[inline]
pub fn skip_bcast_commit(&mut self, skip: bool) {
self.skip_bcast_commit = skip;
}
// send sets the message's `from` and `term` fields as appropriate and then
// queues it in the mailbox (`self.msgs`) for delivery.
fn send(&mut self, mut m: Message) {
m.set_from(self.id);
if m.get_msg_type() == MessageType::MsgRequestVote
|| m.get_msg_type() == MessageType::MsgRequestPreVote
|| m.get_msg_type() == MessageType::MsgRequestVoteResponse
|| m.get_msg_type() == MessageType::MsgRequestPreVoteResponse
{
if m.get_term() == 0 {
// All {pre-,}campaign messages need to have the term set when
// sending.
// - MsgVote: m.Term is the term the node is campaigning for,
// non-zero as we increment the term when campaigning.
// - MsgVoteResp: m.Term is the new r.Term if the MsgVote was
// granted, non-zero for the same reason MsgVote is
// - MsgPreVote: m.Term is the term the node will campaign,
// non-zero as we use m.Term to indicate the next term we'll be
// campaigning for
// - MsgPreVoteResp: m.Term is the term received in the original
// MsgPreVote if the pre-vote was granted, non-zero for the
// same reasons MsgPreVote is
panic!(
"{} term should be set when sending {:?}",
self.tag,
m.get_msg_type()
);
}
} else {
if m.get_term() != 0 {
panic!(
"{} term should not be set when sending {:?} (was {})",
self.tag,
m.get_msg_type(),
m.get_term()
);
}
// do not attach term to MsgPropose, MsgReadIndex
// proposals are a way to forward to the leader and
// should be treated as local message.
// MsgReadIndex is also forwarded to leader.
if m.get_msg_type() != MessageType::MsgPropose
&& m.get_msg_type() != MessageType::MsgReadIndex
{
m.set_term(self.term);
}
}
self.msgs.push(m);
}
fn prepare_send_snapshot(&mut self, m: &mut Message, pr: &mut Progress, to: u64) -> bool {
if !pr.recent_active {
debug!(
"{} ignore sending snapshot to {} since it is not recently active",
self.tag, to
);
return false;
}
m.set_msg_type(MessageType::MsgSnapshot);
let snapshot_r = self.raft_log.snapshot();
if let Err(e) = snapshot_r {
if e == Error::Store(StorageError::SnapshotTemporarilyUnavailable) {
debug!(
"{} failed to send snapshot to {} because snapshot is temporarily \
unavailable",
self.tag, to
);
return false;
}
panic!("{} unexpected error: {:?}", self.tag, e);
}
let snapshot = snapshot_r.unwrap();
if snapshot.get_metadata().get_index() == 0 {
panic!("{} need non-empty snapshot", self.tag);
}
let (sindex, sterm) = (
snapshot.get_metadata().get_index(),
snapshot.get_metadata().get_term(),
);
m.set_snapshot(snapshot);
debug!(
"{} [firstindex: {}, commit: {}] sent snapshot[index: {}, term: {}] to {} \
[{:?}]",
self.tag,
self.raft_log.first_index(),
self.raft_log.committed,
sindex,
sterm,
to,
pr
);
pr.become_snapshot(sindex);
debug!(
"{} paused sending replication messages to {} [{:?}]",
self.tag, to, pr
);
true
}
fn prepare_send_entries(
&mut self,
m: &mut Message,
pr: &mut Progress,
term: u64,
ents: Vec<Entry>,
) {
m.set_msg_type(MessageType::MsgAppend);
m.set_index(pr.next_idx - 1);
m.set_log_term(term);
m.set_entries(RepeatedField::from_vec(ents));
m.set_commit(self.raft_log.committed);
if !m.get_entries().is_empty() {
match pr.state {
ProgressState::Replicate => {
let last = m.get_entries().last().unwrap().get_index();
pr.optimistic_update(last);
pr.ins.add(last);
}
ProgressState::Probe => pr.pause(),
_ => panic!(
"{} is sending append in unhandled state {:?}",
self.tag, pr.state
),
}
}
}
/// Sends an append RPC, with entries (or a snapshot if the entries are
/// unavailable), to the given peer.
pub fn send_append(&mut self, to: u64, pr: &mut Progress) {
if pr.is_paused() {
return;
}
let term = self.raft_log.term(pr.next_idx - 1);
let ents = self.raft_log.entries(pr.next_idx, self.max_msg_size);
let mut m = Message::new();
m.set_to(to);
if term.is_err() || ents.is_err() {
// send snapshot if we failed to get term or entries
if !self.prepare_send_snapshot(&mut m, pr, to) {
return;
}
} else {
self.prepare_send_entries(&mut m, pr, term.unwrap(), ents.unwrap());
}
self.send(m);
}
// send_heartbeat sends a heartbeat (MsgHeartbeat) to the given peer.
fn send_heartbeat(&mut self, to: u64, pr: &Progress, ctx: Option<Vec<u8>>) {
// Attach the commit as min(to.matched, self.raft_log.committed).
// When the leader sends out heartbeat message,
// the receiver(follower) might not be matched with the leader
// or it might not have all the committed entries.
// The leader MUST NOT forward the follower's commit to
// an unmatched index.
let mut m = Message::new();
m.set_to(to);
m.set_msg_type(MessageType::MsgHeartbeat);
let commit = cmp::min(pr.matched, self.raft_log.committed);
m.set_commit(commit);
if let Some(context) = ctx {
m.set_context(context);
}
self.send(m);
}
/// Sends append RPCs, with entries, to all peers that are not up-to-date
/// according to the progress recorded in r.prs().
pub fn bcast_append(&mut self) {
let self_id = self.id;
let mut prs = self.take_prs();
prs.iter_mut()
.filter(|&(id, _)| *id != self_id)
.for_each(|(id, pr)| self.send_append(*id, pr));
self.set_prs(prs);
}
/// Sends heartbeat RPCs, without entries, to all the peers.
pub fn bcast_heartbeat(&mut self) {
let ctx = self.read_only.last_pending_request_ctx();
self.bcast_heartbeat_with_ctx(ctx)
}
#[cfg_attr(feature = "cargo-clippy", allow(needless_pass_by_value))]
fn bcast_heartbeat_with_ctx(&mut self, ctx: Option<Vec<u8>>) {
let self_id = self.id;
let mut prs = self.take_prs();
prs.iter_mut()
.filter(|&(id, _)| *id != self_id)
.for_each(|(id, pr)| self.send_heartbeat(*id, pr, ctx.clone()));
self.set_prs(prs);
}
/// Attempts to advance the commit index. Returns true if the commit index
/// changed (in which case the caller should call `r.bcast_append`).
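///
/// For example, with voter match indices [5, 5, 4, 3, 3] (sorted
/// descending) and a quorum of 3, the commit candidate is the third
/// entry, index 4.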
pub fn maybe_commit(&mut self) -> bool {
let mut mis_arr = [0; 5];
let mut mis_vec;
let voter_count = self.prs().voter_ids().len();
let mis = if voter_count <= 5 {
&mut mis_arr[..voter_count]
} else {
mis_vec = vec![0; voter_count];
mis_vec.as_mut_slice()
};
for (i, pr) in self.prs().voters().map(|(_, v)| v).enumerate() {
mis[i] = pr.matched;
}
// reverse sort
mis.sort_by(|a, b| b.cmp(a));
let mci = mis[self.quorum() - 1];
self.raft_log.maybe_commit(mci, self.term)
}
/// Resets the current node to a given term.
pub fn reset(&mut self, term: u64) {
if self.term != term {
self.term = term;
self.vote = INVALID_ID;
}
self.leader_id = INVALID_ID;
self.reset_randomized_election_timeout();
self.election_elapsed = 0;
self.heartbeat_elapsed = 0;
self.abort_leader_transfer();
self.votes = FxHashMap::default();
self.pending_conf_index = 0;
self.read_only = ReadOnly::new(self.read_only.option);
let (last_index, max_inflight) = (self.raft_log.last_index(), self.max_inflight);
let self_id = self.id;
for (&id, pr) in self.mut_prs().iter_mut() {
*pr = Progress::new(last_index + 1, max_inflight, pr.is_learner);
if id == self_id {
pr.matched = last_index;
}
}
}
/// Appends a slice of entries to the log. The entries are updated to match
/// the current index and term.
pub fn append_entry(&mut self, es: &mut [Entry]) {
let mut li = self.raft_log.last_index();
for (i, e) in es.iter_mut().enumerate() {
e.set_term(self.term);
e.set_index(li + 1 + i as u64);
}
// use latest "last" index after truncate/append
li = self.raft_log.append(es);
let self_id = self.id;
self.mut_prs().get_mut(self_id).unwrap().maybe_update(li);
// Regardless of maybe_commit's return, our caller will call bcastAppend.
self.maybe_commit();
}
/// Returns true to indicate that there will probably be some readiness that needs to be handled.
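///
/// Callers typically drive ticks from a timer. A hedged sketch (the 100ms
/// tick interval and the surrounding loop are illustrative; `r` is a
/// `Raft` instance as constructed by `new` above):
///
/// ```ignore
/// loop {
///     std::thread::sleep(std::time::Duration::from_millis(100));
///     if r.tick() {
///         // Readiness is likely pending; collect and handle it here
///         // (e.g. via RawNode::ready() when using the higher-level API).
///     }
/// }
/// ```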
pub fn tick(&mut self) -> bool {
match self.state {
StateRole::Follower | StateRole::PreCandidate | StateRole::Candidate => {
self.tick_election()
}
StateRole::Leader => self.tick_heartbeat(),
}
}
// TODO: revoke pub when there is a better way to test.
/// Run by followers and candidates after self.election_timeout.
///
/// Returns true to indicate that there will probably be some readiness that needs to be handled.
pub fn tick_election(&mut self) -> bool {
self.election_elapsed += 1;
if !self.pass_election_timeout() || !self.promotable() {
return false;
}
self.election_elapsed = 0;
let m = new_message(INVALID_ID, MessageType::MsgHup, Some(self.id));
self.step(m).is_ok();
true
}
// tick_heartbeat is run by leaders to send a MsgBeat after self.heartbeat_timeout.
// Returns true to indicate that there will probably be some readiness that needs to be handled.
fn tick_heartbeat(&mut self) -> bool {
self.heartbeat_elapsed += 1;
self.election_elapsed += 1;
let mut has_ready = false;
if self.election_elapsed >= self.election_timeout {
self.election_elapsed = 0;
if self.check_quorum {
let m = new_message(INVALID_ID, MessageType::MsgCheckQuorum, Some(self.id));
has_ready = true;
self.step(m).is_ok();
}
if self.state == StateRole::Leader && self.lead_transferee.is_some() {
self.abort_leader_transfer()
}
}
if self.state != StateRole::Leader {
return has_ready;
}
if self.heartbeat_elapsed >= self.heartbeat_timeout {
self.heartbeat_elapsed = 0;
has_ready = true;
let m = new_message(INVALID_ID, MessageType::MsgBeat, Some(self.id));
self.step(m).is_ok();
}
has_ready
}
/// Converts this node to a follower.
pub fn become_follower(&mut self, term: u64, leader_id: u64) {
self.reset(term);
self.leader_id = leader_id;
self.state = StateRole::Follower;
info!("{} became follower at term {}", self.tag, self.term);
}
// TODO: revoke pub when there is a better way to test.
/// Converts this node to a candidate
///
/// # Panics
///
/// Panics if a leader already exists.
pub fn become_candidate(&mut self) {
assert_ne!(
self.state,
StateRole::Leader,
"invalid transition [leader -> candidate]"
);
let term = self.term + 1;
self.reset(term);
let id = self.id;
self.vote = id;
self.state = StateRole::Candidate;
info!("{} became candidate at term {}", self.tag, self.term);
}
/// Converts this node to a pre-candidate
///
/// # Panics
///
/// Panics if a leader already exists.
pub fn become_pre_candidate(&mut self) {
assert_ne!(
self.state,
StateRole::Leader,
"invalid transition [leader -> pre-candidate]"
);
// Becoming a pre-candidate changes our state,
// but doesn't change anything else. In particular it does not increase
// self.term or change self.vote.
self.state = StateRole::PreCandidate;
self.votes = FxHashMap::default();
// If a network partition happens, and leader is in minority partition,
// it will step down, and become follower without notifying others.
self.leader_id = INVALID_ID;
info!("{} became pre-candidate at term {}", self.tag, self.term);
}
// TODO: revoke pub when there is a better way to test.
/// Makes this raft the leader.
///
/// # Panics
///
/// Panics if this is a follower node.
pub fn become_leader(&mut self) {
assert_ne!(
self.state,
StateRole::Follower,
"invalid transition [follower -> leader]"
);
let term = self.term;
self.reset(term);
self.leader_id = self.id;
self.state = StateRole::Leader;
// Conservatively set the pending_conf_index to the last index in the
// log. There may or may not be a pending config change, but it's
// safe to delay any future proposals until we commit all our
// pending log entries, and scanning the entire tail of the log
// could be expensive.
self.pending_conf_index = self.raft_log.last_index();
self.append_entry(&mut [Entry::new()]);
info!("{} became leader at term {}", self.tag, self.term);
}
fn num_pending_conf(&self, ents: &[Entry]) -> usize {
ents.iter()
.filter(|e| e.get_entry_type() == EntryType::EntryConfChange)
.count()
}
/// Campaign to attempt to become a leader.
///
/// If prevote is enabled, this is handled as well.
pub fn campaign(&mut self, campaign_type: &[u8]) {
let (vote_msg, term) = if campaign_type == CAMPAIGN_PRE_ELECTION {
self.become_pre_candidate();
// Pre-vote RPCs are sent for next term before we've incremented self.term.
(MessageType::MsgRequestPreVote, self.term + 1)
} else {
self.become_candidate();
(MessageType::MsgRequestVote, self.term)
};
let self_id = self.id;
if self.quorum() == self.poll(self_id, vote_resp_msg_type(vote_msg), true) {
// We won the election after voting for ourselves (which must mean that
// this is a single-node cluster). Advance to the next state.
if campaign_type == CAMPAIGN_PRE_ELECTION {
self.campaign(CAMPAIGN_ELECTION);
} else {
self.become_leader();
}
return;
}
// Only send vote request to voters.
let prs = self.take_prs();
prs.voter_ids()
.iter()
.filter(|&id| *id != self_id)
.for_each(|&id| {
info!(
"{} [logterm: {}, index: {}] sent {:?} request to {} at term {}",
self.tag,
self.raft_log.last_term(),
self.raft_log.last_index(),
vote_msg,
id,
self.term
);
let mut m = new_message(id, vote_msg, None);
m.set_term(term);
m.set_index(self.raft_log.last_index());
m.set_log_term(self.raft_log.last_term());
if campaign_type == CAMPAIGN_TRANSFER {
m.set_context(campaign_type.to_vec());
}
self.send(m);
});
self.set_prs(prs);
}
/// Sets the vote of `id` to `vote`.
///
/// Returns the total number of granted votes so far.
fn poll(&mut self, id: u64, msg_type: MessageType, vote: bool) -> usize {
if vote {
info!(
"{} received {:?} from {} at term {}",
self.tag, msg_type, id, self.term
)
} else {
info!(
"{} received {:?} rejection from {} at term {}",
self.tag, msg_type, id, self.term
)
}
self.votes.entry(id).or_insert(vote);
self.votes.values().filter(|x| **x).count()
}
/// Steps the raft along via a message. This should be called every time your raft receives a
/// message from a peer.
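///
/// A hedged sketch of feeding a message received from a peer into the
/// state machine (`r` is a `Raft` instance and `incoming` is an
/// illustrative name for a decoded `eraftpb::Message`):
///
/// ```ignore
/// if let Err(e) = r.step(incoming) {
///     // e.g. Error::ProposalDropped when there is no known leader.
///     println!("message not stepped: {:?}", e);
/// }
/// ```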
pub fn step(&mut self, m: Message) -> Result<()> {
// Handle the message term, which may result in our stepping down to a follower.
if m.get_term() == 0 {
// local message
} else if m.get_term() > self.term {
if m.get_msg_type() == MessageType::MsgRequestVote
|| m.get_msg_type() == MessageType::MsgRequestPreVote
{
let force = m.get_context() == CAMPAIGN_TRANSFER;
let in_lease = self.check_quorum
&& self.leader_id != INVALID_ID
&& self.election_elapsed < self.election_timeout;
if !force && in_lease {
// If a server receives a RequestVote request within the minimum election
// timeout of hearing from a current leader, it does not update its term
// or grant its vote.
info!(
"{} [logterm: {}, index: {}, vote: {}] ignored {:?} vote from \
{} [logterm: {}, index: {}] at term {}: lease is not expired \
(remaining ticks: {})",
self.tag,
self.raft_log.last_term(),
self.raft_log.last_index(),
self.vote,
m.get_msg_type(),
m.get_from(),
m.get_log_term(),
m.get_index(),
self.term,
self.election_timeout - self.election_elapsed
);
return Ok(());
}
}
if m.get_msg_type() == MessageType::MsgRequestPreVote
|| (m.get_msg_type() == MessageType::MsgRequestPreVoteResponse && !m.get_reject())
{
// For a pre-vote request:
// Never change our term in response to a pre-vote request.
//
// For a pre-vote response with pre-vote granted:
// We send pre-vote requests with a term in our future. If the
// pre-vote is granted, we will increment our term when we get a
// quorum. If it is not, the term comes from the node that
// rejected our vote so we should become a follower at the new
// term.
} else {
info!(
"{} [term: {}] received a {:?} message with higher term from {} [term: {}]",
self.tag,
self.term,
m.get_msg_type(),
m.get_from(),
m.get_term()
);
if m.get_msg_type() == MessageType::MsgAppend
|| m.get_msg_type() == MessageType::MsgHeartbeat
|| m.get_msg_type() == MessageType::MsgSnapshot
{
self.become_follower(m.get_term(), m.get_from());
} else {
self.become_follower(m.get_term(), INVALID_ID);
}
}
} else if m.get_term() < self.term {
if (self.check_quorum || self.pre_vote)
&& (m.get_msg_type() == MessageType::MsgHeartbeat
|| m.get_msg_type() == MessageType::MsgAppend)
{
// We have received messages from a leader at a lower term. It is possible
// that these messages were simply delayed in the network, but this could
// also mean that this node has advanced its term number during a network
// partition, and it is now unable to either win an election or to rejoin
// the majority on the old term. If checkQuorum is false, this will be
// handled by incrementing term numbers in response to MsgVote with a higher
// term, but if checkQuorum is true we may not advance the term on MsgVote and
// must generate other messages to advance the term. The net result of these
// two features is to minimize the disruption caused by nodes that have been
// removed from the cluster's configuration: a removed node will send MsgVotes
// which will be ignored, but it will not receive MsgApp or MsgHeartbeat, so it
// will not create disruptive term increases, by notifying leader of this node's
// activeness.
// The above comments are also true for Pre-Vote.
//
// When a follower gets isolated, it soon starts an election, ending
// up with a higher term than the leader, although it won't receive enough
// votes to win the election. When it regains connectivity, this response
// with "pb.MsgAppResp" of a higher term would force the leader to step down.
// However, this disruption is inevitable to free this stuck node with a
// fresh election. This can be prevented with the Pre-Vote phase.
let to_send = new_message(m.get_from(), MessageType::MsgAppendResponse, None);
self.send(to_send);
} else if m.get_msg_type() == MessageType::MsgRequestPreVote {
// Before pre_vote was enabled, there may be a receiving candidate with a
// higher term but a shorter log. After upgrading to pre_vote, the cluster
// may deadlock if we drop messages with a lower term.
info!(
"{} [log_term: {}, index: {}, vote: {}] rejected {:?} from {} [log_term: {}, index: {}] at term {}",
self.id,
self.raft_log.last_term(),
self.raft_log.last_index(),
self.vote,
m.get_msg_type(),
m.get_from(),
m.get_log_term(),
m.get_index(),
self.term,
);
let mut to_send =
new_message(m.get_from(), MessageType::MsgRequestPreVoteResponse, None);
to_send.set_term(self.term);
to_send.set_reject(true);
self.send(to_send);
} else {
// ignore other cases
info!(
"{} [term: {}] ignored a {:?} message with lower term from {} [term: {}]",
self.tag,
self.term,
m.get_msg_type(),
m.get_from(),
m.get_term()
);
}
return Ok(());
}
#[cfg(feature = "failpoint")]
fail_point!("before_step");
match m.get_msg_type() {
MessageType::MsgHup => if self.state != StateRole::Leader {
let ents = self
.raft_log
.slice(
self.raft_log.applied + 1,
self.raft_log.committed + 1,
raft_log::NO_LIMIT,
).expect("unexpected error getting unapplied entries");
let n = self.num_pending_conf(&ents);
if n != 0 && self.raft_log.committed > self.raft_log.applied {
warn!(
"{} cannot campaign at term {} since there are still {} pending \
configuration changes to apply",
self.tag, self.term, n
);
return Ok(());
}
info!(
"{} is starting a new election at term {}",
self.tag, self.term
);
if self.pre_vote {
self.campaign(CAMPAIGN_PRE_ELECTION);
} else {
self.campaign(CAMPAIGN_ELECTION);
}
} else {
debug!("{} ignoring MsgHup because already leader", self.tag);
},
MessageType::MsgRequestVote | MessageType::MsgRequestPreVote => {
// We can vote if this is a repeat of a vote we've already cast...
let can_vote = (self.vote == m.get_from()) ||
// ...we haven't voted and we don't think there's a leader yet in this term...
(self.vote == INVALID_ID && self.leader_id == INVALID_ID) ||
// ...or this is a PreVote for a future term...
(m.get_msg_type() == MessageType::MsgRequestPreVote && m.get_term() > self.term);
// ...and we believe the candidate is up to date.
if can_vote && self.raft_log.is_up_to_date(m.get_index(), m.get_log_term()) {
// When responding to Msg{Pre,}Vote messages we include the term
// from the message, not the local term. To see why consider the
// case where a single node was previously partitioned away and
// its local term is now out of date. If we include the local term
// (recall that for pre-votes we don't update the local term), the
// (pre-)campaigning node on the other end will proceed to ignore
// the message (it ignores all out of date messages).
// The term in the original message and current local term are the
// same in the case of regular votes, but different for pre-votes.
self.log_vote_approve(&m);
let mut to_send =
new_message(m.get_from(), vote_resp_msg_type(m.get_msg_type()), None);
to_send.set_reject(false);
to_send.set_term(m.get_term());
self.send(to_send);
if m.get_msg_type() == MessageType::MsgRequestVote {
// Only record real votes.
self.election_elapsed = 0;
self.vote = m.get_from();
}
} else {
self.log_vote_reject(&m);
let mut to_send =
new_message(m.get_from(), vote_resp_msg_type(m.get_msg_type()), None);
to_send.set_reject(true);
to_send.set_term(self.term);
self.send(to_send);
}
}
_ => match self.state {
StateRole::PreCandidate | StateRole::Candidate => self.step_candidate(m)?,
StateRole::Follower => self.step_follower(m)?,
StateRole::Leader => self.step_leader(m)?,
},
}
Ok(())
}
fn log_vote_approve(&self, m: &Message) {
info!(
"{} [logterm: {}, index: {}, vote: {}] cast {:?} for {} [logterm: {}, index: {}] \
at term {}",
self.tag,
self.raft_log.last_term(),
self.raft_log.last_index(),
self.vote,
m.get_msg_type(),
m.get_from(),
m.get_log_term(),
m.get_index(),
self.term
);
}
fn log_vote_reject(&self, m: &Message) {
info!(
"{} [logterm: {}, index: {}, vote: {}] rejected {:?} from {} [logterm: {}, index: \
{}] at term {}",
self.tag,
self.raft_log.last_term(),
self.raft_log.last_index(),
self.vote,
m.get_msg_type(),
m.get_from(),
m.get_log_term(),
m.get_index(),
self.term
);
}
fn handle_append_response(
&mut self,
m: &Message,
prs: &mut ProgressSet,
old_paused: &mut bool,
send_append: &mut bool,
maybe_commit: &mut bool,
) {
let pr = prs.get_mut(m.get_from()).unwrap();
pr.recent_active = true;
if m.get_reject() {
debug!(
"{} received msgAppend rejection(lastindex: {}) from {} for index {}",
self.tag,
m.get_reject_hint(),
m.get_from(),
m.get_index()
);
if pr.maybe_decr_to(m.get_index(), m.get_reject_hint()) {
debug!(
"{} decreased progress of {} to [{:?}]",
self.tag,
m.get_from(),
pr
);
if pr.state == ProgressState::Replicate {
pr.become_probe();
}
*send_append = true;
}
return;
}
*old_paused = pr.is_paused();
if !pr.maybe_update(m.get_index()) {
return;
}
// A leadership transfer is in progress.
if let Some(lead_transferee) = self.lead_transferee {
let last_index = self.raft_log.last_index();
if m.get_from() == lead_transferee && pr.matched == last_index {
info!(
"{} sent MsgTimeoutNow to {} after received MsgAppResp",
self.tag,
m.get_from()
);
self.send_timeout_now(m.get_from());
}
}
match pr.state {
ProgressState::Probe => pr.become_replicate(),
ProgressState::Snapshot => {
if !pr.maybe_snapshot_abort() {
return;
}
debug!(
"{} snapshot aborted, resumed sending replication messages to {} \
[{:?}]",
self.tag,
m.get_from(),
pr
);
pr.become_probe();
}
ProgressState::Replicate => pr.ins.free_to(m.get_index()),
}
*maybe_commit = true;
}
fn handle_heartbeat_response(
&mut self,
m: &Message,
prs: &mut ProgressSet,
quorum: usize,
send_append: &mut bool,
more_to_send: &mut Option<Message>,
) {
let pr = prs.get_mut(m.get_from()).unwrap();
pr.recent_active = true;
pr.resume();
// free one slot for the full inflights window to allow progress.
if pr.state == ProgressState::Replicate && pr.ins.full() {
pr.ins.free_first_one();
}
if pr.matched < self.raft_log.last_index() {
*send_append = true;
}
if self.read_only.option != ReadOnlyOption::Safe || m.get_context().is_empty() {
return;
}
if self.read_only.recv_ack(m) < quorum {
return;
}
let rss = self.read_only.advance(m);
for rs in rss {
let mut req = rs.req;
if req.get_from() == INVALID_ID || req.get_from() == self.id {
// from local member
let rs = ReadState {
index: rs.index,
request_ctx: req.take_entries()[0].take_data(),
};
self.read_states.push(rs);
} else {
let mut to_send = Message::new();
to_send.set_to(req.get_from());
to_send.set_msg_type(MessageType::MsgReadIndexResp);
to_send.set_index(rs.index);
to_send.set_entries(req.take_entries());
*more_to_send = Some(to_send);
}
}
}
fn handle_transfer_leader(&mut self, m: &Message, pr: &mut Progress) {
if self.is_learner {
debug!("{} is learner. Ignored transferring leadership", self.tag);
return;
}
let lead_transferee = m.get_from();
let last_lead_transferee = self.lead_transferee;
if last_lead_transferee.is_some() {
if last_lead_transferee.unwrap() == lead_transferee {
info!(
"{} [term {}] transfer leadership to {} is in progress, ignores request \
to same node {}",
self.tag, self.term, lead_transferee, lead_transferee
);
return;
}
self.abort_leader_transfer();
info!(
"{} [term {}] abort previous transferring leadership to {}",
self.tag,
self.term,
last_lead_transferee.unwrap()
);
}
if lead_transferee == self.id {
debug!(
"{} is already leader. Ignored transferring leadership to self",
self.tag
);
return;
}
// Transfer leadership to third party.
info!(
"{} [term {}] starts to transfer leadership to {}",
self.tag, self.term, lead_transferee
);
// Transfer leadership should be finished in one electionTimeout
// so reset r.electionElapsed.
self.election_elapsed = 0;
self.lead_transferee = Some(lead_transferee);
if pr.matched == self.raft_log.last_index() {
self.send_timeout_now(lead_transferee);
info!(
"{} sends MsgTimeoutNow to {} immediately as {} already has up-to-date log",
self.tag, lead_transferee, lead_transferee
);
} else {
self.send_append(lead_transferee, pr);
}
}
fn handle_snapshot_status(&mut self, m: &Message, pr: &mut Progress) {
if m.get_reject() {
pr.snapshot_failure();
pr.become_probe();
debug!(
"{} snapshot failed, resumed sending replication messages to {} [{:?}]",
self.tag,
m.get_from(),
pr
);
} else {
pr.become_probe();
debug!(
"{} snapshot succeeded, resumed sending replication messages to {} [{:?}]",
self.tag,
m.get_from(),
pr
);
}
// If the snapshot finished, wait for the msgAppResp from the remote node
// before sending out the next msgAppend.
// If the snapshot failed, wait for a heartbeat interval before the next try.
pr.pause();
}
/// Check message's progress to decide which action should be taken.
fn check_message_with_progress(
&mut self,
m: &mut Message,
send_append: &mut bool,
old_paused: &mut bool,
maybe_commit: &mut bool,
more_to_send: &mut Option<Message>,
) {
if self.prs().get(m.get_from()).is_none() {
debug!("{} no progress available for {}", self.tag, m.get_from());
return;
}
let mut prs = self.take_prs();
match m.get_msg_type() {
MessageType::MsgAppendResponse => {
self.handle_append_response(m, &mut prs, old_paused, send_append, maybe_commit);
}
MessageType::MsgHeartbeatResponse => {
let quorum = quorum(prs.voter_ids().len());
self.handle_heartbeat_response(m, &mut prs, quorum, send_append, more_to_send);
}
MessageType::MsgSnapStatus => {
let pr = prs.get_mut(m.get_from()).unwrap();
if pr.state == ProgressState::Snapshot {
self.handle_snapshot_status(m, pr);
}
}
MessageType::MsgUnreachable => {
let pr = prs.get_mut(m.get_from()).unwrap();
// During optimistic replication, if the remote becomes unreachable,
// there is a high probability that a MsgAppend was lost.
if pr.state == ProgressState::Replicate {
pr.become_probe();
}
debug!(
"{} failed to send message to {} because it is unreachable [{:?}]",
self.tag,
m.get_from(),
pr
);
}
MessageType::MsgTransferLeader => {
let pr = prs.get_mut(m.get_from()).unwrap();
self.handle_transfer_leader(m, pr);
}
_ => {}
}
self.set_prs(prs);
}
fn step_leader(&mut self, mut m: Message) -> Result<()> {
// These message types do not require any progress for m.From.
match m.get_msg_type() {
MessageType::MsgBeat => {
self.bcast_heartbeat();
return Ok(());
}
MessageType::MsgCheckQuorum => {
if !self.check_quorum_active() {
warn!(
"{} stepped down to follower since quorum is not active",
self.tag
);
let term = self.term;
self.become_follower(term, INVALID_ID);
}
return Ok(());
}
MessageType::MsgPropose => {
if m.get_entries().is_empty() {
panic!("{} stepped empty MsgProp", self.tag);
}
if !self.prs().voter_ids().contains(&self.id) {
// If we are not currently a member of the cluster (i.e. this node
// was removed from the configuration while serving as leader),
// drop any new proposals.
return Err(Error::ProposalDropped);
}
if self.lead_transferee.is_some() {
debug!(
"{} [term {}] transfer leadership to {} is in progress; dropping \
proposal",
self.tag,
self.term,
self.lead_transferee.unwrap()
);
return Err(Error::ProposalDropped);
}
for (i, e) in m.mut_entries().iter_mut().enumerate() {
if e.get_entry_type() == EntryType::EntryConfChange {
if self.has_pending_conf() {
info!(
"propose conf {:?} ignored since pending unapplied \
configuration [index {}, applied {}]",
e, self.pending_conf_index, self.raft_log.applied
);
*e = Entry::new();
e.set_entry_type(EntryType::EntryNormal);
} else {
self.pending_conf_index = self.raft_log.last_index() + i as u64 + 1;
}
}
}
self.append_entry(&mut m.mut_entries());
self.bcast_append();
return Ok(());
}
MessageType::MsgReadIndex => {
if self.raft_log.term(self.raft_log.committed).unwrap_or(0) != self.term {
// Reject read-only requests when this leader has not committed any log
// entry in its term.
return Ok(());
}
if self.quorum() > 1 {
// thinking: use an internally defined context instead of the user-given context.
// We can express this in terms of the term and index instead of
// a user-supplied value.
// This would allow multiple reads to piggyback on the same message.
match self.read_only.option {
ReadOnlyOption::Safe => {
let ctx = m.get_entries()[0].get_data().to_vec();
self.read_only.add_request(self.raft_log.committed, m);
self.bcast_heartbeat_with_ctx(Some(ctx));
}
ReadOnlyOption::LeaseBased => {
let mut read_index = INVALID_INDEX;
if self.check_quorum {
read_index = self.raft_log.committed
}
if m.get_from() == INVALID_ID || m.get_from() == self.id {
// from local member
let rs = ReadState {
index: self.raft_log.committed,
request_ctx: m.take_entries()[0].take_data(),
};
self.read_states.push(rs);
} else {
let mut to_send = Message::new();
to_send.set_to(m.get_from());
to_send.set_msg_type(MessageType::MsgReadIndexResp);
to_send.set_index(read_index);
to_send.set_entries(m.take_entries());
self.send(to_send);
}
}
}
} else {
let rs = ReadState {
index: self.raft_log.committed,
request_ctx: m.take_entries()[0].take_data(),
};
self.read_states.push(rs);
}
return Ok(());
}
_ => {}
}
let mut send_append = false;
let mut maybe_commit = false;
let mut old_paused = false;
let mut more_to_send = None;
self.check_message_with_progress(
&mut m,
&mut send_append,
&mut old_paused,
&mut maybe_commit,
&mut more_to_send,
);
if maybe_commit {
if self.maybe_commit() {
if self.should_bcast_commit() {
self.bcast_append();
}
} else if old_paused {
// update() reset the wait state on this node. If we had delayed sending
// an update before, send it now.
send_append = true;
}
}
if send_append {
let from = m.get_from();
let mut prs = self.take_prs();
self.send_append(from, prs.get_mut(from).unwrap());
self.set_prs(prs);
}
if let Some(to_send) = more_to_send {
self.send(to_send)
}
Ok(())
}
// step_candidate is shared by state Candidate and PreCandidate; the difference is
// whether they respond to MsgRequestVote or MsgRequestPreVote.
fn step_candidate(&mut self, m: Message) -> Result<()> {
match m.get_msg_type() {
MessageType::MsgPropose => {
info!(
"{} no leader at term {}; dropping proposal",
self.tag, self.term
);
return Err(Error::ProposalDropped);
}
MessageType::MsgAppend => {
debug_assert_eq!(self.term, m.get_term());
self.become_follower(m.get_term(), m.get_from());
self.handle_append_entries(&m);
}
MessageType::MsgHeartbeat => {
debug_assert_eq!(self.term, m.get_term());
self.become_follower(m.get_term(), m.get_from());
self.handle_heartbeat(m);
}
MessageType::MsgSnapshot => {
debug_assert_eq!(self.term, m.get_term());
self.become_follower(m.get_term(), m.get_from());
self.handle_snapshot(m);
}
MessageType::MsgRequestPreVoteResponse | MessageType::MsgRequestVoteResponse => {
// Only handle vote responses corresponding to our candidacy (while in
// state Candidate, we may get stale MsgPreVoteResp messages in this term from
// our pre-candidate state).
if (self.state == StateRole::PreCandidate
&& m.get_msg_type() != MessageType::MsgRequestPreVoteResponse)
|| (self.state == StateRole::Candidate
&& m.get_msg_type() != MessageType::MsgRequestVoteResponse)
{
return Ok(());
}
let gr = self.poll(m.get_from(), m.get_msg_type(), !m.get_reject());
info!(
"{} [quorum:{}] has received {} {:?} votes and {} vote rejections",
self.tag,
self.quorum(),
gr,
m.get_msg_type(),
self.votes.len() - gr
);
if self.quorum() == gr {
if self.state == StateRole::PreCandidate {
self.campaign(CAMPAIGN_ELECTION);
} else {
self.become_leader();
self.bcast_append();
}
} else if self.quorum() == self.votes.len() - gr {
// pb.MsgPreVoteResp contains future term of pre-candidate
// m.term > self.term; reuse self.term
let term = self.term;
self.become_follower(term, INVALID_ID);
}
}
MessageType::MsgTimeoutNow => debug!(
"{} [term {} state {:?}] ignored MsgTimeoutNow from {}",
self.tag,
self.term,
self.state,
m.get_from()
),
_ => {}
}
Ok(())
}
fn step_follower(&mut self, mut m: Message) -> Result<()> {
match m.get_msg_type() {
MessageType::MsgPropose => {
if self.leader_id == INVALID_ID {
info!(
"{} no leader at term {}; dropping proposal",
self.tag, self.term
);
return Err(Error::ProposalDropped);
}
m.set_to(self.leader_id);
self.send(m);
}
MessageType::MsgAppend => {
self.election_elapsed = 0;
self.leader_id = m.get_from();
self.handle_append_entries(&m);
}
MessageType::MsgHeartbeat => {
self.election_elapsed = 0;
self.leader_id = m.get_from();
self.handle_heartbeat(m);
}
MessageType::MsgSnapshot => {
self.election_elapsed = 0;
self.leader_id = m.get_from();
self.handle_snapshot(m);
}
MessageType::MsgTransferLeader => {
if self.leader_id == INVALID_ID {
info!(
"{} no leader at term {}; dropping leader transfer msg",
self.tag, self.term
);
return Ok(());
}
m.set_to(self.leader_id);
self.send(m);
}
MessageType::MsgTimeoutNow => {
if self.promotable() {
info!(
"{} [term {}] received MsgTimeoutNow from {} and starts an election to \
get leadership.",
self.tag,
self.term,
m.get_from()
);
// Leadership transfers never use pre-vote even if self.pre_vote is true; we
// know we are not recovering from a partition so there is no need for the
// extra round trip.
self.campaign(CAMPAIGN_TRANSFER);
} else {
info!(
"{} received MsgTimeoutNow from {} but is not promotable",
self.tag,
m.get_from()
);
}
}
MessageType::MsgReadIndex => {
if self.leader_id == INVALID_ID {
info!(
"{} no leader at term {}; dropping index reading msg",
self.tag, self.term
);
return Ok(());
}
m.set_to(self.leader_id);
self.send(m);
}
MessageType::MsgReadIndexResp => {
if m.get_entries().len() != 1 {
error!(
"{} invalid format of MsgReadIndexResp from {}, entries count: {}",
self.tag,
m.get_from(),
m.get_entries().len()
);
return Ok(());
}
let rs = ReadState {
index: m.get_index(),
request_ctx: m.take_entries()[0].take_data(),
};
self.read_states.push(rs);
}
_ => {}
}
Ok(())
}
// TODO: revoke pub when there is a better way to test.
/// For a given message, append the entries to the log.
pub fn handle_append_entries(&mut self, m: &Message) {
if m.get_index() < self.raft_log.committed {
let mut to_send = Message::new();
to_send.set_to(m.get_from());
to_send.set_msg_type(MessageType::MsgAppendResponse);
to_send.set_index(self.raft_log.committed);
self.send(to_send);
return;
}
let mut to_send = Message::new();
to_send.set_to(m.get_from());
to_send.set_msg_type(MessageType::MsgAppendResponse);
match self.raft_log.maybe_append(
m.get_index(),
m.get_log_term(),
m.get_commit(),
m.get_entries(),
) {
Some(mlast_index) => {
to_send.set_index(mlast_index);
self.send(to_send);
}
None => {
debug!(
"{} [logterm: {}, index: {}] rejected msgApp [logterm: {}, index: {}] \
from {}",
self.tag,
self.raft_log.term(m.get_index()).unwrap_or(0),
m.get_index(),
m.get_log_term(),
m.get_index(),
m.get_from()
);
to_send.set_index(m.get_index());
to_send.set_reject(true);
to_send.set_reject_hint(self.raft_log.last_index());
self.send(to_send);
}
}
}
// TODO: revoke pub when there is a better way to test.
/// For a message, commit and send out heartbeat.
pub fn handle_heartbeat(&mut self, mut m: Message) {
self.raft_log.commit_to(m.get_commit());
let mut to_send = Message::new();
to_send.set_to(m.get_from());
to_send.set_msg_type(MessageType::MsgHeartbeatResponse);
to_send.set_context(m.take_context());
self.send(to_send);
}
fn handle_snapshot(&mut self, mut m: Message) {
let (sindex, sterm) = (
m.get_snapshot().get_metadata().get_index(),
m.get_snapshot().get_metadata().get_term(),
);
if self.restore(m.take_snapshot()) {
info!(
"{} [commit: {}] restored snapshot [index: {}, term: {}]",
self.tag, self.raft_log.committed, sindex, sterm
);
let mut to_send = Message::new();
to_send.set_to(m.get_from());
to_send.set_msg_type(MessageType::MsgAppendResponse);
to_send.set_index(self.raft_log.last_index());
self.send(to_send);
} else {
info!(
"{} [commit: {}] ignored snapshot [index: {}, term: {}]",
self.tag, self.raft_log.committed, sindex, sterm
);
let mut to_send = Message::new();
to_send.set_to(m.get_from());
to_send.set_msg_type(MessageType::MsgAppendResponse);
to_send.set_index(self.raft_log.committed);
self.send(to_send);
}
}
fn restore_raft(&mut self, snap: &Snapshot) -> Option<bool> {
let meta = snap.get_metadata();
if self.raft_log.match_term(meta.get_index(), meta.get_term()) {
info!(
"{} [commit: {}, lastindex: {}, lastterm: {}] fast-forwarded commit to \
snapshot [index: {}, term: {}]",
self.tag,
self.raft_log.committed,
self.raft_log.last_index(),
self.raft_log.last_term(),
meta.get_index(),
meta.get_term()
);
self.raft_log.commit_to(meta.get_index());
return Some(false);
}
// Both learners and voters being empty means the peer was created by a ConfChange.
if self.prs().iter().len() != 0 && !self.is_learner {
for &id in meta.get_conf_state().get_learners() {
if id == self.id {
error!(
"{} can't become learner when restores snapshot [index: {}, term: {}]",
self.tag,
meta.get_index(),
meta.get_term(),
);
return Some(false);
}
}
}
info!(
"{} [commit: {}, lastindex: {}, lastterm: {}] starts to restore snapshot \
[index: {}, term: {}]",
self.tag,
self.raft_log.committed,
self.raft_log.last_index(),
self.raft_log.last_term(),
meta.get_index(),
meta.get_term()
);
let nodes = meta.get_conf_state().get_nodes();
let learners = meta.get_conf_state().get_learners();
self.prs = Some(ProgressSet::with_capacity(nodes.len(), learners.len()));
for &(is_learner, nodes) in &[(false, nodes), (true, learners)] {
for &n in nodes {
let next_index = self.raft_log.last_index() + 1;
let mut matched = 0;
if n == self.id {
matched = next_index - 1;
self.is_learner = is_learner;
}
self.set_progress(n, matched, next_index, is_learner);
info!(
"{} restored progress of {} [{:?}]",
self.tag,
n,
self.prs().get(n)
);
}
}
None
}
/// Recovers the state machine from a snapshot. It restores the log and the
/// configuration of the state machine.
pub fn restore(&mut self, snap: Snapshot) -> bool {
if snap.get_metadata().get_index() < self.raft_log.committed {
return false;
}
if let Some(b) = self.restore_raft(&snap) {
return b;
}
self.raft_log.restore(snap);
true
}
/// Check if there is any pending confchange.
///
/// This method can return a false positive.
#[inline]
pub fn has_pending_conf(&self) -> bool {
self.pending_conf_index > self.raft_log.applied
}
/// Specifies if the commit should be broadcast.
pub fn should_bcast_commit(&self) -> bool {
!self.skip_bcast_commit || self.has_pending_conf()
}
/// Indicates whether the state machine can be promoted to leader,
/// which is true when its own id is in the progress list.
pub fn promotable(&self) -> bool {
self.prs().voter_ids().contains(&self.id)
}
fn add_voter_or_learner(&mut self, id: u64, learner: bool) {
debug!(
"Adding node (learner: {}) with ID {} to peers.",
learner, id
);
// Ignore redundant inserts.
// TODO: Remove these and have this function and related functions return errors.
if let Some(progress) = self.prs().get(id) {
// If progress.is_learner == learner, then it's already inserted as what it should be, return early to avoid error.
if progress.is_learner == learner {
info!("{} Ignoring redundant insert of ID {}.", self.tag, id);
return;
}
// If progress.is_learner == false, and learner == true, then it's a demotion, return early to avoid an error.
if !progress.is_learner && learner {
info!("{} Ignoring voter demotion of ID {}.", self.tag, id);
return;
}
};
let progress = Progress::new(self.raft_log.last_index() + 1, self.max_inflight, learner);
let result = if learner {
self.mut_prs().insert_learner(id, progress)
} else if self.prs().learner_ids().contains(&id) {
self.mut_prs().promote_learner(id)
} else {
self.mut_prs().insert_voter(id, progress)
};
if let Err(e) = result {
panic!("{}", e)
}
if self.id == id {
self.is_learner = learner
};
// When a node is first added/promoted, we should mark it as recently active.
// Otherwise, check_quorum may cause us to step down if it is invoked
// before the added node has a chance to communicate with us.
self.mut_prs().get_mut(id).unwrap().recent_active = true;
}
/// Adds a new node to the cluster.
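///
/// A hedged sketch (IDs are illustrative): voters join via `add_node`,
/// learners via `add_learner` below.
///
/// ```ignore
/// r.add_node(4);
/// r.add_learner(5);
/// assert!(r.prs().voter_ids().contains(&4));
/// assert!(r.prs().learner_ids().contains(&5));
/// ```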
pub fn add_node(&mut self, id: u64) {
self.add_voter_or_learner(id, false);
}
/// Adds a learner node.
pub fn add_learner(&mut self, id: u64) {
self.add_voter_or_learner(id, true);
}
/// Removes a node from the raft.
pub fn remove_node(&mut self, id: u64) {
self.mut_prs().remove(id);
// do not try to commit or abort transferring if there are no nodes in the cluster.
if self.prs().voter_ids().is_empty() && self.prs().learner_ids().is_empty() {
return;
}
// The quorum size is now smaller, so see if any pending entries can
// be committed.
if self.maybe_commit() {
self.bcast_append();
}
// If the removed node is the lead_transferee, then abort the leadership transferring.
if self.state == StateRole::Leader && self.lead_transferee == Some(id) {
self.abort_leader_transfer()
}
}
/// Updates the progress of the learner or voter.
pub fn set_progress(&mut self, id: u64, matched: u64, next_idx: u64, is_learner: bool) {
let mut p = Progress::new(next_idx, self.max_inflight, is_learner);
p.matched = matched;
if is_learner {
if let Err(e) = self.mut_prs().insert_learner(id, p) {
panic!("{}", e);
}
} else if let Err(e) = self.mut_prs().insert_voter(id, p) {
panic!("{}", e);
}
}
/// Takes the progress set (destructively turns to `None`).
pub fn take_prs(&mut self) -> ProgressSet {
self.prs.take().unwrap()
}
/// Sets the progress set.
pub fn set_prs(&mut self, prs: ProgressSet) {
self.prs = Some(prs);
}
/// Returns a read-only reference to the progress set.
pub fn prs(&self) -> &ProgressSet {
self.prs.as_ref().unwrap()
}
/// Returns a mutable reference to the progress set.
pub fn mut_prs(&mut self) -> &mut ProgressSet {
self.prs.as_mut().unwrap()
}
// TODO: revoke pub when there is a better way to test.
/// For a given hardstate, load the state into self.
pub fn load_state(&mut self, hs: &HardState) {
if hs.get_commit() < self.raft_log.committed || hs.get_commit() > self.raft_log.last_index()
{
panic!(
"{} hs.commit {} is out of range [{}, {}]",
self.tag,
hs.get_commit(),
self.raft_log.committed,
self.raft_log.last_index()
)
}
self.raft_log.committed = hs.get_commit();
self.term = hs.get_term();
self.vote = hs.get_vote();
}
/// `pass_election_timeout` returns true iff `election_elapsed` is greater
/// than or equal to the randomized election timeout in
/// [`election_timeout`, 2 * `election_timeout` - 1].
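///
/// E.g. with `election_timeout = 10` (and the default min/max ticks), the
/// randomized timeout is drawn from [10, 19].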
pub fn pass_election_timeout(&self) -> bool {
self.election_elapsed >= self.randomized_election_timeout
}
/// Regenerates and stores the randomized election timeout.
pub fn reset_randomized_election_timeout(&mut self) {
let prev_timeout = self.randomized_election_timeout;
let timeout = thread_rng().gen_range(self.min_election_timeout, self.max_election_timeout);
debug!(
"{} reset election timeout {} -> {} at {}",
self.tag, prev_timeout, timeout, self.election_elapsed
);
self.randomized_election_timeout = timeout;
}
// check_quorum_active returns true if the quorum is active from
// the view of the local raft state machine. Otherwise, it returns
// false.
// check_quorum_active also resets all recent_active to false.
// check_quorum_active can only be called by the leader.
fn check_quorum_active(&mut self) -> bool {
let self_id = self.id;
let mut act = 0;
for (&id, pr) in self.mut_prs().iter_mut() {
if id == self_id {
act += 1;
continue;
}
if !pr.is_learner && pr.recent_active {
act += 1;
}
pr.recent_active = false;
}
act >= self.quorum()
}
/// Issues a message to timeout immediately.
pub fn send_timeout_now(&mut self, to: u64) {
let msg = new_message(to, MessageType::MsgTimeoutNow, None);
self.send(msg);
}
/// Stops the transfer of a leader.
pub fn abort_leader_transfer(&mut self) {
self.lead_transferee = None;
}
}