blob: 57c0e933117fbe99c0f6c194de8dc2d50694a5e7 [file] [log] [blame]
/*=========================================================================
* Copyright (c) 2010-2014 Pivotal Software, Inc. All Rights Reserved.
* This product is protected by U.S. and international copyright
* and intellectual property laws. Pivotal products are covered by
* one or more patents listed at http://www.pivotal.io/patents.
*=========================================================================
*/
package com.gemstone.gemfire.internal.cache;
import org.junit.After;
import org.junit.Test;
import org.junit.experimental.categories.Category;
import static org.junit.Assert.*;
import com.gemstone.gemfire.cache.Scope;
import com.gemstone.gemfire.test.junit.categories.IntegrationTest;
/**
* The test will verify <br>
* 1. Multiple oplogs are being rolled at once <br>
* 2. The number of entries logged to the HTree properly accounts for creations
*
* @author Pratik Batra
*/
@Category(IntegrationTest.class)
public class MultipleOplogsRollingFeatureJUnitTest extends
DiskRegionTestingBase
{
protected Object mutex = new Object();
protected boolean CALLBACK_SET = false;
protected volatile boolean FLAG = false;
DiskRegionProperties diskProps = new DiskRegionProperties();
@After
public void tearDown() throws Exception
{
LocalRegion.ISSUE_CALLBACKS_TO_CACHE_OBSERVER = false;
super.tearDown();
diskProps.setDiskDirs(dirs);
}
/**
* The test will verify <br>
* 1. Multiple oplogs are being rolled at once
* 2. The Number of entries are properly conflated
*/
@Test
public void testMultipleRolling()
{
System.setProperty("gemfire.MAX_OPLOGS_PER_COMPACTION", "17");
try {
deleteFiles();
diskProps.setMaxOplogSize(450);
diskProps.setCompactionThreshold(100);
region = DiskRegionHelperFactory.getSyncPersistOnlyRegion(cache,
diskProps, Scope.LOCAL);
assertNotNull(region);
DiskRegion diskRegion = ((LocalRegion)region).getDiskRegion();
assertNotNull(diskRegion);
LocalRegion.ISSUE_CALLBACKS_TO_CACHE_OBSERVER = true;
CacheObserverHolder.setInstance(getCacheObserver());
try {
CALLBACK_SET = true;
assertEquals(null, diskRegion.getOplogToBeCompacted());
logWriter.info("testMultipleRolling adding entry 1");
addEntries(1 /* oplogNumber*/, 50 /* byte array size*/);
((LocalRegion)region).getDiskStore().forceCompaction();
waitForCompactor(3000/*wait for forceRolling to finish */);
logWriter.info("testMultipleRolling after waitForCompactor");
// the compactor copied two tombstone and 1 entry to oplog #2
// The total oplog size will become 429, that why we need to
// set oplogmaxsize to be 450. After compaction, the size become 151
// the compactor thread is now stuck waiting for mutex.notify
// check the oplog rolling is null (since the compactor was able to delete it)
if (diskRegion.getOplogToBeCompacted() != null) {
logWriter.info("testMultipleRolling OplogToBeCompacted=" + java.util.Arrays.toString(diskRegion.getOplogToBeCompacted()));
}
assertEquals(null, diskRegion.getOplogToBeCompacted());
// update key3 with 360 bytes to cause it not fit in oplog #2 so it will create oplog #3
// this does an update to key 3 (3 is no longer in oplog#2)
// So oplog#2 is EMPTY.
logWriter.info("testMultipleRolling adding entry 2");
addEntries(2 /* oplogNumber*/, 360 /* byte array size*/);
// #2 is now owned by the compactor so even though it is empty it
// will not be deleted until the compactor does so.
// only one oplog to be compacted because there's no drf
assertEquals(1, diskRegion.getOplogToBeCompacted().length);
// This add will not fit in oplog #3 so it will create oplog #4
// It does an update to key 3 (so 3 is no longer in oplog#3)
// which empties it out
logWriter.info("testMultipleRolling adding entry 3");
addEntries(3 /* oplogNumber*/, 180 /* byte array size*/);
// #3 is was ready to be compacted but since it was EMPTIED
// but because the compaction thread (the pool defaults to 1 thread)
// is hung it will not be deleted immedaitely.
assertEquals(2, diskRegion.getOplogToBeCompacted().length);
// this add will fit in the current oplog
// Just added an update to oplog #4
logWriter.info("testMultipleRolling adding entry 4");
addEntries(4 /* oplogNumber*/, 1 /* byte array size*/);
assertEquals(2, diskRegion.getOplogToBeCompacted().length);
logWriter.info("testMultipleRolling forceRolling");
region.forceRolling();
assertEquals(3, diskRegion.getOplogToBeCompacted().length);
} finally {
synchronized (mutex) {
// let the compactor go
CALLBACK_SET = false;
FLAG = false;
logWriter.info("testMultipleRolling letting compactor go");
mutex.notify();
}
}
// let the main thread sleep so that rolling gets over
waitForCompactor(5000);
assertTrue(
"Number of Oplogs to be rolled is not null : this is unexpected",
diskRegion.getOplogToBeCompacted() == null);
cache.close();
cache = createCache();
region = DiskRegionHelperFactory.getSyncPersistOnlyRegion(cache,
diskProps, Scope.LOCAL);
assertTrue("Recreated region size is not 1 ", region.size() == 1);
closeDown();
deleteFiles();
}
catch (Exception ex) {
ex.printStackTrace();
fail("testMultipleRolling: test failed due to " + ex);
} finally {
System.clearProperty("gemfire.MAX_OPLOGS_PER_COMPACTION");
}
}
private void waitForCompactor(long maxWaitingTime)
{
long maxWaitTime = maxWaitingTime;
long start = System.currentTimeMillis();
while (!FLAG) { // wait until
// condition is met
assertTrue("Waited over " + maxWaitTime + "entry to get refreshed",
(System.currentTimeMillis() - start) < maxWaitTime);
try {
Thread.sleep(1);
}
catch (InterruptedException ie) {
fail("Interrupted while waiting " + ie);
}
}
}
private void addEntries(int opLogNum, int valueSize)
{
assertNotNull(region);
byte[] val = new byte[valueSize];
for (int i = 0; i < valueSize; ++i) {
val[i] = (byte)i;
}
// Creating opLog1
if (opLogNum == 1) {
for (int i = 1; i < 4; i++) {
// create 3 entries
region.create(new Integer(i), val);
}
// destroy Entry 1 and 2
region.destroy(new Integer(1));
region.destroy(new Integer(2));
}
else if (opLogNum == 2) {
// update Entry 3
region.put(new Integer(3), val);
}
else if (opLogNum == 3) {
// update Entry 3
region.put(new Integer(3), val);
}
else if (opLogNum == 4) {
// update Entry 3
region.put(new Integer(3), val);
}
}
private CacheObserver getCacheObserver()
{
return (new CacheObserverAdapter() {
public void beforeGoingToCompact()
{
if (logWriter.fineEnabled()) {
logWriter.fine("In beforeGoingToCompact");
}
}
public void afterHavingCompacted()
{
FLAG = true;
if (CALLBACK_SET) {
synchronized (mutex) {
try {
mutex.wait();
}
catch (InterruptedException e) {
fail("interrupted");
}
}
}
if (logWriter.fineEnabled()) {
logWriter.fine("In afterHavingCompacted");
}
}
});
}
}