blob: ae2519cbfa2edf7791474f457d76067f133bbf3d [file] [log] [blame]
/*=========================================================================
* Copyright (c) 2010-2014 Pivotal Software, Inc. All Rights Reserved.
* This product is protected by U.S. and international copyright
* and intellectual property laws. Pivotal products are covered by
* one or more patents listed at http://www.pivotal.io/patents.
*=========================================================================
*/
/**
*
*/
package com.gemstone.gemfire.internal.cache.ha;
import java.io.File;
import java.util.HashMap;
import java.util.Iterator;
import java.util.Map;
import java.util.Properties;
import java.util.Set;
import com.gemstone.gemfire.LogWriter;
import com.gemstone.gemfire.cache.AttributesFactory;
import com.gemstone.gemfire.cache.Cache;
import com.gemstone.gemfire.cache.CacheFactory;
import com.gemstone.gemfire.cache.DataPolicy;
import com.gemstone.gemfire.cache.DiskStoreFactory;
import com.gemstone.gemfire.cache.EntryEvent;
import com.gemstone.gemfire.cache.Region;
import com.gemstone.gemfire.cache.RegionAttributes;
import com.gemstone.gemfire.cache.Scope;
import com.gemstone.gemfire.cache.util.BridgeServer;
import com.gemstone.gemfire.cache.util.CacheListenerAdapter;
import com.gemstone.gemfire.cache30.BridgeTestCase;
import com.gemstone.gemfire.distributed.DistributedSystem;
import com.gemstone.gemfire.distributed.internal.DistributionConfig;
import com.gemstone.gemfire.distributed.internal.InternalDistributedSystem;
import com.gemstone.gemfire.internal.AvailablePort;
import com.gemstone.gemfire.internal.cache.BridgeServerImpl;
import com.gemstone.gemfire.internal.cache.LocalRegion;
import com.gemstone.gemfire.internal.cache.ha.HAContainerMap;
import com.gemstone.gemfire.internal.cache.ha.HAContainerRegion;
import com.gemstone.gemfire.internal.cache.ha.HARegionQueue;
import com.gemstone.gemfire.internal.cache.tier.sockets.CacheServerTestUtil;
import com.gemstone.gemfire.internal.cache.tier.sockets.ClientUpdateMessage;
import com.gemstone.gemfire.internal.cache.tier.sockets.ConflationDUnitTest;
import com.gemstone.gemfire.internal.cache.tier.sockets.HAEventWrapper;
import dunit.DistributedTestCase;
import dunit.Host;
import dunit.VM;
import util.TestException;
/**
* This DUnit contains various tests to ensure new implementation of ha region
* queues works as expected.
*
* @author ashetkar
* @since 5.7
*
*/
public class HARQueueNewImplDUnitTest extends DistributedTestCase {
protected static Cache cache = null;
protected static VM serverVM0 = null;
private static VM serverVM1 = null;
protected static VM clientVM1 = null;
protected static VM clientVM2 = null;
private static int PORT1;
private static int PORT2;
private static final String regionName = "HARQueueNewImplDUnitTest";
private static final Map map = new HashMap();
protected static LogWriter logger = null;
protected static int numOfCreates = 0;
protected static int numOfUpdates = 0;
protected static int numOfInvalidates = 0;
private static Object[] deletedValues = null;
/**
* @param name
* name of the test
*/
public HARQueueNewImplDUnitTest(String name) {
super(name);
}
/**
* Sets up the test.
*/
public void setUp() throws Exception {
super.setUp();
final Host host = Host.getHost(0);
serverVM0 = host.getVM(0);
serverVM1 = host.getVM(1);
clientVM1 = host.getVM(2);
clientVM2 = host.getVM(3);
PORT1 = ((Integer)serverVM0.invoke(HARQueueNewImplDUnitTest.class,
"createServerCache",
new Object[] { HARegionQueue.HA_EVICTION_POLICY_MEMORY })).intValue();
PORT2 = ((Integer)serverVM1.invoke(HARQueueNewImplDUnitTest.class,
"createServerCache",
new Object[] { HARegionQueue.HA_EVICTION_POLICY_ENTRY })).intValue();
numOfCreates = 0;
numOfUpdates = 0;
numOfInvalidates = 0;
}
/**
* Tears down the test.
*/
public void tearDown2() throws Exception {
super.tearDown2();
closeCache();
clientVM1.invoke(HARQueueNewImplDUnitTest.class, "closeCache");
clientVM2.invoke(HARQueueNewImplDUnitTest.class, "closeCache");
// Unset the isSlowStartForTesting flag
serverVM0.invoke(ConflationDUnitTest.class, "unsetIsSlowStart");
serverVM1.invoke(ConflationDUnitTest.class, "unsetIsSlowStart");
// then close the servers
serverVM0.invoke(HARQueueNewImplDUnitTest.class, "closeCache");
serverVM1.invoke(HARQueueNewImplDUnitTest.class, "closeCache");
disconnectAllFromDS();
}
private void createCache(Properties props) throws Exception {
props.setProperty(DistributionConfig.DELTA_PROPAGATION_PROP_NAME, "false");
DistributedSystem ds = getSystem(props);
ds.disconnect();
ds = getSystem(props);
assertNotNull(ds);
cache = CacheFactory.create(ds);
assertNotNull(cache);
}
public static Integer createServerCache() throws Exception {
return createServerCache(null);
}
public static Integer createServerCache(String ePolicy) throws Exception {
return createServerCache(ePolicy, new Integer(1));
}
public static Integer createServerCache(String ePolicy, Integer cap)
throws Exception {
new HARQueueNewImplDUnitTest("temp").createCache(new Properties());
AttributesFactory factory = new AttributesFactory();
factory.setScope(Scope.DISTRIBUTED_ACK);
factory.setDataPolicy(DataPolicy.REPLICATE);
RegionAttributes attrs = factory.create();
cache.createRegion(regionName, attrs);
logger = cache.getLogger();
int port = AvailablePort.getRandomAvailablePort(AvailablePort.SOCKET);
BridgeServer server1 = cache.addBridgeServer();
server1.setPort(port);
server1.setNotifyBySubscription(true);
if (ePolicy != null) {
File overflowDirectory = new File("bsi_overflow_"+port);
overflowDirectory.mkdir();
DiskStoreFactory dsf = cache.createDiskStoreFactory();
File[] dirs1 = new File[] {overflowDirectory};
server1.getClientSubscriptionConfig().setEvictionPolicy(ePolicy);
server1.getClientSubscriptionConfig().setCapacity(cap.intValue());
// specify diskstore for this server
server1.getClientSubscriptionConfig().setDiskStoreName(dsf.setDiskDirs(dirs1).create("bsi").getName());
}
server1.start();
return new Integer(server1.getPort());
}
public static Integer createOneMoreBridgeServer(Boolean notifyBySubscription)
throws Exception {
int port = AvailablePort.getRandomAvailablePort(AvailablePort.SOCKET);
BridgeServer server1 = cache.addBridgeServer();
server1.setPort(port);
server1.setNotifyBySubscription(notifyBySubscription.booleanValue());
server1.getClientSubscriptionConfig().setEvictionPolicy(
HARegionQueue.HA_EVICTION_POLICY_MEMORY);
// let this server to use default diskstore
server1.start();
return new Integer(server1.getPort());
}
public static void createClientCache(String host, Integer port1, Integer port2,
String rLevel, Boolean addListener) throws Exception {
CacheServerTestUtil.disableShufflingOfEndpoints();
Properties props = new Properties();
props.setProperty(DistributionConfig.MCAST_PORT_NAME, "0");
props.setProperty(DistributionConfig.LOCATORS_NAME, "");
new HARQueueNewImplDUnitTest("temp").createCache(props);
AttributesFactory factory = new AttributesFactory();
BridgeTestCase.configureConnectionPool(factory, host, port1
.intValue(), port2.intValue(), true, Integer.parseInt(rLevel), 2, null,
1000, 250, false);
factory.setScope(Scope.LOCAL);
if (addListener.booleanValue()) {
factory.addCacheListener(new CacheListenerAdapter() {
public void afterInvalidate(EntryEvent event) {
logger.fine("Invalidate Event: <" + event.getKey() + ", "
+ event.getNewValue() + ">");
numOfInvalidates++;
}
public void afterCreate(EntryEvent event) {
logger.fine("Create Event: <" + event.getKey() + ", "
+ event.getNewValue() + ">");
numOfCreates++;
}
public void afterUpdate(EntryEvent event) {
logger.fine("Update Event: <" + event.getKey() + ", "
+ event.getNewValue() + ">");
numOfUpdates++;
}
});
}
RegionAttributes attrs = factory.create();
cache.createRegion(regionName, attrs);
logger = cache.getLogger();
}
public static void createClientCache(String host, Integer port1, Integer port2,
String rLevel) throws Exception {
createClientCache(host, port1, port2, rLevel, Boolean.FALSE);
}
public static void registerInterestListAll() {
try {
Region r = cache.getRegion("/" + regionName);
assertNotNull(r);
r.registerInterest("ALL_KEYS");
}
catch (Exception ex) {
fail("failed in registerInterestListAll", ex);
}
}
public static void registerInterestList() {
try {
Region r = cache.getRegion("/" + regionName);
assertNotNull(r);
r.registerInterest("k1");
r.registerInterest("k3");
r.registerInterest("k5");
}
catch (Exception ex) {
fail("failed while registering keys", ex);
}
}
public static void putEntries() {
try {
Region r = cache.getRegion("/" + regionName);
assertNotNull(r);
r.put("k1", "pv1");
r.put("k2", "pv2");
r.put("k3", "pv3");
r.put("k4", "pv4");
r.put("k5", "pv5");
}
catch (Exception ex) {
fail("failed in putEntries()", ex);
}
}
public static void createEntries() {
try {
Region r = cache.getRegion("/" + regionName);
assertNotNull(r);
r.create("k1", "v1");
r.create("k2", "v2");
r.create("k3", "v3");
r.create("k4", "v4");
r.create("k5", "v5");
}
catch (Exception ex) {
fail("failed in createEntries()", ex);
}
}
public static void createEntries(Long num) {
try {
Region r = cache.getRegion("/" + regionName);
assertNotNull(r);
for (long i = 0; i < num.longValue(); i++) {
r.create("k" + i, "v" + i);
}
}
catch (Exception ex) {
fail("failed in createEntries(Long)", ex);
}
}
public static void putHeavyEntries(Integer num) {
try {
byte[] val = null;
Region r = cache.getRegion("/" + regionName);
assertNotNull(r);
for (long i = 0; i < num.intValue(); i++) {
val = new byte[1024 * 1024 * 5]; // 5 MB
r.put("k0", val);
}
}
catch (Exception ex) {
fail("failed in putHeavyEntries(Long)", ex);
}
}
/**
* This test verifies that the client-messages-region does not store duplicate
* ClientUpdateMessageImpl instances, during a normal put path as well as the
* GII path.
*
* @throws Exception
*/
public void testClientMsgsRegionSize() throws Exception {
// slow start for dispatcher
serverVM0.invoke(ConflationDUnitTest.class, "setIsSlowStart",
new Object[] { "30000" });
serverVM1.invoke(ConflationDUnitTest.class, "setIsSlowStart",
new Object[] { "30000" });
createClientCache(getServerHostName(Host.getHost(0)), new Integer(PORT1), new Integer(PORT2), "1");
final String client1Host = getServerHostName(clientVM1.getHost());
clientVM1.invoke(HARQueueNewImplDUnitTest.class, "createClientCache",
new Object[] { client1Host, new Integer(PORT1), new Integer(PORT2), "1" });
final String client2Host = getServerHostName(clientVM2.getHost());
clientVM2.invoke(HARQueueNewImplDUnitTest.class, "createClientCache",
new Object[] { client2Host, new Integer(PORT1), new Integer(PORT2), "1" });
registerInterestListAll();
clientVM1.invoke(HARQueueNewImplDUnitTest.class, "registerInterestList");
clientVM2.invoke(HARQueueNewImplDUnitTest.class, "registerInterestList");
serverVM1.invoke(HARQueueNewImplDUnitTest.class, "stopServer");
serverVM0.invoke(HARQueueNewImplDUnitTest.class, "createEntries");
serverVM1.invoke(HARQueueNewImplDUnitTest.class, "startServer");
serverVM0.invoke(HARQueueNewImplDUnitTest.class, "verifyRegionSize",
new Object[] { new Integer(5), new Integer(5), new Integer(PORT1) });
serverVM1.invoke(HARQueueNewImplDUnitTest.class, "verifyRegionSize",
new Object[] { new Integer(5), new Integer(5), new Integer(PORT2) });
}
/**
* This test verifies that the ha-region-queues increment the reference count
* of their respective HAEventWrapper instances in the client-messages-region
* correctly, during put as well as GII path.
*
* @throws Exception
*/
public void testRefCountForNormalAndGIIPut() throws Exception {
// slow start for dispatcher
serverVM0.invoke(ConflationDUnitTest.class, "setIsSlowStart",
new Object[] { "30000" });
serverVM1.invoke(ConflationDUnitTest.class, "setIsSlowStart",
new Object[] { "30000" });
createClientCache(getServerHostName(Host.getHost(0)), new Integer(PORT1), new Integer(PORT2), "1");
final String client1Host = getServerHostName(clientVM1.getHost());
clientVM1.invoke(HARQueueNewImplDUnitTest.class, "createClientCache",
new Object[] { client1Host, new Integer(PORT1), new Integer(PORT2), "1" });
final String client2Host = getServerHostName(clientVM2.getHost());
clientVM2.invoke(HARQueueNewImplDUnitTest.class, "createClientCache",
new Object[] { client2Host, new Integer(PORT1), new Integer(PORT2), "1" });
registerInterestListAll();
clientVM1.invoke(HARQueueNewImplDUnitTest.class, "registerInterestList");
clientVM2.invoke(HARQueueNewImplDUnitTest.class, "registerInterestList");
serverVM1.invoke(HARQueueNewImplDUnitTest.class, "stopServer");
serverVM0.invoke(HARQueueNewImplDUnitTest.class, "createEntries");
serverVM1.invoke(HARQueueNewImplDUnitTest.class, "startServer");
Thread.sleep(3000); // TODO: Find a better 'n reliable alternative
serverVM0.invoke(HARQueueNewImplDUnitTest.class, "updateMapForVM0");
serverVM1.invoke(HARQueueNewImplDUnitTest.class, "updateMapForVM1");
serverVM0.invoke(HARQueueNewImplDUnitTest.class, "verifyQueueData",
new Object[] { new Integer(5), new Integer(5), new Integer(PORT1) });
serverVM1.invoke(HARQueueNewImplDUnitTest.class, "verifyQueueData",
new Object[] { new Integer(5), new Integer(5), new Integer(PORT2) });
}
/**
* This test verifies that the ha-region-queues decrement the reference count
* of their respective HAEventWrapper instances in the client-messages-region
* correctly, after the events have been peeked and removed from the queue.
*
* @throws Exception
*/
public void testRefCountForPeekAndRemove() throws Exception {
serverVM0.invoke(ConflationDUnitTest.class, "setIsSlowStart",
new Object[] { "30000" });
createClientCache(getServerHostName(Host.getHost(0)), new Integer(PORT1), new Integer(PORT2), "1");
final String client1Host = getServerHostName(clientVM1.getHost());
clientVM1.invoke(HARQueueNewImplDUnitTest.class, "createClientCache",
new Object[] { client1Host, new Integer(PORT1), new Integer(PORT2), "1" });
final String client2Host = getServerHostName(clientVM2.getHost());
clientVM2.invoke(HARQueueNewImplDUnitTest.class, "createClientCache",
new Object[] { client2Host, new Integer(PORT1), new Integer(PORT2), "1" });
registerInterestListAll();
clientVM1.invoke(HARQueueNewImplDUnitTest.class, "registerInterestList");
clientVM2.invoke(HARQueueNewImplDUnitTest.class, "registerInterestList");
serverVM0.invoke(HARQueueNewImplDUnitTest.class, "createEntries");
serverVM0.invoke(HARQueueNewImplDUnitTest.class, "verifyRegionSize",
new Object[] { new Integer(5), new Integer(5), new Integer(PORT1) });
serverVM0.invoke(ConflationDUnitTest.class, "unsetIsSlowStart");
serverVM0.invoke(HARQueueNewImplDUnitTest.class,
"waitTillMessagesAreDispatched", new Object[] { new Integer(PORT1),
new Long(5000) });
serverVM0.invoke(HARQueueNewImplDUnitTest.class, "verifyRegionSize",
new Object[] { new Integer(5), new Integer(0), new Integer(PORT1) });
}
/**
* This test verifies that the processing of the QRM messages results in
* decrementing the reference count of corresponding HAEventWrapper instances,
* correctly.
*
* @throws Exception
*/
public void testRefCountForQRM() throws Exception {
serverVM0.invoke(ConflationDUnitTest.class, "setIsSlowStart",
new Object[] { "30000" });
createClientCache(getServerHostName(Host.getHost(0)), new Integer(PORT1), new Integer(PORT2), "1");
final String client1Host = getServerHostName(clientVM1.getHost());
clientVM1.invoke(HARQueueNewImplDUnitTest.class, "createClientCache",
new Object[] { client1Host, new Integer(PORT1), new Integer(PORT2), "1" });
final String client2Host = getServerHostName(clientVM2.getHost());
clientVM2.invoke(HARQueueNewImplDUnitTest.class, "createClientCache",
new Object[] { client2Host, new Integer(PORT1), new Integer(PORT2), "1" });
registerInterestListAll();
clientVM1.invoke(HARQueueNewImplDUnitTest.class, "registerInterestList");
clientVM2.invoke(HARQueueNewImplDUnitTest.class, "registerInterestList");
serverVM1.invoke(HARQueueNewImplDUnitTest.class, "stopServer");
serverVM0.invoke(HARQueueNewImplDUnitTest.class, "createEntries");
serverVM1.invoke(HARQueueNewImplDUnitTest.class, "startServer");
serverVM1.invoke(HARQueueNewImplDUnitTest.class, "verifyRegionSize",
new Object[] { new Integer(5), new Integer(5), new Integer(PORT2) });
serverVM0.invoke(ConflationDUnitTest.class, "unsetIsSlowStart");
serverVM1.invoke(HARQueueNewImplDUnitTest.class, "verifyRegionSize",
new Object[] { new Integer(5), new Integer(0), new Integer(PORT2) });
}
/**
* This test verifies that the destruction of a ha-region (caused by
* proxy/client disconnect), causes the reference count of all HAEventWrapper
* instances belonging to the ha-region-queue to be decremented by one, and
* makes it visible to the client-messages-region.
*
* @throws Exception
*/
public void testRefCountForDestroy() throws Exception {
// slow start for dispatcher
serverVM0.invoke(ConflationDUnitTest.class, "setIsSlowStart",
new Object[] { "30000" });
serverVM1.invoke(ConflationDUnitTest.class, "setIsSlowStart",
new Object[] { "30000" });
createClientCache(getServerHostName(Host.getHost(0)), new Integer(PORT1), new Integer(PORT2), "1");
final String client1Host = getServerHostName(clientVM1.getHost());
clientVM1.invoke(HARQueueNewImplDUnitTest.class, "createClientCache",
new Object[] { client1Host, new Integer(PORT1), new Integer(PORT2), "1" });
final String client2Host = getServerHostName(clientVM2.getHost());
clientVM2.invoke(HARQueueNewImplDUnitTest.class, "createClientCache",
new Object[] { client2Host, new Integer(PORT1), new Integer(PORT2), "1" });
registerInterestListAll();
clientVM1.invoke(HARQueueNewImplDUnitTest.class, "registerInterestList");
clientVM2.invoke(HARQueueNewImplDUnitTest.class, "registerInterestList");
// 1. stop the second server
serverVM1.invoke(HARQueueNewImplDUnitTest.class, "stopServer");
serverVM0.invoke(HARQueueNewImplDUnitTest.class, "createEntries");
// 3. start the second server.
serverVM1.invoke(HARQueueNewImplDUnitTest.class, "startServer");
Thread.sleep(3000);
clientVM1.invoke(HARQueueNewImplDUnitTest.class, "closeCache");
Thread.sleep(1000);
serverVM0.invoke(HARQueueNewImplDUnitTest.class, "updateMap1");
serverVM1.invoke(HARQueueNewImplDUnitTest.class, "updateMap1");
serverVM0.invoke(HARQueueNewImplDUnitTest.class, "verifyQueueData",
new Object[] { new Integer(5), new Integer(5), new Integer(PORT1) });
serverVM1.invoke(HARQueueNewImplDUnitTest.class, "verifyQueueData",
new Object[] { new Integer(5), new Integer(5), new Integer(PORT2) });
clientVM2.invoke(HARQueueNewImplDUnitTest.class, "closeCache");
serverVM0.invoke(HARQueueNewImplDUnitTest.class, "updateMap2");
serverVM1.invoke(HARQueueNewImplDUnitTest.class, "updateMap2");
Thread.sleep(1000);
serverVM0.invoke(HARQueueNewImplDUnitTest.class, "verifyQueueData",
new Object[] { new Integer(5), new Integer(5), new Integer(PORT1) });
serverVM1.invoke(HARQueueNewImplDUnitTest.class, "verifyQueueData",
new Object[] { new Integer(5), new Integer(5), new Integer(PORT2) });
}
/**
* Addresses the bug 39179. If a clientUpdateMessage is dispatched to the
* client while its GII was under way, then it should not be put into the
* HARegionQueue of a client at receiving server side.
*
* @throws Exception
*/
public void testConcurrentGIIAndDispatch() throws Exception {
serverVM0.invoke(ConflationDUnitTest.class, "setIsSlowStart",
new Object[] { "40000" });
serverVM1.invoke(ConflationDUnitTest.class, "setIsSlowStart",
new Object[] { "40000" });
createClientCache(getServerHostName(Host.getHost(0)), new Integer(PORT1), new Integer(PORT2), "1");
final String client1Host = getServerHostName(clientVM1.getHost());
clientVM1.invoke(HARQueueNewImplDUnitTest.class, "createClientCache",
new Object[] { client1Host, new Integer(PORT1), new Integer(PORT2), "1" });
final String client2Host = getServerHostName(clientVM2.getHost());
clientVM2.invoke(HARQueueNewImplDUnitTest.class, "createClientCache",
new Object[] { client2Host, new Integer(PORT1), new Integer(PORT2), "1" });
registerInterestListAll();
clientVM1.invoke(HARQueueNewImplDUnitTest.class, "registerInterestListAll");
clientVM2.invoke(HARQueueNewImplDUnitTest.class, "registerInterestListAll");
// 1. stop the second server
serverVM1.invoke(HARQueueNewImplDUnitTest.class, "stopServer");
serverVM0.invoke(HARQueueNewImplDUnitTest.class, "createEntries");
serverVM0.invoke(HARQueueNewImplDUnitTest.class,
"makeValuesOfSomeKeysNullInClientMsgsRegion", new Object[] {
new Integer(PORT1), new String[] { "k1", "k3" } });
// 3. start the second server.
serverVM1.invoke(HARQueueNewImplDUnitTest.class, "startServer");
serverVM0.invoke(HARQueueNewImplDUnitTest.class, "verifyRegionSize",
new Object[] { new Integer(5), new Integer(3), new Integer(PORT1) });
serverVM1.invoke(HARQueueNewImplDUnitTest.class, "verifyNullValuesInCMR",
new Object[] { new Integer(3), new Integer(PORT2),
new String[] { "k1", "k3" } });
serverVM1.invoke(HARQueueNewImplDUnitTest.class, "verifyRegionSize",
new Object[] { new Integer(5), new Integer(3), new Integer(PORT2) });
serverVM0.invoke(HARQueueNewImplDUnitTest.class,
"populateValuesOfSomeKeysInClientMsgsRegion", new Object[] {
new Integer(PORT1), new String[] { "k1", "k3" } });
serverVM0.invoke(ConflationDUnitTest.class, "unsetIsSlowStart");
serverVM1.invoke(ConflationDUnitTest.class, "unsetIsSlowStart");
}
/*
* public void PtestRefCountForConcurrentPutAndRemove() throws Exception { }
* public void PtestRefCountForConcurrentGIIPut() throws Exception { }
*/
/**
* This test verifies that when two BridgeServerImpl instances are created in
* a single VM, they do share the client-messages-region.
*
* @throws Exception
*/
public void testTwoBridgeServersInOneVMDoShareCMR() throws Exception {
// slow start for dispatcher
serverVM0.invoke(ConflationDUnitTest.class, "setIsSlowStart",
new Object[] { "30000" });
Integer port3 = (Integer)serverVM0.invoke(HARQueueNewImplDUnitTest.class,
"createOneMoreBridgeServer", new Object[] { Boolean.TRUE });
createClientCache(getServerHostName(Host.getHost(0)), new Integer(PORT1), port3, "0");
final String client1Host = getServerHostName(clientVM1.getHost());
clientVM1.invoke(HARQueueNewImplDUnitTest.class, "createClientCache",
new Object[] { client1Host, new Integer(PORT1), new Integer(PORT2), "1" });
final String client2Host = getServerHostName(clientVM2.getHost());
clientVM2.invoke(HARQueueNewImplDUnitTest.class, "createClientCache",
new Object[] { client2Host, new Integer(PORT1), new Integer(PORT2), "1" });
registerInterestListAll();
clientVM1.invoke(HARQueueNewImplDUnitTest.class, "registerInterestList");
clientVM2.invoke(HARQueueNewImplDUnitTest.class, "registerInterestList");
serverVM0.invoke(HARQueueNewImplDUnitTest.class, "createEntries");
serverVM0.invoke(HARQueueNewImplDUnitTest.class, "verifyRegionSize",
new Object[] { new Integer(5), new Integer(5), new Integer(PORT1) });
serverVM0.invoke(HARQueueNewImplDUnitTest.class, "verifyRegionSize",
new Object[] { new Integer(5), new Integer(5), port3 });
}
/**
* This test verifies that two clients, connected to two bridge servers with
* different notifyBySubscription values, on a single VM, receive
* updates/invalidates depending upon their notifyBySubscription value.
*
* @throws Exception
*/
public void testUpdatesWithTwoBridgeServersInOneVM() throws Exception {
Integer port3 = (Integer)serverVM0.invoke(HARQueueNewImplDUnitTest.class,
"createOneMoreBridgeServer", new Object[] { Boolean.FALSE });
createClientCache(getServerHostName(Host.getHost(0)), new Integer(PORT1), new Integer(PORT2), "1", Boolean.TRUE);
final String client1Host = getServerHostName(clientVM1.getHost());
clientVM1.invoke(HARQueueNewImplDUnitTest.class, "createClientCache",
new Object[] { client1Host, port3, new Integer(PORT2), "1", Boolean.TRUE });
registerInterestListAll();
clientVM1.invoke(HARQueueNewImplDUnitTest.class, "registerInterestListAll");
clientVM1.invoke(HARQueueNewImplDUnitTest.class, "createEntries");
serverVM0.invoke(HARQueueNewImplDUnitTest.class, "putEntries");
serverVM0.invoke(HARQueueNewImplDUnitTest.class,
"waitTillMessagesAreDispatched", new Object[] { new Integer(PORT1),
new Long(5000) });
serverVM0
.invoke(HARQueueNewImplDUnitTest.class,
"waitTillMessagesAreDispatched", new Object[] { port3,
new Long(5000) });
// expect updates
verifyUpdatesReceived(new Integer(5), Boolean.TRUE, new Long(5000));
// expect invalidates
clientVM1.invoke(HARQueueNewImplDUnitTest.class, "verifyUpdatesReceived",
new Object[] { new Integer(5), Boolean.TRUE, new Long(5000) });
}
/**
* This test verifies that the HAEventWrapper instances present in the
* client-messages-region give up the references to their respective
* ClientUpdateMessageImpl instances.
*
* @throws Exception
*/
public void testHAEventWrapperDoesNotHoldCUMOnceInsideCMR() throws Exception {
// slow start for dispatcher
serverVM0.invoke(ConflationDUnitTest.class, "setIsSlowStart",
new Object[] { "30000" });
createClientCache(getServerHostName(Host.getHost(0)), new Integer(PORT1), new Integer(PORT2), "1");
final String client1Host = getServerHostName(clientVM1.getHost());
clientVM1.invoke(HARQueueNewImplDUnitTest.class, "createClientCache",
new Object[] { client1Host, new Integer(PORT1), new Integer(PORT2), "1" });
final String client2Host = getServerHostName(clientVM2.getHost());
clientVM2.invoke(HARQueueNewImplDUnitTest.class, "createClientCache",
new Object[] { client2Host, new Integer(PORT1), new Integer(PORT2), "1" });
registerInterestListAll();
clientVM1.invoke(HARQueueNewImplDUnitTest.class, "registerInterestList");
clientVM2.invoke(HARQueueNewImplDUnitTest.class, "registerInterestList");
serverVM1.invoke(HARQueueNewImplDUnitTest.class, "stopServer");
serverVM0.invoke(HARQueueNewImplDUnitTest.class, "createEntries",
new Object[] { new Long(1000) });
serverVM1.invoke(HARQueueNewImplDUnitTest.class, "startServer");
Thread.sleep(2000);
serverVM0.invoke(HARQueueNewImplDUnitTest.class, "verifyNullCUMReference",
new Object[] { new Integer(PORT1) });
serverVM1.invoke(HARQueueNewImplDUnitTest.class, "verifyNullCUMReference",
new Object[] { new Integer(PORT2) });
}
/**
* This test verifies that client-messages-regions are not created for the
* bridge servers who have eviction policy as 'none'. Instead, such bridge
* servers will have simple HashMap structures. Also, it verifies that such a
* structure (referred to as haContainer, in general) is destroyed when its
* bridge server is stopped.
*
* @throws Exception
*/
public void testCMRNotCreatedForNoneEvictionPolicy() throws Exception {
serverVM0.invoke(HARQueueNewImplDUnitTest.class, "closeCache");
serverVM1.invoke(HARQueueNewImplDUnitTest.class, "closeCache");
Thread.sleep(2000);
PORT1 = ((Integer)serverVM0.invoke(HARQueueNewImplDUnitTest.class,
"createServerCache",
new Object[] { HARegionQueue.HA_EVICTION_POLICY_NONE })).intValue();
PORT2 = ((Integer)serverVM1.invoke(HARQueueNewImplDUnitTest.class,
"createServerCache",
new Object[] { HARegionQueue.HA_EVICTION_POLICY_NONE })).intValue();
Boolean isRegion = Boolean.FALSE;
// slow start for dispatcher
serverVM0.invoke(ConflationDUnitTest.class, "setIsSlowStart",
new Object[] { "30000" });
createClientCache(getServerHostName(Host.getHost(0)), new Integer(PORT1), new Integer(PORT2), "1");
final String client1Host = getServerHostName(clientVM1.getHost());
clientVM1.invoke(HARQueueNewImplDUnitTest.class, "createClientCache",
new Object[] { client1Host, new Integer(PORT1), new Integer(PORT2), "1" });
final String client2Host = getServerHostName(clientVM2.getHost());
clientVM2.invoke(HARQueueNewImplDUnitTest.class, "createClientCache",
new Object[] { client2Host, new Integer(PORT1), new Integer(PORT2), "1" });
registerInterestListAll();
clientVM1.invoke(HARQueueNewImplDUnitTest.class, "registerInterestList");
clientVM2.invoke(HARQueueNewImplDUnitTest.class, "registerInterestList");
serverVM0.invoke(HARQueueNewImplDUnitTest.class, "verifyHaContainerType",
new Object[] { isRegion, new Integer(PORT1) });
serverVM1.invoke(HARQueueNewImplDUnitTest.class, "verifyHaContainerType",
new Object[] { isRegion, new Integer(PORT2) });
serverVM0.invoke(HARQueueNewImplDUnitTest.class, "stopOneBridgeServer",
new Object[] { new Integer(PORT1) });
serverVM1.invoke(HARQueueNewImplDUnitTest.class, "stopOneBridgeServer",
new Object[] { new Integer(PORT2) });
serverVM0.invoke(HARQueueNewImplDUnitTest.class,
"verifyHaContainerDestroyed", new Object[] { isRegion,
new Integer(PORT1) });
serverVM1.invoke(HARQueueNewImplDUnitTest.class,
"verifyHaContainerDestroyed", new Object[] { isRegion,
new Integer(PORT2) });
}
/**
* This test verifies that client-messages-regions are created for the bridge
* servers who have eviction policy either as 'mem' or as 'entry'. Also, it
* verifies that such a client-messages-region is destroyed when its bridge
* server is stopped.
*
* @throws Exception
*/
public void testCMRCreatedForMemOrEntryEvictionPolicy() throws Exception {
Boolean isRegion = Boolean.TRUE;
// slow start for dispatcher
serverVM0.invoke(ConflationDUnitTest.class, "setIsSlowStart",
new Object[] { "30000" });
createClientCache(getServerHostName(Host.getHost(0)), new Integer(PORT1), new Integer(PORT2), "1");
final String client1Host = getServerHostName(clientVM1.getHost());
clientVM1.invoke(HARQueueNewImplDUnitTest.class, "createClientCache",
new Object[] { client1Host, new Integer(PORT1), new Integer(PORT2), "1" });
final String client2Host = getServerHostName(clientVM1.getHost());
clientVM2.invoke(HARQueueNewImplDUnitTest.class, "createClientCache",
new Object[] { client2Host, new Integer(PORT1), new Integer(PORT2), "1" });
registerInterestListAll();
clientVM1.invoke(HARQueueNewImplDUnitTest.class, "registerInterestList");
clientVM2.invoke(HARQueueNewImplDUnitTest.class, "registerInterestList");
serverVM0.invoke(HARQueueNewImplDUnitTest.class, "verifyHaContainerType",
new Object[] { isRegion, new Integer(PORT1) });
serverVM1.invoke(HARQueueNewImplDUnitTest.class, "verifyHaContainerType",
new Object[] { isRegion, new Integer(PORT2) });
serverVM0.invoke(HARQueueNewImplDUnitTest.class, "stopOneBridgeServer",
new Object[] { new Integer(PORT1) });
serverVM1.invoke(HARQueueNewImplDUnitTest.class, "stopOneBridgeServer",
new Object[] { new Integer(PORT2) });
serverVM0.invoke(HARQueueNewImplDUnitTest.class,
"verifyHaContainerDestroyed", new Object[] { isRegion,
new Integer(PORT1) });
serverVM1.invoke(HARQueueNewImplDUnitTest.class,
"verifyHaContainerDestroyed", new Object[] { isRegion,
new Integer(PORT2) });
}
/**
* This test verifies that the Cache.rootRegions() method does not return the
* client-messages-region of any of the cache's attached bridge servers.
*
* @throws Exception
*/
public void testCMRNotReturnedByRootRegionsMethod() throws Exception {
createClientCache(getServerHostName(Host.getHost(0)), new Integer(PORT1), new Integer(PORT2), "1");
final String client1Host = getServerHostName(clientVM1.getHost());
clientVM1.invoke(HARQueueNewImplDUnitTest.class, "createClientCache",
new Object[] { client1Host, new Integer(PORT1), new Integer(PORT2), "1" });
final String client2Host = getServerHostName(clientVM2.getHost());
clientVM2.invoke(HARQueueNewImplDUnitTest.class, "createClientCache",
new Object[] { client2Host, new Integer(PORT1), new Integer(PORT2), "1" });
registerInterestListAll();
clientVM1.invoke(HARQueueNewImplDUnitTest.class, "registerInterestList");
clientVM2.invoke(HARQueueNewImplDUnitTest.class, "registerInterestList");
serverVM0.invoke(HARQueueNewImplDUnitTest.class, "createEntries");
serverVM0.invoke(HARQueueNewImplDUnitTest.class,
"verifyRootRegionsDoesNotReturnCMR",
new Object[] { new Integer(PORT1) });
serverVM1.invoke(HARQueueNewImplDUnitTest.class,
"verifyRootRegionsDoesNotReturnCMR",
new Object[] { new Integer(PORT2) });
}
/**
* This test verifies that the memory foorprint of the ha region queues is
* less when ha-overflow is enabled (with an appropriate value of haCapacity)
* compared to when it is disabled, for the same amount of data feed.
*
* @throws Exception
*/
public void _testMemoryFootprintOfHARegionQueuesWithAndWithoutOverflow()
throws Exception {
serverVM0.invoke(HARQueueNewImplDUnitTest.class, "closeCache");
serverVM1.invoke(HARQueueNewImplDUnitTest.class, "closeCache");
Thread.sleep(2000);
Integer numOfEntries = new Integer(30);
PORT1 = ((Integer)serverVM0.invoke(HARQueueNewImplDUnitTest.class,
"createServerCache", new Object[] {
HARegionQueue.HA_EVICTION_POLICY_MEMORY, new Integer(30) }))
.intValue();
PORT2 = ((Integer)serverVM1.invoke(HARQueueNewImplDUnitTest.class,
"createServerCache",
new Object[] { HARegionQueue.HA_EVICTION_POLICY_NONE })).intValue();
serverVM0.invoke(ConflationDUnitTest.class, "setIsSlowStart",
new Object[] { "60000" });
serverVM1.invoke(ConflationDUnitTest.class, "setIsSlowStart",
new Object[] { "60000" });
createClientCache(getServerHostName(Host.getHost(0)), new Integer(PORT1), new Integer(PORT2), "1");
final String client1Host = getServerHostName(clientVM1.getHost());
clientVM1.invoke(HARQueueNewImplDUnitTest.class, "createClientCache",
new Object[] { client1Host, new Integer(PORT1), new Integer(PORT2), "1" });
final String client2Host = getServerHostName(clientVM2.getHost());
clientVM2.invoke(HARQueueNewImplDUnitTest.class, "createClientCache",
new Object[] { client2Host, new Integer(PORT1), new Integer(PORT2), "1" });
registerInterestListAll();
clientVM1.invoke(HARQueueNewImplDUnitTest.class, "registerInterestListAll");
clientVM2.invoke(HARQueueNewImplDUnitTest.class, "registerInterestListAll");
serverVM0.invoke(HARQueueNewImplDUnitTest.class, "putHeavyEntries",
new Object[] { numOfEntries });
Long usedMemInVM0 = (Long)serverVM0.invoke(HARQueueNewImplDUnitTest.class,
"getUsedMemoryAndVerifyRegionSize", new Object[] { new Integer(1),
numOfEntries, new Integer(PORT1) });
Long usedMemInVM1 = (Long)serverVM1.invoke(HARQueueNewImplDUnitTest.class,
"getUsedMemoryAndVerifyRegionSize", new Object[] { new Integer(1),
numOfEntries, new Integer(-1) });
serverVM0.invoke(ConflationDUnitTest.class, "unsetIsSlowStart");
serverVM1.invoke(ConflationDUnitTest.class, "unsetIsSlowStart");
logger
.fine("Used Mem: " + usedMemInVM1.longValue() + "(without overflow), "
+ usedMemInVM0.longValue() + "(with overflow)");
assertTrue(usedMemInVM0.longValue() < usedMemInVM1.longValue());
}
private static void verifyNullCUMReference(Integer port) {
Region r = cache.getRegion("/"
+ BridgeServerImpl.generateNameForClientMsgsRegion(port.intValue()));
assertNotNull(r);
Object[] arr = r.keySet().toArray();
for (int i = 0; i < arr.length; i++) {
assertNull(((HAEventWrapper)arr[i]).getClientUpdateMessage());
}
}
private static void verifyHaContainerDestroyed(Boolean isRegion, Integer port) {
Map r = cache.getRegion("/"
+ BridgeServerImpl.generateNameForClientMsgsRegion(port.intValue()));
if (isRegion.booleanValue()) {
if (r != null) {
assertTrue(((Region)r).isDestroyed());
}
}
else {
r = ((BridgeServerImpl)cache.getBridgeServers().toArray()[0])
.getAcceptor().getCacheClientNotifier().getHaContainer();
if (r != null) {
assertTrue(r.isEmpty());
}
}
}
static Long getUsedMemoryAndVerifyRegionSize(Integer rSize,
Integer haContainerSize, Integer port) {
Long retVal = null;
try {
retVal = new Long(Runtime.getRuntime().totalMemory()
- Runtime.getRuntime().freeMemory());
if (port.intValue() != -1) {
verifyRegionSize(rSize, haContainerSize, port);
}
else {
verifyRegionSize(rSize, haContainerSize);
}
}
catch (Exception e) {
fail("failed in getUsedMemory()" + e);
}
return retVal;
}
private static void setHACapacity(Integer cap) {
try {
Iterator iter = cache.getBridgeServers().iterator();
if (iter.hasNext()) {
BridgeServer server = (BridgeServer)iter.next();
server.getClientSubscriptionConfig().setCapacity(cap.intValue());
}
}
catch (Exception e) {
fail("failed in setHACapacity()" + e);
}
}
private static void stopOneBridgeServer(Integer port) {
try {
Iterator iter = cache.getBridgeServers().iterator();
if (iter.hasNext()) {
BridgeServer server = (BridgeServer)iter.next();
if (server.getPort() == port.intValue()) {
server.stop();
}
}
}
catch (Exception e) {
fail("failed in stopOneBridgeServer()" + e);
}
}
public static void stopServer() {
try {
Iterator iter = cache.getBridgeServers().iterator();
if (iter.hasNext()) {
BridgeServer server = (BridgeServer)iter.next();
server.stop();
}
}
catch (Exception e) {
fail("failed in stopServer()" + e);
}
}
public static void updateMapForVM0() {
try {
map.put("k1", new Long(3));
map.put("k2", new Long(1));
map.put("k3", new Long(3));
map.put("k4", new Long(1));
map.put("k5", new Long(3));
}
catch (Exception e) {
fail("failed in updateMapForVM0()" + e);
}
}
public static void updateMap1() {
try {
map.put("k1", new Long(2));
map.put("k2", new Long(1));
map.put("k3", new Long(2));
map.put("k4", new Long(1));
map.put("k5", new Long(2));
}
catch (Exception e) {
fail("failed in updateMap1()" + e);
}
}
public static void updateMap2() {
try {
map.put("k1", new Long(1));
map.put("k2", new Long(1));
map.put("k3", new Long(1));
map.put("k4", new Long(1));
map.put("k5", new Long(1));
}
catch (Exception e) {
fail("failed in updateMap2()" + e);
}
}
public static void updateMapForVM1() {
try {
updateMapForVM0();
}
catch (Exception e) {
fail("failed in updateMapForVM1()" + e);
}
}
public static void printMsg(String msg) {
try {
logger.fine(msg);
}
catch (Exception e) {
fail("failed in printMsg()" + e);
}
}
public static void haQueuePut() {
Set set = HARegionQueue.getDispatchedMessagesMapForTesting().keySet();
Iterator iter = set.iterator();
logger.fine("# of HAQueues: " + set.size());
while (iter.hasNext()) {
// HARegion haRegion = (HARegion)
cache.getRegion(Region.SEPARATOR
+ (String)iter.next());
// haRegion.getOwner().put();
}
}
public static void verifyNullValuesInCMR(final Integer numOfEntries,
final Integer port,
String[] keys) {
final Region msgsRegion = cache.getRegion(BridgeServerImpl
.generateNameForClientMsgsRegion(port.intValue()));
WaitCriterion wc = new WaitCriterion() {
String excuse;
public boolean done() {
int sz = msgsRegion.size();
return sz == numOfEntries.intValue();
}
public String description() {
return excuse;
}
};
DistributedTestCase.waitForCriterion(wc, 60 * 1000, 1000, true);
Set entries = msgsRegion.entrySet();
Iterator iter = entries.iterator();
for (; iter.hasNext();) {
Region.Entry entry = (Region.Entry)iter.next();
ClientUpdateMessage cum = (ClientUpdateMessage)entry.getValue();
for (int i = 0; i < keys.length; i++) {
logger.fine("cum.key: " + cum.getKeyToConflate());
// assert that the keys are not present in entries set
assertTrue(!keys[i].equals(cum.getKeyToConflate()));
}
}
}
public static void makeValuesOfSomeKeysNullInClientMsgsRegion(Integer port,
String[] keys) {
Region msgsRegion = cache.getRegion(BridgeServerImpl
.generateNameForClientMsgsRegion(port.intValue()));
assertNotNull(msgsRegion);
Set entries = msgsRegion.entrySet();
Iterator iter = entries.iterator();
deletedValues = new Object[keys.length];
while (iter.hasNext()) {
Region.Entry entry = (Region.Entry)iter.next();
ClientUpdateMessage cum = (ClientUpdateMessage)entry.getValue();
for (int i = 0; i < keys.length; i++) {
if (keys[i].equals(cum.getKeyToConflate())) {
logger.fine("HARQueueNewImplDUnit: Removing "
+ cum.getKeyOfInterest());
deletedValues[i] = msgsRegion.remove(entry.getKey());
}
}
}
}
public static void populateValuesOfSomeKeysInClientMsgsRegion(Integer port,
String[] keys) {
Region msgsRegion = cache.getRegion(BridgeServerImpl
.generateNameForClientMsgsRegion(port.intValue()));
assertNotNull(msgsRegion);
for (int i = 0; i < keys.length; i++) {
logger.fine("HARQueueNewImplDUnit: populating "
+ deletedValues[i]);
msgsRegion.put(keys[1], deletedValues[i]);
}
}
public static void startServer() {
try {
Iterator iter = cache.getBridgeServers().iterator();
if (iter.hasNext()) {
BridgeServer server = (BridgeServer)iter.next();
server.start();
}
}
catch (Exception e) {
fail("failed in startServer()" + e);
}
}
public static void verifyQueueData(Integer regionsize,
Integer msgsRegionsize, Integer port) {
try {
// Get the clientMessagesRegion and check the size.
Region msgsRegion = cache.getRegion(BridgeServerImpl
.generateNameForClientMsgsRegion(port.intValue()));
Region region = cache.getRegion("/" + regionName);
logger.fine("size<serverRegion, clientMsgsRegion>: " + region.size()
+ ", " + msgsRegion.size());
assertEquals(regionsize.intValue(), region.size());
assertEquals(msgsRegionsize.intValue(), msgsRegion.size());
Iterator iter = msgsRegion.entrySet().iterator();
while (iter.hasNext()) {
Region.Entry entry = (Region.Entry)iter.next();
HAEventWrapper wrapper = (HAEventWrapper)entry.getKey();
ClientUpdateMessage cum = (ClientUpdateMessage)entry.getValue();
Object key = cum.getKeyOfInterest();
logger.fine("key<feedCount, regionCount>: " + key + "<"
+ ((Long)map.get(key)).longValue() + ", "
+ wrapper.getReferenceCount() + ">");
assertEquals(((Long)map.get(key)).longValue(), wrapper.getReferenceCount());
}
}
catch (Exception e) {
fail("failed in verifyQueueData()" + e);
}
}
public static void verifyRegionSize(final Integer regionSize,
final Integer msgsRegionsize, final Integer port) {
WaitCriterion wc = new WaitCriterion() {
String excuse;
public boolean done() {
try {
// Get the clientMessagesRegion and check the size.
Region region = cache.getRegion("/" + regionName);
// logger.fine("size<serverRegion, clientMsgsRegion>: " + region.size()
// + ", " + msgsRegion.size());
int sz = region.size();
if (regionSize.intValue() != sz) {
excuse = "expected regionSize = " + regionSize + ", actual = " + sz;
return false;
}
Iterator iter = cache.getBridgeServers().iterator();
if (iter.hasNext()) {
BridgeServerImpl server = (BridgeServerImpl)iter.next();
Map msgsRegion = server.getAcceptor().getCacheClientNotifier()
.getHaContainer();
//Region msgsRegion = cache.getRegion(BridgeServerImpl
// .generateNameForClientMsgsRegion(port.intValue()));
sz = msgsRegion.size();
if (msgsRegionsize.intValue() != sz) {
excuse = "expected msgsRegionsize = " + msgsRegionsize + ", actual = " + sz;
return false;
}
}
return true;
}
catch (Exception e) {
excuse = "Caught exception " + e;
return false;
}
}
public String description() {
return excuse;
}
};
DistributedTestCase.waitForCriterion(wc, 120 * 1000, 1000, true);
}
public static void verifyRegionSize(final Integer regionSize,
final Integer msgsRegionsize) {
WaitCriterion wc = new WaitCriterion() {
String excuse;
public boolean done() {
try {
// Get the clientMessagesRegion and check the size.
Region region = cache.getRegion("/" + regionName);
int sz = region.size();
if (regionSize.intValue() != sz) {
excuse = "Expected regionSize = " + regionSize.intValue() +
", actual = " + sz;
return false;
}
Iterator iter = cache.getBridgeServers().iterator();
if (!iter.hasNext()) {
return true;
}
BridgeServerImpl server = (BridgeServerImpl)iter.next();
sz = server.getAcceptor().getCacheClientNotifier().getHaContainer().size();
if (sz != msgsRegionsize.intValue()) {
excuse = "Expected msgsRegionsize = " + msgsRegionsize.intValue() +
", actual = " + sz;
return false;
}
return true;
} catch (Exception e) {
excuse = "failed due to " + e;
return false;
}
}
public String description() {
return excuse;
}
};
DistributedTestCase.waitForCriterion(wc, 120 * 1000, 1000, true);
}
public static void verifyHaContainerType(Boolean isRegion, Integer port) {
try {
Map haMap = cache.getRegion(BridgeServerImpl
.generateNameForClientMsgsRegion(port.intValue()));
if (isRegion.booleanValue()) {
assertNotNull(haMap);
assertTrue(haMap instanceof LocalRegion);
haMap = ((BridgeServerImpl)cache.getBridgeServers().toArray()[0])
.getAcceptor().getCacheClientNotifier().getHaContainer();
assertNotNull(haMap);
assertTrue(haMap instanceof HAContainerRegion);
}
else {
assertNull(haMap);
haMap = ((BridgeServerImpl)cache.getBridgeServers().toArray()[0])
.getAcceptor().getCacheClientNotifier().getHaContainer();
assertNotNull(haMap);
assertTrue(haMap instanceof HAContainerMap);
}
logger.fine("haContainer: " + haMap);
}
catch (Exception e) {
fail("failed in verifyHaContainerType()" + e);
}
}
public static void verifyRootRegionsDoesNotReturnCMR(Integer port) {
try {
String cmrName = BridgeServerImpl.generateNameForClientMsgsRegion(port
.intValue());
Map haMap = cache.getRegion(cmrName);
assertNotNull(haMap);
String rName = "";
Iterator iter = cache.rootRegions().iterator();
while (iter.hasNext()) {
rName = ((Region)iter.next()).getName();
if (cmrName.equals(rName)) {
throw new TestException(
"Cache.rootRegions() method should not return the client_messages_region.");
}
logger.fine("Region name returned from cache.rootRegions(): " + rName);
}
}
catch (Exception e) {
fail("failed in verifyRootRegionsDoesNotReturnCMR()" + e);
}
}
public static void verifyUpdatesReceived(final Integer num, Boolean isUpdate,
Long waitLimit) {
try {
if (isUpdate.booleanValue()) {
WaitCriterion ev = new WaitCriterion() {
public boolean done() {
return num.intValue() == numOfUpdates;
}
public String description() {
return null;
}
};
DistributedTestCase.waitForCriterion(ev, waitLimit.longValue(), 200, true);
}
else {
WaitCriterion ev = new WaitCriterion() {
public boolean done() {
return num.intValue() == numOfInvalidates;
}
public String description() {
return null;
}
};
DistributedTestCase.waitForCriterion(ev, waitLimit.longValue(), 200, true);
}
}
catch (Exception e) {
fail("failed in verifyUpdatesReceived()" + e);
}
}
public static void waitTillMessagesAreDispatched(Integer port, Long waitLimit) {
try {
Map haContainer = null;
haContainer = cache.getRegion(Region.SEPARATOR
+ BridgeServerImpl.generateNameForClientMsgsRegion(port.intValue()));
if (haContainer == null) {
Object[] servers = cache.getBridgeServers().toArray();
for (int i = 0; i < servers.length; i++) {
if (port.intValue() == ((BridgeServerImpl)servers[i]).getPort()) {
haContainer = ((BridgeServerImpl)servers[i]).getAcceptor()
.getCacheClientNotifier().getHaContainer();
break;
}
}
}
final Map m = haContainer;
WaitCriterion ev = new WaitCriterion() {
public boolean done() {
return m.size() == 0;
}
public String description() {
return null;
}
};
DistributedTestCase.waitForCriterion(ev, waitLimit.longValue(), 200, true);
}
catch (Exception e) {
fail("failed in waitTillMessagesAreDispatched()" + e);
}
}
public static void closeCache() {
if (cache != null && !cache.isClosed()) {
cache.close();
cache.getDistributedSystem().disconnect();
}
}
}