blob: 62e16c55da7db9172b3fc248d55961a91cc7bac5 [file] [log] [blame]
/*=========================================================================
* Copyright (c) 2010-2014 Pivotal Software, Inc. All Rights Reserved.
* This product is protected by U.S. and international copyright
* and intellectual property laws. Pivotal products are covered by
* one or more patents listed at http://www.pivotal.io/patents.
*=========================================================================
*/
package com.gemstone.gemfire.internal.cache.persistence;
import java.io.ByteArrayInputStream;
import java.io.DataInputStream;
import java.io.File;
import java.io.FilenameFilter;
import java.io.IOException;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import util.TestException;
import com.gemstone.gemfire.DataSerializer;
import com.gemstone.gemfire.cache.AttributesFactory;
import com.gemstone.gemfire.cache.Cache;
import com.gemstone.gemfire.cache.CacheClosedException;
import com.gemstone.gemfire.cache.CacheException;
import com.gemstone.gemfire.cache.DataPolicy;
import com.gemstone.gemfire.cache.DiskAccessException;
import com.gemstone.gemfire.cache.DiskStore;
import com.gemstone.gemfire.cache.DiskStoreFactory;
import com.gemstone.gemfire.cache.PartitionAttributes;
import com.gemstone.gemfire.cache.PartitionAttributesFactory;
import com.gemstone.gemfire.cache.Region;
import com.gemstone.gemfire.cache.RegionAttributes;
import com.gemstone.gemfire.cache.RegionFactory;
import com.gemstone.gemfire.cache.Scope;
import com.gemstone.gemfire.cache30.CacheSerializableRunnable;
import com.gemstone.gemfire.distributed.internal.DistributionManager;
import com.gemstone.gemfire.distributed.internal.DistributionMessage;
import com.gemstone.gemfire.distributed.internal.DistributionMessageObserver;
import com.gemstone.gemfire.internal.HeapDataOutputStream;
import com.gemstone.gemfire.internal.Version;
import com.gemstone.gemfire.internal.cache.CacheObserverAdapter;
import com.gemstone.gemfire.internal.cache.CacheObserverHolder;
import com.gemstone.gemfire.internal.cache.DiskRegion;
import com.gemstone.gemfire.internal.cache.DiskStoreFactoryImpl;
import com.gemstone.gemfire.internal.cache.DiskStoreImpl;
import com.gemstone.gemfire.internal.cache.DiskStoreObserver;
import com.gemstone.gemfire.internal.cache.EntrySnapshot;
import com.gemstone.gemfire.internal.cache.GemFireCacheImpl;
import com.gemstone.gemfire.internal.cache.InitialImageOperation;
import com.gemstone.gemfire.internal.cache.LocalRegion;
import com.gemstone.gemfire.internal.cache.LocalRegion.NonTXEntry;
import com.gemstone.gemfire.internal.cache.PartitionedRegion;
import com.gemstone.gemfire.internal.cache.RegionEntry;
import com.gemstone.gemfire.internal.cache.Token.Tombstone;
import com.gemstone.gemfire.internal.cache.TombstoneService;
import com.gemstone.gemfire.internal.cache.versions.RegionVersionVector;
import com.gemstone.gemfire.internal.cache.versions.VersionTag;
import dunit.AsyncInvocation;
import dunit.Host;
import dunit.SerializableCallable;
import dunit.SerializableRunnable;
import dunit.VM;
public class PersistentRVVRecoveryDUnitTest extends PersistentReplicatedTestBase {
private static final int TEST_REPLICATED_TOMBSTONE_TIMEOUT = 1000;
public PersistentRVVRecoveryDUnitTest(String name) {
super(name);
}
@Override
public void tearDown2() throws Exception {
super.tearDown2();
invokeInEveryVM(PersistentRecoveryOrderDUnitTest.class, "resetAckWaitThreshold");
}
public void testNoConcurrencyChecks () {
Cache cache = getCache();
RegionFactory rf = new RegionFactory();
rf.setDataPolicy(DataPolicy.PERSISTENT_REPLICATE);
rf.setConcurrencyChecksEnabled(false);
try {
LocalRegion region = (LocalRegion) rf.create(REGION_NAME);
fail("Expected to get an IllegalStateException because concurrency checks can't be disabled");
} catch(IllegalStateException expected) {
//do nothing
}
}
/**
* Test that we can recover the RVV information with some normal
* usage.
*/
public void testRecoveryWithKRF() throws Throwable {
doTestRecovery(new Runnable() {
@Override
public void run() {
//do nothing
}
});
}
/**
* Test that we can recover the RVV information if the krf is missing
*/
public void testRecoveryWithoutKRF() throws Throwable {
doTestRecovery(new Runnable() {
@Override
public void run() {
Host host = Host.getHost(0);
VM vm0 = host.getVM(0);
VM vm1 = host.getVM(1);
VM vm2 = host.getVM(2);
deleteKRFs(vm0);
}
});
}
/**
* Test that we correctly recover and expire recovered tombstones, with compaction enabled
*/
public void testLotsOfTombstones() throws Throwable {
Host host = Host.getHost(0);
final VM vm0 = host.getVM(0);
//I think we just need to assert the number of tombstones, maybe?
//Bruce has code that won't even let the tombstones expire for 10 minutes
//That means on recovery we need to recover them all? Or do we need to recover
//any? We're doing a GII. Won't we GII tombstones anyway? Ahh, but we need
//to know that we don't need to record the new tombstones.
LocalRegion region = createRegion(vm0);
int initialCount = getTombstoneCount(region);
assertEquals(0, initialCount);
final int entryCount = 20;
for(int i =0 ; i < entryCount; i++) {
region.put(i, new byte[100]);
//destroy each entry.
region.destroy(i);
}
assertEquals(entryCount, getTombstoneCount(region));
//roll to a new oplog
region.getDiskStore().forceRoll();
//Force a compaction. This should do nothing, because
//The tombstones are not garbage, so only 50% of the oplog
//is garbage (the creates).
region.getDiskStore().forceCompaction();
assertEquals(0, region.getDiskStore().numCompactableOplogs());
assertEquals(entryCount, getTombstoneCount(region));
getCache().close();
region = createRegion(vm0);
assertEquals(entryCount, getTombstoneCount(region));
GemFireCacheImpl cache = (GemFireCacheImpl) getCache();
TombstoneService tombstoneService = cache.getTombstoneService();
//Before expiring tombstones, no oplogs are available for compaction
assertEquals(0, region.getDiskStore().numCompactableOplogs());
region.getDiskStore().forceCompaction();
assertTrue(tombstoneService.forceBatchExpirationForTests(entryCount/2));
assertEquals(entryCount/2, getTombstoneCount(region));
//After expiring, we should have an oplog available for compaction.
assertEquals(1, region.getDiskStore().numCompactableOplogs());
//Test after restart the tombstones are still missing
getCache().close();
region = createRegion(vm0);
assertEquals(entryCount/2, getTombstoneCount(region));
//We should have an oplog available for compaction, because the tombstones
//were garbage collected
assertEquals(1, region.getDiskStore().numCompactableOplogs());
//This should compact some oplogs
region.getDiskStore().forceCompaction();
assertEquals(0, region.getDiskStore().numCompactableOplogs());
//Restart again, and make sure the compaction didn't mess up our tombstone
//count
getCache().close();
region = createRegion(vm0);
assertEquals(entryCount/2, getTombstoneCount(region));
cache = (GemFireCacheImpl) getCache();
//Add a test hook that will shutdown the system as soon as we write a GC RVV record
DiskStoreObserver.setInstance(new DiskStoreObserver() {
@Override
public void afterWriteGCRVV(DiskRegion dr) {
//This will cause the disk store to shut down,
//preventing us from writing any other records.
throw new DiskAccessException();
}
});
ExpectedException ex = addExpectedException("DiskAccessException");
try {
//Force expiration, with our test hook that should close the cache
tombstoneService = cache.getTombstoneService();
tombstoneService.forceBatchExpirationForTests(entryCount/4);
getCache().close();
assertTrue(cache.isClosed());
//Restart again, and make sure the tombstones are in fact removed
region = createRegion(vm0);
assertEquals(entryCount/4, getTombstoneCount(region));
} finally {
ex.remove();
}
}
/**
* Test that we correctly recover and expire recovered tombstones, with
* compaction enabled.
*
* This test differs from above test in that we need to make sure tombstones
* start expiring based on their original time-stamp, NOT the time-stamp
* assigned during scheduling for expiration after recovery.
*/
public void DISABLED_testLotsOfTombstonesExpiration() throws Throwable {
Host host = Host.getHost(0);
final VM vm0 = host.getVM(0);
vm0.invoke(new CacheSerializableRunnable("") {
@Override
public void run2() throws CacheException {
// TODO Auto-generated method stub
long replicatedTombstoneTomeout = TombstoneService.REPLICATED_TOMBSTONE_TIMEOUT;
long expiriredTombstoneLimit = TombstoneService.EXPIRED_TOMBSTONE_LIMIT;
try {
LocalRegion region = createRegion(vm0);
int initialCount = getTombstoneCount(region);
assertEquals(0, initialCount);
final int entryCount = 20;
for (int i = 0; i < entryCount; i++) {
region.put(i, new byte[100]);
// destroy each entry.
region.destroy(i);
}
assertEquals(entryCount, getTombstoneCount(region));
// roll to a new oplog
region.getDiskStore().forceRoll();
// Force a compaction. This should do nothing, because
// The tombstones are not garbage, so only 50% of the oplog
// is garbage (the creates).
region.getDiskStore().forceCompaction();
assertEquals(0, region.getDiskStore().numCompactableOplogs());
assertEquals(entryCount, getTombstoneCount(region));
getCache().close();
// We should wait for timeout time so that tomstones are expired
// right away when they are gIId based on their original timestamp.
pause((int) TEST_REPLICATED_TOMBSTONE_TIMEOUT);
TombstoneService.REPLICATED_TOMBSTONE_TIMEOUT = TEST_REPLICATED_TOMBSTONE_TIMEOUT;
TombstoneService.EXPIRED_TOMBSTONE_LIMIT = entryCount;
// Do region GII
region = createRegion(vm0);
assertEquals(entryCount, getTombstoneCount(region));
getCache().getLogger().fine("Waiting for maximumSleepTime ms");
pause(10000); // maximumSleepTime+500 in TombstoneSweeper GC thread
// Tombstones should have been expired and garbage collected by now by
// TombstoneService.
assertEquals(0, getTombstoneCount(region));
// This should compact some oplogs
region.getDiskStore().forceCompaction();
assertEquals(0, region.getDiskStore().numCompactableOplogs());
// Test after restart the tombstones are still missing
getCache().close();
region = createRegion(vm0);
assertEquals(0, getTombstoneCount(region));
// We should have an oplog available for compaction, because the
// tombstones
// were garbage collected
assertEquals(0, region.getDiskStore().numCompactableOplogs());
GemFireCacheImpl cache = (GemFireCacheImpl) getCache();
cache.close();
} finally {
TombstoneService.REPLICATED_TOMBSTONE_TIMEOUT = replicatedTombstoneTomeout;
TombstoneService.EXPIRED_TOMBSTONE_LIMIT = expiriredTombstoneLimit;
}
}
});
}
/**
* This test creates 2 VMs in a distributed system with a persistent
* PartitionedRegion and one VM (VM1) puts an entry in region. Second VM (VM2)
* starts later and does a delta GII. During Delta GII in VM2 a DESTROY
* operation happens in VM1 and gets propagated to VM2 concurrently with GII.
* At this point if entry version is greater than the once received from GII
* then it must not get applied. Which is Bug #45921.
*
* @author shobhit
*/
public void testConflictChecksDuringConcurrentDeltaGIIAndOtherOp() {
Host host = Host.getHost(0);
VM vm0 = host.getVM(0);
VM vm1 = host.getVM(1);
vm0.invoke(new CacheSerializableRunnable("Create PR and put an entry") {
@Override
public void run2() throws CacheException {
Cache cache = getCache();
PartitionAttributes attrs = new PartitionAttributesFactory()
.setRedundantCopies(1).setTotalNumBuckets(1).create();
AttributesFactory factory = new AttributesFactory();
factory.setPartitionAttributes(attrs);
RegionAttributes rAttrs = factory.create();
Region region = cache.createRegionFactory(rAttrs).create("prRegion");
region.put("testKey", "testValue");
assertEquals(1, region.size());
}
});
// Create a cache and region, do an update to change the version no. and
// restart the cache and region.
vm1.invoke(new CacheSerializableRunnable("Create PR and put an entry") {
@Override
public void run2() throws CacheException {
Cache cache = getCache();
PartitionAttributes attrs = new PartitionAttributesFactory()
.setRedundantCopies(1).setTotalNumBuckets(1).create();
AttributesFactory factory = new AttributesFactory();
factory.setPartitionAttributes(attrs);
RegionAttributes rAttrs = factory.create();
Region region = cache.createRegionFactory(rAttrs).create("prRegion");
region.put("testKey", "testValue2");
cache.close();
//Restart
cache = getCache();
region = cache.createRegionFactory(rAttrs).create("prRegion");
}
});
// Do a DESTROY in vm0 when delta GII is in progress in vm1 (Hopefully, Not
// guaranteed).
AsyncInvocation async = vm0.invokeAsync(new CacheSerializableRunnable("Detroy entry in region") {
@Override
public void run2() throws CacheException {
Cache cache = getCache();
Region region = cache.getRegion("prRegion");
while (!region.get("testKey").equals("testValue2")) {
pause(100);
}
region.destroy("testKey");
}
});
try {
async.join(3000);
} catch (InterruptedException e) {
new TestException("VM1 entry destroy did not finish in 3000 ms");
}
vm1.invoke(new CacheSerializableRunnable("Verifying entry version in new node VM1") {
@Override
public void run2() throws CacheException {
Cache cache = getCache();
Region region = cache.getRegion("prRegion");
Region.Entry entry = ((PartitionedRegion)region).getEntry("testKey", true /*Entry is destroyed*/);
RegionEntry re = ((EntrySnapshot)entry).getRegionEntry();
getLogWriter().fine("RegionEntry for testKey: " + re.getKey() + " " + re.getValueInVM((LocalRegion) region));
assertTrue(re.getValueInVM((LocalRegion) region) instanceof Tombstone);
VersionTag tag = re.getVersionStamp().asVersionTag();
assertEquals(3 /*Two puts and a Destroy*/, tag.getEntryVersion());
}
});
closeCache(vm0);
closeCache(vm1);
}
private LocalRegion createRegion(final VM vm0) {
Cache cache = getCache();
DiskStoreFactory dsf = cache.createDiskStoreFactory();
File dir = getDiskDirForVM(vm0);
dir.mkdirs();
dsf.setDiskDirs(new File[] {dir});
dsf.setMaxOplogSize(1);
//Turn of automatic compaction
dsf.setAllowForceCompaction(true);
dsf.setAutoCompact(false);
//The compaction javadocs seem to be wrong. This
//is the amount of live data in the oplog
dsf.setCompactionThreshold(40);
DiskStore ds = dsf.create(REGION_NAME);
RegionFactory rf = new RegionFactory();
rf.setDiskStoreName(ds.getName());
rf.setDiskSynchronous(true);
rf.setDataPolicy(DataPolicy.PERSISTENT_REPLICATE);
rf.setScope(Scope.DISTRIBUTED_ACK);
LocalRegion region = (LocalRegion) rf.create(REGION_NAME);
return region;
}
private int getTombstoneCount(LocalRegion region) {
int regionCount = region.getTombstoneCount();
int actualCount = 0;
for(RegionEntry entry : region.entries.regionEntries()) {
if(entry.isTombstone()) {
actualCount++;
}
}
assertEquals(actualCount, regionCount);
return actualCount;
}
public void doTestRecovery(Runnable doWhileOffline) throws Throwable {
Host host = Host.getHost(0);
VM vm0 = host.getVM(0);
VM vm1 = host.getVM(1);
VM vm2 = host.getVM(2);
//Create the region in few members to test recovery
createPersistentRegion(vm0);
createPersistentRegion(vm1);
createPersistentRegion(vm2);
//Create and delete some keys (to update the RVV)
createData(vm0, 0,5, "value1");
createData(vm1, 3,8, "value2");
createData(vm2, 6,11, "value3");
delete(vm1, 0,1);
delete(vm0, 10,11);
//Make sure the RVVs are the same in the members
RegionVersionVector vm0RVV = getRVV(vm0);
RegionVersionVector vm1RVV = getRVV(vm1);
RegionVersionVector vm2RVV = getRVV(vm2);
assertSameRVV(vm0RVV, vm1RVV);
assertSameRVV(vm0RVV, vm2RVV);
//Closing the cache will ensure the disk store is closed
closeCache(vm2);
closeCache(vm1);
closeCache(vm0);
doWhileOffline.run();
//Make sure we can recover the RVV
createPersistentRegion(vm0);
RegionVersionVector new0RVV = getRVV(vm0);
assertSameRVV(vm0RVV, new0RVV);
assertEquals(vm0RVV.getOwnerId(), new0RVV.getOwnerId());
createData(vm0, 12, 15, "value");
//Make sure we can GII the RVV
new0RVV = getRVV(vm0);
assertSameRVV(new0RVV, getDiskRVV(vm0));
createPersistentRegion(vm1);
assertSameRVV(new0RVV, getRVV(vm1));
assertSameRVV(new0RVV, getDiskRVV(vm1));
closeCache(vm0);
closeCache(vm1);
doWhileOffline.run();
//Make the sure member that GII'd the RVV can also recover it
createPersistentRegion(vm1);
RegionVersionVector new1RVV = getRVV(vm1);
assertSameRVV(new0RVV, getRVV(vm1));
}
/**
* Test that we skip conflict checks with entries that are on
* disk compared to entries that come in as part of a GII
*/
public void testSkipConflictChecksForGIIdEntries() throws Throwable {
Host host = Host.getHost(0);
VM vm0 = host.getVM(0);
VM vm1 = host.getVM(1);
//Create the region in few members to test recovery
createPersistentRegion(vm0);
createPersistentRegion(vm1);
//Create an update some entries in vm0 and vm1.
createData(vm0, 0,1, "value1");
createData(vm0, 0,2, "value2");
closeCache(vm1);
//Reset the entry version in vm0.
//This means that if we did a conflict check, vm0's key will have
//a lower entry version than vm1, which would cause us to prefer vm1's
//value
SerializableRunnable createData = new SerializableRunnable("rollEntryVersion") {
public void run() {
Cache cache = getCache();
LocalRegion region = (LocalRegion) cache.getRegion(REGION_NAME);
region.put(0, "value3");
RegionEntry entry = region.getRegionEntry(0);
entry = region.getRegionEntry(0);
//Sneak in and change the version number for an entry to generate
//a conflict.
VersionTag tag = entry.getVersionStamp().asVersionTag();
tag.setEntryVersion(tag.getEntryVersion() - 2);
entry.getVersionStamp().setVersions(tag);
}
};
vm0.invoke(createData);
//Create vm1, doing a GII
createPersistentRegion(vm1);
checkData(vm0, 0, 1, "value3");
//If we did a conflict check, this would be value2
checkData(vm1, 0, 1, "value3");
}
/**
* Test that we skip conflict checks with entries that are on
* disk compared to entries that come in as part of a concurrent operation
*/
public void testSkipConflictChecksForConcurrentOps() throws Throwable {
Host host = Host.getHost(0);
final VM vm0 = host.getVM(0);
VM vm1 = host.getVM(1);
//Create the region in few members to test recovery
createPersistentRegion(vm0);
createPersistentRegion(vm1);
//Create an update some entries in vm0 and vm1.
createData(vm0, 0,1, "value1");
createData(vm0, 0,1, "value2");
createData(vm0, 0,1, "value2");
closeCache(vm1);
//Update the keys in vm0 until the entry version rolls over.
//This means that if we did a conflict check, vm0's key will have
//a lower entry version than vm1, which would cause us to prefer vm1's
//value
SerializableRunnable createData = new SerializableRunnable("rollEntryVersion") {
public void run() {
Cache cache = getCache();
LocalRegion region = (LocalRegion) cache.getRegion(REGION_NAME);
region.put(0, "value3");
RegionEntry entry = region.getRegionEntry(0);
entry = region.getRegionEntry(0);
//Sneak in and change the version number for an entry to generate
//a conflict.
VersionTag tag = entry.getVersionStamp().asVersionTag();
tag.setEntryVersion(tag.getEntryVersion() - 2);
entry.getVersionStamp().setVersions(tag);
}
};
vm0.invoke(createData);
//Add an observer to vm0 which will perform a concurrent operation during
//the GII. If we do a conflict check, this operation will be rejected
//because it will have a lower entry version that what vm1 recovered from
//disk
vm0.invoke(new SerializableRunnable() {
@Override
public void run() {
DistributionMessageObserver.setInstance(new DistributionMessageObserver() {
@Override
public void beforeProcessMessage(DistributionManager dm,
DistributionMessage msg) {
if(msg instanceof InitialImageOperation.RequestImageMessage) {
if(((InitialImageOperation.RequestImageMessage) msg).regionPath.contains(REGION_NAME)) {
createData(vm0, 0, 1, "value4");
DistributionMessageObserver.setInstance(null);
}
}
}
});
}
});
//Create vm1, doing a GII
createPersistentRegion(vm1);
//If we did a conflict check, this would be value2
checkData(vm0, 0, 1, "value4");
checkData(vm1, 0, 1, "value4");
}
/**
* Test that with concurrent updates to an async disk region,
* we correctly update the RVV On disk
*/
public void testUpdateRVVWithAsyncPersistence() throws Throwable {
Host host = Host.getHost(0);
final VM vm0 = host.getVM(1);
SerializableRunnable createRegion = new SerializableRunnable("Create persistent region") {
public void run() {
Cache cache = getCache();
DiskStoreFactory dsf = cache.createDiskStoreFactory();
File dir = getDiskDirForVM(vm0);
dir.mkdirs();
dsf.setDiskDirs(new File[] {dir});
dsf.setMaxOplogSize(1);
dsf.setQueueSize(100);
dsf.setTimeInterval(1000);
DiskStore ds = dsf.create(REGION_NAME);
RegionFactory rf = new RegionFactory();
rf.setDiskStoreName(ds.getName());
rf.setDataPolicy(DataPolicy.PERSISTENT_REPLICATE);
rf.setScope(Scope.DISTRIBUTED_ACK);
rf.setDiskSynchronous(false);
rf.create(REGION_NAME);
}
};
//Create a region with async persistence
vm0.invoke(createRegion);
//In two different threads, perform updates to the same key on the same region
AsyncInvocation ins0 = vm0.invokeAsync(new SerializableRunnable("change the entry at vm0") {
public void run() {
Cache cache = getCache();
Region region = cache.getRegion(REGION_NAME);
for (int i=0; i<500; i++) {
region.put("A", "vm0-"+i);
}
}
});
AsyncInvocation ins1 = vm0.invokeAsync(new SerializableRunnable("change the entry at vm1") {
public void run() {
Cache cache = getCache();
Region region = cache.getRegion(REGION_NAME);
for (int i=0; i<500; i++) {
region.put("A", "vm1-"+i);
}
}
});
//Wait for the update threads to finish.
ins0.getResult(MAX_WAIT);
ins1.getResult(MAX_WAIT);
//Make sure the async queue is flushed to disk
vm0.invoke(new SerializableRunnable() {
@Override
public void run() {
Cache cache = getCache();
DiskStore ds = cache.findDiskStore(REGION_NAME);
ds.flush();
}
});
//Assert that the disk has seen all of the updates
RegionVersionVector rvv = getRVV(vm0);
RegionVersionVector diskRVV = getDiskRVV(vm0);
assertSameRVV(rvv, diskRVV);
//Bounce the cache and make the same assertion
closeCache(vm0);
vm0.invoke(createRegion);
//Assert that the recovered RVV is the same as before the restart
RegionVersionVector rvv2 = getRVV(vm0);
assertSameRVV(rvv, rvv2);
//The disk RVV should also match.
RegionVersionVector diskRVV2 = getDiskRVV(vm0);
assertSameRVV(rvv2, diskRVV2);
}
/**
* Test that when we generate a krf, we write the version tag
* that matches the entry in the crf.
*/
public void testWriteCorrectVersionToKrf() throws Throwable {
Host host = Host.getHost(0);
final VM vm0 = host.getVM(1);
final LocalRegion region = (LocalRegion) createAsyncRegionWithSmallQueue(vm0);
//The idea here is to do a bunch of puts with async persistence
//At some point the oplog will switch. At that time, we wait for a krf
//to be created and then throw an exception to shutdown the disk store.
//
//This should cause us to create a krf with some entries that have been
//modified since the crf was written and are still in the async queue.
//
//To avoid deadlocks, we need to mark that the oplog was switched,
//and then do the wait in the flusher thread.
//Setup the callbacks to wait for krf creation and throw an exception
ExpectedException ex = addExpectedException("DiskAccessException");
LocalRegion.ISSUE_CALLBACKS_TO_CACHE_OBSERVER=true;
try {
final CountDownLatch krfCreated = new CountDownLatch(1);
final AtomicBoolean oplogSwitched = new AtomicBoolean(false);
CacheObserverHolder.setInstance(new CacheObserverAdapter() {
@Override
public void afterKrfCreated() {
krfCreated.countDown();
}
@Override
public void afterWritingBytes() {
if(oplogSwitched.get()) {
try {
if(!krfCreated.await(3000, TimeUnit.SECONDS)) {
fail("KRF was not created in 30 seconds!");
}
} catch (InterruptedException e) {
fail("interrupted");
}
//Force a failure
throw new DiskAccessException();
}
}
@Override
public void afterSwitchingOplog() {
oplogSwitched.set(true);
}
});
//This is just to make sure the first oplog is not completely garbage.
region.put("testkey", "key");
//Do some puts to trigger an oplog roll.
try {
//Starting with a value of 1 means the value should match
//the region version for easier debugging.
int i = 1;
while(krfCreated.getCount() > 0) {
i++;
region.put("key" + (i % 3), i);
Thread.sleep(2);
}
} catch(CacheClosedException expected) {
//do nothing
}
//Wait for the region to be destroyed. The region won't be destroyed
//until the async flusher thread ends up switching oplogs
waitForCriterion(new WaitCriterion() {
@Override
public boolean done() {
return region.isDestroyed();
}
@Override
public String description() {
return "Region was not destroyed : " + region.isDestroyed();
}
}, 3000 * 1000, 100, true);
closeCache();
} finally {
ex.remove();
LocalRegion.ISSUE_CALLBACKS_TO_CACHE_OBSERVER=false;
CacheObserverHolder.setInstance(null);
}
//Get the version tags from the krf
LocalRegion recoveredRegion = (LocalRegion) createAsyncRegionWithSmallQueue(vm0);
VersionTag[] tagsFromKrf = new VersionTag[3];
for(int i = 0; i < 3; i++) {
NonTXEntry entry = (NonTXEntry) recoveredRegion.getEntry("key" + i);
tagsFromKrf[i] = entry.getRegionEntry().getVersionStamp().asVersionTag();
getLogWriter().info("krfTag[" + i + "]="+ tagsFromKrf[i] + ",value=" + entry.getValue());
}
closeCache();
//Set a system property to skip recovering from the krf so we can
//get the version tag from the crf.
System.setProperty(DiskStoreImpl.RECOVER_VALUES_SYNC_PROPERTY_NAME, "true");
try {
//Get the version tags from the crf
recoveredRegion = (LocalRegion) createAsyncRegionWithSmallQueue(vm0);
VersionTag[] tagsFromCrf = new VersionTag[3];
for(int i = 0; i < 3; i++) {
NonTXEntry entry = (NonTXEntry) recoveredRegion.getEntry("key" + i);
tagsFromCrf[i] = entry.getRegionEntry().getVersionStamp().asVersionTag();
getLogWriter().info("crfTag[" + i + "]="+ tagsFromCrf[i] + ",value=" + entry.getValue());
}
//Make sure the version tags from the krf and the crf match.
for(int i = 0; i < 3; i++) {
assertEquals(tagsFromCrf[i], tagsFromKrf[i]);
}
} finally {
System.setProperty(DiskStoreImpl.RECOVER_VALUES_SYNC_PROPERTY_NAME, "false");
}
}
private Region createAsyncRegionWithSmallQueue(final VM vm0) {
Cache cache = getCache();
DiskStoreFactoryImpl dsf = (DiskStoreFactoryImpl) cache.createDiskStoreFactory();
File dir = getDiskDirForVM(vm0);
dir.mkdirs();
dsf.setDiskDirs(new File[] {dir});
dsf.setMaxOplogSizeInBytes(500);
dsf.setQueueSize(1000);
dsf.setTimeInterval(1000);
DiskStore ds = dsf.create(REGION_NAME);
RegionFactory rf = new RegionFactory();
rf.setDiskStoreName(ds.getName());
rf.setDataPolicy(DataPolicy.PERSISTENT_REPLICATE);
rf.setScope(Scope.DISTRIBUTED_ACK);
rf.setDiskSynchronous(false);
Region region = rf.create(REGION_NAME);
return region;
}
private void deleteKRFs(final VM vm0) {
vm0.invoke(new SerializableRunnable() {
@Override
public void run() {
File file = getDiskDirForVM(vm0);
File[] krfs = file.listFiles(new FilenameFilter() {
@Override
public boolean accept(File dir, String name) {
return name.endsWith(".krf");
}
});
assertTrue(krfs.length > 0);
for(File krf : krfs) {
assertTrue(krf.delete());
}
}
});
}
private void assertSameRVV(RegionVersionVector rvv1,
RegionVersionVector rvv2) {
if(!rvv1.sameAs(rvv2)) {
fail("Expected " + rvv1 + " but was " + rvv2);
}
}
protected void createData(VM vm, final int startKey, final int endKey,
final String value) {
SerializableRunnable createData = new SerializableRunnable("createData") {
public void run() {
Cache cache = getCache();
Region region = cache.getRegion(REGION_NAME);
for(int i =startKey; i < endKey; i++) {
region.put(i, value);
}
}
};
vm.invoke(createData);
}
protected void checkData(VM vm0, final int startKey, final int endKey,
final String value) {
SerializableRunnable checkData = new SerializableRunnable("CheckData") {
public void run() {
Cache cache = getCache();
Region region = cache.getRegion(REGION_NAME);
for(int i =startKey; i < endKey; i++) {
assertEquals("For key " + i, value, region.get(i));
}
}
};
vm0.invoke(checkData);
}
protected void delete(VM vm, final int startKey, final int endKey) {
SerializableRunnable createData = new SerializableRunnable("destroy") {
public void run() {
Cache cache = getCache();
Region region = cache.getRegion(REGION_NAME);
for(int i =startKey; i < endKey; i++) {
region.destroy(i);
}
}
};
vm.invoke(createData);
}
protected RegionVersionVector getRVV(VM vm) throws IOException, ClassNotFoundException {
SerializableCallable createData = new SerializableCallable("getRVV") {
public Object call() throws Exception {
Cache cache = getCache();
LocalRegion region = (LocalRegion) cache.getRegion(REGION_NAME);
RegionVersionVector rvv = region.getVersionVector();
rvv = rvv.getCloneForTransmission();
HeapDataOutputStream hdos = new HeapDataOutputStream(Version.CURRENT);
//Using gemfire serialization because
//RegionVersionVector is not java serializable
DataSerializer.writeObject(rvv, hdos);
return hdos.toByteArray();
}
};
byte[] result= (byte[]) vm.invoke(createData);
ByteArrayInputStream bais = new ByteArrayInputStream(result);
return DataSerializer.readObject(new DataInputStream(bais));
}
protected RegionVersionVector getDiskRVV(VM vm) throws IOException, ClassNotFoundException {
SerializableCallable createData = new SerializableCallable("getRVV") {
public Object call() throws Exception {
Cache cache = getCache();
LocalRegion region = (LocalRegion) cache.getRegion(REGION_NAME);
RegionVersionVector rvv = region.getDiskRegion().getRegionVersionVector();
rvv = rvv.getCloneForTransmission();
HeapDataOutputStream hdos = new HeapDataOutputStream(Version.CURRENT);
//Using gemfire serialization because
//RegionVersionVector is not java serializable
DataSerializer.writeObject(rvv, hdos);
return hdos.toByteArray();
}
};
byte[] result= (byte[]) vm.invoke(createData);
ByteArrayInputStream bais = new ByteArrayInputStream(result);
return DataSerializer.readObject(new DataInputStream(bais));
}
}