/*
 * Licensed to the Apache Software Foundation (ASF) under one or more contributor license
 * agreements. See the NOTICE file distributed with this work for additional information regarding
 * copyright ownership. The ASF licenses this file to You under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance with the License. You may obtain a
 * copy of the License at
 *
 * http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software distributed under the License
 * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
 * or implied. See the License for the specific language governing permissions and limitations under
 * the License.
 */
package org.apache.geode.internal.tcp;

import java.io.IOException;
import java.lang.ref.Reference;
import java.lang.ref.WeakReference;
import java.net.InetAddress;
import java.net.Socket;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.Executor;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.atomic.AtomicReference;

import org.apache.logging.log4j.Logger;

import org.apache.geode.SystemFailure;
import org.apache.geode.alerting.internal.spi.AlertingAction;
import org.apache.geode.annotations.internal.MakeNotStatic;
import org.apache.geode.distributed.DistributedMember;
import org.apache.geode.distributed.DistributedSystemDisconnectedException;
import org.apache.geode.distributed.internal.DistributionManager;
import org.apache.geode.distributed.internal.InternalDistributedSystem;
import org.apache.geode.distributed.internal.membership.InternalDistributedMember;
import org.apache.geode.distributed.internal.membership.MembershipManager;
import org.apache.geode.distributed.internal.membership.adapter.GMSMembershipManager;
import org.apache.geode.distributed.internal.membership.gms.api.Membership;
import org.apache.geode.internal.Assert;
import org.apache.geode.internal.SystemTimer;
import org.apache.geode.internal.logging.CoreLoggingExecutors;
import org.apache.geode.internal.net.BufferPool;
import org.apache.geode.internal.net.SocketCloser;
import org.apache.geode.logging.internal.executors.LoggingExecutors;
import org.apache.geode.logging.internal.log4j.api.LogService;

/**
 * <p>
 * ConnectionTable holds all of the Connection objects in a conduit. Connections represent a pipe
 * between two endpoints represented by generic DistributedMembers.
 * </p>
 *
 * @since GemFire 2.1
 */
public class ConnectionTable {
  private static final Logger logger = LogService.getLogger();

  /** warning when descriptor limit reached */
  @MakeNotStatic
  private static boolean ulimitWarningIssued;

  /**
   * true if the current thread wants non-shared resources
   */
  @MakeNotStatic
  private static ThreadLocal threadWantsOwnResources = new ThreadLocal();

  /**
   * Used for messages whose order must be preserved. Only connections used for sending messages,
   * and receiving acks, will be put in this map.
   */
  protected final Map orderedConnectionMap = new ConcurrentHashMap();

  /**
   * ordered connections local to this thread. Note that accesses to the resulting map must be
   * synchronized because of static cleanup.
   */
  // ThreadLocal<Map>
  protected final ThreadLocal<Map> threadOrderedConnMap;

  /**
   * List of thread-owned ordered connection maps, for cleanup
   *
   * Accesses to the maps in this list need to be synchronized on their instance.
   */
  private final List threadConnMaps;

  /**
   * Timer used to schedule idle-connection timeout tasks.
   *
   * Guarded by synchronization on this ConnectionTable instance.
   */
  private SystemTimer idleConnTimer;

  /**
   * Used to find connections owned by threads. The key is the same one used in
   * threadOrderedConnMap. The value is an ArrayList since we can have any number of connections
   * with the same key.
   */
  private ConcurrentMap threadConnectionMap;

  /**
   * Used for all non-ordered messages. Only connections used for sending messages, and receiving
   * acks, will be put in this map.
   */
  protected final Map unorderedConnectionMap = new ConcurrentHashMap();

  /**
   * Used for all accepted connections. These connections are read only; we never send messages,
   * except for acks; only receive.
   *
   * Consists of a list of Connection
   */
  private final List receivers = new ArrayList();

  /**
   * the conduit for this table
   */
  private final TCPConduit owner;

  private final BufferPool bufferPool;

  /**
   * true if this table is no longer in use
   */
  private volatile boolean closed = false;

  /**
   * Executor used by p2p reader and p2p handshaker threads.
   */
  private final Executor p2pReaderThreadPool;
  /**
   * Number of seconds to wait before timing out an unused p2p reader thread. Default is 120 (2
   * minutes).
   */
  private static final long READER_POOL_KEEP_ALIVE_TIME =
      Long.getLong("p2p.READER_POOL_KEEP_ALIVE_TIME", 120).longValue();

  private final SocketCloser socketCloser;

  /**
   * The most recent instance to be created
   *
   * TODO this assumes no more than one instance is created at a time?
   */
  @MakeNotStatic
  private static final AtomicReference lastInstance = new AtomicReference();

  /**
   * Sockets that are in the process of being connected, keyed by Socket with a
   * ConnectingSocketInfo value. Accesses synchronize on this map.
   */
  private Map connectingSockets = new HashMap();

  /**
   * Registers the calling thread as willing to share communication resources with other threads
   * by storing {@code Boolean.FALSE} in the thread-local registration.
   */
  public static void threadWantsSharedResources() {
    // autoboxing yields the canonical Boolean.FALSE instance
    threadWantsOwnResources.set(false);
  }

  /**
   * Registers the calling thread as wanting exclusive access to communication resources by
   * storing {@code Boolean.TRUE} in the thread-local registration. Exclusive access may not be
   * available, in which case this request is ignored.
   */
  public static void threadWantsOwnResources() {
    // autoboxing yields the canonical Boolean.TRUE instance
    threadWantsOwnResources.set(true);
  }

  /**
   * Returns true if calling thread owns its own communication resources.
   *
   * @return false when no distribution manager is available, when the distributed system reports
   *         that the thread does not own resources, or when the thread is currently alerting
   */
  boolean threadOwnsResources() {
    final DistributionManager dm = getDM();
    if (dm == null) {
      return false;
    }
    if (!dm.getSystem().threadOwnsResources()) {
      return false;
    }
    // a thread that is in the middle of alerting never uses thread-owned resources
    return !AlertingAction.isThreadAlerting();
  }

  /**
   * Returns the value the calling thread last registered through
   * {@link #threadWantsOwnResources()} or {@link #threadWantsSharedResources()}.
   *
   * @return the registered preference, or null if the calling thread has nothing stored in the
   *         current thread-local
   */
  public static Boolean getThreadOwnsResourcesRegistration() {
    final Object registration = threadWantsOwnResources.get();
    return (Boolean) registration;
  }

  /**
   * @return the conduit that owns this connection table
   */
  public TCPConduit getOwner() {
    return this.owner;
  }


