blob: 9beb947dfc130634f60022d7385c24e985acab4b [file] [log] [blame]
/*=========================================================================
* Copyright (c) 2002-2014 Pivotal Software, Inc. All Rights Reserved.
* This product is protected by U.S. and international copyright
* and intellectual property laws. Pivotal products are covered by
* more patents listed at http://www.pivotal.io/patents.
*=========================================================================
*/
package com.gemstone.gemfire.internal.tcp;
import java.io.IOException;
import java.lang.ref.Reference;
import java.lang.ref.WeakReference;
import java.net.InetAddress;
import java.net.Socket;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.atomic.AtomicReference;
import org.apache.logging.log4j.Logger;
import com.gemstone.gemfire.SystemFailure;
import com.gemstone.gemfire.distributed.DistributedSystemDisconnectedException;
import com.gemstone.gemfire.distributed.internal.DM;
import com.gemstone.gemfire.distributed.internal.InternalDistributedSystem;
import com.gemstone.gemfire.distributed.internal.membership.InternalDistributedMember;
import com.gemstone.gemfire.distributed.internal.membership.MembershipManager;
import com.gemstone.gemfire.distributed.internal.membership.jgroup.JGroupMembershipManager;
import com.gemstone.gemfire.internal.Assert;
import com.gemstone.gemfire.internal.SocketCreator;
import com.gemstone.gemfire.internal.SystemTimer;
import com.gemstone.gemfire.internal.i18n.LocalizedStrings;
import com.gemstone.gemfire.internal.logging.LogService;
import com.gemstone.gemfire.internal.logging.log4j.AlertAppender;
import com.gemstone.gemfire.internal.logging.log4j.LocalizedMessage;
/** <p>ConnectionTable holds all of the Connection objects in a conduit.
Connections represent a pipe between two endpoints represented
by generic Stubs.</p>
@author Bruce Schuchardt
@author Darrel Schneider
@since 2.1
*/
/*
Note: We no longer use InputMultiplexer
If InputMux is reinstated then the manager needs to be
initialized and all lines that have a NOMUX preface should be uncommented
*/
public class ConnectionTable {
private static final Logger logger = LogService.getLogger();
/** a random number generator for secondary connection selection */
//static java.util.Random random = new java.util.Random();
/** warning when descriptor limit reached */
private static boolean ulimitWarningIssued;
/**
* true if the current thread wants non-shared resources
*/
private static ThreadLocal threadWantsOwnResources = new ThreadLocal();
/**
* Used for messages whose order must be preserved
* Only connections used for sending messages,
* and receiving acks, will be put in this map.
*/
protected final Map orderedConnectionMap = new ConcurrentHashMap();
/**
* ordered connections local to this thread. Note that accesses to
* the resulting map must be synchronized because of static cleanup.
*/
// ThreadLocal<Map>
private final ThreadLocal threadOrderedConnMap;
/**
* List of thread-owned ordered connection maps, for cleanup
*
* Accesses to the maps in this list need to be synchronized on their instance.
*/
private final List threadConnMaps;
/**
* Timer used to run the idle-connection timeout tasks
*
* @guarded.By this
*/
private SystemTimer idleConnTimer;
/**
* Used to find connections owned by threads.
* The key is the same one used in threadOrderedConnMap.
* The value is an ArrayList since we can have any number of connections
* with the same key.
*/
private ConcurrentMap threadConnectionMap;
/**
* Used for all non-ordered messages.
* Only connections used for sending messages,
* and receiving acks, will be put in this map.
*/
protected final Map unorderedConnectionMap = new ConcurrentHashMap();
/**
* Used for all accepted connections. These connections are read only;
* we never send messages, except for acks; only receive.
*
* Consists of a list of Connection
*/
private final List receivers = new ArrayList();
/**
* the conduit for this table
*/
protected final TCPConduit owner;
// ARB: temp making this protected to provide access to Connection.
//private final TCPConduit owner;
/**
* true if this table is no longer in use
*/
private volatile boolean closed = false;
/**
* The most recent instance to be created
*
* TODO this assumes no more than one instance is created at a time?
*/
private static final AtomicReference lastInstance = new AtomicReference();
/**
* Sockets that are in the process of being connected. The keys are the
* Sockets themselves; the values are ConnectingSocketInfo instances.
*/
private Map connectingSockets = new HashMap();
/**
 * Cause calling thread to share communication
 * resources with other threads.
 * Records {@code Boolean.FALSE} in the {@code threadWantsOwnResources}
 * thread-local for the calling thread.
 */
public static void threadWantsSharedResources() {
threadWantsOwnResources.set(Boolean.FALSE);
}
/**
 * Cause calling thread to acquire exclusive access to
 * communication resources.
 * Exclusive access may not be available in which
 * case this call is ignored.
 * Records {@code Boolean.TRUE} in the {@code threadWantsOwnResources}
 * thread-local for the calling thread.
 */
public static void threadWantsOwnResources() {
threadWantsOwnResources.set(Boolean.TRUE);
}
/**
 * Reports whether the calling thread owns its own communication resources.
 *
 * @return false when no distribution manager is available; otherwise true
 *         only if the distributed system says the thread owns resources
 *         and the thread is not currently alerting
 */
boolean threadOwnsResources() {
  final DM dm = getDM();
  if (dm == null) {
    return false;
  }
  if (!dm.getSystem().threadOwnsResources()) {
    return false;
  }
  // a thread that is in the middle of alerting never owns resources
  return !AlertAppender.isThreadAlerting();
}
/**
 * Returns the preference the calling thread registered through
 * {@link #threadWantsOwnResources()} or {@link #threadWantsSharedResources()}.
 *
 * @return the value currently held by the thread-local; may be null
 */
public static Boolean getThreadOwnsResourcesRegistration() {
  final Object registration = threadWantsOwnResources.get();
  return (Boolean) registration;
}
// public static void setThreadOwnsResourcesRegistration(
// Boolean newValue) {
// threadWantsOwnResources.set(newValue);
// }
// private Map connections = new HashMap();
/* NOMUX: private InputMuxManager inputMuxManager; */
//private int lowWater;
//private int highWater;
// private static boolean TRACK_SERVER_CONNECTIONS =
// System.getProperty("p2p.bidirectional", "true").equals("true");
/**
 * Creates a table owned by the given conduit.
 * An idle-connection timer is created up front only when the conduit
 * has a non-zero idle connection timeout.
 *
 * @param c the conduit that owns this table
 * @throws IOException declared by the signature; nothing visible here throws it
 */
private ConnectionTable(TCPConduit c) throws IOException {
  this.owner = c;
  if (this.owner.idleConnectionTimeout == 0) {
    this.idleConnTimer = null;
  } else {
    this.idleConnTimer = new SystemTimer(c.getDM().getSystem(), true);
  }
  this.threadOrderedConnMap = new ThreadLocal();
  this.threadConnMaps = new ArrayList();
  this.threadConnectionMap = new ConcurrentHashMap();
  /* NOMUX: if (TCPConduit.useNIO) {
  inputMuxManager = new InputMuxManager(this);
  inputMuxManager.start(c.logger);
  }*/
}
/** conduit sends connected() after establishing the server socket */
// protected void connected() {
// /* NOMUX: if (TCPConduit.useNIO) {
// inputMuxManager.connected();
// }*/
// }
/**
 * Called by the conduit after an accept. Creates a receiver Connection
 * for the socket and, unless this table has been closed in the meantime,
 * adds it to the list of receivers.
 *
 * @param sock the freshly accepted socket
 * @throws IOException if the receiver could not be created
 * @throws ConnectionException if the receiver could not be created
 */
protected void acceptConnection(Socket sock) throws IOException,
    ConnectionException {
  InetAddress connAddress = sock.getInetAddress(); // for bug 44736
  boolean finishedConnecting = false;
  Connection conn = null;
  try {
    conn = Connection.createReceiver(this, sock);
    // check for shutdown (so it doesn't get missed in the finally block)
    this.owner.getCancelCriterion().checkCancelInProgress(null);
    finishedConnecting = true;
  } catch (IOException ex) {
    // check for shutdown...
    this.owner.getCancelCriterion().checkCancelInProgress(ex);
    logger.warn(LocalizedMessage.create(
        LocalizedStrings.ConnectionTable_FAILED_TO_ACCEPT_CONNECTION_FROM_0_BECAUSE_1,
        new Object[] {(connAddress != null ? connAddress : "unavailable address"), ex}));
    throw ex;
  } catch (ConnectionException ex) {
    // check for shutdown...
    this.owner.getCancelCriterion().checkCancelInProgress(ex);
    logger.warn(LocalizedMessage.create(
        LocalizedStrings.ConnectionTable_FAILED_TO_ACCEPT_CONNECTION_FROM_0_BECAUSE_1,
        new Object[] {(connAddress != null ? connAddress : "unavailable address"), ex}));
    throw ex;
  } finally {
    // note: no need to call incFailedAccept here because it will be done
    // in our caller.
    // no need to log error here since caller will log warning
    if (conn != null && !finishedConnecting) {
      // we must be throwing from checkCancelInProgress so close the connection
      closeCon(LocalizedStrings.ConnectionTable_CANCEL_AFTER_ACCEPT.toLocalizedString(), conn);
      conn = null;
    }
  }
  if (conn == null) {
    // createReceiver produced nothing; there is nothing to register
    return;
  }
  synchronized (this.receivers) {
    this.owner.stats.incReceivers();
    if (this.closed) {
      // the table was closed while we were accepting; do not register
      closeCon(LocalizedStrings.ConnectionTable_CONNECTION_TABLE_NO_LONGER_IN_USE.toLocalizedString(), conn);
      return;
    }
    this.receivers.add(conn);
  }
  if (logger.isDebugEnabled()) {
    logger.debug("Accepted {} myAddr={} theirAddr={}", conn, getConduit().getLocalAddress(), conn.remoteAddr);
  }
}
// /** returns the connection associated with the given key, or null if
// no such connection exists */
// protected Connection basicGet(Serializable id) {
// synchronized (this.orderedConnectionMap) {
// return (Connection) this.orderedConnectionMap.get(id);
// }
// }
// protected Connection get(Serializable id) throws java.io.IOException {
// return get(id, false);
// }
/**
 * Process a newly created PendingConnection: create the sender
 * connection, then swap it into the map in place of the pending entry
 * and notify any threads waiting on the pending entry.
 *
 * @param id Stub on which the connection is created
 * @param sharedResource whether the connection is used by multiple threads
 * @param preserveOrder whether to preserve order
 * @param m map to add the connection to
 * @param pc the PendingConnection to process
 * @param startTime the ms clock start time for the operation
 * @param ackThreshold the ms ack-wait-threshold, or zero
 * @param ackSAThreshold the ms ack-severe_alert-threshold, or zero
 * @return the Connection, or null if someone else already created or closed it
 * @throws IOException if unable to connect
 * @throws DistributedSystemDisconnectedException
 */
private Connection handleNewPendingConnection(Stub id, boolean sharedResource,
    boolean preserveOrder,
    Map m, PendingConnection pc, long startTime, long ackThreshold, long ackSAThreshold)
    throws IOException, DistributedSystemDisconnectedException
{
  Connection con = null;
  try {
    con = Connection.createSender(owner.getMembershipManager(), this, preserveOrder,
        id, this.owner.getMemberForStub(id, false),
        sharedResource,
        startTime, ackThreshold, ackSAThreshold);
    this.owner.stats.incSenders(sharedResource, preserveOrder);
  }
  finally {
    // the connect attempt failed, so remove our pending entry and
    // notify anyone waiting on it
    if (con == null) {
      this.owner.stats.incFailedConnect();
      synchronized (m) {
        Object rmObj = m.remove(id);
        if (rmObj != pc && rmObj != null) {
          // put it back since it was not our pc
          m.put(id, rmObj);
        }
      }
      pc.notifyWaiters(null);
    }
  } // finally
  // Update our list of connections -- either the
  // orderedConnectionMap or unorderedConnectionMap
  //
  // Note that we added the entry _before_ we attempted the connect,
  // so it's possible something else got through in the mean time...
  synchronized (m) {
    Object e = m.get(id);
    if (e == pc) {
      m.put(id, con);
    }
    else if (e == null) {
      // someone closed our pending connection
      // so cleanup the connection we created.
      // con is null-checked like the sibling branches below, so a null
      // result from createSender cannot cause a NullPointerException here
      if (con != null) {
        con.requestClose(LocalizedStrings.ConnectionTable_PENDING_CONNECTION_CANCELLED.toLocalizedString());
        con = null;
      }
    }
    else if (e instanceof Connection) {
      Connection newCon = (Connection)e;
      if (!newCon.connected) {
        // Fix for bug 31590
        // someone closed our pending connect
        // so cleanup the connection we created
        if (con != null) {
          con.requestClose(LocalizedStrings.ConnectionTable_PENDING_CONNECTION_CLOSED.toLocalizedString());
          con = null;
        }
      }
      else {
        // This should not happen. It means that someone else
        // created the connection which should only happen if
        // our Connection was rejected.
        // (An assertion here was removed while investigating bug 32680;
        // instead we discard ours and use the other connection.)
        if (con != null) {
          con.requestClose(LocalizedStrings.ConnectionTable_SOMEONE_ELSE_CREATED_THE_CONNECTION.toLocalizedString());
        }
        con = newCon;
      }
    }
  }
  pc.notifyWaiters(con);
  if (con != null && logger.isDebugEnabled()) {
    logger.debug("handleNewPendingConnection {} myAddr={} theirAddr={}", con, getConduit().getLocalAddress(), con.remoteAddr);
  }
  return con;
}
/**
 * Finds or creates a shared connection, used for unordered messages and
 * for ordered messages when the thread does not own its resources.
 * Note that unordered connections are currently always shared.
 *
 * @param id the Stub on which we are creating a connection
 * @param threadOwnsResources whether unordered conn is owned by the current thread
 * @param preserveOrder whether to preserve order
 * @param startTime the ms clock start time for the operation
 * @param ackTimeout the ms ack-wait-threshold, or zero
 * @param ackSATimeout the ms ack-severe-alert-threshold, or zero
 * @return the Connection, or null if an error
 * @throws IOException if unable to create the connection
 * @throws DistributedSystemDisconnectedException
 */
private Connection getUnorderedOrConserveSockets(Stub id,
    boolean threadOwnsResources, boolean preserveOrder,
    long startTime, long ackTimeout, long ackSATimeout)
    throws IOException, DistributedSystemDisconnectedException
{
  final Map m = preserveOrder ? this.orderedConnectionMap
                              : this.unorderedConnectionMap;
  PendingConnection ourPending = null; // set only if we must create the connection
  Object existing;                     // what the map already holds, if usable
  synchronized (m) {
    existing = m.get(id);
    if (existing instanceof Connection && !((Connection) existing).connected) {
      // a disconnected entry is treated as if it were absent
      existing = null;
    }
    if (existing == null) {
      ourPending = new PendingConnection(preserveOrder, id);
      m.put(id, ourPending);
    }
  }

  if (ourPending != null) {
    // we own the pending entry, so we do the connecting
    final Connection created = handleNewPendingConnection(id, true /* fixes bug 43386 */,
        preserveOrder, m, ourPending, startTime, ackTimeout, ackSATimeout);
    if (!preserveOrder && threadOwnsResources) {
      // TODO we only schedule unordered shared cnxs for timeout
      // if we own sockets. This seems wrong. We should
      // be willing to time them out even if we don't own sockets.
      scheduleIdleTimeout(created);
    }
    return created;
  }

  if (!(existing instanceof PendingConnection)) {
    // an established connection is already in the map
    return (Connection) existing;
  }

  // another thread is connecting; wait for it unless we are alerting
  if (AlertAppender.isThreadAlerting()) {
    // do not change the text of this exception - it is looked for in exception handlers
    throw new IOException("Cannot form connection to alert listener " + id);
  }
  final Connection awaited = ((PendingConnection) existing).waitForConnect(
      this.owner.getMembershipManager(), startTime,
      ackTimeout, ackSATimeout);
  if (logger.isDebugEnabled()) {
    if (awaited == null) {
      logger.debug("getUnorderedOrConserveSockets: Connect failed");
    } else {
      logger.debug("getUnorderedOrConserveSockets {} myAddr={} theirAddr={}",
          awaited, getConduit().getLocalAddress(), awaited.remoteAddr);
    }
  }
  return awaited;
}
/**
 * Finds or creates an ordered connection that is owned by the calling
 * thread. The connection is looked up in this thread's thread-local map
 * first; if it is absent or has timed out, a new sender is created and
 * registered both in the thread-local map and in threadConnectionMap.
 *
 * @param id stub on which to create the connection
 * @param startTime the ms clock start time for the operation
 * @param ackTimeout the ms ack-wait-threshold, or zero
 * @param ackSATimeout the ms ack-severe-alert-threshold, or zero
 * @return the connection, or null if this table is being destroyed
 * @throws IOException if the connection could not be created
 * @throws DistributedSystemDisconnectedException if the table is closed
 */
Connection getOrderedAndOwned(Stub id, long startTime, long ackTimeout, long ackSATimeout)
    throws IOException, DistributedSystemDisconnectedException {
  Connection result = null;
  // Look for result in the thread local
  Map m = (Map)this.threadOrderedConnMap.get();
  if (m == null) {
    // First time for this thread. Create thread local
    m = new HashMap();
    synchronized (this.threadConnMaps) {
      if (this.closed) {
        owner.getCancelCriterion().checkCancelInProgress(null);
        throw new DistributedSystemDisconnectedException(LocalizedStrings.ConnectionTable_CONNECTION_TABLE_IS_CLOSED.toLocalizedString());
      }
      // check for stale references and remove them.
      for (Iterator it=this.threadConnMaps.iterator(); it.hasNext();) {
        Reference r = (Reference)it.next();
        if (r.get() == null) {
          it.remove();
        }
      } // for
      this.threadConnMaps.add(new WeakReference(m)); // ref added for bug 38011
    } // synchronized
    this.threadOrderedConnMap.set(m);
  } else {
    // Consult thread local.
    synchronized (m) {
      result = (Connection)m.get(id);
    }
    if (result != null && result.timedOut) {
      result = null;
    }
  }
  if (result != null) {
    return result;
  }
  // OK, we have to create a new connection.
  result = Connection.createSender(owner.getMembershipManager(),
      this, true /* preserveOrder */, id,
      this.owner.getMemberForStub(id, false), false /* shared */,
      startTime, ackTimeout, ackSATimeout);
  if (logger.isDebugEnabled()) {
    logger.debug("ConnectionTable: created an ordered connection: {}", result);
  }
  this.owner.stats.incSenders(false/*shared*/, true /* preserveOrder */);
  // Update the list of connections owned by this thread....
  // Read the field once: close() sets it to null, so re-reading it after
  // the null check could fail with a NullPointerException.
  final ConcurrentMap cm = this.threadConnectionMap;
  if (cm == null) {
    // This instance is being destroyed; fail the operation
    closeCon(LocalizedStrings.ConnectionTable_CONNECTION_TABLE_BEING_DESTROYED.toLocalizedString(), result);
    return null;
  }
  ArrayList al = (ArrayList)cm.get(id);
  if (al == null) {
    // First connection for this Stub. Make sure list for this
    // stub is created if it isn't already there.
    al = new ArrayList();
    // Since it's a concurrent map, we just try to put it and then
    // return whichever we got.
    Object o = cm.putIfAbsent(id, al);
    if (o != null) {
      al = (ArrayList)o;
    }
  }
  // Add our Connection to the list
  synchronized (al) {
    al.add(result);
  }
  // Finally, add the connection to our thread local map.
  synchronized (m) {
    m.put(id, result);
  }
  scheduleIdleTimeout(result);
  return result;
}
/**
 * Schedules an idle-connection timeout task for the given connection.
 * Does nothing when the connection is null, when the conduit's idle
 * connection timeout is zero, or when this table has been closed.
 *
 * @param conn the connection to time out when idle; may be null
 * @throws DistributedSystemDisconnectedException if the timer rejects the
 *         task while the connection is not closing
 */
private void scheduleIdleTimeout(Connection conn) {
  if (conn == null) {
    // fix for bug 43529
    return;
  }
  // Set the idle timeout
  if (this.owner.idleConnectionTimeout != 0) {
    try {
      synchronized(this) {
        // getIdleConnTimer() returns null once the table is closed. The
        // closed flag is set without holding this lock, so take the timer
        // once and test it rather than checking closed and then
        // dereferencing a second lookup.
        final SystemTimer timer = this.getIdleConnTimer();
        if (timer != null) {
          IdleConnTT task = new IdleConnTT(conn);
          conn.setIdleTimeoutTask(task);
          timer.scheduleAtFixedRate(task,
              this.owner.idleConnectionTimeout, this.owner.idleConnectionTimeout);
        }
      }
    }
    catch (IllegalStateException e) {
      if (conn.isClosing()) {
        // bug #45077 - connection is closed before we schedule the timeout task,
        // causing the task to be canceled
        return;
      }
      logger.debug("Got an illegal state exception: {}", e.getMessage(), e);
      // Unfortunately, cancelInProgress() is not set until *after*
      // the shutdown message has been sent, so we need to check the
      // "closeInProgress" bit instead.
      owner.getCancelCriterion().checkCancelInProgress(null);
      Throwable cause = owner.getShutdownCause();
      if (cause == null) {
        cause = e;
      }
      throw new DistributedSystemDisconnectedException(
          LocalizedStrings.ConnectionTable_THE_DISTRIBUTED_SYSTEM_IS_SHUTTING_DOWN.toLocalizedString(),
          cause);
    }
  }
}
/**
 * Obtains a connection to the given stub, creating one if necessary.
 * A thread-owned connection is used only when order must be preserved
 * and the calling thread owns its resources; every other case uses a
 * shared connection.
 *
 * @param id the Stub on which to create the connection
 * @param preserveOrder whether order should be preserved
 * @param startTime the ms clock start time
 * @param ackTimeout the ms ack-wait-threshold, or zero
 * @param ackSATimeout the ms ack-severe-alert-threshold, or zero
 * @return the Connection, or null if a problem
 * @throws java.io.IOException if the connection could not be created
 * @throws DistributedSystemDisconnectedException if this table is closed
 */
protected Connection get(Stub id, boolean preserveOrder,
    long startTime, long ackTimeout, long ackSATimeout)
    throws java.io.IOException, DistributedSystemDisconnectedException
{
  if (this.closed) {
    this.owner.getCancelCriterion().checkCancelInProgress(null);
    throw new DistributedSystemDisconnectedException(LocalizedStrings.ConnectionTable_CONNECTION_TABLE_IS_CLOSED.toLocalizedString());
  }
  final boolean ownsResources = threadOwnsResources();
  final Connection found;
  if (preserveOrder && ownsResources) {
    found = getOrderedAndOwned(id, startTime, ackTimeout, ackSATimeout);
  } else {
    found = getUnorderedOrConserveSockets(id, ownsResources, preserveOrder, startTime, ackTimeout, ackSATimeout);
  }
  if (found == null) {
    return null;
  }
  Assert.assertTrue(found.preserveOrder == preserveOrder);
  return found;
}
/**
 * Reacts to running out of file descriptors. The first call logs a fatal
 * message, tells the distributed system to share sockets and replaces the
 * thread-local that holds per-thread resource preferences; later calls
 * do nothing.
 */
protected synchronized void fileDescriptorsExhausted() {
  if (ulimitWarningIssued) {
    // already handled once
    return;
  }
  ulimitWarningIssued = true;
  logger.fatal(LocalizedMessage.create(LocalizedStrings.ConnectionTable_OUT_OF_FILE_DESCRIPTORS_USING_SHARED_CONNECTION));
  InternalDistributedSystem.getAnyInstance().setShareSockets(true);
  // a fresh thread-local discards every preference registered so far
  threadWantsOwnResources = new ThreadLocal();
}
/**
 * Returns the conduit that owns this table.
 */
protected final TCPConduit getConduit() {
return owner;
}
/**
 * Returns true if this table is no longer in use, i.e. the
 * {@code closed} flag has been set.
 */
public boolean isClosed() {
return this.closed;
}
/**
 * Closes the given map entry, passing {@code false} for the
 * {@code beingSick} argument of the three-argument overload.
 *
 * @param reason the reason text passed on to the connection
 * @param c a Connection or PendingConnection; may be null
 */
private static void closeCon(String reason, Object c) {
closeCon(reason, c, false);
}
/**
 * Closes the given map entry. A Connection has its partial connect
 * closed; any other non-null entry is treated as a PendingConnection
 * whose waiters are notified with null. A null entry is ignored.
 *
 * @param reason the reason text passed to the connection
 * @param c a Connection or PendingConnection; may be null
 * @param beingSick passed through to closePartialConnect
 */
private static void closeCon(String reason, Object c, boolean beingSick) {
  if (c instanceof Connection) {
    ((Connection) c).closePartialConnect(reason, beingSick); // fix for bug 31666
    return;
  }
  if (c != null) {
    ((PendingConnection) c).notifyWaiters(null);
  }
}
/**
 * Returns the idle connection timer, creating it on first use, or null
 * if the connection table is closed.
 * Guarded by a sync on the connection table.
 */
protected synchronized SystemTimer getIdleConnTimer() {
  SystemTimer timer = null;
  if (!this.closed) {
    if (this.idleConnTimer == null) {
      this.idleConnTimer = new SystemTimer(getDM().getSystem(), true);
    }
    timer = this.idleConnTimer;
  }
  return timer;
}
/**
 * Closes this table. Marks it closed, cancels the idle-connection timer,
 * closes every entry of the shared ordered and unordered maps, drops the
 * thread-owned connection map, closes the connections held in every
 * registered per-thread map, closes all receivers and finally clears the
 * calling thread's own thread-local map. A second call returns immediately.
 *
 * NOTE(review): the closed check and the assignment below are not done
 * atomically, so two concurrent callers could both proceed - confirm
 * whether callers serialize close().
 */
protected void close() {
/* NOMUX if (inputMuxManager != null) {
inputMuxManager.stop();
}*/
if (this.closed) {
return;
}
this.closed = true;
// the timer field is guarded by the table's monitor
synchronized (this) {
if (this.idleConnTimer != null) {
this.idleConnTimer.cancel();
}
}
// close and forget all shared ordered connections
synchronized (this.orderedConnectionMap) {
for (Iterator it=this.orderedConnectionMap.values().iterator(); it.hasNext(); ) {
closeCon(LocalizedStrings.ConnectionTable_CONNECTION_TABLE_BEING_DESTROYED.toLocalizedString(), it.next());
}
this.orderedConnectionMap.clear();
}
// close and forget all shared unordered connections
synchronized (this.unorderedConnectionMap) {
for (Iterator it=this.unorderedConnectionMap.values().iterator(); it.hasNext(); ) {
closeCon(LocalizedStrings.ConnectionTable_CONNECTION_TABLE_BEING_DESTROYED.toLocalizedString(), it.next());
}
this.unorderedConnectionMap.clear();
}
// other methods treat a null threadConnectionMap as "table being destroyed"
if (this.threadConnectionMap != null) {
this.threadConnectionMap = null;
}
if (this.threadConnMaps != null) {
synchronized (this.threadConnMaps) {
for (Iterator it=this.threadConnMaps.iterator(); it.hasNext();) {
Reference r = (Reference)it.next();
// the referent is null once the per-thread map has been collected
Map m = (Map)r.get();
if (m != null) {
synchronized (m) {
for (Iterator mit=m.values().iterator(); mit.hasNext(); ) {
closeCon(LocalizedStrings.ConnectionTable_CONNECTION_TABLE_BEING_DESTROYED.toLocalizedString(), mit.next());
}
}
}
}
this.threadConnMaps.clear();
}
}
closeReceivers(false);
// finally empty the calling thread's own map of ordered connections
Map m = (Map)this.threadOrderedConnMap.get();
if(m != null)
{
synchronized (m) {
m.clear();
}
}
}
/**
 * Close all receiving threads. This is used during shutdown and is also
 * used by a test hook that makes us deaf to incoming messages.
 * When beingSick is true only receivers that preserve order are closed.
 * Every socket that is still in the process of connecting is closed too.
 *
 * @param beingSick a test hook to simulate a sick process
 */
protected void closeReceivers(boolean beingSick) {
  synchronized (this.receivers) {
    final Iterator receiverIt = this.receivers.iterator();
    while (receiverIt.hasNext()) {
      final Connection receiver = (Connection) receiverIt.next();
      if (beingSick && !receiver.preserveOrder) {
        // the sick-process hook leaves unordered receivers alone
        continue;
      }
      closeCon(LocalizedStrings.ConnectionTable_CONNECTION_TABLE_BEING_DESTROYED.toLocalizedString(), receiver, beingSick);
      receiverIt.remove();
    }
    // now close any sockets being formed
    // (the default SocketCreator is still looked up, as before; its result is unused)
    SocketCreator.getDefaultInstance();
    synchronized (connectingSockets) {
      final Iterator socketIt = connectingSockets.keySet().iterator();
      while (socketIt.hasNext()) {
        final Socket connecting = (Socket) socketIt.next();
        try {
          connecting.close();
        } catch (IOException e) {
          // ignored - we're shutting down
        }
        socketIt.remove();
      }
    }
  }
}
/**
 * Removes the given object from the list of receivers, holding the
 * list's monitor while doing so.
 *
 * @param con the receiver to remove
 */
protected void removeReceiver(Object con) {
synchronized (this.receivers) {
this.receivers.remove(con);
}
}
/**
 * Return true if our owner already knows that this endpoint is departing,
 * which is the case when the owner has no member for the stub.
 */
protected boolean isEndpointShuttingDown(Stub stub) {
  final Object member = this.owner.getMemberForStub(stub, true);
  return member == null;
}
/**
 * Remove an endpoint and notify the membership manager of the departure.
 * Delegates to the three-argument overload with notifyDisconnect set to true.
 *
 * @param stub the endpoint to remove
 * @param reason the reason text passed to the closed connections
 */
protected void removeEndpoint(Stub stub, String reason) {
removeEndpoint(stub, reason, true);
}
/**
 * Removes everything this table holds for the given endpoint: its shared
 * ordered and unordered connections, its thread-owned connections, any
 * sockets still connecting to its address, and its receivers.
 * Nothing is done when the table is closed or when no ordered, unordered
 * or thread-owned connection exists for the stub.
 *
 * @param stub the endpoint to remove
 * @param reason the reason text passed to the closed connections
 * @param notifyDisconnect when true, owner.getMemberForStub(stub, false)
 *        is invoked after all connections have been closed
 */
protected void removeEndpoint(Stub stub, String reason, boolean notifyDisconnect) {
if (this.closed) {
return;
}
// first determine whether there is anything to remove at all
boolean needsRemoval = false;
synchronized (this.orderedConnectionMap) {
if (this.orderedConnectionMap.get(stub) != null)
needsRemoval = true;
}
if (!needsRemoval) {
synchronized (this.unorderedConnectionMap) {
if (this.unorderedConnectionMap.get(stub) != null)
needsRemoval = true;
}
}
if (!needsRemoval) {
// snapshot the field; close() may set it to null concurrently
ConcurrentMap cm = this.threadConnectionMap;
if (cm != null) {
ArrayList al = (ArrayList)cm.get(stub);
needsRemoval = al != null && al.size() > 0;
}
}
if (needsRemoval) {
// closeCon ignores a null entry, so both shared maps are handled unconditionally
synchronized (this.orderedConnectionMap) {
closeCon(reason, this.orderedConnectionMap.remove(stub));
}
synchronized (this.unorderedConnectionMap) {
closeCon(reason, this.unorderedConnectionMap.remove(stub));
}
{
ConcurrentMap cm = this.threadConnectionMap;
if (cm != null) {
ArrayList al = (ArrayList)cm.remove(stub);
if (al != null) {
synchronized (al) {
for (Iterator it=al.iterator(); it.hasNext();)
closeCon(reason, it.next());
al.clear();
}
}
}
}
// close any sockets that are in the process of being connected
// (collected under the lock, closed after releasing it)
Set toRemove = new HashSet();
synchronized(connectingSockets) {
for (Iterator it=connectingSockets.entrySet().iterator(); it.hasNext(); ) {
Map.Entry entry = (Map.Entry)it.next();
ConnectingSocketInfo info = (ConnectingSocketInfo)entry.getValue();
if (info.peerAddress.equals(stub.getInetAddress())) {
toRemove.add(entry.getKey());
it.remove();
}
}
}
for (Iterator it=toRemove.iterator(); it.hasNext(); ) {
Socket sock = (Socket)it.next();
try {
sock.close();
}
catch (IOException e) {
if (logger.isDebugEnabled()) {
logger.debug("caught exception while trying to close connecting socket for {}", stub, e);
}
}
}
// close any receivers
// avoid deadlock when a NIC has failed by closing connections outside
// of the receivers sync (bug 38731)
toRemove.clear();
synchronized (this.receivers) {
for (Iterator it=receivers.iterator(); it.hasNext();) {
Connection con = (Connection)it.next();
if (stub.equals(con.getRemoteId())) {
it.remove();
toRemove.add(con);
}
}
}
for (Iterator it=toRemove.iterator(); it.hasNext(); ) {
Connection con = (Connection)it.next();
closeCon(reason, con);
}
// call memberDeparted after doing the closeCon calls
// so it can recursively call removeEndpoint
// NOTE(review): the return value of getMemberForStub is discarded here;
// presumably the call is made for a side effect - confirm against TCPConduit
if (notifyDisconnect) {
owner.getMemberForStub(stub, false);
}
}
}
/**
 * Checks whether any receiver in this table has the given end-point as
 * its remote id.
 *
 * @param endPoint the end-point to look for
 * @return true if at least one matching receiver exists
 */
protected boolean hasReceiversFor(Stub endPoint) {
  boolean found = false;
  synchronized (this.receivers) {
    final Iterator it = this.receivers.iterator();
    while (!found && it.hasNext()) {
      final Connection receiver = (Connection) it.next();
      found = endPoint.equals(receiver.getRemoteId());
    }
  }
  return found;
}
/**
 * Removes a connection from the list kept for the stub in the given map.
 * Does nothing when the map is null or holds no list for the stub.
 *
 * @param cm the thread connection map; may be null
 * @param stub the key whose list is consulted
 * @param c the connection to remove from that list
 */
private static void removeFromThreadConMap(ConcurrentMap cm, Stub stub, Connection c) {
  if (cm == null) {
    return;
  }
  final ArrayList owned = (ArrayList) cm.get(stub);
  if (owned == null) {
    return;
  }
  synchronized (owned) {
    owned.remove(c);
  }
}
/**
 * Forgets a thread-owned connection: removes it from the list kept in
 * threadConnectionMap and, if the calling thread's own map still maps
 * the stub to this very connection, removes that mapping as well.
 *
 * @param stub the endpoint the connection belongs to
 * @param c the connection to forget
 */
protected void removeThreadConnection(Stub stub, Connection c) {
  removeFromThreadConMap(this.threadConnectionMap, stub, c);
  final Map ownMap = (Map) this.threadOrderedConnMap.get();
  if (ownMap == null) {
    return;
  }
  // Static cleanup thread might intervene, so we MUST synchronize
  synchronized (ownMap) {
    if (ownMap.get(stub) == c) {
      ownMap.remove(stub);
    }
  }
}
/**
 * Removes and closes a shared connection, but only if the relevant map
 * still maps the stub to exactly this connection. Does nothing once the
 * table is closed.
 *
 * @param reason the reason text passed to the closed connection
 * @param stub the endpoint the connection belongs to
 * @param ordered true to use the ordered map, false for the unordered map
 * @param c the connection expected to be in the map
 */
void removeSharedConnection(String reason, Stub stub, boolean ordered, Connection c) {
  if (this.closed) {
    return;
  }
  final Map shared = ordered ? this.orderedConnectionMap : this.unorderedConnectionMap;
  synchronized (shared) {
    if (shared.get(stub) == c) {
      closeCon(reason, shared.remove(stub));
    }
  }
}
/**
 * Just ensure that this class gets loaded.
 * The body is intentionally empty; calling the method is enough to
 * trigger class loading.
 *
 * @see SystemFailure#loadEmergencyClasses()
 */
public static void loadEmergencyClasses() {
// don't go any further, Frodo!
}
/**
 * Clears lastInstance. Does not close the underlying sockets, which is
 * probably not strictly necessary.
 *
 * @see SystemFailure#emergencyClose()
 */
public static void emergencyClose() {
  final ConnectionTable current = (ConnectionTable) lastInstance.get();
  if (current != null) {
    // current.close(); // TODO implementing this is a quagmire, but not necessary,
    // since recusing from JGroups takes care of our obligations
    // to our peers.
    lastInstance.set(null);
  }
}
/**
 * Closes every ordered connection owned by the calling thread and
 * removes each one from both the thread-local map and
 * threadConnectionMap. Does nothing if the thread has no map.
 */
public void removeAndCloseThreadOwnedSockets() {
  final Map ownMap = (Map) this.threadOrderedConnMap.get();
  if (ownMap == null) {
    return;
  }
  // Static cleanup may intervene; we MUST synchronize.
  synchronized (ownMap) {
    for (Iterator it = ownMap.entrySet().iterator(); it.hasNext(); ) {
      final Map.Entry owned = (Map.Entry) it.next();
      final Stub stub = (Stub) owned.getKey();
      final Connection c = (Connection) owned.getValue();
      removeFromThreadConMap(this.threadConnectionMap, stub, c);
      it.remove();
      closeCon(LocalizedStrings.ConnectionTable_THREAD_FINALIZATION.toLocalizedString(), c);
    }
  }
}
/**
 * Closes the calling thread's owned sockets on the table recorded in
 * lastInstance. Does nothing when no instance is recorded.
 */
public static void releaseThreadsSockets() {
  final ConnectionTable current = (ConnectionTable) lastInstance.get();
  if (current != null) {
    current.removeAndCloseThreadOwnedSockets();
  }
}
/**
 * Records the current outgoing message count on all thread-owned
 * ordered connections to the given member. This does not synchronize
 * or stop new connections from being formed or new messages from being
 * sent.
 *
 * @param member the endpoint whose connections are examined
 * @param result receives one entry per matching connection, mapping the
 *        connection's unique id to its messages-sent count
 * @since 5.1
 */
protected void getThreadOwnedOrderedConnectionState(Stub member,
    HashMap result) {
  final ConcurrentMap cm = this.threadConnectionMap;
  if (cm == null) {
    return;
  }
  final ArrayList owned = (ArrayList) cm.get(member);
  if (owned == null) {
    return;
  }
  // copy under the list's lock, then inspect the copy without holding it
  final List snapshot;
  synchronized (owned) {
    snapshot = new ArrayList(owned);
  }
  final Iterator it = snapshot.iterator();
  while (it.hasNext()) {
    final Connection conn = (Connection) it.next();
    if (conn.isSharedResource() || !conn.getOriginatedHere()
        || !conn.getPreserveOrder()) {
      continue;
    }
    result.put(Long.valueOf(conn.getUniqueId()), Long.valueOf(conn.getMessagesSent()));
  }
}
/**
 * Wait for the given incoming connections to receive at least the
 * associated number of messages. Matching entries are removed from
 * connectionStates as they are processed; whatever is left over could
 * not be matched to a receiver and is reported at debug level.
 *
 * @param member the endpoint whose receivers are examined
 * @param connectionStates maps a connection's unique id to the message
 *        count that must have been received; modified by this call
 * @throws InterruptedException if the thread is interrupted on entry or
 *         while sleeping between polls
 */
protected void waitForThreadOwnedOrderedConnectionState(Stub member,
    HashMap connectionStates) throws InterruptedException {
  if (Thread.interrupted()) throw new InterruptedException(); // wisest to do this before the synchronize below
  List r = null;
  // work on a copy so the receivers lock is not held while sleeping
  synchronized(receivers) {
    r = new ArrayList(receivers);
  }
  for (Iterator it=r.iterator(); it.hasNext();) {
    Connection con = (Connection)it.next();
    if (!con.stopped && !con.isClosing() && !con.getOriginatedHere() && con.getPreserveOrder()
        && member.equals(con.getRemoteId())) {
      Long state = (Long)connectionStates.remove(Long.valueOf(con.getUniqueId()));
      if (state != null) {
        long count = state.longValue();
        while (!con.stopped && !con.isClosing() && con.getMessagesReceived() < count) {
          if (logger.isDebugEnabled()) {
            logger.debug("Waiting for connection {}/{} currently={} need={}",
                con.getRemoteId(), con.getUniqueId(), con.getMessagesReceived(), count);
          }
          Thread.sleep(100);
        }
      }
    }
  }
  if (!connectionStates.isEmpty() && logger.isDebugEnabled()) {
    StringBuilder sb = new StringBuilder(1000);
    sb.append("These connections from ");
    sb.append(member);
    // leading space keeps the member id from running into the text
    sb.append(" could not be located during waitForThreadOwnedOrderedConnectionState: ");
    for (Iterator it=connectionStates.entrySet().iterator();
        it.hasNext(); ) {
      Map.Entry entry = (Map.Entry)it.next();
      sb.append(entry.getKey())
        .append('(')
        .append(entry.getValue())
        .append(')');
      if (it.hasNext()) {
        sb.append(',');
      }
    }
    logger.debug(sb);
  }
}
/**
 * Returns the distribution manager obtained from the owning conduit.
 */
protected DM getDM() {
return this.owner.getDM();
}
// public boolean isShuttingDown() {
// return this.owner.isShuttingDown();
// }
//protected void cleanupHighWater() {
// cleanup(highWater);
//}
//protected void cleanupLowWater() {
// cleanup(lowWater);
//}
//private void cleanup(int maxConnections) {
/* if (maxConnections == 0 || maxConnections >= connections.size()) {
return;
}
while (connections.size() > maxConnections) {
Connection oldest = null;
synchronized(connections) {
for (Iterator iter = connections.values().iterator(); iter.hasNext(); ) {
Connection c = (Connection)iter.next();
if (oldest == null || c.getTimeStamp() < oldest.getTimeStamp()) {
oldest = c;
}
}
}
// sanity check - don't close anything fresher than 10 seconds or
// we'll start thrashing
if (oldest.getTimeStamp() > (System.currentTimeMillis() - 10000)) {
if (owner.lowWaterConnectionCount > 0) {
owner.lowWaterConnectionCount += 10;
}
if (owner.highWaterConnectionCount > 0) {
owner.highWaterConnectionCount += 10;
}
new Object[] {
owner.lowWaterConnectionCount,
owner.highWaterConnectionCount
});
break;
}
if (oldest != null) {
oldest.close();
}
}*/
//}
/*
public void dumpConnectionTable() {
Iterator iter = connectionMap.keySet().iterator();
while (iter.hasNext()) {
Object key = iter.next();
Object val = connectionMap.get(key);
}
}
*/
/**
 * Represents a connection attempt that is still in progress. The thread that
 * constructs the instance is remembered as the connecting thread; other threads
 * block in {@link #waitForConnect} until {@link #notifyWaiters} publishes the
 * result.
 * <p>
 * The class is deliberately an inner (non-static) class: waitForConnect reads the
 * enclosing table's orderedConnectionMap / unorderedConnectionMap and calls
 * getConduit().
 */
private /*static*/ class PendingConnection {
/**
* true if this connection is still pending; cleared exactly once by notifyWaiters
*/
private boolean pending = true;
/**
* the connection we are waiting on; assigned by notifyWaiters and may remain
* null (notifyWaiters(null) is used when the pending entry was removed from the map)
*/
private Connection conn = null;
/**
* whether the connection preserves message ordering; selects which connection
* map waitForConnect consults
*/
private final boolean preserveOrder;
/**
* the stub we are connecting to; also the key looked up in the connection map
*/
private final Stub id;
/**
* the thread that created this pending connection, used to detect a thread
* re-entering waitForConnect on its own attempt
*/
private final Thread connectingThread;
/**
* Creates a pending connection owned by the calling thread.
*
* @param preserveOrder whether the connection being formed is ordered
* @param id the stub of the member being connected to
*/
public PendingConnection(boolean preserveOrder, Stub id) {
this.preserveOrder = preserveOrder;
this.id = id;
this.connectingThread = Thread.currentThread();
}
/**
* Synchronously set the connection and notify waiters that we are ready.
* Only the first call has any effect; later calls return immediately.
*
* @param c the new connection, or null if no connection resulted
*/
public synchronized void notifyWaiters(Connection c) {
if (!this.pending)
return; // already done.
this.conn = c;
this.pending = false;
if (logger.isDebugEnabled()) {
logger.debug("Notifying waiters that pending {} connection to {} is ready; {}",
((this.preserveOrder) ? "ordered" : "unordered"), this.id, this);
}
// wake every thread blocked in waitForConnect on this object's monitor
this.notifyAll();
}
/**
* Wait for a connection. Polls in 100 ms waits until this pending connection is
* resolved, the conduit's cancel criterion fires, or the entry for this stub in
* the connection map is no longer this object.
*
* @param mgr the membership manager that can instigate suspect processing if necessary;
*        cast to JGroupMembershipManager when ackSATimeout is positive
* @param startTime the ms clock start time for the operation
* @param ackTimeout the ms ack-wait-threshold, or zero
* @param ackSATimeout the ms ack-severe-alert-threshold, or zero; alerting and
*        suspect processing only happen when this is positive
* @return the new connection, which may be null
* @throws IOException declared by the signature; a ReenteredConnectException is
*         thrown when the caller is the thread that created this pending connection
*/
public synchronized Connection waitForConnect(MembershipManager mgr,
long startTime, long ackTimeout, long ackSATimeout) throws IOException
{
// The connecting thread would be waiting on itself; fail fast instead.
if(connectingThread == Thread.currentThread()) {
throw new ReenteredConnectException("This thread is already trying to connect");
}
final Map m = this.preserveOrder ? orderedConnectionMap
: unorderedConnectionMap;
boolean severeAlertIssued = false;
boolean suspected = false;
InternalDistributedMember targetMember = null;
// The member is only resolved when severe-alert processing is enabled.
if (ackSATimeout > 0) {
targetMember =
((JGroupMembershipManager)mgr).getMemberForStub(this.id, false);
}
for (;;) {
if (!this.pending) break;
getConduit().getCancelCriterion().checkCancelInProgress(null);
// wait a little bit...
// Clear (and remember) any pending interrupt so wait() does not throw at once;
// the finally block re-asserts it for the caller.
boolean interrupted = Thread.interrupted();
try {
this.wait(100); // spurious wakeup ok
}
catch (InterruptedException ignore) {
interrupted = true;
getConduit().getCancelCriterion().checkCancelInProgress(ignore);
}
finally {
if (interrupted) {
Thread.currentThread().interrupt();
}
}
if (!this.pending) break;
// Still pending...
long now = System.currentTimeMillis();
// Escalation: after ackTimeout a warning is logged once and the member is
// suspected; after ackTimeout + ackSATimeout a fatal message is logged once
// and no further alerts are issued.
if (!severeAlertIssued && ackSATimeout > 0 && startTime + ackTimeout < now) {
if (startTime + ackTimeout + ackSATimeout < now) {
logger.fatal(LocalizedMessage.create(
LocalizedStrings.ConnectionTable_UNABLE_TO_FORM_A_TCPIP_CONNECTION_TO_0_IN_OVER_1_SECONDS,
new Object[] { targetMember, (ackSATimeout+ackTimeout)/1000 }));
severeAlertIssued = true;
}
else if (!suspected) {
logger.warn(LocalizedMessage.create(
LocalizedStrings.ConnectionTable_UNABLE_TO_FORM_A_TCPIP_CONNECTION_TO_0_IN_OVER_1_SECONDS,
new Object[] { this.id, (ackTimeout)/1000 }));
((JGroupMembershipManager)mgr).suspectMember(targetMember,
"Unable to form a TCP/IP connection in a reasonable amount of time");
suspected = true;
}
}
// Check what the connection map currently holds for this stub.
Object e;
//synchronized (m) {
e = m.get(this.id);
//}
if (e == this) {
// The map still points at this pending connection: keep waiting.
if (logger.isDebugEnabled()) {
logger.debug("Waiting for pending connection to complete: {} connection to {}; {}",
((this.preserveOrder) ? "ordered" : "unordered"), this.id, this);
}
continue;
}
// Odd state change. Process and exit.
if (logger.isDebugEnabled()) {
logger.debug("Pending connection changed to {} unexpectedly", e);
}
if (e == null) {
// We were removed
notifyWaiters(null);
break;
}
else if (e instanceof Connection) {
// A real connection replaced this entry; publish it to any other waiters.
notifyWaiters((Connection)e);
break;
}
else {
// defer to the new instance
// NOTE(review): this call is made while still holding this object's monitor
// (the method is synchronized) — confirm that is intended.
return ((PendingConnection)e).waitForConnect(mgr, startTime,
ackTimeout, ackSATimeout);
}
} // for
return this.conn;
}
}
/**
 * Timer task that, each time it runs, asks its connection to check for an idle
 * timeout and cancels itself once that check reports true.
 */
private static class IdleConnTT extends SystemTimer.SystemTimerTask {
  /** The connection to check; set to null when the task is cancelled. */
  private Connection c;

