/*
 * Licensed to the Apache Software Foundation (ASF) under one or more contributor license
 * agreements. See the NOTICE file distributed with this work for additional information regarding
 * copyright ownership. The ASF licenses this file to You under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance with the License. You may obtain a
 * copy of the License at
 *
 * http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software distributed under the License
 * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
 * or implied. See the License for the specific language governing permissions and limitations under
 * the License.
 */
package org.apache.geode.cache.client.internal.pooling;

import static java.util.concurrent.TimeUnit.NANOSECONDS;
import static org.apache.geode.distributed.ConfigurationProperties.LOCATORS;
import static org.apache.geode.distributed.ConfigurationProperties.MCAST_PORT;
import static org.apache.geode.logging.internal.spi.LogWriterLevel.FINEST;
import static org.apache.geode.test.awaitility.GeodeAwaitility.await;
import static org.assertj.core.api.Assertions.assertThat;
import static org.junit.Assert.fail;

import java.io.InputStream;
import java.io.OutputStream;
import java.net.Socket;
import java.nio.ByteBuffer;
import java.util.Collections;
import java.util.Properties;
import java.util.Set;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicReference;
import java.util.function.BooleanSupplier;

import org.junit.After;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Test;
import org.junit.experimental.categories.Category;

import org.apache.geode.CancelCriterion;
import org.apache.geode.cache.CacheClosedException;
import org.apache.geode.cache.client.AllConnectionsInUseException;
import org.apache.geode.cache.client.NoAvailableServersException;
import org.apache.geode.cache.client.internal.ClientUpdater;
import org.apache.geode.cache.client.internal.Connection;
import org.apache.geode.cache.client.internal.ConnectionFactory;
import org.apache.geode.cache.client.internal.ConnectionStats;
import org.apache.geode.cache.client.internal.Endpoint;
import org.apache.geode.cache.client.internal.EndpointManager;
import org.apache.geode.cache.client.internal.EndpointManagerImpl;
import org.apache.geode.cache.client.internal.Op;
import org.apache.geode.cache.client.internal.QueueManager;
import org.apache.geode.cache.client.internal.ServerDenyList;
import org.apache.geode.distributed.DistributedMember;
import org.apache.geode.distributed.DistributedSystem;
import org.apache.geode.distributed.internal.ServerLocation;
import org.apache.geode.distributed.internal.membership.InternalDistributedMember;
import org.apache.geode.internal.cache.PoolStats;
import org.apache.geode.internal.cache.tier.sockets.ServerQueueStatus;
import org.apache.geode.internal.logging.InternalLogWriter;
import org.apache.geode.internal.logging.LocalLogWriter;
import org.apache.geode.test.awaitility.GeodeAwaitility;
import org.apache.geode.test.dunit.ThreadUtils;
import org.apache.geode.test.dunit.WaitCriterion;
import org.apache.geode.test.junit.categories.ClientServerTest;

@Category({ClientServerTest.class})
public class ConnectionManagerJUnitTest {

  private static final long TIMEOUT_MILLIS = 30 * 1000;
  // Some machines do not have a monotonic clock.
  private static final long ALLOWABLE_ERROR_IN_MILLIS = 100;
  ConnectionManager manager;
  private InternalLogWriter logger;
  protected DummyFactory factory;
  private DistributedSystem ds;
  private ScheduledExecutorService background;
  protected EndpointManager endpointManager;
  private CancelCriterion cancelCriterion;
  private PoolStats poolStats;

  @Before
  public void setUp() {
    this.logger = new LocalLogWriter(FINEST.intLevel(), System.out);
    factory = new DummyFactory();

    Properties properties = new Properties();
    properties.put(MCAST_PORT, "0");
    properties.put(LOCATORS, "");
    ds = DistributedSystem.connect(properties);
    background = Executors.newSingleThreadScheduledExecutor();
    poolStats = new PoolStats(ds, "connectionManagerJUnitTest");
    endpointManager = new EndpointManagerImpl("pool", ds, ds.getCancelCriterion(), poolStats);
    cancelCriterion = new CancelCriterion() {

      @Override
      public String cancelInProgress() {
        return null;
      }

      @Override
      public RuntimeException generateCancelledException(Throwable e) {
        return null;
      }
    };
  }