  private ConnectionTable(TCPConduit conduit) throws IOException {
    this.owner = conduit;
    this.idleConnTimer = (this.owner.idleConnectionTimeout != 0)
        ? new SystemTimer(conduit.getDM().getSystem(), true) : null;
    this.threadOrderedConnMap = new ThreadLocal();
    this.threadConnMaps = new ArrayList();
    this.threadConnectionMap = new ConcurrentHashMap();
    this.p2pReaderThreadPool = createThreadPoolForIO(conduit.getDM().getSystem().isShareSockets());
    this.socketCloser = new SocketCloser();
    this.bufferPool = new BufferPool(owner.getStats());
  }

  /**
   * Creates the executor used for p2p reader and handshaker threads.
   *
   * @param conserveSockets true to use the "SharedP2PReader" executor obtained from
   *        {@code LoggingExecutors.newThreadOnEachExecute}; false to use the "UnsharedP2PReader"
   *        pool obtained from {@code CoreLoggingExecutors.newThreadPoolWithSynchronousFeed}
   * @return the executor to run reader tasks on
   */
  private Executor createThreadPoolForIO(boolean conserveSockets) {
    if (!conserveSockets) {
      return CoreLoggingExecutors.newThreadPoolWithSynchronousFeed("UnsharedP2PReader", 1,
          Integer.MAX_VALUE, READER_POOL_KEEP_ALIVE_TIME);
    }
    return LoggingExecutors.newThreadOnEachExecute("SharedP2PReader");
  }

  /**
   * Called by the conduit after a socket accept. Creates a receiver connection for the socket and,
   * unless the table is closed or the connection has already stopped, registers it in the
   * receivers list.
   *
   * @param sock the accepted socket
   * @param peerConnectionFactory factory used to create the receiver connection
   * @throws IOException if the receiver could not be created
   * @throws ConnectionException if the receiver could not be created
   * @throws InterruptedException declared by the method signature
   */
  protected void acceptConnection(Socket sock, PeerConnectionFactory peerConnectionFactory)
      throws IOException, ConnectionException, InterruptedException {
    final InetAddress peerAddress = sock.getInetAddress(); // for bug 44736
    Connection receiver = null;
    boolean accepted = false;
    try {
      receiver = peerConnectionFactory.createReceiver(this, sock);

      // check for shutdown here so the finally block can clean up the new connection
      this.owner.getCancelCriterion().checkCancelInProgress(null);
      accepted = true;
    } catch (IOException | ConnectionException ex) {
      // a shutdown in progress takes precedence over reporting the failure
      this.owner.getCancelCriterion().checkCancelInProgress(ex);
      final Object peer = (peerAddress != null) ? peerAddress : "unavailable address";
      logger.warn(String.format("Failed to accept connection from %s because: %s", peer, ex));
      throw ex;
    } finally {
      // note: no need to call incFailedAccept or to log an error here;
      // the caller does both.
      if (receiver != null && !accepted) {
        // we must be throwing from checkCancelInProgress so close the connection
        closeCon("cancel after accept", receiver);
        receiver = null;
      }
    }

    if (receiver == null) {
      return;
    }

    synchronized (this.receivers) {
      this.owner.getStats().incReceivers();
      if (this.closed) {
        closeCon("Connection table no longer in use", receiver);
        return;
      }
      // If the receiver has not stopped, any connection cleanup thread will not yet have acquired
      // the receivers lock to remove it, so it is safe to add it here.
      final boolean alreadyDead = receiver.isSocketClosed() || receiver.isReceiverStopped();
      if (!alreadyDead) {
        this.receivers.add(receiver);
      }
    }

    if (logger.isDebugEnabled()) {
      logger.debug("Accepted {} myAddr={} theirAddr={}", receiver, getConduit().getMemberId(),
          receiver.remoteAddr);
    }
  }



  /**
   * Process a newly created PendingConnection: create the real sender connection, swap it into the
   * map in place of the pending placeholder, and wake up any threads waiting on the placeholder.
   *
   * @param id DistributedMember on which the connection is created
   * @param sharedResource whether the connection is used by multiple threads
   * @param preserveOrder whether to preserve order
   * @param m map to add the connection to
   * @param pc the PendingConnection to process
   * @param startTime the ms clock start time for the operation
   * @param ackThreshold the ms ack-wait-threshold, or zero
   * @param ackSAThreshold the ms ack-severe_alert-threshold, or zero
   * @return the Connection, or null if someone else already created or closed it
   * @throws IOException if unable to connect
   * @throws DistributedSystemDisconnectedException declared by the method signature
   */
  private Connection handleNewPendingConnection(DistributedMember id, boolean sharedResource,
      boolean preserveOrder, Map m, PendingConnection pc, long startTime, long ackThreshold,
      long ackSAThreshold) throws IOException, DistributedSystemDisconnectedException {
    Connection con = null;
    try {
      con = Connection.createSender(owner.getMembershipManager(), this, preserveOrder, id,
          sharedResource, startTime, ackThreshold, ackSAThreshold);
      this.owner.getStats().incSenders(sharedResource, preserveOrder);
    } finally {
      // our connection failed, so notify anyone waiting for our pending con
      if (con == null) {
        this.owner.getStats().incFailedConnect();
        synchronized (m) {
          Object rmObj = m.remove(id);
          if (rmObj != pc && rmObj != null) {
            // put it back since it was not our pc
            m.put(id, rmObj);
          }
        }
        pc.notifyWaiters(null);
        // normally we are throwing an exception at this point
      }
    }

    // Update our list of connections -- either the
    // orderedConnectionMap or unorderedConnectionMap
    //
    // Note that we added the entry _before_ we attempted the connect,
    // so it's possible something else got through in the mean time...
    synchronized (m) {
      Object e = m.get(id);
      if (e == pc) {
        m.put(id, con);
      } else if (e == null) {
        // someone closed our pending connection
        // so cleanup the connection we created
        // (null-guarded like the other branches so a missing connection cannot cause an NPE
        // while the map lock is held)
        if (con != null) {
          con.requestClose("pending connection cancelled");
          con = null;
        }
      } else if (e instanceof Connection) {
        Connection newCon = (Connection) e;
        if (!newCon.connected) {
          // Fix for bug 31590
          // someone closed our pending connect
          // so cleanup the connection we created
          if (con != null) {
            con.requestClose("pending connection closed");
            con = null;
          }
        } else {
          // This should not happen. It means that someone else
          // created the connection which should only happen if
          // our Connection was rejected.
          // An assertion here was disabled for bug 32680; instead we
          // close ours and hand back the connection that is in the map.
          if (con != null) {
            con.requestClose("someone else created the connection");
          }
          con = newCon;
        }
      }
    }
    pc.notifyWaiters(con);
    if (con != null && logger.isDebugEnabled()) {
      logger.debug("handleNewPendingConnection {} myAddr={} theirAddr={}", con,
          getConduit().getMemberId(), con.remoteAddr);
    }

    return con;
  }

