blob: 742dbeb064430afece400b6eab161e31c4f18e63 [file] [log] [blame]
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.
#pragma once
#include <cstdint>
#include <map>
#include <string>
#include "kudu/consensus/opid.pb.h"
#include "kudu/consensus/raft_consensus.h"
#include "kudu/gutil/macros.h"
#include "kudu/gutil/ref_counted.h"
namespace kudu {
class Status;
namespace consensus {
class TimeManager;
// Tracks the pending consensus rounds being managed by a Raft replica (either leader
// or follower).
//
// This class is not thread-safe.
//
// NOTE: each round is associated with a single op, though "round" refers to
// the logical Raft replication and "op" refers to the replicated batch of
// operations.
class PendingRounds {
 public:
  // NOTE(review): 'log_prefix' and 'time_manager' are presumably retained in
  // log_prefix_ and time_manager_; ownership of 'time_manager' cannot be
  // determined from this header -- confirm against the implementation.
  PendingRounds(std::string log_prefix, TimeManager* time_manager);
  ~PendingRounds();

  // Records 'committed_op' as the committed op during startup. Call this only
  // after any pending ops have been appended: it takes care of triggering
  // those that are now considered committed.
  Status SetInitialCommittedOpId(const OpId& committed_op);

  // Looks up the pending ConsensusRound with the provided 'index'. Yields a
  // null pointer when there is no such round.
  scoped_refptr<ConsensusRound> GetPendingOpByIndexOrNull(int64_t index);

  // Registers 'round' in the set of rounds waiting to be committed.
  Status AddPendingOperation(const scoped_refptr<ConsensusRound>& round);

  // Advances the committed index to 'committed_index'.
  // Nothing happens when the committed index has not changed.
  Status AdvanceCommittedIndex(int64_t committed_index);

  // Aborts the pending operations whose indexes are higher than 'index'; the
  // operation at 'index' itself is not aborted. The OpId with 'index' becomes
  // our new last received id.
  void AbortOpsAfter(int64_t index);

  // Tells whether an operation is in this replica's log, which is the case if:
  //  - the op's index is lower than or equal to our committed index, or
  //  - the op id matches an inflight op.
  // '*term_mismatch' is set to true when an operation with the same index is
  // in our log but with a different term, and to false otherwise.
  bool IsOpCommittedOrPending(const OpId& op_id, bool* term_mismatch);

  // Returns the id of the latest pending op, i.e. the one with the latest
  // index. This must be called under the lock.
  // NOTE(review): this class declares no lock of its own, so presumably a
  // lock held by the caller is meant -- confirm.
  OpId GetLastPendingOpOpId() const;

  // Used by replicas to cancel pending ops, i.e. ops that have completed
  // prepare/replicate but are still waiting on the LEADER's commit to
  // complete. Ops that are being applied are not cancelled.
  Status CancelPendingOps();

  // Returns how many ops are currently in the pending state, i.e. ops for
  // which Prepare() is done or under way.
  int GetNumPendingOps() const;

  // Returns the watermark below which all operations are known to be
  // committed according to consensus.
  // TODO(todd): these should probably be removed in favor of using the queue.
  int64_t GetCommittedIndex() const;
  // NOTE(review): presumably the term of the last committed op -- confirm.
  int64_t GetTermWithLastCommittedOp() const;

  // Verifies that 'current' correctly follows 'previous': the term must be
  // the same or higher and the index must be sequential.
  static Status CheckOpInSequence(const OpId& previous, const OpId& current);

 private:
  // Accessor for the stored log prefix.
  const std::string& LogPrefix() const { return log_prefix_; }

  const std::string log_prefix_;

  // Maps a replicate operation's index to its round. The map holds the
  // pending ops, i.e. operations for which we've received a replicate message
  // from the leader but which have yet to be committed.
  using IndexToRoundMap = std::map<int64_t, scoped_refptr<ConsensusRound>>;
  IndexToRoundMap pending_ops_;

  // OpId of the most recently committed round. Initialized to MinimumOpId().
  OpId last_committed_op_id_;

  TimeManager* time_manager_;

  DISALLOW_COPY_AND_ASSIGN(PendingRounds);
};
} // namespace consensus
} // namespace kudu