  @After
  public void tearDown() throws InterruptedException {
    ds.disconnect();
    if (manager != null) {
      manager.close(false);
    }
    background.shutdownNow();
  }

  @Test
  public void testAddVarianceToInterval() {
    assertThat(ConnectionManagerImpl.addVarianceToInterval(0)).as("Zero gets zero variance")
        .isEqualTo(0);
    assertThat(ConnectionManagerImpl.addVarianceToInterval(300000))
        .as("Large value gets +/-10% variance").isNotEqualTo(300000).isGreaterThanOrEqualTo(270000)
        .isLessThanOrEqualTo(330000);
    assertThat(ConnectionManagerImpl.addVarianceToInterval(9)).as("Small value gets +/-1 variance")
        .isNotEqualTo(9).isGreaterThanOrEqualTo(8).isLessThanOrEqualTo(10);
  }

  @Test
  public void testGet()
      throws InterruptedException, AllConnectionsInUseException, NoAvailableServersException {
    manager = new ConnectionManagerImpl("pool", factory, endpointManager, 3, 0, -1, -1, logger,
        60 * 1000, cancelCriterion, poolStats);
    manager.start(background);

    Connection conn[] = new Connection[4];

    conn[0] = manager.borrowConnection(0);
    Assert.assertEquals(1, factory.creates);

    manager.returnConnection(conn[0]);
    conn[0] = manager.borrowConnection(0);
    Assert.assertEquals(1, factory.creates);
    conn[1] = manager.borrowConnection(0);
    manager.returnConnection(conn[0]);
    manager.returnConnection(conn[1]);
    Assert.assertEquals(2, factory.creates);

    conn[0] = manager.borrowConnection(0);
    conn[1] = manager.borrowConnection(0);
    conn[2] = manager.borrowConnection(0);
    Assert.assertEquals(3, factory.creates);

    try {
      conn[4] = manager.borrowConnection(10);
      fail("Should have received an all connections in use exception");
    } catch (AllConnectionsInUseException e) {
      // expected exception
    }
  }

  @Test
  public void testPrefill() throws InterruptedException {
    manager = new ConnectionManagerImpl("pool", factory, endpointManager, 10, 2, -1, -1, logger,
        60 * 1000, cancelCriterion, poolStats);
    manager.start(background);
    final String descrip = manager.toString();
    WaitCriterion ev = new WaitCriterion() {
      @Override
      public boolean done() {
        return factory.creates == 2 && factory.destroys == 0;
      }

      @Override
      public String description() {
        return "waiting for manager " + descrip;
      }
    };
    GeodeAwaitility.await().untilAsserted(ev);
  }

  @Test
  public void testInvalidateConnection()
      throws InterruptedException, AllConnectionsInUseException, NoAvailableServersException {
    manager = new ConnectionManagerImpl("pool", factory, endpointManager, 10, 0, 0L, -1, logger,
        60 * 1000, cancelCriterion, poolStats);
    manager.start(background);

    Connection conn = manager.borrowConnection(0);
    Assert.assertEquals(1, factory.creates);
    Assert.assertEquals(0, factory.destroys);
    conn.destroy();
    manager.returnConnection(conn);
    Assert.assertEquals(1, factory.creates);
    Assert.assertEquals(1, factory.destroys);
    conn = manager.borrowConnection(0);
    Assert.assertEquals(2, factory.creates);
    Assert.assertEquals(1, factory.destroys);
  }