  /**
   * Returns the shared connection to a member, creating it if necessary. Used for unordered
   * messages and for conserve-sockets=true; note that unordered connections are currently always
   * shared.
   *
   * @param id the DistributedMember on which we are creating a connection
   * @param scheduleTimeout whether unordered connection should time out
   * @param preserveOrder whether to preserve order
   * @param startTime the ms clock start time for the operation
   * @param ackTimeout the ms ack-wait-threshold, or zero
   * @param ackSATimeout the ms ack-severe-alert-threshold, or zero
   * @return the new Connection, or null if an error
   * @throws IOException if unable to create the connection
   * @throws DistributedSystemDisconnectedException declared by the method signature
   */
  private Connection getSharedConnection(DistributedMember id, boolean scheduleTimeout,
      boolean preserveOrder, long startTime, long ackTimeout, long ackSATimeout)
      throws IOException, DistributedSystemDisconnectedException {
    final Map connections =
        preserveOrder ? this.orderedConnectionMap : this.unorderedConnectionMap;

    PendingConnection ourPending = null; // placeholder we install if we must connect ourselves
    Object existing; // usable entry already in the map, if any

    synchronized (connections) {
      existing = connections.get(id);
      // a Connection that is no longer connected is treated as if it were absent
      if (existing instanceof Connection && !((Connection) existing).connected) {
        existing = null;
      }
      if (existing == null) {
        ourPending = new PendingConnection(preserveOrder, id);
        connections.put(id, ourPending);
      }
    }

    // Case 1: we installed the placeholder, so we perform the connect
    if (ourPending != null) {
      if (logger.isDebugEnabled()) {
        logger.debug("created PendingConnection {}", ourPending);
      }
      final Connection created = handleNewPendingConnection(id, true /* fixes bug 43386 */,
          preserveOrder, connections, ourPending, startTime, ackTimeout, ackSATimeout);
      if (scheduleTimeout && !preserveOrder) {
        scheduleIdleTimeout(created);
      }
      return created;
    }

    // Case 2: a live connection is already in the map
    if (!(existing instanceof PendingConnection)) {
      return (Connection) existing;
    }

    // Case 3: another thread is connecting; wait for it to finish
    if (AlertingAction.isThreadAlerting()) {
      // do not change the text of this exception - it is looked for in exception handlers
      throw new IOException("Cannot form connection to alert listener " + id);
    }

    final Connection awaited = ((PendingConnection) existing)
        .waitForConnect(this.owner.getMembershipManager(), startTime, ackTimeout, ackSATimeout);
    if (logger.isDebugEnabled()) {
      if (awaited == null) {
        logger.debug("getSharedConnection: Connect failed");
      } else {
        logger.debug("getSharedConnection {} myAddr={} theirAddr={}", awaited,
            getConduit().getMemberId(), awaited.remoteAddr);
      }
    }
    return awaited;
  }

  /**
   * Returns the ordered connection to the given member that is owned by the calling thread,
   * creating it (and the thread's connection map) on first use.
   *
   * @param id member on which to create the connection
   * @param startTime the ms clock start time for the operation
   * @param ackTimeout the ms ack-wait-threshold, or zero
   * @param ackSATimeout the ms ack-severe-alert-threshold, or zero
   * @return the connection, or null if this table is being destroyed
   * @throws IOException if the connection could not be created
   * @throws DistributedSystemDisconnectedException if the table is closed when the calling thread
   *         first asks for a thread-owned connection
   */
  Connection getThreadOwnedConnection(DistributedMember id, long startTime, long ackTimeout,
      long ackSATimeout) throws IOException, DistributedSystemDisconnectedException {
    Connection result = null;

    // Look for result in the thread local
    Map m = this.threadOrderedConnMap.get();
    if (m == null) {
      // First time for this thread. Create thread local
      m = new HashMap();
      synchronized (this.threadConnMaps) {
        if (this.closed) {
          owner.getCancelCriterion().checkCancelInProgress(null);
          throw new DistributedSystemDisconnectedException("Connection table is closed");
        }
        // check for stale references and remove them.
        for (Iterator it = this.threadConnMaps.iterator(); it.hasNext();) {
          Reference r = (Reference) it.next();
          if (r.get() == null) {
            it.remove();
          }
        }
        this.threadConnMaps.add(new WeakReference(m)); // ref added for bug 38011
      }
      this.threadOrderedConnMap.set(m);
    } else {
      // Consult thread local. close() may touch this map from another thread, hence the sync.
      synchronized (m) {
        result = (Connection) m.get(id);
      }
      if (result != null && result.timedOut) {
        result = null;
      }
    }
    if (result != null) {
      return result;
    }

    // OK, we have to create a new connection.
    result = Connection.createSender(owner.getMembershipManager(), this, true /* preserveOrder */,
        id, false /* shared */, startTime, ackTimeout, ackSATimeout);
    if (logger.isDebugEnabled()) {
      logger.debug("ConnectionTable: created an ordered connection: {}", result);
    }
    this.owner.getStats().incSenders(false/* shared */, true /* preserveOrder */);

    // Update the list of connections owned by this thread....

    // Take a single snapshot of the field: close() sets it to null concurrently, so re-reading
    // it after the null check could dereference null.
    final ConcurrentMap threadConnections = this.threadConnectionMap;
    if (threadConnections == null) {
      // This instance is being destroyed; fail the operation
      closeCon("Connection table being destroyed", result);
      return null;
    }

    ArrayList al = (ArrayList) threadConnections.get(id);
    if (al == null) {
      // First connection for this DistributedMember. Make sure the list for this
      // member is created if it isn't already there.
      al = new ArrayList();

      // Since it's a concurrent map, we just try to put it and then
      // use whichever list won.
      Object o = threadConnections.putIfAbsent(id, al);
      if (o != null) {
        al = (ArrayList) o;
      }
    }

    // Add our Connection to the list
    synchronized (al) {
      al.add(result);
    }

    // Finally, add the connection to our thread local map.
    synchronized (m) {
      m.put(id, result);
    }

    scheduleIdleTimeout(result);
    return result;
  }

