blob: f599687a31d9435b00d762c7d1e007688c2e2854 [file] [log] [blame]
/*=========================================================================
* Copyright (c) 2010-2014 Pivotal Software, Inc. All Rights Reserved.
* This product is protected by U.S. and international copyright
* and intellectual property laws. Pivotal products are covered by
* one or more patents listed at http://www.pivotal.io/patents.
*=========================================================================
*/
/**
*
*/
package com.gemstone.gemfire.internal.cache;
import java.io.File;
import java.util.Properties;
import com.gemstone.gemfire.DeltaTestImpl;
import com.gemstone.gemfire.InvalidDeltaException;
import com.gemstone.gemfire.LogWriter;
import com.gemstone.gemfire.cache.AttributesFactory;
import com.gemstone.gemfire.cache.Cache;
import com.gemstone.gemfire.cache.CacheFactory;
import com.gemstone.gemfire.cache.CacheListener;
import com.gemstone.gemfire.cache.DataPolicy;
import com.gemstone.gemfire.cache.DiskStoreFactory;
import com.gemstone.gemfire.cache.EntryEvent;
import com.gemstone.gemfire.cache.EvictionAction;
import com.gemstone.gemfire.cache.EvictionAttributes;
import com.gemstone.gemfire.cache.ExpirationAttributes;
import com.gemstone.gemfire.cache.Operation;
import com.gemstone.gemfire.cache.Region;
import com.gemstone.gemfire.cache.RegionAttributes;
import com.gemstone.gemfire.cache.RegionEvent;
import com.gemstone.gemfire.cache.Scope;
import com.gemstone.gemfire.cache.client.Pool;
import com.gemstone.gemfire.cache.client.PoolFactory;
import com.gemstone.gemfire.cache.client.PoolManager;
import com.gemstone.gemfire.cache.client.internal.PoolImpl;
import com.gemstone.gemfire.cache.server.CacheServer;
import com.gemstone.gemfire.cache.util.CacheListenerAdapter;
import com.gemstone.gemfire.cache30.BridgeTestCase;
import com.gemstone.gemfire.compression.Compressor;
import com.gemstone.gemfire.compression.SnappyCompressor;
import com.gemstone.gemfire.distributed.DistributedSystem;
import com.gemstone.gemfire.distributed.internal.DistributionConfig;
import com.gemstone.gemfire.internal.Assert;
import com.gemstone.gemfire.internal.AvailablePort;
import com.gemstone.gemfire.internal.cache.PartitionedRegionLocalMaxMemoryDUnitTest.TestObject1;
import com.gemstone.gemfire.internal.cache.ha.HARegionQueue;
import com.gemstone.gemfire.internal.cache.lru.EnableLRU;
import com.gemstone.gemfire.internal.cache.tier.sockets.CacheServerTestUtil;
import com.gemstone.gemfire.internal.cache.tier.sockets.ConflationDUnitTest;
import com.gemstone.gemfire.internal.tcp.ConnectionTable;
import dunit.DistributedTestCase;
import dunit.Host;
import dunit.SerializableRunnable;
import dunit.VM;
/**
* @since 6.1
*/
public class DeltaPropagationDUnitTest extends DistributedTestCase {
private final static Compressor compressor = SnappyCompressor.getDefaultInstance();
protected static Cache cache = null;
protected static Pool pool = null;
protected static VM VM0 = null;
protected static VM VM1 = null;
protected static VM VM2 = null;
protected static VM VM3 = null;
private static int PORT1;
private static int PORT2;
private static final String regionName = "DeltaPropagationDUnitTest";
private static LogWriter logger = null;
public static final int EVENTS_SIZE = 6;
private static boolean lastKeyReceived = false;
private static boolean markerReceived = false;
private static int numOfCreates;
private static int numOfUpdates;
private static int numOfInvalidates;
private static int numOfDestroys;
private static int numOfEvents;
private static DeltaTestImpl[] deltaPut = new DeltaTestImpl[EVENTS_SIZE];
private static boolean areListenerResultsValid = true;
private static boolean closeCache = false;
private static StringBuffer listenerError = new StringBuffer("");
public static String DELTA_KEY = "DELTA_KEY";
public static String LAST_KEY = "LAST_KEY";
public static final int NO_LISTENER = 0;
public static final int CLIENT_LISTENER = 1;
public static final int SERVER_LISTENER = 2;
public static final int C2S2S_SERVER_LISTENER = 3;
public static final int LAST_KEY_LISTENER = 4;
public static final int DURABLE_CLIENT_LISTENER = 5;
public static final int CLIENT_LISTENER_2 = 6;
public static final String CREATE = "CREATE";
public static final String UPDATE = "UPDATE";
public static final String INVALIDATE = "INVALIDATE";
public static final String DESTROY = "DESTROY";
/**
* @param name
*/
public DeltaPropagationDUnitTest(String name) {
super(name);
}
public void setUp() throws Exception {
super.setUp();
final Host host = Host.getHost(0);
VM0 = host.getVM(0);
VM1 = host.getVM(1);
VM2 = host.getVM(2);
VM3 = host.getVM(3);
VM0.invoke(DeltaPropagationDUnitTest.class, "resetAll");
VM1.invoke(DeltaPropagationDUnitTest.class, "resetAll");
VM2.invoke(DeltaPropagationDUnitTest.class, "resetAll");
VM3.invoke(DeltaPropagationDUnitTest.class, "resetAll");
DeltaPropagationDUnitTest.resetAll();
}
public void tearDown2() throws Exception {
super.tearDown2();
DeltaPropagationDUnitTest.closeCache();
VM2.invoke(DeltaPropagationDUnitTest.class, "closeCache");
VM3.invoke(DeltaPropagationDUnitTest.class, "closeCache");
// Unset the isSlowStartForTesting flag
VM0.invoke(ConflationDUnitTest.class, "unsetIsSlowStart");
VM1.invoke(ConflationDUnitTest.class, "unsetIsSlowStart");
// then close the servers
VM0.invoke(DeltaPropagationDUnitTest.class, "closeCache");
VM1.invoke(DeltaPropagationDUnitTest.class, "closeCache");
disconnectAllFromDS();
}
public void testS2CSuccessfulDeltaPropagationWithCompression() throws Exception {
PORT1 = ((Integer)VM0.invoke(DeltaPropagationDUnitTest.class,
"createServerCache", new Object[] {
HARegionQueue.HA_EVICTION_POLICY_NONE, new Integer(1),
new Integer(NO_LISTENER), Boolean.FALSE, compressor })).intValue();
VM0.invoke(new SerializableRunnable() {
public void run() { assertTrue(cache.getRegion(regionName).getAttributes().getCompressor() != null); }
});
createClientCache(new Integer(PORT1), new Integer(-1), "0", new Integer(
CLIENT_LISTENER));
registerInterestListAll();
VM0.invoke(DeltaPropagationDUnitTest.class, "prepareDeltas");
prepareDeltas();
VM0.invoke(DeltaPropagationDUnitTest.class, "createAndUpdateDeltas");
waitForLastKey();
long toDeltas = ((Long)VM0.invoke(DeltaTestImpl.class, "getToDeltaInvokations"));
long fromDeltas = DeltaTestImpl.getFromDeltaInvokations();
assertTrue((EVENTS_SIZE - 1) + " deltas were to be sent but were "
+ toDeltas, toDeltas == (EVENTS_SIZE - 1));
assertTrue((EVENTS_SIZE - 1) + " deltas were to be received but were "
+ fromDeltas, fromDeltas == toDeltas);
verifyData(2, EVENTS_SIZE - 1);
assertTrue(listenerError.toString(), areListenerResultsValid);
}
public void testS2CSuccessfulDeltaPropagation() throws Exception {
PORT1 = ((Integer)VM0.invoke(DeltaPropagationDUnitTest.class,
"createServerCache",
new Object[] { HARegionQueue.HA_EVICTION_POLICY_MEMORY })).intValue();
createClientCache(new Integer(PORT1), new Integer(-1), "0", new Integer(
CLIENT_LISTENER));
registerInterestListAll();
VM0.invoke(DeltaPropagationDUnitTest.class, "prepareDeltas");
prepareDeltas();
VM0.invoke(DeltaPropagationDUnitTest.class, "createAndUpdateDeltas");
waitForLastKey();
long toDeltas = ((Long)VM0.invoke(DeltaTestImpl.class, "getToDeltaInvokations"));
long fromDeltas = DeltaTestImpl.getFromDeltaInvokations();
assertTrue((EVENTS_SIZE - 1) + " deltas were to be sent but were "
+ toDeltas, toDeltas == (EVENTS_SIZE - 1));
assertTrue((EVENTS_SIZE - 1) + " deltas were to be received but were "
+ fromDeltas, fromDeltas == toDeltas);
verifyData(2, EVENTS_SIZE - 1);
assertTrue(listenerError.toString(), areListenerResultsValid);
}
public void testS2CFailureInToDeltaMethod() throws Exception {
PORT1 = ((Integer)VM0.invoke(DeltaPropagationDUnitTest.class,
"createServerCache",
new Object[] { HARegionQueue.HA_EVICTION_POLICY_MEMORY })).intValue();
createClientCache(new Integer(PORT1), new Integer(-1), "0", new Integer(
CLIENT_LISTENER_2));
registerInterestListAll();
VM0.invoke(DeltaPropagationDUnitTest.class,
"prepareErroneousDeltasForToDelta");
prepareErroneousDeltasForToDelta();
VM0.invoke(DeltaPropagationDUnitTest.class, "createAndUpdateDeltas");
waitForLastKey();
long toDeltas = ((Long)VM0.invoke(DeltaTestImpl.class, "getToDeltaInvokations"));
long fromDeltas = DeltaTestImpl.getFromDeltaInvokations();
long toDeltafailures = ((Long)VM0.invoke(DeltaTestImpl.class, "getToDeltaFailures"));
assertTrue((EVENTS_SIZE - 1) + " deltas were to be sent but were "
+ toDeltas, toDeltas == (EVENTS_SIZE - 1));
assertTrue(
(EVENTS_SIZE - 1 - 1/*
* This is because the one failed in toDelta will be
* sent as full value. So client will not see it as
* 'delta'.
*/)
+ " deltas were to be received but were " + fromDeltas,
fromDeltas == (EVENTS_SIZE - 1 - 1));
assertTrue(1 + " deltas were to be failed while extracting but were "
+ toDeltafailures, toDeltafailures == 1);
verifyData(2, EVENTS_SIZE - 1 - 1 /*Full value no more sent if toDelta() fails*/);
assertTrue(listenerError.toString(), areListenerResultsValid);
}
public void testS2CFailureInFromDeltaMethod() throws Exception {
PORT1 = ((Integer)VM0.invoke(DeltaPropagationDUnitTest.class,
"createServerCache",
new Object[] { HARegionQueue.HA_EVICTION_POLICY_MEMORY })).intValue();
createClientCache(new Integer(PORT1), new Integer(-1), "0", new Integer(
CLIENT_LISTENER));
registerInterestListAll();
VM0.invoke(DeltaPropagationDUnitTest.class,
"prepareErroneousDeltasForFromDelta");
prepareErroneousDeltasForFromDelta();
VM0.invoke(DeltaPropagationDUnitTest.class, "createAndUpdateDeltas");
waitForLastKey();
long toDeltas = ((Long)VM0.invoke(DeltaTestImpl.class, "getToDeltaInvokations"));
long fromDeltas = DeltaTestImpl.getFromDeltaInvokations();
long fromDeltafailures = DeltaTestImpl.getFromDeltaFailures();
assertTrue((EVENTS_SIZE - 1) + " deltas were to be sent but were "
+ toDeltas, toDeltas == (EVENTS_SIZE - 1));
assertTrue((EVENTS_SIZE - 1) + " deltas were to be received but were "
+ fromDeltas, fromDeltas == toDeltas);
assertTrue(1 + " deltas were to be failed while applying but were "
+ fromDeltafailures, fromDeltafailures == 1);
verifyData(2, EVENTS_SIZE - 1);
assertTrue(listenerError.toString(), areListenerResultsValid);
}
public void testS2CWithOldValueAtClientOverflownToDisk() throws Exception {
PORT1 = ((Integer)VM0.invoke(DeltaPropagationDUnitTest.class,
"createServerCache",
new Object[] { HARegionQueue.HA_EVICTION_POLICY_MEMORY })).intValue();
EvictionAttributes evAttr = EvictionAttributes.createLRUEntryAttributes(1,
EvictionAction.OVERFLOW_TO_DISK);
createClientCache(new Integer(PORT1), new Integer(-1), "0",
Boolean.TRUE/* add listener */, evAttr);
registerInterestListAll();
VM0.invoke(DeltaPropagationDUnitTest.class, "prepareDeltas");
prepareDeltas();
VM0.invoke(DeltaPropagationDUnitTest.class, "createDelta");
VM0.invoke(DeltaPropagationDUnitTest.class, "createAnEntry");
Thread.sleep(5000); // TODO: Find a better 'n reliable alternative
// assert overflow occured on client vm
verifyOverflowOccured(1L, 2);
VM0.invoke(DeltaPropagationDUnitTest.class, "updateDelta");
waitForLastKey();
long toDeltas = ((Long)VM0.invoke(DeltaTestImpl.class,
"getToDeltaInvokations")).longValue();
long fromDeltas = DeltaTestImpl.getFromDeltaInvokations().longValue();
assertTrue((EVENTS_SIZE - 1) + " deltas were to be sent but were "
+ toDeltas, toDeltas == (EVENTS_SIZE - 1));
assertTrue((EVENTS_SIZE - 1) + " deltas were to be received but were "
+ fromDeltas, fromDeltas == (EVENTS_SIZE - 1));
verifyData(3, EVENTS_SIZE - 1);
assertTrue(listenerError.toString(), areListenerResultsValid);
}
public void testS2CWithLocallyDestroyedOldValueAtClient() throws Exception {
PORT1 = ((Integer)VM0.invoke(DeltaPropagationDUnitTest.class,
"createServerCache",
new Object[] { HARegionQueue.HA_EVICTION_POLICY_MEMORY })).intValue();
EvictionAttributes evAttr = EvictionAttributes.createLRUEntryAttributes(1,
EvictionAction.LOCAL_DESTROY);
createClientCache(new Integer(PORT1), new Integer(-1), "0",
Boolean.TRUE/* add listener */, evAttr);
registerInterestListAll();
VM0.invoke(DeltaPropagationDUnitTest.class, "prepareDeltas");
prepareDeltas();
VM0.invoke(DeltaPropagationDUnitTest.class, "createDelta");
VM0.invoke(DeltaPropagationDUnitTest.class, "createAnEntry");
Thread.sleep(5000); // TODO: Find a better 'n reliable alternative
// assert overflow occured on client vm
verifyOverflowOccured(1L, 1);
VM0.invoke(DeltaPropagationDUnitTest.class, "updateDelta");
waitForLastKey();
long toDeltas = ((Long)VM0.invoke(DeltaTestImpl.class,
"getToDeltaInvokations")).longValue();
long fromDeltas = DeltaTestImpl.getFromDeltaInvokations().longValue();
assertTrue((EVENTS_SIZE - 1) + " deltas were to be sent but were "
+ toDeltas, toDeltas == (EVENTS_SIZE - 1));
assertTrue((EVENTS_SIZE - 1 - 1/* destroyed */)
+ " deltas were to be received but were " + fromDeltas,
fromDeltas == (EVENTS_SIZE - 1 - 1));
verifyData(4, EVENTS_SIZE - 2);
}
public void testS2CWithInvalidatedOldValueAtClient() throws Exception {
PORT1 = ((Integer)VM0.invoke(DeltaPropagationDUnitTest.class,
"createServerCache",
new Object[] { HARegionQueue.HA_EVICTION_POLICY_MEMORY })).intValue();
createClientCache(new Integer(PORT1), new Integer(-1), "0", new Integer(
CLIENT_LISTENER));
registerInterestListAll();
VM0.invoke(DeltaPropagationDUnitTest.class, "prepareDeltas");
prepareDeltas();
VM0.invoke(DeltaPropagationDUnitTest.class, "createDelta");
VM0.invoke(DeltaPropagationDUnitTest.class, "invalidateDelta");
VM0.invoke(DeltaPropagationDUnitTest.class, "updateDelta");
waitForLastKey();
long toDeltas = ((Long)VM0.invoke(DeltaTestImpl.class,
"getToDeltaInvokations")).longValue();
long fromDeltas = DeltaTestImpl.getFromDeltaInvokations().longValue();
assertTrue((EVENTS_SIZE - 1) + " deltas were to be sent but were "
+ toDeltas, toDeltas == (EVENTS_SIZE - 1));
assertTrue((EVENTS_SIZE - 1 - 1/* invalidated */)
+ " deltas were to be received but were " + fromDeltas,
fromDeltas == (EVENTS_SIZE - 1 - 1));
verifyData(2, EVENTS_SIZE - 1);
assertTrue(listenerError.toString(), areListenerResultsValid);
}
public void testS2CDeltaPropagationWithClientConflationON() throws Exception {
PORT1 = ((Integer)VM0.invoke(DeltaPropagationDUnitTest.class,
"createServerCache",
new Object[] { HARegionQueue.HA_EVICTION_POLICY_MEMORY })).intValue();
createClientCache(new Integer(PORT1), new Integer(-1), "0",
DistributionConfig.CLIENT_CONFLATION_PROP_VALUE_ON, new Integer(
LAST_KEY_LISTENER), null, null);
registerInterestListAll();
VM0.invoke(DeltaPropagationDUnitTest.class, "prepareDeltas");
VM0.invoke(DeltaPropagationDUnitTest.class, "createAndUpdateDeltas");
waitForLastKey();
// TODO: (Amogh) get CCPStats and assert 0 deltas sent.
assertTrue("Delta Propagation feature used.", DeltaTestImpl
.getFromDeltaInvokations().longValue() == 0);
}
public void testS2CDeltaPropagationWithServerConflationON() throws Exception {
VM0.invoke(DeltaPropagationDUnitTest.class, "closeCache");
PORT1 = ((Integer)VM0.invoke(DeltaPropagationDUnitTest.class,
"createServerCache", new Object[] {
HARegionQueue.HA_EVICTION_POLICY_MEMORY, Integer.valueOf(1),
Integer.valueOf(NO_LISTENER), Boolean.TRUE /* conflate */, null}))
.intValue();
createClientCache(new Integer(PORT1), new Integer(-1), "0",
DistributionConfig.CLIENT_CONFLATION_PROP_VALUE_DEFAULT, new Integer(
LAST_KEY_LISTENER), null, null);
VM3.invoke(DeltaPropagationDUnitTest.class, "createClientCache",
new Object[] { new Integer(PORT1), new Integer(-1), "0",
DistributionConfig.CLIENT_CONFLATION_PROP_VALUE_OFF,
new Integer(LAST_KEY_LISTENER), null, null });
registerInterestListAll();
VM3.invoke(DeltaPropagationDUnitTest.class, "registerInterestListAll");
VM0.invoke(DeltaPropagationDUnitTest.class, "prepareDeltas");
VM0.invoke(DeltaPropagationDUnitTest.class, "createAndUpdateDeltas");
waitForLastKey();
VM3.invoke(DeltaPropagationDUnitTest.class, "waitForLastKey");
// TODO: (Amogh) use CCPStats.
assertTrue("Delta Propagation feature used.", DeltaTestImpl
.getFromDeltaInvokations().longValue() == 0);
long fromDeltaInvocations = (Long)VM3.invoke(DeltaTestImpl.class, "getFromDeltaInvokations");
assertTrue("Expected " + (EVENTS_SIZE - 1) + " fromDelta() invocations but found " + "",
(fromDeltaInvocations == (EVENTS_SIZE - 1)));
}
public void testS2CDeltaPropagationWithOnlyCreateEvents() throws Exception {
PORT1 = ((Integer)VM0.invoke(DeltaPropagationDUnitTest.class,
"createServerCache",
new Object[] { HARegionQueue.HA_EVICTION_POLICY_MEMORY })).intValue();
createClientCache(new Integer(PORT1), new Integer(-1), "0", new Integer(
LAST_KEY_LISTENER));
registerInterestListAll();
VM0.invoke(DeltaPropagationDUnitTest.class, "createDeltas");
waitForLastKey();
assertTrue("Delta Propagation feature used.", ((Long)VM0.invoke(
DeltaTestImpl.class, "getToDeltaInvokations")).longValue() == 0);
assertTrue("Delta Propagation feature used.", DeltaTestImpl
.getFromDeltaInvokations().longValue() == 0);
}
/**
* Tests that an update on a server with full Delta object causes distribution
* of the full Delta instance, and not its delta bits, to other peers, even if
* that instance's <code>hasDelta()</code> returns true.
*
* @throws Exception
*/
public void testC2S2SDeltaPropagation() throws Exception {
prepareDeltas();
VM0.invoke(DeltaPropagationDUnitTest.class, "prepareDeltas");
VM1.invoke(DeltaPropagationDUnitTest.class, "prepareDeltas");
DeltaTestImpl val = deltaPut[1];
VM0.invoke(DeltaPropagationDUnitTest.class, "closeCache");
PORT1 = ((Integer)VM0.invoke(DeltaPropagationDUnitTest.class,
"createServerCache", new Object[] {
HARegionQueue.HA_EVICTION_POLICY_MEMORY, new Integer(1),
new Integer(C2S2S_SERVER_LISTENER) })).intValue();
PORT2 = ((Integer)VM1.invoke(DeltaPropagationDUnitTest.class,
"createServerCache", new Object[] {
HARegionQueue.HA_EVICTION_POLICY_MEMORY, new Integer(1),
new Integer(C2S2S_SERVER_LISTENER) })).intValue();
createClientCache(new Integer(PORT1), new Integer(-1), "0", new Integer(
NO_LISTENER));
Region r = cache.getRegion("/" + regionName);
assertNotNull(r);
r.create(DELTA_KEY, deltaPut[0]);
// Invalidate the value at both the servers.
VM0.invoke(DeltaPropagationDUnitTest.class, "doLocalOp", new Object[] {
INVALIDATE, regionName, DELTA_KEY });
VM1.invoke(DeltaPropagationDUnitTest.class, "doLocalOp", new Object[] {
INVALIDATE, regionName, DELTA_KEY });
VM0.invoke(DeltaPropagationDUnitTest.class, "assertOp", new Object[] {
INVALIDATE, new Integer(1) });
VM1.invoke(DeltaPropagationDUnitTest.class, "assertOp", new Object[] {
INVALIDATE, new Integer(1) });
r.put(DELTA_KEY, val);
Thread.sleep(5000);
// Assert that VM0 distributed val as full value to VM1.
VM1.invoke(DeltaPropagationDUnitTest.class, "assertValue", new Object[] {
regionName, DELTA_KEY, val });
assertTrue("Delta Propagation feature used.", !((Boolean)VM0.invoke(
DeltaTestImpl.class, "deltaFeatureUsed")).booleanValue());
assertTrue("Delta Propagation feature used.", !((Boolean)VM1.invoke(
DeltaTestImpl.class, "deltaFeatureUsed")).booleanValue());
assertTrue("Delta Propagation feature NOT used.", DeltaTestImpl
.deltaFeatureUsed());
}
public void testS2S2CDeltaPropagationWithHAOverflow() throws Exception {
prepareDeltas();
VM0.invoke(DeltaPropagationDUnitTest.class, "prepareDeltas");
VM1.invoke(DeltaPropagationDUnitTest.class, "prepareDeltas");
VM0.invoke(DeltaPropagationDUnitTest.class, "closeCache");
PORT1 = ((Integer)VM0.invoke(DeltaPropagationDUnitTest.class,
"createServerCache", new Object[] {
HARegionQueue.HA_EVICTION_POLICY_NONE, new Integer(1) }))
.intValue();
PORT2 = ((Integer)VM1.invoke(DeltaPropagationDUnitTest.class,
"createServerCache", new Object[] {
HARegionQueue.HA_EVICTION_POLICY_ENTRY, new Integer(1) }))
.intValue();
VM0.invoke(ConflationDUnitTest.class, "setIsSlowStart",
new Object[] { "60000" });
VM1.invoke(ConflationDUnitTest.class, "setIsSlowStart",
new Object[] { "60000" });
createClientCache(new Integer(PORT2), new Integer(-1), "0", new Integer(
CLIENT_LISTENER));
Region r = cache.getRegion("/" + regionName);
assertNotNull(r);
r.registerInterest("ALL_KEYS");
VM0.invoke(DeltaPropagationDUnitTest.class, "createAndUpdateDeltas");
VM1.invoke(DeltaPropagationDUnitTest.class, "confirmEviction",
new Object[] { new Integer(PORT2) });
VM1.invoke(ConflationDUnitTest.class, "unsetIsSlowStart");
waitForLastKey();
long toDeltasOnServer1 = ((Long)VM0.invoke(DeltaTestImpl.class,
"getToDeltaInvokations")).longValue();
long fromDeltasOnServer2 = ((Long)VM1.invoke(DeltaTestImpl.class,
"getFromDeltaInvokations")).longValue();
long toDeltasOnServer2 = ((Long)VM1.invoke(DeltaTestImpl.class,
"getToDeltaInvokations")).longValue();
long fromDeltasOnClient = DeltaTestImpl.getFromDeltaInvokations()
.longValue();
assertTrue((EVENTS_SIZE - 1) + " deltas were to be sent but were "
+ toDeltasOnServer1, toDeltasOnServer1 == (EVENTS_SIZE - 1));
assertTrue((EVENTS_SIZE - 1) + " deltas were to be received but were "
+ fromDeltasOnServer2, fromDeltasOnServer2 == (EVENTS_SIZE - 1));
assertTrue("0 toDelta() were to be invoked but were "
+ toDeltasOnServer2, toDeltasOnServer2 == 0);
assertTrue((EVENTS_SIZE - 1) + " deltas were to be received but were "
+ fromDeltasOnClient, fromDeltasOnClient == (EVENTS_SIZE - 1));
}
public void testS2CDeltaPropagationWithGIIAndFailover() throws Exception {
prepareDeltas();
VM0.invoke(DeltaPropagationDUnitTest.class, "prepareDeltas");
VM1.invoke(DeltaPropagationDUnitTest.class, "prepareDeltas");
VM2.invoke(DeltaPropagationDUnitTest.class, "prepareDeltas");
VM0.invoke(DeltaPropagationDUnitTest.class, "closeCache");
PORT1 = ((Integer)VM0.invoke(DeltaPropagationDUnitTest.class,
"createServerCache", new Object[] {
HARegionQueue.HA_EVICTION_POLICY_NONE, new Integer(1),
new Integer(NO_LISTENER) })).intValue();
PORT2 = ((Integer)VM1.invoke(DeltaPropagationDUnitTest.class,
"createServerCache", new Object[] {
HARegionQueue.HA_EVICTION_POLICY_NONE, new Integer(1),
new Integer(NO_LISTENER) })).intValue();
int port3 = ((Integer)VM2.invoke(DeltaPropagationDUnitTest.class,
"createServerCache", new Object[] {
HARegionQueue.HA_EVICTION_POLICY_NONE, new Integer(1),
new Integer(NO_LISTENER) })).intValue();
// Do puts after slowing the dispatcher.
try {
VM0.invoke(ConflationDUnitTest.class, "setIsSlowStart",
new Object[] { "60000" });
VM1.invoke(ConflationDUnitTest.class, "setIsSlowStart",
new Object[] { "60000" });
VM2.invoke(ConflationDUnitTest.class, "setIsSlowStart",
new Object[] { "60000" });
createClientCache(new int[] { PORT1, PORT2, port3 }, "1",
DistributionConfig.CLIENT_CONFLATION_PROP_VALUE_DEFAULT, new Integer(
CLIENT_LISTENER), null, null);
Region r = cache.getRegion("/" + regionName);
assertNotNull(r);
r.registerInterest("ALL_KEYS");
VM primary = (((PoolImpl)pool).getPrimaryPort() == PORT1) ? VM0
: ((((PoolImpl)pool).getPrimaryPort() == PORT2) ? VM1 : VM2);
primary.invoke(DeltaPropagationDUnitTest.class, "createAndUpdateDeltas");
Thread.sleep(5000);
primary.invoke(DeltaPropagationDUnitTest.class, "closeCache");
Thread.sleep(5000);
primary = (((PoolImpl)pool).getPrimaryPort() == PORT1) ? VM0
: ((((PoolImpl)pool).getPrimaryPort() == PORT2) ? VM1 : VM2);
VM0.invoke(ConflationDUnitTest.class, "unsetIsSlowStart");
VM1.invoke(ConflationDUnitTest.class, "unsetIsSlowStart");
VM2.invoke(ConflationDUnitTest.class, "unsetIsSlowStart");
primary.invoke(DeltaPropagationDUnitTest.class, "closeCache");
Thread.sleep(5000);
primary = (((PoolImpl)pool).getPrimaryPort() == PORT1) ? VM0
: ((((PoolImpl)pool).getPrimaryPort() == PORT2) ? VM1 : VM2);
getLogWriter().info("waiting for client to receive last_key");
waitForLastKey();
long fromDeltasOnClient = DeltaTestImpl.getFromDeltaInvokations()
.longValue();
assertTrue((EVENTS_SIZE - 1) + " deltas were to be received but were "
+ fromDeltasOnClient, fromDeltasOnClient == (EVENTS_SIZE - 1));
}
finally {
VM0.invoke(ConflationDUnitTest.class, "unsetIsSlowStart");
VM1.invoke(ConflationDUnitTest.class, "unsetIsSlowStart");
VM2.invoke(ConflationDUnitTest.class, "unsetIsSlowStart");
}
}
public void testBug40165ClientReconnects() throws Exception {
PORT1 = ((Integer)VM0.invoke(DeltaPropagationDUnitTest.class,
"createServerCache",
new Object[] { HARegionQueue.HA_EVICTION_POLICY_MEMORY })).intValue();
/**
* 1. Create a cache server with slow dispatcher
* 2. Start a durable client with a custom cache listener
* which shuts itself down as soon as it recieves a marker message.
* 3. Do some puts on the server region
* 4. Let the dispatcher start dispatching
* 5. Verify that durable client is disconnected as soon as it processes the marker.
* Server will retain its queue which has some events (containing deltas) in it.
* 6. Restart the durable client without the self-destructing listener.
* 7. Wait till the durable client processes all its events.
* 8. Verify that no deltas are received by it.
*/
// Step 0
prepareDeltas();
VM0.invoke(DeltaPropagationDUnitTest.class, "prepareDeltas");
// Step 1
try {
VM0.invoke(ConflationDUnitTest.class, "setIsSlowStart",
new Object[] { "60000" });
// Step 2
String durableClientId = getName() + "_client";
PoolFactory pf = PoolManager.createFactory();
pf.addServer("localhost", PORT1)
.setSubscriptionEnabled(true)
.setSubscriptionAckInterval(1);
((PoolFactoryImpl)pf).getPoolAttributes();
Properties properties = new Properties();
properties.setProperty(DistributionConfig.MCAST_PORT_NAME, "0");
properties.setProperty(DistributionConfig.LOCATORS_NAME, "");
properties.setProperty(DistributionConfig.DURABLE_CLIENT_ID_NAME, durableClientId);
properties.setProperty(DistributionConfig.DURABLE_CLIENT_TIMEOUT_NAME, String.valueOf(60));
createDurableCacheClient(((PoolFactoryImpl)pf).getPoolAttributes(),
regionName, properties, new Integer(DURABLE_CLIENT_LISTENER), Boolean.TRUE);
// Step 3
VM0.invoke(DeltaPropagationDUnitTest.class, "doPuts");
// Step 4
VM0.invoke(ConflationDUnitTest.class, "unsetIsSlowStart");
// Step 5
//verifyDurableClientDisconnected();
Thread.sleep(5000);
// Step 6
createDurableCacheClient(((PoolFactoryImpl)pf).getPoolAttributes(),
regionName, properties, new Integer(DURABLE_CLIENT_LISTENER), Boolean.FALSE);
// Step 7
waitForLastKey();
// Step 8
long fromDeltasOnClient = DeltaTestImpl.getFromDeltaInvokations()
.longValue();
assertTrue("No deltas were to be received but received: "
+ fromDeltasOnClient, fromDeltasOnClient < 1);
} finally {
// Step 4
VM0.invoke(ConflationDUnitTest.class, "unsetIsSlowStart");
}
}
public void testBug40165ClientFailsOver() throws Exception {
PORT1 = ((Integer)VM0.invoke(DeltaPropagationDUnitTest.class,
"createServerCache",
new Object[] { HARegionQueue.HA_EVICTION_POLICY_MEMORY })).intValue();
/**
* 1. Create two cache servers with slow dispatcher
* 2. Start a durable client with a custom cache listener
* 3. Do some puts on the server region
* 4. Let the dispatcher start dispatching
* 5. Wait till the durable client receives marker from its primary.
* 6. Kill the primary server, so that the second one becomes primary.
* 7. Wait till the durable client processes all its events.
* 8. Verify that expected number of deltas are received by it.
*/
// Step 0
prepareDeltas();
VM0.invoke(DeltaPropagationDUnitTest.class, "prepareDeltas");
VM1.invoke(DeltaPropagationDUnitTest.class, "prepareDeltas");
try {
// Step 1
VM0.invoke(ConflationDUnitTest.class, "setIsSlowStart",
new Object[] { "60000" });
PORT2 = ((Integer)VM1.invoke(DeltaPropagationDUnitTest.class,
"createServerCache",
new Object[] { HARegionQueue.HA_EVICTION_POLICY_MEMORY })).intValue();
VM1.invoke(ConflationDUnitTest.class, "setIsSlowStart",
new Object[] { "60000" });
// Step 2
String durableClientId = getName() + "_client";
PoolFactory pf = PoolManager.createFactory();
pf.addServer("localhost", PORT1)
.addServer("localhost", PORT2)
.setSubscriptionEnabled(true)
.setSubscriptionAckInterval(1)
.setSubscriptionRedundancy(2);
((PoolFactoryImpl)pf).getPoolAttributes();
Properties properties = new Properties();
properties.setProperty(DistributionConfig.MCAST_PORT_NAME, "0");
properties.setProperty(DistributionConfig.LOCATORS_NAME, "");
properties.setProperty(DistributionConfig.DURABLE_CLIENT_ID_NAME, durableClientId);
properties.setProperty(DistributionConfig.DURABLE_CLIENT_TIMEOUT_NAME, String.valueOf(60));
createDurableCacheClient(((PoolFactoryImpl)pf).getPoolAttributes(),
regionName, properties, new Integer(DURABLE_CLIENT_LISTENER), Boolean.FALSE);
// Step 3
VM0.invoke(DeltaPropagationDUnitTest.class, "doPuts");
} finally {
// Step 4
VM0.invoke(ConflationDUnitTest.class, "unsetIsSlowStart");
VM1.invoke(ConflationDUnitTest.class, "unsetIsSlowStart");
}
// Step 5
VM pVM = (((PoolImpl)pool).getPrimaryPort() == PORT1) ? VM0 : VM1;
while (!markerReceived) {
Thread.sleep(50);
}
// Step 6
pVM.invoke(DeltaPropagationDUnitTest.class, "closeCache");
Thread.sleep(5000);
// Step 7
waitForLastKey();
// Step 8
long fromDeltasOnClient = DeltaTestImpl.getFromDeltaInvokations()
.longValue();
assertTrue("Atleast 99 deltas were to be received but received: "
+ fromDeltasOnClient, fromDeltasOnClient >= 99);
}
public static void doLocalOp(String op, String rName, String key) {
try {
Region r = cache.getRegion("/" + rName);
assertNotNull(r);
if (INVALIDATE.equals(op)) {
r.localInvalidate(key);
}
else if (DESTROY.equals(op)) {
r.localDestroy(key);
}
}
catch (Exception e) {
fail("failed in doLocalOp()", e);
}
}
public static void assertOp(String op, Integer num) {
final int expected = num.intValue();
WaitCriterion wc = null;
if (INVALIDATE.equals(op)) {
wc = new WaitCriterion() {
public boolean done() {
return numOfInvalidates == expected;
}
public String description() {
return "numOfInvalidates was expected to be " + expected + " but is "
+ numOfInvalidates;
}
};
}
else if (DESTROY.equals(op)) {
wc = new WaitCriterion() {
public boolean done() {
return numOfInvalidates == expected;
}
public String description() {
return "numOfDestroys was expected to be " + expected + " but is "
+ numOfDestroys;
}
};
}
DistributedTestCase.waitForCriterion(wc, 5 * 1000, 100, true);
}
public static void assertValue(String rName, String key, Object expected) {
try {
Region r = cache.getRegion("/" + rName);
assertNotNull(r);
Object value = r.getEntry(key).getValue();
assertTrue("Value against " + key + " is " + value + ". It should be "
+ expected, expected.equals(value));
}
catch (Exception e) {
fail("failed in assertValue()", e);
}
}
public static void confirmEviction(Integer port) {
final EnableLRU cc = ((VMLRURegionMap)((LocalRegion)cache
.getRegion(Region.SEPARATOR
+ BridgeServerImpl.generateNameForClientMsgsRegion(port))).entries)
._getCCHelper();
WaitCriterion wc = new WaitCriterion() {
public boolean done() {
return cc.getStats().getEvictions() > 0;
}
public String description() {
return "HA Overflow did not occure.";
}
};
DistributedTestCase.waitForCriterion(wc, 10 * 1000, 100, true);
}
public static void waitForLastKey() {
WaitCriterion wc = new WaitCriterion() {
public boolean done() {
return DeltaPropagationDUnitTest.isLastKeyReceived();
}
public String description() {
return "Last key NOT received.";
}
};
DistributedTestCase.waitForCriterion(wc, 10 * 1000, 100, true);
}
public static void prepareDeltas() {
for (int i = 0; i < EVENTS_SIZE; i++) {
deltaPut[i] = new DeltaTestImpl(0, "0", new Double(0), new byte[0],
new TestObject1("0", 0));
}
deltaPut[1].setIntVar(5);
deltaPut[2].setIntVar(5);
deltaPut[3].setIntVar(5);
deltaPut[4].setIntVar(5);
deltaPut[5].setIntVar(5);
deltaPut[2].resetDeltaStatus();
deltaPut[2].setByteArr(new byte[] { 1, 2, 3, 4, 5 });
deltaPut[3].setByteArr(new byte[] { 1, 2, 3, 4, 5 });
deltaPut[4].setByteArr(new byte[] { 1, 2, 3, 4, 5 });
deltaPut[5].setByteArr(new byte[] { 1, 2, 3, 4, 5 });
deltaPut[3].resetDeltaStatus();
deltaPut[3].setDoubleVar(new Double(5));
deltaPut[4].setDoubleVar(new Double(5));
deltaPut[5].setDoubleVar(new Double(5));
deltaPut[4].resetDeltaStatus();
deltaPut[4].setStr("str changed");
deltaPut[5].setStr("str changed");
deltaPut[5].resetDeltaStatus();
deltaPut[5].setIntVar(100);
deltaPut[5].setTestObj(new TestObject1("CHANGED", 100));
}
public static void prepareErroneousDeltasForToDelta() {
for (int i = 0; i < EVENTS_SIZE; i++) {
deltaPut[i] = new DeltaTestImpl(0, "0", new Double(0), new byte[0],
new TestObject1("0", 0));
}
deltaPut[1].setIntVar(5);
deltaPut[2].setIntVar(5);
deltaPut[3].setIntVar(DeltaTestImpl.ERRONEOUS_INT_FOR_TO_DELTA);
deltaPut[4].setIntVar(5);
deltaPut[5].setIntVar(5);
deltaPut[2].setByteArr(new byte[] { 1, 2, 3, 4, 5 });
deltaPut[3].setByteArr(new byte[] { 1, 2, 3, 4, 5 });
deltaPut[4].setByteArr(new byte[] { 1, 2, 3, 4, 5 });
deltaPut[5].setByteArr(new byte[] { 1, 2, 3, 4, 5 });
deltaPut[3].setDoubleVar(new Double(5));
deltaPut[4].setDoubleVar(new Double(5));
deltaPut[5].setDoubleVar(new Double(5));
deltaPut[4].setStr("str changed");
deltaPut[5].setStr("str changed");
deltaPut[5].setIntVar(100);
deltaPut[5].setTestObj(new TestObject1("CHANGED", 100));
}
public static void prepareErroneousDeltasForFromDelta() {
for (int i = 0; i < EVENTS_SIZE; i++) {
deltaPut[i] = new DeltaTestImpl(0, "0", new Double(0), new byte[0],
new TestObject1("0", 0));
}
deltaPut[1].setIntVar(5);
deltaPut[2].setIntVar(5);
deltaPut[3].setIntVar(5);
deltaPut[4].setIntVar(5);
deltaPut[5].setIntVar(5);
deltaPut[2].setByteArr(new byte[] { 1, 2, 3, 4, 5 });
deltaPut[3].setByteArr(new byte[] { 1, 2, 3, 4, 5 });
deltaPut[4].setByteArr(new byte[] { 1, 2, 3, 4, 5 });
deltaPut[5].setByteArr(new byte[] { 1, 2, 3, 4, 5 });
deltaPut[3].setDoubleVar(new Double(5));
deltaPut[4].setDoubleVar(new Double(5));
deltaPut[5].setDoubleVar(new Double(5));
deltaPut[4].setStr("str changed");
deltaPut[5].setStr(DeltaTestImpl.ERRONEOUS_STRING_FOR_FROM_DELTA);
deltaPut[5].setIntVar(100);
deltaPut[5].setTestObj(new TestObject1("CHANGED", 100));
}
public static void doPuts() {
doPuts(100);
}
public static void doPuts(Integer num) {
try {
Region r = cache.getRegion("/" + regionName);
assertNotNull(r);
for (int i = 0; i < num; i++) {
DeltaTestImpl val = new DeltaTestImpl();
val.setStr("" + i);
r.put(DELTA_KEY, val);
}
r.put(LAST_KEY, "");
}
catch (Exception ex) {
fail("failed in createDelta()", ex);
}
}
public static void createAndUpdateDeltas() {
createDelta();
updateDelta();
}
public static void createDelta() {
try {
Region r = cache.getRegion("/" + regionName);
assertNotNull(r);
r.create(DELTA_KEY, deltaPut[0]);
}
catch (Exception ex) {
fail("failed in createDelta()", ex);
}
}
public static void updateDelta() {
try {
Region r = cache.getRegion("/" + regionName);
assertNotNull(r);
for (int i = 1; i < EVENTS_SIZE; i++) {
try {
r.put(DELTA_KEY, deltaPut[i]);
}
catch (InvalidDeltaException ide) {
assertTrue(
"InvalidDeltaException not expected for deltaPut[" + i + "]",
deltaPut[i].getIntVar() == DeltaTestImpl.ERRONEOUS_INT_FOR_TO_DELTA);
}
}
r.put(LAST_KEY, "");
}
catch (Exception ex) {
fail("failed in updateDelta()", ex);
}
}
public static void createDeltas() {
try {
Region r = cache.getRegion("/" + regionName);
assertNotNull(r);
for (int i = 0; i < 100; i++) {
r.create(DELTA_KEY + i, new DeltaTestImpl());
}
r.create(LAST_KEY, "");
}
catch (Exception ex) {
fail("failed in createDeltas()", ex);
}
}
public static void createAnEntry() {
try {
Region r = cache.getRegion("/" + regionName);
assertNotNull(r);
r.create("KEY-A", "I push the delta out to disk :)");
}
catch (Exception ex) {
fail("failed in createAnEntry()", ex);
}
}
public static void invalidateDelta() {
try {
Region r = cache.getRegion("/" + regionName);
assertNotNull(r);
r.invalidate(DELTA_KEY);
}
catch (Exception ex) {
fail("failed in invalidateDelta()", ex);
}
}
public static void verifyOverflowOccured(long evictions, int regionsize) {
EnableLRU cc = ((VMLRURegionMap)((LocalRegion)cache.getRegion(regionName)).entries)
._getCCHelper();
Assert.assertTrue(cc.getStats().getEvictions() == evictions,
"Number of evictions expected to be " + evictions + " but was "
+ cc.getStats().getEvictions());
int rSize = ((LocalRegion)cache.getRegion(regionName)).getRegionMap()
.size();
Assert.assertTrue(rSize == regionsize, "Region size expected to be "
+ regionsize + " but was " + rSize);
}
public static void verifyData(int creates, int updates) {
assertEquals(creates, numOfCreates);
assertEquals(updates, numOfUpdates);
}
public static Integer createServerCache(String ePolicy) throws Exception {
return createServerCache(ePolicy, Integer.valueOf(1));
}
public static Integer createServerCache(String ePolicy, Integer cap)
throws Exception {
return createServerCache(ePolicy, cap, new Integer(NO_LISTENER));
}
public static Integer createServerCache(String ePolicy, Integer cap,
Integer listenerCode) throws Exception {
return createServerCache(ePolicy, cap, listenerCode, Boolean.FALSE, null);
}
public static Integer createServerCache(String ePolicy, Integer cap,
Integer listenerCode, Boolean conflate, Compressor compressor) throws Exception {
ConnectionTable.threadWantsSharedResources();
new DeltaPropagationDUnitTest("temp").createCache(new Properties());
AttributesFactory factory = new AttributesFactory();
factory.setEnableSubscriptionConflation(conflate);
if (listenerCode.intValue() != 0) {
factory.addCacheListener(getCacheListener(listenerCode));
}
if (compressor != null) {
factory.setCompressor(compressor);
}
if (listenerCode.intValue() == C2S2S_SERVER_LISTENER) {
factory.setScope(Scope.DISTRIBUTED_NO_ACK);
factory.setDataPolicy(DataPolicy.NORMAL);
factory.setConcurrencyChecksEnabled(false);
RegionAttributes attrs = factory.create();
Region r = cache.createRegion(regionName, attrs);
logger = cache.getLogger();
r.create(DELTA_KEY, deltaPut[0]);
}
else {
factory.setScope(Scope.DISTRIBUTED_ACK);
factory.setDataPolicy(DataPolicy.REPLICATE);
factory.setConcurrencyChecksEnabled(false);
RegionAttributes attrs = factory.create();
cache.createRegion(regionName, attrs);
logger = cache.getLogger();
}
int port = AvailablePort.getRandomAvailablePort(AvailablePort.SOCKET);
CacheServer server1 = cache.addCacheServer();
server1.setPort(port);
server1.setNotifyBySubscription(true);
if (ePolicy != null) {
File overflowDirectory = new File("bsi_overflow_"+port);
overflowDirectory.mkdir();
DiskStoreFactory dsf = cache.createDiskStoreFactory();
File[] dirs1 = new File[] {overflowDirectory};
server1.getClientSubscriptionConfig().setEvictionPolicy(ePolicy);
server1.getClientSubscriptionConfig().setCapacity(cap.intValue());
// specify diskstore for this server
server1.getClientSubscriptionConfig().setDiskStoreName(dsf.setDiskDirs(dirs1).create("bsi").getName());
}
server1.start();
return new Integer(server1.getPort());
}
public static CacheListener getCacheListener(Integer code) {
CacheListener listener = null;
switch (code.intValue()) {
case 0:
break;
case SERVER_LISTENER:
//listener = new CacheListenerAdapter() {};
break;
case CLIENT_LISTENER:
listener = new CacheListenerAdapter() {
public void afterCreate(EntryEvent event) {
numOfCreates++;
logger.fine("Create Event: <" + event.getKey() + ", "
+ event.getNewValue() + ">");
if (DELTA_KEY.equals(event.getKey())
&& !deltaPut[0].equals(event.getNewValue())) {
areListenerResultsValid = false;
listenerError.append("Create event:\n |-> sent: " + deltaPut[0]
+ "\n |-> rcvd: " + event.getNewValue() + "\n");
}
else if (LAST_KEY.equals(event.getKey())) {
lastKeyReceived = true;
}
}
public void afterUpdate(EntryEvent event) {
numOfUpdates++;
logger
.fine("Update Event: <" + event.getKey() + ", "
+ event.getNewValue() + ">" + ", numOfUpdates: "
+ numOfUpdates);
if (!deltaPut[numOfUpdates].equals(event.getNewValue())) {
areListenerResultsValid = false;
listenerError.append("\nUpdate event(" + numOfUpdates
+ "):\n |-> sent: " + deltaPut[numOfUpdates]
+ "\n |-> recd: " + event.getNewValue());
}
}
};
break;
case CLIENT_LISTENER_2:
listener = new CacheListenerAdapter() {
public void afterCreate(EntryEvent event) {
numOfCreates++;
logger.fine("Create Event: <" + event.getKey() + ", "
+ event.getNewValue() + ">");
if (DELTA_KEY.equals(event.getKey())
&& !deltaPut[0].equals(event.getNewValue())) {
areListenerResultsValid = false;
listenerError.append("Create event:\n |-> sent: " + deltaPut[0]
+ "\n |-> rcvd: " + event.getNewValue() + "\n");
}
else if (LAST_KEY.equals(event.getKey())) {
lastKeyReceived = true;
}
}
public void afterUpdate(EntryEvent event) {
int tmp = ++numOfUpdates;
logger
.fine("Update Event: <" + event.getKey() + ", "
+ event.getNewValue() + ">" + ", numOfUpdates: "
+ numOfUpdates);
// Hack to ignore illegal delta put
tmp = (tmp >= 3) ? ++tmp : tmp;
if (!deltaPut[tmp].equals(event.getNewValue())) {
areListenerResultsValid = false;
listenerError.append("\nUpdate event(" + numOfUpdates
+ "):\n |-> sent: " + deltaPut[tmp]
+ "\n |-> recd: " + event.getNewValue());
}
}
};
break;
case C2S2S_SERVER_LISTENER:
listener = new CacheListenerAdapter() {
public void afterCreate(EntryEvent event) {
numOfCreates++;
logger.fine("Create Event: <" + event.getKey() + ", "
+ event.getNewValue() + ">");
if (LAST_KEY.equals(event.getKey())) {
lastKeyReceived = true;
}
}
public void afterUpdate(EntryEvent event) {
numOfUpdates++;
logger
.fine("Update Event: <" + event.getKey() + ", "
+ event.getNewValue() + ">" + ", numOfUpdates: "
+ numOfUpdates);
}
public void afterInvalidate(EntryEvent event) {
numOfInvalidates++;
logger.fine("Invalidate Event: <" + event.getKey() + ", "
+ event.getOldValue() + ">" + ", numOfInvalidates: "
+ numOfInvalidates);
}
public void afterDestroy(EntryEvent event) {
numOfDestroys++;
logger.fine("Destroy Event: <" + event.getKey() + ", "
+ event.getOldValue() + ">" + ", numOfDestroys: "
+ numOfDestroys);
}
};
break;
case LAST_KEY_LISTENER:
listener = new CacheListenerAdapter() {
public void afterCreate(EntryEvent event) {
if (LAST_KEY.equals(event.getKey())) {
lastKeyReceived = true;
}
}
};
break;
case DURABLE_CLIENT_LISTENER:
listener = new CacheListenerAdapter() {
public void afterRegionLive(RegionEvent event) {
logger.fine("Marker received");
if (Operation.MARKER == event.getOperation()) {
markerReceived = true;
if (closeCache) {
logger.fine("Closing the durable client cache...");
closeCache(true); // keepAlive
}
}
}
public void afterCreate(EntryEvent event) {
logger.fine("CREATE received");
if (LAST_KEY.equals(event.getKey())) {
logger.fine("LAST KEY received");
lastKeyReceived = true;
}
}
public void afterUpdate(EntryEvent event) {
assertNotNull(event.getNewValue());
}
};
break;
default:
fail("Invalid listener code");
break;
}
return listener;
}
public static void createClientCache(Integer port1, Integer port2,
String rLevel) throws Exception {
createClientCache(port1, port2, rLevel,
DistributionConfig.CLIENT_CONFLATION_PROP_VALUE_DEFAULT, new Integer(
CLIENT_LISTENER), null, null);
}
public static void createClientCache(Integer port1, Integer port2,
String rLevel, Boolean addListener, EvictionAttributes evictAttrs)
throws Exception {
createClientCache(port1, port2, rLevel,
DistributionConfig.CLIENT_CONFLATION_PROP_VALUE_DEFAULT, new Integer(
CLIENT_LISTENER), evictAttrs, null);
}
public static void createClientCache(Integer port1, Integer port2,
String rLevel, Boolean addListener, ExpirationAttributes expAttrs)
throws Exception {
createClientCache(port1, port2, rLevel,
DistributionConfig.CLIENT_CONFLATION_PROP_VALUE_DEFAULT, new Integer(
CLIENT_LISTENER), null, expAttrs);
}
public static void createClientCache(Integer port1, Integer port2,
String rLevel, Integer listener) throws Exception {
createClientCache(port1, port2, rLevel,
DistributionConfig.CLIENT_CONFLATION_PROP_VALUE_DEFAULT, listener,
null, null);
}
public static void createClientCache(Integer port1, Integer port2,
String rLevel, String conflate, Integer listener,
EvictionAttributes evictAttrs, ExpirationAttributes expAttrs)
throws Exception {
int[] ports = null;
if (port2 != -1) {
ports = new int[] { port1, port2 };
}
else {
ports = new int[] { port1 };
}
assertTrue("No server ports provided", ports != null);
createClientCache(ports, rLevel, conflate, listener, evictAttrs, expAttrs);
}
public static void createClientCache(int[] ports, String rLevel,
String conflate, Integer listener, EvictionAttributes evictAttrs,
ExpirationAttributes expAttrs) throws Exception {
CacheServerTestUtil.disableShufflingOfEndpoints();
Properties props = new Properties();
props.setProperty(DistributionConfig.MCAST_PORT_NAME, "0");
props.setProperty(DistributionConfig.LOCATORS_NAME, "");
props.setProperty(DistributionConfig.CLIENT_CONFLATION_PROP_NAME, conflate);
new DeltaPropagationDUnitTest("temp").createCache(props);
AttributesFactory factory = new AttributesFactory();
pool = BridgeTestCase.configureConnectionPool(factory, "localhost", ports,
true, Integer.parseInt(rLevel), 2, null, 1000, 250, false, -2);
factory.setScope(Scope.LOCAL);
if (listener.intValue() != 0) {
factory.addCacheListener(getCacheListener(listener.intValue()));
}
if (evictAttrs != null) {
factory.setEvictionAttributes(evictAttrs);
}
if (expAttrs != null) {
factory.setEntryTimeToLive(expAttrs);
}
if (evictAttrs!=null && evictAttrs.getAction().isOverflowToDisk()) {
// create diskstore with overflow dir
// since it's overflow, no need to recover, so we can use random number as dir name
int port = AvailablePort.getRandomAvailablePort(AvailablePort.SOCKET);
File dir = new File("overflow_"+port);
if (!dir.exists()) {
dir.mkdir();
}
File[] dir1 = new File[] { dir };
DiskStoreFactory dsf = cache.createDiskStoreFactory();
factory.setDiskStoreName(dsf.setDiskDirs(dir1).create("client_overflow_ds").getName());
}
factory.setConcurrencyChecksEnabled(false);
RegionAttributes attrs = factory.create();
Region region = cache.createRegion(regionName, attrs);
logger = cache.getLogger();
}
private void createCache(Properties props) throws Exception {
DistributedSystem ds = getSystem(props);
ds.disconnect();
ds = getSystem(props);
assertNotNull(ds);
cache = CacheFactory.create(ds);
assertNotNull(cache);
}
public static void verifyRegionSize(Integer regionSize,
Integer msgsRegionsize, Integer port) {
try {
// Get the clientMessagesRegion and check the size.
Region region = (Region)cache.getRegion("/" + regionName);
Region msgsRegion = (Region)cache.getRegion(BridgeServerImpl
.generateNameForClientMsgsRegion(port.intValue()));
logger.fine("size<serverRegion, clientMsgsRegion>: " + region.size()
+ ", " + msgsRegion.size());
assertEquals(regionSize.intValue(), region.size());
assertEquals(msgsRegionsize.intValue(), msgsRegion.size());
}
catch (Exception e) {
fail("failed in verifyRegionSize()" + e);
}
}
public static void createDurableCacheClient(Pool poolAttr, String regionName,
Properties dsProperties, Integer listenerCode, Boolean close) throws Exception {
new DeltaPropagationDUnitTest("temp").createCache(dsProperties);
PoolFactoryImpl pf = (PoolFactoryImpl)PoolManager.createFactory();
pf.init(poolAttr);
PoolImpl p = (PoolImpl)pf.create("DeltaPropagationDUnitTest");
AttributesFactory factory = new AttributesFactory();
factory.setScope(Scope.LOCAL);
factory.setConcurrencyChecksEnabled(false);
factory.setPoolName(p.getName());
if (listenerCode.intValue() != 0) {
factory.addCacheListener(getCacheListener(listenerCode));
}
RegionAttributes attrs = factory.create();
Region r = cache.createRegion(regionName, attrs);
r.registerInterest("ALL_KEYS");
pool = p;
cache.readyForEvents();
logger = cache.getLogger();
closeCache = close.booleanValue();
}
/*
* public static void createDeltaEntries(Long num) { }
*/
public static void registerInterestListAll() {
try {
Region r = cache.getRegion("/" + regionName);
assertNotNull(r);
r.registerInterest("ALL_KEYS");
}
catch (Exception ex) {
fail("failed in registerInterestListAll", ex);
}
}
public static void closeCache() {
if (cache != null && !cache.isClosed()) {
cache.close();
cache.getDistributedSystem().disconnect();
}
}
public static void closeCache(boolean keepalive) {
if (cache != null && !cache.isClosed()) {
cache.close(keepalive);
cache.getDistributedSystem().disconnect();
}
}
public static boolean isLastKeyReceived() {
return lastKeyReceived;
}
public static void setLastKeyReceived(boolean val) {
lastKeyReceived = val;
}
public static void resetAll() {
DeltaTestImpl.resetDeltaInvokationCounters();
numOfCreates = numOfUpdates = numOfInvalidates = numOfDestroys = numOfEvents = 0;
lastKeyReceived = false;
markerReceived = false;
areListenerResultsValid = true;
listenerError = new StringBuffer("");
}
}