blob: f54f7bd9cd4d636b2585fdce24a4d020ac91d27e [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one or more contributor license
* agreements. See the NOTICE file distributed with this work for additional information regarding
* copyright ownership. The ASF licenses this file to You under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance with the License. You may obtain a
* copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software distributed under the License
* is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
* or implied. See the License for the specific language governing permissions and limitations under
* the License.
*/
package org.apache.geode.internal.tcp;
import static java.lang.Integer.MAX_VALUE;
import static java.util.concurrent.TimeUnit.SECONDS;
import java.io.IOException;
import java.net.InetAddress;
import java.net.Socket;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.Executor;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.atomic.AtomicReference;
import org.apache.logging.log4j.Logger;
import org.apache.geode.SystemFailure;
import org.apache.geode.alerting.internal.spi.AlertingAction;
import org.apache.geode.annotations.VisibleForTesting;
import org.apache.geode.annotations.internal.MakeNotStatic;
import org.apache.geode.distributed.DistributedMember;
import org.apache.geode.distributed.DistributedSystemDisconnectedException;
import org.apache.geode.distributed.internal.DistributionManager;
import org.apache.geode.distributed.internal.InternalDistributedSystem;
import org.apache.geode.distributed.internal.membership.InternalDistributedMember;
import org.apache.geode.distributed.internal.membership.api.MemberIdentifier;
import org.apache.geode.distributed.internal.membership.api.Membership;
import org.apache.geode.internal.Assert;
import org.apache.geode.internal.SystemTimer;
import org.apache.geode.internal.lang.utils.JavaWorkarounds;
import org.apache.geode.internal.logging.CoreLoggingExecutors;
import org.apache.geode.internal.net.BufferPool;
import org.apache.geode.internal.net.SocketCloser;
import org.apache.geode.logging.internal.executors.LoggingExecutors;
import org.apache.geode.logging.internal.log4j.api.LogService;
/**
 * ConnectionTable holds all of the Connection objects in a conduit. Connections represent a pipe
 * between two endpoints represented by generic DistributedMembers.
 *
 * <p>
 * Shared outgoing connections live in {@code orderedConnectionMap} and
 * {@code unorderedConnectionMap}. An entry there may temporarily be a {@code PendingConnection}
 * while the connect is in progress. Thread-owned ordered connections live in
 * {@code threadConnectionMap} and in the thread-local {@code threadOrderedConnMap}. Accepted
 * (incoming) connections live in {@code receivers}.
 *
 * @since GemFire 2.1
 */
public class ConnectionTable {
  private static final Logger logger = LogService.getLogger();

  /**
   * Set once the "out of file descriptors" warning has been logged, so that it is only issued once.
   * It is written while holding the monitor of the ConnectionTable instance that called
   * {@link #fileDescriptorsExhausted()}.
   */
  @MakeNotStatic
  private static boolean ulimitWarningIssued;

  /**
   * Per-thread registration made by {@link #threadWantsSharedResources()} and
   * {@link #threadWantsOwnResources()}. The value is TRUE if the current thread wants non-shared
   * resources, FALSE if it wants shared ones, and null if it never registered a preference.
   */
  private static final ThreadLocal<Boolean> threadWantsOwnResources = new ThreadLocal<>();

  /**
   * Used for messages whose order must be preserved. Only connections used for sending messages,
   * and receiving acks, will be put in this map. Value may be {@link Connection} or
   * {@link PendingConnection}.
   */
  private final Map<DistributedMember, Object> orderedConnectionMap = new ConcurrentHashMap<>();

  /**
   * Ordered connections local to this thread. The map is only reachable through this ThreadLocal,
   * so it needs no synchronization.
   */
  static final ThreadLocal<Map<DistributedMember, Connection>> threadOrderedConnMap =
      new ThreadLocal<>();

  /**
   * Timer that runs the idle-connection timeout tasks ({@link IdleConnTT}). It is created lazily by
   * {@link #getIdleConnTimer()} if it is null. Guarded by this.
   */
  private SystemTimer idleConnTimer;

  /**
   * Used to find connections owned by threads. The key is the same one used in
   * threadOrderedConnMap. The value is a list because any number of connections can share the
   * same key. Each list is guarded by synchronizing on the list itself.
   */
  private final ConcurrentMap<DistributedMember, List<Connection>> threadConnectionMap;

  /**
   * Used for all non-ordered messages. Only connections used for sending messages, and receiving
   * acks, will be put in this map. Value may be {@link Connection} or {@link PendingConnection}.
   */
  private final Map<DistributedMember, Object> unorderedConnectionMap = new ConcurrentHashMap<>();

  /**
   * Used for all accepted connections. These connections are read only: we never send messages on
   * them except acks; we only receive. Guarded by synchronizing on the list itself.
   */
  private final List<Connection> receivers = new ArrayList<>();

  /**
   * the conduit for this table
   */
  private final TCPConduit owner;

  /**
   * buffer pool obtained from the owning conduit
   */
  private final BufferPool bufferPool;

  /**
   * true if this table is no longer in use
   */
  private volatile boolean closed;

  /**
   * Executor used by p2p reader and p2p handshaker threads.
   */
  private final Executor p2pReaderThreadPool;

  /**
   * Number of seconds to wait before timing out an unused p2p reader thread. The default is 120 (2
   * minutes). It can be overridden with the {@code p2p.READER_POOL_KEEP_ALIVE_TIME} system
   * property.
   */
  private static final long READER_POOL_KEEP_ALIVE_TIME =
      Long.getLong("p2p.READER_POOL_KEEP_ALIVE_TIME", 120);

  private final SocketCloser socketCloser;

  /**
   * The most recent instance to be created
   *
   * <p>
   * TODO this assumes no more than one instance is created at a time?
   */
  @MakeNotStatic
  private static final AtomicReference<ConnectionTable> lastInstance = new AtomicReference<>();

  /**
   * Sockets that are in the process of being connected. They are tracked so they can be closed on
   * shutdown or when their peer departs. Guarded by synchronizing on the map itself.
   */
  private final Map<Socket, ConnectingSocketInfo> connectingSockets = new HashMap<>();

  /**
   * Cause calling thread to share communication resources with other threads.
   */
  public static void threadWantsSharedResources() {
    threadWantsOwnResources.set(Boolean.FALSE);
  }

  /**
   * Cause calling thread to acquire exclusive access to communication resources. Exclusive access
   * may not be available in which case this call is ignored.
   */
  public static void threadWantsOwnResources() {
    threadWantsOwnResources.set(Boolean.TRUE);
  }

  /**
   * Returns true if calling thread owns its own communication resources. A thread that is
   * currently alerting never owns its resources. The result is false when there is no distribution
   * manager.
   */
  private boolean threadOwnsResources() {
    DistributionManager d = getDM();
    if (d != null) {
      return d.getSystem().threadOwnsResources() && !AlertingAction.isThreadAlerting();
    }
    return false;
  }

  /**
   * Returns the preference the calling thread registered via {@link #threadWantsOwnResources()}
   * or {@link #threadWantsSharedResources()}, or null if it registered none.
   */
  public static Boolean getThreadOwnsResourcesRegistration() {
    return threadWantsOwnResources.get();
  }

  public TCPConduit getOwner() {
    return owner;
  }

  /**
   * Creates a table for the given conduit and records it as the most recently created instance.
   */
  public static ConnectionTable create(TCPConduit conduit) {
    ConnectionTable ct = new ConnectionTable(conduit);
    lastInstance.set(ct);
    return ct;
  }

  private ConnectionTable(TCPConduit conduit) {
    owner = conduit;
    // only create the idle timer up front when an idle timeout is configured
    idleConnTimer = owner.idleConnectionTimeout != 0
        ? new SystemTimer(conduit.getDM().getSystem()) : null;
    threadConnectionMap = new ConcurrentHashMap<>();
    p2pReaderThreadPool = createThreadPoolForIO(conduit.getDM().getSystem().isShareSockets());
    socketCloser = new SocketCloser();
    bufferPool = conduit.getBufferPool();
  }

  /**
   * Creates the executor for p2p reader threads.
   *
   * <p>
   * When sockets are conserved (shared), this is a new "SharedP2PReader" thread for each task.
   * Otherwise it is an "UnsharedP2PReader" pool whose idle threads are kept alive for
   * {@link #READER_POOL_KEEP_ALIVE_TIME} seconds.
   */
  private Executor createThreadPoolForIO(boolean conserveSockets) {
    if (conserveSockets) {
      return LoggingExecutors.newThreadOnEachExecute("SharedP2PReader");
    }
    return CoreLoggingExecutors.newThreadPoolWithSynchronousFeed(1, MAX_VALUE,
        READER_POOL_KEEP_ALIVE_TIME, SECONDS, "UnsharedP2PReader");
  }

  /**
   * The conduit calls acceptConnection after an accept. This creates a receiver for the socket.
   * The receiver is registered in {@code receivers} unless the table has been closed (in which
   * case the receiver is closed) or the receiver has already stopped.
   *
   * @throws IOException if the receiver could not be created
   * @throws ConnectionException if the receiver could not be created
   */
  void acceptConnection(Socket sock, PeerConnectionFactory peerConnectionFactory)
      throws IOException, ConnectionException {
    InetAddress connAddress = sock.getInetAddress();
    boolean finishedConnecting = false;
    Connection connection = null;
    try {
      connection = peerConnectionFactory.createReceiver(this, sock);
      // check for shutdown (so it doesn't get missed in the finally block)
      owner.getCancelCriterion().checkCancelInProgress(null);
      finishedConnecting = true;
    } catch (ConnectionException | IOException ex) {
      // check for shutdown...
      owner.getCancelCriterion().checkCancelInProgress(ex);
      logger.warn("Failed to accept connection from {} because: {}",
          connAddress != null ? connAddress : "unavailable address", ex);
      throw ex;
    } finally {
      // note: no need to call incFailedAccept here because it will be done in our caller.
      // no need to log error here since caller will log warning
      if (connection != null && !finishedConnecting) {
        // we must be throwing from checkCancelInProgress so close the connection
        closeCon("cancel after accept", connection);
        connection = null;
      }
    }

    if (connection != null) {
      synchronized (receivers) {
        owner.getStats().incReceivers();
        if (closed) {
          closeCon("Connection table no longer in use", connection);
          return;
        }
        // If connection.stopped is false, any connection cleanup thread will not yet have acquired
        // the receiver synchronization to remove the receiver. Therefore we can safely add it here.
        if (!(connection.isSocketClosed() || connection.isReceiverStopped())) {
          receivers.add(connection);
        }
      }
      if (logger.isDebugEnabled()) {
        logger.debug("Accepted {} myAddr={} theirAddr={}", connection, getConduit().getMemberId(),
            connection.getRemoteAddress());
      }
    }
  }

  /**
   * Process a newly created PendingConnection.
   *
   * @param id DistributedMember on which the connection is created
   * @param sharedResource whether the connection is used by multiple threads
   * @param preserveOrder whether to preserve order
   * @param m map to add the connection to
   * @param pc the PendingConnection to process
   * @param startTime the ms clock start time for the operation
   * @param ackThreshold the ms ack-wait-threshold, or zero
   * @param ackSAThreshold the ms ack-severe_alert-threshold, or zero
   * @return the Connection, or null if someone else already created or closed it
   * @throws IOException if unable to connect
   */
  private Connection handleNewPendingConnection(InternalDistributedMember id,
      boolean sharedResource,
      boolean preserveOrder, Map<DistributedMember, Object> m, PendingConnection pc, long startTime,
      long ackThreshold,
      long ackSAThreshold) throws IOException, DistributedSystemDisconnectedException {
    // handle new pending connection
    Connection con = null;
    try {
      long senderCreateStartTime = owner.getStats().startSenderCreate();
      con = Connection.createSender(owner.getMembership(), this, preserveOrder, id,
          sharedResource, startTime, ackThreshold, ackSAThreshold);
      owner.getStats().incSenders(sharedResource, preserveOrder, senderCreateStartTime);
    } finally {
      // if our connection failed, notify anyone waiting for our pending connection
      if (con == null) {
        owner.getStats().incFailedConnect();
        synchronized (m) {
          Object rmObj = m.remove(id);
          if (rmObj != pc && rmObj != null) {
            // put it back since it was not our pc
            m.put(id, rmObj);
          }
        }
        pc.notifyWaiters(null);
        // we must be throwing an exception
      }
    }

    // Update our list of connections -- either the orderedConnectionMap or unorderedConnectionMap
    //
    // Note that we added the entry _before_ we attempted the connect,
    // so it's possible something else got through in the meantime...
    synchronized (m) {
      Object e = m.get(id);
      if (e == pc) {
        m.put(id, con);
      } else if (e == null) {
        // someone closed our pending connection so cleanup the connection we created
        if (con != null) {
          con.requestClose("pending connection cancelled");
        }
        con = null;
      } else {
        if (e instanceof Connection) {
          Connection newCon = (Connection) e;
          if (!newCon.connected) {
            // someone closed our pending connect so cleanup the connection we created
            if (con != null) {
              con.requestClose("pending connection closed");
              con = null;
            }
          } else {
            // This should not happen. It means that someone else created the connection which
            // should only happen if our Connection was rejected.
            if (con != null) {
              con.requestClose("someone else created the connection");
            }
            con = newCon;
          }
        }
      }
    }
    pc.notifyWaiters(con);

    if (con != null && logger.isDebugEnabled()) {
      logger.debug("handleNewPendingConnection {} myAddr={} theirAddr={}", con,
          getConduit().getMemberId(), con.getRemoteAddress());
    }

    return con;
  }

  /**
   * Gets a shared connection: one that is unordered or used with conserve-sockets=true. Note
   * that unordered connections are currently always shared.
   *
   * <p>
   * If there is no usable entry for {@code id}, this thread registers a PendingConnection and
   * creates the connection. If another thread's PendingConnection is already present, this thread
   * waits for it.
   *
   * @param id the DistributedMember on which we are creating a connection
   * @param scheduleTimeout whether unordered connection should time out
   * @param preserveOrder whether to preserve order
   * @param startTime the ms clock start time for the operation
   * @param ackTimeout the ms ack-wait-threshold, or zero
   * @param ackSATimeout the ms ack-severe-alert-threshold, or zero
   * @return the new Connection, or null if an error
   * @throws IOException if unable to create the connection, or if an alerting thread would have to
   *         wait on another thread's pending connection
   */
  private Connection getSharedConnection(InternalDistributedMember id, boolean scheduleTimeout,
      boolean preserveOrder, long startTime, long ackTimeout, long ackSATimeout)
      throws IOException, DistributedSystemDisconnectedException {

    final Map<DistributedMember, Object> m =
        preserveOrder ? orderedConnectionMap : unorderedConnectionMap;

    // new connection, if needed
    PendingConnection pc = null;
    // existing connection (if we don't create a new one)
    Object mEntry;

    // Look for pending connection
    synchronized (m) {
      mEntry = m.get(id);
      if (mEntry instanceof Connection) {
        Connection existingCon = (Connection) mEntry;
        if (!existingCon.connected) {
          // a disconnected entry is treated as absent and replaced below
          mEntry = null;
        }
      }
      if (mEntry == null) {
        pc = new PendingConnection(preserveOrder, id);
        m.put(id, pc);
      }
    }

    Connection result;
    if (pc != null) {
      if (logger.isDebugEnabled()) {
        logger.debug("created PendingConnection {}", pc);
      }
      result = handleNewPendingConnection(id, true, preserveOrder, m, pc,
          startTime, ackTimeout, ackSATimeout);
      if (!preserveOrder && scheduleTimeout) {
        scheduleIdleTimeout(result);
      }
    } else { // we have existing connection
      if (mEntry instanceof PendingConnection) {
        if (AlertingAction.isThreadAlerting()) {
          // do not change the text of this exception - it is looked for in exception handlers
          throw new IOException("Cannot form connection to alert listener " + id);
        }

        result = ((PendingConnection) mEntry).waitForConnect(owner.getMembership(),
            startTime, ackTimeout, ackSATimeout);
        if (logger.isDebugEnabled()) {
          if (result != null) {
            logger.debug("getSharedConnection {} myAddr={} theirAddr={}", result,
                getConduit().getMemberId(), result.getRemoteAddress());
          } else {
            logger.debug("getSharedConnection: Connect failed");
          }
        }
      } else {
        result = (Connection) mEntry;
      }
    } // we have existing connection

    return result;
  }

  /**
   * Gets an ordered connection that this thread owns. An existing, non-timed-out connection from
   * the thread-local map is reused. Otherwise a new one is created, registered in both
   * {@code threadConnectionMap} and the thread-local map, and given an idle timeout.
   *
   * @param id stub on which to create the connection
   * @param startTime the ms clock start time for the operation
   * @param ackTimeout the ms ack-wait-threshold, or zero
   * @param ackSATimeout the ms ack-severe-alert-threshold, or zero
   * @return the connection, or null if an error
   * @throws IOException if the connection could not be created
   */
  Connection getThreadOwnedConnection(InternalDistributedMember id, long startTime, long ackTimeout,
      long ackSATimeout) throws IOException, DistributedSystemDisconnectedException {
    Connection result;

    // Look for result in the thread local
    Map<DistributedMember, Connection> m = threadOrderedConnMap.get();
    if (m == null) {
      // First time for this thread. Create thread local
      m = new HashMap<>();
      threadOrderedConnMap.set(m);
    } else {
      // No need to sync map since it is only referenced by ThreadLocal.
      result = m.get(id);
      if (result != null && !result.timedOut) {
        return result;
      }
    }

    // OK, we have to create a new connection.
    long senderCreateStartTime = owner.getStats().startSenderCreate();
    result = Connection.createSender(owner.getMembership(), this, true, id, false, startTime,
        ackTimeout, ackSATimeout);
    owner.getStats().incSenders(false, true, senderCreateStartTime);
    if (logger.isDebugEnabled()) {
      logger.debug("ConnectionTable: created an ordered connection: {}", result);
    }

    // Update the list of connections owned by this thread....
    final List<Connection> al =
        JavaWorkarounds.computeIfAbsent(threadConnectionMap, id, k -> new ArrayList<>());

    // Add our Connection to the list
    synchronized (al) {
      al.add(result);
    }

    // Finally, add the connection to our thread local map.
    // No need to sync map since it is only referenced by ThreadLocal.
    m.put(id, result);

    scheduleIdleTimeout(result);
    return result;
  }

  /**
   * Schedules an idle-connection timeout task for {@code conn}. The task runs at a fixed rate of
   * {@code owner.idleConnectionTimeout} ms. Nothing is scheduled when {@code conn} is null, the
   * timeout is zero, or the table is closed.
   *
   * @throws DistributedSystemDisconnectedException if the timer rejects the task and the
   *         connection is not already closing
   */
  private void scheduleIdleTimeout(Connection conn) {
    if (conn == null) {
      return;
    }

    // Set the idle timeout
    if (owner.idleConnectionTimeout != 0) {
      try {
        synchronized (this) {
          if (!closed) {
            IdleConnTT task = new IdleConnTT(conn);
            conn.setIdleTimeoutTask(task);
            synchronized (task) {
              if (!task.isCancelled()) {
                // close() sets "closed" without holding this lock, so the table may have been
                // closed since the check above; getIdleConnTimer() then returns null.
                SystemTimer timer = getIdleConnTimer();
                if (timer != null) {
                  timer.scheduleAtFixedRate(task, owner.idleConnectionTimeout,
                      owner.idleConnectionTimeout);
                }
              }
            }
          }
        }
      } catch (IllegalStateException e) {
        if (conn.isClosing()) {
          // connection is closed before we schedule the timeout task,
          // causing the task to be canceled
          return;
        }
        logger.debug("Got an illegal state exception: {}", e.getMessage(), e);
        // Unfortunately, cancelInProgress() is not set until *after* the shutdown message has been
        // sent, so we need to check the "closeInProgress" bit instead.
        owner.getCancelCriterion().checkCancelInProgress(null);
        Throwable cause = owner.getShutdownCause();
        if (cause == null) {
          cause = e;
        }
        throw new DistributedSystemDisconnectedException("The distributed system is shutting down",
            cause);
      }
    }
  }

  /**
   * Get a new connection.
   *
   * <p>
   * Unordered requests, and requests from threads that do not own their resources, use a shared
   * connection. Ordered requests from resource-owning threads use a thread-owned connection.
   *
   * @param id the DistributedMember on which to create the connection
   * @param preserveOrder whether order should be preserved
   * @param startTime the ms clock start time
   * @param ackTimeout the ms ack-wait-threshold, or zero
   * @param ackSATimeout the ms ack-severe-alert-threshold, or zero
   * @return the new Connection, or null if a problem
   * @throws IOException if the connection could not be created
   * @throws DistributedSystemDisconnectedException if this table is closed
   */
  protected Connection get(InternalDistributedMember id, boolean preserveOrder, long startTime,
      long ackTimeout, long ackSATimeout)
      throws IOException, DistributedSystemDisconnectedException {
    if (closed) {
      owner.getCancelCriterion().checkCancelInProgress(null);
      throw new DistributedSystemDisconnectedException("Connection table is closed");
    }
    Connection result;
    boolean threadOwnsResources = threadOwnsResources();
    if (!preserveOrder || !threadOwnsResources) {
      result = getSharedConnection(id, threadOwnsResources, preserveOrder, startTime, ackTimeout,
          ackSATimeout);
    } else {
      result = getThreadOwnedConnection(id, startTime, ackTimeout, ackSATimeout);
    }
    if (result != null) {
      Assert.assertTrue(result.getPreserveOrder() == preserveOrder);
    }
    return result;
  }

  /**
   * Logs a one-time fatal warning that the process has run out of file descriptors. It then
   * switches the distributed system (if any) to shared sockets.
   */
  synchronized void fileDescriptorsExhausted() {
    if (!ulimitWarningIssued) {
      ulimitWarningIssued = true;
      logger.fatal(
          "This process is out of file descriptors. This will hamper communications and slow down the system. Any conserve-sockets setting is now being ignored. Please consider raising the descriptor limit. This alert is only issued once per process.");
      InternalDistributedSystem system = InternalDistributedSystem.getAnyInstance();
      if (system != null) {
        system.setShareSockets(true);
      }
    }
  }

  TCPConduit getConduit() {
    return owner;
  }

  BufferPool getBufferPool() {
    return bufferPool;
  }

  public boolean isClosed() {
    return closed;
  }

  private static void closeCon(String reason, Object c) {
    closeCon(reason, c, false);
  }

  /**
   * Closes a map entry. A {@link Connection} is closed via closePartialConnect. A
   * {@link PendingConnection} has its waiters released with a null connection. Null is ignored.
   */
  private static void closeCon(String reason, Object c, boolean beingSick) {
    if (c == null) {
      return;
    }
    if (c instanceof Connection) {
      ((Connection) c).closePartialConnect(reason, beingSick);
    } else {
      ((PendingConnection) c).notifyWaiters(null);
    }
  }

  /**
   * Returns the idle connection timer, or null if the connection table is closed. The timer is
   * created lazily if needed. Guarded by a sync on the connection table.
   */
  synchronized SystemTimer getIdleConnTimer() {
    if (closed) {
      return null;
    }
    if (idleConnTimer == null) {
      idleConnTimer = new SystemTimer(getDM().getSystem());
    }
    return idleConnTimer;
  }

  /**
   * Closes the table. This cancels the idle timer and closes every shared, thread-owned and
   * receiving connection. It also shuts down the reader pool (when it is an ExecutorService),
   * clears the calling thread's ordered-connection map, and closes the socket closer. Calls after
   * the first are ignored.
   */
  protected void close() {
    if (closed) {
      return;
    }
    closed = true;
    synchronized (this) {
      if (idleConnTimer != null) {
        idleConnTimer.cancel();
      }
    }
    synchronized (orderedConnectionMap) {
      for (Object o : orderedConnectionMap.values()) {
        closeCon("Connection table being destroyed", o);
      }
      orderedConnectionMap.clear();
    }
    synchronized (unorderedConnectionMap) {
      for (Object o : unorderedConnectionMap.values()) {
        closeCon("Connection table being destroyed", o);
      }
      unorderedConnectionMap.clear();
    }
    if (threadConnectionMap != null) {
      synchronized (threadConnectionMap) {
        for (final List<Connection> connections : threadConnectionMap.values()) {
          synchronized (connections) {
            for (Object o : connections) {
              closeCon("Connection table being destroyed", o);
            }
          }
        }
        threadConnectionMap.clear();
      }
    }
    Executor localExec = p2pReaderThreadPool;
    if (localExec instanceof ExecutorService) {
      ((ExecutorService) localExec).shutdown();
    }
    closeReceivers(false);

    Map<DistributedMember, Connection> map = threadOrderedConnMap.get();
    if (map != null) {
      // No need to synchronize map since it is only referenced by ThreadLocal.
      map.clear();
    }
    socketCloser.close();
  }

  /**
   * Runs {@code runnable} on the p2p reader executor.
   */
  public void executeCommand(Runnable runnable) {
    Executor local = p2pReaderThreadPool;
    if (local != null) {
      local.execute(runnable);
    }
  }

  /**
   * Close all receiving threads. This is used during shutdown and is also used by a test hook that
   * makes us deaf to incoming messages. Any sockets still being connected are closed as well.
   *
   * @param beingSick a test hook to simulate a sick process; when true only ordered receivers are
   *        closed
   */
  void closeReceivers(boolean beingSick) {
    synchronized (receivers) {
      for (Iterator<Connection> it = receivers.iterator(); it.hasNext();) {
        Connection con = it.next();
        if (!beingSick || con.getPreserveOrder()) {
          closeCon("Connection table being destroyed", con, beingSick);
          it.remove();
        }
      }
      // now close any sockets being formed
      synchronized (connectingSockets) {
        for (final Iterator<Map.Entry<Socket, ConnectingSocketInfo>> it =
            connectingSockets.entrySet().iterator(); it.hasNext();) {
          final Map.Entry<Socket, ConnectingSocketInfo> entry = it.next();
          try {
            entry.getKey().close();
          } catch (IOException ignored) {
            // ignored - we're shutting down
          }
          it.remove();
        }
      }
    }
  }

  void removeReceiver(Connection con) {
    synchronized (receivers) {
      receivers.remove(con);
    }
  }

  /**
   * remove an endpoint and notify the membership manager of the departure
   */
  protected void removeEndpoint(DistributedMember stub, String reason) {
    removeEndpoint(stub, reason, true);
  }

  /**
   * Removes and closes every connection to {@code memberID}: shared, thread-owned, connecting
   * sockets and receivers. It then releases socket-closer resources for the member's address.
   * Nothing is done when the table is closed or there are no sender connections to the member.
   *
   * @param notifyDisconnect when true, throw if the distribution manager is shutting down
   * @throws DistributedSystemDisconnectedException if notifyDisconnect is set and shutdown is in
   *         progress
   */
  void removeEndpoint(DistributedMember memberID, String reason, boolean notifyDisconnect) {
    if (closed) {
      return;
    }
    boolean needsRemoval = false;
    synchronized (orderedConnectionMap) {
      if (orderedConnectionMap.get(memberID) != null) {
        needsRemoval = true;
      }
    }
    if (!needsRemoval) {
      synchronized (unorderedConnectionMap) {
        if (unorderedConnectionMap.get(memberID) != null) {
          needsRemoval = true;
        }
      }
    }
    if (!needsRemoval) {
      final ConcurrentMap<DistributedMember, List<Connection>> cm = threadConnectionMap;
      if (cm != null) {
        final List<Connection> al = cm.get(memberID);
        if (al != null) {
          // the list is mutated under its own monitor elsewhere, so read it the same way
          synchronized (al) {
            needsRemoval = !al.isEmpty();
          }
        }
      }
    }

    if (needsRemoval) {
      InternalDistributedMember remoteAddress = null;
      synchronized (orderedConnectionMap) {
        Object c = orderedConnectionMap.remove(memberID);
        if (c instanceof Connection) {
          remoteAddress = ((Connection) c).getRemoteAddress();
        }
        closeCon(reason, c);
      }
      synchronized (unorderedConnectionMap) {
        Object c = unorderedConnectionMap.remove(memberID);
        if (remoteAddress == null && c instanceof Connection) {
          remoteAddress = ((Connection) c).getRemoteAddress();
        }
        closeCon(reason, c);
      }

      final ConcurrentMap<DistributedMember, List<Connection>> cm = threadConnectionMap;
      if (cm != null) {
        final List<Connection> al = cm.remove(memberID);
        if (al != null) {
          synchronized (al) {
            for (Connection c : al) {
              if (remoteAddress == null && c != null) {
                remoteAddress = c.getRemoteAddress();
              }
              closeCon(reason, c);
            }
            al.clear();
          }
        }
      }

      // close any sockets that are in the process of being connected
      final Set<Socket> toRemove = new HashSet<>();
      synchronized (connectingSockets) {
        for (final Iterator<Map.Entry<Socket, ConnectingSocketInfo>> it =
            connectingSockets.entrySet().iterator(); it.hasNext();) {
          final Map.Entry<Socket, ConnectingSocketInfo> entry = it.next();
          final ConnectingSocketInfo info = entry.getValue();
          if (info.peerAddress.equals(((MemberIdentifier) memberID).getInetAddress())) {
            toRemove.add(entry.getKey());
            it.remove();
          }
        }
      }
      for (final Socket sock : toRemove) {
        try {
          sock.close();
        } catch (IOException e) {
          if (logger.isDebugEnabled()) {
            logger.debug("caught exception while trying to close connecting socket for {}",
                memberID, e);
          }
        }
      }

      // close any receivers
      // avoid deadlock when a NIC has failed by closing connections outside of the receivers sync
      final Set<Connection> connectionsToClose = new HashSet<>();
      synchronized (receivers) {
        for (final Iterator<Connection> it = receivers.iterator(); it.hasNext();) {
          final Connection con = it.next();
          if (memberID.equals(con.getRemoteAddress())) {
            it.remove();
            connectionsToClose.add(con);
          }
        }
      }
      for (final Connection con : connectionsToClose) {
        closeCon(reason, con);
      }

      if (notifyDisconnect) {
        if (owner.getDM().shutdownInProgress()) {
          throw new DistributedSystemDisconnectedException("Shutdown in progress",
              owner.getDM().getDistribution().getShutdownCause());
        }
      }

      if (remoteAddress != null) {
        socketCloser.releaseResourcesForAddress(remoteAddress.toString());
      }
    }
  }

  SocketCloser getSocketCloser() {
    return socketCloser;
  }

  /**
   * check to see if there are still any receiver threads for the given end-point
   */
  boolean hasReceiversFor(DistributedMember endPoint) {
    synchronized (receivers) {
      for (Connection receiver : receivers) {
        if (endPoint.equals(receiver.getRemoteAddress())) {
          return true;
        }
      }
    }
    return false;
  }

  /**
   * Removes {@code c} from the list registered for {@code stub} in {@code cm}, if any. The list is
   * updated under its own monitor.
   */
  private static void removeFromThreadConMap(
      final ConcurrentMap<DistributedMember, List<Connection>> cm, final DistributedMember stub,
      final Connection c) {
    if (cm != null) {
      List<Connection> al = cm.get(stub);
      if (al != null) {
        synchronized (al) {
          al.remove(c);
        }
      }
    }
  }

  /**
   * Unregisters a thread-owned connection. The connection is removed from
   * {@code threadConnectionMap`}. It is also removed from the calling thread's ordered map if it is
   * the entry registered there for {@code stub}.
   */
  void removeThreadConnection(final DistributedMember stub, final Connection c) {
    removeFromThreadConMap(threadConnectionMap, stub, c);
    final Map<DistributedMember, Connection> m = threadOrderedConnMap.get();
    if (m != null) {
      // No need to synchronize map since it is only referenced by ThreadLocal.
      if (m.get(stub) == c) {
        m.remove(stub);
      }
    }
  }

  /**
   * Removes and closes the shared connection for {@code stub}, but only if the current entry is
   * {@code c}. Ignored once the table is closed.
   */
  void removeSharedConnection(String reason, DistributedMember stub, boolean ordered,
      Connection c) {
    if (closed) {
      return;
    }
    if (ordered) {
      synchronized (orderedConnectionMap) {
        if (orderedConnectionMap.get(stub) == c) {
          closeCon(reason, orderedConnectionMap.remove(stub));
        }
      }
    } else {
      synchronized (unorderedConnectionMap) {
        if (unorderedConnectionMap.get(stub) == c) {
          closeCon(reason, unorderedConnectionMap.remove(stub));
        }
      }
    }
  }

  /**
   * Returns the shared/unordered sender count from the most recent instance's stats, or 0 if there
   * is no instance.
   */
  @VisibleForTesting
  public static long getNumSenderSharedConnections() {
    ConnectionTable ct = lastInstance.get();
    if (ct == null) {
      return 0;
    }
    return ct.getConduit().getStats().getSendersSU();
  }

  /**
   * Clears lastInstance. Does not yet close underlying sockets, but probably not strictly
   * necessary.
   *
   * @see SystemFailure#emergencyClose()
   */
  public static void emergencyClose() {
    lastInstance.set(null);
  }

  /**
   * Closes and unregisters every connection in the calling thread's ordered-connection map.
   */
  void removeAndCloseThreadOwnedSockets() {
    Map<DistributedMember, Connection> m = threadOrderedConnMap.get();
    if (m != null) {
      // No need to synchronize map since it is only referenced by ThreadLocal.
      for (final Iterator<Map.Entry<DistributedMember, Connection>> it =
          m.entrySet().iterator(); it.hasNext();) {
        final Map.Entry<DistributedMember, Connection> me = it.next();
        DistributedMember stub = me.getKey();
        Connection c = me.getValue();
        removeFromThreadConMap(threadConnectionMap, stub, c);
        it.remove();
        closeCon("thread finalization", c);
      }
    }
  }

  /**
   * Closes the calling thread's owned sockets on the most recently created table, if any.
   */
  public static void releaseThreadsSockets() {
    ConnectionTable ct = lastInstance.get();
    if (ct == null) {
      return;
    }
    ct.removeAndCloseThreadOwnedSockets();
  }

  /**
   * Records the current outgoing message count on all thread-owned ordered connections. This does
   * not synchronize or stop new connections from being formed or new messages from being sent.
   *
   * @param member the member whose thread-owned connections are inspected
   * @param result receives connection unique id to messages-sent count
   * @since GemFire 5.1
   */
  void getThreadOwnedOrderedConnectionState(final DistributedMember member,
      final Map<Long, Long> result) {
    if (threadConnectionMap != null) {
      List<Connection> al = threadConnectionMap.get(member);
      if (al != null) {
        // snapshot under the list's monitor, then iterate without holding it
        synchronized (al) {
          al = new ArrayList<>(al);
        }

        for (final Connection connection : al) {
          if (!connection.isSharedResource() && connection.getOriginatedHere()
              && connection.getPreserveOrder()) {
            result.put(connection.getUniqueId(), connection.getMessagesSent());
          }
        }
      }
    }
  }

  /**
   * Wait for the given incoming connections to receive at least the associated number of
   * messages. Matched entries are removed from {@code connectionStates}; any left over are logged
   * at debug level. The wait polls every 100 ms.
   *
   * @throws InterruptedException if the calling thread is interrupted
   */
  void waitForThreadOwnedOrderedConnectionState(final DistributedMember member,
      final Map<Long, Long> connectionStates)
      throws InterruptedException {
    if (Thread.interrupted()) {
      // wisest to do this before the synchronize below
      throw new InterruptedException();
    }
    List<Connection> r;
    synchronized (receivers) {
      r = new ArrayList<>(receivers);
    }
    for (Connection con : r) {
      if (!con.stopped && !con.isClosing() && !con.getOriginatedHere() && con.getPreserveOrder()
          && member.equals(con.getRemoteAddress())) {
        final Long state = connectionStates.remove(con.getUniqueId());
        if (state != null) {
          long count = state;
          while (!con.stopped && !con.isClosing() && con.getMessagesReceived() < count) {
            if (logger.isDebugEnabled()) {
              logger.debug("Waiting for connection {}/{} currently={} need={}",
                  con.getRemoteAddress(), con.getUniqueId(), con.getMessagesReceived(), count);
            }
            Thread.sleep(100);
          }
        }
      }
    }
    if (!connectionStates.isEmpty()) {
      if (logger.isDebugEnabled()) {
        StringBuilder sb = new StringBuilder(1000);
        sb.append("These connections from ");
        sb.append(member);
        sb.append(" could not be located during waitForThreadOwnedOrderedConnectionState: ");
        for (Iterator<Map.Entry<Long, Long>> it = connectionStates.entrySet().iterator(); it
            .hasNext();) {
          Map.Entry<Long, Long> entry = it.next();
          sb.append(entry.getKey()).append('(').append(entry.getValue()).append(')');
          if (it.hasNext()) {
            sb.append(',');
          }
        }
        logger.debug(sb);
      }
    }
  }

  protected DistributionManager getDM() {
    return owner.getDM();
  }

  /**
   * keep track of a socket that is trying to connect() for shutdown purposes
   */
  void addConnectingSocket(Socket socket, InetAddress addr) {
    synchronized (connectingSockets) {
      connectingSockets.put(socket, new ConnectingSocketInfo(addr));
    }
  }

  /**
   * remove a socket from the tracked set. It should be connected at this point
   */
  void removeConnectingSocket(Socket socket) {
    synchronized (connectingSockets) {
      connectingSockets.remove(socket);
    }
  }

  /**
   * Returns the number of registered receivers. The list is read under its monitor, like every
   * other access to it.
   */
  int getNumberOfReceivers() {
    synchronized (receivers) {
      return receivers.size();
    }
  }

  private class PendingConnection {

    /**
     * true if this connection is still pending
     */
    private boolean pending = true;

    /**
     * the connection we are waiting on
     */
    private Connection conn;

    /**
     * whether the connection preserves message ordering
     */
    private final boolean preserveOrder;

    /**
     * the stub we are connecting to
     */
    private final DistributedMember id;

    /** the thread that created this pending connection and is performing the connect */
    private final Thread connectingThread;

    private PendingConnection(boolean preserveOrder, DistributedMember id) {
      this.preserveOrder = preserveOrder;
      this.id = id;
      connectingThread = Thread.currentThread();
    }

    /**
     * Synchronously set the connection and notify waiters that we are ready. Only the first call
     * has any effect.
     *
     * @param c the new connection, or null if the connect failed
     */
    private synchronized void notifyWaiters(Connection c) {
      if (!pending) {
        return;
      }
      conn = c;
      pending = false;
      if (logger.isDebugEnabled()) {
        logger.debug("Notifying waiters that pending {} connection to {} is ready; {}",
            preserveOrder ? "ordered" : "unordered", id, this);
      }
      notifyAll();
    }

    /**
     * Wait for a connection.
     *
     * @param mgr the membership manager that can instigate suspect processing if necessary
     * @param startTime the ms clock start time for the operation
     * @param ackTimeout the ms ack-wait-threshold, or zero
     * @param ackSATimeout the ms ack-severe-alert-threshold, or zero
     * @return the new connection, or null if the connect failed or this entry was removed
     * @throws ReenteredConnectException if called by the thread that is performing the connect
     */
    private synchronized Connection waitForConnect(Membership<InternalDistributedMember> mgr,
        long startTime, long ackTimeout,
        long ackSATimeout) {
      if (connectingThread == Thread.currentThread()) {
        throw new ReenteredConnectException("This thread is already trying to connect");
      }

      final Map<DistributedMember, Object> m =
          preserveOrder ? orderedConnectionMap : unorderedConnectionMap;

      DistributedMember targetMember = null;
      if (ackSATimeout > 0) {
        targetMember = id;
      }

      boolean suspected = false;
      boolean severeAlertIssued = false;
      for (int attempt = 0;;) {
        if (!pending) {
          break;
        }
        getConduit().getCancelCriterion().checkCancelInProgress(null);

        // wait a little bit...
        boolean interrupted = Thread.interrupted();
        try {
          wait(100);
        } catch (InterruptedException e) {
          interrupted = true;
          getConduit().getCancelCriterion().checkCancelInProgress(e);
        } finally {
          if (interrupted) {
            Thread.currentThread().interrupt();
          }
        }

        if (!pending) {
          break;
        }

        // Still pending...
        long now = System.currentTimeMillis();
        if (!severeAlertIssued && ackSATimeout > 0 && startTime + ackTimeout < now) {
          if (startTime + ackTimeout + ackSATimeout < now) {
            if (targetMember != null) {
              logger.fatal("Unable to form a TCP/IP connection to {} in over {} seconds",
                  targetMember, (ackSATimeout + ackTimeout) / 1000);
            }
            severeAlertIssued = true;
          } else if (!suspected) {
            logger.warn("Unable to form a TCP/IP connection to {} in over {} seconds",
                id, ackTimeout / 1000);
            mgr.suspectMember((InternalDistributedMember) targetMember,
                "Unable to form a TCP/IP connection in a reasonable amount of time");
            suspected = true;
          }
        }

        Object e = m.get(id);
        if (e == this) {
          attempt += 1;
          if (logger.isDebugEnabled() && attempt % 20 == 1) {
            logger.debug("Waiting for pending connection to complete: {} connection to {}; {}",
                preserveOrder ? "ordered" : "unordered", id, this);
          }
          continue;
        }

        // Odd state change. Process and exit.
        if (logger.isDebugEnabled()) {
          logger.debug("Pending connection changed to {} unexpectedly", e);
        }

        if (e == null) {
          // We were removed
          notifyWaiters(null);
          break;
        }
        if (e instanceof Connection) {
          notifyWaiters((Connection) e);
          break;
        }
        // defer to the new instance
        return ((PendingConnection) e).waitForConnect(mgr, startTime, ackTimeout, ackSATimeout);
      }
      return conn;
    }

    @Override
    public String toString() {
      return super.toString() + " created by " + connectingThread.getName();
    }
  }

  /**
   * Timer task that periodically asks its connection whether it has been idle too long, and
   * cancels itself when {@link Connection#checkForIdleTimeout()} returns true.
   */
  private static class IdleConnTT extends SystemTimer.SystemTimerTask {

    /** cleared on cancel; read by the timer thread and written by cancelling threads */
    private volatile Connection connection;

    private IdleConnTT(Connection c) {
      connection = c;
    }

    @Override
    public boolean cancel() {
      Connection con = connection;
      if (con != null) {
        con.cleanUpOnIdleTaskCancel();
      }
      connection = null;
      return super.cancel();
    }

    @Override
    public void run2() {
      Connection con = connection;
      if (con != null) {
        if (con.checkForIdleTimeout()) {
          cancel();
        }
      }
    }
  }

  /** Bookkeeping for a socket that is still connecting: the peer address it is connecting to. */
  private static class ConnectingSocketInfo {
    private final InetAddress peerAddress;

    private ConnectingSocketInfo(InetAddress addr) {
      peerAddress = addr;
    }
  }
}