  /**
   * Schedules an idle-connection timeout task for the given connection. Does nothing when the
   * connection is null, when the conduit's idle connection timeout is zero, or when this table is
   * closed.
   *
   * @param conn the connection to watch, may be null
   * @throws DistributedSystemDisconnectedException if the timer rejects the task and the
   *         connection is not itself closing
   */
  private void scheduleIdleTimeout(Connection conn) {
    if (conn == null) {
      // fix for bug 43529
      return;
    }
    if (this.owner.idleConnectionTimeout == 0) {
      return;
    }
    try {
      synchronized (this) {
        // close() sets the closed flag without holding this lock, so a separate "closed" check
        // followed by getIdleConnTimer() could still see null. Fetch the timer once and test it;
        // getIdleConnTimer() returns null exactly when the table is closed.
        final SystemTimer timer = getIdleConnTimer();
        if (timer != null) {
          IdleConnTT task = new IdleConnTT(conn);
          conn.setIdleTimeoutTask(task);
          timer.scheduleAtFixedRate(task, this.owner.idleConnectionTimeout,
              this.owner.idleConnectionTimeout);
        }
      }
    } catch (IllegalStateException e) {
      if (conn.isClosing()) {
        // bug #45077 - connection is closed before we schedule the timeout task,
        // causing the task to be canceled
        return;
      }
      logger.debug("Got an illegal state exception: {}", e.getMessage(), e);
      // Unfortunately, cancelInProgress() is not set until *after*
      // the shutdown message has been sent, so we need to check the
      // "closeInProgress" bit instead.
      owner.getCancelCriterion().checkCancelInProgress(null);
      Throwable cause = owner.getShutdownCause();
      if (cause == null) {
        cause = e;
      }
      throw new DistributedSystemDisconnectedException("The distributed system is shutting down",
          cause);
    }
  }

  /**
   * Returns a connection to the given member. A thread-owned connection is used when order must
   * be preserved and the calling thread owns its resources; otherwise a shared connection is used.
   *
   * @param id the DistributedMember on which to create the connection
   * @param preserveOrder whether order should be preserved
   * @param startTime the ms clock start time
   * @param ackTimeout the ms ack-wait-threshold, or zero
   * @param ackSATimeout the ms ack-severe-alert-threshold, or zero
   * @return the new Connection, or null if a problem
   * @throws java.io.IOException if the connection could not be created
   * @throws DistributedSystemDisconnectedException if this table is closed
   */
  protected Connection get(DistributedMember id, boolean preserveOrder, long startTime,
      long ackTimeout, long ackSATimeout)
      throws java.io.IOException, DistributedSystemDisconnectedException {
    if (this.closed) {
      this.owner.getCancelCriterion().checkCancelInProgress(null);
      throw new DistributedSystemDisconnectedException("Connection table is closed");
    }

    final boolean ownsResources = threadOwnsResources();
    final Connection connection;
    if (preserveOrder && ownsResources) {
      connection = getThreadOwnedConnection(id, startTime, ackTimeout, ackSATimeout);
    } else {
      connection = getSharedConnection(id, ownsResources, preserveOrder, startTime, ackTimeout,
          ackSATimeout);
    }

    if (connection != null) {
      Assert.assertTrue(connection.preserveOrder == preserveOrder);
    }
    return connection;
  }

  /**
   * Reacts to the process running out of file descriptors. The first call logs a fatal message,
   * switches the distributed system to shared sockets and discards all per-thread resource
   * registrations; later calls do nothing.
   */
  protected synchronized void fileDescriptorsExhausted() {
    if (ulimitWarningIssued) {
      return;
    }
    ulimitWarningIssued = true;
    logger.fatal(
        "This process is out of file descriptors.This will hamper communications and slow down the system.Any conserve-sockets setting is now being ignored.Please consider raising the descriptor limit.This alert is only issued once per process.");
    InternalDistributedSystem.getAnyInstance().setShareSockets(true);
    // replacing the thread-local drops every thread's earlier registration
    threadWantsOwnResources = new ThreadLocal();
  }

  /**
   * @return the conduit that owns this connection table
   */
  protected TCPConduit getConduit() {
    return this.owner;
  }

  /**
   * @return the buffer pool created for this connection table
   */
  public BufferPool getBufferPool() {
    return this.bufferPool;
  }

  /**
   * @return true if this table is no longer in use
   */
  public boolean isClosed() {
    return closed;
  }

  /**
   * Closes a map entry without the "being sick" test hook.
   *
   * @param reason the reason to record for the close
   * @param c a Connection or PendingConnection; may be null
   */
  private static void closeCon(String reason, Object c) {
    final boolean beingSick = false;
    closeCon(reason, c, beingSick);
  }

  /**
   * Closes a map entry. A Connection is closed through {@code closePartialConnect}; any other
   * non-null entry is treated as a PendingConnection whose waiters are notified with null.
   *
   * @param reason the reason to record for the close
   * @param c a Connection or PendingConnection; null is ignored
   * @param beingSick passed through to {@code closePartialConnect}
   */
  private static void closeCon(String reason, Object c, boolean beingSick) {
    if (c instanceof Connection) {
      ((Connection) c).closePartialConnect(reason, beingSick); // fix for bug 31666
      return;
    }
    if (c != null) {
      ((PendingConnection) c).notifyWaiters(null);
    }
  }

  /**
   * Returns the idle connection timer, creating it on first use, or null if the connection table
   * is closed. Synchronized on the connection table.
   */
  protected synchronized SystemTimer getIdleConnTimer() {
    if (this.closed) {
      return null;
    }
    SystemTimer timer = this.idleConnTimer;
    if (timer == null) {
      timer = new SystemTimer(getDM().getSystem(), true);
      this.idleConnTimer = timer;
    }
    return timer;
  }

  /**
   * Closes this table: cancels the idle timer, closes every shared and thread-owned connection,
   * shuts down the reader executor, closes all receivers and finally the socket closer. Calling it
   * on an already closed table does nothing.
   */
  protected void close() {
    if (this.closed) {
      return;
    }
    this.closed = true;

    synchronized (this) {
      final SystemTimer timer = this.idleConnTimer;
      if (timer != null) {
        timer.cancel();
      }
    }

    final String reason = "Connection table being destroyed";

    synchronized (this.orderedConnectionMap) {
      for (Object entry : this.orderedConnectionMap.values()) {
        closeCon(reason, entry);
      }
      this.orderedConnectionMap.clear();
    }

    synchronized (this.unorderedConnectionMap) {
      for (Object entry : this.unorderedConnectionMap.values()) {
        closeCon(reason, entry);
      }
      this.unorderedConnectionMap.clear();
    }

    // other methods treat a null map as "this table is being destroyed"
    this.threadConnectionMap = null;

    if (this.threadConnMaps != null) {
      synchronized (this.threadConnMaps) {
        for (Object element : this.threadConnMaps) {
          final Map perThread = (Map) ((Reference) element).get();
          if (perThread == null) {
            // the owning thread's map has already been collected
            continue;
          }
          synchronized (perThread) {
            for (Object entry : perThread.values()) {
              closeCon(reason, entry);
            }
          }
        }
        this.threadConnMaps.clear();
      }
    }

    // only executors that are ExecutorServices can be shut down
    if (this.p2pReaderThreadPool instanceof ExecutorService) {
      ((ExecutorService) this.p2pReaderThreadPool).shutdown();
    }

    closeReceivers(false);

    // forget the calling thread's own ordered connections
    final Map callerMap = this.threadOrderedConnMap.get();
    if (callerMap != null) {
      synchronized (callerMap) {
        callerMap.clear();
      }
    }

    this.socketCloser.close();
  }

