/*=========================================================================
* Copyright (c) 2002-2014 Pivotal Software, Inc. All Rights Reserved.
* This product is protected by U.S. and international copyright
* and intellectual property laws. Pivotal products are covered by
* more patents listed at http://www.pivotal.io/patents.
*=========================================================================
*/
package com.gemstone.gemfire.internal.tcp;
import java.io.IOException;
import java.net.Inet6Address;
import java.net.InetAddress;
import java.net.InetSocketAddress;
import java.net.ServerSocket;
import java.net.Socket;
import java.net.SocketException;
import java.nio.channels.ClosedByInterruptException;
import java.nio.channels.ClosedChannelException;
import java.nio.channels.ServerSocketChannel;
import java.nio.channels.SocketChannel;
import java.util.HashMap;
import java.util.Properties;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.RejectedExecutionException;
import java.util.concurrent.RejectedExecutionHandler;
import java.util.concurrent.SynchronousQueue;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import javax.net.ssl.SSLException;
import org.apache.logging.log4j.Logger;
import com.gemstone.gemfire.CancelCriterion;
import com.gemstone.gemfire.CancelException;
import com.gemstone.gemfire.SystemFailure;
import com.gemstone.gemfire.distributed.DistributedSystemDisconnectedException;
import com.gemstone.gemfire.distributed.internal.DM;
import com.gemstone.gemfire.distributed.internal.DMStats;
import com.gemstone.gemfire.distributed.internal.DistributionConfig;
import com.gemstone.gemfire.distributed.internal.DistributionMessage;
import com.gemstone.gemfire.distributed.internal.LonerDistributionManager;
import com.gemstone.gemfire.distributed.internal.direct.DirectChannel;
import com.gemstone.gemfire.distributed.internal.membership.InternalDistributedMember;
import com.gemstone.gemfire.distributed.internal.membership.MembershipManager;
import com.gemstone.gemfire.internal.SocketCreator;
import com.gemstone.gemfire.internal.i18n.LocalizedStrings;
import com.gemstone.gemfire.internal.logging.LogService;
import com.gemstone.gemfire.internal.logging.LoggingThreadGroup;
import com.gemstone.gemfire.internal.logging.log4j.AlertAppender;
import com.gemstone.gemfire.internal.logging.log4j.LocalizedMessage;
import com.gemstone.gemfire.internal.logging.log4j.LogMarker;
/** <p>TCPConduit manages a server socket and a collection of connections to
    other systems.  Connections are identified by host/port Stubs.
    These types of messages are currently supported:</p><pre>
    DistributionMessage - message is delivered to the server's
                          ServerDelegate
    </pre>
    <p>In the current implementation, ServerDelegate is the DirectChannel
    used by the GemFire DistributionManager to send and receive messages.</p>
    <p>If the ServerDelegate is null, DistributionMessages are ignored by
    the TCPConduit.</p>
    @author Bruce Schuchardt
    @since 2.0
  */
public class TCPConduit implements Runnable {
private static final Logger logger = LogService.getLogger();
/** max amount of time (ms) to wait for listener threads to stop */
private static int LISTENER_CLOSE_TIMEOUT;
/** backlog is the "accept" backlog configuration parameter for
    all conduits' server sockets */
private static int BACKLOG;
/** use javax.net.ssl.SSLServerSocketFactory? */
static boolean useSSL;
// public final static boolean USE_SYNC_WRITES = Boolean.getBoolean("p2p.useSyncWrites");
/**
 * Force use of Sockets rather than SocketChannels (NIO). Note from Bruce: due to
 * a bug in the Java VM, NIO cannot be used with IPv6 addresses on Windows.
 * When that condition holds, the useNIO flag must be disregarded.
 */
private static boolean USE_NIO;
/** use direct ByteBuffers instead of heap ByteBuffers for NIO operations */
static boolean useDirectBuffers;
private volatile boolean inhibitNewConnections;
// private transient DistributedMembershipListener messageReceiver;
private MembershipManager membershipManager;
/** true if NIO can be used for the server socket */
private boolean useNIO;
static {
init();
}
public MembershipManager getMembershipManager() {
return membershipManager;
}
public static int getBackLog() {
return BACKLOG;
}
public static void init() {
useSSL = Boolean.getBoolean("p2p.useSSL");
// only use nio if not SSL
USE_NIO = !useSSL && !Boolean.getBoolean("p2p.oldIO");
// only use direct buffers if we are using nio
useDirectBuffers = USE_NIO && !Boolean.getBoolean("p2p.nodirectBuffers");
LISTENER_CLOSE_TIMEOUT = Integer.getInteger("p2p.listenerCloseTimeout", 60000).intValue();
// fix for bug 37730
// note: HANDSHAKE_POOL_SIZE's field initializer runs after this class's
// static initializer invokes init(), so read the property directly here;
// the field itself would still be zero at this point
BACKLOG = Integer.getInteger("p2p.backlog", Integer.getInteger("p2p.HANDSHAKE_POOL_SIZE", 10).intValue()+1).intValue();
}
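/*
 * Illustrative sketch (not part of the original source): the switches read
 * by init() are ordinary system properties, so they are normally supplied
 * on the command line, e.g.
 *
 *   java -Dp2p.useSSL=true -Dp2p.backlog=100 -Dp2p.listenerCloseTimeout=30000 ...
 *
 * or set programmatically before the class initializes:
 *
 *   System.setProperty("p2p.oldIO", "true");           // force old-IO sockets
 *   System.setProperty("p2p.nodirectBuffers", "true"); // heap buffers for NIO
 *   TCPConduit.init();                                 // re-read the flags
 */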
///////////////// permanent conduit state
/** the size of OS TCP/IP buffers, not set by default */
public int tcpBufferSize = DistributionConfig.DEFAULT_SOCKET_BUFFER_SIZE;
public int idleConnectionTimeout = DistributionConfig.DEFAULT_SOCKET_LEASE_TIME;
/** port is the tcp/ip port that this conduit binds to. If it is zero, a port
from membership-port-range is selected to bind to. The actual port number this
conduit is listening on will be in the "id" instance variable */
private int port;
private int[] tcpPortRange = new int[] { 1024, 65535 };
/** The JGroups address that this conduit is associated with */
private InternalDistributedMember localAddr;
/** address is the InetAddress that this conduit uses for identity */
private final InetAddress address;
/** isBindAddress is true if we should bind to the address */
private final boolean isBindAddress;
/** the object that receives DistributionMessage messages
received by this conduit. */
private final DirectChannel directChannel;
/**
* Stats from the delegate
*/
DMStats stats;
/**
* Config from the delegate
* @since 4.2.1
*/
DistributionConfig config;
////////////////// runtime state that is re-initialized on a restart
/** id is an endpoint Stub representing this server. It holds the
actual port the server is listening on */
private Stub id;
protected volatile boolean stopped;
/** the listener thread */
private Thread thread;
/** if using NIO, this is the object used for accepting connections */
private ServerSocketChannel channel;
/** the server socket */
private ServerSocket socket;
/** a table of Connections from this conduit to others
*/
private ConnectionTable conTable;
/** <p>creates a new TCPConduit bound to the given InetAddress and port.
The given ServerDelegate will receive any DistributionMessages
passed to the conduit.</p>
<p>This constructor forces the conduit to ignore the following
system properties and look for them only in the <i>props</i> argument:</p>
<pre>
p2p.tcpBufferSize
p2p.idleConnectionTimeout
</pre>
*/
public TCPConduit(MembershipManager mgr, int port,
InetAddress address, boolean isBindAddress,
DirectChannel receiver, Properties props)
throws ConnectionException
{
parseProperties(props);
this.address = address;
this.isBindAddress = isBindAddress;
this.port = port;
this.directChannel = receiver;
this.stats = null;
this.config = null;
this.membershipManager = mgr;
if (directChannel != null) {
this.stats = directChannel.getDMStats();
this.config = directChannel.getDMConfig();
}
if (this.stats == null) {
this.stats = new LonerDistributionManager.DummyDMStats();
}
try {
this.conTable = ConnectionTable.create(this);
}
catch (IOException io) {
throw new ConnectionException(LocalizedStrings.TCPConduit_UNABLE_TO_INITIALIZE_CONNECTION_TABLE.toLocalizedString(), io);
}
this.useNIO = USE_NIO;
if (this.useNIO) {
InetAddress addr = address;
if (addr == null) {
try {
addr = SocketCreator.getLocalHost();
}
catch (java.net.UnknownHostException e) {
throw new ConnectionException("Unable to resolve localHost address", e);
}
}
// JDK bug 6230761 - NIO can't be used with IPv6 on Windows
if (addr instanceof Inet6Address) {
String os = System.getProperty("os.name");
if (os != null && os.indexOf("Windows") != -1) {
  this.useNIO = false;
}
}
}
startAcceptor();
}
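/*
 * Illustrative sketch (hypothetical caller): the constructor reads the two
 * instance-level settings from the props argument rather than from system
 * properties, so an individual conduit can be tuned like this:
 *
 *   Properties p = new Properties();
 *   p.setProperty("p2p.tcpBufferSize", "65536");          // bytes
 *   p.setProperty("p2p.idleConnectionTimeout", "120000"); // milliseconds
 *   TCPConduit conduit = new TCPConduit(mgr, 0, null, false, channel, p);
 *
 * Passing port 0 selects a port from membership-port-range; the port that
 * was actually bound is then available from getPort().
 */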
/** parse instance-level properties from the given object */
private void parseProperties(Properties p) {
if (p != null) {
String s;
s = p.getProperty("p2p.tcpBufferSize", ""+tcpBufferSize);
try { tcpBufferSize = Integer.parseInt(s); } catch (Exception e) { logger.warn(LocalizedMessage.create(LocalizedStrings.TCPConduit_EXCEPTION_PARSING_P2PTCPBUFFERSIZE), e); }
if (tcpBufferSize < Connection.SMALL_BUFFER_SIZE) {
// enforce minimum
tcpBufferSize = Connection.SMALL_BUFFER_SIZE;
}
s = p.getProperty("p2p.idleConnectionTimeout", ""+idleConnectionTimeout);
try { idleConnectionTimeout = Integer.parseInt(s); } catch (Exception e) { logger.warn(LocalizedMessage.create(LocalizedStrings.TCPConduit_EXCEPTION_PARSING_P2PIDLECONNECTIONTIMEOUT), e); }
s = p.getProperty("membership_port_range_start");
try { tcpPortRange[0] = Integer.parseInt(s); } catch (Exception e) { logger.warn(LocalizedMessage.create(LocalizedStrings.TCPConduit_EXCEPTION_PARSING_TCPPORTRANGESTART), e); }
s = p.getProperty("membership_port_range_end");
try { tcpPortRange[1] = Integer.parseInt(s); } catch (Exception e) { logger.warn(LocalizedMessage.create(LocalizedStrings.TCPConduit_EXCEPTION_PARSING_TCPPORTRANGEEND), e); }
}
}
private ThreadPoolExecutor hsPool;
/** the reason for a shutdown, if abnormal */
private volatile Exception shutdownCause;
private final static int HANDSHAKE_POOL_SIZE = Integer.getInteger("p2p.HANDSHAKE_POOL_SIZE", 10).intValue();
private final static long HANDSHAKE_POOL_KEEP_ALIVE_TIME = Long.getLong("p2p.HANDSHAKE_POOL_KEEP_ALIVE_TIME", 60).longValue();
/** added to fix bug 40436 */
public void setMaximumHandshakePoolSize(int maxSize) {
if (this.hsPool != null && maxSize > HANDSHAKE_POOL_SIZE) {
this.hsPool.setMaximumPoolSize(maxSize);
}
}
/** binds the server socket and starts the listener and handshake threads */
private void startAcceptor() throws ConnectionException {
int localPort;
int p = this.port;
InetAddress ba = this.address;
{
ThreadPoolExecutor tmp_hsPool = null;
String gName = "P2P-Handshaker " + ba + ":" + p;
final ThreadGroup socketThreadGroup
= LoggingThreadGroup.createThreadGroup(gName, logger);
ThreadFactory socketThreadFactory = new ThreadFactory() {
int connNum = -1;
public Thread newThread(Runnable command) {
int tnum;
synchronized (this) {
tnum = ++connNum;
}
String tName = socketThreadGroup.getName() + " Thread " + tnum;
return new Thread(socketThreadGroup, command, tName);
}
};
try {
final BlockingQueue<Runnable> bq = new SynchronousQueue<Runnable>();
final RejectedExecutionHandler reh = new RejectedExecutionHandler() {
public void rejectedExecution(Runnable r, ThreadPoolExecutor pool) {
try {
bq.put(r);
}
catch (InterruptedException ex) {
Thread.currentThread().interrupt(); // preserve the state
throw new RejectedExecutionException(LocalizedStrings.TCPConduit_INTERRUPTED.toLocalizedString(), ex);
}
}
};
tmp_hsPool = new ThreadPoolExecutor(1,
HANDSHAKE_POOL_SIZE,
HANDSHAKE_POOL_KEEP_ALIVE_TIME,
TimeUnit.SECONDS,
bq,
socketThreadFactory,
reh);
}
catch (IllegalArgumentException poolInitException) {
throw new ConnectionException(LocalizedStrings.TCPConduit_WHILE_CREATING_HANDSHAKE_POOL.toLocalizedString(), poolInitException);
}
this.hsPool = tmp_hsPool;
}
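// The SynchronousQueue plus the blocking-put rejection handler above
// give the pool "caller blocks" semantics (a sketch of the behavior,
// not new API): execute() starts a new thread while fewer than
// HANDSHAKE_POOL_SIZE are busy; once the pool is saturated,
// ThreadPoolExecutor rejects the task and the handler re-submits it
// with a blocking bq.put(), stalling the acceptor rather than dropping
// the handshake:
//
//   hsPool.execute(task);  // returns quickly while a thread is free
//   hsPool.execute(task);  // blocks in rejectedExecution -> bq.put()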
createServerSocket();
try {
localPort = socket.getLocalPort();
id = new Stub(socket.getInetAddress(), localPort, 0);
stopped = false;
ThreadGroup group =
LoggingThreadGroup.createThreadGroup("P2P Listener Threads", logger);
thread = new Thread(group, this, "P2P Listener Thread " + id);
thread.setDaemon(true);
try { thread.setPriority(thread.getThreadGroup().getMaxPriority()); }
catch (Exception e) {
logger.info(LocalizedMessage.create(LocalizedStrings.TCPConduit_UNABLE_TO_SET_LISTENER_PRIORITY__0, e.getMessage()));
}
if (!Boolean.getBoolean("p2p.test.inhibitAcceptor")) {
thread.start();
}
else {
logger.fatal(LocalizedMessage.create(LocalizedStrings.TCPConduit_INHIBITACCEPTOR));
socket.close();
this.hsPool.shutdownNow();
}
}
catch (IOException io) {
String s = "While creating ServerSocket and Stub on port " + p;
throw new ConnectionException(s, io);
}
this.port = localPort;
}
/**
 * After startup we install the view ID into the conduit's stub to avoid
 * confusion during overlapping shutdown/startup from the same member.
 *
 * @param viewID the view ID to install in this conduit's stub
 */
public void setVmViewID(int viewID) {
this.id.setViewID(viewID);
}
/** creates the server socket. This can be used to recreate the
 * socket using this.port and this.address, which must be set
 * before invoking this method.
 */
private void createServerSocket() {
int p = this.port;
int b = BACKLOG;
InetAddress ba = this.address;
try {
if (this.useNIO) {
if (p <= 0) {
socket = SocketCreator.getDefaultInstance().createServerSocketUsingPortRange(ba, b, isBindAddress,
this.useNIO, 0, tcpPortRange);
} else {
ServerSocketChannel serverChannel = ServerSocketChannel.open();
socket = serverChannel.socket();
InetSocketAddress addr = new InetSocketAddress(isBindAddress ? ba : null, p);
socket.bind(addr, b);
}
if (useNIO) {
try {
// set these buffers early so that large buffers will be allocated
// on accepted sockets (see java.net.ServerSocket.setReceiveBufferSize javadocs)
socket.setReceiveBufferSize(tcpBufferSize);
int newSize = socket.getReceiveBufferSize();
if (newSize != tcpBufferSize) {
logger.info(LocalizedMessage.create(LocalizedStrings.TCPConduit_0_IS_1_INSTEAD_OF_THE_REQUESTED_2,
new Object[] {"Listener receiverBufferSize", Integer.valueOf(newSize), Integer.valueOf(tcpBufferSize)}));
}
} catch (SocketException ex) {
logger.warn(LocalizedMessage.create(LocalizedStrings.TCPConduit_FAILED_TO_SET_LISTENER_RECEIVERBUFFERSIZE_TO__0, tcpBufferSize));
}
}
channel = socket.getChannel();
}
else {
try {
if (p <= 0) {
socket = SocketCreator.getDefaultInstance().createServerSocketUsingPortRange(ba, b, isBindAddress,
this.useNIO, this.tcpBufferSize, tcpPortRange);
} else {
socket = SocketCreator.getDefaultInstance().createServerSocket(p, b, isBindAddress? ba : null, this.tcpBufferSize);
}
int newSize = socket.getReceiveBufferSize();
if (newSize != this.tcpBufferSize) {
logger.info(LocalizedMessage.create(LocalizedStrings.TCPConduit_0_IS_1_INSTEAD_OF_THE_REQUESTED_2,
new Object[] {"Listener receiverBufferSize", Integer.valueOf(newSize), Integer.valueOf(this.tcpBufferSize)}));
}
} catch (SocketException ex) {
logger.warn(LocalizedMessage.create(LocalizedStrings.TCPConduit_FAILED_TO_SET_LISTENER_RECEIVERBUFFERSIZE_TO__0, this.tcpBufferSize));
}
}
port = socket.getLocalPort();
}
catch (IOException io) {
throw new ConnectionException( LocalizedStrings.TCPConduit_EXCEPTION_CREATING_SERVERSOCKET.toLocalizedString(
new Object[] {Integer.valueOf(p), ba}), io);
}
}
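/*
 * Illustrative sketch (plain JDK calls, mirroring the NIO branch above):
 * binding through the channel's socket and reading back the port chosen
 * by a wildcard bind. Per the java.net.ServerSocket javadocs, receive
 * buffers larger than 64K must be set before bind() for accepted sockets
 * to see them:
 *
 *   ServerSocketChannel ssc = ServerSocketChannel.open();
 *   ServerSocket ss = ssc.socket();
 *   ss.setReceiveBufferSize(128 * 1024);  // before bind for >64K to stick
 *   ss.bind(new InetSocketAddress((InetAddress) null, 0), 50);
 *   int boundPort = ss.getLocalPort();    // actual port for a port-0 bind
 */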
/**
* Ensure that the ConnectionTable class gets loaded.
*
* @see SystemFailure#loadEmergencyClasses()
*/
public static void loadEmergencyClasses() {
ConnectionTable.loadEmergencyClasses();
}
/**
* Close the ServerSocketChannel, ServerSocket, and the
* ConnectionTable.
*
* @see SystemFailure#emergencyClose()
*/
public void emergencyClose() {
// stop(); // Causes grief
if (stopped) {
return;
}
stopped = true;
// System.err.println("DEBUG: TCPConduit emergencyClose");
try {
if (channel != null) {
channel.close();
// NOTE: do not try to interrupt the listener thread at this point.
// Doing so interferes with the channel's socket logic.
}
else {
if (socket != null) {
socket.close();
}
}
}
catch (IOException e) {
// ignore, please!
}
// this.hsPool.shutdownNow(); // I don't trust this not to allocate objects or to synchronize
// this.conTable.close(); not safe against deadlocks
ConnectionTable.emergencyClose();
socket = null;
thread = null;
conTable = null;
// System.err.println("DEBUG: end of TCPConduit emergencyClose");
}
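/*
 * Sketch of the intended call sequence (inferred from the @see tags above):
 * SystemFailure pre-loads these classes at startup so that emergencyClose()
 * never has to trigger classloading while the VM is already failing:
 *
 *   TCPConduit.loadEmergencyClasses();  // once, at startup
 *   ...
 *   conduit.emergencyClose();           // best-effort, from SystemFailure
 */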
/** stops the conduit, closing all TCP/IP connections */
public void stop(Exception cause) {
if (!stopped) {
stopped = true;
shutdownCause = cause;
if (logger.isTraceEnabled(LogMarker.DM)) {
logger.trace(LogMarker.DM, "Shutting down conduit");
}
try {
// set timeout endpoint here since interrupt() has been known
// to hang
long timeout = System.currentTimeMillis() + LISTENER_CLOSE_TIMEOUT;
Thread t = this.thread;
if (channel != null) {
channel.close();
// NOTE: do not try to interrupt the listener thread at this point.
// Doing so interferes with the channel's socket logic.
}
else {
ServerSocket s = this.socket;
if (s != null) {
s.close();
}
if (t != null) {
t.interrupt();
}
}
do {
t = this.thread;
if (t == null || !t.isAlive()) {
break;
}
t.join(200);
} while (timeout > System.currentTimeMillis());
if (t != null && t.isAlive()) {
logger.warn(LocalizedMessage.create(
LocalizedStrings.TCPConduit_UNABLE_TO_SHUT_DOWN_LISTENER_WITHIN_0_MS_UNABLE_TO_INTERRUPT_SOCKET_ACCEPT_DUE_TO_JDK_BUG_GIVING_UP,
Integer.valueOf(LISTENER_CLOSE_TIMEOUT)));
}
} catch (IOException e) {
  // ignore - we are shutting down anyway
} catch (InterruptedException e) {
// Ignore, we're trying to stop already.
}
finally {
this.hsPool.shutdownNow();
}
// close connections after shutting down acceptor to fix bug 30695
this.conTable.close();
socket = null;
thread = null;
conTable = null;
}
}
/**
* Returns whether or not this conduit is stopped
*
* @since 3.0
*/
public boolean isStopped() {
return this.stopped;
}
/** starts the conduit again after it's been stopped. This will clear the
server map if the conduit's port is zero (wildcard bind) */
public void restart() throws ConnectionException {
if (!stopped)
return;
this.stats = null;
if (directChannel != null) {
this.stats = directChannel.getDMStats();
}
if (this.stats == null) {
this.stats = new LonerDistributionManager.DummyDMStats();
}
try {
this.conTable = ConnectionTable.create(this);
}
catch (IOException io) {
throw new ConnectionException(LocalizedStrings.TCPConduit_UNABLE_TO_INITIALIZE_CONNECTION_TABLE.toLocalizedString(), io);
}
startAcceptor();
}
/** this is the server socket listener thread's run loop */
public void run() {
ConnectionTable.threadWantsSharedResources();
if (logger.isTraceEnabled(LogMarker.DM)) {
logger.trace(LogMarker.DM, "Starting P2P Listener on {}", this.getId());
}
for(;;) {
SystemFailure.checkFailure();
if (stopper.cancelInProgress() != null) {
break;
}
if (stopped) {
break;
}
if (Thread.currentThread().isInterrupted()) {
break;
}
if (stopper.cancelInProgress() != null) {
break; // part of bug 37271
}
Socket othersock = null;
try {
if (this.useNIO) {
SocketChannel otherChannel = channel.accept();
othersock = otherChannel.socket();
}
else {
try {
othersock = socket.accept();
}
catch (SSLException ex) {
// SW: This happens when there is a problem in the P2P SSL
// configuration, so we need to exit; otherwise the accept loop
// spins forever, filling the logs
logger.warn(LocalizedMessage.create(
LocalizedStrings.TCPConduit_STOPPING_P2P_LISTENER_DUE_TO_SSL_CONFIGURATION_PROBLEM), ex);
break;
}
SocketCreator.getDefaultInstance().configureServerSSLSocket(
othersock);
}
if (stopped) {
try {
if (othersock != null) {
othersock.close();
}
}
catch (Exception e) {
  // ignore - we only want the socket closed
}
continue;
}
if (inhibitNewConnections) {
if (logger.isTraceEnabled(LogMarker.QA)) {
logger.trace(LogMarker.QA, "Test hook: inhibiting acceptance of connection {}", othersock);
}
othersock.close();
while (inhibitNewConnections && !stopped) {
this.stopper.checkCancelInProgress(null);
boolean interrupted = Thread.interrupted();
try {
Thread.sleep(2000);
}
catch (InterruptedException e) {
interrupted = true;
}
finally {
if (interrupted) {
Thread.currentThread().interrupt();
}
}
} // while
if (logger.isTraceEnabled(LogMarker.QA)) {
logger.trace(LogMarker.QA, "Test hook: finished inhibiting acceptance of connections");
}
}
else {
acceptConnection(othersock);
}
}
catch (ClosedByInterruptException cbie) {
//safe to ignore
}
catch (ClosedChannelException e) {
break; // we're dead
}
catch (CancelException e) {
break;
}
catch (Exception e) {
if (!stopped) {
if (e instanceof SocketException && "Socket closed".equalsIgnoreCase(e.getMessage())) {
// safe to ignore; see bug 31156
if (!socket.isClosed()) {
logger.warn(LocalizedMessage.create(
LocalizedStrings.TCPConduit_SERVERSOCKET_THREW_SOCKET_CLOSED_EXCEPTION_BUT_SAYS_IT_IS_NOT_CLOSED), e);
try {
socket.close();
createServerSocket();
}
catch (IOException ioe) {
logger.fatal(LocalizedMessage.create(LocalizedStrings.TCPConduit_UNABLE_TO_CLOSE_AND_RECREATE_SERVER_SOCKET), ioe);
// post 5.1.0x, this should force shutdown
try {
Thread.sleep(5000);
}
catch (InterruptedException ie) {
// Don't reset; we're just exiting the thread
logger.info(LocalizedMessage.create(
LocalizedStrings.TCPConduit_INTERRUPTED_AND_EXITING_WHILE_TRYING_TO_RECREATE_LISTENER_SOCKETS));
return;
}
}
}
} else {
this.stats.incFailedAccept();
if (e instanceof IOException && "Too many open files".equals(e.getMessage())) {
getConTable().fileDescriptorsExhausted();
} else {
logger.warn(e.getMessage(), e);
}
}
}
}
if (!stopped && socket.isClosed()) {
// NOTE: do not check for distributed system closing here. Messaging
// may need to occur during the closing of the DS or cache
logger.warn(LocalizedMessage.create(LocalizedStrings.TCPConduit_SERVERSOCKET_CLOSED_REOPENING));
try {
createServerSocket();
}
catch (ConnectionException ex) {
logger.warn(ex.getMessage(), ex);
}
}
} // for
if (logger.isTraceEnabled(LogMarker.DM)) {
logger.debug("Stopped P2P Listener on {}", this.getId());
}
}
private void acceptConnection(final Socket othersock) {
try {
this.hsPool.execute(new Runnable() {
public void run() {
basicAcceptConnection(othersock);
}
});
}
catch (RejectedExecutionException rejected) {
try {
othersock.close();
}
catch (IOException ignore) {
}
}
}
private ConnectionTable getConTable() {
ConnectionTable result = this.conTable;
if (result == null) {
stopper.checkCancelInProgress(null);
throw new DistributedSystemDisconnectedException(LocalizedStrings.TCPConduit_TCP_LAYER_HAS_BEEN_SHUTDOWN.toLocalizedString());
}
return result;
}
protected void basicAcceptConnection(Socket othersock) {
try {
getConTable().acceptConnection(othersock);
}
catch (IOException io) {
// exception is logged by the Connection
if (!stopped) {
this.stats.incFailedAccept();
}
}
catch (ConnectionException ex) {
// exception is logged by the Connection
if (!stopped) {
this.stats.incFailedAccept();
}
}
catch (CancelException e) {
  // ignore - the conduit is shutting down
}
catch (Exception e) {
if (!stopped) {
this.stats.incFailedAccept();
logger.warn(LocalizedMessage.create(LocalizedStrings.TCPConduit_FAILED_TO_ACCEPT_CONNECTION_FROM_0_BECAUSE_1,
    new Object[] {othersock.getInetAddress(), e}), e);
}
}
}
/**
* return true if "new IO" classes are being used for the server socket
*/
protected boolean useNIO() {
return this.useNIO;
}
/**
* records the current outgoing message count on all thread-owned
* ordered connections
* @since 5.1
*/
public void getThreadOwnedOrderedConnectionState(
Stub member,
HashMap result)
{
getConTable().getThreadOwnedOrderedConnectionState(member, result);
}
/**
 * wait for the incoming connections identified by the keys in the
 * argument map to receive and dispatch the number of messages associated
 * with each key
 * @since 5.1
 */
public void waitForThreadOwnedOrderedConnectionState(Stub member, HashMap channelState)
throws InterruptedException
{
// if (Thread.interrupted()) throw new InterruptedException(); not necessary done in waitForThreadOwnedOrderedConnectionState
getConTable().waitForThreadOwnedOrderedConnectionState(member, channelState);
}
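/*
 * Illustrative pairing (hypothetical caller): the two methods above are
 * meant to be used together - record the per-connection message counts
 * first, then wait for that many messages to be dispatched:
 *
 *   HashMap state = new HashMap();
 *   conduit.getThreadOwnedOrderedConnectionState(member, state);
 *   // ... hand the map to the waiting side ...
 *   conduit.waitForThreadOwnedOrderedConnectionState(member, state);
 */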
/**
* connections send messageReceived when a message object has been
* read.
* @param bytesRead number of bytes read off of network to get this message
*/
protected void messageReceived(Connection receiver, DistributionMessage message,
int bytesRead) {
if (logger.isTraceEnabled()) {
logger.trace("{} received {} from {}", id, message, receiver);
}
if (directChannel != null) {
DistributionMessage msg = message;
msg.setBytesRead(bytesRead);
msg.setSender(receiver.getRemoteAddress());
msg.setSharedReceiver(receiver.isSharedResource());
directChannel.receive(msg, bytesRead, receiver.getRemoteId());
}
}
/** gets the Stub representing this conduit's ServerSocket endpoint. This
is used to generate other stubs containing endpoint information. */
public Stub getId() {
return id;
}
/** gets the actual port to which this conduit's ServerSocket is bound */
public int getPort() {
return id.getPort();
}
/**
 * Gets the local JGroups address that identifies this conduit
 */
public InternalDistributedMember getLocalAddress() {
return this.localAddr;
}
/** gets the requested port that this TCPConduit bound to. This could
be zero if a wildcard bind was done */
public int getBindPort() {
return port;
}
/** gets the channel that is used to process non-Stub messages */
public DirectChannel getDirectChannel() {
return directChannel;
}
public InternalDistributedMember getMemberForStub(Stub s, boolean validate) {
return membershipManager.getMemberForStub(s, validate);
}
public void setLocalAddr(InternalDistributedMember addr) {
localAddr = addr;
}
public InternalDistributedMember getLocalId() {
return localAddr;
}
/**
* Return a connection to the given member. This method must continue
* to attempt to create a connection to the given member as long as that
* member is in the membership view and the system is not shutting down.
*
* @param memberAddress the InternalDistributedMember associated with the remoteId
* @param remoteId the TCPConduit stub for this member
* @param preserveOrder whether this is an ordered or unordered connection
* @param retry false if this is the first attempt
* @param startTime the time this operation started
* @param ackTimeout the ack-wait-threshold * 1000 for the operation to be transmitted (or zero)
* @param ackSATimeout the ack-severe-alert-threshold * 1000 for the operation to be transmitted (or zero)
* @return the connection
*/
public Connection getConnection(InternalDistributedMember memberAddress, Stub remoteId, final boolean preserveOrder, boolean retry,
long startTime, long ackTimeout, long ackSATimeout)
throws java.io.IOException, DistributedSystemDisconnectedException
{
//final boolean preserveOrder = (processorType == DistributionManager.SERIAL_EXECUTOR )|| (processorType == DistributionManager.PARTITIONED_REGION_EXECUTOR);
if (stopped) {
throw new DistributedSystemDisconnectedException(LocalizedStrings.TCPConduit_THE_CONDUIT_IS_STOPPED.toLocalizedString());
}
Connection conn = null;
InternalDistributedMember memberInTrouble = null;
boolean breakLoop = false;
for (;;) {
stopper.checkCancelInProgress(null);
boolean interrupted = Thread.interrupted();
try {
// If this is the second time through this loop, we had
// problems. Tear down the connection so that it gets
// rebuilt.
if (retry || conn != null) { // not first time in loop
// Consult with the membership manager; if member has gone away,
// there will not be an entry for this stub.
InternalDistributedMember m = this.membershipManager.getMemberForStub(remoteId, true);
if (m == null) {
// OK, the member left. Just register an error.
throw new IOException(LocalizedStrings.TCPConduit_TCPIP_CONNECTION_LOST_AND_MEMBER_IS_NOT_IN_VIEW.toLocalizedString());
}
// bug35953: Member is still in view; we MUST NOT give up!
// Pause just a tiny bit...
try {
Thread.sleep(100);
}
catch (InterruptedException e) {
interrupted = true;
stopper.checkCancelInProgress(e);
}
// try again after sleep
m = this.membershipManager.getMemberForStub(remoteId, true);
if (m == null) {
// OK, the member left. Just register an error.
throw new IOException(LocalizedStrings.TCPConduit_TCPIP_CONNECTION_LOST_AND_MEMBER_IS_NOT_IN_VIEW.toLocalizedString());
}
// Print a warning (once)
if (memberInTrouble == null) {
memberInTrouble = m;
logger.warn(LocalizedMessage.create(LocalizedStrings.TCPConduit_ATTEMPTING_TCPIP_RECONNECT_TO__0, memberInTrouble));
}
else {
if (logger.isDebugEnabled()) {
logger.debug("Attempting TCP/IP reconnect to {}", memberInTrouble);
}
}
// Close the connection (it will get rebuilt later).
this.stats.incReconnectAttempts();
if (conn != null) {
try {
if (logger.isDebugEnabled()) {
logger.debug("Closing old connection. conn={} before retrying. remoteID={} memberInTrouble={}",
conn, remoteId, memberInTrouble);
}
conn.closeForReconnect("closing before retrying");
}
catch (CancelException ex) {
throw ex;
}
catch (Exception ex) {
  // ignore - we are about to retry anyway
}
}
} // not first time in loop
Exception problem = null;
try {
// Get (or regenerate) the connection
// bug36202: this could generate a ConnectionException, so it
// must be caught and retried
boolean retryForOldConnection;
boolean debugRetry = false;
do {
retryForOldConnection = false;
conn = getConTable().get(remoteId, preserveOrder, startTime, ackTimeout, ackSATimeout);
if (conn == null) {
// conduit may be closed - otherwise an ioexception would be thrown
problem = new IOException(LocalizedStrings.TCPConduit_UNABLE_TO_RECONNECT_TO_SERVER_POSSIBLE_SHUTDOWN_0.toLocalizedString(remoteId));
} else if (conn.isClosing() || !conn.getRemoteAddress().equals(memberAddress)) {
if (logger.isDebugEnabled()) {
logger.debug("Got an old connection for {}: {}@{}", memberAddress, conn, conn.hashCode());
}
conn.closeOldConnection("closing old connection");
conn = null;
retryForOldConnection = true;
debugRetry = true;
}
} while (retryForOldConnection);
if (debugRetry && logger.isDebugEnabled()) {
logger.debug("Done removing old connections");
}
// we have a connection; fall through and return it
}
catch (ConnectionException e) {
// Race condition between acquiring the connection and attempting
// to use it: another thread closed it.
problem = e;
// [sumedh] No need to retry since Connection.createSender has already
// done retries and now member is really unreachable for some reason
// even though it may be in the view
breakLoop = true;
}
catch (IOException e) {
problem = e;
// bug #43962 don't keep trying to connect to an alert listener
if (AlertAppender.isThreadAlerting()) {
if (logger.isDebugEnabled()) {
logger.debug("Giving up connecting to alert listener {}", memberAddress);
}
breakLoop = true;
}
}
if (problem != null) {
// Some problems are not recoverable; check and error out early.
InternalDistributedMember m = this.membershipManager.getMemberForStub(remoteId, true);
if (m == null) { // left the view
// Bracket our original warning
if (memberInTrouble != null) {
// make this msg info to bracket warning
logger.info(LocalizedMessage.create(
LocalizedStrings.TCPConduit_ENDING_RECONNECT_ATTEMPT_BECAUSE_0_HAS_DISAPPEARED, memberInTrouble));
}
throw new IOException(LocalizedStrings.TCPConduit_PEER_HAS_DISAPPEARED_FROM_VIEW.toLocalizedString(remoteId));
} // left the view
if (membershipManager.shutdownInProgress()) { // shutdown in progress
// Bracket our original warning
if (memberInTrouble != null) {
// make this msg info to bracket warning
logger.info(LocalizedMessage.create(
LocalizedStrings.TCPConduit_ENDING_RECONNECT_ATTEMPT_TO_0_BECAUSE_SHUTDOWN_HAS_STARTED, memberInTrouble));
}
stopper.checkCancelInProgress(null);
throw new DistributedSystemDisconnectedException(LocalizedStrings.TCPConduit_ABANDONED_BECAUSE_SHUTDOWN_IS_IN_PROGRESS.toLocalizedString());
} // shutdown in progress
// Log the warning. We wait until now, because we want
// to have m defined for a nice message...
if (memberInTrouble == null) {
logger.warn(LocalizedMessage.create(
LocalizedStrings.TCPConduit_ERROR_SENDING_MESSAGE_TO_0_WILL_REATTEMPT_1,
new Object[] {m, problem}));
memberInTrouble = m;
}
else {
if (logger.isDebugEnabled()) {
logger.debug("Error sending message to {}", m, problem);
}
}
if (breakLoop) {
if (problem.getMessage() == null || !problem.getMessage().startsWith("Cannot form connection to alert listener")) {
logger.warn(LocalizedMessage.create(
LocalizedStrings.TCPConduit_THROWING_IOEXCEPTION_AFTER_FINDING_BREAKLOOP_TRUE),
problem);
}
if (problem instanceof IOException) {
throw (IOException)problem;
}
else {
IOException ioe = new IOException( LocalizedStrings.TCPConduit_PROBLEM_CONNECTING_TO_0.toLocalizedString(remoteId));
ioe.initCause(problem);
throw ioe;
}
}
// Retry the operation (indefinitely)
continue;
} // problem != null
// Success!
// Make sure our logging is bracketed if there was a problem
if (memberInTrouble != null) {
logger.info(LocalizedMessage.create(
LocalizedStrings.TCPConduit_SUCCESSFULLY_RECONNECTED_TO_MEMBER_0,
memberInTrouble));
if (logger.isTraceEnabled()) {
logger.trace("new connection is {} remoteId={} memberAddress={}", conn, remoteId, memberAddress);
}
}
return conn;
}
finally {
if (interrupted) {
Thread.currentThread().interrupt();
}
}
} // for(;;)
}
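/*
 * Illustrative caller sketch (hypothetical values): retries on membership
 * churn are handled inside getConnection itself, so a sender typically
 * makes a single call and treats an IOException as a lost peer:
 *
 *   long start = System.currentTimeMillis();
 *   Connection c = conduit.getConnection(member, stub,
 *       true,   // preserveOrder
 *       false,  // first attempt, not a retry
 *       start, ackTimeout, ackSATimeout);
 */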
@Override
public String toString() {
  return String.valueOf(id);
}
public boolean threadOwnsResources() {
ConnectionTable ct = this.conTable;
if (ct == null) {
return false;
} else {
DM d = getDM();
if (d != null) {
return d.getSystem().threadOwnsResources();
} else {
return false;
}
}
}
/**
* Returns the distribution manager of the direct channel
*/
public DM getDM() {
return directChannel.getDM();
}
/**
* Closes any connections used to communicate with the given stub
*/
public void removeEndpoint(Stub stub, String reason) {
removeEndpoint(stub, reason, true);
}
public void removeEndpoint(Stub stub, String reason, boolean notifyDisconnect) {
ConnectionTable ct = this.conTable;
if (ct == null) {
return;
}
ct.removeEndpoint(stub, reason, notifyDisconnect);
}
/** check to see if there are still any receiver threads for the given end-point */
public boolean hasReceiversFor(Stub endPoint) {
ConnectionTable ct = this.conTable;
return (ct != null) && ct.hasReceiversFor(endPoint);
}
protected class Stopper extends CancelCriterion {
/* (non-Javadoc)
* @see com.gemstone.gemfire.CancelCriterion#cancelInProgress()
*/
@Override
public String cancelInProgress() {
DM dm = getDM();
if (dm == null) {
return "no distribution manager";
}
if (TCPConduit.this.stopped) {
return "Conduit has been stopped";
}
return null;
}
/* (non-Javadoc)
* @see com.gemstone.gemfire.CancelCriterion#generateCancelledException(java.lang.Throwable)
*/
@Override
public RuntimeException generateCancelledException(Throwable e) {
String reason = cancelInProgress();
if (reason == null) {
return null;
}
DM dm = getDM();
if (dm == null) {
return new DistributedSystemDisconnectedException("no distribution manager");
}
RuntimeException result = dm.getCancelCriterion().generateCancelledException(e);
if (result != null) {
return result;
}
// We know we've been stopped; generate the exception
result = new DistributedSystemDisconnectedException("Conduit has been stopped");
result.initCause(e);
return result;
}
}
private final Stopper stopper = new Stopper();
public CancelCriterion getCancelCriterion() {
return stopper;
}
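/*
 * Illustrative sketch: the accept and connect loops in this class poll the
 * stopper instead of relying on interrupts alone. checkCancelInProgress()
 * turns a stop or system disconnect into the appropriate exception:
 *
 *   for (;;) {
 *     conduit.getCancelCriterion().checkCancelInProgress(null); // throws once stopped
 *     // ... one unit of work ...
 *   }
 */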
/**
* if the conduit is disconnected due to an abnormal condition, this
* will describe the reason
* @return exception that caused disconnect
*/
public Exception getShutdownCause() {
return this.shutdownCause;
}
/**
* ARB: Called by Connection before handshake reply is sent.
* Returns true if member is part of view, false if membership is not confirmed before timeout.
*/
public boolean waitForMembershipCheck(InternalDistributedMember remoteId) {
return membershipManager.waitForMembershipCheck(remoteId);
}
/**
* simulate being sick
*/
public void beSick() {
this.inhibitNewConnections = true;
this.conTable.closeReceivers(true);
}
/**
* simulate being healthy
*/
public void beHealthy() {
this.inhibitNewConnections = false;
}
}