  IdleConnTT(Connection c) {
    this.c = c;
  }

  /**
   * Drops the reference to the connection and then cancels the task.
   *
   * @return the result of the superclass cancel
   */
  @Override
  public boolean cancel() {
    c = null;
    return super.cancel();
  }

  /**
   * Checks the connection for idle timeout; does nothing if the task has
   * already been cancelled.
   */
  @Override
  public void run2() {
    // Read the field once so a concurrent cancel() cannot null it between the
    // check and the call.
    final Connection target = c;
    if (target == null) {
      return;
    }
    if (target.checkForIdleTimeout()) {
      cancel();
    }
  }
}
/**
 * Builds a new connection table for the given conduit and records it in
 * {@code lastInstance} before returning it.
 *
 * @param conduit the conduit passed to the ConnectionTable constructor
 * @return the newly constructed table
 * @throws IOException if the constructor throws it
 */
public static ConnectionTable create(TCPConduit conduit) throws IOException {
  final ConnectionTable table = new ConnectionTable(conduit);
  lastInstance.set(table);
  return table;
}
/**
 * Keeps track of a socket that is trying to connect(), for shutdown purposes.
 * The socket is stored together with the peer address and the calling thread.
 *
 * @param socket the socket that is about to connect
 * @param addr the address of the peer being connected to
 */
public void addConnectingSocket(Socket socket, InetAddress addr) {
  // Capture the address and the current (connecting) thread before taking the lock.
  final ConnectingSocketInfo info = new ConnectingSocketInfo(addr);
  synchronized (connectingSockets) {
    connectingSockets.put(socket, info);
  }
}
/**
 * Stops tracking a socket previously registered with the connecting-socket
 * set. It should be connected at this point.
 *
 * @param socket the socket to remove from the tracked set
 */
public void removeConnectingSocket(Socket socket) {
  synchronized (connectingSockets) {
    connectingSockets.remove(socket);
  }
}
/**
 * Bookkeeping for a socket that is in the middle of connecting: the peer
 * address and the thread that created this record.
 */
private static class ConnectingSocketInfo {
  /** Address supplied when the record was created. */
  InetAddress peerAddress;
  /** Thread that constructed this record. */
  Thread connectingThread;

  /**
   * @param addr the peer address to remember; the calling thread is recorded
   *        as the connecting thread
   */
  public ConnectingSocketInfo(InetAddress addr) {
    connectingThread = Thread.currentThread();
    peerAddress = addr;
  }
}
}