// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements.  See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership.  The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License.  You may obtain a copy of the License at
//
//   http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied.  See the License for the
// specific language governing permissions and limitations
// under the License.

#include "kudu/consensus/pending_rounds.h"

#include <iterator>
#include <ostream>
#include <utility>

#include <glog/logging.h>

#include "kudu/consensus/consensus.pb.h"
#include "kudu/consensus/opid_util.h"
#include "kudu/consensus/raft_consensus.h"
#include "kudu/consensus/time_manager.h"
#include "kudu/gutil/map-util.h"
#include "kudu/gutil/port.h"
#include "kudu/gutil/strings/substitute.h"
#include "kudu/util/debug-util.h"
#include "kudu/util/logging.h"
#include "kudu/util/pb_util.h"
#include "kudu/util/status.h"
#include "kudu/util/thread_restrictions.h"

using kudu::pb_util::SecureShortDebugString;
using std::string;
using strings::Substitute;

namespace kudu {
namespace consensus {

//------------------------------------------------------------
// PendingRounds
//------------------------------------------------------------

PendingRounds::PendingRounds(string log_prefix, TimeManager* time_manager)
    : log_prefix_(std::move(log_prefix)),
      last_committed_op_id_(MinimumOpId()),
      time_manager_(time_manager) {}

PendingRounds::~PendingRounds() {
}

Status PendingRounds::CancelPendingOps() {
  ThreadRestrictions::AssertWaitAllowed();
  if (pending_ops_.empty()) {
    return Status::OK();
  }

  LOG_WITH_PREFIX(INFO) << "Trying to abort " << pending_ops_.size()
                        << " pending ops.";
  // Abort ops in reverse index order. See KUDU-1678.
  for (auto op = pending_ops_.crbegin(); op != pending_ops_.crend(); ++op) {
    const scoped_refptr<ConsensusRound>& round = op->second;
    // We cancel only ops whose applies have not yet been triggered.
    LOG_WITH_PREFIX(INFO) << "Aborting op as it isn't in flight: "
                          << SecureShortDebugString(*round->replicate_msg());
    round->NotifyReplicationFinished(Status::Aborted("Op aborted"));
  }
  return Status::OK();
}

void PendingRounds::AbortOpsAfter(int64_t index) {
  LOG_WITH_PREFIX(INFO) << "Aborting all ops after (but not including) "
                        << index;

  DCHECK_GE(index, 0);
  OpId new_preceding;

  auto iter = pending_ops_.lower_bound(index);

  // Either the new preceding id is in the pendings set or it must be equal to the
  // committed index since we can't truncate already committed operations.
  if (iter != pending_ops_.end() && (*iter).first == index) {
    new_preceding = (*iter).second->replicate_msg()->id();
    ++iter;
  } else {
    CHECK_EQ(index, last_committed_op_id_.index());
    new_preceding = last_committed_op_id_;
  }

  for (; iter != pending_ops_.end();) {
    const scoped_refptr<ConsensusRound>& round = (*iter).second;
    auto op_type = round->replicate_msg()->op_type();
    LOG_WITH_PREFIX(INFO)
        << "Aborting uncommitted " << OperationType_Name(op_type)
        << " operation due to leader change: " << round->replicate_msg()->id();

    round->NotifyReplicationFinished(Status::Aborted("Op aborted by new leader"));
    // Erase the entry from pendings.
    pending_ops_.erase(iter++);
  }
}

Status PendingRounds::AddPendingOperation(const scoped_refptr<ConsensusRound>& round) {
  InsertOrDie(&pending_ops_, round->replicate_msg()->id().index(), round);
  return Status::OK();
}

scoped_refptr<ConsensusRound> PendingRounds::GetPendingOpByIndexOrNull(int64_t index) {
  return FindPtrOrNull(pending_ops_, index);
}

bool PendingRounds::IsOpCommittedOrPending(const OpId& op_id, bool* term_mismatch) {

  *term_mismatch = false;

  if (op_id.index() <= GetCommittedIndex()) {
    return true;
  }

  scoped_refptr<ConsensusRound> round = GetPendingOpByIndexOrNull(op_id.index());
  if (!round) {
    return false;
  }

  if (round->id().term() != op_id.term()) {
    *term_mismatch = true;
    return false;
  }
  return true;
}

OpId PendingRounds::GetLastPendingOpOpId() const {
  return pending_ops_.empty()
      ? MinimumOpId() : (--pending_ops_.end())->second->id();
}

Status PendingRounds::AdvanceCommittedIndex(int64_t committed_index) {
  // If we already committed up to (or past) 'id' return.
  // This can happen in the case that multiple UpdateConsensus() calls end
  // up in the RPC queue at the same time, and then might get interleaved out
  // of order.
  if (last_committed_op_id_.index() >= committed_index) {
    VLOG_WITH_PREFIX(1)
      << "Already marked ops through " << last_committed_op_id_ << " as committed. "
      << "Now trying to mark " << committed_index << " which would be a no-op.";
    return Status::OK();
  }

  if (pending_ops_.empty()) {
    LOG(ERROR) << "Advancing commit index to " << committed_index
               << " from " << last_committed_op_id_
               << " we have no pending ops"
               << GetStackTrace();
    VLOG_WITH_PREFIX(1) << "No ops to mark as committed up to: "
                        << committed_index;
    return Status::OK();
  }

  // Start at the operation after the last committed one.
  auto iter = pending_ops_.upper_bound(last_committed_op_id_.index());
  // Stop at the operation after the last one we must commit.
  auto end_iter = pending_ops_.upper_bound(committed_index);
  CHECK(iter != pending_ops_.end());

  VLOG_WITH_PREFIX(1) << "Last triggered apply was: "
      <<  last_committed_op_id_
      << " Starting to apply from log index: " << (*iter).first;

  while (iter != end_iter) {
    scoped_refptr<ConsensusRound> round = (*iter).second; // Make a copy.
    DCHECK(round);
    const OpId& current_id = round->id();

    if (PREDICT_TRUE(!OpIdEquals(last_committed_op_id_, MinimumOpId()))) {
      CHECK_OK(CheckOpInSequence(last_committed_op_id_, current_id));
    }

    pending_ops_.erase(iter++);
    last_committed_op_id_ = round->id();
    time_manager_->AdvanceSafeTimeWithMessage(*round->replicate_msg());
    round->NotifyReplicationFinished(Status::OK());
  }

  return Status::OK();
}

Status PendingRounds::SetInitialCommittedOpId(const OpId& committed_op) {
  CHECK_EQ(last_committed_op_id_.index(), 0);
  if (!pending_ops_.empty()) {
    int64_t first_pending_index = pending_ops_.begin()->first;
    if (committed_op.index() < first_pending_index) {
      if (committed_op.index() != first_pending_index - 1) {
        return Status::Corruption(Substitute(
            "pending operations should start at first operation "
            "after the committed operation (committed=$0, first pending=$1)",
            OpIdToString(committed_op), first_pending_index));
      }
      last_committed_op_id_ = committed_op;
    }

    RETURN_NOT_OK(AdvanceCommittedIndex(committed_op.index()));
    CHECK_EQ(SecureShortDebugString(last_committed_op_id_),
             SecureShortDebugString(committed_op));

  } else {
    last_committed_op_id_ = committed_op;
  }
  return Status::OK();
}

Status PendingRounds::CheckOpInSequence(const OpId& previous, const OpId& current) {
  if (current.term() < previous.term()) {
    return Status::Corruption(Substitute("New operation's term is not >= than the previous "
        "op's term. Current: $0. Previous: $1", OpIdToString(current), OpIdToString(previous)));
  }
  if (current.index() != previous.index() + 1) {
    return Status::Corruption(Substitute("New operation's index does not follow the previous"
        " op's index. Current: $0. Previous: $1", OpIdToString(current), OpIdToString(previous)));
  }
  return Status::OK();
}

int64_t PendingRounds::GetCommittedIndex() const {
  return last_committed_op_id_.index();
}

int64_t PendingRounds::GetTermWithLastCommittedOp() const {
  return last_committed_op_id_.term();
}

int PendingRounds::GetNumPendingOps() const {
  return pending_ops_.size();
}

}  // namespace consensus
}  // namespace kudu