  /**
   * Runs the given task on the p2p reader executor; does nothing if there is no executor.
   *
   * @param runnable the task to run
   */
  public void executeCommand(Runnable runnable) {
    final Executor pool = this.p2pReaderThreadPool;
    if (pool == null) {
      return;
    }
    pool.execute(runnable);
  }

  /**
   * Close all receiving threads. This is used during shutdown and is also used by a test hook that
   * makes us deaf to incoming messages. Sockets that are still being connected are closed as well.
   *
   * @param beingSick a test hook to simulate a sick process; when true only receivers that
   *        preserve order are closed
   */
  protected void closeReceivers(boolean beingSick) {
    synchronized (this.receivers) {
      final Iterator receiverIt = this.receivers.iterator();
      while (receiverIt.hasNext()) {
        final Connection receiver = (Connection) receiverIt.next();
        if (beingSick && !receiver.preserveOrder) {
          continue;
        }
        closeCon("Connection table being destroyed", receiver, beingSick);
        receiverIt.remove();
      }

      // now close any sockets being formed
      synchronized (connectingSockets) {
        final Iterator socketIt = connectingSockets.keySet().iterator();
        while (socketIt.hasNext()) {
          final Socket pending = (Socket) socketIt.next();
          try {
            pending.close();
          } catch (IOException ignored) {
            // ignored - we're shutting down
          }
          socketIt.remove();
        }
      }
    }
  }


  /**
   * Removes the given connection from the receivers list.
   *
   * @param con the receiver to remove
   */
  protected void removeReceiver(Object con) {
    final List accepted = this.receivers;
    synchronized (accepted) {
      accepted.remove(con);
    }
  }

  /**
   * Return true if our owner already knows that this endpoint is departing, as decided by
   * {@link #giveUpOnMember} using the owner's membership manager.
   */
  protected boolean isEndpointShuttingDown(DistributedMember id) {
    final MembershipManager membership = owner.getDM().getMembershipManager();
    return giveUpOnMember(membership, id);
  }

  /**
   * Decides whether to stop trying to communicate with a member.
   *
   * @param mgr the membership manager to consult
   * @param remoteAddr the member in question
   * @return true if the member does not exist, is shunned, or a shutdown is in progress
   */
  protected boolean giveUpOnMember(MembershipManager mgr, DistributedMember remoteAddr) {
    if (!mgr.memberExists(remoteAddr)) {
      return true;
    }
    if (mgr.isShunned(remoteAddr)) {
      return true;
    }
    return mgr.shutdownInProgress();
  }

  /**
   * Removes an endpoint with the disconnect check enabled; equivalent to calling the three-argument
   * overload with {@code notifyDisconnect} set to true.
   *
   * @param stub the member whose connections are removed
   * @param reason the reason to record when closing connections
   */
  protected void removeEndpoint(DistributedMember stub, String reason) {
    final boolean notifyDisconnect = true;
    removeEndpoint(stub, reason, notifyDisconnect);
  }

  /**
   * Closes and forgets every connection this table holds for the given member: the shared ordered
   * and unordered entries, all thread-owned connections, sockets still being connected to the
   * member's address, and accepted receivers.
   *
   * <p>
   * Nothing is done if the table is closed or if no sender-side entry (shared or thread-owned)
   * exists for the member; in particular receivers and connecting sockets are only examined when
   * such an entry is found.
   *
   * @param memberID the member whose connections are removed
   * @param reason the reason to record when closing connections
   * @param notifyDisconnect when true, a shutdown in progress is reported by throwing
   * @throws DistributedSystemDisconnectedException if {@code notifyDisconnect} is true and the
   *         distribution manager reports a shutdown in progress; this is thrown after the
   *         connections were closed but before the socket closer resources are released
   */
  protected void removeEndpoint(DistributedMember memberID, String reason,
      boolean notifyDisconnect) {
    if (this.closed) {
      return;
    }
    // Phase 1: cheaply determine whether we hold any sender-side entry for this member
    boolean needsRemoval = false;
    synchronized (this.orderedConnectionMap) {
      if (this.orderedConnectionMap.get(memberID) != null)
        needsRemoval = true;
    }
    if (!needsRemoval) {
      synchronized (this.unorderedConnectionMap) {
        if (this.unorderedConnectionMap.get(memberID) != null)
          needsRemoval = true;
      }
    }
    if (!needsRemoval) {
      // local copy because close() may null the field concurrently
      ConcurrentMap cm = this.threadConnectionMap;
      if (cm != null) {
        ArrayList al = (ArrayList) cm.get(memberID);
        needsRemoval = al != null && al.size() > 0;
      }
    }

    if (needsRemoval) {
      // Phase 2: remove and close the entries, remembering the first remote address we see
      // so the socket closer can release its resources at the end
      InternalDistributedMember remoteAddress = null;
      synchronized (this.orderedConnectionMap) {
        Object c = this.orderedConnectionMap.remove(memberID);
        if (c instanceof Connection) {
          remoteAddress = ((Connection) c).getRemoteAddress();
        }
        closeCon(reason, c);
      }
      synchronized (this.unorderedConnectionMap) {
        Object c = this.unorderedConnectionMap.remove(memberID);
        if (remoteAddress == null && (c instanceof Connection)) {
          remoteAddress = ((Connection) c).getRemoteAddress();
        }
        closeCon(reason, c);
      }

      {
        // thread-owned connections: drop the member's whole list and close each entry
        ConcurrentMap cm = this.threadConnectionMap;
        if (cm != null) {
          ArrayList al = (ArrayList) cm.remove(memberID);
          if (al != null) {
            synchronized (al) {
              for (Iterator it = al.iterator(); it.hasNext();) {
                Object c = it.next();
                if (remoteAddress == null && (c instanceof Connection)) {
                  remoteAddress = ((Connection) c).getRemoteAddress();
                }
                closeCon(reason, c);
              }
              al.clear();
            }
          }
        }
      }

      // close any sockets that are in the process of being connected;
      // they are collected under the lock and closed after it is released
      Set toRemove = new HashSet();
      synchronized (connectingSockets) {
        for (Iterator it = connectingSockets.entrySet().iterator(); it.hasNext();) {
          Map.Entry entry = (Map.Entry) it.next();
          ConnectingSocketInfo info = (ConnectingSocketInfo) entry.getValue();
          // matched by network address of the member
          if (info.peerAddress.equals(((InternalDistributedMember) memberID).getInetAddress())) {
            toRemove.add(entry.getKey());
            it.remove();
          }
        }
      }
      for (Iterator it = toRemove.iterator(); it.hasNext();) {
        Socket sock = (Socket) it.next();
        try {
          sock.close();
        } catch (IOException e) {
          if (logger.isDebugEnabled()) {
            logger.debug("caught exception while trying to close connecting socket for {}",
                memberID, e);
          }
        }
      }

      // close any receivers
      // avoid deadlock when a NIC has failed by closing connections outside
      // of the receivers sync (bug 38731)
      toRemove.clear();
      synchronized (this.receivers) {
        for (Iterator it = receivers.iterator(); it.hasNext();) {
          Connection con = (Connection) it.next();
          if (memberID.equals(con.getRemoteAddress())) {
            it.remove();
            toRemove.add(con);
          }
        }
      }
      for (Iterator it = toRemove.iterator(); it.hasNext();) {
        Connection con = (Connection) it.next();
        closeCon(reason, con);
      }
      if (notifyDisconnect) {
        // Before the removal of TCPConduit Stub addresses this used
        // to call MembershipManager.getMemberForStub, which checked
        // for a shutdown in progress and threw this exception:
        if (owner.getDM().shutdownInProgress()) {
          throw new DistributedSystemDisconnectedException("Shutdown in progress",
              owner.getDM().getMembershipManager().getShutdownCause());
        }
      }

      // only reached when no shutdown exception was thrown above
      if (remoteAddress != null) {
        this.socketCloser.releaseResourcesForAddress(remoteAddress.toString());
      }
    }
  }

