blob: d7e30e8870a2ec5a62c5c3ce277d84bda57d9dc2 [file] [log] [blame]
/*=========================================================================
* Copyright (c) 2010-2014 Pivotal Software, Inc. All Rights Reserved.
* This product is protected by U.S. and international copyright
* and intellectual property laws. Pivotal products are covered by
* one or more patents listed at http://www.pivotal.io/patents.
*=========================================================================
*/
package com.gemstone.gemfire.internal.cache;
import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertTrue;
import static org.junit.Assert.fail;
import java.util.ArrayList;
import java.util.List;
import junit.framework.Assert;
import org.junit.Before;
import org.junit.Test;
import org.junit.experimental.categories.Category;
import com.gemstone.gemfire.cache.EntryNotFoundException;
import com.gemstone.gemfire.cache.Region;
import com.gemstone.gemfire.cache.Scope;
import com.gemstone.gemfire.internal.cache.CacheObserver;
import com.gemstone.gemfire.internal.cache.CacheObserverAdapter;
import com.gemstone.gemfire.internal.cache.CacheObserverHolder;
import com.gemstone.gemfire.internal.cache.DiskEntry;
import com.gemstone.gemfire.internal.cache.DiskRegionHelperFactory;
import com.gemstone.gemfire.internal.cache.DiskRegionProperties;
import com.gemstone.gemfire.internal.cache.DiskRegionTestingBase;
import com.gemstone.gemfire.internal.cache.LocalRegion;
import com.gemstone.gemfire.internal.cache.RegionEntry;
import com.gemstone.gemfire.test.junit.categories.IntegrationTest;
import dunit.DistributedTestCase;
/**
* This JUnit test tests concurrent rolling and normal region operations
* put,get,clear,destroy in both sync and async mode
*
* A region operation is done on the same key that is about to be rolled or has
* just been rolled and the region operation is verified to have been correctly
* executed.
*
* @author Mitul Bid
*
*/
@Category(IntegrationTest.class)
public class ConcurrentRollingAndRegionOperationsJUnitTest extends
DiskRegionTestingBase
{
protected volatile boolean hasBeenNotified;
protected int rollingCount = 0;
protected boolean encounteredFailure = false;
@Before
public void setUp() throws Exception
{
this.hasBeenNotified = false;
super.setUp();
}
void putBeforeRoll(final Region region)
{
CacheObserverHolder.setInstance(new CacheObserverAdapter() {
public void beforeGoingToCompact()
{
region.put("Key", "Value2");
}
public void afterHavingCompacted()
{
synchronized (region) {
region.notify();
hasBeenNotified = true;
}
}
});
region.put("Key", "Value1");
switchOplog(region);
synchronized (region) {
if (!hasBeenNotified) {
try {
region.wait(10000);
assertTrue(hasBeenNotified);
}
catch (InterruptedException e) {
fail("exception not expected here");
}
}
}
try {
Assert.assertEquals("Value2", getValueOnDisk(region));
}
catch (EntryNotFoundException e) {
logWriter.error("Exception occured", e);
fail("Entry not found although was supposed to be there");
}
}
void getBeforeRoll(final Region region)
{
CacheObserverHolder.setInstance(new CacheObserverAdapter() {
public void beforeGoingToCompact()
{
region.get("Key");
}
public void afterHavingCompacted()
{
synchronized (region) {
region.notify();
hasBeenNotified = true;
}
}
});
region.put("Key", "Value1");
switchOplog(region);
synchronized (region) {
if (!hasBeenNotified) {
try {
region.wait(10000);
assertTrue(hasBeenNotified);
}
catch (InterruptedException e) {
fail("exception not expected here");
}
}
}
try {
Assert.assertEquals("Value1", getValueOnDisk(region));
Assert.assertEquals("Value1", getValueInHTree(region));
}
catch (EntryNotFoundException e) {
logWriter.error("Exception occured", e);
fail("Entry not found although was supposed to be there");
}
}
void delBeforeRoll(final Region region)
{
CacheObserverHolder.setInstance(new CacheObserverAdapter() {
public void beforeGoingToCompact()
{
synchronized (region) {
region.notify();
hasBeenNotified = true;
}
}
});
region.put("Key", "Value1");
switchOplog(region);
synchronized (region) {
if (!hasBeenNotified) {
try {
region.wait(10000);
assertTrue(hasBeenNotified);
}
catch (InterruptedException e) {
fail("exception not expected here");
}
}
}
try {
region.destroy("Key");
}
catch (Exception e) {
logWriter.error("Exception occured", e);
fail("failed while trying to destroy due to " + e);
}
boolean entryNotFound = false;
try {
getValueOnDisk(region);
}
catch (EntryNotFoundException e) {
entryNotFound = true;
}
if (!entryNotFound) {
fail("EntryNotFoundException was expected but did not get it");
}
entryNotFound = false;
Object obj = ((LocalRegion)region).basicGetEntry("Key");
if (obj == null) {
entryNotFound = true;
}
if (!entryNotFound) {
fail("EntryNotFoundException was expected but did not get it");
}
}
void clearBeforeRoll(final Region region)
{
this.hasBeenNotified = false;
CacheObserverHolder.setInstance(new CacheObserverAdapter() {
public void beforeGoingToCompact()
{
synchronized (region) {
region.notify();
hasBeenNotified = true;
}
}
});
region.put("Key", "Value1");
switchOplog(region);
synchronized (region) {
if (!hasBeenNotified) {
try {
region.wait(10000);
assertTrue(hasBeenNotified);
}
catch (InterruptedException e) {
fail("exception not expected here");
}
}
}
region.clear();
boolean entryNotFound = false;
try {
getValueOnDisk(region);
}
catch (EntryNotFoundException e) {
entryNotFound = true;
}
if (!entryNotFound) {
fail("EntryNotFoundException was expected but did not get it");
}
entryNotFound = false;
Object obj = ((LocalRegion)region).basicGetEntry("Key");
if (obj == null) {
entryNotFound = true;
}
if (!entryNotFound) {
fail("EntryNotFoundException was expected but did not get it");
}
}
void putAfterRoll(final Region region)
{
CacheObserverHolder.setInstance(new CacheObserverAdapter() {
public void beforeGoingToCompact()
{
region.put("Key", "Value1");
}
public void afterHavingCompacted()
{
synchronized (region) {
region.notify();
hasBeenNotified = true;
}
}
});
region.put("makeNonEmpty", "needSomethingSoIt can be compacted");
switchOplog(region);
synchronized (region) {
if (!hasBeenNotified) {
try {
region.wait(10000);
assertTrue(hasBeenNotified);
}
catch (InterruptedException e) {
fail("exception not expected here");
}
}
}
region.put("Key", "Value2");
try {
Assert.assertEquals("Value2", getValueOnDisk(region));
}
catch (EntryNotFoundException e) {
logWriter.error("Exception occured", e);
fail("Entry not found although was supposed to be there");
}
}
void getAfterRoll(final Region region)
{
CacheObserverHolder.setInstance(new CacheObserverAdapter() {
public void afterHavingCompacted()
{
synchronized (region) {
region.notify();
hasBeenNotified = true;
}
}
});
region.put("Key", "Value1");
switchOplog(region);
synchronized (region) {
if (!hasBeenNotified) {
try {
region.wait(10000);
assertTrue(hasBeenNotified);
}
catch (InterruptedException e) {
fail("exception not expected here");
}
}
}
region.get("Key");
try {
Assert.assertEquals("Value1", getValueOnDisk(region));
Assert.assertEquals("Value1", getValueInHTree(region));
}
catch (EntryNotFoundException e) {
logWriter.error("Exception occured", e);
fail("Entry not found although was supposed to be there");
}
}
void delAfterRoll(final Region region)
{
CacheObserverHolder.setInstance(new CacheObserverAdapter() {
public void afterHavingCompacted()
{
synchronized (region) {
region.notify();
hasBeenNotified = true;
}
}
});
region.put("Key", "Value1");
switchOplog(region);
synchronized (region) {
if (!hasBeenNotified) {
try {
region.wait(10000);
assertTrue(hasBeenNotified);
}
catch (InterruptedException e) {
fail("exception not expected here");
}
}
}
try {
region.destroy("Key");
}
catch (Exception e1) {
logWriter.error("Exception occured", e1);
fail("encounter exception when not expected " + e1);
}
boolean entryNotFound = false;
try {
getValueOnDisk(region);
}
catch (EntryNotFoundException e) {
entryNotFound = true;
}
if (!entryNotFound) {
fail("EntryNotFoundException was expected but did not get it");
}
entryNotFound = false;
Object obj = ((LocalRegion)region).basicGetEntry("Key");
if (obj == null) {
entryNotFound = true;
}
if (!entryNotFound) {
fail("EntryNotFoundException was expected but did not get it");
}
}
void clearAfterRoll(final Region region)
{
CacheObserverHolder.setInstance(new CacheObserverAdapter() {
public void afterHavingCompacted()
{
synchronized (region) {
region.notify();
hasBeenNotified = true;
}
}
});
region.put("Key", "Value1");
switchOplog(region);
synchronized (region) {
if (!hasBeenNotified) {
try {
region.wait(10000);
assertTrue(hasBeenNotified);
}
catch (InterruptedException e) {
fail("exception not expected here");
}
}
}
region.clear();
boolean entryNotFound = false;
try {
getValueOnDisk(region);
}
catch (EntryNotFoundException e) {
entryNotFound = true;
}
if (!entryNotFound) {
fail("EntryNotFoundException was expected but did not get it");
}
entryNotFound = false;
Object obj = ((LocalRegion)region).basicGetEntry("Key");
if (obj == null) {
entryNotFound = true;
}
if (!entryNotFound) {
fail("EntryNotFoundException was expected but did not get it");
}
}
private void switchOplog(Region region)
{
((LocalRegion)region).getDiskRegion().forceFlush();
region.forceRolling();
}
Object getValueOnDisk(Region region) throws EntryNotFoundException
{
((LocalRegion)region).getDiskRegion().forceFlush();
return ((LocalRegion)region).getValueOnDisk("Key");
}
Object getValueInHTree(Region region)
{
RegionEntry re = ((LocalRegion)region).basicGetEntry("Key");
return ((LocalRegion)region).getDiskRegion().getNoBuffer(
((DiskEntry)re).getDiskId());
}
public void DARREL_DISABLE_testSyncPutBeforeRoll()
{
DiskRegionProperties diskRegionProperties = new DiskRegionProperties();
diskRegionProperties.setDiskDirs(dirs);
diskRegionProperties.setRolling(true);
diskRegionProperties.setCompactionThreshold(100);
region = DiskRegionHelperFactory.getSyncPersistOnlyRegion(cache,
diskRegionProperties, Scope.LOCAL);
LocalRegion.ISSUE_CALLBACKS_TO_CACHE_OBSERVER = true;
putBeforeRoll(region);
region.destroyRegion();
}
public void DARREL_DISABLE_testAsyncPutBeforeRoll()
{
DiskRegionProperties diskRegionProperties = new DiskRegionProperties();
diskRegionProperties.setDiskDirs(dirs);
diskRegionProperties.setRolling(true);
diskRegionProperties.setCompactionThreshold(100);
region = DiskRegionHelperFactory.getAsyncPersistOnlyRegion(cache,
diskRegionProperties);
LocalRegion.ISSUE_CALLBACKS_TO_CACHE_OBSERVER = true;
putBeforeRoll(region);
region.destroyRegion();
}
public void DARREL_DISABLE_testSyncPutAfterRoll()
{
DiskRegionProperties diskRegionProperties = new DiskRegionProperties();
diskRegionProperties.setDiskDirs(dirs);
diskRegionProperties.setRolling(true);
diskRegionProperties.setCompactionThreshold(100);
region = DiskRegionHelperFactory.getSyncPersistOnlyRegion(cache,
diskRegionProperties, Scope.LOCAL);
LocalRegion.ISSUE_CALLBACKS_TO_CACHE_OBSERVER = true;
putAfterRoll(region);
region.destroyRegion();
}
public void DARREL_DISABLE_testAsyncPutAfterRoll()
{
DiskRegionProperties diskRegionProperties = new DiskRegionProperties();
diskRegionProperties.setDiskDirs(dirs);
diskRegionProperties.setRolling(true);
diskRegionProperties.setCompactionThreshold(100);
region = DiskRegionHelperFactory.getAsyncPersistOnlyRegion(cache,
diskRegionProperties);
LocalRegion.ISSUE_CALLBACKS_TO_CACHE_OBSERVER = true;
putAfterRoll(region);
region.destroyRegion();
}
public void DARREL_DISABLE_testSyncGetBeforeRoll()
{
DiskRegionProperties diskRegionProperties = new DiskRegionProperties();
diskRegionProperties.setDiskDirs(dirs);
diskRegionProperties.setRolling(true);
diskRegionProperties.setCompactionThreshold(100);
region = DiskRegionHelperFactory.getSyncPersistOnlyRegion(cache,
diskRegionProperties, Scope.LOCAL);
LocalRegion.ISSUE_CALLBACKS_TO_CACHE_OBSERVER = true;
getBeforeRoll(region);
region.destroyRegion();
}
public void DARREL_DISABLE_testAsyncGetBeforeRoll()
{
DiskRegionProperties diskRegionProperties = new DiskRegionProperties();
diskRegionProperties.setDiskDirs(dirs);
diskRegionProperties.setRolling(true);
diskRegionProperties.setCompactionThreshold(100);
region = DiskRegionHelperFactory.getAsyncPersistOnlyRegion(cache,
diskRegionProperties);
LocalRegion.ISSUE_CALLBACKS_TO_CACHE_OBSERVER = true;
getBeforeRoll(region);
region.destroyRegion();
}
public void DARREL_DISABLE_testSyncGetAfterRoll()
{
DiskRegionProperties diskRegionProperties = new DiskRegionProperties();
diskRegionProperties.setDiskDirs(dirs);
diskRegionProperties.setRolling(true);
diskRegionProperties.setCompactionThreshold(100);
region = DiskRegionHelperFactory.getSyncPersistOnlyRegion(cache,
diskRegionProperties, Scope.LOCAL);
LocalRegion.ISSUE_CALLBACKS_TO_CACHE_OBSERVER = true;
getAfterRoll(region);
region.destroyRegion();
}
public void DARREL_DISABLE_testAsyncGetAfterRoll()
{
DiskRegionProperties diskRegionProperties = new DiskRegionProperties();
diskRegionProperties.setDiskDirs(dirs);
diskRegionProperties.setRolling(true);
diskRegionProperties.setCompactionThreshold(100);
region = DiskRegionHelperFactory.getAsyncPersistOnlyRegion(cache,
diskRegionProperties);
LocalRegion.ISSUE_CALLBACKS_TO_CACHE_OBSERVER = true;
getAfterRoll(region);
region.destroyRegion();
}
public void DARREL_DISABLE_testSyncClearBeforeRoll()
{
DiskRegionProperties diskRegionProperties = new DiskRegionProperties();
diskRegionProperties.setDiskDirs(dirs);
diskRegionProperties.setRolling(true);
diskRegionProperties.setCompactionThreshold(100);
region = DiskRegionHelperFactory.getSyncPersistOnlyRegion(cache,
diskRegionProperties, Scope.LOCAL);
LocalRegion.ISSUE_CALLBACKS_TO_CACHE_OBSERVER = true;
clearBeforeRoll(region);
region.destroyRegion();
}
public void DARREL_DISABLE_testAsyncClearBeforeRoll()
{
DiskRegionProperties diskRegionProperties = new DiskRegionProperties();
diskRegionProperties.setDiskDirs(dirs);
diskRegionProperties.setRolling(true);
diskRegionProperties.setCompactionThreshold(100);
region = DiskRegionHelperFactory.getAsyncPersistOnlyRegion(cache,
diskRegionProperties);
LocalRegion.ISSUE_CALLBACKS_TO_CACHE_OBSERVER = true;
clearBeforeRoll(region);
region.destroyRegion();
}
public void DARREL_DISABLE_testSyncClearAfterRoll()
{
DiskRegionProperties diskRegionProperties = new DiskRegionProperties();
diskRegionProperties.setDiskDirs(dirs);
diskRegionProperties.setRolling(true);
diskRegionProperties.setCompactionThreshold(100);
region = DiskRegionHelperFactory.getSyncPersistOnlyRegion(cache,
diskRegionProperties, Scope.LOCAL);
LocalRegion.ISSUE_CALLBACKS_TO_CACHE_OBSERVER = true;
clearAfterRoll(region);
region.destroyRegion();
}
public void DARREL_DISABLE_testAsyncClearAfterRoll()
{
DiskRegionProperties diskRegionProperties = new DiskRegionProperties();
diskRegionProperties.setDiskDirs(dirs);
diskRegionProperties.setRolling(true);
diskRegionProperties.setCompactionThreshold(100);
region = DiskRegionHelperFactory.getAsyncPersistOnlyRegion(cache,
diskRegionProperties);
LocalRegion.ISSUE_CALLBACKS_TO_CACHE_OBSERVER = true;
clearAfterRoll(region);
region.destroyRegion();
}
public void DARREL_DISABLE_testSyncDelBeforeRoll()
{
DiskRegionProperties diskRegionProperties = new DiskRegionProperties();
diskRegionProperties.setDiskDirs(dirs);
diskRegionProperties.setRolling(true);
diskRegionProperties.setCompactionThreshold(100);
region = DiskRegionHelperFactory.getSyncPersistOnlyRegion(cache,
diskRegionProperties, Scope.LOCAL);
LocalRegion.ISSUE_CALLBACKS_TO_CACHE_OBSERVER = true;
delBeforeRoll(region);
region.destroyRegion();
}
public void DARREL_DISABLE_testAsyncDelBeforeRoll()
{
DiskRegionProperties diskRegionProperties = new DiskRegionProperties();
diskRegionProperties.setDiskDirs(dirs);
diskRegionProperties.setRolling(true);
diskRegionProperties.setCompactionThreshold(100);
region = DiskRegionHelperFactory.getAsyncPersistOnlyRegion(cache,
diskRegionProperties);
LocalRegion.ISSUE_CALLBACKS_TO_CACHE_OBSERVER = true;
delBeforeRoll(region);
region.destroyRegion();
}
public void DARREL_DISABLE_testSyncDelAfterRoll()
{
DiskRegionProperties diskRegionProperties = new DiskRegionProperties();
diskRegionProperties.setDiskDirs(dirs);
diskRegionProperties.setRolling(true);
diskRegionProperties.setCompactionThreshold(100);
region = DiskRegionHelperFactory.getSyncPersistOnlyRegion(cache,
diskRegionProperties, Scope.LOCAL);
LocalRegion.ISSUE_CALLBACKS_TO_CACHE_OBSERVER = true;
delAfterRoll(region);
region.destroyRegion();
}
public void DARREL_DISABLE_testAsyncDelAfterRoll()
{
DiskRegionProperties diskRegionProperties = new DiskRegionProperties();
diskRegionProperties.setDiskDirs(dirs);
diskRegionProperties.setRolling(true);
diskRegionProperties.setCompactionThreshold(100);
region = DiskRegionHelperFactory.getAsyncPersistOnlyRegion(cache,
diskRegionProperties);
LocalRegion.ISSUE_CALLBACKS_TO_CACHE_OBSERVER = true;
delAfterRoll(region);
region.destroyRegion();
}
public void DARREL_DISABLE_testCloseBeforeRoll()
{
DiskRegionProperties diskRegionProperties = new DiskRegionProperties();
diskRegionProperties.setDiskDirs(dirs);
diskRegionProperties.setRolling(true);
diskRegionProperties.setCompactionThreshold(100);
region = DiskRegionHelperFactory.getSyncPersistOnlyRegion(cache,
diskRegionProperties, Scope.LOCAL);
LocalRegion.ISSUE_CALLBACKS_TO_CACHE_OBSERVER = true;
closeBeforeRoll(region);
}
public void DARREL_DISABLE_testCloseAfterRoll()
{
DiskRegionProperties diskRegionProperties = new DiskRegionProperties();
diskRegionProperties.setDiskDirs(dirs);
diskRegionProperties.setRolling(true);
diskRegionProperties.setCompactionThreshold(100);
region = DiskRegionHelperFactory.getSyncPersistOnlyRegion(cache,
diskRegionProperties, Scope.LOCAL);
LocalRegion.ISSUE_CALLBACKS_TO_CACHE_OBSERVER = true;
closeAfterRoll(region);
//Asif :Recreate the region so it gets destroyed in tear down
region = DiskRegionHelperFactory.getSyncPersistOnlyRegion(cache,
diskRegionProperties, Scope.LOCAL);
}
public void DARREL_DISABLE_testconcurrentPutAndRoll()
{
DiskRegionProperties diskRegionProperties = new DiskRegionProperties();
diskRegionProperties.setDiskDirs(dirs);
diskRegionProperties.setRolling(true);
diskRegionProperties.setCompactionThreshold(100);
region = DiskRegionHelperFactory.getSyncPersistOnlyRegion(cache,
diskRegionProperties, Scope.LOCAL);
LocalRegion.ISSUE_CALLBACKS_TO_CACHE_OBSERVER = true;
concurrentPutAndRoll(region);
region.destroyRegion();
}
private void concurrentPutAndRoll(final Region region)
{
hasBeenNotified = false;
CacheObserverHolder.setInstance(new CacheObserverAdapter() {
long startTime, endTime, totalTime = 0;
boolean localHasBeenNotified = false;
public void beforeGoingToCompact()
{
final Object obj = new Object();
Thread thread1 = new Thread() {
public void run()
{
RegionEntry re = ((LocalRegion)region).basicGetEntry("Key");
synchronized (re) {
synchronized (obj) {
obj.notify();
localHasBeenNotified = true;
}
try {
Thread.sleep(2000);
}
catch (InterruptedException e) {
testFailed = true;
failureCause = "Exception occured when it was not supposed to occur, Exception is "
+ e + "in concurrentPutAndRoll";
fail("exception not expected here");
}
}
}
};
thread1.start();
synchronized (obj) {
try {
if (!localHasBeenNotified) {
obj.wait(10000);
assertTrue(localHasBeenNotified);
}
}
catch (InterruptedException e) {
testFailed = true;
failureCause = "Exception occured when it was not supposed to occur, Exception is "
+ e + "in concurrentPutAndRoll";
fail("exception not expected here");
}
}
startTime = System.currentTimeMillis();
}
public void afterHavingCompacted()
{
endTime = System.currentTimeMillis();
totalTime = endTime - startTime;
setTotalTime(totalTime);
synchronized (region) {
region.notify();
hasBeenNotified = true;
}
}
});
region.put("Key", "Value1");
switchOplog(region);
synchronized (region) {
try {
if (!hasBeenNotified) {
region.wait(10000);
assertTrue(hasBeenNotified);
}
}
catch (InterruptedException e) {
fail("exception not expected here");
}
}
if (this.totalTime < 2000) {
fail(" It should have taken more than 2000 millisecs but it took "
+ totalTime);
}
assertFalse(failureCause, testFailed);
}
/**
* Check if the roller thread cant skip rolling the entry & if a get is done
* on that entry , it is possible for the get operation to get the Oplog which
* is not yet destroyed but by the time a basicGet is done,the oplog gets
* destroyed & the get operation sees the file length zero or it may encounter
* null pointer exception while retrieving the oplog.
*
* @author Asif
*
*/
@Test
public void testConcurrentRollingAndGet()
{
final int MAX_OPLOG_SIZE = 1000*2;
DiskRegionProperties diskProps = new DiskRegionProperties();
diskProps.setMaxOplogSize(MAX_OPLOG_SIZE);
diskProps.setPersistBackup(true);
diskProps.setRolling(true);
diskProps.setCompactionThreshold(100);
diskProps.setSynchronous(true);
diskProps.setOverflow(false);
final int TOTAL_SWITCHING = 200;
final int TOTAL_KEYS = 20;
final List threads = new ArrayList();
final byte[] val = new byte[100];
region = DiskRegionHelperFactory.getSyncPersistOnlyRegion(cache, diskProps, Scope.LOCAL);
LocalRegion.ISSUE_CALLBACKS_TO_CACHE_OBSERVER = false;
CacheObserver old = CacheObserverHolder
.setInstance(new CacheObserverAdapter() {
public void beforeGoingToCompact()
{
for (int k = 0; k < TOTAL_KEYS; ++k) {
final int num = k;
Thread th = new Thread(new Runnable() {
public void run()
{
byte[] val_on_disk = null;
try {
val_on_disk = (byte[])((LocalRegion)region)
.getValueOnDisk("key" + (num + 1));
assertTrue(
"byte array was not of right size as its size was "
+ val_on_disk.length, val_on_disk.length == 100);
}
catch (Exception e) {
encounteredFailure = true;
logWriter.error("Test encountered exception ", e);
fail(" Test failed as could not obtain value from disk.Exception = "
+ e);
}
}
});
threads.add(th);
}
for (int j = 0; j < TOTAL_KEYS; ++j) {
((Thread)threads.get(rollingCount++)).start();
}
}
});
for (int i = 0; i < TOTAL_SWITCHING; ++i) {
for (int j = 0; j < TOTAL_KEYS; ++j) {
region.put("key" + (j + 1), val);
}
LocalRegion.ISSUE_CALLBACKS_TO_CACHE_OBSERVER = true;
try {
Thread.sleep(100);
}
catch (InterruptedException e) {
logWriter.error("Main thread encountered exception ", e);
fail(" Test failed as main thread encountered exception = " + e);
}
}
LocalRegion.ISSUE_CALLBACKS_TO_CACHE_OBSERVER = false;
for (int i = 0; i < threads.size(); ++i) {
Thread th = (Thread)threads.get(i);
if (th != null) {
DistributedTestCase.join(th, 30 * 1000, null);
}
}
assertTrue(
"The test will fail as atleast one thread doing get operation encounetred exception",
!encounteredFailure);
CacheObserverHolder.setInstance(old);
closeDown();
}
private volatile long totalTime = 0;
protected void setTotalTime(long time)
{
this.totalTime = time;
}
void closeAfterRoll(final Region region)
{
hasBeenNotified = false;
final Close th = new Close(region);
CacheObserverHolder.setInstance(new CacheObserverAdapter() {
public void afterHavingCompacted()
{
LocalRegion.ISSUE_CALLBACKS_TO_CACHE_OBSERVER = false;
try {
th.start();
Thread.sleep(3000);
}
catch (Exception e) {
logWriter.error("Exception occured", e);
fail("Exception occured when it was not supposed to occur");
}
}
});
region.put("Key", "Value1");
switchOplog(region);
synchronized (region) {
try {
if (!hasBeenNotified) {
region.wait(10000);
assertTrue(hasBeenNotified);
}
}
catch (InterruptedException e) {
fail("exception not expected here");
}
}
try {
th.join(5000);
} catch (InterruptedException ignore) {
fail("exception not expected here");
}
assertFalse(th.isAlive());
assertFalse(failureCause, testFailed);
}
void closeBeforeRoll(final Region region)
{
hasBeenNotified = false;
final Close th = new Close(region);
CacheObserverHolder.setInstance(new CacheObserverAdapter() {
public void beforeGoingToCompact()
{
LocalRegion.ISSUE_CALLBACKS_TO_CACHE_OBSERVER = false;
try {
th.start();
Thread.sleep(3000);
}
catch (Exception e) {
logWriter.error("Exception occured", e);
fail("Exception occured when it was not supposed to occur");
}
}
});
region.put("Key", "Value1");
switchOplog(region);
synchronized (region) {
try {
if (!hasBeenNotified) {
region.wait(10000);
assertTrue(hasBeenNotified);
}
}
catch (InterruptedException e) {
fail("exception not expected here");
}
}
try {
th.join(5000);
} catch (InterruptedException ignore) {
fail("exception not expected here");
}
assertFalse(th.isAlive());
assertFalse(failureCause, testFailed);
}
class Close extends Thread
{
private Region region;
Close(Region region) {
this.region = region;
}
public void run()
{
try {
region.close();
synchronized (region) {
region.notify();
hasBeenNotified = true;
}
}
catch (Exception e) {
logWriter.error("Exception occured", e);
testFailed = true;
failureCause = "Exception occured when it was not supposed to occur, due to "
+ e;
fail("Exception occured when it was not supposed to occur, due to " + e);
}
}
}
}