blob: 72d06ecbd053d940ec7cb3454cddf97f4589c2c6 [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.zookeeper.server.quorum;
import static org.apache.zookeeper.common.NetUtils.formatInetAddr;
import java.io.BufferedReader;
import java.io.File;
import java.io.FileNotFoundException;
import java.io.FileReader;
import java.io.IOException;
import java.io.StringReader;
import java.io.StringWriter;
import java.io.Writer;
import java.net.DatagramPacket;
import java.net.DatagramSocket;
import java.net.InetAddress;
import java.net.InetSocketAddress;
import java.nio.ByteBuffer;
import java.util.ArrayList;
import java.util.Collections;
import java.util.Comparator;
import java.util.HashMap;
import java.util.HashSet;
import java.util.LinkedList;
import java.util.List;
import java.util.Map;
import java.util.Map.Entry;
import java.util.Properties;
import java.util.Set;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicLong;
import java.util.concurrent.atomic.AtomicReference;
import java.util.stream.Collectors;
import java.util.stream.IntStream;
import javax.security.sasl.SaslException;
import org.apache.yetus.audience.InterfaceAudience;
import org.apache.zookeeper.KeeperException.BadArgumentsException;
import org.apache.zookeeper.common.AtomicFileWritingIdiom;
import org.apache.zookeeper.common.AtomicFileWritingIdiom.WriterStatement;
import org.apache.zookeeper.common.QuorumX509Util;
import org.apache.zookeeper.common.X509Exception;
import org.apache.zookeeper.jmx.MBeanRegistry;
import org.apache.zookeeper.jmx.ZKMBeanInfo;
import org.apache.zookeeper.server.ServerCnxn;
import org.apache.zookeeper.server.ServerCnxnFactory;
import org.apache.zookeeper.server.ServerMetrics;
import org.apache.zookeeper.server.ZKDatabase;
import org.apache.zookeeper.server.ZooKeeperServer;
import org.apache.zookeeper.server.ZooKeeperThread;
import org.apache.zookeeper.server.admin.AdminServer;
import org.apache.zookeeper.server.admin.AdminServer.AdminServerException;
import org.apache.zookeeper.server.admin.AdminServerFactory;
import org.apache.zookeeper.server.persistence.FileTxnSnapLog;
import org.apache.zookeeper.server.quorum.QuorumPeerConfig.ConfigException;
import org.apache.zookeeper.server.quorum.auth.NullQuorumAuthLearner;
import org.apache.zookeeper.server.quorum.auth.NullQuorumAuthServer;
import org.apache.zookeeper.server.quorum.auth.QuorumAuth;
import org.apache.zookeeper.server.quorum.auth.QuorumAuthLearner;
import org.apache.zookeeper.server.quorum.auth.QuorumAuthServer;
import org.apache.zookeeper.server.quorum.auth.SaslQuorumAuthLearner;
import org.apache.zookeeper.server.quorum.auth.SaslQuorumAuthServer;
import org.apache.zookeeper.server.quorum.flexible.QuorumMaj;
import org.apache.zookeeper.server.quorum.flexible.QuorumVerifier;
import org.apache.zookeeper.server.util.ConfigUtils;
import org.apache.zookeeper.server.util.JvmPauseMonitor;
import org.apache.zookeeper.server.util.ZxidUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
/**
* This class manages the quorum protocol. There are three states this server
* can be in:
* <ol>
* <li>Leader election - each server will elect a leader (proposing itself as a
* leader initially).</li>
* <li>Follower - the server will synchronize with the leader and replicate any
* transactions.</li>
* <li>Leader - the server will process requests and forward them to followers.
* A majority of followers must log the request before it can be accepted.
* </ol>
*
* This class will setup a datagram socket that will always respond with its
* view of the current leader. The response will take the form of:
*
* <pre>
* int xid;
*
* long myid;
*
* long leader_id;
*
* long leader_zxid;
* </pre>
*
* The request for the current leader will consist solely of an xid: int xid;
*/
public class QuorumPeer extends ZooKeeperThread implements QuorumStats.Provider {
private static final Logger LOG = LoggerFactory.getLogger(QuorumPeer.class);
private QuorumBean jmxQuorumBean;
LocalPeerBean jmxLocalPeerBean;
private Map<Long, RemotePeerBean> jmxRemotePeerBean;
LeaderElectionBean jmxLeaderElectionBean;
// The QuorumCnxManager is held through an AtomicReference to ensure cross-thread visibility
// of updates; see the implementation comment at setLastSeenQuorumVerifier().
private AtomicReference<QuorumCnxManager> qcmRef = new AtomicReference<>();
QuorumAuthServer authServer;
QuorumAuthLearner authLearner;
/**
* ZKDatabase is a top level member of quorumpeer
* which will be used in all the zookeeperservers
* instantiated later. Also, it is created once on
* bootup and only thrown away in case of a truncate
* message from the leader
*/
private ZKDatabase zkDb;
private JvmPauseMonitor jvmPauseMonitor;
public static final class AddressTuple {
public final MultipleAddresses quorumAddr;
public final MultipleAddresses electionAddr;
public final InetSocketAddress clientAddr;
public AddressTuple(MultipleAddresses quorumAddr, MultipleAddresses electionAddr, InetSocketAddress clientAddr) {
this.quorumAddr = quorumAddr;
this.electionAddr = electionAddr;
this.clientAddr = clientAddr;
}
}
private int observerMasterPort;
public int getObserverMasterPort() {
return observerMasterPort;
}
public void setObserverMasterPort(int observerMasterPort) {
this.observerMasterPort = observerMasterPort;
}
public static class QuorumServer {
public MultipleAddresses addr = new MultipleAddresses();
public MultipleAddresses electionAddr = new MultipleAddresses();
public InetSocketAddress clientAddr = null;
public long id;
public String hostname;
public LearnerType type = LearnerType.PARTICIPANT;
public boolean isClientAddrFromStatic = false;
private List<InetSocketAddress> myAddrs;
public QuorumServer(long id, InetSocketAddress addr, InetSocketAddress electionAddr, InetSocketAddress clientAddr) {
this(id, addr, electionAddr, clientAddr, LearnerType.PARTICIPANT);
}
public QuorumServer(long id, InetSocketAddress addr, InetSocketAddress electionAddr) {
this(id, addr, electionAddr, null, LearnerType.PARTICIPANT);
}
// VisibleForTesting
public QuorumServer(long id, InetSocketAddress addr) {
this(id, addr, null, null, LearnerType.PARTICIPANT);
}
public long getId() {
return id;
}
/**
* Performs a DNS lookup for server address and election address.
*
* If the DNS lookup fails, this.addr and electionAddr remain
* unmodified.
*/
public void recreateSocketAddresses() {
if (this.addr.isEmpty()) {
LOG.warn("Server address has not been initialized");
return;
}
if (this.electionAddr.isEmpty()) {
LOG.warn("Election address has not been initialized");
return;
}
this.addr.recreateSocketAddresses();
this.electionAddr.recreateSocketAddresses();
}
private LearnerType getType(String s) throws ConfigException {
switch (s.trim().toLowerCase()) {
case "observer":
return LearnerType.OBSERVER;
case "participant":
return LearnerType.PARTICIPANT;
default:
throw new ConfigException("Unrecognised peertype: " + s);
}
}
private static final String wrongFormat =
" does not have the form server_config or server_config;client_config"
+ " where server_config is the pipe separated list of host:port:port or host:port:port:type"
+ " and client_config is port or host:port";
public QuorumServer(long sid, String addressStr) throws ConfigException {
this.id = sid;
LearnerType newType = null;
String[] serverClientParts = addressStr.split(";");
String[] serverAddresses = serverClientParts[0].split("\\|");
if (serverClientParts.length == 2) {
String[] clientParts = ConfigUtils.getHostAndPort(serverClientParts[1]);
if (clientParts.length > 2) {
throw new ConfigException(addressStr + wrongFormat);
}
// is client_config a host:port or just a port
hostname = (clientParts.length == 2) ? clientParts[0] : "0.0.0.0";
try {
clientAddr = new InetSocketAddress(hostname, Integer.parseInt(clientParts[clientParts.length - 1]));
} catch (NumberFormatException e) {
throw new ConfigException("Address unresolved: " + hostname + ":" + clientParts[clientParts.length - 1]);
}
}
for (String serverAddress : serverAddresses) {
String serverParts[] = ConfigUtils.getHostAndPort(serverAddress);
if ((serverClientParts.length > 2) || (serverParts.length < 3)
|| (serverParts.length > 4)) {
throw new ConfigException(addressStr + wrongFormat);
}
// server_config should be either host:port:port or host:port:port:type
InetSocketAddress tempAddress;
InetSocketAddress tempElectionAddress;
try {
tempAddress = new InetSocketAddress(serverParts[0], Integer.parseInt(serverParts[1]));
addr.addAddress(tempAddress);
} catch (NumberFormatException e) {
throw new ConfigException("Address unresolved: " + serverParts[0] + ":" + serverParts[1]);
}
try {
tempElectionAddress = new InetSocketAddress(serverParts[0], Integer.parseInt(serverParts[2]));
electionAddr.addAddress(tempElectionAddress);
} catch (NumberFormatException e) {
throw new ConfigException("Address unresolved: " + serverParts[0] + ":" + serverParts[2]);
}
if (tempAddress.getPort() == tempElectionAddress.getPort()) {
throw new ConfigException("Client and election port must be different! Please update the "
+ "configuration file on server." + sid);
}
if (serverParts.length == 4) {
LearnerType tempType = getType(serverParts[3]);
if (newType == null) {
newType = tempType;
}
if (newType != tempType) {
throw new ConfigException("Multiple addresses should have similar roles: " + type + " vs " + tempType);
}
}
this.hostname = serverParts[0];
}
if (newType != null) {
type = newType;
}
setMyAddrs();
}
public QuorumServer(long id, InetSocketAddress addr, InetSocketAddress electionAddr, LearnerType type) {
this(id, addr, electionAddr, null, type);
}
public QuorumServer(long id, InetSocketAddress addr, InetSocketAddress electionAddr, InetSocketAddress clientAddr, LearnerType type) {
this.id = id;
if (addr != null) {
this.addr.addAddress(addr);
}
if (electionAddr != null) {
this.electionAddr.addAddress(electionAddr);
}
this.type = type;
this.clientAddr = clientAddr;
setMyAddrs();
}
private void setMyAddrs() {
this.myAddrs = new ArrayList<>();
this.myAddrs.addAll(this.addr.getAllAddresses());
this.myAddrs.add(this.clientAddr);
this.myAddrs.addAll(this.electionAddr.getAllAddresses());
this.myAddrs = excludedSpecialAddresses(this.myAddrs);
}
public static String delimitedHostString(InetSocketAddress addr) {
String host = addr.getHostString();
if (host.contains(":")) {
return "[" + host + "]";
} else {
return host;
}
}
public String toString() {
StringWriter sw = new StringWriter();
List<InetSocketAddress> addrList = new LinkedList<>(addr.getAllAddresses());
List<InetSocketAddress> electionAddrList = new LinkedList<>(electionAddr.getAllAddresses());
if (addrList.size() > 0 && electionAddrList.size() > 0) {
addrList.sort(Comparator.comparing(InetSocketAddress::getHostString));
electionAddrList.sort(Comparator.comparing(InetSocketAddress::getHostString));
sw.append(IntStream.range(0, addrList.size()).mapToObj(i -> String.format("%s:%d:%d",
delimitedHostString(addrList.get(i)), addrList.get(i).getPort(), electionAddrList.get(i).getPort()))
.collect(Collectors.joining("|")));
}
if (type == LearnerType.OBSERVER) {
sw.append(":observer");
} else if (type == LearnerType.PARTICIPANT) {
sw.append(":participant");
}
if (clientAddr != null && !isClientAddrFromStatic) {
sw.append(";");
sw.append(delimitedHostString(clientAddr));
sw.append(":");
sw.append(String.valueOf(clientAddr.getPort()));
}
return sw.toString();
}
public int hashCode() {
assert false : "hashCode not designed";
return 42; // any arbitrary constant will do
}
private boolean checkAddressesEqual(InetSocketAddress addr1, InetSocketAddress addr2) {
return (addr1 != null || addr2 == null)
&& (addr1 == null || addr2 != null)
&& (addr1 == null || addr2 == null || addr1.equals(addr2));
}
public boolean equals(Object o) {
if (!(o instanceof QuorumServer)) {
return false;
}
QuorumServer qs = (QuorumServer) o;
if ((qs.id != id) || (qs.type != type)) {
return false;
}
if (!addr.equals(qs.addr)) {
return false;
}
if (!electionAddr.equals(qs.electionAddr)) {
return false;
}
return checkAddressesEqual(clientAddr, qs.clientAddr);
}
public void checkAddressDuplicate(QuorumServer s) throws BadArgumentsException {
List<InetSocketAddress> otherAddrs = new ArrayList<>(s.addr.getAllAddresses());
otherAddrs.add(s.clientAddr);
otherAddrs.addAll(s.electionAddr.getAllAddresses());
otherAddrs = excludedSpecialAddresses(otherAddrs);
for (InetSocketAddress my : this.myAddrs) {
for (InetSocketAddress other : otherAddrs) {
if (my.equals(other)) {
String error = String.format("%s of server.%d conflicts %s of server.%d", my, this.id, other, s.id);
throw new BadArgumentsException(error);
}
}
}
}
private List<InetSocketAddress> excludedSpecialAddresses(List<InetSocketAddress> addrs) {
List<InetSocketAddress> included = new ArrayList<>();
for (InetSocketAddress addr : addrs) {
if (addr == null) {
continue;
}
InetAddress inetaddr = addr.getAddress();
if (inetaddr == null || inetaddr.isAnyLocalAddress() || // wildCard addresses (0.0.0.0 or [::])
inetaddr.isLoopbackAddress()) { // loopback address(localhost/127.0.0.1)
continue;
}
included.add(addr);
}
return included;
}
}
public enum ServerState {
LOOKING,
FOLLOWING,
LEADING,
OBSERVING
}
/**
* (Used for monitoring) shows the current phase of
* Zab protocol that peer is running.
*/
public enum ZabState {
ELECTION,
DISCOVERY,
SYNCHRONIZATION,
BROADCAST
}
/**
* (Used for monitoring) When peer is in synchronization phase, this shows
* which synchronization mechanism is being used
*/
public enum SyncMode {
NONE,
DIFF,
SNAP,
TRUNC
}
/*
* A peer can either be participating, which implies that it is willing to
* both vote in instances of consensus and to elect or become a Leader, or
* it may be observing in which case it isn't.
*
* We need this distinction to decide which ServerState to move to when
* conditions change (e.g. which state to become after LOOKING).
*/
public enum LearnerType {
PARTICIPANT,
OBSERVER
}
/*
* To enable observers to have no identifier, we need a generic identifier
* at least for QuorumCnxManager. We use the following constant to as the
* value of such a generic identifier.
*/
static final long OBSERVER_ID = Long.MAX_VALUE;
/*
* Record leader election time
*/
public long start_fle, end_fle; // fle = fast leader election
public static final String FLE_TIME_UNIT = "MS";
/*
* Default value of peer is participant
*/
private LearnerType learnerType = LearnerType.PARTICIPANT;
public LearnerType getLearnerType() {
return learnerType;
}
/**
* Sets the LearnerType
*/
public void setLearnerType(LearnerType p) {
learnerType = p;
}
protected synchronized void setConfigFileName(String s) {
configFilename = s;
}
private String configFilename = null;
public int getQuorumSize() {
return getVotingView().size();
}
public void setJvmPauseMonitor(JvmPauseMonitor jvmPauseMonitor) {
this.jvmPauseMonitor = jvmPauseMonitor;
}
/**
* QuorumVerifier implementation; default (majority).
*/
//last committed quorum verifier
private QuorumVerifier quorumVerifier;
//last proposed quorum verifier
private QuorumVerifier lastSeenQuorumVerifier = null;
// Lock object that guard access to quorumVerifier and lastSeenQuorumVerifier.
final Object QV_LOCK = new Object();
/**
* My id
*/
private long myid;
/**
* get the id of this quorum peer.
*/
public long getId() {
return myid;
}
// VisibleForTesting
void setId(long id) {
this.myid = id;
}
private boolean sslQuorum;
private boolean shouldUsePortUnification;
public boolean isSslQuorum() {
return sslQuorum;
}
public boolean shouldUsePortUnification() {
return shouldUsePortUnification;
}
private final QuorumX509Util x509Util;
QuorumX509Util getX509Util() {
return x509Util;
}
/**
* This is who I think the leader currently is.
*/
private volatile Vote currentVote;
public synchronized Vote getCurrentVote() {
return currentVote;
}
public synchronized void setCurrentVote(Vote v) {
currentVote = v;
}
private volatile boolean running = true;
private String initialConfig;
/**
* The number of milliseconds of each tick
*/
protected int tickTime;
/**
* Whether learners in this quorum should create new sessions as local.
* False by default to preserve existing behavior.
*/
protected boolean localSessionsEnabled = false;
/**
* Whether learners in this quorum should upgrade local sessions to
* global. Only matters if local sessions are enabled.
*/
protected boolean localSessionsUpgradingEnabled = true;
/**
* Minimum number of milliseconds to allow for session timeout.
* A value of -1 indicates unset, use default.
*/
protected int minSessionTimeout = -1;
/**
* Maximum number of milliseconds to allow for session timeout.
* A value of -1 indicates unset, use default.
*/
protected int maxSessionTimeout = -1;
/**
* The ZooKeeper server's socket backlog length. The number of connections
* that will be queued to be read before new connections are dropped. A
* value of one indicates the default backlog will be used.
*/
protected int clientPortListenBacklog = -1;
/**
* The number of ticks that the initial synchronization phase can take
*/
protected volatile int initLimit;
/**
* The number of ticks that can pass between sending a request and getting
* an acknowledgment
*/
protected volatile int syncLimit;
/**
* The number of ticks that can pass before retrying to connect to learner master
*/
protected volatile int connectToLearnerMasterLimit;
/**
* Enables/Disables sync request processor. This option is enabled
* by default and is to be used with observers.
*/
protected boolean syncEnabled = true;
/**
* The current tick
*/
protected AtomicInteger tick = new AtomicInteger();
/**
* Whether or not to listen on all IPs for the two quorum ports
* (broadcast and fast leader election).
*/
protected boolean quorumListenOnAllIPs = false;
/**
* Keeps time taken for leader election in milliseconds. Sets the value to
* this variable only after the completion of leader election.
*/
private long electionTimeTaken = -1;
/**
* Enable/Disables quorum authentication using sasl. Defaulting to false.
*/
protected boolean quorumSaslEnableAuth;
/**
* If this is false, quorum peer server will accept another quorum peer client
* connection even if the authentication did not succeed. This can be used while
* upgrading ZooKeeper server. Defaulting to false (required).
*/
protected boolean quorumServerSaslAuthRequired;
/**
* If this is false, quorum peer learner will talk to quorum peer server
* without authentication. This can be used while upgrading ZooKeeper
* server. Defaulting to false (required).
*/
protected boolean quorumLearnerSaslAuthRequired;
/**
* Kerberos quorum service principal. Defaulting to 'zkquorum/localhost'.
*/
protected String quorumServicePrincipal;
/**
* Quorum learner login context name in jaas-conf file to read the kerberos
* security details. Defaulting to 'QuorumLearner'.
*/
protected String quorumLearnerLoginContext;
/**
* Quorum server login context name in jaas-conf file to read the kerberos
* security details. Defaulting to 'QuorumServer'.
*/
protected String quorumServerLoginContext;
// TODO: need to tune the default value of thread size
private static final int QUORUM_CNXN_THREADS_SIZE_DEFAULT_VALUE = 20;
/**
* The maximum number of threads to allow in the connectionExecutors thread
* pool which will be used to initiate quorum server connections.
*/
protected int quorumCnxnThreadsSize = QUORUM_CNXN_THREADS_SIZE_DEFAULT_VALUE;
public static final String QUORUM_CNXN_TIMEOUT_MS = "zookeeper.quorumCnxnTimeoutMs";
private static int quorumCnxnTimeoutMs;
static {
quorumCnxnTimeoutMs = Integer.getInteger(QUORUM_CNXN_TIMEOUT_MS, -1);
LOG.info("{}={}", QUORUM_CNXN_TIMEOUT_MS, quorumCnxnTimeoutMs);
}
/**
* @deprecated As of release 3.4.0, this class has been deprecated, since
* it is used with one of the udp-based versions of leader election, which
* we are also deprecating.
*
* This class simply responds to requests for the current leader of this
* node.
* <p>
* The request contains just an xid generated by the requestor.
* <p>
* The response has the xid, the id of this server, the id of the leader,
* and the zxid of the leader.
*
*
*/
@Deprecated
class ResponderThread extends ZooKeeperThread {
ResponderThread() {
super("ResponderThread");
}
volatile boolean running = true;
@Override
public void run() {
try {
byte[] b = new byte[36];
ByteBuffer responseBuffer = ByteBuffer.wrap(b);
DatagramPacket packet = new DatagramPacket(b, b.length);
while (running) {
udpSocket.receive(packet);
if (packet.getLength() != 4) {
LOG.warn("Got more than just an xid! Len = {}", packet.getLength());
} else {
responseBuffer.clear();
responseBuffer.getInt(); // Skip the xid
responseBuffer.putLong(myid);
Vote current = getCurrentVote();
switch (getPeerState()) {
case LOOKING:
responseBuffer.putLong(current.getId());
responseBuffer.putLong(current.getZxid());
break;
case LEADING:
responseBuffer.putLong(myid);
try {
long proposed;
synchronized (leader) {
proposed = leader.lastProposed;
}
responseBuffer.putLong(proposed);
} catch (NullPointerException npe) {
// This can happen in state transitions,
// just ignore the request
}
break;
case FOLLOWING:
responseBuffer.putLong(current.getId());
try {
responseBuffer.putLong(follower.getZxid());
} catch (NullPointerException npe) {
// This can happen in state transitions,
// just ignore the request
}
break;
case OBSERVING:
// Do nothing, Observers keep themselves to
// themselves.
break;
}
packet.setData(b);
udpSocket.send(packet);
}
packet.setLength(b.length);
}
} catch (RuntimeException e) {
LOG.warn("Unexpected runtime exception in ResponderThread", e);
} catch (IOException e) {
LOG.warn("Unexpected IO exception in ResponderThread", e);
} finally {
LOG.warn("QuorumPeer responder thread exited");
}
}
}
private ServerState state = ServerState.LOOKING;
private AtomicReference<ZabState> zabState = new AtomicReference<>(ZabState.ELECTION);
private AtomicReference<SyncMode> syncMode = new AtomicReference<>(SyncMode.NONE);
private AtomicReference<String> leaderAddress = new AtomicReference<String>("");
private AtomicLong leaderId = new AtomicLong(-1);
private boolean reconfigFlag = false; // indicates that a reconfig just committed
public synchronized void setPeerState(ServerState newState) {
state = newState;
if (newState == ServerState.LOOKING) {
setLeaderAddressAndId(null, -1);
setZabState(ZabState.ELECTION);
} else {
LOG.info("Peer state changed: {}", getDetailedPeerState());
}
}
public void setZabState(ZabState zabState) {
this.zabState.set(zabState);
LOG.info("Peer state changed: {}", getDetailedPeerState());
}
public void setSyncMode(SyncMode syncMode) {
this.syncMode.set(syncMode);
LOG.info("Peer state changed: {}", getDetailedPeerState());
}
public ZabState getZabState() {
return zabState.get();
}
public SyncMode getSyncMode() {
return syncMode.get();
}
public void setLeaderAddressAndId(MultipleAddresses addr, long newId) {
if (addr != null) {
leaderAddress.set(String.join("|", addr.getAllHostStrings()));
} else {
leaderAddress.set(null);
}
leaderId.set(newId);
}
public String getLeaderAddress() {
return leaderAddress.get();
}
public long getLeaderId() {
return leaderId.get();
}
public String getDetailedPeerState() {
final StringBuilder sb = new StringBuilder(getPeerState().toString().toLowerCase());
final ZabState zabState = getZabState();
if (!ZabState.ELECTION.equals(zabState)) {
sb.append(" - ").append(zabState.toString().toLowerCase());
}
final SyncMode syncMode = getSyncMode();
if (!SyncMode.NONE.equals(syncMode)) {
sb.append(" - ").append(syncMode.toString().toLowerCase());
}
return sb.toString();
}
public synchronized void reconfigFlagSet() {
reconfigFlag = true;
}
public synchronized void reconfigFlagClear() {
reconfigFlag = false;
}
public synchronized boolean isReconfigStateChange() {
return reconfigFlag;
}
public synchronized ServerState getPeerState() {
return state;
}
DatagramSocket udpSocket;
private final AtomicReference<AddressTuple> myAddrs = new AtomicReference<>();
/**
* Resolves hostname for a given server ID.
*
* This method resolves hostname for a given server ID in both quorumVerifer
* and lastSeenQuorumVerifier. If the server ID matches the local server ID,
* it also updates myAddrs.
*/
public void recreateSocketAddresses(long id) {
QuorumVerifier qv = getQuorumVerifier();
if (qv != null) {
QuorumServer qs = qv.getAllMembers().get(id);
if (qs != null) {
qs.recreateSocketAddresses();
if (id == getId()) {
setAddrs(qs.addr, qs.electionAddr, qs.clientAddr);
}
}
}
qv = getLastSeenQuorumVerifier();
if (qv != null) {
QuorumServer qs = qv.getAllMembers().get(id);
if (qs != null) {
qs.recreateSocketAddresses();
}
}
}
private AddressTuple getAddrs() {
AddressTuple addrs = myAddrs.get();
if (addrs != null) {
return addrs;
}
try {
synchronized (QV_LOCK) {
addrs = myAddrs.get();
while (addrs == null) {
QV_LOCK.wait();
addrs = myAddrs.get();
}
return addrs;
}
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
throw new RuntimeException(e);
}
}
public MultipleAddresses getQuorumAddress() {
return getAddrs().quorumAddr;
}
public MultipleAddresses getElectionAddress() {
return getAddrs().electionAddr;
}
public InetSocketAddress getClientAddress() {
final AddressTuple addrs = myAddrs.get();
return (addrs == null) ? null : addrs.clientAddr;
}
private void setAddrs(MultipleAddresses quorumAddr, MultipleAddresses electionAddr, InetSocketAddress clientAddr) {
synchronized (QV_LOCK) {
myAddrs.set(new AddressTuple(quorumAddr, electionAddr, clientAddr));
QV_LOCK.notifyAll();
}
}
private int electionType;
Election electionAlg;
ServerCnxnFactory cnxnFactory;
ServerCnxnFactory secureCnxnFactory;
private FileTxnSnapLog logFactory = null;
private final QuorumStats quorumStats;
AdminServer adminServer;
public static QuorumPeer testingQuorumPeer() throws SaslException {
return new QuorumPeer();
}
public QuorumPeer() throws SaslException {
super("QuorumPeer");
quorumStats = new QuorumStats(this);
jmxRemotePeerBean = new HashMap<Long, RemotePeerBean>();
adminServer = AdminServerFactory.createAdminServer();
x509Util = createX509Util();
initialize();
}
// VisibleForTesting
QuorumX509Util createX509Util() {
return new QuorumX509Util();
}
/**
* For backward compatibility purposes, we instantiate QuorumMaj by default.
*/
public QuorumPeer(Map<Long, QuorumServer> quorumPeers, File dataDir, File dataLogDir, int electionType, long myid, int tickTime, int initLimit, int syncLimit, int connectToLearnerMasterLimit, ServerCnxnFactory cnxnFactory) throws IOException {
this(quorumPeers, dataDir, dataLogDir, electionType, myid, tickTime, initLimit, syncLimit, connectToLearnerMasterLimit, false, cnxnFactory, new QuorumMaj(quorumPeers));
}
public QuorumPeer(Map<Long, QuorumServer> quorumPeers, File dataDir, File dataLogDir, int electionType, long myid, int tickTime, int initLimit, int syncLimit, int connectToLearnerMasterLimit, boolean quorumListenOnAllIPs, ServerCnxnFactory cnxnFactory, QuorumVerifier quorumConfig) throws IOException {
this();
this.cnxnFactory = cnxnFactory;
this.electionType = electionType;
this.myid = myid;
this.tickTime = tickTime;
this.initLimit = initLimit;
this.syncLimit = syncLimit;
this.connectToLearnerMasterLimit = connectToLearnerMasterLimit;
this.quorumListenOnAllIPs = quorumListenOnAllIPs;
this.logFactory = new FileTxnSnapLog(dataLogDir, dataDir);
this.zkDb = new ZKDatabase(this.logFactory);
if (quorumConfig == null) {
quorumConfig = new QuorumMaj(quorumPeers);
}
setQuorumVerifier(quorumConfig, false);
adminServer = AdminServerFactory.createAdminServer();
}
public void initialize() throws SaslException {
// init quorum auth server & learner
if (isQuorumSaslAuthEnabled()) {
Set<String> authzHosts = new HashSet<String>();
for (QuorumServer qs : getView().values()) {
authzHosts.add(qs.hostname);
}
authServer = new SaslQuorumAuthServer(isQuorumServerSaslAuthRequired(), quorumServerLoginContext, authzHosts);
authLearner = new SaslQuorumAuthLearner(isQuorumLearnerSaslAuthRequired(), quorumServicePrincipal, quorumLearnerLoginContext);
} else {
authServer = new NullQuorumAuthServer();
authLearner = new NullQuorumAuthLearner();
}
}
QuorumStats quorumStats() {
return quorumStats;
}
@Override
public synchronized void start() {
if (!getView().containsKey(myid)) {
throw new RuntimeException("My id " + myid + " not in the peer list");
}
loadDataBase();
startServerCnxnFactory();
try {
adminServer.start();
} catch (AdminServerException e) {
LOG.warn("Problem starting AdminServer", e);
System.out.println(e);
}
startLeaderElection();
startJvmPauseMonitor();
super.start();
}
private void loadDataBase() {
try {
zkDb.loadDataBase();
// load the epochs
long lastProcessedZxid = zkDb.getDataTree().lastProcessedZxid;
long epochOfZxid = ZxidUtils.getEpochFromZxid(lastProcessedZxid);
try {
currentEpoch = readLongFromFile(CURRENT_EPOCH_FILENAME);
} catch (FileNotFoundException e) {
// pick a reasonable epoch number
// this should only happen once when moving to a
// new code version
currentEpoch = epochOfZxid;
LOG.info(
"{} not found! Creating with a reasonable default of {}. "
+ "This should only happen when you are upgrading your installation",
CURRENT_EPOCH_FILENAME,
currentEpoch);
writeLongToFile(CURRENT_EPOCH_FILENAME, currentEpoch);
}
if (epochOfZxid > currentEpoch) {
throw new IOException("The current epoch, "
+ ZxidUtils.zxidToString(currentEpoch)
+ ", is older than the last zxid, "
+ lastProcessedZxid);
}
try {
acceptedEpoch = readLongFromFile(ACCEPTED_EPOCH_FILENAME);
} catch (FileNotFoundException e) {
// pick a reasonable epoch number
// this should only happen once when moving to a
// new code version
acceptedEpoch = epochOfZxid;
LOG.info(
"{} not found! Creating with a reasonable default of {}. "
+ "This should only happen when you are upgrading your installation",
ACCEPTED_EPOCH_FILENAME,
acceptedEpoch);
writeLongToFile(ACCEPTED_EPOCH_FILENAME, acceptedEpoch);
}
if (acceptedEpoch < currentEpoch) {
throw new IOException("The accepted epoch, "
+ ZxidUtils.zxidToString(acceptedEpoch)
+ " is less than the current epoch, "
+ ZxidUtils.zxidToString(currentEpoch));
}
} catch (IOException ie) {
LOG.error("Unable to load database on disk", ie);
throw new RuntimeException("Unable to run quorum server ", ie);
}
}
ResponderThread responder;
public synchronized void stopLeaderElection() {
responder.running = false;
responder.interrupt();
}
public synchronized void startLeaderElection() {
try {
if (getPeerState() == ServerState.LOOKING) {
currentVote = new Vote(myid, getLastLoggedZxid(), getCurrentEpoch());
}
} catch (IOException e) {
RuntimeException re = new RuntimeException(e.getMessage());
re.setStackTrace(e.getStackTrace());
throw re;
}
this.electionAlg = createElectionAlgorithm(electionType);
}
private void startJvmPauseMonitor() {
if (this.jvmPauseMonitor != null) {
this.jvmPauseMonitor.serviceStart();
}
}
/**
* Count the number of nodes in the map that could be followers.
* @param peers
* @return The number of followers in the map
*/
protected static int countParticipants(Map<Long, QuorumServer> peers) {
int count = 0;
for (QuorumServer q : peers.values()) {
if (q.type == LearnerType.PARTICIPANT) {
count++;
}
}
return count;
}
/**
* This constructor is only used by the existing unit test code.
* It defaults to FileLogProvider persistence provider.
*/
public QuorumPeer(Map<Long, QuorumServer> quorumPeers, File snapDir, File logDir, int clientPort, int electionAlg, long myid, int tickTime, int initLimit, int syncLimit, int connectToLearnerMasterLimit) throws IOException {
this(
quorumPeers,
snapDir,
logDir,
electionAlg,
myid,
tickTime,
initLimit,
syncLimit,
connectToLearnerMasterLimit,
false,
ServerCnxnFactory.createFactory(getClientAddress(quorumPeers, myid, clientPort), -1),
new QuorumMaj(quorumPeers));
}
/**
* This constructor is only used by the existing unit test code.
* It defaults to FileLogProvider persistence provider.
*/
public QuorumPeer(Map<Long, QuorumServer> quorumPeers, File snapDir, File logDir, int clientPort, int electionAlg, long myid, int tickTime, int initLimit, int syncLimit, int connectToLearnerMasterLimit, QuorumVerifier quorumConfig) throws IOException {
this(
quorumPeers,
snapDir,
logDir,
electionAlg,
myid,
tickTime,
initLimit,
syncLimit,
connectToLearnerMasterLimit,
false,
ServerCnxnFactory.createFactory(getClientAddress(quorumPeers, myid, clientPort), -1),
quorumConfig);
}
private static InetSocketAddress getClientAddress(Map<Long, QuorumServer> quorumPeers, long myid, int clientPort) throws IOException {
QuorumServer quorumServer = quorumPeers.get(myid);
if (null == quorumServer) {
throw new IOException("No QuorumServer correspoding to myid " + myid);
}
if (null == quorumServer.clientAddr) {
return new InetSocketAddress(clientPort);
}
if (quorumServer.clientAddr.getPort() != clientPort) {
throw new IOException("QuorumServer port "
+ quorumServer.clientAddr.getPort()
+ " does not match with given port "
+ clientPort);
}
return quorumServer.clientAddr;
}
/**
* returns the highest zxid that this host has seen
*
* @return the highest zxid for this host
*/
public long getLastLoggedZxid() {
if (!zkDb.isInitialized()) {
loadDataBase();
}
return zkDb.getDataTreeLastProcessedZxid();
}
public Follower follower;
public Leader leader;
public Observer observer;
protected Follower makeFollower(FileTxnSnapLog logFactory) throws IOException {
return new Follower(this, new FollowerZooKeeperServer(logFactory, this, this.zkDb));
}
protected Leader makeLeader(FileTxnSnapLog logFactory) throws IOException, X509Exception {
return new Leader(this, new LeaderZooKeeperServer(logFactory, this, this.zkDb));
}
protected Observer makeObserver(FileTxnSnapLog logFactory) throws IOException {
return new Observer(this, new ObserverZooKeeperServer(logFactory, this, this.zkDb));
}
@SuppressWarnings("deprecation")
protected Election createElectionAlgorithm(int electionAlgorithm) {
Election le = null;
//TODO: use a factory rather than a switch
switch (electionAlgorithm) {
case 1:
le = new AuthFastLeaderElection(this);
break;
case 2:
le = new AuthFastLeaderElection(this, true);
break;
case 3:
QuorumCnxManager qcm = createCnxnManager();
QuorumCnxManager oldQcm = qcmRef.getAndSet(qcm);
if (oldQcm != null) {
LOG.warn("Clobbering already-set QuorumCnxManager (restarting leader election?)");
oldQcm.halt();
}
QuorumCnxManager.Listener listener = qcm.listener;
if (listener != null) {
listener.start();
FastLeaderElection fle = new FastLeaderElection(this, qcm);
fle.start();
le = fle;
} else {
LOG.error("Null listener when initializing cnx manager");
}
break;
default:
assert false;
}
return le;
}
@SuppressWarnings("deprecation")
protected Election makeLEStrategy() {
LOG.debug("Initializing leader election protocol...");
return electionAlg;
}
protected synchronized void setLeader(Leader newLeader) {
leader = newLeader;
}
protected synchronized void setFollower(Follower newFollower) {
follower = newFollower;
}
protected synchronized void setObserver(Observer newObserver) {
observer = newObserver;
}
public synchronized ZooKeeperServer getActiveServer() {
if (leader != null) {
return leader.zk;
} else if (follower != null) {
return follower.zk;
} else if (observer != null) {
return observer.zk;
}
return null;
}
boolean shuttingDownLE = false;
@Override
public void run() {
updateThreadName();
LOG.debug("Starting quorum peer");
try {
jmxQuorumBean = new QuorumBean(this);
MBeanRegistry.getInstance().register(jmxQuorumBean, null);
for (QuorumServer s : getView().values()) {
ZKMBeanInfo p;
if (getId() == s.id) {
p = jmxLocalPeerBean = new LocalPeerBean(this);
try {
MBeanRegistry.getInstance().register(p, jmxQuorumBean);
} catch (Exception e) {
LOG.warn("Failed to register with JMX", e);
jmxLocalPeerBean = null;
}
} else {
RemotePeerBean rBean = new RemotePeerBean(this, s);
try {
MBeanRegistry.getInstance().register(rBean, jmxQuorumBean);
jmxRemotePeerBean.put(s.id, rBean);
} catch (Exception e) {
LOG.warn("Failed to register with JMX", e);
}
}
}
} catch (Exception e) {
LOG.warn("Failed to register with JMX", e);
jmxQuorumBean = null;
}
try {
/*
* Main loop
*/
while (running) {
switch (getPeerState()) {
case LOOKING:
LOG.info("LOOKING");
ServerMetrics.getMetrics().LOOKING_COUNT.add(1);
if (Boolean.getBoolean("readonlymode.enabled")) {
LOG.info("Attempting to start ReadOnlyZooKeeperServer");
// Create read-only server but don't start it immediately
final ReadOnlyZooKeeperServer roZk = new ReadOnlyZooKeeperServer(logFactory, this, this.zkDb);
// Instead of starting roZk immediately, wait some grace
// period before we decide we're partitioned.
//
// Thread is used here because otherwise it would require
// changes in each of election strategy classes which is
// unnecessary code coupling.
Thread roZkMgr = new Thread() {
public void run() {
try {
// lower-bound grace period to 2 secs
sleep(Math.max(2000, tickTime));
if (ServerState.LOOKING.equals(getPeerState())) {
roZk.startup();
}
} catch (InterruptedException e) {
LOG.info("Interrupted while attempting to start ReadOnlyZooKeeperServer, not started");
} catch (Exception e) {
LOG.error("FAILED to start ReadOnlyZooKeeperServer", e);
}
}
};
try {
roZkMgr.start();
reconfigFlagClear();
if (shuttingDownLE) {
shuttingDownLE = false;
startLeaderElection();
}
setCurrentVote(makeLEStrategy().lookForLeader());
} catch (Exception e) {
LOG.warn("Unexpected exception", e);
setPeerState(ServerState.LOOKING);
} finally {
// If the thread is in the the grace period, interrupt
// to come out of waiting.
roZkMgr.interrupt();
roZk.shutdown();
}
} else {
try {
reconfigFlagClear();
if (shuttingDownLE) {
shuttingDownLE = false;
startLeaderElection();
}
setCurrentVote(makeLEStrategy().lookForLeader());
} catch (Exception e) {
LOG.warn("Unexpected exception", e);
setPeerState(ServerState.LOOKING);
}
}
break;
case OBSERVING:
try {
LOG.info("OBSERVING");
setObserver(makeObserver(logFactory));
observer.observeLeader();
} catch (Exception e) {
LOG.warn("Unexpected exception", e);
} finally {
observer.shutdown();
setObserver(null);
updateServerState();
// Add delay jitter before we switch to LOOKING
// state to reduce the load of ObserverMaster
if (isRunning()) {
Observer.waitForObserverElectionDelay();
}
}
break;
case FOLLOWING:
try {
LOG.info("FOLLOWING");
setFollower(makeFollower(logFactory));
follower.followLeader();
} catch (Exception e) {
LOG.warn("Unexpected exception", e);
} finally {
follower.shutdown();
setFollower(null);
updateServerState();
}
break;
case LEADING:
LOG.info("LEADING");
try {
setLeader(makeLeader(logFactory));
leader.lead();
setLeader(null);
} catch (Exception e) {
LOG.warn("Unexpected exception", e);
} finally {
if (leader != null) {
leader.shutdown("Forcing shutdown");
setLeader(null);
}
updateServerState();
}
break;
}
}
} finally {
LOG.warn("QuorumPeer main thread exited");
MBeanRegistry instance = MBeanRegistry.getInstance();
instance.unregister(jmxQuorumBean);
instance.unregister(jmxLocalPeerBean);
for (RemotePeerBean remotePeerBean : jmxRemotePeerBean.values()) {
instance.unregister(remotePeerBean);
}
jmxQuorumBean = null;
jmxLocalPeerBean = null;
jmxRemotePeerBean = null;
}
}
private synchronized void updateServerState() {
if (!reconfigFlag) {
setPeerState(ServerState.LOOKING);
LOG.warn("PeerState set to LOOKING");
return;
}
if (getId() == getCurrentVote().getId()) {
setPeerState(ServerState.LEADING);
LOG.debug("PeerState set to LEADING");
} else if (getLearnerType() == LearnerType.PARTICIPANT) {
setPeerState(ServerState.FOLLOWING);
LOG.debug("PeerState set to FOLLOWING");
} else if (getLearnerType() == LearnerType.OBSERVER) {
setPeerState(ServerState.OBSERVING);
LOG.debug("PeerState set to OBSERVER");
} else { // currently shouldn't happen since there are only 2 learner types
setPeerState(ServerState.LOOKING);
LOG.debug("Should not be here");
}
reconfigFlag = false;
}
public void shutdown() {
running = false;
x509Util.close();
if (leader != null) {
leader.shutdown("quorum Peer shutdown");
}
if (follower != null) {
follower.shutdown();
}
shutdownServerCnxnFactory();
if (udpSocket != null) {
udpSocket.close();
}
if (jvmPauseMonitor != null) {
jvmPauseMonitor.serviceStop();
}
try {
adminServer.shutdown();
} catch (AdminServerException e) {
LOG.warn("Problem stopping AdminServer", e);
}
if (getElectionAlg() != null) {
this.interrupt();
getElectionAlg().shutdown();
}
try {
zkDb.close();
} catch (IOException ie) {
LOG.warn("Error closing logs ", ie);
}
}
/**
* A 'view' is a node's current opinion of the membership of the entire
* ensemble.
*/
public Map<Long, QuorumPeer.QuorumServer> getView() {
return Collections.unmodifiableMap(getQuorumVerifier().getAllMembers());
}
/**
* Observers are not contained in this view, only nodes with
* PeerType=PARTICIPANT.
*/
public Map<Long, QuorumPeer.QuorumServer> getVotingView() {
return getQuorumVerifier().getVotingMembers();
}
/**
* Returns only observers, no followers.
*/
public Map<Long, QuorumPeer.QuorumServer> getObservingView() {
return getQuorumVerifier().getObservingMembers();
}
public synchronized Set<Long> getCurrentAndNextConfigVoters() {
Set<Long> voterIds = new HashSet<Long>(getQuorumVerifier().getVotingMembers().keySet());
if (getLastSeenQuorumVerifier() != null) {
voterIds.addAll(getLastSeenQuorumVerifier().getVotingMembers().keySet());
}
return voterIds;
}
/**
* Check if a node is in the current view. With static membership, the
* result of this check will never change; only when dynamic membership
* is introduced will this be more useful.
*/
public boolean viewContains(Long sid) {
return this.getView().containsKey(sid);
}
/**
* Only used by QuorumStats at the moment
*/
public String[] getQuorumPeers() {
List<String> l = new ArrayList<String>();
synchronized (this) {
if (leader != null) {
for (LearnerHandler fh : leader.getLearners()) {
if (fh.getSocket() != null) {
String s = formatInetAddr((InetSocketAddress) fh.getSocket().getRemoteSocketAddress());
if (leader.isLearnerSynced(fh)) {
s += "*";
}
l.add(s);
}
}
} else if (follower != null) {
l.add(formatInetAddr((InetSocketAddress) follower.sock.getRemoteSocketAddress()));
}
}
return l.toArray(new String[0]);
}
public String getServerState() {
switch (getPeerState()) {
case LOOKING:
return QuorumStats.Provider.LOOKING_STATE;
case LEADING:
return QuorumStats.Provider.LEADING_STATE;
case FOLLOWING:
return QuorumStats.Provider.FOLLOWING_STATE;
case OBSERVING:
return QuorumStats.Provider.OBSERVING_STATE;
}
return QuorumStats.Provider.UNKNOWN_STATE;
}
/**
* set the id of this quorum peer.
*/
public void setMyid(long myid) {
this.myid = myid;
}
public void setInitialConfig(String initialConfig) {
this.initialConfig = initialConfig;
}
public String getInitialConfig() {
return initialConfig;
}
/**
* Get the number of milliseconds of each tick
*/
public int getTickTime() {
return tickTime;
}
/**
* Set the number of milliseconds of each tick
*/
public void setTickTime(int tickTime) {
LOG.info("tickTime set to {}", tickTime);
this.tickTime = tickTime;
}
/** Maximum number of connections allowed from particular host (ip) */
public int getMaxClientCnxnsPerHost() {
if (cnxnFactory != null) {
return cnxnFactory.getMaxClientCnxnsPerHost();
}
if (secureCnxnFactory != null) {
return secureCnxnFactory.getMaxClientCnxnsPerHost();
}
return -1;
}
/** Whether local sessions are enabled */
public boolean areLocalSessionsEnabled() {
return localSessionsEnabled;
}
/** Whether to enable local sessions */
public void enableLocalSessions(boolean flag) {
LOG.info("Local sessions {}", (flag ? "enabled" : "disabled"));
localSessionsEnabled = flag;
}
/** Whether local sessions are allowed to upgrade to global sessions */
public boolean isLocalSessionsUpgradingEnabled() {
return localSessionsUpgradingEnabled;
}
/** Whether to allow local sessions to upgrade to global sessions */
public void enableLocalSessionsUpgrading(boolean flag) {
LOG.info("Local session upgrading {}", (flag ? "enabled" : "disabled"));
localSessionsUpgradingEnabled = flag;
}
/** minimum session timeout in milliseconds */
public int getMinSessionTimeout() {
return minSessionTimeout;
}
/** minimum session timeout in milliseconds */
public void setMinSessionTimeout(int min) {
LOG.info("minSessionTimeout set to {}", min);
this.minSessionTimeout = min;
}
/** maximum session timeout in milliseconds */
public int getMaxSessionTimeout() {
return maxSessionTimeout;
}
/** maximum session timeout in milliseconds */
public void setMaxSessionTimeout(int max) {
LOG.info("maxSessionTimeout set to {}", max);
this.maxSessionTimeout = max;
}
/** The server socket's listen backlog length */
public int getClientPortListenBacklog() {
return this.clientPortListenBacklog;
}
/** Sets the server socket's listen backlog length. */
public void setClientPortListenBacklog(int backlog) {
this.clientPortListenBacklog = backlog;
}
/**
* Get the number of ticks that the initial synchronization phase can take
*/
public int getInitLimit() {
return initLimit;
}
/**
* Set the number of ticks that the initial synchronization phase can take
*/
public void setInitLimit(int initLimit) {
LOG.info("initLimit set to {}", initLimit);
this.initLimit = initLimit;
}
/**
* Get the current tick
*/
public int getTick() {
return tick.get();
}
public QuorumVerifier configFromString(String s) throws IOException, ConfigException {
Properties props = new Properties();
props.load(new StringReader(s));
return QuorumPeerConfig.parseDynamicConfig(props, electionType, false, false);
}
/**
* Return QuorumVerifier object for the last committed configuration.
*/
public QuorumVerifier getQuorumVerifier() {
synchronized (QV_LOCK) {
return quorumVerifier;
}
}
/**
* Return QuorumVerifier object for the last proposed configuration.
*/
public QuorumVerifier getLastSeenQuorumVerifier() {
synchronized (QV_LOCK) {
return lastSeenQuorumVerifier;
}
}
public synchronized void restartLeaderElection(QuorumVerifier qvOLD, QuorumVerifier qvNEW) {
if (qvOLD == null || !qvOLD.equals(qvNEW)) {
LOG.warn("Restarting Leader Election");
getElectionAlg().shutdown();
shuttingDownLE = false;
startLeaderElection();
}
}
public String getNextDynamicConfigFilename() {
if (configFilename == null) {
LOG.warn("configFilename is null! This should only happen in tests.");
return null;
}
return configFilename + QuorumPeerConfig.nextDynamicConfigFileSuffix;
}
// On entry to this method, qcm must be non-null and the locks on both qcm and QV_LOCK
// must be held. We don't want quorumVerifier/lastSeenQuorumVerifier to change out from
// under us, so we have to hold QV_LOCK; and since the call to qcm.connectOne() will take
// the lock on qcm (and take QV_LOCK again inside that), the caller needs to have taken
// qcm outside QV_LOCK to avoid a deadlock against other callers of qcm.connectOne().
private void connectNewPeers(QuorumCnxManager qcm) {
if (quorumVerifier != null && lastSeenQuorumVerifier != null) {
Map<Long, QuorumServer> committedView = quorumVerifier.getAllMembers();
for (Entry<Long, QuorumServer> e : lastSeenQuorumVerifier.getAllMembers().entrySet()) {
if (e.getKey() != getId() && !committedView.containsKey(e.getKey())) {
qcm.connectOne(e.getKey());
}
}
}
}
public void setLastSeenQuorumVerifier(QuorumVerifier qv, boolean writeToDisk) {
// If qcm is non-null, we may call qcm.connectOne(), which will take the lock on qcm
// and then take QV_LOCK. Take the locks in the same order to ensure that we don't
// deadlock against other callers of connectOne(). If qcmRef gets set in another
// thread while we're inside the synchronized block, that does no harm; if we didn't
// take a lock on qcm (because it was null when we sampled it), we won't call
// connectOne() on it. (Use of an AtomicReference is enough to guarantee visibility
// of updates that provably happen in another thread before entering this method.)
QuorumCnxManager qcm = qcmRef.get();
Object outerLockObject = (qcm != null) ? qcm : QV_LOCK;
synchronized (outerLockObject) {
synchronized (QV_LOCK) {
if (lastSeenQuorumVerifier != null && lastSeenQuorumVerifier.getVersion() > qv.getVersion()) {
LOG.error("setLastSeenQuorumVerifier called with stale config "
+ qv.getVersion()
+ ". Current version: "
+ quorumVerifier.getVersion());
}
// assuming that a version uniquely identifies a configuration, so if
// version is the same, nothing to do here.
if (lastSeenQuorumVerifier != null && lastSeenQuorumVerifier.getVersion() == qv.getVersion()) {
return;
}
lastSeenQuorumVerifier = qv;
if (qcm != null) {
connectNewPeers(qcm);
}
if (writeToDisk) {
try {
String fileName = getNextDynamicConfigFilename();
if (fileName != null) {
QuorumPeerConfig.writeDynamicConfig(fileName, qv, true);
}
} catch (IOException e) {
LOG.error("Error writing next dynamic config file to disk", e);
}
}
}
}
}
public QuorumVerifier setQuorumVerifier(QuorumVerifier qv, boolean writeToDisk) {
synchronized (QV_LOCK) {
if ((quorumVerifier != null) && (quorumVerifier.getVersion() >= qv.getVersion())) {
// this is normal. For example - server found out about new config through FastLeaderElection gossiping
// and then got the same config in UPTODATE message so its already known
LOG.debug(
"{} setQuorumVerifier called with known or old config {}. Current version: {}",
getId(),
qv.getVersion(),
quorumVerifier.getVersion());
return quorumVerifier;
}
QuorumVerifier prevQV = quorumVerifier;
quorumVerifier = qv;
if (lastSeenQuorumVerifier == null || (qv.getVersion() > lastSeenQuorumVerifier.getVersion())) {
lastSeenQuorumVerifier = qv;
}
if (writeToDisk) {
// some tests initialize QuorumPeer without a static config file
if (configFilename != null) {
try {
String dynamicConfigFilename = makeDynamicConfigFilename(qv.getVersion());
QuorumPeerConfig.writeDynamicConfig(dynamicConfigFilename, qv, false);
QuorumPeerConfig.editStaticConfig(configFilename, dynamicConfigFilename, needEraseClientInfoFromStaticConfig());
} catch (IOException e) {
LOG.error("Error closing file", e);
}
} else {
LOG.info("writeToDisk == true but configFilename == null");
}
}
if (qv.getVersion() == lastSeenQuorumVerifier.getVersion()) {
QuorumPeerConfig.deleteFile(getNextDynamicConfigFilename());
}
QuorumServer qs = qv.getAllMembers().get(getId());
if (qs != null) {
setAddrs(qs.addr, qs.electionAddr, qs.clientAddr);
}
updateObserverMasterList();
return prevQV;
}
}
private String makeDynamicConfigFilename(long version) {
return configFilename + ".dynamic." + Long.toHexString(version);
}
private boolean needEraseClientInfoFromStaticConfig() {
QuorumServer server = quorumVerifier.getAllMembers().get(getId());
return (server != null && server.clientAddr != null && !server.isClientAddrFromStatic);
}
/**
* Get an instance of LeaderElection
*/
public Election getElectionAlg() {
return electionAlg;
}
/**
* Get the synclimit
*/
public int getSyncLimit() {
return syncLimit;
}
/**
* Set the synclimit
*/
public void setSyncLimit(int syncLimit) {
LOG.info("syncLimit set to {}", syncLimit);
this.syncLimit = syncLimit;
}
/**
* Get the connectToLearnerMasterLimit
*/
public int getConnectToLearnerMasterLimit() {
return connectToLearnerMasterLimit;
}
/**
* Set the connectToLearnerMasterLimit
*/
public void setConnectToLearnerMasterLimit(int connectToLearnerMasterLimit) {
LOG.info("connectToLearnerMasterLimit set to {}", connectToLearnerMasterLimit);
this.connectToLearnerMasterLimit = connectToLearnerMasterLimit;
}
/**
* The syncEnabled can also be set via a system property.
*/
public static final String SYNC_ENABLED = "zookeeper.observer.syncEnabled";
/**
* Return syncEnabled.
*
* @return
*/
public boolean getSyncEnabled() {
if (System.getProperty(SYNC_ENABLED) != null) {
LOG.info("{}={}", SYNC_ENABLED, Boolean.getBoolean(SYNC_ENABLED));
return Boolean.getBoolean(SYNC_ENABLED);
} else {
return syncEnabled;
}
}
/**
* Set syncEnabled.
*
* @param syncEnabled
*/
public void setSyncEnabled(boolean syncEnabled) {
this.syncEnabled = syncEnabled;
}
/**
* Gets the election type
*/
public int getElectionType() {
return electionType;
}
/**
* Sets the election type
*/
public void setElectionType(int electionType) {
this.electionType = electionType;
}
public boolean getQuorumListenOnAllIPs() {
return quorumListenOnAllIPs;
}
public void setQuorumListenOnAllIPs(boolean quorumListenOnAllIPs) {
this.quorumListenOnAllIPs = quorumListenOnAllIPs;
}
public void setCnxnFactory(ServerCnxnFactory cnxnFactory) {
this.cnxnFactory = cnxnFactory;
}
public void setSecureCnxnFactory(ServerCnxnFactory secureCnxnFactory) {
this.secureCnxnFactory = secureCnxnFactory;
}
public void setSslQuorum(boolean sslQuorum) {
if (sslQuorum) {
LOG.info("Using TLS encrypted quorum communication");
} else {
LOG.info("Using insecure (non-TLS) quorum communication");
}
this.sslQuorum = sslQuorum;
}
public void setUsePortUnification(boolean shouldUsePortUnification) {
LOG.info("Port unification {}", shouldUsePortUnification ? "enabled" : "disabled");
this.shouldUsePortUnification = shouldUsePortUnification;
}
private void startServerCnxnFactory() {
if (cnxnFactory != null) {
cnxnFactory.start();
}
if (secureCnxnFactory != null) {
secureCnxnFactory.start();
}
}
private void shutdownServerCnxnFactory() {
if (cnxnFactory != null) {
cnxnFactory.shutdown();
}
if (secureCnxnFactory != null) {
secureCnxnFactory.shutdown();
}
}
// Leader and learner will control the zookeeper server and pass it into QuorumPeer.
public void setZooKeeperServer(ZooKeeperServer zks) {
if (cnxnFactory != null) {
cnxnFactory.setZooKeeperServer(zks);
}
if (secureCnxnFactory != null) {
secureCnxnFactory.setZooKeeperServer(zks);
}
}
public void closeAllConnections() {
if (cnxnFactory != null) {
cnxnFactory.closeAll(ServerCnxn.DisconnectReason.SERVER_SHUTDOWN);
}
if (secureCnxnFactory != null) {
secureCnxnFactory.closeAll(ServerCnxn.DisconnectReason.SERVER_SHUTDOWN);
}
}
public int getClientPort() {
if (cnxnFactory != null) {
return cnxnFactory.getLocalPort();
}
return -1;
}
public void setTxnFactory(FileTxnSnapLog factory) {
this.logFactory = factory;
}
public FileTxnSnapLog getTxnFactory() {
return this.logFactory;
}
/**
* set zk database for this node
* @param database
*/
public void setZKDatabase(ZKDatabase database) {
this.zkDb = database;
}
protected ZKDatabase getZkDb() {
return zkDb;
}
public synchronized void initConfigInZKDatabase() {
if (zkDb != null) {
zkDb.initConfigInZKDatabase(getQuorumVerifier());
}
}
public boolean isRunning() {
return running;
}
/**
* get reference to QuorumCnxManager
*/
public QuorumCnxManager getQuorumCnxManager() {
return qcmRef.get();
}
private long readLongFromFile(String name) throws IOException {
File file = new File(logFactory.getSnapDir(), name);
BufferedReader br = new BufferedReader(new FileReader(file));
String line = "";
try {
line = br.readLine();
return Long.parseLong(line);
} catch (NumberFormatException e) {
throw new IOException("Found " + line + " in " + file);
} finally {
br.close();
}
}
private long acceptedEpoch = -1;
private long currentEpoch = -1;
public static final String CURRENT_EPOCH_FILENAME = "currentEpoch";
public static final String ACCEPTED_EPOCH_FILENAME = "acceptedEpoch";
/**
* Write a long value to disk atomically. Either succeeds or an exception
* is thrown.
* @param name file name to write the long to
* @param value the long value to write to the named file
* @throws IOException if the file cannot be written atomically
*/
private void writeLongToFile(String name, final long value) throws IOException {
File file = new File(logFactory.getSnapDir(), name);
new AtomicFileWritingIdiom(file, new WriterStatement() {
@Override
public void write(Writer bw) throws IOException {
bw.write(Long.toString(value));
}
});
}
public long getCurrentEpoch() throws IOException {
if (currentEpoch == -1) {
currentEpoch = readLongFromFile(CURRENT_EPOCH_FILENAME);
}
return currentEpoch;
}
public long getAcceptedEpoch() throws IOException {
if (acceptedEpoch == -1) {
acceptedEpoch = readLongFromFile(ACCEPTED_EPOCH_FILENAME);
}
return acceptedEpoch;
}
public void setCurrentEpoch(long e) throws IOException {
currentEpoch = e;
writeLongToFile(CURRENT_EPOCH_FILENAME, e);
}
public void setAcceptedEpoch(long e) throws IOException {
acceptedEpoch = e;
writeLongToFile(ACCEPTED_EPOCH_FILENAME, e);
}
public boolean processReconfig(QuorumVerifier qv, Long suggestedLeaderId, Long zxid, boolean restartLE) {
if (!QuorumPeerConfig.isReconfigEnabled()) {
LOG.debug("Reconfig feature is disabled, skip reconfig processing.");
return false;
}
InetSocketAddress oldClientAddr = getClientAddress();
// update last committed quorum verifier, write the new config to disk
// and restart leader election if config changed.
QuorumVerifier prevQV = setQuorumVerifier(qv, true);
// There is no log record for the initial config, thus after syncing
// with leader
// /zookeeper/config is empty! it is also possible that last committed
// config is propagated during leader election
// without the propagation the corresponding log records.
// so we should explicitly do this (this is not necessary when we're
// already a Follower/Observer, only
// for Learner):
initConfigInZKDatabase();
if (prevQV.getVersion() < qv.getVersion() && !prevQV.equals(qv)) {
Map<Long, QuorumServer> newMembers = qv.getAllMembers();
updateRemotePeerMXBeans(newMembers);
if (restartLE) {
restartLeaderElection(prevQV, qv);
}
QuorumServer myNewQS = newMembers.get(getId());
if (myNewQS != null && myNewQS.clientAddr != null && !myNewQS.clientAddr.equals(oldClientAddr)) {
cnxnFactory.reconfigure(myNewQS.clientAddr);
updateThreadName();
}
boolean roleChange = updateLearnerType(qv);
boolean leaderChange = false;
if (suggestedLeaderId != null) {
// zxid should be non-null too
leaderChange = updateVote(suggestedLeaderId, zxid);
} else {
long currentLeaderId = getCurrentVote().getId();
QuorumServer myleaderInCurQV = prevQV.getVotingMembers().get(currentLeaderId);
QuorumServer myleaderInNewQV = qv.getVotingMembers().get(currentLeaderId);
leaderChange = (myleaderInCurQV == null
|| myleaderInCurQV.addr == null
|| myleaderInNewQV == null
|| !myleaderInCurQV.addr.equals(myleaderInNewQV.addr));
// we don't have a designated leader - need to go into leader
// election
reconfigFlagClear();
}
return roleChange || leaderChange;
}
return false;
}
private void updateRemotePeerMXBeans(Map<Long, QuorumServer> newMembers) {
Set<Long> existingMembers = new HashSet<Long>(newMembers.keySet());
existingMembers.retainAll(jmxRemotePeerBean.keySet());
for (Long id : existingMembers) {
RemotePeerBean rBean = jmxRemotePeerBean.get(id);
rBean.setQuorumServer(newMembers.get(id));
}
Set<Long> joiningMembers = new HashSet<Long>(newMembers.keySet());
joiningMembers.removeAll(jmxRemotePeerBean.keySet());
joiningMembers.remove(getId()); // remove self as it is local bean
for (Long id : joiningMembers) {
QuorumServer qs = newMembers.get(id);
RemotePeerBean rBean = new RemotePeerBean(this, qs);
try {
MBeanRegistry.getInstance().register(rBean, jmxQuorumBean);
jmxRemotePeerBean.put(qs.id, rBean);
} catch (Exception e) {
LOG.warn("Failed to register with JMX", e);
}
}
Set<Long> leavingMembers = new HashSet<Long>(jmxRemotePeerBean.keySet());
leavingMembers.removeAll(newMembers.keySet());
for (Long id : leavingMembers) {
RemotePeerBean rBean = jmxRemotePeerBean.remove(id);
try {
MBeanRegistry.getInstance().unregister(rBean);
} catch (Exception e) {
LOG.warn("Failed to unregister with JMX", e);
}
}
}
private ArrayList<QuorumServer> observerMasters = new ArrayList<>();
private void updateObserverMasterList() {
if (observerMasterPort <= 0) {
return; // observer masters not enabled
}
observerMasters.clear();
StringBuilder sb = new StringBuilder();
for (QuorumServer server : quorumVerifier.getVotingMembers().values()) {
InetAddress address = server.addr.getReachableOrOne().getAddress();
InetSocketAddress addr = new InetSocketAddress(address, observerMasterPort);
observerMasters.add(new QuorumServer(server.id, addr));
sb.append(addr).append(",");
}
LOG.info("Updated learner master list to be {}", sb.toString());
Collections.shuffle(observerMasters);
// Reset the internal index of the observerMaster when
// the observerMaster List is refreshed
nextObserverMaster = 0;
}
private boolean useObserverMasters() {
return getLearnerType() == LearnerType.OBSERVER && observerMasters.size() > 0;
}
private int nextObserverMaster = 0;
private QuorumServer nextObserverMaster() {
if (nextObserverMaster >= observerMasters.size()) {
nextObserverMaster = 0;
// Add a reconnect delay only after the observer
// has exhausted trying to connect to all the masters
// from the observerMasterList
if (isRunning()) {
Observer.waitForReconnectDelay();
}
}
return observerMasters.get(nextObserverMaster++);
}
QuorumServer findLearnerMaster(QuorumServer leader) {
if (useObserverMasters()) {
return nextObserverMaster();
} else {
// Add delay jitter to reduce the load on the leader
if (isRunning()) {
Observer.waitForReconnectDelay();
}
return leader;
}
}
/**
* Vet a given learner master's information.
* Allows specification by server id, ip only, or ip and port
*/
QuorumServer validateLearnerMaster(String desiredMaster) {
if (useObserverMasters()) {
Long sid;
try {
sid = Long.parseLong(desiredMaster);
} catch (NumberFormatException e) {
sid = null;
}
for (QuorumServer server : observerMasters) {
if (sid == null) {
for (InetSocketAddress address : server.addr.getAllAddresses()) {
String serverAddr = address.getAddress().getHostAddress() + ':' + address.getPort();
if (serverAddr.startsWith(desiredMaster)) {
return server;
}
}
} else {
if (sid.equals(server.id)) {
return server;
}
}
}
if (sid == null) {
LOG.info("could not find learner master address={}", desiredMaster);
} else {
LOG.warn("could not find learner master sid={}", sid);
}
} else {
LOG.info("cannot validate request, observer masters not enabled");
}
return null;
}
private boolean updateLearnerType(QuorumVerifier newQV) {
//check if I'm an observer in new config
if (newQV.getObservingMembers().containsKey(getId())) {
if (getLearnerType() != LearnerType.OBSERVER) {
setLearnerType(LearnerType.OBSERVER);
LOG.info("Becoming an observer");
reconfigFlagSet();
return true;
} else {
return false;
}
} else if (newQV.getVotingMembers().containsKey(getId())) {
if (getLearnerType() != LearnerType.PARTICIPANT) {
setLearnerType(LearnerType.PARTICIPANT);
LOG.info("Becoming a voting participant");
reconfigFlagSet();
return true;
} else {
return false;
}
}
// I'm not in the view
if (getLearnerType() != LearnerType.PARTICIPANT) {
setLearnerType(LearnerType.PARTICIPANT);
LOG.info("Becoming a non-voting participant");
reconfigFlagSet();
return true;
}
return false;
}
private boolean updateVote(long designatedLeader, long zxid) {
Vote currentVote = getCurrentVote();
if (currentVote != null && designatedLeader != currentVote.getId()) {
setCurrentVote(new Vote(designatedLeader, zxid));
reconfigFlagSet();
LOG.warn("Suggested leader: {}", designatedLeader);
return true;
}
return false;
}
/**
* Updates leader election info to avoid inconsistencies when
* a new server tries to join the ensemble.
*
* Here is the inconsistency scenario we try to solve by updating the peer
* epoch after following leader:
*
* Let's say we have an ensemble with 3 servers z1, z2 and z3.
*
* 1. z1, z2 were following z3 with peerEpoch to be 0xb8, the new epoch is
* 0xb9, aka current accepted epoch on disk.
* 2. z2 get restarted, which will use 0xb9 as it's peer epoch when loading
* the current accept epoch from disk.
* 3. z2 received notification from z1 and z3, which is following z3 with
* epoch 0xb8, so it started following z3 again with peer epoch 0xb8.
* 4. before z2 successfully connected to z3, z3 get restarted with new
* epoch 0xb9.
* 5. z2 will retry around a few round (default 5s) before giving up,
* meanwhile it will report z3 as leader.
* 6. z1 restarted, and looking with peer epoch 0xb9.
* 7. z1 voted z3, and z3 was elected as leader again with peer epoch 0xb9.
* 8. z2 successfully connected to z3 before giving up, but with peer
* epoch 0xb8.
* 9. z1 get restarted, looking for leader with peer epoch 0xba, but cannot
* join, because z2 is reporting peer epoch 0xb8, while z3 is reporting
* 0xb9.
*
* By updating the election vote after actually following leader, we can
* avoid this kind of stuck happened.
*
* Btw, the zxid and electionEpoch could be inconsistent because of the same
* reason, it's better to update these as well after syncing with leader, but
* that required protocol change which is non trivial. This problem is worked
* around by skipping comparing the zxid and electionEpoch when counting for
* votes for out of election servers during looking for leader.
*
* See https://issues.apache.org/jira/browse/ZOOKEEPER-1732
*/
protected void updateElectionVote(long newEpoch) {
Vote currentVote = getCurrentVote();
if (currentVote != null) {
setCurrentVote(new Vote(currentVote.getId(), currentVote.getZxid(), currentVote.getElectionEpoch(), newEpoch, currentVote
.getState()));
}
}
private void updateThreadName() {
String plain = cnxnFactory != null
? cnxnFactory.getLocalAddress() != null
? formatInetAddr(cnxnFactory.getLocalAddress())
: "disabled"
: "disabled";
String secure = secureCnxnFactory != null ? formatInetAddr(secureCnxnFactory.getLocalAddress()) : "disabled";
setName(String.format("QuorumPeer[myid=%d](plain=%s)(secure=%s)", getId(), plain, secure));
}
/**
* Sets the time taken for leader election in milliseconds.
*
* @param electionTimeTaken time taken for leader election
*/
void setElectionTimeTaken(long electionTimeTaken) {
this.electionTimeTaken = electionTimeTaken;
}
/**
* @return the time taken for leader election in milliseconds.
*/
long getElectionTimeTaken() {
return electionTimeTaken;
}
void setQuorumServerSaslRequired(boolean serverSaslRequired) {
quorumServerSaslAuthRequired = serverSaslRequired;
LOG.info("{} set to {}", QuorumAuth.QUORUM_SERVER_SASL_AUTH_REQUIRED, serverSaslRequired);
}
void setQuorumLearnerSaslRequired(boolean learnerSaslRequired) {
quorumLearnerSaslAuthRequired = learnerSaslRequired;
LOG.info("{} set to {}", QuorumAuth.QUORUM_LEARNER_SASL_AUTH_REQUIRED, learnerSaslRequired);
}
void setQuorumSaslEnabled(boolean enableAuth) {
quorumSaslEnableAuth = enableAuth;
if (!quorumSaslEnableAuth) {
LOG.info("QuorumPeer communication is not secured! (SASL auth disabled)");
} else {
LOG.info("{} set to {}", QuorumAuth.QUORUM_SASL_AUTH_ENABLED, enableAuth);
}
}
void setQuorumServicePrincipal(String servicePrincipal) {
quorumServicePrincipal = servicePrincipal;
LOG.info("{} set to {}", QuorumAuth.QUORUM_KERBEROS_SERVICE_PRINCIPAL, quorumServicePrincipal);
}
void setQuorumLearnerLoginContext(String learnerContext) {
quorumLearnerLoginContext = learnerContext;
LOG.info("{} set to {}", QuorumAuth.QUORUM_LEARNER_SASL_LOGIN_CONTEXT, quorumLearnerLoginContext);
}
void setQuorumServerLoginContext(String serverContext) {
quorumServerLoginContext = serverContext;
LOG.info("{} set to {}", QuorumAuth.QUORUM_SERVER_SASL_LOGIN_CONTEXT, quorumServerLoginContext);
}
void setQuorumCnxnThreadsSize(int qCnxnThreadsSize) {
if (qCnxnThreadsSize > QUORUM_CNXN_THREADS_SIZE_DEFAULT_VALUE) {
quorumCnxnThreadsSize = qCnxnThreadsSize;
}
LOG.info("quorum.cnxn.threads.size set to {}", quorumCnxnThreadsSize);
}
boolean isQuorumSaslAuthEnabled() {
return quorumSaslEnableAuth;
}
private boolean isQuorumServerSaslAuthRequired() {
return quorumServerSaslAuthRequired;
}
private boolean isQuorumLearnerSaslAuthRequired() {
return quorumLearnerSaslAuthRequired;
}
public QuorumCnxManager createCnxnManager() {
int timeout = quorumCnxnTimeoutMs > 0 ? quorumCnxnTimeoutMs : this.tickTime * this.syncLimit;
LOG.info("Using {}ms as the quorum cnxn socket timeout", timeout);
return new QuorumCnxManager(
this,
this.getId(),
this.getView(),
this.authServer,
this.authLearner,
timeout,
this.getQuorumListenOnAllIPs(),
this.quorumCnxnThreadsSize,
this.isQuorumSaslAuthEnabled());
}
boolean isLeader(long id) {
Vote vote = getCurrentVote();
return vote != null && id == vote.getId();
}
@InterfaceAudience.Private
/**
* This is a metric that depends on the status of the peer.
*/ public Integer getSynced_observers_metric() {
if (leader != null) {
return leader.getObservingLearners().size();
} else if (follower != null) {
return follower.getSyncedObserverSize();
} else {
return null;
}
}
}