| // Licensed to the Apache Software Foundation (ASF) under one |
| // or more contributor license agreements. See the NOTICE file |
| // distributed with this work for additional information |
| // regarding copyright ownership. The ASF licenses this file |
| // to you under the Apache License, Version 2.0 (the |
| // "License"); you may not use this file except in compliance |
| // with the License. You may obtain a copy of the License at |
| // |
| // http://www.apache.org/licenses/LICENSE-2.0 |
| // |
| // Unless required by applicable law or agreed to in writing, |
| // software distributed under the License is distributed on an |
| // "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY |
| // KIND, either express or implied. See the License for the |
| // specific language governing permissions and limitations |
| // under the License. |
| |
| #include "kudu/consensus/raft_consensus.h" |
| |
| #include <algorithm> |
| #include <cmath> |
| #include <cstdint> |
| #include <functional> |
| #include <iterator> |
| #include <memory> |
| #include <mutex> |
| #include <optional> |
| #include <ostream> |
| #include <type_traits> |
| #include <unordered_map> |
| #include <unordered_set> |
| |
| #include <gflags/gflags.h> |
| #include <google/protobuf/util/message_differencer.h> |
| |
| #include "kudu/common/timestamp.h" |
| #include "kudu/common/wire_protocol.h" |
| #include "kudu/consensus/consensus.pb.h" |
| #include "kudu/consensus/consensus_meta.h" |
| #include "kudu/consensus/consensus_meta_manager.h" |
| #include "kudu/consensus/consensus_peers.h" |
| #include "kudu/consensus/leader_election.h" |
| #include "kudu/consensus/log.h" |
| #include "kudu/consensus/metadata.pb.h" |
| #include "kudu/consensus/opid_util.h" |
| #include "kudu/consensus/peer_manager.h" |
| #include "kudu/consensus/pending_rounds.h" |
| #include "kudu/consensus/quorum_util.h" |
| #include "kudu/consensus/time_manager.h" |
| #include "kudu/gutil/macros.h" |
| #include "kudu/gutil/map-util.h" |
| #include "kudu/gutil/port.h" |
| #include "kudu/gutil/stl_util.h" |
| #include "kudu/gutil/stringprintf.h" |
| #include "kudu/gutil/strings/stringpiece.h" |
| #include "kudu/gutil/strings/substitute.h" |
| #include "kudu/gutil/walltime.h" |
| #include "kudu/rpc/periodic.h" |
| #include "kudu/util/async_util.h" |
| #include "kudu/util/debug/trace_event.h" |
| #include "kudu/util/flag_tags.h" |
| #include "kudu/util/logging.h" |
| #include "kudu/util/metrics.h" |
| #include "kudu/util/pb_util.h" |
| #include "kudu/util/process_memory.h" |
| #include "kudu/util/random.h" |
| #include "kudu/util/random_util.h" |
| #include "kudu/util/scoped_cleanup.h" |
| #include "kudu/util/status.h" |
| #include "kudu/util/thread_restrictions.h" |
| #include "kudu/util/threadpool.h" |
| #include "kudu/util/trace.h" |
| #include "kudu/util/url-coding.h" |
| |
| DEFINE_int32(raft_heartbeat_interval_ms, 500, |
| "The heartbeat interval for Raft replication. The leader produces heartbeats " |
| "to followers at this interval. The followers expect a heartbeat at this interval " |
| "and consider a leader to have failed if it misses several in a row."); |
| TAG_FLAG(raft_heartbeat_interval_ms, advanced); |
| |
| DEFINE_double(leader_failure_max_missed_heartbeat_periods, 3.0, |
| "Maximum heartbeat periods that the leader can fail to heartbeat in before we " |
| "consider the leader to be failed. The total failure timeout in milliseconds is " |
| "raft_heartbeat_interval_ms times leader_failure_max_missed_heartbeat_periods. " |
| "The value passed to this flag may be fractional."); |
| TAG_FLAG(leader_failure_max_missed_heartbeat_periods, advanced); |
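| // With the defaults above, the total leader failure timeout is |
| // 500 ms * 3.0 = 1500 ms. |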
| |
| DEFINE_int32(leader_failure_exp_backoff_max_delta_ms, 20 * 1000, |
| "Maximum time to sleep in between leader election retries, in addition to the " |
| "regular timeout. When leader election fails the interval in between retries " |
| "increases exponentially, up to this value."); |
| TAG_FLAG(leader_failure_exp_backoff_max_delta_ms, experimental); |
| |
| DEFINE_bool(enable_leader_failure_detection, true, |
| "Whether to enable failure detection of tablet leaders. If enabled, attempts will be " |
| "made to elect a follower as a new leader when the leader is detected to have failed."); |
| TAG_FLAG(enable_leader_failure_detection, unsafe); |
| |
| DEFINE_bool(evict_failed_followers, true, |
| "Whether to evict followers from the Raft config that have fallen " |
| "too far behind the leader's log to catch up normally or have been " |
| "unreachable by the leader for longer than " |
| "follower_unavailable_considered_failed_sec"); |
| TAG_FLAG(evict_failed_followers, advanced); |
| TAG_FLAG(evict_failed_followers, runtime); |
| |
| DEFINE_bool(follower_reject_update_consensus_requests, false, |
| "Whether a follower will return an error for all UpdateConsensus() requests. " |
| "Warning! This is only intended for testing."); |
| TAG_FLAG(follower_reject_update_consensus_requests, unsafe); |
| |
| DEFINE_bool(follower_fail_all_prepare, false, |
| "Whether a follower will fail preparing all ops. " |
| "Warning! This is only intended for testing."); |
| TAG_FLAG(follower_fail_all_prepare, unsafe); |
| |
| DEFINE_bool(raft_enable_pre_election, true, |
| "When enabled, candidates will call a pre-election before " |
| "running a real leader election."); |
| TAG_FLAG(raft_enable_pre_election, experimental); |
| TAG_FLAG(raft_enable_pre_election, runtime); |
| |
| DEFINE_bool(raft_enable_tombstoned_voting, true, |
| "When enabled, tombstoned tablets may vote in elections."); |
| TAG_FLAG(raft_enable_tombstoned_voting, experimental); |
| TAG_FLAG(raft_enable_tombstoned_voting, runtime); |
| |
| // Enable improved re-replication (KUDU-1097). |
| DEFINE_bool(raft_prepare_replacement_before_eviction, true, |
| "When enabled, failed replicas will only be evicted after a " |
| "replacement has been prepared for them."); |
| TAG_FLAG(raft_prepare_replacement_before_eviction, advanced); |
| TAG_FLAG(raft_prepare_replacement_before_eviction, experimental); |
| |
| DECLARE_int32(memory_limit_warn_threshold_percentage); |
| |
| // Metrics |
| // --------- |
| METRIC_DEFINE_counter(tablet, follower_memory_pressure_rejections, |
| "Follower Memory Pressure Rejections", |
| kudu::MetricUnit::kRequests, |
| "Number of RPC requests rejected due to " |
| "memory pressure while FOLLOWER.", |
| kudu::MetricLevel::kWarn); |
| METRIC_DEFINE_gauge_int64(tablet, raft_term, |
| "Current Raft Consensus Term", |
| kudu::MetricUnit::kUnits, |
| "Current Term of the Raft Consensus algorithm. This number increments " |
| "each time a leader election is started.", |
| kudu::MetricLevel::kDebug); |
| METRIC_DEFINE_gauge_int64(tablet, failed_elections_since_stable_leader, |
| "Failed Elections Since Stable Leader", |
| kudu::MetricUnit::kUnits, |
| "Number of failed elections on this node since there was a stable " |
| "leader. This number increments on each failed election and resets on " |
| "each successful one.", |
| kudu::MetricLevel::kWarn); |
| METRIC_DEFINE_gauge_int64(tablet, time_since_last_leader_heartbeat, |
| "Time Since Last Leader Heartbeat", |
| kudu::MetricUnit::kMilliseconds, |
| "The time elapsed since the last heartbeat from the leader " |
| "in milliseconds. This metric is identically zero on a leader replica.", |
| kudu::MetricLevel::kDebug); |
| |
| |
| using google::protobuf::util::MessageDifferencer; |
| using kudu::pb_util::SecureShortDebugString; |
| using kudu::rpc::PeriodicTimer; |
| using kudu::tserver::TabletServerErrorPB; |
| using std::nullopt; |
| using std::optional; |
| using std::shared_ptr; |
| using std::string; |
| using std::unique_ptr; |
| using std::unordered_set; |
| using std::weak_ptr; |
| using strings::Substitute; |
| |
| namespace kudu { |
| namespace consensus { |
| |
| RaftConsensus::RaftConsensus( |
| ConsensusOptions options, |
| RaftPeerPB local_peer_pb, |
| scoped_refptr<ConsensusMetadataManager> cmeta_manager, |
| ServerContext server_ctx) |
| : options_(std::move(options)), |
| local_peer_pb_(std::move(local_peer_pb)), |
| log_prefix_(Substitute("T $0 P $1: ", options_.tablet_id, peer_uuid())), |
| cmeta_manager_(DCHECK_NOTNULL(std::move(cmeta_manager))), |
| server_ctx_(std::move(server_ctx)), |
| state_(kNew), |
| rng_(GetRandomSeed32()), |
| leader_is_ready_(false), |
| leader_transfer_in_progress_(false), |
| withhold_votes_until_(MonoTime::Min()), |
| last_received_cur_leader_(MinimumOpId()), |
| failed_elections_since_stable_leader_(0), |
| shutdown_(false), |
| update_calls_for_tests_(0) { |
| DCHECK(local_peer_pb_.has_permanent_uuid()); |
| } |
| |
| Status RaftConsensus::Init() { |
| LockGuard l(lock_); |
| DCHECK_EQ(kNew, state_) << State_Name(state_); |
| RETURN_NOT_OK(cmeta_manager_->Load(options_.tablet_id, &cmeta_)); |
| SetStateUnlocked(kInitialized); |
| return Status::OK(); |
| } |
| |
| RaftConsensus::~RaftConsensus() { |
| Shutdown(); |
| } |
| |
| Status RaftConsensus::Create(ConsensusOptions options, |
| RaftPeerPB local_peer_pb, |
| scoped_refptr<ConsensusMetadataManager> cmeta_manager, |
| ServerContext server_ctx, |
| shared_ptr<RaftConsensus>* consensus_out) { |
| shared_ptr<RaftConsensus> consensus(RaftConsensus::make_shared(std::move(options), |
| std::move(local_peer_pb), |
| std::move(cmeta_manager), |
| std::move(server_ctx))); |
| RETURN_NOT_OK_PREPEND(consensus->Init(), "Unable to initialize Raft consensus"); |
| *consensus_out = std::move(consensus); |
| return Status::OK(); |
| } |
| |
| Status RaftConsensus::Start(const ConsensusBootstrapInfo& info, |
| unique_ptr<PeerProxyFactory> peer_proxy_factory, |
| scoped_refptr<log::Log> log, |
| unique_ptr<TimeManager> time_manager, |
| ConsensusRoundHandler* round_handler, |
| const scoped_refptr<MetricEntity>& metric_entity, |
| MarkDirtyCallback cb) { |
| DCHECK(metric_entity); |
| |
| peer_proxy_factory_ = DCHECK_NOTNULL(std::move(peer_proxy_factory)); |
| log_ = DCHECK_NOTNULL(std::move(log)); |
| time_manager_ = DCHECK_NOTNULL(std::move(time_manager)); |
| round_handler_ = DCHECK_NOTNULL(round_handler); |
| mark_dirty_clbk_ = std::move(cb); |
| |
| term_metric_ = metric_entity->FindOrCreateGauge(&METRIC_raft_term, |
| CurrentTerm(), |
| MergeType::kMax); |
| follower_memory_pressure_rejections_ = |
| metric_entity->FindOrCreateCounter(&METRIC_follower_memory_pressure_rejections); |
| |
| num_failed_elections_metric_ = |
| metric_entity->FindOrCreateGauge(&METRIC_failed_elections_since_stable_leader, |
| failed_elections_since_stable_leader_); |
| |
| METRIC_time_since_last_leader_heartbeat.InstantiateFunctionGauge( |
| metric_entity, [this]() { return this->GetMillisSinceLastLeaderHeartbeat(); }, |
| MergeType::kMax) |
| ->AutoDetach(&metric_detacher_); |
| |
| // A single Raft thread pool token is shared between RaftConsensus and |
| // PeerManager. Because PeerManager is owned by RaftConsensus, it receives a |
| // raw pointer to the token, to emphasize that RaftConsensus is responsible |
| // for destroying the token. |
| ThreadPool* raft_pool = server_ctx_.raft_pool; |
| raft_pool_token_ = raft_pool->NewToken(ThreadPool::ExecutionMode::CONCURRENT); |
| |
| // The message queue that keeps track of which operations need to be replicated |
| // where. |
| // |
| // Note: the message queue receives a dedicated Raft thread pool token so that |
| // its submissions don't block other submissions by RaftConsensus (such as |
| // heartbeat processing). |
| // |
| // TODO(adar): the token is SERIAL to match the previous single-thread |
| // observer pool behavior, but CONCURRENT may be safe here. |
| unique_ptr<PeerMessageQueue> queue(new PeerMessageQueue( |
| metric_entity, |
| log_, |
| time_manager_.get(), |
| local_peer_pb_, |
| options_.tablet_id, |
| raft_pool->NewToken(ThreadPool::ExecutionMode::SERIAL), |
| server_ctx_.quiescing, |
| info.last_id, |
| info.last_committed_id, |
| server_ctx_.allow_status_msg_for_failed_peer)); |
| |
| // A manager for the set of peers that actually send the operations both remotely |
| // and to the local WAL. |
| unique_ptr<PeerManager> peer_manager(new PeerManager(options_.tablet_id, |
| peer_uuid(), |
| peer_proxy_factory_.get(), |
| queue.get(), |
| raft_pool_token_.get(), |
| log_)); |
| unique_ptr<PendingRounds> pending(new PendingRounds( |
| LogPrefixThreadSafe(), time_manager_.get())); |
| |
| // Capture a weak_ptr reference into the functor so it can safely handle |
| // outliving the consensus instance. |
| weak_ptr<RaftConsensus> w = shared_from_this(); |
| PeriodicTimer::Options opts; |
| opts.one_shot = true; |
| failure_detector_ = PeriodicTimer::Create( |
| peer_proxy_factory_->messenger(), |
| [w]() { |
| if (auto consensus = w.lock()) { |
| consensus->ReportFailureDetected(); |
| } |
| }, |
| MinimumElectionTimeout(), |
| opts); |
| |
| transfer_period_timer_ = PeriodicTimer::Create( |
| peer_proxy_factory_->messenger(), |
| [w]() { |
| if (auto consensus = w.lock()) { |
| consensus->EndLeaderTransferPeriod(); |
| } |
| }, |
| MinimumElectionTimeout(), |
| opts); |
| |
| { |
| ThreadRestrictions::AssertWaitAllowed(); |
| LockGuard l(lock_); |
| CHECK_EQ(kInitialized, state_) << LogPrefixUnlocked() << "Illegal state for Start(): " |
| << State_Name(state_); |
| |
| queue_ = std::move(queue); |
| peer_manager_ = std::move(peer_manager); |
| pending_ = std::move(pending); |
| |
| ClearLeaderUnlocked(); |
| |
| // Our last persisted term can be higher than the last persisted operation |
| // (i.e. if we called an election) but the reverse should never happen. |
| if (info.last_id.term() > CurrentTermUnlocked()) { |
| return Status::Corruption(Substitute("Unable to start RaftConsensus: " |
| "The last op in the WAL with id $0 has a term ($1) that is greater " |
| "than the latest recorded term, which is $2", |
| OpIdToString(info.last_id), |
| info.last_id.term(), |
| CurrentTermUnlocked())); |
| } |
| |
| // Append any uncommitted replicate messages found during log replay to the queue. |
| LOG_WITH_PREFIX_UNLOCKED(INFO) << "Replica starting. Triggering " |
| << info.orphaned_replicates.size() |
| << " pending ops. Active config: " |
| << SecureShortDebugString(cmeta_->ActiveConfig()); |
| for (ReplicateMsg* replicate : info.orphaned_replicates) { |
| ReplicateRefPtr replicate_ptr = make_scoped_refptr_replicate(new ReplicateMsg(*replicate)); |
| RETURN_NOT_OK(StartFollowerOpUnlocked(replicate_ptr)); |
| } |
| |
| // Set the initial committed opid for the PendingRounds only after |
| // appending any uncommitted replicate messages to the queue. |
| pending_->SetInitialCommittedOpId(info.last_committed_id); |
| |
| // If this is the first term, expire the FD immediately so that we have a |
| // fast first election; otherwise we just let the timer expire normally. |
| optional<MonoDelta> fd_initial_delta; |
| if (CurrentTermUnlocked() == 0) { |
| // The failure detector is initialized to a low value to trigger an early |
| // election (unless someone else requested a vote from us first, which |
| // resets the election timer). |
| // |
| // We do it this way instead of immediately running an election to get a |
| // higher likelihood of enough servers being available when the first one |
| // attempts an election to avoid multiple election cycles on startup, |
| // while keeping that "waiting period" random. |
| if (PREDICT_TRUE(FLAGS_enable_leader_failure_detection)) { |
| LOG_WITH_PREFIX_UNLOCKED(INFO) << "Consensus starting up: Expiring failure detector timer " |
| "to make a prompt election more likely"; |
| fd_initial_delta = MonoDelta::FromMilliseconds( |
| rng_.Uniform(FLAGS_raft_heartbeat_interval_ms)); |
| } |
| } |
| |
| // Now assume non-leader replica duties. |
| RETURN_NOT_OK(BecomeReplicaUnlocked(fd_initial_delta)); |
| |
| SetStateUnlocked(kRunning); |
| } |
| |
| if (IsSingleVoterConfig() && FLAGS_enable_leader_failure_detection) { |
| LOG_WITH_PREFIX(INFO) << "Only one voter in the Raft config. Triggering election immediately"; |
| WARN_NOT_OK_EVERY_N_SECS(StartElection(NORMAL_ELECTION, INITIAL_SINGLE_NODE_ELECTION), |
| "Couldn't start leader election", 10); |
| } |
| |
| // Report the state change so that this replica becomes visible to the Master. |
| MarkDirty("RaftConsensus started"); |
| |
| return Status::OK(); |
| } |
| |
| bool RaftConsensus::IsRunning() const { |
| return state_ == kRunning; |
| } |
| |
| Status RaftConsensus::EmulateElectionForTests() { |
| TRACE_EVENT2("consensus", "RaftConsensus::EmulateElectionForTests", |
| "peer", peer_uuid(), |
| "tablet", options_.tablet_id); |
| |
| ThreadRestrictions::AssertWaitAllowed(); |
| LockGuard l(lock_); |
| RETURN_NOT_OK(CheckRunningUnlocked()); |
| |
| LOG_WITH_PREFIX_UNLOCKED(INFO) << "Emulating election..."; |
| |
| // Assume leadership of new term. |
| RETURN_NOT_OK(HandleTermAdvanceUnlocked(CurrentTermUnlocked() + 1)); |
| SetLeaderUuidUnlocked(peer_uuid()); |
| return BecomeLeaderUnlocked(); |
| } |
| |
| namespace { |
| const char* ModeString(RaftConsensus::ElectionMode mode) { |
| switch (mode) { |
| case RaftConsensus::NORMAL_ELECTION: |
| return "leader election"; |
| case RaftConsensus::PRE_ELECTION: |
| return "pre-election"; |
| case RaftConsensus::ELECT_EVEN_IF_LEADER_IS_ALIVE: |
| return "forced leader election"; |
| } |
| __builtin_unreachable(); // silence gcc warnings |
| } |
| string ReasonString(RaftConsensus::ElectionReason reason, StringPiece leader_uuid) { |
| switch (reason) { |
| case RaftConsensus::INITIAL_SINGLE_NODE_ELECTION: |
| return "initial election of a single-replica configuration"; |
| case RaftConsensus::EXTERNAL_REQUEST: |
| return "received explicit request"; |
| case RaftConsensus::ELECTION_TIMEOUT_EXPIRED: |
| if (leader_uuid.empty()) { |
| return "no leader contacted us within the election timeout"; |
| } |
| return Substitute("detected failure of leader $0", leader_uuid); |
| } |
| __builtin_unreachable(); // silence gcc warnings |
| } |
| } // anonymous namespace |
| |
| Status RaftConsensus::StartElection(ElectionMode mode, ElectionReason reason) { |
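| // If the server is quiescing, leader elections are disabled on this replica. |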
| if (server_ctx_.quiescing && server_ctx_.quiescing->load()) { |
| return Status::IllegalState("leader elections are disabled"); |
| } |
| const char* const mode_str = ModeString(mode); |
| |
| TRACE_EVENT2("consensus", "RaftConsensus::StartElection", |
| "peer", LogPrefixThreadSafe(), |
| "mode", mode_str); |
| scoped_refptr<LeaderElection> election; |
| { |
| ThreadRestrictions::AssertWaitAllowed(); |
| LockGuard l(lock_); |
| RETURN_NOT_OK(CheckRunningUnlocked()); |
| |
| RaftPeerPB::Role active_role = cmeta_->active_role(); |
| if (active_role == RaftPeerPB::LEADER) { |
| LOG_WITH_PREFIX_UNLOCKED(INFO) << Substitute( |
| "Not starting $0 -- already a leader", mode_str); |
| return Status::OK(); |
| } |
| if (PREDICT_FALSE(!consensus::IsVoterRole(active_role))) { |
| // A non-voter should not start leader elections. The leader failure |
| // detector should be re-enabled once the non-voter replica is promoted |
| // to voter replica. |
| return Status::IllegalState("only voting members can start elections", |
| SecureShortDebugString(cmeta_->ActiveConfig())); |
| } |
| if (PREDICT_FALSE(active_role == RaftPeerPB::NON_PARTICIPANT)) { |
| SnoozeFailureDetector(); |
| return Status::IllegalState("Not starting election: node is currently " |
| "a non-participant in the Raft config", |
| SecureShortDebugString(cmeta_->ActiveConfig())); |
| } |
| LOG_WITH_PREFIX_UNLOCKED(INFO) |
| << "Starting " << mode_str |
| << " (" << ReasonString(reason, GetLeaderUuidUnlocked()) << ")"; |
| |
| // Snooze to avoid the election timer firing again as much as possible. |
| // We do not disable the election timer while running an election, so that |
| // if the election times out, we will try again. |
| MonoDelta timeout = LeaderElectionExpBackoffDeltaUnlocked(); |
| SnoozeFailureDetector(string("starting election"), timeout); |
| |
| // Increment the term and vote for ourselves, unless it's a pre-election. |
| if (mode != PRE_ELECTION) { |
| // TODO(mpercy): Consider using a separate Mutex for voting, which must sync to disk. |
| |
| // We skip flushing the term to disk because setting the vote just below also |
| // flushes to disk, and the double fsync doesn't buy us anything. |
| RETURN_NOT_OK(HandleTermAdvanceUnlocked(CurrentTermUnlocked() + 1, |
| SKIP_FLUSH_TO_DISK)); |
| RETURN_NOT_OK(SetVotedForCurrentTermUnlocked(peer_uuid())); |
| } |
| |
| RaftConfigPB active_config = cmeta_->ActiveConfig(); |
| LOG_WITH_PREFIX_UNLOCKED(INFO) << "Starting " << mode_str << " with config: " |
| << SecureShortDebugString(active_config); |
| |
| // Initialize the VoteCounter. |
| int num_voters = CountVoters(active_config); |
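| // A Raft majority is a strict majority of the voters, e.g. 2 of 3 voters |
| // or 3 of 5 voters. |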
| int majority_size = MajoritySize(num_voters); |
| VoteCounter counter(num_voters, majority_size); |
| |
| // Vote for ourselves. |
| bool duplicate; |
| RETURN_NOT_OK(counter.RegisterVote(peer_uuid(), VOTE_GRANTED, &duplicate)); |
| CHECK(!duplicate) << LogPrefixUnlocked() |
| << "Inexplicable duplicate self-vote for term " |
| << CurrentTermUnlocked(); |
| |
| VoteRequestPB request; |
| request.set_ignore_live_leader(mode == ELECT_EVEN_IF_LEADER_IS_ALIVE); |
| request.set_candidate_uuid(peer_uuid()); |
| if (mode == PRE_ELECTION) { |
| // In a pre-election, we haven't bumped our own term yet, so we need to be |
| // asking for votes for the next term. |
| request.set_is_pre_election(true); |
| request.set_candidate_term(CurrentTermUnlocked() + 1); |
| } else { |
| request.set_candidate_term(CurrentTermUnlocked()); |
| } |
| request.set_tablet_id(options_.tablet_id); |
| *request.mutable_candidate_status()->mutable_last_received() = |
| queue_->GetLastOpIdInLog(); |
| |
| auto self = shared_from_this(); |
| election.reset(new LeaderElection( |
| std::move(active_config), |
| // The RaftConsensus ref passed below ensures that this raw pointer |
| // remains safe to use for the entirety of LeaderElection's life. |
| peer_proxy_factory_.get(), |
| std::move(request), std::move(counter), timeout, |
| [self, reason](const ElectionResult& result) { |
| self->ElectionCallback(reason, result); |
| })); |
| } |
| |
| // Start the election outside the lock. |
| election->Run(); |
| |
| return Status::OK(); |
| } |
| |
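| // Polls this replica's role every 10 ms until it becomes leader or the |
| // timeout expires, whichever comes first. |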
| Status RaftConsensus::WaitUntilLeader(const MonoDelta& timeout) { |
| MonoTime deadline = MonoTime::Now() + timeout; |
| while (role() != consensus::RaftPeerPB::LEADER) { |
| if (MonoTime::Now() >= deadline) { |
| return Status::TimedOut(Substitute("Peer $0 is not leader of tablet $1 after $2. Role: $3", |
| peer_uuid(), options_.tablet_id, timeout.ToString(), |
| role())); |
| } |
| SleepFor(MonoDelta::FromMilliseconds(10)); |
| } |
| return Status::OK(); |
| } |
| |
| Status RaftConsensus::StepDown(LeaderStepDownResponsePB* resp) { |
| TRACE_EVENT0("consensus", "RaftConsensus::StepDown"); |
| ThreadRestrictions::AssertWaitAllowed(); |
| LockGuard l(lock_); |
| DCHECK((queue_->IsInLeaderMode() && cmeta_->active_role() == RaftPeerPB::LEADER) || |
| (!queue_->IsInLeaderMode() && cmeta_->active_role() != RaftPeerPB::LEADER)); |
| RETURN_NOT_OK(CheckRunningUnlocked()); |
| if (cmeta_->active_role() != RaftPeerPB::LEADER) { |
| LOG_WITH_PREFIX_UNLOCKED(INFO) << "Rejecting request to step down while not leader"; |
| resp->mutable_error()->set_code(TabletServerErrorPB::NOT_THE_LEADER); |
| StatusToPB(Status::IllegalState("Not currently leader"), |
| resp->mutable_error()->mutable_status()); |
| // We return OK so that the tablet service won't overwrite the error code. |
| return Status::OK(); |
| } |
| LOG_WITH_PREFIX_UNLOCKED(INFO) << "Received request to step down"; |
| RETURN_NOT_OK(HandleTermAdvanceUnlocked(CurrentTermUnlocked() + 1, |
| SKIP_FLUSH_TO_DISK)); |
| // Snooze the failure detector for an extra leader failure timeout. |
| // This should ensure that a different replica is elected leader after this one steps down. |
| SnoozeFailureDetector(string("explicit stepdown request"), MonoDelta::FromMilliseconds( |
| 2 * MinimumElectionTimeout().ToMilliseconds())); |
| return Status::OK(); |
| } |
| |
| Status RaftConsensus::TransferLeadership(const optional<string>& new_leader_uuid, |
| LeaderStepDownResponsePB* resp) { |
| TRACE_EVENT0("consensus", "RaftConsensus::TransferLeadership"); |
| ThreadRestrictions::AssertWaitAllowed(); |
| LockGuard l(lock_); |
| LOG_WITH_PREFIX_UNLOCKED(INFO) << "Received request to transfer leadership" |
| << (new_leader_uuid ? |
| Substitute(" to TS $0", *new_leader_uuid) : |
| ""); |
| DCHECK((queue_->IsInLeaderMode() && cmeta_->active_role() == RaftPeerPB::LEADER) || |
| (!queue_->IsInLeaderMode() && cmeta_->active_role() != RaftPeerPB::LEADER)); |
| RETURN_NOT_OK(CheckRunningUnlocked()); |
| if (cmeta_->active_role() != RaftPeerPB::LEADER) { |
| LOG_WITH_PREFIX_UNLOCKED(INFO) << "Rejecting request to transer leadership while not leader"; |
| resp->mutable_error()->set_code(TabletServerErrorPB::NOT_THE_LEADER); |
| StatusToPB(Status::IllegalState("not currently leader"), |
| resp->mutable_error()->mutable_status()); |
| // We return OK so that the tablet service won't overwrite the error code. |
| return Status::OK(); |
| } |
| if (new_leader_uuid) { |
| if (*new_leader_uuid == peer_uuid()) { |
| // Short-circuit as we are transferring leadership to ourselves and we |
| // already checked that we are leader. |
| return Status::OK(); |
| } |
| if (!IsRaftConfigVoter(*new_leader_uuid, cmeta_->ActiveConfig())) { |
| const string msg = Substitute("tablet server $0 is not a voter in the active config", |
| *new_leader_uuid); |
| LOG_WITH_PREFIX_UNLOCKED(INFO) << "Rejecting request to transfer leadership " |
| << "because " << msg; |
| return Status::InvalidArgument(msg); |
| } |
| } |
| return BeginLeaderTransferPeriodUnlocked(new_leader_uuid); |
| } |
| |
| Status RaftConsensus::BeginLeaderTransferPeriodUnlocked( |
| const optional<string>& successor_uuid) { |
| DCHECK(lock_.is_locked()); |
| if (std::atomic_exchange(&leader_transfer_in_progress_, true)) { |
| return Status::ServiceUnavailable( |
| Substitute("leadership transfer for $0 already in progress", |
| options_.tablet_id)); |
| } |
| queue_->BeginWatchForSuccessor(successor_uuid); |
| transfer_period_timer_->Start(); |
| return Status::OK(); |
| } |
| |
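| // Stops the leadership-transfer timer, stops watching for a successor in the |
| // queue, and clears the in-progress flag so new transfer requests can start. |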
| void RaftConsensus::EndLeaderTransferPeriod() { |
| transfer_period_timer_->Stop(); |
| queue_->EndWatchForSuccessor(); |
| leader_transfer_in_progress_ = false; |
| } |
| |
| scoped_refptr<ConsensusRound> RaftConsensus::NewRound( |
| unique_ptr<ReplicateMsg> replicate_msg, |
| ConsensusReplicatedCallback replicated_cb) { |
| return make_scoped_refptr(new ConsensusRound(this, |
| std::move(replicate_msg), |
| std::move(replicated_cb))); |
| } |
| |
| void RaftConsensus::ReportFailureDetectedTask() { |
| Status s = StartElection(FLAGS_raft_enable_pre_election ? |
| PRE_ELECTION : NORMAL_ELECTION, ELECTION_TIMEOUT_EXPIRED); |
| if (PREDICT_FALSE(!s.ok())) { |
| WARN_NOT_OK_EVERY_N_SECS( |
| s, LogPrefixThreadSafe() + "failed to trigger leader election", 10); |
| // Normally the failure detector would be enabled at the end of the election, |
| // but since the election failed to start, we must reenable it explicitly. |
| EnableFailureDetector(); |
| } |
| } |
| |
| void RaftConsensus::ReportFailureDetected() { |
| // We're running on a timer thread; start an election on a different thread. |
| // |
| // There's no need to reenable the failure detector; if this fails, it's a |
| // sign that RaftConsensus has stopped and we no longer need failure detection. |
| auto self = shared_from_this(); |
| Status s = raft_pool_token_->Submit([self]() { self->ReportFailureDetectedTask(); }); |
| if (PREDICT_FALSE(!s.ok())) { |
| static const char* msg = "failed to submit failure detected task"; |
| CHECK(s.IsServiceUnavailable()) << LogPrefixThreadSafe() << msg; |
| WARN_NOT_OK(s, LogPrefixThreadSafe() + msg); |
| } |
| } |
| |
| Status RaftConsensus::BecomeLeaderUnlocked() { |
| DCHECK(lock_.is_locked()); |
| |
| TRACE_EVENT2("consensus", "RaftConsensus::BecomeLeaderUnlocked", |
| "peer", peer_uuid(), |
| "tablet", options_.tablet_id); |
| LOG_WITH_PREFIX_UNLOCKED(INFO) << "Becoming Leader. State: " << ToStringUnlocked(); |
| |
| // Disable FD while we are leader. |
| DisableFailureDetector(); |
| |
| // Don't vote for anyone if we're a leader. |
| withhold_votes_until_ = MonoTime::Max(); |
| |
| // Leadership never starts in a transfer period. |
| EndLeaderTransferPeriod(); |
| |
| queue_->RegisterObserver(this); |
| bool was_leader = queue_->IsInLeaderMode(); |
| RefreshConsensusQueueAndPeersUnlocked(); |
| if (!was_leader && server_ctx_.num_leaders) { |
| server_ctx_.num_leaders->Increment(); |
| } |
| |
| // Initiate a NO_OP op that is sent at the beginning of every term |
| // change in Raft. |
| auto replicate = new ReplicateMsg; |
| replicate->set_op_type(NO_OP); |
| replicate->mutable_noop_request(); // Define the no-op request field. |
| CHECK_OK(time_manager_->AssignTimestamp(replicate)); |
| |
| scoped_refptr<ConsensusRound> round( |
| new ConsensusRound(this, make_scoped_refptr(new RefCountedReplicate(replicate)))); |
| auto* round_raw = round.get(); |
| round->SetConsensusReplicatedCallback( |
| [this, round_raw](const Status& s) { |
| this->NonTxRoundReplicationFinished(round_raw, &DoNothingStatusCB, s); |
| }); |
| |
| last_leader_communication_time_micros_ = 0; |
| RETURN_NOT_OK(AppendNewRoundToQueueUnlocked(round)); |
| leader_is_ready_ = true; |
| |
| return Status::OK(); |
| } |
| |
| Status RaftConsensus::BecomeReplicaUnlocked(optional<MonoDelta> fd_delta) { |
| DCHECK(lock_.is_locked()); |
| |
| LOG_WITH_PREFIX_UNLOCKED(INFO) << "Becoming Follower/Learner. State: " |
| << ToStringUnlocked(); |
| ClearLeaderUnlocked(); |
| |
| // Enable or disable leader failure detection depending on whether this |
| // replica is becoming a VOTER or a NON_VOTER, respectively. |
| UpdateFailureDetectorState(std::move(fd_delta)); |
| |
| // Now that we're a replica, we can allow voting for other nodes. |
| withhold_votes_until_ = MonoTime::Min(); |
| |
| // Deregister ourselves from the queue. We no longer need to track what gets |
| // replicated since we're stepping down. |
| queue_->UnRegisterObserver(this); |
| bool was_leader = queue_->IsInLeaderMode(); |
| queue_->SetNonLeaderMode(cmeta_->ActiveConfig()); |
| if (was_leader && server_ctx_.num_leaders) { |
| server_ctx_.num_leaders->IncrementBy(-1); |
| } |
| peer_manager_->Close(); |
| |
| return Status::OK(); |
| } |
| |
| Status RaftConsensus::Replicate(const scoped_refptr<ConsensusRound>& round) { |
| std::lock_guard<simple_spinlock> lock(update_lock_); |
| { |
| ThreadRestrictions::AssertWaitAllowed(); |
| LockGuard l(lock_); |
| RETURN_NOT_OK(CheckSafeToReplicateUnlocked(*round->replicate_msg())); |
| RETURN_NOT_OK(round->CheckBoundTerm(CurrentTermUnlocked())); |
| RETURN_NOT_OK(AppendNewRoundToQueueUnlocked(round)); |
| } |
| |
| peer_manager_->SignalRequest(); |
| return Status::OK(); |
| } |
| |
| Status RaftConsensus::CheckLeadershipAndBindTerm(const scoped_refptr<ConsensusRound>& round) { |
| #ifndef NDEBUG |
| const auto& msg = *round->replicate_msg(); |
| DCHECK(!msg.has_id()) << "should not have an ID yet: " |
| << SecureShortDebugString(msg); |
| #endif |
| // Get a snapshot of an atomic member for consistency between the condition |
| // and the error message, if any. |
| const State state = state_; |
| if (PREDICT_FALSE(state != kRunning)) { |
| return Status::IllegalState("RaftConsensus is not running", |
| Substitute("state $0", State_Name(state))); |
| } |
| |
| // The order of reading role_and_term and leader_is_ready is essential to |
| // deal with situations when leader_is_ready and role_and_term are read |
| // in different Raft terms. |
| // * It's safe to bind to a stale term even if we've asserted leadership |
| // in a newer term: the stale op will be rejected on followers anyway |
| // because Raft guarantees that a majority of replicas have accepted |
| // the new term. |
| // * It's safe for the term to be stale if we haven't asserted leadership |
| // for a newer term because we'll exit out below. |
| // * It's unsafe to bind to a newer term if we've asserted leadership |
| // for a stale term, hence this ordering. |
| const auto role_and_term = cmeta_->GetRoleAndTerm(); |
| const bool leader_is_ready = leader_is_ready_; |
| |
| const auto role = role_and_term.first; |
| switch (role) { |
| case RaftPeerPB::LEADER: |
| if (leader_transfer_in_progress_) { |
| return Status::ServiceUnavailable("leader transfer in progress"); |
| } |
| if (!leader_is_ready) { |
| // Leader replica should not accept write operations before scheduling |
| // the replication of a NO_OP to assert its leadership in the current |
| // term. Otherwise there might be a race, so the very first accepted |
| // write operation might have timestamp lower than the timestamp |
| // of the NO_OP. That would trigger an assertion in MVCC. |
| return Status::ServiceUnavailable("leader is not yet ready"); |
| } |
| break; |
| |
| default: |
| return Status::IllegalState( |
| Substitute("replica $0 is not leader of this config: current role $1", |
| peer_uuid(), RaftPeerPB::Role_Name(role))); |
| } |
| const auto term = role_and_term.second; |
| round->BindToTerm(term); |
| |
| return Status::OK(); |
| } |
| |
| Status RaftConsensus::AppendNewRoundToQueueUnlocked(const scoped_refptr<ConsensusRound>& round) { |
| DCHECK(lock_.is_locked()); |
| |
| *round->replicate_msg()->mutable_id() = queue_->GetNextOpId(); |
| RETURN_NOT_OK(AddPendingOperationUnlocked(round)); |
| |
| // The only reasons for a bad status would be if the log itself were shut down, |
| // or if we had an actual IO error, which we currently don't handle. |
| CHECK_OK_PREPEND(queue_->AppendOperation(round->replicate_scoped_refptr()), |
| Substitute("$0: could not append to queue", LogPrefixUnlocked())); |
| return Status::OK(); |
| } |
| |
| Status RaftConsensus::AddPendingOperationUnlocked(const scoped_refptr<ConsensusRound>& round) { |
| DCHECK(lock_.is_locked()); |
| DCHECK(pending_); |
| |
| // If we are adding a pending config change, we need to propagate it to the |
| // metadata. |
| if (PREDICT_FALSE(round->replicate_msg()->op_type() == CHANGE_CONFIG_OP)) { |
| // Fill in the opid for the proposed new configuration. This has to be done |
| // here rather than when it's first created because we didn't yet have an |
| // OpId assigned at creation time. |
| ChangeConfigRecordPB* change_record = round->replicate_msg()->mutable_change_config_record(); |
| change_record->mutable_new_config()->set_opid_index(round->replicate_msg()->id().index()); |
| |
| DCHECK(change_record->IsInitialized()) |
| << "change_config_record missing required fields: " |
| << change_record->InitializationErrorString(); |
| |
| const RaftConfigPB& new_config = change_record->new_config(); |
| |
| if (!new_config.unsafe_config_change()) { |
| Status s = CheckNoConfigChangePendingUnlocked(); |
| if (PREDICT_FALSE(!s.ok())) { |
| s = s.CloneAndAppend(Substitute("\n New config: $0", SecureShortDebugString(new_config))); |
| LOG_WITH_PREFIX_UNLOCKED(INFO) << s.ToString(); |
| return s; |
| } |
| } |
| // Check if the pending Raft config has an OpId less than the committed |
| // config. If so, this is a replay at startup in which the COMMIT |
| // messages were delayed. |
| int64_t committed_config_opid_index = cmeta_->GetConfigOpIdIndex(COMMITTED_CONFIG); |
| if (round->replicate_msg()->id().index() > committed_config_opid_index) { |
| RETURN_NOT_OK(SetPendingConfigUnlocked(new_config)); |
| if (cmeta_->active_role() == RaftPeerPB::LEADER) { |
| RefreshConsensusQueueAndPeersUnlocked(); |
| } |
| } else { |
| LOG_WITH_PREFIX_UNLOCKED(INFO) |
| << "Ignoring setting pending config change with OpId " |
| << round->replicate_msg()->id() << " because the committed config has OpId index " |
| << committed_config_opid_index << ". The config change we are ignoring is: " |
| << "Old config: { " << SecureShortDebugString(change_record->old_config()) << " }. " |
| << "New config: { " << SecureShortDebugString(new_config) << " }"; |
| } |
| } |
| |
| return pending_->AddPendingOperation(round); |
| } |
| |
| void RaftConsensus::NotifyCommitIndex(int64_t commit_index) { |
| TRACE_EVENT2("consensus", "RaftConsensus::NotifyCommitIndex", |
| "tablet", options_.tablet_id, |
| "commit_index", commit_index); |
| |
| ThreadRestrictions::AssertWaitAllowed(); |
| LockGuard l(lock_); |
| // We will process commit notifications while shutting down because a replica |
| // which has initiated a Prepare() / Replicate() may eventually commit even if |
| // its state has changed after the initial Append() / Update(). |
| const State state = state_; |
| if (PREDICT_FALSE(state != kRunning && state != kStopping)) { |
| LOG_WITH_PREFIX_UNLOCKED(WARNING) << "Unable to update committed index: " |
| << "Replica not in running state: " |
| << State_Name(state); |
| return; |
| } |
| |
| pending_->AdvanceCommittedIndex(commit_index); |
| |
| if (cmeta_->active_role() == RaftPeerPB::LEADER) { |
| peer_manager_->SignalRequest(false); |
| } |
| } |
| |
| void RaftConsensus::NotifyTermChange(int64_t term) { |
| TRACE_EVENT2("consensus", "RaftConsensus::NotifyTermChange", |
| "tablet", options_.tablet_id, |
| "term", term); |
| |
| ThreadRestrictions::AssertWaitAllowed(); |
| LockGuard l(lock_); |
| Status s = CheckRunningUnlocked(); |
| if (PREDICT_FALSE(!s.ok())) { |
| LOG_WITH_PREFIX_UNLOCKED(WARNING) << "Unable to handle notification of new term " |
| << "(" << term << "): " << s.ToString(); |
| return; |
| } |
| WARN_NOT_OK(HandleTermAdvanceUnlocked(term), "Couldn't advance consensus term."); |
| } |
| |
| void RaftConsensus::NotifyFailedFollower(const string& uuid, |
| int64_t term, |
| const string& reason) { |
| // Common info used in all of the log messages within this method. |
| string fail_msg = Substitute("Processing failure of peer $0 in term $1 ($2): ", |
| uuid, term, reason); |
| |
| if (!FLAGS_evict_failed_followers) { |
| LOG(INFO) << LogPrefixThreadSafe() << fail_msg |
| << "Eviction of failed followers is disabled. Doing nothing."; |
| return; |
| } |
| |
| RaftConfigPB committed_config; |
| { |
| ThreadRestrictions::AssertWaitAllowed(); |
| LockGuard l(lock_); |
| int64_t current_term = CurrentTermUnlocked(); |
| if (current_term != term) { |
| LOG_WITH_PREFIX_UNLOCKED(INFO) << fail_msg << "Notified about a follower failure in " |
| << "previous term " << term << ", but a leader election " |
| << "likely occurred since the failure was detected. " |
| << "Doing nothing."; |
| return; |
| } |
| |
| if (cmeta_->has_pending_config()) { |
| LOG_WITH_PREFIX_UNLOCKED(INFO) << fail_msg << "There is already a config change operation " |
| << "in progress. Unable to evict follower until it completes. " |
| << "Doing nothing."; |
| return; |
| } |
| committed_config = cmeta_->CommittedConfig(); |
| } |
| |
| // Run config change on thread pool after dropping lock. |
| auto self = shared_from_this(); |
| WARN_NOT_OK(raft_pool_token_->Submit( |
| [self, uuid, committed_config, reason]() { |
| self->TryRemoveFollowerTask(uuid, committed_config, reason); |
| }), |
| LogPrefixThreadSafe() + "Unable to start TryRemoveFollowerTask"); |
| } |
| |
| void RaftConsensus::NotifyPeerToPromote(const string& peer_uuid) { |
| // Run the config change on the raft thread pool. |
| auto self = shared_from_this(); |
| WARN_NOT_OK(raft_pool_token_->Submit( |
| [self, peer_uuid]() { self->TryPromoteNonVoterTask(peer_uuid); }), |
| LogPrefixThreadSafe() + "Unable to start TryPromoteNonVoterTask"); |
| } |
| |
| void RaftConsensus::NotifyPeerToStartElection(const string& peer_uuid) { |
| const auto& log_prefix = LogPrefixThreadSafe(); |
| LOG(INFO) << log_prefix << ": Instructing follower " << peer_uuid << " to start an election"; |
| auto self = shared_from_this(); |
| WARN_NOT_OK(raft_pool_token_->Submit( |
| [self, peer_uuid]() { self->TryStartElectionOnPeerTask(peer_uuid); }), |
| log_prefix + "Unable to start TryStartElectionOnPeerTask"); |
| } |
| |
| void RaftConsensus::NotifyPeerHealthChange() { |
| MarkDirty("Peer health change"); |
| } |
| |
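| // Builds and submits a REMOVE_PEER config change to evict 'uuid' from the |
| // committed config that was captured when the failure was detected. |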
| void RaftConsensus::TryRemoveFollowerTask(const string& uuid, |
| const RaftConfigPB& committed_config, |
| const string& reason) { |
| ChangeConfigRequestPB req; |
| req.set_tablet_id(options_.tablet_id); |
| req.mutable_server()->set_permanent_uuid(uuid); |
| req.set_type(REMOVE_PEER); |
| req.set_cas_config_opid_index(committed_config.opid_index()); |
| LOG(INFO) << LogPrefixThreadSafe() << "Attempting to remove follower " |
| << uuid << " from the Raft config. Reason: " << reason; |
| optional<TabletServerErrorPB::Code> error_code; |
| WARN_NOT_OK(ChangeConfig(req, &DoNothingStatusCB, &error_code), |
| LogPrefixThreadSafe() + "Unable to remove follower " + uuid); |
| } |
| |
| void RaftConsensus::TryPromoteNonVoterTask(const string& peer_uuid) { |
| string msg = Substitute("attempt to promote peer $0: ", peer_uuid); |
| int64_t current_committed_config_index; |
| { |
| ThreadRestrictions::AssertWaitAllowed(); |
| LockGuard l(lock_); |
| |
| if (cmeta_->has_pending_config()) { |
| LOG_WITH_PREFIX_UNLOCKED(INFO) << msg << "there is already a config change operation " |
| << "in progress. Unable to promote follower until it " |
| << "completes. Doing nothing."; |
| return; |
| } |
| |
| // Check if the peer is still part of the current committed config. |
| RaftConfigPB committed_config = cmeta_->CommittedConfig(); |
| current_committed_config_index = committed_config.opid_index(); |
| |
| RaftPeerPB* peer_pb; |
| Status s = GetRaftConfigMember(&committed_config, peer_uuid, &peer_pb); |
| if (!s.ok()) { |
| LOG_WITH_PREFIX_UNLOCKED(INFO) << msg << "can't find peer in the " |
| << "current committed config: " |
| << committed_config.ShortDebugString() |
| << ". Doing nothing."; |
| return; |
| } |
| |
| // Also check if the peer is still a NON_VOTER waiting for promotion. |
| if (peer_pb->member_type() != RaftPeerPB::NON_VOTER || !peer_pb->attrs().promote()) { |
| LOG_WITH_PREFIX_UNLOCKED(INFO) << msg << "peer is either no longer a NON_VOTER " |
| << "or not marked for promotion anymore. Current " |
| << "config: " << committed_config.ShortDebugString() |
| << ". Doing nothing."; |
| return; |
| } |
| } |
| |
| ChangeConfigRequestPB req; |
| req.set_tablet_id(options_.tablet_id); |
| req.set_type(MODIFY_PEER); |
| req.mutable_server()->set_permanent_uuid(peer_uuid); |
| req.mutable_server()->set_member_type(RaftPeerPB::VOTER); |
| req.mutable_server()->mutable_attrs()->set_promote(false); |
| req.set_cas_config_opid_index(current_committed_config_index); |
| LOG(INFO) << LogPrefixThreadSafe() << "attempting to promote NON_VOTER " |
| << peer_uuid << " to VOTER"; |
| optional<TabletServerErrorPB::Code> error_code; |
| WARN_NOT_OK(ChangeConfig(req, &DoNothingStatusCB, &error_code), |
| LogPrefixThreadSafe() + Substitute("Unable to promote non-voter $0", peer_uuid)); |
| } |
| |
| void RaftConsensus::TryStartElectionOnPeerTask(const string& peer_uuid) { |
| ThreadRestrictions::AssertWaitAllowed(); |
| LockGuard l(lock_); |
| // Double-check that the peer is a voter in the active config. |
| if (!IsRaftConfigVoter(peer_uuid, cmeta_->ActiveConfig())) { |
| LOG_WITH_PREFIX_UNLOCKED(INFO) << "Not signalling peer " << peer_uuid |
| << "to start an election: it's not a voter " |
| << "in the active config."; |
| return; |
| } |
| LOG_WITH_PREFIX_UNLOCKED(INFO) << "Signalling peer " << peer_uuid |
| << " to start an election"; |
| WARN_NOT_OK(peer_manager_->StartElection(peer_uuid), |
| Substitute("unable to start election on peer $0", peer_uuid)); |
| } |
| |
| Status RaftConsensus::Update(const ConsensusRequestPB* request, |
| ConsensusResponsePB* response) { |
| ++update_calls_for_tests_; |
| |
| if (PREDICT_FALSE(FLAGS_follower_reject_update_consensus_requests)) { |
| return Status::IllegalState("Rejected: --follower_reject_update_consensus_requests " |
| "is set to true."); |
| } |
| |
| response->set_responder_uuid(peer_uuid()); |
| |
| VLOG_WITH_PREFIX(2) << "Replica received request: " << SecureShortDebugString(*request); |
| |
| // Hold update_lock_ for the duration of this call; see the variable's |
| // declaration for the rationale. |
| std::lock_guard<simple_spinlock> lock(update_lock_); |
| Status s = UpdateReplica(request, response); |
| if (PREDICT_FALSE(VLOG_IS_ON(1))) { |
| if (request->ops().empty()) { |
| VLOG_WITH_PREFIX(1) |
| << Substitute("Replica replied to status only request. Replica: $0. Response: $1", |
| ToString(), SecureShortDebugString(*response)); |
| } |
| } |
| return s; |
| } |
| |
| // Helper function to check whether the op is a consensus-only operation, |
| // i.e. a NO_OP or a config change, rather than one that carries user data. |
| static bool IsConsensusOnlyOperation(OperationType op_type) { |
| return op_type == NO_OP || op_type == CHANGE_CONFIG_OP; |
| } |
| |
| Status RaftConsensus::StartFollowerOpUnlocked(const ReplicateRefPtr& msg) { |
| DCHECK(lock_.is_locked()); |
| |
| if (IsConsensusOnlyOperation(msg->get()->op_type())) { |
| return StartConsensusOnlyRoundUnlocked(msg); |
| } |
| |
| if (PREDICT_FALSE(FLAGS_follower_fail_all_prepare)) { |
| return Status::IllegalState("Rejected: --follower_fail_all_prepare " |
| "is set to true."); |
| } |
| |
| VLOG_WITH_PREFIX_UNLOCKED(1) << "Starting op: " |
| << SecureShortDebugString(msg->get()->id()); |
| scoped_refptr<ConsensusRound> round(new ConsensusRound(this, msg)); |
| ConsensusRound* round_ptr = round.get(); |
| RETURN_NOT_OK(round_handler_->StartFollowerOp(round)); |
| return AddPendingOperationUnlocked(round_ptr); |
| } |
| |
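| // Returns true iff this replica is the only voter in the committed config. |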
| bool RaftConsensus::IsSingleVoterConfig() const { |
| ThreadRestrictions::AssertWaitAllowed(); |
| LockGuard l(lock_); |
| return cmeta_->CountVotersInConfig(COMMITTED_CONFIG) == 1 && |
| cmeta_->IsVoterInConfig(peer_uuid(), COMMITTED_CONFIG); |
| } |
| |
| string RaftConsensus::LeaderRequest::OpsRangeString() const { |
| string ret; |
| ret.reserve(100); |
| ret.push_back('['); |
| if (!messages.empty()) { |
| const OpId& first_op = (*messages.begin())->get()->id(); |
| const OpId& last_op = (*messages.rbegin())->get()->id(); |
| strings::SubstituteAndAppend(&ret, "$0.$1-$2.$3", |
| first_op.term(), first_op.index(), |
| last_op.term(), last_op.index()); |
| } |
| ret.push_back(']'); |
| return ret; |
| } |
| |
| void RaftConsensus::DeduplicateLeaderRequestUnlocked(const ConsensusRequestPB* rpc_req, |
| LeaderRequest* deduplicated_req) { |
| DCHECK(lock_.is_locked()); |
| |
| // TODO(todd): use queue committed index? |
| int64_t last_committed_index = pending_->GetCommittedIndex(); |
| |
| // The leader's preceding id. |
| deduplicated_req->preceding_opid = &rpc_req->preceding_id(); |
| |
| int64_t dedup_up_to_index = queue_->GetLastOpIdInLog().index(); |
| |
| deduplicated_req->first_message_idx = -1; |
| |
| // In this loop we discard duplicates and advance the leader's preceding id |
| // accordingly. |
| for (int i = 0; i < rpc_req->ops_size(); i++) { |
| const ReplicateMsg* leader_msg = &rpc_req->ops(i); |
| |
| if (leader_msg->id().index() <= last_committed_index) { |
| VLOG_WITH_PREFIX_UNLOCKED(2) << "Skipping op id " << leader_msg->id() |
| << " (already committed)"; |
| deduplicated_req->preceding_opid = &leader_msg->id(); |
| continue; |
| } |
| |
| if (leader_msg->id().index() <= dedup_up_to_index) { |
| // If the index is uncommitted and below our match index, then it must be in the |
| // pending set. |
| scoped_refptr<ConsensusRound> round = |
| pending_->GetPendingOpByIndexOrNull(leader_msg->id().index()); |
| DCHECK(round) << "Could not find op with index " << leader_msg->id().index() |
| << " in pending set. committed= " << last_committed_index |
| << " dedup=" << dedup_up_to_index; |
| |
| // If the OpIds match, i.e. if they have the same term and index, then this |
| // is just a duplicate and we skip it... |
| if (OpIdEquals(round->replicate_msg()->id(), leader_msg->id())) { |
| VLOG_WITH_PREFIX_UNLOCKED(2) << "Skipping op id " << leader_msg->id() |
| << " (already replicated)"; |
| deduplicated_req->preceding_opid = &leader_msg->id(); |
| continue; |
| } |
| |
| // ... otherwise we must adjust our match index, i.e. all messages from now on |
| // are "new" |
| dedup_up_to_index = leader_msg->id().index(); |
| } |
| |
| if (deduplicated_req->first_message_idx == -1) { |
| deduplicated_req->first_message_idx = i; |
| } |
| deduplicated_req->messages.emplace_back( |
| make_scoped_refptr_replicate(new ReplicateMsg(*leader_msg))); |
| } |
| |
| if (deduplicated_req->messages.size() != rpc_req->ops_size()) { |
| LOG_WITH_PREFIX_UNLOCKED(INFO) << "Deduplicated request from leader. Original: " |
| << rpc_req->preceding_id() << "->" << OpsRangeString(*rpc_req) |
| << " Dedup: " << *deduplicated_req->preceding_opid << "->" |
| << deduplicated_req->OpsRangeString(); |
| } |
| |
| } |
| |
| Status RaftConsensus::HandleLeaderRequestTermUnlocked(const ConsensusRequestPB* request, |
| ConsensusResponsePB* response) { |
| DCHECK(lock_.is_locked()); |
| // Do term checks first: |
| if (PREDICT_FALSE(request->caller_term() != CurrentTermUnlocked())) { |
| |
| // If less, reject. |
| if (request->caller_term() < CurrentTermUnlocked()) { |
| string msg = Substitute("Rejecting Update request from peer $0 for earlier term $1. " |
| "Current term is $2. Ops: $3", |
| |
| request->caller_uuid(), |
| request->caller_term(), |
| CurrentTermUnlocked(), |
| OpsRangeString(*request)); |
| LOG_WITH_PREFIX_UNLOCKED(INFO) << msg; |
| FillConsensusResponseError(response, |
| ConsensusErrorPB::INVALID_TERM, |
| Status::IllegalState(msg)); |
| return Status::OK(); |
| } |
| RETURN_NOT_OK(HandleTermAdvanceUnlocked(request->caller_term())); |
| } |
| return Status::OK(); |
| } |
| |
| Status RaftConsensus::EnforceLogMatchingPropertyMatchesUnlocked(const LeaderRequest& req, |
| ConsensusResponsePB* response) { |
| DCHECK(lock_.is_locked()); |
| |
| bool term_mismatch; |
| if (pending_->IsOpCommittedOrPending(*req.preceding_opid, &term_mismatch)) { |
| return Status::OK(); |
| } |
| |
| string error_msg = Substitute( |
| "Log matching property violated." |
| " Preceding OpId in replica: $0. Preceding OpId from leader: $1. ($2 mismatch)", |
| SecureShortDebugString(queue_->GetLastOpIdInLog()), |
| SecureShortDebugString(*req.preceding_opid), |
| term_mismatch ? "term" : "index"); |
| |
| |
| FillConsensusResponseError(response, |
| ConsensusErrorPB::PRECEDING_ENTRY_DIDNT_MATCH, |
| Status::IllegalState(error_msg)); |
| |
| // Adding a check to eliminate an unnecessary log message in the |
| // scenario where this is the first message from the Leader of a new tablet. |
| if (!OpIdEquals(MakeOpId(1,1), *req.preceding_opid)) { |
| LOG_WITH_PREFIX_UNLOCKED(INFO) << "Refusing update from remote peer " |
| << req.leader_uuid << ": " << error_msg; |
| } |
| |
| // If the terms mismatch we abort down to the index before the leader's preceding, |
| // since we know that is the last opid that has a chance of not being overwritten. |
| // Aborting preemptively here avoids us reporting a last received index that is |
| // possibly higher than the leader's causing an avoidable cache miss on the leader's |
| // queue. |
| // |
| // TODO: this isn't just an optimization! if we comment this out, we get |
| // failures on raft_consensus-itest a couple percent of the time! Should investigate |
| // why this is actually critical to do here, as opposed to just on requests that |
| // append some ops. |
| if (term_mismatch) { |
| TruncateAndAbortOpsAfterUnlocked(req.preceding_opid->index() - 1); |
| } |
| |
| return Status::OK(); |
| } |
| |
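| // Aborts pending ops after 'truncate_after_index' and truncates the |
| // corresponding ops from the queue and the local log. |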
| void RaftConsensus::TruncateAndAbortOpsAfterUnlocked(int64_t truncate_after_index) { |
| DCHECK(lock_.is_locked()); |
| pending_->AbortOpsAfter(truncate_after_index); |
| queue_->TruncateOpsAfter(truncate_after_index); |
| } |
| |
| Status RaftConsensus::CheckLeaderRequestUnlocked(const ConsensusRequestPB* request, |
| ConsensusResponsePB* response, |
| LeaderRequest* deduped_req) { |
| DCHECK(lock_.is_locked()); |
| |
| if (PREDICT_FALSE(request->has_deprecated_committed_index() || |
| !request->has_all_replicated_index())) { |
| return Status::InvalidArgument("Leader appears to be running an earlier version " |
| "of Kudu. Please shut down and upgrade all servers " |
| "before restarting."); |
| } |
| |
| DeduplicateLeaderRequestUnlocked(request, deduped_req); |
| |
| // This is an additional check for KUDU-639 that makes sure the message's index |
| // and term are in the right sequence in the request, after we've deduplicated |
| // them. We do this before we change any of the internal state. |
| // |
| // TODO move this to raft_consensus-state or whatever we transform that into. |
| // We should be able to do this check for each append, but right now the way |
| // we initialize raft_consensus-state is preventing us from doing so. |
| const OpId* prev = deduped_req->preceding_opid; |
| for (const ReplicateRefPtr& message : deduped_req->messages) { |
| Status s = PendingRounds::CheckOpInSequence(*prev, message->get()->id()); |
| if (PREDICT_FALSE(!s.ok())) { |
| LOG(ERROR) << "Leader request contained out-of-sequence messages. Status: " |
| << s.ToString() << ". Leader Request: " << SecureShortDebugString(*request); |
| return s; |
| } |
| prev = &message->get()->id(); |
| } |
| |
| RETURN_NOT_OK(HandleLeaderRequestTermUnlocked(request, response)); |
| |
| if (response->status().has_error()) { |
| return Status::OK(); |
| } |
| |
| RETURN_NOT_OK(EnforceLogMatchingPropertyMatchesUnlocked(*deduped_req, response)); |
| |
| if (response->status().has_error()) { |
| return Status::OK(); |
| } |
| |
| // If the first of the messages to apply is not in our log, either it follows the last |
| // received message or it replaces some in-flight ops. |
| if (!deduped_req->messages.empty()) { |
| |
| bool term_mismatch; |
| CHECK(!pending_->IsOpCommittedOrPending(deduped_req->messages[0]->get()->id(), &term_mismatch)); |
| |
| // If the index is in our log but the terms do not match, abort down to the |
| // leader's preceding id. |
| if (term_mismatch) { |
| TruncateAndAbortOpsAfterUnlocked(deduped_req->preceding_opid->index()); |
| } |
| } |
| |
| // If all of the above logic was successful then we can consider the caller |
| // to be the effective leader of the configuration. If the caller is not |
| // currently marked as the leader locally, mark it as leader now. |
| const string& caller_uuid = request->caller_uuid(); |
| if (PREDICT_FALSE(HasLeaderUnlocked() && |
| GetLeaderUuidUnlocked() != caller_uuid)) { |
| LOG_WITH_PREFIX_UNLOCKED(FATAL) << "Unexpected new leader in same term! " |
| << "Existing leader UUID: " << GetLeaderUuidUnlocked() << ", " |
| << "new leader UUID: " << caller_uuid; |
| } |
| if (PREDICT_FALSE(!HasLeaderUnlocked())) { |
| SetLeaderUuidUnlocked(request->caller_uuid()); |
| } |
| |
| return Status::OK(); |
| } |
| |
| Status RaftConsensus::UpdateReplica(const ConsensusRequestPB* request, |
| ConsensusResponsePB* response) { |
| TRACE_EVENT2("consensus", "RaftConsensus::UpdateReplica", |
| "peer", peer_uuid(), |
| "tablet", options_.tablet_id); |
| Synchronizer log_synchronizer; |
| StatusCallback sync_status_cb = log_synchronizer.AsStatusCallback(); |
| |
| |
| // The ordering of the following operations is crucial, read on for details. |
| // |
| // The main requirements explained in more detail below are: |
| // |
| // 1) We must enqueue the prepares before we write to our local log. |
| // 2) If we were able to enqueue a prepare then we must be able to log it. |
| // 3) If we fail to enqueue a prepare, we must not attempt to enqueue any |
| // later-indexed prepare or apply. |
| // |
| // See below for detailed rationale. |
| // |
| // The steps are: |
| // |
| // 0 - Split/Dedup |
| // |
| // We split the operations into replicates and commits and make sure that we |
| // don't do anything on operations we've already received in a previous call. |
| // This essentially makes this method idempotent. |
| // |
| // 1 - We mark as many pending ops as committed as we can. |
| // |
| // We may have some pending ops that, according to the leader, are now |
| // committed. We Apply them early, because: |
| // - Soon (step 2) we may reject the call due to excessive memory pressure. One |
| // way to relieve the pressure is by flushing the MRS, and applying these |
| // ops may unblock an in-flight Flush(). |
| // - The Apply and subsequent Prepares (step 2) can take place concurrently. |
| // |
| // 2 - We enqueue the Prepare of the ops. |
| // |
| // The actual prepares are enqueued in order but happen asynchronously, so |
| // decoding and lock acquisition stay off the critical path. |
| // |
| // We need to do this now for a number of reasons: |
| // - Prepares, by themselves, are inconsequential, i.e. they do not mutate the |
| // state machine so, were we to crash afterwards, having the prepares in-flight |
| // won't hurt. |
| // - Prepares depend on factors external to consensus (the op drivers and |
| // the TabletReplica) so if for some reason they cannot be enqueued we must know |
| // before we try to write them to the WAL. Once enqueued, we assume that prepares will |
| // always succeed on replica ops (because the leader already prepared them |
| // successfully, and thus we know they are valid). |
| // - The prepares corresponding to every operation that was logged must be in-flight |
| // first. This is because, should we need to abort certain ops (say a new leader |
| // says they are not committed), we need to have those prepares in-flight so that |
| // the ops can be continued (in the abort path). |
| // - Failure to enqueue prepares is OK; we can continue and let the leader know that |
| // we only went so far. The leader will re-send the remaining messages. |
| // - Prepares represent new ops, and ops consume memory. Thus, if the |
| // overall memory pressure on the server is too high, we will reject the prepares. |
| // |
| // 3 - We enqueue the writes to the WAL. |
| // |
| // We enqueue writes to the WAL, but only the operations that were successfully |
| // enqueued for prepare (for the reasons introduced above). This means that even |
| // if a prepare fails to enqueue, if any of the previous prepares were successfully |
| // submitted they must be written to the WAL. |
| // If writing to the WAL fails, we're in an inconsistent state and we crash. In this |
| // case, no one will ever know of the ops we previously prepared so those are |
| // inconsequential. |
| // |
| // 4 - We mark the ops as committed. |
| // |
| // For each op which has been committed by the leader, we update the |
| // op state to reflect that. If the logging has already succeeded for that |
| // op, this will trigger the Apply phase. Otherwise, Apply will be triggered |
| // when the logging completes. In both cases the Apply phase executes asynchronously. |
| // This must, of course, happen after the prepares have been triggered as the same batch |
| // can both replicate/prepare and commit/apply an operation. |
| // |
| // Currently, if a prepare failed to enqueue we still trigger all applies for operations |
| // with an id lower than it (if we have them). This is important now as the leader will |
| // not re-send those commit messages. This will be moot when we move to the |
| // commitIndex way of doing things, as we can simply ignore the applies as we know |
| // they will be triggered with the next successful batch. |
| // |
| // 5 - We wait for the writes to be durable. |
| // |
| // Before replying to the leader we wait for the writes to be durable. We then |
| // just update the last replicated watermark and respond. |
| // |
| // TODO - These failure scenarios need to be exercised in a unit test. |
| // Moreover, we need to add more fault injection spots (and actually use the |
| // existing ones) for each of these steps. |
| // This will be done in a follow up patch. |
| TRACE("Updating replica for $0 ops", request->ops_size()); |
| |
| // The deduplicated request. |
| LeaderRequest deduped_req; |
| auto& messages = deduped_req.messages; |
| { |
| ThreadRestrictions::AssertWaitAllowed(); |
| LockGuard l(lock_); |
| RETURN_NOT_OK(CheckRunningUnlocked()); |
| if (!cmeta_->IsMemberInConfig(peer_uuid(), ACTIVE_CONFIG)) { |
| LOG_WITH_PREFIX_UNLOCKED(INFO) << "Allowing update even though not a member of the config"; |
| } |
| |
| deduped_req.leader_uuid = request->caller_uuid(); |
| |
| RETURN_NOT_OK(CheckLeaderRequestUnlocked(request, response, &deduped_req)); |
| if (response->status().has_error()) { |
| // We had an error, like an invalid term, we still fill the response. |
| FillConsensusResponseOKUnlocked(response); |
| return Status::OK(); |
| } |
| |
| // As soon as we decide to accept the message: |
| // * snooze the failure detector |
| // * prohibit voting for anyone for the minimum election timeout |
| // We are guaranteed to be acting as a FOLLOWER at this point by the above |
| // sanity check. |
| SnoozeFailureDetector(); |
| WithholdVotes(); |
| |
| last_leader_communication_time_micros_ = GetMonoTimeMicros(); |
| |
| // Reset the 'failed_elections_since_stable_leader' metric now that we've |
| // accepted an update from the established leader. This is done in addition |
| // to the reset of the value in SetLeaderUuidUnlocked() because there is |
| // a potential race between resetting the failed elections count in |
| // SetLeaderUuidUnlocked() and incrementing after a failed election |
| // if another replica was elected leader in an election concurrent with |
| // the one called by this replica. |
| failed_elections_since_stable_leader_ = 0; |
| num_failed_elections_metric_->set_value(failed_elections_since_stable_leader_); |
| |
| // We update the lag metrics here in addition to after appending to the queue so the |
| // metrics get updated even when the operation is rejected. |
| queue_->UpdateLastIndexAppendedToLeader(request->last_idx_appended_to_leader()); |
| |
| // 1 - Early commit pending (and committed) ops |
| |
| // What should we commit? |
| // 1. As many pending ops as we can, except... |
| // 2. ...if we commit beyond the preceding index, we'd regress KUDU-639, and... |
| // 3. ...the leader's committed index is always our upper bound. |
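| // For example (illustrative indexes): with the last pending op at index 105, a preceding |
| // opid index of 103, and a leader-reported committed_index of 110, we early-apply only |
| // up to 103, the minimum of the three. |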
| const int64_t early_apply_up_to = std::min({ |
| pending_->GetLastPendingOpOpId().index(), |
| deduped_req.preceding_opid->index(), |
| request->committed_index()}); |
| |
| VLOG_WITH_PREFIX_UNLOCKED(1) << "Early marking committed up to " << early_apply_up_to |
| << ", Last pending opid index: " |
| << pending_->GetLastPendingOpOpId().index() |
| << ", preceding opid index: " |
| << deduped_req.preceding_opid->index() |
| << ", requested index: " << request->committed_index(); |
| TRACE("Early marking committed up to index $0", early_apply_up_to); |
| CHECK_OK(pending_->AdvanceCommittedIndex(early_apply_up_to)); |
| |
| // 2 - Enqueue the prepares |
| |
| TRACE("Triggering prepare for $0 ops", messages.size()); |
| |
| if (PREDICT_TRUE(!messages.empty())) { |
| // This request contains at least one message, and is likely to increase |
| // our memory pressure. |
| double capacity_pct; |
| if (process_memory::SoftLimitExceeded(&capacity_pct)) { |
| if (follower_memory_pressure_rejections_) follower_memory_pressure_rejections_->Increment(); |
| string msg = StringPrintf( |
| "Soft memory limit exceeded (at %.2f%% of capacity)", |
| capacity_pct); |
| if (capacity_pct >= FLAGS_memory_limit_warn_threshold_percentage) { |
| KLOG_EVERY_N_SECS(WARNING, 1) << "Rejecting consensus request: " << msg |
| << THROTTLE_MSG; |
| } else { |
| KLOG_EVERY_N_SECS(INFO, 1) << "Rejecting consensus request: " << msg |
| << THROTTLE_MSG; |
| } |
| return Status::ServiceUnavailable(msg); |
| } |
| } |
| |
| Status prepare_status; |
| auto iter = messages.begin(); |
| while (iter != messages.end()) { |
| prepare_status = StartFollowerOpUnlocked(*iter); |
| if (PREDICT_FALSE(!prepare_status.ok())) { |
| break; |
| } |
| // TODO(dralves) Without leader leases this shouldn't be allowed to fail. |
| // Once we have that functionality we'll have to revisit this. |
| CHECK_OK(time_manager_->MessageReceivedFromLeader(*(*iter)->get())); |
| ++iter; |
| } |
| |
| // If we stopped before reaching the end, we failed to prepare some message(s) and need |
| // to perform cleanup, namely trimming deduped_req.messages to only contain the messages |
| // that were actually prepared, and deleting the other ones since we've taken ownership |
| // when we first deduped. |
| if (iter != messages.end()) { |
| LOG_WITH_PREFIX_UNLOCKED(WARNING) << Substitute( |
| "Could not prepare op '$0' and following $1 ops. " |
| "Status for this op: $2", |
| (*iter)->get()->id().ShortDebugString(), |
| std::distance(iter, messages.end()) - 1, |
| prepare_status.ToString()); |
| iter = messages.erase(iter, messages.end()); |
| |
| // If this is empty, it means we couldn't prepare a single de-duped message. There is nothing |
| // else we can do. The leader will detect this and retry later. |
| if (messages.empty()) { |
| string msg = Substitute("Rejecting Update request from peer $0 for term $1. " |
| "Could not prepare a single op due to: $2", |
| request->caller_uuid(), |
| request->caller_term(), |
| prepare_status.ToString()); |
| LOG_WITH_PREFIX_UNLOCKED(INFO) << msg; |
| FillConsensusResponseError(response, ConsensusErrorPB::CANNOT_PREPARE, |
| Status::IllegalState(msg)); |
| FillConsensusResponseOKUnlocked(response); |
| return Status::OK(); |
| } |
| } |
| |
| // All ops that are going to be prepared have been started, so advance the safe timestamp. |
| // TODO(dralves) This is only correct because the queue only sets safe time when the request is |
| // an empty heartbeat. If we actually start setting this on a consensus request along with |
| // actual messages we need to be careful to ignore it if any of the messages fails to prepare. |
| if (request->has_safe_timestamp()) { |
| time_manager_->AdvanceSafeTime(Timestamp(request->safe_timestamp())); |
| } |
| |
| OpId last_from_leader; |
| // 3 - Enqueue the writes. |
| // Now that we've triggered the prepares enqueue the operations to be written |
| // to the WAL. |
| if (PREDICT_TRUE(!messages.empty())) { |
| last_from_leader = messages.back()->get()->id(); |
| // Trigger the log append asap, if fsync() is on this might take a while |
| // and we can't reply until this is done. |
| // |
| // Since we've prepared, we need to be able to append (or we risk trying to apply |
| // later something that wasn't logged). We crash if we can't. |
| CHECK_OK(queue_->AppendOperations(messages, sync_status_cb)); |
| } else { |
| last_from_leader = *deduped_req.preceding_opid; |
| } |
| |
| // 4 - Mark ops as committed |
| |
| // Choose the last operation to be applied. This will either be 'committed_index', if |
| // no prepare enqueuing failed, or the minimum between 'committed_index' and the id of |
| // the last successfully enqueued prepare, if some prepare failed to enqueue. |
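| // For example (illustrative indexes): if the leader reports committed_index 120 but the |
| // last op we managed to enqueue locally has index 115, we only mark up to 115 as |
| // committed; the leader will re-send the rest. |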
| int64_t apply_up_to; |
| if (last_from_leader.index() < request->committed_index()) { |
| // we should never apply anything later than what we received in this request |
| apply_up_to = last_from_leader.index(); |
| |
| VLOG_WITH_PREFIX_UNLOCKED(2) << "Received commit index " |
| << request->committed_index() << " from the leader but only" |
| << " marked up to " << apply_up_to << " as committed."; |
| } else { |
| apply_up_to = request->committed_index(); |
| } |
| |
| VLOG_WITH_PREFIX_UNLOCKED(1) << "Marking committed up to " << apply_up_to; |
| TRACE("Marking committed up to $0", apply_up_to); |
| CHECK_OK(pending_->AdvanceCommittedIndex(apply_up_to)); |
| queue_->UpdateFollowerWatermarks(apply_up_to, request->all_replicated_index()); |
| |
| // If any messages failed to be started locally, then we already have removed them |
| // from 'deduped_req' at this point. So, 'last_from_leader' is the last one that |
| // we might apply. |
| last_received_cur_leader_ = last_from_leader; |
| |
| // Fill the response with the current state. We will not mutate anymore state until |
| // we actually reply to the leader, we'll just wait for the messages to be durable. |
| FillConsensusResponseOKUnlocked(response); |
| } |
| // Release the lock while we wait for the log append to finish so that commits can go through. |
| // We'll re-acquire it before we update the state again. |
| |
| // Update the last replicated op id |
| if (!messages.empty()) { |
| |
| // 5 - We wait for the writes to be durable. |
| |
| // Note that this is safe because dist consensus now only supports a single outstanding |
| // request at a time and this way we can allow commits to proceed while we wait. |
| TRACE("Waiting on the replicates to finish logging"); |
| TRACE_EVENT0("consensus", "Wait for log"); |
| Status s; |
| do { |
| s = log_synchronizer.WaitFor( |
| MonoDelta::FromMilliseconds(FLAGS_raft_heartbeat_interval_ms)); |
| // If we're just waiting for our log append to finish, let's snooze the timer. |
| // We don't want to fire leader election nor accept vote requests because |
| // we're still processing the Raft message from the leader, |
| // waiting on our own log. |
| if (s.IsTimedOut()) { |
| SnoozeFailureDetector(); |
| WithholdVotes(); |
| } |
| } while (s.IsTimedOut()); |
| RETURN_NOT_OK(s); |
| |
| TRACE("finished"); |
| } |
| |
| VLOG_WITH_PREFIX(2) << "Replica updated. " << ToString() |
| << ". Request: " << SecureShortDebugString(*request); |
| |
| TRACE("UpdateReplica() finished"); |
| return Status::OK(); |
| } |
| |
| void RaftConsensus::FillConsensusResponseOKUnlocked(ConsensusResponsePB* response) { |
| DCHECK(lock_.is_locked()); |
| TRACE("Filling consensus response to leader."); |
| response->set_responder_term(CurrentTermUnlocked()); |
| response->mutable_status()->mutable_last_received()->CopyFrom( |
| queue_->GetLastOpIdInLog()); |
| response->mutable_status()->mutable_last_received_current_leader()->CopyFrom( |
| last_received_cur_leader_); |
| response->mutable_status()->set_last_committed_idx( |
| queue_->GetCommittedIndex()); |
| if (PREDICT_TRUE(server_ctx_.quiescing) && server_ctx_.quiescing->load()) { |
| response->set_server_quiescing(true); |
| } |
| } |
| |
| void RaftConsensus::FillConsensusResponseError(ConsensusResponsePB* response, |
| ConsensusErrorPB::Code error_code, |
| const Status& status) { |
| ConsensusErrorPB* error = response->mutable_status()->mutable_error(); |
| error->set_code(error_code); |
| StatusToPB(status, error->mutable_status()); |
| } |
| |
| Status RaftConsensus::RequestVote(const VoteRequestPB* request, |
| TabletVotingState tablet_voting_state, |
| VoteResponsePB* response) { |
| TRACE_EVENT2("consensus", "RaftConsensus::RequestVote", |
| "peer", peer_uuid(), |
| "tablet", options_.tablet_id); |
| response->set_responder_uuid(peer_uuid()); |
| |
| // If we've heard recently from the leader, then we should ignore the request. |
| // It might be from a "disruptive" server. This could happen in a few cases: |
| // |
| // 1) Network partitions |
| // If the leader can talk to a majority of the nodes, but is partitioned from a |
| // bad node, the bad node's failure detector will trigger. If the bad node is |
| // able to reach other nodes in the cluster, it will continuously trigger elections. |
| // |
| // 2) An abandoned node |
| // It's possible that a node has fallen behind the log GC mark of the leader. In that |
| // case, the leader will stop sending it requests. Eventually, the configuration |
| // will change to eject the abandoned node, but until that point, we don't want the |
| // abandoned follower to disturb the other nodes. |
| // |
| // 3) Other dynamic scenarios with a stale former leader |
| // This is a generalization of case 1. It's possible that a stale former |
| // leader detects it's not a leader anymore at some point, but a majority |
| // of replicas has elected a new leader already. |
| // |
| // See also https://ramcloud.stanford.edu/~ongaro/thesis.pdf |
| // section 4.2.3. |
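| // Note: withhold_votes_until_ is pushed forward via WithholdVotes() each time this |
| // replica accepts an update from the current leader (see UpdateReplica()), so a live |
| // leader keeps extending this deadline. |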
| if (PREDICT_TRUE(!request->ignore_live_leader()) && |
| MonoTime::Now() < withhold_votes_until_) { |
| return RequestVoteRespondLeaderIsAlive(request, response); |
| } |
| |
| // We must acquire the update lock in order to ensure that this vote action |
| // takes place between requests. |
| // Lock ordering: update_lock_ must be acquired before lock_. |
| std::unique_lock<simple_spinlock> update_guard(update_lock_, std::defer_lock); |
| if (PREDICT_TRUE(FLAGS_enable_leader_failure_detection)) { |
| update_guard.try_lock(); |
| } else { |
| // If failure detection is not enabled, then we can't just reject the vote, |
| // because there will be no automatic retry later. So, block for the lock. |
| update_guard.lock(); |
| } |
| if (!update_guard.owns_lock()) { |
| // There is another vote or update concurrent with the vote. In that case, that |
| // other request is likely to reset the timer, and we'll end up just voting |
| // "NO" after waiting. To avoid starving RPC handlers and causing cascading |
| // timeouts, just vote a quick NO. |
| return RequestVoteRespondIsBusy(request, response); |
| } |
| |
| // Acquire the replica state lock so we can read / modify the consensus state. |
| ThreadRestrictions::AssertWaitAllowed(); |
| LockGuard l(lock_); |
| |
| // Ensure our lifecycle state is compatible with voting. |
| // If RaftConsensus is running, we use the latest OpId from the WAL to vote. |
| // Otherwise, we must be voting while tombstoned. |
| OpId local_last_logged_opid; |
| const State state = state_; |
| switch (state) { |
| case kShutdown: |
| return Status::IllegalState("cannot vote while shut down"); |
| case kRunning: |
| // Note: it is (theoretically) possible for 'tombstone_last_logged_opid' |
| // to be passed in and by the time we reach here the state is kRunning. |
| // That may occur when a vote request comes in at the end of a tablet |
| // copy and then tablet bootstrap completes quickly. In that case, we |
| // ignore the passed-in value and use the latest OpId from our queue. |
| local_last_logged_opid = queue_->GetLastOpIdInLog(); |
| break; |
| default: |
| if (!tablet_voting_state.tombstone_last_logged_opid_) { |
| return Status::IllegalState("must be running to vote when last-logged opid is not known"); |
| } |
| if (!FLAGS_raft_enable_tombstoned_voting) { |
| return Status::IllegalState("must be running to vote when tombstoned voting is disabled"); |
| } |
| local_last_logged_opid = *(tablet_voting_state.tombstone_last_logged_opid_); |
| if (tablet_voting_state.data_state_ == tablet::TABLET_DATA_COPYING) { |
| LOG_WITH_PREFIX_UNLOCKED(INFO) << "voting while copying based on last-logged opid " |
| << local_last_logged_opid; |
| } else if (tablet_voting_state.data_state_ == tablet::TABLET_DATA_TOMBSTONED) { |
| LOG_WITH_PREFIX_UNLOCKED(INFO) << "voting while tombstoned based on last-logged opid " |
| << local_last_logged_opid; |
| } |
| break; |
| } |
| DCHECK(local_last_logged_opid.IsInitialized()); |
| |
| // If the node is not in the configuration, allow the vote (this is required by Raft) |
| // but log an informational message anyway. |
| if (!cmeta_->IsMemberInConfig(request->candidate_uuid(), ACTIVE_CONFIG)) { |
| LOG_WITH_PREFIX_UNLOCKED(INFO) << "Handling vote request from an unknown peer " |
| << request->candidate_uuid(); |
| } |
| |
| // Recheck the heard-from-the-leader condition. There is a slight chance |
| // that a heartbeat from the leader replica registers after the first check |
| // in the very beginning of this method and before lock_ is acquired. This |
| // extra check costs a little, but it helps in avoiding extra election rounds |
| // and disruptive transfers of the replica leadership. |
| if (PREDICT_TRUE(!request->ignore_live_leader()) && |
| MonoTime::Now() < withhold_votes_until_) { |
| return RequestVoteRespondLeaderIsAlive(request, response); |
| } |
| |
| // Candidate is running behind. |
| if (request->candidate_term() < CurrentTermUnlocked()) { |
| return RequestVoteRespondInvalidTerm(request, response); |
| } |
| |
| // We already voted this term. |
| if (request->candidate_term() == CurrentTermUnlocked() && |
| HasVotedCurrentTermUnlocked()) { |
| |
| // Already voted for the same candidate in the current term. |
| if (GetVotedForCurrentTermUnlocked() == request->candidate_uuid()) { |
| return RequestVoteRespondVoteAlreadyGranted(request, response); |
| } |
| |
| // Voted for someone else in current term. |
| return RequestVoteRespondAlreadyVotedForOther(request, response); |
| } |
| |
| // Candidate must have last-logged OpId at least as large as our own to get |
| // our vote. |
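| // OpIds are compared by term first, then index. For example (illustrative values), a |
| // candidate whose last-received OpId is 3.10 is considered at least as up-to-date as a |
| // local last-logged OpId of 2.50, so it can still receive our vote. |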
| bool vote_yes = !OpIdLessThan(request->candidate_status().last_received(), |
| local_last_logged_opid); |
| |
| // Record the term advancement if necessary. We don't do so in the case of |
| // pre-elections because it's possible that the node who called the pre-election |
| // has actually now successfully become leader of the prior term, in which case |
| // bumping our term here would disrupt it. |
| if (!request->is_pre_election() && |
| request->candidate_term() > CurrentTermUnlocked()) { |
| // If we are going to vote for this peer, then we will flush the consensus metadata |
| // to disk below when we record the vote, and we can skip flushing the term advancement |
| // to disk here. |
| auto flush = vote_yes ? SKIP_FLUSH_TO_DISK : FLUSH_TO_DISK; |
| RETURN_NOT_OK_PREPEND(HandleTermAdvanceUnlocked(request->candidate_term(), flush), |
| Substitute("Could not step down in RequestVote. Current term: $0, candidate term: $1", |
| CurrentTermUnlocked(), request->candidate_term())); |
| } |
| |
| if (!vote_yes) { |
| return RequestVoteRespondLastOpIdTooOld(local_last_logged_opid, request, response); |
| } |
| |
| // Passed all our checks. Vote granted. |
| return RequestVoteRespondVoteGranted(request, response); |
| } |
| |
| Status RaftConsensus::ChangeConfig(const ChangeConfigRequestPB& req, |
| StatusCallback client_cb, |
| optional<TabletServerErrorPB::Code>* error_code) { |
| TRACE_EVENT2("consensus", "RaftConsensus::ChangeConfig", |
| "peer", peer_uuid(), |
| "tablet", options_.tablet_id); |
| |
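| // A single-change request is translated into a bulk request containing exactly one |
| // config change, so that all validation and replication logic lives in |
| // BulkChangeConfig(). |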
| BulkChangeConfigRequestPB bulk_req; |
| *bulk_req.mutable_tablet_id() = req.tablet_id(); |
| |
| if (req.has_dest_uuid()) { |
| *bulk_req.mutable_dest_uuid() = req.dest_uuid(); |
| } |
| if (req.has_cas_config_opid_index()) { |
| bulk_req.set_cas_config_opid_index(req.cas_config_opid_index()); |
| } |
| auto* change = bulk_req.add_config_changes(); |
| if (req.has_type()) { |
| change->set_type(req.type()); |
| } |
| if (req.has_server()) { |
| *change->mutable_peer() = req.server(); |
| } |
| |
| return BulkChangeConfig(bulk_req, std::move(client_cb), error_code); |
| } |
| |
| Status RaftConsensus::BulkChangeConfig(const BulkChangeConfigRequestPB& req, |
| StatusCallback client_cb, |
| optional<TabletServerErrorPB::Code>* error_code) { |
| TRACE_EVENT2("consensus", "RaftConsensus::BulkChangeConfig", |
| "peer", peer_uuid(), |
| "tablet", options_.tablet_id); |
| { |
| ThreadRestrictions::AssertWaitAllowed(); |
| LockGuard l(lock_); |
| RETURN_NOT_OK(CheckRunningUnlocked()); |
| RETURN_NOT_OK(CheckActiveLeaderUnlocked()); |
| RETURN_NOT_OK(CheckNoConfigChangePendingUnlocked()); |
| |
| // We are required by Raft to reject config change operations until we have |
| // committed at least one operation in our current term as leader. |
| // See https://groups.google.com/forum/#!topic/raft-dev/t4xj6dJTP6E |
| if (!queue_->IsCommittedIndexInCurrentTerm()) { |
| return Status::IllegalState("Leader has not yet committed an operation in its own term"); |
| } |
| |
| const RaftConfigPB committed_config = cmeta_->CommittedConfig(); |
| |
| // Support atomic ChangeConfig requests. |
| if (req.has_cas_config_opid_index()) { |
| if (committed_config.opid_index() != req.cas_config_opid_index()) { |
| *error_code = TabletServerErrorPB::CAS_FAILED; |
| return Status::IllegalState(Substitute("Request specified cas_config_opid_index " |
| "of $0 but the committed config has opid_index " |
| "of $1", |
| req.cas_config_opid_index(), |
| committed_config.opid_index())); |
| } |
| } |
| |
| // 'new_config' will be modified in-place and validated before being used |
| // as the new Raft configuration. |
| RaftConfigPB new_config = committed_config; |
| |
| // Enforce the "one by one" config change rules, even with the bulk API. |
| // Keep track of total voters added, including non-voters promoted to |
| // voters, and removed, including voters demoted to non-voters. |
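| // For example (illustrative): adding one VOTER and removing another VOTER in the same |
| // request modifies two voters and is rejected below, while adding a NON_VOTER alongside |
| // removing a VOTER modifies only one voter and is allowed. |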
| int num_voters_modified = 0; |
| |
| // A record of the peers being modified so that we can enforce only one |
| // change per peer per request. |
| unordered_set<string> peers_modified; |
| |
| for (const auto& item : req.config_changes()) { |
| if (PREDICT_FALSE(!item.has_type())) { |
| *error_code = TabletServerErrorPB::INVALID_CONFIG; |
| return Status::InvalidArgument("Must specify 'type' argument", |
| SecureShortDebugString(req)); |
| } |
| if (PREDICT_FALSE(!item.has_peer())) { |
| *error_code = TabletServerErrorPB::INVALID_CONFIG; |
| return Status::InvalidArgument("Must specify 'peer' argument", |
| SecureShortDebugString(req)); |
| } |
| |
| ChangeConfigType type = item.type(); |
| const RaftPeerPB& peer = item.peer(); |
| |
| if (PREDICT_FALSE(!peer.has_permanent_uuid())) { |
| return Status::InvalidArgument("peer must have permanent_uuid specified", |
| SecureShortDebugString(req)); |
| } |
| |
| if (!InsertIfNotPresent(&peers_modified, peer.permanent_uuid())) { |
| return Status::InvalidArgument( |
| Substitute("only one change allowed per peer: peer $0 appears more " |
| "than once in the config change request", |
| peer.permanent_uuid()), |
| SecureShortDebugString(req)); |
| } |
| |
| const string& server_uuid = peer.permanent_uuid(); |
| switch (type) { |
| case ADD_PEER: |
| // Ensure the peer we are adding is not already a member of the configuration. |
| if (IsRaftConfigMember(server_uuid, committed_config)) { |
| return Status::InvalidArgument( |
| Substitute("Server with UUID $0 is already a member of the config. RaftConfig: $1", |
| server_uuid, SecureShortDebugString(committed_config))); |
| } |
| if (!peer.has_member_type()) { |
| return Status::InvalidArgument("peer must have member_type specified", |
| SecureShortDebugString(req)); |
| } |
| if (!peer.has_last_known_addr()) { |
| return Status::InvalidArgument("peer must have last_known_addr specified", |
| SecureShortDebugString(req)); |
| } |
| if (peer.member_type() == RaftPeerPB::VOTER) { |
| num_voters_modified++; |
| } |
| *new_config.add_peers() = peer; |
| break; |
| |
| case REMOVE_PEER: |
| if (server_uuid == peer_uuid()) { |
| return Status::InvalidArgument( |
| Substitute("Cannot remove peer $0 from the config because it is the leader. " |
| "Force another leader to be elected to remove this peer. " |
| "Consensus state: $1", |
| server_uuid, |
| SecureShortDebugString(cmeta_->ToConsensusStatePB()))); |
| } |
| if (!RemoveFromRaftConfig(&new_config, server_uuid)) { |
| return Status::NotFound( |
| Substitute("Server with UUID $0 not a member of the config. RaftConfig: $1", |
| server_uuid, SecureShortDebugString(committed_config))); |
| } |
| if (IsRaftConfigVoter(server_uuid, committed_config)) { |
| num_voters_modified++; |
| } |
| break; |
| |
| case MODIFY_PEER: { |
| RaftPeerPB* modified_peer; |
| RETURN_NOT_OK(GetRaftConfigMember(&new_config, server_uuid, &modified_peer)); |
| const RaftPeerPB orig_peer(*modified_peer); |
| // Override 'member_type' and items within 'attrs' only if they are |
| // explicitly passed in the request. At least one field must be |
| // modified to be a valid request. |
| if (peer.has_member_type() && peer.member_type() != modified_peer->member_type()) { |
| if (modified_peer->member_type() == RaftPeerPB::VOTER || |
| peer.member_type() == RaftPeerPB::VOTER) { |
| // This is a 'member_type' change involving a VOTER, i.e. a |
| // promotion or demotion. |
| num_voters_modified++; |
| } |
| // A leader must be forced to step down before demoting it. |
| if (server_uuid == peer_uuid()) { |
| return Status::InvalidArgument( |
| Substitute("Cannot modify member type of peer $0 because it is the leader. " |
| "Cause another leader to be elected to modify this peer. " |
| "Consensus state: $1", |
| server_uuid, |
| SecureShortDebugString(cmeta_->ToConsensusStatePB()))); |
| } |
| modified_peer->set_member_type(peer.member_type()); |
| } |
| if (peer.attrs().has_promote()) { |
| modified_peer->mutable_attrs()->set_promote(peer.attrs().promote()); |
| } |
| if (peer.attrs().has_replace()) { |
| modified_peer->mutable_attrs()->set_replace(peer.attrs().replace()); |
| } |
| // Ensure that MODIFY_PEER actually modified something. |
| if (MessageDifferencer::Equals(orig_peer, *modified_peer)) { |
| return Status::InvalidArgument("must modify a field when calling MODIFY_PEER"); |
| } |
| break; |
| } |
| |
| default: |
| return Status::NotSupported(Substitute( |
| "$0: unsupported type of configuration change", |
| ChangeConfigType_Name(type))); |
| } |
| } |
| |
| // Don't allow no-op config changes to be committed. |
| if (MessageDifferencer::Equals(committed_config, new_config)) { |
| return Status::InvalidArgument("requested configuration change does not " |
| "actually modify the config", |
| SecureShortDebugString(req)); |
| } |
| |
| // Ensure this wasn't an illegal bulk change. |
| if (num_voters_modified > 1) { |
| return Status::InvalidArgument("it is not safe to modify the VOTER status " |
| "of more than one peer at a time", |
| SecureShortDebugString(req)); |
| } |
| |
| // We'll assign a new opid_index to this config change. |
| new_config.clear_opid_index(); |
| |
| RETURN_NOT_OK(ReplicateConfigChangeUnlocked( |
| committed_config, std::move(new_config), |
| [this, client_cb](const Status& s) { |
| this->MarkDirtyOnSuccess("Config change replication complete", |
| client_cb, s); |
| })); |
| } // Release lock before signaling request. |
| peer_manager_->SignalRequest(); |
| return Status::OK(); |
| } |
| |
| Status RaftConsensus::UnsafeChangeConfig( |
| const UnsafeChangeConfigRequestPB& req, |
| optional<tserver::TabletServerErrorPB::Code>* error_code) { |
| if (PREDICT_FALSE(!req.has_new_config())) { |
| *error_code = TabletServerErrorPB::INVALID_CONFIG; |
| return Status::InvalidArgument("Request must contain 'new_config' argument " |
| "to UnsafeChangeConfig()", SecureShortDebugString(req)); |
| } |
| if (PREDICT_FALSE(!req.has_caller_id())) { |
| *error_code = TabletServerErrorPB::INVALID_CONFIG; |
| return Status::InvalidArgument("Must specify 'caller_id' argument to UnsafeChangeConfig()", |
| SecureShortDebugString(req)); |
| } |
| |
| // Grab the committed config and current term on this node. |
| int64_t current_term; |
| RaftConfigPB committed_config; |
| int64_t all_replicated_index; |
| int64_t last_committed_index; |
| OpId preceding_opid; |
| uint64_t msg_timestamp; |
| { |
| // Take the snapshot of the replica state and queue state so that |
| // we can stick them in the consensus update request later. |
| ThreadRestrictions::AssertWaitAllowed(); |
| LockGuard l(lock_); |
| current_term = CurrentTermUnlocked(); |
| committed_config = cmeta_->CommittedConfig(); |
| if (cmeta_->has_pending_config()) { |
| LOG_WITH_PREFIX_UNLOCKED(WARNING) |
| << "Replica has a pending config, but the new config " |
| << "will be unsafely changed anyway. " |
| << "Currently pending config on the node: " |
| << SecureShortDebugString(cmeta_->PendingConfig()); |
| } |
| all_replicated_index = queue_->GetAllReplicatedIndex(); |
| last_committed_index = queue_->GetCommittedIndex(); |
| preceding_opid = queue_->GetLastOpIdInLog(); |
| msg_timestamp = time_manager_->GetSerialTimestamp().value(); |
| } |
| |
| // Validate that passed replica uuids are part of the committed config |
| // on this node. This allows a manual recovery tool to only have to specify |
| // the uuid of each replica in the new config without having to know the |
| // addresses of each server (since we can get the address information from |
| // the committed config). Additionally, only a subset of the committed config |
| // is required for typical cluster repair scenarios. |
| std::unordered_set<string> retained_peer_uuids; |
| const RaftConfigPB& config = req.new_config(); |
| for (const RaftPeerPB& new_peer : config.peers()) { |
| const string& peer_uuid = new_peer.permanent_uuid(); |
| retained_peer_uuids.insert(peer_uuid); |
| if (!IsRaftConfigMember(peer_uuid, committed_config)) { |
| *error_code = TabletServerErrorPB::INVALID_CONFIG; |
| return Status::InvalidArgument(Substitute("Peer with uuid $0 is not in the committed " |
| "config on this replica, rejecting the " |
| "unsafe config change request for tablet $1. " |
| "Committed config: $2", |
| peer_uuid, req.tablet_id(), |
| SecureShortDebugString(committed_config))); |
| } |
| } |
| |
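| // Build the new config by dropping from the committed config every peer that is not |
| // named in the request. For example (illustrative), if the committed config has peers |
| // {A, B, C} and the request retains only {A, B}, peer C is removed here. |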
| RaftConfigPB new_config = committed_config; |
| for (const auto& peer : committed_config.peers()) { |
| const string& peer_uuid = peer.permanent_uuid(); |
| if (!ContainsKey(retained_peer_uuids, peer_uuid)) { |
| CHECK(RemoveFromRaftConfig(&new_config, peer_uuid)); |
| } |
| } |
| // Check that local peer is part of the new config and is a VOTER. |
| // Although it is valid for a local replica to not have itself |
| // in the committed config, it is rare and a replica without itself |
| // in the latest config is definitely not caught up with the latest leader's log. |
| if (!IsRaftConfigVoter(peer_uuid(), new_config)) { |
| *error_code = TabletServerErrorPB::INVALID_CONFIG; |
| return Status::InvalidArgument(Substitute("Local replica uuid $0 is not " |
| "a VOTER in the new config, " |
| "rejecting the unsafe config " |
| "change request for tablet $1. " |
| "Rejected config: $2" , |
| peer_uuid(), req.tablet_id(), |
| SecureShortDebugString(new_config))); |
| } |
| new_config.set_unsafe_config_change(true); |
| int64_t replicate_opid_index = preceding_opid.index() + 1; |
| new_config.set_opid_index(replicate_opid_index); |
| |
| // Sanity check the new config. 'type' is irrelevant here. |
| Status s = VerifyRaftConfig(new_config); |
| if (!s.ok()) { |
| *error_code = TabletServerErrorPB::INVALID_CONFIG; |
| return Status::InvalidArgument(Substitute("The resulting new config for tablet $0 " |
| "from passed parameters has failed raft " |
| "config sanity check: $1", |
| req.tablet_id(), s.ToString())); |
| } |
| |
| // Prepare the consensus request as if the request is being generated |
| // from a different leader. |
| ConsensusRequestPB consensus_req; |
| consensus_req.set_caller_uuid(req.caller_id()); |
| // Bumping up the term for the consensus request being generated. |
| // This makes this request appear to come from a new leader that |
| // the local replica doesn't know about yet. If the local replica |
| // happens to be the leader, this will cause it to step down. |
| const int64_t new_term = current_term + 1; |
| consensus_req.set_caller_term(new_term); |
| consensus_req.mutable_preceding_id()->CopyFrom(preceding_opid); |
| consensus_req.set_committed_index(last_committed_index); |
| consensus_req.set_all_replicated_index(all_replicated_index); |
| |
| // Prepare the replicate msg to be replicated. |
| ReplicateMsg* replicate = consensus_req.add_ops(); |
| ChangeConfigRecordPB* cc_req = replicate->mutable_change_config_record(); |
| cc_req->set_tablet_id(req.tablet_id()); |
| *cc_req->mutable_old_config() = committed_config; |
| *cc_req->mutable_new_config() = new_config; |
| OpId* id = replicate->mutable_id(); |
| // Bumping up both the term and the opid_index from what's found in the log. |
| id->set_term(new_term); |
| id->set_index(replicate_opid_index); |
| replicate->set_op_type(CHANGE_CONFIG_OP); |
| replicate->set_timestamp(msg_timestamp); |
| |
| VLOG_WITH_PREFIX(3) << "UnsafeChangeConfig: Generated consensus request: " |
| << SecureShortDebugString(consensus_req); |
| |
| LOG_WITH_PREFIX(WARNING) |
| << "PROCEEDING WITH UNSAFE CONFIG CHANGE ON THIS SERVER, " |
| << "COMMITTED CONFIG: " << SecureShortDebugString(committed_config) |
| << "; NEW CONFIG: " << SecureShortDebugString(new_config); |
| |
| ConsensusResponsePB consensus_resp; |
| return Update(&consensus_req, &consensus_resp).AndThen([&consensus_resp]{ |
| return consensus_resp.has_error() |
| ? StatusFromPB(consensus_resp.error().status()) : Status::OK(); |
| }); |
| } |
| |
| void RaftConsensus::Stop() { |
| TRACE_EVENT2("consensus", "RaftConsensus::Shutdown", |
| "peer", peer_uuid(), |
| "tablet", options_.tablet_id); |
| { |
| ThreadRestrictions::AssertWaitAllowed(); |
| LockGuard l(lock_); |
| const State state = state_; |
| if (state == kStopping || state == kStopped || state == kShutdown) { |
| return; |
| } |
| // Transition to kStopping state. |
| SetStateUnlocked(kStopping); |
| LOG_WITH_PREFIX_UNLOCKED(INFO) << "Raft consensus shutting down."; |
| } |
| |
| // Close the peer manager. |
| if (peer_manager_) peer_manager_->Close(); |
| |
| // We must close the queue after we close the peers. |
| if (queue_) { |
| // If we were leader, decrement the number of leaders there are now. |
| if (queue_->IsInLeaderMode() && server_ctx_.num_leaders) { |
| server_ctx_.num_leaders->IncrementBy(-1); |
| } |
| queue_->Close(); |
| } |
| |
| { |
| ThreadRestrictions::AssertWaitAllowed(); |
| LockGuard l(lock_); |
| if (pending_) { |
| CHECK_OK(pending_->CancelPendingOps()); |
| } |
| SetStateUnlocked(kStopped); |
| |
| // Clear leader status on Stop(), in case this replica was the leader. If |
| // we don't do this, the log messages still show this node as the leader. |
| // No need to sync it since it's not persistent state. |
| if (cmeta_) { |
| ClearLeaderUnlocked(); |
| } |
| LOG_WITH_PREFIX_UNLOCKED(INFO) << "Raft consensus is shut down!"; |
| } |
| |
| // If we were the leader, stop withholding votes. |
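| // The compare-and-exchange below resets the deadline only if it was set to |
| // MonoTime::Max(), i.e. only if this replica was withholding votes indefinitely as the |
| // leader; a follower's normal withholding window is left untouched. |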
| auto max_time = MonoTime::Max(); |
| withhold_votes_until_.compare_exchange_strong(max_time, MonoTime::Min()); |
| |
| // Shut down things that might acquire locks during destruction. |
| if (raft_pool_token_) raft_pool_token_->Shutdown(); |
| if (failure_detector_) DisableFailureDetector(); |
| } |
| |
| void RaftConsensus::Shutdown() { |
| // Avoid taking locks if already shut down so we don't violate |
| // ThreadRestrictions assertions in the case where the RaftConsensus |
| // destructor runs on the reactor thread due to an election callback being |
| // the last outstanding reference. |
| if (shutdown_) { |
| return; |
| } |
| |
| Stop(); |
| { |
| LockGuard l(lock_); |
| SetStateUnlocked(kShutdown); |
| } |
| shutdown_ = true; |
| } |
| |
| Status RaftConsensus::StartConsensusOnlyRoundUnlocked(const ReplicateRefPtr& msg) { |
| DCHECK(lock_.is_locked()); |
| OperationType op_type = msg->get()->op_type(); |
| CHECK(IsConsensusOnlyOperation(op_type)) |
| << "Expected a consensus-only op type, got " << OperationType_Name(op_type) |
| << ": " << SecureShortDebugString(*msg->get()); |
| VLOG_WITH_PREFIX_UNLOCKED(1) << "Starting consensus round: " |
| << SecureShortDebugString(msg->get()->id()); |
| auto client_cb = [this](const Status& s) { |
| this->MarkDirtyOnSuccess("Replicated consensus-only round", |
| &DoNothingStatusCB, s); |
| }; |
| scoped_refptr<ConsensusRound> round(new ConsensusRound(this, msg)); |
| auto* round_raw = round.get(); |
| round->SetConsensusReplicatedCallback( |
| [this, round_raw, client_cb](const Status& s) { |
| this->NonTxRoundReplicationFinished(round_raw, client_cb, s); |
| }); |
| return AddPendingOperationUnlocked(round); |
| } |
| |
| Status RaftConsensus::AdvanceTermForTests(int64_t new_term) { |
| ThreadRestrictions::AssertWaitAllowed(); |
| LockGuard l(lock_); |
| CHECK_OK(CheckRunningUnlocked()); |
| return HandleTermAdvanceUnlocked(new_term); |
| } |
| |
| string RaftConsensus::GetRequestVoteLogPrefixThreadSafe(const VoteRequestPB& request) const { |
| return Substitute("$0Leader $1election vote request", |
| LogPrefixThreadSafe(), |
| request.is_pre_election() ? "pre-" : ""); |
| } |
| |
| string RaftConsensus::GetRequestVoteLogPrefixUnlocked(const VoteRequestPB& request) const { |
| DCHECK(lock_.is_locked()); |
| return Substitute("$0Leader $1election vote request", |
| LogPrefixUnlocked(), |
| request.is_pre_election() ? "pre-" : ""); |
| } |
| |
| void RaftConsensus::FillVoteResponseVoteGranted(VoteResponsePB* response) { |
| response->set_responder_term(CurrentTermUnlocked()); |
| response->set_vote_granted(true); |
| } |
| |
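| // Fills in a vote-denied response. Whether 'responder_term' is set is controlled by |
| // 'responder_term_policy': some rejection paths (e.g. leader-is-alive, consensus-busy) |
| // deliberately omit it because the candidate does not need the term to process those |
| // NO responses. |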
| void RaftConsensus::FillVoteResponseVoteDenied( |
| ConsensusErrorPB::Code error_code, |
| VoteResponsePB* response, |
| ResponderTermPolicy responder_term_policy) { |
| response->set_vote_granted(false); |
| response->mutable_consensus_error()->set_code(error_code); |
| if (responder_term_policy == ResponderTermPolicy::SET) { |
| response->set_responder_term(CurrentTermUnlocked()); |
| } |
| } |
| |
| Status RaftConsensus::RequestVoteRespondInvalidTerm(const VoteRequestPB* request, |
| VoteResponsePB* response) { |
| FillVoteResponseVoteDenied(ConsensusErrorPB::INVALID_TERM, response); |
| string msg = Substitute("$0: Denying vote to candidate $1 for earlier term $2. " |
| "Current term is $3.", |
| GetRequestVoteLogPrefixUnlocked(*request), |
| request->candidate_uuid(), |
| request->candidate_term(), |
| CurrentTermUnlocked()); |
| LOG(INFO) << msg; |
| StatusToPB(Status::InvalidArgument(msg), response->mutable_consensus_error()->mutable_status()); |
| return Status::OK(); |
| } |
| |
| Status RaftConsensus::RequestVoteRespondVoteAlreadyGranted(const VoteRequestPB* request, |
| VoteResponsePB* response) { |
| FillVoteResponseVoteGranted(response); |
| LOG(INFO) << Substitute("$0: Already granted yes vote for candidate $1 in term $2. " |
| "Re-sending same reply.", |
| GetRequestVoteLogPrefixUnlocked(*request), |
| request->candidate_uuid(), |
| request->candidate_term()); |
| return Status::OK(); |
| } |
| |
| Status RaftConsensus::RequestVoteRespondAlreadyVotedForOther(const VoteRequestPB* request, |
| VoteResponsePB* response) { |
| FillVoteResponseVoteDenied(ConsensusErrorPB::ALREADY_VOTED, response); |
| string msg = Substitute("$0: Denying vote to candidate $1 in current term $2: " |
| "Already voted for candidate $3 in this term.", |
| GetRequestVoteLogPrefixUnlocked(*request), |
| request->candidate_uuid(), |
| CurrentTermUnlocked(), |
| GetVotedForCurrentTermUnlocked()); |
| LOG(INFO) << msg; |
| StatusToPB(Status::InvalidArgument(msg), response->mutable_consensus_error()->mutable_status()); |
| return Status::OK(); |
| } |
| |
| Status RaftConsensus::RequestVoteRespondLastOpIdTooOld(const OpId& local_last_logged_opid, |
| const VoteRequestPB* request, |
| VoteResponsePB* response) { |
| FillVoteResponseVoteDenied(ConsensusErrorPB::LAST_OPID_TOO_OLD, response); |
| string msg = Substitute("$0: Denying vote to candidate $1 for term $2 because " |
| "replica has last-logged OpId of $3, which is greater than that of the " |
| "candidate, which has last-logged OpId of $4.", |
| GetRequestVoteLogPrefixUnlocked(*request), |
| request->candidate_uuid(), |
| request->candidate_term(), |
| SecureShortDebugString(local_last_logged_opid), |
| SecureShortDebugString(request->candidate_status().last_received())); |
| LOG(INFO) << msg; |
| StatusToPB(Status::InvalidArgument(msg), response->mutable_consensus_error()->mutable_status()); |
| return Status::OK(); |
| } |
| |
| Status RaftConsensus::RequestVoteRespondLeaderIsAlive(const VoteRequestPB* request, |
| VoteResponsePB* response) { |
| FillVoteResponseVoteDenied(ConsensusErrorPB::LEADER_IS_ALIVE, response, |
| ResponderTermPolicy::DO_NOT_SET); |
| auto msg = Substitute("$0: Denying vote to candidate $1 for term $2 because " |
| "replica is either leader or believes a valid leader to " |
| "be alive.", |
| GetRequestVoteLogPrefixThreadSafe(*request), |
| request->candidate_uuid(), |
| request->candidate_term()); |
| VLOG(1) << msg; |
| StatusToPB(Status::InvalidArgument(msg), |
| response->mutable_consensus_error()->mutable_status()); |
| return Status::OK(); |
| } |
| |
| Status RaftConsensus::RequestVoteRespondIsBusy( |
| const VoteRequestPB* request, VoteResponsePB* response) { |
| // Don't set the term in the response: the requestor doesn't need it |
| // to process the NO vote response in this case. |
| FillVoteResponseVoteDenied(ConsensusErrorPB::CONSENSUS_BUSY, response, |
| ResponderTermPolicy::DO_NOT_SET); |
| auto msg = Substitute("$0: Denying vote to candidate $1 for term $2 because " |
| "replica is already servicing an update from " |
| "a current leader or another vote", |
| GetRequestVoteLogPrefixThreadSafe(*request), |
| request->candidate_uuid(), |
| request->candidate_term()); |
| VLOG(1) << msg; |
| StatusToPB(Status::ServiceUnavailable(msg), |
| response->mutable_consensus_error()->mutable_status()); |
| return Status::OK(); |
| } |
| |
| Status RaftConsensus::RequestVoteRespondVoteGranted(const VoteRequestPB* request, |
| VoteResponsePB* response) { |
| // We know our vote will be "yes", so avoid triggering an election while we |
| // persist our vote to disk. We use an exponential backoff to avoid too much |
| // split-vote contention when nodes display high latencies. |
| MonoDelta backoff = LeaderElectionExpBackoffDeltaUnlocked(); |
| SnoozeFailureDetector(string("vote granted"), backoff); |
| |
| if (!request->is_pre_election()) { |
| // Persist our vote to disk. |
| RETURN_NOT_OK(SetVotedForCurrentTermUnlocked(request->candidate_uuid())); |
| } |
| |
| FillVoteResponseVoteGranted(response); |
| |
| // Give peer time to become leader. Snooze one more time after persisting our |
| // vote. When disk latency is high, this should help reduce churn. |
| SnoozeFailureDetector(/*reason_for_log=*/nullopt, backoff); |
| |
| LOG(INFO) << Substitute("$0: Granting yes vote for candidate $1 in term $2.", |
| GetRequestVoteLogPrefixUnlocked(*request), |
| request->candidate_uuid(), |
| CurrentTermUnlocked()); |
| return Status::OK(); |
| } |
| |
| RaftPeerPB::Role RaftConsensus::role() const { |
| ThreadRestrictions::AssertWaitAllowed(); |
| LockGuard l(lock_); |
| return cmeta_->active_role(); |
| } |
| |
| RaftConsensus::RoleAndMemberType RaftConsensus::GetRoleAndMemberType() const { |
| ThreadRestrictions::AssertWaitAllowed(); |
| |
| auto member_type = RaftPeerPB::UNKNOWN_MEMBER_TYPE; |
| const auto& local_peer_uuid = peer_uuid(); |
| |
| LockGuard l(lock_); |
| for (const auto& peer : cmeta_->ActiveConfig().peers()) { |
| if (peer.permanent_uuid() == local_peer_uuid) { |
| member_type = peer.member_type(); |
| break; |
| } |
| } |
| |
| return std::make_pair(cmeta_->active_role(), member_type); |
| } |
| |
| int64_t RaftConsensus::CurrentTerm() const { |
| LockGuard l(lock_); |
| return CurrentTermUnlocked(); |
| } |
| |
| void RaftConsensus::SetStateUnlocked(State new_state) { |
| DCHECK(lock_.is_locked()); |
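| // Sanity-check the transition. The expected lifecycle is |
| // kNew -> kInitialized -> kRunning -> kStopping -> kStopped -> kShutdown; kStopping may |
| // also be entered from any earlier state (e.g. if we stop before ever running). |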
| switch (new_state) { |
| case kInitialized: |
| CHECK_EQ(kNew, state_); |
| break; |
| case kRunning: |
| CHECK_EQ(kInitialized, state_); |
| break; |
| case kStopping: |
| CHECK(state_ != kStopped && state_ != kShutdown) << "State = " << State_Name(state_); |
| break; |
| case kStopped: |
| CHECK_EQ(kStopping, state_); |
| break; |
| case kShutdown: |
| CHECK(state_ == kStopped || state_ == kShutdown) << "State = " << State_Name(state_); |
| break; |
| default: |
| LOG(FATAL) << "Disallowed transition to state = " << State_Name(new_state); |
| break; |
| } |
| state_ = new_state; |
| } |
| |
| const char* RaftConsensus::State_Name(State state) { |
| switch (state) { |
| case kNew: |
| return "New"; |
| case kInitialized: |
| return "Initialized"; |
| case kRunning: |
| return "Running"; |
| case kStopping: |
| return "Stopping"; |
| case kStopped: |
| return "Stopped"; |
| case kShutdown: |
| return "Shut down"; |
| default: |
| LOG(DFATAL) << "Unknown State value: " << state; |
| return "Unknown"; |
| } |
| } |
| |
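| // The failure-detection window: how long a replica may go without hearing from the |
| // leader before considering it failed (the heartbeat interval multiplied by the |
| // allowed number of missed heartbeat periods). |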
| MonoDelta RaftConsensus::MinimumElectionTimeout() { |
| int32_t failure_timeout = FLAGS_leader_failure_max_missed_heartbeat_periods * |
| FLAGS_raft_heartbeat_interval_ms; |
| return MonoDelta::FromMilliseconds(failure_timeout); |
| } |
| |
| void RaftConsensus::SetLeaderUuidUnlocked(const string& uuid) { |
| DCHECK(lock_.is_locked()); |
| failed_elections_since_stable_leader_ = 0; |
| num_failed_elections_metric_->set_value(failed_elections_since_stable_leader_); |
| cmeta_->set_leader_uuid(uuid); |
| MarkDirty(Substitute("New leader $0", uuid)); |
| } |
| |
| Status RaftConsensus::ReplicateConfigChangeUnlocked( |
| RaftConfigPB old_config, |
| RaftConfigPB new_config, |
| StatusCallback client_cb) { |
| DCHECK(lock_.is_locked()); |
| auto cc_replicate = new ReplicateMsg(); |
| cc_replicate->set_op_type(CHANGE_CONFIG_OP); |
| ChangeConfigRecordPB* cc_req = cc_replicate->mutable_change_config_record(); |
| cc_req->set_tablet_id(options_.tablet_id); |
| *cc_req->mutable_old_config() = std::move(old_config); |
| *cc_req->mutable_new_config() = std::move(new_config); |
| CHECK_OK(time_manager_->AssignTimestamp(cc_replicate)); |
| |
| scoped_refptr<ConsensusRound> round( |
| new ConsensusRound(this, make_scoped_refptr(new RefCountedReplicate(cc_replicate)))); |
| auto* round_raw = round.get(); |
| round->SetConsensusReplicatedCallback( |
| [this, round_raw, client_cb](const Status& s) { |
| this->NonTxRoundReplicationFinished(round_raw, client_cb, s); |
| }); |
| return AppendNewRoundToQueueUnlocked(round); |
| } |
| |
| void RaftConsensus::RefreshConsensusQueueAndPeersUnlocked() { |
| DCHECK(lock_.is_locked()); |
| DCHECK_EQ(RaftPeerPB::LEADER, cmeta_->active_role()); |
| const RaftConfigPB& active_config = cmeta_->ActiveConfig(); |
| |
| // Change the peers so that we're able to replicate messages remotely and |
| // locally. The peer manager must be closed before updating the active config |
| // in the queue -- when the queue is in LEADER mode, it checks that all |
| // registered peers are a part of the active config. |
| peer_manager_->Close(); |
| // TODO(todd): should use queue committed index here? in that case do |
| // we need to pass it in at all? |
| queue_->SetLeaderMode(pending_->GetCommittedIndex(), |
| CurrentTermUnlocked(), |
| active_config); |
| peer_manager_->UpdateRaftConfig(active_config); |
| } |
| |
| const string& RaftConsensus::peer_uuid() const { |
| return local_peer_pb_.permanent_uuid(); |
| } |
| |
| const string& RaftConsensus::tablet_id() const { |
| return options_.tablet_id; |
| } |
| |
| Status RaftConsensus::ConsensusState(ConsensusStatePB* cstate, |
| IncludeHealthReport report_health) const { |
| ThreadRestrictions::AssertWaitAllowed(); |
| UniqueLock l(lock_); |
| if (state_ == kShutdown) { |
| return Status::IllegalState("Tablet replica is shutdown"); |
| } |
| ConsensusStatePB cstate_tmp = cmeta_->ToConsensusStatePB(); |
| |
| // If we need to include the health report, merge it into the committed |
| // config iff we believe we are the current leader of the config. |
| if (report_health == INCLUDE_HEALTH_REPORT && |
| cmeta_->active_role() == RaftPeerPB::LEADER) { |
| auto reports = queue_->ReportHealthOfPeers(); |
| |
| // We don't need to access the queue anymore, so drop the consensus lock. |
| l.unlock(); |
| |
| // Iterate through each peer in the committed config and attach the health |
| // report to it. |
| RaftConfigPB* committed_raft_config = cstate_tmp.mutable_committed_config(); |
| for (int i = 0; i < committed_raft_config->peers_size(); i++) { |
| RaftPeerPB* peer = committed_raft_config->mutable_peers(i); |
| const HealthReportPB* report = FindOrNull(reports, peer->permanent_uuid()); |
| if (!report) continue; // Only attach details if we know about the peer. |
| *peer->mutable_health_report() = *report; |
| } |
| } |
| *cstate = std::move(cstate_tmp); |
| return Status::OK(); |
| } |
| |
| RaftConfigPB RaftConsensus::CommittedConfig() const { |
| ThreadRestrictions::AssertWaitAllowed(); |
| LockGuard l(lock_); |
| return cmeta_->CommittedConfig(); |
| } |
| |
| void RaftConsensus::DumpStatusHtml(std::ostream& out) const { |
| if (state_ != kRunning) { |
| out << "Tablet " << EscapeForHtmlToString(tablet_id()) |
| << " not running" << std::endl; |
| return; |
| } |
| const RaftPeerPB::Role role = cmeta_->GetRoleAndTerm().first; |
| |
| out << "<h1>Raft Consensus State</h1>" << std::endl; |
| |
| out << "<h2>State</h2>" << std::endl; |
| out << "<pre>" << EscapeForHtmlToString(ToString()) << "</pre>" << std::endl; |
| out << "<h2>Queue</h2>" << std::endl; |
| out << "<pre>" << EscapeForHtmlToString(queue_->ToString()) << "</pre>" << std::endl; |
| |
| // Dump the queues on a leader. |
| if (role == RaftPeerPB::LEADER) { |
| out << "<h2>Queue overview</h2>" << std::endl; |
| out << "<pre>" << EscapeForHtmlToString(queue_->ToString()) << "</pre>" << std::endl; |
| out << "<hr/>" << std::endl; |
| out << "<h2>Queue details</h2>" << std::endl; |
| queue_->DumpToHtml(out); |
| } |
| } |
| |
| void RaftConsensus::ElectionCallback(ElectionReason reason, const ElectionResult& result) { |
| // We're running on a reactor thread; service the callback on another thread. |
| // |
| // There's no need to reenable the failure detector; if this fails, it's a |
| // sign that RaftConsensus has stopped and we no longer need failure detection. |
| auto self = shared_from_this(); |
| Status s = raft_pool_token_->Submit([=]() { self->DoElectionCallback(reason, result); }); |
| if (!s.ok()) { |
| static const char* const msg = "unable to run election callback"; |
| CHECK(s.IsServiceUnavailable()) << LogPrefixThreadSafe() << msg; |
| WARN_NOT_OK(s, LogPrefixThreadSafe() + msg); |
| } |
| } |
| |
| void RaftConsensus::DoElectionCallback(ElectionReason reason, const ElectionResult& result) { |
| const int64_t election_term = result.vote_request.candidate_term(); |
| const bool was_pre_election = result.vote_request.is_pre_election(); |
| const char* election_type = was_pre_election ? "pre-election" : "election"; |
| |
| ThreadRestrictions::AssertWaitAllowed(); |
| UniqueLock l(lock_); |
| |
| Status s = CheckRunningUnlocked(); |
| if (PREDICT_FALSE(!s.ok())) { |
| LOG_WITH_PREFIX_UNLOCKED(INFO) << "Received " << election_type << " callback for term " |
| << election_term << " while not running: " |
| << s.ToString(); |
| return; |
| } |
| |
| // If this election was not triggered by the failure detector, the fd may |
| // still be enabled and needs to be snoozed, both if we win and lose: |
| // - When we win because we're about to disable it and become leader. |
| // - When we lose, because otherwise we can fall into a cycle where everyone keeps |
| // triggering elections but no election ever completes, since by the time one |
| // finishes another has already been triggered. |
| // |
| // This is a no-op if the failure detector is disabled. |
| MonoDelta snooze_delta = LeaderElectionExpBackoffDeltaUnlocked(); |
| SnoozeFailureDetector(string("election complete"), snooze_delta); |
| auto enable_fd = MakeScopedCleanup([&]() { |
| // The failure detector was disabled if it triggered this election. Reenable |
| // it when exiting this function. |
| if (reason == ELECTION_TIMEOUT_EXPIRED) { |
| EnableFailureDetector(snooze_delta); |
| } |
| }); |
| |
| if (result.decision == VOTE_DENIED) { |
| failed_elections_since_stable_leader_++; |
| num_failed_elections_metric_->set_value(failed_elections_since_stable_leader_); |
| |
| // If we called an election and one of the voters had a higher term than we did, |
| // we should bump our term before we potentially try again. This is particularly |
| // important with pre-elections to avoid getting "stuck" in a case like: |
| // Peer A: has ops through 1.10, term = 2, voted in term 2 for peer C |
| // Peer B: has ops through 1.15, term = 1 |
| // In this case, Peer B will reject peer A's pre-elections for term 3 because |
| // the local log is longer. Peer A will reject B's pre-elections for term 2 |
| // because it already voted in term 2. The check below ensures that peer B |
| // will bump to term 2 when it gets the vote rejection, such that its |
| // next pre-election (for term 3) would succeed. |
| if (result.highest_voter_term > CurrentTermUnlocked()) { |
| HandleTermAdvanceUnlocked(result.highest_voter_term); |
| } |
| |
| LOG_WITH_PREFIX_UNLOCKED(INFO) |
| << "Leader " << election_type << " lost for term " << election_term |
| << ". Reason: " |
| << (!result.message.empty() ? result.message : "None given"); |
| return; |
| } |
| |
| // In a pre-election, we collected votes for the _next_ term. |
| // So, we need to adjust our expectations of what the current term should be. |
| int64_t election_started_in_term = election_term; |
| if (was_pre_election) { |
| election_started_in_term--; |
| } |
| |
| if (election_started_in_term != CurrentTermUnlocked()) { |
| LOG_WITH_PREFIX_UNLOCKED(INFO) |
| << "Leader " << election_type << " decision vote started in " |
| << "defunct term " << election_started_in_term << ": won"; |
| return; |
| } |
| |
| if (!cmeta_->IsVoterInConfig(peer_uuid(), ACTIVE_CONFIG)) { |
| LOG_WITH_PREFIX_UNLOCKED(WARNING) << "Leader " << election_type |
| << " decision while not in active config. " |
| << "Result: Term " << election_term |
| << ": won. RaftConfig: " |
| << SecureShortDebugString(cmeta_->ActiveConfig()); |
| return; |
| } |
| |
| // At this point we're either already leader, we're going to become leader, |
| // or we're going to run a real election. |
| // |
| // In all cases, do not reenable the failure detector. |
| enable_fd.cancel(); |
| |
| if (cmeta_->active_role() == RaftPeerPB::LEADER) { |
| // If this was a pre-election, it's possible to see the following interleaving: |
| // |
| // 1. Term N (follower): send a real election for term N. |
| // 2. An externally-triggered election is started. |
| // 3. Term N (follower): send a pre-election for term N+1. |
| // 4. Election callback for real election from term N completes. |
| // Peer is now leader for term N. |
| // 5. Pre-election callback from term N+1 completes, even though |
| // we are currently a leader of term N. |
| // In this case, we should just ignore the pre-election, since we're |
| // happily the leader of the prior term. |
| if (was_pre_election) return; |
| LOG_WITH_PREFIX_UNLOCKED(DFATAL) |
| << "Leader " << election_type << " callback while already leader! " |
| << "Result: Term " << election_term << ": won"; |
| return; |
| } |
| |
| LOG_WITH_PREFIX_UNLOCKED(INFO) << "Leader " << election_type << " won for term " << election_term; |
| |
| if (was_pre_election) { |
| // We just won the pre-election. So, we need to call a real election. |
| l.unlock(); |
| WARN_NOT_OK_EVERY_N_SECS(StartElection(NORMAL_ELECTION, reason), |
| "Couldn't start leader election after successful pre-election", 10); |
| } else { |
| // We won a real election. Convert role to LEADER. |
| SetLeaderUuidUnlocked(peer_uuid()); |
| |
| // TODO(todd): BecomeLeaderUnlocked() can fail due to state checks during shutdown. |
| // It races with the above state check. |
| // This could be a problem during tablet deletion. |
| CHECK_OK(BecomeLeaderUnlocked()); |
| } |
| } |
| |
| optional<OpId> RaftConsensus::GetLastOpId(OpIdType type) { |
| ThreadRestrictions::AssertWaitAllowed(); |
| LockGuard l(lock_); |
| return GetLastOpIdUnlocked(type); |
| } |
| |
| optional<OpId> RaftConsensus::GetLastOpIdUnlocked(OpIdType type) { |
| // Return early if this method is called on an instance of RaftConsensus that |
| // has not yet been started, failed during Init(), or failed during Start(). |
| if (!queue_ || !pending_) { |
| return nullopt; |
| } |
| |
| switch (type) { |
| case RECEIVED_OPID: |
| return queue_->GetLastOpIdInLog(); |
| case COMMITTED_OPID: |
| return MakeOpId(pending_->GetTermWithLastCommittedOp(), |
| pending_->GetCommittedIndex()); |
| default: |
| LOG(DFATAL) << LogPrefixUnlocked() << "Invalid OpIdType " << type; |
| return nullopt; |
| } |
| } |
| |
| log::RetentionIndexes RaftConsensus::GetRetentionIndexes() { |
| // Grab the watermarks from the queue. It's OK to fetch these two watermarks |
| // separately -- the worst case is we see a relatively "out of date" watermark |
| // which just means we'll retain slightly more than necessary in this invocation |
| // of log GC. |
| return log::RetentionIndexes(queue_->GetCommittedIndex(), // for durability |
| queue_->GetAllReplicatedIndex()); // for peers |
| } |
| |
| void RaftConsensus::MarkDirty(const string& reason) { |
| WARN_NOT_OK(raft_pool_token_->Submit([=]() { this->mark_dirty_clbk_(reason); }), |
| LogPrefixThreadSafe() + "Unable to run MarkDirty callback"); |
| } |
| |
| void RaftConsensus::MarkDirtyOnSuccess(const string& reason, |
| const StatusCallback& client_cb, |
| const Status& status) { |
| if (PREDICT_TRUE(status.ok())) { |
| MarkDirty(reason); |
| } |
| client_cb(status); |
| } |
| |
| void RaftConsensus::NonTxRoundReplicationFinished(ConsensusRound* round, |
| const StatusCallback& client_cb, |
| const Status& status) { |
| // NOTE: lock_ is held here because this is triggered by |
| // PendingRounds::AbortOpsAfter() and AdvanceCommittedIndex(). |
| DCHECK(lock_.is_locked()); |
| OperationType op_type = round->replicate_msg()->op_type(); |
| const string& op_type_str = OperationType_Name(op_type); |
| CHECK(IsConsensusOnlyOperation(op_type)) << "Unexpected op type: " << op_type_str; |
| |
| if (op_type == CHANGE_CONFIG_OP) { |
| CompleteConfigChangeRoundUnlocked(round, status); |
| // Fall through to the generic handling. |
| } |
| |
| // TODO(mpercy): May need some refactoring to unlock 'lock_' before invoking |
| // the client callback. |
| |
| if (!status.ok()) { |
| LOG_WITH_PREFIX_UNLOCKED(INFO) << op_type_str << " replication failed: " |
| << status.ToString(); |
| client_cb(status); |
| return; |
| } |
| VLOG_WITH_PREFIX_UNLOCKED(1) << "Committing " << op_type_str << " with op id " |
| << round->id(); |
| round_handler_->FinishConsensusOnlyRound(round); |
| CommitMsg commit_msg; |
| commit_msg.set_op_type(round->replicate_msg()->op_type()); |
| *commit_msg.mutable_commited_op_id() = round->id(); |
| |
| CHECK_OK(log_->AsyncAppendCommit( |
| commit_msg, [](const Status& s) { |
| CrashIfNotOkStatusCB("Enqueued commit operation failed to write to WAL", s); |
| })); |
| |
| client_cb(status); |
| } |
| |
| void RaftConsensus::CompleteConfigChangeRoundUnlocked(ConsensusRound* round, const Status& status) { |
| DCHECK(lock_.is_locked()); |
| const OpId& op_id = round->replicate_msg()->id(); |
| |
| if (!status.ok()) { |
| // If the config change being aborted is the current pending one, abort it. |
| if (cmeta_->has_pending_config() && |
| cmeta_->GetConfigOpIdIndex(PENDING_CONFIG) == op_id.index()) { |
| LOG_WITH_PREFIX_UNLOCKED(INFO) << "Aborting config change with OpId " |
| << op_id << ": " << status.ToString(); |
| cmeta_->clear_pending_config(); |
| |
| // Enable or disable leader failure detection in case the aborted config |
| // change was transitioning this peer between VOTER and NON_VOTER. |
| UpdateFailureDetectorState(); |
| } else { |
| LOG_WITH_PREFIX_UNLOCKED(INFO) |
| << "Skipping abort of non-pending config change with OpId " |
| << op_id << ": " << status.ToString(); |
| } |
| |
| // It's possible to abort a config change which isn't the pending one in the following |
| // sequence: |
| // - replicate a config change |
| // - it gets committed, so we write the new config to disk as the Committed configuration |
| // - we crash before the COMMIT message hits the WAL |
| // - we restart the server, and the config change is added as a pending round again, |
| // but isn't set as Pending because it's already committed. |
| // - we delete the tablet before committing it |
| // See KUDU-1735. |
| return; |
| } |
| |
| // Commit the successful config change. |
| |
| DCHECK(round->replicate_msg()->change_config_record().has_old_config()); |
| DCHECK(round->replicate_msg()->change_config_record().has_new_config()); |
| const RaftConfigPB& old_config = round->replicate_msg()->change_config_record().old_config(); |
| const RaftConfigPB& new_config = round->replicate_msg()->change_config_record().new_config(); |
| DCHECK(old_config.has_opid_index()); |
| DCHECK(new_config.has_opid_index()); |
| // Check if the pending Raft config has an OpId less than the committed |
| // config. If so, this is a replay at startup in which the COMMIT |
| // messages were delayed. |
| int64_t committed_config_opid_index = cmeta_->GetConfigOpIdIndex(COMMITTED_CONFIG); |
| if (new_config.opid_index() > committed_config_opid_index) { |
| LOG_WITH_PREFIX_UNLOCKED(INFO) |
| << "Committing config change with OpId " |
| << op_id << ": " |
| << DiffRaftConfigs(old_config, new_config) |
| << ". New config: { " << SecureShortDebugString(new_config) << " }"; |
| CHECK_OK(SetCommittedConfigUnlocked(new_config)); |
| } else { |
| LOG_WITH_PREFIX_UNLOCKED(INFO) |
| << "Ignoring commit of config change with OpId " |
| << op_id << " because the committed config has OpId index " |
| << committed_config_opid_index << ". The config change we are ignoring is: " |
| << "Old config: { " << SecureShortDebugString(old_config) << " }. " |
| << "New config: { " << SecureShortDebugString(new_config) << " }"; |
| } |
| } |
| |
| void RaftConsensus::EnableFailureDetector(optional<MonoDelta> delta) { |
| if (PREDICT_TRUE(FLAGS_enable_leader_failure_detection)) { |
| failure_detector_->Start(std::move(delta)); |
| } |
| } |
| |
| void RaftConsensus::DisableFailureDetector() { |
| if (PREDICT_TRUE(FLAGS_enable_leader_failure_detection)) { |
| failure_detector_->Stop(); |
| } |
| } |
| |
| void RaftConsensus::UpdateFailureDetectorState(optional<MonoDelta> delta) { |
| DCHECK(lock_.is_locked()); |
| const auto& uuid = peer_uuid(); |
| if (uuid != cmeta_->leader_uuid() && |
| cmeta_->IsVoterInConfig(uuid, ACTIVE_CONFIG)) { |
| // A voter that is not the leader should run the failure detector. |
| EnableFailureDetector(std::move(delta)); |
| } else { |
| // Otherwise, the local peer should not start leader elections |
| // (e.g. if it is the leader, a non-voter, a non-participant, etc). |
| DisableFailureDetector(); |
| } |
| } |
| |
| void RaftConsensus::SnoozeFailureDetector(optional<string> reason_for_log, |
| optional<MonoDelta> delta) { |
| if (PREDICT_TRUE(failure_detector_ && FLAGS_enable_leader_failure_detection)) { |
| if (reason_for_log) { |
| VLOG(1) << LogPrefixThreadSafe() |
| << Substitute("Snoozing failure detection for $0 ($1)", |
| delta ? delta->ToString() : "election timeout", |
| *reason_for_log); |
| } |
| |
| if (!delta) { |
| delta = MinimumElectionTimeout(); |
| } |
| failure_detector_->Snooze(std::move(delta)); |
| } |
| } |
| |
| void RaftConsensus::WithholdVotes() { |
| MonoTime prev = withhold_votes_until_; |
| MonoTime next = MonoTime::Now() + MinimumElectionTimeout(); |
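| // Extend the vote-withholding deadline using a CAS loop: if the |
| // compare_exchange_weak below fails (or fails spuriously), 'prev' is |
| // reloaded with the current value of 'withhold_votes_until_', the Max() |
| // check is re-evaluated, and 'next' is recomputed before retrying. |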
| do { |
| if (prev == MonoTime::Max()) { |
| // Already at the maximum withholding time. This can be the case if the |
| // replica has already become a leader. |
| break; |
| } |
| next = MonoTime::Now() + MinimumElectionTimeout(); |
| } while (!withhold_votes_until_.compare_exchange_weak(prev, next)); |
| } |
| |
| MonoDelta RaftConsensus::LeaderElectionExpBackoffDeltaUnlocked() { |
| DCHECK(lock_.is_locked()); |
| // Compute a backoff factor based on how many leader elections have |
| // failed since a stable leader was last seen. |
| double backoff_factor = pow(1.5, failed_elections_since_stable_leader_ + 1); |
| double min_timeout = MinimumElectionTimeout().ToMilliseconds(); |
| double max_timeout = std::min<double>( |
| min_timeout * backoff_factor, |
| FLAGS_leader_failure_exp_backoff_max_delta_ms); |
| |
| // Randomize the timeout between the minimum and the calculated value. |
| // We do this after capping to the max above. Otherwise, after a churny |
| // period, we'd be highly likely to back off by exactly the max amount. |
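| // For illustration only (assumed default values): with a minimum election |
| // timeout of 1500 ms and failed_elections_since_stable_leader_ = 3, the |
| // backoff factor is 1.5^4 ~= 5.06, so max_timeout = min(7594, |
| // FLAGS_leader_failure_exp_backoff_max_delta_ms) and the returned delta is |
| // drawn uniformly from [1500 ms, max_timeout]. |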
| double timeout = min_timeout + (max_timeout - min_timeout) * rng_.NextDoubleFraction(); |
| DCHECK_GE(timeout, min_timeout); |
| |
| return MonoDelta::FromMilliseconds(timeout); |
| } |
| |
| Status RaftConsensus::HandleTermAdvanceUnlocked(ConsensusTerm new_term, |
| FlushToDisk flush) { |
| DCHECK(lock_.is_locked()); |
| if (new_term <= CurrentTermUnlocked()) { |
| return Status::IllegalState(Substitute("Can't advance term to: $0 current term: $1 is higher.", |
| new_term, CurrentTermUnlocked())); |
| } |
| if (cmeta_->active_role() == RaftPeerPB::LEADER) { |
| LOG_WITH_PREFIX_UNLOCKED(INFO) << "Stepping down as leader of term " |
| << CurrentTermUnlocked(); |
| RETURN_NOT_OK(BecomeReplicaUnlocked()); |
| } |
| |
| LOG_WITH_PREFIX_UNLOCKED(INFO) << "Advancing to term " << new_term; |
| RETURN_NOT_OK(SetCurrentTermUnlocked(new_term, flush)); |
| if (term_metric_) { |
| term_metric_->set_value(new_term); |
| } |
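| // Forget the last op received from the current leader: the leader of the |
| // new term, if any, is not yet known. |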
| last_received_cur_leader_ = MinimumOpId(); |
| return Status::OK(); |
| } |
| |
| Status RaftConsensus::CheckSafeToReplicateUnlocked(const ReplicateMsg& msg) const { |
| DCHECK(lock_.is_locked()); |
| DCHECK(!msg.has_id()) << "Should not have an ID yet: " << SecureShortDebugString(msg); |
| RETURN_NOT_OK(CheckRunningUnlocked()); |
| return CheckActiveLeaderUnlocked(); |
| } |
| |
| Status RaftConsensus::CheckRunningUnlocked() const { |
| DCHECK(lock_.is_locked()); |
| if (PREDICT_FALSE(state_ != kRunning)) { |
| return Status::IllegalState("RaftConsensus is not running", |
| Substitute("State = $0", State_Name(state_))); |
| } |
| return Status::OK(); |
| } |
| |
| Status RaftConsensus::CheckActiveLeaderUnlocked() const { |
| DCHECK(lock_.is_locked()); |
| RaftPeerPB::Role role = cmeta_->active_role(); |
| switch (role) { |
| case RaftPeerPB::LEADER: |
| // Check for the consistency of the information in the consensus metadata |
| // and the state of the consensus queue. |
| DCHECK(queue_->IsInLeaderMode()); |
| if (leader_transfer_in_progress_) { |
| return Status::ServiceUnavailable("leader transfer in progress"); |
| } |
| return Status::OK(); |
| |
| default: |
| // Check for the consistency of the information in the consensus metadata |
| // and the state of the consensus queue. |
| DCHECK(!queue_->IsInLeaderMode()); |
| return Status::IllegalState(Substitute("Replica $0 is not leader of this config. Role: $1. " |
| "Consensus state: $2", |
| peer_uuid(), |
| RaftPeerPB::Role_Name(role), |
| SecureShortDebugString(cmeta_->ToConsensusStatePB()))); |
| } |
| } |
| |
| Status RaftConsensus::CheckNoConfigChangePendingUnlocked() const { |
| DCHECK(lock_.is_locked()); |
| if (cmeta_->has_pending_config()) { |
| return Status::IllegalState( |
| Substitute("RaftConfig change currently pending. Only one is allowed at a time.\n" |
| " Committed config: $0.\n Pending config: $1", |
| SecureShortDebugString(cmeta_->CommittedConfig()), |
| SecureShortDebugString(cmeta_->PendingConfig()))); |
| } |
| return Status::OK(); |
| } |
| |
| Status RaftConsensus::SetPendingConfigUnlocked(const RaftConfigPB& new_config) { |
| DCHECK(lock_.is_locked()); |
| RETURN_NOT_OK_PREPEND(VerifyRaftConfig(new_config), |
| "Invalid config to set as pending"); |
| if (!new_config.unsafe_config_change()) { |
| CHECK(!cmeta_->has_pending_config()) |
| << "Attempt to set pending config while another is already pending! " |
| << "Existing pending config: " << SecureShortDebugString(cmeta_->PendingConfig()) << "; " |
| << "Attempted new pending config: " << SecureShortDebugString(new_config); |
| } else if (cmeta_->has_pending_config()) { |
| LOG_WITH_PREFIX_UNLOCKED(INFO) |
| << "Allowing unsafe config change even though there is a pending config! " |
| << "Existing pending config: " << SecureShortDebugString(cmeta_->PendingConfig()) << "; " |
| << "New pending config: " << SecureShortDebugString(new_config); |
| } |
| cmeta_->set_pending_config(new_config); |
| |
| UpdateFailureDetectorState(); |
| |
| return Status::OK(); |
| } |
| |
| Status RaftConsensus::SetCommittedConfigUnlocked(const RaftConfigPB& config_to_commit) { |
| TRACE_EVENT0("consensus", "RaftConsensus::SetCommittedConfigUnlocked"); |
| DCHECK(lock_.is_locked()); |
| DCHECK(config_to_commit.IsInitialized()); |
| RETURN_NOT_OK_PREPEND(VerifyRaftConfig(config_to_commit), |
| "Invalid config to set as committed"); |
| |
| // Compare committed with pending configuration, ensure that they are the same. |
| // In the event of an unsafe config change triggered by an administrator, |
| // it is possible that the config being committed may not match the pending config |
| // because unsafe config change allows multiple pending configs to exist. |
| // Therefore we only need to validate that 'config_to_commit' matches the pending config |
| // if the pending config does not have its 'unsafe_config_change' flag set. |
| if (cmeta_->has_pending_config()) { |
| RaftConfigPB pending_config = cmeta_->PendingConfig(); |
| if (!pending_config.unsafe_config_change()) { |
| // Quorums must be exactly equal, even w.r.t. peer ordering. |
| CHECK_EQ(pending_config.SerializeAsString(), |
| config_to_commit.SerializeAsString()) |
| << Substitute("New committed config must equal pending config, but does not. " |
| "Pending config: $0, committed config: $1", |
| SecureShortDebugString(pending_config), |
| SecureShortDebugString(config_to_commit)); |
| } |
| } |
| cmeta_->set_committed_config(config_to_commit); |
| cmeta_->clear_pending_config(); |
| CHECK_OK(cmeta_->Flush()); |
| return Status::OK(); |
| } |
| |
| Status RaftConsensus::SetCurrentTermUnlocked(int64_t new_term, |
| FlushToDisk flush) { |
| TRACE_EVENT1("consensus", "RaftConsensus::SetCurrentTermUnlocked", |
| "term", new_term); |
| DCHECK(lock_.is_locked()); |
| if (PREDICT_FALSE(new_term <= CurrentTermUnlocked())) { |
| return Status::IllegalState( |
| Substitute("Cannot change term to a term that is lower than or equal to the current one. " |
| "Current: $0, Proposed: $1", CurrentTermUnlocked(), new_term)); |
| } |
| cmeta_->set_current_term(new_term); |
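| // A vote is valid only for the term in which it was cast, so clear any |
| // previous vote when entering the new term. |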
| cmeta_->clear_voted_for(); |
| if (flush == FLUSH_TO_DISK) { |
| CHECK_OK(cmeta_->Flush()); |
| } |
| ClearLeaderUnlocked(); |
| return Status::OK(); |
| } |
| |
| int64_t RaftConsensus::CurrentTermUnlocked() const { |
| DCHECK(lock_.is_locked()); |
| return cmeta_->current_term(); |
| } |
| |
| string RaftConsensus::GetLeaderUuidUnlocked() const { |
| DCHECK(lock_.is_locked()); |
| return cmeta_->leader_uuid(); |
| } |
| |
| bool RaftConsensus::HasLeaderUnlocked() const { |
| DCHECK(lock_.is_locked()); |
| return !GetLeaderUuidUnlocked().empty(); |
| } |
| |
| void RaftConsensus::ClearLeaderUnlocked() { |
| DCHECK(lock_.is_locked()); |
| leader_is_ready_ = false; |
| cmeta_->set_leader_uuid(""); |
| } |
| |
| bool RaftConsensus::HasVotedCurrentTermUnlocked() const { |
| DCHECK(lock_.is_locked()); |
| return cmeta_->has_voted_for(); |
| } |
| |
| Status RaftConsensus::SetVotedForCurrentTermUnlocked(const string& uuid) { |
| TRACE_EVENT1("consensus", "RaftConsensus::SetVotedForCurrentTermUnlocked", |
| "uuid", uuid); |
| DCHECK(lock_.is_locked()); |
| cmeta_->set_voted_for(uuid); |
| CHECK_OK(cmeta_->Flush()); |
| return Status::OK(); |
| } |
| |
| const string& RaftConsensus::GetVotedForCurrentTermUnlocked() const { |
| DCHECK(lock_.is_locked()); |
| DCHECK(cmeta_->has_voted_for()); |
| return cmeta_->voted_for(); |
| } |
| |
| const ConsensusOptions& RaftConsensus::GetOptions() const { |
| return options_; |
| } |
| |
| string RaftConsensus::LogPrefix() const { |
| ThreadRestrictions::AssertWaitAllowed(); |
| LockGuard l(lock_); |
| return LogPrefixUnlocked(); |
| } |
| |
| string RaftConsensus::LogPrefixUnlocked() const { |
| DCHECK(lock_.is_locked()); |
| // 'cmeta_' may not be set if initialization failed. |
| string cmeta_info; |
| if (cmeta_) { |
| cmeta_info = Substitute(" [term $0 $1]", |
| cmeta_->current_term(), |
| RaftPeerPB::Role_Name(cmeta_->active_role())); |
| } |
| return Substitute("T $0 P $1$2: ", options_.tablet_id, peer_uuid(), cmeta_info); |
| } |
| |
| string RaftConsensus::ToString() const { |
| ThreadRestrictions::AssertWaitAllowed(); |
| LockGuard l(lock_); |
| return ToStringUnlocked(); |
| } |
| |
| string RaftConsensus::ToStringUnlocked() const { |
| DCHECK(lock_.is_locked()); |
| return Substitute("Replica: $0, State: $1, Role: $2", |
| peer_uuid(), State_Name(state_), RaftPeerPB::Role_Name(cmeta_->active_role())); |
| } |
| |
| int64_t RaftConsensus::MetadataOnDiskSize() const { |
| return cmeta_->on_disk_size(); |
| } |
| |
| ConsensusMetadata* RaftConsensus::consensus_metadata_for_tests() const { |
| return cmeta_.get(); |
| } |
| |
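| // Returns 0 if no communication from a leader has been recorded yet, |
| // i.e. 'last_leader_communication_time_micros_' is still 0. |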
| int64_t RaftConsensus::GetMillisSinceLastLeaderHeartbeat() const { |
| return last_leader_communication_time_micros_ == 0 ? |
| 0 : (GetMonoTimeMicros() - last_leader_communication_time_micros_) / 1000; |
| } |
| |
| //////////////////////////////////////////////////////////////////////// |
| // ConsensusBootstrapInfo |
| //////////////////////////////////////////////////////////////////////// |
| |
| ConsensusBootstrapInfo::ConsensusBootstrapInfo() |
| : last_id(MinimumOpId()), |
| last_committed_id(MinimumOpId()) { |
| } |
| |
| ConsensusBootstrapInfo::~ConsensusBootstrapInfo() { |
| STLDeleteElements(&orphaned_replicates); |
| } |
| |
| //////////////////////////////////////////////////////////////////////// |
| // ConsensusRound |
| //////////////////////////////////////////////////////////////////////// |
| |
| ConsensusRound::ConsensusRound(RaftConsensus* consensus, |
| unique_ptr<ReplicateMsg> replicate_msg, |
| ConsensusReplicatedCallback replicated_cb) |
| : consensus_(consensus), |
| replicate_msg_(new RefCountedReplicate(replicate_msg.release())), |
| replicated_cb_(std::move(replicated_cb)), |
| bound_term_(-1) {} |
| |
| ConsensusRound::ConsensusRound(RaftConsensus* consensus, |
| ReplicateRefPtr replicate_msg) |
| : consensus_(consensus), |
| replicate_msg_(std::move(replicate_msg)), |
| bound_term_(-1) { |
| DCHECK(replicate_msg_); |
| } |
| |
| void ConsensusRound::NotifyReplicationFinished(const Status& status) { |
| if (PREDICT_FALSE(!replicated_cb_)) return; |
| replicated_cb_(status); |
| } |
| |
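| // 'bound_term_' is the term in which the leader submitted this round, or -1 |
| // if the round was never bound to a term. If the current term no longer |
| // matches, leadership may have changed since submission, so the op must not |
| // be replicated and Status::Aborted is returned. |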
| Status ConsensusRound::CheckBoundTerm(int64_t current_term) const { |
| if (PREDICT_FALSE(bound_term_ != -1 && |
| bound_term_ != current_term)) { |
| return Status::Aborted( |
| strings::Substitute( |
| "Op submitted in term $0 cannot be replicated in term $1", |
| bound_term_, current_term)); |
| } |
| return Status::OK(); |
| } |
| |
| } // namespace consensus |
| } // namespace kudu |