blob: 4f20d9130deaa0bf8b22d54748561fb2fe1bde4b [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one or more contributor license
* agreements. See the NOTICE file distributed with this work for additional information regarding
* copyright ownership. The ASF licenses this file to You under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance with the License. You may obtain a
* copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software distributed under the License
* is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
* or implied. See the License for the specific language governing permissions and limitations under
* the License.
*/
package org.apache.geode.cache.client.internal.pooling;
import static java.util.concurrent.TimeUnit.NANOSECONDS;
import static org.apache.geode.distributed.ConfigurationProperties.LOCATORS;
import static org.apache.geode.distributed.ConfigurationProperties.MCAST_PORT;
import static org.apache.geode.logging.internal.spi.LogWriterLevel.FINEST;
import static org.apache.geode.test.awaitility.GeodeAwaitility.await;
import static org.assertj.core.api.Assertions.assertThat;
import static org.junit.Assert.fail;
import java.io.InputStream;
import java.io.OutputStream;
import java.net.Socket;
import java.nio.ByteBuffer;
import java.util.Collections;
import java.util.Properties;
import java.util.Set;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicReference;
import java.util.function.BooleanSupplier;
import org.junit.After;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Test;
import org.junit.experimental.categories.Category;
import org.apache.geode.CancelCriterion;
import org.apache.geode.cache.CacheClosedException;
import org.apache.geode.cache.client.AllConnectionsInUseException;
import org.apache.geode.cache.client.NoAvailableServersException;
import org.apache.geode.cache.client.internal.ClientUpdater;
import org.apache.geode.cache.client.internal.Connection;
import org.apache.geode.cache.client.internal.ConnectionFactory;
import org.apache.geode.cache.client.internal.ConnectionStats;
import org.apache.geode.cache.client.internal.Endpoint;
import org.apache.geode.cache.client.internal.EndpointManager;
import org.apache.geode.cache.client.internal.EndpointManagerImpl;
import org.apache.geode.cache.client.internal.Op;
import org.apache.geode.cache.client.internal.QueueManager;
import org.apache.geode.cache.client.internal.ServerDenyList;
import org.apache.geode.distributed.DistributedMember;
import org.apache.geode.distributed.DistributedSystem;
import org.apache.geode.distributed.internal.ServerLocation;
import org.apache.geode.distributed.internal.membership.InternalDistributedMember;
import org.apache.geode.internal.cache.PoolStats;
import org.apache.geode.internal.cache.tier.sockets.ServerQueueStatus;
import org.apache.geode.internal.logging.InternalLogWriter;
import org.apache.geode.internal.logging.LocalLogWriter;
import org.apache.geode.test.awaitility.GeodeAwaitility;
import org.apache.geode.test.dunit.ThreadUtils;
import org.apache.geode.test.dunit.WaitCriterion;
import org.apache.geode.test.junit.categories.ClientServerTest;
@Category({ClientServerTest.class})
public class ConnectionManagerJUnitTest {
private static final long TIMEOUT_MILLIS = 30 * 1000;
// Some machines do not have a monotonic clock.
private static final long ALLOWABLE_ERROR_IN_MILLIS = 100;
ConnectionManager manager;
private InternalLogWriter logger;
protected DummyFactory factory;
private DistributedSystem ds;
private ScheduledExecutorService background;
protected EndpointManager endpointManager;
private CancelCriterion cancelCriterion;
private PoolStats poolStats;
@Before
public void setUp() {
this.logger = new LocalLogWriter(FINEST.intLevel(), System.out);
factory = new DummyFactory();
Properties properties = new Properties();
properties.put(MCAST_PORT, "0");
properties.put(LOCATORS, "");
ds = DistributedSystem.connect(properties);
background = Executors.newSingleThreadScheduledExecutor();
poolStats = new PoolStats(ds, "connectionManagerJUnitTest");
endpointManager = new EndpointManagerImpl("pool", ds, ds.getCancelCriterion(), poolStats);
cancelCriterion = new CancelCriterion() {
@Override
public String cancelInProgress() {
return null;
}
@Override
public RuntimeException generateCancelledException(Throwable e) {
return null;
}
};
}
@After
public void tearDown() throws InterruptedException {
ds.disconnect();
if (manager != null) {
manager.close(false);
}
background.shutdownNow();
}
@Test
public void testAddVarianceToInterval() {
assertThat(ConnectionManagerImpl.addVarianceToInterval(0)).as("Zero gets zero variance")
.isEqualTo(0);
assertThat(ConnectionManagerImpl.addVarianceToInterval(300000))
.as("Large value gets +/-10% variance").isNotEqualTo(300000).isGreaterThanOrEqualTo(270000)
.isLessThanOrEqualTo(330000);
assertThat(ConnectionManagerImpl.addVarianceToInterval(9)).as("Small value gets +/-1 variance")
.isNotEqualTo(9).isGreaterThanOrEqualTo(8).isLessThanOrEqualTo(10);
}
@Test
public void testGet()
throws InterruptedException, AllConnectionsInUseException, NoAvailableServersException {
manager = new ConnectionManagerImpl("pool", factory, endpointManager, 3, 0, -1, -1, logger,
60 * 1000, cancelCriterion, poolStats);
manager.start(background);
Connection conn[] = new Connection[4];
conn[0] = manager.borrowConnection(0);
Assert.assertEquals(1, factory.creates);
manager.returnConnection(conn[0]);
conn[0] = manager.borrowConnection(0);
Assert.assertEquals(1, factory.creates);
conn[1] = manager.borrowConnection(0);
manager.returnConnection(conn[0]);
manager.returnConnection(conn[1]);
Assert.assertEquals(2, factory.creates);
conn[0] = manager.borrowConnection(0);
conn[1] = manager.borrowConnection(0);
conn[2] = manager.borrowConnection(0);
Assert.assertEquals(3, factory.creates);
try {
conn[4] = manager.borrowConnection(10);
fail("Should have received an all connections in use exception");
} catch (AllConnectionsInUseException e) {
// expected exception
}
}
@Test
public void testPrefill() throws InterruptedException {
manager = new ConnectionManagerImpl("pool", factory, endpointManager, 10, 2, -1, -1, logger,
60 * 1000, cancelCriterion, poolStats);
manager.start(background);
final String descrip = manager.toString();
WaitCriterion ev = new WaitCriterion() {
@Override
public boolean done() {
return factory.creates == 2 && factory.destroys == 0;
}
@Override
public String description() {
return "waiting for manager " + descrip;
}
};
GeodeAwaitility.await().untilAsserted(ev);
}
@Test
public void testInvalidateConnection()
throws InterruptedException, AllConnectionsInUseException, NoAvailableServersException {
manager = new ConnectionManagerImpl("pool", factory, endpointManager, 10, 0, 0L, -1, logger,
60 * 1000, cancelCriterion, poolStats);
manager.start(background);
Connection conn = manager.borrowConnection(0);
Assert.assertEquals(1, factory.creates);
Assert.assertEquals(0, factory.destroys);
conn.destroy();
manager.returnConnection(conn);
Assert.assertEquals(1, factory.creates);
Assert.assertEquals(1, factory.destroys);
conn = manager.borrowConnection(0);
Assert.assertEquals(2, factory.creates);
Assert.assertEquals(1, factory.destroys);
}
@Test
public void testInvalidateServer()
throws InterruptedException, AllConnectionsInUseException, NoAvailableServersException {
manager = new ConnectionManagerImpl("pool", factory, endpointManager, 10, 0, -1, -1, logger,
60 * 1000, cancelCriterion, poolStats);
manager.start(background);
ServerLocation server1 = new ServerLocation("localhost", 1);
ServerLocation server2 = new ServerLocation("localhost", 2);
factory.nextServer = server1;
Connection conn1 = manager.borrowConnection(0);
Connection conn2 = manager.borrowConnection(0);
Connection conn3 = manager.borrowConnection(0);
factory.nextServer = server2;
Connection conn4 = manager.borrowConnection(0);
Assert.assertEquals(4, factory.creates);
Assert.assertEquals(0, factory.destroys);
manager.returnConnection(conn2);
endpointManager.serverCrashed(conn2.getEndpoint());
Assert.assertEquals(3, factory.destroys);
conn1.destroy();
manager.returnConnection(conn1);
Assert.assertEquals(3, factory.destroys);
manager.returnConnection(conn3);
manager.returnConnection(conn4);
Assert.assertEquals(3, factory.destroys);
manager.borrowConnection(0);
Assert.assertEquals(4, factory.creates);
Assert.assertEquals(3, factory.destroys);
}
@Test
public void testIdleExpiration()
throws InterruptedException, AllConnectionsInUseException, NoAvailableServersException {
final long idleTimeoutMillis = 300;
manager =
new ConnectionManagerImpl("pool", factory, endpointManager, 5, 2, idleTimeoutMillis, -1,
logger, 60 * 1000, cancelCriterion, poolStats);
manager.start(background);
{
factory.waitWhile(() -> factory.creates < 2);
Assert.assertEquals(2, factory.creates);
Assert.assertEquals(0, factory.destroys);
Assert.assertEquals(0, factory.closes);
Assert.assertEquals(0, poolStats.getIdleExpire());
// no need to wait; dangerous because it gives connections a chance to expire
// //wait for prefill task to finish.
// Thread.sleep(100);
}
Connection conn1 = manager.borrowConnection(500);
Connection conn2 = manager.borrowConnection(500);
Connection conn3 = manager.borrowConnection(500);
Connection conn4 = manager.borrowConnection(500);
Connection conn5 = manager.borrowConnection(500);
// wait to make sure checked out connections aren't timed out
Thread.sleep(idleTimeoutMillis * 2);
Assert.assertEquals(5, factory.creates);
Assert.assertEquals(0, factory.destroys);
Assert.assertEquals(0, factory.closes);
Assert.assertEquals(0, poolStats.getIdleExpire());
{
// make sure a connection that has been passivated can idle-expire
conn1.passivate(true);
long elapsedMillis = factory.waitWhile(() -> factory.destroys < 1);
Assert.assertEquals(5, factory.creates);
Assert.assertEquals(1, factory.destroys);
Assert.assertEquals(1, factory.closes);
Assert.assertEquals(1, poolStats.getIdleExpire());
checkIdleTimeout(idleTimeoutMillis, elapsedMillis);
}
// now return all other connections to pool and verify that just 2 expire
manager.returnConnection(conn2);
manager.returnConnection(conn3);
manager.returnConnection(conn4);
manager.returnConnection(conn5);
{
long elapsedMillis = factory.waitWhile(() -> factory.destroys < 3);
Assert.assertEquals(5, factory.creates);
Assert.assertEquals(3, factory.destroys);
Assert.assertEquals(3, factory.closes);
Assert.assertEquals(3, poolStats.getIdleExpire());
checkIdleTimeout(idleTimeoutMillis, elapsedMillis);
}
// wait to make sure min-connections don't time out
Thread.sleep(idleTimeoutMillis * 2);
Assert.assertEquals(5, factory.creates);
Assert.assertEquals(3, factory.destroys);
Assert.assertEquals(3, factory.closes);
Assert.assertEquals(3, poolStats.getIdleExpire());
}
private void checkIdleTimeout(final long idleTimeoutMillis, long elapsedMillis) {
Assert.assertTrue(
"Elapsed " + elapsedMillis + " is less than idle timeout " + idleTimeoutMillis,
elapsedMillis >= (idleTimeoutMillis - ALLOWABLE_ERROR_IN_MILLIS));
Assert.assertTrue(
"Elapsed " + elapsedMillis + " is greater than idle timeout " + idleTimeoutMillis,
elapsedMillis <= (idleTimeoutMillis + ALLOWABLE_ERROR_IN_MILLIS));
}
@Test
public void testBug41516()
throws InterruptedException, AllConnectionsInUseException, NoAvailableServersException {
final long idleTimeoutMillis = 300;
final long BORROW_TIMEOUT_MILLIS = 500;
manager =
new ConnectionManagerImpl("pool", factory, endpointManager, 2, 1, idleTimeoutMillis, -1,
logger, 60 * 1000, cancelCriterion, poolStats);
manager.start(background);
Connection conn1 = manager.borrowConnection(BORROW_TIMEOUT_MILLIS);
Connection conn2 = manager.borrowConnection(BORROW_TIMEOUT_MILLIS);
// Return some connections, let them idle expire
manager.returnConnection(conn1);
manager.returnConnection(conn2);
{
long elapsedMillis = factory.waitWhile(() -> factory.destroys < 1);
Assert.assertEquals(1, factory.destroys);
Assert.assertEquals(1, factory.closes);
Assert.assertEquals(1, poolStats.getIdleExpire());
Assert.assertTrue(
"Elapsed " + elapsedMillis + " is less than idle timeout " + idleTimeoutMillis,
elapsedMillis + ALLOWABLE_ERROR_IN_MILLIS >= idleTimeoutMillis);
}
// Ok, now get some connections that fill our queue
Connection ping1 =
manager.borrowConnection(new ServerLocation("localhost", 5), false);
Connection ping2 =
manager.borrowConnection(new ServerLocation("localhost", 5), false);
manager.returnConnection(ping1);
manager.returnConnection(ping2);
manager.borrowConnection(BORROW_TIMEOUT_MILLIS);
manager.borrowConnection(BORROW_TIMEOUT_MILLIS);
long startNanos = nowNanos();
try {
manager.borrowConnection(BORROW_TIMEOUT_MILLIS);
fail("Didn't get an exception");
} catch (AllConnectionsInUseException e) {
// expected
}
long elapsedMillis = elapsedMillis(startNanos);
Assert.assertTrue("Elapsed = " + elapsedMillis,
elapsedMillis >= BORROW_TIMEOUT_MILLIS - ALLOWABLE_ERROR_IN_MILLIS);
}
@Test
public void testLifetimeExpiration() throws InterruptedException, AllConnectionsInUseException,
NoAvailableServersException, Throwable {
int lifetimeTimeout = 500;
manager = new ConnectionManagerImpl("pool", factory, endpointManager, 2, 2, -1, lifetimeTimeout,
logger, 60 * 1000, cancelCriterion, poolStats);
manager.start(background);
{
factory.waitWhile(() -> factory.creates < 2);
Assert.assertEquals(2, factory.creates);
Assert.assertEquals(0, factory.destroys);
Assert.assertEquals(0, factory.finds);
}
// need to start a thread that keeps the connections busy
// so that their last access time keeps changing
AtomicReference exception = new AtomicReference();
int updaterCount = 2;
UpdaterThread[] updaters = new UpdaterThread[updaterCount];
for (int i = 0; i < updaterCount; i++) {
updaters[i] = new UpdaterThread(null, exception, i, (lifetimeTimeout / 10) * 2);
}
for (int i = 0; i < updaterCount; i++) {
updaters[i].start();
}
{
long durationMillis = factory.waitWhile(() -> factory.finds < 2);
Assert.assertEquals(2, factory.finds);
// server shouldn't have changed so no increase in creates or destroys
Assert.assertEquals(2, factory.creates);
Assert.assertEquals(0, factory.destroys);
Assert.assertEquals(0, factory.closes);
Assert.assertTrue("took too long to expire lifetime; expected=" + lifetimeTimeout
+ " but took=" + durationMillis, durationMillis < lifetimeTimeout * 5);
}
for (int i = 0; i < updaterCount; i++) {
ThreadUtils.join(updaters[i], 30 * 1000);
}
if (exception.get() != null) {
throw (Throwable) exception.get();
}
for (int i = 0; i < updaterCount; i++) {
Assert.assertFalse("Updater [" + i + "] is still running", updaters[i].isAlive());
}
}
@Test
public void testExclusiveConnectionAccess() throws Throwable {
manager = new ConnectionManagerImpl("pool", factory, endpointManager, 1, 0, -1, -1, logger,
60 * 1000, cancelCriterion, poolStats);
manager.start(background);
AtomicReference exception = new AtomicReference();
AtomicBoolean haveConnection = new AtomicBoolean();
int updaterCount = 10;
UpdaterThread[] updaters = new UpdaterThread[updaterCount];
for (int i = 0; i < updaterCount; i++) {
updaters[i] = new UpdaterThread(haveConnection, exception, i);
}
for (int i = 0; i < updaterCount; i++) {
updaters[i].start();
}
for (int i = 0; i < updaterCount; i++) {
ThreadUtils.join(updaters[i], 30 * 1000);
}
if (exception.get() != null) {
throw (Throwable) exception.get();
}
for (int i = 0; i < updaterCount; i++) {
Assert.assertFalse("Updater [" + i + "] is still running", updaters[i].isAlive());
}
}
@Test
public void testClose()
throws AllConnectionsInUseException, NoAvailableServersException, InterruptedException {
manager = new ConnectionManagerImpl("pool", factory, endpointManager, 10, 0, -1, -1, logger,
60 * 1000, cancelCriterion, poolStats);
manager.start(background);
Connection conn1 = manager.borrowConnection(0);
manager.borrowConnection(0);
manager.returnConnection(conn1);
Assert.assertEquals(2, factory.creates);
Assert.assertEquals(0, factory.destroys);
manager.close(false);
Assert.assertEquals(2, factory.closes);
Assert.assertEquals(2, factory.destroys);
}
@Test
public void testExchangeConnection() throws Exception {
manager = new ConnectionManagerImpl("pool", factory, endpointManager, 2, 0, -1, -1, logger,
60 * 1000, cancelCriterion, poolStats);
manager.start(background);
Connection conn1 = manager.borrowConnection(10);
Connection conn2 = manager.borrowConnection(10);
try {
manager.borrowConnection(10);
fail("Exepected no servers available");
} catch (AllConnectionsInUseException e) {
// expected
}
Assert.assertEquals(2, factory.creates);
Assert.assertEquals(0, factory.destroys);
Assert.assertEquals(2, manager.getConnectionCount());
Connection conn3 = manager.exchangeConnection(conn1, Collections.emptySet());
Assert.assertEquals(3, factory.creates);
Assert.assertEquals(1, factory.destroys);
Assert.assertEquals(2, manager.getConnectionCount());
manager.returnConnection(conn2);
Assert.assertEquals(3, factory.creates);
Assert.assertEquals(1, factory.destroys);
Assert.assertEquals(2, manager.getConnectionCount());
Connection conn4 =
manager.exchangeConnection(conn3, Collections.singleton(conn3.getServer()));
Assert.assertEquals(4, factory.creates);
Assert.assertEquals(2, factory.destroys);
Assert.assertEquals(2, manager.getConnectionCount());
manager.returnConnection(conn4);
}
/**
* This tests that a deadlock between connection formation and connection pool closing has been
* fixed. See GEODE-4615
*/
@Test
public void testThatMapCloseCausesCacheClosedException() throws Exception {
final ConnectionManagerImpl connectionManager = new ConnectionManagerImpl("pool", factory,
endpointManager, 2, 0, -1, -1, logger, 60 * 1000, cancelCriterion, poolStats);
manager = connectionManager;
connectionManager.start(background);
final ConnectionManagerImpl.ConnectionMap connectionMap = connectionManager.allConnectionsMap;
final int thread1 = 0;
final int thread2 = 1;
final boolean[] ready = new boolean[2];
Thread thread = new Thread("ConnectionManagerJUnitTest thread") {
@Override
public void run() {
setReady(ready, thread1);
waitUntilReady(ready, thread2);
connectionMap.close(false);
}
};
thread.setDaemon(true);
thread.start();
try {
Connection firstConnection = connectionManager.borrowConnection(0);
synchronized (firstConnection) {
setReady(ready, thread2);
waitUntilReady(ready, thread1);
// the other thread will now try to close the connection map but it will block
// because this thread has locked one of the connections
await().until(() -> connectionMap.closing);
try {
connectionManager.borrowConnection(0);
fail("expected a CacheClosedException");
} catch (CacheClosedException e) {
// expected
}
}
} finally {
if (thread.isAlive()) {
System.out.println("stopping background thread");
thread.interrupt();
thread.join();
}
}
}
private void setReady(boolean[] ready, int index) {
System.out.println(
Thread.currentThread().getName() + ": setting that thread" + (index + 1) + " is ready");
synchronized (ready) {
ready[index] = true;
}
}
private void waitUntilReady(boolean[] ready, int index) {
System.out.println(
Thread.currentThread().getName() + ": waiting for thread" + (index + 1) + " to be ready");
await().until(() -> {
synchronized (ready) {
return (ready[index]);
}
});
}
private long nowNanos() {
return System.nanoTime();
}
private long elapsedNanos(long startNanos) {
return nowNanos() - startNanos;
}
private long elapsedMillis(long startNanos) {
return NANOSECONDS.toMillis(elapsedNanos(startNanos));
}
@Test
public void testBlocking() throws Throwable {
manager = new ConnectionManagerImpl("pool", factory, endpointManager, 1, 0, -1, -1, logger,
60 * 1000, cancelCriterion, poolStats);
manager.start(background);
final Connection conn1 = manager.borrowConnection(10);
long BORROW_TIMEOUT_MILLIS = 300;
long startNonos = nowNanos();
try {
manager.borrowConnection(BORROW_TIMEOUT_MILLIS);
fail("Should have received no servers available");
} catch (AllConnectionsInUseException expected) {
}
long elapsedMillis = elapsedMillis(startNonos);
Assert.assertTrue(
"Should have blocked for " + BORROW_TIMEOUT_MILLIS + " millis for a connection",
elapsedMillis >= BORROW_TIMEOUT_MILLIS - ALLOWABLE_ERROR_IN_MILLIS);
Thread returnThread = new Thread() {
@Override
public void run() {
try {
Thread.sleep(50);
} catch (InterruptedException e) {
fail("interrupted");
}
manager.returnConnection(conn1);
}
};
returnThread.start();
BORROW_TIMEOUT_MILLIS = 5000;
startNonos = nowNanos();
Connection conn2 = manager.borrowConnection(BORROW_TIMEOUT_MILLIS);
elapsedMillis = elapsedMillis(startNonos);
Assert.assertTrue(
"Should have blocked for less than " + BORROW_TIMEOUT_MILLIS + " milliseconds",
elapsedMillis < BORROW_TIMEOUT_MILLIS + ALLOWABLE_ERROR_IN_MILLIS);
manager.returnConnection(conn2);
final Connection conn3 = manager.borrowConnection(10);
Thread invalidateThread = new Thread() {
@Override
public void run() {
try {
Thread.sleep(50);
} catch (InterruptedException e) {
fail("interrupted");
}
conn3.destroy();
manager.returnConnection(conn3);
}
};
invalidateThread.start();
startNonos = nowNanos();
conn2 = manager.borrowConnection(BORROW_TIMEOUT_MILLIS);
elapsedMillis = elapsedMillis(startNonos);
Assert.assertTrue(
"Should have blocked for less than " + BORROW_TIMEOUT_MILLIS + " milliseconds",
elapsedMillis < BORROW_TIMEOUT_MILLIS + ALLOWABLE_ERROR_IN_MILLIS);
manager.returnConnection(conn2);
final Connection conn4 = manager.borrowConnection(10);
Thread invalidateThread2 = new Thread() {
@Override
public void run() {
try {
Thread.sleep(50);
} catch (InterruptedException e) {
fail("interrupted");
}
endpointManager.serverCrashed(conn4.getEndpoint());
manager.returnConnection(conn4);
}
};
invalidateThread2.start();
startNonos = nowNanos();
conn2 = manager.borrowConnection(BORROW_TIMEOUT_MILLIS);
elapsedMillis = elapsedMillis(startNonos);
Assert.assertTrue(
"Should have blocked for less than " + BORROW_TIMEOUT_MILLIS + " milliseconds",
elapsedMillis < BORROW_TIMEOUT_MILLIS + ALLOWABLE_ERROR_IN_MILLIS);
manager.returnConnection(conn2);
}
@Test
public void testExplicitServer() throws Exception {
manager = new ConnectionManagerImpl("pool", factory, endpointManager, 1, 0, -1, -1, logger,
60 * 1000, cancelCriterion, poolStats);
manager.start(background);
Connection conn1 = manager.borrowConnection(0);
try {
manager.borrowConnection(10);
fail("Should have received an error");
} catch (AllConnectionsInUseException expected) {
// do nothing
}
Connection conn3 = manager.borrowConnection(new ServerLocation("localhost", -2), false);
Assert.assertEquals(2, factory.creates);
Assert.assertEquals(0, factory.destroys);
Assert.assertEquals(0, factory.closes);
manager.returnConnection(conn3);
Assert.assertEquals(2, factory.creates);
Assert.assertEquals(1, factory.destroys);
Assert.assertEquals(1, factory.closes);
manager.returnConnection(conn1);
Assert.assertEquals(2, factory.creates);
Assert.assertEquals(1, factory.destroys);
Assert.assertEquals(1, factory.closes);
}
private class UpdaterThread extends Thread {
private AtomicReference exception;
private final AtomicBoolean haveConnection;
private int id;
private final int iterations;
public UpdaterThread(AtomicBoolean haveConnection, AtomicReference exception, int id) {
this(haveConnection, exception, id, 10);
}
public UpdaterThread(AtomicBoolean haveConnection, AtomicReference exception, int id,
int iterations) {
this.haveConnection = haveConnection;
this.exception = exception;
this.id = id;
this.iterations = iterations;
}
private Connection borrow(int i) {
long startNanos = nowNanos();
long BORROW_TIMEOUT_MILLIS = 2000;
Connection conn = manager.borrowConnection(BORROW_TIMEOUT_MILLIS);
if (haveConnection != null) {
Assert.assertTrue("Updater[" + id + "] loop[" + i + "] Someone else has the connection!",
haveConnection.compareAndSet(false, true));
}
long elapsedMillis = elapsedMillis(startNanos);
Assert.assertTrue("Elapsed time (" + elapsedMillis + ") >= " + BORROW_TIMEOUT_MILLIS,
elapsedMillis < BORROW_TIMEOUT_MILLIS + ALLOWABLE_ERROR_IN_MILLIS);
return conn;
}
@Override
public void run() {
int i = 0;
Connection conn = null;
try {
for (i = 0; i < iterations; i++) {
conn = borrow(i);
try {
Thread.sleep(10);
if (haveConnection != null) {
Assert.assertTrue(
"Updater[" + id + "] loop[" + i + "] Someone else changed the connection flag",
haveConnection.compareAndSet(true, false));
}
} finally {
manager.returnConnection(conn);
}
}
} catch (Throwable t) {
this.exception.compareAndSet(null,
new Exception("ERROR Updater[" + id + "] loop[" + i + "]", t));
}
}
}
public class DummyFactory implements ConnectionFactory {
public ServerLocation nextServer = new ServerLocation("localhost", -1);
protected volatile int creates;
protected volatile int destroys;
protected volatile int closes;
protected volatile int finds;
/**
* Wait as long as "whileCondition" is true.
* The wait will timeout after TIMEOUT_MILLIS has elapsed.
*
* @return the elapsed time in milliseconds.
*/
public synchronized long waitWhile(BooleanSupplier whileCondition) throws InterruptedException {
final long startNanos = nowNanos();
long remainingMillis = TIMEOUT_MILLIS;
while (whileCondition.getAsBoolean() && remainingMillis > 0) {
wait(remainingMillis);
remainingMillis = TIMEOUT_MILLIS - elapsedMillis(startNanos);
}
return elapsedMillis(startNanos);
}
@Override
public ServerDenyList getDenyList() {
return new ServerDenyList(1);
}
@Override
public ServerLocation findBestServer(ServerLocation currentServer, Set excludedServers) {
synchronized (this) {
finds++;
this.notifyAll();
}
if (excludedServers != null) {
if (excludedServers.contains(nextServer)) {
return null;
}
}
return nextServer;
}
@Override
public Connection createClientToServerConnection(Set excluded) {
return createClientToServerConnection(nextServer, true);
}
/*
* (non-Javadoc)
*
* @see
* org.apache.geode.cache.client.internal.ConnectionFactory#createClientToServerConnection(org.
* apache.geode.distributed.internal.ServerLocation)
*/
@Override
public Connection createClientToServerConnection(final ServerLocation location,
boolean forQueue) {
synchronized (this) {
creates++;
this.notifyAll();
}
DistributedMember fakeMember = null;
fakeMember = new InternalDistributedMember("localhost", 555);
final DistributedMember member = fakeMember;
return new Connection() {
private Endpoint endpoint = endpointManager.referenceEndpoint(location, member);
@Override
public void destroy() {
synchronized (DummyFactory.this) {
destroys++;
DummyFactory.this.notifyAll();
}
}
@Override
public ServerLocation getServer() {
return location;
}
@Override
public ByteBuffer getCommBuffer() {
return null;
}
@Override
public Socket getSocket() {
return null;
}
@Override
public ConnectionStats getStats() {
return null;
}
@Override
public void close(boolean keepAlive) throws Exception {
synchronized (DummyFactory.this) {
closes++;
DummyFactory.this.notifyAll();
}
}
@Override
public Endpoint getEndpoint() {
return endpoint;
}
@Override
public ServerQueueStatus getQueueStatus() {
return null;
}
@Override
public Object execute(Op op) throws Exception {
return op.attempt(this);
}
@Override
public boolean isDestroyed() {
return false;
}
@Override
public void emergencyClose() {}
@Override
public short getWanSiteVersion() {
return -1;
}
@Override
public int getDistributedSystemId() {
return -1;
}
@Override
public void setWanSiteVersion(short wanSiteVersion) {}
@Override
public InputStream getInputStream() {
return null;
}
@Override
public OutputStream getOutputStream() {
return null;
}
@Override
public void setConnectionID(long id) {}
@Override
public long getConnectionID() {
return 0;
}
};
}
@Override
public ClientUpdater createServerToClientConnection(Endpoint endpoint, QueueManager manager,
boolean isPrimary, ClientUpdater failedUpdater) {
return null;
}
}
}