// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements.  See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership.  The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License.  You may obtain a copy of the License at
//
//   http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied.  See the License for the
// specific language governing permissions and limitations
// under the License.

#include "kudu/consensus/raft_consensus.h"

#include <algorithm>
#include <cmath>
#include <cstdint>
#include <functional>
#include <iterator>
#include <memory>
#include <mutex>
#include <optional>
#include <ostream>
#include <type_traits>
#include <unordered_map>
#include <unordered_set>

#include <gflags/gflags.h>
#include <google/protobuf/util/message_differencer.h>

#include "kudu/common/timestamp.h"
#include "kudu/common/wire_protocol.h"
#include "kudu/consensus/consensus.pb.h"
#include "kudu/consensus/consensus_meta.h"
#include "kudu/consensus/consensus_meta_manager.h"
#include "kudu/consensus/consensus_peers.h"
#include "kudu/consensus/leader_election.h"
#include "kudu/consensus/log.h"
#include "kudu/consensus/metadata.pb.h"
#include "kudu/consensus/opid_util.h"
#include "kudu/consensus/peer_manager.h"
#include "kudu/consensus/pending_rounds.h"
#include "kudu/consensus/quorum_util.h"
#include "kudu/consensus/time_manager.h"
#include "kudu/gutil/macros.h"
#include "kudu/gutil/map-util.h"
#include "kudu/gutil/port.h"
#include "kudu/gutil/stl_util.h"
#include "kudu/gutil/stringprintf.h"
#include "kudu/gutil/strings/stringpiece.h"
#include "kudu/gutil/strings/substitute.h"
#include "kudu/gutil/walltime.h"
#include "kudu/rpc/periodic.h"
#include "kudu/util/async_util.h"
#include "kudu/util/debug/trace_event.h"
#include "kudu/util/flag_tags.h"
#include "kudu/util/logging.h"
#include "kudu/util/metrics.h"
#include "kudu/util/pb_util.h"
#include "kudu/util/process_memory.h"
#include "kudu/util/random.h"
#include "kudu/util/random_util.h"
#include "kudu/util/scoped_cleanup.h"
#include "kudu/util/status.h"
#include "kudu/util/thread_restrictions.h"
#include "kudu/util/threadpool.h"
#include "kudu/util/trace.h"
#include "kudu/util/url-coding.h"

// Raft timing: heartbeat interval and the leader failure timeout derived from
// it (see the help text of leader_failure_max_missed_heartbeat_periods).
DEFINE_int32(raft_heartbeat_interval_ms, 500,
             "The heartbeat interval for Raft replication. The leader produces heartbeats "
             "to followers at this interval. The followers expect a heartbeat at this interval "
             "and consider a leader to have failed if it misses several in a row.");
TAG_FLAG(raft_heartbeat_interval_ms, advanced);

DEFINE_double(leader_failure_max_missed_heartbeat_periods, 3.0,
             "Maximum heartbeat periods that the leader can fail to heartbeat in before we "
             "consider the leader to be failed. The total failure timeout in milliseconds is "
             "raft_heartbeat_interval_ms times leader_failure_max_missed_heartbeat_periods. "
             "The value passed to this flag may be fractional.");
TAG_FLAG(leader_failure_max_missed_heartbeat_periods, advanced);

DEFINE_int32(leader_failure_exp_backoff_max_delta_ms, 20 * 1000,
             "Maximum time to sleep in between leader election retries, in addition to the "
             "regular timeout. When leader election fails the interval in between retries "
             "increases exponentially, up to this value.");
TAG_FLAG(leader_failure_exp_backoff_max_delta_ms, experimental);

// Master switch for leader failure detection. Start() also consults it before
// expiring the detector early or self-electing a single-voter config.
DEFINE_bool(enable_leader_failure_detection, true,
            "Whether to enable failure detection of tablet leaders. If enabled, attempts will be "
            "made to elect a follower as a new leader when the leader is detected to have failed.");
TAG_FLAG(enable_leader_failure_detection, unsafe);

DEFINE_bool(evict_failed_followers, true,
            "Whether to evict followers from the Raft config that have fallen "
            "too far behind the leader's log to catch up normally or have been "
            "unreachable by the leader for longer than "
            "follower_unavailable_considered_failed_sec");
TAG_FLAG(evict_failed_followers, advanced);
TAG_FLAG(evict_failed_followers, runtime);

// Fault-injection switches; per their help text, only intended for testing.
DEFINE_bool(follower_reject_update_consensus_requests, false,
            "Whether a follower will return an error for all UpdateConsensus() requests. "
            "Warning! This is only intended for testing.");
TAG_FLAG(follower_reject_update_consensus_requests, unsafe);

DEFINE_bool(follower_fail_all_prepare, false,
            "Whether a follower will fail preparing all ops. "
            "Warning! This is only intended for testing.");
TAG_FLAG(follower_fail_all_prepare, unsafe);

// Read by ReportFailureDetectedTask() to pick PRE_ELECTION vs NORMAL_ELECTION.
DEFINE_bool(raft_enable_pre_election, true,
            "When enabled, candidates will call a pre-election before "
            "running a real leader election.");
TAG_FLAG(raft_enable_pre_election, experimental);
TAG_FLAG(raft_enable_pre_election, runtime);

DEFINE_bool(raft_enable_tombstoned_voting, true,
            "When enabled, tombstoned tablets may vote in elections.");
TAG_FLAG(raft_enable_tombstoned_voting, experimental);
TAG_FLAG(raft_enable_tombstoned_voting, runtime);

// Enable improved re-replication (KUDU-1097).
DEFINE_bool(raft_prepare_replacement_before_eviction, true,
            "When enabled, failed replicas will only be evicted after a "
            "replacement has been prepared for them.");
TAG_FLAG(raft_prepare_replacement_before_eviction, advanced);
TAG_FLAG(raft_prepare_replacement_before_eviction, experimental);

// Defined in another translation unit. Its use is not visible in this chunk
// (presumably the follower memory-pressure check — confirm).
DECLARE_int32(memory_limit_warn_threshold_percentage);

// Metrics
// ---------
METRIC_DEFINE_counter(tablet, follower_memory_pressure_rejections,
                      "Follower Memory Pressure Rejections",
                      kudu::MetricUnit::kRequests,
                      "Number of RPC requests rejected due to "
                      "memory pressure while FOLLOWER.",
                      kudu::MetricLevel::kWarn);
// Instantiated in Start() with MergeType::kMax.
METRIC_DEFINE_gauge_int64(tablet, raft_term,
                          "Current Raft Consensus Term",
                          kudu::MetricUnit::kUnits,
                          "Current Term of the Raft Consensus algorithm. This number increments "
                          "each time a leader election is started.",
                          kudu::MetricLevel::kDebug);
METRIC_DEFINE_gauge_int64(tablet, failed_elections_since_stable_leader,
                          "Failed Elections Since Stable Leader",
                          kudu::MetricUnit::kUnits,
                          "Number of failed elections on this node since there was a stable "
                          "leader. This number increments on each failed election and resets on "
                          "each successful one.",
                          kudu::MetricLevel::kWarn);
// Exposed in Start() as a function gauge backed by
// GetMillisSinceLastLeaderHeartbeat().
METRIC_DEFINE_gauge_int64(tablet, time_since_last_leader_heartbeat,
                          "Time Since Last Leader Heartbeat",
                          kudu::MetricUnit::kMilliseconds,
                          "The time elapsed since the last heartbeat from the leader "
                          "in milliseconds. This metric is identically zero on a leader replica.",
                          kudu::MetricLevel::kDebug);


using google::protobuf::util::MessageDifferencer;
using kudu::pb_util::SecureShortDebugString;
using kudu::rpc::PeriodicTimer;
using kudu::tserver::TabletServerErrorPB;
using std::nullopt;
using std::optional;
using std::shared_ptr;
using std::string;
using std::unique_ptr;
using std::unordered_set;
using std::weak_ptr;
using strings::Substitute;