  @Test
  public void testInvalidateServer()
      throws InterruptedException, AllConnectionsInUseException, NoAvailableServersException {
    manager = new ConnectionManagerImpl("pool", factory, endpointManager, 10, 0, -1, -1, logger,
        60 * 1000, cancelCriterion, poolStats);
    manager.start(background);

    ServerLocation server1 = new ServerLocation("localhost", 1);
    ServerLocation server2 = new ServerLocation("localhost", 2);
    factory.nextServer = server1;
    Connection conn1 = manager.borrowConnection(0);
    Connection conn2 = manager.borrowConnection(0);
    Connection conn3 = manager.borrowConnection(0);
    factory.nextServer = server2;
    Connection conn4 = manager.borrowConnection(0);

    Assert.assertEquals(4, factory.creates);
    Assert.assertEquals(0, factory.destroys);

    manager.returnConnection(conn2);
    endpointManager.serverCrashed(conn2.getEndpoint());
    Assert.assertEquals(3, factory.destroys);
    conn1.destroy();
    manager.returnConnection(conn1);
    Assert.assertEquals(3, factory.destroys);
    manager.returnConnection(conn3);
    manager.returnConnection(conn4);
    Assert.assertEquals(3, factory.destroys);

    manager.borrowConnection(0);
    Assert.assertEquals(4, factory.creates);
    Assert.assertEquals(3, factory.destroys);
  }

  @Test
  public void testIdleExpiration()
      throws InterruptedException, AllConnectionsInUseException, NoAvailableServersException {
    final long idleTimeoutMillis = 300;
    manager =
        new ConnectionManagerImpl("pool", factory, endpointManager, 5, 2, idleTimeoutMillis, -1,
            logger, 60 * 1000, cancelCriterion, poolStats);
    manager.start(background);

    {
      factory.waitWhile(() -> factory.creates < 2);
      Assert.assertEquals(2, factory.creates);
      Assert.assertEquals(0, factory.destroys);
      Assert.assertEquals(0, factory.closes);
      Assert.assertEquals(0, poolStats.getIdleExpire());
      // no need to wait; dangerous because it gives connections a chance to expire
      // //wait for prefill task to finish.
      // Thread.sleep(100);
    }

    Connection conn1 = manager.borrowConnection(500);
    Connection conn2 = manager.borrowConnection(500);
    Connection conn3 = manager.borrowConnection(500);
    Connection conn4 = manager.borrowConnection(500);
    Connection conn5 = manager.borrowConnection(500);

    // wait to make sure checked out connections aren't timed out
    Thread.sleep(idleTimeoutMillis * 2);
    Assert.assertEquals(5, factory.creates);
    Assert.assertEquals(0, factory.destroys);
    Assert.assertEquals(0, factory.closes);
    Assert.assertEquals(0, poolStats.getIdleExpire());

    {
      // make sure a connection that has been passivated can idle-expire
      conn1.passivate(true);

      long elapsedMillis = factory.waitWhile(() -> factory.destroys < 1);
      Assert.assertEquals(5, factory.creates);
      Assert.assertEquals(1, factory.destroys);
      Assert.assertEquals(1, factory.closes);
      Assert.assertEquals(1, poolStats.getIdleExpire());
      checkIdleTimeout(idleTimeoutMillis, elapsedMillis);
    }

    // now return all other connections to pool and verify that just 2 expire
    manager.returnConnection(conn2);
    manager.returnConnection(conn3);
    manager.returnConnection(conn4);
    manager.returnConnection(conn5);

    {
      long elapsedMillis = factory.waitWhile(() -> factory.destroys < 3);
      Assert.assertEquals(5, factory.creates);
      Assert.assertEquals(3, factory.destroys);
      Assert.assertEquals(3, factory.closes);
      Assert.assertEquals(3, poolStats.getIdleExpire());
      checkIdleTimeout(idleTimeoutMillis, elapsedMillis);
    }

    // wait to make sure min-connections don't time out
    Thread.sleep(idleTimeoutMillis * 2);
    Assert.assertEquals(5, factory.creates);
    Assert.assertEquals(3, factory.destroys);
    Assert.assertEquals(3, factory.closes);
    Assert.assertEquals(3, poolStats.getIdleExpire());
  }

  private void checkIdleTimeout(final long idleTimeoutMillis, long elapsedMillis) {
    Assert.assertTrue(
        "Elapsed " + elapsedMillis + " is less than idle timeout " + idleTimeoutMillis,
        elapsedMillis >= (idleTimeoutMillis - ALLOWABLE_ERROR_IN_MILLIS));
    Assert.assertTrue(
        "Elapsed " + elapsedMillis + " is greater than idle timeout " + idleTimeoutMillis,
        elapsedMillis <= (idleTimeoutMillis + ALLOWABLE_ERROR_IN_MILLIS));
  }

