| /*========================================================================= |
| * Copyright (c) 2010-2014 Pivotal Software, Inc. All Rights Reserved. |
| * This product is protected by U.S. and international copyright |
| * and intellectual property laws. Pivotal products are covered by |
| * one or more patents listed at http://www.pivotal.io/patents. |
| *========================================================================= |
| */ |
| package com.gemstone.gemfire.internal.cache.ha; |
| |
| import java.net.SocketException; |
| import java.util.Properties; |
| import com.gemstone.gemfire.LogWriter; |
| import com.gemstone.gemfire.cache.AttributesFactory; |
| import com.gemstone.gemfire.cache.Cache; |
| import com.gemstone.gemfire.cache.CacheFactory; |
| import com.gemstone.gemfire.cache.DataPolicy; |
| import com.gemstone.gemfire.cache.EntryEvent; |
| import com.gemstone.gemfire.cache.Region; |
| import com.gemstone.gemfire.cache.RegionAttributes; |
| import com.gemstone.gemfire.cache.Scope; |
| import com.gemstone.gemfire.cache.client.PoolManager; |
| import com.gemstone.gemfire.cache.client.internal.PoolImpl; |
| import com.gemstone.gemfire.cache.util.BridgeServer; |
| import com.gemstone.gemfire.cache.util.CacheListenerAdapter; |
| import com.gemstone.gemfire.distributed.DistributedSystem; |
| import com.gemstone.gemfire.distributed.internal.DistributionConfig; |
| import com.gemstone.gemfire.internal.AvailablePort; |
| import com.gemstone.gemfire.internal.cache.BridgeObserverAdapter; |
| import com.gemstone.gemfire.internal.cache.BridgeObserverHolder; |
| import com.gemstone.gemfire.internal.cache.tier.sockets.CacheServerTestUtil; |
| import dunit.DistributedTestCase; |
| import dunit.Host; |
| import dunit.VM; |
| |
| public class HASlowReceiverDUnitTest extends DistributedTestCase { |
| protected static Cache cache = null; |
| |
| private static VM serverVM1 = null; |
| |
| protected static VM serverVM2 = null; |
| |
| protected static VM clientVM = null; |
| |
| private static int PORT0; |
| |
| private static int PORT1; |
| |
| private static int PORT2; |
| |
| private static final String regionName = "HASlowReceiverDUnitTest"; |
| |
| protected static LogWriter logger = null; |
| |
| static PoolImpl pool = null; |
| |
| private static boolean isUnresponsiveClientRemoved = false; |
| |
| public HASlowReceiverDUnitTest(String name) { |
| super(name); |
| } |
| |
| @Override |
| public void setUp() throws Exception { |
| super.setUp(); |
| |
| final Host host = Host.getHost(0); |
| serverVM1 = host.getVM(1); |
| serverVM2 = host.getVM(2); |
| clientVM = host.getVM(3); |
| |
| PORT0 = createServerCache().intValue(); |
| PORT1 = ((Integer)serverVM1.invoke(HASlowReceiverDUnitTest.class, |
| "createServerCache")).intValue(); |
| PORT2 = ((Integer)serverVM2.invoke(HASlowReceiverDUnitTest.class, |
| "createServerCache")).intValue(); |
| |
| } |
| |
| @Override |
| public void tearDown2() throws Exception { |
| super.tearDown2(); |
| clientVM.invoke(HASlowReceiverDUnitTest.class, "closeCache"); |
| |
| // then close the servers |
| closeCache(); |
| serverVM1.invoke(HASlowReceiverDUnitTest.class, "closeCache"); |
| serverVM2.invoke(HASlowReceiverDUnitTest.class, "closeCache"); |
| disconnectAllFromDS(); |
| } |
| |
| private void createCache(Properties props) throws Exception { |
| DistributedSystem ds = getSystem(props); |
| ds.disconnect(); |
| ds = getSystem(props); |
| assertNotNull(ds); |
| cache = CacheFactory.create(ds); |
| assertNotNull(cache); |
| } |
| |
| public static Integer createServerCache() throws Exception { |
| return createServerCache(null); |
| } |
| |
| public static Integer createServerCache(String ePolicy) throws Exception { |
| return createServerCache(ePolicy, new Integer(1)); |
| } |
| |
| public static Integer createServerCache(String ePolicy, Integer cap) |
| throws Exception { |
| |
| Properties prop = new Properties(); |
| prop.setProperty(DistributionConfig.REMOVE_UNRESPONSIVE_CLIENT_PROP_NAME, |
| "true"); |
| new HASlowReceiverDUnitTest("temp").createCache(prop); |
| |
| AttributesFactory factory = new AttributesFactory(); |
| factory.setScope(Scope.DISTRIBUTED_ACK); |
| factory.setDataPolicy(DataPolicy.REPLICATE); |
| RegionAttributes attrs = factory.create(); |
| cache.createRegion(regionName, attrs); |
| logger = cache.getLogger(); |
| |
| int port = AvailablePort.getRandomAvailablePort(AvailablePort.SOCKET); |
| BridgeServer server1 = cache.addBridgeServer(); |
| server1.setPort(port); |
| server1.setNotifyBySubscription(true); |
| server1.setMaximumMessageCount(200); |
| if (ePolicy != null) { |
| server1.getClientSubscriptionConfig().setEvictionPolicy(ePolicy); |
| server1.getClientSubscriptionConfig().setCapacity(cap.intValue()); |
| } |
| server1.start(); |
| return new Integer(server1.getPort()); |
| } |
| |
| public static void createClientCache(String host, Integer port1, |
| Integer port2, Integer port3, Integer rLevel, Boolean addListener) |
| throws Exception { |
| CacheServerTestUtil.disableShufflingOfEndpoints(); |
| |
| Properties props = new Properties(); |
| props.setProperty(DistributionConfig.MCAST_PORT_NAME, "0"); |
| props.setProperty(DistributionConfig.LOCATORS_NAME, ""); |
| new HASlowReceiverDUnitTest("temp").createCache(props); |
| |
| AttributesFactory factory = new AttributesFactory(); |
| PoolImpl p = (PoolImpl)PoolManager.createFactory().addServer("localhost", |
| port1).addServer("localhost", port2).addServer("localhost", port3) |
| .setSubscriptionEnabled(true).setSubscriptionRedundancy( |
| rLevel.intValue()).setThreadLocalConnections(true) |
| .setMinConnections(6).setReadTimeout(20000).setPingInterval(1000) |
| .setRetryAttempts(5).create("HASlowRecieverDUnitTestPool"); |
| |
| factory.setScope(Scope.LOCAL); |
| factory.setPoolName(p.getName()); |
| if (addListener.booleanValue()) { |
| factory.addCacheListener(new CacheListenerAdapter() { |
| @Override |
| public void afterUpdate(EntryEvent event) { |
| |
| if (event.getNewValue().equals("v20")) { |
| try { |
| Thread.sleep(120000); |
| } |
| catch (InterruptedException e) { |
| // TODO Auto-generated catch block |
| e.printStackTrace(); |
| } |
| } |
| } |
| }); |
| } |
| RegionAttributes attrs = factory.create(); |
| cache.createRegion(regionName, attrs); |
| pool = p; |
| } |
| |
| public static void createClientCache(String host, Integer port1, |
| Integer port2, Integer port3, Integer rLevel) throws Exception { |
| createClientCache(host, port1, port2, port3, rLevel, Boolean.TRUE); |
| } |
| |
| public static void registerInterest() { |
| try { |
| Region r = cache.getRegion("/" + regionName); |
| assertNotNull(r); |
| r.registerInterest("ALL_KEYS"); |
| } |
| catch (Exception ex) { |
| fail("failed in registerInterestListAll", ex); |
| } |
| } |
| |
| public static void putEntries() { |
| try { |
| |
| Region r = cache.getRegion("/" + regionName); |
| assertNotNull(r); |
| for (long i = 0; i < 300; i++) { |
| r.put("k" + (i % 10), "v" + i); |
| r.put("k" + (i % 10), new byte[1000]); |
| } |
| } |
| catch (Exception ex) { |
| fail("failed in putEntries()", ex); |
| } |
| } |
| |
| public static void createEntries(Long num) { |
| try { |
| Region r = cache.getRegion("/" + regionName); |
| assertNotNull(r); |
| for (long i = 0; i < num.longValue(); i++) { |
| r.create("k" + i, "v" + i); |
| } |
| } |
| catch (Exception ex) { |
| fail("failed in createEntries(Long)", ex); |
| } |
| } |
| |
| public static void checkRedundancyLevel(final Integer redundantServers) { |
| WaitCriterion wc = new WaitCriterion() { |
| public boolean done() { |
| return pool.getRedundantNames().size() == redundantServers.intValue(); |
| } |
| |
| public String description() { |
| return "Expected redundant count (" + pool.getRedundantNames().size() |
| + ") to become " + redundantServers.intValue(); |
| } |
| }; |
| DistributedTestCase.waitForCriterion(wc, 200 * 1000, 1000, true); |
| } |
| |
| // Test slow client |
| public void testSlowClient() throws Exception { |
| setBridgeObeserverForAfterQueueDestroyMessage(); |
| clientVM.invoke(HASlowReceiverDUnitTest.class, "createClientCache", |
| new Object[] { getServerHostName(Host.getHost(0)), new Integer(PORT0), |
| new Integer(PORT1), new Integer(PORT2), new Integer(2) }); |
| clientVM.invoke(HASlowReceiverDUnitTest.class, "registerInterest"); |
| // add expected socket exception string |
| final ExpectedException ex1 = addExpectedException(SocketException.class |
| .getName()); |
| final ExpectedException ex2 = addExpectedException(InterruptedException.class |
| .getName()); |
| putEntries(); |
| Thread.sleep(20000);// wait for put to block and allow server to remove |
| // client queue |
| clientVM.invoke(HASlowReceiverDUnitTest.class, "checkRedundancyLevel", |
| new Object[] { new Integer(2) }); |
| // check for slow client queue is removed or not. |
| assertTrue("isUnresponsiveClientRemoved is false, but should be true " |
| + "after 20 seconds", isUnresponsiveClientRemoved); |
| ex1.remove(); |
| ex2.remove(); |
| } |
| |
| public static void setBridgeObeserverForAfterQueueDestroyMessage() |
| throws Exception { |
| PoolImpl.AFTER_QUEUE_DESTROY_MESSAGE_FLAG = true; |
| BridgeObserverHolder.setInstance(new BridgeObserverAdapter() { |
| @Override |
| public void afterQueueDestroyMessage() { |
| clientVM.invoke(HASlowReceiverDUnitTest.class, "checkRedundancyLevel", |
| new Object[] { new Integer(0) }); |
| isUnresponsiveClientRemoved = true; |
| PoolImpl.AFTER_QUEUE_DESTROY_MESSAGE_FLAG = false; |
| } |
| }); |
| } |
| |
| public static void closeCache() { |
| if (cache != null && !cache.isClosed()) { |
| cache.close(); |
| cache.getDistributedSystem().disconnect(); |
| } |
| } |
| |
| } |