blob: 55c1dc271396f0e18923af80d2d3124dcd23d95f [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.zookeeper.server.quorum;
import static org.apache.zookeeper.common.NetUtils.formatInetAddr;
import java.io.BufferedInputStream;
import java.io.BufferedOutputStream;
import java.io.Closeable;
import java.io.DataInputStream;
import java.io.DataOutputStream;
import java.io.IOException;
import java.net.InetAddress;
import java.net.InetSocketAddress;
import java.net.NoRouteToHostException;
import java.net.ServerSocket;
import java.net.Socket;
import java.net.SocketException;
import java.net.SocketTimeoutException;
import java.nio.BufferUnderflowException;
import java.nio.ByteBuffer;
import java.nio.channels.UnresolvedAddressException;
import java.util.ArrayList;
import java.util.Collections;
import java.util.Enumeration;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.Set;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.SynchronousQueue;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicLong;
import java.util.stream.Collectors;
import javax.net.ssl.SSLSocket;
import org.apache.zookeeper.common.NetUtils;
import org.apache.zookeeper.common.X509Exception;
import org.apache.zookeeper.server.ExitCode;
import org.apache.zookeeper.server.ZooKeeperThread;
import org.apache.zookeeper.server.quorum.QuorumPeerConfig.ConfigException;
import org.apache.zookeeper.server.quorum.auth.QuorumAuthLearner;
import org.apache.zookeeper.server.quorum.auth.QuorumAuthServer;
import org.apache.zookeeper.server.quorum.flexible.QuorumVerifier;
import org.apache.zookeeper.server.util.ConfigUtils;
import org.apache.zookeeper.util.CircularBlockingQueue;
import org.apache.zookeeper.util.ServiceUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
/**
* This class implements a connection manager for leader election using TCP. It
* maintains one connection for every pair of servers. The tricky part is to
* guarantee that there is exactly one connection for every pair of servers that
* are operating correctly and that can communicate over the network.
*
* If two servers try to start a connection concurrently, then the connection
* manager uses a very simple tie-breaking mechanism to decide which connection
* to drop based on the server ids of the two parties: a connection is kept only
* when it was initiated by the server with the larger id.
*
* For every peer, the manager maintains a queue of messages to send. If the
* connection to any particular peer drops, then the sender thread puts the
* message back on the list. As this implementation currently uses a queue
* implementation to maintain messages to send to another peer, we add the
* message to the tail of the queue, thus changing the order of messages.
* Although this is not a problem for the leader election, it could be a problem
* when consolidating peer communication. This is to be verified, though.
*
*/
public class QuorumCnxManager {
private static final Logger LOG = LoggerFactory.getLogger(QuorumCnxManager.class);
/*
* Capacity of the reception queue (recvQueue).
*/
static final int RECV_CAPACITY = 100;
// Capacity of each per-peer send queue. Initialized to 1 to prevent sending
// stale notifications to peers: only the most recent message is meant to be
// kept (NOTE(review): assumes CircularBlockingQueue drops the oldest element
// when full — confirm in that class).
static final int SEND_CAPACITY = 1;
// Maximum packet size (512 KiB). Not referenced in this part of the file;
// presumably enforced when receiving messages — verify in RecvWorker.
static final int PACKETMAXSIZE = 1024 * 512;
/*
* Source of negative, decreasing ids handed out to connecting observers that
* announce QuorumPeer.OBSERVER_ID (see handleConnection).
*/
private AtomicLong observerCounter = new AtomicLong(-1);
/*
* Protocol identifier sent as the first long of an outgoing connection
* request. A negative first long announces the versioned handshake (version,
* sid, election addresses); a non-negative one is read by handleConnection as
* a bare server id.
*/
public static final long PROTOCOL_VERSION = -65535L;
/*
* Upper bound, in bytes, on the address payload accepted in an InitialMessage.
*/
public static final int maxBuffer = 2048;
/*
* Connect timeout in milliseconds for outgoing election connections; can be
* overridden through the zookeeper.cnxTimeout system property.
*/
private int cnxTO = 5000;
// The local quorum peer this manager works for.
final QuorumPeer self;
/*
* Server id of this peer; messages addressed to it are looped back into the
* reception queue by toSend().
*/
final long mySid;
// SO_TIMEOUT (ms) applied to every quorum socket by setSockOpts().
final int socketTimeout;
// Server view supplied by the constructor.
final Map<Long, QuorumPeer.QuorumServer> view;
// Constructor flag; note that the Listener consults self.getQuorumListenOnAllIPs() instead.
final boolean listenOnAllIPs;
// Executor for asynchronous connection handling; only created when quorum SASL auth is enabled (null otherwise).
private ThreadPoolExecutor connectionExecutor;
// Ids of peers with an asynchronous outgoing connection request in flight; duplicate requests are dropped.
private final Set<Long> inprogressConnections = Collections.synchronizedSet(new HashSet<>());
private QuorumAuthServer authServer;
private QuorumAuthLearner authLearner;
private boolean quorumSaslAuthEnabled;
/*
* Number of connection requests handed to connectionExecutor; reset to zero by halt().
*/
private AtomicInteger connectionThreadCnt = new AtomicInteger(0);
/*
* Per-peer state keyed by server id: the active SendWorker, the outgoing
* message queue, and the last message handed to the SendWorker for sending.
*/
final ConcurrentHashMap<Long, SendWorker> senderWorkerMap;
final ConcurrentHashMap<Long, BlockingQueue<ByteBuffer>> queueSendMap;
final ConcurrentHashMap<Long, ByteBuffer> lastMessageSent;
/*
* Reception queue: messages received from peers plus loopback messages to self.
*/
public final BlockingQueue<Message> recvQueue;
/*
* Shutdown flag, set by halt(); stops the listener and the send loops.
*/
volatile boolean shutdown = false;
/*
* Listener thread accepting incoming election connections.
*/
public final Listener listener;
/*
* Number of running SendWorkers: incremented when a SendWorker thread starts,
* decremented in SendWorker.finish().
*/
private AtomicInteger threadCnt = new AtomicInteger(0);
/*
* Whether SO_KEEPALIVE is enabled on quorum sockets (zookeeper.tcpKeepAlive system property).
*/
private final boolean tcpKeepAlive = Boolean.getBoolean("zookeeper.tcpKeepAlive");
public static class Message {
    /** Raw message payload. */
    ByteBuffer buffer;
    /** Server id the message came from (or is addressed to, for loopback). */
    long sid;

    /**
     * Wraps a payload together with the id of the server it belongs to.
     *
     * @param buffer message payload
     * @param sid    associated server id
     */
    Message(ByteBuffer buffer, long sid) {
        this.sid = sid;
        this.buffer = buffer;
    }
}
/**
 * Initial identification message sent by a peer when it opens an election
 * connection: the peer's server id followed by its election addresses.
 */
public static class InitialMessage {
    public Long sid;
    public List<InetSocketAddress> electionAddr;

    InitialMessage(Long sid, List<InetSocketAddress> addresses) {
        this.sid = sid;
        this.electionAddr = addresses;
    }

    @SuppressWarnings("serial")
    public static class InitialMessageException extends Exception {
        InitialMessageException(String message, Object... args) {
            super(String.format(message, args));
        }
    }

    /**
     * Parses the remainder of an initial message whose leading protocol
     * version has already been read.
     *
     * Expected layout after the version: a long server id, an int payload
     * length in (0, maxBuffer], then that many bytes holding '|'-separated
     * host:port election addresses.
     *
     * @param protocolVersion the version long already read from the stream
     * @param din             stream positioned right after the version
     * @return the parsed message
     * @throws InitialMessageException if the version is unknown, the length is
     *         unreasonable, the payload ends early, or an address is malformed
     * @throws IOException if reading the id, the length or the payload fails
     */
    public static InitialMessage parse(Long protocolVersion, DataInputStream din) throws InitialMessageException, IOException {
        if (protocolVersion != PROTOCOL_VERSION) {
            throw new InitialMessageException("Got unrecognized protocol version %s", protocolVersion);
        }
        Long sid = din.readLong();
        int remaining = din.readInt();
        if (remaining <= 0 || remaining > maxBuffer) {
            throw new InitialMessageException("Unreasonable buffer length: %s", remaining);
        }
        byte[] b = new byte[remaining];
        // A single read() may legitimately return fewer bytes than requested
        // (e.g. the payload spans several TCP segments), so keep reading until
        // the whole payload has arrived or the stream ends.
        int numRead = 0;
        while (numRead < remaining) {
            int n = din.read(b, numRead, remaining - numRead);
            if (n < 0) {
                break;
            }
            numRead += n;
        }
        if (numRead != remaining) {
            throw new InitialMessageException("Read only %s bytes out of %s sent by server %s", numRead, remaining, sid);
        }
        String[] addressStrings = new String(b).split("\\|");
        List<InetSocketAddress> addresses = new ArrayList<>(addressStrings.length);
        for (String addr : addressStrings) {
            String[] host_port;
            try {
                host_port = ConfigUtils.getHostAndPort(addr);
            } catch (ConfigException e) {
                throw new InitialMessageException("Badly formed address: %s", addr);
            }
            if (host_port.length != 2) {
                throw new InitialMessageException("Badly formed address: %s", addr);
            }
            int port;
            try {
                port = Integer.parseInt(host_port[1]);
            } catch (NumberFormatException e) {
                throw new InitialMessageException("Bad port number: %s", host_port[1]);
            }
            try {
                addresses.add(new InetSocketAddress(host_port[0], port));
            } catch (IllegalArgumentException e) {
                // InetSocketAddress rejects out-of-range ports (and a null host)
                // with an unchecked exception; report it as a parse error so a
                // malformed handshake cannot escape the caller's IOException
                // handling.
                throw new InitialMessageException("Badly formed address: %s", addr);
            }
        }
        return new InitialMessage(sid, addresses);
    }
}
/**
 * Creates the connection manager for {@code self}.
 *
 * Sets up the receive queue and the per-peer maps, applies the optional
 * {@code zookeeper.cnxTimeout} override of the connect timeout, initializes
 * authentication (and the connection executor when quorum SASL is enabled),
 * and creates, without starting, the listener thread.
 *
 * @param self                  the local quorum peer
 * @param mySid                 id of the local server
 * @param view                  server view
 * @param authServer            authenticator for incoming connections
 * @param authLearner           authenticator for outgoing connections
 * @param socketTimeout         SO_TIMEOUT for quorum sockets, in ms
 * @param listenOnAllIPs        listen-on-all-IPs flag
 * @param quorumCnxnThreadsSize maximum size of the connection executor
 * @param quorumSaslAuthEnabled whether quorum SASL authentication is enabled
 */
public QuorumCnxManager(QuorumPeer self, final long mySid, Map<Long, QuorumPeer.QuorumServer> view,
        QuorumAuthServer authServer, QuorumAuthLearner authLearner, int socketTimeout, boolean listenOnAllIPs,
        int quorumCnxnThreadsSize, boolean quorumSaslAuthEnabled) {
    this.self = self;
    this.mySid = mySid;
    this.socketTimeout = socketTimeout;
    this.view = view;
    this.listenOnAllIPs = listenOnAllIPs;

    this.recvQueue = new CircularBlockingQueue<>(RECV_CAPACITY);
    this.queueSendMap = new ConcurrentHashMap<>();
    this.senderWorkerMap = new ConcurrentHashMap<>();
    this.lastMessageSent = new ConcurrentHashMap<>();

    // Optional override of the default connect timeout
    final String timeoutOverride = System.getProperty("zookeeper.cnxTimeout");
    if (timeoutOverride != null) {
        this.cnxTO = Integer.parseInt(timeoutOverride);
    }

    initializeAuth(mySid, authServer, authLearner, quorumCnxnThreadsSize, quorumSaslAuthEnabled);

    // The listener thread waits for connection requests once started
    listener = new Listener();
    listener.setName("QuorumPeerListener");
}
/**
 * Stores the authentication collaborators and, only when quorum SASL auth is
 * enabled, creates the executor used for asynchronous connection handling.
 * The executor keeps 3 core threads (allowed to time out after 60 seconds of
 * idleness), grows up to {@code quorumCnxnThreadsSize} threads and hands tasks
 * over directly through a SynchronousQueue.
 */
private void initializeAuth(final long mySid, final QuorumAuthServer authServer,
        final QuorumAuthLearner authLearner, final int quorumCnxnThreadsSize, final boolean quorumSaslAuthEnabled) {
    this.authServer = authServer;
    this.authLearner = authLearner;
    this.quorumSaslAuthEnabled = quorumSaslAuthEnabled;
    if (!quorumSaslAuthEnabled) {
        LOG.debug("Not initializing connection executor as quorum sasl auth is disabled");
        return;
    }
    final AtomicInteger nextThreadNumber = new AtomicInteger(1);
    final SecurityManager securityManager = System.getSecurityManager();
    final ThreadGroup group = securityManager == null
        ? Thread.currentThread().getThreadGroup()
        : securityManager.getThreadGroup();
    final ThreadFactory threadFactory = runnable -> new Thread(
        group,
        runnable,
        "QuorumConnectionThread-[myid=" + mySid + "]-" + nextThreadNumber.getAndIncrement());
    this.connectionExecutor = new ThreadPoolExecutor(
        3, quorumCnxnThreadsSize, 60, TimeUnit.SECONDS, new SynchronousQueue<Runnable>(), threadFactory);
    this.connectionExecutor.allowCoreThreadTimeOut(true);
}
/**
 * Invokes initiateConnection for testing purposes.
 *
 * Opens a plain socket to an election address of voting member {@code sid}
 * and runs the outgoing handshake on it.
 *
 * @param sid id of the voting member to connect to
 * @throws Exception if the socket cannot be configured or connected (the
 *         socket is closed before rethrowing), e.g. a NullPointerException
 *         when {@code sid} is not in the voting view
 */
public void testInitiateConnection(long sid) throws Exception {
    LOG.debug("Opening channel to server {}", sid);
    Socket sock = new Socket();
    try {
        setSockOpts(sock);
        InetSocketAddress address = self.getVotingView().get(sid).electionAddr.getReachableOrOne();
        sock.connect(address, cnxTO);
    } catch (Exception e) {
        // Don't leak the socket when configuring or connecting it fails
        closeSocket(sock);
        throw e;
    }
    initiateConnection(sock, sid);
}
/**
 * Runs the client side of the connection handshake on an already connected
 * socket. If this server loses the id challenge it gives up the connection;
 * otherwise it keeps it. An I/O failure during the handshake is logged and
 * the socket is closed.
 *
 * @param sock connected socket to the peer
 * @param sid  id of the peer
 */
public void initiateConnection(final Socket sock, final Long sid) {
    try {
        startConnection(sock, sid);
        return;
    } catch (IOException failure) {
        LOG.error(
            "Exception while connecting, id: {}, addr: {}, closing learner connection",
            sid,
            sock.getRemoteSocketAddress(),
            failure);
    }
    closeSocket(sock);
}
/**
 * Hands the outgoing handshake for {@code sid} to the connection executor.
 *
 * At most one asynchronous request per peer is in flight: if one is already
 * running, the new socket is closed and the request dropped. If submission
 * fails for any reason, the peer is released from the in-progress set (so
 * later requests are not blocked) and the socket is closed.
 *
 * @param sock connected socket to the peer
 * @param sid  id of the peer
 */
public void initiateConnectionAsync(final Socket sock, final Long sid) {
    if (inprogressConnections.add(sid)) {
        try {
            connectionExecutor.execute(new QuorumConnectionReqThread(sock, sid));
            connectionThreadCnt.incrementAndGet();
        } catch (Throwable submitFailure) {
            // Catch everything: a leftover entry would block every future
            // connection request to this peer.
            inprogressConnections.remove(sid);
            LOG.error("Exception while submitting quorum connection request", submitFailure);
            closeSocket(sock);
        }
    } else {
        LOG.debug("Connection request to server id: {} is already in progress, so skipping this request", sid);
        closeSocket(sock);
    }
}
/**
 * Task that runs the outgoing handshake to a peer and then releases the
 * peer's in-progress marker, whatever the outcome. It is submitted to the
 * connection executor as a Runnable.
 */
private class QuorumConnectionReqThread extends ZooKeeperThread {
    final Socket sock;
    final Long sid;

    QuorumConnectionReqThread(final Socket sock, final Long sid) {
        super("QuorumConnectionReqThread-" + sid);
        this.sock = sock;
        this.sid = sid;
    }

    @Override
    public void run() {
        try {
            QuorumCnxManager.this.initiateConnection(sock, sid);
        } finally {
            // Allow new connection requests to this peer
            QuorumCnxManager.this.inprogressConnections.remove(sid);
        }
    }
}
/**
 * Client side of the handshake on a connected socket.
 *
 * Sends PROTOCOL_VERSION, this server's id and its '|'-joined election
 * addresses (length-prefixed), then authenticates as learner when the peer is
 * in the voting view. If the peer has the larger id, this server loses the
 * challenge and closes the socket. Otherwise it starts a SendWorker/RecvWorker
 * pair on the socket, replacing any existing SendWorker for the peer.
 *
 * @param sock connected socket to the peer
 * @param sid  id of the peer
 * @return true if workers were started on this socket, false otherwise
 * @throws IOException if learner authentication fails with an I/O error
 */
private boolean startConnection(Socket sock, Long sid) throws IOException {
    final DataInputStream din;
    try {
        // Buffer the handshake so it goes out in as few IP packets as
        // possible, which matters for cross-datacenter links.
        final DataOutputStream dout = new DataOutputStream(new BufferedOutputStream(sock.getOutputStream()));
        // The protocol version doubles as the message type
        dout.writeLong(PROTOCOL_VERSION);
        dout.writeLong(self.getId());
        final byte[] addressBytes = self.getElectionAddress().getAllAddresses().stream()
            .map(NetUtils::formatInetAddr)
            .collect(Collectors.joining("|"))
            .getBytes();
        dout.writeInt(addressBytes.length);
        dout.write(addressBytes);
        dout.flush();
        din = new DataInputStream(new BufferedInputStream(sock.getInputStream()));
    } catch (IOException e) {
        LOG.warn("Ignoring exception reading or writing challenge: ", e);
        closeSocket(sock);
        return false;
    }

    // authenticate learner
    final QuorumPeer.QuorumServer qps = self.getVotingView().get(sid);
    if (qps != null) {
        // TODO - investigate why reconfig makes qps null.
        authLearner.authenticate(sock, qps.hostname);
    }

    if (sid > self.getId()) {
        // Lost the challenge: the peer is expected to connect to us instead
        LOG.info(
            "Have smaller server identifier, so dropping the connection: ({}, {})",
            sid,
            self.getId());
        closeSocket(sock);
        return false;
    }

    final SendWorker sender = new SendWorker(sock, sid);
    final RecvWorker receiver = new RecvWorker(sock, din, sid, sender);
    sender.setRecv(receiver);
    final SendWorker previous = senderWorkerMap.get(sid);
    if (previous != null) {
        previous.finish();
    }
    senderWorkerMap.put(sid, sender);
    queueSendMap.putIfAbsent(sid, new CircularBlockingQueue<>(SEND_CAPACITY));
    sender.start();
    receiver.start();
    return true;
}
/**
 * Handles an incoming connection request synchronously.
 *
 * If this server receives a connection request, then it gives up on the new
 * connection if it wins the id challenge (and connects back to the peer
 * itself); otherwise it starts workers on the socket.
 *
 * Failures are contained here: an IOException or unchecked exception raised
 * while handling the request is logged with its cause and the socket is
 * closed. This matters because the Listener's accept loop only catches
 * IOException, so an unchecked exception escaping here would end that
 * listener handler.
 *
 * @param sock accepted socket
 */
public void receiveConnection(final Socket sock) {
    try {
        DataInputStream din = new DataInputStream(new BufferedInputStream(sock.getInputStream()));
        handleConnection(sock, din);
    } catch (IOException | RuntimeException e) {
        LOG.error("Exception handling connection, addr: {}, closing server connection", sock.getRemoteSocketAddress(), e);
        closeSocket(sock);
    }
}
/**
 * Server receives a connection request and handles it asynchronously via
 * the connection executor.
 *
 * If the task cannot be submitted (for example the executor rejects it
 * because all threads are busy or it has been shut down), the failure is
 * logged with its cause and the socket is closed.
 *
 * @param sock accepted socket
 */
public void receiveConnectionAsync(final Socket sock) {
    try {
        connectionExecutor.execute(new QuorumConnectionReceiverThread(sock));
        connectionThreadCnt.incrementAndGet();
    } catch (Throwable e) {
        LOG.error("Exception handling connection, addr: {}, closing server connection", sock.getRemoteSocketAddress(), e);
        closeSocket(sock);
    }
}
/**
 * Task that handles one incoming connection request on the connection
 * executor by delegating to receiveConnection().
 */
private class QuorumConnectionReceiverThread extends ZooKeeperThread {
    private final Socket sock;

    QuorumConnectionReceiverThread(final Socket sock) {
        super("QuorumConnectionReceiverThread-" + sock.getRemoteSocketAddress());
        this.sock = sock;
    }

    @Override
    public void run() {
        QuorumCnxManager.this.receiveConnection(this.sock);
    }
}
/**
* Server side of the connection handshake on an accepted socket.
*
* Reads the first long sent by the peer. A non-negative value is taken as a
* bare server id (handshake without addresses); a negative value is taken as a
* protocol version and the rest of an InitialMessage (id and election
* addresses) is parsed. A peer announcing QuorumPeer.OBSERVER_ID receives a
* fresh negative id from observerCounter. After authServer authentication:
* - if the peer's id is smaller than ours, any existing SendWorker for it is
*   finished, the socket is closed and we connect back to the peer ourselves;
* - otherwise a SendWorker/RecvWorker pair is started on this socket,
*   replacing any existing SendWorker for the peer.
*
* @param sock accepted socket
* @param din  buffered input stream over sock
* @throws IOException NOTE(review): presumably only from authServer.authenticate
*         (its signature is not visible here) — the handshake reads catch their own IOExceptions
*/
private void handleConnection(Socket sock, DataInputStream din) throws IOException {
Long sid = null, protocolVersion = null;
MultipleAddresses electionAddr = null;
try {
protocolVersion = din.readLong();
if (protocolVersion >= 0) { // this is a server id and not a protocol version
sid = protocolVersion;
} else {
try {
InitialMessage init = InitialMessage.parse(protocolVersion, din);
sid = init.sid;
electionAddr = new MultipleAddresses(init.electionAddr);
} catch (InitialMessage.InitialMessageException ex) {
// Malformed handshake: drop this connection only
LOG.error(ex.toString());
closeSocket(sock);
return;
}
}
if (sid == QuorumPeer.OBSERVER_ID) {
/*
* Assign a unique identifier from the decrementing observerCounter
* (not random). We need a value to identify the connection.
*/
sid = observerCounter.getAndDecrement();
LOG.info("Setting arbitrary identifier to observer: {}", sid);
}
} catch (IOException e) {
LOG.warn("Exception reading or writing challenge", e);
closeSocket(sock);
return;
}
// do authenticating learner
authServer.authenticate(sock, din);
//If wins the challenge, then close the new connection.
if (sid < self.getId()) {
/*
* This replica might still believe that the connection to sid is
* up, so we have to shut down the workers before trying to open a
* new connection.
*/
SendWorker sw = senderWorkerMap.get(sid);
if (sw != null) {
sw.finish();
}
/*
* Now we start a new connection
*/
LOG.debug("Create new connection to server: {}", sid);
closeSocket(sock);
// Prefer the addresses the peer just advertised; the bare-id handshake
// carries none, so fall back to looking the peer up in the known views.
if (electionAddr != null) {
connectOne(sid, electionAddr);
} else {
connectOne(sid);
}
} else { // Otherwise start worker threads to receive data.
SendWorker sw = new SendWorker(sock, sid);
RecvWorker rw = new RecvWorker(sock, din, sid, sw);
sw.setRecv(rw);
// Replace any older connection to the same peer
SendWorker vsw = senderWorkerMap.get(sid);
if (vsw != null) {
vsw.finish();
}
senderWorkerMap.put(sid, sw);
queueSendMap.putIfAbsent(sid, new CircularBlockingQueue<>(SEND_CAPACITY));
sw.start();
rw.start();
}
}
/**
 * Queues a message for server {@code sid}. Currently only leader election
 * uses it.
 *
 * A message addressed to this server is looped back into the reception queue
 * as a duplicate of {@code b}, with the buffer's position reset to 0.
 * Otherwise the message goes into the peer's send queue, which is created on
 * first use, and a connection to the peer is established if none exists.
 *
 * @param sid destination server id
 * @param b   message payload
 */
public void toSend(Long sid, ByteBuffer b) {
    if (this.mySid != sid) {
        final BlockingQueue<ByteBuffer> outgoing =
            queueSendMap.computeIfAbsent(sid, serverId -> new CircularBlockingQueue<>(SEND_CAPACITY));
        addToSendQueue(outgoing, b);
        connectOne(sid);
        return;
    }
    // Loopback: deliver straight to our own reception queue
    b.position(0);
    addToRecvQueue(new Message(b.duplicate(), sid));
}
/**
 * Try to establish a connection to server with id sid using its electionAddr.
 *
 * If a SendWorker for {@code sid} already exists, no new connection is made:
 * an asynchronous reachability check of the existing socket is triggered and
 * {@code true} is returned. Otherwise a socket (TLS when quorum SSL is
 * enabled) is connected to a reachable election address and the handshake is
 * started. The handshake runs asynchronously when quorum SASL auth is enabled.
 *
 * VisibleForTesting.
 *
 * @param sid server id
 * @param electionAddr election addresses of the server
 * @return true if a connection already existed or the socket connected and
 *         the handshake was started; false if connecting failed
 * @throws UnresolvedAddressException if the address cannot be resolved
 *         (logged and the socket closed before rethrowing)
 */
synchronized boolean connectOne(long sid, MultipleAddresses electionAddr) {
    // Read the map once: the existing worker may finish and remove itself
    // concurrently (SendWorker.finish() does not hold this manager's lock),
    // so a second get() could return null and throw a NullPointerException.
    SendWorker existingWorker = senderWorkerMap.get(sid);
    if (existingWorker != null) {
        LOG.debug("There is a connection already for server {}", sid);
        // since ZOOKEEPER-3188 we can use multiple election addresses to reach a server. It is possible, that the
        // one we are using is already dead and we need to clean-up, so when we will create a new connection
        // then we will choose an other one, which is actually reachable
        existingWorker.asyncValidateIfSocketIsStillReachable();
        return true;
    }
    Socket sock = null;
    try {
        LOG.debug("Opening channel to server {}", sid);
        if (self.isSslQuorum()) {
            sock = self.getX509Util().createSSLSocket();
        } else {
            sock = new Socket();
        }
        setSockOpts(sock);
        sock.connect(electionAddr.getReachableAddress(), cnxTO);
        if (sock instanceof SSLSocket) {
            SSLSocket sslSock = (SSLSocket) sock;
            sslSock.startHandshake();
            LOG.info("SSL handshake complete with {} - {} - {}",
                sslSock.getRemoteSocketAddress(),
                sslSock.getSession().getProtocol(),
                sslSock.getSession().getCipherSuite());
        }
        LOG.debug("Connected to server {} using election address: {}:{}",
            sid, sock.getInetAddress(), sock.getPort());
        // Sends connection request asynchronously if the quorum
        // sasl authentication is enabled. This is required because
        // sasl server authentication process may take few seconds to
        // finish, this may delay next peer connection requests.
        if (quorumSaslAuthEnabled) {
            initiateConnectionAsync(sock, sid);
        } else {
            initiateConnection(sock, sid);
        }
        return true;
    } catch (UnresolvedAddressException e) {
        // Sun doesn't include the address that causes this
        // exception to be thrown, also UAE cannot be wrapped cleanly
        // so we log the exception in order to capture this critical
        // detail.
        LOG.warn("Cannot open channel to {} at election address {}", sid, electionAddr, e);
        closeSocket(sock);
        throw e;
    } catch (X509Exception e) {
        LOG.warn("Cannot open secure channel to {} at election address {}", sid, electionAddr, e);
        closeSocket(sock);
        return false;
    } catch (NoRouteToHostException e) {
        LOG.warn("None of the addresses ({}) are reachable for sid {}", electionAddr, sid, e);
        closeSocket(sock);
        return false;
    } catch (IOException e) {
        LOG.warn("Cannot open channel to {} at election address {}", sid, electionAddr, e);
        closeSocket(sock);
        return false;
    }
}
/**
 * Try to establish a connection to server with id sid.
 *
 * If a SendWorker for {@code sid} already exists, only an asynchronous
 * reachability check of its socket is triggered. Otherwise, under
 * {@code self.QV_LOCK}, the server's addresses are re-resolved and a
 * connection is attempted first via the last committed view. If that fails,
 * or {@code sid} is unknown there, the last seen (proposed) quorum verifier is
 * tried, unless it lists the same election address as the committed view. A
 * warning is logged when {@code sid} appears in neither view.
 *
 * @param sid server id
 */
synchronized void connectOne(long sid) {
    // Read the map once: the existing worker may finish and remove itself
    // concurrently, so a second get() could return null.
    SendWorker existingWorker = senderWorkerMap.get(sid);
    if (existingWorker != null) {
        LOG.debug("There is a connection already for server {}", sid);
        // since ZOOKEEPER-3188 we can use multiple election addresses to reach a server. It is possible, that the
        // one we are using is already dead and we need to clean-up, so when we will create a new connection
        // then we will choose an other one, which is actually reachable
        existingWorker.asyncValidateIfSocketIsStillReachable();
        return;
    }
    synchronized (self.QV_LOCK) {
        boolean knownId = false;
        // Resolve hostname for the remote server before attempting to
        // connect in case the underlying ip address has changed.
        self.recreateSocketAddresses(sid);
        Map<Long, QuorumPeer.QuorumServer> lastCommittedView = self.getView();
        QuorumVerifier lastSeenQV = self.getLastSeenQuorumVerifier();
        // Dereference lastSeenQV only after checking it for null (the
        // original dereferenced it before its own null check).
        Map<Long, QuorumPeer.QuorumServer> lastProposedView =
            lastSeenQV != null ? lastSeenQV.getAllMembers() : null;
        if (lastCommittedView.containsKey(sid)) {
            knownId = true;
            if (connectOne(sid, lastCommittedView.get(sid).electionAddr)) {
                return;
            }
        }
        // Compare addresses by value so an identical proposed address is not
        // retried. NOTE(review): relies on MultipleAddresses.equals; without an
        // override this falls back to the previous reference comparison.
        if (lastProposedView != null
            && lastProposedView.containsKey(sid)
            && (!knownId
                || !Objects.equals(lastProposedView.get(sid).electionAddr, lastCommittedView.get(sid).electionAddr))) {
            knownId = true;
            if (connectOne(sid, lastProposedView.get(sid).electionAddr)) {
                return;
            }
        }
        if (!knownId) {
            LOG.warn("Invalid server id: {} ", sid);
        }
    }
}
/**
 * Attempts a connection to every server that has a send queue, for any
 * server without an active connection (see connectOne(long)).
 */
public void connectAll() {
    for (long serverId : queueSendMap.keySet()) {
        connectOne(serverId);
    }
}
/**
 * Reports whether queued messages appear to have been delivered.
 *
 * NOTE(review): contrary to the historical description ("all queues are
 * empty"), this returns {@code true} as soon as ANY per-peer send queue is
 * empty. It returns {@code false} when every queue is non-empty or when there
 * are no queues at all. Queue sizes are logged at debug level up to and
 * including the first empty queue. Callers may rely on this exact behavior,
 * so confirm intent before changing it.
 *
 * @return true if at least one send queue is empty
 */
boolean haveDelivered() {
    return queueSendMap.values().stream().anyMatch(sendQueue -> {
        final int pending = sendQueue.size();
        LOG.debug("Queue size: {}", pending);
        return pending == 0;
    });
}
/**
 * Flag that it is time to wrap up all activities and interrupt the listener.
 *
 * Sets the shutdown flag, closes the listener's sockets and waits for the
 * listener thread to end, finishes all send workers, shuts down the
 * connection executor (if any) and clears the connection bookkeeping. If the
 * wait for the listener is interrupted, cleanup still runs and the calling
 * thread's interrupt status is restored.
 */
public void halt() {
    shutdown = true;
    LOG.debug("Halting listener");
    listener.halt();
    // Wait for the listener to terminate.
    try {
        listener.join();
    } catch (InterruptedException ex) {
        LOG.warn("Got interrupted before joining the listener", ex);
        // Preserve the interrupt for our caller. NOTE(review): the cleanup
        // below is not expected to block (closing sockets, interrupting
        // workers, executor shutdown); confirm RecvWorker.finish doesn't wait.
        Thread.currentThread().interrupt();
    }
    softHalt();
    // clear data structures used for auth
    if (connectionExecutor != null) {
        connectionExecutor.shutdown();
    }
    inprogressConnections.clear();
    resetConnectionThreadCount();
}
/**
 * Finishes every current send worker (which also finishes its paired receive
 * worker) without stopping the listener.
 */
public void softHalt() {
    senderWorkerMap.values().forEach(worker -> {
        LOG.debug("Halting sender: {}", worker);
        worker.finish();
    });
}
/**
 * Applies the standard quorum socket options: TCP_NODELAY on, SO_KEEPALIVE
 * per the zookeeper.tcpKeepAlive property, and SO_TIMEOUT set to the
 * configured socket timeout.
 *
 * @param sock socket to configure
 * @throws SocketException if any option cannot be set
 */
private void setSockOpts(Socket sock) throws SocketException {
    final Socket target = sock;
    target.setTcpNoDelay(true);
    target.setKeepAlive(this.tcpKeepAlive);
    target.setSoTimeout(socketTimeout);
}
/**
 * Closes {@code sock} quietly. A null socket is ignored, and an I/O error on
 * close is logged rather than thrown.
 *
 * @param sock socket to close, may be null
 */
private void closeSocket(Socket sock) {
    if (sock != null) {
        try {
            sock.close();
        } catch (IOException closeFailure) {
            LOG.error("Exception while closing", closeFailure);
        }
    }
}
/**
 * Returns the number of running send worker threads.
 *
 * @return current send worker count
 */
public long getThreadCount() {
    return threadCnt.longValue();
}
/**
 * Returns how many connection requests have been handed to the connection
 * executor since the count was last reset.
 *
 * @return connection processing count
 */
public long getConnectionThreadCount() {
    return connectionThreadCnt.longValue();
}
/**
 * Resets the connection processing count to zero.
 */
private void resetConnectionThreadCount() {
    connectionThreadCnt.getAndSet(0);
}
/**
 * Thread that listens for incoming election connections.
 *
 * It starts one ListenerHandler per local election address (per wildcard
 * address when listening on all IPs) on a dedicated thread pool and waits
 * until all handlers have stopped. When it leaves without a shutdown having
 * been requested after a SocketException, the socket bind error handler is run
 * (by default it requests a system exit).
 */
public class Listener extends ZooKeeperThread {
    private static final String ELECTION_PORT_BIND_RETRY = "zookeeper.electionPortBindRetry";
    private static final int DEFAULT_PORT_BIND_MAX_RETRY = 3;
    private final int portBindMaxRetry;
    private Runnable socketBindErrorHandler = () -> ServiceUtils.requestSystemExit(ExitCode.UNABLE_TO_BIND_QUORUM_PORT.getValue());
    // Written by the listener thread and read by halt() from another thread
    private volatile List<ListenerHandler> listenerHandlers;
    private final AtomicBoolean socketException;

    public Listener() {
        // Default thread name; the manager renames this thread, and each
        // ListenerHandler names its own thread after its address.
        super("ListenerThread");
        socketException = new AtomicBoolean(false);
        // maximum retry count while trying to bind to election port
        // see ZOOKEEPER-3320 for more details
        final Integer maxRetry = Integer.getInteger(ELECTION_PORT_BIND_RETRY,
            DEFAULT_PORT_BIND_MAX_RETRY);
        if (maxRetry >= 0) {
            LOG.info("Election port bind maximum retries is {}", maxRetry == 0 ? "infinite" : maxRetry);
            portBindMaxRetry = maxRetry;
        } else {
            LOG.info(
                "'{}' contains invalid value: {}(must be >= 0). Use default value of {} instead.",
                ELECTION_PORT_BIND_RETRY,
                maxRetry,
                DEFAULT_PORT_BIND_MAX_RETRY);
            portBindMaxRetry = DEFAULT_PORT_BIND_MAX_RETRY;
        }
    }

    /**
     * Change socket bind error handler. Used for testing.
     */
    void setSocketBindErrorHandler(Runnable errorHandler) {
        this.socketBindErrorHandler = errorHandler;
    }

    @Override
    public void run() {
        if (!shutdown) {
            Set<InetSocketAddress> addresses;
            if (self.getQuorumListenOnAllIPs()) {
                addresses = self.getElectionAddress().getWildcardAddresses();
            } else {
                addresses = self.getElectionAddress().getAllAddresses();
            }
            CountDownLatch latch = new CountDownLatch(addresses.size());
            listenerHandlers = addresses.stream().map(address ->
                new ListenerHandler(address, self.shouldUsePortUnification(), self.isSslQuorum(), latch))
                .collect(Collectors.toList());
            ExecutorService executor = Executors.newFixedThreadPool(addresses.size());
            listenerHandlers.forEach(executor::submit);
            try {
                latch.await();
            } catch (InterruptedException ie) {
                LOG.error("Interrupted while sleeping. Ignoring exception", ie);
                Thread.currentThread().interrupt();
            } finally {
                // Clean up for shutdown.
                for (ListenerHandler handler : listenerHandlers) {
                    try {
                        handler.close();
                    } catch (IOException ie) {
                        // Don't log an error for shutdown.
                        LOG.debug("Error closing server socket", ie);
                    }
                }
                // Release the pool's (non-daemon) threads once the handlers
                // have finished; previously the pool was never shut down.
                executor.shutdown();
            }
        }
        LOG.info("Leaving listener");
        if (!shutdown) {
            LOG.error(
                "As I'm leaving the listener thread, I won't be able to participate in leader election any longer: {}",
                self.getElectionAddress().getAllAddresses().stream()
                    .map(NetUtils::formatInetAddr)
                    .collect(Collectors.joining("|")));
            if (socketException.get()) {
                // After leaving listener thread, the host cannot join the quorum anymore,
                // this is a severe error that we cannot recover from, so we need to exit
                socketBindErrorHandler.run();
            }
        }
    }

    /**
     * Halts this listener thread by closing every handler's server socket.
     */
    void halt() {
        LOG.debug("Trying to close listeners");
        List<ListenerHandler> handlers = listenerHandlers;
        if (handlers != null) {
            LOG.debug("Closing listener: {}", QuorumCnxManager.this.mySid);
            for (ListenerHandler handler : handlers) {
                try {
                    handler.close();
                } catch (IOException e) {
                    LOG.warn("Exception when shutting down listener: ", e);
                }
            }
        }
    }

    /**
     * Accepts connections on one election address, re-binding after errors
     * up to portBindMaxRetry times (0 means retry forever).
     */
    class ListenerHandler implements Runnable, Closeable {
        // Assigned by the handler thread, closed from other threads via close()
        private volatile ServerSocket serverSocket;
        private final InetSocketAddress address;
        private final boolean portUnification;
        private final boolean sslQuorum;
        private final CountDownLatch latch;

        ListenerHandler(InetSocketAddress address, boolean portUnification, boolean sslQuorum,
                CountDownLatch latch) {
            this.address = address;
            this.portUnification = portUnification;
            this.sslQuorum = sslQuorum;
            this.latch = latch;
        }

        /**
         * Sleeps on acceptConnections().
         */
        @Override
        public void run() {
            try {
                Thread.currentThread().setName("ListenerHandler-" + address);
                acceptConnections();
                try {
                    close();
                } catch (IOException e) {
                    LOG.warn("Exception when shutting down listener: ", e);
                }
            } catch (Exception e) {
                // Output of unexpected exception, should never happen
                LOG.error("Unexpected error ", e);
            } finally {
                latch.countDown();
            }
        }

        @Override
        public synchronized void close() throws IOException {
            if (serverSocket != null && !serverSocket.isClosed()) {
                LOG.debug("Trying to close listeners: {}", serverSocket);
                serverSocket.close();
            }
        }

        /**
         * Sleeps on accept().
         */
        private void acceptConnections() {
            int numRetries = 0;
            Socket client = null;
            while ((!shutdown) && (portBindMaxRetry == 0 || numRetries < portBindMaxRetry)) {
                try {
                    serverSocket = createNewServerSocket();
                    LOG.info("My election bind port: {}", address.toString());
                    while (!shutdown) {
                        try {
                            client = serverSocket.accept();
                            setSockOpts(client);
                            LOG.info("Received connection request {}", client.getRemoteSocketAddress());
                            // Receive and handle the connection request
                            // asynchronously if the quorum sasl authentication is
                            // enabled. This is required because sasl server
                            // authentication process may take few seconds to finish,
                            // this may delay next peer connection requests.
                            if (quorumSaslAuthEnabled) {
                                receiveConnectionAsync(client);
                            } else {
                                receiveConnection(client);
                            }
                            // The socket now belongs to the connection handling
                            // code; forget it so that a later accept() failure
                            // does not close a live peer connection below.
                            client = null;
                            numRetries = 0;
                        } catch (SocketTimeoutException e) {
                            LOG.warn("The socket is listening for the election accepted "
                                + "and it timed out unexpectedly, but will retry."
                                + "see ZOOKEEPER-2836");
                        }
                    }
                } catch (IOException e) {
                    if (shutdown) {
                        break;
                    }
                    LOG.error("Exception while listening", e);
                    if (e instanceof SocketException) {
                        socketException.set(true);
                    }
                    numRetries++;
                    try {
                        close();
                        Thread.sleep(1000);
                    } catch (IOException ie) {
                        LOG.error("Error closing server socket", ie);
                    } catch (InterruptedException ie) {
                        LOG.error("Interrupted while sleeping. Ignoring exception", ie);
                    }
                    // Only a socket accepted but not yet handed off (e.g. when
                    // setSockOpts failed) is still referenced here.
                    closeSocket(client);
                    client = null;
                }
            }
            if (!shutdown) {
                LOG.error(
                    "Leaving listener thread for address {} after {} errors. Use {} property to increase retry count.",
                    formatInetAddr(address),
                    numRetries,
                    ELECTION_PORT_BIND_RETRY);
            }
        }

        private ServerSocket createNewServerSocket() throws IOException {
            ServerSocket socket;
            if (portUnification) {
                LOG.info("Creating TLS-enabled quorum server socket");
                socket = new UnifiedServerSocket(self.getX509Util(), true);
            } else if (sslQuorum) {
                LOG.info("Creating TLS-only quorum server socket");
                socket = new UnifiedServerSocket(self.getX509Util(), false);
            } else {
                socket = new ServerSocket();
            }
            socket.setReuseAddress(true);
            socket.bind(address);
            return socket;
        }
    }
}
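/**
* Thread to send messages to one peer. An instance waits on the peer's send
* queue and sends a message as soon as one is available. If the connection
* breaks, the worker finishes; a new connection is opened separately (for
* example by connectOne when the next message is queued via toSend).
*/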
class SendWorker extends ZooKeeperThread {
// Id of the peer this worker sends to.
Long sid;
// Connection to the peer; closed by finish().
Socket sock;
// Receiver paired with this sender on the same socket; set via setRecv() and finished together with this worker.
RecvWorker recvWorker;
// Cleared by finish() (or by the constructor when the output stream is unavailable); ends the send loop.
volatile boolean running = true;
// Wraps the socket's output stream directly; each send() flushes it.
DataOutputStream dout;
// Prevents more than one reachability check from running at a time (see asyncValidateIfSocketIsStillReachable).
AtomicBoolean ongoingAsyncValidation = new AtomicBoolean(false);
/**
 * Creates the sender for peer {@code sid} over {@code sock}. It receives
 * messages to send through the peer's queue.
 *
 * If the socket's output stream cannot be obtained, the error is logged, the
 * socket is closed and the worker starts out stopped ({@code running} false).
 *
 * @param sock socket to the remote peer
 * @param sid  server identifier of the remote peer
 */
SendWorker(Socket sock, Long sid) {
    super("SendWorker:" + sid);
    this.sid = sid;
    this.sock = sock;
    this.recvWorker = null;
    try {
        this.dout = new DataOutputStream(sock.getOutputStream());
    } catch (IOException streamFailure) {
        LOG.error("Unable to access socket output stream", streamFailure);
        closeSocket(sock);
        this.running = false;
    }
    LOG.debug("Address of remote peer: {}", this.sid);
}
/**
 * Pairs this sender with the receiver reading from the same socket.
 *
 * @param recvWorker the paired receiver
 */
synchronized void setRecv(RecvWorker recvWorker) {
    final RecvWorker paired = recvWorker;
    this.recvWorker = paired;
}
/**
 * Returns the receiver paired with this sender.
 *
 * @return the paired RecvWorker, or null if none has been set
 */
synchronized RecvWorker getRecvWorker() {
    final RecvWorker paired = this.recvWorker;
    return paired;
}
/**
 * Stops this worker. The first call closes the socket, interrupts the worker
 * thread, finishes the paired receiver, removes this worker from
 * senderWorkerMap (only if it is still the registered worker for the peer)
 * and decrements the worker count. Later calls do nothing.
 *
 * @return the value of {@code running} afterwards (false)
 */
synchronized boolean finish() {
    LOG.debug("Calling finish for {}", sid);
    if (running) {
        running = false;
        closeSocket(sock);
        this.interrupt();
        if (recvWorker != null) {
            recvWorker.finish();
        }
        LOG.debug("Removing entry from senderWorkerMap sid={}", sid);
        senderWorkerMap.remove(sid, this);
        threadCnt.decrementAndGet();
    }
    return running;
}
/**
 * Writes one message to the peer: a 4-byte length (the buffer's capacity)
 * followed by the buffer's contents from index 0 up to its capacity.
 *
 * The buffer's position is reset to 0 and is left at its capacity afterwards.
 * If the buffer's limit is below its capacity, the contents cannot be copied;
 * the error is logged and nothing is sent.
 *
 * @param b message to send
 * @throws IOException if writing to the socket fails
 */
synchronized void send(ByteBuffer b) throws IOException {
    byte[] msgBytes = new byte[b.capacity()];
    try {
        b.position(0);
        b.get(msgBytes);
    } catch (BufferUnderflowException be) {
        LOG.error("BufferUnderflowException ", be);
        return;
    }
    dout.writeInt(msgBytes.length);
    // Write the copied bytes, not b.array(): the backing array may be larger
    // than the buffer or offset from it (sliced buffers), and direct buffers
    // have no accessible array at all.
    dout.write(msgBytes);
    dout.flush();
}
@Override
public void run() {
threadCnt.incrementAndGet();
try {
/**
* If there is nothing in the queue to send, then we
* send the lastMessage to ensure that the last message
* was received by the peer. The message could be dropped
* in case self or the peer shutdown their connection
* (and exit the thread) prior to reading/processing
* the last message. Duplicate messages are handled correctly
* by the peer.
*
* If the send queue is non-empty, then we have a recent
* message than that stored in lastMessage. To avoid sending
* stale message, we should send the message in the send queue.
*/
BlockingQueue<ByteBuffer> bq = queueSendMap.get(sid);
if (bq == null || isSendQueueEmpty(bq)) {
ByteBuffer b = lastMessageSent.get(sid);
if (b != null) {
LOG.debug("Attempting to send lastMessage to sid={}", sid);
send(b);
}
}
} catch (IOException e) {
LOG.error("Failed to send last message. Shutting down thread.", e);
this.finish();
}
try {
while (running && !shutdown && sock != null) {
ByteBuffer b = null;
try {
BlockingQueue<ByteBuffer> bq = queueSendMap.get(sid);
if (bq != null) {
b = pollSendQueue(bq, 1000, TimeUnit.MILLISECONDS);
} else {
LOG.error("No queue of incoming messages for server {}", sid);
break;
}
if (b != null) {
lastMessageSent.put(sid, b);
send(b);
}
} catch (InterruptedException e) {
LOG.warn("Interrupted while waiting for message on queue", e);
}
}
} catch (Exception e) {
LOG.warn(
"Exception when using channel: for id {} my id = {}",
sid ,
QuorumCnxManager.this.mySid,
e);
}
this.finish();
LOG.warn("Send worker leaving thread id {} my id = {}", sid, self.getId());
}
public void asyncValidateIfSocketIsStillReachable() {
if (ongoingAsyncValidation.compareAndSet(false, true)) {
new Thread(() -> {
LOG.debug("validate if destination address is reachable for sid {}", sid);
if (sock != null) {
InetAddress address = sock.getInetAddress();
try {
if (address.isReachable(500)) {
LOG.debug("destination address {} is reachable for sid {}", address.toString(), sid);
ongoingAsyncValidation.set(false);
return;
}
} catch (NullPointerException | IOException ignored) {
}
LOG.warn(
"destination address {} not reachable anymore, shutting down the SendWorker for sid {}",
address.toString(),
sid);
this.finish();
}
}).start();
} else {
LOG.debug("validation of destination address for sid {} is skipped (it is already running)", sid);
}
}
}
/**
 * Thread to receive messages. Instance waits on a socket read. If the
 * channel breaks, it finishes itself and its paired SendWorker and closes
 * the socket.
 */
class RecvWorker extends ZooKeeperThread {

    Long sid;
    Socket sock;
    volatile boolean running = true;
    final DataInputStream din;
    final SendWorker sw;

    /**
     * @param sock socket to the remote peer
     * @param din  input stream reading from {@code sock}
     * @param sid  server identifier of the remote peer
     * @param sw   SendWorker paired with this receiver for the same peer
     */
    RecvWorker(Socket sock, DataInputStream din, Long sid, SendWorker sw) {
        super("RecvWorker:" + sid);
        this.sid = sid;
        this.sock = sock;
        this.sw = sw;
        this.din = din;
        try {
            // OK to wait until socket disconnects while reading.
            sock.setSoTimeout(0);
        } catch (IOException e) {
            LOG.error("Error while accessing socket for {}", sid, e);
            closeSocket(sock);
            running = false;
        }
    }

    /**
     * Shuts down this worker: clears {@code running}, interrupts the thread
     * and decrements {@code threadCnt}. Subsequent calls are no-ops.
     *
     * @return boolean Value of variable running
     */
    synchronized boolean finish() {
        if (!running) {
            /*
             * Avoids running finish() twice.
             */
            return running;
        }
        running = false;

        this.interrupt();
        threadCnt.decrementAndGet();
        return running;
    }

    /**
     * Reads length-prefixed messages from the peer and places them on the
     * receive queue until the worker is finished, the manager shuts down, or
     * the stream fails or delivers an invalid length.
     */
    @Override
    public void run() {
        threadCnt.incrementAndGet();
        try {
            while (running && !shutdown && sock != null) {
                /**
                 * Reads the first int to determine the length of the
                 * message
                 */
                int length = din.readInt();
                if (length <= 0 || length > PACKETMAXSIZE) {
                    throw new IOException("Received packet with invalid packet: " + length);
                }
                /**
                 * Allocates a new ByteBuffer to receive the message
                 */
                final byte[] msgArray = new byte[length];
                din.readFully(msgArray, 0, length);
                addToRecvQueue(new Message(ByteBuffer.wrap(msgArray), sid));
            }
        } catch (Exception e) {
            LOG.warn(
                "Connection broken for id {}, my id = {}",
                sid,
                QuorumCnxManager.this.mySid,
                e);
        } finally {
            LOG.warn("Interrupting SendWorker");
            sw.finish();
            closeSocket(sock);
            // sw.finish() only finishes this worker if sw is still running and has
            // it registered via setRecv(); finish ourselves explicitly so running and
            // threadCnt are always updated. finish() is idempotent, so a second call
            // after sw.finish() already did it is a no-op.
            finish();
        }
    }

}
/**
 * Inserts a buffer into the provided {@link BlockingQueue} without blocking,
 * using {@link BlockingQueue#offer(Object)}. Nothing is evicted: if the queue
 * rejects the buffer (for example because it has no remaining capacity), an
 * exception is thrown instead.
 *
 * @param queue Reference to the Queue
 * @param buffer Reference to the buffer to be inserted in the queue
 * @throws RuntimeException if the queue does not accept the buffer
 */
private void addToSendQueue(final BlockingQueue<ByteBuffer> queue,
        final ByteBuffer buffer) {
    final boolean success = queue.offer(buffer);
    if (!success) {
        // This is the send queue; the previous message wrongly said "receive queue".
        throw new RuntimeException("Could not insert into send queue");
    }
}
/**
 * Tells whether the given send queue currently holds no buffers.
 *
 * @param queue the send queue to inspect
 * @return {@code true} when {@code queue} has no elements, {@code false} otherwise
 */
private boolean isSendQueueEmpty(final BlockingQueue<ByteBuffer> queue) {
    final boolean nothingQueued = queue.isEmpty();
    return nothingQueued;
}
/**
 * Removes and returns the buffer at the head of {@code queue}, waiting up to
 * {@code timeout} (in {@code unit}) for one to arrive.
 *
 * @param queue   the send queue to take from
 * @param timeout how long to wait before giving up
 * @param unit    unit of {@code timeout}
 * @return the head buffer, or {@code null} if none arrived within the timeout
 * @throws InterruptedException if interrupted while waiting
 * @see BlockingQueue#poll(long, java.util.concurrent.TimeUnit)
 */
private ByteBuffer pollSendQueue(final BlockingQueue<ByteBuffer> queue,
        final long timeout, final TimeUnit unit) throws InterruptedException {
    final ByteBuffer head = queue.poll(timeout, unit);
    return head;
}
/**
 * Inserts a message into {@link #recvQueue} without blocking, using
 * {@link BlockingQueue#offer(Object)}. Nothing is evicted: if the queue
 * rejects the message (for example because it has no remaining capacity),
 * an exception is thrown instead.
 *
 * @param msg Reference to the message to be inserted in the queue
 * @throws RuntimeException if the receive queue does not accept the message
 */
public void addToRecvQueue(final Message msg) {
    if (!this.recvQueue.offer(msg)) {
        throw new RuntimeException("Could not insert into receive queue");
    }
}
/**
 * Removes and returns the message at the head of the receive queue, waiting
 * up to {@code timeout} (in {@code unit}) for one to arrive.
 *
 * @param timeout how long to wait before giving up
 * @param unit    unit of {@code timeout}
 * @return the head message, or {@code null} if none arrived within the timeout
 * @throws InterruptedException if interrupted while waiting
 * @see BlockingQueue#poll(long, java.util.concurrent.TimeUnit)
 */
public Message pollRecvQueue(final long timeout, final TimeUnit unit)
        throws InterruptedException {
    final Message next = this.recvQueue.poll(timeout, unit);
    return next;
}
/**
 * Reports whether {@code senderWorkerMap} currently holds a non-null entry for
 * the given peer (presumably a live SendWorker; entries are removed in
 * SendWorker.finish()).
 *
 * @param peerSid server id of the peer
 * @return {@code true} if an entry exists for {@code peerSid}
 */
public boolean connectedToPeer(long peerSid) {
    if (senderWorkerMap.get(peerSid) == null) {
        return false;
    }
    return true;
}
}