  @Test
  public void testBug41516()
      throws InterruptedException, AllConnectionsInUseException, NoAvailableServersException {
    final long idleTimeoutMillis = 300;
    final long BORROW_TIMEOUT_MILLIS = 500;
    manager =
        new ConnectionManagerImpl("pool", factory, endpointManager, 2, 1, idleTimeoutMillis, -1,
            logger, 60 * 1000, cancelCriterion, poolStats);
    manager.start(background);

    Connection conn1 = manager.borrowConnection(BORROW_TIMEOUT_MILLIS);
    Connection conn2 = manager.borrowConnection(BORROW_TIMEOUT_MILLIS);

    // Return some connections, let them idle expire
    manager.returnConnection(conn1);
    manager.returnConnection(conn2);

    {
      long elapsedMillis = factory.waitWhile(() -> factory.destroys < 1);
      Assert.assertEquals(1, factory.destroys);
      Assert.assertEquals(1, factory.closes);
      Assert.assertEquals(1, poolStats.getIdleExpire());
      Assert.assertTrue(
          "Elapsed " + elapsedMillis + " is less than idle timeout " + idleTimeoutMillis,
          elapsedMillis + ALLOWABLE_ERROR_IN_MILLIS >= idleTimeoutMillis);
    }

    // Ok, now get some connections that fill our queue
    Connection ping1 =
        manager.borrowConnection(new ServerLocation("localhost", 5), false);
    Connection ping2 =
        manager.borrowConnection(new ServerLocation("localhost", 5), false);
    manager.returnConnection(ping1);
    manager.returnConnection(ping2);

    manager.borrowConnection(BORROW_TIMEOUT_MILLIS);
    manager.borrowConnection(BORROW_TIMEOUT_MILLIS);
    long startNanos = nowNanos();
    try {
      manager.borrowConnection(BORROW_TIMEOUT_MILLIS);
      fail("Didn't get an exception");
    } catch (AllConnectionsInUseException e) {
      // expected
    }
    long elapsedMillis = elapsedMillis(startNanos);
    Assert.assertTrue("Elapsed = " + elapsedMillis,
        elapsedMillis >= BORROW_TIMEOUT_MILLIS - ALLOWABLE_ERROR_IN_MILLIS);
  }

  @Test
  public void testLifetimeExpiration() throws InterruptedException, AllConnectionsInUseException,
      NoAvailableServersException, Throwable {
    int lifetimeTimeout = 500;
    manager = new ConnectionManagerImpl("pool", factory, endpointManager, 2, 2, -1, lifetimeTimeout,
        logger, 60 * 1000, cancelCriterion, poolStats);
    manager.start(background);

    {
      factory.waitWhile(() -> factory.creates < 2);
      Assert.assertEquals(2, factory.creates);
      Assert.assertEquals(0, factory.destroys);
      Assert.assertEquals(0, factory.finds);
    }

    // need to start a thread that keeps the connections busy
    // so that their last access time keeps changing
    AtomicReference exception = new AtomicReference();
    int updaterCount = 2;
    UpdaterThread[] updaters = new UpdaterThread[updaterCount];

    for (int i = 0; i < updaterCount; i++) {
      updaters[i] = new UpdaterThread(null, exception, i, (lifetimeTimeout / 10) * 2);
    }

    for (int i = 0; i < updaterCount; i++) {
      updaters[i].start();
    }

    {
      long durationMillis = factory.waitWhile(() -> factory.finds < 2);
      Assert.assertEquals(2, factory.finds);
      // server shouldn't have changed so no increase in creates or destroys
      Assert.assertEquals(2, factory.creates);
      Assert.assertEquals(0, factory.destroys);
      Assert.assertEquals(0, factory.closes);

      Assert.assertTrue("took too long to expire lifetime; expected=" + lifetimeTimeout
          + " but took=" + durationMillis, durationMillis < lifetimeTimeout * 5);
    }

    for (int i = 0; i < updaterCount; i++) {
      ThreadUtils.join(updaters[i], 30 * 1000);
    }

    if (exception.get() != null) {
      throw (Throwable) exception.get();
    }

    for (int i = 0; i < updaterCount; i++) {
      Assert.assertFalse("Updater [" + i + "] is still running", updaters[i].isAlive());
    }
  }

