blob: 699c7067d8ba3737aac53ac84b398820c516af98 [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one or more contributor license
* agreements. See the NOTICE file distributed with this work for additional information regarding
* copyright ownership. The ASF licenses this file to You under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance with the License. You may obtain a
* copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software distributed under the License
* is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
* or implied. See the License for the specific language governing permissions and limitations under
* the License.
*/
package org.apache.geode.internal.tcp;
import java.io.IOException;
import java.net.InetAddress;
import java.net.InetSocketAddress;
import java.net.ServerSocket;
import java.net.Socket;
import java.net.SocketException;
import java.nio.channels.ClosedByInterruptException;
import java.nio.channels.ClosedChannelException;
import java.nio.channels.ServerSocketChannel;
import java.nio.channels.SocketChannel;
import java.util.Map;
import java.util.Properties;
import org.apache.logging.log4j.Logger;
import org.apache.geode.CancelCriterion;
import org.apache.geode.CancelException;
import org.apache.geode.SystemFailure;
import org.apache.geode.annotations.internal.MakeNotStatic;
import org.apache.geode.distributed.DistributedMember;
import org.apache.geode.distributed.DistributedSystemDisconnectedException;
import org.apache.geode.distributed.internal.DMStats;
import org.apache.geode.distributed.internal.DistributionConfig;
import org.apache.geode.distributed.internal.DistributionManager;
import org.apache.geode.distributed.internal.DistributionMessage;
import org.apache.geode.distributed.internal.LonerDistributionManager;
import org.apache.geode.distributed.internal.direct.DirectChannel;
import org.apache.geode.distributed.internal.membership.InternalDistributedMember;
import org.apache.geode.distributed.internal.membership.MembershipManager;
import org.apache.geode.internal.alerting.AlertingAction;
import org.apache.geode.internal.logging.LogService;
import org.apache.geode.internal.logging.LoggingThread;
import org.apache.geode.internal.logging.log4j.LogMarker;
import org.apache.geode.internal.net.BufferPool;
import org.apache.geode.internal.net.SocketCreator;
import org.apache.geode.internal.net.SocketCreatorFactory;
import org.apache.geode.internal.security.SecurableCommunicationChannel;
/**
* <p>
* TCPConduit manages a server socket and a collection of connections to other systems. Connections
* are identified by DistributedMember IDs. These types of messages are currently supported:
* </p>
*
* <pre>
* DistributionMessage - message is delivered to the server's
* ServerDelegate
* </pre>
* <p>
* In the current implementation, ServerDelegate is the DirectChannel used by the GemFire
* DistributionManager to send and receive messages.
* <p>
* If the ServerDelegate is null, DistributionMessages are ignored by the TCPConduit.
* </p>
*
* @since GemFire 2.0
*/
public class TCPConduit implements Runnable {
private static final Logger logger = LogService.getLogger();
/**
* max amount of time (ms) to wait for listener threads to stop
*/
@MakeNotStatic
private static int LISTENER_CLOSE_TIMEOUT;
/**
* BACKLOG is the "accept" backlog configuration parameter for the conduits server socket.
* In most operating systems this is limited to 128 by default and you must change
* the OS setting somaxconn to go beyond that limit. Note that setting this too high
* can have ramifications when disconnecting from the distributed system if a thread
* is trying to connect to another member that is not accepting connections quickly
* enough. Setting it too low can also have adverse effects because backlog overflows
* aren't handled well by most tcp/ip implementations, causing connect timeouts instead
* of expected ServerRefusedConnection exceptions.
* <p>
* Normally the backlog isn't that important because if it's full of connection requests
* a SYN "cookie" mechanism is used to bypass the backlog queue. If this is turned off
* though connection requests are dropped when the queue is full.
*/
@MakeNotStatic
private static int BACKLOG;
/**
* use javax.net.ssl.SSLServerSocketFactory?
*/
boolean useSSL;
/**
* The socket producer used by the cluster
*/
private final SocketCreator socketCreator;
private MembershipManager membershipManager;
// Populate the static tunables (LISTENER_CLOSE_TIMEOUT and BACKLOG) from system properties
// when the class is loaded; init() may also be invoked again later to re-read them.
static {
init();
}
/**
 * Returns the membership manager that was handed to this conduit's constructor.
 */
public MembershipManager getMembershipManager() {
  return this.membershipManager;
}
/**
 * Returns the "accept" backlog value most recently read by {@link #init()} from the
 * {@code p2p.backlog} system property.
 */
public static int getBackLog() {
  return TCPConduit.BACKLOG;
}
/**
 * (Re)reads the static tunables of the conduit from system properties:
 * {@code p2p.listenerCloseTimeout} (default 60000 ms) and {@code p2p.backlog} (default 1280).
 * A warning is logged if the obsolete {@code p2p.oldIO} property is set.
 */
public static void init() {
  final int listenerCloseTimeout = Integer.getInteger("p2p.listenerCloseTimeout", 60000);
  LISTENER_CLOSE_TIMEOUT = listenerCloseTimeout;

  // note: bug 37730 concerned this defaulting to 50
  final int backlog = Integer.getInteger("p2p.backlog", 1280);
  BACKLOG = backlog;

  final boolean oldIoRequested = Boolean.getBoolean("p2p.oldIO");
  if (oldIoRequested) {
    logger.warn("detected use of p2p.oldIO setting - this is no longer supported");
  }
}
///////////////// permanent conduit state
/**
* the size of OS TCP/IP buffers, not set by default
*/
int tcpBufferSize = DistributionConfig.DEFAULT_SOCKET_BUFFER_SIZE;
int idleConnectionTimeout = DistributionConfig.DEFAULT_SOCKET_LEASE_TIME;
/**
* port is the tcp/ip port that this conduit binds to. If it is zero, a port from
* membership-port-range is selected to bind to. The actual port number this conduit is listening
* on will be in the "id" instance variable
*/
private int port;
private int[] tcpPortRange = new int[] {DistributionConfig.DEFAULT_MEMBERSHIP_PORT_RANGE[0],
DistributionConfig.DEFAULT_MEMBERSHIP_PORT_RANGE[1]};
/**
* The java groups address that this conduit is associated with
*/
private InternalDistributedMember localAddr;
/**
* address is the InetAddress that this conduit uses for identity
*/
private final InetAddress address;
/**
* isBindAddress is true if we should bind to the address
*/
private final boolean isBindAddress;
/**
* the object that receives DistributionMessage messages received by this conduit.
*/
private final DirectChannel directChannel;
private DMStats stats;
/**
* Config from the delegate
*
* @since GemFire 4.2.1
*/
DistributionConfig config;
////////////////// runtime state that is re-initialized on a restart
/**
* server socket address
*/
private InetSocketAddress id;
protected volatile boolean stopped;
/**
* the listener thread
*/
private Thread thread;
/**
* if using NIO, this is the object used for accepting connections
*/
private ServerSocketChannel channel;
/**
* the server socket
*/
private ServerSocket socket;
/**
* a table of Connections from this conduit to others
*/
private ConnectionTable conTable;
/**
* <p>
* Creates a new TCPConduit for the given InetAddress and port and starts its acceptor. The given
* ServerDelegate (the DirectChannel) will receive any DistributionMessages passed to the conduit.
* </p>
* <p>
* This constructor forces the conduit to ignore the following system properties and look for them
* only in the <i>props</i> argument:
* </p>
*
* <pre>
* p2p.tcpBufferSize
* p2p.idleConnectionTimeout
* </pre>
*
* @param mgr the membership manager consulted later for membership checks
* @param port the requested port; a value of zero or less makes createServerSocket pick a port
*        from the membership port range
* @param address the address this conduit uses for identity; may be null
* @param isBindAddress true if the server socket should be bound to {@code address}
* @param receiver the delegate that receives messages and supplies stats and config; may be
*        null, in which case dummy stats are used and {@code config} stays null
* @param props instance-level settings (see parseProperties); may be null
* @throws ConnectionException if the connection table cannot be created, the local host cannot
*         be resolved, or the server socket cannot be created
*/
public TCPConduit(MembershipManager mgr, int port, InetAddress address, boolean isBindAddress,
DirectChannel receiver, Properties props) throws ConnectionException {
// instance-level tunables (buffer size, idle timeout, port range) come from props only
parseProperties(props);
this.address = address;
this.isBindAddress = isBindAddress;
this.port = port;
this.directChannel = receiver;
this.stats = null;
this.config = null;
this.membershipManager = mgr;
// stats and config are borrowed from the delegate when there is one
if (directChannel != null) {
this.stats = directChannel.getDMStats();
this.config = directChannel.getDMConfig();
}
// never leave stats null: fall back to a dummy implementation
if (this.getStats() == null) {
this.stats = new LonerDistributionManager.DummyDMStats();
}
try {
// note: "this" is handed out here, before socketCreator and useSSL have been assigned
this.conTable = ConnectionTable.create(this);
} catch (IOException io) {
throw new ConnectionException(
"Unable to initialize connection table",
io);
}
this.socketCreator =
SocketCreatorFactory.getSocketCreatorForComponent(SecurableCommunicationChannel.CLUSTER);
this.useSSL = socketCreator.useSSL();
// NOTE(review): addr is never read after this block; the only visible effect of the lookup is
// that construction fails when no address was given and the local host cannot be resolved.
InetAddress addr = address;
if (addr == null) {
try {
addr = SocketCreator.getLocalHost();
} catch (java.net.UnknownHostException e) {
throw new ConnectionException("Unable to resolve localHost address", e);
}
}
// bind the server socket and start the listener thread
startAcceptor();
}
/**
 * Parses instance-level settings from the given properties object. Recognized keys are
 * {@code p2p.tcpBufferSize}, {@code p2p.idleConnectionTimeout},
 * {@code membership_port_range_start} and {@code membership_port_range_end}.
 * <p>
 * A key that is absent leaves the corresponding field at its current value without logging
 * anything; a value that is present but not a valid integer is reported with a warning and the
 * field is likewise left unchanged. The TCP buffer size is raised to
 * {@code Connection.SMALL_BUFFER_SIZE} if it ends up below that minimum.
 *
 * @param p the properties to read; if null nothing is changed
 */
private void parseProperties(Properties p) {
  if (p == null) {
    return;
  }

  tcpBufferSize = parseIntProperty(p, "p2p.tcpBufferSize", tcpBufferSize,
      "exception parsing p2p.tcpBufferSize");
  if (tcpBufferSize < Connection.SMALL_BUFFER_SIZE) {
    // enforce minimum
    tcpBufferSize = Connection.SMALL_BUFFER_SIZE;
  }

  idleConnectionTimeout = parseIntProperty(p, "p2p.idleConnectionTimeout",
      idleConnectionTimeout, "exception parsing p2p.idleConnectionTimeout");

  tcpPortRange[0] = parseIntProperty(p, "membership_port_range_start", tcpPortRange[0],
      "Exception parsing membership-port-range start port.");
  tcpPortRange[1] = parseIntProperty(p, "membership_port_range_end", tcpPortRange[1],
      "Exception parsing membership-port-range end port.");
}

/**
 * Reads one integer-valued property.
 *
 * @param props the properties to read from
 * @param key the property name
 * @param currentValue the value to keep when the property is absent or malformed
 * @param warning the message logged (with the parse exception) when the value is malformed
 * @return the parsed value, or {@code currentValue} if the key is absent or cannot be parsed
 */
private static int parseIntProperty(Properties props, String key, int currentValue,
    String warning) {
  String raw = props.getProperty(key);
  if (raw == null) {
    // an absent key is not an error; previously this produced a NumberFormatException warning
    return currentValue;
  }
  try {
    return Integer.parseInt(raw);
  } catch (NumberFormatException e) {
    logger.warn(warning, e);
    return currentValue;
  }
}
/**
* the reason for a shutdown, if abnormal
*/
private volatile Exception shutdownCause;
/**
 * Creates the server socket, records the endpoint it is listening on in {@code id}, marks the
 * conduit as running and starts the listener thread. When the system property
 * {@code p2p.test.inhibitAcceptor} is set the thread is not started and the server socket is
 * closed instead.
 *
 * @throws ConnectionException if the server socket cannot be created or closed
 */
private void startAcceptor() throws ConnectionException {
  // remember the requested port for the error message; createServerSocket may change this.port
  final int requestedPort = this.port;
  createServerSocket();

  int boundPort;
  try {
    boundPort = socket.getLocalPort();
    id = new InetSocketAddress(socket.getInetAddress(), boundPort);
    stopped = false;
    thread = new LoggingThread("P2P Listener Thread " + id, this);
    try {
      thread.setPriority(Thread.MAX_PRIORITY);
    } catch (Exception priorityFailure) {
      logger.info("unable to set listener priority: {}", priorityFailure.getMessage());
    }

    final boolean inhibitAcceptor = Boolean.getBoolean("p2p.test.inhibitAcceptor");
    if (inhibitAcceptor) {
      logger.fatal(
          "p2p.test.inhibitAcceptor was found to be set, inhibiting incoming tcp/ip connections");
      socket.close();
    } else {
      thread.start();
    }
  } catch (IOException io) {
    throw new ConnectionException("While creating ServerSocket on port " + requestedPort, io);
  }

  this.port = boundPort;
}
/**
 * Creates the server socket. This can be used to recreate the socket using {@code this.port} and
 * {@code this.address}, which must be set before invoking this method.
 * <p>
 * If the port is zero or negative the socket creator picks a port from {@code tcpPortRange};
 * otherwise a new {@link ServerSocketChannel} is opened and its socket is bound to the requested
 * port (and to {@code address} when {@code isBindAddress} is true). On success the
 * {@code socket}, {@code channel} and {@code port} fields describe the new server socket.
 *
 * @throws ConnectionException if the socket cannot be created or bound
 */
private void createServerSocket() {
  int serverPort = this.port;
  int connectionRequestBacklog = BACKLOG;
  InetAddress bindAddress = this.address;

  try {
    if (serverPort <= 0) {
      socket = socketCreator.createServerSocketUsingPortRange(bindAddress,
          connectionRequestBacklog, isBindAddress,
          true, 0, tcpPortRange);
    } else {
      ServerSocketChannel newChannel = ServerSocketChannel.open();
      try {
        socket = newChannel.socket();
        InetSocketAddress inetSocketAddress =
            new InetSocketAddress(isBindAddress ? bindAddress : null, serverPort);
        socket.bind(inetSocketAddress, connectionRequestBacklog);
      } catch (IOException | RuntimeException bindFailure) {
        // do not leak the freshly opened channel when the bind fails (e.g. port in use)
        try {
          newChannel.close();
        } catch (IOException ignored) {
          // the bind failure is the error worth reporting
        }
        throw bindFailure;
      }
    }

    try {
      // set these buffers early so that large buffers will be allocated
      // on accepted sockets (see java.net.ServerSocket.setReceiverBufferSize javadocs)
      socket.setReceiveBufferSize(tcpBufferSize);
      int newSize = socket.getReceiveBufferSize();
      if (newSize != tcpBufferSize) {
        logger.info("{} is {} instead of the requested {}",
            "Listener receiverBufferSize", newSize,
            tcpBufferSize);
      }
    } catch (SocketException ex) {
      // best effort: the listener still works with the default buffer size
      logger.warn("Failed to set listener receiverBufferSize to {}",
          tcpBufferSize);
    }

    channel = socket.getChannel();
    port = socket.getLocalPort();
  } catch (IOException io) {
    throw new ConnectionException(
        String.format("While creating ServerSocket on port %s with address %s",
            serverPort, bindAddress),
        io);
  }
}
/**
* Ensure that the ConnectionTable class gets loaded, by delegating to
* {@link ConnectionTable#loadEmergencyClasses()}.
*
* @see SystemFailure#loadEmergencyClasses()
*/
public static void loadEmergencyClasses() {
ConnectionTable.loadEmergencyClasses();
}
/**
 * Marks the conduit as stopped and closes the server socket channel (or, when there is no
 * channel, the server socket), then performs the ConnectionTable's static emergency close and
 * drops the references to the socket, listener thread and connection table. Does nothing if the
 * conduit is already stopped.
 *
 * @see SystemFailure#emergencyClose()
 */
public void emergencyClose() {
  // stop(); // Causes grief
  if (stopped) {
    return;
  }
  stopped = true;

  try {
    if (channel != null) {
      channel.close();
      // NOTE: do not try to interrupt the listener thread at this point.
      // Doing so interferes with the channel's socket logic.
    } else if (socket != null) {
      socket.close();
    }
  } catch (IOException ignored) {
    // ignore, please!
  }

  // this.conTable.close(); not safe against deadlocks
  ConnectionTable.emergencyClose();

  socket = null;
  thread = null;
  conTable = null;
}
/**
 * Stops the conduit, closing all tcp/ip connections. The server socket (channel) is closed
 * first, the listener thread is given up to {@code LISTENER_CLOSE_TIMEOUT} milliseconds to exit,
 * and then the connection table is closed. Does nothing if the conduit is already stopped.
 * <p>
 * If the calling thread is interrupted while waiting for the listener, the wait is abandoned,
 * the remaining cleanup still runs, and the thread's interrupt status is restored before this
 * method returns.
 *
 * @param cause the reason for an abnormal shutdown, later available from
 *        {@link #getShutdownCause()}; may be null
 */
public void stop(Exception cause) {
  if (stopped) {
    return;
  }
  stopped = true;
  shutdownCause = cause;

  if (logger.isTraceEnabled(LogMarker.DM_VERBOSE)) {
    logger.trace(LogMarker.DM_VERBOSE, "Shutting down conduit");
  }

  boolean interrupted = false;
  try {
    // set timeout endpoint here since interrupt() has been known
    // to hang
    long timeout = System.currentTimeMillis() + LISTENER_CLOSE_TIMEOUT;
    Thread t = this.thread;
    if (channel != null) {
      channel.close();
      // NOTE: do not try to interrupt the listener thread at this point.
      // Doing so interferes with the channel's socket logic.
    } else {
      ServerSocket s = this.socket;
      if (s != null) {
        s.close();
      }
      if (t != null) {
        t.interrupt();
      }
    }

    do {
      t = this.thread;
      if (t == null || !t.isAlive()) {
        break;
      }
      t.join(200);
    } while (timeout > System.currentTimeMillis());

    if (t != null && t.isAlive()) {
      logger.warn(
          "Unable to shut down listener within {}ms. Unable to interrupt socket.accept() due to JDK bug. Giving up.",
          LISTENER_CLOSE_TIMEOUT);
    }
  } catch (IOException e) {
    // we're already trying to shutdown, ignore
  } catch (InterruptedException e) {
    // keep shutting down, but remember the interrupt so the caller still sees it
    interrupted = true;
  }

  try {
    // close connections after shutting down acceptor to fix bug 30695
    ConnectionTable ct = this.conTable;
    if (ct != null) {
      ct.close();
    }

    socket = null;
    thread = null;
    conTable = null;
  } finally {
    if (interrupted) {
      Thread.currentThread().interrupt();
    }
  }
}
/**
 * Reports whether this conduit is currently stopped.
 *
 * @return the current value of the {@code stopped} flag
 * @since GemFire 3.0
 */
public boolean isStopped() {
  return stopped;
}
/**
 * Starts the conduit again after it has been stopped: the stats are re-fetched from the direct
 * channel (falling back to dummy stats), a new connection table is created and the acceptor is
 * started. Does nothing if the conduit is not stopped.
 *
 * @throws ConnectionException if the connection table or the server socket cannot be created
 */
public void restart() throws ConnectionException {
  if (!stopped) {
    return;
  }

  // refresh the stats from the delegate, if there is one
  stats = null;
  if (directChannel != null) {
    stats = directChannel.getDMStats();
  }
  if (getStats() == null) {
    stats = new LonerDistributionManager.DummyDMStats();
  }

  try {
    conTable = ConnectionTable.create(this);
  } catch (IOException tableFailure) {
    throw new ConnectionException("Unable to initialize connection table", tableFailure);
  }

  startAcceptor();
}
/**
 * The server socket listener thread's run loop. Each iteration checks for system failure,
 * cancellation, the {@code stopped} flag and thread interruption, then blocks in
 * {@code channel.accept()} and hands the accepted socket to {@link #acceptConnection(Socket)}.
 * <p>
 * The loop ends when the conduit is stopped or cancelled, the thread is interrupted, or the
 * channel is reported closed. Other I/O failures are counted as failed accepts; if the server
 * socket is found closed while the conduit is still running, an attempt is made to recreate it.
 */
@Override
public void run() {
  ConnectionTable.threadWantsSharedResources();
  if (logger.isTraceEnabled(LogMarker.DM_VERBOSE)) {
    logger.trace(LogMarker.DM_VERBOSE, "Starting P2P Listener on {}", id);
  }
  for (;;) {
    SystemFailure.checkFailure();
    if (stopper.isCancelInProgress()) {
      break;
    }
    if (stopped) {
      break;
    }
    if (Thread.currentThread().isInterrupted()) {
      break;
    }
    if (stopper.isCancelInProgress()) {
      break; // part of bug 37271
    }

    Socket othersock = null;
    try {
      SocketChannel otherChannel = channel.accept();
      othersock = otherChannel.socket();
      if (stopped) {
        // the conduit was stopped while we were blocked in accept(): drop the new socket
        try {
          if (othersock != null) {
            othersock.close();
          }
        } catch (Exception e) {
          // best effort; the conduit is going away anyway
        }
        continue;
      }

      acceptConnection(othersock);

    } catch (ClosedByInterruptException cbie) {
      // safe to ignore
    } catch (ClosedChannelException | CancelException e) {
      break;
    } catch (IOException e) {
      this.getStats().incFailedAccept();

      try {
        if (othersock != null) {
          othersock.close();
        }
      } catch (IOException ignore) {
      }

      if (!stopped) {
        if (e instanceof SocketException && "Socket closed".equalsIgnoreCase(e.getMessage())) {
          // safe to ignore; see bug 31156
          if (!socket.isClosed()) {
            logger.warn("ServerSocket threw 'socket closed' exception but says it is not closed",
                e);
            try {
              socket.close();
              createServerSocket();
            } catch (IOException ioe) {
              logger.fatal("Unable to close and recreate server socket",
                  ioe);
              // post 5.1.0x, this should force shutdown
              try {
                Thread.sleep(5000);
              } catch (InterruptedException ie) {
                // Don't reset; we're just exiting the thread
                logger.info("Interrupted and exiting while trying to recreate listener sockets");
                return;
              }
            }
          }
        } else if ("Too many open files".equals(e.getMessage())) {
          getConTable().fileDescriptorsExhausted();
        } else {
          logger.warn(e.getMessage(), e);
        }
      }
    } catch (Exception e) {
      logger.warn(e.getMessage(), e);
    }

    if (!stopped && socket.isClosed()) {
      // NOTE: do not check for distributed system closing here. Messaging
      // may need to occur during the closing of the DS or cache
      logger.warn("ServerSocket closed - reopening");
      try {
        createServerSocket();
      } catch (ConnectionException ex) {
        logger.warn(ex.getMessage(), ex);
      }
    }
  } // for

  if (logger.isTraceEnabled(LogMarker.DM_VERBOSE)) {
    // log with the same marker that guards the message, matching the start-up trace above
    logger.trace(LogMarker.DM_VERBOSE, "Stopped P2P Listener on {}", id);
  }
}
/**
 * Returns the current connection table.
 *
 * @throws DistributedSystemDisconnectedException if the table has been released; a cancellation
 *         exception from the conduit's cancel criterion takes precedence when one is in progress
 */
private ConnectionTable getConTable() {
  final ConnectionTable table = this.conTable;
  if (table != null) {
    return table;
  }
  // prefer the cancel criterion's exception, if cancellation is in progress
  stopper.checkCancelInProgress(null);
  throw new DistributedSystemDisconnectedException("tcp layer has been shutdown");
}
/**
 * Hands a freshly accepted socket to the connection table. Failures are counted as failed
 * accepts unless the conduit has been stopped; cancellation is silently ignored.
 *
 * @param othersock the socket returned by the listener's accept call
 */
private void acceptConnection(Socket othersock) {
  try {
    final ConnectionTable table = getConTable();
    table.acceptConnection(othersock, new PeerConnectionFactory());
  } catch (IOException | ConnectionException connectFailure) {
    // exception is logged by the Connection
    if (!stopped) {
      getStats().incFailedAccept();
    }
  } catch (CancelException cancelled) {
    // shutting down: nothing to record
  } catch (Exception unexpected) {
    if (!stopped) {
      getStats().incFailedAccept();
      logger.warn("Failed to accept connection from {} because {}",
          othersock.getInetAddress(), unexpected);
    }
  }
}
/**
 * Records the current outgoing message count on all thread-owned ordered connections by
 * delegating to the connection table.
 *
 * @param member the member whose connections are of interest
 * @param result the map that the connection table fills in
 * @since GemFire 5.1
 */
public void getThreadOwnedOrderedConnectionState(DistributedMember member, Map result) {
  final ConnectionTable table = getConTable();
  table.getThreadOwnedOrderedConnectionState(member, result);
}
/**
 * Waits for the incoming connections identified by the keys in {@code channelState} to receive
 * and dispatch the number of messages associated with each key. The work is delegated to the
 * connection table.
 *
 * @param member the member whose connections are of interest
 * @param channelState the state to wait for
 * @throws InterruptedException if the connection table's wait is interrupted
 * @since GemFire 5.1
 */
public void waitForThreadOwnedOrderedConnectionState(DistributedMember member, Map channelState)
    throws InterruptedException {
  final ConnectionTable table = getConTable();
  table.waitForThreadOwnedOrderedConnectionState(member, channelState);
}
/**
 * Called by a connection when a message object has been read. The message is stamped with the
 * byte count, the sender and the shared-receiver flag of the connection and then passed to the
 * direct channel. If there is no direct channel the message is dropped.
 *
 * @param receiver the connection the message arrived on
 * @param message the message that was read
 * @param bytesRead number of bytes read off of network to get this message
 */
protected void messageReceived(Connection receiver, DistributionMessage message, int bytesRead) {
  if (logger.isTraceEnabled()) {
    logger.trace("{} received {} from {}", id, message, receiver);
  }

  if (directChannel == null) {
    return;
  }

  message.setBytesRead(bytesRead);
  message.setSender(receiver.getRemoteAddress());
  message.setSharedReceiver(receiver.isSharedResource());
  directChannel.receive(message, bytesRead);
}
/**
 * Returns the address of this conduit's ServerSocket endpoint, as recorded when the acceptor
 * was started.
 */
public InetSocketAddress getSocketId() {
  return this.id;
}
/**
 * Returns the actual port to which this conduit's ServerSocket is bound, taken from the
 * endpoint address recorded when the acceptor was started.
 */
public int getPort() {
  final InetSocketAddress endpoint = id;
  return endpoint.getPort();
}
/**
 * Returns the local member ID that identifies this conduit, as set by
 * {@link #setMemberId(InternalDistributedMember)}.
 */
public InternalDistributedMember getMemberId() {
  return localAddr;
}
/**
 * Sets the local member ID that identifies this conduit.
 *
 * @param addr the member ID later returned by {@link #getMemberId()}
 */
public void setMemberId(InternalDistributedMember addr) {
  this.localAddr = addr;
}
/**
 * Return a connection to the given member. This method must continue to attempt to create a
 * connection to the given member as long as that member is in the membership view and the system
 * is not shutting down.
 * <p>
 * Each retry pauses briefly, re-checks membership, and closes the previously obtained connection
 * so that it gets rebuilt. The calling thread's interrupt status is cleared while the method
 * works and restored before it returns or throws.
 *
 * @param memberAddress the member to connect to
 * @param preserveOrder whether this is an ordered or unordered connection
 * @param retry false if this is the first attempt
 * @param startTime the time this operation started
 * @param ackTimeout the ack-wait-threshold * 1000 for the operation to be transmitted (or zero)
 * @param ackSATimeout the ack-severe-alert-threshold * 1000 for the operation to be transmitted
 *        (or zero)
 *
 * @return the connection
 * @throws java.io.IOException if the member is no longer in the view or is shunned, or if the
 *         connection attempt failed in a way that is not retried
 * @throws DistributedSystemDisconnectedException if the conduit is stopped or shutdown is in
 *         progress
 */
public Connection getConnection(InternalDistributedMember memberAddress,
    final boolean preserveOrder, boolean retry, long startTime, long ackTimeout,
    long ackSATimeout) throws java.io.IOException, DistributedSystemDisconnectedException {
  if (stopped) {
    throw new DistributedSystemDisconnectedException(
        "The conduit is stopped");
  }

  Connection conn = null;
  InternalDistributedMember memberInTrouble = null;
  boolean breakLoop = false;
  for (;;) {
    stopper.checkCancelInProgress(null);
    // clear the interrupt flag while working; it is restored in the finally block
    boolean interrupted = Thread.interrupted();
    try {
      // If this is the second time through this loop, we had
      // problems. Tear down the connection so that it gets
      // rebuilt.
      if (retry || conn != null) { // not first time in loop
        if (!membershipManager.memberExists(memberAddress)
            || membershipManager.isShunned(memberAddress)
            || membershipManager.shutdownInProgress()) {
          throw new IOException(
              "TCP/IP connection lost and member is not in view");
        }
        // bug35953: Member is still in view; we MUST NOT give up!

        // Pause just a tiny bit...
        try {
          Thread.sleep(100);
        } catch (InterruptedException e) {
          interrupted = true;
          stopper.checkCancelInProgress(e);
        }

        // try again after sleep
        if (!membershipManager.memberExists(memberAddress)
            || membershipManager.isShunned(memberAddress)) {
          // OK, the member left. Just register an error.
          throw new IOException(
              "TCP/IP connection lost and member is not in view");
        }

        // Print a warning (once)
        if (memberInTrouble == null) {
          memberInTrouble = memberAddress;
          logger.warn("Attempting TCP/IP reconnect to {}", memberInTrouble);
        } else {
          if (logger.isDebugEnabled()) {
            logger.debug("Attempting TCP/IP reconnect to {}", memberInTrouble);
          }
        }

        // Close the connection (it will get rebuilt later).
        this.getStats().incReconnectAttempts();
        if (conn != null) {
          try {
            if (logger.isDebugEnabled()) {
              logger.debug("Closing old connection. conn={} before retrying. memberInTrouble={}",
                  conn, memberInTrouble);
            }
            conn.closeForReconnect("closing before retrying");
          } catch (CancelException ex) {
            throw ex;
          } catch (Exception ex) {
            // best effort: the connection is being discarded and rebuilt anyway
          }
        }
      } // not first time in loop

      Exception problem = null;
      try {
        // Get (or regenerate) the connection
        // bug36202: this could generate a ConnectionException, so it
        // must be caught and retried
        boolean retryForOldConnection;
        boolean debugRetry = false;
        do {
          retryForOldConnection = false;
          conn = getConTable().get(memberAddress, preserveOrder, startTime, ackTimeout,
              ackSATimeout);
          if (conn == null) {
            // conduit may be closed - otherwise an ioexception would be thrown
            problem = new IOException(
                String.format("Unable to reconnect to server; possible shutdown: %s",
                    memberAddress));
          } else if (conn.isClosing() || !conn.getRemoteAddress().equals(memberAddress)) {
            if (logger.isDebugEnabled()) {
              logger.debug("Got an old connection for {}: {}@{}", memberAddress, conn,
                  conn.hashCode());
            }
            conn.closeOldConnection("closing old connection");
            conn = null;
            retryForOldConnection = true;
            debugRetry = true;
          }
        } while (retryForOldConnection);
        if (debugRetry && logger.isDebugEnabled()) {
          logger.debug("Done removing old connections");
        }

        // we have a connection; fall through and return it
      } catch (ConnectionException e) {
        // Race condition between acquiring the connection and attempting
        // to use it: another thread closed it.
        problem = e;
        // [sumedh] No need to retry since Connection.createSender has already
        // done retries and now member is really unreachable for some reason
        // even though it may be in the view
        breakLoop = true;
      } catch (IOException e) {
        problem = e;
        // bug #43962 don't keep trying to connect to an alert listener
        if (AlertingAction.isThreadAlerting()) {
          if (logger.isDebugEnabled()) {
            logger.debug("Giving up connecting to alert listener {}", memberAddress);
          }
          breakLoop = true;
        }
      }

      if (problem != null) {
        // Some problems are not recoverable; check and error out early.
        if (!membershipManager.memberExists(memberAddress)
            || membershipManager.isShunned(memberAddress)) { // left the view
          // Bracket our original warning
          if (memberInTrouble != null) {
            // make this msg info to bracket warning
            logger.info("Ending reconnect attempt because {} has disappeared.",
                memberInTrouble);
          }
          throw new IOException(String.format("Peer has disappeared from view: %s",
              memberAddress));
        } // left the view

        if (membershipManager.shutdownInProgress()) { // shutdown in progress
          // Bracket our original warning
          if (memberInTrouble != null) {
            // make this msg info to bracket warning
            logger.info("Ending reconnect attempt to {} because shutdown has started.",
                memberInTrouble);
          }
          stopper.checkCancelInProgress(null);
          throw new DistributedSystemDisconnectedException(
              "Abandoned because shutdown is in progress");
        } // shutdown in progress

        // Log the warning. We wait until now, because we want
        // to have m defined for a nice message...
        if (memberInTrouble == null) {
          logger.warn("Error sending message to {} (will reattempt): {}",
              memberAddress, problem);
          memberInTrouble = memberAddress;
        } else {
          if (logger.isDebugEnabled()) {
            logger.debug("Error sending message to {}", memberAddress, problem);
          }
        }

        if (breakLoop) {
          // The exception may carry no message at all (e.g. ClosedChannelException), so the
          // prefix test must be null-safe or it would mask the real failure with an NPE.
          String problemMessage = problem.getMessage();
          if (problemMessage == null
              || !problemMessage.startsWith("Cannot form connection to alert listener")) {
            logger.warn("Throwing IOException after finding breakLoop=true",
                problem);
          }
          if (problem instanceof IOException) {
            throw (IOException) problem;
          } else {
            IOException ioe = new IOException(String.format("Problem connecting to %s",
                memberAddress));
            ioe.initCause(problem);
            throw ioe;
          }
        }
        // Retry the operation (indefinitely)
        continue;
      } // problem != null
      // Success!

      // Make sure our logging is bracketed if there was a problem
      if (memberInTrouble != null) {
        logger.info("Successfully reconnected to member {}", memberInTrouble);
        if (logger.isTraceEnabled()) {
          logger.trace("new connection is {} memberAddress={}", conn, memberAddress);
        }
      }
      return conn;
    } finally {
      if (interrupted) {
        Thread.currentThread().interrupt();
      }
    }
  } // for(;;)
}
/**
 * Returns the string form of the server socket address ({@code id}), or {@code "null"} when no
 * address has been recorded yet.
 */
@Override
public String toString() {
  return String.valueOf(id);
}
/**
 * Returns the distribution manager of the direct channel.
 *
 * @return the direct channel's distribution manager, or null if this conduit was created
 *         without a direct channel (the conduit's cancel criterion treats null as "no
 *         distribution manager")
 */
public DistributionManager getDM() {
  // the constructor accepts a null delegate, so guard against it instead of throwing an NPE
  if (directChannel == null) {
    return null;
  }
  return directChannel.getDM();
}
/**
 * Asks the connection table to remove the given endpoint. Does nothing if the connection table
 * has already been released.
 *
 * @param mbr the member whose endpoint is to be removed
 * @param reason passed through to the connection table
 * @param notifyDisconnect passed through to the connection table
 */
public void removeEndpoint(DistributedMember mbr, String reason, boolean notifyDisconnect) {
  final ConnectionTable table = this.conTable;
  if (table != null) {
    table.removeEndpoint(mbr, reason, notifyDisconnect);
  }
}
/**
 * Checks whether there are still any receiver threads for the given end-point.
 *
 * @return the connection table's answer, or false if the connection table has been released
 */
public boolean hasReceiversFor(DistributedMember endPoint) {
  final ConnectionTable table = this.conTable;
  if (table == null) {
    return false;
  }
  return table.hasReceiversFor(endPoint);
}
/**
 * Returns the statistics object used by this conduit: the delegate's stats when available,
 * otherwise the dummy stats installed by the constructor or {@link #restart()}.
 */
public DMStats getStats() {
  return this.stats;
}
/**
 * Returns the SSL flag that the constructor obtained from the cluster socket creator.
 */
public boolean useSSL() {
  return this.useSSL;
}
/**
 * Returns the buffer pool of the current connection table.
 */
public BufferPool getBufferPool() {
  final ConnectionTable table = this.conTable;
  return table.getBufferPool();
}
/**
 * Cancel criterion for this conduit: cancellation is in progress when there is no distribution
 * manager or when the conduit has been stopped.
 */
protected class Stopper extends CancelCriterion {
  /**
   * @return a description of why cancellation is in progress, or null if it is not
   */
  @Override
  public String cancelInProgress() {
    if (getDM() == null) {
      return "no distribution manager";
    }
    return TCPConduit.this.stopped ? "Conduit has been stopped" : null;
  }

  /**
   * Builds the exception describing the cancellation. The distribution manager's own cancel
   * criterion is asked first; only if it has nothing to report is a conduit-specific exception
   * created.
   *
   * @param e the cause to attach
   * @return the exception, or null if no cancellation is in progress
   */
  @Override
  public RuntimeException generateCancelledException(Throwable e) {
    if (cancelInProgress() == null) {
      return null;
    }

    final DistributionManager dm = getDM();
    if (dm == null) {
      return new DistributedSystemDisconnectedException("no distribution manager");
    }

    final RuntimeException fromManager = dm.getCancelCriterion().generateCancelledException(e);
    if (fromManager != null) {
      return fromManager;
    }

    // We know we've been stopped; generate the exception
    final RuntimeException conduitStopped =
        new DistributedSystemDisconnectedException("Conduit has been stopped");
    conduitStopped.initCause(e);
    return conduitStopped;
  }
}
private final Stopper stopper = new Stopper();
/**
 * Returns this conduit's cancel criterion.
 */
public CancelCriterion getCancelCriterion() {
  return this.stopper;
}
/**
 * If the conduit was disconnected due to an abnormal condition, this describes the reason.
 *
 * @return the cause passed to {@link #stop(Exception)}, or null if none was recorded
 */
public Exception getShutdownCause() {
  return shutdownCause;
}
/**
 * Returns the SocketCreator that should be used to produce sockets for TCPConduit connections;
 * this is the creator obtained for the cluster component in the constructor.
 */
protected SocketCreator getSocketCreator() {
  return this.socketCreator;
}
/**
 * ARB: Called by Connection before handshake reply is sent. Delegates to the membership
 * manager's {@code waitForNewMember}; per the original documentation the result is true if the
 * member is part of the view and false if membership is not confirmed before timeout.
 *
 * @param remoteId the member whose membership is to be confirmed
 * @return the membership manager's answer
 */
public boolean waitForMembershipCheck(InternalDistributedMember remoteId) {
  final boolean confirmed = membershipManager.waitForNewMember(remoteId);
  return confirmed;
}
}