// blob: cb0fec8d928851c05fbd37c4eb01959a264f965f [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.zookeeper.server.quorum;
import edu.umd.cs.findbugs.annotations.SuppressFBWarnings;
import java.io.IOException;
import java.net.DatagramPacket;
import java.net.DatagramSocket;
import java.net.InetAddress;
import java.net.InetSocketAddress;
import java.net.SocketException;
import java.nio.ByteBuffer;
import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
import java.util.Map;
import java.util.Random;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.Semaphore;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicLong;
import org.apache.zookeeper.common.Time;
import org.apache.zookeeper.jmx.MBeanRegistry;
import org.apache.zookeeper.server.ZooKeeperThread;
import org.apache.zookeeper.server.quorum.QuorumPeer.QuorumServer;
import org.apache.zookeeper.server.quorum.QuorumPeer.ServerState;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
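/**
* Leader election over UDP datagrams, with an optional challenge/response
* step that authenticates notifications.
*
* @deprecated This class has been deprecated as of release 3.4.0.
*/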
@Deprecated
public class AuthFastLeaderElection implements Election {
private static final Logger LOG = LoggerFactory.getLogger(AuthFastLeaderElection.class);
/*
 * Sequence numbers for messages. sendNotifications() post-increments
 * sequencer once per notification and uses the value as the message tag.
 * maxTag is neither read nor written inside this class.
 */
static int sequencer = 0;
static int maxTag = 0;
/*
 * Base wait in milliseconds. lookForLeader() sleeps this long once a
 * majority agrees on the proposal, polls the receive queue with twice this
 * value as timeout, and the sender threads use it as their initial ackWait.
 */
static int finalizeWait = 100;
/*
 * Challenge counter to avoid replay attacks. Its value forms the upper
 * 32 bits of every challenge produced by genChallenge().
 */
static int challengeCounter = 0;
/*
 * Flag to determine whether to authenticate or not. Only set by the
 * two-argument constructor.
 */
private boolean authEnabled = false;
public static class Notification {
/*
* Proposed leader
*/ long leader;
/*
* zxid of the proposed leader
*/ long zxid;
/*
* Epoch
*/ long epoch;
/*
* current state of sender
*/ QuorumPeer.ServerState state;
/*
* Address of the sender
*/ InetSocketAddress addr;
}
/*
* Messages to send, both Notifications and Acks
*/
/**
 * Description of one outbound datagram, queued for the sender threads.
 */
public static class ToSend {
    /** Message kinds; the constructor translates them to the wire codes 0..3. */
    enum mType {
        crequest,
        challenge,
        notification,
        ack
    }

    /** Wire code: 0 challenge request, 1 challenge, 2 notification, 3 ack. */
    int type;

    /** Proposed leader carried in the message. */
    long leader;

    /** zxid carried in the message. */
    long zxid;

    /** Election epoch carried in the message. */
    long epoch;

    /** State advertised in the message; always LOOKING for notifications. */
    QuorumPeer.ServerState state;

    /** Tag that pairs a message with its challenge and acknowledgement. */
    long tag;

    /** Destination address. */
    InetSocketAddress addr;

    /**
     * Builds a message of the given kind.
     *
     * @param type   kind of message
     * @param tag    message tag
     * @param leader proposed leader
     * @param zxid   zxid to advertise
     * @param epoch  election epoch
     * @param state  sender state; ignored for notifications, which always carry LOOKING
     * @param addr   destination address
     */
    ToSend(mType type, long tag, long leader, long zxid, long epoch, ServerState state, InetSocketAddress addr) {
        final int code;
        ServerState advertised = state;
        switch (type) {
        case crequest:
            code = 0;
            break;
        case challenge:
            code = 1;
            break;
        case notification:
            code = 2;
            advertised = QuorumPeer.ServerState.LOOKING;
            break;
        case ack:
            code = 3;
            break;
        default:
            // No other constant exists; fields keep their default values.
            return;
        }
        this.type = code;
        this.tag = tag;
        this.leader = leader;
        this.zxid = zxid;
        this.epoch = epoch;
        this.state = advertised;
        this.addr = addr;
    }
}
/* Outbound messages; WorkerSender threads take entries from here. */
LinkedBlockingQueue<ToSend> sendqueue;
/* Notifications decoded by WorkerReceiver; lookForLeader() polls this queue. */
LinkedBlockingQueue<Notification> recvqueue;
private class Messenger {
/* Socket the sender threads transmit on. */
final DatagramSocket mySocket;
/* Leader, zxid and epoch of the last proposal recorded by the receiver thread. */
long lastProposedLeader;
long lastProposedZxid;
long lastEpoch;
/* Tags for which an ack has been received and not yet consumed by a sender. */
final Set<Long> ackset;
/* tag -> challenge received from the remote peer for that tag. */
final ConcurrentHashMap<Long, Long> challengeMap;
/* tag -> semaphore released by the receiver once the challenge for that tag is stored. */
final ConcurrentHashMap<Long, Semaphore> challengeMutex;
/* tag -> semaphore released by the receiver when an ack with that tag arrives. */
final ConcurrentHashMap<Long, Semaphore> ackMutex;
/* peer address -> (tag -> challenge this server handed out to that peer). */
final ConcurrentHashMap<InetSocketAddress, ConcurrentHashMap<Long, Long>> addrChallengeMap;
class WorkerReceiver extends ZooKeeperThread {
/** Socket this thread receives datagrams on. */
DatagramSocket mySocket;

/** Messenger whose last-proposal fields this thread updates. */
Messenger myMsg;

/**
 * Creates the receiver; the thread name includes the socket's remote address.
 *
 * @param s   socket to receive on
 * @param msg owning messenger
 */
WorkerReceiver(DatagramSocket s, Messenger msg) {
    super("WorkerReceiver-" + s.getRemoteSocketAddress());
    this.mySocket = s;
    this.myMsg = msg;
}
/**
 * Stores a challenge received for {@code tag} and wakes the sender that is
 * waiting for it. If no sender registered a semaphore for the tag, the
 * challenge is discarded and an error is logged.
 *
 * @param tag       tag of the message the challenge belongs to
 * @param challenge challenge value received
 * @return always {@code true}
 */
boolean saveChallenge(long tag, long challenge) {
    final Semaphore waiter = challengeMutex.get(tag);
    if (waiter == null) {
        LOG.error("No challenge mutex object");
        return true;
    }
    // Store the challenge and retire the semaphore as one step.
    synchronized (Messenger.this) {
        challengeMap.put(tag, challenge);
        challengeMutex.remove(tag);
    }
    waiter.release();
    return true;
}
/**
 * Receive loop. Each 48-byte datagram is laid out as: int type, long tag,
 * int state, then a type-specific payload.
 * <ul>
 * <li>type 0 (challenge request): queue a challenge reply to the sender;</li>
 * <li>type 1 (challenge): store the challenge and wake the waiting sender;</li>
 * <li>type 2 (notification): queue the notification for the election loop
 * and acknowledge it, after verifying the challenge when authentication is
 * enabled;</li>
 * <li>type 3 (ack): wake the sender waiting on the tag and, if the peer is
 * not LOOKING, queue its vote as a notification.</li>
 * </ul>
 * The loop ends only when the socket has been closed.
 */
public void run() {
    byte[] responseBytes = new byte[48];
    ByteBuffer responseBuffer = ByteBuffer.wrap(responseBytes);
    DatagramPacket responsePacket = new DatagramPacket(responseBytes, responseBytes.length);
    while (true) {
        // Sleeps on receive
        try {
            responseBuffer.clear();
            mySocket.receive(responsePacket);
        } catch (IOException e) {
            if (mySocket.isClosed()) {
                // A closed socket fails every receive; stop instead of spinning.
                LOG.warn("Socket is closed, receiver thread exiting", e);
                return;
            }
            LOG.warn("Ignoring exception receiving", e);
            // Nothing new arrived: do not process the previous datagram again.
            continue;
        }
        // Receive new message
        if (responsePacket.getLength() != responseBytes.length) {
            LOG.warn("Got a short response: {} {}", responsePacket.getLength(), responsePacket.toString());
            continue;
        }
        responseBuffer.clear();
        int type = responseBuffer.getInt();
        if ((type > 3) || (type < 0)) {
            LOG.warn("Got bad Msg type: {}", type);
            continue;
        }
        long tag = responseBuffer.getLong();
        // Read the state code exactly once so the payload offsets stay aligned.
        int stateCode = responseBuffer.getInt();
        // NOTE(review): senders write ServerState.ordinal(); confirm that the
        // enum declaration order matches this 0/1/2 mapping.
        QuorumPeer.ServerState ackstate = QuorumPeer.ServerState.LOOKING;
        switch (stateCode) {
        case 0:
            ackstate = QuorumPeer.ServerState.LOOKING;
            break;
        case 1:
            ackstate = QuorumPeer.ServerState.LEADING;
            break;
        case 2:
            ackstate = QuorumPeer.ServerState.FOLLOWING;
            break;
        default:
            LOG.warn("unknown type {}", stateCode);
            break;
        }
        InetSocketAddress sender = (InetSocketAddress) responsePacket.getSocketAddress();
        Vote current = self.getCurrentVote();
        switch (type) {
        case 0:
            // Receive challenge request
            ToSend c = new ToSend(
                ToSend.mType.challenge,
                tag,
                current.getId(),
                current.getZxid(),
                logicalclock.get(),
                self.getPeerState(),
                sender);
            sendqueue.offer(c);
            break;
        case 1:
            // Receive challenge and store somewhere else
            long challenge = responseBuffer.getLong();
            saveChallenge(tag, challenge);
            break;
        case 2:
            Notification n = new Notification();
            n.leader = responseBuffer.getLong();
            n.zxid = responseBuffer.getLong();
            n.epoch = responseBuffer.getLong();
            n.state = ackstate;
            n.addr = sender;
            // Remember the proposal if it is newer than the last one recorded.
            if ((myMsg.lastEpoch <= n.epoch)
                && ((n.zxid > myMsg.lastProposedZxid)
                    || ((n.zxid == myMsg.lastProposedZxid)
                        && (n.leader > myMsg.lastProposedLeader)))) {
                myMsg.lastProposedZxid = n.zxid;
                myMsg.lastProposedLeader = n.leader;
                myMsg.lastEpoch = n.epoch;
            }
            if (authEnabled) {
                ConcurrentHashMap<Long, Long> issued = addrChallengeMap.get(sender);
                if (issued == null) {
                    LOG.warn("No such address in the ensemble configuration {}", sender);
                } else {
                    Long expected = issued.get(tag);
                    if (expected == null) {
                        LOG.warn("No challenge for host: {} {}", sender, tag);
                    } else {
                        long recChallenge = responseBuffer.getLong();
                        if (expected.longValue() == recChallenge) {
                            recvqueue.offer(n);
                            ToSend a = new ToSend(
                                ToSend.mType.ack,
                                tag,
                                current.getId(),
                                current.getZxid(),
                                logicalclock.get(),
                                self.getPeerState(),
                                sender);
                            sendqueue.offer(a);
                        } else {
                            LOG.warn("Incorrect challenge: {}, {}", recChallenge, addrChallengeMap.toString());
                        }
                    }
                }
            } else {
                recvqueue.offer(n);
                ToSend a = new ToSend(
                    ToSend.mType.ack,
                    tag,
                    current.getId(),
                    current.getZxid(),
                    logicalclock.get(),
                    self.getPeerState(),
                    sender);
                sendqueue.offer(a);
            }
            break;
        // Upon reception of an ack message, remove it from the
        // queue
        case 3:
            Semaphore s = ackMutex.get(tag);
            if (s != null) {
                s.release();
            } else {
                LOG.error("Empty ack semaphore");
            }
            ackset.add(tag);
            if (authEnabled) {
                ConcurrentHashMap<Long, Long> issued = addrChallengeMap.get(sender);
                if (issued != null) {
                    issued.remove(tag);
                } else {
                    LOG.warn("No such address in the ensemble configuration {}", sender);
                }
            }
            // A peer that already left the election reports its vote in the ack.
            if (ackstate != QuorumPeer.ServerState.LOOKING) {
                Notification outofsync = new Notification();
                outofsync.leader = responseBuffer.getLong();
                outofsync.zxid = responseBuffer.getLong();
                outofsync.epoch = responseBuffer.getLong();
                outofsync.state = ackstate;
                outofsync.addr = sender;
                recvqueue.offer(outofsync);
            }
            break;
        // Default case
        default:
            LOG.warn("Received message of incorrect type {}", type);
            break;
        }
    }
}
}
class WorkerSender extends ZooKeeperThread {
/** Source of the random half of generated challenges. */
Random rand;

/** Number of send attempts made for a notification before it is re-queued. */
int maxAttempts;

/** Base wait in milliseconds for challenges and acks; starts at finalizeWait. */
int ackWait = finalizeWait;

/**
 * Creates a sender thread.
 *
 * @param attempts maximum number of attempts per notification
 */
WorkerSender(int attempts) {
    super("WorkerSender");
    this.maxAttempts = attempts;
    final long seed = Thread.currentThread().getId() + Time.currentElapsedTime();
    this.rand = new Random(seed);
}
/**
 * Produces a new challenge: the upper 32 bits are the current value of the
 * shared challenge counter, the lower 32 bits a random non-negative int.
 *
 * @return the challenge value
 */
long genChallenge() {
    final int counter;
    // The counter is static and several sender threads call this method, so
    // the read-and-increment has to be atomic to keep counter values unique.
    synchronized (AuthFastLeaderElection.class) {
        counter = challengeCounter++;
    }
    final int secret = rand.nextInt(Integer.MAX_VALUE);
    // Shifting the widened counter drops its sign extension; the mask keeps
    // the random part inside the lower word.
    return (((long) counter) << 32) | (secret & 0xFFFFFFFFL);
}
/**
 * Takes messages from the send queue and processes them one at a time
 * until the thread is interrupted while waiting for the next message.
 */
public void run() {
    try {
        for (;;) {
            process(sendqueue.take());
        }
    } catch (InterruptedException e) {
        // Interrupted while waiting for work: fall through and end the thread.
    }
}
/**
 * Encodes one queued message into a 48-byte datagram and sends it.
 * <ul>
 * <li>type 0: challenge request, sent only while no challenge is stored for
 * the tag;</li>
 * <li>type 1: challenge reply, reusing the challenge already issued for the
 * tag or generating a new one;</li>
 * <li>type 2: notification, sent up to {@code maxAttempts} times until an
 * ack arrives (after first obtaining a challenge when authentication is
 * enabled), then re-queued if the epoch is still current;</li>
 * <li>type 3: ack, sent once.</li>
 * </ul>
 * If the thread is interrupted while waiting for a challenge or an ack, the
 * interrupt flag is restored and processing of the message stops.
 *
 * @param m message to send
 * @throws IllegalArgumentException if the destination address is rejected
 *         by the datagram packet
 */
@SuppressFBWarnings(
    value = "RV_RETURN_VALUE_IGNORED",
    justification = "tryAcquire result not chacked, but it is not an issue")
private void process(ToSend m) {
    int attempts = 0;
    byte[] requestBytes = new byte[48];
    DatagramPacket requestPacket = new DatagramPacket(requestBytes, requestBytes.length);
    ByteBuffer requestBuffer = ByteBuffer.wrap(requestBytes);
    switch (m.type) {
    case 0:
        /*
         * Building challenge request packet to send
         */
        requestBuffer.clear();
        requestBuffer.putInt(ToSend.mType.crequest.ordinal());
        requestBuffer.putLong(m.tag);
        requestBuffer.putInt(m.state.ordinal());
        requestBuffer.put(new byte[32]);
        requestPacket.setLength(48);
        setPacketAddress(requestPacket, m.addr);
        try {
            if (challengeMap.get(m.tag) == null) {
                mySocket.send(requestPacket);
            }
        } catch (IOException e) {
            LOG.warn("Exception while sending challenge: ", e);
        }
        break;
    case 1:
        /*
         * Building challenge packet to send
         */
        ConcurrentHashMap<Long, Long> issued = addrChallengeMap.get(m.addr);
        if (issued == null) {
            LOG.error("Address is not in the configuration: {}", m.addr);
            break;
        }
        Long previous = issued.get(m.tag);
        long newChallenge = (previous != null) ? previous : genChallenge();
        issued.put(m.tag, newChallenge);
        requestBuffer.clear();
        requestBuffer.putInt(ToSend.mType.challenge.ordinal());
        requestBuffer.putLong(m.tag);
        requestBuffer.putInt(m.state.ordinal());
        requestBuffer.putLong(newChallenge);
        requestBuffer.put(new byte[24]);
        requestPacket.setLength(48);
        setPacketAddress(requestPacket, m.addr);
        try {
            mySocket.send(requestPacket);
        } catch (IOException e) {
            LOG.warn("Exception while sending challenge: ", e);
        }
        break;
    case 2:
        /*
         * Building notification packet to send
         */
        requestBuffer.clear();
        requestBuffer.putInt(m.type);
        requestBuffer.putLong(m.tag);
        requestBuffer.putInt(m.state.ordinal());
        requestBuffer.putLong(m.leader);
        requestBuffer.putLong(m.zxid);
        requestBuffer.putLong(m.epoch);
        requestBuffer.put(new byte[8]);
        requestPacket.setLength(48);
        setPacketAddress(requestPacket, m.addr);
        boolean myChallenge = false;
        boolean myAck = false;
        while (attempts < maxAttempts) {
            try {
                /*
                 * Try to obtain a challenge only if does not have
                 * one yet
                 */
                if (!myChallenge && authEnabled) {
                    ToSend crequest = new ToSend(
                        ToSend.mType.crequest,
                        m.tag,
                        m.leader,
                        m.zxid,
                        m.epoch,
                        QuorumPeer.ServerState.LOOKING,
                        m.addr);
                    Semaphore challengeSignal = new Semaphore(0);
                    // Register the semaphore before the request is queued so
                    // that the reply cannot arrive ahead of it.
                    synchronized (Messenger.this) {
                        challengeMutex.put(m.tag, challengeSignal);
                    }
                    sendqueue.offer(crequest);
                    try {
                        double timeout = ackWait * Math.pow(2, attempts);
                        // Wait WITHOUT holding the Messenger monitor: the
                        // receiver needs it to store the challenge and
                        // release this semaphore.
                        challengeSignal.tryAcquire((long) timeout, TimeUnit.MILLISECONDS);
                    } catch (InterruptedException e) {
                        LOG.warn("Challenge request exception: ", e);
                        Thread.currentThread().interrupt();
                        return;
                    }
                    synchronized (Messenger.this) {
                        myChallenge = challengeMap.containsKey(m.tag);
                    }
                }
                /*
                 * If don't have challenge yet, skip sending
                 * notification
                 */
                if (authEnabled && !myChallenge) {
                    attempts++;
                    continue;
                }
                if (authEnabled) {
                    // The challenge occupies the last 8 bytes of the packet.
                    requestBuffer.position(40);
                    Long stored = challengeMap.get(m.tag);
                    if (stored != null) {
                        requestBuffer.putLong(stored);
                    } else {
                        LOG.warn("No challenge with tag: {}", m.tag);
                    }
                }
                Semaphore ackSignal = new Semaphore(0);
                // Register before sending so a fast ack finds the semaphore.
                ackMutex.put(m.tag, ackSignal);
                mySocket.send(requestPacket);
                try {
                    double timeout = ackWait * Math.pow(10, attempts);
                    ackSignal.tryAcquire((int) timeout, TimeUnit.MILLISECONDS);
                } catch (InterruptedException e) {
                    LOG.warn("Ack exception: ", e);
                    Thread.currentThread().interrupt();
                    return;
                }
                if (ackset.remove(m.tag)) {
                    myAck = true;
                }
            } catch (IOException e) {
                LOG.warn("Sending exception: ", e);
                /*
                 * Do nothing, just try again
                 */
            }
            if (myAck) {
                /*
                 * Received ack successfully, so return
                 */
                challengeMap.remove(m.tag);
                return;
            } else {
                attempts++;
            }
        }
        /*
         * Return message to queue for another attempt later if
         * epoch hasn't changed.
         */
        if (m.epoch == logicalclock.get()) {
            challengeMap.remove(m.tag);
            sendqueue.offer(m);
        }
        break;
    case 3:
        requestBuffer.clear();
        requestBuffer.putInt(m.type);
        requestBuffer.putLong(m.tag);
        requestBuffer.putInt(m.state.ordinal());
        requestBuffer.putLong(m.leader);
        requestBuffer.putLong(m.zxid);
        requestBuffer.putLong(m.epoch);
        requestPacket.setLength(48);
        setPacketAddress(requestPacket, m.addr);
        try {
            mySocket.send(requestPacket);
        } catch (IOException e) {
            LOG.warn("Exception while sending ack: ", e);
        }
        break;
    default:
        LOG.warn("unknown type {}", m.type);
        break;
    }
}

/**
 * Sets the destination of {@code packet}, rethrowing a rejected address
 * with the address included in the exception message.
 */
private void setPacketAddress(DatagramPacket packet, InetSocketAddress addr) {
    try {
        packet.setSocketAddress(addr);
    } catch (IllegalArgumentException e) {
        // Sun doesn't include the address that causes this
        // exception to be thrown, so we wrap the exception
        // in order to capture this critical detail.
        throw new IllegalArgumentException("Unable to set socket address on packet, msg:" + e.getMessage()
            + " with addr:" + addr, e);
    }
}
}
/**
 * Sets up the shared bookkeeping maps, starts the daemon sender threads,
 * registers an empty challenge table for every server in the voting view
 * and finally starts the receiver thread.
 *
 * @param threads number of sender threads to start
 * @param s       socket used for sending and receiving
 */
Messenger(int threads, DatagramSocket s) {
    this.mySocket = s;
    this.ackset = Collections.newSetFromMap(new ConcurrentHashMap<Long, Boolean>());
    this.challengeMap = new ConcurrentHashMap<Long, Long>();
    this.challengeMutex = new ConcurrentHashMap<Long, Semaphore>();
    this.ackMutex = new ConcurrentHashMap<Long, Semaphore>();
    this.addrChallengeMap = new ConcurrentHashMap<InetSocketAddress, ConcurrentHashMap<Long, Long>>();
    this.lastProposedLeader = 0;
    this.lastProposedZxid = 0;
    this.lastEpoch = 0;

    int started = 0;
    while (started < threads) {
        started++;
        Thread sender = new Thread(new WorkerSender(3), "WorkerSender Thread: " + started);
        sender.setDaemon(true);
        sender.start();
    }

    for (QuorumServer server : self.getVotingView().values()) {
        InetAddress host = server.addr.getReachableOrOne().getAddress();
        addrChallengeMap.put(new InetSocketAddress(host, port), new ConcurrentHashMap<Long, Long>());
    }

    Thread receiver = new Thread(new WorkerReceiver(s, this), "WorkerReceiver Thread");
    receiver.start();
}
}
/* Peer this election instance runs for. */
QuorumPeer self;
/* Election port of this server, taken from the voting view in starter(). */
int port;
AtomicLong logicalclock = new AtomicLong(); /* Election instance */
/* Datagram socket bound to the election port. */
DatagramSocket mySocket;
/* Current proposal of this server; both are -1 until lookForLeader() sets them. */
long proposedLeader;
long proposedZxid;
/**
 * Creates an election instance and starts its messaging threads.
 *
 * @param self peer this election runs for
 * @param auth whether notifications are authenticated with challenges
 */
public AuthFastLeaderElection(QuorumPeer self, boolean auth) {
    authEnabled = auth;
    starter(self);
}

/**
 * Creates an election instance with authentication disabled.
 *
 * @param self peer this election runs for
 */
public AuthFastLeaderElection(QuorumPeer self) {
    this(self, false);
}
/**
 * Initializes this instance: resolves the election port of this server
 * from the voting view, binds the datagram socket, creates the bounded
 * send and receive queues and starts the messenger threads.
 *
 * @param self peer this election runs for
 * @throws RuntimeException if the datagram socket cannot be opened; the
 *         underlying {@link SocketException} is attached as the cause
 */
private void starter(QuorumPeer self) {
    this.self = self;
    port = self.getVotingView().get(self.getId()).electionAddr.getAllPorts().get(0);
    proposedLeader = -1;
    proposedZxid = -1;
    try {
        mySocket = new DatagramSocket(port);
    } catch (SocketException e1) {
        // Keep the port and the cause so the failure can be diagnosed.
        throw new RuntimeException("Unable to open election socket on port " + port, e1);
    }
    // Queues and sender threads are sized at twice the voting view.
    final int capacity = 2 * self.getVotingView().size();
    sendqueue = new LinkedBlockingQueue<ToSend>(capacity);
    recvqueue = new LinkedBlockingQueue<Notification>(capacity);
    new Messenger(capacity, mySocket);
}
/** Advances the logical clock by one, ending the current election instance. */
private void leaveInstance() {
    logicalclock.getAndIncrement();
}
/**
 * Queues one notification carrying the current proposal for every server
 * in the view. Each notification gets a fresh tag from the sequencer.
 */
private void sendNotifications() {
    for (QuorumServer server : self.getView().values()) {
        final QuorumServer peer = self.getView().get(server.id);
        final InetSocketAddress target = peer.electionAddr.getReachableOrOne();
        final long tag = AuthFastLeaderElection.sequencer++;
        sendqueue.offer(new ToSend(
            ToSend.mType.notification,
            tag,
            proposedLeader,
            proposedZxid,
            logicalclock.get(),
            QuorumPeer.ServerState.LOOKING,
            target));
    }
}
/**
 * Tells whether the vote (id, zxid) beats the current proposal: a higher
 * zxid wins, and on equal zxid the higher server id wins.
 *
 * @param id   proposed leader id
 * @param zxid proposed zxid
 * @return {@code true} if the given vote is strictly better
 */
private boolean totalOrderPredicate(long id, long zxid) {
    if (zxid != proposedZxid) {
        return zxid > proposedZxid;
    }
    return id > proposedLeader;
}
/**
 * Tells whether more than half of the voting view has voted for the given
 * leader with the given zxid.
 *
 * @param votes votes received so far, keyed by sender address
 * @param l     leader id to count votes for
 * @param zxid  zxid the votes must carry
 * @return {@code true} if the matching votes form a strict majority
 */
private boolean termPredicate(Map<InetSocketAddress, Vote> votes, long l, long zxid) {
    int matching = 0;
    for (Vote vote : votes.values()) {
        if (vote.getId() != l || vote.getZxid() != zxid) {
            continue;
        }
        matching++;
    }
    final int half = self.getVotingView().size() / 2;
    return matching > half;
}
/**
* Does nothing: the method body is empty.
*
* NOTE(review): the datagram socket opened in starter() and the threads
* started by the Messenger constructor are not closed or stopped here;
* confirm that this is intended.
*/
public void shutdown() {
}
/**
* Invoked in QuorumPeer to find or elect a new leader.
*
* Registers a LeaderElectionBean with JMX for the duration of the call,
* proposes this server as leader and then exchanges notifications until
* a decision is reached. Before returning a vote, the peer state is set to
* LEADING or FOLLOWING and the logical clock is advanced.
*
* @return the vote for the elected leader, or {@code null} if the peer
*         state stops being LOOKING before a decision is reached
* @throws InterruptedException if interrupted while polling the receive
*         queue or while sleeping before finalizing the election
*/
public Vote lookForLeader() throws InterruptedException {
try {
self.jmxLeaderElectionBean = new LeaderElectionBean();
MBeanRegistry.getInstance().register(self.jmxLeaderElectionBean, self.jmxLocalPeerBean);
} catch (Exception e) {
LOG.warn("Failed to register with JMX", e);
self.jmxLeaderElectionBean = null;
}
try {
// Votes from peers that are LOOKING, keyed by sender address.
HashMap<InetSocketAddress, Vote> recvset = new HashMap<InetSocketAddress, Vote>();
// Votes reported by peers that are already LEADING or FOLLOWING.
HashMap<InetSocketAddress, Vote> outofelection = new HashMap<InetSocketAddress, Vote>();
// Start a new election instance and propose this server.
logicalclock.incrementAndGet();
proposedLeader = self.getId();
proposedZxid = self.getLastLoggedZxid();
LOG.info("Election tally");
sendNotifications();
/*
* Loop in which we exchange notifications until we find a leader
*/
while (self.getPeerState() == ServerState.LOOKING) {
/*
* Remove next notification from queue, times out after 2 times
* the termination time
*/
Notification n = recvqueue.poll(2 * finalizeWait, TimeUnit.MILLISECONDS);
/*
* Sends more notifications if haven't received enough.
* Otherwise processes new notification.
*/
if (n == null) {
if (((!outofelection.isEmpty()) || (recvset.size() > 1))) {
sendNotifications();
}
} else {
switch (n.state) {
case LOOKING:
if (n.epoch > logicalclock.get()) {
// The sender is in a later election instance: adopt its epoch,
// discard the votes collected so far and re-announce.
logicalclock.set(n.epoch);
recvset.clear();
if (totalOrderPredicate(n.leader, n.zxid)) {
proposedLeader = n.leader;
proposedZxid = n.zxid;
}
sendNotifications();
} else if (n.epoch < logicalclock.get()) {
// Notification from an older election instance: ignore it.
break;
} else if (totalOrderPredicate(n.leader, n.zxid)) {
// Same epoch and a better vote: adopt it and tell the others.
proposedLeader = n.leader;
proposedZxid = n.zxid;
sendNotifications();
}
recvset.put(n.addr, new Vote(n.leader, n.zxid));
// If have received from all nodes, then terminate
if (self.getVotingView().size() == recvset.size()) {
self.setPeerState((proposedLeader == self.getId())
? ServerState.LEADING
: ServerState.FOLLOWING);
// if (self.state == ServerState.FOLLOWING) {
// Thread.sleep(100);
// }
leaveInstance();
return new Vote(proposedLeader, proposedZxid);
} else if (termPredicate(recvset, proposedLeader, proposedZxid)) {
// Otherwise, wait for a fixed amount of time
LOG.info("Passed predicate");
Thread.sleep(finalizeWait);
// Notification probe = recvqueue.peek();
// Verify if there is any change in the proposed leader
// Drop queued notifications that do not beat the current proposal;
// stop at the first one that does.
while ((!recvqueue.isEmpty())
&& !totalOrderPredicate(recvqueue.peek().leader, recvqueue.peek().zxid)) {
recvqueue.poll();
}
// An empty queue means nothing better arrived during the wait.
if (recvqueue.isEmpty()) {
// LOG.warn("Proposed leader: " +
// proposedLeader);
self.setPeerState((proposedLeader == self.getId())
? ServerState.LEADING
: ServerState.FOLLOWING);
leaveInstance();
return new Vote(proposedLeader, proposedZxid);
}
}
break;
case LEADING:
// The sender has left the election: follow its vote once a majority
// of such votes agree.
outofelection.put(n.addr, new Vote(n.leader, n.zxid));
if (termPredicate(outofelection, n.leader, n.zxid)) {
self.setPeerState((n.leader == self.getId()) ? ServerState.LEADING : ServerState.FOLLOWING);
leaveInstance();
return new Vote(n.leader, n.zxid);
}
break;
case FOLLOWING:
// Handled exactly like LEADING.
outofelection.put(n.addr, new Vote(n.leader, n.zxid));
if (termPredicate(outofelection, n.leader, n.zxid)) {
self.setPeerState((n.leader == self.getId()) ? ServerState.LEADING : ServerState.FOLLOWING);
leaveInstance();
return new Vote(n.leader, n.zxid);
}
break;
default:
break;
}
}
}
// The peer state changed while no decision had been reached.
return null;
} finally {
// Always remove the JMX bean, whichever way the method exits.
try {
if (self.jmxLeaderElectionBean != null) {
MBeanRegistry.getInstance().unregister(self.jmxLeaderElectionBean);
}
} catch (Exception e) {
LOG.warn("Failed to unregister with JMX", e);
}
self.jmxLeaderElectionBean = null;
}
}
}