| // Licensed to the Apache Software Foundation (ASF) under one |
| // or more contributor license agreements. See the NOTICE file |
| // distributed with this work for additional information |
| // regarding copyright ownership. The ASF licenses this file |
| // to you under the Apache License, Version 2.0 (the |
| // "License"); you may not use this file except in compliance |
| // with the License. You may obtain a copy of the License at |
| // |
| // http://www.apache.org/licenses/LICENSE-2.0 |
| // |
| // Unless required by applicable law or agreed to in writing, |
| // software distributed under the License is distributed on an |
| // "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY |
| // KIND, either express or implied. See the License for the |
| // specific language governing permissions and limitations |
| // under the License. |
| #pragma once |
| |
| #include <atomic> |
| #include <cstdint> |
| #include <functional> |
| #include <optional> |
| #include <ostream> |
| #include <memory> |
| #include <string> |
| #include <unordered_map> |
| #include <vector> |
| |
| #include <glog/logging.h> |
| #include <gtest/gtest_prod.h> |
| |
| #include "kudu/consensus/log_cache.h" |
| #include "kudu/consensus/metadata.pb.h" |
| #include "kudu/consensus/opid.pb.h" |
| #include "kudu/consensus/ref_counted_replicate.h" |
| #include "kudu/gutil/ref_counted.h" |
| #include "kudu/gutil/threading/thread_collision_warner.h" |
| #include "kudu/util/locks.h" |
| #include "kudu/util/metrics.h" |
| #include "kudu/util/monotime.h" |
| #include "kudu/util/status.h" |
| #include "kudu/util/status_callback.h" |
| |
| namespace kudu { |
| class ThreadPoolToken; |
| |
| namespace log { |
| class Log; |
| } |
| |
| namespace logging { |
| class LogThrottler; |
| } |
| |
| namespace consensus { |
| class ConsensusRequestPB; |
| class ConsensusResponsePB; |
| class ConsensusStatusPB; |
| class PeerMessageQueueObserver; |
| class StartTabletCopyRequestPB; |
| class TimeManager; |
| |
| // The id for the server-wide consensus queue MemTracker. |
| extern const char kConsensusQueueParentTrackerId[]; |
| |
| // State enum for the last known status of a peer tracked by the |
| // PeerMessageQueue (stored in TrackedPeer::last_exchange_status). |
| enum class PeerStatus { |
| // The peer has not yet had a round of communication. |
| NEW, |
| |
| // The last exchange with the peer was successful. We transmitted |
| // an update to the peer and it accepted it. |
| OK, |
| |
| // Some tserver-level or consensus-level error occurred that didn't |
| // fall into any of the below buckets. |
| REMOTE_ERROR, |
| |
| // Some RPC-layer error occurred. For example, a network error or timeout |
| // occurred while attempting to send the RPC. |
| RPC_LAYER_ERROR, |
| |
| // The remote tablet server indicated that the tablet was in a FAILED state. |
| TABLET_FAILED, |
| |
| // The remote tablet server indicated that the tablet was in a NOT_FOUND state. |
| TABLET_NOT_FOUND, |
| |
| // The remote tablet server indicated that the term of this leader was older |
| // than its latest seen term. |
| INVALID_TERM, |
| |
| // The remote tablet server was unable to prepare any operations in the most recent |
| // batch. |
| CANNOT_PREPARE, |
| |
| // The remote tablet server's log was divergent from the leader's log, i.e. a log-matching property (LMP) mismatch. |
| LMP_MISMATCH, |
| }; |
| |
| const char* PeerStatusToString(PeerStatus p); |
| |
| // Tracks the state of the peers and which ops they have replicated. Owns the |
| // LogCache which actually holds the replicate messages which are en route to |
| // the various peers. |
| // |
| // This also takes care of pushing requests to peers as new operations are |
| // added, and notifying RaftConsensus when the commit index advances. |
| // |
| // TODO(todd): Right now this class is able to track one outstanding operation |
| // per peer. If we want to have more than one outstanding RPC we need to |
| // modify it. |
| class PeerMessageQueue { |
| public: |
| struct TrackedPeer { |
| explicit TrackedPeer(RaftPeerPB peer_pb); |
| |
| TrackedPeer() = default; |
| |
| // Copy-assign from a given TrackedPeer (defaulted, member-wise). |
| TrackedPeer& operator=(const TrackedPeer& tracked_peer) = default; |
| |
| // Check that the terms seen from a given peer never decrease: DCHECKs that |
| // 'term' >= the last term seen from this peer, then records 'term'. |
| void CheckMonotonicTerms(int64_t term) { |
| DCHECK_GE(term, last_seen_term_) << "peer info: " << ToString(); |
| last_seen_term_ = term; |
| } |
| |
| const std::string& uuid() const { |
| return peer_pb.permanent_uuid(); |
| } |
| |
| std::string ToString() const; |
| |
| RaftPeerPB peer_pb; |
| |
| // Next index to send to the peer. |
| // This corresponds to "nextIndex" as specified in Raft. |
| int64_t next_index; |
| |
| // The last operation that we've sent to this peer and that |
| // it acked. Used for watermark movement. |
| OpId last_received; |
| |
| // The last committed index this peer knows about. |
| int64_t last_known_committed_index; |
| |
| // The status after our last attempt to communicate with the peer. |
| // See the comments within the PeerStatus enum above for details. |
| PeerStatus last_exchange_status; |
| |
| // The time of the last communication with the peer. |
| // |
| // NOTE: this does not indicate that the peer successfully made progress at the |
| // given time -- this only indicates that we got some indication that the tablet |
| // server process was alive. It could be that the tablet was not found, etc. |
| // Consult last_exchange_status for details. |
| // |
| // Defaults to the time of construction, so does not necessarily mean that |
| // successful communication ever took place. |
| MonoTime last_communication_time; |
| |
| // Set to false if it is determined that the remote peer has fallen behind |
| // the local peer's WAL. |
| bool wal_catchup_possible; |
| |
| // Whether the peer's server is quiescing, which dictates whether the peer |
| // is a candidate for leadership successor. |
| bool remote_server_quiescing; |
| |
| // The peer's latest overall health status. |
| HealthReportPB::HealthStatus last_overall_health_status; |
| |
| // Throttler for how often we will log status messages pertaining to this |
| // peer (e.g. when it is lagging, etc). |
| std::shared_ptr<logging::LogThrottler> status_log_throttler; |
| |
| private: |
| // The last term we saw from a given peer. |
| // This is only used for sanity checking that a peer doesn't |
| // go backwards in time. |
| int64_t last_seen_term_; |
| }; |
| |
| PeerMessageQueue(const scoped_refptr<MetricEntity>& metric_entity, |
| scoped_refptr<log::Log> log, |
| TimeManager* time_manager, |
| RaftPeerPB local_peer_pb, |
| std::string tablet_id, |
| std::unique_ptr<ThreadPoolToken> raft_pool_observers_token, |
| const std::atomic<bool>* server_quiescing, |
| OpId last_locally_replicated, |
| const OpId& last_locally_committed, |
| const bool* allow_status_msg_for_failed_peer = nullptr); |
| |
| // Changes the queue to leader mode, meaning it tracks majority replicated |
| // operations and notifies observers when those change. |
| // 'committed_index' corresponds to the id of the last committed operation, |
| // i.e. operations with ids <= 'committed_index' should be considered committed. |
| // |
| // 'current_term' corresponds to the leader's current term; this is different |
| // from the term of the operation at 'committed_index' if the leader has not |
| // yet committed an operation in the current term. |
| // 'active_config' is the currently-active Raft config. This must always be |
| // a superset of the tracked peers, and that is enforced with runtime CHECKs. |
| void SetLeaderMode(int64_t committed_index, |
| int64_t current_term, |
| const RaftConfigPB& active_config); |
| |
| // Changes the queue to non-leader mode. Currently tracked peers will still |
| // be tracked so that the cache is only evicted when the peers no longer need |
| // the operations but the queue will no longer advance the majority replicated |
| // index or notify observers of its advancement. |
| void SetNonLeaderMode(const RaftConfigPB& active_config); |
| |
| // Makes the queue track this peer. |
| void TrackPeer(const RaftPeerPB& peer_pb); |
| |
| // Makes the queue untrack this peer. |
| void UntrackPeer(const std::string& uuid); |
| |
| // Returns a health report for all active peers, as a map keyed by string (presumably the peer UUID -- confirm). |
| // NOTE(review): the return type is a map, not a Status, so IllegalState cannot be returned here; check the implementation for the non-leader behavior. |
| std::unordered_map<std::string, HealthReportPB> ReportHealthOfPeers() const; |
| |
| // Appends a single message to be replicated to the peers. |
| // Returns OK unless the message could not be added to the queue for some |
| // reason (e.g. the queue reached max size). |
| // If it returns OK the queue takes ownership of 'msg'. |
| // |
| // This is thread-safe against all of the read methods, but not thread-safe |
| // with concurrent Append calls. |
| Status AppendOperation(const ReplicateRefPtr& msg); |
| |
| // Appends a vector of messages to be replicated to the peers. |
| // Returns OK unless the messages could not be added to the queue for some |
| // reason (e.g. the queue reached max size). Calls 'log_append_callback' when |
| // the messages are durable in the local Log. |
| // If it returns OK the queue takes ownership of 'msgs'. |
| // |
| // This is thread-safe against all of the read methods, but not thread-safe |
| // with concurrent Append calls. |
| Status AppendOperations(std::vector<ReplicateRefPtr> msgs, |
| const StatusCallback& log_append_callback); |
| |
| // Truncate all operations coming after 'index'. Following this, the 'last_appended' |
| // operation is reset to the OpId with this index, and the log cache will be truncated |
| // accordingly. |
| void TruncateOpsAfter(int64_t index); |
| |
| // Return the last OpId in the log. |
| // Note that this can move backwards after a truncation (TruncateOpsAfter). |
| OpId GetLastOpIdInLog() const; |
| |
| // Return the next OpId to be appended to the queue in the current term. |
| OpId GetNextOpId() const; |
| |
| // Assembles a request for a peer, adding entries past 'op_id' up to |
| // 'consensus_max_batch_size_bytes'. |
| // Returns OK if the request was assembled, or Status::NotFound() if the |
| // peer with 'uuid' was not tracked, or if the queue is not in leader mode. |
| // Returns Status::Incomplete if we try to read an operation index from the |
| // log that has not been written. |
| // |
| // WARNING: In order to avoid copying the same messages to every peer, |
| // entries are added to 'request' via AddAllocated() methods. |
| // The owner of 'request' is expected not to delete the request prior |
| // to removing the entries through ExtractSubRange() or any other method |
| // that does not delete the entries. The simplest way is to pass the same |
| // instance of ConsensusRequestPB to RequestForPeer(): the buffer will |
| // replace the old entries with new ones without de-allocating the old |
| // ones if they are still required. |
| Status RequestForPeer(const std::string& uuid, |
| ConsensusRequestPB* request, |
| std::vector<ReplicateRefPtr>* msg_refs, |
| bool* needs_tablet_copy); |
| |
| // Fill in a StartTabletCopyRequest for the specified peer. |
| // If that peer should not initiate Tablet Copy, returns a non-OK status. |
| // On success, also internally resets peer->needs_tablet_copy to false (NOTE(review): TrackedPeer declares no such field in this header -- confirm). |
| Status GetTabletCopyRequestForPeer(const std::string& uuid, |
| StartTabletCopyRequestPB* req); |
| |
| // Inform the queue of a new status known for one of its peers. |
| // 'ps' indicates an interpretation of the status, while 'status' |
| // may contain a more specific error message in the case of one of |
| // the error statuses. |
| void UpdatePeerStatus(const std::string& peer_uuid, |
| PeerStatus ps, |
| const Status& status); |
| |
| // Updates the request queue with the latest response from a request to a |
| // consensus peer. |
| // Returns true iff there are more requests pending in the queue for this |
| // peer and another request should be sent immediately, with no intervening |
| // delay. |
| bool ResponseFromPeer(const std::string& peer_uuid, |
| const ConsensusResponsePB& response); |
| |
| // Called by the consensus implementation to update the queue's watermarks |
| // based on information provided by the leader. This is used for metrics and |
| // log retention. |
| void UpdateFollowerWatermarks(int64_t committed_index, |
| int64_t all_replicated_index); |
| |
| // Updates the last op appended to the leader and the corresponding lag metric. |
| // This should not be called by a leader. |
| void UpdateLastIndexAppendedToLeader(int64_t last_idx_appended_to_leader); |
| |
| // Closes the queue. Once the queue is closed, peers are still allowed to |
| // call UntrackPeer() and ResponseFromPeer(), however no additional peers may |
| // be tracked and no additional messages may be enqueued. |
| void Close(); |
| |
| int64_t GetQueuedOperationsSizeBytesForTests() const; |
| |
| // Returns the last message replicated by all peers. |
| int64_t GetAllReplicatedIndex() const; |
| |
| // Returns the committed index. All operations with index less than or equal to |
| // this index have been committed. |
| int64_t GetCommittedIndex() const; |
| |
| // Return true if the committed index falls within the current term. |
| bool IsCommittedIndexInCurrentTerm() const; |
| |
| // Returns true if the queue is running in leader mode. |
| bool IsInLeaderMode() const; |
| |
| // Returns the current majority replicated index, for tests. |
| int64_t GetMajorityReplicatedIndexForTests() const; |
| |
| // Returns a copy of the TrackedPeer with 'uuid' or crashes if the peer is |
| // not being tracked. |
| TrackedPeer GetTrackedPeerForTests(const std::string& uuid); |
| |
| std::string ToString() const; |
| |
| // Dumps the contents of the queue to the provided string vector. |
| void DumpToStrings(std::vector<std::string>* lines) const; |
| |
| void DumpToHtml(std::ostream& out) const; |
| |
| void RegisterObserver(PeerMessageQueueObserver* observer); |
| |
| Status UnRegisterObserver(PeerMessageQueueObserver* observer); |
| |
| struct Metrics { |
| // Keeps track of the number of ops that are completed by a majority but still need |
| // to be replicated to a minority (IsDone() is true, IsAllDone() is false). |
| scoped_refptr<AtomicGauge<int64_t> > num_majority_done_ops; |
| // Keeps track of the number of ops that are still in progress (IsDone() returns false). |
| scoped_refptr<AtomicGauge<int64_t> > num_in_progress_ops; |
| // Keeps track of how many ops behind the leader this peer is, measured as the difference |
| // between the latest appended op index on this peer versus on the leader (0 if leader). |
| scoped_refptr<AtomicGauge<int64_t> > num_ops_behind_leader; |
| |
| explicit Metrics(const scoped_refptr<MetricEntity>& metric_entity); |
| }; |
| |
| ~PeerMessageQueue(); |
| |
| // Begin or end the watch for an eligible successor. If 'successor_uuid' is |
| // set, the queue will notify its observers when that designated peer is |
| // caught up to the leader. If it is std::nullopt, it will notify its observers |
| // with the UUID of the first voter that is caught up. |
| void BeginWatchForSuccessor(const std::optional<std::string>& successor_uuid); |
| void EndWatchForSuccessor(); |
| |
| private: |
| FRIEND_TEST(ConsensusQueueTest, TestQueueAdvancesCommittedIndex); |
| FRIEND_TEST(ConsensusQueueTest, TestQueueMovesWatermarksBackward); |
| FRIEND_TEST(ConsensusQueueTest, TestFollowerCommittedIndexAndMetrics); |
| FRIEND_TEST(ConsensusQueueTest, TestStatusMessagesToFailedUnrecoverablePeer); |
| FRIEND_TEST(ConsensusQueueUnitTest, PeerHealthStatus); |
| FRIEND_TEST(RaftConsensusQuorumTest, TestReplicasEnforceTheLogMatchingProperty); |
| |
| // Mode specifies how the queue currently behaves: |
| // LEADER - Means the queue tracks remote peers and replicates whatever messages |
| // are appended. Observers are notified of changes. |
| // NON_LEADER - Means the queue only tracks the local peer (remote peers are ignored). |
| // Observers are not notified of changes. |
| enum Mode { |
| LEADER, |
| NON_LEADER |
| }; |
| |
| enum State { |
| kQueueOpen, |
| kQueueClosed |
| }; |
| |
| // Types of replicas to count when advancing a queue watermark. |
| enum ReplicaTypes { |
| ALL_REPLICAS, |
| VOTER_REPLICAS, |
| }; |
| |
| struct QueueState { |
| |
| // The index of the last operation that has been replicated to all |
| // currently tracked peers. |
| int64_t all_replicated_index; |
| |
| // The index of the last operation replicated to a majority. |
| // This is usually the same as 'committed_index' but might not |
| // be if the terms changed. |
| int64_t majority_replicated_index; |
| |
| // The index of the last operation to be considered committed. |
| int64_t committed_index; |
| |
| // The index of the last operation appended to the leader. A follower will use this to |
| // determine how many ops behind the leader it is, as a soft metric for follower lag. |
| int64_t last_idx_appended_to_leader; |
| |
| // The opid of the last operation appended to the queue. |
| OpId last_appended; |
| |
| // The queue's owner current_term. |
| // Set by the last appended operation. |
| // If the queue owner's term is less than the term observed |
| // from another peer the queue owner must step down. |
| int64_t current_term; |
| |
| // The first index that we saw that was part of this current term. |
| // When the term advances, this is set to std::nullopt, and then set |
| // when the first operation is appended in the new term. |
| std::optional<int64_t> first_index_in_current_term; |
| |
| // The size of the majority for the queue. |
| int majority_size_; |
| |
| State state; |
| |
| // The current mode of the queue. |
| Mode mode; |
| |
| // The currently-active raft config. Only set if in LEADER mode. |
| std::unique_ptr<RaftConfigPB> active_config; |
| |
| std::string ToString() const; |
| }; |
| |
| // Returns true iff given 'desired_op' is found in the local WAL. |
| // If the op is not found, returns false. |
| // If the log cache returns some error other than NotFound, crashes with a |
| // fatal error. |
| bool IsOpInLog(const OpId& desired_op) const; |
| |
| // Return true if it would be safe to evict the peer 'evict_uuid' at this |
| // point in time. |
| bool SafeToEvictUnlocked(const std::string& evict_uuid) const; |
| |
| // Update a peer's last_overall_health_status field and trigger the appropriate |
| // notifications. |
| void UpdatePeerHealthUnlocked(TrackedPeer* peer); |
| |
| // Update the peer's last exchange status, and other fields, based on the |
| // response. Sets 'lmp_mismatch' to true if the given response indicates |
| // there was a log-matching property mismatch on the remote, otherwise sets |
| // it to false. |
| void UpdateExchangeStatus(TrackedPeer* peer, const TrackedPeer& prev_peer_state, |
| const ConsensusResponsePB& response, bool* lmp_mismatch); |
| |
| // Check if the peer is a NON_VOTER candidate ready for promotion. If so, |
| // trigger promotion. |
| void PromoteIfNeeded(TrackedPeer* peer, const TrackedPeer& prev_peer_state, |
| const ConsensusStatusPB& status); |
| |
| // If there is a graceful leadership change underway, notify queue observers |
| // to initiate leadership transfer to the specified peer under the following |
| // conditions: |
| // * 'peer' has fully caught up to the leader |
| // * 'peer' is the designated successor, or no successor was designated |
| void TransferLeadershipIfNeeded(const TrackedPeer& peer, |
| const ConsensusStatusPB& status); |
| |
| // Calculate a peer's up-to-date health status based on internal fields. |
| static HealthReportPB::HealthStatus PeerHealthStatus(const TrackedPeer& peer); |
| |
| // Asynchronously trigger various types of observer notifications on a |
| // separate thread. |
| void NotifyObserversOfCommitIndexChange(int64_t new_commit_index); |
| void NotifyObserversOfTermChange(int64_t term); |
| void NotifyObserversOfFailedFollower(const std::string& uuid, |
| int64_t term, |
| const std::string& reason); |
| void NotifyObserversOfPeerToPromote(const std::string& peer_uuid); |
| void NotifyObserversOfSuccessor(const std::string& peer_uuid); |
| void NotifyObserversOfPeerHealthChange(); |
| |
| // Notify all PeerMessageQueueObservers using the given callback function. |
| void NotifyObserversTask(const std::function<void(PeerMessageQueueObserver*)>& func); |
| |
| typedef std::unordered_map<std::string, TrackedPeer*> PeersMap; |
| |
| std::string ToStringUnlocked() const; |
| |
| std::string LogPrefixUnlocked() const; |
| |
| void DumpToStringsUnlocked(std::vector<std::string>* lines) const; |
| |
| // Updates the metrics based on index math. |
| void UpdateMetricsUnlocked(); |
| |
| // Update the metric that measures how many ops behind the leader the local |
| // replica believes it is (0 if leader). |
| void UpdateLagMetricsUnlocked(); |
| |
| void ClearUnlocked(); |
| |
| // Returns the last operation in the message queue, or |
| // 'preceding_first_op_in_queue_' if the queue is empty (NOTE(review): no member of that name is declared in this class -- possibly a stale reference; confirm). |
| const OpId& GetLastOp() const; |
| |
| void TrackPeerUnlocked(const RaftPeerPB& peer_pb); |
| |
| void UntrackPeerUnlocked(const std::string& uuid); |
| |
| // We need the local peer in the config because it contains the current |
| // 'member_type' of the local node while 'local_peer_pb_' does not. |
| void TrackLocalPeerUnlocked(); |
| |
| // Checks that if the queue is in LEADER mode then all registered peers are |
| // in the active config. Crashes with a FATAL log message if this invariant |
| // does not hold. If the queue is in NON_LEADER mode, does nothing. |
| void CheckPeersInActiveConfigIfLeaderUnlocked() const; |
| |
| // Callback when a REPLICATE message has finished appending to the local log. |
| void LocalPeerAppendFinished(const OpId& id, |
| const StatusCallback& callback, |
| const Status& status); |
| |
| // Advances 'watermark' to the smallest op that 'num_peers_required' have. |
| // If 'replica_types' is set to VOTER_REPLICAS, the 'num_peers_required' is |
| // interpreted as "number of voters required". If 'replica_types' is set to |
| // ALL_REPLICAS, 'num_peers_required' counts any peer, regardless of its |
| // voting status. |
| void AdvanceQueueWatermark(const char* type, |
| int64_t* watermark, |
| const OpId& replicated_before, |
| const OpId& replicated_after, |
| int num_peers_required, |
| ReplicaTypes replica_types, |
| const TrackedPeer* who_caused); |
| |
| std::vector<PeerMessageQueueObserver*> observers_; |
| |
| // The pool token which executes observer notifications. |
| std::unique_ptr<ThreadPoolToken> raft_pool_observers_token_; |
| |
| // Shared boolean that indicates whether the server is quiescing, in which |
| // case leadership should be transferred away from this peer. |
| const std::atomic<bool>* server_quiescing_; |
| |
| // PB containing identifying information about the local peer. |
| const RaftPeerPB local_peer_pb_; |
| |
| // The id of the tablet. |
| const std::string tablet_id_; |
| |
| QueueState queue_state_; |
| |
| // The currently tracked peers. |
| PeersMap peers_map_; |
| mutable simple_spinlock queue_lock_; // TODO(todd): rename |
| |
| bool successor_watch_in_progress_; |
| std::optional<std::string> designated_successor_uuid_; |
| |
| // We assume that we never have multiple threads racing to append to the queue. |
| // This fake mutex adds some extra assurance that this implementation property |
| // doesn't change. |
| DFAKE_MUTEX(append_fake_lock_); |
| |
| LogCache log_cache_; |
| |
| Metrics metrics_; |
| |
| TimeManager* time_manager_; |
| |
| const bool* allow_status_msg_for_failed_peer_; |
| }; |
| |
| // The interface between RaftConsensus and the PeerMessageQueue. |
| class PeerMessageQueueObserver { |
| public: |
| // Notify the observer that the commit index has advanced to 'committed_index'. |
| virtual void NotifyCommitIndex(int64_t committed_index) = 0; |
| |
| // Notify the observer that a follower replied with a term |
| // higher than that established in the queue. |
| virtual void NotifyTermChange(int64_t term) = 0; |
| |
| // Notify the observer that a peer is unable to catch up due to falling behind |
| // the leader's log GC threshold. |
| virtual void NotifyFailedFollower(const std::string& peer_uuid, |
| int64_t term, |
| const std::string& reason) = 0; |
| |
| // Notify the observer that the specified peer is ready to be promoted from |
| // NON_VOTER to VOTER. |
| virtual void NotifyPeerToPromote(const std::string& peer_uuid) = 0; |
| |
| // Notify the observer that the specified peer is ready to become leader, and |
| // that it should be told to run an election. |
| virtual void NotifyPeerToStartElection(const std::string& peer_uuid) = 0; |
| |
| // Notify the observer that the health of one of the peers has changed. |
| virtual void NotifyPeerHealthChange() = 0; |
| |
| virtual ~PeerMessageQueueObserver() {} |
| }; |
| |
| } // namespace consensus |
| } // namespace kudu |