blob: 87ac3529b24ef6a409eca1bdfd49dd86548c3ca4 [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one or more contributor license
* agreements. See the NOTICE file distributed with this work for additional information regarding
* copyright ownership. The ASF licenses this file to You under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance with the License. You may obtain a
* copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software distributed under the License
* is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
* or implied. See the License for the specific language governing permissions and limitations under
* the License.
*/
package org.apache.geode.internal.cache;
import static org.apache.geode.test.awaitility.GeodeAwaitility.await;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertNotNull;
import static org.junit.Assert.assertTrue;
import static org.junit.Assert.fail;
import org.junit.Test;
import org.apache.geode.cache.Scope;
import org.apache.geode.distributed.internal.DistributionConfig;
/**
* The test will verify <br>
* 1. Multiple oplogs are being rolled at once <br>
* 2. The Number of entries getting logged to the HTree are taking care of creation
*/
public class MultipleOplogsRollingFeatureJUnitTest extends DiskRegionTestingBase {
private volatile boolean FLAG = false;
private Object mutex = new Object();
private boolean CALLBACK_SET = false;
private DiskRegionProperties diskProps = new DiskRegionProperties();
@Override
protected final void preTearDown() throws Exception {
LocalRegion.ISSUE_CALLBACKS_TO_CACHE_OBSERVER = false;
}
@Override
protected final void postTearDown() throws Exception {
System.clearProperty(DistributionConfig.GEMFIRE_PREFIX + "MAX_OPLOGS_PER_COMPACTION");
diskProps.setDiskDirs(dirs);
}
/**
* The test will verify <br>
* 1. Multiple oplogs are being rolled at once 2. The Number of entries are properly conflated
*/
@Test
public void testMultipleRolling() throws Exception {
System.setProperty(DistributionConfig.GEMFIRE_PREFIX + "MAX_OPLOGS_PER_COMPACTION", "17");
deleteFiles();
diskProps.setMaxOplogSize(450);
diskProps.setCompactionThreshold(100);
region = DiskRegionHelperFactory.getSyncPersistOnlyRegion(cache, diskProps, Scope.LOCAL);
assertNotNull(region);
DiskRegion diskRegion = ((LocalRegion) region).getDiskRegion();
assertNotNull(diskRegion);
LocalRegion.ISSUE_CALLBACKS_TO_CACHE_OBSERVER = true;
CacheObserverHolder.setInstance(getCacheObserver());
try {
CALLBACK_SET = true;
assertEquals(null, diskRegion.getOplogToBeCompacted());
logWriter.info("testMultipleRolling adding entry 1");
addEntries(1 /* oplogNumber */, 50 /* byte array size */);
((LocalRegion) region).getDiskStore().forceCompaction();
await().until(() -> FLAG == true);
logWriter.info("testMultipleRolling after waitForCompactor");
// the compactor copied two tombstone and 1 entry to oplog #2
// The total oplog size will become 429, that why we need to
// set oplogmaxsize to be 450. After compaction, the size become 151
// the compactor thread is now stuck waiting for mutex.notify
// check the oplog rolling is null (since the compactor was able to delete it)
if (diskRegion.getOplogToBeCompacted() != null) {
logWriter.info("testMultipleRolling OplogToBeCompacted="
+ java.util.Arrays.toString(diskRegion.getOplogToBeCompacted()));
}
assertEquals(null, diskRegion.getOplogToBeCompacted());
// update key3 with 360 bytes to cause it not fit in oplog #2 so it will create oplog #3
// this does an update to key 3 (3 is no longer in oplog#2)
// So oplog#2 is EMPTY.
logWriter.info("testMultipleRolling adding entry 2");
addEntries(2 /* oplogNumber */, 360 /* byte array size */);
// #2 is now owned by the compactor so even though it is empty it
// will not be deleted until the compactor does so.
// only one oplog to be compacted because there's no drf
assertEquals(1, diskRegion.getOplogToBeCompacted().length);
// This add will not fit in oplog #3 so it will create oplog #4
// It does an update to key 3 (so 3 is no longer in oplog#3)
// which empties it out
logWriter.info("testMultipleRolling adding entry 3");
addEntries(3 /* oplogNumber */, 180 /* byte array size */);
// #3 is was ready to be compacted but since it was EMPTIED
// but because the compaction thread (the pool defaults to 1 thread)
// is hung it will not be deleted immedaitely.
assertEquals(2, diskRegion.getOplogToBeCompacted().length);
// this add will fit in the current oplog
// Just added an update to oplog #4
logWriter.info("testMultipleRolling adding entry 4");
addEntries(4 /* oplogNumber */, 1 /* byte array size */);
assertEquals(2, diskRegion.getOplogToBeCompacted().length);
logWriter.info("testMultipleRolling forceRolling");
region.forceRolling();
assertEquals(3, diskRegion.getOplogToBeCompacted().length);
} finally {
synchronized (mutex) {
// let the compactor go
CALLBACK_SET = false;
FLAG = false;
logWriter.info("testMultipleRolling letting compactor go");
mutex.notify();
}
}
// let the main thread sleep so that rolling gets over
await().until(() -> FLAG == true);
assertTrue("Number of Oplogs to be rolled is not null : this is unexpected",
diskRegion.getOplogToBeCompacted() == null);
cache.close();
cache = createCache();
region = DiskRegionHelperFactory.getSyncPersistOnlyRegion(cache, diskProps, Scope.LOCAL);
assertTrue("Recreated region size is not 1 ", region.size() == 1);
closeDown();
deleteFiles();
}
private void addEntries(int opLogNum, int valueSize) {
assertNotNull(region);
byte[] val = new byte[valueSize];
for (int i = 0; i < valueSize; ++i) {
val[i] = (byte) i;
}
// Creating opLog1
if (opLogNum == 1) {
for (int i = 1; i < 4; i++) {
// create 3 entries
region.create(new Integer(i), val);
}
// destroy Entry 1 and 2
region.destroy(new Integer(1));
region.destroy(new Integer(2));
}
else if (opLogNum == 2) {
// update Entry 3
region.put(new Integer(3), val);
}
else if (opLogNum == 3) {
// update Entry 3
region.put(new Integer(3), val);
}
else if (opLogNum == 4) {
// update Entry 3
region.put(new Integer(3), val);
}
}
private CacheObserver getCacheObserver() {
return (new CacheObserverAdapter() {
@Override
public void beforeGoingToCompact() {
if (logWriter.fineEnabled()) {
logWriter.fine("In beforeGoingToCompact");
}
}
@Override
public void afterHavingCompacted() {
FLAG = true;
if (CALLBACK_SET) {
synchronized (mutex) {
try {
mutex.wait();
} catch (InterruptedException e) {
fail("interrupted");
}
}
}
if (logWriter.fineEnabled()) {
logWriter.fine("In afterHavingCompacted");
}
}
});
}
}