blob: 8aaf8c9bd82bf887c0a9f2fd9b1a91c92be12fef [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.solr.cloud;
import org.apache.solr.common.AlreadyClosedException;
import org.apache.solr.common.ParWork;
import org.apache.solr.common.SolrException;
import org.apache.solr.common.cloud.SolrZkClient;
import org.apache.solr.common.util.IOUtils;
import org.apache.solr.common.util.ObjectReleaseTracker;
import org.apache.solr.logging.MDCLoggingContext;
import org.apache.zookeeper.CreateMode;
import org.apache.zookeeper.KeeperException;
import org.apache.zookeeper.KeeperException.ConnectionLossException;
import org.apache.zookeeper.WatchedEvent;
import org.apache.zookeeper.Watcher;
import org.apache.zookeeper.Watcher.Event.EventType;
import org.apache.zookeeper.ZooKeeper;
import org.apache.zookeeper.data.Stat;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.io.Closeable;
import java.io.IOException;
import java.lang.invoke.MethodHandles;
import java.util.Comparator;
import java.util.List;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Future;
import java.util.regex.Matcher;
import java.util.regex.Pattern;
/**
* Leader Election process. This class contains the logic by which a
* leader is chosen. First call * {@link #setup(ElectionContext)} to ensure
* the election process is init'd. Next call
* {@link #joinElection(boolean)} to start the leader election.
*
* The implementation follows the classic ZooKeeper recipe of creating an
* ephemeral, sequential node for each candidate and then looking at the set
* of such nodes - if the created node is the lowest sequential node, the
* candidate that created the node is the leader. If not, the candidate puts
* a watch on the next lowest node it finds, and if that node goes down,
* starts the whole process over by checking if it's the lowest sequential node, etc.
*
*/
public class LeaderElector implements Closeable {
private static final Logger log = LoggerFactory.getLogger(MethodHandles.lookup().lookupClass());
public static final String ELECTION_NODE = "/election";
public final static Pattern LEADER_SEQ = Pattern.compile(".*?/?.*?-n_(\\d+)");
private final static Pattern SESSION_ID = Pattern.compile(".*?/?(.*?-.*?)-n_\\d+");
private static final char JOIN = 'j';
private static final char CHECK_IF_LEADER = 'k';
private static final char OUT_OF_ELECTION = 'o';
private static final char POT_LEADER = 'p';
private static final char LEADER = 'l';
private static final char CLOSED = 'c';
private static final char WAITING_IN_ELECTION = 'w';
protected final SolrZkClient zkClient;
private final ZkController zkController;
private volatile ElectionContext context;
private volatile ElectionWatcher watcher;
private volatile boolean isClosed;
private volatile Future<?> joinFuture;
private volatile boolean isCancelled;
private final ExecutorService executor = ParWork.getExecutorService(1);
private volatile char state = OUT_OF_ELECTION;
// public LeaderElector(SolrZkClient zkClient) {
// this.zkClient = zkClient;
// this.contextKey = null;
// this.electionContexts = new ConcurrentHashMap<>(132, 0.75f, 50);
// }
public LeaderElector(ZkController zkController) {
this.zkClient = zkController.getZkClient();
this.zkController = zkController;
assert ObjectReleaseTracker.track(this);
}
public ElectionContext getContext() {
return context;
}
/**
* Check if the candidate with the given n_* sequence number is the leader.
* If it is, set the leaderId on the leader zk node. If it is not, start
* watching the candidate that is in line before this one - if it goes down, check
* if this candidate is the leader again.
*
* @param replacement has someone else been the leader already?
*/
private synchronized boolean checkIfIamLeader(final ElectionContext context, boolean replacement) throws InterruptedException {
//if (checkClosed(context)) return false;
MDCLoggingContext.setCoreName(context.leaderProps.getName());
try {
if (log.isDebugEnabled()) log.debug("Check if I am leader {}", context.getClass().getSimpleName());
if (isClosed) {
log.info("elector is closed, won't join election");
return false;
}
if (state == LEADER || state == POT_LEADER) {
return false;
}
// executor.submit(() -> {
// context.checkIfIamLeaderFired();
// });
state = CHECK_IF_LEADER;
// get all other numbers...
final String holdElectionPath = context.electionPath + ELECTION_NODE;
List<String> seqs;
try {
seqs = zkClient.getChildren(holdElectionPath, null, true);
} catch (KeeperException.SessionExpiredException e) {
log.error("ZooKeeper session has expired");
state = OUT_OF_ELECTION;
return true;
} catch (KeeperException.NoNodeException e) {
log.info("the election node disappeared");
state = OUT_OF_ELECTION;
return false;
} catch (KeeperException e) {
// we couldn't set our watch for some other reason, retry
log.warn("Failed setting election watch, retrying {} {}", e.getClass().getName(), e.getMessage());
state = OUT_OF_ELECTION;
return true;
} catch (Exception e) {
// we couldn't set our watch for some other reason, retry
log.error("Failed on election getchildren call {} {}", e.getClass().getName(), e.getMessage());
state = OUT_OF_ELECTION;
return false;
}
try {
sortSeqs(seqs);
String leaderSeqNodeName;
try {
leaderSeqNodeName = context.leaderSeqPath.substring(context.leaderSeqPath.lastIndexOf('/') + 1);
} catch (NullPointerException e) {
state = OUT_OF_ELECTION;
if (log.isDebugEnabled()) log.debug("leaderSeqPath has been removed, bailing");
return true;
}
if (!seqs.contains(leaderSeqNodeName)) {
log.warn("Our node is no longer in line to be leader");
state = OUT_OF_ELECTION;
return false;
}
if (log.isDebugEnabled()) log.debug("The leader election node is {}", leaderSeqNodeName);
if (leaderSeqNodeName.equals(seqs.get(0))) {
// I am the leader
if (log.isDebugEnabled()) log.debug("I am the potential leader {}, running leader process", context.leaderProps.getName());
ElectionWatcher oldWatcher = watcher;
if (oldWatcher != null) {
oldWatcher.close();
}
// if ((zkController != null && zkController.getCoreContainer().isShutDown())) {
// if (log.isDebugEnabled()) log.debug("Elector is closed, will not try and run leader processes");
// state = OUT_OF_ELECTION;
// return false;
// }
state = POT_LEADER;
runIamLeaderProcess(context, replacement);
return false;
} else {
if (state == LEADER || state == POT_LEADER) {
return false;
}
String toWatch = seqs.get(0);
for (String node : seqs) {
if (leaderSeqNodeName.equals(node)) {
break;
}
toWatch = node;
}
try {
String watchedNode = holdElectionPath + "/" + toWatch;
log.debug("I am not the leader (our path is ={}) - watch the node below me {} seqs={}", leaderSeqNodeName, watchedNode, seqs);
ElectionWatcher oldWatcher = watcher;
if (oldWatcher != null) {
IOUtils.closeQuietly(oldWatcher);
}
state = WAITING_IN_ELECTION;
watcher = new ElectionWatcher(context.leaderSeqPath, watchedNode, context);
Stat exists = zkClient.exists(watchedNode, watcher);
if (exists == null) {
state = OUT_OF_ELECTION;
return true;
}
if (log.isDebugEnabled()) log.debug("Watching path {} to know if I could be the leader, my node is {}", watchedNode, context.leaderSeqPath);
return false;
} catch (KeeperException.SessionExpiredException e) {
state = OUT_OF_ELECTION;
log.error("ZooKeeper session has expired");
return true;
} catch (KeeperException.NoNodeException e) {
log.info("the previous node disappeared, check if we are the leader again");
state = OUT_OF_ELECTION;
return true;
} catch (KeeperException e) {
// we couldn't set our watch for some other reason, retry
log.warn("Failed setting election watch, retrying {} {}", e.getClass().getName(), e.getMessage());
state = OUT_OF_ELECTION;
return true;
} catch (AlreadyClosedException e) {
state = OUT_OF_ELECTION;
return false;
} catch (Exception e) {
// we couldn't set our watch for some other reason, retry
log.error("Failed setting election watch {} {}", e.getClass().getName(), e.getMessage());
state = OUT_OF_ELECTION;
return true;
}
}
} catch (KeeperException.SessionExpiredException e) {
log.error("ZooKeeper session has expired");
state = OUT_OF_ELECTION;
return true;
} catch (AlreadyClosedException e) {
state = OUT_OF_ELECTION;
return false;
} catch (Exception e) {
log.error("Exception", e);
state = OUT_OF_ELECTION;
return false;
}
} finally {
MDCLoggingContext.clear();
}
}
// TODO: get this core param out of here
protected synchronized void runIamLeaderProcess(final ElectionContext context, boolean weAreReplacement) throws KeeperException,
InterruptedException, IOException {
if (state == CLOSED || isClosed) {
throw new AlreadyClosedException();
}
// if (state == LEADER) {
// throw new IllegalStateException("Already in leader state");
// }
boolean success = false;
try {
success = context.runLeaderProcess(context, weAreReplacement, 0);
if (success) {
state = LEADER;
} else {
log.warn("Failed becoming leader {}", context.leaderProps);
}
} finally {
if (!success) {
state = OUT_OF_ELECTION;
}
}
}
/**
* Returns int given String of form n_0000000001 or n_0000000003, etc.
*
* @return sequence number
*/
public static int getSeq(String nStringSequence) {
int seq;
Matcher m = LEADER_SEQ.matcher(nStringSequence);
if (m.matches()) {
seq = Integer.parseInt(m.group(1));
} else {
throw new IllegalStateException("Could not find regex match in:"
+ nStringSequence);
}
return seq;
}
private String getNodeId(String nStringSequence) {
String id;
Matcher m = SESSION_ID.matcher(nStringSequence);
if (m.matches()) {
id = m.group(1);
} else {
throw new IllegalStateException("Could not find regex match in:"
+ nStringSequence);
}
return id;
}
public static String getNodeName(String nStringSequence){
return nStringSequence;
}
public void joinElection(boolean replacement) {
joinElection(replacement, false);
}
public void joinElection(boolean replacement,boolean joinAtHead) {
if (!isClosed && !zkController.getCoreContainer().isShutDown() && !zkController.isDcCalled()) {
joinFuture = executor.submit(() -> {
MDCLoggingContext.setCoreName(context.leaderProps.getName());
MDCLoggingContext.setNode(zkController.getNodeName());
try {
isCancelled = false;
doJoinElection(context, replacement, joinAtHead);
} catch (Exception e) {
log.error("Exception trying to join election", e);
throw new SolrException(SolrException.ErrorCode.SERVER_ERROR, e);
} finally {
MDCLoggingContext.clear();
}
});
}
}
/**
* Begin participating in the election process. Gets a new sequential number
* and begins watching the node with the sequence number before it, unless it
* is the lowest number, in which case, initiates the leader process. If the
* node that is watched goes down, check if we are the new lowest node, else
* watch the next lowest numbered node.
*
*/
public synchronized void doJoinElection(ElectionContext context, boolean replacement,boolean joinAtHead) throws KeeperException, InterruptedException {
//if (checkClosed(context)) return false;
if (shouldRejectJoins() || state == CLOSED) {
log.info("Won't join election {}", state);
throw new AlreadyClosedException();
}
if (state == LEADER) {
log.error("Wrong state",new IllegalStateException("Got " + state));
throw new IllegalStateException("Wrong state",new IllegalStateException("Got " + state));
}
state = JOIN;
isCancelled = false;
ParWork.getRootSharedExecutor().submit(context::joinedElectionFired);
final String shardsElectZkPath = context.electionPath + LeaderElector.ELECTION_NODE;
long sessionId;
String id = null;
String leaderSeqPath;
while (true) {
try {
sessionId = zkClient.getSessionId();
id = sessionId + "-" + context.id;
if (joinAtHead){
if (log.isDebugEnabled()) log.debug("Node {} trying to join election at the head", id);
List<String> nodes = OverseerTaskProcessor.getSortedElectionNodes(zkClient, shardsElectZkPath);
if(nodes.size() <2){
leaderSeqPath = zkClient.create(shardsElectZkPath + "/" + id + "-n_", (byte[]) null,
CreateMode.EPHEMERAL_SEQUENTIAL, true);
} else {
String firstInLine = nodes.get(1);
if (log.isDebugEnabled()) log.debug("The current head: {}", firstInLine);
Matcher m = LEADER_SEQ.matcher(firstInLine);
if (!m.matches()) {
throw new IllegalStateException("Could not find regex match in:"
+ firstInLine);
}
leaderSeqPath = shardsElectZkPath + "/" + id + "-n_"+ m.group(1);
zkClient.create(leaderSeqPath, (byte[]) null, CreateMode.EPHEMERAL, false);
}
} else {
if (log.isDebugEnabled()) log.debug("create ephem election node {}", shardsElectZkPath + "/" + id + "-n_");
leaderSeqPath = zkClient.create(shardsElectZkPath + "/" + id + "-n_", (byte[]) null, CreateMode.EPHEMERAL_SEQUENTIAL, false);
}
log.debug("Joined leadership election with path: {}", leaderSeqPath);
context.leaderSeqPath = leaderSeqPath;
state = JOIN;
break;
} catch (ConnectionLossException e) {
if (zkClient.getConnectionManager().getKeeper().getState() == ZooKeeper.States.CLOSED) {
log.info("Won't retry to create election node on ConnectionLoss because the client state is closed");
break;
}
// we don't know if we made our node or not...
List<String> entries = zkClient.getChildren(shardsElectZkPath, null, true);
for (String entry : entries) {
String nodeId = getNodeId(entry);
if (id.equals(nodeId)) {
// we did create our node...
break;
}
}
} catch (KeeperException.NoNodeException e) {
throw new AlreadyClosedException();
}
}
getSeq(context.leaderSeqPath);
if (log.isDebugEnabled()) log.debug("Do checkIfIamLeader");
boolean tryagain = true;
while (tryagain) {
tryagain = checkIfIamLeader(context, replacement);
}
// boolean tryagain = false;
// while (tryagain) {
// tryagain = checkIfIamLeader(context, replacement);
// if (tryagain) {
// Thread.sleep(250);
// }
// }
}
private boolean shouldRejectJoins() {
return zkController.getCoreContainer().isShutDown() || zkController.isDcCalled() || zkClient.isClosed();
}
@Override
public synchronized void close() throws IOException {
assert ObjectReleaseTracker.release(this);
state = CLOSED;
this.isClosed = true;
IOUtils.closeQuietly(watcher);
watcher = null;
if (context != null) {
try {
context.cancelElection();
} catch (Exception e) {
log.warn("Exception canceling election", e);
}
}
try {
if (joinFuture != null) {
joinFuture.cancel(true);
}
} catch (Exception e) {
// okay
}
}
public void cancel() {
state = OUT_OF_ELECTION;
try {
this.isCancelled = true;
IOUtils.closeQuietly(watcher);
if (context != null) {
context.cancelElection();
}
Future<?> jf = joinFuture;
if (jf != null) {
jf.cancel(false);
// if (!shouldRejectJoins()) {
// try {
// jf.get(500, TimeUnit.MILLISECONDS);
//
// } catch (TimeoutException e) {
//
// } catch (Exception e) {
// log.error("Exception waiting for previous election attempt to finish {} {} cause={}", e.getClass().getSimpleName(), e.getMessage());
// }
// }
}
} catch (Exception e) {
log.warn("Exception canceling election", e);
}
}
public boolean isClosed() {
return isClosed;
}
public char getState() {
return state;
}
private class ElectionWatcher implements Watcher, Closeable {
final String myNode, watchedNode;
final ElectionContext context;
private volatile boolean closed;
private ElectionWatcher(String myNode, String watchedNode, ElectionContext context) {
this.myNode = myNode;
this.watchedNode = watchedNode;
this.context = context;
}
@Override
public void process(WatchedEvent event) {
// session events are not change events, and do not remove the watcher
if (EventType.None.equals(event.getType()) || closed) {
return;
}
if (log.isDebugEnabled()) log.debug("Got event on node we where watching in leader line {} watchedNode={}", myNode, watchedNode);
if (state == LEADER) {
log.info("Election watcher fired, but we are already leader");
return;
}
if (isCancelled || isClosed) {
if (log.isDebugEnabled()) log.debug("This watcher is not active anymore {} isCancelled={} isClosed={}", myNode, isCancelled, isClosed);
return;
}
try {
if (event.getType() == EventType.NodeDeleted) {
// am I the next leader?
state = CHECK_IF_LEADER;
boolean tryagain = true;
while (tryagain) {
tryagain = checkIfIamLeader(context, true);
}
} else {
Stat exists = zkClient.exists(watchedNode, this);
if (exists == null) {
close();
boolean tryagain = true;
while (tryagain) {
tryagain = checkIfIamLeader(context, true);
}
}
}
// we don't kick off recovery here, the leader sync will do that if necessary for its replicas
} catch (AlreadyClosedException | InterruptedException e) {
log.info("Already shutting down");
} catch (Exception e) {
log.error("Exception in election", e);
}
}
@Override
public void close() throws IOException {
this.closed = true;
try {
zkClient.removeWatches(watchedNode, this, WatcherType.Any, true);
} catch (KeeperException.NoWatcherException | AlreadyClosedException e) {
} catch (Exception e) {
log.info("could not remove watch {} {}", e.getClass().getSimpleName(), e.getMessage());
}
}
}
/**
* Set up any ZooKeeper nodes needed for leader election.
*/
public void setup(final ElectionContext context) {
this.context = context;
}
/**
* Sort n string sequence list.
*/
public static void sortSeqs(List<String> seqs) {
seqs.sort(Comparator.comparingInt(LeaderElector::getSeq).thenComparing(o -> o));
}
synchronized void retryElection(boolean joinAtHead) {
state = OUT_OF_ELECTION;
if (shouldRejectJoins()) {
log.info("Closed, won't rejoin election");
throw new AlreadyClosedException();
}
IOUtils.closeQuietly(this);
context.leaderSeqPath = null;
context.watchedSeqPath = null;
if (context instanceof ShardLeaderElectionContextBase) {
((ShardLeaderElectionContextBase) context).closed = false;
((ShardLeaderElectionContextBase) context).leaderZkNodeParentVersion = null;
}
isClosed = false;
isCancelled = false;
joinFuture = null;
state = OUT_OF_ELECTION;
joinElection(true, joinAtHead);
}
public boolean isLeader() {
return LEADER == state;
}
}