| /* |
| * Licensed to the Apache Software Foundation (ASF) under one |
| * or more contributor license agreements. See the NOTICE file |
| * distributed with this work for additional information |
| * regarding copyright ownership. The ASF licenses this file |
| * to you under the Apache License, Version 2.0 (the |
| * "License"); you may not use this file except in compliance |
| * with the License. You may obtain a copy of the License at |
| * |
| * http://www.apache.org/licenses/LICENSE-2.0 |
| * |
| * Unless required by applicable law or agreed to in writing, software |
| * distributed under the License is distributed on an "AS IS" BASIS, |
| * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
| * See the License for the specific language governing permissions and |
| * limitations under the License. |
| */ |
| |
| package org.apache.zookeeper.server.quorum; |
| |
| import edu.umd.cs.findbugs.annotations.SuppressFBWarnings; |
| import java.io.IOException; |
| import java.net.DatagramPacket; |
| import java.net.DatagramSocket; |
| import java.net.InetAddress; |
| import java.net.InetSocketAddress; |
| import java.net.SocketException; |
| import java.nio.ByteBuffer; |
| import java.util.Collection; |
| import java.util.Collections; |
| import java.util.HashMap; |
| import java.util.Map; |
| import java.util.Random; |
| import java.util.Set; |
| import java.util.concurrent.ConcurrentHashMap; |
| import java.util.concurrent.LinkedBlockingQueue; |
| import java.util.concurrent.Semaphore; |
| import java.util.concurrent.TimeUnit; |
| import java.util.concurrent.atomic.AtomicLong; |
| import org.apache.zookeeper.common.Time; |
| import org.apache.zookeeper.jmx.MBeanRegistry; |
| import org.apache.zookeeper.server.ZooKeeperThread; |
| import org.apache.zookeeper.server.quorum.QuorumPeer.QuorumServer; |
| import org.apache.zookeeper.server.quorum.QuorumPeer.ServerState; |
| import org.slf4j.Logger; |
| import org.slf4j.LoggerFactory; |
| |
| /** |
| * @deprecated This class has been deprecated as of release 3.4.0. |
| */ |
| @Deprecated |
| public class AuthFastLeaderElection implements Election { |
| |
| private static final Logger LOG = LoggerFactory.getLogger(AuthFastLeaderElection.class); |
| |
| /* Sequence numbers for messages */ |
| static int sequencer = 0; |
| static int maxTag = 0; |
| |
| /* |
| * Determine how much time a process has to wait once it believes that it |
| * has reached the end of leader election. |
| */ |
| static int finalizeWait = 100; |
| |
| /* |
| * Challenge counter to avoid replay attacks |
| */ |
| |
| static int challengeCounter = 0; |
| |
| /* |
| * Flag to determine whether to authenticate or not |
| */ |
| |
| private boolean authEnabled = false; |
| |
| public static class Notification { |
| |
| /* |
| * Proposed leader |
| */ long leader; |
| |
| /* |
| * zxid of the proposed leader |
| */ long zxid; |
| |
| /* |
| * Epoch |
| */ long epoch; |
| |
| /* |
| * current state of sender |
| */ QuorumPeer.ServerState state; |
| |
| /* |
| * Address of the sender |
| */ InetSocketAddress addr; |
| |
| } |
| |
| /* |
| * Messages to send, both Notifications and Acks |
| */ |
| public static class ToSend { |
| |
| enum mType { |
| crequest, |
| challenge, |
| notification, |
| ack |
| } |
| |
| ToSend(mType type, long tag, long leader, long zxid, long epoch, ServerState state, InetSocketAddress addr) { |
| |
| switch (type) { |
| case crequest: |
| this.type = 0; |
| this.tag = tag; |
| this.leader = leader; |
| this.zxid = zxid; |
| this.epoch = epoch; |
| this.state = state; |
| this.addr = addr; |
| |
| break; |
| case challenge: |
| this.type = 1; |
| this.tag = tag; |
| this.leader = leader; |
| this.zxid = zxid; |
| this.epoch = epoch; |
| this.state = state; |
| this.addr = addr; |
| |
| break; |
| case notification: |
| this.type = 2; |
| this.leader = leader; |
| this.zxid = zxid; |
| this.epoch = epoch; |
| this.state = QuorumPeer.ServerState.LOOKING; |
| this.tag = tag; |
| this.addr = addr; |
| |
| break; |
| case ack: |
| this.type = 3; |
| this.tag = tag; |
| this.leader = leader; |
| this.zxid = zxid; |
| this.epoch = epoch; |
| this.state = state; |
| this.addr = addr; |
| |
| break; |
| default: |
| break; |
| } |
| } |
| |
| /* |
| * Message type: 0 notification, 1 acknowledgement |
| */ int type; |
| |
| /* |
| * Proposed leader in the case of notification |
| */ long leader; |
| |
| /* |
| * id contains the tag for acks, and zxid for notifications |
| */ long zxid; |
| |
| /* |
| * Epoch |
| */ long epoch; |
| |
| /* |
| * Current state; |
| */ QuorumPeer.ServerState state; |
| |
| /* |
| * Message tag |
| */ long tag; |
| |
| InetSocketAddress addr; |
| |
| } |
| |
| LinkedBlockingQueue<ToSend> sendqueue; |
| |
| LinkedBlockingQueue<Notification> recvqueue; |
| |
| private class Messenger { |
| |
| final DatagramSocket mySocket; |
| long lastProposedLeader; |
| long lastProposedZxid; |
| long lastEpoch; |
| final Set<Long> ackset; |
| final ConcurrentHashMap<Long, Long> challengeMap; |
| final ConcurrentHashMap<Long, Semaphore> challengeMutex; |
| final ConcurrentHashMap<Long, Semaphore> ackMutex; |
| final ConcurrentHashMap<InetSocketAddress, ConcurrentHashMap<Long, Long>> addrChallengeMap; |
| |
| class WorkerReceiver extends ZooKeeperThread { |
| |
| DatagramSocket mySocket; |
| Messenger myMsg; |
| |
| WorkerReceiver(DatagramSocket s, Messenger msg) { |
| super("WorkerReceiver-" + s.getRemoteSocketAddress()); |
| mySocket = s; |
| myMsg = msg; |
| } |
| |
| boolean saveChallenge(long tag, long challenge) { |
| Semaphore s = challengeMutex.get(tag); |
| if (s != null) { |
| synchronized (Messenger.this) { |
| challengeMap.put(tag, challenge); |
| challengeMutex.remove(tag); |
| } |
| |
| s.release(); |
| } else { |
| LOG.error("No challenge mutex object"); |
| } |
| |
| return true; |
| } |
| |
| public void run() { |
| byte[] responseBytes = new byte[48]; |
| ByteBuffer responseBuffer = ByteBuffer.wrap(responseBytes); |
| DatagramPacket responsePacket = new DatagramPacket(responseBytes, responseBytes.length); |
| while (true) { |
| // Sleeps on receive |
| try { |
| responseBuffer.clear(); |
| mySocket.receive(responsePacket); |
| } catch (IOException e) { |
| LOG.warn("Ignoring exception receiving", e); |
| } |
| // Receive new message |
| if (responsePacket.getLength() != responseBytes.length) { |
| LOG.warn("Got a short response: {} {}", responsePacket.getLength(), responsePacket.toString()); |
| continue; |
| } |
| responseBuffer.clear(); |
| int type = responseBuffer.getInt(); |
| if ((type > 3) || (type < 0)) { |
| LOG.warn("Got bad Msg type: {}", type); |
| continue; |
| } |
| long tag = responseBuffer.getLong(); |
| |
| QuorumPeer.ServerState ackstate = QuorumPeer.ServerState.LOOKING; |
| switch (responseBuffer.getInt()) { |
| case 0: |
| ackstate = QuorumPeer.ServerState.LOOKING; |
| break; |
| case 1: |
| ackstate = QuorumPeer.ServerState.LEADING; |
| break; |
| case 2: |
| ackstate = QuorumPeer.ServerState.FOLLOWING; |
| break; |
| default: |
| LOG.warn("unknown type {}", responseBuffer.getInt()); |
| break; |
| } |
| |
| Vote current = self.getCurrentVote(); |
| |
| switch (type) { |
| case 0: |
| // Receive challenge request |
| ToSend c = new ToSend( |
| ToSend.mType.challenge, |
| tag, |
| current.getId(), |
| current.getZxid(), |
| logicalclock.get(), |
| self.getPeerState(), |
| (InetSocketAddress) responsePacket.getSocketAddress()); |
| sendqueue.offer(c); |
| break; |
| case 1: |
| // Receive challenge and store somewhere else |
| long challenge = responseBuffer.getLong(); |
| saveChallenge(tag, challenge); |
| |
| break; |
| case 2: |
| Notification n = new Notification(); |
| n.leader = responseBuffer.getLong(); |
| n.zxid = responseBuffer.getLong(); |
| n.epoch = responseBuffer.getLong(); |
| n.state = ackstate; |
| n.addr = (InetSocketAddress) responsePacket.getSocketAddress(); |
| |
| if ((myMsg.lastEpoch <= n.epoch) |
| && ((n.zxid > myMsg.lastProposedZxid) |
| || ((n.zxid == myMsg.lastProposedZxid) |
| && (n.leader > myMsg.lastProposedLeader)))) { |
| myMsg.lastProposedZxid = n.zxid; |
| myMsg.lastProposedLeader = n.leader; |
| myMsg.lastEpoch = n.epoch; |
| } |
| |
| long recChallenge; |
| InetSocketAddress addr = (InetSocketAddress) responsePacket.getSocketAddress(); |
| if (authEnabled) { |
| ConcurrentHashMap<Long, Long> tmpMap = addrChallengeMap.get(addr); |
| if (tmpMap != null) { |
| if (tmpMap.get(tag) != null) { |
| recChallenge = responseBuffer.getLong(); |
| |
| if (tmpMap.get(tag) == recChallenge) { |
| recvqueue.offer(n); |
| |
| ToSend a = new ToSend( |
| ToSend.mType.ack, |
| tag, |
| current.getId(), |
| current.getZxid(), |
| logicalclock.get(), |
| self.getPeerState(), |
| addr); |
| |
| sendqueue.offer(a); |
| } else { |
| LOG.warn("Incorrect challenge: {}, {}", recChallenge, addrChallengeMap.toString()); |
| } |
| } else { |
| LOG.warn("No challenge for host: {} {}", addr, tag); |
| } |
| } |
| } else { |
| recvqueue.offer(n); |
| |
| ToSend a = new ToSend( |
| ToSend.mType.ack, |
| tag, |
| current.getId(), |
| current.getZxid(), |
| logicalclock.get(), |
| self.getPeerState(), |
| (InetSocketAddress) responsePacket.getSocketAddress()); |
| |
| sendqueue.offer(a); |
| } |
| break; |
| |
| // Upon reception of an ack message, remove it from the |
| // queue |
| case 3: |
| Semaphore s = ackMutex.get(tag); |
| |
| if (s != null) { |
| s.release(); |
| } else { |
| LOG.error("Empty ack semaphore"); |
| } |
| |
| ackset.add(tag); |
| |
| if (authEnabled) { |
| ConcurrentHashMap<Long, Long> tmpMap = addrChallengeMap.get(responsePacket.getSocketAddress()); |
| if (tmpMap != null) { |
| tmpMap.remove(tag); |
| } else { |
| LOG.warn("No such address in the ensemble configuration {}", responsePacket.getSocketAddress()); |
| } |
| } |
| |
| if (ackstate != QuorumPeer.ServerState.LOOKING) { |
| Notification outofsync = new Notification(); |
| outofsync.leader = responseBuffer.getLong(); |
| outofsync.zxid = responseBuffer.getLong(); |
| outofsync.epoch = responseBuffer.getLong(); |
| outofsync.state = ackstate; |
| outofsync.addr = (InetSocketAddress) responsePacket.getSocketAddress(); |
| |
| recvqueue.offer(outofsync); |
| } |
| |
| break; |
| // Default case |
| default: |
| LOG.warn("Received message of incorrect type {}", type); |
| break; |
| } |
| } |
| } |
| |
| } |
| |
| class WorkerSender extends ZooKeeperThread { |
| |
| Random rand; |
| int maxAttempts; |
| int ackWait = finalizeWait; |
| |
| /* |
| * Receives a socket and max number of attempts as input |
| */ |
| |
| WorkerSender(int attempts) { |
| super("WorkerSender"); |
| maxAttempts = attempts; |
| rand = new Random(java.lang.Thread.currentThread().getId() + Time.currentElapsedTime()); |
| } |
| |
| long genChallenge() { |
| byte[] buf = new byte[8]; |
| |
| buf[0] = (byte) ((challengeCounter & 0xff000000) >>> 24); |
| buf[1] = (byte) ((challengeCounter & 0x00ff0000) >>> 16); |
| buf[2] = (byte) ((challengeCounter & 0x0000ff00) >>> 8); |
| buf[3] = (byte) ((challengeCounter & 0x000000ff)); |
| |
| challengeCounter++; |
| int secret = rand.nextInt(java.lang.Integer.MAX_VALUE); |
| |
| buf[4] = (byte) ((secret & 0xff000000) >>> 24); |
| buf[5] = (byte) ((secret & 0x00ff0000) >>> 16); |
| buf[6] = (byte) ((secret & 0x0000ff00) >>> 8); |
| buf[7] = (byte) ((secret & 0x000000ff)); |
| |
| return (((long) (buf[0] & 0xFF)) << 56) |
| + (((long) (buf[1] & 0xFF)) << 48) |
| + (((long) (buf[2] & 0xFF)) << 40) |
| + (((long) (buf[3] & 0xFF)) << 32) |
| + (((long) (buf[4] & 0xFF)) << 24) |
| + (((long) (buf[5] & 0xFF)) << 16) |
| + (((long) (buf[6] & 0xFF)) << 8) |
| + ((long) (buf[7] & 0xFF)); |
| } |
| |
| public void run() { |
| while (true) { |
| try { |
| ToSend m = sendqueue.take(); |
| process(m); |
| } catch (InterruptedException e) { |
| break; |
| } |
| |
| } |
| } |
| |
| @SuppressFBWarnings( |
| value = "RV_RETURN_VALUE_IGNORED", |
| justification = "tryAcquire result not chacked, but it is not an issue") |
| private void process(ToSend m) { |
| int attempts = 0; |
| byte[] zeroes; |
| byte[] requestBytes = new byte[48]; |
| DatagramPacket requestPacket = new DatagramPacket(requestBytes, requestBytes.length); |
| ByteBuffer requestBuffer = ByteBuffer.wrap(requestBytes); |
| |
| switch (m.type) { |
| case 0: |
| /* |
| * Building challenge request packet to send |
| */ |
| requestBuffer.clear(); |
| requestBuffer.putInt(ToSend.mType.crequest.ordinal()); |
| requestBuffer.putLong(m.tag); |
| requestBuffer.putInt(m.state.ordinal()); |
| zeroes = new byte[32]; |
| requestBuffer.put(zeroes); |
| |
| requestPacket.setLength(48); |
| try { |
| requestPacket.setSocketAddress(m.addr); |
| } catch (IllegalArgumentException e) { |
| // Sun doesn't include the address that causes this |
| // exception to be thrown, so we wrap the exception |
| // in order to capture this critical detail. |
| throw new IllegalArgumentException("Unable to set socket address on packet, msg:" + e.getMessage() |
| + " with addr:" + m.addr, e); |
| } |
| |
| try { |
| if (challengeMap.get(m.tag) == null) { |
| mySocket.send(requestPacket); |
| } |
| } catch (IOException e) { |
| LOG.warn("Exception while sending challenge: ", e); |
| } |
| |
| break; |
| case 1: |
| /* |
| * Building challenge packet to send |
| */ |
| |
| long newChallenge; |
| ConcurrentHashMap<Long, Long> tmpMap = addrChallengeMap.get(m.addr); |
| if (tmpMap != null) { |
| Long tmpLong = tmpMap.get(m.tag); |
| if (tmpLong != null) { |
| newChallenge = tmpLong; |
| } else { |
| newChallenge = genChallenge(); |
| } |
| |
| tmpMap.put(m.tag, newChallenge); |
| |
| requestBuffer.clear(); |
| requestBuffer.putInt(ToSend.mType.challenge.ordinal()); |
| requestBuffer.putLong(m.tag); |
| requestBuffer.putInt(m.state.ordinal()); |
| requestBuffer.putLong(newChallenge); |
| zeroes = new byte[24]; |
| requestBuffer.put(zeroes); |
| |
| requestPacket.setLength(48); |
| try { |
| requestPacket.setSocketAddress(m.addr); |
| } catch (IllegalArgumentException e) { |
| // Sun doesn't include the address that causes this |
| // exception to be thrown, so we wrap the exception |
| // in order to capture this critical detail. |
| throw new IllegalArgumentException("Unable to set socket address on packet, msg:" + e.getMessage() |
| + " with addr:" + m.addr, e); |
| } |
| |
| try { |
| mySocket.send(requestPacket); |
| } catch (IOException e) { |
| LOG.warn("Exception while sending challenge: ", e); |
| } |
| } else { |
| LOG.error("Address is not in the configuration: {}", m.addr); |
| } |
| |
| break; |
| case 2: |
| |
| /* |
| * Building notification packet to send |
| */ |
| |
| requestBuffer.clear(); |
| requestBuffer.putInt(m.type); |
| requestBuffer.putLong(m.tag); |
| requestBuffer.putInt(m.state.ordinal()); |
| requestBuffer.putLong(m.leader); |
| requestBuffer.putLong(m.zxid); |
| requestBuffer.putLong(m.epoch); |
| zeroes = new byte[8]; |
| requestBuffer.put(zeroes); |
| |
| requestPacket.setLength(48); |
| try { |
| requestPacket.setSocketAddress(m.addr); |
| } catch (IllegalArgumentException e) { |
| // Sun doesn't include the address that causes this |
| // exception to be thrown, so we wrap the exception |
| // in order to capture this critical detail. |
| throw new IllegalArgumentException("Unable to set socket address on packet, msg:" + e.getMessage() |
| + " with addr:" + m.addr, e); |
| } |
| |
| boolean myChallenge = false; |
| boolean myAck = false; |
| |
| while (attempts < maxAttempts) { |
| try { |
| /* |
| * Try to obtain a challenge only if does not have |
| * one yet |
| */ |
| |
| if (!myChallenge && authEnabled) { |
| ToSend crequest = new ToSend( |
| ToSend.mType.crequest, |
| m.tag, |
| m.leader, |
| m.zxid, |
| m.epoch, |
| QuorumPeer.ServerState.LOOKING, |
| m.addr); |
| sendqueue.offer(crequest); |
| |
| try { |
| double timeout = ackWait * java.lang.Math.pow(2, attempts); |
| |
| Semaphore s = new Semaphore(0); |
| synchronized (Messenger.this) { |
| challengeMutex.put(m.tag, s); |
| s.tryAcquire((long) timeout, TimeUnit.MILLISECONDS); |
| myChallenge = challengeMap.containsKey(m.tag); |
| } |
| } catch (InterruptedException e) { |
| LOG.warn("Challenge request exception: ", e); |
| } |
| } |
| |
| /* |
| * If don't have challenge yet, skip sending |
| * notification |
| */ |
| |
| if (authEnabled && !myChallenge) { |
| attempts++; |
| continue; |
| } |
| |
| if (authEnabled) { |
| requestBuffer.position(40); |
| Long tmpLong = challengeMap.get(m.tag); |
| if (tmpLong != null) { |
| requestBuffer.putLong(tmpLong); |
| } else { |
| LOG.warn("No challenge with tag: {}", m.tag); |
| } |
| } |
| mySocket.send(requestPacket); |
| try { |
| Semaphore s = new Semaphore(0); |
| double timeout = ackWait * java.lang.Math.pow(10, attempts); |
| ackMutex.put(m.tag, s); |
| s.tryAcquire((int) timeout, TimeUnit.MILLISECONDS); |
| } catch (InterruptedException e) { |
| LOG.warn("Ack exception: ", e); |
| } |
| |
| if (ackset.remove(m.tag)) { |
| myAck = true; |
| } |
| |
| } catch (IOException e) { |
| LOG.warn("Sending exception: ", e); |
| /* |
| * Do nothing, just try again |
| */ |
| } |
| if (myAck) { |
| /* |
| * Received ack successfully, so return |
| */ |
| challengeMap.remove(m.tag); |
| |
| return; |
| } else { |
| attempts++; |
| } |
| } |
| /* |
| * Return message to queue for another attempt later if |
| * epoch hasn't changed. |
| */ |
| if (m.epoch == logicalclock.get()) { |
| challengeMap.remove(m.tag); |
| sendqueue.offer(m); |
| } |
| break; |
| case 3: |
| |
| requestBuffer.clear(); |
| requestBuffer.putInt(m.type); |
| requestBuffer.putLong(m.tag); |
| requestBuffer.putInt(m.state.ordinal()); |
| requestBuffer.putLong(m.leader); |
| requestBuffer.putLong(m.zxid); |
| requestBuffer.putLong(m.epoch); |
| |
| requestPacket.setLength(48); |
| try { |
| requestPacket.setSocketAddress(m.addr); |
| } catch (IllegalArgumentException e) { |
| // Sun doesn't include the address that causes this |
| // exception to be thrown, so we wrap the exception |
| // in order to capture this critical detail. |
| throw new IllegalArgumentException("Unable to set socket address on packet, msg:" + e.getMessage() |
| + " with addr:" + m.addr, e); |
| } |
| |
| try { |
| mySocket.send(requestPacket); |
| } catch (IOException e) { |
| LOG.warn("Exception while sending ack: ", e); |
| } |
| break; |
| default: |
| LOG.warn("unknown type {}", m.type); |
| break; |
| } |
| } |
| |
| } |
| |
| Messenger(int threads, DatagramSocket s) { |
| mySocket = s; |
| ackset = Collections.newSetFromMap(new ConcurrentHashMap<Long, Boolean>()); |
| challengeMap = new ConcurrentHashMap<Long, Long>(); |
| challengeMutex = new ConcurrentHashMap<Long, Semaphore>(); |
| ackMutex = new ConcurrentHashMap<Long, Semaphore>(); |
| addrChallengeMap = new ConcurrentHashMap<InetSocketAddress, ConcurrentHashMap<Long, Long>>(); |
| lastProposedLeader = 0; |
| lastProposedZxid = 0; |
| lastEpoch = 0; |
| |
| for (int i = 0; i < threads; ++i) { |
| Thread t = new Thread(new WorkerSender(3), "WorkerSender Thread: " + (i + 1)); |
| t.setDaemon(true); |
| t.start(); |
| } |
| |
| for (QuorumServer server : self.getVotingView().values()) { |
| InetAddress address = server.addr.getReachableOrOne().getAddress(); |
| InetSocketAddress saddr = new InetSocketAddress(address, port); |
| addrChallengeMap.put(saddr, new ConcurrentHashMap<Long, Long>()); |
| } |
| |
| Thread t = new Thread(new WorkerReceiver(s, this), "WorkerReceiver Thread"); |
| t.start(); |
| } |
| |
| } |
| |
| QuorumPeer self; |
| int port; |
| AtomicLong logicalclock = new AtomicLong(); /* Election instance */ |
| DatagramSocket mySocket; |
| long proposedLeader; |
| long proposedZxid; |
| |
| public AuthFastLeaderElection(QuorumPeer self, boolean auth) { |
| this.authEnabled = auth; |
| starter(self); |
| } |
| |
| public AuthFastLeaderElection(QuorumPeer self) { |
| starter(self); |
| } |
| |
| private void starter(QuorumPeer self) { |
| this.self = self; |
| port = self.getVotingView().get(self.getId()).electionAddr.getAllPorts().get(0); |
| proposedLeader = -1; |
| proposedZxid = -1; |
| |
| try { |
| mySocket = new DatagramSocket(port); |
| // mySocket.setSoTimeout(20000); |
| } catch (SocketException e1) { |
| e1.printStackTrace(); |
| throw new RuntimeException(); |
| } |
| sendqueue = new LinkedBlockingQueue<ToSend>(2 * self.getVotingView().size()); |
| recvqueue = new LinkedBlockingQueue<Notification>(2 * self.getVotingView().size()); |
| new Messenger(self.getVotingView().size() * 2, mySocket); |
| } |
| |
| private void leaveInstance() { |
| logicalclock.incrementAndGet(); |
| } |
| |
| private void sendNotifications() { |
| for (QuorumServer server : self.getView().values()) { |
| |
| InetSocketAddress address = self.getView().get(server.id).electionAddr.getReachableOrOne(); |
| ToSend notmsg = new ToSend( |
| ToSend.mType.notification, |
| AuthFastLeaderElection.sequencer++, |
| proposedLeader, |
| proposedZxid, |
| logicalclock.get(), |
| QuorumPeer.ServerState.LOOKING, |
| address); |
| |
| sendqueue.offer(notmsg); |
| } |
| } |
| |
| private boolean totalOrderPredicate(long id, long zxid) { |
| return (zxid > proposedZxid) || ((zxid == proposedZxid) && (id > proposedLeader)); |
| |
| } |
| |
| private boolean termPredicate(Map<InetSocketAddress, Vote> votes, long l, long zxid) { |
| |
| Collection<Vote> votesCast = votes.values(); |
| int count = 0; |
| /* |
| * First make the views consistent. Sometimes peers will have different |
| * zxids for a server depending on timing. |
| */ |
| for (Vote v : votesCast) { |
| if ((v.getId() == l) && (v.getZxid() == zxid)) { |
| count++; |
| } |
| } |
| |
| return count > (self.getVotingView().size() / 2); |
| |
| } |
| |
| /** |
| * There is nothing to shutdown in this implementation of |
| * leader election, so we simply have an empty method. |
| */ |
| public void shutdown() { |
| } |
| |
| /** |
| * Invoked in QuorumPeer to find or elect a new leader. |
| * |
| * @throws InterruptedException |
| */ |
| public Vote lookForLeader() throws InterruptedException { |
| try { |
| self.jmxLeaderElectionBean = new LeaderElectionBean(); |
| MBeanRegistry.getInstance().register(self.jmxLeaderElectionBean, self.jmxLocalPeerBean); |
| } catch (Exception e) { |
| LOG.warn("Failed to register with JMX", e); |
| self.jmxLeaderElectionBean = null; |
| } |
| |
| try { |
| HashMap<InetSocketAddress, Vote> recvset = new HashMap<InetSocketAddress, Vote>(); |
| |
| HashMap<InetSocketAddress, Vote> outofelection = new HashMap<InetSocketAddress, Vote>(); |
| |
| logicalclock.incrementAndGet(); |
| |
| proposedLeader = self.getId(); |
| proposedZxid = self.getLastLoggedZxid(); |
| |
| LOG.info("Election tally"); |
| sendNotifications(); |
| |
| /* |
| * Loop in which we exchange notifications until we find a leader |
| */ |
| |
| while (self.getPeerState() == ServerState.LOOKING) { |
| /* |
| * Remove next notification from queue, times out after 2 times |
| * the termination time |
| */ |
| Notification n = recvqueue.poll(2 * finalizeWait, TimeUnit.MILLISECONDS); |
| |
| /* |
| * Sends more notifications if haven't received enough. |
| * Otherwise processes new notification. |
| */ |
| if (n == null) { |
| if (((!outofelection.isEmpty()) || (recvset.size() > 1))) { |
| sendNotifications(); |
| } |
| } else { |
| switch (n.state) { |
| case LOOKING: |
| if (n.epoch > logicalclock.get()) { |
| logicalclock.set(n.epoch); |
| recvset.clear(); |
| if (totalOrderPredicate(n.leader, n.zxid)) { |
| proposedLeader = n.leader; |
| proposedZxid = n.zxid; |
| } |
| sendNotifications(); |
| } else if (n.epoch < logicalclock.get()) { |
| break; |
| } else if (totalOrderPredicate(n.leader, n.zxid)) { |
| proposedLeader = n.leader; |
| proposedZxid = n.zxid; |
| |
| sendNotifications(); |
| } |
| |
| recvset.put(n.addr, new Vote(n.leader, n.zxid)); |
| |
| // If have received from all nodes, then terminate |
| if (self.getVotingView().size() == recvset.size()) { |
| self.setPeerState((proposedLeader == self.getId()) |
| ? ServerState.LEADING |
| : ServerState.FOLLOWING); |
| // if (self.state == ServerState.FOLLOWING) { |
| // Thread.sleep(100); |
| // } |
| leaveInstance(); |
| return new Vote(proposedLeader, proposedZxid); |
| |
| } else if (termPredicate(recvset, proposedLeader, proposedZxid)) { |
| // Otherwise, wait for a fixed amount of time |
| LOG.info("Passed predicate"); |
| Thread.sleep(finalizeWait); |
| |
| // Notification probe = recvqueue.peek(); |
| |
| // Verify if there is any change in the proposed leader |
| while ((!recvqueue.isEmpty()) |
| && !totalOrderPredicate(recvqueue.peek().leader, recvqueue.peek().zxid)) { |
| recvqueue.poll(); |
| } |
| if (recvqueue.isEmpty()) { |
| // LOG.warn("Proposed leader: " + |
| // proposedLeader); |
| self.setPeerState((proposedLeader == self.getId()) |
| ? ServerState.LEADING |
| : ServerState.FOLLOWING); |
| |
| leaveInstance(); |
| return new Vote(proposedLeader, proposedZxid); |
| } |
| } |
| break; |
| case LEADING: |
| outofelection.put(n.addr, new Vote(n.leader, n.zxid)); |
| |
| if (termPredicate(outofelection, n.leader, n.zxid)) { |
| |
| self.setPeerState((n.leader == self.getId()) ? ServerState.LEADING : ServerState.FOLLOWING); |
| |
| leaveInstance(); |
| return new Vote(n.leader, n.zxid); |
| } |
| break; |
| case FOLLOWING: |
| outofelection.put(n.addr, new Vote(n.leader, n.zxid)); |
| |
| if (termPredicate(outofelection, n.leader, n.zxid)) { |
| |
| self.setPeerState((n.leader == self.getId()) ? ServerState.LEADING : ServerState.FOLLOWING); |
| |
| leaveInstance(); |
| return new Vote(n.leader, n.zxid); |
| } |
| break; |
| default: |
| break; |
| } |
| } |
| } |
| |
| return null; |
| } finally { |
| try { |
| if (self.jmxLeaderElectionBean != null) { |
| MBeanRegistry.getInstance().unregister(self.jmxLeaderElectionBean); |
| } |
| } catch (Exception e) { |
| LOG.warn("Failed to unregister with JMX", e); |
| } |
| self.jmxLeaderElectionBean = null; |
| } |
| } |
| |
| } |