blob: 2f672c17eae26d6113c59ff547e45fa0e199f021 [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.ratis.server.impl;
import org.apache.ratis.metrics.Timekeeper;
import org.apache.ratis.proto.RaftProtos.RequestVoteReplyProto;
import org.apache.ratis.proto.RaftProtos.RequestVoteRequestProto;
import org.apache.ratis.protocol.RaftPeer;
import org.apache.ratis.protocol.RaftPeerId;
import org.apache.ratis.server.DivisionInfo;
import org.apache.ratis.server.RaftConfiguration;
import org.apache.ratis.server.RaftServerConfigKeys;
import org.apache.ratis.server.protocol.TermIndex;
import org.apache.ratis.server.util.ServerStringUtils;
import org.apache.ratis.thirdparty.com.google.common.annotations.VisibleForTesting;
import org.apache.ratis.util.Daemon;
import org.apache.ratis.util.JavaUtils;
import org.apache.ratis.util.LifeCycle;
import org.apache.ratis.util.LogUtils;
import org.apache.ratis.util.Preconditions;
import org.apache.ratis.util.TimeDuration;
import org.apache.ratis.util.Timestamp;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.io.IOException;
import java.util.ArrayList;
import java.util.Collection;
import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.TimeoutException;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.Set;
import java.util.stream.Collectors;
import static org.apache.ratis.util.LifeCycle.State.NEW;
import static org.apache.ratis.util.LifeCycle.State.RUNNING;
import static org.apache.ratis.util.LifeCycle.State.STARTING;
/**
* For a candidate to start an election for becoming the leader.
* There are two phases: Pre-Vote and Election.
* <p>
* In Pre-Vote, the candidate does not change its term and try to learn
* if a majority of the cluster would be willing to grant the candidate their votes
* (if the candidate’s log is sufficiently up-to-date,
* and the voters have not received heartbeats from a valid leader
* for at least a baseline election timeout).
* <p>
* Once the Pre-Vote has passed, the candidate increments its term and starts a real Election.
* <p>
* See
* Ongaro, D. Consensus: Bridging Theory and Practice. PhD thesis, Stanford University, 2014.
* Available at https://github.com/ongardie/dissertation
*/
class LeaderElection implements Runnable {
public static final Logger LOG = LoggerFactory.getLogger(LeaderElection.class);
private ResultAndTerm logAndReturn(Phase phase, Result result, Map<RaftPeerId, RequestVoteReplyProto> responses,
List<Exception> exceptions) {
return logAndReturn(phase, result, responses, exceptions, null);
}
private ResultAndTerm logAndReturn(Phase phase, Result result, Map<RaftPeerId, RequestVoteReplyProto> responses,
List<Exception> exceptions, Long newTerm) {
final ResultAndTerm resultAndTerm = new ResultAndTerm(result, newTerm);
LOG.info("{}: {} {} received {} response(s) and {} exception(s):",
this, phase, resultAndTerm, responses.size(), exceptions.size());
int i = 0;
for(RequestVoteReplyProto reply : responses.values()) {
LOG.info(" Response {}: {}", i++, ServerStringUtils.toRequestVoteReplyString(reply));
}
for(Exception e : exceptions) {
final int j = i++;
LogUtils.infoOrTrace(LOG, () -> " Exception " + j, e);
}
return resultAndTerm;
}
enum Phase {
PRE_VOTE,
ELECTION
}
enum Result {PASSED, SINGLE_MODE_PASSED, REJECTED, TIMEOUT, DISCOVERED_A_NEW_TERM, SHUTDOWN, NOT_IN_CONF}
private static class ResultAndTerm {
private final Result result;
private final Long term;
ResultAndTerm(Result result, Long term) {
this.result = result;
this.term = term;
}
long maxTerm(long thatTerm) {
return this.term != null && this.term > thatTerm ? this.term: thatTerm;
}
Result getResult() {
return result;
}
@Override
public String toString() {
return result + (term != null? " (term=" + term + ")": "");
}
}
static class Executor {
private final ExecutorService executor;
private final AtomicInteger count = new AtomicInteger();
Executor(Object name, int size) {
Preconditions.assertTrue(size > 0);
executor = Executors.newCachedThreadPool(r ->
Daemon.newBuilder().setName(name + "-" + count.incrementAndGet()).setRunnable(r).build());
}
ExecutorService getExecutor() {
return executor;
}
Future<RequestVoteReplyProto> submit(Callable<RequestVoteReplyProto> task) {
return executor.submit(task);
}
}
static class ConfAndTerm {
private final RaftConfigurationImpl conf;
private final long term;
ConfAndTerm(RaftConfigurationImpl conf, long term) {
this.conf = conf;
this.term = term;
}
long getTerm() {
return term;
}
RaftConfigurationImpl getConf() {
return conf;
}
@Override
public String toString() {
return "term=" + term + ", " + conf;
}
}
private static final AtomicInteger COUNT = new AtomicInteger();
private final String name;
private final LifeCycle lifeCycle;
private final Daemon daemon;
private final RaftServerImpl server;
private final boolean skipPreVote;
private final ConfAndTerm round0;
LeaderElection(RaftServerImpl server, boolean force) {
this.name = server.getMemberId() + "-" + JavaUtils.getClassSimpleName(getClass()) + COUNT.incrementAndGet();
this.lifeCycle = new LifeCycle(this);
this.daemon = Daemon.newBuilder().setName(name).setRunnable(this)
.setThreadGroup(server.getThreadGroup()).build();
this.server = server;
this.skipPreVote = force ||
!RaftServerConfigKeys.LeaderElection.preVote(
server.getRaftServer().getProperties());
try {
// increase term of the candidate in advance if it's forced to election
this.round0 = force ? server.getState().initElection(Phase.ELECTION) : null;
} catch (IOException e) {
throw new IllegalStateException(name + ": Failed to initialize election", e);
}
}
void start() {
startIfNew(daemon::start);
}
@VisibleForTesting
void startInForeground() {
startIfNew(this);
}
private void startIfNew(Runnable starter) {
if (lifeCycle.compareAndTransition(NEW, STARTING)) {
starter.run();
} else {
final LifeCycle.State state = lifeCycle.getCurrentState();
LOG.info("{}: skip starting since this is already {}", this, state);
}
}
void shutdown() {
lifeCycle.checkStateAndClose();
}
@VisibleForTesting
LifeCycle.State getCurrentState() {
return lifeCycle.getCurrentState();
}
@Override
public void run() {
if (!lifeCycle.compareAndTransition(STARTING, RUNNING)) {
final LifeCycle.State state = lifeCycle.getCurrentState();
LOG.info("{}: skip running since this is already {}", this, state);
return;
}
try (AutoCloseable ignored = Timekeeper.start(server.getLeaderElectionMetrics().getLeaderElectionTimer())) {
for (int round = 0; shouldRun(); round++) {
if (skipPreVote || askForVotes(Phase.PRE_VOTE, round)) {
if (askForVotes(Phase.ELECTION, round)) {
server.changeToLeader();
}
}
}
} catch(Exception e) {
if (e instanceof InterruptedException) {
Thread.currentThread().interrupt();
}
final LifeCycle.State state = lifeCycle.getCurrentState();
if (state.isClosingOrClosed()) {
LOG.info(this + ": since this is already " + state + ", safely ignore " + e);
} else {
if (!server.getInfo().isAlive()) {
LOG.info(this + ": since the server is not alive, safely ignore " + e);
} else {
LOG.error("{}: Failed, state={}", this, state, e);
}
shutdown();
}
} finally {
// Update leader election completion metric(s).
server.getLeaderElectionMetrics().onNewLeaderElectionCompletion();
lifeCycle.checkStateAndClose(() -> {});
}
}
private boolean shouldRun() {
final DivisionInfo info = server.getInfo();
return lifeCycle.getCurrentState().isRunning() && info.isCandidate() && info.isAlive();
}
private boolean shouldRun(long electionTerm) {
return shouldRun() && server.getState().getCurrentTerm() == electionTerm;
}
private ResultAndTerm submitRequestAndWaitResult(Phase phase, RaftConfigurationImpl conf, long electionTerm)
throws InterruptedException {
if (!conf.containsInConf(server.getId())) {
return new ResultAndTerm(Result.NOT_IN_CONF, electionTerm);
}
final ResultAndTerm r;
final Collection<RaftPeer> others = conf.getOtherPeers(server.getId());
if (others.isEmpty()) {
r = new ResultAndTerm(Result.PASSED, electionTerm);
} else {
final TermIndex lastEntry = server.getState().getLastEntry();
final List<Future<RequestVoteReplyProto>> submitted = submitRequests(
phase, electionTerm, lastEntry, others, server.getLeaderElectionExecutor());
r = waitForResults(phase, electionTerm, submitted, conf);
}
return r;
}
/** Send requestVote rpc to all other peers for the given phase. */
private boolean askForVotes(Phase phase, int round) throws InterruptedException, IOException {
final long electionTerm;
final RaftConfigurationImpl conf;
synchronized (server) {
if (!shouldRun()) {
return false;
}
// If round0 is non-null, we have already called initElection in the constructor,
// reuse round0 to avoid initElection again for the first round
final ConfAndTerm confAndTerm = (round == 0 && round0 != null) ?
round0 : server.getState().initElection(phase);
electionTerm = confAndTerm.getTerm();
conf = confAndTerm.getConf();
}
LOG.info("{} {} round {}: submit vote requests at term {} for {}", this, phase, round, electionTerm, conf);
final ResultAndTerm r = submitRequestAndWaitResult(phase, conf, electionTerm);
LOG.info("{} {} round {}: result {}", this, phase, round, r);
synchronized (server) {
if (!shouldRun(electionTerm)) {
return false; // term already passed or this should not run anymore.
}
switch (r.getResult()) {
case PASSED:
case SINGLE_MODE_PASSED:
return true;
case NOT_IN_CONF:
case SHUTDOWN:
server.close();
server.getStateMachine().event().notifyServerShutdown(server.getRoleInfoProto());
return false;
case TIMEOUT:
return false; // should retry
case REJECTED:
case DISCOVERED_A_NEW_TERM:
final long term = r.maxTerm(server.getState().getCurrentTerm());
server.changeToFollowerAndPersistMetadata(term, false, r);
return false;
default: throw new IllegalArgumentException("Unable to process result " + r.result);
}
}
}
private List<Future<RequestVoteReplyProto>> submitRequests(Phase phase, long electionTerm, TermIndex lastEntry,
Collection<RaftPeer> others, Executor voteExecutor) {
final List<Future<RequestVoteReplyProto>> submitted = new ArrayList<>(others.size());
for (final RaftPeer peer : others) {
final RequestVoteRequestProto r = ServerProtoUtils.toRequestVoteRequestProto(
server.getMemberId(), peer.getId(), electionTerm, lastEntry, phase == Phase.PRE_VOTE);
submitted.add(voteExecutor.submit(() -> server.getServerRpc().requestVote(r)));
}
return submitted;
}
private Set<RaftPeerId> getHigherPriorityPeers(RaftConfiguration conf) {
final Optional<Integer> priority = Optional.ofNullable(conf.getPeer(server.getId()))
.map(RaftPeer::getPriority);
return conf.getAllPeers().stream()
.filter(peer -> priority.filter(p -> peer.getPriority() > p).isPresent())
.map(RaftPeer::getId)
.collect(Collectors.toSet());
}
private ResultAndTerm waitForResults(Phase phase, long electionTerm, List<Future<RequestVoteReplyProto>> submitted,
RaftConfigurationImpl conf) throws InterruptedException {
final Timestamp timeout = Timestamp.currentTime().addTime(server.getRandomElectionTimeout());
final Map<RaftPeerId, RequestVoteReplyProto> responses = new HashMap<>();
final List<Exception> exceptions = new ArrayList<>();
Collection<RaftPeerId> votedPeers = new ArrayList<>();
Collection<RaftPeerId> rejectedPeers = new ArrayList<>();
Set<RaftPeerId> higherPriorityPeers = getHigherPriorityPeers(conf);
final boolean singleMode = conf.isSingleMode(server.getId());
for(Iterator<Future<RequestVoteReplyProto>> i = submitted.iterator(); i.hasNext() && shouldRun(electionTerm); ) {
final TimeDuration waitTime = timeout.elapsedTime().apply(n -> -n);
if (waitTime.isNonPositive()) {
if (conf.hasMajority(votedPeers, server.getId())) {
// if some higher priority peer did not response when timeout, but candidate get majority,
// candidate pass vote
return logAndReturn(phase, Result.PASSED, responses, exceptions);
} else if (singleMode) {
// if candidate is in single mode, candidate pass vote.
return logAndReturn(phase, Result.SINGLE_MODE_PASSED, responses, exceptions);
} else {
return logAndReturn(phase, Result.TIMEOUT, responses, exceptions);
}
}
try {
final Future<RequestVoteReplyProto> future = i.next();
final RequestVoteReplyProto r = future.get(waitTime.getDuration(), waitTime.getUnit());
final RaftPeerId replierId = RaftPeerId.valueOf(r.getServerReply().getReplyId());
final RequestVoteReplyProto previous = responses.putIfAbsent(replierId, r);
if (previous != null) {
if (LOG.isWarnEnabled()) {
LOG.warn("{} received duplicated replies from {}, the 2nd reply is ignored: 1st={}, 2nd={}",
this, replierId,
ServerStringUtils.toRequestVoteReplyString(previous),
ServerStringUtils.toRequestVoteReplyString(r));
}
continue;
}
if (r.getShouldShutdown()) {
return logAndReturn(phase, Result.SHUTDOWN, responses, exceptions);
}
if (r.getTerm() > electionTerm) {
return logAndReturn(phase, Result.DISCOVERED_A_NEW_TERM, responses, exceptions, r.getTerm());
}
// If any peer with higher priority rejects vote, candidate can not pass vote
if (!r.getServerReply().getSuccess() && higherPriorityPeers.contains(replierId) && !singleMode) {
return logAndReturn(phase, Result.REJECTED, responses, exceptions);
}
// remove higher priority peer, so that we check higherPriorityPeers empty to make sure
// all higher priority peers have replied
higherPriorityPeers.remove(replierId);
if (r.getServerReply().getSuccess()) {
votedPeers.add(replierId);
// If majority and all peers with higher priority have voted, candidate pass vote
if (higherPriorityPeers.isEmpty() && conf.hasMajority(votedPeers, server.getId())) {
return logAndReturn(phase, Result.PASSED, responses, exceptions);
}
} else {
rejectedPeers.add(replierId);
if (conf.majorityRejectVotes(rejectedPeers)) {
return logAndReturn(phase, Result.REJECTED, responses, exceptions);
}
}
} catch(ExecutionException e) {
LogUtils.infoOrTrace(LOG, () -> this + " got exception when requesting votes", e);
exceptions.add(e);
} catch (TimeoutException e) {
// get timeout, continue to return Result.TIMEOUT
}
}
// received all the responses
if (conf.hasMajority(votedPeers, server.getId())) {
return logAndReturn(phase, Result.PASSED, responses, exceptions);
} else if (singleMode) {
return logAndReturn(phase, Result.SINGLE_MODE_PASSED, responses, exceptions);
} else {
return logAndReturn(phase, Result.REJECTED, responses, exceptions);
}
}
@Override
public String toString() {
return name;
}
}