blob: 5d5447d5f229b41b423591d3a959fadb818f06c1 [file] [log] [blame]
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.
#include <boost/optional/optional_fwd.hpp>
#include <iosfwd>
#include <memory>
#include <string>
#include <vector>
#include "kudu/consensus/consensus.pb.h"
#include "kudu/consensus/ref_counted_replicate.h"
#include "kudu/gutil/callback.h"
#include "kudu/gutil/gscoped_ptr.h"
#include "kudu/gutil/ref_counted.h"
#include "kudu/util/status.h"
#include "kudu/util/status_callback.h"
namespace kudu {
class MonoDelta;
namespace log {
class Log;
struct RetentionIndexes;
namespace master {
class SysCatalogTable;
namespace server {
class Clock;
namespace tablet {
class TabletPeer;
namespace tserver {
class TabletServerErrorPB;
namespace consensus {
class ConsensusCommitContinuation;
class ConsensusRound;
class ReplicaTransactionFactory;
typedef int64_t ConsensusTerm;
typedef StatusCallback ConsensusReplicatedCallback;
struct ConsensusOptions {
std::string tablet_id;
// After completing bootstrap, some of the results need to be plumbed through
// into the consensus implementation.
struct ConsensusBootstrapInfo {
// The id of the last operation in the log
OpId last_id;
// The id of the last committed operation in the log.
OpId last_committed_id;
// REPLICATE messages which were in the log with no accompanying
// COMMIT. These need to be passed along to consensus init in order
// to potentially commit them.
// These are owned by the ConsensusBootstrapInfo instance.
std::vector<ReplicateMsg*> orphaned_replicates;
// The external interface for a consensus peer.
// Note: Even though Consensus points to Log, it needs to be destroyed
// after it. See Log class header comment for the reason why. On the other
// hand Consensus must be quiesced before closing the log, otherwise it
// will try to write to a destroyed/closed log.
// The order of these operations on shutdown must therefore be:
// 1 - quiesce Consensus
// 2 - close/destroy Log
// 3 - destroy Consensus
class Consensus : public RefCountedThreadSafe<Consensus> {
Consensus() {}
// Starts running the consensus algorithm.
virtual Status Start(const ConsensusBootstrapInfo& info) = 0;
// Returns true if consensus is running.
virtual bool IsRunning() const = 0;
// Emulates a leader election by simply making this peer leader.
virtual Status EmulateElection() = 0;
// Modes for StartElection().
enum ElectionMode {
// A normal leader election. Peers will not vote for this node
// if they believe that a leader is alive.
// A "pre-election". Peers will vote as they would for a normal
// election, except that the votes will not be "binding". In other
// words, they will not durably record their vote.
// In this mode, peers will vote for this candidate even if they
// think a leader is alive. This can be used for a faster hand-off
// between a leader and one of its replicas.
// Reasons for StartElection().
enum ElectionReason {
// The election is being called because the Raft configuration has only
// a single node and has just started up.
// The election is being called because the timeout expired. In other
// words, the previous leader probably failed (or there was no leader
// in this term)
// The election is being started because of an explicit external request.
// Triggers a leader election.
virtual Status StartElection(ElectionMode mode, ElectionReason reason) = 0;
// Wait until the node has LEADER role.
// Returns Status::TimedOut if the role is not LEADER within 'timeout'.
virtual Status WaitUntilLeaderForTests(const MonoDelta& timeout) = 0;
// Implement a LeaderStepDown() request.
virtual Status StepDown(LeaderStepDownResponsePB* resp) {
return Status::NotSupported("Not implemented.");
// Creates a new ConsensusRound, the entity that owns all the data
// structures required for a consensus round, such as the ReplicateMsg
// (and later on the CommitMsg). ConsensusRound will also point to and
// increase the reference count for the provided callbacks.
scoped_refptr<ConsensusRound> NewRound(
gscoped_ptr<ReplicateMsg> replicate_msg,
const ConsensusReplicatedCallback& replicated_cb);
// Called by a Leader to replicate an entry to the state machine.
// From the leader instance perspective execution proceeds as follows:
// Leader RaftConfig
// + +
// 1) Req->| Replicate() |
// | |
// 2) +-------------replicate-------------->|
// |<---------------ACK------------------+
// | |
// 3) +--+ |
// <----+ round.NotifyReplicationFinished()|
// | |
// 3a) | +------ update commitIndex ------->|
// | |
// 1) Caller calls Replicate(), method returns immediately to the caller and
// runs asynchronously.
// 2) Leader replicates the entry to the peers using the consensus
// algorithm, proceeds as soon as a majority of voters acknowledges the
// entry.
// 3) Leader defers to the caller by calling ConsensusRound::NotifyReplicationFinished,
// which calls the ConsensusReplicatedCallback.
// 3a) The leader asynchronously notifies other peers of the new
// commit index, which tells them to apply the operation.
// This method can only be called on the leader, i.e. role() == LEADER
virtual Status Replicate(const scoped_refptr<ConsensusRound>& round) = 0;
// Ensures that the consensus implementation is currently acting as LEADER,
// and thus is allowed to submit operations to be prepared before they are
// replicated. To avoid a time-of-check-to-time-of-use (TOCTOU) race, the
// implementation also stores the current term inside the round's "bound_term"
// member. When we eventually are about to replicate the transaction, we verify
// that the term has not changed in the meantime.
virtual Status CheckLeadershipAndBindTerm(const scoped_refptr<ConsensusRound>& round) {
return Status::OK();
// Messages sent from LEADER to FOLLOWERS and LEARNERS to update their
// state machines. This is equivalent to "AppendEntries()" in Raft
// terminology.
// ConsensusRequestPB contains a sequence of 0 or more operations to apply
// on the replica. If there are 0 operations the request is considered
// 'status-only' i.e. the leader is communicating with the follower only
// in order to pass back and forth information on watermarks (eg committed
// operation ID, replicated op id, etc).
// If the sequence contains 1 or more operations they will be replicated
// in the same order as the leader, and submitted for asynchronous Prepare
// in the same order.
// The leader also provides information on the index of the latest
// operation considered committed by consensus. The replica uses this
// information to update the state of any pending (previously replicated/prepared)
// transactions.
// Returns Status::OK if the response has been filled (regardless of accepting
// or rejecting the specific request). Returns non-OK Status if a specific
// error response could not be formed, which will result in the service
// returning an UNKNOWN_ERROR RPC error code to the caller and including the
// stringified Status message.
virtual Status Update(const ConsensusRequestPB* request,
ConsensusResponsePB* response) = 0;
// Messages sent from CANDIDATEs to voting peers to request their vote
// in leader election.
virtual Status RequestVote(const VoteRequestPB* request,
VoteResponsePB* response) = 0;
// Implement a ChangeConfig() request.
virtual Status ChangeConfig(const ChangeConfigRequestPB& req,
const StatusCallback& client_cb,
boost::optional<tserver::TabletServerErrorPB::Code>* error) {
return Status::NotSupported("Not implemented.");
// Returns the current Raft role of this instance.
virtual RaftPeerPB::Role role() const = 0;
// Returns the uuid of this peer.
virtual std::string peer_uuid() const = 0;
// Returns the id of the tablet whose updates this consensus instance helps coordinate.
virtual std::string tablet_id() const = 0;
// Returns a copy of the committed state of the Consensus system.
virtual ConsensusStatePB ConsensusState(ConsensusConfigType type) const = 0;
// Returns a copy of the current committed Raft configuration.
virtual RaftConfigPB CommittedConfig() const = 0;
virtual void DumpStatusHtml(std::ostream& out) const = 0;
// Stops running the consensus algorithm.
virtual void Shutdown() = 0;
// Returns the last OpId (either received or committed, depending on the
// 'type' argument) that the Consensus implementation knows about.
// Primarily used for testing purposes.
virtual Status GetLastOpId(OpIdType type, OpId* id) {
return Status::NotFound("Not implemented.");
// Return the log indexes which the consensus implementation would like to retain.
// The returned 'for_durability' index ensures that no logs are GCed before
// the operation is fully committed. The returned 'for_peers' index indicates
// the index of the farthest-behind peer so that the log will try to avoid
// GCing these before the peer has caught up.
virtual log::RetentionIndexes GetRetentionIndexes() = 0;
friend class RefCountedThreadSafe<Consensus>;
friend class tablet::TabletPeer;
friend class master::SysCatalogTable;
// This class is refcounted.
virtual ~Consensus() {}
enum State {
// Factory for replica transactions.
// An implementation of this factory must be registered prior to consensus
// start, and is used to create transactions when the consensus implementation receives
// messages from the leader.
// Replica transactions execute the following way:
// - When a ReplicateMsg is first received from the leader, the Consensus
// instance creates the ConsensusRound and calls StartReplicaTransaction().
// This will trigger the Prepare(). At the same time replica consensus
// instance immediately stores the ReplicateMsg in the Log. Once the replicate
// message is stored in stable storage an ACK is sent to the leader (i.e. the
// replica Consensus instance does not wait for Prepare() to finish).
// - When the CommitMsg for a replicate is first received from the leader
// the replica waits for the corresponding Prepare() to finish (if it has
// not completed yet) and then proceeds to trigger the Apply().
// - Once Apply() completes the ReplicaTransactionFactory is responsible for logging
// a CommitMsg to the log to ensure that the operation can be properly restored
// on a restart.
class ReplicaTransactionFactory {
virtual Status StartReplicaTransaction(const scoped_refptr<ConsensusRound>& context) = 0;
virtual ~ReplicaTransactionFactory() {}
// Context for a consensus round on the LEADER side, typically created as an
// out-parameter of Consensus::Append.
// This class is ref-counted because we want to ensure it stays alive for the
// duration of the Transaction when it is associated with a Transaction, while
// we also want to ensure it has a proper lifecycle when a ConsensusRound is
// pushed that is not associated with a Tablet transaction.
class ConsensusRound : public RefCountedThreadSafe<ConsensusRound> {
// Ctor used for leader transactions. Leader transactions can and must specify the
// callbacks prior to initiating the consensus round.
ConsensusRound(Consensus* consensus, gscoped_ptr<ReplicateMsg> replicate_msg,
ConsensusReplicatedCallback replicated_cb);
// Ctor used for follower/learner transactions. These transactions do not use the
// replicate callback and the commit callback is set later, after the transaction
// is actually started.
ConsensusRound(Consensus* consensus,
const ReplicateRefPtr& replicate_msg);
ReplicateMsg* replicate_msg() {
return replicate_msg_->get();
const ReplicateRefPtr& replicate_scoped_refptr() {
return replicate_msg_;
// Returns the id of the (replicate) operation this context
// refers to. This is only set _after_ Consensus::Replicate(context).
OpId id() const {
return replicate_msg_->get()->id();
// Register a callback that is called by Consensus to notify that the round
// is considered either replicated, if 'status' is OK(), or that it has
// permanently failed to replicate if 'status' is anything else. If 'status'
// is OK() then the operation can be applied to the state machine, otherwise
// the operation should be aborted.
void SetConsensusReplicatedCallback(const ConsensusReplicatedCallback& replicated_cb) {
replicated_cb_ = replicated_cb;
// If a continuation was set, notifies it that the round has been replicated.
void NotifyReplicationFinished(const Status& status);
// Binds this round such that it may not be eventually executed in any term
// other than 'term'.
// See CheckBoundTerm().
void BindToTerm(int64_t term) {
DCHECK_EQ(bound_term_, -1);
bound_term_ = term;
// Check for a rare race in which an operation is submitted to the LEADER in some term,
// then before the operation is prepared, the replica loses its leadership, receives
// more operations as a FOLLOWER, and then regains its leadership. We detect this case
// by setting the ConsensusRound's "bound term" when it is first submitted to the
// PREPARE queue, and validate that the term is still the same when we have finished
// preparing it. See KUDU-597 for details.
// If this round has not been bound to any term, this is a no-op.
Status CheckBoundTerm(int64_t current_term) const;
friend class RaftConsensusQuorumTest;
friend class RefCountedThreadSafe<ConsensusRound>;
~ConsensusRound() {}
Consensus* consensus_;
// This round's replicate message.
ReplicateRefPtr replicate_msg_;
// The continuation that will be called once the transaction is
// deemed committed/aborted by consensus.
ConsensusReplicatedCallback replicated_cb_;
// The leader term that this round was submitted in. CheckBoundTerm()
// ensures that, when it is eventually replicated, the term has not
// changed in the meantime.
// Set to -1 if no term has been bound.
int64_t bound_term_;
} // namespace consensus
} // namespace kudu
#endif /* CONSENSUS_H_ */