  @Test
  public void testExclusiveConnectionAccess() throws Throwable {
    manager = new ConnectionManagerImpl("pool", factory, endpointManager, 1, 0, -1, -1, logger,
        60 * 1000, cancelCriterion, poolStats);
    manager.start(background);
    AtomicReference exception = new AtomicReference();
    AtomicBoolean haveConnection = new AtomicBoolean();
    int updaterCount = 10;
    UpdaterThread[] updaters = new UpdaterThread[updaterCount];

    for (int i = 0; i < updaterCount; i++) {
      updaters[i] = new UpdaterThread(haveConnection, exception, i);
    }

    for (int i = 0; i < updaterCount; i++) {
      updaters[i].start();
    }

    for (int i = 0; i < updaterCount; i++) {
      ThreadUtils.join(updaters[i], 30 * 1000);
    }

    if (exception.get() != null) {
      throw (Throwable) exception.get();
    }

    for (int i = 0; i < updaterCount; i++) {
      Assert.assertFalse("Updater [" + i + "] is still running", updaters[i].isAlive());
    }
  }

  @Test
  public void testClose()
      throws AllConnectionsInUseException, NoAvailableServersException, InterruptedException {
    manager = new ConnectionManagerImpl("pool", factory, endpointManager, 10, 0, -1, -1, logger,
        60 * 1000, cancelCriterion, poolStats);
    manager.start(background);

    Connection conn1 = manager.borrowConnection(0);
    manager.borrowConnection(0);
    manager.returnConnection(conn1);
    Assert.assertEquals(2, factory.creates);
    Assert.assertEquals(0, factory.destroys);

    manager.close(false);

    Assert.assertEquals(2, factory.closes);
    Assert.assertEquals(2, factory.destroys);

  }

  @Test
  public void testExchangeConnection() throws Exception {
    manager = new ConnectionManagerImpl("pool", factory, endpointManager, 2, 0, -1, -1, logger,
        60 * 1000, cancelCriterion, poolStats);
    manager.start(background);

    Connection conn1 = manager.borrowConnection(10);
    Connection conn2 = manager.borrowConnection(10);
    try {
      manager.borrowConnection(10);
      fail("Exepected no servers available");
    } catch (AllConnectionsInUseException e) {
      // expected
    }

    Assert.assertEquals(2, factory.creates);
    Assert.assertEquals(0, factory.destroys);
    Assert.assertEquals(2, manager.getConnectionCount());

    Connection conn3 = manager.exchangeConnection(conn1, Collections.emptySet());

    Assert.assertEquals(3, factory.creates);
    Assert.assertEquals(1, factory.destroys);
    Assert.assertEquals(2, manager.getConnectionCount());

    manager.returnConnection(conn2);

    Assert.assertEquals(3, factory.creates);
    Assert.assertEquals(1, factory.destroys);
    Assert.assertEquals(2, manager.getConnectionCount());

    Connection conn4 =
        manager.exchangeConnection(conn3, Collections.singleton(conn3.getServer()));

    Assert.assertEquals(4, factory.creates);
    Assert.assertEquals(2, factory.destroys);
    Assert.assertEquals(2, manager.getConnectionCount());

    manager.returnConnection(conn4);
  }

