blob: fc81596845bc3a023b1b8391ab69db27bdcdd47f [file] [log] [blame]
/*=========================================================================
* Copyright (c) 2010-2014 Pivotal Software, Inc. All Rights Reserved.
* This product is protected by U.S. and international copyright
* and intellectual property laws. Pivotal products are covered by
* one or more patents listed at http://www.pivotal.io/patents.
*=========================================================================
*/
package com.gemstone.gemfire.cache30;
import com.gemstone.gemfire.SystemFailure;
import com.gemstone.gemfire.cache.*;
import com.gemstone.gemfire.cache.Region.Entry;
import com.gemstone.gemfire.cache.util.*;
import com.gemstone.gemfire.distributed.internal.*;
import com.gemstone.gemfire.internal.tcp.Connection;
import dunit.*;
import java.io.*;
import java.util.*;
/**
* Test to make sure slow receiver queuing is working
*
* @author darrel
* @since 4.2.1
*/
public class SlowRecDUnitDisabledTest extends CacheTestCase {
public SlowRecDUnitDisabledTest(String name) {
super(name);
}
// this test has special config of its distributed system so
// the setUp and tearDown methods need to make sure we don't
// use the ds from previous test and that we don't leave ours around
// for the next test to use.
public void setUp() throws Exception {
try {
disconnectAllFromDS();
} finally {
super.setUp();
}
}
public void tearDown2() throws Exception {
try {
super.tearDown2();
} finally {
disconnectAllFromDS();
}
}
////////////////////// Test Methods //////////////////////
private VM getOtherVm() {
Host host = Host.getHost(0);
return host.getVM(0);
}
static protected Object lastCallback = null;
private void doCreateOtherVm(final Properties p, final boolean addListener) {
VM vm = getOtherVm();
vm.invoke(new CacheSerializableRunnable("create root") {
public void run2() throws CacheException {
getSystem(p);
createAckRegion(true, false);
AttributesFactory af = new AttributesFactory();
af.setScope(Scope.DISTRIBUTED_NO_ACK);
af.setDataPolicy(DataPolicy.REPLICATE);
if (addListener) {
CacheListener cl = new CacheListenerAdapter() {
public void afterUpdate(EntryEvent event) {
// make the slow receiver event slower!
try {Thread.sleep(500);} catch (InterruptedException shuttingDown) {fail("interrupted");}
}
};
af.setCacheListener(cl);
} else {
CacheListener cl = new CacheListenerAdapter() {
public void afterCreate(EntryEvent event) {
// getLogWriter().info("afterCreate " + event.getKey());
if (event.getCallbackArgument() != null) {
lastCallback = event.getCallbackArgument();
}
if (event.getKey().equals("sleepkey")) {
int sleepMs = ((Integer)event.getNewValue()).intValue();
// getLogWriter().info("sleepkey sleeping for " + sleepMs);
try {Thread.sleep(sleepMs);} catch (InterruptedException ignore) {fail("interrupted");}
}
}
public void afterUpdate(EntryEvent event) {
// getLogWriter().info("afterUpdate " + event.getKey());
if (event.getCallbackArgument() != null) {
lastCallback = event.getCallbackArgument();
}
if (event.getKey().equals("sleepkey")) {
int sleepMs = ((Integer)event.getNewValue()).intValue();
// getLogWriter().info("sleepkey sleeping for " + sleepMs);
try {Thread.sleep(sleepMs);} catch (InterruptedException ignore) {fail("interrupted");}
}
}
public void afterInvalidate(EntryEvent event) {
if (event.getCallbackArgument() != null) {
lastCallback = event.getCallbackArgument();
}
}
public void afterDestroy(EntryEvent event) {
if (event.getCallbackArgument() != null) {
lastCallback = event.getCallbackArgument();
}
}
};
af.setCacheListener(cl);
}
Region r1 = createRootRegion("slowrec", af.create());
// place holder so we receive updates
r1.create("key", "value");
}
});
}
static protected final String CHECK_INVALID = "CHECK_INVALID";
private void checkLastValueInOtherVm(final String lastValue, final Object lcb) {
VM vm = getOtherVm();
vm.invoke(new CacheSerializableRunnable("check last value") {
public void run2() throws CacheException {
Region r1 = getRootRegion("slowrec");
if (lcb != null) {
WaitCriterion ev = new WaitCriterion() {
public boolean done() {
return lcb.equals(lastCallback);
}
public String description() {
return "waiting for callback";
}
};
DistributedTestCase.waitForCriterion(ev, 50 * 1000, 200, true);
assertEquals(lcb, lastCallback);
}
if (lastValue == null) {
final Region r = r1;
WaitCriterion ev = new WaitCriterion() {
public boolean done() {
return r.getEntry("key") == null;
}
public String description() {
return "waiting for key to become null";
}
};
DistributedTestCase.waitForCriterion(ev, 50 * 1000, 200, true);
assertEquals(null, r1.getEntry("key"));
} else if (CHECK_INVALID.equals(lastValue)) {
// should be invalid
{
final Region r = r1;
WaitCriterion ev = new WaitCriterion() {
public boolean done() {
Entry e = r.getEntry("key");
if (e == null) {
return false;
}
return e.getValue() == null;
}
public String description() {
return "waiting for invalidate";
}
};
DistributedTestCase.waitForCriterion(ev, 50 * 1000, 200, true);
// assertNotNull(re);
// assertEquals(null, value);
}
} else {
{
int retryCount = 1000;
Region.Entry re = null;
Object value = null;
while (retryCount-- > 0) {
re = r1.getEntry("key");
if (re != null) {
value = re.getValue();
if (value != null && value.equals(lastValue)) {
break;
}
}
try {Thread.sleep(50);} catch (InterruptedException ignore) {fail("interrupted");}
}
assertNotNull(re);
assertNotNull(value);
assertEquals(lastValue, value);
}
}
}
});
}
private void forceQueueFlush() {
Connection.FORCE_ASYNC_QUEUE=false;
final DMStats stats = getSystem().getDistributionManager().getStats();
WaitCriterion ev = new WaitCriterion() {
public boolean done() {
return stats.getAsyncThreads() == 0;
}
public String description() {
return "Waiting for async threads to disappear";
}
};
DistributedTestCase.waitForCriterion(ev, 10 * 1000, 200, true);
}
private void forceQueuing(final Region r) throws CacheException {
Connection.FORCE_ASYNC_QUEUE=true;
final DMStats stats = getSystem().getDistributionManager().getStats();
r.put("forcekey", "forcevalue");
// wait for the flusher to get its first flush in progress
WaitCriterion ev = new WaitCriterion() {
public boolean done() {
return stats.getAsyncQueueFlushesInProgress() != 0;
}
public String description() {
return "waiting for flushes to start";
}
};
DistributedTestCase.waitForCriterion(ev, 2 * 1000, 200, true);
}
/**
* Make sure that noack puts to a receiver
* will eventually queue and then catch up.
*/
public void testNoAck() throws CacheException {
final AttributesFactory factory = new AttributesFactory();
factory.setScope(Scope.DISTRIBUTED_NO_ACK);
final Region r = createRootRegion("slowrec", factory.create());
final DMStats stats = getSystem().getDistributionManager().getStats();
// create receiver in vm0 with queuing enabled
Properties p = new Properties();
p.setProperty("async-distribution-timeout", "1");
doCreateOtherVm(p, false);
int repeatCount = 2;
int count = 0;
while (repeatCount-- > 0) {
forceQueuing(r);
final Object key = "key";
long queuedMsgs = stats.getAsyncQueuedMsgs();
long dequeuedMsgs = stats.getAsyncDequeuedMsgs();
// long conflatedMsgs = stats.getAsyncConflatedMsgs();
long queueSize = stats.getAsyncQueueSize();
String lastValue = "";
final long intialQueuedMsgs = queuedMsgs;
long curQueuedMsgs = queuedMsgs - dequeuedMsgs;
try {
// loop while we still have queued the initially queued msgs
// OR the cur # of queued msgs < 6
while (dequeuedMsgs < intialQueuedMsgs || curQueuedMsgs <= 6) {
String value = "count=" + count;
lastValue = value;
r.put(key, value);
count ++;
queueSize = stats.getAsyncQueueSize();
queuedMsgs = stats.getAsyncQueuedMsgs();
dequeuedMsgs = stats.getAsyncDequeuedMsgs();
curQueuedMsgs = queuedMsgs - dequeuedMsgs;
}
getLogWriter().info("After " + count + " " + " puts slowrec mode kicked in by queuing " + queuedMsgs + " for a total size of " + queueSize);
} finally {
forceQueueFlush();
}
WaitCriterion ev = new WaitCriterion() {
public boolean done() {
return stats.getAsyncQueueSize() == 0;
}
public String description() {
return "Waiting for queues to empty";
}
};
final long start = System.currentTimeMillis();
DistributedTestCase.waitForCriterion(ev, 30 * 1000, 200, true);
final long finish = System.currentTimeMillis();
getLogWriter().info("After " + (finish - start) + " ms async msgs where flushed. A total of " + stats.getAsyncDequeuedMsgs() + " were flushed. lastValue=" + lastValue);
checkLastValueInOtherVm(lastValue, null);
}
}
/**
* Create a region named AckRegion with ACK scope
*/
protected Region createAckRegion(boolean mirror, boolean conflate) throws CacheException {
final AttributesFactory factory = new AttributesFactory();
factory.setScope(Scope.DISTRIBUTED_ACK);
if (mirror) {
factory.setDataPolicy(DataPolicy.REPLICATE);
}
if (conflate) {
factory.setEnableAsyncConflation(true);
}
final Region r = createRootRegion("AckRegion", factory.create());
return r;
}
/**
* Make sure that noack puts to a receiver
* will eventually queue and then catch up with conflation
*/
public void testNoAckConflation() throws CacheException {
final AttributesFactory factory = new AttributesFactory();
factory.setScope(Scope.DISTRIBUTED_NO_ACK);
factory.setEnableAsyncConflation(true);
final Region r = createRootRegion("slowrec", factory.create());
final DMStats stats = getSystem().getDistributionManager().getStats();
// create receiver in vm0 with queuing enabled
Properties p = new Properties();
p.setProperty("async-distribution-timeout", "1");
doCreateOtherVm(p, false);
forceQueuing(r);
final Object key = "key";
int count = 0;
// long queuedMsgs = stats.getAsyncQueuedMsgs();
// long dequeuedMsgs = stats.getAsyncDequeuedMsgs();
final long initialConflatedMsgs = stats.getAsyncConflatedMsgs();
// long queueSize = stats.getAsyncQueueSize();
String lastValue = "";
final long intialDeQueuedMsgs = stats.getAsyncDequeuedMsgs();
long start = 0;
try {
while ((stats.getAsyncConflatedMsgs()-initialConflatedMsgs) < 1000) {
String value = "count=" + count;
lastValue = value;
r.put(key, value);
count ++;
// getLogWriter().info("After " + count + " "
// + " puts queueSize=" + queueSize
// + " queuedMsgs=" + queuedMsgs
// + " dequeuedMsgs=" + dequeuedMsgs
// + " conflatedMsgs=" + conflatedMsgs);
}
start = System.currentTimeMillis();
} finally {
forceQueueFlush();
}
// queueSize = stats.getAsyncQueueSize();
// queuedMsgs = stats.getAsyncQueuedMsgs();
// getLogWriter().info("After " + count + " "
// + " puts slowrec mode kicked in by queuing "
// + queuedMsgs + " for a total size of " + queueSize
// + " conflatedMsgs=" + conflatedMsgs
// + " dequeuedMsgs=" + dequeuedMsgs);
// final long start = System.currentTimeMillis();
// while (stats.getAsyncQueuedMsgs() > stats.getAsyncDequeuedMsgs()) {
// try {Thread.sleep(100);} catch (InterruptedException ignore) {}
// queueSize = stats.getAsyncQueueSize();
// queuedMsgs = stats.getAsyncQueuedMsgs();
// dequeuedMsgs = stats.getAsyncDequeuedMsgs();
// conflatedMsgs = stats.getAsyncConflatedMsgs();
// getLogWriter().info("After sleeping"
// + " queueSize=" + queueSize
// + " queuedMsgs=" + queuedMsgs
// + " dequeuedMsgs=" + dequeuedMsgs
// + " conflatedMsgs=" + conflatedMsgs);
final long finish = System.currentTimeMillis();
getLogWriter().info("After " + (finish - start) + " ms async msgs where flushed. A total of " + (stats.getAsyncDequeuedMsgs()-intialDeQueuedMsgs) + " were flushed. Leaving a queue size of " + stats.getAsyncQueueSize() + ". The lastValue was " + lastValue);
checkLastValueInOtherVm(lastValue, null);
}
/**
* make sure ack does not hang
* make sure two ack updates do not conflate but are both queued
*/
public void testAckConflation() throws CacheException {
final AttributesFactory factory = new AttributesFactory();
factory.setScope(Scope.DISTRIBUTED_NO_ACK);
factory.setEnableAsyncConflation(true);
final Region r = createRootRegion("slowrec", factory.create());
final Region ar = createAckRegion(false, true);
ar.create("ackKey", "ackValue");
final DMStats stats = getSystem().getDistributionManager().getStats();
// create receiver in vm0 with queuing enabled
Properties p = new Properties();
p.setProperty("async-distribution-timeout", "2");
doCreateOtherVm(p, false);
forceQueuing(r);
{
// make sure ack does not hang
// make sure two ack updates do not conflate but are both queued
long startQueuedMsgs = stats.getAsyncQueuedMsgs();
long startConflatedMsgs = stats.getAsyncConflatedMsgs();
Thread t = new Thread(new Runnable() {
public void run() {
ar.put("ackKey", "ackValue");
}
});
t.start();
Thread t2 = new Thread(new Runnable() {
public void run() {
ar.put("ackKey", "ackValue");
}
});
t2.start();
// give threads a chance to get queued
try {Thread.sleep(100);} catch (InterruptedException ignore) {fail("interrupted");}
forceQueueFlush();
DistributedTestCase.join(t, 2 * 1000, getLogWriter());
DistributedTestCase.join(t2, 2 * 1000, getLogWriter());
long endQueuedMsgs = stats.getAsyncQueuedMsgs();
long endConflatedMsgs = stats.getAsyncConflatedMsgs();
assertEquals(startConflatedMsgs, endConflatedMsgs);
// queue should be flushed by the time we get an ack
assertEquals(endQueuedMsgs, stats.getAsyncDequeuedMsgs());
assertEquals(startQueuedMsgs+2, endQueuedMsgs);
}
}
/**
* Make sure that only sequences of updates are conflated
* Also checks that sending to a conflating region and non-conflating region
* does the correct thing.
* Test disabled because it intermittently fails due to race conditions
* in test. This has been fixed in congo's tests. See bug 35357.
*/
public void _disabled_testConflationSequence() throws CacheException {
final AttributesFactory factory = new AttributesFactory();
factory.setScope(Scope.DISTRIBUTED_NO_ACK);
factory.setEnableAsyncConflation(true);
final Region r = createRootRegion("slowrec", factory.create());
factory.setEnableAsyncConflation(false);
final Region noConflate = createRootRegion("noConflate", factory.create());
final DMStats stats = getSystem().getDistributionManager().getStats();
// create receiver in vm0 with queuing enabled
Properties p = new Properties();
p.setProperty("async-distribution-timeout", "1");
doCreateOtherVm(p, false);
{
VM vm = getOtherVm();
vm.invoke(new CacheSerializableRunnable("create noConflate") {
public void run2() throws CacheException {
AttributesFactory af = new AttributesFactory();
af.setScope(Scope.DISTRIBUTED_NO_ACK);
af.setDataPolicy(DataPolicy.REPLICATE);
createRootRegion("noConflate", af.create());
}
});
}
// now make sure update+destroy does not conflate
final Object key = "key";
getLogWriter().info("[testConflationSequence] about to force queuing");
forceQueuing(r);
int count = 0;
String value = "";
String lastValue = value;
Object mylcb = null;
long initialConflatedMsgs = stats.getAsyncConflatedMsgs();
// long initialDequeuedMsgs = stats.getAsyncDequeuedMsgs();
// long dequeuedMsgs = stats.getAsyncDequeuedMsgs();
int endCount = count+60;
getLogWriter().info("[testConflationSequence] about to build up queue");
long begin = System.currentTimeMillis();
while (count < endCount) {
value = "count=" + count;
lastValue = value;
r.create(key, value);
count ++;
value = "count=" + count;
lastValue = value;
r.put(key, value);
count ++;
mylcb = value;
r.destroy(key, mylcb);
count ++;
lastValue = null;
// dequeuedMsgs = stats.getAsyncDequeuedMsgs();
assertTrue(System.currentTimeMillis() < begin+1000*60*2);
}
assertEquals(initialConflatedMsgs, stats.getAsyncConflatedMsgs());
forceQueueFlush();
checkLastValueInOtherVm(lastValue, mylcb);
// now make sure create+update+localDestroy does not conflate
getLogWriter().info("[testConflationSequence] force queuing create-update-destroy");
forceQueuing(r);
initialConflatedMsgs = stats.getAsyncConflatedMsgs();
// initialDequeuedMsgs = stats.getAsyncDequeuedMsgs();
// dequeuedMsgs = stats.getAsyncDequeuedMsgs();
endCount = count + 40;
getLogWriter().info("[testConflationSequence] create-update-destroy");
begin = System.currentTimeMillis();
while (count < endCount) {
value = "count=" + count;
lastValue = value;
r.create(key, value);
count++;
value = "count=" + count;
lastValue = value;
r.put(key, value);
count ++;
r.localDestroy(key);
// dequeuedMsgs = stats.getAsyncDequeuedMsgs();
assertTrue(System.currentTimeMillis() < begin+1000*60*2);
}
assertEquals(initialConflatedMsgs, stats.getAsyncConflatedMsgs());
forceQueueFlush();
checkLastValueInOtherVm(lastValue, null);
// now make sure update+invalidate does not conflate
getLogWriter().info("[testConflationSequence] force queuing update-invalidate");
forceQueuing(r);
initialConflatedMsgs = stats.getAsyncConflatedMsgs();
// initialDequeuedMsgs = stats.getAsyncDequeuedMsgs();
value = "count=" + count;
lastValue = value;
r.create(key, value);
count++;
// dequeuedMsgs = stats.getAsyncDequeuedMsgs();
endCount = count + 40;
getLogWriter().info("[testConflationSequence] update-invalidate");
begin = System.currentTimeMillis();
while (count < endCount) {
value = "count=" + count;
lastValue = value;
r.put(key, value);
count ++;
r.invalidate(key);
count ++;
lastValue = CHECK_INVALID;
// dequeuedMsgs = stats.getAsyncDequeuedMsgs();
assertTrue(System.currentTimeMillis() < begin+1000*60*2);
}
assertEquals(initialConflatedMsgs, stats.getAsyncConflatedMsgs());
forceQueueFlush();
getLogWriter().info("[testConflationSequence] assert other vm");
checkLastValueInOtherVm(lastValue, null);
r.destroy(key);
// now make sure updates to a conflating region are conflated even while
// updates to a non-conflating are not.
getLogWriter().info("[testConflationSequence] conflate & no-conflate regions");
forceQueuing(r);
final int initialAsyncSocketWrites = stats.getAsyncSocketWrites();
// initialDequeuedMsgs = stats.getAsyncDequeuedMsgs();
value = "count=" + count;
lastValue = value;
long conflatedMsgs = stats.getAsyncConflatedMsgs();
long queuedMsgs = stats.getAsyncQueuedMsgs();
r.create(key, value);
queuedMsgs++;
assertEquals(queuedMsgs, stats.getAsyncQueuedMsgs());
assertEquals(conflatedMsgs, stats.getAsyncConflatedMsgs());
r.put(key, value);
queuedMsgs++;
assertEquals(queuedMsgs, stats.getAsyncQueuedMsgs());
assertEquals(conflatedMsgs, stats.getAsyncConflatedMsgs());
noConflate.create(key, value);
queuedMsgs++;
assertEquals(queuedMsgs, stats.getAsyncQueuedMsgs());
assertEquals(conflatedMsgs, stats.getAsyncConflatedMsgs());
noConflate.put(key, value);
queuedMsgs++;
assertEquals(queuedMsgs, stats.getAsyncQueuedMsgs());
assertEquals(conflatedMsgs, stats.getAsyncConflatedMsgs());
count++;
// dequeuedMsgs = stats.getAsyncDequeuedMsgs();
endCount = count + 80;
begin = System.currentTimeMillis();
getLogWriter().info("[testConflationSequence:DEBUG] count=" + count
+ " queuedMsgs=" + stats.getAsyncQueuedMsgs()
+ " conflatedMsgs=" + stats.getAsyncConflatedMsgs()
+ " dequeuedMsgs=" + stats.getAsyncDequeuedMsgs()
+ " asyncSocketWrites=" + stats.getAsyncSocketWrites()
);
while (count < endCount) {
// make sure we continue to have a flush in progress
assertEquals(1, stats.getAsyncThreads());
assertEquals(1, stats.getAsyncQueues());
assertTrue(stats.getAsyncQueueFlushesInProgress() > 0);
// make sure we are not completing any flushing while this loop is in progress
assertEquals(initialAsyncSocketWrites, stats.getAsyncSocketWrites());
value = "count=" + count;
lastValue = value;
r.put(key, value);
count ++;
// make sure it was conflated and not queued
assertEquals(queuedMsgs, stats.getAsyncQueuedMsgs());
conflatedMsgs++;
assertEquals(conflatedMsgs, stats.getAsyncConflatedMsgs());
noConflate.put(key, value);
// make sure it was queued and not conflated
queuedMsgs++;
assertEquals(queuedMsgs, stats.getAsyncQueuedMsgs());
assertEquals(conflatedMsgs, stats.getAsyncConflatedMsgs());
// dequeuedMsgs = stats.getAsyncDequeuedMsgs();
assertTrue(System.currentTimeMillis() < begin+1000*60*2);
}
forceQueueFlush();
getLogWriter().info("[testConflationSequence] assert other vm");
checkLastValueInOtherVm(lastValue, null);
}
/**
* Make sure that exceeding the queue size limit causes a disconnect.
*/
public void testSizeDisconnect() throws CacheException {
final String expected =
"com.gemstone.gemfire.internal.tcp.ConnectionException: Forced disconnect sent to" +
"||java.io.IOException: Broken pipe";
final String addExpected =
"<ExpectedException action=add>" + expected + "</ExpectedException>";
final String removeExpected =
"<ExpectedException action=remove>" + expected + "</ExpectedException>";
final AttributesFactory factory = new AttributesFactory();
factory.setScope(Scope.DISTRIBUTED_NO_ACK);
final Region r = createRootRegion("slowrec", factory.create());
final DM dm = getSystem().getDistributionManager();
final DMStats stats = dm.getStats();
// set others before vm0 connects
final Set others = dm.getOtherDistributionManagerIds();
// create receiver in vm0 with queuing enabled
Properties p = new Properties();
p.setProperty("async-distribution-timeout", "5");
p.setProperty("async-max-queue-size", "1"); // 1 meg
doCreateOtherVm(p, false);
final Object key = "key";
final int VALUE_SIZE = 1024 * 100; // .1M async-max-queue-size should give us 10 of these 100K msgs before queue full
final byte[] value = new byte[VALUE_SIZE];
int count = 0;
forceQueuing(r);
long queuedMsgs = stats.getAsyncQueuedMsgs();
long queueSize = stats.getAsyncQueueSize();
getCache().getLogger().info(addExpected);
try {
while (stats.getAsyncQueueSizeExceeded() == 0 && stats.getAsyncQueueTimeouts() == 0) {
r.put(key, value);
count ++;
if (stats.getAsyncQueueSize() > 0) {
queuedMsgs = stats.getAsyncQueuedMsgs();
queueSize = stats.getAsyncQueueSize();
}
if (count > 100) {
fail("should have exceeded max-queue-size by now");
}
}
getLogWriter().info("After " + count + " " + VALUE_SIZE + " byte puts slowrec mode kicked in but the queue filled when its size reached " + queueSize + " with " + queuedMsgs + " msgs");
// make sure we lost a connection to vm0
WaitCriterion ev = new WaitCriterion() {
public boolean done() {
return dm.getOtherDistributionManagerIds().size() <= others.size()
&& stats.getAsyncQueueSize() == 0;
}
public String description() {
return "waiting for connection loss";
}
};
DistributedTestCase.waitForCriterion(ev, 30 * 1000, 200, true);
}
finally {
forceQueueFlush();
getCache().getLogger().info(removeExpected);
}
assertEquals(others, dm.getOtherDistributionManagerIds());
assertEquals(0, stats.getAsyncQueueSize());
}
/**
* Make sure that exceeding the async-queue-timeout causes a disconnect.<p>
* [bruce] This test was disabled when the SlowRecDUnitTest was re-enabled
* in build.xml in the splitbrainNov07 branch. It had been disabled since
* June 2006 due to hangs. Some of the tests, like this one, still need
* work because the periodically (some quite often) fail.
*/
public void donottestTimeoutDisconnect() throws CacheException {
final String expected =
"com.gemstone.gemfire.internal.tcp.ConnectionException: Forced disconnect sent to" +
"||java.io.IOException: Broken pipe";
final String addExpected =
"<ExpectedException action=add>" + expected + "</ExpectedException>";
final String removeExpected =
"<ExpectedException action=remove>" + expected + "</ExpectedException>";
final AttributesFactory factory = new AttributesFactory();
factory.setScope(Scope.DISTRIBUTED_NO_ACK);
final Region r = createRootRegion("slowrec", factory.create());
final DM dm = getSystem().getDistributionManager();
final DMStats stats = dm.getStats();
// set others before vm0 connects
final Set others = dm.getOtherDistributionManagerIds();
// create receiver in vm0 with queuing enabled
Properties p = new Properties();
p.setProperty("async-distribution-timeout", "5");
p.setProperty("async-queue-timeout", "500"); // 500 ms
doCreateOtherVm(p, true);
final Object key = "key";
final int VALUE_SIZE = 1024; // 1k
final byte[] value = new byte[VALUE_SIZE];
int count = 0;
long queuedMsgs = stats.getAsyncQueuedMsgs();
long queueSize = stats.getAsyncQueueSize();
final long timeoutLimit = System.currentTimeMillis() + 5000;
getCache().getLogger().info(addExpected);
try {
while (stats.getAsyncQueueTimeouts() == 0) {
r.put(key, value);
count ++;
if (stats.getAsyncQueueSize() > 0) {
queuedMsgs = stats.getAsyncQueuedMsgs();
queueSize = stats.getAsyncQueueSize();
}
if (System.currentTimeMillis() > timeoutLimit) {
fail("should have exceeded async-queue-timeout by now");
}
}
getLogWriter().info("After " + count + " " + VALUE_SIZE + " byte puts slowrec mode kicked in but the queue filled when its size reached " + queueSize + " with " + queuedMsgs + " msgs");
// make sure we lost a connection to vm0
WaitCriterion ev = new WaitCriterion() {
public boolean done() {
if (dm.getOtherDistributionManagerIds().size() > others.size()) {
return false;
}
return stats.getAsyncQueueSize() == 0;
}
public String description() {
return "waiting for departure";
}
};
DistributedTestCase.waitForCriterion(ev, 2 * 1000, 200, true);
}
finally {
getCache().getLogger().info(removeExpected);
}
assertEquals(others, dm.getOtherDistributionManagerIds());
assertEquals(0, stats.getAsyncQueueSize());
}
// static helper methods ---------------------------------------------------
private static final String KEY_SLEEP = "KEY_SLEEP";
private static final String KEY_WAIT = "KEY_WAIT";
private static final String KEY_DISCONNECT = "KEY_DISCONNECT";
protected final static int CALLBACK_CREATE = 0;
protected final static int CALLBACK_UPDATE = 1;
protected final static int CALLBACK_INVALIDATE = 2;
protected final static int CALLBACK_DESTROY = 3;
protected final static int CALLBACK_REGION_INVALIDATE = 4;
protected final static Integer CALLBACK_CREATE_INTEGER = new Integer(CALLBACK_CREATE);
protected final static Integer CALLBACK_UPDATE_INTEGER = new Integer(CALLBACK_UPDATE);
protected final static Integer CALLBACK_INVALIDATE_INTEGER = new Integer(CALLBACK_INVALIDATE);
protected final static Integer CALLBACK_DESTROY_INTEGER = new Integer(CALLBACK_DESTROY);
protected final static Integer CALLBACK_REGION_INVALIDATE_INTEGER = new Integer(CALLBACK_REGION_INVALIDATE);
private static class CallbackWrapper {
public final Object callbackArgument;
public final int callbackType;
public CallbackWrapper(Object callbackArgument, int callbackType) {
this.callbackArgument = callbackArgument;
this.callbackType = callbackType;
}
public String toString() {
return "CallbackWrapper: " + callbackArgument.toString() + " of type " + callbackType;
}
}
protected static class ControlListener extends CacheListenerAdapter {
public final LinkedList callbackArguments = new LinkedList();
public final LinkedList callbackTypes = new LinkedList();
public final Object CONTROL_LOCK = new Object();
public void afterCreate(EntryEvent event) {
getLogWriter().info(event.getRegion().getName() + " afterCreate " + event.getKey());
synchronized(this.CONTROL_LOCK) {
if (event.getCallbackArgument() != null) {
this.callbackArguments.add(
new CallbackWrapper(event.getCallbackArgument(), CALLBACK_CREATE));
this.callbackTypes.add(CALLBACK_CREATE_INTEGER);
this.CONTROL_LOCK.notifyAll();
}
}
processEvent(event);
}
public void afterUpdate(EntryEvent event) {
getLogWriter().info(event.getRegion().getName() + " afterUpdate " + event.getKey());
synchronized(this.CONTROL_LOCK) {
if (event.getCallbackArgument() != null) {
this.callbackArguments.add(
new CallbackWrapper(event.getCallbackArgument(), CALLBACK_UPDATE));
this.callbackTypes.add(CALLBACK_UPDATE_INTEGER);
this.CONTROL_LOCK.notifyAll();
}
}
processEvent(event);
}
public void afterInvalidate(EntryEvent event) {
synchronized(this.CONTROL_LOCK) {
if (event.getCallbackArgument() != null) {
this.callbackArguments.add(
new CallbackWrapper(event.getCallbackArgument(), CALLBACK_INVALIDATE));
this.callbackTypes.add(CALLBACK_INVALIDATE_INTEGER);
this.CONTROL_LOCK.notifyAll();
}
}
}
public void afterDestroy(EntryEvent event) {
synchronized(this.CONTROL_LOCK) {
if (event.getCallbackArgument() != null) {
this.callbackArguments.add(
new CallbackWrapper(event.getCallbackArgument(), CALLBACK_DESTROY));
this.callbackTypes.add(CALLBACK_DESTROY_INTEGER);
this.CONTROL_LOCK.notifyAll();
}
}
}
public void afterRegionInvalidate(RegionEvent event) {
synchronized(this.CONTROL_LOCK) {
if (event.getCallbackArgument() != null) {
this.callbackArguments.add(
new CallbackWrapper(event.getCallbackArgument(), CALLBACK_REGION_INVALIDATE));
this.callbackTypes.add(CALLBACK_REGION_INVALIDATE_INTEGER);
this.CONTROL_LOCK.notifyAll();
}
}
}
private void processEvent(EntryEvent event) {
if (event.getKey().equals(KEY_SLEEP)) {
processSleep(event);
}
else if (event.getKey().equals(KEY_WAIT)) {
processWait(event);
}
else if (event.getKey().equals(KEY_DISCONNECT)) {
processDisconnect(event);
}
}
private void processSleep(EntryEvent event) {
int sleepMs = ((Integer)event.getNewValue()).intValue();
getLogWriter().info("[processSleep] sleeping for " + sleepMs);
try {
Thread.sleep(sleepMs);
} catch (InterruptedException ignore) {fail("interrupted");}
}
private void processWait(EntryEvent event) {
int sleepMs = ((Integer)event.getNewValue()).intValue();
getLogWriter().info("[processWait] waiting for " + sleepMs);
synchronized(this.CONTROL_LOCK) {
try {
this.CONTROL_LOCK.wait(sleepMs);
} catch (InterruptedException ignore) {return;}
}
}
private void processDisconnect(EntryEvent event) {
getLogWriter().info("[processDisconnect] disconnecting");
disconnectFromDS();
}
};
/**
* Make sure a multiple no ack regions conflate properly.
* [bruce] disabled when use of this dunit test class was reenabled in
* the splitbrainNov07 branch. The class had been disabled since
* June 2006 r13222 in the trunk. This test is failing because conflation
* isn't kicking in for some reason.
*/
public void donottestMultipleRegionConflation() throws Throwable {
try {
doTestMultipleRegionConflation();
}
catch (VirtualMachineError e) {
SystemFailure.initiateFailure(e);
throw e;
}
catch (Throwable t) {
getLogWriter().error("Encountered exception: ", t);
throw t;
}
finally {
// make sure other vm was notified even if test failed
getOtherVm().invoke(new SerializableRunnable("Wake up other vm") {
public void run() {
synchronized(doTestMultipleRegionConflation_R1_Listener.CONTROL_LOCK) {
doTestMultipleRegionConflation_R1_Listener.CONTROL_LOCK.notifyAll();
}
}
});
}
}
protected static ControlListener doTestMultipleRegionConflation_R1_Listener;
protected static ControlListener doTestMultipleRegionConflation_R2_Listener;
private void doTestMultipleRegionConflation() throws Exception {
final AttributesFactory factory = new AttributesFactory();
factory.setScope(Scope.DISTRIBUTED_NO_ACK);
factory.setEnableAsyncConflation(true);
final Region r1 = createRootRegion("slowrec1", factory.create());
final Region r2 = createRootRegion("slowrec2", factory.create());
assertTrue(getSystem().isConnected());
assertNotNull(r1);
assertFalse(r1.isDestroyed());
assertNotNull(getCache());
assertNotNull(getCache().getRegion("slowrec1"));
assertNotNull(r2);
assertFalse(r2.isDestroyed());
assertNotNull(getCache());
assertNotNull(getCache().getRegion("slowrec2"));
final DM dm = getSystem().getDistributionManager();
final Serializable controllerVM = dm.getDistributionManagerId();
final DMStats stats = dm.getStats();
final int millisToWait = 1000 * 60 * 5; // 5 minutes
// set others before vm0 connects
long initialQueuedMsgs = stats.getAsyncQueuedMsgs();
// create receiver in vm0 with queuing enabled
final Properties p = new Properties();
p.setProperty("async-distribution-timeout", "5");
p.setProperty("async-queue-timeout", "86400000"); // max value
p.setProperty("async-max-queue-size", "1024"); // max value
getOtherVm().invoke(new CacheSerializableRunnable("Create other vm") {
public void run2() throws CacheException {
getSystem(p);
DM dm = getSystem().getDistributionManager();
assertTrue(dm.getDistributionManagerIds().contains(controllerVM));
AttributesFactory af = new AttributesFactory();
af.setScope(Scope.DISTRIBUTED_NO_ACK);
af.setDataPolicy(DataPolicy.REPLICATE);
doTestMultipleRegionConflation_R1_Listener = new ControlListener();
af.setCacheListener(doTestMultipleRegionConflation_R1_Listener);
createRootRegion("slowrec1", af.create());
doTestMultipleRegionConflation_R2_Listener = new ControlListener();
af.setCacheListener(doTestMultipleRegionConflation_R2_Listener);
createRootRegion("slowrec2", af.create());
}
});
// put vm0 cache listener into wait
getLogWriter().info("[doTestMultipleRegionConflation] about to put vm0 into wait");
r1.put(KEY_WAIT, new Integer(millisToWait));
// build up queue size
getLogWriter().info("[doTestMultipleRegionConflation] building up queue size...");
final Object key = "key";
final int socketBufferSize = getSystem().getConfig().getSocketBufferSize();
final int VALUE_SIZE = socketBufferSize*3;
//final int VALUE_SIZE = 1024 * 1024 ; // 1 MB
final byte[] value = new byte[VALUE_SIZE];
int count = 0;
while (stats.getAsyncQueuedMsgs() == initialQueuedMsgs) {
count++;
r1.put(key, value);
}
getLogWriter().info("[doTestMultipleRegionConflation] After " +
count + " puts of size " + VALUE_SIZE +
" slowrec mode kicked in with queue size=" + stats.getAsyncQueueSize());
// put values that will be asserted
final Object key1 = "key1";
final Object key2 = "key2";
Object putKey = key1;
boolean flag = true;
for (int i = 0; i < 30; i++) {
if (i == 10) putKey = key2;
if (flag) {
if (i == 6) {
r1.invalidate(putKey, new Integer(i));
} else if (i == 24) {
r1.invalidateRegion(new Integer(i));
} else {
r1.put(putKey, value, new Integer(i));
}
} else {
if (i == 15) {
r2.destroy(putKey, new Integer(i));
} else {
r2.put(putKey, value, new Integer(i));
}
}
flag = !flag;
}
// r1: key1, 0, create
// r1: key1, 4, update
// r1: key1, 6, invalidate
// r1: key1, 8, update
// r1: key2, 10, create
// r1: 24, invalidateRegion
// r1: key2, 28, update
// r2: key1, 1, create
// r2: key1, 9, update
// r2: key2, 11, create
// r2: key2, 13, update
// r2: key2, 15, destroy
// r2: key2, 17, create
// r2: key2, 29, update
final int[] r1ExpectedArgs = new int[] { 0, 4, 6, 8, 10, 24, 28 };
final int[] r1ExpectedTypes = new int[] /* 0, 1, 2, 1, 0, 4, 1 */
{ CALLBACK_CREATE, CALLBACK_UPDATE, CALLBACK_INVALIDATE, CALLBACK_UPDATE,
CALLBACK_CREATE, CALLBACK_REGION_INVALIDATE, CALLBACK_UPDATE };
final int[] r2ExpectedArgs = new int[] { 1, 9, 11, 13, 15, 17, 29 };
final int[] r2ExpectedTypes = new int[]
{ CALLBACK_CREATE, CALLBACK_UPDATE, CALLBACK_CREATE, CALLBACK_UPDATE,
CALLBACK_DESTROY, CALLBACK_CREATE, CALLBACK_UPDATE };
// send notify to vm0
getLogWriter().info("[doTestMultipleRegionConflation] wake up vm0");
getOtherVm().invoke(new SerializableRunnable("Wake up other vm") {
public void run() {
synchronized(doTestMultipleRegionConflation_R1_Listener.CONTROL_LOCK) {
doTestMultipleRegionConflation_R1_Listener.CONTROL_LOCK.notifyAll();
}
}
});
// wait for queue to be flushed
getLogWriter().info("[doTestMultipleRegionConflation] wait for vm0");
getOtherVm().invoke(new SerializableRunnable("Wait for other vm") {
public void run() {
try {
synchronized(doTestMultipleRegionConflation_R1_Listener.CONTROL_LOCK) {
while (doTestMultipleRegionConflation_R1_Listener.callbackArguments.size() < r1ExpectedArgs.length) {
doTestMultipleRegionConflation_R1_Listener.CONTROL_LOCK.wait(millisToWait);
}
}
synchronized(doTestMultipleRegionConflation_R2_Listener.CONTROL_LOCK) {
while (doTestMultipleRegionConflation_R2_Listener.callbackArguments.size() < r2ExpectedArgs.length) {
doTestMultipleRegionConflation_R2_Listener.CONTROL_LOCK.wait(millisToWait);
}
}
} catch (InterruptedException ignore) {fail("interrupted");}
}
});
// assert values on both listeners
getLogWriter().info("[doTestMultipleRegionConflation] assert callback arguments");
getOtherVm().invoke(new SerializableRunnable("Assert callback arguments") {
public void run() {
synchronized(doTestMultipleRegionConflation_R1_Listener.CONTROL_LOCK) {
getLogWriter().info("doTestMultipleRegionConflation_R1_Listener.callbackArguments=" + doTestMultipleRegionConflation_R1_Listener.callbackArguments);
getLogWriter().info("doTestMultipleRegionConflation_R1_Listener.callbackTypes=" + doTestMultipleRegionConflation_R1_Listener.callbackTypes);
assertEquals(doTestMultipleRegionConflation_R1_Listener.callbackArguments.size(),
doTestMultipleRegionConflation_R1_Listener.callbackTypes.size());
int i = 0;
for (Iterator iter = doTestMultipleRegionConflation_R1_Listener.callbackArguments.iterator(); iter.hasNext();) {
CallbackWrapper wrapper = (CallbackWrapper) iter.next();
assertEquals(new Integer(r1ExpectedArgs[i]),
wrapper.callbackArgument);
assertEquals(new Integer(r1ExpectedTypes[i]),
doTestMultipleRegionConflation_R1_Listener.callbackTypes.get(i));
i++;
}
}
synchronized(doTestMultipleRegionConflation_R2_Listener.CONTROL_LOCK) {
getLogWriter().info("doTestMultipleRegionConflation_R2_Listener.callbackArguments=" + doTestMultipleRegionConflation_R2_Listener.callbackArguments);
getLogWriter().info("doTestMultipleRegionConflation_R2_Listener.callbackTypes=" + doTestMultipleRegionConflation_R2_Listener.callbackTypes);
assertEquals(doTestMultipleRegionConflation_R2_Listener.callbackArguments.size(),
doTestMultipleRegionConflation_R2_Listener.callbackTypes.size());
int i = 0;
for (Iterator iter = doTestMultipleRegionConflation_R2_Listener.callbackArguments.iterator(); iter.hasNext();) {
CallbackWrapper wrapper = (CallbackWrapper) iter.next();
assertEquals(new Integer(r2ExpectedArgs[i]),
wrapper.callbackArgument);
assertEquals(new Integer(r2ExpectedTypes[i]),
doTestMultipleRegionConflation_R2_Listener.callbackTypes.get(i));
i++;
}
}
}
});
}
/**
* Make sure a disconnect causes queue memory to be released.
*/
public void testDisconnectCleanup() throws Throwable {
try {
doTestDisconnectCleanup();
}
catch (VirtualMachineError e) {
SystemFailure.initiateFailure(e);
throw e;
}
catch (Throwable t) {
getLogWriter().error("Encountered exception: ", t);
throw t;
}
finally {
// make sure other vm was notified even if test failed
getOtherVm().invoke(new SerializableRunnable("Wake up other vm") {
public void run() {
synchronized(doTestDisconnectCleanup_Listener.CONTROL_LOCK) {
doTestDisconnectCleanup_Listener.CONTROL_LOCK.notifyAll();
}
}
});
}
}
protected static ControlListener doTestDisconnectCleanup_Listener;
private void doTestDisconnectCleanup() throws Exception {
final AttributesFactory factory = new AttributesFactory();
factory.setScope(Scope.DISTRIBUTED_NO_ACK);
final Region r = createRootRegion("slowrec", factory.create());
final DM dm = getSystem().getDistributionManager();
final DMStats stats = dm.getStats();
// set others before vm0 connects
final Set others = dm.getOtherDistributionManagerIds();
long initialQueuedMsgs = stats.getAsyncQueuedMsgs();
final int initialQueues = stats.getAsyncQueues();
// create receiver in vm0 with queuing enabled
final Properties p = new Properties();
p.setProperty("async-distribution-timeout", "5");
p.setProperty("async-queue-timeout", "86400000"); // max value
p.setProperty("async-max-queue-size", "1024"); // max value
getOtherVm().invoke(new CacheSerializableRunnable("Create other vm") {
public void run2() throws CacheException {
getSystem(p);
AttributesFactory af = new AttributesFactory();
af.setScope(Scope.DISTRIBUTED_NO_ACK);
af.setDataPolicy(DataPolicy.REPLICATE);
doTestDisconnectCleanup_Listener = new ControlListener();
af.setCacheListener(doTestDisconnectCleanup_Listener);
createRootRegion("slowrec", af.create());
}
});
// put vm0 cache listener into wait
getLogWriter().info("[testDisconnectCleanup] about to put vm0 into wait");
int millisToWait = 1000 * 60 * 5; // 5 minutes
r.put(KEY_WAIT, new Integer(millisToWait));
r.put(KEY_DISCONNECT, KEY_DISCONNECT);
// build up queue size
getLogWriter().info("[testDisconnectCleanup] building up queue size...");
final Object key = "key";
final int socketBufferSize = getSystem().getConfig().getSocketBufferSize();
final int VALUE_SIZE = socketBufferSize*3;
//final int VALUE_SIZE = 1024 * 1024 ; // 1 MB
final byte[] value = new byte[VALUE_SIZE];
int count = 0;
final long abortMillis = System.currentTimeMillis() + millisToWait;
while (stats.getAsyncQueuedMsgs() == initialQueuedMsgs) {
count++;
r.put(key, value);
assertFalse(System.currentTimeMillis() >= abortMillis);
}
getLogWriter().info("[testDisconnectCleanup] After " +
count + " puts of size " + VALUE_SIZE +
" slowrec mode kicked in with queue size=" + stats.getAsyncQueueSize());
while (stats.getAsyncQueuedMsgs() < 10 ||
stats.getAsyncQueueSize() < VALUE_SIZE*10) {
count++;
r.put(key, value);
assertFalse(System.currentTimeMillis() >= abortMillis);
}
assertTrue(stats.getAsyncQueuedMsgs() >= 10);
while (stats.getAsyncQueues() < 1) {
pause(100);
assertFalse(System.currentTimeMillis() >= abortMillis);
}
getLogWriter().info("[testDisconnectCleanup] After " +
count + " puts of size " + VALUE_SIZE + " queue size has reached " +
stats.getAsyncQueueSize() + " bytes and number of queues is " +
stats.getAsyncQueues() + ".");
assertTrue(stats.getAsyncQueueSize() >= (VALUE_SIZE*5));
assertEquals(initialQueues+1, stats.getAsyncQueues());
// assert vm0 is still connected
assertTrue(dm.getOtherDistributionManagerIds().size() > others.size());
// send notify to vm0
getLogWriter().info("[testDisconnectCleanup] wake up vm0");
getOtherVm().invoke(new SerializableRunnable("Wake up other vm") {
public void run() {
synchronized(doTestDisconnectCleanup_Listener.CONTROL_LOCK) {
doTestDisconnectCleanup_Listener.CONTROL_LOCK.notifyAll();
}
}
});
// make sure we lost a connection to vm0
getLogWriter().info("[testDisconnectCleanup] wait for vm0 to disconnect");
WaitCriterion ev = new WaitCriterion() {
public boolean done() {
return dm.getOtherDistributionManagerIds().size() <= others.size();
}
public String description() {
return "waiting for disconnect";
}
};
DistributedTestCase.waitForCriterion(ev, 2 * 1000, 200, true);
assertEquals(others, dm.getOtherDistributionManagerIds());
// check free memory... perform wait loop with System.gc
getLogWriter().info("[testDisconnectCleanup] wait for queue cleanup");
ev = new WaitCriterion() {
public boolean done() {
if (stats.getAsyncQueues() <= initialQueues) {
return true;
}
Runtime.getRuntime().gc();
return false;
}
public String description() {
return "waiting for queue cleanup";
}
};
DistributedTestCase.waitForCriterion(ev, 2 * 1000, 200, true);
// getLogWriter().info("[testDisconnectCleanup] initialQueues=" +
// initialQueues + " asyncQueues=" + stats.getAsyncQueues());
assertEquals(initialQueues, stats.getAsyncQueues());
}
/**
* Make sure a disconnect causes queue memory to be released.<p>
* [bruce] This test was disabled when the SlowRecDUnitTest was re-enabled
* in build.xml in the splitbrainNov07 branch. It had been disabled since
* June 2006 due to hangs. Some of the tests, like this one, still need
* work because the periodically (some quite often) fail.
*/
public void donottestPartialMessage() throws Throwable {
try {
doTestPartialMessage();
}
catch (VirtualMachineError e) {
SystemFailure.initiateFailure(e);
throw e;
}
catch (Throwable t) {
getLogWriter().error("Encountered exception: ", t);
throw t;
}
finally {
// make sure other vm was notified even if test failed
getOtherVm().invoke(new SerializableRunnable("Wake up other vm") {
public void run() {
synchronized(doTestPartialMessage_Listener.CONTROL_LOCK) {
doTestPartialMessage_Listener.CONTROL_LOCK.notifyAll();
}
}
});
}
}
protected static ControlListener doTestPartialMessage_Listener;
private void doTestPartialMessage() throws Exception {
final AttributesFactory factory = new AttributesFactory();
factory.setScope(Scope.DISTRIBUTED_NO_ACK);
factory.setEnableAsyncConflation(true);
final Region r = createRootRegion("slowrec", factory.create());
final DM dm = getSystem().getDistributionManager();
final DMStats stats = dm.getStats();
// set others before vm0 connects
// final Set others = dm.getOtherDistributionManagerIds();
long initialQueuedMsgs = stats.getAsyncQueuedMsgs();
// int initialQueues = stats.getAsyncQueues();
// create receiver in vm0 with queuing enabled
final Properties p = new Properties();
p.setProperty("async-distribution-timeout", String.valueOf(1000*4)); // 4 sec
p.setProperty("async-queue-timeout", "86400000"); // max value
p.setProperty("async-max-queue-size", "1024"); // max value
getOtherVm().invoke(new CacheSerializableRunnable("Create other vm") {
public void run2() throws CacheException {
getSystem(p);
AttributesFactory af = new AttributesFactory();
af.setScope(Scope.DISTRIBUTED_NO_ACK);
af.setDataPolicy(DataPolicy.REPLICATE);
doTestPartialMessage_Listener = new ControlListener();
af.setCacheListener(doTestPartialMessage_Listener);
createRootRegion("slowrec", af.create());
}
});
// put vm0 cache listener into wait
getLogWriter().info("[testPartialMessage] about to put vm0 into wait");
final int millisToWait = 1000 * 60 * 5; // 5 minutes
r.put(KEY_WAIT, new Integer(millisToWait));
// build up queue size
getLogWriter().info("[testPartialMessage] building up queue size...");
final Object key = "key";
final int socketBufferSize = getSystem().getConfig().getSocketBufferSize();
final int VALUE_SIZE = socketBufferSize*3;
//1024 * 20; // 20 KB
final byte[] value = new byte[VALUE_SIZE];
int count = 0;
while (stats.getAsyncQueuedMsgs() == initialQueuedMsgs) {
count++;
r.put(key, value, new Integer(count));
}
final int partialId = count;
assertEquals(0, stats.getAsyncConflatedMsgs());
getLogWriter().info("[testPartialMessage] After " +
count + " puts of size " + VALUE_SIZE +
" slowrec mode kicked in with queue size=" + stats.getAsyncQueueSize());
pause(2000);
// conflate 10 times
while (stats.getAsyncConflatedMsgs() < 10) {
count++;
r.put(key, value, new Integer(count));
if (count == partialId+1) {
// long begin = System.currentTimeMillis();
// while (stats.getAsyncQueues() < 1) {
// pause(100);
// assertFalse(System.currentTimeMillis() > begin+1000*10);
// }
assertEquals(initialQueuedMsgs+2, stats.getAsyncQueuedMsgs());
assertEquals(0, stats.getAsyncConflatedMsgs());
} else if (count == partialId+2) {
assertEquals(initialQueuedMsgs+2, stats.getAsyncQueuedMsgs());
assertEquals(1, stats.getAsyncConflatedMsgs());
}
}
final int conflateId = count;
final int[] expectedArgs = { partialId, conflateId };
// send notify to vm0
getLogWriter().info("[testPartialMessage] wake up vm0");
getOtherVm().invoke(new SerializableRunnable("Wake up other vm") {
public void run() {
synchronized(doTestPartialMessage_Listener.CONTROL_LOCK) {
doTestPartialMessage_Listener.CONTROL_LOCK.notify();
}
}
});
// wait for queue to be flushed
getLogWriter().info("[testPartialMessage] wait for vm0");
getOtherVm().invoke(new SerializableRunnable("Wait for other vm") {
public void run() {
try {
synchronized(doTestPartialMessage_Listener.CONTROL_LOCK) {
boolean done = false;
while (!done) {
if (doTestPartialMessage_Listener.callbackArguments.size()> 0) {
CallbackWrapper last = (CallbackWrapper)
doTestPartialMessage_Listener.callbackArguments.getLast();
Integer lastId = (Integer) last.callbackArgument;
if (lastId.intValue() == conflateId) {
done = true;
} else {
doTestPartialMessage_Listener.CONTROL_LOCK.wait(millisToWait);
}
} else {
doTestPartialMessage_Listener.CONTROL_LOCK.wait(millisToWait);
}
}
}
} catch (InterruptedException ignore) {fail("interrupted");}
}
});
// assert values on both listeners
getLogWriter().info("[testPartialMessage] assert callback arguments");
getOtherVm().invoke(new SerializableRunnable("Assert callback arguments") {
public void run() {
synchronized(doTestPartialMessage_Listener.CONTROL_LOCK) {
getLogWriter().info("[testPartialMessage] " +
"doTestPartialMessage_Listener.callbackArguments=" +
doTestPartialMessage_Listener.callbackArguments);
assertEquals(doTestPartialMessage_Listener.callbackArguments.size(),
doTestPartialMessage_Listener.callbackTypes.size());
int i = 0;
Iterator argIter =
doTestPartialMessage_Listener.callbackArguments.iterator();
Iterator typeIter =
doTestPartialMessage_Listener.callbackTypes.iterator();
while (argIter.hasNext()) {
CallbackWrapper wrapper = (CallbackWrapper) argIter.next();
Integer arg = (Integer) wrapper.callbackArgument;
typeIter.next(); // Integer type
if (arg.intValue() < partialId) {
continue;
}
assertEquals(new Integer(expectedArgs[i]), arg);
//assertEquals(CALLBACK_UPDATE_INTEGER, type);
i++;
}
}
}
});
}
}