blob: 8006dae0a238e4beb7845e9249f984e341d7f66d [file] [log] [blame]
/*=========================================================================
* Copyright (c) 2002-2014 Pivotal Software, Inc. All Rights Reserved.
* This product is protected by U.S. and international copyright
* and intellectual property laws. Pivotal products are covered by
* more patents listed at http://www.pivotal.io/patents.
*=========================================================================
*/
package com.gemstone.gemfire.cache.client.internal.pooling;
import static org.junit.Assert.fail;
import java.io.InputStream;
import java.io.OutputStream;
import java.net.Socket;
import java.net.UnknownHostException;
import java.nio.ByteBuffer;
import java.util.Collections;
import java.util.Properties;
import java.util.Set;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicReference;
import org.junit.After;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Test;
import org.junit.experimental.categories.Category;
import com.gemstone.gemfire.CancelCriterion;
import com.gemstone.gemfire.cache.client.AllConnectionsInUseException;
import com.gemstone.gemfire.cache.client.NoAvailableServersException;
import com.gemstone.gemfire.cache.client.internal.ClientUpdater;
import com.gemstone.gemfire.cache.client.internal.Connection;
import com.gemstone.gemfire.cache.client.internal.ConnectionFactory;
import com.gemstone.gemfire.cache.client.internal.ConnectionStats;
import com.gemstone.gemfire.cache.client.internal.Endpoint;
import com.gemstone.gemfire.cache.client.internal.EndpointManager;
import com.gemstone.gemfire.cache.client.internal.EndpointManagerImpl;
import com.gemstone.gemfire.cache.client.internal.Op;
import com.gemstone.gemfire.cache.client.internal.QueueManager;
import com.gemstone.gemfire.cache.client.internal.ServerBlackList;
import com.gemstone.gemfire.distributed.DistributedMember;
import com.gemstone.gemfire.distributed.DistributedSystem;
import com.gemstone.gemfire.distributed.internal.DistributionConfig;
import com.gemstone.gemfire.distributed.internal.ServerLocation;
import com.gemstone.gemfire.distributed.internal.membership.InternalDistributedMember;
import com.gemstone.gemfire.internal.cache.PoolStats;
import com.gemstone.gemfire.internal.cache.tier.sockets.ServerQueueStatus;
import com.gemstone.gemfire.internal.logging.InternalLogWriter;
import com.gemstone.gemfire.internal.logging.LocalLogWriter;
import com.gemstone.gemfire.test.junit.categories.IntegrationTest;
import dunit.DistributedTestCase;
import dunit.DistributedTestCase.WaitCriterion;
/**
* @author dsmith
*
*/
@Category(IntegrationTest.class)
public class ConnectionManagerJUnitTest {
private static final long TIMEOUT = 30 * 1000;
//This is added for some windows machines which think the connection expired
//before the idle timeout due to precision issues.
private static final long ALLOWABLE_ERROR_IN_EXPIRATION= 20; //milliseconds
ConnectionManager manager;
private InternalLogWriter logger;
protected DummyFactory factory;
private DistributedSystem ds;
private ScheduledExecutorService background;
protected EndpointManager endpointManager;
private CancelCriterion cancelCriterion;
private PoolStats poolStats;
@Before
public void setUp() {
this.logger = new LocalLogWriter(InternalLogWriter.FINEST_LEVEL, System.out);
factory = new DummyFactory();
Properties properties = new Properties();
properties.put(DistributionConfig.MCAST_PORT_NAME, "0");
properties.put(DistributionConfig.LOCATORS_NAME, "");
ds = DistributedSystem.connect(properties);
background = Executors.newSingleThreadScheduledExecutor();
poolStats = new PoolStats(ds, "connectionManagerJUnitTest");
endpointManager = new EndpointManagerImpl("pool", ds, ds.getCancelCriterion(), poolStats);
cancelCriterion = new CancelCriterion() {
public String cancelInProgress() {
return null;
}
public RuntimeException generateCancelledException(Throwable e) {
return null;
}
};
}
@After
public void tearDown() throws InterruptedException {
ds.disconnect();
if(manager != null) {
manager.close(false);
}
background.shutdownNow();
}
@Test
public void testGet() throws InterruptedException, AllConnectionsInUseException, NoAvailableServersException {
manager = new ConnectionManagerImpl("pool", factory,endpointManager, 3, 0, -1, -1, logger, 60 * 1000, cancelCriterion, poolStats);
manager.start(background);
Connection conn[] = new Connection[4];
conn[0] = manager.borrowConnection(0);
Assert.assertEquals(1,factory.creates);
manager.returnConnection(conn[0]);
conn[0] = manager.borrowConnection(0);
Assert.assertEquals(1,factory.creates);
conn[1] = manager.borrowConnection(0);
manager.returnConnection(conn[0]);
manager.returnConnection(conn[1]);
Assert.assertEquals(2,factory.creates);
conn[0] = manager.borrowConnection(0);
conn[1] = manager.borrowConnection(0);
conn[2] = manager.borrowConnection(0);
Assert.assertEquals(3,factory.creates);
try {
conn[4] = manager.borrowConnection(10);
fail("Should have received an all connections in use exception");
} catch(AllConnectionsInUseException e) {
//expected exception
}
}
@Test
public void testPrefill() throws InterruptedException {
manager = new ConnectionManagerImpl("pool", factory, endpointManager, 10, 2, -1, -1, logger, 60 * 1000, cancelCriterion, poolStats);
manager.start(background);
final String descrip = manager.toString();
WaitCriterion ev = new WaitCriterion() {
public boolean done() {
return factory.creates == 2 && factory.destroys == 0;
}
public String description() {
return "waiting for manager " + descrip;
}
};
DistributedTestCase.waitForCriterion(ev, 200, 200, true);
}
@Test
public void testInvalidateConnection() throws InterruptedException, AllConnectionsInUseException, NoAvailableServersException {
manager = new ConnectionManagerImpl("pool", factory, endpointManager, 10, 0, 0L, -1, logger, 60 * 1000, cancelCriterion, poolStats);
manager.start(background);
Connection conn = manager.borrowConnection(0);
Assert.assertEquals(1,factory.creates);
Assert.assertEquals(0,factory.destroys);
conn.destroy();
manager.returnConnection(conn);
Assert.assertEquals(1,factory.creates);
Assert.assertEquals(1,factory.destroys);
conn = manager.borrowConnection(0);
Assert.assertEquals(2,factory.creates);
Assert.assertEquals(1,factory.destroys);
}
@Test
public void testInvalidateServer() throws InterruptedException, AllConnectionsInUseException, NoAvailableServersException {
manager = new ConnectionManagerImpl("pool", factory, endpointManager, 10, 0, -1, -1, logger, 60 * 1000, cancelCriterion, poolStats);
manager.start(background);
ServerLocation server1 = new ServerLocation("localhost", 1);
ServerLocation server2 = new ServerLocation("localhost", 2);
factory.nextServer = server1;
Connection conn1 = manager.borrowConnection(0);
Connection conn2 = manager.borrowConnection(0);
Connection conn3 = manager.borrowConnection(0);
factory.nextServer = server2;
Connection conn4 = manager.borrowConnection(0);
Assert.assertEquals(4,factory.creates);
Assert.assertEquals(0,factory.destroys);
manager.returnConnection(conn2);
endpointManager.serverCrashed(conn2.getEndpoint());
Assert.assertEquals(3,factory.destroys);
conn1.destroy();
manager.returnConnection(conn1);
Assert.assertEquals(3,factory.destroys);
manager.returnConnection(conn3);
manager.returnConnection(conn4);
Assert.assertEquals(3,factory.destroys);
manager.borrowConnection(0);
Assert.assertEquals(4,factory.creates);
Assert.assertEquals(3,factory.destroys);
}
// public void testGetConnectionToSpecificServer() throws AllConnectionsInUseException, NoAvailableServersException, InterruptedException {
// DummySource source = new DummySource();
// manager = new ConnectionManager(source, factory, 10, -1, 10, logger, logger);
// manager.start();
//
// source.nextServer = new ServerLocation("localhost", 10);
// Connection conn1 = manager.borrowConnection(10);
// manager.returnConnection(conn1);
//
// try {
// Connection conn2 = manager.borrowConnection(new ServerLocation("locahost", 20), 10);
// Assert.fail("Should have received no servers available, because we asked for a server we can't get");
// } catch(NoAvailableServersException expected) {
//
// }
//
// }
@Test
public void testIdleExpiration() throws InterruptedException, AllConnectionsInUseException, NoAvailableServersException {
final long idleTimeout = 300;
manager = new ConnectionManagerImpl("pool", factory, endpointManager, 5, 2, idleTimeout, -1, logger, 60 * 1000, cancelCriterion, poolStats);
manager.start(background);
{
long start = System.currentTimeMillis();
synchronized(factory) {
long remaining = TIMEOUT;
while(factory.creates < 2 && remaining > 0) {
factory.wait(remaining);
remaining = TIMEOUT - (System.currentTimeMillis() - start);
}
}
Assert.assertEquals(2,factory.creates);
Assert.assertEquals(0,factory.destroys);
Assert.assertEquals(0,factory.closes);
Assert.assertEquals(0,poolStats.getIdleExpire());
// no need to wait; dangerous because it gives connections a chance to expire
// //wait for prefill task to finish.
// Thread.sleep(100);
}
Connection conn1 = manager.borrowConnection(500);
Connection conn2 = manager.borrowConnection(500);
Connection conn3 = manager.borrowConnection(500);
Connection conn4 = manager.borrowConnection(500);
Connection conn5 = manager.borrowConnection(500);
//wait to make sure checked out connections aren't timed out
Thread.sleep(idleTimeout*2);
Assert.assertEquals(5,factory.creates);
Assert.assertEquals(0,factory.destroys);
Assert.assertEquals(0,factory.closes);
Assert.assertEquals(0,poolStats.getIdleExpire());
// make sure a thread local connection that has been passivated can idle-expire
manager.passivate(conn1, true);
{
long start = System.currentTimeMillis();
synchronized(factory) {
long remaining = TIMEOUT;
while(factory.destroys < 1 && remaining > 0) {
factory.wait(remaining);
remaining = TIMEOUT - (System.currentTimeMillis() - start);
}
}
long elapsed = System.currentTimeMillis() - start;
Assert.assertTrue("Elapsed " + elapsed + " is less than idle timeout "
+ idleTimeout,
elapsed + ALLOWABLE_ERROR_IN_EXPIRATION >= idleTimeout);
Assert.assertEquals(5,factory.creates);
Assert.assertEquals(1,factory.destroys);
Assert.assertEquals(1,factory.closes);
Assert.assertEquals(1,poolStats.getIdleExpire());
}
// now return all other connections to pool and verify that just 2 expire
manager.returnConnection(conn2);
manager.returnConnection(conn3);
manager.returnConnection(conn4);
manager.returnConnection(conn5);
{
long start = System.currentTimeMillis();
synchronized(factory) {
long remaining = TIMEOUT;
while(factory.destroys < 3 && remaining > 0) {
factory.wait(remaining);
remaining = TIMEOUT - (System.currentTimeMillis() - start);
}
}
long elapsed = System.currentTimeMillis() - start;
Assert.assertTrue("Elapsed " + elapsed + " is less than idle timeout "
+ idleTimeout,
elapsed + ALLOWABLE_ERROR_IN_EXPIRATION >= idleTimeout);
Assert.assertEquals(5,factory.creates);
Assert.assertEquals(3,factory.destroys);
Assert.assertEquals(3,factory.closes);
Assert.assertEquals(3,poolStats.getIdleExpire());
}
//wait to make sure min-connections don't time out
Thread.sleep(idleTimeout*2);
Assert.assertEquals(5,factory.creates);
Assert.assertEquals(3,factory.destroys);
Assert.assertEquals(3,factory.closes);
Assert.assertEquals(3,poolStats.getIdleExpire());
}
@Test
public void testBug41516() throws InterruptedException, AllConnectionsInUseException, NoAvailableServersException {
final long idleTimeout = 300;
manager = new ConnectionManagerImpl("pool", factory, endpointManager, 2, 1, idleTimeout, -1, logger, 60 * 1000, cancelCriterion, poolStats);
manager.start(background);
Connection conn1 = manager.borrowConnection(500);
Connection conn2 = manager.borrowConnection(500);
//Return some connections, let them idle expire
manager.returnConnection(conn1);
manager.returnConnection(conn2);
{
long start = System.currentTimeMillis();
synchronized(factory) {
long remaining = TIMEOUT;
while(factory.destroys < 1 && remaining > 0) {
factory.wait(remaining);
remaining = TIMEOUT - (System.currentTimeMillis() - start);
}
}
long elapsed = System.currentTimeMillis() - start;
Assert.assertTrue("Elapsed " + elapsed + " is less than idle timeout "
+ idleTimeout,
elapsed + ALLOWABLE_ERROR_IN_EXPIRATION >= idleTimeout);
Assert.assertEquals(1,factory.destroys);
Assert.assertEquals(1,factory.closes);
Assert.assertEquals(1,poolStats.getIdleExpire());
}
//Ok, now get some connections that fill our queue
Connection ping1 = manager.borrowConnection(new ServerLocation("localhost", 5), 500, false);
Connection ping2 = manager.borrowConnection(new ServerLocation("localhost", 5), 500, false);
manager.returnConnection(ping1);
manager.returnConnection(ping2);
Connection conn3 = manager.borrowConnection(500);
Connection conn4 = manager.borrowConnection(500);
long start = System.currentTimeMillis();
try {
Connection conn5 = manager.borrowConnection(500);
fail("Didn't get an exception");
} catch(AllConnectionsInUseException e) {
//expected
}
long elapsed = System.currentTimeMillis() - start;
Assert.assertTrue("Elapsed = " + elapsed, elapsed >= 500);
}
@Test
public void testLifetimeExpiration() throws InterruptedException, AllConnectionsInUseException, NoAvailableServersException, Throwable {
int lifetimeTimeout = 500;
manager = new ConnectionManagerImpl("pool", factory, endpointManager, 2, 2, -1, lifetimeTimeout, logger, 60 * 1000, cancelCriterion, poolStats);
manager.start(background);
{
long start = System.currentTimeMillis();
synchronized(factory) {
long remaining = TIMEOUT;
while(factory.creates < 2 && remaining > 0) {
factory.wait(remaining);
remaining = TIMEOUT - (System.currentTimeMillis() - start);
}
}
Assert.assertEquals(2,factory.creates);
Assert.assertEquals(0,factory.destroys);
Assert.assertEquals(0,factory.finds);
}
// need to start a thread that keeps the connections busy
// so that their last access time keeps changing
AtomicReference exception = new AtomicReference();
int updaterCount = 2;
UpdaterThread[] updaters = new UpdaterThread[updaterCount];
for(int i = 0; i < updaterCount; i++) {
updaters[i] = new UpdaterThread(null, exception, i, (lifetimeTimeout/10)*2, true);
}
for(int i = 0; i < updaterCount; i++) {
updaters[i].start();
}
{
long start = System.currentTimeMillis();
synchronized(factory) {
long remaining = TIMEOUT;
while(factory.finds < 2 && remaining > 0) {
factory.wait(remaining);
remaining = TIMEOUT - (System.currentTimeMillis() - start);
}
}
long end = System.currentTimeMillis();
Assert.assertEquals(2,factory.finds);
// server shouldn't have changed so no increase in creates or destroys
Assert.assertEquals(2,factory.creates);
Assert.assertEquals(0,factory.destroys);
Assert.assertEquals(0,factory.closes);
Assert.assertTrue("took too long to expire lifetime; expected="
+ lifetimeTimeout + " but took=" + (end-start),
(end-start) < lifetimeTimeout*2);
}
for(int i = 0; i < updaterCount; i++) {
DistributedTestCase.join(updaters[i], 30 * 1000, null);
}
if(exception.get() !=null) {
throw (Throwable) exception.get();
}
for(int i = 0; i < updaterCount; i++) {
Assert.assertFalse("Updater [" + i + "] is still running", updaters[i].isAlive());
}
// //wait for prefill task to finish.
// Thread.sleep(100);
// Connection conn1 = manager.borrowConnection(0);
// Connection conn2 = manager.borrowConnection(0);
// Connection conn3 = manager.borrowConnection(0);
// Connection conn4 = manager.borrowConnection(0);
// Connection conn5 = manager.borrowConnection(0);
// //wait to make sure checked out connections aren't timed out
// Thread.sleep(idleTimeout + 100);
// Assert.assertEquals(5,factory.creates);
// Assert.assertEquals(0,factory.destroys);
// manager.returnConnection(conn1);
// manager.returnConnection(conn2);
// manager.returnConnection(conn3);
// manager.returnConnection(conn4);
// manager.returnConnection(conn5);
// long start = System.currentTimeMillis();
// synchronized(factory) {
// while(factory.destroys < 3 && remaining > 0) {
// factory.wait(remaining);
// remaining = TIMEOUT - (System.currentTimeMillis() - start);
// }
// }
// long elapsed = System.currentTimeMillis() - start;
// Assert.assertTrue(elapsed > idleTimeout);
// Assert.assertEquals(5,factory.creates);
// Assert.assertEquals(3,factory.destroys);
}
@Test
public void testExclusiveConnectionAccess() throws Throwable {
manager = new ConnectionManagerImpl("pool", factory, endpointManager, 1, 0, -1, -1, logger, 60 * 1000, cancelCriterion, poolStats);
manager.start(background);
AtomicReference exception = new AtomicReference();
AtomicBoolean haveConnection = new AtomicBoolean();
int updaterCount = 10;
UpdaterThread[] updaters = new UpdaterThread[updaterCount];
for(int i = 0; i < updaterCount; i++) {
updaters[i] = new UpdaterThread(haveConnection, exception, i);
}
for(int i = 0; i < updaterCount; i++) {
updaters[i].start();
}
for(int i = 0; i < updaterCount; i++) {
DistributedTestCase.join(updaters[i], 30 * 1000, null);
}
if(exception.get() !=null) {
throw (Throwable) exception.get();
}
for(int i = 0; i < updaterCount; i++) {
Assert.assertFalse("Updater [" + i + "] is still running", updaters[i].isAlive());
}
}
@Test
public void testClose() throws AllConnectionsInUseException, NoAvailableServersException, InterruptedException {
manager = new ConnectionManagerImpl("pool", factory, endpointManager, 10, 0, -1, -1, logger, 60 * 1000, cancelCriterion, poolStats);
manager.start(background);
Connection conn1 = manager.borrowConnection(0);
manager.borrowConnection(0);
manager.returnConnection(conn1);
Assert.assertEquals(2,factory.creates);
Assert.assertEquals(0,factory.destroys);
manager.close(false);
Assert.assertEquals(2, factory.closes);
Assert.assertEquals(2, factory.destroys);
}
@Test
public void testExchangeConnection() throws Exception {
manager = new ConnectionManagerImpl("pool", factory, endpointManager, 2, 0, -1, -1, logger, 60 * 1000, cancelCriterion, poolStats);
manager.start(background);
Connection conn1 = manager.borrowConnection(10);
Connection conn2 = manager.borrowConnection(10);
try {
manager.borrowConnection(10);
fail("Exepected no servers available");
} catch(AllConnectionsInUseException e) {
//expected
}
Assert.assertEquals(2, factory.creates);
Assert.assertEquals(0, factory.destroys);
Assert.assertEquals(2, manager.getConnectionCount());
Connection conn3 = manager.exchangeConnection(conn1, Collections.EMPTY_SET, 10);
Assert.assertEquals(3, factory.creates);
Assert.assertEquals(1, factory.destroys);
Assert.assertEquals(2, manager.getConnectionCount());
manager.returnConnection(conn2);
Assert.assertEquals(3, factory.creates);
Assert.assertEquals(1, factory.destroys);
Assert.assertEquals(2, manager.getConnectionCount());
Connection conn4 = manager.exchangeConnection(conn3, Collections.singleton(conn3.getServer()), 10);
Assert.assertEquals(4, factory.creates);
Assert.assertEquals(2, factory.destroys);
Assert.assertEquals(2, manager.getConnectionCount());
manager.returnConnection(conn4);
}
@Test
public void testBlocking() throws Throwable {
manager = new ConnectionManagerImpl("pool", factory, endpointManager, 1, 0, -1, -1, logger, 60 * 1000, cancelCriterion, poolStats);
manager.start(background);
final Connection conn1 = manager.borrowConnection(10);
long startTime = System.currentTimeMillis();
try {
manager.borrowConnection(100);
fail("Should have received no servers available");
} catch(AllConnectionsInUseException expected) {
}
long elapsed = System.currentTimeMillis() - startTime;
Assert.assertTrue("Should have blocked for 100 millis for a connection", elapsed >= 100);
Thread returnThread = new Thread() {
public void run() {
try {
Thread.sleep(50);
} catch (InterruptedException e) {
fail("interrupted");
}
manager.returnConnection(conn1);
}
};
returnThread.start();
startTime = System.currentTimeMillis();
Connection conn2 = manager.borrowConnection(1000);
elapsed = System.currentTimeMillis() - startTime;
Assert.assertTrue("Should have blocked for less than 1 second", elapsed < 1000);
manager.returnConnection(conn2);
final Connection conn3 = manager.borrowConnection(10);
Thread invalidateThread = new Thread() {
public void run() {
try {
Thread.sleep(50);
} catch (InterruptedException e) {
fail("interrupted");
}
conn3.destroy();
manager.returnConnection(conn3);
}
};
invalidateThread.start();
startTime = System.currentTimeMillis();
conn2 = manager.borrowConnection(1000);
elapsed = System.currentTimeMillis() - startTime;
Assert.assertTrue("Should have blocked for less than 1 second", elapsed < 1000);
manager.returnConnection(conn2);
final Connection conn4 = manager.borrowConnection(10);
Thread invalidateThread2 = new Thread() {
public void run() {
try {
Thread.sleep(50);
} catch (InterruptedException e) {
fail("interrupted");
}
endpointManager.serverCrashed(conn4.getEndpoint());
manager.returnConnection(conn4);
}
};
invalidateThread2.start();
startTime = System.currentTimeMillis();
conn2 = manager.borrowConnection(1000);
elapsed = System.currentTimeMillis() - startTime;
Assert.assertTrue("Should have blocked for less than 1 second", elapsed < 1000);
manager.returnConnection(conn2);
}
@Test
public void testExplicitServer() throws Exception {
manager = new ConnectionManagerImpl("pool", factory, endpointManager, 1, 0, -1, -1, logger, 60 * 1000, cancelCriterion, poolStats);
manager.start(background);
Connection conn1 = manager.borrowConnection(0);
try {
manager.borrowConnection(10);
fail("Should have received an error");
} catch(AllConnectionsInUseException expected) {
//do nothing
}
Connection conn3 = manager.borrowConnection(new ServerLocation("localhost", -2), 10,false);
Assert.assertEquals(2, factory.creates);
Assert.assertEquals(0, factory.destroys);
Assert.assertEquals(0, factory.closes);
manager.returnConnection(conn3);
Assert.assertEquals(2, factory.creates);
Assert.assertEquals(1, factory.destroys);
Assert.assertEquals(1, factory.closes);
manager.returnConnection(conn1);
Assert.assertEquals(2, factory.creates);
Assert.assertEquals(1, factory.destroys);
Assert.assertEquals(1, factory.closes);
}
private class UpdaterThread extends Thread {
private AtomicReference exception;
private final AtomicBoolean haveConnection;
private int id;
private final int iterations;
/**
* If true then obtain the connection as if it is a thread local one
*/
private final boolean threadLocal;
public UpdaterThread(AtomicBoolean haveConnection, AtomicReference exception, int id) {
this(haveConnection, exception, id, 10, false);
}
public UpdaterThread(AtomicBoolean haveConnection, AtomicReference exception, int id, int iterations, boolean threadLocal) {
this.haveConnection = haveConnection;
this.exception =exception;
this.id = id;
this.iterations = iterations;
this.threadLocal = threadLocal;
}
private Connection borrow(int i) {
long startTime = System.currentTimeMillis();
Connection conn = manager.borrowConnection(2000);
if (haveConnection != null) {
Assert.assertTrue("Updater[" + id + "] loop[" + i + "] Someone else has the connection!", haveConnection.compareAndSet(false, true));
}
long elapsed = System.currentTimeMillis() - startTime;
if (elapsed >= 2000) {
Assert.assertTrue("Elapsed time (" + elapsed + ") >= 2000", false);
}
return conn;
}
public void run() {
int i = 0;
Connection conn = null;
try {
if (threadLocal) {
conn = borrow(-1);
}
for(i = 0; i < iterations; i++) {
if (!threadLocal) {
conn = borrow(i);
} else {
if (i != 0) {
manager.activate(conn);
}
}
try {
Thread.sleep(10);
if (haveConnection != null) {
Assert.assertTrue("Updater[" + id + "] loop[" + i + "] Someone else changed the connection flag", haveConnection.compareAndSet(true, false));
}
} finally {
if (!threadLocal) {
manager.returnConnection(conn);
} else {
manager.passivate(conn, true);
}
}
}
} catch(Throwable t) {
this.exception.compareAndSet(null, new Exception("ERROR Updater[" + id + "] loop[" + i + "]" , t));
} finally {
if (threadLocal && conn != null ) {
manager.returnConnection(conn);
}
}
}
}
public class DummyFactory implements ConnectionFactory {
public ServerLocation nextServer = new ServerLocation("localhost", -1);
protected volatile int creates;
protected volatile int destroys;
protected volatile int closes;
protected volatile int finds;
public ServerBlackList getBlackList() {
return new ServerBlackList(1);
}
public ServerLocation findBestServer(ServerLocation currentServer, Set excludedServers) {
synchronized(this) {
finds++;
this.notifyAll();
}
if (excludedServers != null) {
if (excludedServers.contains(nextServer)) {
return null;
}
}
return nextServer;
}
public Connection createClientToServerConnection(Set excluded) {
return createClientToServerConnection(nextServer, true);
}
/* (non-Javadoc)
* @see com.gemstone.gemfire.cache.client.internal.ConnectionFactory#createClientToServerConnection(com.gemstone.gemfire.distributed.internal.ServerLocation)
*/
public Connection createClientToServerConnection(final ServerLocation location, boolean forQueue) {
synchronized(this) {
creates++;
this.notifyAll();
}
DistributedMember fakeMember = null;
try {
fakeMember = new InternalDistributedMember("localhost", 555);
} catch (UnknownHostException e) {
// TODO Auto-generated catch block
e.printStackTrace();
}
final DistributedMember member = fakeMember;
return new Connection() {
private Endpoint endpoint = endpointManager.referenceEndpoint(location, member);
public void destroy() {
synchronized(DummyFactory.this) {
destroys++;
DummyFactory.this.notifyAll();
}
}
public ServerLocation getServer() {
return location;
}
public ByteBuffer getCommBuffer() {
return null;
}
public Socket getSocket() {
return null;
}
public ConnectionStats getStats() {
return null;
}
public void close(boolean keepAlive) throws Exception {
synchronized(DummyFactory.this) {
closes++;
DummyFactory.this.notifyAll();
}
}
public Endpoint getEndpoint() {
return endpoint;
}
public ServerQueueStatus getQueueStatus() {
return null;
}
public Object execute(Op op) throws Exception {
return op.attempt(this);
}
public boolean isDestroyed() {return false;}
public void emergencyClose() {
}
public short getWanSiteVersion(){
return -1;
}
public int getDistributedSystemId() {
return -1;
}
public void setWanSiteVersion(short wanSiteVersion){
}
public InputStream getInputStream() {
return null;
}
public OutputStream getOutputStream() {
return null;
}
public void setConnectionID(long id) {
}
public long getConnectionID() {
return 0;
}
};
}
public ClientUpdater createServerToClientConnection(Endpoint endpoint,
QueueManager manager, boolean isPrimary, ClientUpdater failedUpdater) {
return null;
}
}
}