  /**
   * This tests that a deadlock between connection formation and connection pool closing has been
   * fixed. See GEODE-4615
   */
  @Test
  public void testThatMapCloseCausesCacheClosedException() throws Exception {
    final ConnectionManagerImpl connectionManager = new ConnectionManagerImpl("pool", factory,
        endpointManager, 2, 0, -1, -1, logger, 60 * 1000, cancelCriterion, poolStats);
    manager = connectionManager;
    connectionManager.start(background);
    final ConnectionManagerImpl.ConnectionMap connectionMap = connectionManager.allConnectionsMap;

    final int thread1 = 0;
    final int thread2 = 1;
    final boolean[] ready = new boolean[2];
    Thread thread = new Thread("ConnectionManagerJUnitTest thread") {
      @Override
      public void run() {
        setReady(ready, thread1);
        waitUntilReady(ready, thread2);
        connectionMap.close(false);
      }
    };
    thread.setDaemon(true);
    thread.start();
    try {
      Connection firstConnection = connectionManager.borrowConnection(0);
      synchronized (firstConnection) {
        setReady(ready, thread2);
        waitUntilReady(ready, thread1);
        // the other thread will now try to close the connection map but it will block
        // because this thread has locked one of the connections
        await().until(() -> connectionMap.closing);
        try {
          connectionManager.borrowConnection(0);
          fail("expected a CacheClosedException");
        } catch (CacheClosedException e) {
          // expected
        }
      }
    } finally {
      if (thread.isAlive()) {
        System.out.println("stopping background thread");
        thread.interrupt();
        thread.join();
      }
    }
  }

  private void setReady(boolean[] ready, int index) {
    System.out.println(
        Thread.currentThread().getName() + ": setting that thread" + (index + 1) + " is ready");
    synchronized (ready) {
      ready[index] = true;
    }
  }

  private void waitUntilReady(boolean[] ready, int index) {
    System.out.println(
        Thread.currentThread().getName() + ": waiting for thread" + (index + 1) + " to be ready");
    await().until(() -> {
      synchronized (ready) {
        return (ready[index]);
      }
    });
  }

  private long nowNanos() {
    return System.nanoTime();
  }

  private long elapsedNanos(long startNanos) {
    return nowNanos() - startNanos;
  }

  private long elapsedMillis(long startNanos) {
    return NANOSECONDS.toMillis(elapsedNanos(startNanos));
  }

  @Test
  public void testBlocking() throws Throwable {
    manager = new ConnectionManagerImpl("pool", factory, endpointManager, 1, 0, -1, -1, logger,
        60 * 1000, cancelCriterion, poolStats);
    manager.start(background);

    final Connection conn1 = manager.borrowConnection(10);

    long BORROW_TIMEOUT_MILLIS = 300;
    long startNonos = nowNanos();
    try {
      manager.borrowConnection(BORROW_TIMEOUT_MILLIS);
      fail("Should have received no servers available");
    } catch (AllConnectionsInUseException expected) {

    }
    long elapsedMillis = elapsedMillis(startNonos);
    Assert.assertTrue(
        "Should have blocked for " + BORROW_TIMEOUT_MILLIS + " millis for a connection",
        elapsedMillis >= BORROW_TIMEOUT_MILLIS - ALLOWABLE_ERROR_IN_MILLIS);

    Thread returnThread = new Thread() {
      @Override
      public void run() {
        try {
          Thread.sleep(50);
        } catch (InterruptedException e) {
          fail("interrupted");
        }
        manager.returnConnection(conn1);
      }
    };

    returnThread.start();
    BORROW_TIMEOUT_MILLIS = 5000;
    startNonos = nowNanos();
    Connection conn2 = manager.borrowConnection(BORROW_TIMEOUT_MILLIS);
    elapsedMillis = elapsedMillis(startNonos);
    Assert.assertTrue(
        "Should have blocked for less than " + BORROW_TIMEOUT_MILLIS + " milliseconds",
        elapsedMillis < BORROW_TIMEOUT_MILLIS + ALLOWABLE_ERROR_IN_MILLIS);
    manager.returnConnection(conn2);


    final Connection conn3 = manager.borrowConnection(10);
    Thread invalidateThread = new Thread() {
      @Override
      public void run() {
        try {
          Thread.sleep(50);
        } catch (InterruptedException e) {
          fail("interrupted");
        }
        conn3.destroy();
        manager.returnConnection(conn3);
      }
    };

    invalidateThread.start();
    startNonos = nowNanos();
    conn2 = manager.borrowConnection(BORROW_TIMEOUT_MILLIS);
    elapsedMillis = elapsedMillis(startNonos);
    Assert.assertTrue(
        "Should have blocked for less than " + BORROW_TIMEOUT_MILLIS + " milliseconds",
        elapsedMillis < BORROW_TIMEOUT_MILLIS + ALLOWABLE_ERROR_IN_MILLIS);
    manager.returnConnection(conn2);

    final Connection conn4 = manager.borrowConnection(10);
    Thread invalidateThread2 = new Thread() {
      @Override
      public void run() {
        try {
          Thread.sleep(50);
        } catch (InterruptedException e) {
          fail("interrupted");
        }
        endpointManager.serverCrashed(conn4.getEndpoint());
        manager.returnConnection(conn4);
      }
    };

    invalidateThread2.start();
    startNonos = nowNanos();
    conn2 = manager.borrowConnection(BORROW_TIMEOUT_MILLIS);
    elapsedMillis = elapsedMillis(startNonos);
    Assert.assertTrue(
        "Should have blocked for less than " + BORROW_TIMEOUT_MILLIS + " milliseconds",
        elapsedMillis < BORROW_TIMEOUT_MILLIS + ALLOWABLE_ERROR_IN_MILLIS);
    manager.returnConnection(conn2);
  }