  /**
   * @return the socket closer created for this connection table
   */
  SocketCloser getSocketCloser() {
    return socketCloser;
  }

  /**
   * Checks whether any accepted receiver connection still has the given end-point as its remote
   * address.
   *
   * @param endPoint the member to look for
   * @return true if at least one receiver for the end-point is registered
   */
  protected boolean hasReceiversFor(DistributedMember endPoint) {
    synchronized (this.receivers) {
      for (Object element : this.receivers) {
        final Connection receiver = (Connection) element;
        if (endPoint.equals(receiver.getRemoteAddress())) {
          return true;
        }
      }
      return false;
    }
  }

  /**
   * Removes a connection from the list stored for a member in the given map. Does nothing when the
   * map is null or holds no list for the member.
   *
   * @param cm map from member to an {@code ArrayList} of connections; may be null
   * @param stub the member whose list should be edited
   * @param c the connection to remove from that list
   */
  private static void removeFromThreadConMap(ConcurrentMap cm, DistributedMember stub,
      Connection c) {
    if (cm == null) {
      return;
    }
    final ArrayList connections = (ArrayList) cm.get(stub);
    if (connections == null) {
      return;
    }
    // the list itself is the lock guarding its contents
    synchronized (connections) {
      connections.remove(c);
    }
  }

  /**
   * Forgets a thread-owned connection: removes it from the per-member list in
   * {@code threadConnectionMap} and, if the map returned by {@code threadOrderedConnMap.get()}
   * still maps the member to this exact connection, removes that mapping too.
   * <p>
   * No check of the {@code closed} flag is made here; cleanup always runs.
   *
   * @param stub the member the connection was made to
   * @param c the connection to forget
   */
  protected void removeThreadConnection(DistributedMember stub, Connection c) {
    removeFromThreadConMap(this.threadConnectionMap, stub, c);

    final Map orderedForThread = (Map) this.threadOrderedConnMap.get();
    if (orderedForThread == null) {
      return;
    }
    // Static cleanup thread might intervene, so we MUST synchronize
    synchronized (orderedForThread) {
      final Object registered = orderedForThread.get(stub);
      if (registered == c) {
        orderedForThread.remove(stub);
      }
    }
  }

  /**
   * Removes and closes a shared connection, but only if the relevant map still maps the member to
   * this exact connection instance. Returns immediately when the table is closed.
   *
   * @param reason text passed on to {@code closeCon}
   * @param stub the member the connection was made to
   * @param ordered true to use the ordered connection map, false for the unordered one
   * @param c the connection expected to be registered for {@code stub}
   */
  void removeSharedConnection(String reason, DistributedMember stub, boolean ordered,
      Connection c) {
    if (this.closed) {
      return;
    }
    final Map shared = ordered ? this.orderedConnectionMap : this.unorderedConnectionMap;
    // identity check and removal happen under the map's own monitor
    synchronized (shared) {
      if (shared.get(stub) == c) {
        closeCon(reason, shared.remove(stub));
      }
    }
  }

  /**
   * Exists only so that calling it forces this class to be loaded.
   *
   * @see SystemFailure#loadEmergencyClasses()
   */
  public static void loadEmergencyClasses() {
    // intentionally empty: invoking this method is all that is needed
  }

  /**
   * Drops the reference held in {@code lastInstance}, if there is one. Underlying sockets are not
   * closed here.
   *
   * @see SystemFailure#emergencyClose()
   */
  public static void emergencyClose() {
    final ConnectionTable current = (ConnectionTable) lastInstance.get();
    if (current != null) {
      lastInstance.set(null);
    }
  }

  /**
   * Closes every connection held in the map returned by {@code threadOrderedConnMap.get()}. Each
   * connection is first removed from {@code threadConnectionMap}, then from the map being
   * iterated, and finally closed with the reason "thread finalization".
   */
  public void removeAndCloseThreadOwnedSockets() {
    final Map owned = (Map) this.threadOrderedConnMap.get();
    if (owned == null) {
      return;
    }
    // Static cleanup may intervene; we MUST synchronize.
    synchronized (owned) {
      for (Iterator entries = owned.entrySet().iterator(); entries.hasNext();) {
        final Map.Entry entry = (Map.Entry) entries.next();
        final DistributedMember member = (DistributedMember) entry.getKey();
        final Connection connection = (Connection) entry.getValue();
        removeFromThreadConMap(this.threadConnectionMap, member, connection);
        entries.remove();
        closeCon("thread finalization", connection);
      }
    }
  }

  /**
   * Asks the table stored in {@code lastInstance}, if any, to remove and close its thread-owned
   * sockets. The {@code lastInstance} reference itself is left in place.
   */
  public static void releaseThreadsSockets() {
    final ConnectionTable table = (ConnectionTable) lastInstance.get();
    if (table != null) {
      table.removeAndCloseThreadOwnedSockets();
    }
  }

