// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements.  See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership.  The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License.  You may obtain a copy of the License at
//
//   http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied.  See the License for the
// specific language governing permissions and limitations
// under the License.

#include "kudu/consensus/consensus_queue.h"

#include <algorithm>
#include <atomic>
#include <cstdint>
#include <functional>
#include <memory>
#include <mutex>
#include <optional>
#include <string>
#include <unordered_set>
#include <utility>

#include <gflags/gflags.h>
#include <google/protobuf/stubs/port.h>

#include "kudu/common/common.pb.h"
#include "kudu/common/timestamp.h"
#include "kudu/consensus/consensus.pb.h"
#include "kudu/consensus/log.h"
#include "kudu/consensus/opid_util.h"
#include "kudu/consensus/quorum_util.h"
#include "kudu/consensus/time_manager.h"
#include "kudu/gutil/dynamic_annotations.h"
#include "kudu/gutil/macros.h"
#include "kudu/gutil/map-util.h"
#include "kudu/gutil/port.h"
#include "kudu/gutil/stl_util.h"
#include "kudu/gutil/strings/substitute.h"
#include "kudu/util/fault_injection.h"
#include "kudu/util/flag_tags.h"
#include "kudu/util/flag_validators.h"
#include "kudu/util/logging.h"
#include "kudu/util/metrics.h"
#include "kudu/util/pb_util.h"
#include "kudu/util/scoped_cleanup.h"
#include "kudu/util/threadpool.h"
#include "kudu/util/url-coding.h"

DEFINE_int32(consensus_max_batch_size_bytes, 1024 * 1024,
             "The maximum per-tablet RPC batch size when updating peers.");
TAG_FLAG(consensus_max_batch_size_bytes, advanced);

DEFINE_int32(follower_unavailable_considered_failed_sec, 300,
             "Seconds that a leader is unable to successfully heartbeat to a "
             "follower after which the follower is considered to be failed and "
             "evicted from the config.");
TAG_FLAG(follower_unavailable_considered_failed_sec, advanced);
TAG_FLAG(follower_unavailable_considered_failed_sec, runtime);

DEFINE_int32(consensus_inject_latency_ms_in_notifications, 0,
             "Injects a random sleep between 0 and this many milliseconds into "
             "asynchronous notifications from the consensus queue back to the "
             "consensus implementation.");
TAG_FLAG(consensus_inject_latency_ms_in_notifications, hidden);
TAG_FLAG(consensus_inject_latency_ms_in_notifications, unsafe);

DEFINE_double(consensus_fail_log_read_ops, 0.0,
              "Fraction of the time when reading from the log cache will fail");
TAG_FLAG(consensus_fail_log_read_ops, hidden);

DECLARE_bool(raft_prepare_replacement_before_eviction);
DECLARE_bool(safe_time_advancement_without_writes);
DECLARE_int32(consensus_rpc_timeout_ms);
DECLARE_int64(rpc_max_message_size);

using kudu::log::Log;
using kudu::pb_util::SecureDebugString;
using kudu::pb_util::SecureShortDebugString;
using std::atomic;
using std::optional;
using std::string;
using std::unique_ptr;
using std::unordered_map;
using std::vector;
using strings::Substitute;

static bool ValidateMaxMessageSizeFlags() {
  static const int64_t kSizeDelta = 1024;
  const int64_t raft_max_size = FLAGS_consensus_max_batch_size_bytes;
  const int64_t rpc_max_size = FLAGS_rpc_max_message_size;
  if (raft_max_size + kSizeDelta > rpc_max_size) {
    LOG(ERROR) << strings::Substitute(
        "--consensus_max_batch_size_bytes is set too high compared with "
        "--rpc_max_message_size; either increase --rpc_max_message_size "
        "at least up to $0 or decrease --consensus_max_batch_size_bytes "
        "down to $1", raft_max_size + kSizeDelta, rpc_max_size - kSizeDelta);
    return false;
  }
  return true;
}
GROUP_FLAG_VALIDATOR(max_message_size_flags, ValidateMaxMessageSizeFlags);