  @Test
  public void testExplicitServer() throws Exception {
    manager = new ConnectionManagerImpl("pool", factory, endpointManager, 1, 0, -1, -1, logger,
        60 * 1000, cancelCriterion, poolStats);
    manager.start(background);

    Connection conn1 = manager.borrowConnection(0);

    try {
      manager.borrowConnection(10);
      fail("Should have received an error");
    } catch (AllConnectionsInUseException expected) {
      // do nothing
    }

    Connection conn3 = manager.borrowConnection(new ServerLocation("localhost", -2), false);
    Assert.assertEquals(2, factory.creates);
    Assert.assertEquals(0, factory.destroys);
    Assert.assertEquals(0, factory.closes);

    manager.returnConnection(conn3);
    Assert.assertEquals(2, factory.creates);
    Assert.assertEquals(1, factory.destroys);
    Assert.assertEquals(1, factory.closes);

    manager.returnConnection(conn1);
    Assert.assertEquals(2, factory.creates);
    Assert.assertEquals(1, factory.destroys);
    Assert.assertEquals(1, factory.closes);
  }

  private class UpdaterThread extends Thread {

    private AtomicReference exception;

    private final AtomicBoolean haveConnection;

    private int id;
    private final int iterations;

    public UpdaterThread(AtomicBoolean haveConnection, AtomicReference exception, int id) {
      this(haveConnection, exception, id, 10);
    }

    public UpdaterThread(AtomicBoolean haveConnection, AtomicReference exception, int id,
        int iterations) {
      this.haveConnection = haveConnection;
      this.exception = exception;
      this.id = id;
      this.iterations = iterations;
    }

    private Connection borrow(int i) {
      long startNanos = nowNanos();
      long BORROW_TIMEOUT_MILLIS = 2000;
      Connection conn = manager.borrowConnection(BORROW_TIMEOUT_MILLIS);
      if (haveConnection != null) {
        Assert.assertTrue("Updater[" + id + "] loop[" + i + "] Someone else has the connection!",
            haveConnection.compareAndSet(false, true));
      }
      long elapsedMillis = elapsedMillis(startNanos);
      Assert.assertTrue("Elapsed time (" + elapsedMillis + ") >= " + BORROW_TIMEOUT_MILLIS,
          elapsedMillis < BORROW_TIMEOUT_MILLIS + ALLOWABLE_ERROR_IN_MILLIS);
      return conn;
    }

    @Override
    public void run() {
      int i = 0;
      Connection conn = null;
      try {
        for (i = 0; i < iterations; i++) {
          conn = borrow(i);
          try {
            Thread.sleep(10);
            if (haveConnection != null) {
              Assert.assertTrue(
                  "Updater[" + id + "] loop[" + i + "] Someone else changed the connection flag",
                  haveConnection.compareAndSet(true, false));
            }
          } finally {
            manager.returnConnection(conn);
          }
        }
      } catch (Throwable t) {
        this.exception.compareAndSet(null,
            new Exception("ERROR Updater[" + id + "] loop[" + i + "]", t));
      }

    }
  }

