blob: e55ce2b620aff2c8fa87ecfc71c5c9f10540ec42 [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.solr.cloud;
import java.io.IOException;
import java.lang.invoke.MethodHandles;
import java.util.Comparator;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.function.Function;
import java.util.regex.Matcher;
import java.util.regex.Pattern;
import org.apache.solr.cloud.ZkController.ContextKey;
import org.apache.solr.common.AlreadyClosedException;
import org.apache.solr.common.SolrException;
import org.apache.solr.common.cloud.SolrZkClient;
import org.apache.solr.common.cloud.ZkCmdExecutor;
import org.apache.solr.common.cloud.ZooKeeperException;
import org.apache.zookeeper.CreateMode;
import org.apache.zookeeper.KeeperException;
import org.apache.zookeeper.KeeperException.ConnectionLossException;
import org.apache.zookeeper.WatchedEvent;
import org.apache.zookeeper.Watcher;
import org.apache.zookeeper.Watcher.Event.EventType;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
/**
* Leader Election process. This class contains the logic by which a
* leader is chosen. First call {@link #setup(ElectionContext)} to ensure
* the election process is init'd. Next call
* {@link #joinElection(ElectionContext, boolean)} to start the leader election.
*
* The implementation follows the classic ZooKeeper recipe of creating an
* ephemeral, sequential node for each candidate and then looking at the set
* of such nodes - if the created node is the lowest sequential node, the
* candidate that created the node is the leader. If not, the candidate puts
* a watch on the next lowest node it finds, and if that node goes down,
* starts the whole process over by checking if it's the lowest sequential node, etc.
*
*/
public class LeaderElector {
  private static final Logger log = LoggerFactory.getLogger(MethodHandles.lookup().lookupClass());

  /** Suffix appended to an election path to get the parent of all candidate nodes. */
  static final String ELECTION_NODE = "/election";

  /** Matches an election node name or path; group 1 is the run of digits following {@code -n_}. */
  public static final Pattern LEADER_SEQ = Pattern.compile(".*?/?.*?-n_(\\d+)");

  /**
   * For names built by {@link #joinElection(ElectionContext, boolean, boolean)}
   * ({@code <sessionId>-<contextId>-n_<seq>}), group 1 is the {@code <sessionId>-<contextId>} part.
   */
  private static final Pattern SESSION_ID = Pattern.compile(".*?/?(.*?-.*?)-n_\\d+");

  /** For names of the form {@code <sessionId>-<name>-n_<seq>}, group 2 is the {@code <name>} part. */
  private static final Pattern NODE_NAME = Pattern.compile(".*?/?(.*?-)(.*?)-n_\\d+");

  /** A join attempt gives up once the failure counter has exceeded this value. */
  private static final int MAX_JOIN_RETRIES = 20;

  /** Pause between two attempts to create our election node. */
  private static final long JOIN_RETRY_SLEEP_MS = 50;

  protected SolrZkClient zkClient;

  private final ZkCmdExecutor zkCmdExecutor;

  private volatile ElectionContext context;

  // Written while (re)arming the watch and read by retryElection(); those calls are not
  // guaranteed to happen on the same thread, so the reference is volatile like 'context'.
  private volatile ElectionWatcher watcher;

  // Both may be null when the single-argument constructor is used.
  private final Map<ContextKey,ElectionContext> electionContexts;
  private final ContextKey contextKey;

  /**
   * Creates an elector that does not register replacement contexts in a shared map.
   *
   * @param zkClient client used for all ZooKeeper operations
   */
  public LeaderElector(SolrZkClient zkClient) {
    this(zkClient, null, null);
  }

  /**
   * Creates an elector.
   *
   * @param zkClient client used for all ZooKeeper operations
   * @param key key under which {@link #retryElection(ElectionContext, boolean)} stores the
   *        replacement context in {@code electionContexts}
   * @param electionContexts map updated by {@code retryElection}; may be {@code null}
   */
  public LeaderElector(SolrZkClient zkClient, ContextKey key, Map<ContextKey,ElectionContext> electionContexts) {
    this.zkClient = zkClient;
    this.zkCmdExecutor = new ZkCmdExecutor(zkClient.getZkClientTimeout());
    this.electionContexts = electionContexts;
    this.contextKey = key;
  }

  /**
   * @return the context last stored by {@link #setup(ElectionContext)} or
   *         {@link #retryElection(ElectionContext, boolean)}; {@code null} if neither ran yet
   */
  public ElectionContext getContext() {
    return context;
  }

  /**
   * Check if the candidate with the given n_* sequence number is the leader.
   * If it is, run the leader process. If it is not, start watching the candidate
   * that is in line before this one - if it goes down, check if this candidate
   * is the leader again.
   *
   * @param context the election context whose {@code leaderSeqPath} identifies our node
   * @param replacement has someone else been the leader already?
   */
  private void checkIfIamLeader(final ElectionContext context, boolean replacement) throws KeeperException,
      InterruptedException, IOException {
    context.checkIfIamLeaderFired();

    // get all other numbers...
    final String holdElectionPath = context.electionPath + ELECTION_NODE;
    List<String> seqs = zkClient.getChildren(holdElectionPath, null, true);
    sortSeqs(seqs);

    final String leaderSeqNodeName = context.leaderSeqPath.substring(context.leaderSeqPath.lastIndexOf('/') + 1);
    if (!seqs.contains(leaderSeqNodeName)) {
      log.warn("Our node is no longer in line to be leader");
      return;
    }

    // If any double-registrations exist for me, remove all but this latest one!
    // TODO: can we even get into this state?
    final String prefix = zkClient.getSolrZooKeeper().getSessionId() + "-" + context.id + "-";
    Iterator<String> it = seqs.iterator();
    while (it.hasNext()) {
      String elec = it.next();
      if (!elec.equals(leaderSeqNodeName) && elec.startsWith(prefix)) {
        try {
          String toDelete = holdElectionPath + "/" + elec;
          log.warn("Deleting duplicate registration: {}", toDelete);
          zkClient.delete(toDelete, -1, true);
        } catch (KeeperException.NoNodeException ignored) {
          // already gone, which is the state we want
        }
        it.remove();
      }
    }

    // seqs still contains leaderSeqNodeName (it is never removed above), so it is not empty.
    if (leaderSeqNodeName.equals(seqs.get(0))) {
      // I am the leader
      try {
        if (zkClient.isClosed()) {
          return; // but our zkClient is already closed
        }
        runIamLeaderProcess(context, replacement);
      } catch (KeeperException.NodeExistsException e) {
        log.error("node exists", e);
        retryElection(context, false);
        return;
      }
    } else {
      // I am not the leader - watch the node below me, i.e. the entry that
      // immediately precedes ours in the sorted list
      String toWatch = seqs.get(0);
      for (String node : seqs) {
        if (leaderSeqNodeName.equals(node)) {
          break;
        }
        toWatch = node;
      }
      try {
        String watchedNode = holdElectionPath + "/" + toWatch;
        ElectionWatcher newWatcher = new ElectionWatcher(context.leaderSeqPath, watchedNode, context);
        // publish before arming the watch so retryElection() can cancel it
        watcher = newWatcher;
        zkClient.getData(watchedNode, newWatcher, null, true);
        log.debug("Watching path {} to know if I could be the leader", watchedNode);
      } catch (KeeperException.SessionExpiredException e) {
        throw e;
      } catch (KeeperException.NoNodeException e) {
        // the previous node disappeared, check if we are the leader again
        checkIfIamLeader(context, true);
      } catch (KeeperException e) {
        // we couldn't set our watch for some other reason, retry
        log.warn("Failed setting watch", e);
        checkIfIamLeader(context, true);
      }
    }
  }

  /**
   * Hook invoked when this candidate has been determined to be the leader.
   * The default implementation delegates to
   * {@code context.runLeaderProcess(weAreReplacement, 0)}.
   */
  protected void runIamLeaderProcess(final ElectionContext context, boolean weAreReplacement) throws KeeperException,
      InterruptedException, IOException {
    context.runLeaderProcess(weAreReplacement, 0);
  }

  /**
   * Matches {@code nStringSequence} against {@code pattern} and returns the requested group.
   *
   * @throws IllegalStateException if the whole input does not match the pattern
   */
  private static String matchGroup(Pattern pattern, String nStringSequence, int group) {
    Matcher m = pattern.matcher(nStringSequence);
    if (!m.matches()) {
      throw new IllegalStateException("Could not find regex match in:"
          + nStringSequence);
    }
    return m.group(group);
  }

  /**
   * Returns int given String of form n_0000000001 or n_0000000003, etc.
   *
   * @param nStringSequence election node name or path ending in {@code -n_<digits>}
   * @return sequence number
   * @throws IllegalStateException if the input does not match {@link #LEADER_SEQ}
   */
  public static int getSeq(String nStringSequence) {
    return Integer.parseInt(matchGroup(LEADER_SEQ, nStringSequence, 1));
  }

  /**
   * Returns the part of an election node name that precedes the {@code -n_<digits>} suffix
   * (group 1 of {@link #SESSION_ID}).
   *
   * @throws IllegalStateException if the input does not match
   */
  private static String getNodeId(String nStringSequence) {
    return matchGroup(SESSION_ID, nStringSequence, 1);
  }

  /**
   * Returns group 2 of {@link #NODE_NAME} for the given election node name.
   *
   * @throws IllegalStateException if the input does not match
   */
  public static String getNodeName(String nStringSequence) {
    return matchGroup(NODE_NAME, nStringSequence, 2);
  }

  /**
   * Joins the election at the tail; equivalent to
   * {@code joinElection(context, replacement, false)}.
   *
   * @return sequential node number
   */
  public int joinElection(ElectionContext context, boolean replacement) throws KeeperException, InterruptedException, IOException {
    return joinElection(context, replacement, false);
  }

  /**
   * Begin participating in the election process. Gets a new sequential number
   * and begins watching the node with the sequence number before it, unless it
   * is the lowest number, in which case, initiates the leader process. If the
   * node that is watched goes down, check if we are the new lowest node, else
   * watch the next lowest numbered node.
   *
   * @param context election context; its {@code leaderSeqPath} is set to the path of our node
   * @param replacement passed on to the leadership check
   * @param joinAtHead if {@code true} and at least two candidates exist, create our node with the
   *        same sequence number as the second entry of the sorted candidate list instead of
   *        taking a fresh sequential number
   * @return sequential node number
   * @throws ZooKeeperException if the node could not be created after repeated retries
   */
  public int joinElection(ElectionContext context, boolean replacement, boolean joinAtHead) throws KeeperException, InterruptedException, IOException {
    context.joinedElectionFired();

    final String shardsElectZkPath = context.electionPath + LeaderElector.ELECTION_NODE;
    final long sessionId = zkClient.getSolrZooKeeper().getSessionId();
    final String id = sessionId + "-" + context.id;
    String leaderSeqPath = null;
    boolean cont = true;
    int tries = 0;
    while (cont) {
      try {
        if (joinAtHead) {
          log.debug("Node {} trying to join election at the head", id);
          List<String> nodes = OverseerTaskProcessor.getSortedElectionNodes(zkClient, shardsElectZkPath);
          if (nodes.size() < 2) {
            leaderSeqPath = zkClient.create(shardsElectZkPath + "/" + id + "-n_", null,
                CreateMode.EPHEMERAL_SEQUENTIAL, false);
          } else {
            String firstInLine = nodes.get(1);
            log.debug("The current head: {}", firstInLine);
            // reuse the digits verbatim so the zero padding is preserved
            leaderSeqPath = shardsElectZkPath + "/" + id + "-n_" + matchGroup(LEADER_SEQ, firstInLine, 1);
            zkClient.create(leaderSeqPath, null, CreateMode.EPHEMERAL, false);
          }
        } else {
          leaderSeqPath = zkClient.create(shardsElectZkPath + "/" + id + "-n_", null,
              CreateMode.EPHEMERAL_SEQUENTIAL, false);
        }

        log.debug("Joined leadership election with path: {}", leaderSeqPath);
        context.leaderSeqPath = leaderSeqPath;
        cont = false;
      } catch (ConnectionLossException e) {
        // we don't know if we made our node or not...
        String ownNode = findLatestOwnNode(shardsElectZkPath, id);
        if (ownNode != null) {
          // we did create our node... adopt it instead of creating another one
          leaderSeqPath = shardsElectZkPath + "/" + ownNode;
          log.debug("Joined leadership election with path: {}", leaderSeqPath);
          context.leaderSeqPath = leaderSeqPath;
          cont = false;
        } else {
          if (tries++ > MAX_JOIN_RETRIES) {
            throw new ZooKeeperException(SolrException.ErrorCode.SERVER_ERROR,
                "Could not join election at " + shardsElectZkPath + " after repeated connection loss", e);
          }
          sleepBeforeRetry();
        }
      } catch (KeeperException.NoNodeException e) {
        // we must have failed in creating the election node - someone else must
        // be working on it, lets try again
        if (tries++ > MAX_JOIN_RETRIES) {
          throw new ZooKeeperException(SolrException.ErrorCode.SERVER_ERROR,
              "Could not join election, parent node is still missing: " + shardsElectZkPath, e);
        }
        sleepBeforeRetry();
      }
    }
    checkIfIamLeader(context, replacement);

    return getSeq(context.leaderSeqPath);
  }

  /**
   * Looks for election nodes under {@code electionZkPath} whose id part equals {@code id}.
   *
   * @return the name of the matching child with the highest sequence number, or {@code null}
   *         if no child matches
   */
  private String findLatestOwnNode(String electionZkPath, String id) throws KeeperException, InterruptedException {
    List<String> entries = zkClient.getChildren(electionZkPath, null, true);
    String latest = null;
    int latestSeq = 0;
    for (String entry : entries) {
      if (id.equals(getNodeId(entry))) {
        int seq = getSeq(entry);
        if (latest == null || seq > latestSeq) {
          latest = entry;
          latestSeq = seq;
        }
      }
    }
    return latest;
  }

  /** Sleeps briefly between join attempts; an interrupt is recorded on the thread, not thrown. */
  private static void sleepBeforeRetry() {
    try {
      Thread.sleep(JOIN_RETRY_SLEEP_MS);
    } catch (InterruptedException e) {
      Thread.currentThread().interrupt();
    }
  }

  /**
   * Watch placed on the candidate in line before us. When it fires with a real change event,
   * the leadership check is run again, unless the watcher was canceled in the meantime.
   */
  private class ElectionWatcher implements Watcher {
    final String myNode, watchedNode;
    final ElectionContext context;

    // Set by cancel() and read in process(); NOTE(review): these are expected to run on
    // different threads (caller of retryElection vs. the watcher callback), hence volatile.
    private volatile boolean canceled = false;

    private ElectionWatcher(String myNode, String watchedNode, ElectionContext context) {
      this.myNode = myNode;
      this.watchedNode = watchedNode;
      this.context = context;
    }

    /** Marks this watcher inactive; a later event only deletes {@code myNode}. */
    void cancel() {
      canceled = true;
    }

    @Override
    public void process(WatchedEvent event) {
      // session events are not change events, and do not remove the watcher
      if (EventType.None.equals(event.getType())) {
        return;
      }
      if (canceled) {
        log.debug("This watcher is not active anymore {}", myNode);
        try {
          zkClient.delete(myNode, -1, true);
        } catch (KeeperException.NoNodeException nne) {
          // expected . don't do anything
        } catch (Exception e) {
          log.warn("My watched node still exists and can't remove {}", myNode, e);
        }
        return;
      }
      try {
        // am I the next leader?
        checkIfIamLeader(context, true);
      } catch (AlreadyClosedException ignored) {
        // a component was already closed; there is nothing left to elect
      } catch (Exception e) {
        // stay quiet if the client is closed, failures are expected then
        if (!zkClient.isClosed()) {
          log.warn("Failed to check leadership for {} after change of {}", myNode, watchedNode, e);
        }
      }
    }
  }

  /**
   * Set up any ZooKeeper nodes needed for leader election and remember the
   * given context as the current one.
   */
  public void setup(final ElectionContext context) throws InterruptedException,
      KeeperException {
    String electZKPath = context.electionPath + LeaderElector.ELECTION_NODE;
    if (context instanceof OverseerElectionContext) {
      zkCmdExecutor.ensureExists(electZKPath, zkClient);
    } else {
      // we use 2 param so that replica won't create /collection/{collection} if it doesn't exist
      zkCmdExecutor.ensureExists(electZKPath, (byte[]) null, CreateMode.PERSISTENT, zkClient, 2);
    }

    this.context = context;
  }

  /**
   * Sort n string sequence list in place: by sequence number first, then by
   * the natural order of the full string for entries sharing a number.
   */
  public static void sortSeqs(List<String> seqs) {
    seqs.sort(Comparator.comparingInt(LeaderElector::getSeq).thenComparing(Function.identity()));
  }

  /**
   * Abandons the current election registration and joins again with a copy of the given context.
   * The copy is stored in the shared context map (if one was supplied), the active watcher is
   * canceled, the current context is canceled and closed, and the copy becomes the current
   * context before {@link #joinElection(ElectionContext, boolean, boolean)} is called with
   * {@code replacement = true}.
   *
   * @param context context to copy for the new registration
   * @param joinAtHead passed through to {@code joinElection}
   */
  void retryElection(ElectionContext context, boolean joinAtHead) throws KeeperException, InterruptedException, IOException {
    ElectionWatcher activeWatcher = this.watcher;
    ElectionContext ctx = context.copy();
    if (electionContexts != null) {
      electionContexts.put(contextKey, ctx);
    }
    if (activeWatcher != null) {
      activeWatcher.cancel();
    }
    this.context.cancelElection();
    this.context.close();
    this.context = ctx;
    joinElection(ctx, true, joinAtHead);
  }
}