blob: 843b49c25fd4a24514dc46fb8a55a07d1f2374a6 [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one or more contributor license
* agreements. See the NOTICE file distributed with this work for additional information regarding
* copyright ownership. The ASF licenses this file to You under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance with the License. You may obtain a
* copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software distributed under the License
* is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
* or implied. See the License for the specific language governing permissions and limitations under
* the License.
*/
package org.apache.geode.internal.tcp;
import static org.apache.geode.distributed.internal.DistributionConfig.DEFAULT_MEMBERSHIP_PORT_RANGE;
import static org.apache.geode.distributed.internal.DistributionConfig.DEFAULT_SOCKET_BUFFER_SIZE;
import static org.apache.geode.distributed.internal.DistributionConfig.DEFAULT_SOCKET_LEASE_TIME;
import java.io.IOException;
import java.net.InetAddress;
import java.net.InetSocketAddress;
import java.net.ServerSocket;
import java.net.Socket;
import java.net.SocketException;
import java.net.UnknownHostException;
import java.nio.channels.ClosedByInterruptException;
import java.nio.channels.ClosedChannelException;
import java.nio.channels.ServerSocketChannel;
import java.nio.channels.SocketChannel;
import java.util.Map;
import java.util.Properties;
import java.util.function.Function;
import org.apache.logging.log4j.Logger;
import org.apache.geode.CancelCriterion;
import org.apache.geode.CancelException;
import org.apache.geode.SystemFailure;
import org.apache.geode.alerting.internal.spi.AlertingAction;
import org.apache.geode.alerting.internal.spi.AlertingIOException;
import org.apache.geode.annotations.VisibleForTesting;
import org.apache.geode.annotations.internal.MakeNotStatic;
import org.apache.geode.distributed.DistributedMember;
import org.apache.geode.distributed.DistributedSystemDisconnectedException;
import org.apache.geode.distributed.internal.DMStats;
import org.apache.geode.distributed.internal.DistributionConfig;
import org.apache.geode.distributed.internal.DistributionManager;
import org.apache.geode.distributed.internal.DistributionMessage;
import org.apache.geode.distributed.internal.LonerDistributionManager;
import org.apache.geode.distributed.internal.direct.DirectChannel;
import org.apache.geode.distributed.internal.membership.InternalDistributedMember;
import org.apache.geode.distributed.internal.membership.api.MemberShunnedException;
import org.apache.geode.distributed.internal.membership.api.Membership;
import org.apache.geode.internal.inet.LocalHostUtil;
import org.apache.geode.internal.logging.log4j.LogMarker;
import org.apache.geode.internal.net.BufferPool;
import org.apache.geode.internal.net.SocketCreator;
import org.apache.geode.internal.net.SocketCreatorFactory;
import org.apache.geode.internal.security.SecurableCommunicationChannel;
import org.apache.geode.logging.internal.executors.LoggingThread;
import org.apache.geode.logging.internal.log4j.api.LogService;
/**
 * <p>
 * TCPConduit manages a server socket and a collection of connections to other systems. Connections
 * are identified by DistributedMember IDs. These types of messages are currently supported:
 *
 * <p>
 * DistributionMessage - message is delivered to the conduit's receiver, a {@link DirectChannel}
 *
 * <p>
 * In the current implementation, the receiver is the DirectChannel used by the GemFire
 * DistributionManager to send and receive messages.
 *
 * <p>
 * If the receiver is null, DistributionMessages are ignored by the TCPConduit.
 *
 * <p>
 * The conduit is also the {@link Runnable} body of its "P2P Listener Thread", which accepts
 * incoming peer connections and hands them to the {@link ConnectionTable}.
 *
 * @since GemFire 2.0
 */
public class TCPConduit implements Runnable {

  private static final Logger logger = LogService.getLogger();

  /**
   * max amount of time (ms) to wait for listener threads to stop. Set by {@link #init()} from the
   * {@code p2p.listenerCloseTimeout} system property (default 60000).
   */
  @MakeNotStatic
  private static int LISTENER_CLOSE_TIMEOUT;

  /**
   * BACKLOG is the "accept" backlog configuration parameter for the conduits server socket.
   * In most operating systems this is limited to 128 by default and you must change
   * the OS setting somaxconn to go beyond that limit. Note that setting this too high
   * can have ramifications when disconnecting from the distributed system if a thread
   * is trying to connect to another member that is not accepting connections quickly
   * enough. Setting it too low can also have adverse effects because backlog overflows
   * aren't handled well by most tcp/ip implementations, causing connect timeouts instead
   * of expected ServerRefusedConnection exceptions.
   *
   * <p>
   * Normally the backlog isn't that important because if it's full of connection requests
   * a SYN "cookie" mechanism is used to bypass the backlog queue. If this is turned off
   * though connection requests are dropped when the queue is full.
   *
   * <p>
   * Set by {@link #init()} from the {@code p2p.backlog} system property (default 1280).
   */
  @MakeNotStatic
  private static int BACKLOG;

  /**
   * true if the cluster SocketCreator reports that SSL is in use
   */
  private final boolean useSSL;

  /**
   * buffer pool supplied at construction time and exposed through {@link #getBufferPool()}
   */
  private final BufferPool bufferPool;

  /**
   * The socket producer used by the cluster
   */
  private final SocketCreator socketCreator;

  /**
   * membership service consulted to decide whether a peer is still in the view (and therefore
   * worth reconnecting to) and to confirm new members during handshakes
   */
  private final Membership<InternalDistributedMember> membership;

  static {
    init();
  }

  /**
   * the requested size of the OS TCP/IP socket buffers. Starts at
   * {@code DEFAULT_SOCKET_BUFFER_SIZE}; may be overridden by the {@code p2p.tcpBufferSize}
   * property passed to the constructor, but is never allowed below
   * {@code Connection.SMALL_BUFFER_SIZE}.
   */
  int tcpBufferSize = DEFAULT_SOCKET_BUFFER_SIZE;

  /**
   * idle connection timeout. Starts at {@code DEFAULT_SOCKET_LEASE_TIME}; may be overridden by the
   * {@code p2p.idleConnectionTimeout} property passed to the constructor.
   */
  int idleConnectionTimeout = DEFAULT_SOCKET_LEASE_TIME;

  /**
   * port is the tcp/ip port that this conduit binds to. If it is zero, a port from
   * membership-port-range is selected to bind to. The actual port number this conduit is listening
   * on will be in the "id" instance variable
   */
  private int port;

  /**
   * the [start, end] membership-port-range used when {@link #port} is not positive
   */
  private final int[] tcpPortRange =
      new int[] {DEFAULT_MEMBERSHIP_PORT_RANGE[0], DEFAULT_MEMBERSHIP_PORT_RANGE[1]};

  /**
   * The java groups address that this conduit is associated with
   */
  private InternalDistributedMember localAddr;

  /**
   * address is the InetAddress that this conduit uses for identity
   */
  private final InetAddress address;

  /**
   * isBindAddress is true if we should bind to the address
   */
  private final boolean isBindAddress;

  /**
   * the object that receives DistributionMessage messages received by this conduit.
   */
  private final DirectChannel directChannel;

  /**
   * statistics taken from the DirectChannel, or a dummy implementation if none are available
   */
  private DMStats stats;

  /**
   * Config from the delegate
   *
   * @since GemFire 4.2.1
   */
  private DistributionConfig config;

  /**
   * server socket address
   */
  private InetSocketAddress id;

  /**
   * set once the conduit has been stopped; cleared again when the acceptor is (re)started
   */
  private volatile boolean stopped;

  /**
   * the listener thread
   */
  private Thread thread;

  /**
   * if using NIO, this is the object used for accepting connections
   */
  private ServerSocketChannel channel;

  /**
   * the server socket
   */
  private ServerSocket socket;

  /**
   * a table of Connections from this conduit to others
   */
  private ConnectionTable conTable;

  /**
   * the reason for a shutdown, if abnormal
   */
  private volatile Exception shutdownCause;

  private final Stopper stopper = new Stopper();

  /**
   * <p>
   * creates a new TCPConduit bound to the given InetAddress and port. The given DirectChannel will
   * receive any DistributionMessages passed to the conduit.
   *
   * <p>
   * This constructor forces the conduit to ignore the following system properties and look for them
   * only in the <i>props</i> argument:
   *
   * <pre>
   * p2p.tcpBufferSize
   * p2p.idleConnectionTimeout
   * </pre>
   *
   * @param mgr the membership service
   * @param port the port to bind to; zero or negative selects a port from membership-port-range
   * @param address the address used for identity, or null to use the local host
   * @param isBindAddress true if the server socket should be bound to {@code address}
   * @param receiver the DirectChannel that receives messages; may be null
   * @param bufferPool the buffer pool exposed through {@link #getBufferPool()}
   * @param props instance-level properties; may be null
   * @throws ConnectionException if the local host cannot be resolved or the server socket cannot
   *         be created
   */
  public TCPConduit(Membership mgr, int port, InetAddress address, boolean isBindAddress,
      DirectChannel receiver, BufferPool bufferPool, Properties props) throws ConnectionException {
    this(mgr, port, address, isBindAddress, receiver, bufferPool, props, ConnectionTable::create,
        SocketCreatorFactory.getSocketCreatorForComponent(SecurableCommunicationChannel.CLUSTER),
        () -> {
          try {
            LocalHostUtil.getLocalHost();
          } catch (UnknownHostException e) {
            throw new ConnectionException("Unable to resolve localHost address", e);
          }
        },
        true);
  }

  /**
   * Test-visible constructor that allows the connection table, socket creator and local-host
   * check to be substituted, and the acceptor start to be suppressed.
   *
   * @param connectionTableFactory creates the ConnectionTable for this conduit
   * @param socketCreator the socket creator used for server and client sockets
   * @param localHostValidation run only when {@code address} is null
   * @param startAcceptor whether to bind the server socket and start the listener thread
   */
  @VisibleForTesting
  TCPConduit(Membership mgr, int port, InetAddress address, boolean isBindAddress,
      DirectChannel receiver, BufferPool bufferPool, Properties props,
      Function<TCPConduit, ConnectionTable> connectionTableFactory, SocketCreator socketCreator,
      Runnable localHostValidation, boolean startAcceptor) throws ConnectionException {
    parseProperties(props);

    this.address = address;
    this.isBindAddress = isBindAddress;
    this.port = port;
    directChannel = receiver;
    this.bufferPool = bufferPool;
    stats = null;
    config = null;
    membership = mgr;
    if (directChannel != null) {
      stats = directChannel.getDMStats();
      config = directChannel.getDMConfig();
    }
    if (getStats() == null) {
      stats = new LonerDistributionManager.DummyDMStats();
    }

    conTable = connectionTableFactory.apply(this);

    this.socketCreator = socketCreator;
    useSSL = socketCreator.forAdvancedUse().useSSL();

    if (address == null) {
      localHostValidation.run();
    }

    if (startAcceptor) {
      startAcceptor();
    }
  }

  /**
   * (Re)reads the static tuning settings from system properties: {@code p2p.listenerCloseTimeout}
   * (default 60000 ms) and {@code p2p.backlog} (default 1280). Logs a warning if the unsupported
   * {@code p2p.oldIO} property is set. Invoked from this class's static initializer.
   */
  public static void init() {
    LISTENER_CLOSE_TIMEOUT = Integer.getInteger("p2p.listenerCloseTimeout", 60000);
    BACKLOG = Integer.getInteger("p2p.backlog", 1280);
    if (Boolean.getBoolean("p2p.oldIO")) {
      logger.warn("detected use of p2p.oldIO setting - this is no longer supported");
    }
  }

  public Membership<InternalDistributedMember> getMembership() {
    return membership;
  }

  /**
   * @return the accept backlog used for the server socket (see {@link #BACKLOG})
   */
  public static int getBackLog() {
    return BACKLOG;
  }

  /**
   * parse instance-level properties from the given object. Unparseable values are logged and the
   * current (default) value is kept.
   *
   * @param p the properties to read; null means "use the defaults"
   */
  private void parseProperties(Properties p) {
    if (p == null) {
      return;
    }
    String s = p.getProperty("p2p.tcpBufferSize", String.valueOf(tcpBufferSize));
    try {
      tcpBufferSize = Integer.parseInt(s);
    } catch (Exception e) {
      logger.warn("exception parsing p2p.tcpBufferSize", e);
    }
    if (tcpBufferSize < Connection.SMALL_BUFFER_SIZE) {
      // enforce minimum
      tcpBufferSize = Connection.SMALL_BUFFER_SIZE;
    }
    s = p.getProperty("p2p.idleConnectionTimeout", String.valueOf(idleConnectionTimeout));
    try {
      idleConnectionTimeout = Integer.parseInt(s);
    } catch (Exception e) {
      logger.warn("exception parsing p2p.idleConnectionTimeout", e);
    }

    // The port-range properties have no fallback value; when they are absent keep the default
    // range quietly instead of logging a NumberFormatException for a null string.
    s = p.getProperty("membership_port_range_start");
    if (s != null) {
      try {
        tcpPortRange[0] = Integer.parseInt(s);
      } catch (Exception e) {
        logger.warn("Exception parsing membership-port-range start port.", e);
      }
    }

    s = p.getProperty("membership_port_range_end");
    if (s != null) {
      try {
        tcpPortRange[1] = Integer.parseInt(s);
      } catch (Exception e) {
        logger.warn("Exception parsing membership-port-range end port.", e);
      }
    }
  }

  /**
   * binds the server socket and gets threads going
   *
   * @throws ConnectionException if the server socket cannot be created or configured
   */
  private void startAcceptor() throws ConnectionException {
    // remember the requested port for the error message; createServerSocket() updates this.port
    int p = port;

    createServerSocket();

    int localPort;
    try {
      localPort = socket.getLocalPort();

      id = new InetSocketAddress(socket.getInetAddress(), localPort);
      stopped = false;
      thread = new LoggingThread("P2P Listener Thread " + id, this);
      try {
        thread.setPriority(Thread.MAX_PRIORITY);
      } catch (Exception e) {
        logger.info("unable to set listener priority: {}", e.getMessage());
      }
      if (!Boolean.getBoolean("p2p.test.inhibitAcceptor")) {
        thread.start();
      } else {
        logger.fatal(
            "p2p.test.inhibitAcceptor was found to be set, inhibiting incoming tcp/ip connections");
        socket.close();
      }
    } catch (IOException io) {
      String s = "While creating ServerSocket on port " + p;
      throw new ConnectionException(s, io);
    }
    port = localPort;
  }

  /**
   * creates the server sockets. This can be used to recreate the socket using this.port and
   * this.bindAddress, which must be set before invoking this method.
   *
   * <p>
   * On success {@link #socket}, {@link #channel} and {@link #port} are updated.
   *
   * @throws ConnectionException if the socket cannot be created or bound
   */
  private void createServerSocket() {
    int serverPort = port;
    int connectionRequestBacklog = BACKLOG;
    InetAddress bindAddress = address;

    try {
      if (serverPort <= 0) {
        socket = socketCreator.forAdvancedUse().createServerSocketUsingPortRange(bindAddress,
            connectionRequestBacklog, isBindAddress, true, 0, tcpPortRange,
            socketCreator.forAdvancedUse().useSSL());
      } else {
        ServerSocketChannel serverChannel = ServerSocketChannel.open();
        ServerSocket serverSocket = serverChannel.socket();
        InetSocketAddress inetSocketAddress =
            new InetSocketAddress(isBindAddress ? bindAddress : null, serverPort);
        try {
          serverSocket.bind(inetSocketAddress, connectionRequestBacklog);
        } catch (IOException bindFailure) {
          // don't leak the freshly opened channel when the port cannot be bound
          try {
            serverChannel.close();
          } catch (IOException closeFailure) {
            bindFailure.addSuppressed(closeFailure);
          }
          throw bindFailure;
        }
        socket = serverSocket;
      }

      try {
        // set these buffers early so that large buffers will be allocated
        // on accepted sockets (see java.net.ServerSocket.setReceiverBufferSize javadocs)
        socket.setReceiveBufferSize(tcpBufferSize);
        int newSize = socket.getReceiveBufferSize();
        if (newSize != tcpBufferSize) {
          logger.info("{} is {} instead of the requested {}",
              "Listener receiverBufferSize", newSize, tcpBufferSize);
        }
      } catch (SocketException ex) {
        logger.warn("Failed to set listener receiverBufferSize to {}",
            tcpBufferSize);
      }
      channel = socket.getChannel();
      port = socket.getLocalPort();
    } catch (IOException io) {
      throw new ConnectionException(
          String.format("While creating ServerSocket on port %s with address %s",
              serverPort, bindAddress),
          io);
    }
  }

  /**
   * Close the ServerSocketChannel (or ServerSocket, if there is no channel) and perform an
   * emergency close of connections via {@link ConnectionTable#emergencyClose()}. The
   * ConnectionTable instance itself is not closed here because doing so is not safe against
   * deadlocks.
   *
   * @see SystemFailure#emergencyClose()
   */
  public void emergencyClose() {
    // stop(); // Causes grief
    if (stopped) {
      return;
    }

    stopped = true;

    try {
      if (channel != null) {
        channel.close();
        // NOTE: do not try to interrupt the listener thread at this point.
        // Doing so interferes with the channel's socket logic.
      } else {
        if (socket != null) {
          socket.close();
        }
      }
    } catch (IOException e) {
      // ignore, please!
    }

    // this.conTable.close(); not safe against deadlocks
    ConnectionTable.emergencyClose();

    socket = null;
    thread = null;
    conTable = null;
  }

  /**
   * stops the conduit, closing all tcp/ip connections. Closes the acceptor, waits up to
   * {@link #LISTENER_CLOSE_TIMEOUT} ms for the listener thread to exit, then closes the
   * ConnectionTable. Does nothing if the conduit is already stopped.
   *
   * <p>
   * If the calling thread is interrupted while waiting for the listener, the wait is abandoned and
   * the interrupt status is restored once the connection table has been closed.
   *
   * @param cause the reason for the shutdown, if abnormal; reported by {@link #getShutdownCause()}
   */
  public void stop(Exception cause) {
    if (stopped) {
      return;
    }
    stopped = true;
    shutdownCause = cause;

    if (logger.isTraceEnabled(LogMarker.DM_VERBOSE)) {
      logger.trace(LogMarker.DM_VERBOSE, "Shutting down conduit");
    }
    boolean interrupted = false;
    try {
      // set timeout endpoint here since interrupt() has been known to hang
      long timeout = System.currentTimeMillis() + LISTENER_CLOSE_TIMEOUT;
      Thread t = thread;
      if (channel != null) {
        channel.close();
        // NOTE: do not try to interrupt the listener thread at this point;
        // doing so interferes with the channel's socket logic.
      } else {
        ServerSocket s = socket;
        if (s != null) {
          s.close();
        }
        if (t != null) {
          t.interrupt();
        }
      }

      do {
        t = thread;
        if (t == null || !t.isAlive()) {
          break;
        }
        t.join(200);
      } while (timeout > System.currentTimeMillis());

      if (t != null && t.isAlive()) {
        logger.warn(
            "Unable to shut down listener within {}ms. Unable to interrupt socket.accept() due to JDK bug. Giving up.",
            LISTENER_CLOSE_TIMEOUT);
      }
    } catch (IOException e) {
      // we're already trying to shutdown, ignore
    } catch (InterruptedException e) {
      // Stop waiting for the listener, but remember the interrupt so that it can be restored
      // after the connection table has been closed.
      interrupted = true;
    }

    try {
      // close connections after shutting down acceptor to fix bug 30695
      conTable.close();

      socket = null;
      thread = null;
      conTable = null;
    } finally {
      if (interrupted) {
        Thread.currentThread().interrupt();
      }
    }
  }

  /**
   * starts the conduit again after it's been stopped. Does nothing if the conduit is not stopped.
   * Otherwise the stats are refreshed from the DirectChannel, a new ConnectionTable is created and
   * the acceptor is started again.
   *
   * @throws ConnectionException if the server socket cannot be recreated
   */
  public void restart() throws ConnectionException {
    if (!stopped) {
      return;
    }
    stats = null;
    if (directChannel != null) {
      stats = directChannel.getDMStats();
    }
    if (getStats() == null) {
      stats = new LonerDistributionManager.DummyDMStats();
    }
    conTable = ConnectionTable.create(this);
    startAcceptor();
  }

  /**
   * this is the server socket listener thread's run loop. Accepts connections until the conduit is
   * stopped, cancellation is in progress, the thread is interrupted, or the channel is closed, and
   * tries to reopen the server socket if it is found closed while the conduit is still running.
   */
  @Override
  public void run() {
    ConnectionTable.threadWantsSharedResources();
    if (logger.isTraceEnabled(LogMarker.DM_VERBOSE)) {
      logger.trace(LogMarker.DM_VERBOSE, "Starting P2P Listener on {}", id);
    }
    for (;;) {
      SystemFailure.checkFailure();
      if (stopper.isCancelInProgress()) {
        break;
      }
      if (stopped) {
        break;
      }
      if (Thread.currentThread().isInterrupted()) {
        break;
      }
      Socket othersock = null;
      try {
        SocketChannel otherChannel = channel.accept();
        othersock = otherChannel.socket();
        if (stopped) {
          try {
            if (othersock != null) {
              othersock.close();
            }
          } catch (Exception e) {
            // ignored
          }
          continue;
        }
        acceptConnection(othersock);
      } catch (ClosedByInterruptException cbie) {
        // safe to ignore
      } catch (ClosedChannelException | CancelException e) {
        break;
      } catch (IOException e) {
        getStats().incFailedAccept();

        try {
          if (othersock != null) {
            othersock.close();
          }
        } catch (IOException ignore) {
        }

        if (!stopped) {
          if (e instanceof SocketException && "Socket closed".equalsIgnoreCase(e.getMessage())) {
            // safe to ignore
            if (!socket.isClosed()) {
              logger.warn("ServerSocket threw 'socket closed' exception but says it is not closed",
                  e);
              try {
                socket.close();
                createServerSocket();
              } catch (IOException | ConnectionException ioe) {
                // createServerSocket() reports failures as the unchecked ConnectionException;
                // catch it here too so a failed re-creation does not kill the listener thread
                logger.fatal("Unable to close and recreate server socket", ioe);
                // post 5.1.0x, this should force shutdown
                try {
                  Thread.sleep(5000);
                } catch (InterruptedException ie) {
                  // Don't reset; we're just exiting the thread
                  logger.info("Interrupted and exiting while trying to recreate listener sockets");
                  return;
                }
              }
            }
          } else if ("Too many open files".equals(e.getMessage())) {
            getConTable().fileDescriptorsExhausted();
          } else {
            logger.warn(e.getMessage(), e);
          }
        }
      } catch (Exception e) {
        logger.warn(e.getMessage(), e);
      }
      if (!stopped && socket.isClosed()) {
        // NOTE: do not check for distributed system closing here.
        // Messaging may need to occur during the closing of the DS or cache
        logger.warn("ServerSocket closed - reopening");
        try {
          createServerSocket();
        } catch (ConnectionException ex) {
          logger.warn(ex.getMessage(), ex);
        }
      }
    }

    if (logger.isTraceEnabled(LogMarker.DM_VERBOSE)) {
      logger.trace(LogMarker.DM_VERBOSE, "Stopped P2P Listener on {}", id);
    }
  }

  /**
   * @return the current ConnectionTable
   * @throws DistributedSystemDisconnectedException (or the cancellation exception produced by
   *         this conduit's CancelCriterion) if the table has been discarded by a shutdown
   */
  ConnectionTable getConTable() {
    ConnectionTable result = conTable;
    if (result == null) {
      stopper.checkCancelInProgress(null);
      throw new DistributedSystemDisconnectedException("tcp layer has been shutdown");
    }
    return result;
  }

  /**
   * Hands an accepted socket to the ConnectionTable. Failures are counted as failed accepts (unless
   * the conduit is stopped) rather than propagated, so the listener loop keeps running.
   */
  private void acceptConnection(Socket otherSocket) {
    try {
      getConTable().acceptConnection(otherSocket, new PeerConnectionFactory());
    } catch (IOException | ConnectionException io) {
      // exception is logged by the Connection
      if (!stopped) {
        getStats().incFailedAccept();
      }
    } catch (CancelException e) {
      // ignored
    } catch (Exception e) {
      if (!stopped) {
        getStats().incFailedAccept();
        logger.warn("Failed to accept connection from {} because {}",
            otherSocket.getInetAddress(), e);
      }
    }
  }

  /**
   * records the current outgoing message count on all thread-owned ordered connections
   *
   * @since GemFire 5.1
   */
  public void getThreadOwnedOrderedConnectionState(DistributedMember member, Map result) {
    getConTable().getThreadOwnedOrderedConnectionState(member, result);
  }

  /**
   * wait for the incoming connections identified by the keys in the argument to receive and
   * dispatch the number of messages associated with the key
   *
   * @since GemFire 5.1
   */
  public void waitForThreadOwnedOrderedConnectionState(DistributedMember member, Map channelState)
      throws InterruptedException {
    getConTable().waitForThreadOwnedOrderedConnectionState(member, channelState);
  }

  /**
   * connections send messageReceived when a message object has been read. The message is stamped
   * with the byte count, sender and shared-receiver flag and passed to the DirectChannel; it is
   * dropped if there is no DirectChannel.
   *
   * @param receiver the connection that read the message
   * @param message the message that was read
   * @param bytesRead number of bytes read off of network to get this message
   */
  void messageReceived(Connection receiver, DistributionMessage message, int bytesRead)
      throws MemberShunnedException {
    if (logger.isTraceEnabled()) {
      logger.trace("{} received {} from {}", id, message, receiver);
    }

    if (directChannel != null) {
      message.setBytesRead(bytesRead);
      message.setSender(receiver.getRemoteAddress());
      message.setSharedReceiver(receiver.isSharedResource());
      directChannel.receive(message, bytesRead);
    }
  }

  /**
   * gets the address of this conduit's ServerSocket endpoint; null until the acceptor has been
   * started
   */
  public InetSocketAddress getSocketId() {
    return id;
  }

  /**
   * gets the actual port to which this conduit's ServerSocket is bound. Only valid once the
   * acceptor has been started (the socket address is assigned in startAcceptor()).
   */
  public int getPort() {
    return id.getPort();
  }

  /**
   * Gets the local member ID that identifies this conduit
   */
  public InternalDistributedMember getMemberId() {
    return localAddr;
  }

  public void setMemberId(InternalDistributedMember addr) {
    localAddr = addr;
  }

  /**
   * @return the configuration obtained from the DirectChannel at construction time, or null if
   *         there was no DirectChannel
   */
  public DistributionConfig getConfig() {
    return config;
  }

  /**
   * Return a connection to the given member. This method must continue to attempt to create a
   * connection to the given member as long as that member is in the membership view and the system
   * is not shutting down.
   *
   * @param memberAddress the IDS associated with the remoteId
   * @param preserveOrder whether this is an ordered or unordered connection
   * @param startTime the time this operation started
   * @param ackTimeout the ack-wait-threshold * 1000 for the operation to be transmitted (or zero)
   * @param ackSATimeout the ack-severe-alert-threshold * 1000 for the operation to be transmitted
   *        (or zero)
   *
   * @return the connection
   * @throws IOException if the member left the view, is shunned, or cannot be reached
   * @throws DistributedSystemDisconnectedException if the conduit is stopped or shutting down
   */
  public Connection getConnection(InternalDistributedMember memberAddress,
      final boolean preserveOrder, long startTime, long ackTimeout,
      long ackSATimeout) throws IOException, DistributedSystemDisconnectedException {
    if (stopped) {
      throw new DistributedSystemDisconnectedException("The conduit is stopped");
    }

    InternalDistributedMember memberInTrouble = null;
    Connection conn = null;
    for (boolean breakLoop = false;;) {
      stopper.checkCancelInProgress(null);
      // clear the interrupt for the duration of this attempt; restored in the finally block
      boolean interrupted = Thread.interrupted();
      try {
        // If this is the second time through this loop, we had problems.
        // Tear down the connection so that it gets rebuilt.
        if (conn != null) { // not first time in loop
          if (!membership.memberExists(memberAddress)
              || membership.isShunned(memberAddress)
              || membership.shutdownInProgress()) {
            throw new IOException("TCP/IP connection lost and member is not in view");
          }

          // Member is still in view; we MUST NOT give up!

          // Pause just a tiny bit...
          try {
            Thread.sleep(100);
          } catch (InterruptedException e) {
            interrupted = true;
            stopper.checkCancelInProgress(e);
          }

          // try again after sleep
          if (!membership.memberExists(memberAddress)
              || membership.isShunned(memberAddress)) {
            // OK, the member left. Just register an error.
            throw new IOException("TCP/IP connection lost and member is not in view");
          }

          // Print a warning (once)
          if (memberInTrouble == null) {
            memberInTrouble = memberAddress;
            logger.warn("Attempting TCP/IP reconnect to {}", memberInTrouble);
          } else {
            if (logger.isDebugEnabled()) {
              logger.debug("Attempting TCP/IP reconnect to {}", memberInTrouble);
            }
          }

          // Close the connection (it will get rebuilt later).
          getStats().incReconnectAttempts();
          try {
            if (logger.isDebugEnabled()) {
              logger.debug("Closing old connection. conn={} before retrying. memberInTrouble={}",
                  conn, memberInTrouble);
            }
            conn.closeForReconnect("closing before retrying");
          } catch (CancelException ex) {
            throw ex;
          } catch (Exception ignored) {
          }
        } // not first time in loop

        Exception problem = null;
        try {
          // Get (or regenerate) the connection
          // this could generate a ConnectionException, so it must be caught and retried
          boolean retryForOldConnection;
          boolean debugRetry = false;
          do {
            retryForOldConnection = false;
            conn = getConTable().get(memberAddress, preserveOrder, startTime, ackTimeout,
                ackSATimeout, false);
            if (conn == null) {
              // conduit may be closed - otherwise an ioexception would be thrown
              problem = new IOException(
                  String.format("Unable to reconnect to server; possible shutdown: %s",
                      memberAddress));
            } else if (conn.isClosing() || !conn.getRemoteAddress().equals(memberAddress)) {
              if (logger.isDebugEnabled()) {
                logger.debug("Got an old connection for {}: {}@{}", memberAddress, conn,
                    conn.hashCode());
              }
              conn.closeOldConnection("closing old connection");
              conn = null;
              retryForOldConnection = true;
              debugRetry = true;
            }
          } while (retryForOldConnection);
          if (debugRetry && logger.isDebugEnabled()) {
            logger.debug("Done removing old connections");
          }

          // we have a connection; fall through and return it
        } catch (ConnectionException e) {
          // Race condition between acquiring the connection and attempting
          // to use it: another thread closed it.
          problem = e;
          // No need to retry since Connection.createSender has already
          // done retries and now member is really unreachable for some reason
          // even though it may be in the view
          breakLoop = true;
        } catch (IOException e) {
          problem = e;
          // don't keep trying to connect to an alert listener
          if (AlertingAction.isThreadAlerting()) {
            if (logger.isDebugEnabled()) {
              logger.debug("Giving up connecting to alert listener {}", memberAddress);
            }
            breakLoop = true;
          }
        }

        if (problem != null) {
          // Some problems are not recoverable; check and error out early.
          if (!membership.memberExists(memberAddress)
              || membership.isShunned(memberAddress)) { // left the view
            // Bracket our original warning
            if (memberInTrouble != null) {
              // make this msg info to bracket warning
              logger.info("Ending reconnect attempt because {} has disappeared.", memberInTrouble);
            }
            throw new IOException(
                String.format("Peer has disappeared from view: %s", memberAddress));
          } // left the view

          if (membership.shutdownInProgress()) { // shutdown in progress
            // Bracket our original warning
            if (memberInTrouble != null) {
              // make this msg info to bracket warning
              logger.info("Ending reconnect attempt to {} because shutdown has started.",
                  memberInTrouble);
            }
            stopper.checkCancelInProgress(null);
            throw new DistributedSystemDisconnectedException(
                "Abandoned because shutdown is in progress");
          } // shutdown in progress

          // Log the warning. We wait until now, because we want
          // to have memberInTrouble defined for a nice message...
          if (memberInTrouble == null) {
            logger.warn("Error sending message to {} (will reattempt): {}", memberAddress, problem);
            memberInTrouble = memberAddress;
          } else {
            if (logger.isDebugEnabled()) {
              logger.debug("Error sending message to {}", memberAddress, problem);
            }
          }

          if (breakLoop) {
            if (problem instanceof IOException) {
              // exception messages may be null; guard before matching the alert-listener text
              String message = problem.getMessage();
              if (message != null
                  && message.startsWith("Cannot form connection to alert listener")) {
                throw new AlertingIOException((IOException) problem);
              }
              throw (IOException) problem;
            }
            throw new IOException(
                String.format("Problem connecting to %s", memberAddress), problem);
          }
          // Retry the operation (indefinitely)
          continue;
        }
        // Success!

        // Make sure our logging is bracketed if there was a problem
        if (memberInTrouble != null) {
          logger.info("Successfully reconnected to member {}", memberInTrouble);
          if (logger.isTraceEnabled()) {
            logger.trace("new connection is {} memberAddress={}", conn, memberAddress);
          }
        }
        return conn;
      } finally {
        if (interrupted) {
          Thread.currentThread().interrupt();
        }
      }
    }
  }

  /**
   * Return a connection to the given member. This method performs quick scan for connection.
   * Only one attempt to create a connection to the given member is made.
   *
   * @param memberAddress the IDS associated with the remoteId
   * @param preserveOrder whether this is an ordered or unordered connection
   * @param startTime the time this operation started
   * @param ackTimeout the ack-wait-threshold * 1000 for the operation to be transmitted (or zero)
   * @param ackSATimeout the ack-severe-alert-threshold * 1000 for the operation to be transmitted
   *        (or zero)
   *
   * @return the connection
   * @throws IOException if the connection could not be obtained
   * @throws DistributedSystemDisconnectedException if the conduit is stopped
   */
  public Connection getFirstScanForConnection(InternalDistributedMember memberAddress,
      final boolean preserveOrder, long startTime, long ackTimeout,
      long ackSATimeout) throws IOException, DistributedSystemDisconnectedException {
    if (stopped) {
      throw new DistributedSystemDisconnectedException("The conduit is stopped");
    }

    Connection connection = null;
    stopper.checkCancelInProgress(null);
    // clear the interrupt for the duration of this attempt; restored in the finally block
    boolean interrupted = Thread.interrupted();
    try {
      Exception problem = null;
      try {
        connection = getConnectionThatIsNotClosed(memberAddress, preserveOrder, startTime,
            ackTimeout, ackSATimeout);

        // we have a connection; fall through and return it
      } catch (ConnectionException e) {
        // Race condition between acquiring the connection and attempting
        // to use it: another thread closed it.
        problem = e;
        // No need to retry since Connection.createSender has already
        // done retries and now member is really unreachable for some reason
        // even though it may be in the view
      } catch (IOException e) {
        problem = e;
        // don't keep trying to connect to an alert listener
        if (AlertingAction.isThreadAlerting()) {
          if (logger.isDebugEnabled()) {
            logger.debug("Giving up connecting to alert listener {}", memberAddress);
          }
        }
      }

      if (problem != null) {
        if (problem instanceof IOException) {
          if (problem.getMessage() != null
              && problem.getMessage().startsWith("Cannot form connection to alert listener")) {
            throw new AlertingIOException((IOException) problem);
          }
          throw (IOException) problem;
        }
        throw new IOException(
            String.format("Problem connecting to %s", memberAddress), problem);
      }
      // Success!
      return connection;
    } finally {
      if (interrupted) {
        Thread.currentThread().interrupt();
      }
    }
  }

  /**
   * Fetches a connection from the ConnectionTable, discarding (via closeOldConnection) any that
   * are closing or whose remote address does not match {@code memberAddress}, until a usable one
   * is returned.
   *
   * @throws IOException if the table returns no connection
   */
  private Connection getConnectionThatIsNotClosed(InternalDistributedMember memberAddress,
      final boolean preserveOrder, long startTime, long ackTimeout, long ackSATimeout)
      throws IOException, ConnectionException {
    boolean debugEnabled = logger.isDebugEnabled();
    Connection connection;
    while (true) {
      connection = getConTable().get(memberAddress, preserveOrder, startTime, ackTimeout,
          ackSATimeout, true);
      if (connection == null) {
        throw new IOException("Unable to reconnect to server; possible shutdown: " + memberAddress);
      }

      if (!connection.isClosing() && connection.getRemoteAddress().equals(memberAddress)) {
        return connection;
      }

      if (debugEnabled) {
        logger.debug("Got an old connection for {}: {}@{}", memberAddress, connection,
            connection.hashCode());
      }
      connection.closeOldConnection("closing old connection");
    }
  }

  @Override
  public String toString() {
    return String.valueOf(id);
  }

  /**
   * Returns the distribution manager of the direct channel, or null if this conduit was created
   * without a DirectChannel (the cancel criterion treats a null manager as "cancelled").
   */
  public DistributionManager getDM() {
    return directChannel == null ? null : directChannel.getDM();
  }

  /**
   * Removes the given member's endpoint from the ConnectionTable; does nothing if the conduit has
   * already been shut down.
   */
  public void removeEndpoint(DistributedMember mbr, String reason, boolean notifyDisconnect) {
    ConnectionTable ct = conTable;
    if (ct == null) {
      return;
    }
    ct.removeEndpoint(mbr, reason, notifyDisconnect);
  }

  /**
   * check to see if there are still any receiver threads for the given end-point. Returns false if
   * the conduit has been shut down.
   */
  public boolean hasReceiversFor(DistributedMember endPoint) {
    ConnectionTable ct = conTable;
    return ct != null && ct.hasReceiversFor(endPoint);
  }

  /**
   * Stats from the delegate
   */
  public DMStats getStats() {
    return stats;
  }

  public boolean useSSL() {
    return useSSL;
  }

  public BufferPool getBufferPool() {
    return bufferPool;
  }

  public CancelCriterion getCancelCriterion() {
    return stopper;
  }

  /**
   * if the conduit is disconnected due to an abnormal condition, this will describe the reason
   *
   * @return exception that caused disconnect
   */
  Exception getShutdownCause() {
    return shutdownCause;
  }

  /**
   * returns the SocketCreator that should be used to produce sockets for TCPConduit connections.
   */
  protected SocketCreator getSocketCreator() {
    return socketCreator;
  }

  /**
   * Called by Connection before handshake reply is sent. Returns true if member is part of
   * view, false if membership is not confirmed before timeout.
   */
  boolean waitForMembershipCheck(InternalDistributedMember remoteId) {
    return membership.waitForNewMember(remoteId);
  }

  /**
   * Cancel criterion for the conduit: cancellation is in progress when there is no distribution
   * manager or the conduit has been stopped.
   */
  private class Stopper extends CancelCriterion {

    @Override
    public String cancelInProgress() {
      DistributionManager dm = getDM();
      if (dm == null) {
        return "no distribution manager";
      }
      if (stopped) {
        return "Conduit has been stopped";
      }
      return null;
    }

    @Override
    public RuntimeException generateCancelledException(Throwable e) {
      String reason = cancelInProgress();
      if (reason == null) {
        return null;
      }
      DistributionManager dm = getDM();
      if (dm == null) {
        return new DistributedSystemDisconnectedException("no distribution manager");
      }
      RuntimeException result = dm.getCancelCriterion().generateCancelledException(e);
      if (result != null) {
        return result;
      }
      // We know we've been stopped; generate the exception
      result = new DistributedSystemDisconnectedException("Conduit has been stopped", e);
      return result;
    }
  }
}