| /* |
| * Licensed to the Apache Software Foundation (ASF) under one or more contributor license |
| * agreements. See the NOTICE file distributed with this work for additional information regarding |
| * copyright ownership. The ASF licenses this file to You under the Apache License, Version 2.0 (the |
| * "License"); you may not use this file except in compliance with the License. You may obtain a |
| * copy of the License at |
| * |
| * http://www.apache.org/licenses/LICENSE-2.0 |
| * |
| * Unless required by applicable law or agreed to in writing, software distributed under the License |
| * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express |
| * or implied. See the License for the specific language governing permissions and limitations under |
| * the License. |
| */ |
| package org.apache.geode.internal.tcp; |
| |
| import java.io.IOException; |
| import java.lang.ref.Reference; |
| import java.lang.ref.WeakReference; |
| import java.net.InetAddress; |
| import java.net.Socket; |
| import java.util.ArrayList; |
| import java.util.HashMap; |
| import java.util.HashSet; |
| import java.util.Iterator; |
| import java.util.List; |
| import java.util.Map; |
| import java.util.Set; |
| import java.util.concurrent.ConcurrentHashMap; |
| import java.util.concurrent.ConcurrentMap; |
| import java.util.concurrent.Executor; |
| import java.util.concurrent.ExecutorService; |
| import java.util.concurrent.atomic.AtomicReference; |
| |
| import org.apache.logging.log4j.Logger; |
| |
| import org.apache.geode.SystemFailure; |
| import org.apache.geode.alerting.internal.spi.AlertingAction; |
| import org.apache.geode.annotations.internal.MakeNotStatic; |
| import org.apache.geode.distributed.DistributedMember; |
| import org.apache.geode.distributed.DistributedSystemDisconnectedException; |
| import org.apache.geode.distributed.internal.DistributionManager; |
| import org.apache.geode.distributed.internal.InternalDistributedSystem; |
| import org.apache.geode.distributed.internal.membership.InternalDistributedMember; |
| import org.apache.geode.distributed.internal.membership.MembershipManager; |
| import org.apache.geode.distributed.internal.membership.adapter.GMSMembershipManager; |
| import org.apache.geode.distributed.internal.membership.gms.api.Membership; |
| import org.apache.geode.internal.Assert; |
| import org.apache.geode.internal.SystemTimer; |
| import org.apache.geode.internal.logging.CoreLoggingExecutors; |
| import org.apache.geode.internal.net.BufferPool; |
| import org.apache.geode.internal.net.SocketCloser; |
| import org.apache.geode.logging.internal.executors.LoggingExecutors; |
| import org.apache.geode.logging.internal.log4j.api.LogService; |
| |
| /** |
| * <p> |
| * ConnectionTable holds all of the Connection objects in a conduit. Connections represent a pipe |
| * between two endpoints represented by generic DistributedMembers. |
| * </p> |
| * |
| * @since GemFire 2.1 |
| */ |
| public class ConnectionTable { |
| private static final Logger logger = LogService.getLogger(); |
| |
| /** warning when descriptor limit reached */ |
| @MakeNotStatic |
| private static boolean ulimitWarningIssued; |
| |
| /** |
| * true if the current thread wants non-shared resources |
| */ |
| @MakeNotStatic |
| private static ThreadLocal threadWantsOwnResources = new ThreadLocal(); |
| |
| /** |
| * Used for messages whose order must be preserved Only connections used for sending messages, and |
| * receiving acks, will be put in this map. |
| */ |
| protected final Map orderedConnectionMap = new ConcurrentHashMap(); |
| |
| /** |
| * ordered connections local to this thread. Note that accesses to the resulting map must be |
| * synchronized because of static cleanup. |
| */ |
| // ThreadLocal<Map> |
| protected final ThreadLocal<Map> threadOrderedConnMap; |
| |
| /** |
| * List of thread-owned ordered connection maps, for cleanup |
| * |
| * Accesses to the maps in this list need to be synchronized on their instance. |
| */ |
| private final List threadConnMaps; |
| |
| /** |
| * Timer to kill idle threads |
| * |
| * guarded.By this |
| */ |
| private SystemTimer idleConnTimer; |
| |
| /** |
| * Used to find connections owned by threads. The key is the same one used in |
| * threadOrderedConnMap. The value is an ArrayList since we can have any number of connections |
| * with the same key. |
| */ |
| private ConcurrentMap threadConnectionMap; |
| |
| /** |
| * Used for all non-ordered messages. Only connections used for sending messages, and receiving |
| * acks, will be put in this map. |
| */ |
| protected final Map unorderedConnectionMap = new ConcurrentHashMap(); |
| |
| /** |
| * Used for all accepted connections. These connections are read only; we never send messages, |
| * except for acks; only receive. |
| * |
| * Consists of a list of Connection |
| */ |
| private final List receivers = new ArrayList(); |
| |
| /** |
| * the conduit for this table |
| */ |
| private final TCPConduit owner; |
| |
| private final BufferPool bufferPool; |
| |
| /** |
| * true if this table is no longer in use |
| */ |
| private volatile boolean closed = false; |
| |
| /** |
| * Executor used by p2p reader and p2p handshaker threads. |
| */ |
| private final Executor p2pReaderThreadPool; |
| /** |
| * Number of seconds to wait before timing out an unused p2p reader thread. Default is 120 (2 |
| * minutes). |
| */ |
| private static final long READER_POOL_KEEP_ALIVE_TIME = |
| Long.getLong("p2p.READER_POOL_KEEP_ALIVE_TIME", 120).longValue(); |
| |
| private final SocketCloser socketCloser; |
| |
| /** |
| * The most recent instance to be created |
| * |
| * TODO this assumes no more than one instance is created at a time? |
| */ |
| @MakeNotStatic |
| private static final AtomicReference lastInstance = new AtomicReference(); |
| |
| /** |
| * A set of sockets that are in the process of being connected |
| */ |
| private Map connectingSockets = new HashMap(); |
| |
| /** |
| * Cause calling thread to share communication resources with other threads. |
| */ |
| public static void threadWantsSharedResources() { |
| threadWantsOwnResources.set(Boolean.FALSE); |
| } |
| |
| /** |
| * Cause calling thread to acquire exclusive access to communication resources. Exclusive access |
| * may not be available in which case this call is ignored. |
| */ |
| public static void threadWantsOwnResources() { |
| threadWantsOwnResources.set(Boolean.TRUE); |
| } |
| |
| /** |
| * Returns true if calling thread owns its own communication resources. |
| */ |
| boolean threadOwnsResources() { |
| DistributionManager d = getDM(); |
| if (d != null) { |
| return d.getSystem().threadOwnsResources() && !AlertingAction.isThreadAlerting(); |
| } |
| return false; |
| } |
| |
| public static Boolean getThreadOwnsResourcesRegistration() { |
| return (Boolean) threadWantsOwnResources.get(); |
| } |
| |
| public TCPConduit getOwner() { |
| return owner; |
| } |
| |
| |
| private ConnectionTable(TCPConduit conduit) throws IOException { |
| this.owner = conduit; |
| this.idleConnTimer = (this.owner.idleConnectionTimeout != 0) |
| ? new SystemTimer(conduit.getDM().getSystem(), true) : null; |
| this.threadOrderedConnMap = new ThreadLocal(); |
| this.threadConnMaps = new ArrayList(); |
| this.threadConnectionMap = new ConcurrentHashMap(); |
| this.p2pReaderThreadPool = createThreadPoolForIO(conduit.getDM().getSystem().isShareSockets()); |
| this.socketCloser = new SocketCloser(); |
| this.bufferPool = new BufferPool(owner.getStats()); |
| } |
| |
| private Executor createThreadPoolForIO(boolean conserveSockets) { |
| if (conserveSockets) { |
| return LoggingExecutors.newThreadOnEachExecute("SharedP2PReader"); |
| } else { |
| return CoreLoggingExecutors.newThreadPoolWithSynchronousFeed("UnsharedP2PReader", 1, |
| Integer.MAX_VALUE, READER_POOL_KEEP_ALIVE_TIME); |
| } |
| } |
| |
| /** conduit calls acceptConnection after an accept */ |
| protected void acceptConnection(Socket sock, PeerConnectionFactory peerConnectionFactory) |
| throws IOException, ConnectionException, InterruptedException { |
| InetAddress connAddress = sock.getInetAddress(); // for bug 44736 |
| boolean finishedConnecting = false; |
| Connection connection = null; |
| // boolean exceptionLogged = false; |
| try { |
| connection = peerConnectionFactory.createReceiver(this, sock); |
| |
| // check for shutdown (so it doesn't get missed in the finally block) |
| this.owner.getCancelCriterion().checkCancelInProgress(null); |
| finishedConnecting = true; |
| } catch (IOException ex) { |
| // check for shutdown... |
| this.owner.getCancelCriterion().checkCancelInProgress(ex); |
| logger.warn(String.format("Failed to accept connection from %s because: %s", |
| new Object[] {(connAddress != null ? connAddress : "unavailable address"), ex})); |
| throw ex; |
| } catch (ConnectionException ex) { |
| // check for shutdown... |
| this.owner.getCancelCriterion().checkCancelInProgress(ex); |
| logger.warn(String.format("Failed to accept connection from %s because: %s", |
| new Object[] {(connAddress != null ? connAddress : "unavailable address"), ex})); |
| throw ex; |
| } finally { |
| // note: no need to call incFailedAccept here because it will be done |
| // in our caller. |
| // no need to log error here since caller will log warning |
| |
| if (connection != null && !finishedConnecting) { |
| // we must be throwing from checkCancelInProgress so close the connection |
| closeCon("cancel after accept", |
| connection); |
| connection = null; |
| } |
| } |
| |
| if (connection != null) { |
| synchronized (this.receivers) { |
| this.owner.getStats().incReceivers(); |
| if (this.closed) { |
| closeCon("Connection table no longer in use", connection); |
| return; |
| } |
| // If connection.stopped is false, any connection cleanup thread will not yet have acquired |
| // the receiver synchronization to remove the receiver. Therefore we can safely add it here. |
| if (!(connection.isSocketClosed() || connection.isReceiverStopped())) { |
| this.receivers.add(connection); |
| } |
| } |
| if (logger.isDebugEnabled()) { |
| logger.debug("Accepted {} myAddr={} theirAddr={}", connection, getConduit().getMemberId(), |
| connection.remoteAddr); |
| } |
| } |
| } |
| |
| |
| |
| /** |
| * Process a newly created PendingConnection |
| * |
| * @param id DistributedMember on which the connection is created |
| * @param sharedResource whether the connection is used by multiple threads |
| * @param preserveOrder whether to preserve order |
| * @param m map to add the connection to |
| * @param pc the PendingConnection to process |
| * @param startTime the ms clock start time for the operation |
| * @param ackThreshold the ms ack-wait-threshold, or zero |
| * @param ackSAThreshold the ms ack-severe_alert-threshold, or zero |
| * @return the Connection, or null if someone else already created or closed it |
| * @throws IOException if unable to connect |
| */ |
| private Connection handleNewPendingConnection(DistributedMember id, boolean sharedResource, |
| boolean preserveOrder, Map m, PendingConnection pc, long startTime, long ackThreshold, |
| long ackSAThreshold) throws IOException, DistributedSystemDisconnectedException { |
| // handle new pending connection |
| Connection con = null; |
| try { |
| con = Connection.createSender(owner.getMembershipManager(), this, preserveOrder, id, |
| sharedResource, startTime, ackThreshold, ackSAThreshold); |
| this.owner.getStats().incSenders(sharedResource, preserveOrder); |
| } finally { |
| // our connection failed to notify anyone waiting for our pending con |
| if (con == null) { |
| this.owner.getStats().incFailedConnect(); |
| synchronized (m) { |
| Object rmObj = m.remove(id); |
| if (rmObj != pc && rmObj != null) { |
| // put it back since it was not our pc |
| m.put(id, rmObj); |
| } |
| } |
| pc.notifyWaiters(null); |
| // we must be throwing an exception |
| } |
| } // finally |
| |
| // Update our list of connections -- either the |
| // orderedConnectionMap or unorderedConnectionMap |
| // |
| // Note that we added the entry _before_ we attempted the connect, |
| // so it's possible something else got through in the mean time... |
| synchronized (m) { |
| Object e = m.get(id); |
| if (e == pc) { |
| m.put(id, con); |
| } else if (e == null) { |
| // someone closed our pending connection |
| // so cleanup the connection we created |
| con.requestClose( |
| "pending connection cancelled"); |
| con = null; |
| } else { |
| if (e instanceof Connection) { |
| Connection newCon = (Connection) e; |
| if (!newCon.connected) { |
| // Fix for bug 31590 |
| // someone closed our pending connect |
| // so cleanup the connection we created |
| if (con != null) { |
| con.requestClose( |
| "pending connection closed"); |
| con = null; |
| } |
| } else { |
| // This should not happen. It means that someone else |
| // created the connection which should only happen if |
| // our Connection was rejected. |
| // Assert.assertTrue(false); |
| // The above assertion was commented out to try the |
| // following with bug 32680 |
| if (con != null) { |
| con.requestClose("someone else created the connection"); |
| } |
| con = newCon; |
| } |
| } |
| } |
| } |
| pc.notifyWaiters(con); |
| if (con != null && logger.isDebugEnabled()) { |
| logger.debug("handleNewPendingConnection {} myAddr={} theirAddr={}", con, |
| getConduit().getMemberId(), con.remoteAddr); |
| } |
| |
| return con; |
| } |
| |
| /** |
| * unordered or conserve-sockets=true note that unordered connections are currently always shared |
| * |
| * @param id the DistributedMember on which we are creating a connection |
| * @param scheduleTimeout whether unordered connection should time out |
| * @param preserveOrder whether to preserve order |
| * @param startTime the ms clock start time for the operation |
| * @param ackTimeout the ms ack-wait-threshold, or zero |
| * @param ackSATimeout the ms ack-severe-alert-threshold, or zero |
| * @return the new Connection, or null if an error |
| * @throws IOException if unable to create the connection |
| */ |
| private Connection getSharedConnection(DistributedMember id, boolean scheduleTimeout, |
| boolean preserveOrder, long startTime, long ackTimeout, long ackSATimeout) |
| throws IOException, DistributedSystemDisconnectedException { |
| Connection result = null; |
| |
| final Map m = preserveOrder ? this.orderedConnectionMap : this.unorderedConnectionMap; |
| |
| PendingConnection pc = null; // new connection, if needed |
| Object mEntry = null; // existing connection (if we don't create a new one) |
| |
| // Look for pending connection |
| synchronized (m) { |
| mEntry = m.get(id); |
| if (mEntry != null && (mEntry instanceof Connection)) { |
| Connection existingCon = (Connection) mEntry; |
| if (!existingCon.connected) { |
| mEntry = null; |
| } |
| } |
| if (mEntry == null) { |
| pc = new PendingConnection(preserveOrder, id); |
| m.put(id, pc); |
| } |
| } // synchronized |
| |
| if (pc != null) { |
| if (logger.isDebugEnabled()) { |
| logger.debug("created PendingConnection {}", pc); |
| } |
| result = handleNewPendingConnection(id, true /* fixes bug 43386 */, preserveOrder, m, pc, |
| startTime, ackTimeout, ackSATimeout); |
| if (!preserveOrder && scheduleTimeout) { |
| scheduleIdleTimeout(result); |
| } |
| } else { // we have existing connection |
| if (mEntry instanceof PendingConnection) { |
| |
| if (AlertingAction.isThreadAlerting()) { |
| // do not change the text of this exception - it is looked for in exception handlers |
| throw new IOException("Cannot form connection to alert listener " + id); |
| } |
| |
| result = ((PendingConnection) mEntry).waitForConnect(this.owner.getMembershipManager(), |
| startTime, ackTimeout, ackSATimeout); |
| if (logger.isDebugEnabled()) { |
| if (result != null) { |
| logger.debug("getSharedConnection {} myAddr={} theirAddr={}", result, |
| getConduit().getMemberId(), result.remoteAddr); |
| } else { |
| logger.debug("getSharedConnection: Connect failed"); |
| } |
| } |
| } else { |
| result = (Connection) mEntry; |
| } |
| } // we have existing connection |
| |
| return result; |
| } |
| |
| /** |
| * Must be looking for an ordered connection that this thread owns |
| * |
| * @param id stub on which to create the connection |
| * @param startTime the ms clock start time for the operation |
| * @param ackTimeout the ms ack-wait-threshold, or zero |
| * @param ackSATimeout the ms ack-severe-alert-threshold, or zero |
| * @return the connection, or null if an error |
| * @throws IOException if the connection could not be created |
| */ |
| Connection getThreadOwnedConnection(DistributedMember id, long startTime, long ackTimeout, |
| long ackSATimeout) throws IOException, DistributedSystemDisconnectedException { |
| Connection result = null; |
| |
| // Look for result in the thread local |
| Map m = (Map) this.threadOrderedConnMap.get(); |
| if (m == null) { |
| // First time for this thread. Create thread local |
| m = new HashMap(); |
| synchronized (this.threadConnMaps) { |
| if (this.closed) { |
| owner.getCancelCriterion().checkCancelInProgress(null); |
| throw new DistributedSystemDisconnectedException( |
| "Connection table is closed"); |
| } |
| // check for stale references and remove them. |
| for (Iterator it = this.threadConnMaps.iterator(); it.hasNext();) { |
| Reference r = (Reference) it.next(); |
| if (r.get() == null) { |
| it.remove(); |
| } |
| } // for |
| this.threadConnMaps.add(new WeakReference(m)); // ref added for bug 38011 |
| } // synchronized |
| this.threadOrderedConnMap.set(m); |
| } else { |
| // Consult thread local. |
| synchronized (m) { |
| result = (Connection) m.get(id); |
| } |
| if (result != null && result.timedOut) { |
| result = null; |
| } |
| } |
| if (result != null) |
| return result; |
| |
| // OK, we have to create a new connection. |
| result = Connection.createSender(owner.getMembershipManager(), this, true /* preserveOrder */, |
| id, false /* shared */, startTime, ackTimeout, ackSATimeout); |
| if (logger.isDebugEnabled()) { |
| logger.debug("ConnectionTable: created an ordered connection: {}", result); |
| } |
| this.owner.getStats().incSenders(false/* shared */, true /* preserveOrder */); |
| |
| // Update the list of connections owned by this thread.... |
| |
| if (this.threadConnectionMap == null) { |
| // This instance is being destroyed; fail the operation |
| closeCon( |
| "Connection table being destroyed", |
| result); |
| return null; |
| } |
| |
| ArrayList al = (ArrayList) this.threadConnectionMap.get(id); |
| if (al == null) { |
| // First connection for this DistributedMember. Make sure list for this |
| // stub is created if it isn't already there. |
| al = new ArrayList(); |
| |
| // Since it's a concurrent map, we just try to put it and then |
| // return whichever we got. |
| Object o = this.threadConnectionMap.putIfAbsent(id, al); |
| if (o != null) { |
| al = (ArrayList) o; |
| } |
| } |
| |
| // Add our Connection to the list |
| synchronized (al) { |
| al.add(result); |
| } |
| |
| // Finally, add the connection to our thread local map. |
| synchronized (m) { |
| m.put(id, result); |
| } |
| |
| scheduleIdleTimeout(result); |
| return result; |
| } |
| |
| /** schedule an idle-connection timeout task */ |
| private void scheduleIdleTimeout(Connection conn) { |
| if (conn == null) { |
| // fix for bug 43529 |
| return; |
| } |
| // Set the idle timeout |
| if (this.owner.idleConnectionTimeout != 0) { |
| try { |
| synchronized (this) { |
| if (!this.closed) { |
| IdleConnTT task = new IdleConnTT(conn); |
| conn.setIdleTimeoutTask(task); |
| this.getIdleConnTimer().scheduleAtFixedRate(task, this.owner.idleConnectionTimeout, |
| this.owner.idleConnectionTimeout); |
| } |
| } |
| } catch (IllegalStateException e) { |
| if (conn.isClosing()) { |
| // bug #45077 - connection is closed before we schedule the timeout task, |
| // causing the task to be canceled |
| return; |
| } |
| logger.debug("Got an illegal state exception: {}", e.getMessage(), e); |
| // Unfortunately, cancelInProgress() is not set until *after* |
| // the shutdown message has been sent, so we need to check the |
| // "closeInProgress" bit instead. |
| owner.getCancelCriterion().checkCancelInProgress(null); |
| Throwable cause = owner.getShutdownCause(); |
| if (cause == null) { |
| cause = e; |
| } |
| throw new DistributedSystemDisconnectedException( |
| "The distributed system is shutting down", |
| cause); |
| } |
| } |
| } |
| |
| /** |
| * Get a new connection |
| * |
| * @param id the DistributedMember on which to create the connection |
| * @param preserveOrder whether order should be preserved |
| * @param startTime the ms clock start time |
| * @param ackTimeout the ms ack-wait-threshold, or zero |
| * @param ackSATimeout the ms ack-severe-alert-threshold, or zero |
| * @return the new Connection, or null if a problem |
| * @throws java.io.IOException if the connection could not be created |
| */ |
| protected Connection get(DistributedMember id, boolean preserveOrder, long startTime, |
| long ackTimeout, long ackSATimeout) |
| throws java.io.IOException, DistributedSystemDisconnectedException { |
| if (this.closed) { |
| this.owner.getCancelCriterion().checkCancelInProgress(null); |
| throw new DistributedSystemDisconnectedException( |
| "Connection table is closed"); |
| } |
| Connection result = null; |
| boolean threadOwnsResources = threadOwnsResources(); |
| if (!preserveOrder || !threadOwnsResources) { |
| result = getSharedConnection(id, threadOwnsResources, preserveOrder, startTime, ackTimeout, |
| ackSATimeout); |
| } else { |
| result = getThreadOwnedConnection(id, startTime, ackTimeout, ackSATimeout); |
| } |
| if (result != null) { |
| Assert.assertTrue(result.preserveOrder == preserveOrder); |
| } |
| return result; |
| } |
| |
| protected synchronized void fileDescriptorsExhausted() { |
| if (!ulimitWarningIssued) { |
| ulimitWarningIssued = true; |
| logger.fatal( |
| "This process is out of file descriptors.This will hamper communications and slow down the system.Any conserve-sockets setting is now being ignored.Please consider raising the descriptor limit.This alert is only issued once per process."); |
| InternalDistributedSystem.getAnyInstance().setShareSockets(true); |
| threadWantsOwnResources = new ThreadLocal(); |
| } |
| } |
| |
| protected TCPConduit getConduit() { |
| return owner; |
| } |
| |
| public BufferPool getBufferPool() { |
| return bufferPool; |
| } |
| |
| public boolean isClosed() { |
| return this.closed; |
| } |
| |
| private static void closeCon(String reason, Object c) { |
| closeCon(reason, c, false); |
| } |
| |
| private static void closeCon(String reason, Object c, boolean beingSick) { |
| if (c == null) { |
| return; |
| } |
| if (c instanceof Connection) { |
| ((Connection) c).closePartialConnect(reason, beingSick); // fix for bug 31666 |
| } else { |
| ((PendingConnection) c).notifyWaiters(null); |
| } |
| } |
| |
| /** |
| * returns the idle connection timer, or null if the connection table is closed. guarded by a sync |
| * on the connection table |
| */ |
| protected synchronized SystemTimer getIdleConnTimer() { |
| if (this.closed) { |
| return null; |
| } |
| if (this.idleConnTimer == null) { |
| this.idleConnTimer = new SystemTimer(getDM().getSystem(), true); |
| } |
| return this.idleConnTimer; |
| } |
| |
| protected void close() { |
| /* |
| * NOMUX if (inputMuxManager != null) { inputMuxManager.stop(); } |
| */ |
| if (this.closed) { |
| return; |
| } |
| this.closed = true; |
| synchronized (this) { |
| if (this.idleConnTimer != null) { |
| this.idleConnTimer.cancel(); |
| } |
| } |
| synchronized (this.orderedConnectionMap) { |
| for (Iterator it = this.orderedConnectionMap.values().iterator(); it.hasNext();) { |
| closeCon( |
| "Connection table being destroyed", |
| it.next()); |
| } |
| this.orderedConnectionMap.clear(); |
| } |
| synchronized (this.unorderedConnectionMap) { |
| for (Iterator it = this.unorderedConnectionMap.values().iterator(); it.hasNext();) { |
| closeCon( |
| "Connection table being destroyed", |
| it.next()); |
| } |
| this.unorderedConnectionMap.clear(); |
| } |
| if (this.threadConnectionMap != null) { |
| this.threadConnectionMap = null; |
| } |
| if (this.threadConnMaps != null) { |
| synchronized (this.threadConnMaps) { |
| for (Iterator it = this.threadConnMaps.iterator(); it.hasNext();) { |
| Reference r = (Reference) it.next(); |
| Map m = (Map) r.get(); |
| if (m != null) { |
| synchronized (m) { |
| for (Iterator mit = m.values().iterator(); mit.hasNext();) { |
| closeCon("Connection table being destroyed", mit.next()); |
| } |
| } |
| } |
| } |
| this.threadConnMaps.clear(); |
| } |
| } |
| { |
| Executor localExec = this.p2pReaderThreadPool; |
| if (localExec != null) { |
| if (localExec instanceof ExecutorService) { |
| ((ExecutorService) localExec).shutdown(); |
| } |
| } |
| } |
| closeReceivers(false); |
| |
| Map m = (Map) this.threadOrderedConnMap.get(); |
| if (m != null) { |
| synchronized (m) { |
| m.clear(); |
| } |
| } |
| this.socketCloser.close(); |
| } |
| |
| public void executeCommand(Runnable runnable) { |
| Executor local = this.p2pReaderThreadPool; |
| if (local != null) { |
| local.execute(runnable); |
| } |
| } |
| |
| /** |
| * Close all receiving threads. This is used during shutdown and is also used by a test hook that |
| * makes us deaf to incoming messages. |
| * |
| * @param beingSick a test hook to simulate a sick process |
| */ |
| protected void closeReceivers(boolean beingSick) { |
| synchronized (this.receivers) { |
| for (Iterator it = this.receivers.iterator(); it.hasNext();) { |
| Connection con = (Connection) it.next(); |
| if (!beingSick || con.preserveOrder) { |
| closeCon( |
| "Connection table being destroyed", |
| con, beingSick); |
| it.remove(); |
| } |
| } |
| // now close any sockets being formed |
| synchronized (connectingSockets) { |
| for (Iterator it = connectingSockets.entrySet().iterator(); it.hasNext();) { |
| Map.Entry entry = (Map.Entry) it.next(); |
| // ConnectingSocketInfo info = (ConnectingSocketInfo)entry.getValue(); |
| try { |
| ((Socket) entry.getKey()).close(); |
| } catch (IOException e) { |
| // ignored - we're shutting down |
| } |
| it.remove(); |
| } |
| } |
| } |
| } |
| |
| |
| protected void removeReceiver(Object con) { |
| synchronized (this.receivers) { |
| this.receivers.remove(con); |
| } |
| } |
| |
| /** |
| * Return true if our owner already knows that this endpoint is departing |
| */ |
| protected boolean isEndpointShuttingDown(DistributedMember id) { |
| return giveUpOnMember(owner.getDM().getMembershipManager(), id); |
| } |
| |
| protected boolean giveUpOnMember(MembershipManager mgr, DistributedMember remoteAddr) { |
| return !mgr.memberExists(remoteAddr) || mgr.isShunned(remoteAddr) || mgr.shutdownInProgress(); |
| } |
| |
| /** remove an endpoint and notify the membership manager of the departure */ |
| protected void removeEndpoint(DistributedMember stub, String reason) { |
| removeEndpoint(stub, reason, true); |
| } |
| |
| protected void removeEndpoint(DistributedMember memberID, String reason, |
| boolean notifyDisconnect) { |
| if (this.closed) { |
| return; |
| } |
| boolean needsRemoval = false; |
| synchronized (this.orderedConnectionMap) { |
| if (this.orderedConnectionMap.get(memberID) != null) |
| needsRemoval = true; |
| } |
| if (!needsRemoval) { |
| synchronized (this.unorderedConnectionMap) { |
| if (this.unorderedConnectionMap.get(memberID) != null) |
| needsRemoval = true; |
| } |
| } |
| if (!needsRemoval) { |
| ConcurrentMap cm = this.threadConnectionMap; |
| if (cm != null) { |
| ArrayList al = (ArrayList) cm.get(memberID); |
| needsRemoval = al != null && al.size() > 0; |
| } |
| } |
| |
| if (needsRemoval) { |
| InternalDistributedMember remoteAddress = null; |
| synchronized (this.orderedConnectionMap) { |
| Object c = this.orderedConnectionMap.remove(memberID); |
| if (c instanceof Connection) { |
| remoteAddress = ((Connection) c).getRemoteAddress(); |
| } |
| closeCon(reason, c); |
| } |
| synchronized (this.unorderedConnectionMap) { |
| Object c = this.unorderedConnectionMap.remove(memberID); |
| if (remoteAddress == null && (c instanceof Connection)) { |
| remoteAddress = ((Connection) c).getRemoteAddress(); |
| } |
| closeCon(reason, c); |
| } |
| |
| { |
| ConcurrentMap cm = this.threadConnectionMap; |
| if (cm != null) { |
| ArrayList al = (ArrayList) cm.remove(memberID); |
| if (al != null) { |
| synchronized (al) { |
| for (Iterator it = al.iterator(); it.hasNext();) { |
| Object c = it.next(); |
| if (remoteAddress == null && (c instanceof Connection)) { |
| remoteAddress = ((Connection) c).getRemoteAddress(); |
| } |
| closeCon(reason, c); |
| } |
| al.clear(); |
| } |
| } |
| } |
| } |
| |
| // close any sockets that are in the process of being connected |
| Set toRemove = new HashSet(); |
| synchronized (connectingSockets) { |
| for (Iterator it = connectingSockets.entrySet().iterator(); it.hasNext();) { |
| Map.Entry entry = (Map.Entry) it.next(); |
| ConnectingSocketInfo info = (ConnectingSocketInfo) entry.getValue(); |
| if (info.peerAddress.equals(((InternalDistributedMember) memberID).getInetAddress())) { |
| toRemove.add(entry.getKey()); |
| it.remove(); |
| } |
| } |
| } |
| for (Iterator it = toRemove.iterator(); it.hasNext();) { |
| Socket sock = (Socket) it.next(); |
| try { |
| sock.close(); |
| } catch (IOException e) { |
| if (logger.isDebugEnabled()) { |
| logger.debug("caught exception while trying to close connecting socket for {}", |
| memberID, e); |
| } |
| } |
| } |
| |
| // close any receivers |
| // avoid deadlock when a NIC has failed by closing connections outside |
| // of the receivers sync (bug 38731) |
| toRemove.clear(); |
| synchronized (this.receivers) { |
| for (Iterator it = receivers.iterator(); it.hasNext();) { |
| Connection con = (Connection) it.next(); |
| if (memberID.equals(con.getRemoteAddress())) { |
| it.remove(); |
| toRemove.add(con); |
| } |
| } |
| } |
| for (Iterator it = toRemove.iterator(); it.hasNext();) { |
| Connection con = (Connection) it.next(); |
| closeCon(reason, con); |
| } |
| if (notifyDisconnect) { |
| // Before the removal of TCPConduit Stub addresses this used |
| // to call MembershipManager.getMemberForStub, which checked |
| // for a shutdown in progress and threw this exception: |
| if (owner.getDM().shutdownInProgress()) { |
| throw new DistributedSystemDisconnectedException("Shutdown in progress", |
| owner.getDM().getMembershipManager().getShutdownCause()); |
| } |
| } |
| |
| if (remoteAddress != null) { |
| this.socketCloser.releaseResourcesForAddress(remoteAddress.toString()); |
| } |
| } |
| } |
| |
| SocketCloser getSocketCloser() { |
| return this.socketCloser; |
| } |
| |
| /** check to see if there are still any receiver threads for the given end-point */ |
| protected boolean hasReceiversFor(DistributedMember endPoint) { |
| synchronized (this.receivers) { |
| for (Iterator it = receivers.iterator(); it.hasNext();) { |
| Connection con = (Connection) it.next(); |
| if (endPoint.equals(con.getRemoteAddress())) { |
| return true; |
| } |
| } |
| } |
| return false; |
| } |
| |
| private static void removeFromThreadConMap(ConcurrentMap cm, DistributedMember stub, |
| Connection c) { |
| if (cm != null) { |
| ArrayList al = (ArrayList) cm.get(stub); |
| if (al != null) { |
| synchronized (al) { |
| al.remove(c); |
| } |
| } |
| } |
| } |
| |
| protected void removeThreadConnection(DistributedMember stub, Connection c) { |
| /* |
| * if (this.closed) { return; } |
| */ |
| removeFromThreadConMap(this.threadConnectionMap, stub, c); |
| Map m = (Map) this.threadOrderedConnMap.get(); |
| if (m != null) { |
| // Static cleanup thread might intervene, so we MUST synchronize |
| synchronized (m) { |
| if (m.get(stub) == c) { |
| m.remove(stub); |
| } |
| } // synchronized |
| } // m != null |
| } |
| |
| void removeSharedConnection(String reason, DistributedMember stub, boolean ordered, |
| Connection c) { |
| if (this.closed) { |
| return; |
| } |
| if (ordered) { |
| synchronized (this.orderedConnectionMap) { |
| if (this.orderedConnectionMap.get(stub) == c) { |
| closeCon(reason, this.orderedConnectionMap.remove(stub)); |
| } |
| } |
| } else { |
| synchronized (this.unorderedConnectionMap) { |
| if (this.unorderedConnectionMap.get(stub) == c) { |
| closeCon(reason, this.unorderedConnectionMap.remove(stub)); |
| } |
| } |
| } |
| } |
| |
| /** |
| * Just ensure that this class gets loaded. |
| * |
| * @see SystemFailure#loadEmergencyClasses() |
| */ |
| public static void loadEmergencyClasses() { |
| // don't go any further, Frodo! |
| } |
| |
| /** |
| * Clears lastInstance. Does not yet close underlying sockets, but probably not strictly |
| * necessary. |
| * |
| * @see SystemFailure#emergencyClose() |
| */ |
| public static void emergencyClose() { |
| ConnectionTable ct = (ConnectionTable) lastInstance.get(); |
| if (ct == null) { |
| return; |
| } |
| lastInstance.set(null); |
| } |
| |
| public void removeAndCloseThreadOwnedSockets() { |
| Map m = (Map) this.threadOrderedConnMap.get(); |
| if (m != null) { |
| // Static cleanup may intervene; we MUST synchronize. |
| synchronized (m) { |
| Iterator it = m.entrySet().iterator(); |
| while (it.hasNext()) { |
| Map.Entry me = (Map.Entry) it.next(); |
| DistributedMember stub = (DistributedMember) me.getKey(); |
| Connection c = (Connection) me.getValue(); |
| removeFromThreadConMap(this.threadConnectionMap, stub, c); |
| it.remove(); |
| closeCon("thread finalization", c); |
| } // while |
| } // synchronized m |
| } |
| } |
| |
| public static void releaseThreadsSockets() { |
| ConnectionTable ct = (ConnectionTable) lastInstance.get(); |
| if (ct == null) { |
| return; |
| } |
| ct.removeAndCloseThreadOwnedSockets(); |
| // lastInstance = null; |
| } |
| |
| /** |
| * records the current outgoing message count on all thread-owned ordered connections. This does |
| * not synchronize or stop new connections from being formed or new messages from being sent |
| * |
| * @since GemFire 5.1 |
| */ |
| protected void getThreadOwnedOrderedConnectionState(DistributedMember member, Map result) { |
| |
| ConcurrentMap cm = this.threadConnectionMap; |
| if (cm != null) { |
| ArrayList al = (ArrayList) cm.get(member); |
| if (al != null) { |
| synchronized (al) { |
| al = new ArrayList(al); |
| } |
| |
| for (Iterator it = al.iterator(); it.hasNext();) { |
| Connection conn = (Connection) it.next(); |
| if (!conn.isSharedResource() && conn.getOriginatedHere() && conn.getPreserveOrder()) { |
| result.put(Long.valueOf(conn.getUniqueId()), Long.valueOf(conn.getMessagesSent())); |
| } |
| } |
| } |
| } |
| } |
| |
| /** |
| * wait for the given incoming connections to receive at least the associated number of messages |
| */ |
| protected void waitForThreadOwnedOrderedConnectionState(DistributedMember member, |
| Map connectionStates) throws InterruptedException { |
| if (Thread.interrupted()) |
| throw new InterruptedException(); // wisest to do this before the synchronize below |
| List r = null; |
| synchronized (receivers) { |
| r = new ArrayList(receivers); |
| } |
| for (Iterator it = r.iterator(); it.hasNext();) { |
| Connection con = (Connection) it.next(); |
| if (!con.stopped && !con.isClosing() && !con.getOriginatedHere() && con.getPreserveOrder() |
| && member.equals(con.getRemoteAddress())) { |
| Long state = (Long) connectionStates.remove(Long.valueOf(con.getUniqueId())); |
| if (state != null) { |
| long count = state.longValue(); |
| while (!con.stopped && !con.isClosing() && con.getMessagesReceived() < count) { |
| if (logger.isDebugEnabled()) { |
| logger.debug("Waiting for connection {}/{} currently={} need={}", |
| con.getRemoteAddress(), con.getUniqueId(), con.getMessagesReceived(), count); |
| } |
| Thread.sleep(100); |
| } |
| } |
| } |
| } |
| if (connectionStates.size() > 0) { |
| if (logger.isDebugEnabled()) { |
| StringBuffer sb = new StringBuffer(1000); |
| sb.append("These connections from "); |
| sb.append(member); |
| sb.append("could not be located during waitForThreadOwnedOrderedConnectionState: "); |
| for (Iterator it = connectionStates.entrySet().iterator(); it.hasNext();) { |
| Map.Entry entry = (Map.Entry) it.next(); |
| sb.append(entry.getKey()).append('(').append(entry.getValue()).append(')'); |
| if (it.hasNext()) { |
| sb.append(','); |
| } |
| } |
| logger.debug(sb); |
| } |
| } |
| } |
| |
| protected DistributionManager getDM() { |
| return this.owner.getDM(); |
| } |
| |
| // public boolean isShuttingDown() { |
| // return this.owner.isShuttingDown(); |
| // } |
| |
| // protected void cleanupHighWater() { |
| // cleanup(highWater); |
| // } |
| |
| // protected void cleanupLowWater() { |
| // cleanup(lowWater); |
| // } |
| |
| // private void cleanup(int maxConnections) { |
| /* |
| * if (maxConnections == 0 || maxConnections >= connections.size()) { return; } while |
| * (connections.size() > maxConnections) { Connection oldest = null; synchronized(connections) { |
| * for (Iterator iter = connections.values().iterator(); iter.hasNext(); ) { Connection c = |
| * (Connection)iter.next(); if (oldest == null || c.getTimeStamp() < oldest.getTimeStamp()) { |
| * oldest = c; } } } // sanity check - don't close anything fresher than 10 seconds or // we'll |
| * start thrashing if (oldest.getTimeStamp() > (System.currentTimeMillis() - 10000)) { if |
| * (owner.lowWaterConnectionCount > 0) { owner.lowWaterConnectionCount += 10; } if |
| * (owner.highWaterConnectionCount > 0) { owner.highWaterConnectionCount += 10; } new Object[] { |
| * owner.lowWaterConnectionCount, owner.highWaterConnectionCount }); break; } if (oldest != null) |
| * { oldest.close(); } } |
| */ |
| // } |
| |
| /* |
| * public void dumpConnectionTable() { Iterator iter = connectionMap.keySet().iterator(); while |
| * (iter.hasNext()) { Object key = iter.next(); Object val = connectionMap.get(key); } } |
| */ |
| private /* static */ class PendingConnection { |
| /** |
| * true if this connection is still pending |
| */ |
| private boolean pending = true; |
| |
| /** |
| * the connection we are waiting on |
| */ |
| private Connection conn = null; |
| |
| /** |
| * whether the connection preserves message ordering |
| */ |
| private final boolean preserveOrder; |
| |
| /** |
| * the stub we are connecting to |
| */ |
| private final DistributedMember id; |
| |
| private final Thread connectingThread; |
| |
| public PendingConnection(boolean preserveOrder, DistributedMember id) { |
| this.preserveOrder = preserveOrder; |
| this.id = id; |
| this.connectingThread = Thread.currentThread(); |
| } |
| |
| /** |
| * Synchronously set the connection and notify waiters that we are ready. |
| * |
| * @param c the new connection |
| */ |
| public synchronized void notifyWaiters(Connection c) { |
| if (!this.pending) |
| return; // already done. |
| |
| this.conn = c; |
| this.pending = false; |
| if (logger.isDebugEnabled()) { |
| logger.debug("Notifying waiters that pending {} connection to {} is ready; {}", |
| ((this.preserveOrder) ? "ordered" : "unordered"), this.id, this); |
| } |
| this.notifyAll(); |
| } |
| |
| /** |
| * Wait for a connection |
| * |
| * @param mgr the membership manager that can instigate suspect processing if necessary |
| * @param startTime the ms clock start time for the operation |
| * @param ackTimeout the ms ack-wait-threshold, or zero |
| * @param ackSATimeout the ms ack-severe-alert-threshold, or zero |
| * @return the new connection |
| */ |
| public synchronized Connection waitForConnect(Membership mgr, long startTime, |
| long ackTimeout, long ackSATimeout) throws IOException { |
| if (connectingThread == Thread.currentThread()) { |
| throw new ReenteredConnectException("This thread is already trying to connect"); |
| } |
| |
| final Map m = this.preserveOrder ? orderedConnectionMap : unorderedConnectionMap; |
| |
| boolean severeAlertIssued = false; |
| boolean suspected = false; |
| DistributedMember targetMember = null; |
| if (ackSATimeout > 0) { |
| targetMember = this.id; |
| } |
| |
| int attempt = 0; |
| for (;;) { |
| if (!this.pending) { |
| break; |
| } |
| getConduit().getCancelCriterion().checkCancelInProgress(null); |
| |
| // wait a little bit... |
| boolean interrupted = Thread.interrupted(); |
| try { |
| this.wait(100); // spurious wakeup ok |
| } catch (InterruptedException ignore) { |
| interrupted = true; |
| getConduit().getCancelCriterion().checkCancelInProgress(ignore); |
| } finally { |
| if (interrupted) { |
| Thread.currentThread().interrupt(); |
| } |
| } |
| |
| if (!this.pending) |
| break; |
| |
| // Still pending... |
| long now = System.currentTimeMillis(); |
| if (!severeAlertIssued && ackSATimeout > 0 && startTime + ackTimeout < now) { |
| if (startTime + ackTimeout + ackSATimeout < now) { |
| if (targetMember != null) { |
| logger.fatal("Unable to form a TCP/IP connection to {} in over {} seconds", |
| targetMember, (ackSATimeout + ackTimeout) / 1000); |
| } |
| severeAlertIssued = true; |
| } else if (!suspected) { |
| logger.warn("Unable to form a TCP/IP connection to %s in over %s seconds", |
| this.id, (ackTimeout) / 1000); |
| ((GMSMembershipManager) mgr).suspectMember((InternalDistributedMember) targetMember, |
| "Unable to form a TCP/IP connection in a reasonable amount of time"); |
| suspected = true; |
| } |
| } |
| |
| Object e; |
| // synchronized (m) { |
| e = m.get(this.id); |
| // } |
| if (e == this) { |
| attempt += 1; |
| if (logger.isDebugEnabled() && (attempt % 20 == 1)) { |
| logger.debug("Waiting for pending connection to complete: {} connection to {}; {}", |
| ((this.preserveOrder) ? "ordered" : "unordered"), this.id, this); |
| } |
| continue; |
| } |
| |
| // Odd state change. Process and exit. |
| if (logger.isDebugEnabled()) { |
| logger.debug("Pending connection changed to {} unexpectedly", e); |
| } |
| |
| if (e == null) { |
| // We were removed |
| notifyWaiters(null); |
| break; |
| } else if (e instanceof Connection) { |
| notifyWaiters((Connection) e); |
| break; |
| } else { |
| // defer to the new instance |
| return ((PendingConnection) e).waitForConnect(mgr, startTime, ackTimeout, ackSATimeout); |
| } |
| |
| } // for |
| return this.conn; |
| |
| } |
| |
| public String toString() { |
| return super.toString() + " created by " + connectingThread.getName(); |
| } |
| } |
| |
| |
| private static class IdleConnTT extends SystemTimer.SystemTimerTask { |
| |
| private Connection c; |
| |
| IdleConnTT(Connection c) { |
| this.c = c; |
| } |
| |
| @Override |
| public boolean cancel() { |
| Connection con = this.c; |
| if (con != null) { |
| con.cleanUpOnIdleTaskCancel(); |
| } |
| this.c = null; |
| return super.cancel(); |
| } |
| |
| @Override |
| public void run2() { |
| Connection con = this.c; |
| if (con != null) { |
| if (con.checkForIdleTimeout()) { |
| cancel(); |
| } |
| } |
| } |
| } |
| |
| public static ConnectionTable create(TCPConduit conduit) throws IOException { |
| ConnectionTable ct = new ConnectionTable(conduit); |
| lastInstance.set(ct); |
| return ct; |
| } |
| |
| /** keep track of a socket that is trying to connect() for shutdown purposes */ |
| public void addConnectingSocket(Socket socket, InetAddress addr) { |
| synchronized (connectingSockets) { |
| connectingSockets.put(socket, new ConnectingSocketInfo(addr)); |
| } |
| } |
| |
| /** remove a socket from the tracked set. It should be connected at this point */ |
| public void removeConnectingSocket(Socket socket) { |
| synchronized (connectingSockets) { |
| connectingSockets.remove(socket); |
| } |
| } |
| |
| |
| private static class ConnectingSocketInfo { |
| InetAddress peerAddress; |
| Thread connectingThread; |
| |
| public ConnectingSocketInfo(InetAddress addr) { |
| this.peerAddress = addr; |
| this.connectingThread = Thread.currentThread(); |
| } |
| } |
| |
| public int getNumberOfReceivers() { |
| return receivers.size(); |
| } |
| } |