blob: 8a8dd4ce067917f01faaa1a86fd6bf36b694ac5e [file] [log] [blame]
/*=========================================================================
* Copyright (c) 2002-2014 Pivotal Software, Inc. All Rights Reserved.
* This product is protected by U.S. and international copyright
* and intellectual property laws. Pivotal products are covered by
* more patents listed at http://www.pivotal.io/patents.
*=========================================================================
*/
package com.gemstone.gemfire.internal.cache;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.atomic.AtomicInteger;
import com.gemstone.gemfire.cache.Cache;
import com.gemstone.gemfire.cache.DataPolicy;
import com.gemstone.gemfire.cache.Region;
import com.gemstone.gemfire.cache.RegionFactory;
import com.gemstone.gemfire.cache.Scope;
import com.gemstone.gemfire.cache30.CacheTestCase;
import com.gemstone.gemfire.distributed.internal.DMStats;
import com.gemstone.gemfire.distributed.internal.DistributionManager;
import com.gemstone.gemfire.distributed.internal.DistributionMessage;
import com.gemstone.gemfire.distributed.internal.DistributionMessageObserver;
import com.gemstone.gemfire.internal.cache.InitialImageOperation.ImageReplyMessage;
import dunit.AsyncInvocation;
import dunit.Host;
import dunit.SerializableRunnable;
import dunit.VM;
/**
* @author dsmith
*
*/
public class GIIFlowControlDUnitTest extends CacheTestCase {
protected static final String REGION_NAME = "region";
private static final long MAX_WAIT = 10 * 1000;
private static int origChunkSize = InitialImageOperation.CHUNK_SIZE_IN_BYTES;
private static int origNumChunks = InitialImageOperation.CHUNK_PERMITS;
protected static FlowControlObserver observer;
/**
* @param name
*/
public GIIFlowControlDUnitTest(String name) {
super(name);
}
@Override
public void tearDown2() throws Exception {
invokeInEveryVM(new SerializableRunnable("reset chunk size") {
public void run() {
InitialImageOperation.CHUNK_SIZE_IN_BYTES = origChunkSize;
InitialImageOperation.CHUNK_PERMITS = origNumChunks;
}
});
super.tearDown2();
}
public void testLotsOfChunks() throws Throwable {
Host host = Host.getHost(0);
VM vm0 = host.getVM(0);
VM vm1 = host.getVM(1);
invokeInEveryVM(new SerializableRunnable("reset chunk size") {
public void run() {
InitialImageOperation.CHUNK_SIZE_IN_BYTES = 10;
InitialImageOperation.CHUNK_PERMITS = 2;
}
});
createRegion(vm0);
createData(vm0, 0, 50, "1234567890");
createRegion(vm1);
closeCache(vm0);
}
public void testFlowControlHappening() throws Throwable {
Host host = Host.getHost(0);
VM vm0 = host.getVM(0);
VM vm1 = host.getVM(1);
invokeInEveryVM(new SerializableRunnable("set chunk size") {
public void run() {
InitialImageOperation.CHUNK_SIZE_IN_BYTES = 10;
InitialImageOperation.CHUNK_PERMITS = 2;
}
});
vm1.invoke(new SerializableRunnable("Add flow control observer") {
public void run() {
observer = new FlowControlObserver();
DistributionMessageObserver.setInstance(observer);
getCache();
observer.start();
}
});
createRegion(vm0);
createData(vm0, 0, 50, "1234567890");
AsyncInvocation async1 = createRegionAsync(vm1);
async1.join(100);
assertTrue(async1.isAlive());
vm1.invoke(new SerializableRunnable("Wait for chunks") {
public void run() {
waitForCriterion(new WaitCriterion(){
public String description() {
return "Waiting for messages to be at least 2: " + observer.messageCount.get();
}
public boolean done() {
return observer.messageCount.get() >= 2;
}
}, MAX_WAIT, 100, true);
//Make sure no more messages show up
try {
Thread.sleep(500);
} catch (InterruptedException e) {
fail("interrupted", e);
}
assertEquals(2, observer.messageCount.get());
observer.allowMessages.countDown();
}
});
async1.getResult(MAX_WAIT);
vm1.invoke(new SerializableRunnable("Add flow control observer") {
public void run() {
assertTrue("Message count should be greater than 2 now", observer.messageCount.get() > 2);
}
});
closeCache(vm0);
}
public void testKillSenderNoHang() throws Throwable {
Host host = Host.getHost(0);
VM vm0 = host.getVM(0);
VM vm1 = host.getVM(1);
invokeInEveryVM(new SerializableRunnable("set chunk size") {
public void run() {
InitialImageOperation.CHUNK_SIZE_IN_BYTES = 10;
InitialImageOperation.CHUNK_PERMITS = 2;
}
});
vm1.invoke(new SerializableRunnable("Add flow control observer") {
public void run() {
observer = new FlowControlObserver();
DistributionMessageObserver.setInstance(observer);
getCache();
observer.start();
}
});
createRegion(vm0);
createData(vm0, 0, 50, "1234567890");
AsyncInvocation async1 = createRegionAsync(vm1);
vm1.invoke(new SerializableRunnable("Wait to flow control messages") {
public void run() {
waitForCriterion(new WaitCriterion(){
public String description() {
return "Waiting for messages to be at least 2: " + observer.messageCount.get();
}
public boolean done() {
return observer.messageCount.get() >= 2;
}
}, MAX_WAIT, 100, true);
//Make sure no more messages show up
try {
Thread.sleep(500);
} catch (InterruptedException e) {
fail("interrupted", e);
}
assertEquals(2, observer.messageCount.get());
}
});
closeCache(vm0);
vm1.invoke(new SerializableRunnable("release flow control") {
public void run() {
observer.allowMessages.countDown();
}
});
//We should now finish.
async1.getResult(MAX_WAIT);
}
// DISABLED due to high failure rate due, apparently, to problems
// with the flow-control statistics. See internal ticket #52221
public void disabledtestCloseReceiverCacheNoHang() throws Throwable {
doCloseTest(false);
}
// DISABLED due to high failure rate due, apparently, to problems
// with the flow-control statistics. See internal ticket #52221
public void disabledtestDisconnectReceiverNoHang() throws Throwable {
doCloseTest(true);
}
public void doCloseTest(boolean disconnect) throws Throwable {
Host host = Host.getHost(0);
VM vm0 = host.getVM(0);
VM vm1 = host.getVM(1);
invokeInEveryVM(new SerializableRunnable("set chunk size") {
public void run() {
InitialImageOperation.CHUNK_SIZE_IN_BYTES = 10;
InitialImageOperation.CHUNK_PERMITS = 2;
}
});
InitialImageOperation.CHUNK_SIZE_IN_BYTES = 10;
InitialImageOperation.CHUNK_PERMITS = 2;
vm1.invoke(new SerializableRunnable("Add flow control observer") {
public void run() {
observer = new FlowControlObserver();
DistributionMessageObserver.setInstance(observer);
getCache();
observer.start();
}
});
ExpectedException expectedEx = null;
try {
createRegion(vm0);
createData(vm0, 0, 50, "1234567890");
createRegionAsync(vm1);
vm1.invoke(new SerializableRunnable("Wait to flow control messages") {
public void run() {
waitForCriterion(new WaitCriterion(){
public String description() {
return "Waiting for messages to be at least 2: " + observer.messageCount.get();
}
public boolean done() {
return observer.messageCount.get() >= 2;
}
}, MAX_WAIT, 100, true);
//Make sure no more messages show up
try {
Thread.sleep(500);
} catch (InterruptedException e) {
fail("interrupted", e);
}
assertEquals(2, observer.messageCount.get());
}
});
try {
vm0.invoke(new SerializableRunnable("check for in progress messages") {
public void run() {
final DMStats stats = getSystem().getDMStats();
assertEquals(2, stats.getInitialImageMessagesInFlight());
}
});
} catch (Exception e) {
vm1.invoke(new SerializableRunnable("release flow control due to exception") {
public void run() {
observer.allowMessages.countDown();
}
});
throw e;
}
expectedEx = addExpectedException(InterruptedException.class.getName(),
vm1);
if(disconnect) {
disconnect(vm1);
} else {
closeCache(vm1);
}
vm1.invoke(new SerializableRunnable("release flow control") {
public void run() {
observer.allowMessages.countDown();
}
});
vm0.invoke(new SerializableRunnable("check for in progress messages") {
public void run() {
final DMStats stats = getSystem().getDMStats();
waitForCriterion(new WaitCriterion() {
public boolean done() {
return stats.getInitialImageMessagesInFlight() == 0;
}
public String description() {
return "Timeout waiting for all initial image messages to be processed: "
+ stats.getInitialImageMessagesInFlight();
}
}, MAX_WAIT, 100, true);
}
});
} finally {
if (expectedEx != null) {
expectedEx.remove();
}
}
}
//TODO Test destroying either side of the DS during the flow control, no hangs.
protected void closeCache(final VM vm) {
SerializableRunnable closeCache = new SerializableRunnable("close cache") {
public void run() {
Cache cache = getCache();
cache.close();
}
};
vm.invoke(closeCache);
}
protected void disconnect(final VM vm) {
SerializableRunnable closeCache = new SerializableRunnable("close cache") {
public void run() {
disconnectFromDS();
}
};
vm.invoke(closeCache);
}
private void createRegion(VM vm) throws Throwable {
SerializableRunnable createRegion = getCreateRegionRunnable();
vm.invoke(createRegion);
}
private SerializableRunnable getCreateRegionRunnable() {
SerializableRunnable createRegion = new SerializableRunnable("Create non persistent region") {
public void run() {
getCache();
RegionFactory rf = new RegionFactory();
rf.setDataPolicy(DataPolicy.REPLICATE);
rf.setScope(Scope.DISTRIBUTED_ACK);
rf.create(REGION_NAME);
}
};
return createRegion;
}
private AsyncInvocation createRegionAsync(VM vm) throws Throwable {
SerializableRunnable createRegion = getCreateRegionRunnable();
return vm.invokeAsync(createRegion);
}
protected void createData(VM vm, final int startKey, final int endKey,
final Object value) {
SerializableRunnable createData = new SerializableRunnable() {
public void run() {
Cache cache = getCache();
Region region = cache.getRegion(REGION_NAME);
for(int i =startKey; i < endKey; i++) {
region.put(i, value);
}
}
};
vm.invoke(createData);
}
protected void checkData(VM vm0, final int startKey, final int endKey,
final String value) {
SerializableRunnable checkData = new SerializableRunnable() {
public void run() {
Cache cache = getCache();
Region region = cache.getRegion(REGION_NAME);
for(int i =startKey; i < endKey; i++) {
assertEquals("On key " + i, value, region.get(i));
}
}
};
vm0.invoke(checkData);
}
private static class FlowControlObserver extends DistributionMessageObserver {
CountDownLatch allowMessages = new CountDownLatch(1);
AtomicInteger messageCount = new AtomicInteger();
private volatile boolean started = false;
@Override
public void beforeProcessMessage(DistributionManager dm,
DistributionMessage message) {
if(started && message instanceof ImageReplyMessage) {
messageCount.incrementAndGet();
try {
allowMessages.await();
} catch (InterruptedException e) {
fail("Interrupted", e);
}
}
}
public void start() {
started = true;
}
}
}