blob: 413b1ba7c6be87fae2773bdb47c76038fd5e6166 [file] [log] [blame]
/*=========================================================================
* Copyright (c) 2002-2014 Pivotal Software, Inc. All Rights Reserved.
* This product is protected by U.S. and international copyright
* and intellectual property laws. Pivotal products are covered by
* more patents listed at http://www.pivotal.io/patents.
*=========================================================================
*/
package com.gemstone.gemfire.cache.client.internal;
import static org.junit.Assert.*;
import java.io.IOException;
import java.net.InetAddress;
import java.net.InetSocketAddress;
import java.net.UnknownHostException;
import java.net.ConnectException;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.Properties;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import org.junit.After;
import org.junit.Before;
import org.junit.Test;
import org.junit.experimental.categories.Category;
import com.gemstone.gemfire.CancelCriterion;
import com.gemstone.gemfire.cache.Cache;
import com.gemstone.gemfire.cache.CacheFactory;
import com.gemstone.gemfire.cache.GemFireCache;
import com.gemstone.gemfire.cache.RegionService;
import com.gemstone.gemfire.cache.NoSubscriptionServersAvailableException;
import com.gemstone.gemfire.cache.client.NoAvailableLocatorsException;
import com.gemstone.gemfire.cache.client.SubscriptionNotEnabledException;
import com.gemstone.gemfire.cache.client.internal.locator.ClientConnectionRequest;
import com.gemstone.gemfire.cache.client.internal.locator.ClientConnectionResponse;
import com.gemstone.gemfire.cache.client.internal.locator.LocatorListResponse;
import com.gemstone.gemfire.cache.query.QueryService;
import com.gemstone.gemfire.distributed.DistributedSystem;
import com.gemstone.gemfire.distributed.internal.DistributionConfig;
import com.gemstone.gemfire.distributed.internal.InternalDistributedSystem;
import com.gemstone.gemfire.distributed.internal.PoolStatHelper;
import com.gemstone.gemfire.distributed.internal.ServerLocation;
import com.gemstone.gemfire.distributed.internal.SharedConfiguration;
import com.gemstone.gemfire.distributed.internal.tcpserver.TcpClient;
import com.gemstone.gemfire.distributed.internal.tcpserver.TcpHandler;
import com.gemstone.gemfire.distributed.internal.tcpserver.TcpServer;
import com.gemstone.gemfire.internal.AvailablePortHelper;
import com.gemstone.gemfire.internal.cache.PoolStats;
import com.gemstone.gemfire.test.junit.categories.IntegrationTest;
/**
* @author dsmith
*
*/
@SuppressWarnings("deprecation")
@Category(IntegrationTest.class)
public class AutoConnectionSourceImplJUnitTest {
private Cache cache;
private int port;
private FakeHandler handler;
private FakePool pool = new FakePool();
private AutoConnectionSourceImpl source;
private TcpServer server;
private ScheduledExecutorService background;
private PoolStats poolStats;
@Before
public void setUp() throws Exception {
Properties props = new Properties();
props.setProperty("mcast-port", "0");
props.setProperty("locators", "");
DistributedSystem ds = DistributedSystem.connect(props);
cache = CacheFactory.create(ds);
poolStats = new PoolStats(ds, "pool");
port = AvailablePortHelper.getRandomAvailableTCPPort();
handler = new FakeHandler();
ArrayList responseLocators = new ArrayList();
responseLocators.add(new ServerLocation(InetAddress.getLocalHost().getHostName(), port));
handler.nextLocatorListResponse = new LocatorListResponse(responseLocators, false);
//very irritating, the SystemTimer requires having a distributed system
Properties properties = new Properties();
properties.put(DistributionConfig.MCAST_PORT_NAME, "0");
properties.put(DistributionConfig.LOCATORS_NAME, "");
background = Executors.newSingleThreadScheduledExecutor();
List/*<InetSocketAddress>*/ locators = new ArrayList();
locators.add(new InetSocketAddress(InetAddress.getLocalHost(), port));
source = new AutoConnectionSourceImpl(locators, "", 60 * 1000);
source.start(pool);
}
@After
public void tearDown() {
background.shutdownNow();
try {
cache.close();
} catch (Exception e) {
// do nothing
}
try {
if(server != null && server.isAlive()) {
try {
TcpClient.stop(InetAddress.getLocalHost(), port);
} catch ( ConnectException ignore ) {
// must not be running
}
server.join(60 * 1000);
}
} catch(Exception e) {
//do nothing
}
try {
InternalDistributedSystem.getAnyInstance().disconnect();
} catch(Exception e) {
//do nothing
}
}
@Test
public void testNoRespondingLocators() {
try {
source.findServer(null);
fail("Should have gotten a NoAvailableLocatorsException");
} catch(NoAvailableLocatorsException expected) {
//do nothing
}
}
@Test
public void testNoServers() throws Exception {
startFakeLocator();
handler.nextConnectionResponse = new ClientConnectionResponse(null);
assertEquals(null, source.findServer(null));
}
@Test
public void testDiscoverServers() throws Exception {
startFakeLocator();
ServerLocation loc1 = new ServerLocation("localhost", 4423);
handler.nextConnectionResponse = new ClientConnectionResponse(loc1);
assertEquals(loc1, source.findServer(null));
}
@Test
public void testDiscoverLocators() throws Exception {
startFakeLocator();
int secondPort = AvailablePortHelper.getRandomAvailableTCPPort();
TcpServer server2 = new TcpServer(secondPort, InetAddress.getLocalHost(), null, null, handler, new FakeHelper(), Thread.currentThread().getThreadGroup(), "tcp server");
server2.start();
try {
ArrayList locators = new ArrayList();
locators.add(new ServerLocation(InetAddress.getLocalHost().getHostName(), secondPort));
handler.nextLocatorListResponse = new LocatorListResponse(locators,false);
Thread.sleep(500);
try {
TcpClient.stop(InetAddress.getLocalHost(), port);
} catch ( ConnectException ignore ) {
// must not be running
}
server.join(1000);
ServerLocation server1 = new ServerLocation("localhost", 10);
handler.nextConnectionResponse = new ClientConnectionResponse(server1);
assertEquals(server1, source.findServer(null));
} finally {
try {
TcpClient.stop(InetAddress.getLocalHost(), secondPort);
} catch ( ConnectException ignore ) {
// must not be running
}
server.join(60 * 1000);
}
}
private void startFakeLocator() throws UnknownHostException, IOException, InterruptedException {
server = new TcpServer(port, InetAddress.getLocalHost(), null, null, handler, new FakeHelper(), Thread.currentThread().getThreadGroup(), "Tcp Server" );
server.start();
Thread.sleep(500);
}
protected static class FakeHandler implements TcpHandler {
protected volatile ClientConnectionResponse nextConnectionResponse;
protected volatile LocatorListResponse nextLocatorListResponse;;
public void init(TcpServer tcpServer) {
}
public Object processRequest(Object request) throws IOException {
if(request instanceof ClientConnectionRequest) {
return nextConnectionResponse;
}
else {
return nextLocatorListResponse;
}
}
public void shutDown() {
}
public void endRequest(Object request,long startTime) { }
public void endResponse(Object request,long startTime) { }
public void restarting(DistributedSystem ds, GemFireCache cache, SharedConfiguration sharedConfig) { }
}
public static class FakeHelper implements PoolStatHelper {
public void endJob() {
}
public void startJob() {
}
}
public class FakePool implements InternalPool {
public String getPoolOrCacheCancelInProgress() {return null; }
public EndpointManager getEndpointManager() {
return null;
}
public String getName() {
return null;
}
public PoolStats getStats() {
return poolStats;
}
public void destroy() {
}
public void detach() {
}
public void destroy(boolean keepAlive) {
}
public boolean isDurableClient() {
return false;
}
public boolean isDestroyed() {
return false;
}
public int getFreeConnectionTimeout() {return 0;}
public int getLoadConditioningInterval() {return 0;}
public int getSocketBufferSize() {return 0;}
public int getReadTimeout() {return 0;}
public int getConnectionsPerServer() {return 0;}
public boolean getThreadLocalConnections() {return false;}
public boolean getSubscriptionEnabled() {return false;}
public boolean getPRSingleHopEnabled() {return false;}
public int getSubscriptionRedundancy() {return 0;}
public int getSubscriptionMessageTrackingTimeout() {return 0;}
public String getServerGroup() {return "";}
public List/*<InetSocketAddress>*/ getLocators() {return new ArrayList();}
public List/*<InetSocketAddress>*/ getServers() {return new ArrayList();}
public void releaseThreadLocalConnection() {}
public ConnectionStats getStats(ServerLocation location) { return null; }
public boolean getMultiuserAuthentication() {return false;}
public long getIdleTimeout() {
return 0;
}
public int getMaxConnections() {
return 0;
}
public int getMinConnections() {
return 0;
}
public long getPingInterval() {
return 100;
}
public int getStatisticInterval() {
return -1;
}
public int getRetryAttempts() {
return 0;
}
public Object execute(Op op) {
return null;
}
public Object executeOn(ServerLocation server, Op op) {
return null;
}
public Object executeOn(ServerLocation server, Op op, boolean accessed,boolean onlyUseExistingCnx) {
return null;
}
public Object executeOnPrimary(Op op) {
return null;
}
public Map getEndpointMap() {
return null;
}
public ScheduledExecutorService getBackgroundProcessor() {
return background;
}
public Object executeOn(Connection con, Op op) {
return null;
}
public Object executeOn(Connection con, Op op, boolean timeoutFatal) {
return null;
}
public RegisterInterestTracker getRITracker() {
return null;
}
public int getSubscriptionAckInterval() {
return 0;
}
public Object executeOnQueuesAndReturnPrimaryResult(Op op) {
return null;
}
public CancelCriterion getCancelCriterion() {
return new CancelCriterion() {
public String cancelInProgress() {
return null;
}
public RuntimeException generateCancelledException(Throwable e) {
return null;
}
};
}
public void executeOnAllQueueServers(Op op)
throws NoSubscriptionServersAvailableException, SubscriptionNotEnabledException {
}
public Object execute(Op op, int retryAttempts) {
return null;
}
public QueryService getQueryService() {
return null;
}
public int getPendingEventCount() {
return 0;
}
public RegionService createAuthenticatedCacheView(Properties properties){
return null;
}
public void setupServerAffinity(boolean allowFailover) {
}
public void releaseServerAffinity() {
}
public ServerLocation getServerAffinityLocation() {
return null;
}
public void setServerAffinityLocation(ServerLocation serverLocation) {
}
}
}