blob: 572c1c4502914b252f08354f6f1c425cdcaf0b13 [file] [log] [blame]
/*=========================================================================
* Copyright (c) 2010-2014 Pivotal Software, Inc. All Rights Reserved.
* This product is protected by U.S. and international copyright
* and intellectual property laws. Pivotal products are covered by
* one or more patents listed at http://www.pivotal.io/patents.
*=========================================================================
*/
package com.gemstone.gemfire.internal.cache;
import static org.junit.Assert.assertFalse;
import static org.junit.Assert.fail;
import junit.framework.Assert;
import org.junit.After;
import org.junit.Before;
import org.junit.Test;
import org.junit.experimental.categories.Category;
import com.gemstone.gemfire.cache.EntryNotFoundException;
import com.gemstone.gemfire.cache.Region;
import com.gemstone.gemfire.test.junit.categories.IntegrationTest;
/**
* This JUnit test exercises concurrent rolling and normal region operations
* (put, get, clear, destroy) in both sync and async mode.
*
* A region operation is done on the same key that is about to be rolled or has
* just been rolled and the region operation is verified to have been correctly
* executed.
*
* @author Mitul Bid
* @author Asif
*
*/
@Category(IntegrationTest.class)
public class ConcurrentFlushingAndRegionOperationsJUnitTest extends
DiskRegionTestingBase
{
/**
 * Per-test set-up; simply runs the set-up inherited from the superclass.
 *
 * @throws Exception if the superclass set-up fails
 */
@Before
public void setUp() throws Exception {
  super.setUp();
}
/**
 * Per-test clean-up; simply runs the tear-down inherited from the superclass.
 *
 * @throws Exception if the superclass tear-down fails
 */
@After
public void tearDown() throws Exception {
  super.tearDown();
}
// One-shot guard shared by the CacheObserver callbacks installed in the helper
// methods below: each helper resets it to false before installing its observer,
// and the callback sets it to true so the injected region operation is not
// repeated on later callback invocations.
protected boolean alreadyComeHere = false;
/**
 * Overwrites the entry from the {@code goingToFlush} observer callback, i.e. just before the
 * queued entry is flushed.
 *
 * "Value1" is put under "Key" with the flusher paused, then a flush is requested. The callback
 * puts "Value2" once (guarded by {@code alreadyComeHere}). Afterwards both the value returned
 * by the region and the value read from disk must be "Value2".
 *
 * @param region the region under test; it is cast to {@link LocalRegion}
 */
void putBeforeFlush(final Region region) {
  final LocalRegion localRegion = (LocalRegion) region;

  alreadyComeHere = false;
  CacheObserverHolder.setInstance(new CacheObserverAdapter() {
    public void goingToFlush() {
      if (alreadyComeHere) {
        return;
      }
      // this should do an in place update of the re we just took off the queue
      region.put("Key", "Value2");
      alreadyComeHere = true;
    }
  });

  localRegion.getDiskRegion().pauseFlusherForTesting();
  region.put("Key", "Value1");
  localRegion.getDiskRegion().flushForTesting();

  try {
    final Object inRegion = region.get("Key");
    Assert.assertEquals("Value2", inRegion);
    final Object onDisk = getValueOnDisk(region);
    Assert.assertEquals("Value2", onDisk);
  } catch (EntryNotFoundException notFound) {
    logWriter.error("Exception occured", notFound);
    fail("Entry not found although was supposed to be there");
  }
}
/**
 * Reads the entry from the {@code goingToFlush} observer callback, i.e. just before the queued
 * entry is flushed. The {@code alreadyComeHere} flag keeps the get from being repeated.
 *
 * After the flush the value read from disk must still be "Value1".
 *
 * @param region the region under test; it is cast to {@link LocalRegion}
 */
void getBeforeFlush(final Region region) {
  final LocalRegion localRegion = (LocalRegion) region;

  alreadyComeHere = false;
  CacheObserverHolder.setInstance(new CacheObserverAdapter() {
    public void goingToFlush() {
      if (alreadyComeHere) {
        return;
      }
      region.get("Key");
      alreadyComeHere = true;
    }
  });

  localRegion.getDiskRegion().pauseFlusherForTesting();
  region.put("Key", "Value1");
  localRegion.getDiskRegion().flushForTesting();

  try {
    final Object onDisk = getValueOnDisk(region);
    Assert.assertEquals("Value1", onDisk);
  } catch (EntryNotFoundException notFound) {
    logWriter.error("Exception occured", notFound);
    fail("Entry not found although was supposed to be there");
  }
}
/**
 * Destroys the entry from the {@code goingToFlush} observer callback, i.e. just before the
 * queued entry is flushed.
 *
 * After the flush, reading the value from disk must either throw
 * {@link EntryNotFoundException} or yield {@code null} or {@code Token.TOMBSTONE}, and the
 * region must no longer hold an entry for "Key".
 *
 * @param region the region under test; it is cast to {@link LocalRegion}
 */
void delBeforeFlush(final Region region) {
  final LocalRegion localRegion = (LocalRegion) region;

  alreadyComeHere = false;
  CacheObserverHolder.setInstance(new CacheObserverAdapter() {
    public void goingToFlush() {
      if (alreadyComeHere) {
        return;
      }
      try {
        region.destroy("Key");
      } catch (Exception unexpected) {
        logWriter.error("Exception occured", unexpected);
        fail("Exception occured when it was not supposed to occur");
      }
      alreadyComeHere = true;
    }
  });

  localRegion.getDiskRegion().pauseFlusherForTesting();
  region.put("Key", "Value1");
  localRegion.getDiskRegion().flushForTesting();

  // Disk check: a missing entry, a null value or a tombstone are all acceptable.
  Object diskValue = null;
  boolean missingOnDisk;
  try {
    diskValue = getValueOnDisk(region);
    missingOnDisk = false;
  } catch (EntryNotFoundException expected) {
    missingOnDisk = true;
  }
  final boolean liveValueOnDisk =
      !missingOnDisk && diskValue != null && !diskValue.equals(Token.TOMBSTONE);
  if (liveValueOnDisk) {
    fail("EntryNotFoundException was expected but did not get it");
  }

  // In-memory check: the region entry itself must be gone.
  final Object regionEntry = localRegion.basicGetEntry("Key");
  if (regionEntry != null) {
    fail("EntryNotFoundException was expected but did not get it");
  }
}
/**
 * Closes the region from a separate {@link Close} thread at the moment the queued entry is
 * about to be flushed ({@code goingToFlush} callback).
 *
 * The callback starts the close thread once (guarded by {@code alreadyComeHere}) and then
 * sleeps 3000 ms to give that thread time to close the region. After requesting the flush,
 * this method blocks on the region's monitor until {@code hasBeenNotified} has been set.
 *
 * @param region the region under test; it is cast to {@link LocalRegion} and also used as the
 *        monitor for the wait/notify hand-shake with the close thread
 */
void closeBeforeFlush(final Region region) {
  hasBeenNotified = false;
  alreadyComeHere = false;
  CacheObserverHolder.setInstance(new CacheObserverAdapter() {
    public void goingToFlush() {
      if (!alreadyComeHere) {
        try {
          new Close(region).start();
          try {
            Thread.sleep(3000);
          } catch (InterruptedException ok) {
            // this is ok; the async writer thread is interrupted during shutdown
            Thread.currentThread().interrupt();
          }
        } catch (Exception e) {
          logWriter.error("Exception occured", e);
          fail("Exception occured when it was not supposed to occur");
        }
      }
      alreadyComeHere = true;
    }
  });

  ((LocalRegion) region).getDiskRegion().pauseFlusherForTesting();
  region.put("Key", "Value1");
  ((LocalRegion) region).getDiskRegion().flushForTesting();

  synchronized (region) {
    try {
      // Loop rather than a single check: Object.wait() may return spuriously, and the method
      // must not return before the close thread has actually signalled.
      while (!hasBeenNotified) {
        region.wait();
      }
    } catch (InterruptedException e) {
      logWriter.error("Exception occured", e);
      fail("interrupted");
    }
  }
}
/**
 * Closes the region from a separate {@link Close} thread once bytes have been written
 * ({@code afterWritingBytes} callback).
 *
 * The callback starts the close thread once (guarded by {@code alreadyComeHere}) and then
 * sleeps 3000 ms to give that thread time to close the region. After requesting the flush,
 * this method blocks on the region's monitor until {@code hasBeenNotified} has been set.
 *
 * @param region the region under test; it is cast to {@link LocalRegion} and also used as the
 *        monitor for the wait/notify hand-shake with the close thread
 */
void closeAfterFlush(final Region region) {
  hasBeenNotified = false;
  alreadyComeHere = false;
  CacheObserverHolder.setInstance(new CacheObserverAdapter() {
    public void afterWritingBytes() {
      if (!alreadyComeHere) {
        try {
          new Close(region).start();
          try {
            Thread.sleep(3000);
          } catch (InterruptedException ok) {
            // this is ok; the async writer thread is interrupted during shutdown
            Thread.currentThread().interrupt();
          }
        } catch (Exception e) {
          logWriter.error("Exception occured", e);
          fail("Exception occured when it was not supposed to occur");
        }
      }
      alreadyComeHere = true;
    }
  });

  ((LocalRegion) region).getDiskRegion().pauseFlusherForTesting();
  region.put("Key", "Value1");
  ((LocalRegion) region).getDiskRegion().flushForTesting();

  synchronized (region) {
    try {
      // Loop rather than a single check: Object.wait() may return spuriously, and the method
      // must not return before the close thread has actually signalled.
      while (!hasBeenNotified) {
        region.wait();
      }
    } catch (InterruptedException e) {
      logWriter.error("Exception occured", e);
      fail("interrupted");
    }
  }
}
/**
 * Puts "Value1" under "Key" with the flusher paused, clears the region and only then requests
 * the flush.
 *
 * Afterwards reading the value from disk must throw {@link EntryNotFoundException} and the
 * region must hold no entry for "Key".
 *
 * @param region the region under test; it is cast to {@link LocalRegion}
 */
void clearBeforeFlush(final Region region) {
  final LocalRegion localRegion = (LocalRegion) region;

  localRegion.getDiskRegion().pauseFlusherForTesting();
  region.put("Key", "Value1");
  region.clear();
  localRegion.getDiskRegion().flushForTesting();

  boolean missingOnDisk;
  try {
    getValueOnDisk(region);
    missingOnDisk = false;
  } catch (EntryNotFoundException expected) {
    missingOnDisk = true;
  }
  if (!missingOnDisk) {
    fail("EntryNotFoundException was expected but did not get it");
  }

  final Object regionEntry = localRegion.basicGetEntry("Key");
  if (regionEntry != null) {
    fail("EntryNotFoundException was expected but did not get it");
  }
}
/**
 * Overwrites the entry from the afterWritingBytes observer callback.
 *
 * "Value1" is put under "Key" with the flusher paused and a flush is requested. In the
 * callback, "Value2" is put at most once (guarded by alreadyComeHere). The entry's DiskId is
 * then temporarily pointed back at the oplog id/offset recorded before the second put, so that
 * getNoBuffer can be checked against "Value1"; the new id/offset are restored afterwards.
 * Once the flush returns, region.get must yield "Value2"; after a second flush the value read
 * from disk must be "Value2" as well.
 *
 * @param region the region under test; it is cast to LocalRegion
 */
void putAfterFlush(final Region region)
{
alreadyComeHere = false;
CacheObserverHolder.setInstance(new CacheObserverAdapter() {
public void afterWritingBytes()
{
if (!alreadyComeHere) {
DiskEntry de = (DiskEntry)((LocalRegion)region).basicGetEntry("Key");
// Note: this early return leaves alreadyComeHere false, so a later invocation
// of the callback still gets to run the body below.
if (de == null) return; // this is caused by the first flush
DiskId id = de.getDiskId();
// Remember where the entry currently points in the oplog (before the second put).
long oldOplogId = id.getOplogId();
long oldOplogOffset = id.getOffsetInOplog();
((LocalRegion)region).getDiskRegion().pauseFlusherForTesting();
// region.getCache().getLogger().info("putting value2");
region.put("Key", "Value2");
id = ((DiskEntry)(((LocalRegion)region).basicGetEntry("Key")))
.getDiskId();
// Save the post-put location, then swap the old location back in for the check.
long newOplogId = id.getOplogId();
long newOplogOffset = id.getOffsetInOplog();
id.setOplogId(oldOplogId);
id.setOffsetInOplog(oldOplogOffset);
// With the old id/offset in place the disk read is expected to yield the first value.
// NOTE(review): presumably getNoBuffer reads directly from the oplog location in the
// DiskId - confirm against DiskRegion.
Assert.assertEquals("Value1", ((LocalRegion)region).getDiskRegion()
.getNoBuffer(id));
// Restore the post-put location. This is skipped if the assertion above throws.
id.setOplogId(newOplogId);
id.setOffsetInOplog(newOplogOffset);
}
alreadyComeHere = true;
}
});
// region.getCache().getLogger().info("before pause");
((LocalRegion)region).getDiskRegion().pauseFlusherForTesting();
// region.getCache().getLogger().info("putting value1");
region.put("Key", "Value1");
// region.getCache().getLogger().info("before flush");
((LocalRegion)region).getDiskRegion().flushForTesting();
try {
// region.getCache().getLogger().info("getting value2");
Assert.assertEquals("Value2", region.get("Key"));
}
catch (Exception e) {
logWriter.error("Exception occured", e);
fail("Entry not found although was supposed to be there");
}
// Second flush: the callback paused the flusher again before putting "Value2".
((LocalRegion)region).getDiskRegion().flushForTesting();
try {
Assert.assertEquals("Value2", getValueOnDisk(region));
}
catch (EntryNotFoundException e) {
logWriter.error("Exception occured", e);
fail("Entry not found although was supposed to be there");
}
}
/**
 * Reads the entry from the {@code afterWritingBytes} observer callback. The
 * {@code alreadyComeHere} flag keeps the get from being repeated.
 *
 * After the flush the value read from disk must still be "Value1".
 *
 * @param region the region under test; it is cast to {@link LocalRegion}
 */
void getAfterFlush(final Region region) {
  alreadyComeHere = false;
  CacheObserverHolder.setInstance(new CacheObserverAdapter() {
    public void afterWritingBytes() {
      if (!alreadyComeHere) {
        // Must use the same key as the put below ("Key", capital K); a differently cased key
        // would miss the entry and the get would never touch what was just written.
        region.get("Key");
      }
      alreadyComeHere = true;
    }
  });

  ((LocalRegion) region).getDiskRegion().pauseFlusherForTesting();
  region.put("Key", "Value1");
  ((LocalRegion) region).getDiskRegion().flushForTesting();

  try {
    Assert.assertEquals("Value1", getValueOnDisk(region));
  } catch (EntryNotFoundException e) {
    logWriter.error("Exception occured", e);
    fail("Entry not found although was supposed to be there");
  }
}
/**
 * Destroys the entry from the {@code afterWritingBytes} observer callback.
 *
 * After the flush, reading the value from disk must either throw
 * {@link EntryNotFoundException} or yield {@code null} or {@code Token.TOMBSTONE}, and the
 * region must no longer hold an entry for "Key".
 *
 * @param region the region under test; it is cast to {@link LocalRegion}
 */
void delAfterFlush(final Region region) {
  final LocalRegion localRegion = (LocalRegion) region;

  alreadyComeHere = false;
  CacheObserverHolder.setInstance(new CacheObserverAdapter() {
    public void afterWritingBytes() {
      if (alreadyComeHere) {
        return;
      }
      try {
        region.destroy("Key");
      } catch (Exception unexpected) {
        logWriter.error("Exception occured", unexpected);
        fail("encounter exception when not expected " + unexpected);
      }
      alreadyComeHere = true;
    }
  });

  localRegion.getDiskRegion().pauseFlusherForTesting();
  region.put("Key", "Value1");
  localRegion.getDiskRegion().flushForTesting();

  // Disk check: a missing entry, a null value or a tombstone are all acceptable.
  Object diskValue = null;
  boolean missingOnDisk;
  try {
    diskValue = getValueOnDisk(region);
    missingOnDisk = false;
  } catch (EntryNotFoundException expected) {
    missingOnDisk = true;
  }
  final boolean liveValueOnDisk =
      !missingOnDisk && diskValue != null && !diskValue.equals(Token.TOMBSTONE);
  if (liveValueOnDisk) {
    fail("EntryNotFoundException was expected but did not get it");
  }

  // In-memory check: the region entry itself must be gone.
  final Object regionEntry = localRegion.basicGetEntry("Key");
  if (regionEntry != null) {
    fail("EntryNotFoundException was expected but did not get it");
  }
}
/**
 * Puts "Value1" under "Key" with the flusher paused, requests the flush and then clears the
 * region.
 *
 * Afterwards reading the value from disk must throw {@link EntryNotFoundException} and the
 * region must hold no entry for "Key".
 *
 * @param region the region under test; it is cast to {@link LocalRegion}
 */
void clearAfterFlush(final Region region) {
  final LocalRegion localRegion = (LocalRegion) region;

  localRegion.getDiskRegion().pauseFlusherForTesting();
  region.put("Key", "Value1");
  localRegion.getDiskRegion().flushForTesting();
  region.clear();

  boolean missingOnDisk;
  try {
    getValueOnDisk(region);
    missingOnDisk = false;
  } catch (EntryNotFoundException expected) {
    missingOnDisk = true;
  }
  if (!missingOnDisk) {
    fail("EntryNotFoundException was expected but did not get it");
  }

  final Object regionEntry = localRegion.basicGetEntry("Key");
  if (regionEntry != null) {
    fail("EntryNotFoundException was expected but did not get it");
  }
}
/**
 * Forces a flush on the region's disk region and then returns the on-disk value for "Key".
 *
 * @param region the region under test; it is cast to {@link LocalRegion}
 * @return whatever {@code LocalRegion.getValueOnDisk("Key")} returns
 * @throws EntryNotFoundException propagated from the disk lookup
 */
Object getValueOnDisk(Region region) throws EntryNotFoundException {
  final LocalRegion localRegion = (LocalRegion) region;
  localRegion.getDiskRegion().forceFlush();
  return localRegion.getValueOnDisk("Key");
}
/**
 * Creates a rolling region via {@code DiskRegionHelperFactory.getAsyncPersistOnlyRegion},
 * enables cache-observer callbacks, runs {@link #putBeforeFlush(Region)} and destroys the
 * region.
 */
@Test
public void testPutBeforeFlush() {
  final DiskRegionProperties props = new DiskRegionProperties();
  props.setDiskDirs(dirs);
  props.setBytesThreshold(100000000L);
  props.setTimeInterval(100000000L);
  props.setRolling(true);
  region = DiskRegionHelperFactory.getAsyncPersistOnlyRegion(cache, props);

  LocalRegion.ISSUE_CALLBACKS_TO_CACHE_OBSERVER = true;
  putBeforeFlush(region);
  region.destroyRegion();
}
/**
 * Creates a rolling region via {@code DiskRegionHelperFactory.getAsyncPersistOnlyRegion},
 * enables cache-observer callbacks, runs {@link #putAfterFlush(Region)} and destroys the
 * region.
 */
@Test
public void testPutAfterFlush() {
  final DiskRegionProperties props = new DiskRegionProperties();
  props.setBytesThreshold(100000000L);
  props.setTimeInterval(100000000L);
  props.setDiskDirs(dirs);
  props.setRolling(true);
  region = DiskRegionHelperFactory.getAsyncPersistOnlyRegion(cache, props);

  LocalRegion.ISSUE_CALLBACKS_TO_CACHE_OBSERVER = true;
  putAfterFlush(region);
  region.destroyRegion();
}
/**
 * Creates a rolling region via {@code DiskRegionHelperFactory.getAsyncPersistOnlyRegion},
 * enables cache-observer callbacks, runs {@link #getBeforeFlush(Region)} and destroys the
 * region.
 */
@Test
public void testGetBeforeFlush() {
  final DiskRegionProperties props = new DiskRegionProperties();
  props.setBytesThreshold(100000000L);
  props.setTimeInterval(100000000L);
  props.setDiskDirs(dirs);
  props.setRolling(true);
  region = DiskRegionHelperFactory.getAsyncPersistOnlyRegion(cache, props);

  LocalRegion.ISSUE_CALLBACKS_TO_CACHE_OBSERVER = true;
  getBeforeFlush(region);
  region.destroyRegion();
}
/**
 * Creates a rolling region via {@code DiskRegionHelperFactory.getAsyncPersistOnlyRegion},
 * enables cache-observer callbacks, runs {@link #getAfterFlush(Region)} and destroys the
 * region.
 */
@Test
public void testGetAfterFlush() {
  final DiskRegionProperties props = new DiskRegionProperties();
  props.setBytesThreshold(100000000L);
  props.setTimeInterval(100000000L);
  props.setDiskDirs(dirs);
  props.setRolling(true);
  region = DiskRegionHelperFactory.getAsyncPersistOnlyRegion(cache, props);

  LocalRegion.ISSUE_CALLBACKS_TO_CACHE_OBSERVER = true;
  getAfterFlush(region);
  region.destroyRegion();
}
/**
 * Creates a rolling region via {@code DiskRegionHelperFactory.getAsyncPersistOnlyRegion},
 * enables cache-observer callbacks, runs {@link #clearBeforeFlush(Region)} and destroys the
 * region.
 */
@Test
public void testClearBeforeFlush() {
  final DiskRegionProperties props = new DiskRegionProperties();
  props.setBytesThreshold(100000000L);
  props.setTimeInterval(100000000L);
  props.setDiskDirs(dirs);
  props.setRolling(true);
  region = DiskRegionHelperFactory.getAsyncPersistOnlyRegion(cache, props);

  LocalRegion.ISSUE_CALLBACKS_TO_CACHE_OBSERVER = true;
  clearBeforeFlush(region);
  region.destroyRegion();
}
/**
 * Creates a rolling region via {@code DiskRegionHelperFactory.getAsyncPersistOnlyRegion},
 * enables cache-observer callbacks, runs {@link #clearAfterFlush(Region)} and destroys the
 * region.
 */
@Test
public void testClearAfterFlush() {
  final DiskRegionProperties props = new DiskRegionProperties();
  props.setBytesThreshold(100000000L);
  props.setTimeInterval(100000000L);
  props.setDiskDirs(dirs);
  props.setRolling(true);
  region = DiskRegionHelperFactory.getAsyncPersistOnlyRegion(cache, props);

  LocalRegion.ISSUE_CALLBACKS_TO_CACHE_OBSERVER = true;
  clearAfterFlush(region);
  region.destroyRegion();
}
/**
 * Creates a rolling region via {@code DiskRegionHelperFactory.getAsyncPersistOnlyRegion},
 * enables cache-observer callbacks, runs {@link #delBeforeFlush(Region)} and destroys the
 * region.
 */
@Test
public void testDelBeforeFlush() {
  final DiskRegionProperties props = new DiskRegionProperties();
  props.setBytesThreshold(100000000L);
  props.setTimeInterval(100000000L);
  props.setDiskDirs(dirs);
  props.setRolling(true);
  region = DiskRegionHelperFactory.getAsyncPersistOnlyRegion(cache, props);

  LocalRegion.ISSUE_CALLBACKS_TO_CACHE_OBSERVER = true;
  delBeforeFlush(region);
  region.destroyRegion();
}
/**
 * Creates a rolling region via {@code DiskRegionHelperFactory.getAsyncPersistOnlyRegion},
 * enables cache-observer callbacks, runs {@link #delAfterFlush(Region)} and destroys the
 * region.
 */
@Test
public void testDelAfterFlush() {
  final DiskRegionProperties props = new DiskRegionProperties();
  props.setBytesThreshold(100000000L);
  props.setTimeInterval(100000000L);
  props.setDiskDirs(dirs);
  props.setRolling(true);
  region = DiskRegionHelperFactory.getAsyncPersistOnlyRegion(cache, props);

  LocalRegion.ISSUE_CALLBACKS_TO_CACHE_OBSERVER = true;
  delAfterFlush(region);
  region.destroyRegion();
}
/**
 * Creates a rolling region via {@code DiskRegionHelperFactory.getAsyncPersistOnlyRegion},
 * enables cache-observer callbacks and runs {@link #closeBeforeFlush(Region)}. The test then
 * asserts that {@code testFailed} is still false, reporting {@code failureCause} otherwise.
 * The region is not destroyed here, because the helper closes it.
 */
@Test
public void testCloseBeforeFlush() {
  final DiskRegionProperties props = new DiskRegionProperties();
  props.setBytesThreshold(100000000L);
  props.setTimeInterval(100000000L);
  props.setDiskDirs(dirs);
  props.setRolling(true);
  region = DiskRegionHelperFactory.getAsyncPersistOnlyRegion(cache, props);

  LocalRegion.ISSUE_CALLBACKS_TO_CACHE_OBSERVER = true;
  closeBeforeFlush(region);
  assertFalse(failureCause, testFailed);
}
/**
 * Creates a rolling region via {@code DiskRegionHelperFactory.getAsyncPersistOnlyRegion},
 * enables cache-observer callbacks and runs {@link #closeAfterFlush(Region)}. The test then
 * asserts that {@code testFailed} is still false, reporting {@code failureCause} otherwise.
 * The region is not destroyed here, because the helper closes it.
 */
@Test
public void testCloseAfterFlush() {
  final DiskRegionProperties props = new DiskRegionProperties();
  props.setBytesThreshold(100000000L);
  props.setTimeInterval(100000000L);
  props.setDiskDirs(dirs);
  props.setRolling(true);
  region = DiskRegionHelperFactory.getAsyncPersistOnlyRegion(cache, props);

  LocalRegion.ISSUE_CALLBACKS_TO_CACHE_OBSERVER = true;
  closeAfterFlush(region);
  assertFalse(failureCause, testFailed);
}
// Hand-shake flag between the Close thread and the closeBeforeFlush/closeAfterFlush
// helpers: the helpers reset it to false and later check it while holding the
// region's monitor; Close.run sets it to true inside synchronized (region).
protected boolean hasBeenNotified = false;
/**
 * Helper thread that closes the given region and then signals the test thread, which waits on
 * the region's monitor for {@code hasBeenNotified} to become true.
 *
 * The signal is sent from a {@code finally} block so that a failing {@code close()} cannot
 * leave the test thread blocked in {@code region.wait()} forever. A failure is recorded in
 * {@code testFailed}/{@code failureCause}, which the close tests assert on.
 */
class Close extends Thread {
  private final Region region;

  /**
   * @param region the region to close; also used as the monitor for the notification
   */
  Close(Region region) {
    this.region = region;
  }

  public void run() {
    try {
      region.close();
    } catch (Exception e) {
      logWriter.error("Exception occured", e);
      testFailed = true;
      failureCause = "Exception occured when it was not supposed to occur, due to "
          + e + "in Close::run";
      // Thrown on this helper thread only; the test thread learns about the failure through
      // testFailed/failureCause set above.
      fail("Exception occured when it was not supposed to occur, due to " + e);
    } finally {
      // Always wake the waiter, whether or not close() succeeded.
      synchronized (region) {
        hasBeenNotified = true;
        region.notifyAll();
      }
    }
  }
}
}