// blob: 9200b074d028082065a50873b77dd0255a8754f8 [file] [log] [blame]
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.
#include "kudu/consensus/pending_rounds.h"
#include <iterator>
#include <ostream>
#include <utility>
#include <glog/logging.h>
#include "kudu/consensus/consensus.pb.h"
#include "kudu/consensus/opid_util.h"
#include "kudu/consensus/raft_consensus.h"
#include "kudu/consensus/time_manager.h"
#include "kudu/gutil/map-util.h"
#include "kudu/gutil/port.h"
#include "kudu/gutil/strings/substitute.h"
#include "kudu/util/debug-util.h"
#include "kudu/util/logging.h"
#include "kudu/util/pb_util.h"
#include "kudu/util/status.h"
#include "kudu/util/thread_restrictions.h"
using kudu::pb_util::SecureShortDebugString;
using std::string;
using strings::Substitute;
namespace kudu {
namespace consensus {
//------------------------------------------------------------
// PendingRounds
//------------------------------------------------------------
PendingRounds::PendingRounds(string log_prefix, TimeManager* time_manager)
: log_prefix_(std::move(log_prefix)),
last_committed_op_id_(MinimumOpId()),
time_manager_(time_manager) {}
// Nothing to release explicitly: members clean themselves up, and the
// TimeManager pointer is not owned here.
PendingRounds::~PendingRounds() = default;
// Notifies every round still in the pending set that replication was
// aborted, visiting them from the highest index down (see KUDU-1678).
//
// Every entry in pending_ops_ is notified; no filtering is done here. Ops
// leave the pending set when AdvanceCommittedIndex() commits them, so these
// are presumably all uncommitted ops. Entries are NOT erased from
// pending_ops_ by this call, so calling it twice would notify the same
// rounds twice.
//
// Asserts that the calling thread is allowed to wait. Always returns OK.
Status PendingRounds::CancelPendingOps() {
  ThreadRestrictions::AssertWaitAllowed();
  if (!pending_ops_.empty()) {
    LOG_WITH_PREFIX(INFO) << "Trying to abort " << pending_ops_.size()
                          << " pending ops.";
    // Newest first: reverse index order.
    for (auto it = pending_ops_.rbegin(); it != pending_ops_.rend(); ++it) {
      const scoped_refptr<ConsensusRound>& pending_round = it->second;
      LOG_WITH_PREFIX(INFO) << "Aborting op as it isn't in flight: "
                            << SecureShortDebugString(*pending_round->replicate_msg());
      pending_round->NotifyReplicationFinished(Status::Aborted("Op aborted"));
    }
  }
  return Status::OK();
}
// Aborts every pending op whose index is strictly greater than 'index'.
// The log messages below describe this as happening on a leader change.
//
// 'index' must either be the index of an op still in the pending set (that
// op is kept), or be equal to the last committed index. Already-committed
// operations can never be truncated, so any other value CHECK-fails.
//
// Each aborted round is notified with Status::Aborted before its entry is
// erased from the pending set.
void PendingRounds::AbortOpsAfter(int64_t index) {
  LOG_WITH_PREFIX(INFO) << "Aborting all ops after (but not including) "
                        << index;
  DCHECK_GE(index, 0);

  auto iter = pending_ops_.lower_bound(index);
  // Either 'index' itself is pending (and survives), or it must be equal to
  // the committed index since we can't truncate already committed operations.
  if (iter != pending_ops_.end() && iter->first == index) {
    ++iter;
  } else {
    CHECK_EQ(index, last_committed_op_id_.index());
  }

  while (iter != pending_ops_.end()) {
    const scoped_refptr<ConsensusRound>& round = iter->second;
    auto op_type = round->replicate_msg()->op_type();
    LOG_WITH_PREFIX(INFO)
        << "Aborting uncommitted " << OperationType_Name(op_type)
        << " operation due to leader change: " << round->replicate_msg()->id();
    round->NotifyReplicationFinished(Status::Aborted("Op aborted by new leader"));
    // Erase the entry from pendings. 'round' refers into the erased entry,
    // so it must not be touched after this line.
    pending_ops_.erase(iter++);
  }
}
// Registers 'round' in the pending set, keyed by its replicate message's
// OpId index. Inserting a second round with an index that is already pending
// is fatal (InsertOrDie). Always returns OK.
Status PendingRounds::AddPendingOperation(const scoped_refptr<ConsensusRound>& round) {
  const int64_t op_index = round->replicate_msg()->id().index();
  InsertOrDie(&pending_ops_, op_index, round);
  return Status::OK();
}
// Looks up the pending round stored under 'index'. Returns a new reference
// to it, or a null scoped_refptr when no op with that index is pending.
scoped_refptr<ConsensusRound> PendingRounds::GetPendingOpByIndexOrNull(int64_t index) {
  auto found = pending_ops_.find(index);
  if (found == pending_ops_.end()) {
    return scoped_refptr<ConsensusRound>();
  }
  return found->second;
}
bool PendingRounds::IsOpCommittedOrPending(const OpId& op_id, bool* term_mismatch) {
*term_mismatch = false;
if (op_id.index() <= GetCommittedIndex()) {
return true;
}
scoped_refptr<ConsensusRound> round = GetPendingOpByIndexOrNull(op_id.index());
if (!round) {
return false;
}
if (round->id().term() != op_id.term()) {
*term_mismatch = true;
return false;
}
return true;
}
// Returns the OpId of the pending op with the highest index, or
// MinimumOpId() when nothing is pending.
OpId PendingRounds::GetLastPendingOpOpId() const {
  if (pending_ops_.empty()) {
    return MinimumOpId();
  }
  // Keys are ordered by index, so the last entry is the newest op.
  return pending_ops_.rbegin()->second->id();
}
// Commits, in index order, every pending op whose index is at most
// 'committed_index'.
//
// For each such op, this function:
// - checks that it directly follows the previously committed op (skipped
//   while the last committed OpId is still MinimumOpId()); a violation
//   CHECK-fails;
// - erases it from the pending set;
// - records it as the last committed op;
// - advances safe time through the TimeManager using its replicate message;
// - notifies the round with Status::OK().
//
// A 'committed_index' at or below the current committed index is a no-op.
// Always returns Status::OK().
Status PendingRounds::AdvanceCommittedIndex(int64_t committed_index) {
  // If we already committed up to (or past) 'committed_index', return.
  // This can happen in the case that multiple UpdateConsensus() calls end
  // up in the RPC queue at the same time, and then might get interleaved out
  // of order.
  if (last_committed_op_id_.index() >= committed_index) {
    VLOG_WITH_PREFIX(1)
        << "Already marked ops through " << last_committed_op_id_ << " as committed. "
        << "Now trying to mark " << committed_index << " which would be a no-op.";
    return Status::OK();
  }
  if (pending_ops_.empty()) {
    // Use the prefixed logger like the rest of this class, and put the stack
    // trace on its own line instead of running it into the message.
    LOG_WITH_PREFIX(ERROR) << "Advancing commit index to " << committed_index
                           << " from " << last_committed_op_id_
                           << " but we have no pending ops\n"
                           << GetStackTrace();
    VLOG_WITH_PREFIX(1) << "No ops to mark as committed up to: "
                        << committed_index;
    return Status::OK();
  }
  // Start at the operation after the last committed one.
  auto iter = pending_ops_.upper_bound(last_committed_op_id_.index());
  // Stop at the operation after the last one we must commit.
  auto end_iter = pending_ops_.upper_bound(committed_index);
  CHECK(iter != pending_ops_.end());
  VLOG_WITH_PREFIX(1) << "Last triggered apply was: "
                      << last_committed_op_id_
                      << " Starting to apply from log index: " << iter->first;
  while (iter != end_iter) {
    // Take our own reference so the round stays alive after its map entry
    // (and the map's reference) is erased below.
    scoped_refptr<ConsensusRound> round = iter->second;
    DCHECK(round);
    const OpId& current_id = round->id();
    if (PREDICT_TRUE(!OpIdEquals(last_committed_op_id_, MinimumOpId()))) {
      CHECK_OK(CheckOpInSequence(last_committed_op_id_, current_id));
    }
    pending_ops_.erase(iter++);
    last_committed_op_id_ = current_id;
    time_manager_->AdvanceSafeTimeWithMessage(*round->replicate_msg());
    round->NotifyReplicationFinished(Status::OK());
  }
  return Status::OK();
}
// Seeds the committed OpId. It must be called while nothing has been
// committed yet (CHECK-enforced: the last committed index must be 0).
//
// If nothing is pending, 'committed_op' is simply recorded. Otherwise:
// - If 'committed_op' precedes the first pending op, it must be exactly the
//   op right before it. Any gap yields Status::Corruption; otherwise it is
//   recorded as the last committed op.
// - Pending ops up to 'committed_op.index()' are then committed via
//   AdvanceCommittedIndex().
// - The resulting last committed OpId must equal 'committed_op'
//   (CHECK-enforced).
Status PendingRounds::SetInitialCommittedOpId(const OpId& committed_op) {
  CHECK_EQ(last_committed_op_id_.index(), 0);
  if (pending_ops_.empty()) {
    last_committed_op_id_ = committed_op;
    return Status::OK();
  }

  const int64_t first_pending_index = pending_ops_.begin()->first;
  const bool precedes_pending = committed_op.index() < first_pending_index;
  if (precedes_pending) {
    const bool is_adjacent = committed_op.index() == first_pending_index - 1;
    if (!is_adjacent) {
      return Status::Corruption(Substitute(
          "pending operations should start at first operation "
          "after the committed operation (committed=$0, first pending=$1)",
          OpIdToString(committed_op), first_pending_index));
    }
    last_committed_op_id_ = committed_op;
  }

  RETURN_NOT_OK(AdvanceCommittedIndex(committed_op.index()));
  CHECK_EQ(SecureShortDebugString(last_committed_op_id_),
           SecureShortDebugString(committed_op));
  return Status::OK();
}
// Validates that 'current' may directly follow 'previous':
// - the term must not go backwards;
// - the index must be exactly previous.index() + 1.
// The term is checked first. Returns Status::Corruption describing the first
// violated rule, or OK.
Status PendingRounds::CheckOpInSequence(const OpId& previous, const OpId& current) {
  const bool term_went_backwards = current.term() < previous.term();
  if (term_went_backwards) {
    return Status::Corruption(Substitute("New operation's term is not >= than the previous "
        "op's term. Current: $0. Previous: $1", OpIdToString(current), OpIdToString(previous)));
  }
  const bool index_is_successor = current.index() == previous.index() + 1;
  if (!index_is_successor) {
    return Status::Corruption(Substitute("New operation's index does not follow the previous"
        " op's index. Current: $0. Previous: $1", OpIdToString(current), OpIdToString(previous)));
  }
  return Status::OK();
}
int64_t PendingRounds::GetCommittedIndex() const {
return last_committed_op_id_.index();
}
int64_t PendingRounds::GetTermWithLastCommittedOp() const {
return last_committed_op_id_.term();
}
// Number of rounds currently held in the pending set (size_t narrowed to
// int, exactly as the implicit conversion would do).
int PendingRounds::GetNumPendingOps() const {
  return static_cast<int>(pending_ops_.size());
}
} // namespace consensus
} // namespace kudu