| /** |
| * Licensed to the Apache Software Foundation (ASF) under one |
| * or more contributor license agreements. See the NOTICE file |
| * distributed with this work for additional information |
| * regarding copyright ownership. The ASF licenses this file |
| * to you under the Apache License, Version 2.0 (the |
| * "License"); you may not use this file except in compliance |
| * with the License. You may obtain a copy of the License at |
| * |
| * http://www.apache.org/licenses/LICENSE-2.0 |
| * |
| * Unless required by applicable law or agreed to in writing, software |
| * distributed under the License is distributed on an "AS IS" BASIS, |
| * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
| * See the License for the specific language governing permissions and |
| * limitations under the License. |
| */ |
| |
| |
| package org.apache.zookeeper.server.quorum; |
| |
| import java.io.IOException; |
| import java.nio.ByteBuffer; |
| import java.util.HashMap; |
| import java.util.HashSet; |
| import java.util.Map; |
| import java.util.concurrent.LinkedBlockingQueue; |
| import java.util.concurrent.TimeUnit; |
| |
| import org.apache.zookeeper.jmx.MBeanRegistry; |
| import org.apache.zookeeper.server.quorum.QuorumCnxManager.Message; |
| import org.apache.zookeeper.server.quorum.QuorumPeer.LearnerType; |
| import org.apache.zookeeper.server.quorum.QuorumPeer.ServerState; |
| import org.apache.zookeeper.server.quorum.QuorumPeerConfig.ConfigException; |
| import org.apache.zookeeper.server.quorum.flexible.QuorumVerifier; |
| import org.apache.zookeeper.server.util.ZxidUtils; |
| import org.slf4j.Logger; |
| import org.slf4j.LoggerFactory; |
| |
| |
| /** |
| * Implementation of leader election using TCP. It uses an object of the class |
| * QuorumCnxManager to manage connections. Otherwise, the algorithm is push-based |
| * as with the other UDP implementations. |
| * |
| * There are a few parameters that can be tuned to change its behavior. First, |
| * finalizeWait determines the amount of time to wait until deciding upon a leader. |
| * This is part of the leader election algorithm. |
| */ |
| |
| |
| public class FastLeaderElection implements Election { |
| private static final Logger LOG = LoggerFactory.getLogger(FastLeaderElection.class); |
| |
| /** |
| * Determine how much time a process has to wait |
| * once it believes that it has reached the end of |
| * leader election. |
| */ |
| final static int finalizeWait = 200; |
| |
| |
| /** |
| * Upper bound on the amount of time between two consecutive |
| * notification checks. This impacts the amount of time to get |
| * the system up again after long partitions. Currently 60 seconds. |
| */ |
| |
| final static int maxNotificationInterval = 60000; |
| |
| /** |
| * This value is passed to the methods that check the quorum |
| * majority of an established ensemble for those values that |
| * should not be taken into account in the comparison |
| * (electionEpoch and zxid). |
| */ |
| final static int IGNOREVALUE = -1; |
| |
| /** |
| * Connection manager. Fast leader election uses TCP for |
| * communication between peers, and QuorumCnxManager manages |
| * such connections. |
| */ |
| |
| QuorumCnxManager manager; |
| |
| |
| /** |
| * Notifications are messages that let other peers know that |
| * a given peer has changed its vote, either because it has |
| * joined leader election or because it learned of another |
| * peer with higher zxid or same zxid and higher server id |
| */ |
| |
    static public class Notification {
        /*
         * Server id of the proposed leader.
         */
        long leader;

        /*
         * zxid of the proposed leader.
         */
        long zxid;

        /*
         * Election epoch (logical clock / round) of the sender.
         */
        long electionEpoch;

        /*
         * Current state of the sender (LOOKING/FOLLOWING/LEADING/OBSERVING).
         */
        QuorumPeer.ServerState state;

        /*
         * Server id of the sender.
         */
        long sid;

        /*
         * Quorum verifier (configuration info) carried in the message,
         * or null when the sender used an older protocol version.
         */
        QuorumVerifier qv;
        /*
         * epoch of the proposed leader
         */
        long peerEpoch;
    }
| |
    // Placeholder config payload used when no configuration data is attached.
    static byte[] dummyData = new byte[0];

    /**
     * Messages that a peer wants to send to other peers.
     * These messages can be both Notifications and Acks
     * of reception of notification.
     */
    static public class ToSend {
        static enum mType {crequest, challenge, notification, ack}

        ToSend(mType type,
               long leader,
               long zxid,
               long electionEpoch,
               ServerState state,
               long sid,
               long peerEpoch,
               byte[] configData) {

            this.leader = leader;
            this.zxid = zxid;
            this.electionEpoch = electionEpoch;
            this.state = state;
            this.sid = sid;
            this.peerEpoch = peerEpoch;
            this.configData = configData;
        }

        /*
         * Proposed leader in the case of notification
         */
        long leader;

        /*
         * id contains the tag for acks, and zxid for notifications
         */
        long zxid;

        /*
         * Election epoch (logical clock) of the sender.
         */
        long electionEpoch;

        /*
         * Current state of the sender.
         */
        QuorumPeer.ServerState state;

        /*
         * Server id of the recipient.
         */
        long sid;

        /*
         * Used to send a QuorumVerifier (configuration info);
         * appended after the fixed 36-byte header on the wire.
         */
        byte[] configData = dummyData;

        /*
         * Leader epoch
         */
        long peerEpoch;
    }
| |
    // Outbound messages waiting to be handed to QuorumCnxManager by WorkerSender.
    LinkedBlockingQueue<ToSend> sendqueue;
    // Decoded inbound notifications consumed by lookForLeader().
    LinkedBlockingQueue<Notification> recvqueue;
| |
| /** |
| * Multi-threaded implementation of message handler. Messenger |
| * implements two sub-classes: WorkReceiver and WorkSender. The |
| * functionality of each is obvious from the name. Each of these |
| * spawns a new thread. |
| */ |
| |
| private class Messenger { |
| |
| /** |
| * Receives messages from instance of QuorumCnxManager on |
| * method run(), and processes such messages. |
| */ |
| |
        /**
         * Receives messages from the instance of QuorumCnxManager on
         * method run(), decodes the wire format into Notification
         * objects, and either queues them for the election logic
         * (recvqueue) or answers the sender directly.
         */
        class WorkerReceiver implements Runnable {
            // Cooperative shutdown flag, raised by Messenger.halt().
            volatile boolean stop;
            QuorumCnxManager manager;

            WorkerReceiver(QuorumCnxManager manager) {
                this.stop = false;
                this.manager = manager;
            }

            public void run() {

                Message response;
                while (!stop) {
                    // Sleeps on receive
                    try{
                        response = manager.pollRecvQueue(3000, TimeUnit.MILLISECONDS);
                        if(response == null) continue;

                        // The current protocol and two previous generations all send at least 28 bytes
                        if (response.buffer.capacity() < 28) {
                            LOG.error("Got a short response: " + response.buffer.capacity());
                            continue;
                        }

                        // this is the backwardCompatibility mode in place before ZK-107
                        // It is for a version of the protocol in which we didn't send peer epoch
                        // With peer epoch the message became 36 bytes
                        boolean backCompatibility28 = (response.buffer.capacity() == 28);

                        // ZK-107 sends the configuration info in every message.
                        // So messages are 36 bytes + size of configuration info
                        // (variable length, should be at the end of the message).
                        boolean backCompatibility36 = (response.buffer.capacity() == 36);

                        // Rewind and decode the fixed header:
                        // state(int,4) + leader(8) + zxid(8) + electionEpoch(8) [+ peerEpoch(8)]
                        response.buffer.clear();
                        int rstate = response.buffer.getInt();
                        long rleader = response.buffer.getLong();
                        long rzxid = response.buffer.getLong();
                        long relectionEpoch = response.buffer.getLong();
                        long rpeerepoch;

                        if(!backCompatibility28){
                            rpeerepoch = response.buffer.getLong();
                        } else {
                            if(LOG.isInfoEnabled()){
                                LOG.info("Backward compatibility mode (28 bits), server id: " + response.sid);
                            }
                            // Old peers did not send the epoch; derive it from the zxid.
                            rpeerepoch = ZxidUtils.getEpochFromZxid(rzxid);
                        }

                        QuorumVerifier rqv = null;

                        // check if we have more than 36 bytes. If so extract config info from message.
                        if(!backCompatibility28 && !backCompatibility36){
                            byte b[] = new byte[response.buffer.remaining()];
                            response.buffer.get(b);

                            // Lock self so config parsing/reconfig is not interleaved
                            // with other threads touching the peer's configuration.
                            synchronized(self){
                                try {
                                    // NOTE(review): new String(b) uses the platform default
                                    // charset, matching getBytes() on the sending side —
                                    // assumes all peers run the same default charset.
                                    rqv = self.configFromString(new String(b));
                                    QuorumVerifier curQV = self.getQuorumVerifier();
                                    // A strictly newer config triggers a reconfiguration and,
                                    // if the verifier actually changed, a restart of leader election.
                                    if (rqv.getVersion() > curQV.getVersion()) {
                                        LOG.info(self.getId() + " Received version: " + Long.toHexString(rqv.getVersion()) + " my version: " + Long.toHexString(self.getQuorumVerifier().getVersion()));
                                        self.processReconfig(rqv, null, null, false);
                                        if (!rqv.equals(curQV)) {
                                            LOG.info("restarting leader election");
                                            self.shuttingDownLE = true;
                                            self.getElectionAlg().shutdown();
                                        }
                                    }
                                } catch (IOException e) {
                                    LOG.error("Something went wrong while processing config received from " + response.sid);
                                } catch (ConfigException e) {
                                    LOG.error("Something went wrong while processing config received from " + response.sid);
                                }
                            }
                        } else {
                            if(LOG.isInfoEnabled()){
                                LOG.info("Backward compatibility mode (before reconfig), server id: " + response.sid);
                            }
                        }

                        /*
                         * If it is from a non-voting server (such as an observer or
                         * a non-voting follower), respond right away with our current
                         * vote instead of processing the message.
                         */
                        if(!self.getVotingView().containsKey(response.sid)){
                            Vote current = self.getCurrentVote();
                            QuorumVerifier qv = self.getQuorumVerifier();
                            ToSend notmsg = new ToSend(ToSend.mType.notification,
                                    current.getId(),
                                    current.getZxid(),
                                    logicalclock,
                                    self.getPeerState(),
                                    response.sid,
                                    current.getPeerEpoch(),
                                    qv.toString().getBytes());

                            sendqueue.offer(notmsg);
                        } else {
                            // Receive new message
                            if (LOG.isDebugEnabled()) {
                                LOG.debug("Receive new notification message. My id = "
                                        + self.getId());
                            }

                            // State of peer that sent this message, decoded from
                            // the int on the wire (unknown values map to LOOKING).
                            QuorumPeer.ServerState ackstate = QuorumPeer.ServerState.LOOKING;
                            switch (rstate) {
                            case 0:
                                ackstate = QuorumPeer.ServerState.LOOKING;
                                break;
                            case 1:
                                ackstate = QuorumPeer.ServerState.FOLLOWING;
                                break;
                            case 2:
                                ackstate = QuorumPeer.ServerState.LEADING;
                                break;
                            case 3:
                                ackstate = QuorumPeer.ServerState.OBSERVING;
                                break;
                            }

                            // Instantiate Notification and set its attributes
                            Notification n = new Notification();
                            n.leader = rleader;
                            n.zxid = rzxid;
                            n.electionEpoch = relectionEpoch;
                            n.state = ackstate;
                            n.sid = response.sid;
                            n.peerEpoch = rpeerepoch;
                            n.qv = rqv;
                            /*
                             * Print notification info
                             */
                            if(LOG.isInfoEnabled()){
                                printNotification(n);
                            }

                            /*
                             * If this server is looking, then send proposed leader
                             */

                            if(self.getPeerState() == QuorumPeer.ServerState.LOOKING){
                                recvqueue.offer(n);

                                /*
                                 * Send a notification back if the peer that sent this
                                 * message is also looking and its logical clock is
                                 * lagging behind.
                                 */
                                if((ackstate == QuorumPeer.ServerState.LOOKING)
                                        && (n.electionEpoch < logicalclock)){
                                    Vote v = getVote();
                                    QuorumVerifier qv = self.getQuorumVerifier();
                                    ToSend notmsg = new ToSend(ToSend.mType.notification,
                                            v.getId(),
                                            v.getZxid(),
                                            logicalclock,
                                            self.getPeerState(),
                                            response.sid,
                                            v.getPeerEpoch(),
                                            qv.toString().getBytes());
                                    sendqueue.offer(notmsg);
                                }
                            } else {
                                /*
                                 * If this server is not looking, but the one that sent the ack
                                 * is looking, then send back what it believes to be the leader.
                                 */
                                Vote current = self.getCurrentVote();
                                if(ackstate == QuorumPeer.ServerState.LOOKING){
                                    if(LOG.isDebugEnabled()){
                                        LOG.debug("Sending new notification. My id = " +
                                                self.getId() + " recipient=" +
                                                response.sid + " zxid=0x" +
                                                Long.toHexString(current.getZxid()) +
                                                " leader=" + current.getId() + " config version = " +
                                                Long.toHexString(self.getQuorumVerifier().getVersion()));
                                    }

                                    QuorumVerifier qv = self.getQuorumVerifier();
                                    ToSend notmsg = new ToSend(
                                            ToSend.mType.notification,
                                            current.getId(),
                                            current.getZxid(),
                                            current.getElectionEpoch(),
                                            self.getPeerState(),
                                            response.sid,
                                            current.getPeerEpoch(),
                                            qv.toString().getBytes());
                                    sendqueue.offer(notmsg);
                                }
                            }
                        }
                    } catch (InterruptedException e) {
                        // Best-effort receiver loop: log and keep polling until
                        // the stop flag is raised.
                        LOG.warn("Interrupted Exception while waiting for new message" +
                                e.toString());
                    }
                }
                LOG.info("WorkerReceiver is down");
            }
        }
| |
| |
| |
| |
| /** |
| * This worker simply dequeues a message to send and |
| * and queues it on the manager's queue. |
| */ |
| |
| class WorkerSender implements Runnable { |
| volatile boolean stop; |
| QuorumCnxManager manager; |
| |
| WorkerSender(QuorumCnxManager manager){ |
| this.stop = false; |
| this.manager = manager; |
| } |
| |
| public void run() { |
| while (!stop) { |
| try { |
| ToSend m = sendqueue.poll(3000, TimeUnit.MILLISECONDS); |
| if(m == null) continue; |
| |
| process(m); |
| } catch (InterruptedException e) { |
| break; |
| } |
| } |
| LOG.info("WorkerSender is down"); |
| } |
| |
| /** |
| * Called by run() once there is a new message to send. |
| * |
| * @param m message to send |
| */ |
| private void process(ToSend m) { |
| byte requestBytes[] = new byte[36 + m.configData.length]; |
| ByteBuffer requestBuffer = ByteBuffer.wrap(requestBytes); |
| |
| /* |
| * Building notification packet to send |
| */ |
| |
| requestBuffer.clear(); |
| requestBuffer.putInt(m.state.ordinal()); |
| requestBuffer.putLong(m.leader); |
| requestBuffer.putLong(m.zxid); |
| requestBuffer.putLong(m.electionEpoch); |
| requestBuffer.putLong(m.peerEpoch); |
| requestBuffer.put(m.configData); |
| |
| manager.toSend(m.sid, requestBuffer); |
| |
| } |
| } |
| |
        // The two worker instances; kept so halt() can stop them.
        WorkerSender ws;
        WorkerReceiver wr;

        /**
         * Constructor of class Messenger.
         *
         * Spawns one daemon thread for sending and one for receiving
         * election messages; both poll with a timeout so they notice
         * their stop flags.
         *
         * @param manager Connection manager
         */
        Messenger(QuorumCnxManager manager) {

            this.ws = new WorkerSender(manager);

            Thread t = new Thread(this.ws,
                    "WorkerSender[myid=" + self.getId() + "]");
            t.setDaemon(true);
            t.start();

            this.wr = new WorkerReceiver(manager);

            t = new Thread(this.wr,
                    "WorkerReceiver[myid=" + self.getId() + "]");
            t.setDaemon(true);
            t.start();
        }
| |
        /**
         * Stops instances of WorkerSender and WorkerReceiver by raising
         * their volatile stop flags; each worker exits after at most one
         * 3-second poll timeout.
         */
        void halt(){
            this.ws.stop = true;
            this.wr.stop = true;
        }
| |
| } |
| |
    // The peer that owns this election instance.
    QuorumPeer self;
    // Sender/receiver worker pair for election messages.
    Messenger messenger;
    // Election round counter. NOTE(review): volatile for visibility to the
    // worker threads; all writes appear to happen on the lookForLeader()
    // thread — confirm no other writer exists.
    volatile long logicalclock; /* Election instance */
    // The (leader id, zxid, epoch) triple this peer currently votes for;
    // guarded by 'this' via updateProposal()/getVote().
    long proposedLeader;
    long proposedZxid;
    long proposedEpoch;
| |
| |
    /**
     * Returns the current value of the logical clock counter
     * (the election round this peer is participating in).
     */
    public long getLogicalClock(){
        return logicalclock;
    }
| |
    /**
     * Constructor of FastLeaderElection. It takes two parameters, one
     * is the QuorumPeer object that instantiated this object, and the other
     * is the connection manager. Such an object should be created only once
     * by each peer during an instance of the ZooKeeper service.
     *
     * @param self QuorumPeer that created this object
     * @param manager Connection manager
     */
    public FastLeaderElection(QuorumPeer self, QuorumCnxManager manager){
        this.stop = false;
        this.manager = manager;
        starter(self, manager);
    }
| |
    /**
     * This method is invoked by the constructor. Because it is a
     * part of the starting procedure of the object that must be on
     * any constructor of this class, it is probably best to keep as
     * a separate method. As we have a single constructor currently,
     * it is not strictly necessary to have it separate.
     *
     * @param self QuorumPeer that created this object
     * @param manager Connection manager
     */
    private void starter(QuorumPeer self, QuorumCnxManager manager) {
        this.self = self;
        proposedLeader = -1;
        proposedZxid = -1;

        // Queues must exist before Messenger starts its worker threads,
        // since the workers poll/offer on them immediately.
        sendqueue = new LinkedBlockingQueue<ToSend>();
        recvqueue = new LinkedBlockingQueue<Notification>();
        this.messenger = new Messenger(manager);
    }
| |
| private void leaveInstance(Vote v) { |
| if(LOG.isDebugEnabled()){ |
| LOG.debug("About to leave FLE instance: leader=" |
| + v.getId() + ", zxid=0x" + |
| Long.toHexString(v.getZxid()) + ", my id=" + self.getId() |
| + ", my state=" + self.getPeerState()); |
| } |
| recvqueue.clear(); |
| } |
| |
    /** Returns the connection manager used by this election. */
    public QuorumCnxManager getCnxManager(){
        return manager;
    }
| |
    // Raised by shutdown(); checked by the lookForLeader() loop.
    volatile boolean stop;

    /**
     * Stops this election instance: halts the connection manager first
     * (so no new messages arrive), then the messenger worker threads.
     */
    public void shutdown(){
        stop = true;
        LOG.debug("Shutting down connection manager");
        manager.halt();
        LOG.debug("Shutting down messenger");
        messenger.halt();
        LOG.debug("FLE is down");
    }
| |
| |
| /** |
| * Send notifications to all peers upon a change in our vote |
| */ |
| private void sendNotifications() { |
| for (long sid : self.getAllKnownServerIds()) { |
| QuorumVerifier qv = self.getQuorumVerifier(); |
| ToSend notmsg = new ToSend(ToSend.mType.notification, |
| proposedLeader, |
| proposedZxid, |
| logicalclock, |
| QuorumPeer.ServerState.LOOKING, |
| sid, |
| proposedEpoch, qv.toString().getBytes()); |
| if(LOG.isDebugEnabled()){ |
| LOG.debug("Sending Notification: " + proposedLeader + " (n.leader), 0x" + |
| Long.toHexString(proposedZxid) + " (n.zxid), 0x" + Long.toHexString(logicalclock) + |
| " (n.round), " + sid + " (recipient), " + self.getId() + |
| " (myid), 0x" + Long.toHexString(proposedEpoch) + " (n.peerEpoch)"); |
| } |
| sendqueue.offer(notmsg); |
| } |
| } |
| |
    /**
     * Logs the contents of a received notification at INFO level,
     * including the carried config version when present.
     */
    private void printNotification(Notification n){
        LOG.info("Notification: " + n.leader + " (n.leader), 0x"
                + Long.toHexString(n.zxid) + " (n.zxid), 0x"
                + Long.toHexString(n.electionEpoch) + " (n.round), " + n.state
                + " (n.state), " + n.sid + " (n.sid), 0x"
                + Long.toHexString(n.peerEpoch) + " (n.peerEPoch), "
                + self.getPeerState() + " (my state)" + (n.qv!=null ? (Long.toHexString(n.qv.getVersion()) + " (n.config version)"):""));
    }
| |
| |
| /** |
| * Check if a pair (server id, zxid) succeeds our |
| * current vote. |
| * |
| * @param id Server identifier |
| * @param zxid Last zxid observed by the issuer of this vote |
| */ |
| protected boolean totalOrderPredicate(long newId, long newZxid, long newEpoch, long curId, long curZxid, long curEpoch) { |
| LOG.debug("id: " + newId + ", proposed id: " + curId + ", zxid: 0x" + |
| Long.toHexString(newZxid) + ", proposed zxid: 0x" + Long.toHexString(curZxid)); |
| if(self.getQuorumVerifier().getWeight(newId) == 0){ |
| return false; |
| } |
| |
| /* |
| * We return true if one of the following three cases hold: |
| * 1- New epoch is higher |
| * 2- New epoch is the same as current epoch, but new zxid is higher |
| * 3- New epoch is the same as current epoch, new zxid is the same |
| * as current zxid, but server id is higher. |
| */ |
| |
| return ((newEpoch > curEpoch) || |
| ((newEpoch == curEpoch) && |
| ((newZxid > curZxid) || ((newZxid == curZxid) && (newId > curId))))); |
| } |
| |
| /** |
| * Termination predicate. Given a set of votes, determines if |
| * have sufficient to declare the end of the election round. |
| * |
| * @param votes Set of votes |
| * @param vote Identifier of the vote received last |
| */ |
| private boolean termPredicate( |
| HashMap<Long, Vote> votes, |
| Vote vote) { |
| |
| HashSet<Long> set = new HashSet<Long>(); |
| |
| /* |
| * First make the views consistent. Sometimes peers will have |
| * different zxids for a server depending on timing. |
| */ |
| for (Map.Entry<Long,Vote> entry : votes.entrySet()) { |
| if (self.getQuorumVerifier().getVotingMembers().containsKey(entry.getKey()) |
| && vote.equals(entry.getValue())){ |
| set.add(entry.getKey()); |
| } |
| } |
| |
| return self.getQuorumVerifier().containsQuorum(set); |
| } |
| /** |
| * In the case there is a leader elected, and a quorum supporting |
| * this leader, we have to check if the leader has voted and acked |
| * that it is leading. We need this check to avoid that peers keep |
| * electing over and over a peer that has crashed and it is no |
| * longer leading. |
| * |
| * @param votes set of votes |
| * @param leader leader id |
| * @param electionEpoch epoch id |
| */ |
| private boolean checkLeader( |
| HashMap<Long, Vote> votes, |
| long leader, |
| long electionEpoch){ |
| |
| boolean predicate = true; |
| |
| /* |
| * If everyone else thinks I'm the leader, I must be the leader. |
| * The other two checks are just for the case in which I'm not the |
| * leader. If I'm not the leader and I haven't received a message |
| * from leader stating that it is leading, then predicate is false. |
| */ |
| |
| if(leader != self.getId()){ |
| if(votes.get(leader) == null) predicate = false; |
| else if(votes.get(leader).getState() != ServerState.LEADING) predicate = false; |
| } else if(logicalclock != electionEpoch) { |
| predicate = false; |
| } |
| |
| return predicate; |
| } |
| |
    /**
     * Replaces this peer's proposed (leader, zxid, epoch) triple.
     * Synchronized with getVote() so the triple is never read half-updated.
     */
    synchronized void updateProposal(long leader, long zxid, long epoch){
        if(LOG.isDebugEnabled()){
            LOG.debug("Updating proposal: " + leader + " (newleader), 0x"
                    + Long.toHexString(zxid) + " (newzxid), " + proposedLeader
                    + " (oldleader), 0x" + Long.toHexString(proposedZxid) + " (oldzxid)");
        }
        proposedLeader = leader;
        proposedZxid = zxid;
        proposedEpoch = epoch;
    }
| |
    /**
     * Returns a consistent snapshot of the current proposal as a Vote.
     * Synchronized with updateProposal().
     */
    synchronized Vote getVote(){
        return new Vote(proposedLeader, proposedZxid, proposedEpoch);
    }
| |
| /** |
| * A learning state can be either FOLLOWING or OBSERVING. |
| * This method simply decides which one depending on the |
| * role of the server. |
| * |
| * @return ServerState |
| */ |
| private ServerState learningState(){ |
| if(self.getLearnerType() == LearnerType.PARTICIPANT){ |
| LOG.debug("I'm a participant: " + self.getId()); |
| return ServerState.FOLLOWING; |
| } |
| else{ |
| LOG.debug("I'm an observer: " + self.getId()); |
| return ServerState.OBSERVING; |
| } |
| } |
| |
| /** |
| * Returns the initial vote value of server identifier. |
| * |
| * @return long |
| */ |
| private long getInitId(){ |
| if(self.getQuorumVerifier().getVotingMembers().containsKey(self.getId())) |
| return self.getId(); |
| else return Long.MIN_VALUE; |
| } |
| |
| /** |
| * Returns initial last logged zxid. |
| * |
| * @return long |
| */ |
| private long getInitLastLoggedZxid(){ |
| if(self.getLearnerType() == LearnerType.PARTICIPANT) |
| return self.getLastLoggedZxid(); |
| else return Long.MIN_VALUE; |
| } |
| |
| /** |
| * Returns the initial vote value of the peer epoch. |
| * |
| * @return long |
| */ |
| private long getPeerEpoch(){ |
| if(self.getLearnerType() == LearnerType.PARTICIPANT) |
| try { |
| return self.getCurrentEpoch(); |
| } catch(IOException e) { |
| RuntimeException re = new RuntimeException(e.getMessage()); |
| re.setStackTrace(e.getStackTrace()); |
| throw re; |
| } |
| else return Long.MIN_VALUE; |
| } |
| |
    /**
     * Starts a new round of leader election. Whenever our QuorumPeer
     * changes its state to LOOKING, this method is invoked, and it
     * sends notifications to all other peers.
     *
     * @return the winning Vote once this peer settles on a leader
     *         (possibly itself), or null if the loop exits because the
     *         peer left the LOOKING state or the election was stopped
     * @throws InterruptedException if interrupted while polling recvqueue
     */
    public Vote lookForLeader() throws InterruptedException {
        try {
            self.jmxLeaderElectionBean = new LeaderElectionBean();
            MBeanRegistry.getInstance().register(
                    self.jmxLeaderElectionBean, self.jmxLocalPeerBean);
        } catch (Exception e) {
            // JMX is best-effort; the election proceeds without it.
            LOG.warn("Failed to register with JMX", e);
            self.jmxLeaderElectionBean = null;
        }
        if (self.start_fle == 0) {
            self.start_fle = System.currentTimeMillis();
        }
        try {
            // Votes received during the current election round, by sender id.
            HashMap<Long, Vote> recvset = new HashMap<Long, Vote>();

            // Votes from peers already FOLLOWING/LEADING (outside the
            // election), used to join an established ensemble.
            HashMap<Long, Vote> outofelection = new HashMap<Long, Vote>();

            int notTimeout = finalizeWait;

            // Start a new round: bump the logical clock and propose
            // ourselves (or MIN_VALUE placeholders if we are an observer).
            synchronized(this){
                logicalclock++;
                updateProposal(getInitId(), getInitLastLoggedZxid(), getPeerEpoch());
            }

            LOG.info("New election. My id = " + self.getId() +
                    ", proposed zxid=0x" + Long.toHexString(proposedZxid));
            sendNotifications();

            /*
             * Loop in which we exchange notifications until we find a leader
             */

            while ((self.getPeerState() == ServerState.LOOKING) &&
                    (!stop)){
                /*
                 * Remove next notification from queue, times out after 2 times
                 * the termination time
                 */
                Notification n = recvqueue.poll(notTimeout,
                        TimeUnit.MILLISECONDS);

                /*
                 * Sends more notifications if haven't received enough.
                 * Otherwise processes new notification.
                 */
                if(n == null){
                    // Nothing arrived: either our messages were delivered
                    // (so re-send) or connections are down (so reconnect).
                    if(manager.haveDelivered()){
                        sendNotifications();
                    } else {
                        manager.connectAll();
                    }

                    /*
                     * Exponential backoff, capped at maxNotificationInterval.
                     */
                    int tmpTimeOut = notTimeout*2;
                    notTimeout = (tmpTimeOut < maxNotificationInterval?
                            tmpTimeOut : maxNotificationInterval);
                    LOG.info("Notification time out: " + notTimeout);
                }
                else if(self.getVotingView().containsKey(n.sid)) {
                    /*
                     * Only proceed if the vote comes from a replica in the
                     * voting view.
                     */
                    switch (n.state) {
                    case LOOKING:
                        // If notification > current, replace and send messages out
                        if (n.electionEpoch > logicalclock) {
                            // The sender is in a newer round: adopt it,
                            // discard this round's votes and re-evaluate.
                            logicalclock = n.electionEpoch;
                            recvset.clear();
                            if(totalOrderPredicate(n.leader, n.zxid, n.peerEpoch,
                                    getInitId(), getInitLastLoggedZxid(), getPeerEpoch())) {
                                updateProposal(n.leader, n.zxid, n.peerEpoch);
                            } else {
                                updateProposal(getInitId(),
                                        getInitLastLoggedZxid(),
                                        getPeerEpoch());
                            }
                            sendNotifications();
                        } else if (n.electionEpoch < logicalclock) {
                            // Stale round: ignore entirely.
                            if(LOG.isDebugEnabled()){
                                LOG.debug("Notification election epoch is smaller than logicalclock. n.electionEpoch = 0x"
                                        + Long.toHexString(n.electionEpoch)
                                        + ", logicalclock=0x" + Long.toHexString(logicalclock));
                            }
                            break;
                        } else if (totalOrderPredicate(n.leader, n.zxid, n.peerEpoch,
                                proposedLeader, proposedZxid, proposedEpoch)) {
                            // Same round, better candidate: switch our vote.
                            updateProposal(n.leader, n.zxid, n.peerEpoch);
                            sendNotifications();
                        }

                        if(LOG.isDebugEnabled()){
                            LOG.debug("Adding vote: from=" + n.sid +
                                    ", proposed leader=" + n.leader +
                                    ", proposed zxid=0x" + Long.toHexString(n.zxid) +
                                    ", proposed election epoch=0x" + Long.toHexString(n.electionEpoch));
                        }

                        recvset.put(n.sid, new Vote(n.leader, n.zxid, n.electionEpoch, n.peerEpoch));

                        if (termPredicate(recvset,
                                new Vote(proposedLeader, proposedZxid,
                                        logicalclock, proposedEpoch))) {

                            // Verify if there is any change in the proposed leader:
                            // drain the queue for finalizeWait ms looking for a
                            // better candidate; if one shows up, push it back and
                            // keep electing.
                            while((n = recvqueue.poll(finalizeWait,
                                    TimeUnit.MILLISECONDS)) != null){
                                if(totalOrderPredicate(n.leader, n.zxid, n.peerEpoch,
                                        proposedLeader, proposedZxid, proposedEpoch)){
                                    recvqueue.put(n);
                                    break;
                                }
                            }

                            /*
                             * This predicate is true once we don't read any new
                             * relevant message from the reception queue
                             */
                            if (n == null) {
                                self.setPeerState((proposedLeader == self.getId()) ?
                                        ServerState.LEADING: learningState());

                                Vote endVote = new Vote(proposedLeader,
                                        proposedZxid, proposedEpoch);
                                leaveInstance(endVote);
                                return endVote;
                            }
                        }
                        break;
                    case OBSERVING:
                        // Observers have no vote; nothing to do.
                        LOG.debug("Notification from observer: " + n.sid);
                        break;
                    case FOLLOWING:
                    case LEADING:
                        /*
                         * Consider all notifications from the same epoch
                         * together.
                         */
                        if(n.electionEpoch == logicalclock){
                            recvset.put(n.sid, new Vote(n.leader, n.zxid, n.electionEpoch, n.peerEpoch));
                            if(termPredicate(recvset, new Vote(n.leader,
                                    n.zxid, n.electionEpoch, n.peerEpoch, n.state))
                                    && checkLeader(outofelection, n.leader, n.electionEpoch)) {
                                self.setPeerState((n.leader == self.getId()) ?
                                        ServerState.LEADING: learningState());

                                Vote endVote = new Vote(n.leader, n.zxid, n.peerEpoch);
                                leaveInstance(endVote);
                                return endVote;
                            }
                        }

                        /*
                         * Before joining an established ensemble, verify that
                         * a majority are following the same leader.
                         * Only peer epoch is used to check that the votes come
                         * from the same ensemble. This is because there is at
                         * least one corner case in which the ensemble can be
                         * created with inconsistent zxid and election epoch
                         * info. However, given that only one ensemble can be
                         * running at a single point in time and that each
                         * epoch is used only once, using only the epoch to
                         * compare the votes is sufficient.
                         *
                         * @see https://issues.apache.org/jira/browse/ZOOKEEPER-1732
                         */
                        outofelection.put(n.sid, new Vote(n.leader,
                                IGNOREVALUE, IGNOREVALUE, n.peerEpoch, n.state));
                        if (termPredicate(outofelection, new Vote(n.leader,
                                IGNOREVALUE, IGNOREVALUE, n.peerEpoch, n.state))
                                && checkLeader(outofelection, n.leader, IGNOREVALUE)) {
                            // Adopt the established ensemble's round and join it.
                            synchronized(this){
                                logicalclock = n.electionEpoch;
                                self.setPeerState((n.leader == self.getId()) ?
                                        ServerState.LEADING: learningState());
                            }
                            Vote endVote = new Vote(n.leader, n.zxid, n.peerEpoch);
                            leaveInstance(endVote);
                            return endVote;
                        }
                        break;
                    default:
                        LOG.warn("Notification state unrecoginized: " + n.state
                                + " (n.state), " + n.sid + " (n.sid)");
                        break;
                    }
                } else {
                    LOG.warn("Ignoring notification from non-cluster member " + n.sid);
                }
            }
            return null;
        } finally {
            // Always unregister the JMX bean, even on early return or error.
            try {
                if(self.jmxLeaderElectionBean != null){
                    MBeanRegistry.getInstance().unregister(
                            self.jmxLeaderElectionBean);
                }
            } catch (Exception e) {
                LOG.warn("Failed to unregister with JMX", e);
            }
            self.jmxLeaderElectionBean = null;
        }
    }
| } |