  /**
   * Records the current outgoing message count of every thread-owned ordered connection to the
   * given member. This does not synchronize with, or stop, new connections being formed or new
   * messages being sent; only the copy of the connection list is taken under a lock.
   *
   * @param member the member whose connections are examined
   * @param result receives one entry per matching connection: unique id (Long) to messages sent
   *        (Long)
   * @since GemFire 5.1
   */
  protected void getThreadOwnedOrderedConnectionState(DistributedMember member, Map result) {
    final ConcurrentMap byMember = this.threadConnectionMap;
    if (byMember == null) {
      return;
    }
    final ArrayList live = (ArrayList) byMember.get(member);
    if (live == null) {
      return;
    }

    // copy under the list's monitor, then inspect the copy without holding it
    final ArrayList snapshot;
    synchronized (live) {
      snapshot = new ArrayList(live);
    }

    for (Object element : snapshot) {
      final Connection conn = (Connection) element;
      final boolean threadOwnedOrderedSender =
          !conn.isSharedResource() && conn.getOriginatedHere() && conn.getPreserveOrder();
      if (threadOwnedOrderedSender) {
        result.put(Long.valueOf(conn.getUniqueId()), Long.valueOf(conn.getMessagesSent()));
      }
    }
  }

  /**
   * Waits for the given incoming connections to receive at least the associated number of
   * messages.
   * <p>
   * A snapshot of the receivers is taken under the receivers lock. For every receiver that is not
   * stopped or closing, did not originate here, preserves order and whose remote address equals
   * {@code member}, the entry keyed by its unique id is removed from {@code connectionStates} and
   * this thread polls (sleeping 100 ms between checks) until the connection has received that many
   * messages or is stopped/closing. Entries left in {@code connectionStates} afterwards had no
   * matching receiver and are reported at debug level.
   *
   * @param member the remote member whose connections are awaited
   * @param connectionStates map of connection unique id (Long) to required received-message count
   *        (Long); matched entries are removed from this map
   * @throws InterruptedException if the thread is already interrupted on entry or is interrupted
   *         while sleeping
   */
  protected void waitForThreadOwnedOrderedConnectionState(DistributedMember member,
      Map connectionStates) throws InterruptedException {
    if (Thread.interrupted()) {
      throw new InterruptedException(); // wisest to do this before the synchronize below
    }
    final List snapshot;
    synchronized (receivers) {
      snapshot = new ArrayList(receivers);
    }
    for (Iterator it = snapshot.iterator(); it.hasNext();) {
      Connection con = (Connection) it.next();
      if (!con.stopped && !con.isClosing() && !con.getOriginatedHere() && con.getPreserveOrder()
          && member.equals(con.getRemoteAddress())) {
        Long state = (Long) connectionStates.remove(Long.valueOf(con.getUniqueId()));
        if (state != null) {
          long count = state.longValue();
          while (!con.stopped && !con.isClosing() && con.getMessagesReceived() < count) {
            if (logger.isDebugEnabled()) {
              logger.debug("Waiting for connection {}/{} currently={} need={}",
                  con.getRemoteAddress(), con.getUniqueId(), con.getMessagesReceived(), count);
            }
            Thread.sleep(100);
          }
        }
      }
    }
    if (!connectionStates.isEmpty() && logger.isDebugEnabled()) {
      // single-threaded use, so the unsynchronized builder is sufficient
      StringBuilder sb = new StringBuilder(1000);
      sb.append("These connections from ");
      sb.append(member);
      // leading space keeps the member id from running into the rest of the sentence
      sb.append(" could not be located during waitForThreadOwnedOrderedConnectionState: ");
      for (Iterator it = connectionStates.entrySet().iterator(); it.hasNext();) {
        Map.Entry entry = (Map.Entry) it.next();
        sb.append(entry.getKey()).append('(').append(entry.getValue()).append(')');
        if (it.hasNext()) {
          sb.append(',');
        }
      }
      logger.debug(sb);
    }
  }

  /**
   * Returns the distribution manager reported by this table's owner.
   *
   * @return the result of {@code owner.getDM()}
   */
  protected DistributionManager getDM() {
    return owner.getDM();
  }

  // public boolean isShuttingDown() {
  // return this.owner.isShuttingDown();
  // }

  // protected void cleanupHighWater() {
  // cleanup(highWater);
  // }

  // protected void cleanupLowWater() {
  // cleanup(lowWater);
  // }

  // private void cleanup(int maxConnections) {
  /*
   * if (maxConnections == 0 || maxConnections >= connections.size()) { return; } while
   * (connections.size() > maxConnections) { Connection oldest = null; synchronized(connections) {
   * for (Iterator iter = connections.values().iterator(); iter.hasNext(); ) { Connection c =
   * (Connection)iter.next(); if (oldest == null || c.getTimeStamp() < oldest.getTimeStamp()) {
   * oldest = c; } } } // sanity check - don't close anything fresher than 10 seconds or // we'll
   * start thrashing if (oldest.getTimeStamp() > (System.currentTimeMillis() - 10000)) { if
   * (owner.lowWaterConnectionCount > 0) { owner.lowWaterConnectionCount += 10; } if
   * (owner.highWaterConnectionCount > 0) { owner.highWaterConnectionCount += 10; } new Object[] {
   * owner.lowWaterConnectionCount, owner.highWaterConnectionCount }); break; } if (oldest != null)
   * { oldest.close(); } }
   */
  // }

