| /** |
| * |
| */ |
| package com.gemstone.gemfire.internal.cache.tier.sockets; |
| |
| import java.util.Iterator; |
| import java.util.Properties; |
| |
| import com.gemstone.gemfire.cache.CacheFactory; |
| import com.gemstone.gemfire.cache.Region; |
| import com.gemstone.gemfire.cache.RegionFactory; |
| import com.gemstone.gemfire.cache.RegionShortcut; |
| import com.gemstone.gemfire.cache.client.ClientCacheFactory; |
| import com.gemstone.gemfire.cache.client.ClientRegionFactory; |
| import com.gemstone.gemfire.cache.client.ClientRegionShortcut; |
| import com.gemstone.gemfire.cache.client.PoolFactory; |
| import com.gemstone.gemfire.cache.client.PoolManager; |
| import com.gemstone.gemfire.cache.client.internal.PoolImpl; |
| import com.gemstone.gemfire.cache.server.CacheServer; |
| import com.gemstone.gemfire.distributed.DistributedSystem; |
| import com.gemstone.gemfire.distributed.internal.DistributionConfig; |
| import com.gemstone.gemfire.internal.AvailablePort; |
| import com.gemstone.gemfire.internal.cache.GemFireCacheImpl; |
| |
| import dunit.DistributedTestCase; |
| import dunit.Host; |
| import dunit.VM; |
| |
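/**
 * Distributed (DUnit) tests for the pending event count that a client pool
 * reports through {@code getPendingEventCount()}: for a non-durable client, for
 * a durable client that reconnects before or after its durable timeout, and
 * after the primary server has been restarted.
 *
 * @author ashetkar
 */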
| @SuppressWarnings("serial") |
| public class DurableClientQueueSizeDUnitTest extends DistributedTestCase { |
| |
| private static VM vm0 = null; |
| private static VM vm1 = null; |
| private static VM vm2 = null; |
| private static VM vm3 = null; |
| |
| private static GemFireCacheImpl cache; |
| |
| private static int mcastPort; |
| |
| private static int port0; |
| |
| private static int port1; |
| |
| private static final int EXCEPTION = -5; |
| |
| public static final String REGION_NAME = "DurableClientQueueSizeDunitTest_region"; |
| |
| public static final String NEW_REGION = "DurableClientQueueSizeDunitTest_region_2"; |
| |
| public static final String POOL_NAME = "my-pool"; |
| |
| public static final String DEFAULT_POOL_NAME = "DEFAULT"; |
| |
| /** |
| * @param name |
| */ |
| public DurableClientQueueSizeDUnitTest(String name) { |
| super(name); |
| } |
| |
| @Override |
| public void setUp() throws Exception { |
| super.setUp(); |
| vm0 = Host.getHost(0).getVM(0); |
| vm1 = Host.getHost(0).getVM(1); |
| vm2 = Host.getHost(0).getVM(2); |
| vm3 = Host.getHost(0).getVM(3); |
| |
| mcastPort = AvailablePort.getRandomAvailablePort(AvailablePort.JGROUPS); |
| port0 = (Integer) vm0.invoke(DurableClientQueueSizeDUnitTest.class, |
| "createCacheServer", new Object[] { mcastPort }); |
| port1 = (Integer) vm1.invoke(DurableClientQueueSizeDUnitTest.class, |
| "createCacheServer", new Object[] { mcastPort }); |
| addExpectedException("java.net.SocketException"); |
| addExpectedException("Unexpected IOException"); |
| } |
| |
| public void tearDown2() throws Exception { |
| closeCache(); |
| |
| vm2.invoke(DurableClientQueueSizeDUnitTest.class, "closeCache"); |
| vm3.invoke(DurableClientQueueSizeDUnitTest.class, "closeCache"); |
| |
| vm0.invoke(DurableClientQueueSizeDUnitTest.class, "closeCache"); |
| vm1.invoke(DurableClientQueueSizeDUnitTest.class, "closeCache"); |
| } |
| |
| public void testNonDurableClientFails() throws Exception { |
| vm2.invoke(DurableClientQueueSizeDUnitTest.class, "createClientCache", |
| new Object[] { vm2.getHost(), new Integer[] { port0, port1 }, false }); |
| |
| vm2.invoke(DurableClientQueueSizeDUnitTest.class, "verifyQueueSize", |
| new Object[] { EXCEPTION }); |
| } |
| |
| // this test is disabled due to a high rate of failure. It fails with |
| // the queue size being 11 instead of 10 in the first verifyQueueSize check. |
| // See internal ticket #52227. |
| public void disabledtestSinglePoolClientReconnectsBeforeTimeOut() throws Exception { |
| int num = 10; |
| vm2.invoke(DurableClientQueueSizeDUnitTest.class, "createClientCache", |
| new Object[] { vm2.getHost(), new Integer[] { port0, port1 }}); |
| vm2.invoke(DurableClientQueueSizeDUnitTest.class, "verifyQueueSize", |
| new Object[] { PoolImpl.PRIMARY_QUEUE_NOT_AVAILABLE }); |
| vm2.invoke(DurableClientQueueSizeDUnitTest.class, "doRI"); |
| vm2.invoke(DurableClientQueueSizeDUnitTest.class, "readyForEvents"); |
| |
| vm2.invoke(DurableClientQueueSizeDUnitTest.class, "closeCache", new Object[] {Boolean.TRUE}); |
| |
| vm0.invoke(DurableClientQueueSizeDUnitTest.class, "doPuts", new Object[] {num}); |
| |
| vm2.invoke(DurableClientQueueSizeDUnitTest.class, "createClientCache", |
| new Object[] { vm2.getHost(), new Integer[] { port0, port1 }}); |
| |
| vm2.invoke(DurableClientQueueSizeDUnitTest.class, "verifyQueueSize", |
| new Object[] { num + 1 /* +1 for marker */}); |
| vm2.invoke(DurableClientQueueSizeDUnitTest.class, "readyForEvents"); |
| vm2.invoke(DurableClientQueueSizeDUnitTest.class, "verifyQueueSize", |
| new Object[] { EXCEPTION }); |
| |
| } |
| |
| public void testSinglePoolClientReconnectsAfterTimeOut() throws Exception { |
| int num = 10; |
| long timeoutSeconds = 10; |
| vm2.invoke( |
| DurableClientQueueSizeDUnitTest.class, |
| "createClientCache", |
| new Object[] { vm2.getHost(), new Integer[] { port0, port1 }, |
| String.valueOf(timeoutSeconds), true }); |
| vm2.invoke(DurableClientQueueSizeDUnitTest.class, "doRI"); |
| vm2.invoke(DurableClientQueueSizeDUnitTest.class, "readyForEvents"); |
| |
| vm2.invoke(DurableClientQueueSizeDUnitTest.class, "closeCache", new Object[] {Boolean.TRUE}); |
| |
| vm0.invoke(DurableClientQueueSizeDUnitTest.class, "doPuts", new Object[] {num}); |
| Thread.sleep(timeoutSeconds * 1000); // TODO use a waitCriterion |
| |
| vm2.invoke(DurableClientQueueSizeDUnitTest.class, "createClientCache", |
| new Object[] { vm2.getHost(), new Integer[] { port0, port1 }}); |
| |
| vm2.invoke(DurableClientQueueSizeDUnitTest.class, "verifyQueueSize", |
| new Object[] { PoolImpl.PRIMARY_QUEUE_TIMED_OUT }); |
| } |
| |
| public void testPrimaryServerRebootReturnsCorrectResponse() throws Exception { |
| int num = 10; |
| vm2.invoke(DurableClientQueueSizeDUnitTest.class, "createClientCache", |
| new Object[] { vm2.getHost(), new Integer[] { port0, port1 } }); |
| vm2.invoke(DurableClientQueueSizeDUnitTest.class, "doRI"); |
| vm2.invoke(DurableClientQueueSizeDUnitTest.class, "readyForEvents"); |
| |
| vm2.invoke(DurableClientQueueSizeDUnitTest.class, "closeCache", |
| new Object[] { Boolean.TRUE }); |
| |
| vm0.invoke(DurableClientQueueSizeDUnitTest.class, "doPuts", |
| new Object[] { num }); |
| |
| // Identify primary and restart it |
| boolean isVM0Primary = (Boolean) vm0.invoke( |
| DurableClientQueueSizeDUnitTest.class, "isPrimary"); |
| int port = 0; |
| if (isVM0Primary) { |
| vm0.invoke(DurableClientQueueSizeDUnitTest.class, "closeCache"); |
| vm0.invoke(DurableClientQueueSizeDUnitTest.class, "createCacheServer", |
| new Object[] { mcastPort, port0 }); |
| port = port0; |
| } else { // vm1 is primary |
| vm1.invoke(DurableClientQueueSizeDUnitTest.class, "closeCache"); |
| vm1.invoke(DurableClientQueueSizeDUnitTest.class, "createCacheServer", |
| new Object[] { mcastPort, port1 }); |
| port = port1; |
| } |
| |
| vm2.invoke(DurableClientQueueSizeDUnitTest.class, "createClientCache", |
| new Object[] { vm2.getHost(), new Integer[] { port } }); |
| |
| vm2.invoke(DurableClientQueueSizeDUnitTest.class, "verifyQueueSize", |
| new Object[] { PoolImpl.PRIMARY_QUEUE_NOT_AVAILABLE }); |
| } |
| |
| public void bug51854_testMultiPoolClientReconnectsBeforeTimeOut() throws Exception { |
| int num = 10; |
| vm2.invoke(DurableClientQueueSizeDUnitTest.class, "createClientCache", |
| new Object[] { vm2.getHost(), new Integer[] { port0, port1 }, "300", |
| true/* durable */, true /* multiPool */}); |
| vm2.invoke(DurableClientQueueSizeDUnitTest.class, "verifyQueueSize", |
| new Object[] { PoolImpl.PRIMARY_QUEUE_NOT_AVAILABLE, PoolImpl.PRIMARY_QUEUE_NOT_AVAILABLE }); |
| vm2.invoke(DurableClientQueueSizeDUnitTest.class, "doRI"); |
| vm2.invoke(DurableClientQueueSizeDUnitTest.class, "readyForEvents"); |
| |
| vm2.invoke(DurableClientQueueSizeDUnitTest.class, "closeCache", new Object[] {Boolean.TRUE}); |
| |
| vm0.invoke(DurableClientQueueSizeDUnitTest.class, "doPuts", new Object[] {num}); |
| |
| vm2.invoke(DurableClientQueueSizeDUnitTest.class, "createClientCache", |
| new Object[] { vm2.getHost(), new Integer[] { port0, port1 }, "300", |
| true/* durable */, true /* multiPool */}); |
| |
| vm2.invoke(DurableClientQueueSizeDUnitTest.class, "verifyQueueSize", |
| new Object[] { num + 1 /* +1 for marker */, (num * 2) + 1}); |
| vm2.invoke(DurableClientQueueSizeDUnitTest.class, "readyForEvents"); |
| vm2.invoke(DurableClientQueueSizeDUnitTest.class, "verifyQueueSize", |
| new Object[] { EXCEPTION, EXCEPTION }); |
| } |
| |
| public void bug51854_testMultiPoolClientReconnectsAfterTimeOut() throws Exception { |
| int num = 10; |
| long timeout = 10; |
| vm2.invoke( |
| DurableClientQueueSizeDUnitTest.class, |
| "createClientCache", |
| new Object[] { vm2.getHost(), new Integer[] { port0, port1 }, |
| String.valueOf(timeout), true/* durable */, true /* multiPool */}); |
| vm2.invoke(DurableClientQueueSizeDUnitTest.class, "verifyQueueSize", |
| new Object[] { PoolImpl.PRIMARY_QUEUE_NOT_AVAILABLE, PoolImpl.PRIMARY_QUEUE_NOT_AVAILABLE }); |
| vm2.invoke(DurableClientQueueSizeDUnitTest.class, "doRI"); |
| vm2.invoke(DurableClientQueueSizeDUnitTest.class, "readyForEvents"); |
| |
| vm2.invoke(DurableClientQueueSizeDUnitTest.class, "closeCache", new Object[] {Boolean.TRUE}); |
| |
| vm0.invoke(DurableClientQueueSizeDUnitTest.class, "doPuts", new Object[] {num}); |
| // vm0.invoke(DurableClientQueueSizeDUnitTest.class, |
| // "verifyQueueSizeAtServer", new Object[] { DEFAULT_POOL_NAME, num + 1 }); |
| // vm0.invoke(DurableClientQueueSizeDUnitTest.class, |
| // "verifyQueueSizeAtServer", new Object[] { POOL_NAME, num * 2 + 1 }); |
| Thread.sleep(timeout * 1000); // TODO use a waitCriterion |
| |
| vm2.invoke(DurableClientQueueSizeDUnitTest.class, "createClientCache", |
| new Object[] { vm2.getHost(), new Integer[] { port0, port1 }, "300", |
| true/* durable */, true /* multiPool */}); |
| |
| vm2.invoke(DurableClientQueueSizeDUnitTest.class, "verifyQueueSize", |
| new Object[] { PoolImpl.PRIMARY_QUEUE_TIMED_OUT, PoolImpl.PRIMARY_QUEUE_TIMED_OUT}); |
| } |
| |
| public void _testMultiPoolClientFailsOver() throws Exception { |
| } |
| |
| public static void closeCache() throws Exception { |
| closeCache(false); |
| } |
| |
| public static void closeCache(Boolean keepAlive) throws Exception { |
| setSpecialDurable(false); |
| if (cache != null) { |
| cache.close(keepAlive); |
| } |
| } |
| |
| public static Boolean isPrimary() throws Exception { |
| return CacheClientNotifier.getInstance().getClientProxies().iterator().next().isPrimary(); |
| } |
| |
| public static Integer createCacheServer(Integer mcastPort) throws Exception { |
| return createCacheServer(mcastPort, |
| AvailablePort.getRandomAvailablePort(AvailablePort.SOCKET)); |
| } |
| |
| @SuppressWarnings("deprecation") |
| public static Integer createCacheServer(Integer mcastPort, Integer serverPort) |
| throws Exception { |
| Properties props = new Properties(); |
| props.setProperty("locators", ""); |
| props.setProperty("mcast-port", String.valueOf(mcastPort)); |
| // props.setProperty("log-level", "fine"); |
| // props.setProperty("log-file", "server_" + OSProcess.getId() + ".log"); |
| // props.setProperty("statistic-archive-file", "server_" + OSProcess.getId() |
| // + ".gfs"); |
| // props.setProperty("statistic-sampling-enabled", "true"); |
| |
| DurableClientQueueSizeDUnitTest test = new DurableClientQueueSizeDUnitTest( |
| "DurableClientQueueSizeDUnitTest"); |
| DistributedSystem ds = test.getSystem(props); |
| ds.disconnect(); |
| cache = (GemFireCacheImpl)CacheFactory.create(test.getSystem()); |
| // cache = (GemFireCacheImpl) new CacheFactory(props).create(); |
| |
| RegionFactory<String, String> rf = cache |
| .createRegionFactory(RegionShortcut.REPLICATE); |
| |
| rf.create(REGION_NAME); |
| rf.create(NEW_REGION); |
| |
| CacheServer server = cache.addCacheServer(); |
| server.setPort(serverPort); |
| server.start(); |
| return server.getPort(); |
| } |
| |
| public static void createClientCache(Host host, Integer[] ports) |
| throws Exception { |
| createClientCache(host, ports, "300", Boolean.TRUE); |
| } |
| |
| public static void createClientCache(Host host, Integer[] ports, Boolean durable) |
| throws Exception { |
| createClientCache(host, ports, "300", durable); |
| } |
| |
| public static void createClientCache(Host host, Integer[] ports, |
| String timeoutMilis, Boolean durable) throws Exception { |
| createClientCache(host, ports, timeoutMilis, durable, false); |
| } |
| |
| public static void setSpecialDurable(Boolean bool) { |
| System.setProperty("gemfire.SPECIAL_DURABLE", bool.toString()); |
| } |
| |
| @SuppressWarnings("deprecation") |
| public static void createClientCache(Host host, Integer[] ports, |
| String timeoutSeconds, Boolean durable, Boolean multiPool) throws Exception { |
| if (multiPool) { |
| System.setProperty("gemfire.SPECIAL_DURABLE", "true"); |
| } |
| Properties props = new Properties(); |
| if (durable) { |
| props.setProperty(DistributionConfig.DURABLE_CLIENT_ID_NAME, |
| "my-durable-client"); |
| props.setProperty(DistributionConfig.DURABLE_CLIENT_TIMEOUT_NAME, |
| timeoutSeconds); |
| } |
| // props.setProperty("log-file", "client_" + OSProcess.getId() + ".log"); |
| // props.setProperty("log-level", "fine"); |
| // props.setProperty("statistic-archive-file", "client_" + OSProcess.getId() |
| // + ".gfs"); |
| // props.setProperty("statistic-sampling-enabled", "true"); |
| |
| DistributedSystem ds = new DurableClientQueueSizeDUnitTest( |
| "DurableClientQueueSizeDUnitTest").getSystem(props); |
| ds.disconnect(); |
| ClientCacheFactory ccf = new ClientCacheFactory(props); |
| ccf.setPoolSubscriptionEnabled(true); |
| ccf.setPoolSubscriptionAckInterval(50); |
| ccf.setPoolSubscriptionRedundancy(1); |
| ccf.setPoolMaxConnections(1); |
| for (int port : ports) { |
| ccf.addPoolServer(host.getHostName(), port); |
| } |
| cache = (GemFireCacheImpl) ccf.create(); |
| |
| ClientRegionFactory<String, String> crf = cache |
| .createClientRegionFactory(ClientRegionShortcut.CACHING_PROXY); |
| |
| crf.setPoolName(cache.getDefaultPool().getName()); |
| crf.create(REGION_NAME); |
| |
| if (multiPool) { |
| String poolName = POOL_NAME; |
| PoolFactory pf = PoolManager.createFactory(); |
| for (int port : ports) { |
| pf.addServer(host.getHostName(), port); |
| } |
| pf.setSubscriptionEnabled(true); |
| pf.create(poolName); |
| crf.setPoolName(poolName); |
| crf.create(NEW_REGION); |
| } |
| } |
| |
| @SuppressWarnings("unchecked") |
| public static void doRI() { |
| cache.getRegion(REGION_NAME).registerInterest("ALL_KEYS", true); |
| if (cache.getRegion(NEW_REGION) != null) { |
| cache.getRegion(NEW_REGION).registerInterest("ALL_KEYS", true); |
| } |
| } |
| |
| public static void readyForEvents() { |
| cache.readyForEvents(); |
| } |
| |
| @SuppressWarnings("unchecked") |
| public static void doPuts(Integer numOfPuts) throws Exception { |
| Region<String, String> region = cache.getRegion(REGION_NAME); |
| |
| for (int j = 0; j < numOfPuts; j++) { |
| region.put("KEY_" + j, "VALUE_" + j); |
| } |
| |
| region = cache.getRegion(NEW_REGION); |
| |
| for (int j = 0; j < (numOfPuts*2); j++) { |
| region.put("KEY_" + j, "VALUE_" + j); |
| } |
| } |
| |
| public static void verifyQueueSizeAtServer(String poolName, Integer num) throws Exception { |
| Iterator<CacheClientProxy> it = CacheClientNotifier.getInstance().getClientProxies().iterator(); |
| while (it.hasNext()) { |
| CacheClientProxy ccp = it.next(); |
| if (ccp.getDurableId().contains(poolName)) { |
| assertEquals(num.intValue(), ccp.getQueueSize()); |
| } |
| } |
| } |
| |
| public static void verifyQueueSize(Integer num) throws Exception { |
| verifyQueueSize(num, Integer.MIN_VALUE); |
| } |
| |
| public static void verifyQueueSize(Integer num1, Integer num2) throws Exception { |
| try { |
| assertEquals(num1.intValue(), |
| PoolManager.find(DEFAULT_POOL_NAME).getPendingEventCount()); |
| } catch (IllegalStateException ise) { |
| assertEquals(EXCEPTION, num1.intValue()); |
| } |
| if (num2 != Integer.MIN_VALUE) { |
| try { |
| assertEquals(num2.intValue(), |
| PoolManager.find(POOL_NAME).getPendingEventCount()); |
| } catch (IllegalStateException ise) { |
| assertEquals(EXCEPTION, num2.intValue()); |
| } |
| } |
| } |
| } |