namespace kudu {
namespace consensus {

METRIC_DEFINE_gauge_int64(tablet, majority_done_ops, "Leader Operations Acked by Majority",
                          MetricUnit::kOperations,
                          "Number of operations in the leader queue ack'd by a majority but "
                          "not all peers. This metric is always zero for followers.",
                          kudu::MetricLevel::kDebug);
METRIC_DEFINE_gauge_int64(tablet, in_progress_ops, "Operations in Progress",
                          MetricUnit::kOperations,
                          "Number of operations in the peer's queue ack'd by a minority of "
                          "peers.",
                          kudu::MetricLevel::kDebug);
METRIC_DEFINE_gauge_int64(tablet, ops_behind_leader, "Operations Behind Leader",
                          MetricUnit::kOperations,
                          "Number of operations this server believes it is behind the leader.",
                          kudu::MetricLevel::kWarn);

const char* PeerStatusToString(PeerStatus p) {
  switch (p) {
    case PeerStatus::OK: return "OK";
    case PeerStatus::REMOTE_ERROR: return "REMOTE_ERROR";
    case PeerStatus::RPC_LAYER_ERROR: return "RPC_LAYER_ERROR";
    case PeerStatus::TABLET_FAILED: return "TABLET_FAILED";
    case PeerStatus::TABLET_NOT_FOUND: return "TABLET_NOT_FOUND";
    case PeerStatus::INVALID_TERM: return "INVALID_TERM";
    case PeerStatus::LMP_MISMATCH: return "LMP_MISMATCH";
    case PeerStatus::CANNOT_PREPARE: return "CANNOT_PREPARE";
    case PeerStatus::NEW: return "NEW";
  }
  DCHECK(false);
  return "<unknown>";
}

PeerMessageQueue::TrackedPeer::TrackedPeer(RaftPeerPB peer_pb)
    : peer_pb(std::move(peer_pb)),
      next_index(kInvalidOpIdIndex),
      last_received(MinimumOpId()),
      last_known_committed_index(MinimumOpId().index()),
      last_exchange_status(PeerStatus::NEW),
      last_communication_time(MonoTime::Now()),
      wal_catchup_possible(true),
      remote_server_quiescing(false),
      last_overall_health_status(HealthReportPB::UNKNOWN),
      status_log_throttler(std::make_shared<logging::LogThrottler>()),
      last_seen_term_(0) {
}

std::string PeerMessageQueue::TrackedPeer::ToString() const {
  return Substitute("Peer: $0, Status: $1, Last received: $2, Next index: $3, "
                    "Last known committed idx: $4, Time since last communication: $5",
                    SecureShortDebugString(peer_pb),
                    PeerStatusToString(last_exchange_status),
                    OpIdToString(last_received), next_index,
                    last_known_committed_index,
                    (MonoTime::Now() - last_communication_time).ToString());
}

#define INSTANTIATE_METRIC(x) \
  x.Instantiate(metric_entity, 0)
PeerMessageQueue::Metrics::Metrics(const scoped_refptr<MetricEntity>& metric_entity)
  : num_majority_done_ops(INSTANTIATE_METRIC(METRIC_majority_done_ops)),
    num_in_progress_ops(INSTANTIATE_METRIC(METRIC_in_progress_ops)),
    num_ops_behind_leader(INSTANTIATE_METRIC(METRIC_ops_behind_leader)) {
}
#undef INSTANTIATE_METRIC

PeerMessageQueue::PeerMessageQueue(const scoped_refptr<MetricEntity>& metric_entity,
                                   scoped_refptr<log::Log> log,
                                   TimeManager* time_manager,
                                   RaftPeerPB local_peer_pb,
                                   string tablet_id,
                                   unique_ptr<ThreadPoolToken> raft_pool_observers_token,
                                   const atomic<bool>* server_quiescing,
                                   OpId last_locally_replicated,
                                   const OpId& last_locally_committed,
                                   const bool* allow_status_msg_for_failed_peer)
    : raft_pool_observers_token_(std::move(raft_pool_observers_token)),
      server_quiescing_(server_quiescing),
      local_peer_pb_(std::move(local_peer_pb)),
      tablet_id_(std::move(tablet_id)),
      successor_watch_in_progress_(false),
      log_cache_(metric_entity, std::move(log), local_peer_pb_.permanent_uuid(), tablet_id_),
      metrics_(metric_entity),
      time_manager_(time_manager),
      allow_status_msg_for_failed_peer_(allow_status_msg_for_failed_peer) {
  DCHECK(local_peer_pb_.has_permanent_uuid());
  DCHECK(local_peer_pb_.has_last_known_addr());
  DCHECK(last_locally_replicated.IsInitialized());
  DCHECK(last_locally_committed.IsInitialized());
  queue_state_.current_term = 0;
  queue_state_.first_index_in_current_term.reset();
  queue_state_.committed_index = 0;
  queue_state_.all_replicated_index = 0;
  queue_state_.majority_replicated_index = 0;
  queue_state_.last_idx_appended_to_leader = 0;
  queue_state_.mode = NON_LEADER;
  queue_state_.majority_size_ = -1;
  queue_state_.last_appended = std::move(last_locally_replicated);
  queue_state_.committed_index = last_locally_committed.index();
  queue_state_.state = kQueueOpen;
  // TODO(mpercy): Merge LogCache::Init() with its constructor.
  log_cache_.Init(queue_state_.last_appended);
}

void PeerMessageQueue::SetLeaderMode(int64_t committed_index,
                                     int64_t current_term,
                                     const RaftConfigPB& active_config) {
  std::lock_guard<simple_spinlock> lock(queue_lock_);
  if (current_term != queue_state_.current_term) {
    CHECK_GT(current_term, queue_state_.current_term) << "Terms should only increase";
    queue_state_.first_index_in_current_term.reset();
    queue_state_.current_term = current_term;
  }

  queue_state_.committed_index = committed_index;
  queue_state_.majority_replicated_index = committed_index;
  queue_state_.active_config.reset(new RaftConfigPB(active_config));
  queue_state_.majority_size_ = MajoritySize(CountVoters(*queue_state_.active_config));
  queue_state_.mode = LEADER;

  TrackLocalPeerUnlocked();
  CheckPeersInActiveConfigIfLeaderUnlocked();

  LOG_WITH_PREFIX_UNLOCKED(INFO) << "Queue going to LEADER mode. State: "
                                 << queue_state_.ToString();

  // Reset last communication time with all peers to reset the clock on the
  // failure timeout.
  const auto now = MonoTime::Now();
  for (const PeersMap::value_type& entry : peers_map_) {
    entry.second->last_communication_time = now;
  }
  time_manager_->SetLeaderMode();
}

void PeerMessageQueue::SetNonLeaderMode(const RaftConfigPB& active_config) {
  std::lock_guard<simple_spinlock> lock(queue_lock_);
  queue_state_.active_config.reset(new RaftConfigPB(active_config));
  queue_state_.mode = NON_LEADER;
  queue_state_.majority_size_ = -1;

  // Update this when stepping down, since it doesn't get tracked as LEADER.
  queue_state_.last_idx_appended_to_leader = queue_state_.last_appended.index();

  TrackLocalPeerUnlocked();

  LOG_WITH_PREFIX_UNLOCKED(INFO) << "Queue going to NON_LEADER mode. State: "
                                 << queue_state_.ToString();

  time_manager_->SetNonLeaderMode();
}

void PeerMessageQueue::TrackPeer(const RaftPeerPB& peer_pb) {
  std::lock_guard<simple_spinlock> lock(queue_lock_);
  TrackPeerUnlocked(peer_pb);
}

void PeerMessageQueue::TrackPeerUnlocked(const RaftPeerPB& peer_pb) {
  CHECK(!peer_pb.permanent_uuid().empty()) << SecureShortDebugString(peer_pb);
  CHECK(peer_pb.has_member_type()) << SecureShortDebugString(peer_pb);
  DCHECK(queue_lock_.is_locked());
  DCHECK_EQ(queue_state_.state, kQueueOpen);

  TrackedPeer* tracked_peer = new TrackedPeer(peer_pb);
  // We don't know the last operation received by the peer so, following the
  // Raft protocol, we set next_index to one past the end of our own log. This
  // way, if calling this method is the result of a successful leader election
  // and the logs between the new leader and remote peer match, the
  // peer->next_index will point to the index of the soon-to-be-written NO_OP
  // entry that is used to assert leadership. If we guessed wrong, and the peer
  // does not have a log that matches ours, the normal queue negotiation
  // process will eventually find the right point to resume from.
  tracked_peer->next_index = queue_state_.last_appended.index() + 1;
  InsertOrDie(&peers_map_, tracked_peer->uuid(), tracked_peer);

  CheckPeersInActiveConfigIfLeaderUnlocked();

  // We don't know how far back this peer is, so set the all replicated watermark to
  // 0. We'll advance it when we know how far along the peer is.
  queue_state_.all_replicated_index = 0;
}

void PeerMessageQueue::UntrackPeer(const string& uuid) {
  std::lock_guard<simple_spinlock> lock(queue_lock_);
  UntrackPeerUnlocked(uuid);
}

void PeerMessageQueue::UntrackPeerUnlocked(const string& uuid) {
  DCHECK(queue_lock_.is_locked());
  TrackedPeer* peer = EraseKeyReturnValuePtr(&peers_map_, uuid);
  delete peer; // Deleting a nullptr is safe.
}

void PeerMessageQueue::TrackLocalPeerUnlocked() {
  DCHECK(queue_lock_.is_locked());
  RaftPeerPB* local_peer_in_config;
  Status s = GetRaftConfigMember(queue_state_.active_config.get(),
                                 local_peer_pb_.permanent_uuid(),
                                 &local_peer_in_config);
  auto local_copy = local_peer_pb_;
  if (!s.ok()) {
    // The local peer is not a member of the config. The queue requires the
    // 'member_type' field to be set for any tracked peer, so we explicitly
    // mark the local peer as a NON_VOTER. This case is only possible when the
    // local peer is not the leader, so the choice is not particularly
    // important, but NON_VOTER is the most reasonable option.
    local_copy.set_member_type(RaftPeerPB::NON_VOTER);
    local_peer_in_config = &local_copy;
  }
  CHECK(local_peer_in_config->member_type() == RaftPeerPB::VOTER ||
        queue_state_.mode != LEADER)
      << "local peer " << local_peer_pb_.permanent_uuid()
      << " is not a voter in config: " << queue_state_.ToString();
  if (ContainsKey(peers_map_, local_peer_pb_.permanent_uuid())) {
    UntrackPeerUnlocked(local_peer_pb_.permanent_uuid());
  }
  TrackPeerUnlocked(*local_peer_in_config);
}

unordered_map<string, HealthReportPB> PeerMessageQueue::ReportHealthOfPeers() const {
  unordered_map<string, HealthReportPB> reports;
  std::lock_guard<simple_spinlock> lock(queue_lock_);
  for (const auto& entry : peers_map_) {
    const string& peer_uuid = entry.first;
    const TrackedPeer* peer = entry.second;
    HealthReportPB report;
    auto overall_health = peer->last_overall_health_status;
    // We always consider the local peer (ourselves) to be healthy.
    // TODO(mpercy): Is this always a safe assumption?
    if (peer_uuid == local_peer_pb_.permanent_uuid()) {
      overall_health = HealthReportPB::HEALTHY;
    }
    report.set_overall_health(overall_health);
    reports.emplace(peer_uuid, std::move(report));
  }
  return reports;
}

void PeerMessageQueue::CheckPeersInActiveConfigIfLeaderUnlocked() const {
  DCHECK(queue_lock_.is_locked());
  if (queue_state_.mode != LEADER) return;
  std::unordered_set<string> config_peer_uuids;
  for (const RaftPeerPB& peer_pb : queue_state_.active_config->peers()) {
    InsertOrDie(&config_peer_uuids, peer_pb.permanent_uuid());
  }
  for (const PeersMap::value_type& entry : peers_map_) {
    if (!ContainsKey(config_peer_uuids, entry.first)) {
      LOG_WITH_PREFIX_UNLOCKED(FATAL) << Substitute("Peer $0 is not in the active config. "
                                                    "Queue state: $1",
                                                    entry.first,
                                                    queue_state_.ToString());
    }
  }
}

void PeerMessageQueue::LocalPeerAppendFinished(const OpId& id,
                                               const StatusCallback& callback,
                                               const Status& status) {
  // If the server is being destructed, the log may be unavailable.
  if (PREDICT_FALSE(status.IsServiceUnavailable())) {
    callback(Status::OK());
    return;
  }
  CHECK_OK(status);

  // Fake an RPC response from the local peer.
  // TODO: we should probably refactor the ResponseFromPeer function
  // so that we don't need to construct this fake response, but this
  // seems to work for now.
  ConsensusResponsePB fake_response;
  fake_response.set_responder_uuid(local_peer_pb_.permanent_uuid());
  *fake_response.mutable_status()->mutable_last_received() = id;
  *fake_response.mutable_status()->mutable_last_received_current_leader() = id;
  {
    std::lock_guard<simple_spinlock> lock(queue_lock_);
    fake_response.mutable_status()->set_last_committed_idx(queue_state_.committed_index);
  }
  ResponseFromPeer(local_peer_pb_.permanent_uuid(), fake_response);

  callback(status);
}

Status PeerMessageQueue::AppendOperation(const ReplicateRefPtr& msg) {
  return AppendOperations({ msg }, [](const Status& s) {
    CrashIfNotOkStatusCB("Enqueued replicate operation failed to write to WAL", s);
  });
}

Status PeerMessageQueue::AppendOperations(vector<ReplicateRefPtr> msgs,
                                          const StatusCallback& log_append_callback) {

  DFAKE_SCOPED_LOCK(append_fake_lock_);
  std::unique_lock<simple_spinlock> lock(queue_lock_);

  OpId last_id = msgs.back()->get()->id();

  // "Snoop" on the appended operations to watch for term changes (as follower)
  // and to determine the first index in our term (as leader).
  //
  // TODO: it would be a cleaner design to explicitly set the first index in the
  // leader term as part of SetLeaderMode(). However, we are currently also
  // using that method to handle refreshing the peer list during configuration
  // changes, so the refactor isn't trivial.
  for (const auto& msg : msgs) {
    const auto& id = msg->get()->id();
    if (id.term() > queue_state_.current_term) {
      queue_state_.current_term = id.term();
      queue_state_.first_index_in_current_term = id.index();
    } else if (id.term() == queue_state_.current_term &&
               !queue_state_.first_index_in_current_term.has_value()) {
      queue_state_.first_index_in_current_term = id.index();
    }
  }

  // Update safe time in the TimeManager if we're leader.
  // This will 'unpin' safe time advancement, which had stopped since we
  // assigned a timestamp to the message.
  //
  // Until we have leader leases, replicas only call this when the message is committed.
  if (queue_state_.mode == LEADER) {
    time_manager_->AdvanceSafeTimeWithMessage(*msgs.back()->get());
  }

  // Unlock ourselves during Append to prevent a deadlock: it's possible that
  // the log buffer is full, in which case AppendOperations would block. However,
  // for the log buffer to empty, it may need to call LocalPeerAppendFinished()
  // which also needs queue_lock_.
  lock.unlock();
  RETURN_NOT_OK(log_cache_.AppendOperations(
      std::move(msgs), [this, last_id, log_append_callback](const Status& s) {
        this->LocalPeerAppendFinished(last_id, log_append_callback, s);
      }));
  lock.lock();
  DCHECK(last_id.IsInitialized());
  queue_state_.last_appended = last_id;
  UpdateMetricsUnlocked();

  return Status::OK();
}

void PeerMessageQueue::TruncateOpsAfter(int64_t index) {
  DFAKE_SCOPED_LOCK(append_fake_lock_); // should not race with append.
  OpId op;
  CHECK_OK_PREPEND(log_cache_.LookupOpId(index, &op),
                   Substitute("$0: cannot truncate ops after bad index $1",
                              LogPrefixUnlocked(),
                              index));
  {
    std::unique_lock<simple_spinlock> lock(queue_lock_);
    DCHECK(op.IsInitialized());
    queue_state_.last_appended = op;
  }
  log_cache_.TruncateOpsAfter(op.index());
}

OpId PeerMessageQueue::GetLastOpIdInLog() const {
  std::unique_lock<simple_spinlock> lock(queue_lock_);
  DCHECK(queue_state_.last_appended.IsInitialized());
  return queue_state_.last_appended;
}

OpId PeerMessageQueue::GetNextOpId() const {
  std::unique_lock<simple_spinlock> lock(queue_lock_);
  DCHECK(queue_state_.last_appended.IsInitialized());
  return MakeOpId(queue_state_.current_term,
                  queue_state_.last_appended.index() + 1);
}

bool PeerMessageQueue::SafeToEvictUnlocked(const string& evict_uuid) const {
  DCHECK(queue_lock_.is_locked());
  DCHECK_EQ(LEADER, queue_state_.mode);
  auto now = MonoTime::Now();

  int remaining_voters = 0;
  int remaining_viable_voters = 0;

  for (const auto& e : peers_map_) {
    const auto& uuid = e.first;
    const auto& peer = e.second;
    if (uuid == evict_uuid) {
      continue;
    }
    if (!IsRaftConfigVoter(uuid, *queue_state_.active_config)) {
      continue;
    }
    remaining_voters++;

    bool viable = true;
    // Being alive, the local peer itself (the leader) is always a viable
    // voter: the criteria below apply only to non-local peers.
    if (uuid != local_peer_pb_.permanent_uuid()) {
      // Only consider a peer to be a viable voter if...
      // ...its last exchange was successful
      viable &= peer->last_exchange_status == PeerStatus::OK;

      // ...the peer is up to date with the latest majority.
      //
      //    This indicates that it's actively participating in majorities and likely to
      //    replicate a config change immediately when we propose it.
      viable &= peer->last_received.index() >= queue_state_.majority_replicated_index;

      // ...we have communicated successfully with it recently.
      //
      //    This handles the case where the tablet has had no recent writes and therefore
      //    even a replica that is down would have participated in the latest majority.
      auto unreachable_time = now - peer->last_communication_time;
      viable &= unreachable_time.ToMilliseconds() < FLAGS_consensus_rpc_timeout_ms;
    }
    if (viable) {
      remaining_viable_voters++;
    }
  }

  // We never drop from 2 to 1 automatically, at least for now. We may want
  // to revisit this later, we're just being cautious with this.
  if (remaining_voters <= 1) {
    VLOG(2) << LogPrefixUnlocked() << "Not evicting P $0 (only one voter would remain)";
    return false;
  }
  // Don't evict anything if the remaining number of viable voters is not enough
  // to form a quorum.
  if (remaining_viable_voters < MajoritySize(remaining_voters)) {
    VLOG(2) << LogPrefixUnlocked() << Substitute(
        "Not evicting P $0 (only $1/$2 remaining voters appear viable)",
        evict_uuid, remaining_viable_voters, remaining_voters);
    return false;
  }

  return true;
}

void PeerMessageQueue::UpdatePeerHealthUnlocked(TrackedPeer* peer) {
  DCHECK(queue_lock_.is_locked());
  DCHECK_EQ(LEADER, queue_state_.mode);

  auto overall_health_status = PeerHealthStatus(*peer);

  // Prepare error messages for different conditions.
  string error_msg;
  if (overall_health_status == HealthReportPB::FAILED ||
      overall_health_status == HealthReportPB::FAILED_UNRECOVERABLE) {
    if (peer->last_exchange_status == PeerStatus::TABLET_FAILED) {
      error_msg = Substitute("The tablet replica hosted on peer $0 has failed", peer->uuid());
    } else if (!peer->wal_catchup_possible) {
      error_msg = Substitute("The logs necessary to catch up peer $0 have been "
                             "garbage collected. The replica will never be able "
                             "to catch up", peer->uuid());
    } else {
      error_msg = Substitute("Leader has been unable to successfully communicate "
                             "with peer $0 for more than $1 seconds ($2)",
                             peer->uuid(),
                             FLAGS_follower_unavailable_considered_failed_sec,
                             (MonoTime::Now() - peer->last_communication_time).ToString());
    }
  }

  bool changed = overall_health_status != peer->last_overall_health_status;
  peer->last_overall_health_status = overall_health_status;

  if (FLAGS_raft_prepare_replacement_before_eviction) {
    // Only take action when there is a change.
    if (changed) {
      // Only log a message when the status changes to some flavor of failure.
      if (overall_health_status == HealthReportPB::FAILED ||
          overall_health_status == HealthReportPB::FAILED_UNRECOVERABLE) {
        LOG_WITH_PREFIX_UNLOCKED(INFO) << error_msg;
      }
      NotifyObserversOfPeerHealthChange();
    }
  } else {
    if ((overall_health_status == HealthReportPB::FAILED ||
         overall_health_status == HealthReportPB::FAILED_UNRECOVERABLE) &&
        SafeToEvictUnlocked(peer->uuid())) {
      NotifyObserversOfFailedFollower(peer->uuid(), queue_state_.current_term, error_msg);
    }
  }
}

// While reporting on the replica health status, it's important to report on
// the 'definitive' health statuses once they surface. That allows the system
// to expedite decisions on replica replacement because the more 'definitive'
// statuses have less uncertainty and provide more information (compared
// with less 'definitive' statuses). Informally, the level of 'definitiveness'
// could be measured by the number of possible state transitions on the replica
// health status state diagram.
//
// The health status chain below has increasing level of 'definitiveness'
// left to right:
//
//   UNKNOWN --> HEALTHY --> FAILED --> FAILED_UNRECOVERABLE
//
// For example, in the case when a replica has been unreachable longer than the
// time interval specified by the --follower_unavailable_considered_failed_sec
// flag, the system should start reporting its health status as FAILED.
// However, once the replica falls behind the WAL log GC threshold, the system
// should start reporting its healths status as FAILED_UNRECOVERABLE. The code
// below is written to adhere to that informal policy.
HealthReportPB::HealthStatus PeerMessageQueue::PeerHealthStatus(const TrackedPeer& peer) {
  // Replicas that have fallen behind the leader's retained WAL segments are
  // failed irrecoverably and will not come back because they cannot ever catch
  // up with the leader replica.
  if (!peer.wal_catchup_possible) {
    return HealthReportPB::FAILED_UNRECOVERABLE;
  }

  // Replicas returning TABLET_FAILED status are considered irrecoverably
  // failed because the TABLED_FAILED status manifests about IO failures
  // caused by disk corruption, etc.
  if (peer.last_exchange_status == PeerStatus::TABLET_FAILED) {
    return HealthReportPB::FAILED_UNRECOVERABLE;
  }

  // Replicas which have been unreachable for too long are considered failed,
  // unless it's known that they have failed irrecoverably (see above). They
  // might come back at some point and successfully catch up with the leader.
  auto max_unreachable = MonoDelta::FromSeconds(FLAGS_follower_unavailable_considered_failed_sec);
  if (MonoTime::Now() - peer.last_communication_time > max_unreachable) {
    return HealthReportPB::FAILED;
  }

  // The happy case: replicas returned OK during the recent exchange are considered healthy.
  if (peer.last_exchange_status == PeerStatus::OK) {
    return HealthReportPB::HEALTHY;
  }

  // Other cases are for various situations when there hasn't been a contact
  // with the replica yet or it's impossible to definitely tell the health
  // status of the replica based on the last exchange status (transient error,
  // etc.). For such cases, the replica health status is reported as UNKNOWN.
  return HealthReportPB::UNKNOWN;
}

Status PeerMessageQueue::RequestForPeer(const string& uuid,
                                        ConsensusRequestPB* request,
                                        vector<ReplicateRefPtr>* msg_refs,
                                        bool* needs_tablet_copy) {
  // Maintain a thread-safe copy of necessary members.
  OpId preceding_id;
  int64_t current_term;
  TrackedPeer peer_copy;
  MonoDelta unreachable_time;
  {
    std::lock_guard<simple_spinlock> lock(queue_lock_);
    DCHECK_EQ(queue_state_.state, kQueueOpen);
    DCHECK_NE(uuid, local_peer_pb_.permanent_uuid());

    TrackedPeer* peer = FindPtrOrNull(peers_map_, uuid);
    if (PREDICT_FALSE(peer == nullptr || queue_state_.mode == NON_LEADER)) {
      return Status::NotFound(Substitute("peer $0 is no longer tracked or "
                                         "queue is not in leader mode", uuid));
    }
    peer_copy = *peer;

    // Clear the requests without deleting the entries, as they may be in use by other peers.
    request->mutable_ops()->ExtractSubrange(0, request->ops_size(), nullptr);

    // This is initialized to the queue's last appended op but gets set to the id of the
    // log entry preceding the first one in 'messages' if messages are found for the peer.
    preceding_id = queue_state_.last_appended;
    current_term = queue_state_.current_term;

    request->set_committed_index(queue_state_.committed_index);
    request->set_all_replicated_index(queue_state_.all_replicated_index);
    request->set_last_idx_appended_to_leader(queue_state_.last_appended.index());
    request->set_caller_term(current_term);
    unreachable_time = MonoTime::Now() - peer_copy.last_communication_time;
  }

  // Always trigger a health status update check at the end of this function.
  bool wal_catchup_progress = false;
  bool wal_catchup_failure = false;
  SCOPED_CLEANUP({
      std::lock_guard<simple_spinlock> lock(queue_lock_);
      TrackedPeer* peer = FindPtrOrNull(peers_map_, uuid);
      if (PREDICT_FALSE(peer == nullptr || queue_state_.mode == NON_LEADER)) {
        VLOG(1) << LogPrefixUnlocked() << "peer " << uuid
                << " is no longer tracked or queue is not in leader mode";
        return;
      }
      if (wal_catchup_progress) peer->wal_catchup_possible = true;
      if (wal_catchup_failure) peer->wal_catchup_possible = false;
      UpdatePeerHealthUnlocked(peer);
    });

  if (peer_copy.last_exchange_status == PeerStatus::TABLET_NOT_FOUND) {
    VLOG(3) << LogPrefixUnlocked() << "Peer " << uuid << " needs tablet copy" << THROTTLE_MSG;
    *needs_tablet_copy = true;
    return Status::OK();
  }
  *needs_tablet_copy = false;

  // Should we send log messages to the peer?
  // If we've never communicated with the peer, we don't know what messages to
  // send, so we'll send a status-only request instead.
  //
  // There are cases where it's beneficial to send status-only messages to a peer
  // in FAILED_UNRECOVERABLE state as it helps the peer transition out of the
  // FAILED_UNRECOVERABLE state, case in point external system catalog copy that's done when
  // adding a new master. In such a case don't try to send log messages which is expected
  // to fail because the log has been GC'ed but instead send status-only message.
  if (peer_copy.last_exchange_status != PeerStatus::NEW &&
      (allow_status_msg_for_failed_peer_ == nullptr || !*allow_status_msg_for_failed_peer_ ||
       peer_copy.wal_catchup_possible)) {
    // We grab requests from the log starting at the last_received point.

    // The batch of messages to send to the peer.
    vector<ReplicateRefPtr> messages;
    int64_t max_batch_size = FLAGS_consensus_max_batch_size_bytes - request->ByteSizeLong();

    // We try to get the follower's next_index from our log.
    Status s = log_cache_.ReadOps(peer_copy.next_index - 1,
                                  max_batch_size,
                                  &messages,
                                  &preceding_id);

    // Inject failure to simulate follower falling behind and the leader has GC'ed its logs.
    if (PREDICT_FALSE(fault_injection::MaybeTrue(FLAGS_consensus_fail_log_read_ops))) {
      wal_catchup_failure = true;
      return Status::NotFound("INJECTED FAILURE");
    }

    if (PREDICT_FALSE(!s.ok())) {
      // It's normal to have a NotFound() here if a follower falls behind where
      // the leader has GCed its logs. The follower replica will hang around
      // for a while until it's evicted.
      if (PREDICT_TRUE(s.IsNotFound())) {
        KLOG_EVERY_N_SECS_THROTTLER(INFO, 60, *peer_copy.status_log_throttler, "logs_gced")
            << LogPrefixUnlocked()
            << Substitute("The logs necessary to catch up peer $0 have been "
                          "garbage collected. The follower will never be able "
                          "to catch up ($1)", uuid, s.ToString());
        wal_catchup_failure = true;
        return s;
      }
      if (s.IsIncomplete()) {
        // IsIncomplete() means that we tried to read beyond the head of the log
        // (in the future). This is usually a sign that this peer is under load
        // and is about to step down as leader. See KUDU-1078.
        LOG_WITH_PREFIX_UNLOCKED(INFO) << "Error trying to read ahead of the log "
                                       << "while preparing peer request: "
                                       << s.ToString() << ". Destination peer: "
                                       << peer_copy.ToString();
        return s;
      }
      LOG_WITH_PREFIX_UNLOCKED(FATAL) << "Error reading the log while preparing peer request: "
                                      << s.ToString() << ". Destination peer: "
                                      << peer_copy.ToString();
    }

    // Since we were able to read ops through the log cache, we know that
    // catchup is possible.
    wal_catchup_progress = true;

    // We use AddAllocated rather than copy, because we pin the log cache at the
    // "all replicated" point. At some point we may want to allow partially loading
    // (and not pinning) earlier messages. At that point we'll need to do something
    // smarter here, like copy or ref-count.
    for (const ReplicateRefPtr& msg : messages) {
      request->mutable_ops()->AddAllocated(msg->get());
    }
    msg_refs->swap(messages);
  }

  DCHECK(preceding_id.IsInitialized());
  request->mutable_preceding_id()->CopyFrom(preceding_id);

  // If we are sending ops to the follower, but the batch doesn't reach the current
  // committed index, we can consider the follower lagging, and it's worth
  // logging this fact periodically.
  if (request->ops_size() > 0) {
    int64_t last_op_sent = request->ops(request->ops_size() - 1).id().index();
    if (last_op_sent < request->committed_index()) {
      KLOG_EVERY_N_SECS_THROTTLER(INFO, 3, *peer_copy.status_log_throttler, "lagging")
          << LogPrefixUnlocked() << "Peer " << uuid << " is lagging by at least "
          << (request->committed_index() - last_op_sent)
          << " ops behind the committed index " << THROTTLE_MSG;
    }
  // If we're not sending ops to the follower, set the safe time on the request.
  // TODO(dralves) When we have leader leases, send this all the time.
  } else {
    if (PREDICT_TRUE(FLAGS_safe_time_advancement_without_writes)) {
      request->set_safe_timestamp(time_manager_->GetSafeTime().value());
    } else {
      KLOG_EVERY_N_SECS(WARNING, 300) << "Safe time advancement without writes is disabled. "
            "Snapshot reads on non-leader replicas may stall if there are no writes in progress.";
    }
  }

  if (PREDICT_FALSE(VLOG_IS_ON(2))) {
    if (request->ops_size() > 0) {
      VLOG_WITH_PREFIX_UNLOCKED(2)
          << Substitute("Sending request with operations to Peer: $0. Size: $1. From: $2. To: $3",
                        uuid,
                        request->ops_size(),
                        SecureShortDebugString(request->ops(0).id()),
                        SecureShortDebugString(request->ops(request->ops_size() - 1).id()));
    } else {
      VLOG_WITH_PREFIX_UNLOCKED(2)
          << Substitute("Sending status only request to Peer: $0: $1",
                        uuid,
                        SecureDebugString(*request));
    }
  }

  return Status::OK();
}

Status PeerMessageQueue::GetTabletCopyRequestForPeer(const string& uuid,
                                                     StartTabletCopyRequestPB* req) {
  TrackedPeer* peer = nullptr;
  int64_t current_term;
  {
    std::lock_guard<simple_spinlock> lock(queue_lock_);
    DCHECK_EQ(queue_state_.state, kQueueOpen);
    DCHECK_NE(uuid, local_peer_pb_.permanent_uuid());
    peer = FindPtrOrNull(peers_map_, uuid);
    current_term = queue_state_.current_term;
    if (PREDICT_FALSE(peer == nullptr || queue_state_.mode == NON_LEADER)) {
      return Status::NotFound("Peer not tracked or queue not in leader mode.");
    }
    if (PREDICT_FALSE(peer->last_exchange_status != PeerStatus::TABLET_NOT_FOUND)) {
      return Status::IllegalState("Peer does not need to initiate Tablet Copy", uuid);
    }
  }
  req->Clear();
  req->set_dest_uuid(uuid);
  req->set_tablet_id(tablet_id_);
  req->set_copy_peer_uuid(local_peer_pb_.permanent_uuid());
  *req->mutable_copy_peer_addr() = local_peer_pb_.last_known_addr();
  req->set_caller_term(current_term);
  return Status::OK();
}

void PeerMessageQueue::AdvanceQueueWatermark(const char* type,
                                             int64_t* watermark,
                                             const OpId& replicated_before,
                                             const OpId& replicated_after,
                                             int num_peers_required,
                                             ReplicaTypes replica_types,
                                             const TrackedPeer* who_caused) {
  DCHECK(queue_lock_.is_locked());
  VLOG_WITH_PREFIX_UNLOCKED(2)
      << Substitute("Updating $0 watermark: Peer ($1) changed from $2 to $3. "
                    "Current value: $4",
                    type,
                    who_caused->ToString(),
                    OpIdToString(replicated_before),
                    OpIdToString(replicated_after),
                    *watermark);

  // Go through the peer's watermarks, we want the highest watermark that
  // 'num_peers_required' of peers has replicated. To find this we do the
  // following:
  // - Store all the peer's 'last_received' in a vector
  // - Sort the vector
  // - Find the vector.size() - 'num_peers_required' position, this
  //   will be the new 'watermark'.
  vector<int64_t> watermarks;
  for (const PeersMap::value_type& peer : peers_map_) {
    if (replica_types == VOTER_REPLICAS &&
        peer.second->peer_pb.member_type() != RaftPeerPB::VOTER) {
      continue;
    }
    // TODO(todd): The fact that we only consider peers whose last exchange was
    // successful can cause the "all_replicated" watermark to lag behind
    // farther than necessary. For example:
    // - local peer has replicated opid 100
    // - remote peer A has replicated opid 100
    // - remote peer B has replication opid 10 and is catching up
    // - remote peer A goes down
    // Here we'd start getting a non-OK last_exchange_status for peer A.
    // In that case, the 'all_replicated_watermark', which requires 3 peers, would not
    // be updateable, even once we've replicated peer 'B' up to opid 100. It would
    // get "stuck" at 10. In fact, in this case, the 'majority_replicated_watermark' would
    // also move *backwards* when peer A started getting errors.
    //
    // The issue with simply removing this condition is that 'last_received' does not
    // perfectly correspond to the 'match_index' in Raft Figure 2. It is simply the
    // highest operation in a peer's log, regardless of whether that peer currently
    // holds a prefix of the leader's log. So, in the case that the last exchange
    // was an error (LMP mismatch, for example), the 'last_received' is _not_ usable
    // for watermark calculation. This could be fixed by separately storing the
    // 'match_index' on a per-peer basis and using that for watermark calculation.
    if (peer.second->last_exchange_status == PeerStatus::OK) {
      watermarks.push_back(peer.second->last_received.index());
    }
  }

  // If we haven't enough peers to calculate the watermark return.
  if (watermarks.size() < num_peers_required) {
    VLOG_WITH_PREFIX_UNLOCKED(3) << "Watermarks size: " << watermarks.size() << ", "
                                 << "Num peers required: " << num_peers_required;
    return;
  }

  std::sort(watermarks.begin(), watermarks.end());

  int64_t new_watermark = watermarks[watermarks.size() - num_peers_required];
  int64_t old_watermark = *watermark;
  *watermark = new_watermark;

  VLOG_WITH_PREFIX_UNLOCKED(1)
      << Substitute("Updated $0 watermark from $1 to $2",
                    type, old_watermark, new_watermark);
  if (VLOG_IS_ON(3)) {
    VLOG_WITH_PREFIX_UNLOCKED(3) << "Peers: ";
    for (const auto& peer : peers_map_) {
      VLOG_WITH_PREFIX_UNLOCKED(3) << "Peer: " << peer.second->ToString();
    }
    VLOG_WITH_PREFIX_UNLOCKED(3) << "Sorted watermarks:";
    for (const auto watermark : watermarks) {
      VLOG_WITH_PREFIX_UNLOCKED(3) << "Watermark: " << watermark;
    }
  }
}

void PeerMessageQueue::BeginWatchForSuccessor(
    const optional<string>& successor_uuid) {
  std::lock_guard<simple_spinlock> l(queue_lock_);
  successor_watch_in_progress_ = true;
  designated_successor_uuid_ = successor_uuid;
}

void PeerMessageQueue::EndWatchForSuccessor() {
  std::lock_guard<simple_spinlock> l(queue_lock_);
  successor_watch_in_progress_ = false;
}

void PeerMessageQueue::UpdateFollowerWatermarks(int64_t committed_index,
                                                int64_t all_replicated_index) {
  std::lock_guard<simple_spinlock> l(queue_lock_);
  DCHECK_EQ(queue_state_.mode, NON_LEADER);
  queue_state_.committed_index = committed_index;
  queue_state_.all_replicated_index = all_replicated_index;
  UpdateMetricsUnlocked();
}

void PeerMessageQueue::UpdateLastIndexAppendedToLeader(int64_t last_idx_appended_to_leader) {
  std::lock_guard<simple_spinlock> l(queue_lock_);
  DCHECK_EQ(queue_state_.mode, NON_LEADER);
  queue_state_.last_idx_appended_to_leader = last_idx_appended_to_leader;
  UpdateLagMetricsUnlocked();
}

void PeerMessageQueue::UpdatePeerStatus(const string& peer_uuid,
                                        PeerStatus ps,
                                        const Status& status) {
  std::unique_lock<simple_spinlock> l(queue_lock_);
  TrackedPeer* peer = FindPtrOrNull(peers_map_, peer_uuid);
  if (PREDICT_FALSE(peer == nullptr || queue_state_.mode == NON_LEADER)) {
    VLOG(1) << LogPrefixUnlocked() << "peer " << peer_uuid
            << " is no longer tracked or queue is not in leader mode";
    return;
  }
  peer->last_exchange_status = ps;

  if (ps != PeerStatus::RPC_LAYER_ERROR) {
    // So long as we got _any_ response from the follower, we consider it a 'communication'.
    // RPC_LAYER_ERROR indicates something like a connection failure, indicating that the
    // host itself is likely down.
    //
    // This indicates that the node is at least online.
    peer->last_communication_time = MonoTime::Now();
  }

  switch (ps) {
    case PeerStatus::NEW:
      LOG_WITH_PREFIX_UNLOCKED(DFATAL) << "Should not update an existing peer to 'NEW' state";
      break;

    case PeerStatus::RPC_LAYER_ERROR:
      // Most controller errors are caused by network issues or corner cases
      // like shutdown and failure to deserialize a protobuf. Therefore, we
      // generally consider these errors to indicate an unreachable peer.
      DCHECK(!status.ok());
      break;

    case PeerStatus::TABLET_NOT_FOUND:
      VLOG_WITH_PREFIX_UNLOCKED(1) << "Peer needs tablet copy: " << peer->ToString();
      break;

    case PeerStatus::TABLET_FAILED: {
      UpdatePeerHealthUnlocked(peer);
      return;
    }

    case PeerStatus::REMOTE_ERROR:
    case PeerStatus::INVALID_TERM:
    case PeerStatus::LMP_MISMATCH:
    case PeerStatus::CANNOT_PREPARE:
      // No special handling here for these - we assume that we'll just retry until
      // we make progress.
      break;

    case PeerStatus::OK:
      DCHECK(status.ok());
      break;
  }
}

void PeerMessageQueue::UpdateExchangeStatus(TrackedPeer* peer,
                                            const TrackedPeer& prev_peer_state,
                                            const ConsensusResponsePB& response,
                                            bool* lmp_mismatch) {
  DCHECK(queue_lock_.is_locked());
  const ConsensusStatusPB& status = response.status();

  peer->last_communication_time = MonoTime::Now();
  peer->last_known_committed_index = status.last_committed_idx();

  if (PREDICT_TRUE(!status.has_error())) {
    peer->last_exchange_status = PeerStatus::OK;
    *lmp_mismatch = false;
    return;
  }

  switch (status.error().code()) {
    case ConsensusErrorPB::PRECEDING_ENTRY_DIDNT_MATCH:
      peer->last_exchange_status = PeerStatus::LMP_MISMATCH;
      DCHECK(status.has_last_received());
      if (prev_peer_state.last_exchange_status == PeerStatus::NEW) {
        LOG_WITH_PREFIX_UNLOCKED(INFO) << "Connected to new peer: " << peer->ToString();
      } else {
        LOG_WITH_PREFIX_UNLOCKED(INFO) << "Got LMP mismatch error from peer: "
                                       << peer->ToString();
      }
      *lmp_mismatch = true;
      return;

    case ConsensusErrorPB::INVALID_TERM:
      peer->last_exchange_status = PeerStatus::INVALID_TERM;
      CHECK(response.has_responder_term());
      LOG_WITH_PREFIX_UNLOCKED(INFO) << "Peer responded invalid term: " << peer->ToString();
      NotifyObserversOfTermChange(response.responder_term());
      *lmp_mismatch = false;
      return;

    default:
      // Other ConsensusStatusPB error codes (such as remote errors) are
      // supposed to be handled higher up in the stack.
      LOG_WITH_PREFIX_UNLOCKED(FATAL) << "Unexpected consensus error. Code: "
          << ConsensusErrorPB::Code_Name(status.error().code()) << ". Response: "
          << SecureShortDebugString(response);
  }
}

void PeerMessageQueue::PromoteIfNeeded(TrackedPeer* peer, const TrackedPeer& prev_peer_state,
                                       const ConsensusStatusPB& status) {
  DCHECK(queue_lock_.is_locked());
  if (queue_state_.mode != PeerMessageQueue::LEADER ||
      peer->last_exchange_status != PeerStatus::OK) {
    return;
  }

  // TODO(mpercy): It would be more efficient to cache the member type in the
  // TrackedPeer data structure.
  RaftPeerPB* peer_pb;
  Status s = GetRaftConfigMember(DCHECK_NOTNULL(queue_state_.active_config.get()),
                                 peer->uuid(), &peer_pb);
  if (s.ok() &&
      peer_pb->member_type() == RaftPeerPB::NON_VOTER &&
      peer_pb->attrs().promote()) {

    // Only promote the peer if it is within one round-trip of being fully
    // caught-up with the current commit index, as measured by recent
    // UpdateConsensus() operation batch sizes.

    // If we had never previously contacted this peer, wait until the second
    // time we contact them to try to promote them.
    if (prev_peer_state.last_received.index() == 0) return;

    int64_t last_batch_size =
        std::max<int64_t>(0, peer->last_received.index() - prev_peer_state.last_received.index());
    bool peer_caught_up =
        !OpIdEquals(status.last_received_current_leader(), MinimumOpId()) &&
        status.last_received_current_leader().index() + last_batch_size
            >= queue_state_.committed_index;
    if (!peer_caught_up) return;

    // TODO(mpercy): Implement a SafeToPromote() check to ensure that we only
    // try to promote a NON_VOTER to VOTER if we will be able to commit the
    // resulting config change operation.
    NotifyObserversOfPeerToPromote(peer->uuid());
  }
}

void PeerMessageQueue::TransferLeadershipIfNeeded(const TrackedPeer& peer,
                                                  const ConsensusStatusPB& status) {
  DCHECK(queue_lock_.is_locked());
  bool server_quiescing = server_quiescing_ && *server_quiescing_;
  // Only transfer leadership if the local peer has begun looking for a
  // successor, or if the server is quiescing. Otherwise, exit early.
  if (!successor_watch_in_progress_ && !server_quiescing) {
    return;
  }

  // Do some basic sanity checks that we can actually transfer leadership to
  // the given peer.
  if (queue_state_.mode != PeerMessageQueue::LEADER ||
      local_peer_pb_.permanent_uuid() == peer.uuid() ||
      peer.last_exchange_status != PeerStatus::OK ||
      peer.remote_server_quiescing) {
    return;
  }

  // If looking for a specific successor, ignore peers as appropriate.
  if (successor_watch_in_progress_ &&
      designated_successor_uuid_ && peer.uuid() != *designated_successor_uuid_) {
    return;
  }

  RaftPeerPB* peer_pb;
  Status s = GetRaftConfigMember(DCHECK_NOTNULL(queue_state_.active_config.get()),
                                 peer.uuid(), &peer_pb);
  if (!s.ok() || peer_pb->member_type() != RaftPeerPB::VOTER) {
    return;
  }

  bool peer_caught_up =
      !OpIdEquals(status.last_received_current_leader(), MinimumOpId()) &&
      OpIdEquals(status.last_received_current_leader(), queue_state_.last_appended);
  if (!peer_caught_up) {
    return;
  }

  VLOG(1) << "Successor watch: peer " << peer.uuid() << " is caught up to "
          << "the leader at OpId " << OpIdToString(status.last_received_current_leader());
  successor_watch_in_progress_ = false;
  NotifyObserversOfSuccessor(peer.uuid());
}

bool PeerMessageQueue::ResponseFromPeer(const std::string& peer_uuid,
                                        const ConsensusResponsePB& response) {
  DCHECK(response.IsInitialized()) << "Error: Uninitialized: "
      << response.InitializationErrorString() << ". Response: " << SecureShortDebugString(response);
  CHECK(!response.has_error());

  bool send_more_immediately = false;
  optional<int64_t> updated_commit_index;
  Mode mode_copy;
  {
    std::lock_guard<simple_spinlock> scoped_lock(queue_lock_);

    TrackedPeer* peer = FindPtrOrNull(peers_map_, peer_uuid);
    if (PREDICT_FALSE(queue_state_.state != kQueueOpen || peer == nullptr)) {
      LOG_WITH_PREFIX_UNLOCKED(WARNING) << "Queue is closed or peer was untracked, disregarding "
          "peer response. Response: " << SecureShortDebugString(response);
      return send_more_immediately;
    }

    // Sanity checks.
    // Some of these can be eventually removed, but they are handy for now.
    DCHECK(response.status().IsInitialized())
        << "Error: Uninitialized: " << response.InitializationErrorString()
        << ". Response: "<< SecureShortDebugString(response);
    // TODO(mpercy): Include uuid in error messages as well.
    DCHECK(response.has_responder_uuid() && !response.responder_uuid().empty())
        << "Got response from peer with empty UUID";

    DCHECK(!response.has_error()); // Application-level errors should be handled elsewhere.
    DCHECK(response.has_status()); // Responses should always have a status.
    // The status must always have a last received op id and a last committed index.
    const ConsensusStatusPB& status = response.status();
    DCHECK(status.has_last_received());
    DCHECK(status.has_last_received_current_leader());
    DCHECK(status.has_last_committed_idx());

    // Take a snapshot of the previously-recorded peer state.
    const TrackedPeer prev_peer_state = *peer;

    // Update the peer's last exchange status based on the response.
    // In this case, if there is a log matching property (LMP) mismatch, we
    // want to immediately send another request as we attempt to sync the log
    // offset between the local leader and the remote peer.
    UpdateExchangeStatus(peer, prev_peer_state, response, &send_more_immediately);

    // If the peer is hosted on a server that is quiescing, note that now.
    peer->remote_server_quiescing = response.has_server_quiescing() &&
                                    response.server_quiescing();

    // If the reported last-received op for the replica is in our local log,
    // then resume sending entries from that point onward. Otherwise, resume
    // after the last op they received from us. If we've never successfully
    // sent them anything, start after the last-committed op in their log, which
    // is guaranteed by the Raft protocol to be a valid op.

    bool peer_has_prefix_of_log = IsOpInLog(status.last_received());
    if (peer_has_prefix_of_log) {
      // If the latest thing in their log is in our log, we are in sync.
      peer->last_received = status.last_received();
      peer->next_index = peer->last_received.index() + 1;

      // Check if the peer is a NON_VOTER candidate ready for promotion.
      PromoteIfNeeded(peer, prev_peer_state, status);

      TransferLeadershipIfNeeded(*peer, status);
    } else if (!OpIdEquals(status.last_received_current_leader(), MinimumOpId())) {
      // Their log may have diverged from ours, however we are in the process
      // of replicating our ops to them, so continue doing so. Eventually, we
      // will cause the divergent entry in their log to be overwritten.
      peer->last_received = status.last_received_current_leader();
      peer->next_index = peer->last_received.index() + 1;

    } else {
      // The peer is divergent and they have not (successfully) received
      // anything from us yet. Start sending from their last committed index.
      // This logic differs from the Raft spec slightly because instead of
      // stepping back one-by-one from the end until we no longer have an LMP
      // error, we jump back to the last committed op indicated by the peer with
      // the hope that doing so will result in a faster catch-up process.
      DCHECK_GE(peer->last_known_committed_index, 0);
      peer->next_index = peer->last_known_committed_index + 1;
      LOG_WITH_PREFIX_UNLOCKED(INFO)
          << "Peer " << peer_uuid << " log is divergent from this leader: "
          << "its last log entry " << OpIdToString(status.last_received()) << " is not in "
          << "this leader's log and it has not received anything from this leader yet. "
          << "Falling back to committed index " << peer->last_known_committed_index;
    }

    if (peer->last_exchange_status != PeerStatus::OK) {
      // In this case, 'send_more_immediately' has already been set by
      // UpdateExchangeStatus() to true in the case of an LMP mismatch, false
      // otherwise.
      return send_more_immediately;
    }

    if (response.has_responder_term()) {
      // The peer must have responded with a term that is greater than or equal to
      // the last known term for that peer.
      peer->CheckMonotonicTerms(response.responder_term());

      // If the responder didn't send an error back that must mean that it has
      // a term that is the same or lower than ours.
      CHECK_LE(response.responder_term(), queue_state_.current_term);
    }

    VLOG_WITH_PREFIX_UNLOCKED(2)
        << Substitute("Received Response from Peer ($0). Response: $1",
                      peer->ToString(), SecureShortDebugString(response));

    mode_copy = queue_state_.mode;

    // If we're the leader, we can compute the new watermarks based on the progress
    // of our followers.
    // NOTE: it's possible this node might have lost its leadership (and the notification
    // is just pending behind the lock we're holding), but any future leader will observe
    // the same watermarks and make the same advancement, so this is safe.
    if (mode_copy == LEADER) {
      // Advance the majority replicated index.
      AdvanceQueueWatermark("majority_replicated",
                            &queue_state_.majority_replicated_index,
                            /*replicated_before=*/ prev_peer_state.last_received,
                            /*replicated_after=*/ peer->last_received,
                            /*num_peers_required=*/ queue_state_.majority_size_,
                            VOTER_REPLICAS,
                            peer);

      // Advance the all replicated index.
      AdvanceQueueWatermark("all_replicated",
                            &queue_state_.all_replicated_index,
                            /*replicated_before=*/ prev_peer_state.last_received,
                            /*replicated_after=*/ peer->last_received,
                            /*num_peers_required=*/ peers_map_.size(),
                            ALL_REPLICAS,
                            peer);

      // If the majority-replicated index is in our current term,
      // and it is above our current committed index, then
      // we can advance the committed index.
      //
      // It would seem that the "it is above our current committed index"
      // check is redundant (and could be a CHECK), but in fact the
      // majority-replicated index can currently go down, since we don't
      // consider peers whose last contact was an error in the watermark
      // calculation. See the TODO in AdvanceQueueWatermark() for more details.
      int64_t commit_index_before = queue_state_.committed_index;
      if (queue_state_.first_index_in_current_term.has_value() &&
          queue_state_.majority_replicated_index >= queue_state_.first_index_in_current_term &&
          queue_state_.majority_replicated_index > queue_state_.committed_index) {
        queue_state_.committed_index = queue_state_.majority_replicated_index;
      } else {
        VLOG_WITH_PREFIX_UNLOCKED(2) << "Cannot advance commit index, waiting for > "
                                     << "first index in current leader term: "
                                     << *queue_state_.first_index_in_current_term << ". "
                                     << "current majority_replicated_index: "
                                     << queue_state_.majority_replicated_index << ", "
                                     << "current committed_index: "
                                     << queue_state_.committed_index;

      }

      // Only notify observers if the commit index actually changed.
      if (queue_state_.committed_index != commit_index_before) {
        DCHECK_GT(queue_state_.committed_index, commit_index_before);
        updated_commit_index = queue_state_.committed_index;
        VLOG_WITH_PREFIX_UNLOCKED(2) << "Commit index advanced from "
                                     << commit_index_before << " to "
                                     << *updated_commit_index;
      }
    }

    // If the peer's committed index is lower than our own, or if our log has
    // the next request for the peer, set 'send_more_immediately' to true.
    send_more_immediately = peer->last_known_committed_index < queue_state_.committed_index ||
                            log_cache_.HasOpBeenWritten(peer->next_index);

    log_cache_.EvictThroughOp(queue_state_.all_replicated_index);

    UpdateMetricsUnlocked();
  }

  if (mode_copy == LEADER && updated_commit_index.has_value()) {
  // Suppress false positive about 'updated_commit_index' used when uninitialized.
#pragma GCC diagnostic push
#pragma GCC diagnostic ignored "-Wmaybe-uninitialized"
    NotifyObserversOfCommitIndexChange(*updated_commit_index);
#pragma GCC diagnostic pop
  }

  return send_more_immediately;
}

PeerMessageQueue::TrackedPeer PeerMessageQueue::GetTrackedPeerForTests(const string& uuid) {
  std::lock_guard<simple_spinlock> scoped_lock(queue_lock_);
  TrackedPeer* tracked = FindOrDie(peers_map_, uuid);
  return *tracked;
}

int64_t PeerMessageQueue::GetAllReplicatedIndex() const {
  std::lock_guard<simple_spinlock> lock(queue_lock_);
  return queue_state_.all_replicated_index;
}

int64_t PeerMessageQueue::GetCommittedIndex() const {
  std::lock_guard<simple_spinlock> lock(queue_lock_);
  return queue_state_.committed_index;
}

bool PeerMessageQueue::IsCommittedIndexInCurrentTerm() const {
  std::lock_guard<simple_spinlock> lock(queue_lock_);
  return queue_state_.first_index_in_current_term.has_value() &&
      queue_state_.committed_index >= *queue_state_.first_index_in_current_term;
}

bool PeerMessageQueue::IsInLeaderMode() const {
  std::lock_guard<simple_spinlock> lock(queue_lock_);
  return queue_state_.mode == Mode::LEADER;
}

int64_t PeerMessageQueue::GetMajorityReplicatedIndexForTests() const {
  std::lock_guard<simple_spinlock> lock(queue_lock_);
  return queue_state_.majority_replicated_index;
}


void PeerMessageQueue::UpdateMetricsUnlocked() {
  DCHECK(queue_lock_.is_locked());
  // Since operations have consecutive indices we can update the metrics based
  // on simple index math.
  // For non-leaders, majority_done_ops isn't meaningful because followers don't
  // track when an op is replicated to all peers.
  metrics_.num_majority_done_ops->set_value(queue_state_.mode == LEADER ?
    queue_state_.committed_index - queue_state_.all_replicated_index
    : 0);
  metrics_.num_in_progress_ops->set_value(
    queue_state_.last_appended.index() - queue_state_.committed_index);

  UpdateLagMetricsUnlocked();
}

void PeerMessageQueue::UpdateLagMetricsUnlocked() {
  DCHECK(queue_lock_.is_locked());
  metrics_.num_ops_behind_leader->set_value(queue_state_.mode == LEADER ? 0 :
      queue_state_.last_idx_appended_to_leader - queue_state_.last_appended.index());
}

void PeerMessageQueue::DumpToStrings(vector<string>* lines) const {
  std::lock_guard<simple_spinlock> lock(queue_lock_);
  DumpToStringsUnlocked(lines);
}

void PeerMessageQueue::DumpToStringsUnlocked(vector<string>* lines) const {
  DCHECK(queue_lock_.is_locked());
  lines->push_back("Watermarks:");
  for (const PeersMap::value_type& entry : peers_map_) {
    lines->push_back(
        Substitute("Peer: $0 Watermark: $1", entry.first, entry.second->ToString()));
  }

  log_cache_.DumpToStrings(lines);
}

void PeerMessageQueue::DumpToHtml(std::ostream& out) const {
  using std::endl;

  std::lock_guard<simple_spinlock> lock(queue_lock_);
  out << "<h3>Watermarks</h3>" << endl;
  out << "<table>" << endl;
  out << "  <tr><th>Peer</th><th>Watermark</th></tr>" << endl;
  for (const PeersMap::value_type& entry : peers_map_) {
    out << Substitute("  <tr><td>$0</td><td>$1</td></tr>",
                      EscapeForHtmlToString(entry.first),
                      EscapeForHtmlToString(entry.second->ToString())) << endl;
  }
  out << "</table>" << endl;
  out << "<p>" << queue_state_.ToString() << "</p>" << endl;

  log_cache_.DumpToHtml(out);
}

void PeerMessageQueue::ClearUnlocked() {
  DCHECK(queue_lock_.is_locked());
  STLDeleteValues(&peers_map_);
  queue_state_.state = kQueueClosed;
}

void PeerMessageQueue::Close() {
  raft_pool_observers_token_->Shutdown();

  std::lock_guard<simple_spinlock> lock(queue_lock_);
  ClearUnlocked();
}

int64_t PeerMessageQueue::GetQueuedOperationsSizeBytesForTests() const {
  return log_cache_.BytesUsed();
}

string PeerMessageQueue::ToString() const {
  // Even though metrics are thread-safe obtain the lock so that we get
  // a "consistent" snapshot of the metrics.
  std::lock_guard<simple_spinlock> lock(queue_lock_);
  return ToStringUnlocked();
}

string PeerMessageQueue::ToStringUnlocked() const {
  DCHECK(queue_lock_.is_locked());
  return Substitute("Consensus queue metrics: "
                    "Only Majority Done Ops: $0, In Progress Ops: $1, Cache: $2",
                    metrics_.num_majority_done_ops->value(), metrics_.num_in_progress_ops->value(),
                    log_cache_.StatsString());
}

void PeerMessageQueue::RegisterObserver(PeerMessageQueueObserver* observer) {
  std::lock_guard<simple_spinlock> lock(queue_lock_);
  auto iter = std::find(observers_.begin(), observers_.end(), observer);
  if (iter == observers_.end()) {
    observers_.push_back(observer);
  }
}

Status PeerMessageQueue::UnRegisterObserver(PeerMessageQueueObserver* observer) {
  std::lock_guard<simple_spinlock> lock(queue_lock_);
  auto iter = std::find(observers_.begin(), observers_.end(), observer);
  if (iter == observers_.end()) {
    return Status::NotFound("Can't find observer.");
  }
  observers_.erase(iter);
  return Status::OK();
}

bool PeerMessageQueue::IsOpInLog(const OpId& desired_op) const {
  OpId log_op;
  Status s = log_cache_.LookupOpId(desired_op.index(), &log_op);
  if (PREDICT_TRUE(s.ok())) {
    return OpIdEquals(desired_op, log_op);
  }
  if (PREDICT_TRUE(s.IsNotFound() || s.IsIncomplete())) {
    return false;
  }
  LOG_WITH_PREFIX_UNLOCKED(FATAL) << "Error while reading the log: " << s.ToString();
  return false; // Unreachable; here to squelch GCC warning.
}

void PeerMessageQueue::NotifyObserversOfCommitIndexChange(int64_t new_commit_index) {
  WARN_NOT_OK(raft_pool_observers_token_->Submit(
      [=](){ this->NotifyObserversTask(
          [=](PeerMessageQueueObserver* observer) {
            observer->NotifyCommitIndex(new_commit_index);
          });
      }),
      LogPrefixUnlocked() + "Unable to notify RaftConsensus of commit index change.");
}

void PeerMessageQueue::NotifyObserversOfTermChange(int64_t term) {
  WARN_NOT_OK(raft_pool_observers_token_->Submit(
      [=](){ this->NotifyObserversTask(
          [=](PeerMessageQueueObserver* observer) {
            observer->NotifyTermChange(term);
          });
      }),
      LogPrefixUnlocked() + "Unable to notify RaftConsensus of term change.");
}

void PeerMessageQueue::NotifyObserversOfFailedFollower(const string& uuid,
                                                       int64_t term,
                                                       const string& reason) {
  WARN_NOT_OK(raft_pool_observers_token_->Submit(
      [=](){ this->NotifyObserversTask(
          [=](PeerMessageQueueObserver* observer) {
            observer->NotifyFailedFollower(uuid, term, reason);
          });
      }),
      LogPrefixUnlocked() + "Unable to notify RaftConsensus of abandoned follower.");
}

void PeerMessageQueue::NotifyObserversOfPeerToPromote(const string& peer_uuid) {
  WARN_NOT_OK(raft_pool_observers_token_->Submit(
      [=](){ this->NotifyObserversTask(
          [=](PeerMessageQueueObserver* observer) {
            observer->NotifyPeerToPromote(peer_uuid);
          });
      }),
      LogPrefixUnlocked() + "Unable to notify RaftConsensus of peer to promote.");
}

void PeerMessageQueue::NotifyObserversOfSuccessor(const string& peer_uuid) {
  DCHECK(queue_lock_.is_locked());
  WARN_NOT_OK(raft_pool_observers_token_->Submit(
      [=](){ this->NotifyObserversTask(
          [=](PeerMessageQueueObserver* observer) {
            observer->NotifyPeerToStartElection(peer_uuid);
          });
      }),
      LogPrefixUnlocked() + "Unable to notify RaftConsensus of available successor.");
}

void PeerMessageQueue::NotifyObserversOfPeerHealthChange() {
  WARN_NOT_OK(raft_pool_observers_token_->Submit(
      [=](){ this->NotifyObserversTask(
          [](PeerMessageQueueObserver* observer) {
            observer->NotifyPeerHealthChange();
          });
      }),
      LogPrefixUnlocked() + "Unable to notify RaftConsensus peer health change.");
}

void PeerMessageQueue::NotifyObserversTask(
    const std::function<void(PeerMessageQueueObserver*)>& func) {
  MAYBE_INJECT_RANDOM_LATENCY(FLAGS_consensus_inject_latency_ms_in_notifications);
  std::vector<PeerMessageQueueObserver*> observers_copy;
  {
    std::lock_guard<simple_spinlock> lock(queue_lock_);
    observers_copy = observers_;
  }
  for (PeerMessageQueueObserver* observer : observers_copy) {
    func(observer);
  }
}

PeerMessageQueue::~PeerMessageQueue() {
  Close();
}

string PeerMessageQueue::LogPrefixUnlocked() const {
  // TODO: we should probably use an atomic here. We'll just annotate
  // away the TSAN error for now, since the worst case is a slightly out-of-date
  // log message, and not very likely.
  Mode mode = ANNOTATE_UNPROTECTED_READ(queue_state_.mode);
  return Substitute("T $0 P $1 [$2]: ",
                    tablet_id_,
                    local_peer_pb_.permanent_uuid(),
                    mode == LEADER ? "LEADER" : "NON_LEADER");
}

string PeerMessageQueue::QueueState::ToString() const {
  return Substitute("All replicated index: $0, Majority replicated index: $1, "
      "Committed index: $2, Last appended: $3, Last appended by leader: $4, Current term: $5, "
      "Majority size: $6, State: $7, Mode: $8$9",
      all_replicated_index, majority_replicated_index,
      committed_index, OpIdToString(last_appended), last_idx_appended_to_leader, current_term,
      majority_size_, state, (mode == LEADER ? "LEADER" : "NON_LEADER"),
      active_config ? ", active raft config: " + SecureShortDebugString(*active_config) : "");
}

}  // namespace consensus
}  // namespace kudu