  /*
   * public void dumpConnectionTable() { Iterator iter = connectionMap.keySet().iterator(); while
   * (iter.hasNext()) { Object key = iter.next(); Object val = connectionMap.get(key); } }
   */
  /**
   * Stand-in for a connection that is still being formed. Other threads block in
   * {@link #waitForConnect} until {@link #notifyWaiters} is invoked or the entry for {@link #id}
   * in the relevant connection map stops being this instance.
   * <p>
   * Not static: it reads the enclosing table's connection maps and conduit.
   */
  private /* static */ class PendingConnection {
    /**
     * true if this connection is still pending
     */
    private boolean pending = true;

    /**
     * the connection we are waiting on; null until {@link #notifyWaiters} supplies one
     */
    private Connection conn = null;

    /**
     * whether the connection preserves message ordering
     */
    private final boolean preserveOrder;

    /**
     * the member we are connecting to
     */
    private final DistributedMember id;

    /**
     * the thread that constructed this instance
     */
    private final Thread connectingThread;

    /**
     * Creates a pending connection and records the calling thread as the connecting thread.
     *
     * @param preserveOrder whether the connection preserves message ordering
     * @param id the member being connected to
     */
    public PendingConnection(boolean preserveOrder, DistributedMember id) {
      this.preserveOrder = preserveOrder;
      this.id = id;
      this.connectingThread = Thread.currentThread();
    }

    /**
     * Synchronously set the connection and notify waiters that we are ready. Only the first call
     * has any effect.
     *
     * @param c the new connection, may be null
     */
    public synchronized void notifyWaiters(Connection c) {
      if (!this.pending) {
        return; // already done.
      }

      this.conn = c;
      this.pending = false;
      if (logger.isDebugEnabled()) {
        logger.debug("Notifying waiters that pending {} connection to {} is ready; {}",
            ((this.preserveOrder) ? "ordered" : "unordered"), this.id, this);
      }
      this.notifyAll();
    }

    /**
     * Wait for a connection. Polls in 100 ms waits, checking for cancellation on every pass. When
     * {@code ackSATimeout} is positive, a warning is logged and the member is suspected once
     * {@code ackTimeout} has elapsed, and a fatal message is logged once
     * {@code ackTimeout + ackSATimeout} has elapsed.
     *
     * @param mgr the membership manager that can instigate suspect processing if necessary
     * @param startTime the ms clock start time for the operation
     * @param ackTimeout the ms ack-wait-threshold, or zero
     * @param ackSATimeout the ms ack-severe-alert-threshold, or zero
     * @return the new connection; null if this pending connection was removed from the map or
     *         waiters were notified with null
     * @throws ReenteredConnectException if the caller is the thread that created this instance
     * @throws IOException declared for callers; see also the exception above
     */
    public synchronized Connection waitForConnect(Membership mgr, long startTime,
        long ackTimeout, long ackSATimeout) throws IOException {
      if (connectingThread == Thread.currentThread()) {
        throw new ReenteredConnectException("This thread is already trying to connect");
      }

      final Map m = this.preserveOrder ? orderedConnectionMap : unorderedConnectionMap;

      boolean severeAlertIssued = false;
      boolean suspected = false;
      DistributedMember targetMember = null;
      if (ackSATimeout > 0) {
        targetMember = this.id;
      }

      int attempt = 0;
      for (;;) {
        if (!this.pending) {
          break;
        }
        getConduit().getCancelCriterion().checkCancelInProgress(null);

        // wait a little bit...
        // clear the interrupt flag for the wait and restore it afterwards
        boolean interrupted = Thread.interrupted();
        try {
          this.wait(100); // spurious wakeup ok
        } catch (InterruptedException ignore) {
          interrupted = true;
          getConduit().getCancelCriterion().checkCancelInProgress(ignore);
        } finally {
          if (interrupted) {
            Thread.currentThread().interrupt();
          }
        }

        if (!this.pending) {
          break;
        }

        // Still pending...
        long now = System.currentTimeMillis();
        if (!severeAlertIssued && ackSATimeout > 0 && startTime + ackTimeout < now) {
          if (startTime + ackTimeout + ackSATimeout < now) {
            if (targetMember != null) {
              logger.fatal("Unable to form a TCP/IP connection to {} in over {} seconds",
                  targetMember, (ackSATimeout + ackTimeout) / 1000);
            }
            severeAlertIssued = true;
          } else if (!suspected) {
            // the logger substitutes {} placeholders; printf-style %s would be logged verbatim
            logger.warn("Unable to form a TCP/IP connection to {} in over {} seconds",
                this.id, (ackTimeout) / 1000);
            ((GMSMembershipManager) mgr).suspectMember((InternalDistributedMember) targetMember,
                "Unable to form a TCP/IP connection in a reasonable amount of time");
            suspected = true;
          }
        }

        // read the current map entry for our member without taking the map's monitor
        Object e = m.get(this.id);
        if (e == this) {
          attempt += 1;
          // throttle the debug message to one in every twenty passes
          if (logger.isDebugEnabled() && (attempt % 20 == 1)) {
            logger.debug("Waiting for pending connection to complete: {} connection to {}; {}",
                ((this.preserveOrder) ? "ordered" : "unordered"), this.id, this);
          }
          continue;
        }

        // Odd state change. Process and exit.
        if (logger.isDebugEnabled()) {
          logger.debug("Pending connection changed to {} unexpectedly", e);
        }

        if (e == null) {
          // We were removed
          notifyWaiters(null);
          break;
        } else if (e instanceof Connection) {
          notifyWaiters((Connection) e);
          break;
        } else {
          // defer to the new instance
          return ((PendingConnection) e).waitForConnect(mgr, startTime, ackTimeout, ackSATimeout);
        }

      } // for
      return this.conn;

    }

    /**
     * @return the default object description followed by the connecting thread's name
     */
    @Override
    public String toString() {
      return super.toString() + " created by " + connectingThread.getName();
    }
  }


  /**
   * Timer task that checks one connection for idle timeout. The task drops its reference to the
   * connection once cancelled.
   */
  private static class IdleConnTT extends SystemTimer.SystemTimerTask {

    /** the connection being watched; null after {@link #cancel()} */
    private Connection c;

    IdleConnTT(Connection c) {
      this.c = c;
    }

    /**
     * Lets the watched connection clean up (if one is still attached), detaches it, then cancels
     * the task.
     */
    @Override
    public boolean cancel() {
      final Connection watched = this.c;
      if (watched != null) {
        watched.cleanUpOnIdleTaskCancel();
      }
      this.c = null;
      return super.cancel();
    }

    /**
     * Cancels this task when the watched connection reports an idle timeout.
     */
    @Override
    public void run2() {
      final Connection watched = this.c;
      if (watched != null && watched.checkForIdleTimeout()) {
        cancel();
      }
    }
  }

  /**
   * Builds a connection table for the given conduit and stores it in {@code lastInstance}.
   *
   * @param conduit the conduit passed to the table's constructor
   * @return the newly created table
   * @throws IOException if the constructor throws it
   */
  public static ConnectionTable create(TCPConduit conduit) throws IOException {
    final ConnectionTable table = new ConnectionTable(conduit);
    lastInstance.set(table);
    return table;
  }

  /**
   * Starts tracking a socket that is in the middle of connect(), so it can be found at shutdown.
   * The calling thread is recorded alongside the peer address.
   *
   * @param socket the socket being connected
   * @param addr the address the socket is connecting to
   */
  public void addConnectingSocket(Socket socket, InetAddress addr) {
    final ConnectingSocketInfo info = new ConnectingSocketInfo(addr);
    synchronized (this.connectingSockets) {
      this.connectingSockets.put(socket, info);
    }
  }

  /**
   * Stops tracking a socket. It should be connected at this point.
   *
   * @param socket the socket to drop from the tracked set
   */
  public void removeConnectingSocket(Socket socket) {
    synchronized (this.connectingSockets) {
      this.connectingSockets.remove(socket);
    }
  }


  /**
   * Bookkeeping for a socket that is in the middle of connecting: where it is connecting to and
   * which thread created this record.
   */
  private static class ConnectingSocketInfo {
    /** address the socket is connecting to */
    InetAddress peerAddress;

    /** thread that constructed this record */
    Thread connectingThread;

    public ConnectingSocketInfo(InetAddress addr) {
      peerAddress = addr;
      connectingThread = Thread.currentThread();
    }
  }

  /**
   * Returns the number of connections currently in the receivers collection.
   * <p>
   * The size is read while holding the receivers monitor, matching the other accesses to that
   * collection in this class, so the value reflects the most recent guarded update.
   *
   * @return the current receiver count
   */
  public int getNumberOfReceivers() {
    synchronized (this.receivers) {
      return this.receivers.size();
    }
  }
}
