blob: c9819d46a57c836d05a43e468d7913d766270d6c [file] [log] [blame]
/*=========================================================================
* Copyright (c) 2010-2014 Pivotal Software, Inc. All Rights Reserved.
* This product is protected by U.S. and international copyright
* and intellectual property laws. Pivotal products are covered by
* one or more patents listed at http://www.pivotal.io/patents.
*=========================================================================
*/
/**
*
*/
package com.gemstone.gemfire.internal.cache.partitioned;
import java.util.Properties;
import com.gemstone.gemfire.cache.CacheFactory;
import com.gemstone.gemfire.cache.Region;
import com.gemstone.gemfire.cache.RegionFactory;
import com.gemstone.gemfire.cache.RegionShortcut;
import com.gemstone.gemfire.cache.client.ClientCacheFactory;
import com.gemstone.gemfire.cache.client.ClientRegionFactory;
import com.gemstone.gemfire.cache.client.ClientRegionShortcut;
import com.gemstone.gemfire.cache.server.CacheServer;
import com.gemstone.gemfire.distributed.DistributedSystem;
import com.gemstone.gemfire.internal.AvailablePort;
import com.gemstone.gemfire.internal.cache.GemFireCacheImpl;
import com.gemstone.gemfire.internal.cache.tier.sockets.CacheClientNotifier;
import com.gemstone.gemfire.internal.cache.tier.sockets.CacheClientProxy;
import com.gemstone.gemfire.internal.cache.tier.sockets.CacheClientProxyStats;
import dunit.AsyncInvocation;
import dunit.DistributedTestCase;
import dunit.Host;
import dunit.VM;
/**
*
* @author ashetkar
*
*/
public class Bug51400DUnitTest extends DistributedTestCase {
private static VM server0 = null;
private static VM server1 = null;
private static VM client0 = null;
private static VM client1 = null;
private static GemFireCacheImpl cache;
public static final String REGION_NAME = "Bug51400DUnitTest_region";
/**
* @param name
*/
public Bug51400DUnitTest(String name) {
super(name);
}
public void setUp() throws Exception {
super.setUp();
Host host = Host.getHost(0);
server0 = host.getVM(0);
server1 = host.getVM(1);
client0 = host.getVM(2);
client1 = host.getVM(3);
}
public void tearDown2() throws Exception {
closeCache();
client0.invoke(Bug51400DUnitTest.class, "closeCache");
client1.invoke(Bug51400DUnitTest.class, "closeCache");
server0.invoke(Bug51400DUnitTest.class, "closeCache");
server1.invoke(Bug51400DUnitTest.class, "closeCache");
}
public static void closeCache() throws Exception {
if (cache != null) {
cache.close();
}
}
public static Integer createServerCache(Integer mcastPort,
Integer maxMessageCount) throws Exception {
Properties props = new Properties();
props.setProperty("locators", "");
props.setProperty("mcast-port", String.valueOf(mcastPort));
// props.setProperty("log-file", "server_" + OSProcess.getId() + ".log");
// props.setProperty("log-level", "fine");
// props.setProperty("statistic-archive-file", "server_" + OSProcess.getId()
// + ".gfs");
// props.setProperty("statistic-sampling-enabled", "true");
Bug51400DUnitTest test = new Bug51400DUnitTest("Bug51400DUnitTest");
DistributedSystem ds = test.getSystem(props);
ds.disconnect();
cache = (GemFireCacheImpl)CacheFactory.create(test.getSystem());
// cache = (GemFireCacheImpl) new CacheFactory(props).create();
RegionFactory<String, String> rf = cache
.createRegionFactory(RegionShortcut.REPLICATE);
rf.setConcurrencyChecksEnabled(false);
Region<String, String> region = rf.create(REGION_NAME);
CacheServer server = cache.addCacheServer();
server.setMaximumMessageCount(maxMessageCount);
server.setPort(AvailablePort.getRandomAvailablePort(AvailablePort.SOCKET));
server.start();
return server.getPort();
}
public static void createClientCache(String hostName, Integer[] ports,
Integer interval) throws Exception {
Properties props = new Properties();
// props.setProperty(DistributionConfig.DURABLE_CLIENT_ID_NAME,
// "my-durable-client-" + ports.length);
// props.setProperty(DistributionConfig.DURABLE_CLIENT_TIMEOUT_NAME, "300000");
// props.setProperty("log-file", "client_" + OSProcess.getId() + ".log");
// props.setProperty("log-level", "fine");
// props.setProperty("statistic-archive-file", "client_" + OSProcess.getId()
// + ".gfs");
// props.setProperty("statistic-sampling-enabled", "true");
DistributedSystem ds = new Bug51400DUnitTest("Bug51400DUnitTest").getSystem(props);
ds.disconnect();
ClientCacheFactory ccf = new ClientCacheFactory(props);
ccf.setPoolSubscriptionEnabled(true);
ccf.setPoolSubscriptionAckInterval(interval);
for (int port : ports) {
ccf.addPoolServer(hostName, port);
}
cache = (GemFireCacheImpl) ccf.create();
ClientRegionFactory<String, String> crf = cache
.createClientRegionFactory(ClientRegionShortcut.CACHING_PROXY);
Region<String, String> region = crf.create(REGION_NAME);
region.registerInterest("ALL_KEYS");
}
public static void verifyQueueSize(Boolean isPrimary,
Integer numOfEvents) throws Exception {
CacheClientProxyStats stats = ((CacheClientProxy) CacheClientNotifier
.getInstance().getClientProxies().toArray()[0]).getStatistics();
if (isPrimary) {
numOfEvents = numOfEvents + 1; // marker
}
long qSize = stats.getMessageQueueSize();
assertEquals("Expected queue size: " + numOfEvents
+ " but actual size: " + qSize + " at "
+ (isPrimary ? "primary." : "secondary."), numOfEvents.intValue(), qSize);
}
public void testNothing() {
// remove when ticket #51932 is fixed
}
public void ticket51932_testDeadlock() throws Throwable {
int maxQSize = 5;
// Set infinite ack interval so that the queue will not be drained.
int ackInterval = Integer.MAX_VALUE;
int mcastPort = AvailablePort.getRandomAvailablePort(AvailablePort.JGROUPS);
int port1 = (Integer) server0.invoke(Bug51400DUnitTest.class,
"createServerCache", new Object[] { mcastPort, maxQSize });
client1.invoke(Bug51400DUnitTest.class, "createClientCache",
new Object[] { getServerHostName(Host.getHost(0)), new Integer[]{port1}, ackInterval});
// Do puts from server as well as from client on the same key.
AsyncInvocation ai1 = server0.invokeAsync(Bug51400DUnitTest.class,
"updateKey", new Object[] { 2 * maxQSize });
AsyncInvocation ai2 = client1.invokeAsync(Bug51400DUnitTest.class,
"updateKey", new Object[] { 2 * maxQSize });
ai1.getResult();
ai2.getResult();
// Verify that the queue has crossed its limit of maxQSize
server0.invoke(Bug51400DUnitTest.class, "verifyQueueSize", new Object[] {
true, 2 * maxQSize });
}
public static void updateKey(Integer num) {
try {
String k = "51400_KEY";
Region r = cache.getRegion(REGION_NAME);
for (int i = 0; i < num; ++i) {
r.put(k, "VALUE_" + i);
}
}
catch (Exception e) {
fail("Failed in updateKey()" + e);
}
}
}