  public class DummyFactory implements ConnectionFactory {
    public ServerLocation nextServer = new ServerLocation("localhost", -1);
    protected volatile int creates;
    protected volatile int destroys;
    protected volatile int closes;
    protected volatile int finds;

    /**
     * Wait as long as "whileCondition" is true.
     * The wait will timeout after TIMEOUT_MILLIS has elapsed.
     *
     * @return the elapsed time in milliseconds.
     */
    public synchronized long waitWhile(BooleanSupplier whileCondition) throws InterruptedException {
      final long startNanos = nowNanos();
      long remainingMillis = TIMEOUT_MILLIS;
      while (whileCondition.getAsBoolean() && remainingMillis > 0) {
        wait(remainingMillis);
        remainingMillis = TIMEOUT_MILLIS - elapsedMillis(startNanos);
      }
      return elapsedMillis(startNanos);
    }

    @Override
    public ServerDenyList getDenyList() {
      return new ServerDenyList(1);
    }


    @Override
    public ServerLocation findBestServer(ServerLocation currentServer, Set excludedServers) {
      synchronized (this) {
        finds++;
        this.notifyAll();
      }
      if (excludedServers != null) {
        if (excludedServers.contains(nextServer)) {
          return null;
        }
      }
      return nextServer;
    }

    @Override
    public Connection createClientToServerConnection(Set excluded) {
      return createClientToServerConnection(nextServer, true);
    }


    /*
     * (non-Javadoc)
     *
     * @see
     * org.apache.geode.cache.client.internal.ConnectionFactory#createClientToServerConnection(org.
     * apache.geode.distributed.internal.ServerLocation)
     */
    @Override
    public Connection createClientToServerConnection(final ServerLocation location,
        boolean forQueue) {
      synchronized (this) {
        creates++;
        this.notifyAll();
      }
      DistributedMember fakeMember = null;
      fakeMember = new InternalDistributedMember("localhost", 555);
      final DistributedMember member = fakeMember;

      return new Connection() {

        private Endpoint endpoint = endpointManager.referenceEndpoint(location, member);

        @Override
        public void destroy() {
          synchronized (DummyFactory.this) {
            destroys++;
            DummyFactory.this.notifyAll();
          }
        }

        @Override
        public ServerLocation getServer() {
          return location;
        }

        @Override
        public ByteBuffer getCommBuffer() {
          return null;
        }

        @Override
        public Socket getSocket() {
          return null;
        }

        @Override
        public ConnectionStats getStats() {
          return null;
        }

        @Override
        public void close(boolean keepAlive) throws Exception {
          synchronized (DummyFactory.this) {
            closes++;
            DummyFactory.this.notifyAll();
          }
        }

        @Override
        public Endpoint getEndpoint() {
          return endpoint;
        }

        @Override
        public ServerQueueStatus getQueueStatus() {
          return null;
        }

        @Override
        public Object execute(Op op) throws Exception {
          return op.attempt(this);
        }

        @Override
        public boolean isDestroyed() {
          return false;
        }

        @Override
        public void emergencyClose() {}

        @Override
        public short getWanSiteVersion() {
          return -1;
        }

        @Override
        public int getDistributedSystemId() {
          return -1;
        }

        @Override
        public void setWanSiteVersion(short wanSiteVersion) {}

        @Override
        public InputStream getInputStream() {
          return null;
        }

        @Override
        public OutputStream getOutputStream() {
          return null;
        }

        @Override
        public void setConnectionID(long id) {}

        @Override
        public long getConnectionID() {
          return 0;
        }
      };
    }

    @Override
    public ClientUpdater createServerToClientConnection(Endpoint endpoint, QueueManager manager,
        boolean isPrimary, ClientUpdater failedUpdater) {
      return null;
    }
  }
}
