blob: 193aa8ffa978718b1e121918bdc36b7a602a3994 [file] [log] [blame]
/*=========================================================================
* Copyright (c) 2010-2014 Pivotal Software, Inc. All Rights Reserved.
* This product is protected by U.S. and international copyright
* and intellectual property laws. Pivotal products are covered by
* one or more patents listed at http://www.pivotal.io/patents.
*=========================================================================
*/
/**
*
*/
package com.gemstone.gemfire.internal.cache.partitioned;
import java.util.Properties;
import com.gemstone.gemfire.cache.CacheFactory;
import com.gemstone.gemfire.cache.EntryEvent;
import com.gemstone.gemfire.cache.ExpirationAction;
import com.gemstone.gemfire.cache.ExpirationAttributes;
import com.gemstone.gemfire.cache.PartitionAttributesFactory;
import com.gemstone.gemfire.cache.Region;
import com.gemstone.gemfire.cache.RegionFactory;
import com.gemstone.gemfire.cache.RegionShortcut;
import com.gemstone.gemfire.cache.client.ClientCacheFactory;
import com.gemstone.gemfire.cache.client.ClientRegionFactory;
import com.gemstone.gemfire.cache.client.ClientRegionShortcut;
import com.gemstone.gemfire.cache.server.CacheServer;
import com.gemstone.gemfire.cache.util.CacheListenerAdapter;
import com.gemstone.gemfire.distributed.DistributedSystem;
import com.gemstone.gemfire.distributed.internal.DistributionConfig;
import com.gemstone.gemfire.internal.AvailablePort;
import com.gemstone.gemfire.internal.cache.GemFireCacheImpl;
import com.gemstone.gemfire.internal.cache.ha.HARegionQueueStats;
import com.gemstone.gemfire.internal.cache.tier.sockets.CacheClientNotifier;
import com.gemstone.gemfire.internal.cache.tier.sockets.CacheClientProxy;
import dunit.DistributedTestCase;
import dunit.Host;
import dunit.VM;
/**
* The test creates two datastores with a partitioned region, and also running a
* cache server each. A publisher client is connected to one server while a
* subscriber client is connected to both the servers. The partitioned region
* has entry expiry set with ttl of 3 seconds and action as DESTROY. The test
* ensures that the EXPIRE_DESTROY events are propagated to the subscriber
* client and the secondary server does process the QRMs for the EXPIRE_DESTROY
* events.
*
* @author ashetkar
*
*/
@SuppressWarnings("serial")
public class Bug47388DUnitTest extends DistributedTestCase {
private static VM vm0 = null;
private static VM vm1 = null;
private static VM vm2 = null;
private static VM vm3 = null;
private static GemFireCacheImpl cache;
private static volatile boolean lastKeyDestroyed = false;
public static final String REGION_NAME = "Bug47388DUnitTest_region";
/**
* @param name
*/
public Bug47388DUnitTest(String name) {
super(name);
}
public void setUp() throws Exception {
super.setUp();
disconnectFromDS();
Host host = Host.getHost(0);
vm0 = host.getVM(0); // datastore and server
vm1 = host.getVM(1); // datastore and server
vm2 = host.getVM(2); // durable client with subscription
vm3 = host.getVM(3); // durable client without subscription
//int mcastPort = AvailablePort.getRandomAvailablePort(AvailablePort.JGROUPS);
int port0 = (Integer) vm0.invoke(Bug47388DUnitTest.class,
"createCacheServerWithPRDatastore", new Object[] { });
int port1 = (Integer) vm1.invoke(Bug47388DUnitTest.class,
"createCacheServerWithPRDatastore", new Object[] { });
vm2.invoke(Bug47388DUnitTest.class, "createClientCache",
new Object[] { vm2.getHost(), new Integer[] { port0, port1 },
Boolean.TRUE });
vm3.invoke(Bug47388DUnitTest.class, "createClientCache",
new Object[] { vm3.getHost(), new Integer[] { port0 }, Boolean.FALSE });
}
public void tearDown2() throws Exception {
closeCache();
vm2.invoke(Bug47388DUnitTest.class, "closeCache");
vm3.invoke(Bug47388DUnitTest.class, "closeCache");
vm0.invoke(Bug47388DUnitTest.class, "closeCache");
vm1.invoke(Bug47388DUnitTest.class, "closeCache");
}
public static void closeCache() throws Exception {
if (cache != null) {
cache.close();
}
lastKeyDestroyed = false;
}
@SuppressWarnings("deprecation")
public static Integer createCacheServerWithPRDatastore()//Integer mcastPort)
throws Exception {
Properties props = new Properties();
Bug47388DUnitTest test = new Bug47388DUnitTest("Bug47388DUnitTest");
DistributedSystem ds = test.getSystem(props);
ds.disconnect();
cache = (GemFireCacheImpl)CacheFactory.create(test.getSystem());
RegionFactory<String, String> rf = cache
.createRegionFactory(RegionShortcut.PARTITION);
rf.setEntryTimeToLive(new ExpirationAttributes(3, ExpirationAction.DESTROY))
.setPartitionAttributes(
new PartitionAttributesFactory<String, String>()
.setRedundantCopies(1).setTotalNumBuckets(4).create())
.setConcurrencyChecksEnabled(false);
rf.create(REGION_NAME);
CacheServer server = cache.addCacheServer();
server.setPort(AvailablePort.getRandomAvailablePort(AvailablePort.SOCKET));
server.start();
return server.getPort();
}
@SuppressWarnings("deprecation")
public static void createClientCache(Host host, Integer[] ports, Boolean doRI)
throws Exception {
Properties props = new Properties();
props.setProperty(DistributionConfig.DURABLE_CLIENT_ID_NAME,
"my-durable-client-" + ports.length);
props.setProperty(DistributionConfig.DURABLE_CLIENT_TIMEOUT_NAME, "300000");
DistributedSystem ds = new Bug47388DUnitTest("Bug47388DUnitTest").getSystem(props);
ds.disconnect();
ClientCacheFactory ccf = new ClientCacheFactory(props);
ccf.setPoolSubscriptionEnabled(doRI);
ccf.setPoolSubscriptionAckInterval(50);
ccf.setPoolSubscriptionRedundancy(1);
for (int port : ports) {
ccf.addPoolServer(host.getHostName(), port);
}
cache = (GemFireCacheImpl) ccf.create();
ClientRegionFactory<String, String> crf = cache
.createClientRegionFactory(ClientRegionShortcut.CACHING_PROXY);
if (doRI) {
crf.addCacheListener(new CacheListenerAdapter<String, String>() {
@Override
public void afterDestroy(EntryEvent<String, String> event) {
if (event.getKey().equalsIgnoreCase("LAST_KEY")) {
lastKeyDestroyed = true;
}
}
});
}
Region<String, String> region = crf.create(REGION_NAME);
if (doRI) {
region.registerInterest("ALL_KEYS", true);
cache.readyForEvents();
}
}
@SuppressWarnings("unchecked")
public static void doPuts(Integer numOfSets, Integer numOfPuts)
throws Exception {
Region<String, String> region = cache.getRegion(REGION_NAME);
for (int i = 0; i < numOfSets; i++) {
for (int j = 0; j < numOfPuts; j++) {
region.put("KEY_" + i + "_" + j, "VALUE_" + j);
}
}
region.put("LAST_KEY", "LAST_KEY");
}
public static Boolean isPrimaryServer() {
return ((CacheClientProxy) CacheClientNotifier.getInstance()
.getClientProxies().toArray()[0]).isPrimary();
}
public static void verifyClientSubscriptionStats(final Boolean isPrimary,
final Integer events) throws Exception {
WaitCriterion wc = new WaitCriterion() {
private long dispatched;
private long qrmed;
@Override
public boolean done() {
HARegionQueueStats stats = ((CacheClientProxy) CacheClientNotifier
.getInstance().getClientProxies().toArray()[0]).getHARegionQueue()
.getStatistics();
final int numOfEvents;
if (!isPrimary) {
numOfEvents = events - 1; // No marker
} else {
numOfEvents = events;
}
if (isPrimary) {
this.dispatched = stats.getEventsDispatched();
return numOfEvents == this.dispatched;
} else {
this.qrmed = stats.getEventsRemovedByQrm();
return this.qrmed == numOfEvents || (this.qrmed + 1) == numOfEvents;
// Why +1 above? Because sometimes(TODO: explain further) there may
// not be any QRM sent to the secondary for the last event dispatched
// at primary.
}
}
@Override
public String description() {
return "Expected events: " + events + " but actual eventsDispatched: "
+ this.dispatched + " and actual eventsRemovedByQrm: " + this.qrmed;
}
};
DistributedTestCase.waitForCriterion(wc, 60 * 1000, 500, true);
}
public static void waitForLastKeyDestroyed() throws Exception {
WaitCriterion wc = new WaitCriterion() {
@Override
public boolean done() {
return lastKeyDestroyed;
}
@Override
public String description() {
return "Last key's destroy not received";
}
};
DistributedTestCase.waitForCriterion(wc, 60 * 1000, 500, true);
}
public void bug51931_testQRMOfExpiredEventsProcessedSuccessfully() throws Exception {
int numOfSets = 2, numOfPuts = 5;
int totalEvents = 23; // = (numOfSets * numOfPuts) * 2 [eviction-destroys] +
// 2 [last key's put and eviction-destroy] + 1 [marker
// message]
vm3.invoke(Bug47388DUnitTest.class, "doPuts", new Object[] { numOfSets,
numOfPuts });
boolean isvm0Primary = (Boolean) vm0.invoke(Bug47388DUnitTest.class,
"isPrimaryServer");
vm2.invoke(Bug47388DUnitTest.class, "waitForLastKeyDestroyed");
vm0.invoke(Bug47388DUnitTest.class, "verifyClientSubscriptionStats",
new Object[] { isvm0Primary, totalEvents });
vm1.invoke(Bug47388DUnitTest.class, "verifyClientSubscriptionStats",
new Object[] { !isvm0Primary, totalEvents });
}
public void testNothingBecauseOfBug51931() {
// remove this when bug #51931 is fixed
}
}