namespace kudu {
namespace consensus {

// Constructs an instance in state kNew. Only the supplied options, local peer
// identity, consensus-metadata manager and server context are stored here.
// Init() (which requires kNew) must run before Start() (which requires
// kInitialized); Create() performs both construction and Init().
RaftConsensus::RaftConsensus(
    ConsensusOptions options,
    RaftPeerPB local_peer_pb,
    scoped_refptr<ConsensusMetadataManager> cmeta_manager,
    ServerContext server_ctx)
    : options_(std::move(options)),
      local_peer_pb_(std::move(local_peer_pb)),
      log_prefix_(Substitute("T $0 P $1: ", options_.tablet_id, peer_uuid())),
      cmeta_manager_(DCHECK_NOTNULL(std::move(cmeta_manager))),
      server_ctx_(std::move(server_ctx)),
      state_(kNew),
      // Randomly seeded; used e.g. by Start() to jitter the first election.
      rng_(GetRandomSeed32()),
      leader_is_ready_(false),
      leader_transfer_in_progress_(false),
      withhold_votes_until_(MonoTime::Min()),
      last_received_cur_leader_(MinimumOpId()),
      failed_elections_since_stable_leader_(0),
      shutdown_(false),
      update_calls_for_tests_(0) {
  DCHECK(local_peer_pb_.has_permanent_uuid());
}

// Loads this tablet's consensus metadata and advances the instance from kNew
// to kInitialized. If loading fails, the loader's error is returned and the
// state is left at kNew.
Status RaftConsensus::Init() {
  LockGuard guard(lock_);
  DCHECK_EQ(kNew, state_) << State_Name(state_);

  const Status load_status = cmeta_manager_->Load(options_.tablet_id, &cmeta_);
  if (PREDICT_FALSE(!load_status.ok())) {
    return load_status;
  }

  SetStateUnlocked(kInitialized);
  return Status::OK();
}

// Routes destruction through Shutdown(). NOTE(review): this assumes Shutdown()
// is safe to call again on an instance that was already shut down — confirm.
RaftConsensus::~RaftConsensus() {
  Shutdown();
}

// Factory: builds a RaftConsensus and runs Init() on it. On success the
// initialized instance is handed back through consensus_out. On failure the
// Init() error, prefixed with "Unable to initialize Raft consensus", is
// returned and consensus_out is not written.
Status RaftConsensus::Create(ConsensusOptions options,
                             RaftPeerPB local_peer_pb,
                             scoped_refptr<ConsensusMetadataManager> cmeta_manager,
                             ServerContext server_ctx,
                             shared_ptr<RaftConsensus>* consensus_out) {
  shared_ptr<RaftConsensus> instance = RaftConsensus::make_shared(std::move(options),
                                                                  std::move(local_peer_pb),
                                                                  std::move(cmeta_manager),
                                                                  std::move(server_ctx));
  const Status init_status = instance->Init();
  RETURN_NOT_OK_PREPEND(init_status, "Unable to initialize Raft consensus");

  *consensus_out = std::move(instance);
  return Status::OK();
}

// Brings an initialized replica online.
//
// Outside lock_, this stores the externally supplied proxy factory, log, time
// manager, round handler and dirty callback, and registers metrics. It then
// builds the peer message queue, peer manager and pending-rounds tracker, and
// creates the one-shot failure-detector and leader-transfer timers. Under
// lock_, it installs those components, replays the orphaned replicate messages
// found during bootstrap, assumes non-leader duties and moves to kRunning.
// A config with a single voter triggers an election right away.
//
// Requires state kInitialized (CHECK-fails otherwise). Returns Corruption if
// the last op in the WAL has a term newer than the persisted current term.
// Otherwise returns any error from replaying orphaned ops or from
// BecomeReplicaUnlocked().
Status RaftConsensus::Start(const ConsensusBootstrapInfo& info,
                            unique_ptr<PeerProxyFactory> peer_proxy_factory,
                            scoped_refptr<log::Log> log,
                            unique_ptr<TimeManager> time_manager,
                            ConsensusRoundHandler* round_handler,
                            const scoped_refptr<MetricEntity>& metric_entity,
                            MarkDirtyCallback cb) {
  DCHECK(metric_entity);

  peer_proxy_factory_ = DCHECK_NOTNULL(std::move(peer_proxy_factory));
  log_ = DCHECK_NOTNULL(std::move(log));
  time_manager_ = DCHECK_NOTNULL(std::move(time_manager));
  round_handler_ = DCHECK_NOTNULL(round_handler);
  mark_dirty_clbk_ = std::move(cb);

  term_metric_ = metric_entity->FindOrCreateGauge(&METRIC_raft_term,
                                                  CurrentTerm(),
                                                  MergeType::kMax);
  follower_memory_pressure_rejections_ =
      metric_entity->FindOrCreateCounter(&METRIC_follower_memory_pressure_rejections);

  num_failed_elections_metric_ =
      metric_entity->FindOrCreateGauge(&METRIC_failed_elections_since_stable_leader,
                                       failed_elections_since_stable_leader_);

  METRIC_time_since_last_leader_heartbeat.InstantiateFunctionGauge(
      metric_entity, [this]() { return this->GetMillisSinceLastLeaderHeartbeat(); },
      MergeType::kMax)
      ->AutoDetach(&metric_detacher_);

  // A single Raft thread pool token is shared between RaftConsensus and
  // PeerManager. Because PeerManager is owned by RaftConsensus, it receives a
  // raw pointer to the token, to emphasize that RaftConsensus is responsible
  // for destroying the token.
  ThreadPool* raft_pool = server_ctx_.raft_pool;
  raft_pool_token_ = raft_pool->NewToken(ThreadPool::ExecutionMode::CONCURRENT);

  // The message queue that keeps track of which operations need to be replicated
  // where.
  //
  // Note: the message queue receives a dedicated Raft thread pool token so that
  // its submissions don't block other submissions by RaftConsensus (such as
  // heartbeat processing).
  //
  // TODO(adar): the token is SERIAL to match the previous single-thread
  // observer pool behavior, but CONCURRENT may be safe here.
  unique_ptr<PeerMessageQueue> queue(new PeerMessageQueue(
      metric_entity,
      log_,
      time_manager_.get(),
      local_peer_pb_,
      options_.tablet_id,
      raft_pool->NewToken(ThreadPool::ExecutionMode::SERIAL),
      server_ctx_.quiescing,
      info.last_id,
      info.last_committed_id,
      server_ctx_.allow_status_msg_for_failed_peer));

  // A manager for the set of peers that actually send the operations both remotely
  // and to the local wal.
  unique_ptr<PeerManager> peer_manager(new PeerManager(options_.tablet_id,
                                                       peer_uuid(),
                                                       peer_proxy_factory_.get(),
                                                       queue.get(),
                                                       raft_pool_token_.get(),
                                                       log_));
  unique_ptr<PendingRounds> pending(new PendingRounds(
      LogPrefixThreadSafe(), time_manager_.get()));

  // Capture a weak_ptr reference into the functor so it can safely handle
  // outliving the consensus instance.
  weak_ptr<RaftConsensus> w = shared_from_this();
  PeriodicTimer::Options opts;
  opts.one_shot = true;
  failure_detector_ = PeriodicTimer::Create(
      peer_proxy_factory_->messenger(),
      [w]() {
        if (auto consensus = w.lock()) {
          consensus->ReportFailureDetected();
        }
      },
      MinimumElectionTimeout(),
      opts);

  transfer_period_timer_ = PeriodicTimer::Create(
      peer_proxy_factory_->messenger(),
      [w]() {
        if (auto consensus = w.lock()) {
          consensus->EndLeaderTransferPeriod();
        }
      },
      MinimumElectionTimeout(),
      opts);

  {
    ThreadRestrictions::AssertWaitAllowed();
    LockGuard l(lock_);
    CHECK_EQ(kInitialized, state_) << LogPrefixUnlocked() << "Illegal state for Start(): "
                                   << State_Name(state_);

    queue_ = std::move(queue);
    peer_manager_ = std::move(peer_manager);
    pending_ = std::move(pending);

    ClearLeaderUnlocked();

    // Our last persisted term can be higher than the last persisted operation
    // (i.e. if we called an election) but reverse should never happen.
    if (info.last_id.term() > CurrentTermUnlocked()) {
      return Status::Corruption(Substitute("Unable to start RaftConsensus: "
          "The last op in the WAL with id $0 has a term ($1) that is greater "
          "than the latest recorded term, which is $2",
          OpIdToString(info.last_id),
          info.last_id.term(),
          CurrentTermUnlocked()));
    }

    // Append any uncommitted replicate messages found during log replay to the queue.
    LOG_WITH_PREFIX_UNLOCKED(INFO) << "Replica starting. Triggering "
                                   << info.orphaned_replicates.size()
                                   << " pending ops. Active config: "
                                   << SecureShortDebugString(cmeta_->ActiveConfig());
    for (ReplicateMsg* replicate : info.orphaned_replicates) {
      // Each orphaned message is deep-copied; the bootstrap info keeps its own.
      ReplicateRefPtr replicate_ptr = make_scoped_refptr_replicate(new ReplicateMsg(*replicate));
      RETURN_NOT_OK(StartFollowerOpUnlocked(replicate_ptr));
    }

    // Set the initial committed opid for the PendingRounds only after
    // appending any uncommitted replicate messages to the queue.
    pending_->SetInitialCommittedOpId(info.last_committed_id);

    // If this is the first term expire the FD immediately so that we have a
    // fast first election, otherwise we just let the timer expire normally.
    optional<MonoDelta> fd_initial_delta;
    if (CurrentTermUnlocked() == 0) {
      // The failure detector is initialized to a low value to trigger an early
      // election (unless someone else requested a vote from us first, which
      // resets the election timer).
      //
      // We do it this way instead of immediately running an election to get a
      // higher likelihood of enough servers being available when the first one
      // attempts an election to avoid multiple election cycles on startup,
      // while keeping that "waiting period" random.
      if (PREDICT_TRUE(FLAGS_enable_leader_failure_detection)) {
        LOG_WITH_PREFIX_UNLOCKED(INFO) << "Consensus starting up: Expiring failure detector timer "
                                          "to make a prompt election more likely";
        fd_initial_delta = MonoDelta::FromMilliseconds(
            rng_.Uniform(FLAGS_raft_heartbeat_interval_ms));
      }
    }

    // Now assume non-leader replica duties.
    RETURN_NOT_OK(BecomeReplicaUnlocked(fd_initial_delta));

    SetStateUnlocked(kRunning);
  }

  // Deliberately outside lock_: StartElection() acquires lock_ itself.
  if (IsSingleVoterConfig() && FLAGS_enable_leader_failure_detection) {
    LOG_WITH_PREFIX(INFO) << "Only one voter in the Raft config. Triggering election immediately";
    WARN_NOT_OK_EVERY_N_SECS(StartElection(NORMAL_ELECTION, INITIAL_SINGLE_NODE_ELECTION),
                             "Couldn't start leader election", 10);
  }

  // Report become visible to the Master.
  MarkDirty("RaftConsensus started");

  return Status::OK();
}

// Returns true iff the replica is in state kRunning. Reads state_ without
// taking lock_, so the answer may be stale by the time the caller acts on it.
bool RaftConsensus::IsRunning() const {
  return state_ == kRunning;
}

// Test-only shortcut: makes this replica leader of the next term without
// running a real election (no vote requests are sent to peers).
Status RaftConsensus::EmulateElectionForTests() {
  TRACE_EVENT2("consensus", "RaftConsensus::EmulateElectionForTests",
               "peer", peer_uuid(),
               "tablet", options_.tablet_id);

  ThreadRestrictions::AssertWaitAllowed();
  LockGuard guard(lock_);
  RETURN_NOT_OK(CheckRunningUnlocked());

  LOG_WITH_PREFIX_UNLOCKED(INFO) << "Emulating election...";

  // Advance to the next term, name ourselves leader, then take on leader
  // duties for that term.
  const auto next_term = CurrentTermUnlocked() + 1;
  RETURN_NOT_OK(HandleTermAdvanceUnlocked(next_term));
  SetLeaderUuidUnlocked(peer_uuid());
  return BecomeLeaderUnlocked();
}

namespace {
// Human-readable label for an election mode, used in log messages.
const char* ModeString(RaftConsensus::ElectionMode mode) {
  switch (mode) {
    case RaftConsensus::PRE_ELECTION: return "pre-election";
    case RaftConsensus::NORMAL_ELECTION: return "leader election";
    case RaftConsensus::ELECT_EVEN_IF_LEADER_IS_ALIVE: return "forced leader election";
  }
  __builtin_unreachable(); // silence gcc warnings
}

// Human-readable explanation of why an election is being started. For a
// timeout, names the failed leader when one is known (non-empty leader_uuid).
string ReasonString(RaftConsensus::ElectionReason reason, StringPiece leader_uuid) {
  switch (reason) {
    case RaftConsensus::EXTERNAL_REQUEST:
      return "received explicit request";
    case RaftConsensus::INITIAL_SINGLE_NODE_ELECTION:
      return "initial election of a single-replica configuration";
    case RaftConsensus::ELECTION_TIMEOUT_EXPIRED:
      return leader_uuid.empty()
          ? string("no leader contacted us within the election timeout")
          : Substitute("detected failure of leader $0", leader_uuid);
  }
  __builtin_unreachable(); // silence gcc warnings
}
} // anonymous namespace

// Starts a leader election or pre-election for this replica.
//
// Returns IllegalState if the server is quiescing or the replica is not a
// voter. Returns the running-state check's error if the replica is not running.
// Returns OK without doing anything if this replica is already leader. For
// anything but a pre-election, the term is bumped and the replica votes for
// itself. Per the comment below, setting the vote flushes it to disk.
// The LeaderElection is built under lock_ but run after the lock is released.
Status RaftConsensus::StartElection(ElectionMode mode, ElectionReason reason) {
  if (server_ctx_.quiescing && server_ctx_.quiescing->load()) {
    return Status::IllegalState("leader elections are disabled");
  }
  const char* const mode_str = ModeString(mode);

  TRACE_EVENT2("consensus", "RaftConsensus::StartElection",
               "peer", LogPrefixThreadSafe(),
               "mode", mode_str);
  scoped_refptr<LeaderElection> election;
  {
    ThreadRestrictions::AssertWaitAllowed();
    LockGuard l(lock_);
    RETURN_NOT_OK(CheckRunningUnlocked());

    RaftPeerPB::Role active_role = cmeta_->active_role();
    if (active_role == RaftPeerPB::LEADER) {
      LOG_WITH_PREFIX_UNLOCKED(INFO) << Substitute(
          "Not starting $0 -- already a leader", mode_str);
      return Status::OK();
    }
    if (PREDICT_FALSE(!consensus::IsVoterRole(active_role))) {
      // A non-voter should not start leader elections. The leader failure
      // detector should be re-enabled once the non-voter replica is promoted
      // to voter replica.
      return Status::IllegalState("only voting members can start elections",
          SecureShortDebugString(cmeta_->ActiveConfig()));
    }
    // NOTE(review): if IsVoterRole() is false for NON_PARTICIPANT (as its name
    // suggests), the check above already returns for that role and this
    // branch, including its failure-detector snooze, is unreachable — confirm.
    if (PREDICT_FALSE(active_role == RaftPeerPB::NON_PARTICIPANT)) {
      SnoozeFailureDetector();
      return Status::IllegalState("Not starting election: node is currently "
                                  "a non-participant in the Raft config",
                                  SecureShortDebugString(cmeta_->ActiveConfig()));
    }
    LOG_WITH_PREFIX_UNLOCKED(INFO)
        << "Starting " << mode_str
        << " (" << ReasonString(reason, GetLeaderUuidUnlocked()) << ")";

    // Snooze to avoid the election timer firing again as much as possible.
    // We do not disable the election timer while running an election, so that
    // if the election times out, we will try again.
    MonoDelta timeout = LeaderElectionExpBackoffDeltaUnlocked();
    SnoozeFailureDetector(string("starting election"), timeout);

    // Increment the term and vote for ourselves, unless it's a pre-election.
    if (mode != PRE_ELECTION) {
      // TODO(mpercy): Consider using a separate Mutex for voting, which must sync to disk.

      // We skip flushing the term to disk because setting the vote just below also
      // flushes to disk, and the double fsync doesn't buy us anything.
      RETURN_NOT_OK(HandleTermAdvanceUnlocked(CurrentTermUnlocked() + 1,
                                              SKIP_FLUSH_TO_DISK));
      RETURN_NOT_OK(SetVotedForCurrentTermUnlocked(peer_uuid()));
    }

    RaftConfigPB active_config = cmeta_->ActiveConfig();
    LOG_WITH_PREFIX_UNLOCKED(INFO) << "Starting " << mode_str << " with config: "
                                   << SecureShortDebugString(active_config);

    // Initialize the VoteCounter.
    int num_voters = CountVoters(active_config);
    int majority_size = MajoritySize(num_voters);
    VoteCounter counter(num_voters, majority_size);

    // Vote for ourselves.
    bool duplicate;
    RETURN_NOT_OK(counter.RegisterVote(peer_uuid(), VOTE_GRANTED, &duplicate));
    CHECK(!duplicate) << LogPrefixUnlocked()
                      << "Inexplicable duplicate self-vote for term "
                      << CurrentTermUnlocked();

    VoteRequestPB request;
    request.set_ignore_live_leader(mode == ELECT_EVEN_IF_LEADER_IS_ALIVE);
    request.set_candidate_uuid(peer_uuid());
    if (mode == PRE_ELECTION) {
      // In a pre-election, we haven't bumped our own term yet, so we need to be
      // asking for votes for the next term.
      request.set_is_pre_election(true);
      request.set_candidate_term(CurrentTermUnlocked() + 1);
    } else {
      request.set_candidate_term(CurrentTermUnlocked());
    }
    request.set_tablet_id(options_.tablet_id);
    *request.mutable_candidate_status()->mutable_last_received() =
        queue_->GetLastOpIdInLog();

    // The callback holds a shared_ptr to this instance, keeping it alive for
    // as long as the election holds the callback.
    auto self = shared_from_this();
    election.reset(new LeaderElection(
        std::move(active_config),
        // The RaftConsensus ref passed below ensures that this raw pointer
        // remains safe to use for the entirety of LeaderElection's life.
        peer_proxy_factory_.get(),
        std::move(request), std::move(counter), timeout,
        [self, reason](const ElectionResult& result) {
          self->ElectionCallback(reason, result);
        }));
  }

  // Start the election outside the lock.
  election->Run();

  return Status::OK();
}

// Polls every 10ms until this replica reports the LEADER role, or until
// `timeout` has elapsed.
//
// Returns OK once the replica is leader. Otherwise returns TimedOut, whose
// message names the role observed on the last poll. The role is snapshotted
// once per iteration so the reported role is the one that actually failed the
// check, and it is rendered by name instead of as a raw enum integer.
Status RaftConsensus::WaitUntilLeader(const MonoDelta& timeout) {
  const MonoTime deadline = MonoTime::Now() + timeout;
  while (true) {
    const auto current_role = role();
    if (current_role == consensus::RaftPeerPB::LEADER) {
      return Status::OK();
    }
    if (MonoTime::Now() >= deadline) {
      return Status::TimedOut(Substitute("Peer $0 is not leader of tablet $1 after $2. Role: $3",
                                         peer_uuid(), options_.tablet_id, timeout.ToString(),
                                         RaftPeerPB::Role_Name(current_role)));
    }
    SleepFor(MonoDelta::FromMilliseconds(10));
  }
}

// Asks this replica to give up leadership by advancing to the next term.
//
// If the replica is not leader, `resp` receives NOT_THE_LEADER plus an
// IllegalState status while the call itself returns OK. This keeps the tablet
// service from overwriting the error code. On success, the failure detector
// is snoozed for twice the minimum election timeout, so that a different
// replica is likely to win the next election.
//
// Returns the running-state check's error if the replica is not running, or
// any error from the term advance.
Status RaftConsensus::StepDown(LeaderStepDownResponsePB* resp) {
  TRACE_EVENT0("consensus", "RaftConsensus::StepDown");
  ThreadRestrictions::AssertWaitAllowed();
  LockGuard l(lock_);
  // Check the state before touching queue_. queue_ is only assigned in Start(),
  // so evaluating the DCHECK below on a replica that was never started would
  // dereference a null queue_ in debug builds instead of returning an error.
  RETURN_NOT_OK(CheckRunningUnlocked());
  DCHECK((queue_->IsInLeaderMode() && cmeta_->active_role() == RaftPeerPB::LEADER) ||
         (!queue_->IsInLeaderMode() && cmeta_->active_role() != RaftPeerPB::LEADER));
  if (cmeta_->active_role() != RaftPeerPB::LEADER) {
    LOG_WITH_PREFIX_UNLOCKED(INFO) << "Rejecting request to step down while not leader";
    resp->mutable_error()->set_code(TabletServerErrorPB::NOT_THE_LEADER);
    StatusToPB(Status::IllegalState("Not currently leader"),
               resp->mutable_error()->mutable_status());
    // We return OK so that the tablet service won't overwrite the error code.
    return Status::OK();
  }
  LOG_WITH_PREFIX_UNLOCKED(INFO) << "Received request to step down";
  RETURN_NOT_OK(HandleTermAdvanceUnlocked(CurrentTermUnlocked() + 1,
                                          SKIP_FLUSH_TO_DISK));
  // Snooze the failure detector for an extra leader failure timeout.
  // This should ensure that a different replica is elected leader after this one steps down.
  SnoozeFailureDetector(string("explicit stepdown request"), MonoDelta::FromMilliseconds(
      2 * MinimumElectionTimeout().ToMilliseconds()));
  return Status::OK();
}

// Begins a leadership transfer period, optionally targeting a given successor.
//
// new_leader_uuid: if set, the desired successor. It must be a voter in the
//   active config. Naming this replica itself is a no-op that returns OK.
// resp: if this replica is not leader, receives NOT_THE_LEADER plus an
//   IllegalState status while the call itself returns OK, so the tablet
//   service does not overwrite the error code.
//
// Returns InvalidArgument for a non-voter target, and the running-state
// check's error if the replica is not running. Otherwise returns the result of
// BeginLeaderTransferPeriodUnlocked(), which is ServiceUnavailable if a
// transfer is already in progress.
Status RaftConsensus::TransferLeadership(const optional<string>& new_leader_uuid,
                                         LeaderStepDownResponsePB* resp) {
  TRACE_EVENT0("consensus", "RaftConsensus::TransferLeadership");
  ThreadRestrictions::AssertWaitAllowed();
  LockGuard l(lock_);
  LOG_WITH_PREFIX_UNLOCKED(INFO) << "Received request to transfer leadership"
                                 << (new_leader_uuid ?
                                    Substitute(" to TS $0", *new_leader_uuid) :
                                    "");
  // Check the state before touching queue_. queue_ is only assigned in Start(),
  // so evaluating the DCHECK below on a replica that was never started would
  // dereference a null queue_ in debug builds instead of returning an error.
  RETURN_NOT_OK(CheckRunningUnlocked());
  DCHECK((queue_->IsInLeaderMode() && cmeta_->active_role() == RaftPeerPB::LEADER) ||
         (!queue_->IsInLeaderMode() && cmeta_->active_role() != RaftPeerPB::LEADER));
  if (cmeta_->active_role() != RaftPeerPB::LEADER) {
    LOG_WITH_PREFIX_UNLOCKED(INFO) << "Rejecting request to transfer leadership while not leader";
    resp->mutable_error()->set_code(TabletServerErrorPB::NOT_THE_LEADER);
    StatusToPB(Status::IllegalState("not currently leader"),
               resp->mutable_error()->mutable_status());
    // We return OK so that the tablet service won't overwrite the error code.
    return Status::OK();
  }
  if (new_leader_uuid) {
    if (*new_leader_uuid == peer_uuid()) {
      // Short-circuit as we are transferring leadership to ourselves and we
      // already checked that we are leader.
      return Status::OK();
    }
    if (!IsRaftConfigVoter(*new_leader_uuid, cmeta_->ActiveConfig())) {
      const string msg = Substitute("tablet server $0 is not a voter in the active config",
                                    *new_leader_uuid);
      LOG_WITH_PREFIX_UNLOCKED(INFO) << "Rejecting request to transfer leadership "
                                     << "because " << msg;
      return Status::InvalidArgument(msg);
    }
  }
  return BeginLeaderTransferPeriodUnlocked(new_leader_uuid);
}

// Opens a leadership transfer period. Caller must hold lock_.
//
// The in-progress flag is claimed atomically. If it was already set, another
// transfer is under way and ServiceUnavailable is returned without touching
// the queue or the timer. Otherwise the queue starts watching for
// `successor_uuid` (any successor if unset) and the one-shot transfer timer,
// which ends the period when it fires, is started.
Status RaftConsensus::BeginLeaderTransferPeriodUnlocked(
    const optional<string>& successor_uuid) {
  DCHECK(lock_.is_locked());
  const bool transfer_already_running = leader_transfer_in_progress_.exchange(true);
  if (transfer_already_running) {
    return Status::ServiceUnavailable(
        Substitute("leadership transfer for $0 already in progress",
                   options_.tablet_id));
  }
  queue_->BeginWatchForSuccessor(successor_uuid);
  transfer_period_timer_->Start();
  return Status::OK();
}

// Closes a leadership transfer period: stops the transfer timer, tells the
// queue to stop watching for a successor, and clears
// leader_transfer_in_progress_. This runs from the transfer timer callback set
// up in Start(), where lock_ is not taken, and from BecomeLeaderUnlocked().
void RaftConsensus::EndLeaderTransferPeriod() {
  transfer_period_timer_->Stop();
  queue_->EndWatchForSuccessor();
  leader_transfer_in_progress_ = false;
}

// Wraps a replicate message and its replication callback in a new
// ConsensusRound bound to this RaftConsensus. Ownership of both arguments
// moves into the round.
scoped_refptr<ConsensusRound> RaftConsensus::NewRound(
    unique_ptr<ReplicateMsg> replicate_msg,
    ConsensusReplicatedCallback replicated_cb) {
  scoped_refptr<ConsensusRound> round(
      new ConsensusRound(this, std::move(replicate_msg), std::move(replicated_cb)));
  return round;
}

void RaftConsensus::ReportFailureDetectedTask() {
  Status s = StartElection(FLAGS_raft_enable_pre_election ?
      PRE_ELECTION : NORMAL_ELECTION, ELECTION_TIMEOUT_EXPIRED);
  if (PREDICT_FALSE(!s.ok())) {
    WARN_NOT_OK_EVERY_N_SECS(
        s, LogPrefixThreadSafe() + "failed to trigger leader election", 10);
    // Normally the failure detector would be enabled at the end of the election,
    // but since the election failed to start, we must reenable explicitly.
    EnableFailureDetector();
  }
}

void RaftConsensus::ReportFailureDetected() {
  // We're running on a timer thread; start an election on a different thread.
  //
  // There's no need to reenable the failure detector; if this fails, it's a
  // sign that RaftConsensus has stopped and we no longer need failure detection.
  auto self = shared_from_this();
  Status s = raft_pool_token_->Submit([self]() { self->ReportFailureDetectedTask(); });
  if (PREDICT_FALSE(!s.ok())) {
    static const char* msg = "failed to submit failure detected task";
    CHECK(s.IsServiceUnavailable()) << LogPrefixThreadSafe() << msg;
    WARN_NOT_OK(s, LogPrefixThreadSafe() + msg);
  }
}

// Turns this replica into the leader for the current term. Caller must hold
// lock_.
//
// In order, this:
//  - disables the failure detector;
//  - withholds votes indefinitely;
//  - ends any leader-transfer period;
//  - registers as a queue observer and refreshes the queue and peers,
//    incrementing the server-wide leader count if the queue was not already
//    in leader mode;
//  - appends a NO_OP round for the new term.
// leader_is_ready_ is set only after the NO_OP has been appended, so writes are
// refused until then (see CheckLeadershipAndBindTerm()).
Status RaftConsensus::BecomeLeaderUnlocked() {
  DCHECK(lock_.is_locked());

  TRACE_EVENT2("consensus", "RaftConsensus::BecomeLeaderUnlocked",
               "peer", peer_uuid(),
               "tablet", options_.tablet_id);
  LOG_WITH_PREFIX_UNLOCKED(INFO) << "Becoming Leader. State: " << ToStringUnlocked();

  // Disable FD while we are leader.
  DisableFailureDetector();

  // Don't vote for anyone if we're a leader.
  withhold_votes_until_ = MonoTime::Max();

  // Leadership never starts in a transfer period.
  EndLeaderTransferPeriod();

  queue_->RegisterObserver(this);
  // Sampled before the refresh so the leader-count metric is bumped only on a
  // real non-leader -> leader transition.
  bool was_leader = queue_->IsInLeaderMode();
  RefreshConsensusQueueAndPeersUnlocked();
  if (!was_leader && server_ctx_.num_leaders) {
    server_ctx_.num_leaders->Increment();
  }

  // Initiate a NO_OP op that is sent at the beginning of every term
  // change in raft.
  auto replicate = new ReplicateMsg;
  replicate->set_op_type(NO_OP);
  replicate->mutable_noop_request(); // Define the no-op request field.
  CHECK_OK(time_manager_->AssignTimestamp(replicate));

  scoped_refptr<ConsensusRound> round(
      new ConsensusRound(this, make_scoped_refptr(new RefCountedReplicate(replicate))));
  // The callback captures a raw pointer rather than a ref to the round,
  // presumably to avoid a round -> callback -> round reference cycle; this
  // relies on the round outliving its own callback — confirm.
  auto* round_raw = round.get();
  round->SetConsensusReplicatedCallback(
      [this, round_raw](const Status& s) {
        this->NonTxRoundReplicationFinished(round_raw, &DoNothingStatusCB, s);
      });

  last_leader_communication_time_micros_ = 0;
  RETURN_NOT_OK(AppendNewRoundToQueueUnlocked(round));
  leader_is_ready_ = true;

  return Status::OK();
}

// Turns this replica into a non-leader (follower or learner). Caller must hold
// lock_.
//
// fd_delta: optional initial delay forwarded to the failure-detector update.
//   Start() passes a short random delay on a brand-new tablet (term 0).
//
// Clears the known leader, updates the failure detector for the replica's
// voter/non-voter status, and allows voting again. It then unregisters from
// queue notifications and switches the queue to non-leader mode with the
// active config, decrementing the server-wide leader count if the queue was
// in leader mode. Finally it closes the peer manager. Always returns OK.
Status RaftConsensus::BecomeReplicaUnlocked(optional<MonoDelta> fd_delta) {
  DCHECK(lock_.is_locked());

  LOG_WITH_PREFIX_UNLOCKED(INFO) << "Becoming Follower/Learner. State: "
                                 << ToStringUnlocked();
  ClearLeaderUnlocked();

  // Enable/disable leader failure detection if becoming VOTER/NON_VOTER replica
  // correspondingly.
  UpdateFailureDetectorState(std::move(fd_delta));

  // Now that we're a replica, we can allow voting for other nodes.
  withhold_votes_until_ = MonoTime::Min();

  // Deregister ourselves from the queue. We no longer need to track what gets
  // replicated since we're stepping down.
  queue_->UnRegisterObserver(this);
  // Sampled before SetNonLeaderMode() so the leader-count metric is only
  // decremented on a real leader -> non-leader transition.
  bool was_leader = queue_->IsInLeaderMode();
  queue_->SetNonLeaderMode(cmeta_->ActiveConfig());
  if (was_leader && server_ctx_.num_leaders) {
    server_ctx_.num_leaders->IncrementBy(-1);
  }
  peer_manager_->Close();

  return Status::OK();
}

// Appends a leader-originated round to the replication queue and signals the
// peers.
//
// update_lock_ is held for the whole call, including SignalRequest(). lock_ is
// held only around the checks and the append. Returns an error, without
// appending, if CheckSafeToReplicateUnlocked() rejects the message or
// CheckBoundTerm() rejects the round for the current term.
Status RaftConsensus::Replicate(const scoped_refptr<ConsensusRound>& round) {
  std::lock_guard<simple_spinlock> lock(update_lock_);
  {
    ThreadRestrictions::AssertWaitAllowed();
    LockGuard l(lock_);
    RETURN_NOT_OK(CheckSafeToReplicateUnlocked(*round->replicate_msg()));
    RETURN_NOT_OK(round->CheckBoundTerm(CurrentTermUnlocked()));
    RETURN_NOT_OK(AppendNewRoundToQueueUnlocked(round));
  }

  peer_manager_->SignalRequest();
  return Status::OK();
}

// Verifies that this replica is running and acting as a ready leader and, if
// so, binds 'round' to the current term via ConsensusRound::BindToTerm().
//
// Returns:
//  * IllegalState if the replica is not in the kRunning state, or if its
//    current role is anything other than LEADER;
//  * ServiceUnavailable if a leader transfer is in progress, or if the
//    leader has not yet set 'leader_is_ready_' (i.e. has not yet scheduled
//    its NO_OP for the term);
//  * OK once the round has been bound to the observed term.
//
// This method does not take 'lock_'. It reads 'state_', the role/term pair
// from 'cmeta_' and 'leader_is_ready_' as individual snapshots, so the order
// of those reads matters (see the comment below).
Status RaftConsensus::CheckLeadershipAndBindTerm(const scoped_refptr<ConsensusRound>& round) {
  // Debug-only sanity check: an OpId is assigned later, when the round is
  // appended to the queue, so the message must not carry one yet.
#ifndef NDEBUG
  const auto& msg = *round->replicate_msg();
  DCHECK(!msg.has_id()) << "should not have an ID yet: "
                        << SecureShortDebugString(msg);
#endif
  // Get a snapshot of an atomic member for consistency between the condition
  // and the error message, if any.
  const State state = state_;
  if (PREDICT_FALSE(state != kRunning)) {
    return Status::IllegalState("RaftConsensus is not running",
                                Substitute("state $0", State_Name(state)));
  }

  // The order of reading role_and_term and leader_is_ready is essential to
  // deal with situations when leader_is_ready and role_and_term are read
  // in different Raft terms.
  //  * It's safe to bind to a stale term even if we've asserted leadership
  //    in a newer term: the stale op will be rejected on followers anyway
  //    because Raft guarantees that a majority of replicas have accepted
  //    the new term.
  //  * It's safe for the term to be stale if we haven't asserted leadership
  //    for a newer term because we'll exit out below.
  //  * It's unsafe to bind to a newer term if we've asserted leadership
  //    for a stale term, hence this ordering.
  const auto role_and_term = cmeta_->GetRoleAndTerm();
  const bool leader_is_ready = leader_is_ready_;

  const auto role = role_and_term.first;
  switch (role) {
    case RaftPeerPB::LEADER:
      if (leader_transfer_in_progress_) {
        return Status::ServiceUnavailable("leader transfer in progress");
      }
      if (!leader_is_ready) {
        // Leader replica should not accept write operations before scheduling
        // the replication of a NO_OP to assert its leadership in the current
        // term. Otherwise there might be a race, so the very first accepted
        // write operation might have timestamp lower than the timestamp
        // of the NO_OP. That would trigger an assertion in MVCC.
        return Status::ServiceUnavailable("leader is not yet ready");
      }
      break;

    default:
      // Only a LEADER may accept new rounds.
      return Status::IllegalState(
          Substitute("replica $0 is not leader of this config: current role $1",
                     peer_uuid(), RaftPeerPB::Role_Name(role)));
  }
  // Bind to the term read together with the role above, not a fresh read.
  const auto term = role_and_term.second;
  round->BindToTerm(term);

  return Status::OK();
}

// Assigns the next OpId to 'round', registers it as a pending operation and
// appends its replicate message to the consensus queue.
//
// Returns the error from AddPendingOperationUnlocked() if the round cannot
// become pending. A failure to append to the queue is treated as fatal
// (CHECK). Requires 'lock_' to be held.
Status RaftConsensus::AppendNewRoundToQueueUnlocked(const scoped_refptr<ConsensusRound>& round) {
  DCHECK(lock_.is_locked());

  // The id must be stamped before the round becomes pending: config-change
  // bookkeeping in AddPendingOperationUnlocked() reads the op index.
  auto* replicate = round->replicate_msg();
  *replicate->mutable_id() = queue_->GetNextOpId();
  RETURN_NOT_OK(AddPendingOperationUnlocked(round));

  // The only reasons for a bad status would be if the log itself were shut
  // down, or if we had an actual IO error, which we currently don't handle.
  const Status append_status = queue_->AppendOperation(round->replicate_scoped_refptr());
  CHECK_OK_PREPEND(append_status,
                   Substitute("$0: could not append to queue", LogPrefixUnlocked()));
  return Status::OK();
}

// Registers 'round' with the pending-rounds tracker.
//
// For CHANGE_CONFIG_OP rounds this first stamps the proposed config with the
// round's OpId index. Unless the change is marked unsafe, it rejects the
// change when another config change is already pending. It then records the
// change as the pending config, or logs and skips that step when the
// committed config is already at or beyond this OpId (replay at startup).
// Requires 'lock_' to be held.
Status RaftConsensus::AddPendingOperationUnlocked(const scoped_refptr<ConsensusRound>& round) {
  DCHECK(lock_.is_locked());
  DCHECK(pending_);

  auto* replicate = round->replicate_msg();

  // Ordinary ops need no config bookkeeping.
  if (PREDICT_TRUE(replicate->op_type() != CHANGE_CONFIG_OP)) {
    return pending_->AddPendingOperation(round);
  }

  // Fill in the opid for the proposed new configuration. This can't happen
  // when the record is first created because no OpId was assigned yet.
  const int64_t op_index = replicate->id().index();
  ChangeConfigRecordPB* change_record = replicate->mutable_change_config_record();
  change_record->mutable_new_config()->set_opid_index(op_index);

  DCHECK(change_record->IsInitialized())
      << "change_config_record missing required fields: "
      << change_record->InitializationErrorString();

  const RaftConfigPB& new_config = change_record->new_config();

  // Only one config change may be in flight, unless explicitly unsafe.
  if (!new_config.unsafe_config_change()) {
    Status s = CheckNoConfigChangePendingUnlocked();
    if (PREDICT_FALSE(!s.ok())) {
      s = s.CloneAndAppend(Substitute("\n  New config: $0", SecureShortDebugString(new_config)));
      LOG_WITH_PREFIX_UNLOCKED(INFO) << s.ToString();
      return s;
    }
  }

  // A pending config whose OpId is not newer than the committed one means
  // we're replaying at startup with delayed COMMIT messages.
  const int64_t committed_config_opid_index = cmeta_->GetConfigOpIdIndex(COMMITTED_CONFIG);
  if (op_index <= committed_config_opid_index) {
    LOG_WITH_PREFIX_UNLOCKED(INFO)
        << "Ignoring setting pending config change with OpId "
        << replicate->id() << " because the committed config has OpId index "
        << committed_config_opid_index << ". The config change we are ignoring is: "
        << "Old config: { " << SecureShortDebugString(change_record->old_config()) << " }. "
        << "New config: { " << SecureShortDebugString(new_config) << " }";
  } else {
    RETURN_NOT_OK(SetPendingConfigUnlocked(new_config));
    if (cmeta_->active_role() == RaftPeerPB::LEADER) {
      RefreshConsensusQueueAndPeersUnlocked();
    }
  }

  return pending_->AddPendingOperation(round);
}

void RaftConsensus::NotifyCommitIndex(int64_t commit_index) {
  TRACE_EVENT2("consensus", "RaftConsensus::NotifyCommitIndex",
               "tablet", options_.tablet_id,
               "commit_index", commit_index);

  ThreadRestrictions::AssertWaitAllowed();
  LockGuard l(lock_);
  // We will process commit notifications while shutting down because a replica
  // which has initiated a Prepare() / Replicate() may eventually commit even if
  // its state has changed after the initial Append() / Update().
  const State state = state_;
  if (PREDICT_FALSE(state != kRunning && state != kStopping)) {
    LOG_WITH_PREFIX_UNLOCKED(WARNING) << "Unable to update committed index: "
                                      << "Replica not in running state: "
                                      << State_Name(state);
    return;
  }

  pending_->AdvanceCommittedIndex(commit_index);

  if (cmeta_->active_role() == RaftPeerPB::LEADER) {
    peer_manager_->SignalRequest(false);
  }
}

void RaftConsensus::NotifyTermChange(int64_t term) {
  TRACE_EVENT2("consensus", "RaftConsensus::NotifyTermChange",
               "tablet", options_.tablet_id,
               "term", term);

  ThreadRestrictions::AssertWaitAllowed();
  LockGuard l(lock_);
  Status s = CheckRunningUnlocked();
  if (PREDICT_FALSE(!s.ok())) {
    LOG_WITH_PREFIX_UNLOCKED(WARNING) << "Unable to handle notification of new term "
                                      << "(" << term << "): " << s.ToString();
    return;
  }
  WARN_NOT_OK(HandleTermAdvanceUnlocked(term), "Couldn't advance consensus term.");
}

// Called when follower 'uuid' is considered failed in 'term'.
//
// Does nothing if eviction is disabled by --evict_failed_followers, if the
// term has since changed, or if a config change is already pending.
// Otherwise it snapshots the committed config under 'lock_' and schedules
// TryRemoveFollowerTask() on the raft thread pool, after the lock is
// released.
void RaftConsensus::NotifyFailedFollower(const string& uuid,
                                         int64_t term,
                                         const string& reason) {
  // Context string prepended to every log message emitted below.
  const string fail_msg = Substitute("Processing failure of peer $0 in term $1 ($2): ",
                                     uuid, term, reason);

  if (!FLAGS_evict_failed_followers) {
    LOG(INFO) << LogPrefixThreadSafe() << fail_msg
              << "Eviction of failed followers is disabled. Doing nothing.";
    return;
  }

  RaftConfigPB committed_config;
  {
    ThreadRestrictions::AssertWaitAllowed();
    LockGuard l(lock_);

    const bool term_changed = CurrentTermUnlocked() != term;
    if (term_changed) {
      LOG_WITH_PREFIX_UNLOCKED(INFO) << fail_msg << "Notified about a follower failure in "
                                     << "previous term " << term << ", but a leader election "
                                     << "likely occurred since the failure was detected. "
                                     << "Doing nothing.";
      return;
    }

    if (cmeta_->has_pending_config()) {
      LOG_WITH_PREFIX_UNLOCKED(INFO) << fail_msg << "There is already a config change operation "
                                     << "in progress. Unable to evict follower until it completes. "
                                     << "Doing nothing.";
      return;
    }

    committed_config = cmeta_->CommittedConfig();
  }

  // The config change runs on the thread pool, without 'lock_' held. The
  // captured shared_ptr keeps this object alive until the task runs.
  auto self = shared_from_this();
  auto remove_task = [self, uuid, committed_config, reason]() {
    self->TryRemoveFollowerTask(uuid, committed_config, reason);
  };
  WARN_NOT_OK(raft_pool_token_->Submit(std::move(remove_task)),
              LogPrefixThreadSafe() + "Unable to start TryRemoveFollowerTask");
}

// Schedules TryPromoteNonVoterTask('peer_uuid') on the raft thread pool.
// A failure to submit the task is logged, not propagated.
void RaftConsensus::NotifyPeerToPromote(const string& peer_uuid) {
  // The captured shared_ptr keeps this object alive until the task runs.
  auto self = shared_from_this();
  auto promote_task = [self, peer_uuid]() { self->TryPromoteNonVoterTask(peer_uuid); };
  WARN_NOT_OK(raft_pool_token_->Submit(std::move(promote_task)),
              LogPrefixThreadSafe() + "Unable to start TryPromoteNonVoterTask");
}

void RaftConsensus::NotifyPeerToStartElection(const string& peer_uuid) {
  const auto& log_prefix = LogPrefixThreadSafe();
  LOG(INFO) << log_prefix << ": Instructing follower " << peer_uuid << " to start an election";
  auto self = shared_from_this();
  WARN_NOT_OK(raft_pool_token_->Submit(
      [self, peer_uuid]() { self->TryStartElectionOnPeerTask(peer_uuid); }),
    log_prefix + "Unable to start TryStartElectionOnPeerTask");
}

// Called when the health of some peer changes; reports the change through
// MarkDirty() with a fixed reason string.
void RaftConsensus::NotifyPeerHealthChange() {
  static constexpr const char* const kReason = "Peer health change";
  MarkDirty(kReason);
}

void RaftConsensus::TryRemoveFollowerTask(const string& uuid,
                                          const RaftConfigPB& committed_config,
                                          const string& reason) {
  ChangeConfigRequestPB req;
  req.set_tablet_id(options_.tablet_id);
  req.mutable_server()->set_permanent_uuid(uuid);
  req.set_type(REMOVE_PEER);
  req.set_cas_config_opid_index(committed_config.opid_index());
  LOG(INFO) << LogPrefixThreadSafe() << "Attempting to remove follower "
            << uuid << " from the Raft config. Reason: " << reason;
  optional<TabletServerErrorPB::Code> error_code;
  WARN_NOT_OK(ChangeConfig(req, &DoNothingStatusCB, &error_code),
              LogPrefixThreadSafe() + "Unable to remove follower " + uuid);
}

void RaftConsensus::TryPromoteNonVoterTask(const string& peer_uuid) {
  string msg = Substitute("attempt to promote peer $0: ", peer_uuid);
  int64_t current_committed_config_index;
  {
    ThreadRestrictions::AssertWaitAllowed();
    LockGuard l(lock_);

    if (cmeta_->has_pending_config()) {
     LOG_WITH_PREFIX_UNLOCKED(INFO) << msg << "there is already a config change operation "
                                     << "in progress. Unable to promote follower until it "
                                     << "completes. Doing nothing.";
      return;
    }

    // Check if the peer is still part of the current committed config.
    RaftConfigPB committed_config = cmeta_->CommittedConfig();
    current_committed_config_index = committed_config.opid_index();

    RaftPeerPB* peer_pb;
    Status s = GetRaftConfigMember(&committed_config, peer_uuid, &peer_pb);
    if (!s.ok()) {
      LOG_WITH_PREFIX_UNLOCKED(INFO) << msg << "can't find peer in the "
                                     << "current committed config: "
                                     << committed_config.ShortDebugString()
                                     << ". Doing nothing.";
      return;
    }

    // Also check if the peer it still a NON_VOTER waiting for promotion.
    if (peer_pb->member_type() != RaftPeerPB::NON_VOTER || !peer_pb->attrs().promote()) {
      LOG_WITH_PREFIX_UNLOCKED(INFO) << msg << "peer is either no longer a NON_VOTER "
                                     << "or not marked for promotion anymore. Current "
                                     << "config: " << committed_config.ShortDebugString()
                                     << ". Doing nothing.";
      return;
    }
  }

  ChangeConfigRequestPB req;
  req.set_tablet_id(options_.tablet_id);
  req.set_type(MODIFY_PEER);
  req.mutable_server()->set_permanent_uuid(peer_uuid);
  req.mutable_server()->set_member_type(RaftPeerPB::VOTER);
  req.mutable_server()->mutable_attrs()->set_promote(false);
  req.set_cas_config_opid_index(current_committed_config_index);
  LOG(INFO) << LogPrefixThreadSafe() << "attempting to promote NON_VOTER "
            << peer_uuid << " to VOTER";
  optional<TabletServerErrorPB::Code> error_code;
  WARN_NOT_OK(ChangeConfig(req, &DoNothingStatusCB, &error_code),
              LogPrefixThreadSafe() + Substitute("Unable to promote non-voter $0", peer_uuid));
}

void RaftConsensus::TryStartElectionOnPeerTask(const string& peer_uuid) {
  ThreadRestrictions::AssertWaitAllowed();
  LockGuard l(lock_);
  // Double-check that the peer is a voter in the active config.
  if (!IsRaftConfigVoter(peer_uuid, cmeta_->ActiveConfig())) {
    LOG_WITH_PREFIX_UNLOCKED(INFO) << "Not signalling peer " << peer_uuid
                                   << "to start an election: it's not a voter "
                                   << "in the active config.";
    return;
  }
  LOG_WITH_PREFIX_UNLOCKED(INFO) << "Signalling peer " << peer_uuid
                                 << " to start an election";
  WARN_NOT_OK(peer_manager_->StartElection(peer_uuid),
              Substitute("unable to start election on peer $0", peer_uuid));
}

// RPC entry point for a leader's update request.
//
// Fills in the responder UUID and delegates to UpdateReplica() while holding
// 'update_lock_'. Returns IllegalState without touching 'response' when the
// --follower_reject_update_consensus_requests flag is set.
Status RaftConsensus::Update(const ConsensusRequestPB* request,
                             ConsensusResponsePB* response) {
  ++update_calls_for_tests_;

  const bool reject_all = FLAGS_follower_reject_update_consensus_requests;
  if (PREDICT_FALSE(reject_all)) {
    return Status::IllegalState("Rejected: --follower_reject_update_consensus_requests "
                                "is set to true.");
  }

  response->set_responder_uuid(peer_uuid());
  VLOG_WITH_PREFIX(2) << "Replica received request: " << SecureShortDebugString(*request);

  // Serializes with other holders of 'update_lock_' (see its declaration).
  std::lock_guard<simple_spinlock> update_guard(update_lock_);
  Status result = UpdateReplica(request, response);

  // Status-only requests (no ops) are only traced at verbosity 1.
  if (PREDICT_FALSE(VLOG_IS_ON(1)) && request->ops().empty()) {
    VLOG_WITH_PREFIX(1)
        << Substitute("Replica replied to status only request. Replica: $0. Response: $1",
                      ToString(), SecureShortDebugString(*response));
  }
  return result;
}

// Returns true for operation types that are handled entirely within
// consensus (NO_OP and CHANGE_CONFIG_OP), false for everything else.
static bool IsConsensusOnlyOperation(OperationType op_type) {
  switch (op_type) {
    case NO_OP:
    case CHANGE_CONFIG_OP:
      return true;
    default:
      return false;
  }
}

// Starts processing, on a follower, of an op received from the leader.
//
// Consensus-only ops (NO_OP, CHANGE_CONFIG_OP) are routed to
// StartConsensusOnlyRoundUnlocked(). Any other op is wrapped in a new
// ConsensusRound, handed to the round handler, and then registered as
// pending. Requires 'lock_' to be held.
Status RaftConsensus::StartFollowerOpUnlocked(const ReplicateRefPtr& msg) {
  DCHECK(lock_.is_locked());

  const auto* replicate = msg->get();
  if (IsConsensusOnlyOperation(replicate->op_type())) {
    return StartConsensusOnlyRoundUnlocked(msg);
  }

  // Fault injection controlled by --follower_fail_all_prepare.
  if (PREDICT_FALSE(FLAGS_follower_fail_all_prepare)) {
    return Status::IllegalState("Rejected: --follower_fail_all_prepare "
                                "is set to true.");
  }

  VLOG_WITH_PREFIX_UNLOCKED(1) << "Starting op: "
                               << SecureShortDebugString(replicate->id());
  scoped_refptr<ConsensusRound> round(new ConsensusRound(this, msg));
  ConsensusRound* const raw_round = round.get();
  RETURN_NOT_OK(round_handler_->StartFollowerOp(round));
  return AddPendingOperationUnlocked(raw_round);
}

// Returns true iff the committed config has exactly one voter and that voter
// is this replica. Takes 'lock_'.
bool RaftConsensus::IsSingleVoterConfig() const {
  ThreadRestrictions::AssertWaitAllowed();
  LockGuard l(lock_);
  if (cmeta_->CountVotersInConfig(COMMITTED_CONFIG) != 1) {
    return false;
  }
  return cmeta_->IsVoterInConfig(peer_uuid(), COMMITTED_CONFIG);
}

// Renders the OpId range of the deduplicated messages for logging: "[]" when
// there are none, otherwise "[<first term>.<first index>-<last term>.<last index>]".
string RaftConsensus::LeaderRequest::OpsRangeString() const {
  string out;
  out.reserve(100);
  out += '[';
  if (!messages.empty()) {
    const OpId& first = messages.front()->get()->id();
    const OpId& last = messages.back()->get()->id();
    strings::SubstituteAndAppend(&out, "$0.$1-$2.$3",
                                 first.term(), first.index(),
                                 last.term(), last.index());
  }
  out += ']';
  return out;
}

// Removes from 'rpc_req' the ops this replica already has and stores the rest
// in 'deduplicated_req'.
//
// Skipped ops are those already committed, or those still pending locally
// with an identical OpId. For each skipped op, 'preceding_opid' is advanced
// to it. The first op that is neither, and every op after it, is copied into
// 'deduplicated_req->messages'. 'first_message_idx' records that op's
// position in 'rpc_req', or -1 when nothing is new.
//
// 'deduplicated_req->preceding_opid' points into 'rpc_req', so 'rpc_req' must
// outlive 'deduplicated_req'. Requires 'lock_' to be held.
void RaftConsensus::DeduplicateLeaderRequestUnlocked(const ConsensusRequestPB* rpc_req,
                                                     LeaderRequest* deduplicated_req) {
  DCHECK(lock_.is_locked());

  // TODO(todd): use queue committed index?
  int64_t last_committed_index = pending_->GetCommittedIndex();

  // The leader's preceding id.
  deduplicated_req->preceding_opid = &rpc_req->preceding_id();

  // Ops with an index up to this one are already in our log and are only
  // candidates for deduplication.
  int64_t dedup_up_to_index = queue_->GetLastOpIdInLog().index();

  deduplicated_req->first_message_idx = -1;

  // In this loop we discard duplicates and advance the leader's preceding id
  // accordingly.
  for (int i = 0; i < rpc_req->ops_size(); i++) {
    const ReplicateMsg* leader_msg = &rpc_req->ops(i);

    if (leader_msg->id().index() <= last_committed_index) {
      VLOG_WITH_PREFIX_UNLOCKED(2) << "Skipping op id " << leader_msg->id()
                                   << " (already committed)";
      deduplicated_req->preceding_opid = &leader_msg->id();
      continue;
    }

    if (leader_msg->id().index() <= dedup_up_to_index) {
      // If the index is uncommitted and below our match index, then it must be in the
      // pendings set.
      scoped_refptr<ConsensusRound> round =
          pending_->GetPendingOpByIndexOrNull(leader_msg->id().index());
      DCHECK(round) << "Could not find op with index " << leader_msg->id().index()
                    << " in pending set. committed= " << last_committed_index
                    << " dedup=" << dedup_up_to_index;

      // If the OpIds match, i.e. if they have the same term and id, then this is just
      // duplicate, we skip...
      if (OpIdEquals(round->replicate_msg()->id(), leader_msg->id())) {
        VLOG_WITH_PREFIX_UNLOCKED(2) << "Skipping op id " << leader_msg->id()
                                     << " (already replicated)";
        deduplicated_req->preceding_opid = &leader_msg->id();
        continue;
      }

      // ... otherwise we must adjust our match index, i.e. all messages from now on
      // are "new"
      dedup_up_to_index = leader_msg->id().index();
    }

    if (deduplicated_req->first_message_idx == -1) {
      deduplicated_req->first_message_idx = i;
    }
    deduplicated_req->messages.emplace_back(
        make_scoped_refptr_replicate(new ReplicateMsg(*leader_msg)));
  }

  // ops_size() is a non-negative int; convert it explicitly rather than
  // comparing signed and unsigned values.
  if (deduplicated_req->messages.size() != static_cast<size_t>(rpc_req->ops_size())) {
    LOG_WITH_PREFIX_UNLOCKED(INFO) << "Deduplicated request from leader. Original: "
                                   << rpc_req->preceding_id() << "->" << OpsRangeString(*rpc_req)
                                   << "   Dedup: " << *deduplicated_req->preceding_opid << "->"
                                   << deduplicated_req->OpsRangeString();
  }
}

// Compares the caller's term with ours.
//
// Equal terms: nothing to do. A newer caller term: advance our term to it,
// propagating any failure. An older caller term: fill 'response' with an
// INVALID_TERM error and still return OK, so the caller inspects
// 'response'. Requires 'lock_' to be held.
Status RaftConsensus::HandleLeaderRequestTermUnlocked(const ConsensusRequestPB* request,
                                                      ConsensusResponsePB* response) {
  DCHECK(lock_.is_locked());
  const auto caller_term = request->caller_term();
  const auto current_term = CurrentTermUnlocked();

  if (PREDICT_TRUE(caller_term == current_term)) {
    return Status::OK();
  }

  if (caller_term > current_term) {
    RETURN_NOT_OK(HandleTermAdvanceUnlocked(caller_term));
    return Status::OK();
  }

  // The request comes from an earlier term: reject it.
  const string msg = Substitute("Rejecting Update request from peer $0 for earlier term $1. "
                                "Current term is $2. Ops: $3",
                                request->caller_uuid(),
                                caller_term,
                                current_term,
                                OpsRangeString(*request));
  LOG_WITH_PREFIX_UNLOCKED(INFO) << msg;
  FillConsensusResponseError(response,
                             ConsensusErrorPB::INVALID_TERM,
                             Status::IllegalState(msg));
  return Status::OK();
}

// Checks that the leader's preceding OpId is known (committed or pending)
// locally.
//
// If it is, returns OK without touching 'response'. Otherwise it fills
// 'response' with PRECEDING_ENTRY_DIDNT_MATCH and logs the refusal, except
// for a preceding OpId of 1.1. On a term mismatch it also truncates and
// aborts ops after the index preceding the leader's preceding op. Always
// returns OK. Requires 'lock_' to be held.
Status RaftConsensus::EnforceLogMatchingPropertyMatchesUnlocked(const LeaderRequest& req,
                                                                ConsensusResponsePB* response) {
  DCHECK(lock_.is_locked());

  const OpId& leader_preceding = *req.preceding_opid;
  bool term_mismatch;
  const bool preceding_known =
      pending_->IsOpCommittedOrPending(leader_preceding, &term_mismatch);
  if (preceding_known) {
    return Status::OK();
  }

  const string error_msg = Substitute(
      "Log matching property violated."
      " Preceding OpId in replica: $0. Preceding OpId from leader: $1. ($2 mismatch)",
      SecureShortDebugString(queue_->GetLastOpIdInLog()),
      SecureShortDebugString(leader_preceding),
      term_mismatch ? "term" : "index");

  FillConsensusResponseError(response,
                             ConsensusErrorPB::PRECEDING_ENTRY_DIDNT_MATCH,
                             Status::IllegalState(error_msg));

  // A preceding OpId of 1.1 is the first message from the leader of a new
  // tablet; refusing it is expected, so don't log.
  const bool first_message_of_new_tablet = OpIdEquals(MakeOpId(1, 1), leader_preceding);
  if (!first_message_of_new_tablet) {
    LOG_WITH_PREFIX_UNLOCKED(INFO) << "Refusing update from remote peer "
                                   << req.leader_uuid << ": " << error_msg;
  }

  // On a term mismatch, abort down to the index before the leader's
  // preceding op: that's the last op with a chance of not being overwritten.
  // Doing it here avoids reporting a last-received index higher than the
  // leader's, which would cause an avoidable cache miss in its queue.
  //
  // TODO: this isn't just an optimization! if we comment this out, we get
  // failures on raft_consensus-itest a couple percent of the time! Should investigate
  // why this is actually critical to do here, as opposed to just on requests that
  // append some ops.
  if (term_mismatch) {
    TruncateAndAbortOpsAfterUnlocked(leader_preceding.index() - 1);
  }

  return Status::OK();
}

// Discards local ops after 'truncate_after_index' (presumably: every op with
// a greater index — confirm against PendingRounds/PeerMessageQueue).
// Pending rounds are aborted first, then the queue is truncated at the
// same index. Requires 'lock_' to be held.
void RaftConsensus::TruncateAndAbortOpsAfterUnlocked(int64_t truncate_after_index) {
  DCHECK(lock_.is_locked());
  pending_->AbortOpsAfter(truncate_after_index);
  queue_->TruncateOpsAfter(truncate_after_index);
}

// Validates an incoming leader request and, if it passes, records the caller
// as the leader for the current term.
//
// Returns a non-OK Status for malformed requests: a leader running an older
// Kudu version, or ops that are out of sequence after deduplication. Term
// and log-matching rejections are reported through 'response' with an OK
// return value. On return, 'deduped_req' holds the ops that are new to this
// replica. Dies (FATAL) if a different leader is already known for the
// current term. Requires 'lock_' to be held.
Status RaftConsensus::CheckLeaderRequestUnlocked(const ConsensusRequestPB* request,
                                                 ConsensusResponsePB* response,
                                                 LeaderRequest* deduped_req) {
  DCHECK(lock_.is_locked());

  if (PREDICT_FALSE(request->has_deprecated_committed_index() ||
      !request->has_all_replicated_index())) {
    return Status::InvalidArgument("Leader appears to be running an earlier version "
                                   "of Kudu. Please shut down and upgrade all servers "
                                   "before restarting.");
  }

  DeduplicateLeaderRequestUnlocked(request, deduped_req);

  // This is an additional check for KUDU-639 that makes sure the message's index
  // and term are in the right sequence in the request, after we've deduplicated
  // them. We do this before we change any of the internal state.
  //
  // TODO move this to raft_consensus-state or whatever we transform that into.
  // We should be able to do this check for each append, but right now the way
  // we initialize raft_consensus-state is preventing us from doing so.
  const OpId* prev = deduped_req->preceding_opid;
  for (const ReplicateRefPtr& message : deduped_req->messages) {
    Status s = PendingRounds::CheckOpInSequence(*prev, message->get()->id());
    if (PREDICT_FALSE(!s.ok())) {
      // Use the prefixed logger, like every other message in this file, so
      // the error identifies the tablet and replica.
      LOG_WITH_PREFIX_UNLOCKED(ERROR) << "Leader request contained out-of-sequence messages. "
                                      << "Status: " << s.ToString() << ". Leader Request: "
                                      << SecureShortDebugString(*request);
      return s;
    }
    prev = &message->get()->id();
  }

  RETURN_NOT_OK(HandleLeaderRequestTermUnlocked(request, response));

  if (response->status().has_error()) {
    return Status::OK();
  }

  RETURN_NOT_OK(EnforceLogMatchingPropertyMatchesUnlocked(*deduped_req, response));

  if (response->status().has_error()) {
    return Status::OK();
  }

  // If the first of the messages to apply is not in our log, either it follows the last
  // received message or it replaces some in-flight.
  if (!deduped_req->messages.empty()) {

    bool term_mismatch;
    CHECK(!pending_->IsOpCommittedOrPending(deduped_req->messages[0]->get()->id(), &term_mismatch));

    // If the index is in our log but the terms are not the same abort down to the leader's
    // preceding id.
    if (term_mismatch) {
      TruncateAndAbortOpsAfterUnlocked(deduped_req->preceding_opid->index());
    }
  }

  // If all of the above logic was successful then we can consider this to be
  // the effective leader of the configuration. If they are not currently marked as
  // the leader locally, mark them as leader now.
  const string& caller_uuid = request->caller_uuid();
  if (PREDICT_FALSE(HasLeaderUnlocked() &&
                    GetLeaderUuidUnlocked() != caller_uuid)) {
    LOG_WITH_PREFIX_UNLOCKED(FATAL) << "Unexpected new leader in same term! "
        << "Existing leader UUID: " << GetLeaderUuidUnlocked() << ", "
        << "new leader UUID: " << caller_uuid;
  }
  if (PREDICT_FALSE(!HasLeaderUnlocked())) {
    SetLeaderUuidUnlocked(request->caller_uuid());
  }

  return Status::OK();
}

Status RaftConsensus::UpdateReplica(const ConsensusRequestPB* request,
                                    ConsensusResponsePB* response) {
  TRACE_EVENT2("consensus", "RaftConsensus::UpdateReplica",
               "peer", peer_uuid(),
               "tablet", options_.tablet_id);
  Synchronizer log_synchronizer;
  StatusCallback sync_status_cb = log_synchronizer.AsStatusCallback();


  // The ordering of the following operations is crucial, read on for details.
  //
  // The main requirements explained in more detail below are:
  //
  //   1) We must enqueue the prepares before we write to our local log.
  //   2) If we were able to enqueue a prepare then we must be able to log it.
  //   3) If we fail to enqueue a prepare, we must not attempt to enqueue any
  //      later-indexed prepare or apply.
  //
  // See below for detailed rationale.
  //
  // The steps are:
  //
  // 0 - Split/Dedup
  //
  // We split the operations into replicates and commits and make sure that we don't
  // don't do anything on operations we've already received in a previous call.
  // This essentially makes this method idempotent.
  //
  // 1 - We mark as many pending ops as committed as we can.
  //
  // We may have some pending ops that, according to the leader, are now
  // committed. We Apply them early, because:
  // - Soon (step 2) we may reject the call due to excessive memory pressure. One
  //   way to relieve the pressure is by flushing the MRS, and applying these
  //   ops may unblock an in-flight Flush().
  // - The Apply and subsequent Prepares (step 2) can take place concurrently.
  //
  // 2 - We enqueue the Prepare of the ops.
  //
  // The actual prepares are enqueued in order but happen asynchronously so we don't
  // have decoding/acquiring locks on the critical path.
  //
  // We need to do this now for a number of reasons:
  // - Prepares, by themselves, are inconsequential, i.e. they do not mutate the
  //   state machine so, were we to crash afterwards, having the prepares in-flight
  //   won't hurt.
  // - Prepares depend on factors external to consensus (the op drivers and
  //   the TabletReplica) so if for some reason they cannot be enqueued we must know
  //   before we try write them to the WAL. Once enqueued, we assume that prepare will
  //   always succeed on a replica op (because the leader already prepared them
  //   successfully, and thus we know they are valid).
  // - The prepares corresponding to every operation that was logged must be in-flight
  //   first. This because should we need to abort certain ops (say a new leader
  //   says they are not committed) we need to have those prepares in-flight so that
  //   the ops can be continued (in the abort path).
  // - Failure to enqueue prepares is OK, we can continue and let the leader know that
  //   we only went so far. The leader will re-send the remaining messages.
  // - Prepares represent new ops, and ops consume memory. Thus, if the
  //   overall memory pressure on the server is too high, we will reject the prepares.
  //
  // 3 - We enqueue the writes to the WAL.
  //
  // We enqueue writes to the WAL, but only the operations that were successfully
  // enqueued for prepare (for the reasons introduced above). This means that even
  // if a prepare fails to enqueue, if any of the previous prepares were successfully
  // submitted they must be written to the WAL.
  // If writing to the WAL fails, we're in an inconsistent state and we crash. In this
  // case, no one will ever know of the ops we previously prepared so those are
  // inconsequential.
  //
  // 4 - We mark the ops as committed.
  //
  // For each op which has been committed by the leader, we update the
  // op state to reflect that. If the logging has already succeeded for that
  // op, this will trigger the Apply phase. Otherwise, Apply will be triggered
  // when the logging completes. In both cases the Apply phase executes asynchronously.
  // This must, of course, happen after the prepares have been triggered as the same batch
  // can both replicate/prepare and commit/apply an operation.
  //
  // Currently, if a prepare failed to enqueue we still trigger all applies for operations
  // with an id lower than it (if we have them). This is important now as the leader will
  // not re-send those commit messages. This will be moot when we move to the commit
  // commitIndex way of doing things as we can simply ignore the applies as we know
  // they will be triggered with the next successful batch.
  //
  // 5 - We wait for the writes to be durable.
  //
  // Before replying to the leader we wait for the writes to be durable. We then
  // just update the last replicated watermark and respond.
  //
  // TODO - These failure scenarios need to be exercised in an unit
  //        test. Moreover we need to add more fault injection spots (well that
  //        and actually use the) for each of these steps.
  //        This will be done in a follow up patch.
  TRACE("Updating replica for $0 ops", request->ops_size());

  // The deduplicated request.
  LeaderRequest deduped_req;
  auto& messages = deduped_req.messages;
  {
    ThreadRestrictions::AssertWaitAllowed();
    LockGuard l(lock_);
    RETURN_NOT_OK(CheckRunningUnlocked());
    if (!cmeta_->IsMemberInConfig(peer_uuid(), ACTIVE_CONFIG)) {
      LOG_WITH_PREFIX_UNLOCKED(INFO) << "Allowing update even though not a member of the config";
    }

    deduped_req.leader_uuid = request->caller_uuid();

    RETURN_NOT_OK(CheckLeaderRequestUnlocked(request, response, &deduped_req));
    if (response->status().has_error()) {
      // We had an error, like an invalid term, we still fill the response.
      FillConsensusResponseOKUnlocked(response);
      return Status::OK();
    }

    // As soon as we decide to accept the message:
    //   * snooze the failure detector
    //   * prohibit voting for anyone for the minimum election timeout
    // We are guaranteed to be acting as a FOLLOWER at this point by the above
    // sanity check.
    SnoozeFailureDetector();
    WithholdVotes();

    last_leader_communication_time_micros_ = GetMonoTimeMicros();

    // Reset the 'failed_elections_since_stable_leader' metric now that we've
    // accepted an update from the established leader. This is done in addition
    // to the reset of the value in SetLeaderUuidUnlocked() because there is
    // a potential race between resetting the failed elections count in
    // SetLeaderUuidUnlocked() and incrementing after a failed election
    // if another replica was elected leader in an election concurrent with
    // the one called by this replica.
    failed_elections_since_stable_leader_ = 0;
    num_failed_elections_metric_->set_value(failed_elections_since_stable_leader_);

    // We update the lag metrics here in addition to after appending to the queue so the
    // metrics get updated even when the operation is rejected.
    queue_->UpdateLastIndexAppendedToLeader(request->last_idx_appended_to_leader());

    // 1 - Early commit pending (and committed) ops

    // What should we commit?
    // 1. As many pending ops as we can, except...
    // 2. ...if we commit beyond the preceding index, we'd regress KUDU-639, and...
    // 3. ...the leader's committed index is always our upper bound.
    const int64_t early_apply_up_to = std::min({
        pending_->GetLastPendingOpOpId().index(),
        deduped_req.preceding_opid->index(),
        request->committed_index()});

    VLOG_WITH_PREFIX_UNLOCKED(1) << "Early marking committed up to " << early_apply_up_to
                                 << ", Last pending opid index: "
                                 << pending_->GetLastPendingOpOpId().index()
                                 << ", preceding opid index: "
                                 << deduped_req.preceding_opid->index()
                                 << ", requested index: " << request->committed_index();
    TRACE("Early marking committed up to index $0", early_apply_up_to);
    CHECK_OK(pending_->AdvanceCommittedIndex(early_apply_up_to));

    // 2 - Enqueue the prepares

    TRACE("Triggering prepare for $0 ops", messages.size());

    if (PREDICT_TRUE(!messages.empty())) {
      // This request contains at least one message, and is likely to increase
      // our memory pressure.
      double capacity_pct;
      if (process_memory::SoftLimitExceeded(&capacity_pct)) {
        if (follower_memory_pressure_rejections_) follower_memory_pressure_rejections_->Increment();
        string msg = StringPrintf(
            "Soft memory limit exceeded (at %.2f%% of capacity)",
            capacity_pct);
        if (capacity_pct >= FLAGS_memory_limit_warn_threshold_percentage) {
          KLOG_EVERY_N_SECS(WARNING, 1) << "Rejecting consensus request: " << msg
                                        << THROTTLE_MSG;
        } else {
          KLOG_EVERY_N_SECS(INFO, 1) << "Rejecting consensus request: " << msg
                                     << THROTTLE_MSG;
        }
        return Status::ServiceUnavailable(msg);
      }
    }

    Status prepare_status;
    auto iter = messages.begin();
    while (iter != messages.end()) {
      prepare_status = StartFollowerOpUnlocked(*iter);
      if (PREDICT_FALSE(!prepare_status.ok())) {
        break;
      }
      // TODO(dralves) Without leader leases this shouldn't be allowed to fail.
      // Once we have that functionality we'll have to revisit this.
      CHECK_OK(time_manager_->MessageReceivedFromLeader(*(*iter)->get()));
      ++iter;
    }

    // If we stopped before reaching the end we failed to prepare some message(s) and need
    // to perform cleanup, namely trimming deduped_req.messages to only contain the messages
    // that were actually prepared, and deleting the other ones since we've taken ownership
    // when we first deduped.
    if (iter != messages.end()) {
      LOG_WITH_PREFIX_UNLOCKED(WARNING) << Substitute(
          "Could not prepare op '$0' and following $1 ops. "
          "Status for this op: $2",
          (*iter)->get()->id().ShortDebugString(),
          std::distance(iter, messages.end()) - 1,
          prepare_status.ToString());
      iter = messages.erase(iter, messages.end());

      // If this is empty, it means we couldn't prepare a single de-duped message. There is nothing
      // else we can do. The leader will detect this and retry later.
      if (messages.empty()) {
        string msg = Substitute("Rejecting Update request from peer $0 for term $1. "
                                "Could not prepare a single op due to: $2",
                                request->caller_uuid(),
                                request->caller_term(),
                                prepare_status.ToString());
        LOG_WITH_PREFIX_UNLOCKED(INFO) << msg;
        FillConsensusResponseError(response, ConsensusErrorPB::CANNOT_PREPARE,
                                   Status::IllegalState(msg));
        FillConsensusResponseOKUnlocked(response);
        return Status::OK();
      }
    }

    // All ops that are going to be prepared were started, advance the safe timestamp.
    // TODO(dralves) This is only correct because the queue only sets safe time when the request is
    // an empty heartbeat. If we actually start setting this on a consensus request along with
    // actual messages we need to be careful to ignore it if any of the messages fails to prepare.
    if (request->has_safe_timestamp()) {
      time_manager_->AdvanceSafeTime(Timestamp(request->safe_timestamp()));
    }

    OpId last_from_leader;
    // 3 - Enqueue the writes.
    // Now that we've triggered the prepares enqueue the operations to be written
    // to the WAL.
    if (PREDICT_TRUE(!messages.empty())) {
      last_from_leader = messages.back()->get()->id();
      // Trigger the log append asap, if fsync() is on this might take a while
      // and we can't reply until this is done.
      //
      // Since we've prepared, we need to be able to append (or we risk trying to apply
      // later something that wasn't logged). We crash if we can't.
      CHECK_OK(queue_->AppendOperations(messages, sync_status_cb));
    } else {
      last_from_leader = *deduped_req.preceding_opid;
    }

    // 4 - Mark ops as committed

    // Choose the last operation to be applied. This will either be 'committed_index', if
    // no prepare enqueuing failed, or the minimum between 'committed_index' and the id of
    // the last successfully enqueued prepare, if some prepare failed to enqueue.
    int64_t apply_up_to;
    if (last_from_leader.index() < request->committed_index()) {
      // we should never apply anything later than what we received in this request
      apply_up_to = last_from_leader.index();

      VLOG_WITH_PREFIX_UNLOCKED(2) << "Received commit index "
          << request->committed_index() << " from the leader but only"
          << " marked up to " << apply_up_to << " as committed.";
    } else {
      apply_up_to = request->committed_index();
    }

    VLOG_WITH_PREFIX_UNLOCKED(1) << "Marking committed up to " << apply_up_to;
    TRACE("Marking committed up to $0", apply_up_to);
    CHECK_OK(pending_->AdvanceCommittedIndex(apply_up_to));
    queue_->UpdateFollowerWatermarks(apply_up_to, request->all_replicated_index());

    // If any messages failed to be started locally, then we already have removed them
    // from 'deduped_req' at this point. So, 'last_from_leader' is the last one that
    // we might apply.
    last_received_cur_leader_ = last_from_leader;

    // Fill the response with the current state. We will not mutate anymore state until
    // we actually reply to the leader, we'll just wait for the messages to be durable.
    FillConsensusResponseOKUnlocked(response);
  }
  // Release the lock while we wait for the log append to finish so that commits can go through.
  // We'll re-acquire it before we update the state again.

  // Update the last replicated op id
  if (!messages.empty()) {

    // 5 - We wait for the writes to be durable.

    // Note that this is safe because dist consensus now only supports a single outstanding
    // request at a time and this way we can allow commits to proceed while we wait.
    TRACE("Waiting on the replicates to finish logging");
    TRACE_EVENT0("consensus", "Wait for log");
    Status s;
    do {
      s = log_synchronizer.WaitFor(
          MonoDelta::FromMilliseconds(FLAGS_raft_heartbeat_interval_ms));
      // If just waiting for our log append to finish lets snooze the timer.
      // We don't want to fire leader election nor accept vote requests because
      // we're still processing the Raft message from the leader,
      // waiting on our own log.
      if (s.IsTimedOut()) {
        SnoozeFailureDetector();
        WithholdVotes();
      }
    } while (s.IsTimedOut());
    RETURN_NOT_OK(s);

    TRACE("finished");
  }

  VLOG_WITH_PREFIX(2) << "Replica updated. " << ToString()
                      << ". Request: " << SecureShortDebugString(*request);

  TRACE("UpdateReplicas() finished");
  return Status::OK();
}

void RaftConsensus::FillConsensusResponseOKUnlocked(ConsensusResponsePB* response) {
  // Populates the non-error portion of 'response': this replica's current
  // term, the last OpId in its log, the last OpId received from the current
  // leader, and the queue's committed index. The caller must hold 'lock_'
  // (DCHECK-enforced in debug builds).
  DCHECK(lock_.is_locked());
  TRACE("Filling consensus response to leader.");
  response->set_responder_term(CurrentTermUnlocked());

  auto* resp_status = response->mutable_status();
  *resp_status->mutable_last_received() = queue_->GetLastOpIdInLog();
  *resp_status->mutable_last_received_current_leader() = last_received_cur_leader_;
  resp_status->set_last_committed_idx(queue_->GetCommittedIndex());

  // The quiescing flag in the server context may be absent; only report
  // quiescing when the flag exists and is currently set.
  const bool quiescing =
      PREDICT_TRUE(server_ctx_.quiescing) && server_ctx_.quiescing->load();
  if (quiescing) {
    response->set_server_quiescing(true);
  }
}

void RaftConsensus::FillConsensusResponseError(ConsensusResponsePB* response,
                                               ConsensusErrorPB::Code error_code,
                                               const Status& status) {
  // Records a Raft-level error in response->status().error(). This is
  // separate from the response's top-level 'error' field.
  auto* consensus_status = response->mutable_status();
  auto* consensus_error = consensus_status->mutable_error();
  StatusToPB(status, consensus_error->mutable_status());
  consensus_error->set_code(error_code);
}

// Handles a vote request from 'request->candidate_uuid()' and fills 'response'.
//
// Decision order (each step may short-circuit via a RequestVoteRespond*()
// helper):
//   1. Unless 'ignore_live_leader' is set, deny while we are still withholding
//      votes because we heard from a leader recently. This is checked both
//      before and after acquiring 'lock_'.
//   2. If leader failure detection is enabled and 'update_lock_' is busy,
//      respond "busy" immediately; otherwise block for 'update_lock_'.
//   3. Refuse (IllegalState) if shut down, or if not running and either no
//      tombstoned last-logged OpId was supplied or tombstoned voting is off.
//   4. Deny stale candidate terms. In the current term, re-grant to the same
//      candidate, or deny if we already voted for someone else.
//   5. Grant only if the candidate's last-received OpId is not less than our
//      last-logged OpId.
// For non-pre-election requests carrying a higher term, the local term is
// advanced first. That advance is flushed to disk only when the vote will be
// denied, because a granted vote flushes the consensus metadata anyway.
//
// Returns IllegalState for incompatible lifecycle states, the (prefixed)
// error from HandleTermAdvanceUnlocked() on failure, and otherwise the status
// of the chosen RequestVoteRespond*() helper.
Status RaftConsensus::RequestVote(const VoteRequestPB* request,
                                  TabletVotingState tablet_voting_state,
                                  VoteResponsePB* response) {
  TRACE_EVENT2("consensus", "RaftConsensus::RequestVote",
               "peer", peer_uuid(),
               "tablet", options_.tablet_id);
  response->set_responder_uuid(peer_uuid());

  // If we've heard recently from the leader, then we should ignore the request.
  // It might be from a "disruptive" server. This could happen in a few cases:
  //
  // 1) Network partitions
  // If the leader can talk to a majority of the nodes, but is partitioned from a
  // bad node, the bad node's failure detector will trigger. If the bad node is
  // able to reach other nodes in the cluster, it will continuously trigger elections.
  //
  // 2) An abandoned node
  // It's possible that a node has fallen behind the log GC mark of the leader. In that
  // case, the leader will stop sending it requests. Eventually, the the configuration
  // will change to eject the abandoned node, but until that point, we don't want the
  // abandoned follower to disturb the other nodes.
  //
  // 3) Other dynamic scenarios with a stale former leader
  // This is a generalization of the case 1. It's possible that a stale former
  // leader detects it's not a leader anymore at some point, but a majority
  // of replicas has elected a new leader already.
  //
  // See also https://ramcloud.stanford.edu/~ongaro/thesis.pdf
  // section 4.2.3.
  if (PREDICT_TRUE(!request->ignore_live_leader()) &&
      MonoTime::Now() < withhold_votes_until_) {
    return RequestVoteRespondLeaderIsAlive(request, response);
  }

  // We must acquire the update lock in order to ensure that this vote action
  // takes place between requests.
  // Lock ordering: update_lock_ must be acquired before lock_.
  std::unique_lock<simple_spinlock> update_guard(update_lock_, std::defer_lock);
  if (PREDICT_TRUE(FLAGS_enable_leader_failure_detection)) {
    // Non-blocking attempt: on contention we answer "busy" below.
    update_guard.try_lock();
  } else {
    // If failure detection is not enabled, then we can't just reject the vote,
    // because there will be no automatic retry later. So, block for the lock.
    update_guard.lock();
  }
  if (!update_guard.owns_lock()) {
    // There is another vote or update concurrent with the vote. In that case, that
    // other request is likely to reset the timer, and we'll end up just voting
    // "NO" after waiting. To avoid starving RPC handlers and causing cascading
    // timeouts, just vote a quick NO.
    return RequestVoteRespondIsBusy(request, response);
  }

  // Acquire the replica state lock so we can read / modify the consensus state.
  ThreadRestrictions::AssertWaitAllowed();
  LockGuard l(lock_);

  // Ensure our lifecycle state is compatible with voting.
  // If RaftConsensus is running, we use the latest OpId from the WAL to vote.
  // Otherwise, we must be voting while tombstoned.
  OpId local_last_logged_opid;
  const State state = state_;
  switch (state) {
    case kShutdown:
      return Status::IllegalState("cannot vote while shut down");
    case kRunning:
      // Note: it is (theoretically) possible for 'tombstone_last_logged_opid'
      // to be passed in and by the time we reach here the state is kRunning.
      // That may occur when a vote request comes in at the end of a tablet
      // copy and then tablet bootstrap completes quickly. In that case, we
      // ignore the passed-in value and use the latest OpId from our queue.
      local_last_logged_opid = queue_->GetLastOpIdInLog();
      break;
    default:
      // Not running and not shut down: we can vote only on behalf of a
      // tombstoned (or copying) replica whose last-logged OpId was supplied.
      if (!tablet_voting_state.tombstone_last_logged_opid_) {
        return Status::IllegalState("must be running to vote when last-logged opid is not known");
      }
      if (!FLAGS_raft_enable_tombstoned_voting) {
        return Status::IllegalState("must be running to vote when tombstoned voting is disabled");
      }
      local_last_logged_opid = *(tablet_voting_state.tombstone_last_logged_opid_);
      if (tablet_voting_state.data_state_ == tablet::TABLET_DATA_COPYING) {
        LOG_WITH_PREFIX_UNLOCKED(INFO) << "voting while copying based on last-logged opid "
                                       << local_last_logged_opid;
      } else if (tablet_voting_state.data_state_ == tablet::TABLET_DATA_TOMBSTONED) {
        LOG_WITH_PREFIX_UNLOCKED(INFO) << "voting while tombstoned based on last-logged opid "
                                       << local_last_logged_opid;
      }
      break;
  }
  DCHECK(local_last_logged_opid.IsInitialized());

  // If the node is not in the configuration, allow the vote (this is required by Raft)
  // but log an informational message anyway.
  if (!cmeta_->IsMemberInConfig(request->candidate_uuid(), ACTIVE_CONFIG)) {
    LOG_WITH_PREFIX_UNLOCKED(INFO) << "Handling vote request from an unknown peer "
                                   << request->candidate_uuid();
  }

  // Recheck the heard-from-the-leader condition. There is a slight chance
  // that a heartbeat from the leader replica registers after the first check
  // in the very beginning of this method and before lock_ is acquired. This
  // extra check costs a little, but it helps in avoiding extra election rounds
  // and disruptive transfers of the replica leadership.
  if (PREDICT_TRUE(!request->ignore_live_leader()) &&
      MonoTime::Now() < withhold_votes_until_) {
    return RequestVoteRespondLeaderIsAlive(request, response);
  }

  // Candidate is running behind.
  if (request->candidate_term() < CurrentTermUnlocked()) {
    return RequestVoteRespondInvalidTerm(request, response);
  }

  // We already voted this term.
  if (request->candidate_term() == CurrentTermUnlocked() &&
      HasVotedCurrentTermUnlocked()) {

    // Already voted for the same candidate in the current term.
    if (GetVotedForCurrentTermUnlocked() == request->candidate_uuid()) {
      return RequestVoteRespondVoteAlreadyGranted(request, response);
    }

    // Voted for someone else in current term.
    return RequestVoteRespondAlreadyVotedForOther(request, response);
  }

  // Candidate must have last-logged OpId at least as large as our own to get
  // our vote.
  bool vote_yes = !OpIdLessThan(request->candidate_status().last_received(),
                                local_last_logged_opid);

  // Record the term advancement if necessary. We don't do so in the case of
  // pre-elections because it's possible that the node who called the pre-election
  // has actually now successfully become leader of the prior term, in which case
  // bumping our term here would disrupt it.
  if (!request->is_pre_election() &&
      request->candidate_term() > CurrentTermUnlocked()) {
    // If we are going to vote for this peer, then we will flush the consensus metadata
    // to disk below when we record the vote, and we can skip flushing the term advancement
    // to disk here.
    auto flush = vote_yes ? SKIP_FLUSH_TO_DISK : FLUSH_TO_DISK;
    RETURN_NOT_OK_PREPEND(HandleTermAdvanceUnlocked(request->candidate_term(), flush),
        Substitute("Could not step down in RequestVote. Current term: $0, candidate term: $1",
                   CurrentTermUnlocked(), request->candidate_term()));
  }

  if (!vote_yes) {
    return RequestVoteRespondLastOpIdTooOld(local_last_logged_opid, request, response);
  }

  // Passed all our checks. Vote granted.
  return RequestVoteRespondVoteGranted(request, response);
}

Status RaftConsensus::ChangeConfig(const ChangeConfigRequestPB& req,
                                   StatusCallback client_cb,
                                   optional<TabletServerErrorPB::Code>* error_code) {
  TRACE_EVENT2("consensus", "RaftConsensus::ChangeConfig",
               "peer", peer_uuid(),
               "tablet", options_.tablet_id);

  // A single-peer change is expressed as a one-item bulk request; all
  // validation and replication happen in BulkChangeConfig().
  BulkChangeConfigRequestPB bulk;
  bulk.set_tablet_id(req.tablet_id());
  if (req.has_dest_uuid()) bulk.set_dest_uuid(req.dest_uuid());
  if (req.has_cas_config_opid_index()) {
    bulk.set_cas_config_opid_index(req.cas_config_opid_index());
  }

  // Copy 'type' and 'server' only when present, so that BulkChangeConfig()
  // still sees (and rejects) a request that omits either of them.
  auto* item = bulk.add_config_changes();
  if (req.has_type()) item->set_type(req.type());
  if (req.has_server()) item->mutable_peer()->CopyFrom(req.server());

  return BulkChangeConfig(bulk, std::move(client_cb), error_code);
}

// Validates a configuration change consisting of one or more ADD_PEER,
// REMOVE_PEER or MODIFY_PEER items and, if valid, submits it for replication
// via ReplicateConfigChangeUnlocked().
//
// Preconditions enforced here:
//   * this replica is running and is the active leader;
//   * no config change is already pending;
//   * the leader has committed an operation in its current term (Raft rule);
//   * if 'cas_config_opid_index' is set, it matches the committed config.
// Per-request rules:
//   * each peer may appear at most once;
//   * at most one peer may have its VOTER status changed (added, removed,
//     promoted or demoted);
//   * the leader itself may not be removed or have its member type modified;
//   * the resulting config must differ from the committed config.
//
// '*error_code' is set to CAS_FAILED on a CAS mismatch, and to INVALID_CONFIG
// when an item lacks 'type' or 'peer'. Other failures leave it untouched.
// 'client_cb' is handed to MarkDirtyOnSuccess() from the replication
// completion callback. After the lock is released, peers are signaled.
Status RaftConsensus::BulkChangeConfig(const BulkChangeConfigRequestPB& req,
                                       StatusCallback client_cb,
                                       optional<TabletServerErrorPB::Code>* error_code) {
  TRACE_EVENT2("consensus", "RaftConsensus::BulkChangeConfig",
               "peer", peer_uuid(),
               "tablet", options_.tablet_id);
  {
    ThreadRestrictions::AssertWaitAllowed();
    LockGuard l(lock_);
    RETURN_NOT_OK(CheckRunningUnlocked());
    RETURN_NOT_OK(CheckActiveLeaderUnlocked());
    RETURN_NOT_OK(CheckNoConfigChangePendingUnlocked());

    // We are required by Raft to reject config change operations until we have
    // committed at least one operation in our current term as leader.
    // See https://groups.google.com/forum/#!topic/raft-dev/t4xj6dJTP6E
    if (!queue_->IsCommittedIndexInCurrentTerm()) {
      return Status::IllegalState("Leader has not yet committed an operation in its own term");
    }

    const RaftConfigPB committed_config = cmeta_->CommittedConfig();

    // Support atomic ChangeConfig requests.
    if (req.has_cas_config_opid_index()) {
      if (committed_config.opid_index() != req.cas_config_opid_index()) {
        *error_code = TabletServerErrorPB::CAS_FAILED;
        return Status::IllegalState(Substitute("Request specified cas_config_opid_index "
                                               "of $0 but the committed config has opid_index "
                                               "of $1",
                                               req.cas_config_opid_index(),
                                               committed_config.opid_index()));
      }
    }

    // 'new_config' will be modified in-place and validated before being used
    // as the new Raft configuration.
    RaftConfigPB new_config = committed_config;

    // Enforce the "one by one" config change rules, even with the bulk API.
    // Keep track of total voters added, including non-voters promoted to
    // voters, and removed, including voters demoted to non-voters.
    int num_voters_modified = 0;

    // A record of the peers being modified so that we can enforce only one
    // change per peer per request.
    unordered_set<string> peers_modified;

    // Apply each requested change to 'new_config', validating as we go.
    // Membership checks for ADD_PEER / REMOVE_PEER are made against the
    // committed config; MODIFY_PEER edits the entry within 'new_config'.
    for (const auto& item : req.config_changes()) {
      if (PREDICT_FALSE(!item.has_type())) {
        *error_code = TabletServerErrorPB::INVALID_CONFIG;
        return Status::InvalidArgument("Must specify 'type' argument",
                                       SecureShortDebugString(req));
      }
      if (PREDICT_FALSE(!item.has_peer())) {
        *error_code = TabletServerErrorPB::INVALID_CONFIG;
        return Status::InvalidArgument("Must specify 'peer' argument",
                                       SecureShortDebugString(req));
      }

      ChangeConfigType type = item.type();
      const RaftPeerPB& peer = item.peer();

      if (PREDICT_FALSE(!peer.has_permanent_uuid())) {
        return Status::InvalidArgument("peer must have permanent_uuid specified",
                                       SecureShortDebugString(req));
      }

      if (!InsertIfNotPresent(&peers_modified, peer.permanent_uuid())) {
        return Status::InvalidArgument(
            Substitute("only one change allowed per peer: peer $0 appears more "
                       "than once in the config change request",
                       peer.permanent_uuid()),
            SecureShortDebugString(req));
      }

      const string& server_uuid = peer.permanent_uuid();
      switch (type) {
        case ADD_PEER:
          // Ensure the peer we are adding is not already a member of the configuration.
          if (IsRaftConfigMember(server_uuid, committed_config)) {
            return Status::InvalidArgument(
                Substitute("Server with UUID $0 is already a member of the config. RaftConfig: $1",
                           server_uuid, SecureShortDebugString(committed_config)));
          }
          if (!peer.has_member_type()) {
            return Status::InvalidArgument("peer must have member_type specified",
                                           SecureShortDebugString(req));
          }
          if (!peer.has_last_known_addr()) {
            return Status::InvalidArgument("peer must have last_known_addr specified",
                                           SecureShortDebugString(req));
          }
          if (peer.member_type() == RaftPeerPB::VOTER) {
            num_voters_modified++;
          }
          *new_config.add_peers() = peer;
          break;

        case REMOVE_PEER:
          if (server_uuid == peer_uuid()) {
            return Status::InvalidArgument(
                Substitute("Cannot remove peer $0 from the config because it is the leader. "
                           "Force another leader to be elected to remove this peer. "
                           "Consensus state: $1",
                           server_uuid,
                           SecureShortDebugString(cmeta_->ToConsensusStatePB())));
          }
          if (!RemoveFromRaftConfig(&new_config, server_uuid)) {
            return Status::NotFound(
                Substitute("Server with UUID $0 not a member of the config. RaftConfig: $1",
                           server_uuid, SecureShortDebugString(committed_config)));
          }
          if (IsRaftConfigVoter(server_uuid, committed_config)) {
            num_voters_modified++;
          }
          break;

        case MODIFY_PEER: {
          RaftPeerPB* modified_peer;
          RETURN_NOT_OK(GetRaftConfigMember(&new_config, server_uuid, &modified_peer));
          const RaftPeerPB orig_peer(*modified_peer);
          // Override 'member_type' and items within 'attrs' only if they are
          // explicitly passed in the request. At least one field must be
          // modified to be a valid request.
          if (peer.has_member_type() && peer.member_type() != modified_peer->member_type()) {
            if (modified_peer->member_type() == RaftPeerPB::VOTER ||
                peer.member_type() == RaftPeerPB::VOTER) {
              // This is a 'member_type' change involving a VOTER, i.e. a
              // promotion or demotion.
              num_voters_modified++;
            }
            // A leader must be forced to step down before demoting it.
            if (server_uuid == peer_uuid()) {
              return Status::InvalidArgument(
                  Substitute("Cannot modify member type of peer $0 because it is the leader. "
                              "Cause another leader to be elected to modify this peer. "
                              "Consensus state: $1",
                              server_uuid,
                              SecureShortDebugString(cmeta_->ToConsensusStatePB())));
            }
            modified_peer->set_member_type(peer.member_type());
          }
          if (peer.attrs().has_promote()) {
            modified_peer->mutable_attrs()->set_promote(peer.attrs().promote());
          }
          if (peer.attrs().has_replace()) {
            modified_peer->mutable_attrs()->set_replace(peer.attrs().replace());
          }
          // Ensure that MODIFY_PEER actually modified something.
          if (MessageDifferencer::Equals(orig_peer, *modified_peer)) {
            return Status::InvalidArgument("must modify a field when calling MODIFY_PEER");
          }
          break;
        }

        default:
          return Status::NotSupported(Substitute(
              "$0: unsupported type of configuration change",
              ChangeConfigType_Name(type)));
      }
    }

    // Don't allow no-op config changes to be committed.
    if (MessageDifferencer::Equals(committed_config, new_config)) {
      return Status::InvalidArgument("requested configuration change does not "
                                     "actually modify the config",
                                     SecureShortDebugString(req));
    }

    // Ensure this wasn't an illegal bulk change.
    if (num_voters_modified > 1) {
      return Status::InvalidArgument("it is not safe to modify the VOTER status "
                                     "of more than one peer at a time",
                                     SecureShortDebugString(req));
    }

    // We'll assign a new opid_index to this config change.
    new_config.clear_opid_index();

    // The completion callback forwards the replication result and 'client_cb'
    // to MarkDirtyOnSuccess().
    RETURN_NOT_OK(ReplicateConfigChangeUnlocked(
        committed_config, std::move(new_config),
        [this, client_cb](const Status& s) {
          this->MarkDirtyOnSuccess("Config change replication complete",
                                   client_cb, s);
        }));
  } // Release lock before signaling request.
  peer_manager_->SignalRequest();
  return Status::OK();
}

// Forcibly replaces this replica's committed Raft configuration with the
// subset of committed-config peers listed in 'req.new_config()'.
//
// This is an unsafe, operator-driven recovery path. Rather than going through
// a leader, it synthesizes a ConsensusRequestPB that appears to come from a
// new leader ('req.caller_id()') at term current_term + 1. That request
// carries a single CHANGE_CONFIG_OP at index last-logged-index + 1 and is
// applied locally via Update(). A pending config on this replica is ignored
// (with a warning).
//
// Validation performed here (each failure sets '*error_code' to
// INVALID_CONFIG):
//   * 'new_config' and 'caller_id' must be present;
//   * every peer in 'new_config' must already be in the committed config.
//     Only UUIDs are taken from the request; full peer entries come from the
//     committed config;
//   * the local replica must be a VOTER in the resulting config;
//   * the resulting config must pass VerifyRaftConfig().
//
// Returns the status of Update(), or the error embedded in its response. Both
// the top-level 'error' field and the Raft-level 'status.error' field (set
// e.g. when the op cannot be prepared or the term is rejected) are surfaced,
// so a rejected change is never reported as success.
Status RaftConsensus::UnsafeChangeConfig(
    const UnsafeChangeConfigRequestPB& req,
    optional<tserver::TabletServerErrorPB::Code>* error_code) {
  if (PREDICT_FALSE(!req.has_new_config())) {
    *error_code = TabletServerErrorPB::INVALID_CONFIG;
    return Status::InvalidArgument("Request must contain 'new_config' argument "
                                   "to UnsafeChangeConfig()", SecureShortDebugString(req));
  }
  if (PREDICT_FALSE(!req.has_caller_id())) {
    *error_code = TabletServerErrorPB::INVALID_CONFIG;
    return Status::InvalidArgument("Must specify 'caller_id' argument to UnsafeChangeConfig()",
                                   SecureShortDebugString(req));
  }

  // Grab the committed config and current term on this node.
  int64_t current_term;
  RaftConfigPB committed_config;
  int64_t all_replicated_index;
  int64_t last_committed_index;
  OpId preceding_opid;
  uint64_t msg_timestamp;
  {
    // Take the snapshot of the replica state and queue state so that
    // we can stick them in the consensus update request later.
    ThreadRestrictions::AssertWaitAllowed();
    LockGuard l(lock_);
    current_term = CurrentTermUnlocked();
    committed_config = cmeta_->CommittedConfig();
    if (cmeta_->has_pending_config()) {
      LOG_WITH_PREFIX_UNLOCKED(WARNING)
            << "Replica has a pending config, but the new config "
            << "will be unsafely changed anyway. "
            << "Currently pending config on the node: "
            << SecureShortDebugString(cmeta_->PendingConfig());
    }
    all_replicated_index = queue_->GetAllReplicatedIndex();
    last_committed_index = queue_->GetCommittedIndex();
    preceding_opid = queue_->GetLastOpIdInLog();
    msg_timestamp = time_manager_->GetSerialTimestamp().value();
  }

  // Validate that passed replica uuids are part of the committed config
  // on this node.  This allows a manual recovery tool to only have to specify
  // the uuid of each replica in the new config without having to know the
  // addresses of each server (since we can get the address information from
  // the committed config). Additionally, only a subset of the committed config
  // is required for typical cluster repair scenarios.
  std::unordered_set<string> retained_peer_uuids;
  const RaftConfigPB& config = req.new_config();
  for (const RaftPeerPB& new_peer : config.peers()) {
    // Deliberately not named 'peer_uuid' to avoid shadowing peer_uuid().
    const string& new_peer_uuid = new_peer.permanent_uuid();
    retained_peer_uuids.insert(new_peer_uuid);
    if (!IsRaftConfigMember(new_peer_uuid, committed_config)) {
      *error_code = TabletServerErrorPB::INVALID_CONFIG;
      return Status::InvalidArgument(Substitute("Peer with uuid $0 is not in the committed  "
                                                "config on this replica, rejecting the  "
                                                "unsafe config change request for tablet $1. "
                                                "Committed config: $2",
                                                new_peer_uuid, req.tablet_id(),
                                                SecureShortDebugString(committed_config)));
    }
  }

  // Start from the committed config (which holds the full peer entries) and
  // drop every peer the request did not retain.
  RaftConfigPB new_config = committed_config;
  for (const auto& peer : committed_config.peers()) {
    const string& committed_peer_uuid = peer.permanent_uuid();
    if (!ContainsKey(retained_peer_uuids, committed_peer_uuid)) {
      CHECK(RemoveFromRaftConfig(&new_config, committed_peer_uuid));
    }
  }
  // Check that local peer is part of the new config and is a VOTER.
  // Although it is valid for a local replica to not have itself
  // in the committed config, it is rare and a replica without itself
  // in the latest config is definitely not caught up with the latest leader's log.
  if (!IsRaftConfigVoter(peer_uuid(), new_config)) {
    *error_code = TabletServerErrorPB::INVALID_CONFIG;
    return Status::InvalidArgument(Substitute("Local replica uuid $0 is not "
                                              "a VOTER in the new config, "
                                              "rejecting the unsafe config "
                                              "change request for tablet $1. "
                                              "Rejected config: $2" ,
                                              peer_uuid(), req.tablet_id(),
                                              SecureShortDebugString(new_config)));
  }
  new_config.set_unsafe_config_change(true);
  int64_t replicate_opid_index = preceding_opid.index() + 1;
  new_config.set_opid_index(replicate_opid_index);

  // Sanity check the new config. 'type' is irrelevant here.
  Status s = VerifyRaftConfig(new_config);
  if (!s.ok()) {
    *error_code = TabletServerErrorPB::INVALID_CONFIG;
    return Status::InvalidArgument(Substitute("The resulting new config for tablet $0  "
                                              "from passed parameters has failed raft "
                                              "config sanity check: $1",
                                              req.tablet_id(), s.ToString()));
  }

  // Prepare the consensus request as if the request is being generated
  // from a different leader.
  ConsensusRequestPB consensus_req;
  consensus_req.set_caller_uuid(req.caller_id());
  // Bumping up the term for the consensus request being generated.
  // This makes this request appear to come from a new leader that
  // the local replica doesn't know about yet. If the local replica
  // happens to be the leader, this will cause it to step down.
  const int64_t new_term = current_term + 1;
  consensus_req.set_caller_term(new_term);
  consensus_req.mutable_preceding_id()->CopyFrom(preceding_opid);
  consensus_req.set_committed_index(last_committed_index);
  consensus_req.set_all_replicated_index(all_replicated_index);

  // Prepare the replicate msg to be replicated.
  ReplicateMsg* replicate = consensus_req.add_ops();
  ChangeConfigRecordPB* cc_req = replicate->mutable_change_config_record();
  cc_req->set_tablet_id(req.tablet_id());
  *cc_req->mutable_old_config() = committed_config;
  *cc_req->mutable_new_config() = new_config;
  OpId* id = replicate->mutable_id();
  // Bumping up both the term and the opid_index from what's found in the log.
  id->set_term(new_term);
  id->set_index(replicate_opid_index);
  replicate->set_op_type(CHANGE_CONFIG_OP);
  replicate->set_timestamp(msg_timestamp);

  VLOG_WITH_PREFIX(3) << "UnsafeChangeConfig: Generated consensus request: "
                      << SecureShortDebugString(consensus_req);

  LOG_WITH_PREFIX(WARNING)
        << "PROCEEDING WITH UNSAFE CONFIG CHANGE ON THIS SERVER, "
        << "COMMITTED CONFIG: " << SecureShortDebugString(committed_config)
        << ", NEW CONFIG: " << SecureShortDebugString(new_config);

  ConsensusResponsePB consensus_resp;
  return Update(&consensus_req, &consensus_resp).AndThen([&consensus_resp]{
    if (consensus_resp.has_error()) {
      return StatusFromPB(consensus_resp.error().status());
    }
    // Update() may return OK while still rejecting the request. Raft-level
    // rejections are reported via response->status().error() (see
    // FillConsensusResponseError), so surface them too.
    if (consensus_resp.status().has_error()) {
      return StatusFromPB(consensus_resp.status().error().status());
    }
    return Status::OK();
  });
}

// Stops Raft activity on this replica without completing shutdown
// (Shutdown() calls this and then moves to kShutdown).
//
// State transitions: <current> -> kStopping -> kStopped. If the replica is
// already kStopping, kStopped or kShutdown this is a no-op. Along the way it
// closes the peer manager and then the queue (in that order), cancels pending
// ops, clears the leader, stops withholding votes if it was doing so
// indefinitely, and shuts down the raft pool token and failure detector.
void RaftConsensus::Stop() {
  // NOTE(review): the trace event is labelled "RaftConsensus::Shutdown" even
  // though this is Stop(); kept as-is since trace consumers may rely on it.
  TRACE_EVENT2("consensus", "RaftConsensus::Shutdown",
               "peer", peer_uuid(),
               "tablet", options_.tablet_id);
  {
    ThreadRestrictions::AssertWaitAllowed();
    LockGuard l(lock_);
    const State state = state_;
    // Idempotent: another caller has already begun or finished stopping.
    if (state == kStopping || state == kStopped || state == kShutdown) {
      return;
    }
    // Transition to kStopping state.
    SetStateUnlocked(kStopping);
    LOG_WITH_PREFIX_UNLOCKED(INFO) << "Raft consensus shutting down.";
  }

  // Close the peer manager.
  // (This and the queue close below run without holding 'lock_'.)
  if (peer_manager_) peer_manager_->Close();

  // We must close the queue after we close the peers.
  if (queue_) {
    // If we were leader, decrement the number of leaders there are now.
    if (queue_->IsInLeaderMode() && server_ctx_.num_leaders) {
      server_ctx_.num_leaders->IncrementBy(-1);
    }
    queue_->Close();
  }

  {
    ThreadRestrictions::AssertWaitAllowed();
    LockGuard l(lock_);
    // Abort any in-flight rounds before declaring the replica stopped.
    if (pending_) {
      CHECK_OK(pending_->CancelPendingOps());
    }
    SetStateUnlocked(kStopped);

    // Clear leader status on Stop(), in case this replica was the leader. If
    // we don't do this, the log messages still show this node as the leader.
    // No need to sync it since it's not persistent state.
    if (cmeta_) {
      ClearLeaderUnlocked();
    }
    LOG_WITH_PREFIX_UNLOCKED(INFO) << "Raft consensus is shut down!";
  }

  // If we were the leader, stop withholding votes.
  // Only an indefinite hold (MonoTime::Max()) is reset; a finite deadline set
  // by WithholdVotes() is left to expire on its own.
  auto max_time = MonoTime::Max();
  withhold_votes_until_.compare_exchange_strong(max_time, MonoTime::Min());


  // Shut down things that might acquire locks during destruction.
  if (raft_pool_token_) raft_pool_token_->Shutdown();
  if (failure_detector_) DisableFailureDetector();
}

// Fully shuts down this replica: stops Raft via Stop(), then moves the state
// machine to kShutdown and records that shutdown has completed.
void RaftConsensus::Shutdown() {
  // Once shutdown has completed, return before touching any lock. The
  // destructor may run on the reactor thread (when an election callback held
  // the last outstanding reference), and acquiring locks there would violate
  // ThreadRestrictions assertions.
  if (shutdown_) return;

  // After Stop() the state is expected to be kStopped (or already kShutdown);
  // SetStateUnlocked(kShutdown) CHECKs exactly that.
  Stop();
  {
    LockGuard guard(lock_);
    SetStateUnlocked(kShutdown);
  }
  shutdown_ = true;
}

// Registers a consensus-only (non-tablet) operation as a pending round.
// Requires 'lock_' to be held. The op type in 'msg' must be a consensus-only
// type; anything else crashes. When the round finishes replicating,
// NonTxRoundReplicationFinished() runs with a client callback that calls
// MarkDirtyOnSuccess() with a no-op status callback.
Status RaftConsensus::StartConsensusOnlyRoundUnlocked(const ReplicateRefPtr& msg) {
  DCHECK(lock_.is_locked());
  const ReplicateMsg& replicate = *msg->get();
  OperationType op_type = replicate.op_type();
  CHECK(IsConsensusOnlyOperation(op_type))
      << "Expected a consensus-only op type, got " << OperationType_Name(op_type)
      << ": " << SecureShortDebugString(replicate);
  VLOG_WITH_PREFIX_UNLOCKED(1) << "Starting consensus round: "
                               << SecureShortDebugString(replicate.id());

  scoped_refptr<ConsensusRound> round(new ConsensusRound(this, msg));
  ConsensusRound* const raw_round = round.get();
  round->SetConsensusReplicatedCallback([this, raw_round](const Status& status) {
    auto on_finished = [this](const Status& s) {
      this->MarkDirtyOnSuccess("Replicated consensus-only round",
                               &DoNothingStatusCB, s);
    };
    this->NonTxRoundReplicationFinished(raw_round, on_finished, status);
  });
  return AddPendingOperationUnlocked(round);
}

// Test-only hook: advances the current term to 'new_term' under 'lock_'.
// Crashes if the replica is not running; otherwise returns the result of
// HandleTermAdvanceUnlocked().
Status RaftConsensus::AdvanceTermForTests(int64_t new_term) {
  ThreadRestrictions::AssertWaitAllowed();
  LockGuard guard(lock_);
  CHECK_OK(CheckRunningUnlocked());
  return HandleTermAdvanceUnlocked(new_term);
}

// Builds the log prefix for vote-request messages, e.g.
// "<prefix>Leader pre-election vote request". This variant uses the
// thread-safe log prefix and does not require 'lock_'.
string RaftConsensus::GetRequestVoteLogPrefixThreadSafe(const VoteRequestPB& request) const {
  const char* const kind = request.is_pre_election() ? "pre-" : "";
  return Substitute("$0Leader $1election vote request", LogPrefixThreadSafe(), kind);
}

// Same as GetRequestVoteLogPrefixThreadSafe(), but the caller must already
// hold 'lock_'.
string RaftConsensus::GetRequestVoteLogPrefixUnlocked(const VoteRequestPB& request) const {
  DCHECK(lock_.is_locked());
  const char* const kind = request.is_pre_election() ? "pre-" : "";
  return Substitute("$0Leader $1election vote request", LogPrefixUnlocked(), kind);
}

// Marks 'response' as a granted vote that carries our current term.
void RaftConsensus::FillVoteResponseVoteGranted(VoteResponsePB* response) {
  response->set_vote_granted(true);
  response->set_responder_term(CurrentTermUnlocked());
}

// Marks 'response' as a denied vote with 'error_code'. The responder term is
// included only when 'responder_term_policy' is SET; some callers deliberately
// omit it.
void RaftConsensus::FillVoteResponseVoteDenied(
    ConsensusErrorPB::Code error_code,
    VoteResponsePB* response,
    ResponderTermPolicy responder_term_policy) {
  response->set_vote_granted(false);
  ConsensusErrorPB* error = response->mutable_consensus_error();
  error->set_code(error_code);
  if (responder_term_policy != ResponderTermPolicy::SET) {
    return;
  }
  response->set_responder_term(CurrentTermUnlocked());
}

// Denies a vote because the candidate's term is behind ours. The reason is
// logged and attached to the response as an InvalidArgument status.
Status RaftConsensus::RequestVoteRespondInvalidTerm(const VoteRequestPB* request,
                                                    VoteResponsePB* response) {
  FillVoteResponseVoteDenied(ConsensusErrorPB::INVALID_TERM, response);
  const string reason = Substitute(
      "$0: Denying vote to candidate $1 for earlier term $2. Current term is $3.",
      GetRequestVoteLogPrefixUnlocked(*request),
      request->candidate_uuid(),
      request->candidate_term(),
      CurrentTermUnlocked());
  LOG(INFO) << reason;
  ConsensusErrorPB* error = response->mutable_consensus_error();
  StatusToPB(Status::InvalidArgument(reason), error->mutable_status());
  return Status::OK();
}

// Re-sends a "yes" vote to a candidate we already voted for in this term.
Status RaftConsensus::RequestVoteRespondVoteAlreadyGranted(const VoteRequestPB* request,
                                                           VoteResponsePB* response) {
  FillVoteResponseVoteGranted(response);
  const string note = Substitute(
      "$0: Already granted yes vote for candidate $1 in term $2. Re-sending same reply.",
      GetRequestVoteLogPrefixUnlocked(*request),
      request->candidate_uuid(),
      request->candidate_term());
  LOG(INFO) << note;
  return Status::OK();
}

// Denies a vote because we already voted for a different candidate in the
// current term. The reason is logged and attached as InvalidArgument.
Status RaftConsensus::RequestVoteRespondAlreadyVotedForOther(const VoteRequestPB* request,
                                                             VoteResponsePB* response) {
  FillVoteResponseVoteDenied(ConsensusErrorPB::ALREADY_VOTED, response);
  const string reason = Substitute(
      "$0: Denying vote to candidate $1 in current term $2: "
      "Already voted for candidate $3 in this term.",
      GetRequestVoteLogPrefixUnlocked(*request),
      request->candidate_uuid(),
      CurrentTermUnlocked(),
      GetVotedForCurrentTermUnlocked());
  LOG(INFO) << reason;
  ConsensusErrorPB* error = response->mutable_consensus_error();
  StatusToPB(Status::InvalidArgument(reason), error->mutable_status());
  return Status::OK();
}

// Denies a vote because our last-logged OpId is greater than the candidate's.
// The reason is logged and attached as InvalidArgument.
Status RaftConsensus::RequestVoteRespondLastOpIdTooOld(const OpId& local_last_logged_opid,
                                                       const VoteRequestPB* request,
                                                       VoteResponsePB* response) {
  FillVoteResponseVoteDenied(ConsensusErrorPB::LAST_OPID_TOO_OLD, response);
  const OpId& candidate_last = request->candidate_status().last_received();
  const string reason = Substitute(
      "$0: Denying vote to candidate $1 for term $2 because "
      "replica has last-logged OpId of $3, which is greater than that of the "
      "candidate, which has last-logged OpId of $4.",
      GetRequestVoteLogPrefixUnlocked(*request),
      request->candidate_uuid(),
      request->candidate_term(),
      SecureShortDebugString(local_last_logged_opid),
      SecureShortDebugString(candidate_last));
  LOG(INFO) << reason;
  ConsensusErrorPB* error = response->mutable_consensus_error();
  StatusToPB(Status::InvalidArgument(reason), error->mutable_status());
  return Status::OK();
}

// Denies a vote because this replica is the leader or believes a live leader
// exists. Omits the responder term and uses the thread-safe log prefix. The
// message is logged only at VLOG(1).
Status RaftConsensus::RequestVoteRespondLeaderIsAlive(const VoteRequestPB* request,
                                                      VoteResponsePB* response) {
  FillVoteResponseVoteDenied(ConsensusErrorPB::LEADER_IS_ALIVE, response,
                             ResponderTermPolicy::DO_NOT_SET);
  const string reason = Substitute(
      "$0: Denying vote to candidate $1 for term $2 because "
      "replica is either leader or believes a valid leader to be alive.",
      GetRequestVoteLogPrefixThreadSafe(*request),
      request->candidate_uuid(),
      request->candidate_term());
  VLOG(1) << reason;
  ConsensusErrorPB* error = response->mutable_consensus_error();
  StatusToPB(Status::InvalidArgument(reason), error->mutable_status());
  return Status::OK();
}

// Denies a vote because the replica is already servicing a leader update or
// another vote. Attaches a ServiceUnavailable status.
Status RaftConsensus::RequestVoteRespondIsBusy(
    const VoteRequestPB* request, VoteResponsePB* response) {
  // The requestor doesn't need our term to process this NO vote, so it is
  // left unset.
  FillVoteResponseVoteDenied(ConsensusErrorPB::CONSENSUS_BUSY, response,
                             ResponderTermPolicy::DO_NOT_SET);
  const string reason = Substitute(
      "$0: Denying vote to candidate $1 for term $2 because "
      "replica is already servicing an update from a current leader or another vote",
      GetRequestVoteLogPrefixThreadSafe(*request),
      request->candidate_uuid(),
      request->candidate_term());
  VLOG(1) << reason;
  ConsensusErrorPB* error = response->mutable_consensus_error();
  StatusToPB(Status::ServiceUnavailable(reason), error->mutable_status());
  return Status::OK();
}

// Grants a "yes" vote to the candidate. For real (non-pre-) elections the
// vote is persisted first. The failure detector is snoozed both before and
// after persisting.
Status RaftConsensus::RequestVoteRespondVoteGranted(const VoteRequestPB* request,
                                                    VoteResponsePB* response) {
  // The answer is "yes", so keep our own failure detector from triggering an
  // election while the vote is written to disk. The snooze uses an
  // exponential backoff to reduce split-vote contention under high latency.
  const MonoDelta snooze = LeaderElectionExpBackoffDeltaUnlocked();
  SnoozeFailureDetector(string("vote granted"), snooze);

  // Only votes cast in real elections are persisted; pre-election votes are not.
  const bool is_real_election = !request->is_pre_election();
  if (is_real_election) {
    RETURN_NOT_OK(SetVotedForCurrentTermUnlocked(request->candidate_uuid()));
  }

  FillVoteResponseVoteGranted(response);

  // Snooze once more now that the (possibly slow) persist is done, giving the
  // candidate time to become leader.
  SnoozeFailureDetector(/*reason_for_log=*/nullopt, snooze);

  LOG(INFO) << Substitute("$0: Granting yes vote for candidate $1 in term $2.",
                          GetRequestVoteLogPrefixUnlocked(*request),
                          request->candidate_uuid(),
                          CurrentTermUnlocked());
  return Status::OK();
}

// Returns the active Raft role of this replica, read under 'lock_'.
RaftPeerPB::Role RaftConsensus::role() const {
  ThreadRestrictions::AssertWaitAllowed();
  LockGuard guard(lock_);
  const RaftPeerPB::Role active = cmeta_->active_role();
  return active;
}

// Returns this replica's active role together with its member type in the
// active config. The member type is UNKNOWN_MEMBER_TYPE if the local peer is
// not listed in that config.
RaftConsensus::RoleAndMemberType RaftConsensus::GetRoleAndMemberType() const {
  ThreadRestrictions::AssertWaitAllowed();

  // Resolved before taking the lock; it doesn't depend on consensus state.
  const auto& self_uuid = peer_uuid();

  LockGuard guard(lock_);
  const RaftConfigPB& active_config = cmeta_->ActiveConfig();
  const auto& peers = active_config.peers();
  const auto self = std::find_if(peers.begin(), peers.end(),
                                 [&self_uuid](const RaftPeerPB& p) {
                                   return p.permanent_uuid() == self_uuid;
                                 });
  const auto member_type = (self != peers.end()) ? self->member_type()
                                                 : RaftPeerPB::UNKNOWN_MEMBER_TYPE;
  return std::make_pair(cmeta_->active_role(), member_type);
}

// Locking wrapper around CurrentTermUnlocked().
int64_t RaftConsensus::CurrentTerm() const {
  LockGuard guard(lock_);
  const int64_t term = CurrentTermUnlocked();
  return term;
}

// Moves the lifecycle state machine to 'new_state'. Requires 'lock_'.
// Illegal transitions crash the process. The allowed transitions are:
//   kNew         -> kInitialized
//   kInitialized -> kRunning
//   any state other than kStopped/kShutdown -> kStopping
//   kStopping    -> kStopped
//   kStopped or kShutdown -> kShutdown   (so repeated shutdown is accepted)
// Transitioning to any other state (e.g. kNew) is fatal.
void RaftConsensus::SetStateUnlocked(State new_state) {
  DCHECK(lock_.is_locked());
  switch (new_state) {
    case kInitialized:
      CHECK_EQ(kNew, state_);
      break;
    case kRunning:
      CHECK_EQ(kInitialized, state_);
      break;
    case kStopping:
      CHECK(state_ != kStopped && state_ != kShutdown) << "State = " << State_Name(state_);
      break;
    case kStopped:
      CHECK_EQ(kStopping, state_);
      break;
    case kShutdown:
      CHECK(state_ == kStopped || state_ == kShutdown) << "State = " << State_Name(state_);
      break;
    default:
      LOG(FATAL) << "Disallowed transition to state = " << State_Name(new_state);
      break;
  }
  state_ = new_state;
}

// Returns a human-readable name for 'state'. Unknown values trigger
// LOG(DFATAL) and map to "Unknown".
const char* RaftConsensus::State_Name(State state) {
  const char* name = "Unknown";
  switch (state) {
    case kNew:         name = "New";         break;
    case kInitialized: name = "Initialized"; break;
    case kRunning:     name = "Running";     break;
    case kStopping:    name = "Stopping";    break;
    case kStopped:     name = "Stopped";     break;
    case kShutdown:    name = "Shut down";   break;
    default:
      LOG(DFATAL) << "Unknown State value: " << state;
      break;
  }
  return name;
}

// Returns the minimum election timeout: the Raft heartbeat interval multiplied
// by the number of heartbeat periods a follower may miss (per the flag names,
// before it considers the leader failed).
MonoDelta RaftConsensus::MinimumElectionTimeout() {
  // Compute the product in floating point and convert to a 64-bit millisecond
  // count. Narrowing to an int32_t intermediate would overflow (undefined
  // behavior) for large flag values. Results that fit in 32 bits are the same.
  const double failure_timeout_ms =
      FLAGS_leader_failure_max_missed_heartbeat_periods *
      static_cast<double>(FLAGS_raft_heartbeat_interval_ms);
  return MonoDelta::FromMilliseconds(static_cast<int64_t>(failure_timeout_ms));
}

void RaftConsensus::SetLeaderUuidUnlocked(const string& uuid) {
  DCHECK(lock_.is_locked());
  failed_elections_since_stable_leader_ = 0;
  num_failed_elections_metric_->set_value(failed_elections_since_stable_leader_);
  cmeta_->set_leader_uuid(uuid);
  MarkDirty(Substitute("New leader $0", uuid));
}

// Builds a CHANGE_CONFIG_OP replicate message describing the transition from
// 'old_config' to 'new_config', assigns it a timestamp, and appends the new
// round to the queue. Requires 'lock_'. 'client_cb' is handed to
// NonTxRoundReplicationFinished() once the round completes.
Status RaftConsensus::ReplicateConfigChangeUnlocked(
    RaftConfigPB old_config,
    RaftConfigPB new_config,
    StatusCallback client_cb) {
  DCHECK(lock_.is_locked());
  // Give the freshly allocated message to its ref-counted owner right away.
  ReplicateRefPtr replicate = make_scoped_refptr(new RefCountedReplicate(new ReplicateMsg()));
  ReplicateMsg* msg = replicate->get();
  msg->set_op_type(CHANGE_CONFIG_OP);

  ChangeConfigRecordPB* record = msg->mutable_change_config_record();
  record->set_tablet_id(options_.tablet_id);
  *record->mutable_old_config() = std::move(old_config);
  *record->mutable_new_config() = std::move(new_config);
  CHECK_OK(time_manager_->AssignTimestamp(msg));

  scoped_refptr<ConsensusRound> round(new ConsensusRound(this, replicate));
  ConsensusRound* const raw_round = round.get();
  round->SetConsensusReplicatedCallback(
      [this, raw_round, cb = std::move(client_cb)](const Status& status) {
        this->NonTxRoundReplicationFinished(raw_round, cb, status);
      });
  return AppendNewRoundToQueueUnlocked(round);
}

void RaftConsensus::RefreshConsensusQueueAndPeersUnlocked() {
  DCHECK(lock_.is_locked());
  DCHECK_EQ(RaftPeerPB::LEADER, cmeta_->active_role());
  const RaftConfigPB& active_config = cmeta_->ActiveConfig();

  // Change the peers so that we're able to replicate messages remotely and
  // locally. The peer manager must be closed before updating the active config
  // in the queue -- when the queue is in LEADER mode, it checks that all
  // registered peers are a part of the active config.
  peer_manager_->Close();
  // TODO(todd): should use queue committed index here? in that case do
  // we need to pass it in at all?
  queue_->SetLeaderMode(pending_->GetCommittedIndex(),
                        CurrentTermUnlocked(),
                        active_config);
  peer_manager_->UpdateRaftConfig(active_config);
}

// Returns the permanent UUID of the local peer.
const string& RaftConsensus::peer_uuid() const {
  const auto& self = local_peer_pb_;
  return self.permanent_uuid();
}

// Returns the ID of the tablet this consensus instance serves.
const string& RaftConsensus::tablet_id() const {
  const auto& opts = options_;
  return opts.tablet_id;
}

// Fills '*cstate' with a snapshot of the consensus metadata. Returns
// IllegalState if the replica is shut down. When 'report_health' is
// INCLUDE_HEALTH_REPORT and this replica is the active leader, each peer in
// the committed config gets the queue's health report for it, if one exists.
Status RaftConsensus::ConsensusState(ConsensusStatePB* cstate,
                                     IncludeHealthReport report_health) const {
  ThreadRestrictions::AssertWaitAllowed();
  UniqueLock guard(lock_);
  if (state_ == kShutdown) {
    return Status::IllegalState("Tablet replica is shutdown");
  }
  ConsensusStatePB snapshot = cmeta_->ToConsensusStatePB();

  const bool attach_health = report_health == INCLUDE_HEALTH_REPORT &&
                             cmeta_->active_role() == RaftPeerPB::LEADER;
  if (attach_health) {
    const auto reports = queue_->ReportHealthOfPeers();
    // The queue is no longer needed, so release the consensus lock before
    // decorating the snapshot.
    guard.unlock();

    for (RaftPeerPB& peer : *snapshot.mutable_committed_config()->mutable_peers()) {
      const HealthReportPB* report = FindOrNull(reports, peer.permanent_uuid());
      if (report != nullptr) {
        *peer.mutable_health_report() = *report;
      }
    }
  }
  *cstate = std::move(snapshot);
  return Status::OK();
}

// Returns a copy of the committed Raft config, read under 'lock_'.
RaftConfigPB RaftConsensus::CommittedConfig() const {
  ThreadRestrictions::AssertWaitAllowed();
  LockGuard guard(lock_);
  RaftConfigPB committed = cmeta_->CommittedConfig();
  return committed;
}

void RaftConsensus::DumpStatusHtml(std::ostream& out) const {
  if (state_ != kRunning) {
    out << "Tablet " << EscapeForHtmlToString(tablet_id())
        << " not running" << std::endl;
    return;
  }
  const RaftPeerPB::Role role = cmeta_->GetRoleAndTerm().first;

  out << "<h1>Raft Consensus State</h1>" << std::endl;

  out << "<h2>State</h2>" << std::endl;
  out << "<pre>" << EscapeForHtmlToString(ToString()) << "</pre>" << std::endl;
  out << "<h2>Queue</h2>" << std::endl;
  out << "<pre>" << EscapeForHtmlToString(queue_->ToString()) << "</pre>" << std::endl;

  // Dump the queues on a leader.
  if (role == RaftPeerPB::LEADER) {
    out << "<h2>Queue overview</h2>" << std::endl;
    out << "<pre>" << EscapeForHtmlToString(queue_->ToString()) << "</pre>" << std::endl;
    out << "<hr/>" << std::endl;
    out << "<h2>Queue details</h2>" << std::endl;
    queue_->DumpToHtml(out);
  }
}

// Entry point for election results. This runs on a reactor thread, so the
// real handling (DoElectionCallback) is submitted to the Raft thread pool.
// 'self' keeps this object alive until the task runs.
//
// If submission fails, the failure detector is intentionally not re-enabled:
// failure means RaftConsensus has stopped, and failure detection is no longer
// needed. Any failure other than ServiceUnavailable is fatal.
void RaftConsensus::ElectionCallback(ElectionReason reason, const ElectionResult& result) {
  auto self = shared_from_this();
  Status s = raft_pool_token_->Submit([self, reason, result]() {
    self->DoElectionCallback(reason, result);
  });
  if (PREDICT_TRUE(s.ok())) {
    return;
  }
  static const char* const msg = "unable to run election callback";
  CHECK(s.IsServiceUnavailable()) << LogPrefixThreadSafe() << msg;
  WARN_NOT_OK(s, LogPrefixThreadSafe() + msg);
}

// Handles the outcome of a (pre-)election on the Raft thread pool.
//
// - Not running: log and ignore.
// - Vote denied: bump the failed-election counter and metric; adopt a higher
//   term seen from voters; log.
// - Vote won, but the election started in a different term, or we are not a
//   voter in the active config: log and ignore.
// - Already leader: ignore a pre-election win; a real-election win is
//   LOG(DFATAL).
// - Pre-election won: drop the lock and start a real election.
// - Real election won: become leader.
// The failure detector is snoozed up front. If this election was triggered by
// the detector (ELECTION_TIMEOUT_EXPIRED), the detector is re-enabled on exit,
// except on the paths after 'enable_fd.cancel()'.
void RaftConsensus::DoElectionCallback(ElectionReason reason, const ElectionResult& result) {
  const int64_t election_term = result.vote_request.candidate_term();
  const bool was_pre_election = result.vote_request.is_pre_election();
  const char* election_type = was_pre_election ? "pre-election" : "election";

  ThreadRestrictions::AssertWaitAllowed();
  UniqueLock l(lock_);

  Status s = CheckRunningUnlocked();
  if (PREDICT_FALSE(!s.ok())) {
    LOG_WITH_PREFIX_UNLOCKED(INFO) << "Received " << election_type << " callback for term "
                                   << election_term << " while not running: "
                                   << s.ToString();
    return;
  }

  // If this election was not triggered by the failure detector, the fd may
  // still be enabled and needs to be snoozed, both if we win and lose:
  // - When we win because we're about to disable it and become leader.
  // - When we lose or otherwise we can fall into a cycle, where everyone keeps
  //   triggering elections but no election ever completes because by the time they
  //   finish another one is triggered already.
  //
  // This is a no-op if the failure detector is disabled..
  MonoDelta snooze_delta = LeaderElectionExpBackoffDeltaUnlocked();
  SnoozeFailureDetector(string("election complete"), snooze_delta);
  auto enable_fd = MakeScopedCleanup([&]() {
    // The failure detector was disabled if it triggered this election. Reenable
    // it when exiting this function.
    if (reason == ELECTION_TIMEOUT_EXPIRED) {
      EnableFailureDetector(snooze_delta);
    }
  });

  if (result.decision == VOTE_DENIED) {
    failed_elections_since_stable_leader_++;
    num_failed_elections_metric_->set_value(failed_elections_since_stable_leader_);

    // If we called an election and one of the voters had a higher term than we did,
    // we should bump our term before we potentially try again. This is particularly
    // important with pre-elections to avoid getting "stuck" in a case like:
    //    Peer A: has ops through 1.10, term = 2, voted in term 2 for peer C
    //    Peer B: has ops through 1.15, term = 1
    // In this case, Peer B will reject peer A's pre-elections for term 3 because
    // the local log is longer. Peer A will reject B's pre-elections for term 2
    // because it already voted in term 2. The check below ensures that peer B
    // will bump to term 2 when it gets the vote rejection, such that its
    // next pre-election (for term 3) would succeed.
    //
    // NOTE(review): the Status returned by HandleTermAdvanceUnlocked() is
    // discarded here, so a failure to advance the term (e.g. while stepping
    // down) goes unreported. Consider at least logging it.
    if (result.highest_voter_term > CurrentTermUnlocked()) {
      HandleTermAdvanceUnlocked(result.highest_voter_term);
    }

    LOG_WITH_PREFIX_UNLOCKED(INFO)
        << "Leader " << election_type << " lost for term " << election_term
        << ". Reason: "
        << (!result.message.empty() ? result.message : "None given");
    return;
  }

  // In a pre-election, we collected votes for the _next_ term.
  // So, we need to adjust our expectations of what the current term should be.
  int64_t election_started_in_term = election_term;
  if (was_pre_election) {
    election_started_in_term--;
  }

  // Our term moved on while the election was running; its result is stale.
  if (election_started_in_term != CurrentTermUnlocked()) {
    LOG_WITH_PREFIX_UNLOCKED(INFO)
        << "Leader " << election_type << " decision vote started in "
        << "defunct term " << election_started_in_term << ": won";
    return;
  }

  // A win is ignored if we are no longer a voter in the active config.
  if (!cmeta_->IsVoterInConfig(peer_uuid(), ACTIVE_CONFIG)) {
    LOG_WITH_PREFIX_UNLOCKED(WARNING) << "Leader " << election_type
                                      << " decision while not in active config. "
                                      << "Result: Term " << election_term
                                      << ": won. RaftConfig: "
                                      << SecureShortDebugString(cmeta_->ActiveConfig());
    return;
  }

  // At this point we're either already leader, we're going to become leader,
  // or we're going to run a real election.
  //
  // In all cases, do not reenable the failure detector.
  enable_fd.cancel();

  if (cmeta_->active_role() == RaftPeerPB::LEADER) {
    // If this was a pre-election, it's possible to see the following interleaving:
    //
    //  1. Term N (follower): send a real election for term N.
    //  2. An externally-triggered election is started.
    //  3. Term N (follower): send a pre-election for term N+1.
    //  4. Election callback for real election from term N completes.
    //     Peer is now leader for term N.
    //  5. Pre-election callback from term N+1 completes, even though
    //     we are currently a leader of term N.
    // In this case, we should just ignore the pre-election, since we're
    // happily the leader of the prior term.
    if (was_pre_election) return;
    LOG_WITH_PREFIX_UNLOCKED(DFATAL)
        << "Leader " << election_type << " callback while already leader! "
        << "Result: Term " << election_term << ": won";
    return;
  }

  LOG_WITH_PREFIX_UNLOCKED(INFO) << "Leader " << election_type << " won for term " << election_term;

  if (was_pre_election) {
    // We just won the pre-election. So, we need to call a real election.
    // The lock is released first; StartElection() is called without 'lock_' held.
    l.unlock();
    WARN_NOT_OK_EVERY_N_SECS(StartElection(NORMAL_ELECTION, reason),
                             "Couldn't start leader election after successful pre-election", 10);
  } else {
    // We won a real election. Convert role to LEADER.
    SetLeaderUuidUnlocked(peer_uuid());

    // TODO(todd): BecomeLeaderUnlocked() can fail due to state checks during shutdown.
    // It races with the above state check.
    // This could be a problem during tablet deletion.
    CHECK_OK(BecomeLeaderUnlocked());
  }
}

// Locking wrapper around GetLastOpIdUnlocked().
optional<OpId> RaftConsensus::GetLastOpId(OpIdType type) {
  ThreadRestrictions::AssertWaitAllowed();
  LockGuard guard(lock_);
  optional<OpId> last = GetLastOpIdUnlocked(type);
  return last;
}

// Returns the last OpId of the requested kind:
//  - RECEIVED_OPID:  the last OpId in the log, per the queue;
//  - COMMITTED_OPID: (term of the last committed op, committed index).
// Returns nullopt if the queue or pending-rounds tracker is not set up yet, or
// for an unrecognized 'type' (which is also LOG(DFATAL)).
optional<OpId> RaftConsensus::GetLastOpIdUnlocked(OpIdType type) {
  // Not started yet, failed in Init(), or failed in Start().
  if (!queue_ || !pending_) {
    return nullopt;
  }

  if (type == RECEIVED_OPID) {
    return queue_->GetLastOpIdInLog();
  }
  if (type == COMMITTED_OPID) {
    return MakeOpId(pending_->GetTermWithLastCommittedOp(),
                    pending_->GetCommittedIndex());
  }
  LOG(DFATAL) << LogPrefixUnlocked() << "Invalid OpIdType " << type;
  return nullopt;
}

// Returns the log retention watermarks: the queue's committed index (for
// durability) and its all-replicated index (for peers).
log::RetentionIndexes RaftConsensus::GetRetentionIndexes() {
  // The two watermarks are read independently, not atomically. A stale value
  // only means this pass of log GC retains slightly more than necessary.
  const auto durable_index = queue_->GetCommittedIndex();
  const auto replicated_index = queue_->GetAllReplicatedIndex();
  return log::RetentionIndexes(durable_index, replicated_index);
}

void RaftConsensus::MarkDirty(const string& reason) {
  WARN_NOT_OK(raft_pool_token_->Submit([=]() { this->mark_dirty_clbk_(reason); }),
              LogPrefixThreadSafe() + "Unable to run MarkDirty callback");
}

void RaftConsensus::MarkDirtyOnSuccess(const string& reason,
                                       const StatusCallback& client_cb,
                                       const Status& status) {
  if (PREDICT_TRUE(status.ok())) {
    MarkDirty(reason);
  }
  client_cb(status);
}

// Replication-finished handler for consensus-only (non-transactional) rounds.
// It is called with 'lock_' held.
//
// For CHANGE_CONFIG_OP rounds, the config change is first completed or aborted
// via CompleteConfigChangeRoundUnlocked(). Then:
//  - on failure, the failure is logged and 'client_cb' gets 'status';
//  - on success, the round handler finishes the round, a COMMIT message is
//    appended to the WAL asynchronously (a failed write crashes via
//    CrashIfNotOkStatusCB), and 'client_cb' gets 'status'.
void RaftConsensus::NonTxRoundReplicationFinished(ConsensusRound* round,
                                                  const StatusCallback& client_cb,
                                                  const Status& status) {
  // NOTE: lock_ is held here because this is triggered by
  // PendingRounds::AbortOpsAfter() and AdvanceCommittedIndex().
  DCHECK(lock_.is_locked());
  OperationType op_type = round->replicate_msg()->op_type();
  const string& op_type_str = OperationType_Name(op_type);
  CHECK(IsConsensusOnlyOperation(op_type)) << "Unexpected op type: " << op_type_str;

  if (op_type == CHANGE_CONFIG_OP) {
    CompleteConfigChangeRoundUnlocked(round, status);
    // Fall through to the generic handling.
  }

  // TODO(mpercy): May need some refactoring to unlock 'lock_' before invoking
  // the client callback.

  if (!status.ok()) {
    LOG_WITH_PREFIX_UNLOCKED(INFO) << op_type_str << " replication failed: "
                                   << status.ToString();
    client_cb(status);
    return;
  }
  VLOG_WITH_PREFIX_UNLOCKED(1) << "Committing " << op_type_str << " with op id "
                               << round->id();
  round_handler_->FinishConsensusOnlyRound(round);
  // Record the commit of this op in the WAL. The append is asynchronous; its
  // completion callback crashes the process if the write fails.
  CommitMsg commit_msg;
  commit_msg.set_op_type(round->replicate_msg()->op_type());
  *commit_msg.mutable_commited_op_id() = round->id();

  CHECK_OK(log_->AsyncAppendCommit(
      commit_msg, [](const Status& s) {
        CrashIfNotOkStatusCB("Enqueued commit operation failed to write to WAL", s);
      }));

  client_cb(status);
}

// Applies the outcome of a replicated CHANGE_CONFIG_OP round. Requires 'lock_'.
//
// On failure: if this round is the current pending config, clear the pending
// config and refresh the failure detector state; otherwise just log.
// On success: if the new config's opid_index is newer than the committed
// config's, make it the committed config; otherwise (a replay whose COMMIT was
// delayed) log and ignore it.
void RaftConsensus::CompleteConfigChangeRoundUnlocked(ConsensusRound* round, const Status& status) {
  DCHECK(lock_.is_locked());
  const OpId& op_id = round->replicate_msg()->id();

  if (!status.ok()) {
    // If the config change being aborted is the current pending one, abort it.
    if (cmeta_->has_pending_config() &&
        cmeta_->GetConfigOpIdIndex(PENDING_CONFIG) == op_id.index()) {
      LOG_WITH_PREFIX_UNLOCKED(INFO) << "Aborting config change with OpId "
                                     << op_id << ": " << status.ToString();
      cmeta_->clear_pending_config();

      // Disable leader failure detection if transitioning from VOTER to
      // NON_VOTER and vice versa.
      UpdateFailureDetectorState();
    } else {
      LOG_WITH_PREFIX_UNLOCKED(INFO)
          << "Skipping abort of non-pending config change with OpId "
          << op_id << ": " << status.ToString();
    }

    // It's possible to abort a config change which isn't the pending one in the following
    // sequence:
    // - replicate a config change
    // - it gets committed, so we write the new config to disk as the Committed configuration
    // - we crash before the COMMIT message hits the WAL
    // - we restart the server, and the config change is added as a pending round again,
    //   but isn't set as Pending because it's already committed.
    // - we delete the tablet before committing it
    // See KUDU-1735.
    return;
  }

  // Commit the successful config change.

  DCHECK(round->replicate_msg()->change_config_record().has_old_config());
  DCHECK(round->replicate_msg()->change_config_record().has_new_config());
  const RaftConfigPB& old_config = round->replicate_msg()->change_config_record().old_config();
  const RaftConfigPB& new_config = round->replicate_msg()->change_config_record().new_config();
  DCHECK(old_config.has_opid_index());
  DCHECK(new_config.has_opid_index());
  // Check if the pending Raft config has an OpId less than the committed
  // config. If so, this is a replay at startup in which the COMMIT
  // messages were delayed.
  int64_t committed_config_opid_index = cmeta_->GetConfigOpIdIndex(COMMITTED_CONFIG);
  if (new_config.opid_index() > committed_config_opid_index) {
    LOG_WITH_PREFIX_UNLOCKED(INFO)
        << "Committing config change with OpId "
        << op_id << ": "
        << DiffRaftConfigs(old_config, new_config)
        << ". New config: { " << SecureShortDebugString(new_config) << " }";
    CHECK_OK(SetCommittedConfigUnlocked(new_config));
  } else {
    LOG_WITH_PREFIX_UNLOCKED(INFO)
        << "Ignoring commit of config change with OpId "
        << op_id << " because the committed config has OpId index "
        << committed_config_opid_index << ". The config change we are ignoring is: "
        << "Old config: { " << SecureShortDebugString(old_config) << " }. "
        << "New config: { " << SecureShortDebugString(new_config) << " }";
  }
}

void RaftConsensus::EnableFailureDetector(optional<MonoDelta> delta) {
  if (PREDICT_TRUE(FLAGS_enable_leader_failure_detection)) {
    failure_detector_->Start(std::move(delta));
  }
}

void RaftConsensus::DisableFailureDetector() {
  if (PREDICT_TRUE(FLAGS_enable_leader_failure_detection)) {
    failure_detector_->Stop();
  }
}

// Turns the failure detector on or off depending on this replica's position.
// Requires 'lock_'. The detector runs only when the local peer is a voter in
// the active config and is not the leader. Otherwise (leader, non-voter,
// non-participant, ...) it is disabled, so this peer won't start elections.
void RaftConsensus::UpdateFailureDetectorState(optional<MonoDelta> delta) {
  DCHECK(lock_.is_locked());
  const auto& uuid = peer_uuid();
  const bool should_detect =
      uuid != cmeta_->leader_uuid() && cmeta_->IsVoterInConfig(uuid, ACTIVE_CONFIG);
  if (!should_detect) {
    DisableFailureDetector();
    return;
  }
  EnableFailureDetector(std::move(delta));
}

// Postpones the failure detector by 'delta', or by MinimumElectionTimeout()
// when no delta is given. If 'reason_for_log' is set, the snooze is logged at
// VLOG(1). Does nothing when there is no detector or when detection is
// disabled by flag.
void RaftConsensus::SnoozeFailureDetector(optional<string> reason_for_log,
                                          optional<MonoDelta> delta) {
  if (PREDICT_FALSE(!failure_detector_ || !FLAGS_enable_leader_failure_detection)) {
    return;
  }

  if (reason_for_log.has_value()) {
    VLOG(1) << LogPrefixThreadSafe()
            << Substitute("Snoozing failure detection for $0 ($1)",
                          delta ? delta->ToString() : "election timeout",
                          *reason_for_log);
  }

  if (!delta.has_value()) {
    delta = MinimumElectionTimeout();
  }
  failure_detector_->Snooze(std::move(delta));
}

void RaftConsensus::WithholdVotes() {
  MonoTime prev = withhold_votes_until_;
  MonoTime next = MonoTime::Now() + MinimumElectionTimeout();
  do {
    if (prev == MonoTime::Max()) {
      // Maximum withholding time already. It might be the case if replica
      // has become a leader already.
      break;
    }
    next = MonoTime::Now() + MinimumElectionTimeout();
  } while (!withhold_votes_until_.compare_exchange_weak(prev, next));
}

// Computes a randomized delay before this replica should start another leader
// election, backing off exponentially with the number of elections that failed
// since a stable leader was last seen.
//
// The upper bound is MinimumElectionTimeout() * 1.5^(failed_elections + 1),
// capped at --leader_failure_exp_backoff_max_delta_ms but never allowed to fall
// below MinimumElectionTimeout(). The returned delta lies between the minimum
// election timeout and that upper bound, chosen via rng_.NextDoubleFraction().
// Requires 'lock_' to be held.
MonoDelta RaftConsensus::LeaderElectionExpBackoffDeltaUnlocked() {
  DCHECK(lock_.is_locked());
  // Compute a backoff factor based on how many leader elections have
  // failed since a stable leader was last seen.
  const double backoff_factor = std::pow(1.5, failed_elections_since_stable_leader_ + 1);
  const double min_timeout = MinimumElectionTimeout().ToMilliseconds();
  // Cap the backoff at the configured maximum, but never below 'min_timeout'.
  // If --leader_failure_exp_backoff_max_delta_ms is configured smaller than the
  // minimum election timeout, the unclamped upper bound would drag the
  // randomized value below the minimum. That would trip the DCHECK below in
  // debug builds and shorten the election timeout in release builds.
  const double max_timeout = std::max(
      min_timeout,
      std::min<double>(min_timeout * backoff_factor,
                       FLAGS_leader_failure_exp_backoff_max_delta_ms));

  // Randomize the timeout between the minimum and the calculated value.
  // We do this after the above capping to the max. Otherwise, after a
  // churny period, we'd end up highly likely to backoff exactly the max
  // amount.
  const double timeout = min_timeout + (max_timeout - min_timeout) * rng_.NextDoubleFraction();
  DCHECK_GE(timeout, min_timeout);

  return MonoDelta::FromMilliseconds(timeout);
}

// Moves this replica to 'new_term', which must be strictly greater than the
// current term (otherwise returns IllegalState). If this replica is currently
// the active leader, it steps down via BecomeReplicaUnlocked() first. On
// success, the term is persisted according to 'flush', the term metric (if
// any) is updated, and 'last_received_cur_leader_' is reset to MinimumOpId().
// Requires 'lock_' to be held.
Status RaftConsensus::HandleTermAdvanceUnlocked(ConsensusTerm new_term,
                                                FlushToDisk flush) {
  DCHECK(lock_.is_locked());
  // The term cannot change until SetCurrentTermUnlocked() below, so read it once.
  const auto current_term = CurrentTermUnlocked();
  if (new_term <= current_term) {
    return Status::IllegalState(Substitute("Can't advance term to: $0 current term: $1 is higher.",
                                           new_term, current_term));
  }

  const bool is_leader = cmeta_->active_role() == RaftPeerPB::LEADER;
  if (is_leader) {
    LOG_WITH_PREFIX_UNLOCKED(INFO) << "Stepping down as leader of term "
                                   << current_term;
    RETURN_NOT_OK(BecomeReplicaUnlocked());
  }

  LOG_WITH_PREFIX_UNLOCKED(INFO) << "Advancing to term " << new_term;
  RETURN_NOT_OK(SetCurrentTermUnlocked(new_term, flush));
  if (term_metric_) {
    term_metric_->set_value(new_term);
  }
  last_received_cur_leader_ = MinimumOpId();
  return Status::OK();
}

// Returns OK when a new message may be replicated: consensus must be running
// and this replica must be the active leader (see CheckRunningUnlocked() and
// CheckActiveLeaderUnlocked()). 'msg' is expected to carry no id yet
// (DCHECKed). Requires 'lock_' to be held.
Status RaftConsensus::CheckSafeToReplicateUnlocked(const ReplicateMsg& msg) const {
  DCHECK(lock_.is_locked());
  DCHECK(!msg.has_id()) << "Should not have an ID yet: " << SecureShortDebugString(msg);
  const Status running_status = CheckRunningUnlocked();
  if (PREDICT_FALSE(!running_status.ok())) {
    return running_status;
  }
  return CheckActiveLeaderUnlocked();
}

// Returns OK if 'state_' is kRunning; otherwise IllegalState naming the
// current state. Requires 'lock_' to be held.
Status RaftConsensus::CheckRunningUnlocked() const {
  DCHECK(lock_.is_locked());
  if (PREDICT_TRUE(state_ == kRunning)) {
    return Status::OK();
  }
  return Status::IllegalState("RaftConsensus is not running",
                              Substitute("State = $0", State_Name(state_)));
}

// Returns OK if this replica is the active leader and no leader transfer is in
// progress. A leader transfer yields ServiceUnavailable; any non-leader role
// yields IllegalState describing the role and consensus state. In debug
// builds, also checks that the queue's leader mode agrees with the role.
// Requires 'lock_' to be held.
Status RaftConsensus::CheckActiveLeaderUnlocked() const {
  DCHECK(lock_.is_locked());
  const RaftPeerPB::Role role = cmeta_->active_role();

  if (role != RaftPeerPB::LEADER) {
    // Non-leaders must not have their queue in leader mode.
    DCHECK(!queue_->IsInLeaderMode());
    return Status::IllegalState(Substitute("Replica $0 is not leader of this config. Role: $1. "
                                           "Consensus state: $2",
                                           peer_uuid(),
                                           RaftPeerPB::Role_Name(role),
                                           SecureShortDebugString(cmeta_->ToConsensusStatePB())));
  }

  // The leader role in the metadata must be mirrored by the queue's mode.
  DCHECK(queue_->IsInLeaderMode());
  if (leader_transfer_in_progress_) {
    return Status::ServiceUnavailable("leader transfer in progress");
  }
  return Status::OK();
}

// Returns OK when no config change is pending; otherwise IllegalState listing
// both the committed and the pending config, because only one config change
// may be in flight at a time. Requires 'lock_' to be held.
Status RaftConsensus::CheckNoConfigChangePendingUnlocked() const {
  DCHECK(lock_.is_locked());
  if (!cmeta_->has_pending_config()) {
    return Status::OK();
  }
  return Status::IllegalState(
      Substitute("RaftConfig change currently pending. Only one is allowed at a time.\n"
                 "  Committed config: $0.\n  Pending config: $1",
                 SecureShortDebugString(cmeta_->CommittedConfig()),
                 SecureShortDebugString(cmeta_->PendingConfig())));
}

// Validates 'new_config' with VerifyRaftConfig() and installs it as the
// pending config, then re-evaluates the failure detector state.
//
// Replacing an existing pending config is only allowed when 'new_config' has
// 'unsafe_config_change' set (which is logged). Otherwise an existing pending
// config is a fatal CHECK failure. Returns the VerifyRaftConfig() error,
// prefixed with "Invalid config to set as pending", if validation fails.
// Requires 'lock_' to be held.
Status RaftConsensus::SetPendingConfigUnlocked(const RaftConfigPB& new_config) {
  DCHECK(lock_.is_locked());
  RETURN_NOT_OK_PREPEND(VerifyRaftConfig(new_config),
                        "Invalid config to set as pending");

  const bool is_unsafe_change = new_config.unsafe_config_change();
  if (is_unsafe_change) {
    if (cmeta_->has_pending_config()) {
      LOG_WITH_PREFIX_UNLOCKED(INFO)
          << "Allowing unsafe config change even though there is a pending config! "
          << "Existing pending config: " << SecureShortDebugString(cmeta_->PendingConfig()) << "; "
          << "New pending config: " << SecureShortDebugString(new_config);
    }
  } else {
    CHECK(!cmeta_->has_pending_config())
        << "Attempt to set pending config while another is already pending! "
        << "Existing pending config: " << SecureShortDebugString(cmeta_->PendingConfig()) << "; "
        << "Attempted new pending config: " << SecureShortDebugString(new_config);
  }

  cmeta_->set_pending_config(new_config);
  UpdateFailureDetectorState();
  return Status::OK();
}

// Validates 'config_to_commit', makes it the committed config, clears any
// pending config, and flushes the consensus metadata (a flush failure is
// fatal). Returns the VerifyRaftConfig() error, prefixed with
// "Invalid config to set as committed", if validation fails.
// Requires 'lock_' to be held.
Status RaftConsensus::SetCommittedConfigUnlocked(const RaftConfigPB& config_to_commit) {
  TRACE_EVENT0("consensus", "RaftConsensus::SetCommittedConfigUnlocked");
  DCHECK(lock_.is_locked());
  DCHECK(config_to_commit.IsInitialized());
  RETURN_NOT_OK_PREPEND(VerifyRaftConfig(config_to_commit),
                        "Invalid config to set as committed");

  // Compare committed with pending configuration, ensure that they are the same.
  // In the event of an unsafe config change triggered by an administrator,
  // it is possible that the config being committed may not match the pending config
  // because unsafe config change allows multiple pending configs to exist.
  // Therefore we only need to validate that 'config_to_commit' matches the pending config
  // if the pending config does not have its 'unsafe_config_change' flag set.
  if (cmeta_->has_pending_config()) {
    // Only read here, so bind by const reference instead of copying the
    // protobuf. The reference is not used after the pending config is
    // cleared below.
    const RaftConfigPB& pending_config = cmeta_->PendingConfig();
    if (!pending_config.unsafe_config_change()) {
      // Quorums must be exactly equal, even w.r.t. peer ordering.
      CHECK_EQ(pending_config.SerializeAsString(),
               config_to_commit.SerializeAsString())
          << Substitute("New committed config must equal pending config, but does not. "
                        "Pending config: $0, committed config: $1",
                        SecureShortDebugString(pending_config),
                        SecureShortDebugString(config_to_commit));
    }
  }
  cmeta_->set_committed_config(config_to_commit);
  cmeta_->clear_pending_config();
  CHECK_OK(cmeta_->Flush());
  return Status::OK();
}

// Sets the current term to 'new_term', which must be strictly greater than the
// current term (otherwise returns IllegalState). Clears the recorded vote,
// flushes the consensus metadata when 'flush' is FLUSH_TO_DISK (a flush failure
// is fatal), and forgets the current leader. Requires 'lock_' to be held.
Status RaftConsensus::SetCurrentTermUnlocked(int64_t new_term,
                                             FlushToDisk flush) {
  TRACE_EVENT1("consensus", "RaftConsensus::SetCurrentTermUnlocked",
               "term", new_term);
  DCHECK(lock_.is_locked());
  const int64_t current_term = CurrentTermUnlocked();
  if (PREDICT_FALSE(new_term <= current_term)) {
    return Status::IllegalState(
        Substitute("Cannot change term to a term that is lower than or equal to the current one. "
                   "Current: $0, Proposed: $1", current_term, new_term));
  }

  // A vote belongs to a single term, so it is dropped together with the old term.
  cmeta_->set_current_term(new_term);
  cmeta_->clear_voted_for();
  const bool must_flush = flush == FLUSH_TO_DISK;
  if (must_flush) {
    CHECK_OK(cmeta_->Flush());
  }
  ClearLeaderUnlocked();
  return Status::OK();
}

// Returns the current term as recorded in the consensus metadata.
// Requires 'lock_' to be held.
int64_t RaftConsensus::CurrentTermUnlocked() const {
  DCHECK(lock_.is_locked());
  const int64_t term = cmeta_->current_term();
  return term;
}

// Returns a copy of the leader UUID recorded in the consensus metadata; the
// string is empty when no leader is known (see ClearLeaderUnlocked()).
// Requires 'lock_' to be held.
string RaftConsensus::GetLeaderUuidUnlocked() const {
  DCHECK(lock_.is_locked());
  string leader_uuid = cmeta_->leader_uuid();
  return leader_uuid;
}

bool RaftConsensus::HasLeaderUnlocked() const {
  DCHECK(lock_.is_locked());
  return !GetLeaderUuidUnlocked().empty();
}

// Forgets the current leader: marks the leader as not ready and records an
// empty leader UUID in the consensus metadata (which HasLeaderUnlocked()
// treats as "no leader"). Requires 'lock_' to be held.
// NOTE(review): the write order is kept as-is in case 'leader_is_ready_' is
// read without 'lock_' elsewhere — confirm before reordering.
void RaftConsensus::ClearLeaderUnlocked() {
  DCHECK(lock_.is_locked());
  leader_is_ready_ = false;
  cmeta_->set_leader_uuid("");
}

// Returns true when the consensus metadata records a vote. SetCurrentTermUnlocked()
// clears the vote, so a recorded vote belongs to the current term.
// Requires 'lock_' to be held.
bool RaftConsensus::HasVotedCurrentTermUnlocked() const {
  DCHECK(lock_.is_locked());
  const bool has_voted = cmeta_->has_voted_for();
  return has_voted;
}

// Records a vote for the candidate 'uuid' in the current term and flushes the
// consensus metadata before returning; a flush failure is fatal (CHECK_OK), so
// this always returns OK. Requires 'lock_' to be held.
Status RaftConsensus::SetVotedForCurrentTermUnlocked(const string& uuid) {
  TRACE_EVENT1("consensus", "RaftConsensus::SetVotedForCurrentTermUnlocked",
               "uuid", uuid);
  DCHECK(lock_.is_locked());
  cmeta_->set_voted_for(uuid);
  // Flush immediately after recording the vote, presumably so the vote is
  // durable before the caller acts on it.
  CHECK_OK(cmeta_->Flush());
  return Status::OK();
}

// Returns a reference to the UUID this replica voted for in the current term,
// as stored in the consensus metadata. A vote must have been recorded
// (DCHECKed; see HasVotedCurrentTermUnlocked()). Requires 'lock_' to be held.
const string& RaftConsensus::GetVotedForCurrentTermUnlocked() const {
  DCHECK(lock_.is_locked());
  DCHECK(cmeta_->has_voted_for());
  return cmeta_->voted_for();
}

// Returns the options this RaftConsensus instance holds in 'options_'.
const ConsensusOptions& RaftConsensus::GetOptions() const {
  return this->options_;
}

// Thread-safe variant of LogPrefixUnlocked(): acquires 'lock_' (which requires
// that the calling thread be allowed to wait) and builds the prefix under it.
string RaftConsensus::LogPrefix() const {
  ThreadRestrictions::AssertWaitAllowed();
  LockGuard guard(lock_);
  return LogPrefixUnlocked();
}

// Builds the log prefix "T <tablet> P <peer> [term <n> <ROLE>]: ". The bracketed
// term/role section is omitted when 'cmeta_' is not set, which can happen if
// initialization failed. Requires 'lock_' to be held.
string RaftConsensus::LogPrefixUnlocked() const {
  DCHECK(lock_.is_locked());
  const string term_and_role = cmeta_
      ? Substitute(" [term $0 $1]",
                   cmeta_->current_term(),
                   RaftPeerPB::Role_Name(cmeta_->active_role()))
      : string();
  return Substitute("T $0 P $1$2: ", options_.tablet_id, peer_uuid(), term_and_role);
}

// Thread-safe variant of ToStringUnlocked(): acquires 'lock_' (which requires
// that the calling thread be allowed to wait) and formats under it.
string RaftConsensus::ToString() const {
  ThreadRestrictions::AssertWaitAllowed();
  LockGuard guard(lock_);
  return ToStringUnlocked();
}

// Returns a one-line description of this replica: its UUID, lifecycle state
// and active Raft role. Requires 'lock_' to be held.
string RaftConsensus::ToStringUnlocked() const {
  DCHECK(lock_.is_locked());
  const string& role_name = RaftPeerPB::Role_Name(cmeta_->active_role());
  return Substitute("Replica: $0, State: $1, Role: $2",
                    peer_uuid(), State_Name(state_), role_name);
}

// Returns the on-disk size reported by the consensus metadata.
int64_t RaftConsensus::MetadataOnDiskSize() const {
  const int64_t on_disk_size = cmeta_->on_disk_size();
  return on_disk_size;
}

// Test-only accessor: returns a non-owning pointer to the consensus metadata
// held in 'cmeta_'.
ConsensusMetadata* RaftConsensus::consensus_metadata_for_tests() const {
  ConsensusMetadata* const metadata = cmeta_.get();
  return metadata;
}

// Returns the number of milliseconds elapsed since the last recorded leader
// communication, or 0 if none has been recorded yet
// ('last_leader_communication_time_micros_' == 0).
int64_t RaftConsensus::GetMillisSinceLastLeaderHeartbeat() const {
  // Read the timestamp exactly once, so the zero check and the subtraction see
  // the same value even if it is updated concurrently. Reading it before
  // sampling the clock also keeps 'now' from predating the timestamp, assuming
  // the timestamp is recorded with GetMonoTimeMicros() — TODO confirm.
  const int64_t last_micros = last_leader_communication_time_micros_;
  if (last_micros == 0) {
    return 0;
  }
  return (GetMonoTimeMicros() - last_micros) / 1000;
}

////////////////////////////////////////////////////////////////////////
// ConsensusBootstrapInfo
////////////////////////////////////////////////////////////////////////

// Creates bootstrap info with both 'last_id' and 'last_committed_id' set to
// MinimumOpId(); 'orphaned_replicates' starts default-constructed.
ConsensusBootstrapInfo::ConsensusBootstrapInfo()
  : last_id(MinimumOpId()),
    last_committed_id(MinimumOpId()) {
}

// Deletes every raw pointer held in 'orphaned_replicates' (this object owns
// them) and leaves the container empty.
ConsensusBootstrapInfo::~ConsensusBootstrapInfo() {
  for (auto* orphan : orphaned_replicates) {
    delete orphan;
  }
  orphaned_replicates.clear();
}

////////////////////////////////////////////////////////////////////////
// ConsensusRound
////////////////////////////////////////////////////////////////////////

// Creates a round that takes ownership of 'replicate_msg' (released into a new
// RefCountedReplicate) and stores 'replicated_cb', which
// NotifyReplicationFinished() invokes. The round starts with
// bound_term_ == -1, which CheckBoundTerm() treats as "not bound to any term".
ConsensusRound::ConsensusRound(RaftConsensus* consensus,
                               unique_ptr<ReplicateMsg> replicate_msg,
                               ConsensusReplicatedCallback replicated_cb)
    : consensus_(consensus),
      replicate_msg_(new RefCountedReplicate(replicate_msg.release())),
      replicated_cb_(std::move(replicated_cb)),
      bound_term_(-1) {}

// Creates a round around an existing ref-counted replicate message, with no
// replication callback (NotifyReplicationFinished() is then a no-op) and
// bound_term_ == -1 (not bound to any term). 'replicate_msg' must be non-null
// (DCHECKed).
ConsensusRound::ConsensusRound(RaftConsensus* consensus,
                               ReplicateRefPtr replicate_msg)
    : consensus_(consensus),
      replicate_msg_(std::move(replicate_msg)),
      bound_term_(-1) {
  DCHECK(replicate_msg_);
}

void ConsensusRound::NotifyReplicationFinished(const Status& status) {
  if (PREDICT_FALSE(!replicated_cb_)) return;
  replicated_cb_(status);
}

// Returns OK if this round is not bound to a term (bound_term_ == -1) or is
// bound to 'current_term'; otherwise returns Aborted, because an op submitted
// in one term must not be replicated in another.
Status ConsensusRound::CheckBoundTerm(int64_t current_term) const {
  const bool is_unbound = bound_term_ == -1;
  if (PREDICT_TRUE(is_unbound || bound_term_ == current_term)) {
    return Status::OK();
  }
  return Status::Aborted(
    strings::Substitute(
      "Op submitted in term $0 cannot be replicated in term $1",
      bound_term_, current_term));
}

}  // namespace consensus
}  // namespace kudu
