blob: 240ae4fc7e670f2d27b413ac8e81e8775d716b69 [file] [log] [blame]
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.
#include <algorithm>
#include "kudu/consensus/log_util.h"
#include "kudu/consensus/quorum_util.h"
#include "kudu/consensus/raft_consensus_state.h"
#include "kudu/gutil/map-util.h"
#include "kudu/gutil/strings/join.h"
#include "kudu/gutil/strings/strcat.h"
#include "kudu/gutil/strings/substitute.h"
#include "kudu/util/debug/trace_event.h"
#include "kudu/util/logging.h"
#include "kudu/util/status.h"
#include "kudu/util/trace.h"
namespace kudu {
namespace consensus {
using std::string;
using strings::Substitute;
using strings::SubstituteAndAppend;
//////////////////////////////////////////////////
// ReplicaState
//////////////////////////////////////////////////
// Constructs the replica state in kInitialized. Ownership of 'cmeta' moves into
// this object and it must be non-NULL (CHECK-enforced). 'txn_factory' is kept
// as a raw pointer. Both received-op watermarks and the committed watermark
// start at MinimumOpId(), and next_index_ starts at 0, until StartUnlocked()
// seeds them from the WAL.
ReplicaState::ReplicaState(ConsensusOptions options, string peer_uuid,
                           gscoped_ptr<ConsensusMetadata> cmeta,
                           ReplicaTransactionFactory* txn_factory)
    : options_(std::move(options)),
      peer_uuid_(std::move(peer_uuid)),
      cmeta_(std::move(cmeta)),
      next_index_(0),
      txn_factory_(txn_factory),
      last_received_op_id_(MinimumOpId()),
      last_received_op_id_current_leader_(MinimumOpId()),
      last_committed_index_(MinimumOpId()),
      state_(kInitialized) {
  CHECK(cmeta_) << "ConsensusMeta passed as NULL";
}
// Moves the replica to kRunning, seeding the id allocator and the last-received
// watermark from the last op found in the WAL. Requires 'update_lock_'.
// CHECK-fails if that op's term is greater than the persisted current term.
Status ReplicaState::StartUnlocked(const OpId& last_id_in_wal) {
  DCHECK(update_lock_.is_locked());

  // The persisted term may be ahead of the WAL (e.g. after an election was
  // called), but the WAL must never be ahead of the persisted term.
  CHECK_LE(last_id_in_wal.term(), GetCurrentTermUnlocked()) << LogPrefixUnlocked()
      << "The last op in the WAL with id " << OpIdToString(last_id_in_wal)
      << " has a term (" << last_id_in_wal.term() << ") that is greater "
      << "than the latest recorded term, which is " << GetCurrentTermUnlocked();

  last_received_op_id_.CopyFrom(last_id_in_wal);
  // New ids continue right after the last op in the WAL.
  next_index_ = last_id_in_wal.index() + 1;
  state_ = kRunning;
  return Status::OK();
}
// Acquires 'update_lock_' and transfers it into '*lock'. CHECK-fails if the
// replica is not in kInitialized.
Status ReplicaState::LockForStart(UniqueLock* lock) const {
  ThreadRestrictions::AssertWaitAllowed();
  UniqueLock acquired(&update_lock_);
  CHECK_EQ(state_, kInitialized) << "Illegal state for Start()."
                                 << " Replica is not in kInitialized state";
  lock->swap(&acquired);
  return Status::OK();
}
// Acquires 'update_lock_' unconditionally (any state) and transfers it into
// '*lock'. Always returns OK.
Status ReplicaState::LockForRead(UniqueLock* lock) const {
  ThreadRestrictions::AssertWaitAllowed();
  UniqueLock acquired(&update_lock_);
  lock->swap(&acquired);
  return Status::OK();
}
// Acquires 'update_lock_' for replicating 'msg', which must not carry an id
// yet (DCHECK). On success the lock is transferred into '*lock'. Otherwise the
// lock is released and IllegalState is returned: either the replica is not
// running, or this peer is not the active leader.
Status ReplicaState::LockForReplicate(UniqueLock* lock, const ReplicateMsg& msg) const {
  ThreadRestrictions::AssertWaitAllowed();
  DCHECK(!msg.has_id()) << "Should not have an ID yet: " << msg.ShortDebugString();
  UniqueLock acquired(&update_lock_);

  if (PREDICT_FALSE(state_ != kRunning)) {
    return Status::IllegalState("Replica not in running state");
  }
  RETURN_NOT_OK(CheckActiveLeaderUnlocked());

  lock->swap(&acquired);
  return Status::OK();
}
// Acquires 'update_lock_' for committing. Commits are still accepted while
// shutting down, so both kRunning and kShuttingDown are allowed. Any other
// state yields IllegalState with the lock released.
Status ReplicaState::LockForCommit(UniqueLock* lock) const {
  TRACE_EVENT0("consensus", "ReplicaState::LockForCommit");
  ThreadRestrictions::AssertWaitAllowed();
  UniqueLock acquired(&update_lock_);
  const bool commit_allowed = (state_ == kRunning || state_ == kShuttingDown);
  if (PREDICT_FALSE(!commit_allowed)) {
    return Status::IllegalState("Replica not in running state");
  }
  lock->swap(&acquired);
  return Status::OK();
}
// Acquires 'update_lock_' so the leader can update its majority-replicated
// index. Returns IllegalState (lock released) if the replica is not running
// or its active role is not LEADER.
Status ReplicaState::LockForMajorityReplicatedIndexUpdate(UniqueLock* lock) const {
  TRACE_EVENT0("consensus", "ReplicaState::LockForMajorityReplicatedIndexUpdate");
  ThreadRestrictions::AssertWaitAllowed();
  UniqueLock acquired(&update_lock_);

  if (PREDICT_FALSE(state_ != kRunning)) {
    return Status::IllegalState("Replica not in running state");
  }
  if (PREDICT_FALSE(GetActiveRoleUnlocked() != RaftPeerPB::LEADER)) {
    return Status::IllegalState("Replica not LEADER");
  }

  lock->swap(&acquired);
  return Status::OK();
}
// Returns OK if this peer's active role is LEADER. Otherwise returns
// IllegalState describing the role and the active consensus state.
Status ReplicaState::CheckActiveLeaderUnlocked() const {
  const RaftPeerPB::Role role = GetActiveRoleUnlocked();
  if (role == RaftPeerPB::LEADER) {
    return Status::OK();
  }

  const ConsensusStatePB cstate = ConsensusStateUnlocked(CONSENSUS_CONFIG_ACTIVE);
  return Status::IllegalState(Substitute("Replica $0 is not leader of this config. Role: $1. "
                                         "Consensus state: $2",
                                         peer_uuid_,
                                         RaftPeerPB::Role_Name(role),
                                         cstate.ShortDebugString()));
}
// Acquires 'update_lock_' for a config change. Config changes are only legal
// on a running replica. In any other state the lock is released and
// IllegalState (carrying the numeric state) is returned.
Status ReplicaState::LockForConfigChange(UniqueLock* lock) const {
  TRACE_EVENT0("consensus", "ReplicaState::LockForConfigChange");
  ThreadRestrictions::AssertWaitAllowed();
  UniqueLock acquired(&update_lock_);

  if (PREDICT_FALSE(state_ != kRunning)) {
    return Status::IllegalState("Unable to lock ReplicaState for config change",
                                Substitute("State = $0", state_));
  }

  lock->swap(&acquired);
  return Status::OK();
}
// Acquires 'update_lock_' for applying an update from the leader. Requires
// kRunning. If this peer is not a voter in the active config, the update is
// still allowed; only an INFO line is logged.
Status ReplicaState::LockForUpdate(UniqueLock* lock) const {
  TRACE_EVENT0("consensus", "ReplicaState::LockForUpdate");
  ThreadRestrictions::AssertWaitAllowed();
  UniqueLock acquired(&update_lock_);

  if (PREDICT_FALSE(state_ != kRunning)) {
    return Status::IllegalState("Replica not in running state");
  }

  const ConsensusStatePB active_state = ConsensusStateUnlocked(CONSENSUS_CONFIG_ACTIVE);
  if (!IsRaftConfigVoter(peer_uuid_, active_state.config())) {
    LOG_WITH_PREFIX_UNLOCKED(INFO) << "Allowing update even though not a member of the config";
  }

  lock->swap(&acquired);
  return Status::OK();
}
// Acquires 'update_lock_' and moves the replica into kShuttingDown, unless it
// is already shutting down or shut down. Always transfers the lock into
// '*lock' and returns OK.
Status ReplicaState::LockForShutdown(UniqueLock* lock) {
  TRACE_EVENT0("consensus", "ReplicaState::LockForShutdown");
  ThreadRestrictions::AssertWaitAllowed();
  UniqueLock acquired(&update_lock_);

  const bool already_stopping = (state_ == kShuttingDown || state_ == kShutDown);
  if (!already_stopping) {
    state_ = kShuttingDown;
  }

  lock->swap(&acquired);
  return Status::OK();
}
// Completes shutdown: kShuttingDown -> kShutDown. Requires 'update_lock_' and
// CHECK-fails if the replica is not currently kShuttingDown.
Status ReplicaState::ShutdownUnlocked() {
  DCHECK(update_lock_.is_locked());
  CHECK_EQ(state_, kShuttingDown);

  state_ = kShutDown;
  return Status::OK();
}
// Returns this peer's role in the active config, as reported by the consensus
// metadata. Requires 'update_lock_'.
RaftPeerPB::Role ReplicaState::GetActiveRoleUnlocked() const {
  DCHECK(update_lock_.is_locked());
  const RaftPeerPB::Role active_role = cmeta_->active_role();
  return active_role;
}
// True iff the consensus metadata currently holds a pending config.
// Requires 'update_lock_'.
bool ReplicaState::IsConfigChangePendingUnlocked() const {
  DCHECK(update_lock_.is_locked());
  const bool has_pending = cmeta_->has_pending_config();
  return has_pending;
}
// Returns OK when no config change is pending. Otherwise returns IllegalState
// listing both the committed and the pending config, since only one change
// may be in flight at a time. Requires 'update_lock_'.
Status ReplicaState::CheckNoConfigChangePendingUnlocked() const {
  DCHECK(update_lock_.is_locked());
  if (!IsConfigChangePendingUnlocked()) {
    return Status::OK();
  }
  return Status::IllegalState(
      Substitute("RaftConfig change currently pending. Only one is allowed at a time.\n"
                 "  Committed config: $0.\n  Pending config: $1",
                 GetCommittedConfigUnlocked().ShortDebugString(),
                 GetPendingConfigUnlocked().ShortDebugString()));
}
// Installs 'new_config' as the pending config after validating it as an
// uncommitted quorum. Validation errors are returned with a prefix.
// CHECK-fails if another config is already pending. Requires 'update_lock_'.
Status ReplicaState::SetPendingConfigUnlocked(const RaftConfigPB& new_config) {
  DCHECK(update_lock_.is_locked());
  RETURN_NOT_OK_PREPEND(VerifyRaftConfig(new_config, UNCOMMITTED_QUORUM),
                        "Invalid config to set as pending");

  // At most one configuration change may be outstanding.
  CHECK(!cmeta_->has_pending_config())
      << "Attempt to set pending config while another is already pending! "
      << "Existing pending config: " << cmeta_->pending_config().ShortDebugString() << "; "
      << "Attempted new pending config: " << new_config.ShortDebugString();

  cmeta_->set_pending_config(new_config);
  return Status::OK();
}
void ReplicaState::ClearPendingConfigUnlocked() {
cmeta_->clear_pending_config();
}
// Returns the pending config held by the consensus metadata. CHECK-fails if
// there is none. Requires 'update_lock_'.
const RaftConfigPB& ReplicaState::GetPendingConfigUnlocked() const {
  DCHECK(update_lock_.is_locked());
  CHECK(IsConfigChangePendingUnlocked()) << "No pending config";
  const RaftConfigPB& pending = cmeta_->pending_config();
  return pending;
}
// Promotes the pending config to committed and flushes the consensus metadata.
//
// 'committed_config' must validate as a committed quorum (the error is
// returned with a prefix otherwise). It must also be byte-for-byte equal to
// the current pending config once its opid_index is ignored; a mismatch is
// fatal (CHECK). On success the pending config is cleared. Requires
// 'update_lock_'.
Status ReplicaState::SetCommittedConfigUnlocked(const RaftConfigPB& committed_config) {
  TRACE_EVENT0("consensus", "ReplicaState::SetCommittedConfigUnlocked");
  DCHECK(update_lock_.is_locked());
  DCHECK(committed_config.IsInitialized());
  RETURN_NOT_OK_PREPEND(VerifyRaftConfig(committed_config, COMMITTED_QUORUM),
                        "Invalid config to set as committed");

  // Compare committed with pending configuration, ensure they are the same.
  // Pending will not have an opid_index, so ignore that field.
  DCHECK(cmeta_->has_pending_config());
  RaftConfigPB config_no_opid = committed_config;
  config_no_opid.clear_opid_index();
  // Fetch the pending config once and use it for both the comparison and the
  // error message.
  const RaftConfigPB& pending_config = GetPendingConfigUnlocked();
  // Quorums must be exactly equal, even w.r.t. peer ordering.
  CHECK_EQ(pending_config.SerializeAsString(), config_no_opid.SerializeAsString())
      << Substitute("New committed config must equal pending config, but does not. "
                    "Pending config: $0, committed config: $1",
                    pending_config.ShortDebugString(), committed_config.ShortDebugString());

  cmeta_->set_committed_config(committed_config);
  cmeta_->clear_pending_config();
  CHECK_OK(cmeta_->Flush());
  return Status::OK();
}
// Returns the committed config held by the consensus metadata.
// Requires 'update_lock_'.
const RaftConfigPB& ReplicaState::GetCommittedConfigUnlocked() const {
  DCHECK(update_lock_.is_locked());
  const RaftConfigPB& committed = cmeta_->committed_config();
  return committed;
}
// Returns the active config as reported by the consensus metadata.
// Requires 'update_lock_'.
const RaftConfigPB& ReplicaState::GetActiveConfigUnlocked() const {
  DCHECK(update_lock_.is_locked());
  const RaftConfigPB& active = cmeta_->active_config();
  return active;
}
bool ReplicaState::IsOpCommittedOrPending(const OpId& op_id, bool* term_mismatch) {
*term_mismatch = false;
if (op_id.index() <= GetCommittedOpIdUnlocked().index()) {
return true;
}
if (op_id.index() > GetLastReceivedOpIdUnlocked().index()) {
return false;
}
scoped_refptr<ConsensusRound> round = GetPendingOpByIndexOrNullUnlocked(op_id.index());
DCHECK(round);
if (round->id().term() != op_id.term()) {
*term_mismatch = true;
return false;
}
return true;
}
// Advances the persisted term to 'new_term', which must be strictly greater
// than the current term (otherwise IllegalState). Clears the recorded vote,
// flushes the consensus metadata (CHECK on failure), forgets the leader, and
// resets the current-leader received watermark. Requires 'update_lock_'.
Status ReplicaState::SetCurrentTermUnlocked(int64_t new_term) {
  TRACE_EVENT1("consensus", "ReplicaState::SetCurrentTermUnlocked",
               "term", new_term);
  DCHECK(update_lock_.is_locked());

  const int64_t current_term = GetCurrentTermUnlocked();
  if (PREDICT_FALSE(new_term <= current_term)) {
    return Status::IllegalState(
        Substitute("Cannot change term to a term that is lower than or equal to the current one. "
                   "Current: $0, Proposed: $1", current_term, new_term));
  }

  // Persist the new term with no vote cast in it yet.
  cmeta_->set_current_term(new_term);
  cmeta_->clear_voted_for();
  CHECK_OK(cmeta_->Flush());

  // State tied to the previous term's leader no longer applies.
  ClearLeaderUnlocked();
  last_received_op_id_current_leader_ = MinimumOpId();
  return Status::OK();
}
// Returns the persisted current term. Requires 'update_lock_'.
// (The 'const' on the by-value return type must match the header declaration.)
const int64_t ReplicaState::GetCurrentTermUnlocked() const {
  DCHECK(update_lock_.is_locked());
  const int64_t term = cmeta_->current_term();
  return term;
}
void ReplicaState::SetLeaderUuidUnlocked(const std::string& uuid) {
DCHECK(update_lock_.is_locked());
cmeta_->set_leader_uuid(uuid);
}
// Returns the leader uuid recorded in the consensus metadata.
// Requires 'update_lock_'.
const string& ReplicaState::GetLeaderUuidUnlocked() const {
  DCHECK(update_lock_.is_locked());
  const string& leader = cmeta_->leader_uuid();
  return leader;
}
// True iff the consensus metadata records a vote (voted_for is set).
// Requires 'update_lock_'. (The 'const bool' return matches the header.)
const bool ReplicaState::HasVotedCurrentTermUnlocked() const {
  DCHECK(update_lock_.is_locked());
  const bool voted = cmeta_->has_voted_for();
  return voted;
}
// Records a vote for 'uuid' and flushes the consensus metadata (CHECK-fails
// if the flush fails). Requires 'update_lock_'.
Status ReplicaState::SetVotedForCurrentTermUnlocked(const std::string& uuid) {
  TRACE_EVENT1("consensus", "ReplicaState::SetVotedForCurrentTermUnlocked",
               "uuid", uuid);
  DCHECK(update_lock_.is_locked());

  cmeta_->set_voted_for(uuid);
  CHECK_OK(cmeta_->Flush());

  return Status::OK();
}
// Returns the uuid this replica voted for. A vote must have been recorded
// (DCHECK). Requires 'update_lock_'.
const std::string& ReplicaState::GetVotedForCurrentTermUnlocked() const {
  DCHECK(update_lock_.is_locked());
  DCHECK(cmeta_->has_voted_for());
  const std::string& candidate = cmeta_->voted_for();
  return candidate;
}
// Returns the transaction factory pointer given at construction.
// The pointer is never reassigned in this file, so no lock check is performed.
ReplicaTransactionFactory* ReplicaState::GetReplicaTransactionFactoryUnlocked() const {
  ReplicaTransactionFactory* const factory = txn_factory_;
  return factory;
}
// Returns this peer's uuid, fixed at construction.
const string& ReplicaState::GetPeerUuid() const {
  const string& self_uuid = peer_uuid_;
  return self_uuid;
}
// Returns the consensus options given at construction.
const ConsensusOptions& ReplicaState::GetOptions() const {
  const ConsensusOptions& opts = options_;
  return opts;
}
// Returns how many rounds are tracked in 'pending_txns_'.
// Requires 'update_lock_'.
int ReplicaState::GetNumPendingTxnsUnlocked() const {
  DCHECK(update_lock_.is_locked());
  // The map size is narrowed to int, exactly as the implicit conversion would.
  return static_cast<int>(pending_txns_.size());
}
// Notifies every round currently in 'pending_txns_' with
// Status::Aborted("Transaction aborted"), while holding 'update_lock_'.
// Legal only in kShuttingDown; any other state returns IllegalState.
// The rounds are notified but NOT removed from 'pending_txns_'.
Status ReplicaState::CancelPendingTransactions() {
  ThreadRestrictions::AssertWaitAllowed();
  UniqueLock guard(&update_lock_);

  if (state_ != kShuttingDown) {
    return Status::IllegalState("Can only wait for pending commits on kShuttingDown state.");
  }

  if (!pending_txns_.empty()) {
    LOG_WITH_PREFIX_UNLOCKED(INFO) << "Trying to abort " << pending_txns_.size()
                                   << " pending transactions.";
    // Every tracked round is aborted, regardless of its apply status.
    for (const IndexToRoundMap::value_type& entry : pending_txns_) {
      const scoped_refptr<ConsensusRound>& pending_round = entry.second;
      LOG_WITH_PREFIX_UNLOCKED(INFO) << "Aborting transaction as it isn't in flight: "
                                     << pending_round->replicate_msg()->ShortDebugString();
      pending_round->NotifyReplicationFinished(Status::Aborted("Transaction aborted"));
    }
  }
  return Status::OK();
}
// Appends to '*ops', in ascending index order, every pending round whose index
// is strictly greater than the last committed index. Requires 'update_lock_'.
//
// 'pending_txns_' is an ordered map keyed by index (this file already uses
// lower_bound/upper_bound on it). Starting at upper_bound() therefore visits
// exactly the uncommitted rounds, with no per-entry filtering of the
// committed prefix.
void ReplicaState::GetUncommittedPendingOperationsUnlocked(
    vector<scoped_refptr<ConsensusRound> >* ops) {
  DCHECK(update_lock_.is_locked());
  for (auto it = pending_txns_.upper_bound(last_committed_index_.index());
       it != pending_txns_.end(); ++it) {
    ops->push_back(it->second);
  }
}
// Aborts every pending operation whose index is strictly greater than
// 'new_preceding_idx' and rewinds the received-op watermarks and the id
// allocator to the op at 'new_preceding_idx'. Requires 'update_lock_'.
//
// 'new_preceding_idx' must be either the index of a round in 'pending_txns_' or
// exactly the last committed index (CHECK-enforced). Each aborted round is
// notified with Status::Aborted and erased from 'pending_txns_'. If an aborted
// round is a CHANGE_CONFIG_OP, the pending config is cleared (CHECK-fails if
// none was set). Always returns Status::OK().
Status ReplicaState::AbortOpsAfterUnlocked(int64_t new_preceding_idx) {
DCHECK(update_lock_.is_locked());
LOG_WITH_PREFIX_UNLOCKED(INFO) << "Aborting all transactions after (but not including): "
<< new_preceding_idx << ". Current State: " << ToStringUnlocked();
DCHECK_GE(new_preceding_idx, 0);
OpId new_preceding;
// First pending round with index >= new_preceding_idx.
auto iter = pending_txns_.lower_bound(new_preceding_idx);
// Either the new preceding id is in the pendings set or it must be equal to the
// committed index since we can't truncate already committed operations.
if (iter != pending_txns_.end() && (*iter).first == new_preceding_idx) {
new_preceding = (*iter).second->replicate_msg()->id();
// The preceding op itself is kept; aborting starts after it.
++iter;
} else {
CHECK_EQ(new_preceding_idx, last_committed_index_.index());
new_preceding = last_committed_index_;
}
// This is the same as UpdateLastReceivedOpIdUnlocked() but we do it
// here to avoid the bounds check, since we're breaking monotonicity.
last_received_op_id_ = new_preceding;
last_received_op_id_current_leader_ = last_received_op_id_;
next_index_ = new_preceding.index() + 1;
// 'iter' now points at the first round to abort; walk to the end of the map.
for (; iter != pending_txns_.end();) {
// Reference into the map entry: only used before the entry is erased below.
const scoped_refptr<ConsensusRound>& round = (*iter).second;
auto op_type = round->replicate_msg()->op_type();
LOG_WITH_PREFIX_UNLOCKED(INFO)
<< "Aborting uncommitted " << OperationType_Name(op_type)
<< " operation due to leader change: " << round->replicate_msg()->id();
// When aborting a config-change operation, go back to using the committed
// configuration.
if (PREDICT_FALSE(op_type == CHANGE_CONFIG_OP)) {
CHECK(IsConfigChangePendingUnlocked())
<< LogPrefixUnlocked() << "Aborting CHANGE_CONFIG_OP but "
<< "there was no pending config set. Op: "
<< round->replicate_msg()->ShortDebugString();
ClearPendingConfigUnlocked();
}
round->NotifyReplicationFinished(Status::Aborted("Transaction aborted by new leader"));
// Erase the entry from pendings. Post-increment advances 'iter' before the
// node is erased, so the iterator stays valid; 'round' must not be touched
// after this line.
pending_txns_.erase(iter++);
}
return Status::OK();
}
// Registers 'round' in 'pending_txns_', keyed by its replicate message's index
// (InsertOrDie: a duplicate index is fatal). Requires 'update_lock_'.
//
// When the replica is not kRunning, only NO_OP rounds are accepted; anything
// else returns IllegalState. For a CHANGE_CONFIG_OP, a replica whose active
// role is not LEADER also installs the new config as the pending config here.
// It returns IllegalState if a config change is already pending, and it skips
// installation when the op's index is not newer than the committed config's
// opid_index (a replay). A leader is expected to have already set the pending
// config before reaching this point.
Status ReplicaState::AddPendingOperation(const scoped_refptr<ConsensusRound>& round) {
DCHECK(update_lock_.is_locked());
if (PREDICT_FALSE(state_ != kRunning)) {
// While not running, only NO_OP rounds are allowed through; refuse
// everything else.
// TODO: Don't require a NO_OP to get to kRunning state
if (round->replicate_msg()->op_type() != NO_OP) {
return Status::IllegalState("Cannot trigger prepare. Replica is not in kRunning state.");
}
}
// Mark pending configuration.
if (PREDICT_FALSE(round->replicate_msg()->op_type() == CHANGE_CONFIG_OP)) {
// Shape checks: the old config carries an opid_index, the new one must not
// (it only gets one when committed).
DCHECK(round->replicate_msg()->change_config_record().has_old_config());
DCHECK(round->replicate_msg()->change_config_record().old_config().has_opid_index());
DCHECK(round->replicate_msg()->change_config_record().has_new_config());
DCHECK(!round->replicate_msg()->change_config_record().new_config().has_opid_index());
const RaftConfigPB& old_config = round->replicate_msg()->change_config_record().old_config();
const RaftConfigPB& new_config = round->replicate_msg()->change_config_record().new_config();
if (GetActiveRoleUnlocked() != RaftPeerPB::LEADER) {
// The leader has to mark the configuration as pending before it gets here
// because the active configuration affects the replication queue.
// Do one last sanity check.
Status s = CheckNoConfigChangePendingUnlocked();
if (PREDICT_FALSE(!s.ok())) {
s = s.CloneAndAppend(Substitute("\n  New config: $0", new_config.ShortDebugString()));
LOG_WITH_PREFIX_UNLOCKED(INFO) << s.ToString();
return s;
}
// Check if the pending Raft config has an OpId less than the committed
// config. If so, this is a replay at startup in which the COMMIT
// messages were delayed.
const RaftConfigPB& committed_config = GetCommittedConfigUnlocked();
if (round->replicate_msg()->id().index() > committed_config.opid_index()) {
CHECK_OK(SetPendingConfigUnlocked(new_config));
} else {
LOG_WITH_PREFIX_UNLOCKED(INFO) << "Ignoring setting pending config change with OpId "
<< round->replicate_msg()->id() << " because the committed config has OpId index "
<< committed_config.opid_index() << ". The config change we are ignoring is: "
<< "Old config: { " << old_config.ShortDebugString() << " }. "
<< "New config: { " << new_config.ShortDebugString() << " }";
}
}
}
InsertOrDie(&pending_txns_, round->replicate_msg()->id().index(), round);
return Status::OK();
}
// Looks up the pending round at 'index'. Returns a null pointer when no round
// is tracked there. Requires 'update_lock_'.
scoped_refptr<ConsensusRound> ReplicaState::GetPendingOpByIndexOrNullUnlocked(int64_t index) {
  DCHECK(update_lock_.is_locked());
  scoped_refptr<ConsensusRound> found = FindPtrOrNull(pending_txns_, index);
  return found;
}
// Given the highest op id replicated on a majority ('majority_replicated'),
// advances the commit index when allowed and writes the resulting committed
// op id to '*committed_index'. Requires 'update_lock_'.
//
// The commit index advances only if the last committed op, or
// 'majority_replicated' itself, belongs to the current term. Otherwise it stays
// put and a rate-limited warning is logged.
//
// Returns ServiceUnavailable while shutting down or shut down, and IllegalState
// in any other non-running state.
//
// NOTE(review): '*committed_index_changed' is written only on the paths that
// call AdvanceCommittedIndexUnlocked(). On the "can't advance across term
// boundaries" path and on the early error returns it is left untouched, so
// callers must pre-initialize it. Confirm all callers do.
Status ReplicaState::UpdateMajorityReplicatedUnlocked(const OpId& majority_replicated,
OpId* committed_index,
bool* committed_index_changed) {
DCHECK(update_lock_.is_locked());
DCHECK(majority_replicated.IsInitialized());
DCHECK(last_committed_index_.IsInitialized());
if (PREDICT_FALSE(state_ == kShuttingDown || state_ == kShutDown)) {
return Status::ServiceUnavailable("Cannot trigger apply. Replica is shutting down.");
}
if (PREDICT_FALSE(state_ != kRunning)) {
return Status::IllegalState("Cannot trigger apply. Replica is not in kRunning state.");
}
// If the last committed operation was in the current term (the normal case)
// then 'committed_index' is simply equal to majority replicated.
if (last_committed_index_.term() == GetCurrentTermUnlocked()) {
RETURN_NOT_OK(AdvanceCommittedIndexUnlocked(majority_replicated,
committed_index_changed));
committed_index->CopyFrom(last_committed_index_);
return Status::OK();
}
// If the last committed operation is not in the current term (such as when
// we change leaders) but 'majority_replicated' is then we can advance the
// 'committed_index' too.
if (majority_replicated.term() == GetCurrentTermUnlocked()) {
OpId previous = last_committed_index_;
RETURN_NOT_OK(AdvanceCommittedIndexUnlocked(majority_replicated,
committed_index_changed));
committed_index->CopyFrom(last_committed_index_);
LOG_WITH_PREFIX_UNLOCKED(INFO) << "Advanced the committed_index across terms."
<< " Last committed operation was: " << previous.ShortDebugString()
<< " New committed index is: " << last_committed_index_.ShortDebugString();
return Status::OK();
}
// Neither the last committed op nor 'majority_replicated' is from the current
// term: report the unchanged commit index and wait for a current-term op to
// reach a majority.
committed_index->CopyFrom(last_committed_index_);
KLOG_EVERY_N_SECS(WARNING, 1) << LogPrefixUnlocked()
<< "Can't advance the committed index across term boundaries"
<< " until operations from the current term are replicated."
<< " Last committed operation was: " << last_committed_index_.ShortDebugString() << ","
<< " New majority replicated is: " << majority_replicated.ShortDebugString() << ","
<< " Current term is: " << GetCurrentTermUnlocked();
return Status::OK();
}
// Commits every pending round with an index in
// (last_committed_index_.index(), committed_index.index()], in index order.
// Each such round is:
//   - removed from 'pending_txns_' after an in-sequence check (CHECK),
//   - for a CHANGE_CONFIG_OP, used to install its new config as the committed
//     config, unless the committed config is already at or past that index
//     (a replay),
//   - notified with Status::OK().
// Afterwards 'last_committed_index_' is set to 'committed_index'.
//
// '*committed_index_changed' is set to true only when the loop path completes.
// It stays false if 'committed_index' is not ahead of the current commit
// index, and also when 'pending_txns_' is empty (even though
// 'last_committed_index_' still advances on that path).
//
// Requires 'update_lock_': this mutates 'pending_txns_', the committed index
// and the committed config. The DCHECK enforces that in debug builds, matching
// the other *Unlocked() mutators in this file.
Status ReplicaState::AdvanceCommittedIndexUnlocked(const OpId& committed_index,
                                                   bool *committed_index_changed) {
  DCHECK(update_lock_.is_locked());
  *committed_index_changed = false;
  // If we already committed up to (or past) 'id' return.
  // This can happen in the case that multiple UpdateConsensus() calls end
  // up in the RPC queue at the same time, and then might get interleaved out
  // of order.
  if (last_committed_index_.index() >= committed_index.index()) {
    VLOG_WITH_PREFIX_UNLOCKED(1)
        << "Already marked ops through " << last_committed_index_ << " as committed. "
        << "Now trying to mark " << committed_index << " which would be a no-op.";
    return Status::OK();
  }

  if (pending_txns_.empty()) {
    // NOTE(review): the watermark advances here but '*committed_index_changed'
    // stays false. Confirm callers do not rely on it to observe this advance.
    last_committed_index_.CopyFrom(committed_index);
    VLOG_WITH_PREFIX_UNLOCKED(1) << "No transactions to mark as committed up to: "
                                 << committed_index.ShortDebugString();
    return Status::OK();
  }

  // Start at the operation after the last committed one.
  auto iter = pending_txns_.upper_bound(last_committed_index_.index());
  // Stop at the operation after the last one we must commit.
  auto end_iter = pending_txns_.upper_bound(committed_index.index());
  CHECK(iter != pending_txns_.end());

  VLOG_WITH_PREFIX_UNLOCKED(1) << "Last triggered apply was: "
                               << last_committed_index_.ShortDebugString()
                               << " Starting to apply from log index: " << (*iter).first;

  OpId prev_id = last_committed_index_;

  while (iter != end_iter) {
    scoped_refptr<ConsensusRound> round = (*iter).second; // Make a copy.
    DCHECK(round);
    const OpId& current_id = round->id();

    if (PREDICT_TRUE(!OpIdEquals(prev_id, MinimumOpId()))) {
      CHECK_OK(CheckOpInSequence(prev_id, current_id));
    }

    // Safe: 'round' holds its own reference, so the round outlives the erase.
    pending_txns_.erase(iter++);

    // Set committed configuration.
    if (PREDICT_FALSE(round->replicate_msg()->op_type() == CHANGE_CONFIG_OP)) {
      DCHECK(round->replicate_msg()->change_config_record().has_old_config());
      DCHECK(round->replicate_msg()->change_config_record().has_new_config());
      RaftConfigPB old_config = round->replicate_msg()->change_config_record().old_config();
      RaftConfigPB new_config = round->replicate_msg()->change_config_record().new_config();
      DCHECK(old_config.has_opid_index());
      DCHECK(!new_config.has_opid_index());
      new_config.set_opid_index(current_id.index());
      // Check if the pending Raft config has an OpId less than the committed
      // config. If so, this is a replay at startup in which the COMMIT
      // messages were delayed.
      const RaftConfigPB& committed_config = GetCommittedConfigUnlocked();
      if (new_config.opid_index() > committed_config.opid_index()) {
        LOG_WITH_PREFIX_UNLOCKED(INFO) << "Committing config change with OpId "
            << current_id << ". "
            << "Old config: { " << old_config.ShortDebugString() << " }. "
            << "New config: { " << new_config.ShortDebugString() << " }";
        CHECK_OK(SetCommittedConfigUnlocked(new_config));
      } else {
        LOG_WITH_PREFIX_UNLOCKED(INFO) << "Ignoring commit of config change with OpId "
            << current_id << " because the committed config has OpId index "
            << committed_config.opid_index() << ". The config change we are ignoring is: "
            << "Old config: { " << old_config.ShortDebugString() << " }. "
            << "New config: { " << new_config.ShortDebugString() << " }";
      }
    }

    prev_id.CopyFrom(round->id());
    round->NotifyReplicationFinished(Status::OK());
  }

  last_committed_index_.CopyFrom(committed_index);
  *committed_index_changed = true;
  return Status::OK();
}
// Returns the last committed op id watermark. Requires 'update_lock_'.
const OpId& ReplicaState::GetCommittedOpIdUnlocked() const {
  DCHECK(update_lock_.is_locked());
  const OpId& committed = last_committed_index_;
  return committed;
}
// Returns OK if the last committed op belongs to the current term. Otherwise
// returns IllegalState naming that op. Requires 'update_lock_' (asserted by the
// getters it calls).
Status ReplicaState::CheckHasCommittedOpInCurrentTermUnlocked() const {
  const int64_t current_term = GetCurrentTermUnlocked();
  const OpId& committed = GetCommittedOpIdUnlocked();
  if (committed.term() == current_term) {
    return Status::OK();
  }
  return Status::IllegalState("Latest committed op is not from this term",
                              OpIdToString(committed));
}
void ReplicaState::UpdateLastReceivedOpIdUnlocked(const OpId& op_id) {
DCHECK(update_lock_.is_locked());
DCHECK_LE(OpIdCompare(last_received_op_id_, op_id), 0)
<< "Previously received OpId: " << last_received_op_id_.ShortDebugString()
<< ", updated OpId: " << op_id.ShortDebugString()
<< ", Trace:" << std::endl << Trace::CurrentTrace()->DumpToString(true);
last_received_op_id_ = op_id;
last_received_op_id_current_leader_ = last_received_op_id_;
next_index_ = op_id.index() + 1;
}
// Returns the last received op id watermark. Requires 'update_lock_'.
const OpId& ReplicaState::GetLastReceivedOpIdUnlocked() const {
  DCHECK(update_lock_.is_locked());
  const OpId& received = last_received_op_id_;
  return received;
}
// Returns the last op id received from the current leader. It is reset to
// MinimumOpId() on a term change. Requires 'update_lock_'.
const OpId& ReplicaState::GetLastReceivedOpIdCurLeaderUnlocked() const {
  DCHECK(update_lock_.is_locked());
  const OpId& received_from_leader = last_received_op_id_current_leader_;
  return received_from_leader;
}
// Returns the id of the highest-index pending round, or MinimumOpId() when
// nothing is pending. Requires 'update_lock_'.
OpId ReplicaState::GetLastPendingTransactionOpIdUnlocked() const {
  DCHECK(update_lock_.is_locked());
  if (pending_txns_.empty()) {
    return MinimumOpId();
  }
  // The map is ordered by index, so the last entry has the highest index.
  return pending_txns_.rbegin()->second->id();
}
// Fills '*id' with the current term and the next index, consuming that index.
// Requires 'update_lock_'.
void ReplicaState::NewIdUnlocked(OpId* id) {
  DCHECK(update_lock_.is_locked());
  const int64_t assigned_index = next_index_++;
  id->set_term(GetCurrentTermUnlocked());
  id->set_index(assigned_index);
}
// Rolls back the most recently allocated id 'id'. It CHECKs that 'id' is in the
// current term and is the id handed out last (next_index_ == id.index() + 1).
// next_index_ is rewound to id.index(), last_received_op_id_ is rewound to
// {id.term(), id.index() - 1}, and the round at id.index() is removed from
// 'pending_txns_' (DCHECKed to exist). Requires 'update_lock_'.
//
// NOTE(review): the rewound last_received_op_id_ takes id.term(). If the op
// at id.index() - 1 belongs to an earlier term (e.g. 'id' was the first op of
// a new term), the watermark will carry the wrong term. Confirm callers never
// hit that case.
// NOTE(review): last_received_op_id_current_leader_ is not rewound here.
// Confirm that is intended.
void ReplicaState::CancelPendingOperation(const OpId& id) {
OpId previous = id;
previous.set_index(previous.index() - 1);
DCHECK(update_lock_.is_locked());
CHECK_EQ(GetCurrentTermUnlocked(), id.term());
CHECK_EQ(next_index_, id.index() + 1);
next_index_ = id.index();
// We don't use UpdateLastReceivedOpIdUnlocked because we're actually
// updating it back to a lower value and we need to avoid the checks
// that method has.
// This is only ok if we do _not_ release the lock after calling
// NewIdUnlocked() (which we don't in RaftConsensus::Replicate()).
last_received_op_id_ = previous;
// Held only so the DCHECK below can verify a round was actually removed.
scoped_refptr<ConsensusRound> round = EraseKeyReturnValuePtr(&pending_txns_, id.index());
DCHECK(round);
}
// Thread-safe wrapper around LogPrefixUnlocked(). It takes 'update_lock_' via
// LockForRead() for the duration of the call.
string ReplicaState::LogPrefix() {
  ReplicaState::UniqueLock lock;
  CHECK_OK(LockForRead(&lock));
  const string prefix = LogPrefixUnlocked();
  return prefix;
}
// Builds the "T <tablet> P <peer> [term <n> <ROLE>]: " log prefix.
// Requires 'update_lock_'.
string ReplicaState::LogPrefixUnlocked() const {
  DCHECK(update_lock_.is_locked());
  const int64_t term = GetCurrentTermUnlocked();
  const string& role_name = RaftPeerPB::Role_Name(GetActiveRoleUnlocked());
  return Substitute("T $0 P $1 [term $2 $3]: ",
                    options_.tablet_id, peer_uuid_, term, role_name);
}
// Builds a "T <tablet> P <peer>: " log prefix from fields set at construction
// only, so no lock is taken.
string ReplicaState::LogPrefixThreadSafe() const {
  const string prefix = Substitute("T $0 P $1: ", options_.tablet_id, peer_uuid_);
  return prefix;
}
// Returns the lifecycle state (kInitialized, kRunning, ...). Requires
// 'update_lock_'.
ReplicaState::State ReplicaState::state() const {
  DCHECK(update_lock_.is_locked());
  const State current = state_;
  return current;
}
// Thread-safe wrapper around ToStringUnlocked(). It holds 'update_lock_' while
// formatting.
string ReplicaState::ToString() const {
  ThreadRestrictions::AssertWaitAllowed();
  ReplicaState::UniqueLock guard(&update_lock_);
  const string description = ToStringUnlocked();
  return description;
}
// Two-line human-readable summary: identity/state/role, then the received and
// committed watermarks. Requires 'update_lock_'.
string ReplicaState::ToStringUnlocked() const {
  DCHECK(update_lock_.is_locked());
  string summary = Substitute("Replica: $0, State: $1, Role: $2\n",
                              peer_uuid_, state_,
                              RaftPeerPB::Role_Name(GetActiveRoleUnlocked()));
  SubstituteAndAppend(&summary, "Watermarks: {Received: $0 Committed: $1}\n",
                      last_received_op_id_.ShortDebugString(),
                      last_committed_index_.ShortDebugString());
  return summary;
}
// Validates that 'current' may directly follow 'previous'. Its term must not
// be lower, and its index must be exactly one greater. Each violation returns
// Corruption naming both ids.
Status ReplicaState::CheckOpInSequence(const OpId& previous, const OpId& current) {
  const bool term_went_backwards = current.term() < previous.term();
  if (term_went_backwards) {
    return Status::Corruption(Substitute("New operation's term is not >= than the previous "
        "op's term. Current: $0. Previous: $1", OpIdToString(current), OpIdToString(previous)));
  }

  const bool index_is_contiguous = current.index() == previous.index() + 1;
  if (!index_is_contiguous) {
    return Status::Corruption(Substitute("New operation's index does not follow the previous"
        " op's index. Current: $0. Previous: $1", OpIdToString(current), OpIdToString(previous)));
  }

  return Status::OK();
}
} // namespace consensus
} // namespace kudu