/*=========================================================================
* Copyright (c) 2002-2014 Pivotal Software, Inc. All Rights Reserved.
* This product is protected by U.S. and international copyright
* and intellectual property laws. Pivotal products are covered by one or
* more patents listed at http://www.pivotal.io/patents.
*=========================================================================
*/
package com.gemstone.gemfire.internal.cache.partitioned;
import java.util.Properties;
import com.gemstone.gemfire.cache.CacheClosedException;
import com.gemstone.gemfire.cache.CacheException;
import com.gemstone.gemfire.cache.Declarable;
import com.gemstone.gemfire.cache.EntryEvent;
import com.gemstone.gemfire.cache.Region;
import com.gemstone.gemfire.cache.util.CacheWriterAdapter;
import com.gemstone.gemfire.cache30.CacheSerializableRunnable;
import com.gemstone.gemfire.internal.cache.DiskStoreImpl;
import com.gemstone.gemfire.internal.cache.GemFireCacheImpl;
import dunit.AsyncInvocation;
import dunit.DistributedTestCase;
import dunit.Host;
import dunit.VM;
/**
* Tests the basic use cases for partitioned region (PR) persistence.
* @author dsmith
*
*/
public class PersistPRKRFDUnitTest extends PersistentPartitionedRegionTestBase {
private static final int NUM_BUCKETS = 15;
private static final int MAX_WAIT = 30 * 1000;
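// shared monitor: MyWriter blocks on it until the disk-store-closing thread calls notify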
static Object lockObject = new Object();
public PersistPRKRFDUnitTest(String name) {
super(name);
}
/**
* Does a put/modify/destroy while the disk store is being closed.
*
* To turn on debugging, add the following parameter to local.conf:
* hydra.VmPrms-extraVMArgs += "-Ddisk.KRF_DEBUG=true";
*/
public void testCloseDiskStoreWhenPut() {
final String title = "testCloseDiskStoreWhenPut:";
Host host = Host.getHost(0);
VM vm0 = host.getVM(0);
createPR(vm0, 0);
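// preload keys 0-9 with "a"; checkData at the end verifies these values survive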
createData(vm0, 0, 10, "a");
vm0.invoke(new CacheSerializableRunnable(title+"server add writer") {
public void run2() throws CacheException {
Region region = getRootRegion(PR_REGION_NAME);
// make the region hold the put until the disk store is closed
if (!DiskStoreImpl.KRF_DEBUG) {
region.getAttributesMutator().setCacheWriter(new MyWriter());
}
}
});
// test 1: create a new entry while the disk store is closing
AsyncInvocation async1 = vm0.invokeAsync(new CacheSerializableRunnable(title+"async create") {
public void run2() throws CacheException {
Region region = getRootRegion(PR_REGION_NAME);
ExpectedException expect = addExpectedException("CacheClosedException");
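// the put should block in MyWriter until the disk store closes, then fail with CacheClosedException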
try {
region.put(10, "b");
fail("Expect CacheClosedException here");
} catch (CacheClosedException cce) {
System.out.println(title+cce.getMessage());
if (DiskStoreImpl.KRF_DEBUG) {
assertTrue(cce.getMessage().contains("The disk store is closed."));
} else {
assertTrue(cce.getMessage().contains("The disk store is closed"));
}
} finally {
expect.remove();
}
}
});
vm0.invoke(new CacheSerializableRunnable(title+"close disk store") {
public void run2() throws CacheException {
GemFireCacheImpl gfc = (GemFireCacheImpl)getCache();
pause(500); // give the async operation time to reach the blocked cache writer
gfc.closeDiskStores();
// wake the blocked cache writer so the pending operation resumes against the closed disk store
synchronized(lockObject) {
lockObject.notify();
}
}
});
DistributedTestCase.join(async1, MAX_WAIT, getLogWriter());
closeCache(vm0);
// test 2: update an existing entry while the disk store is closing
createPR(vm0, 0);
vm0.invoke(new CacheSerializableRunnable(title+"server add writer") {
public void run2() throws CacheException {
Region region = getRootRegion(PR_REGION_NAME);
// make the region hold the update until the disk store is closed
if (!DiskStoreImpl.KRF_DEBUG) {
region.getAttributesMutator().setCacheWriter(new MyWriter());
}
}
});
async1 = vm0.invokeAsync(new CacheSerializableRunnable(title+"async update") {
public void run2() throws CacheException {
Region region = getRootRegion(PR_REGION_NAME);
ExpectedException expect = addExpectedException("CacheClosedException");
try {
region.put(1, "b");
fail("Expect CacheClosedException here");
} catch (CacheClosedException cce) {
System.out.println(title+cce.getMessage());
if (DiskStoreImpl.KRF_DEBUG) {
assertTrue(cce.getMessage().contains("The disk store is closed."));
} else {
assertTrue(cce.getMessage().contains("The disk store is closed"));
}
} finally {
expect.remove();
}
}
});
vm0.invoke(new CacheSerializableRunnable(title+"close disk store") {
public void run2() throws CacheException {
GemFireCacheImpl gfc = (GemFireCacheImpl)getCache();
pause(500); // give the async operation time to reach the blocked cache writer
gfc.closeDiskStores();
// wake the blocked cache writer so the pending operation resumes against the closed disk store
synchronized(lockObject) {
lockObject.notify();
}
}
});
DistributedTestCase.join(async1, MAX_WAIT, getLogWriter());
closeCache(vm0);
// test 3: destroy an existing entry while the disk store is closing
createPR(vm0, 0);
vm0.invoke(new CacheSerializableRunnable(title+"server add writer") {
public void run2() throws CacheException {
Region region = getRootRegion(PR_REGION_NAME);
// make the region hold the destroy until the disk store is closed
if (!DiskStoreImpl.KRF_DEBUG) {
region.getAttributesMutator().setCacheWriter(new MyWriter());
}
}
});
async1 = vm0.invokeAsync(new CacheSerializableRunnable(title+"async destroy") {
public void run2() throws CacheException {
Region region = getRootRegion(PR_REGION_NAME);
ExpectedException expect = addExpectedException("CacheClosedException");
try {
region.destroy(2, "b");
fail("Expect CacheClosedException here");
} catch (CacheClosedException cce) {
System.out.println(title+cce.getMessage());
if (DiskStoreImpl.KRF_DEBUG) {
assertTrue(cce.getMessage().contains("The disk store is closed."));
} else {
assertTrue(cce.getMessage().contains("The disk store is closed"));
}
} finally {
expect.remove();
}
}
});
vm0.invoke(new CacheSerializableRunnable(title+"close disk store") {
public void run2() throws CacheException {
GemFireCacheImpl gfc = (GemFireCacheImpl)getCache();
pause(500); // give the async operation time to reach the blocked cache writer
gfc.closeDiskStores();
// wake the blocked cache writer so the pending operation resumes against the closed disk store
synchronized(lockObject) {
lockObject.notify();
}
}
});
DistributedTestCase.join(async1, MAX_WAIT, getLogWriter());
checkData(vm0, 0, 10, "a"); // the original values survived the interrupted operations
checkData(vm0, 10, 11, null); // the failed create of key 10 was never persisted
closeCache(vm0);
}
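/**
* A cache writer that parks every create/update/destroy on lockObject until the
* disk-store-closing thread calls notify, keeping the operation in flight while
* the disk store shuts down.
*/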
private static class MyWriter extends CacheWriterAdapter implements Declarable {
public MyWriter() {
}
public void init(Properties props) {
}
public void beforeCreate(EntryEvent event) {
try {
synchronized(lockObject) {
lockObject.wait();
}
} catch (InterruptedException e) {
e.printStackTrace();
}
}
public void beforeUpdate(EntryEvent event) {
try {
synchronized(lockObject) {
lockObject.wait();
}
} catch (InterruptedException e) {
e.printStackTrace();
}
}
public void beforeDestroy(EntryEvent event) {
try {
synchronized(lockObject) {
lockObject.wait();
}
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}
}