blob: 36c24e5c07377ff731948a64cc6d89f845ab6157 [file] [log] [blame]
/*=========================================================================
* Copyright (c) 2010-2014 Pivotal Software, Inc. All Rights Reserved.
* This product is protected by U.S. and international copyright
* and intellectual property laws. Pivotal products are covered by
* one or more patents listed at http://www.pivotal.io/patents.
*=========================================================================
*/
package com.gemstone.gemfire.internal.cache;
import java.util.Properties;
import com.gemstone.gemfire.DeltaTestImpl;
import com.gemstone.gemfire.cache.AttributesFactory;
import com.gemstone.gemfire.cache.Cache;
import com.gemstone.gemfire.cache.CacheFactory;
import com.gemstone.gemfire.cache.DataPolicy;
import com.gemstone.gemfire.cache.EntryEvent;
import com.gemstone.gemfire.cache.InterestPolicy;
import com.gemstone.gemfire.cache.Region;
import com.gemstone.gemfire.cache.Scope;
import com.gemstone.gemfire.cache.SubscriptionAttributes;
import com.gemstone.gemfire.cache.client.internal.PoolImpl;
import com.gemstone.gemfire.cache.server.CacheServer;
import com.gemstone.gemfire.cache.util.CacheListenerAdapter;
import com.gemstone.gemfire.distributed.DistributedSystem;
import com.gemstone.gemfire.distributed.internal.DistributionConfig;
import com.gemstone.gemfire.internal.AvailablePort;
import com.gemstone.gemfire.internal.tcp.ConnectionTable;
import dunit.DistributedTestCase;
import dunit.Host;
import dunit.VM;
/**
*
* Tests the P2P delta propagation functionality.
*
*/
public class P2PDeltaPropagationDUnitTest extends DistributedTestCase
{
static VM server1 = null;
static VM server2 = null;
static VM server3 = null;
/** the cache */
private static Cache cache = null;
/** port for the cache server */
private static int PORT1;
private static int PORT2;
private static final int NEW_INT = 11;
private static final String NEW_STR = "DELTA";
private static final int NUM_OF_CREATES = 3;
static PoolImpl pool = null;
/** name of the test region */
private static final String REGION_NAME = "P2PDeltaPropagationDUnitTest_Region";
private static int numOfUpdates = 0;
private static int hasDeltaBytes = 0;
private static boolean check = false;
protected static Object waitLock = new Object();
/**
* Constructor
*/
public P2PDeltaPropagationDUnitTest(String name) {
super(name);
}
/*
* Delta gets distributed in P2P D-ACK.
*/
public void testP2PDeltaPropagationEnableScopeDAck() throws Exception
{
Object args[] = new Object[] { Boolean.TRUE, DataPolicy.REPLICATE,
Scope.DISTRIBUTED_ACK, Boolean.FALSE };
server1.invoke(P2PDeltaPropagationDUnitTest.class, "createServerCache",args);
server2.invoke(P2PDeltaPropagationDUnitTest.class, "createServerCache",args);
server3.invoke(P2PDeltaPropagationDUnitTest.class, "createServerCache",args);
//only delta should get send to server2 and server3
server1.invoke(P2PDeltaPropagationDUnitTest.class, "put");
Thread.sleep(5000);
server2.invoke(P2PDeltaPropagationDUnitTest.class, "getOnDeltaEnabledServer");
server3.invoke(P2PDeltaPropagationDUnitTest.class, "getOnDeltaEnabledServer");
}
/*
* Delta gets distributed in P2P GLOBAL.
*/
public void testP2PDeltaPropagationEnableScopeGlobal() throws Exception
{
Object args[] = new Object[] { Boolean.TRUE, DataPolicy.REPLICATE,
Scope.GLOBAL, Boolean.FALSE };
server1.invoke(P2PDeltaPropagationDUnitTest.class, "createServerCache",args);
server2.invoke(P2PDeltaPropagationDUnitTest.class, "createServerCache",args);
server3.invoke(P2PDeltaPropagationDUnitTest.class, "createServerCache",args);
server1.invoke(P2PDeltaPropagationDUnitTest.class, "put");
Thread.sleep(5000);
server2.invoke(P2PDeltaPropagationDUnitTest.class, "getOnDeltaEnabledServer");
server3.invoke(P2PDeltaPropagationDUnitTest.class, "getOnDeltaEnabledServer");
}
/*
* Full object gets resend in P2P D-ACK if delta can not be applied.
*/
public void testP2PDACKInvalidDeltaException() throws Exception
{
server1.invoke(P2PDeltaPropagationDUnitTest.class,
"createServerCache", new Object[] {Boolean.TRUE});
server2.invoke(P2PDeltaPropagationDUnitTest.class,
"createServerCache",new Object[] {Boolean.TRUE});
server3.invoke(P2PDeltaPropagationDUnitTest.class,
"createServerCache",new Object[] {Boolean.TRUE});
//Delta apply should fail on server2 and server3 as values are not there
server2.invoke(P2PDeltaPropagationDUnitTest.class, "invalidate");
server3.invoke(P2PDeltaPropagationDUnitTest.class, "destroy");
server1.invoke(P2PDeltaPropagationDUnitTest.class, "putDelta");
Thread.sleep(5000);
server2.invoke(P2PDeltaPropagationDUnitTest.class, "getOnDeltaEnabledWithInvalidate");//Full object
server3.invoke(P2PDeltaPropagationDUnitTest.class, "getOnDeltaEnabledWithDestroy");
}
/*
* Full object will be send in case of P2P D-ACK(direct-ack = true).
*/
public void testP2PDeltaPropagationEnableDirectAckTrue() throws Exception
{
Object args[] = new Object[] { Boolean.TRUE, DataPolicy.NORMAL,
Scope.DISTRIBUTED_ACK, Boolean.FALSE };
ConnectionTable.threadWantsOwnResources();
createServerCache(Boolean.TRUE, DataPolicy.NORMAL,Scope.DISTRIBUTED_ACK, Boolean.FALSE);
server1.invoke(P2PDeltaPropagationDUnitTest.class, "createServerCache",args);
put();
Thread.sleep(5000);
server1.invoke(P2PDeltaPropagationDUnitTest.class, "getOnDeltaEnabledServer");
ConnectionTable.threadWantsSharedResources();
}
/*
* Full object will be send in case of P2P D-NO-ACK
*/
public void testP2PDeltaPropagationEnableScopeDNoAck()throws Exception
{
Object args[] = new Object[] { Boolean.TRUE, DataPolicy.NORMAL,
Scope.DISTRIBUTED_NO_ACK, Boolean.FALSE };
server1.invoke(P2PDeltaPropagationDUnitTest.class, "createServerCache",args);
server2.invoke(P2PDeltaPropagationDUnitTest.class, "createServerCache",args);
server1.invoke(P2PDeltaPropagationDUnitTest.class, "put");
Thread.sleep(5000);
server2.invoke(P2PDeltaPropagationDUnitTest.class, "getOnDeltaDisabledServer");
}
/*
* Check for full object gets distributed when DS level delta property is OFF.
*/
public void testP2PDeltaPropagationDisable() throws Exception
{
server1.invoke(P2PDeltaPropagationDUnitTest.class,
"createServerCache", new Object[] {Boolean.FALSE});
server2.invoke(P2PDeltaPropagationDUnitTest.class,
"createServerCache",new Object[] {Boolean.FALSE});
server1.invoke(P2PDeltaPropagationDUnitTest.class, "put");
server2.invoke(P2PDeltaPropagationDUnitTest.class, "getOnDeltaDisabledServer");
}
/*
* Delta gets distributed in P2P D-ACK with data policy empty on feeder.
*/
public void testP2PDeltaPropagationEnableScopeDAckDataPolicyEmpty()
throws Exception {
Object args[] = new Object[] { Boolean.TRUE, DataPolicy.REPLICATE,
Scope.DISTRIBUTED_ACK, Boolean.FALSE };
Object args1[] = new Object[] { Boolean.TRUE, DataPolicy.EMPTY,
Scope.DISTRIBUTED_ACK, Boolean.FALSE };
server1.invoke(P2PDeltaPropagationDUnitTest.class, "createServerCache",
args1);
server2.invoke(P2PDeltaPropagationDUnitTest.class, "createServerCache",
args);
server3.invoke(P2PDeltaPropagationDUnitTest.class, "createServerCache",
args);
// only delta should get send to server2 and server3
server1.invoke(P2PDeltaPropagationDUnitTest.class, "put");
Thread.sleep(5000);
server2.invoke(P2PDeltaPropagationDUnitTest.class,
"getOnDeltaEnabledServer");
server3.invoke(P2PDeltaPropagationDUnitTest.class,
"getOnDeltaEnabledServer");
}
/*
* Full Onject is gets distributed in P2P D-ACK with data policy empty on feeder when its uses regions create API.
*/
public void testP2PDeltaPropagationEnableScopeDAckDataPolicyEmptyWithRegionsCreateApi()
throws Exception {
Object args[] = new Object[] { Boolean.TRUE, DataPolicy.REPLICATE,
Scope.DISTRIBUTED_ACK, Boolean.FALSE };
Object args1[] = new Object[] { Boolean.TRUE, DataPolicy.EMPTY,
Scope.DISTRIBUTED_ACK, Boolean.FALSE };
server1.invoke(P2PDeltaPropagationDUnitTest.class, "createServerCache",
args1);
server2.invoke(P2PDeltaPropagationDUnitTest.class, "createServerCache",
args);
server3.invoke(P2PDeltaPropagationDUnitTest.class, "createServerCache",
args);
/* clean flags */
server1.invoke(P2PDeltaPropagationDUnitTest.class,"resetFlags");
server2.invoke(P2PDeltaPropagationDUnitTest.class,"resetFlags");
server3.invoke(P2PDeltaPropagationDUnitTest.class,"resetFlags");
// only delta should get send to server2 and server3
server1.invoke(P2PDeltaPropagationDUnitTest.class, "create");
server2.invoke(P2PDeltaPropagationDUnitTest.class,
"verifyNoFailurePeer");
server3.invoke(P2PDeltaPropagationDUnitTest.class,
"verifyNoFailurePeer");
}
public void testPeerWithEmptyRegionIterestPolicyALLReceivesNoDelta() throws Exception {
// 1. Setup three peers, one with a region data policy set to EMPTY.
// 2. Do delta feeds on any one of the two peers with non-EMPTY region data policy.
// 3. Assert that peer with non-EMPTY data policy receives delta.
// 4. Assert that peer with EMPTY data policy receives full value in the first attempt itself.
Object replicate[] = new Object[] { Boolean.TRUE/* Delta */,
DataPolicy.REPLICATE, Scope.DISTRIBUTED_ACK, Boolean.TRUE /* listener */};
Object empty[] = new Object[] { Boolean.TRUE/* Delta */, DataPolicy.EMPTY,
Scope.DISTRIBUTED_ACK, Boolean.TRUE/* listener */, Boolean.TRUE /* ALL interest policy */};
server1.invoke(P2PDeltaPropagationDUnitTest.class, "createServerCache",
replicate);
server2.invoke(P2PDeltaPropagationDUnitTest.class, "createServerCache",
replicate);
server3.invoke(P2PDeltaPropagationDUnitTest.class, "createServerCache",
empty);
server1.invoke(P2PDeltaPropagationDUnitTest.class, "put");
server2.invoke(P2PDeltaPropagationDUnitTest.class, "verifyDeltaReceived",
new Object[] { Integer.valueOf(3) });
server3.invoke(P2PDeltaPropagationDUnitTest.class, "verifyNoDeltaReceived",
new Object[] { Integer.valueOf(3) });
}
public void testPeerWithEmptyRegionDefaultIterestPolicyReceivesNoEvents() throws Exception {
// 1. Setup three peers, one with a region data policy set to EMPTY.
// 2. Do delta feeds on any one of the two peers with non-EMPTY region data policy.
// 3. Assert that peer with non-EMPTY data policy receives delta.
// 4. Assert that peer with EMPTY data policy receives full value in the first attempt itself.
Object replicate[] = new Object[] { Boolean.TRUE/* Delta */,
DataPolicy.REPLICATE, Scope.DISTRIBUTED_ACK, Boolean.TRUE /* listener */};
Object empty[] = new Object[] { Boolean.TRUE/* Delta */, DataPolicy.EMPTY,
Scope.DISTRIBUTED_ACK, Boolean.TRUE /* listener */};
server1.invoke(P2PDeltaPropagationDUnitTest.class, "createServerCache",
replicate);
server2.invoke(P2PDeltaPropagationDUnitTest.class, "createServerCache",
replicate);
server3.invoke(P2PDeltaPropagationDUnitTest.class, "createServerCache",
empty);
server1.invoke(P2PDeltaPropagationDUnitTest.class, "put");
server2.invoke(P2PDeltaPropagationDUnitTest.class, "verifyDeltaReceived",
new Object[] { Integer.valueOf(3) });
server3.invoke(P2PDeltaPropagationDUnitTest.class, "verifyNoDeltaReceived",
new Object[] { Integer.valueOf(0/* no events */) });
}
public void testPeerWithEmptyRegionAndNoCacheServerReceivesOnlyFullValue() throws Exception {
// 1. Setup three peers, two with region data policy set to EMPTY.
// 2. Of these two EMPTY peers, only one has a cache server.
// 2. Do delta feeds on the peer with non-EMPTY region data policy.
// 3. Assert that the peer with cache server receives delta bytes along with the full value.
// 4. Assert that peer with no cache server receives full value but no delta bytes.
int port1 = AvailablePort.getRandomAvailablePort(AvailablePort.SOCKET);
Object replicate[] = new Object[] {Boolean.TRUE/* Delta */,
DataPolicy.REPLICATE, Scope.DISTRIBUTED_ACK, Boolean.FALSE /* listener */};
Object emptyWithServer[] = new Object[] {Boolean.TRUE/* Delta */,
DataPolicy.EMPTY, Scope.DISTRIBUTED_ACK, Boolean.TRUE/* listener */,
Boolean.TRUE /* ALL interest policy */, port1};
Object emptyWithoutServer[] = new Object[] {Boolean.TRUE/* Delta */,
DataPolicy.EMPTY, Scope.DISTRIBUTED_ACK, Boolean.TRUE/* listener */,
Boolean.TRUE /* ALL interest policy */};
server1.invoke(P2PDeltaPropagationDUnitTest.class, "createServerCache",
replicate);
server2.invoke(P2PDeltaPropagationDUnitTest.class, "createServerCache",
emptyWithServer);
server3.invoke(P2PDeltaPropagationDUnitTest.class, "createServerCache",
emptyWithoutServer);
server1.invoke(P2PDeltaPropagationDUnitTest.class, "put");
server2.invoke(P2PDeltaPropagationDUnitTest.class, "verifyDeltaBytesReceived",
new Object[] { Integer.valueOf(2) });
server3.invoke(P2PDeltaPropagationDUnitTest.class, "verifyDeltaBytesReceived",
new Object[] { Integer.valueOf(0) });
}
public static void put() throws Exception
{
Region r1 = cache.getRegion(Region.SEPARATOR + REGION_NAME);
DeltaTestImpl test = new DeltaTestImpl();
r1.put("KEY", test);
test = new DeltaTestImpl();
test.setIntVar(NEW_INT);
r1.put("KEY",test);
test = new DeltaTestImpl();
test.setStr(NEW_STR);
r1.put("KEY",test);
}
public static void create() throws Exception
{
Region r1 = cache.getRegion(Region.SEPARATOR + REGION_NAME);
DeltaTestImpl test = new DeltaTestImpl();
r1.create("KEY", test);
test = new DeltaTestImpl();
test.setIntVar(NEW_INT);
r1.create("KEY1",test);
test = new DeltaTestImpl();
test.setStr(NEW_STR);
r1.create("KEY2",test);
}
public static void putDelta() throws Exception
{
Region r1 = cache.getRegion(Region.SEPARATOR + REGION_NAME);
DeltaTestImpl test = new DeltaTestImpl(9999,NEW_STR);
test.setIntVar(NEW_INT);
r1.put("KEY",test);
}
public static void invalidate() throws Exception
{
Region r1 = cache.getRegion(Region.SEPARATOR + REGION_NAME);
r1.localInvalidate("KEY");
}
public static void destroy() throws Exception
{
Region r1 = cache.getRegion(Region.SEPARATOR + REGION_NAME);
r1.localDestroy("KEY");
}
public static void getOnDeltaEnabledWithInvalidate() throws Exception
{
Region r1 = cache.getRegion(Region.SEPARATOR + REGION_NAME);
assertTrue(((DeltaTestImpl)r1.getEntry("KEY").getValue()).getIntVar() == NEW_INT);
assertTrue(((DeltaTestImpl)r1.getEntry("KEY").getValue()).getStr().equals(NEW_STR));
}
public static void getOnDeltaEnabledWithDestroy() throws Exception
{
Region r1 = cache.getRegion(Region.SEPARATOR + REGION_NAME);
assertNull(((DeltaTestImpl)r1.getEntry("KEY")));
}
public static void getOnDeltaEnabledServer() throws Exception
{
Region r1 = cache.getRegion(Region.SEPARATOR + REGION_NAME);
assertTrue(((DeltaTestImpl)r1.getEntry("KEY").getValue()).getIntVar() == NEW_INT);
assertTrue(((DeltaTestImpl)r1.getEntry("KEY").getValue()).getStr().equals(NEW_STR));
}
public static void getOnDeltaDisabledServer() throws Exception
{
Region r1 = cache.getRegion(Region.SEPARATOR + REGION_NAME);
assertFalse(((DeltaTestImpl)r1.getEntry("KEY").getValue()).getIntVar() == NEW_INT);//should be overwritten as delta is disabled
assertTrue(((DeltaTestImpl)r1.getEntry("KEY").getValue()).getStr().equals(NEW_STR));
}
public static void checkForNoFullObjectResend() throws Exception
{
Region r1 = cache.getRegion(Region.SEPARATOR + REGION_NAME);
assertNull(((DeltaTestImpl)r1.getEntry("KEY").getValue()));
}
public static void checkForFlag() throws Exception
{
assertFalse(check);
}
public void setUp() throws Exception
{
super.setUp();
final Host host = Host.getHost(0);
server1 = host.getVM(0);
server2 = host.getVM(1);
server3 = host.getVM(2);
resetFlags();
server1.invoke(P2PDeltaPropagationDUnitTest.class, "resetFlags");
server2.invoke(P2PDeltaPropagationDUnitTest.class, "resetFlags");
server3.invoke(P2PDeltaPropagationDUnitTest.class, "resetFlags");
}
private Cache createCache(Properties props) throws Exception
{
DistributedSystem ds = getSystem(props);
ds.disconnect();
ds = getSystem(props);
cache = CacheFactory.create(ds);
if (cache == null) {
throw new Exception("CacheFactory.create() returned null ");
}
return cache;
}
public static void createServerCache(Boolean flag) throws Exception
{
ConnectionTable.threadWantsSharedResources();
createServerCache(flag, DataPolicy.DEFAULT, Scope.DISTRIBUTED_ACK, false);
}
public static void createServerCache(Boolean flag, DataPolicy policy,
Scope scope, Boolean listener) throws Exception {
createServerCache(flag, policy, scope, listener, Boolean.FALSE);
}
public static void createServerCache(Boolean flag, DataPolicy policy,
Scope scope, Boolean listener, Boolean interestPolicyAll) throws Exception {
createServerCache(flag, policy, scope, listener, interestPolicyAll, null);
}
public static void createServerCache(Boolean flag, DataPolicy policy,
Scope scope, Boolean listener, Boolean interestPolicyAll,
Integer port) throws Exception {
P2PDeltaPropagationDUnitTest test = new P2PDeltaPropagationDUnitTest("temp");
Properties props = new Properties();
if (!flag) {
props.setProperty(DistributionConfig.DELTA_PROPAGATION_PROP_NAME, "false");
}
cache = test.createCache(props);
AttributesFactory factory = new AttributesFactory();
factory.setScope(scope);
factory.setDataPolicy(policy);
if (policy == DataPolicy.EMPTY && interestPolicyAll) {
factory.setSubscriptionAttributes(new SubscriptionAttributes(
InterestPolicy.ALL));
}
if (listener) {
factory.addCacheListener(new CacheListenerAdapter() {
@SuppressWarnings("synthetic-access")
public void afterUpdate(EntryEvent event) {
numOfUpdates++;
cache.getLoggerI18n().fine(
"afterUpdate(): numOfUpdates = " + numOfUpdates);
cache.getLoggerI18n().fine(
"(key, val): " + event.getKey() + ", " + event.getNewValue());
if (event.getOldValue() != null) {
if (event.getOldValue() == event.getNewValue()) {
check = Boolean.TRUE;
}
}
if (((EntryEventImpl)event).getDeltaBytes() != null) {
cache.getLoggerI18n().fine("delta bytes received. " + hasDeltaBytes);
assertTrue("No full value received for event " + event,
((EntryEventImpl)event).getNewValue() != null);
hasDeltaBytes++;
} else {
cache.getLoggerI18n().fine("delta bytes not received.");
}
}
});
}
Region region = cache.createRegion(REGION_NAME, factory.create());
if (!policy.isReplicate()) {
region.create("KEY", "KEY");
}
if (port != null) {
CacheServer server1 = cache.addCacheServer();
server1.setPort(port);
server1.start();
}
}
public void tearDown2() throws Exception
{
super.tearDown2();
closeCache();
server1.invoke(P2PDeltaPropagationDUnitTest.class, "closeCache");
server2.invoke(P2PDeltaPropagationDUnitTest.class, "closeCache");
server3.invoke(P2PDeltaPropagationDUnitTest.class, "closeCache");
}
public static void closeCache()
{
check = false;
if (cache != null && !cache.isClosed()) {
cache.close();
cache.getDistributedSystem().disconnect();
}
}
public static void verifyNoFailurePeer() throws Exception
{
Region reg = cache.getRegion(Region.SEPARATOR + REGION_NAME);
long elapsed = 0;
long start = System.currentTimeMillis();
while(elapsed < 10000 && reg.size() < NUM_OF_CREATES){
try {
elapsed = System.currentTimeMillis() - start;
Thread.sleep(100);
}
catch (InterruptedException e) {
e.printStackTrace();
}
}
assertTrue("create's are missing", reg.size() == NUM_OF_CREATES);
// start validation
CachePerfStats stats = ((DistributedRegion)cache.getRegion(REGION_NAME))
.getCachePerfStats();
long deltaFailures = stats.getDeltaFailedUpdates();
assertTrue("delta failures count is not zero", deltaFailures == 0);
assertTrue("fromDelta invoked", !DeltaTestImpl.fromDeltaFeatureUsed());
}
public static void verifyDeltaReceived(Integer updates) {
Region region = cache.getRegion(Region.SEPARATOR + REGION_NAME);
CachePerfStats stats = ((DistributedRegion)cache.getRegion(REGION_NAME))
.getCachePerfStats();
long deltaFailures = stats.getDeltaFailedUpdates();
long deltas = stats.getDeltaUpdates();
assertTrue("Failures while processing delta at receiver.",
deltaFailures == 0);
assertTrue("Expected 2 deltas to be processed at receiver but were "
+ deltas + " (statistics)", deltas == 2);
assertTrue(
"Expected 2 deltas to be processed at receiver but were "
+ DeltaTestImpl.getFromDeltaInvokations()
+ " (implementation counter)", DeltaTestImpl
.getFromDeltaInvokations() == 2);
assertTrue(
"Expected " + updates + " updates but found " + numOfUpdates,
numOfUpdates == updates);
DeltaTestImpl val = (DeltaTestImpl)region.getEntry("KEY").getValue();
assertTrue("Latest value not received, found: " + val, NEW_STR.equals(val
.getStr()));
}
public static void verifyNoDeltaReceived(Integer updates) {
CachePerfStats stats = ((DistributedRegion)cache.getRegion(REGION_NAME))
.getCachePerfStats();
long deltaFailures = stats.getDeltaFailedUpdates();
long deltas = stats.getDeltaUpdates();
assertTrue(
"Failures while processing delta at receiver. But deltas were not expected.",
deltaFailures == 0);
assertFalse(
"Expected no deltas to be processed at receiver but processed were "
+ deltas + " (statistics)", deltas > 0);
assertFalse(
"Expected no deltas to be processed at receiver but processed were "
+ DeltaTestImpl.getFromDeltaInvokations()
+ " (implementation counter)", DeltaTestImpl.fromDeltaFeatureUsed());
assertTrue("Expected " + updates + " updates but found " + numOfUpdates,
numOfUpdates == updates);
}
public static void verifyDeltaBytesReceived(Integer num) {
assertTrue("Expected " + num + " events with delta bytes in it but found "
+ hasDeltaBytes, hasDeltaBytes == num);
}
public static void resetFlags() {
DeltaTestImpl.resetDeltaInvokationCounters();
numOfUpdates = 0;
hasDeltaBytes = 0;
check = false;
}
}