| /* |
| * Licensed to the Apache Software Foundation (ASF) under one or more |
| * contributor license agreements. See the NOTICE file distributed with |
| * this work for additional information regarding copyright ownership. |
| * The ASF licenses this file to You under the Apache License, Version 2.0 |
| * (the "License"); you may not use this file except in compliance with |
| * the License. You may obtain a copy of the License at |
| * |
| * http://www.apache.org/licenses/LICENSE-2.0 |
| * |
| * Unless required by applicable law or agreed to in writing, software |
| * distributed under the License is distributed on an "AS IS" BASIS, |
| * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
| * See the License for the specific language governing permissions and |
| * limitations under the License. |
| */ |
| package org.apache.solr.cloud; |
| |
| import org.apache.solr.common.AlreadyClosedException; |
| import org.apache.solr.common.ParWork; |
| import org.apache.solr.common.SolrException; |
| import org.apache.solr.common.cloud.SolrZkClient; |
| import org.apache.solr.common.util.IOUtils; |
| import org.apache.solr.common.util.ObjectReleaseTracker; |
| import org.apache.solr.logging.MDCLoggingContext; |
| import org.apache.zookeeper.CreateMode; |
| import org.apache.zookeeper.KeeperException; |
| import org.apache.zookeeper.KeeperException.ConnectionLossException; |
| import org.apache.zookeeper.WatchedEvent; |
| import org.apache.zookeeper.Watcher; |
| import org.apache.zookeeper.Watcher.Event.EventType; |
| import org.apache.zookeeper.ZooKeeper; |
| import org.apache.zookeeper.data.Stat; |
| import org.slf4j.Logger; |
| import org.slf4j.LoggerFactory; |
| |
| import java.io.Closeable; |
| import java.io.IOException; |
| import java.lang.invoke.MethodHandles; |
| import java.util.Comparator; |
| import java.util.List; |
| import java.util.concurrent.ExecutorService; |
| import java.util.concurrent.Future; |
| import java.util.regex.Matcher; |
| import java.util.regex.Pattern; |
| |
| /** |
| * Leader Election process. This class contains the logic by which a |
| * leader is chosen. First call * {@link #setup(ElectionContext)} to ensure |
| * the election process is init'd. Next call |
| * {@link #joinElection(boolean)} to start the leader election. |
| * |
| * The implementation follows the classic ZooKeeper recipe of creating an |
| * ephemeral, sequential node for each candidate and then looking at the set |
| * of such nodes - if the created node is the lowest sequential node, the |
| * candidate that created the node is the leader. If not, the candidate puts |
| * a watch on the next lowest node it finds, and if that node goes down, |
| * starts the whole process over by checking if it's the lowest sequential node, etc. |
| * |
| */ |
| public class LeaderElector implements Closeable { |
| private static final Logger log = LoggerFactory.getLogger(MethodHandles.lookup().lookupClass()); |
| |
| public static final String ELECTION_NODE = "/election"; |
| |
| public final static Pattern LEADER_SEQ = Pattern.compile(".*?/?.*?-n_(\\d+)"); |
| private final static Pattern SESSION_ID = Pattern.compile(".*?/?(.*?-.*?)-n_\\d+"); |
| |
| private static final char JOIN = 'j'; |
| private static final char CHECK_IF_LEADER = 'k'; |
| private static final char OUT_OF_ELECTION = 'o'; |
| private static final char POT_LEADER = 'p'; |
| private static final char LEADER = 'l'; |
| private static final char CLOSED = 'c'; |
| private static final char WAITING_IN_ELECTION = 'w'; |
| |
| protected final SolrZkClient zkClient; |
| private final ZkController zkController; |
| |
| private volatile ElectionContext context; |
| |
| private volatile ElectionWatcher watcher; |
| |
| private volatile boolean isClosed; |
| private volatile Future<?> joinFuture; |
| private volatile boolean isCancelled; |
| |
| private final ExecutorService executor = ParWork.getExecutorService(1); |
| |
| private volatile char state = OUT_OF_ELECTION; |
| |
| // public LeaderElector(SolrZkClient zkClient) { |
| // this.zkClient = zkClient; |
| // this.contextKey = null; |
| // this.electionContexts = new ConcurrentHashMap<>(132, 0.75f, 50); |
| // } |
| |
| public LeaderElector(ZkController zkController) { |
| |
| this.zkClient = zkController.getZkClient(); |
| this.zkController = zkController; |
| assert ObjectReleaseTracker.track(this); |
| } |
| |
| public ElectionContext getContext() { |
| return context; |
| } |
| |
| /** |
| * Check if the candidate with the given n_* sequence number is the leader. |
| * If it is, set the leaderId on the leader zk node. If it is not, start |
| * watching the candidate that is in line before this one - if it goes down, check |
| * if this candidate is the leader again. |
| * |
| * @param replacement has someone else been the leader already? |
| */ |
| private synchronized boolean checkIfIamLeader(final ElectionContext context, boolean replacement) throws InterruptedException { |
| //if (checkClosed(context)) return false; |
| MDCLoggingContext.setCoreName(context.leaderProps.getName()); |
| try { |
| if (log.isDebugEnabled()) log.debug("Check if I am leader {}", context.getClass().getSimpleName()); |
| if (isClosed) { |
| log.info("elector is closed, won't join election"); |
| return false; |
| } |
| |
| if (state == LEADER || state == POT_LEADER) { |
| return false; |
| } |
| |
| // executor.submit(() -> { |
| // context.checkIfIamLeaderFired(); |
| // }); |
| |
| state = CHECK_IF_LEADER; |
| // get all other numbers... |
| final String holdElectionPath = context.electionPath + ELECTION_NODE; |
| List<String> seqs; |
| try { |
| seqs = zkClient.getChildren(holdElectionPath, null, true); |
| } catch (KeeperException.SessionExpiredException e) { |
| log.error("ZooKeeper session has expired"); |
| state = OUT_OF_ELECTION; |
| return true; |
| } catch (KeeperException.NoNodeException e) { |
| log.info("the election node disappeared"); |
| state = OUT_OF_ELECTION; |
| return false; |
| } catch (KeeperException e) { |
| // we couldn't set our watch for some other reason, retry |
| log.warn("Failed setting election watch, retrying {} {}", e.getClass().getName(), e.getMessage()); |
| state = OUT_OF_ELECTION; |
| return true; |
| } catch (Exception e) { |
| // we couldn't set our watch for some other reason, retry |
| log.error("Failed on election getchildren call {} {}", e.getClass().getName(), e.getMessage()); |
| state = OUT_OF_ELECTION; |
| return false; |
| } |
| |
| try { |
| |
| sortSeqs(seqs); |
| |
| String leaderSeqNodeName; |
| try { |
| leaderSeqNodeName = context.leaderSeqPath.substring(context.leaderSeqPath.lastIndexOf('/') + 1); |
| } catch (NullPointerException e) { |
| state = OUT_OF_ELECTION; |
| if (log.isDebugEnabled()) log.debug("leaderSeqPath has been removed, bailing"); |
| return true; |
| } |
| if (!seqs.contains(leaderSeqNodeName)) { |
| log.warn("Our node is no longer in line to be leader"); |
| state = OUT_OF_ELECTION; |
| return false; |
| } |
| if (log.isDebugEnabled()) log.debug("The leader election node is {}", leaderSeqNodeName); |
| if (leaderSeqNodeName.equals(seqs.get(0))) { |
| // I am the leader |
| if (log.isDebugEnabled()) log.debug("I am the potential leader {}, running leader process", context.leaderProps.getName()); |
| ElectionWatcher oldWatcher = watcher; |
| if (oldWatcher != null) { |
| oldWatcher.close(); |
| } |
| |
| // if ((zkController != null && zkController.getCoreContainer().isShutDown())) { |
| // if (log.isDebugEnabled()) log.debug("Elector is closed, will not try and run leader processes"); |
| // state = OUT_OF_ELECTION; |
| // return false; |
| // } |
| |
| state = POT_LEADER; |
| runIamLeaderProcess(context, replacement); |
| return false; |
| |
| } else { |
| |
| if (state == LEADER || state == POT_LEADER) { |
| return false; |
| } |
| |
| String toWatch = seqs.get(0); |
| for (String node : seqs) { |
| if (leaderSeqNodeName.equals(node)) { |
| break; |
| } |
| toWatch = node; |
| } |
| try { |
| String watchedNode = holdElectionPath + "/" + toWatch; |
| |
| log.debug("I am not the leader (our path is ={}) - watch the node below me {} seqs={}", leaderSeqNodeName, watchedNode, seqs); |
| |
| ElectionWatcher oldWatcher = watcher; |
| if (oldWatcher != null) { |
| IOUtils.closeQuietly(oldWatcher); |
| } |
| |
| state = WAITING_IN_ELECTION; |
| |
| watcher = new ElectionWatcher(context.leaderSeqPath, watchedNode, context); |
| Stat exists = zkClient.exists(watchedNode, watcher); |
| if (exists == null) { |
| state = OUT_OF_ELECTION; |
| return true; |
| } |
| |
| |
| if (log.isDebugEnabled()) log.debug("Watching path {} to know if I could be the leader, my node is {}", watchedNode, context.leaderSeqPath); |
| |
| return false; |
| } catch (KeeperException.SessionExpiredException e) { |
| state = OUT_OF_ELECTION; |
| log.error("ZooKeeper session has expired"); |
| return true; |
| } catch (KeeperException.NoNodeException e) { |
| log.info("the previous node disappeared, check if we are the leader again"); |
| state = OUT_OF_ELECTION; |
| return true; |
| } catch (KeeperException e) { |
| // we couldn't set our watch for some other reason, retry |
| log.warn("Failed setting election watch, retrying {} {}", e.getClass().getName(), e.getMessage()); |
| state = OUT_OF_ELECTION; |
| return true; |
| } catch (AlreadyClosedException e) { |
| state = OUT_OF_ELECTION; |
| return false; |
| } catch (Exception e) { |
| // we couldn't set our watch for some other reason, retry |
| log.error("Failed setting election watch {} {}", e.getClass().getName(), e.getMessage()); |
| state = OUT_OF_ELECTION; |
| return true; |
| } |
| } |
| |
| } catch (KeeperException.SessionExpiredException e) { |
| log.error("ZooKeeper session has expired"); |
| state = OUT_OF_ELECTION; |
| return true; |
| } catch (AlreadyClosedException e) { |
| state = OUT_OF_ELECTION; |
| return false; |
| } catch (Exception e) { |
| log.error("Exception", e); |
| state = OUT_OF_ELECTION; |
| return false; |
| } |
| |
| } finally { |
| MDCLoggingContext.clear(); |
| } |
| } |
| |
| |
| // TODO: get this core param out of here |
| protected synchronized void runIamLeaderProcess(final ElectionContext context, boolean weAreReplacement) throws KeeperException, |
| InterruptedException, IOException { |
| if (state == CLOSED || isClosed) { |
| throw new AlreadyClosedException(); |
| } |
| // if (state == LEADER) { |
| // throw new IllegalStateException("Already in leader state"); |
| // } |
| boolean success = false; |
| try { |
| success = context.runLeaderProcess(context, weAreReplacement, 0); |
| if (success) { |
| state = LEADER; |
| } else { |
| log.warn("Failed becoming leader {}", context.leaderProps); |
| } |
| } finally { |
| if (!success) { |
| state = OUT_OF_ELECTION; |
| } |
| } |
| } |
| |
| /** |
| * Returns int given String of form n_0000000001 or n_0000000003, etc. |
| * |
| * @return sequence number |
| */ |
| public static int getSeq(String nStringSequence) { |
| int seq; |
| Matcher m = LEADER_SEQ.matcher(nStringSequence); |
| if (m.matches()) { |
| seq = Integer.parseInt(m.group(1)); |
| } else { |
| throw new IllegalStateException("Could not find regex match in:" |
| + nStringSequence); |
| } |
| return seq; |
| } |
| |
| private String getNodeId(String nStringSequence) { |
| String id; |
| Matcher m = SESSION_ID.matcher(nStringSequence); |
| if (m.matches()) { |
| id = m.group(1); |
| } else { |
| throw new IllegalStateException("Could not find regex match in:" |
| + nStringSequence); |
| } |
| return id; |
| } |
| |
| public static String getNodeName(String nStringSequence){ |
| |
| return nStringSequence; |
| |
| } |
| |
| public void joinElection(boolean replacement) { |
| joinElection(replacement, false); |
| } |
| |
| public void joinElection(boolean replacement,boolean joinAtHead) { |
| if (!isClosed && !zkController.getCoreContainer().isShutDown() && !zkController.isDcCalled()) { |
| joinFuture = executor.submit(() -> { |
| MDCLoggingContext.setCoreName(context.leaderProps.getName()); |
| MDCLoggingContext.setNode(zkController.getNodeName()); |
| try { |
| isCancelled = false; |
| doJoinElection(context, replacement, joinAtHead); |
| } catch (Exception e) { |
| log.error("Exception trying to join election", e); |
| throw new SolrException(SolrException.ErrorCode.SERVER_ERROR, e); |
| } finally { |
| MDCLoggingContext.clear(); |
| } |
| }); |
| } |
| } |
| |
| /** |
| * Begin participating in the election process. Gets a new sequential number |
| * and begins watching the node with the sequence number before it, unless it |
| * is the lowest number, in which case, initiates the leader process. If the |
| * node that is watched goes down, check if we are the new lowest node, else |
| * watch the next lowest numbered node. |
| * |
| */ |
| public synchronized void doJoinElection(ElectionContext context, boolean replacement,boolean joinAtHead) throws KeeperException, InterruptedException { |
| //if (checkClosed(context)) return false; |
| if (shouldRejectJoins() || state == CLOSED) { |
| log.info("Won't join election {}", state); |
| throw new AlreadyClosedException(); |
| } |
| |
| if (state == LEADER) { |
| log.error("Wrong state",new IllegalStateException("Got " + state)); |
| throw new IllegalStateException("Wrong state",new IllegalStateException("Got " + state)); |
| } |
| state = JOIN; |
| |
| isCancelled = false; |
| |
| ParWork.getRootSharedExecutor().submit(context::joinedElectionFired); |
| |
| final String shardsElectZkPath = context.electionPath + LeaderElector.ELECTION_NODE; |
| |
| long sessionId; |
| String id = null; |
| String leaderSeqPath; |
| |
| while (true) { |
| try { |
| sessionId = zkClient.getSessionId(); |
| id = sessionId + "-" + context.id; |
| if (joinAtHead){ |
| if (log.isDebugEnabled()) log.debug("Node {} trying to join election at the head", id); |
| List<String> nodes = OverseerTaskProcessor.getSortedElectionNodes(zkClient, shardsElectZkPath); |
| if(nodes.size() <2){ |
| leaderSeqPath = zkClient.create(shardsElectZkPath + "/" + id + "-n_", (byte[]) null, |
| CreateMode.EPHEMERAL_SEQUENTIAL, true); |
| } else { |
| String firstInLine = nodes.get(1); |
| if (log.isDebugEnabled()) log.debug("The current head: {}", firstInLine); |
| Matcher m = LEADER_SEQ.matcher(firstInLine); |
| if (!m.matches()) { |
| throw new IllegalStateException("Could not find regex match in:" |
| + firstInLine); |
| } |
| leaderSeqPath = shardsElectZkPath + "/" + id + "-n_"+ m.group(1); |
| zkClient.create(leaderSeqPath, (byte[]) null, CreateMode.EPHEMERAL, false); |
| } |
| } else { |
| if (log.isDebugEnabled()) log.debug("create ephem election node {}", shardsElectZkPath + "/" + id + "-n_"); |
| leaderSeqPath = zkClient.create(shardsElectZkPath + "/" + id + "-n_", (byte[]) null, CreateMode.EPHEMERAL_SEQUENTIAL, false); |
| } |
| |
| log.debug("Joined leadership election with path: {}", leaderSeqPath); |
| context.leaderSeqPath = leaderSeqPath; |
| state = JOIN; |
| break; |
| } catch (ConnectionLossException e) { |
| if (zkClient.getConnectionManager().getKeeper().getState() == ZooKeeper.States.CLOSED) { |
| log.info("Won't retry to create election node on ConnectionLoss because the client state is closed"); |
| break; |
| } |
| |
| // we don't know if we made our node or not... |
| List<String> entries = zkClient.getChildren(shardsElectZkPath, null, true); |
| |
| for (String entry : entries) { |
| String nodeId = getNodeId(entry); |
| if (id.equals(nodeId)) { |
| // we did create our node... |
| break; |
| } |
| } |
| |
| } catch (KeeperException.NoNodeException e) { |
| throw new AlreadyClosedException(); |
| } |
| } |
| |
| getSeq(context.leaderSeqPath); |
| |
| if (log.isDebugEnabled()) log.debug("Do checkIfIamLeader"); |
| boolean tryagain = true; |
| |
| while (tryagain) { |
| tryagain = checkIfIamLeader(context, replacement); |
| } |
| |
| // boolean tryagain = false; |
| // while (tryagain) { |
| // tryagain = checkIfIamLeader(context, replacement); |
| // if (tryagain) { |
| // Thread.sleep(250); |
| // } |
| // } |
| |
| } |
| |
| private boolean shouldRejectJoins() { |
| return zkController.getCoreContainer().isShutDown() || zkController.isDcCalled() || zkClient.isClosed(); |
| } |
| |
| @Override |
| public synchronized void close() throws IOException { |
| assert ObjectReleaseTracker.release(this); |
| state = CLOSED; |
| this.isClosed = true; |
| |
| IOUtils.closeQuietly(watcher); |
| watcher = null; |
| |
| if (context != null) { |
| try { |
| context.cancelElection(); |
| } catch (Exception e) { |
| log.warn("Exception canceling election", e); |
| } |
| } |
| try { |
| if (joinFuture != null) { |
| joinFuture.cancel(true); |
| } |
| } catch (Exception e) { |
| // okay |
| } |
| } |
| |
| public void cancel() { |
| |
| state = OUT_OF_ELECTION; |
| |
| try { |
| this.isCancelled = true; |
| IOUtils.closeQuietly(watcher); |
| if (context != null) { |
| context.cancelElection(); |
| } |
| Future<?> jf = joinFuture; |
| if (jf != null) { |
| jf.cancel(false); |
| // if (!shouldRejectJoins()) { |
| // try { |
| // jf.get(500, TimeUnit.MILLISECONDS); |
| // |
| // } catch (TimeoutException e) { |
| // |
| // } catch (Exception e) { |
| // log.error("Exception waiting for previous election attempt to finish {} {} cause={}", e.getClass().getSimpleName(), e.getMessage()); |
| // } |
| // } |
| |
| } |
| } catch (Exception e) { |
| log.warn("Exception canceling election", e); |
| } |
| } |
| |
| public boolean isClosed() { |
| return isClosed; |
| } |
| |
| public char getState() { |
| return state; |
| } |
| |
| private class ElectionWatcher implements Watcher, Closeable { |
| final String myNode, watchedNode; |
| final ElectionContext context; |
| private volatile boolean closed; |
| |
| private ElectionWatcher(String myNode, String watchedNode, ElectionContext context) { |
| this.myNode = myNode; |
| this.watchedNode = watchedNode; |
| this.context = context; |
| } |
| |
| @Override |
| public void process(WatchedEvent event) { |
| // session events are not change events, and do not remove the watcher |
| if (EventType.None.equals(event.getType()) || closed) { |
| return; |
| } |
| |
| if (log.isDebugEnabled()) log.debug("Got event on node we where watching in leader line {} watchedNode={}", myNode, watchedNode); |
| |
| if (state == LEADER) { |
| log.info("Election watcher fired, but we are already leader"); |
| return; |
| } |
| |
| if (isCancelled || isClosed) { |
| if (log.isDebugEnabled()) log.debug("This watcher is not active anymore {} isCancelled={} isClosed={}", myNode, isCancelled, isClosed); |
| return; |
| } |
| try { |
| if (event.getType() == EventType.NodeDeleted) { |
| // am I the next leader? |
| state = CHECK_IF_LEADER; |
| boolean tryagain = true; |
| while (tryagain) { |
| tryagain = checkIfIamLeader(context, true); |
| } |
| } else { |
| |
| Stat exists = zkClient.exists(watchedNode, this); |
| if (exists == null) { |
| close(); |
| boolean tryagain = true; |
| |
| while (tryagain) { |
| tryagain = checkIfIamLeader(context, true); |
| } |
| } |
| |
| } |
| // we don't kick off recovery here, the leader sync will do that if necessary for its replicas |
| } catch (AlreadyClosedException | InterruptedException e) { |
| log.info("Already shutting down"); |
| } catch (Exception e) { |
| log.error("Exception in election", e); |
| } |
| } |
| |
| @Override |
| public void close() throws IOException { |
| this.closed = true; |
| try { |
| zkClient.removeWatches(watchedNode, this, WatcherType.Any, true); |
| } catch (KeeperException.NoWatcherException | AlreadyClosedException e) { |
| |
| } catch (Exception e) { |
| log.info("could not remove watch {} {}", e.getClass().getSimpleName(), e.getMessage()); |
| } |
| } |
| } |
| |
| /** |
| * Set up any ZooKeeper nodes needed for leader election. |
| */ |
| public void setup(final ElectionContext context) { |
| this.context = context; |
| } |
| |
| /** |
| * Sort n string sequence list. |
| */ |
| public static void sortSeqs(List<String> seqs) { |
| seqs.sort(Comparator.comparingInt(LeaderElector::getSeq).thenComparing(o -> o)); |
| } |
| |
| synchronized void retryElection(boolean joinAtHead) { |
| state = OUT_OF_ELECTION; |
| if (shouldRejectJoins()) { |
| log.info("Closed, won't rejoin election"); |
| throw new AlreadyClosedException(); |
| } |
| |
| IOUtils.closeQuietly(this); |
| context.leaderSeqPath = null; |
| context.watchedSeqPath = null; |
| if (context instanceof ShardLeaderElectionContextBase) { |
| ((ShardLeaderElectionContextBase) context).closed = false; |
| ((ShardLeaderElectionContextBase) context).leaderZkNodeParentVersion = null; |
| } |
| |
| isClosed = false; |
| isCancelled = false; |
| joinFuture = null; |
| state = OUT_OF_ELECTION; |
| joinElection(true, joinAtHead); |
| } |
| |
| public boolean isLeader() { |
| return LEADER == state; |
| } |
| } |