/*
* Licensed to the Apache Software Foundation (ASF) under one or more contributor license
* agreements. See the NOTICE file distributed with this work for additional information regarding
* copyright ownership. The ASF licenses this file to You under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance with the License. You may obtain a
* copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software distributed under the License
* is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
* or implied. See the License for the specific language governing permissions and limitations under
* the License.
*/
package org.apache.geode.internal.cache;
import static org.apache.geode.test.awaitility.GeodeAwaitility.await;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertNotNull;
import static org.junit.Assert.assertNull;
import static org.junit.Assert.assertTrue;
import static org.junit.Assert.fail;
import java.io.File;
import java.io.FileInputStream;
import java.io.IOException;
import java.nio.ByteBuffer;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashSet;
import java.util.List;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicLong;
import java.util.stream.IntStream;
import org.apache.commons.io.FileUtils;
import org.junit.Assert;
import org.junit.Ignore;
import org.junit.Test;
import org.apache.geode.StatisticsFactory;
import org.apache.geode.cache.CacheWriterException;
import org.apache.geode.cache.DiskAccessException;
import org.apache.geode.cache.DiskStore;
import org.apache.geode.cache.DiskStoreFactory;
import org.apache.geode.cache.EntryEvent;
import org.apache.geode.cache.EntryNotFoundException;
import org.apache.geode.cache.RegionFactory;
import org.apache.geode.cache.RegionShortcut;
import org.apache.geode.cache.Scope;
import org.apache.geode.cache.util.CacheWriterAdapter;
import org.apache.geode.internal.InternalDataSerializer;
import org.apache.geode.internal.cache.Oplog.OPLOG_TYPE;
import org.apache.geode.internal.cache.entries.AbstractDiskLRURegionEntry;
import org.apache.geode.internal.cache.entries.DiskEntry;
import org.apache.geode.test.dunit.ThreadUtils;
/**
* Tests the Oplog APIs.
*/
public class OplogJUnitTest extends DiskRegionTestingBase {
private boolean proceed = false;
private final DiskRegionProperties diskProps = new DiskRegionProperties();
private long delta;
private volatile boolean assertDone = false;
private boolean failure = false;
@Override
protected final void postSetUp() throws Exception {
diskProps.setDiskDirs(dirs);
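// Skip oplog file pre-allocation (a test hook) so these tests do not create
// fully pre-blown oplog files on disk.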
DiskStoreImpl.SET_IGNORE_PREALLOCATE = true;
}
@Override
protected final void postTearDown() throws Exception {
DiskStoreImpl.SET_IGNORE_PREALLOCATE = false;
}
/**
* Test method for 'org.apache.geode.internal.cache.Oplog.isBackup()'
*/
@Test
public void testIsBackup() {
InternalRegion overFlowAndPersistRegionRegion =
(InternalRegion) DiskRegionHelperFactory.getSyncOverFlowAndPersistRegion(cache, diskProps);
assertTrue("Not correctly setup for overflow and persist",
overFlowAndPersistRegionRegion.getDiskRegion().isBackup());
closeDown(overFlowAndPersistRegionRegion);
InternalRegion overFlowOnlyRegion =
(InternalRegion) DiskRegionHelperFactory.getSyncOverFlowOnlyRegion(cache, diskProps);
assertFalse("Not correctly setup for overflow only mode",
overFlowOnlyRegion.getDiskRegion().isBackup());
closeDown(overFlowOnlyRegion);
InternalRegion persistOnlyRegion = (InternalRegion) DiskRegionHelperFactory
.getSyncPersistOnlyRegion(cache, diskProps, Scope.LOCAL);
assertTrue("Not correctly setup for persist only mode",
persistOnlyRegion.getDiskRegion().isBackup());
closeDown(persistOnlyRegion);
}
/**
* Test method for 'org.apache.geode.internal.cache.Oplog.useSyncWrites()'
*/
@Test
public void testUseSyncWritesWhenSet() {
diskProps.setSynchronous(true);
InternalRegion syncOverFlowAndPersistRegion =
(InternalRegion) DiskRegionHelperFactory.getSyncOverFlowAndPersistRegion(cache, diskProps);
assertTrue(syncOverFlowAndPersistRegion.getAttributes().isDiskSynchronous());
closeDown(syncOverFlowAndPersistRegion);
InternalRegion syncOverFlowOnlyRegion =
(InternalRegion) DiskRegionHelperFactory.getSyncOverFlowOnlyRegion(cache, diskProps);
assertTrue(syncOverFlowOnlyRegion.getAttributes().isDiskSynchronous());
closeDown(syncOverFlowOnlyRegion);
InternalRegion syncPersistOnlyRegion = (InternalRegion) DiskRegionHelperFactory
.getSyncPersistOnlyRegion(cache, diskProps, Scope.LOCAL);
assertTrue(syncPersistOnlyRegion.getAttributes().isDiskSynchronous());
closeDown(syncPersistOnlyRegion);
}
@Test
public void testNotUseSyncWritesWhenNotSet() {
diskProps.setSynchronous(false);
InternalRegion asyncOverFlowAndPersistRegion =
(InternalRegion) DiskRegionHelperFactory.getAsyncOverFlowAndPersistRegion(cache, diskProps);
assertFalse(asyncOverFlowAndPersistRegion.getAttributes().isDiskSynchronous());
closeDown(asyncOverFlowAndPersistRegion);
InternalRegion asyncOverFlowOnlyRegion =
(InternalRegion) DiskRegionHelperFactory.getAsyncOverFlowOnlyRegion(cache, diskProps);
assertFalse(asyncOverFlowOnlyRegion.getAttributes().isDiskSynchronous());
closeDown(asyncOverFlowOnlyRegion);
InternalRegion asyncPersistOnlyRegion =
(InternalRegion) DiskRegionHelperFactory.getAsyncPersistOnlyRegion(cache, diskProps);
assertFalse(asyncPersistOnlyRegion.getAttributes().isDiskSynchronous());
closeDown(asyncPersistOnlyRegion);
}
/**
* Test method for 'org.apache.geode.internal.cache.Oplog.clear(File)'
*/
@Test
public void testClear() {
region = DiskRegionHelperFactory.getSyncOverFlowAndPersistRegion(cache, diskProps);
putTillOverFlow(region);
region.clear();
region.close();
region = DiskRegionHelperFactory.getSyncOverFlowAndPersistRegion(cache, diskProps);
assertTrue(" failed in get OverflowAndPersist ", region.get(0) == null);
closeDown();
region = DiskRegionHelperFactory.getSyncOverFlowOnlyRegion(cache, diskProps);
putTillOverFlow(region);
region.clear();
region.close();
region = DiskRegionHelperFactory.getSyncOverFlowOnlyRegion(cache, diskProps);
assertTrue(" failed in get OverflowOnly ", region.get(0) == null);
closeDown();
region = DiskRegionHelperFactory.getSyncPersistOnlyRegion(cache, diskProps, Scope.LOCAL);
put100Int();
region.clear();
region.close();
region = DiskRegionHelperFactory.getSyncPersistOnlyRegion(cache, diskProps, Scope.LOCAL);
assertTrue(" failed in get PersistOnly ", region.get(0) == null);
closeDown();
}
/**
* Test method for 'org.apache.geode.internal.cache.Oplog.close()'
*/
@Test
public void testClose() {
{
deleteFiles();
region = DiskRegionHelperFactory.getSyncOverFlowAndPersistRegion(cache, diskProps);
DiskRegion dr = ((LocalRegion) region).getDiskRegion();
Oplog oplog = dr.testHook_getChild();
long id = oplog.getOplogId();
oplog.close();
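// Recreating an Oplog with the same id should only succeed if close()
// released the underlying file resources.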
StatisticsFactory factory = cache.getDistributedSystem();
Oplog newOplog =
new Oplog(id, dr.getOplogSet(), new DirectoryHolder(factory, dirs[0], 1000, 0));
dr.getOplogSet().setChild(newOplog);
closeDown();
}
{
deleteFiles();
region = DiskRegionHelperFactory.getSyncOverFlowOnlyRegion(cache, diskProps);
DiskRegion dr = ((LocalRegion) region).getDiskRegion();
dr.testHookCloseAllOverflowOplogs();
checkIfContainsFile("OVERFLOW");
closeDown();
}
{
deleteFiles();
region = DiskRegionHelperFactory.getSyncPersistOnlyRegion(cache, diskProps, Scope.LOCAL);
DiskRegion dr = ((LocalRegion) region).getDiskRegion();
Oplog oplog = dr.testHook_getChild();
long id = oplog.getOplogId();
oplog.close();
StatisticsFactory factory = cache.getDistributedSystem();
Oplog newOplog =
new Oplog(id, dr.getOplogSet(), new DirectoryHolder(factory, dirs[0], 1000, 2));
dr.setChild(newOplog);
closeDown();
}
}
private void closeDown(InternalRegion region) {
super.closeDown(region);
DiskRegion diskRegion = region != null ? region.getDiskRegion() : null;
if (diskRegion != null) {
diskRegion.getDiskStore().close();
((InternalCache) cache).removeDiskStore(diskRegion.getDiskStore());
}
}
@Override
protected void closeDown() {
DiskRegion dr = null;
if (region != null) {
dr = ((LocalRegion) region).getDiskRegion();
}
super.closeDown();
if (dr != null) {
dr.getDiskStore().close();
((LocalRegion) region).getGemFireCache().removeDiskStore(dr.getDiskStore());
}
}
private void checkIfContainsFile(String fileExtension) {
for (File dir : dirs) {
File[] files = dir.listFiles();
for (File file : files) {
if (file.getAbsolutePath().endsWith(fileExtension)) {
fail("file " + file + " still exists after oplog.close()");
}
}
}
}
/**
* Test method for 'org.apache.geode.internal.cache.Oplog.destroy()'
*/
@Test
public void testDestroy() {
region = DiskRegionHelperFactory.getSyncOverFlowAndPersistRegion(cache, diskProps);
put100Int();
putTillOverFlow(region);
try {
region.destroy(0);
} catch (EntryNotFoundException e1) {
logWriter.error("Exception occurred", e1);
fail(" Entry not found when it was expected to be there");
}
region.close();
region = DiskRegionHelperFactory.getSyncOverFlowAndPersistRegion(cache, diskProps);
assertTrue(" failed in get OverflowAndPersist ", region.get(0) == null);
closeDown();
region = DiskRegionHelperFactory.getSyncOverFlowOnlyRegion(cache, diskProps);
put100Int();
putTillOverFlow(region);
try {
region.destroy(0);
} catch (EntryNotFoundException e1) {
logWriter.error("Exception occurred", e1);
fail(" Entry not found when it was expected to be there");
}
region.close();
region = DiskRegionHelperFactory.getSyncOverFlowOnlyRegion(cache, diskProps);
assertTrue(" failed in get OverflowOnly ", region.get(0) == null);
closeDown();
region = DiskRegionHelperFactory.getSyncPersistOnlyRegion(cache, diskProps, Scope.LOCAL);
put100Int();
try {
region.destroy(0);
} catch (EntryNotFoundException e1) {
logWriter.error("Exception occurred", e1);
fail(" Entry not found when it was expected to be there");
}
region.close();
region = DiskRegionHelperFactory.getSyncPersistOnlyRegion(cache, diskProps, Scope.LOCAL);
assertTrue(" failed in get PersistOnly ", region.get(0) == null);
closeDown();
}
/**
* Test method for 'org.apache.geode.internal.cache.Oplog.remove(long)'
*/
@Test
public void testRemove() {
region = DiskRegionHelperFactory.getSyncOverFlowAndPersistRegion(cache, diskProps);
putTillOverFlow(region);
region.remove(0);
region.close();
region = DiskRegionHelperFactory.getSyncOverFlowAndPersistRegion(cache, diskProps);
assertTrue(" failed in get OverflowAndPersist ", region.get(0) == null);
closeDown();
region = DiskRegionHelperFactory.getSyncOverFlowOnlyRegion(cache, diskProps);
putTillOverFlow(region);
region.remove(0);
assertTrue(" failed in get OverflowOnly ", region.get(0) == null);
region.close();
region = DiskRegionHelperFactory.getSyncOverFlowOnlyRegion(cache, diskProps);
closeDown();
region = DiskRegionHelperFactory.getSyncPersistOnlyRegion(cache, diskProps, Scope.LOCAL);
put100Int();
region.remove(0);
assertTrue(" failed in get PersistOnly ", region.get(0) == null);
region.close();
region = DiskRegionHelperFactory.getSyncPersistOnlyRegion(cache, diskProps, Scope.LOCAL);
closeDown();
}
/**
* Tests whether the data is written in the right format on the disk
*
*/
@Test
public void testFaultInOfValuesFromDisk() {
try {
// First create a synchronous persist-only disk region with rolling disabled,
// so values can be faulted in from the oplog.
diskProps.setMaxOplogSize(1000);
diskProps.setPersistBackup(true);
diskProps.setRolling(false);
diskProps.setSynchronous(true);
diskProps.setTimeInterval(-1);
diskProps.setOverflow(false);
region = DiskRegionHelperFactory.getSyncPersistOnlyRegion(cache, diskProps, Scope.LOCAL);
byte[] val = new byte[10];
for (int i = 0; i < val.length; ++i) {
val[i] = (byte) i;
}
region.put(1, val);
DiskEntry entry = ((DiskEntry) ((LocalRegion) region).basicGetEntry(1));
DiskRegion dr = ((LocalRegion) region).getDiskRegion();
val = (byte[]) dr.getNoBuffer(entry.getDiskId());
for (int i = 0; i < val.length; ++i) {
if (val[i] != (byte) i) {
fail("Test for fault in from disk failed");
}
}
val = (byte[]) DiskStoreImpl.convertBytesAndBitsIntoObject(
dr.getBytesAndBitsWithoutLock(entry.getDiskId(), true, false), (InternalCache) cache);
for (int i = 0; i < val.length; ++i) {
if (val[i] != (byte) i) {
fail("Test for fault in from disk failed");
}
}
region.invalidate(1);
assertTrue(dr.getNoBuffer(entry.getDiskId()) == Token.INVALID);
} catch (Exception e) {
logWriter.error("Exception occurred", e);
fail(e.toString());
}
closeDown();
}
/**
* Tests that the original ByteBuffer pool gets transferred to the new Oplog in synchronous mode
*
*/
@Test
public void testByteBufferPoolTransferForSynchMode() {
diskProps.setMaxOplogSize(1024);
diskProps.setBytesThreshold(0);
diskProps.setPersistBackup(true);
diskProps.setRolling(false);
diskProps.setSynchronous(true);
diskProps.setTimeInterval(10000);
diskProps.setOverflow(false);
region = DiskRegionHelperFactory.getSyncPersistOnlyRegion(cache, diskProps, Scope.LOCAL);
DiskRegion dr = ((LocalRegion) region).getDiskRegion();
// Populate data just below the switch over threshold
byte[] val = new byte[5];
for (int i = 0; i < val.length; ++i) {
val[i] = (byte) i;
}
region.put(1, val);
((LocalRegion) region).basicGetEntry(1);
Oplog old = dr.testHook_getChild();
ByteBuffer oldWriteBuf = old.getWriteBuf();
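// Force a switch to a new oplog; in sync mode the write buffer is expected
// to migrate from the old oplog to its replacement.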
dr.forceRolling();
region.put(2, val);
Oplog switched = dr.testHook_getChild();
assertTrue(old != switched);
assertEquals(dr.getDiskStore().getPersistentOplogs().getChild(2), switched);
assertEquals(oldWriteBuf, switched.getWriteBuf());
assertEquals(null, old.getWriteBuf());
closeDown();
}
/**
* Tests the bug which arises in asynchronous mode during oplog switching, caused by conflation
* of a create/destroy operation. The bug occurs if a create operation is followed by a destroy,
* but before the destroy proceeds some other operation causes oplog switching.
*
*/
@Test
public void testBug34615() {
final int MAX_OPLOG_SIZE = 100;
diskProps.setMaxOplogSize(MAX_OPLOG_SIZE);
diskProps.setPersistBackup(true);
diskProps.setRolling(false);
diskProps.setSynchronous(false);
diskProps.setOverflow(false);
diskProps.setBytesThreshold(150);
LocalRegion.ISSUE_CALLBACKS_TO_CACHE_OBSERVER = true;
final byte[] val = new byte[50];
region = DiskRegionHelperFactory.getAsyncPersistOnlyRegion(cache, diskProps);
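// The observer below fires while the create/destroy pair is being conflated;
// the put it spawns forces an oplog switch in the middle of the conflation,
// which is the window in which the bug appeared.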
final CacheObserver old = CacheObserverHolder.setInstance(new CacheObserverAdapter() {
@Override
public void afterConflation(ByteBuffer orig, ByteBuffer conflated) {
Thread th = new Thread(() -> region.put("2", new byte[75]));
assertNull(conflated);
th.start();
ThreadUtils.join(th, 30 * 1000);
LocalRegion.ISSUE_CALLBACKS_TO_CACHE_OBSERVER = false;
}
});
region.put("1", val);
region.remove("1");
assertFalse(failureCause, testFailed);
CacheObserverHolder.setInstance(old);
closeDown();
}
@Test
public void testConflation() throws Exception {
final int MAX_OPLOG_SIZE = 1000;
diskProps.setMaxOplogSize(MAX_OPLOG_SIZE);
diskProps.setPersistBackup(true);
diskProps.setRolling(false);
diskProps.setSynchronous(false);
diskProps.setOverflow(false);
diskProps.setBytesThreshold(1500);
final byte[] val = new byte[50];
final byte[][] bb = new byte[2][];
bb[0] = new byte[5];
region = DiskRegionHelperFactory.getAsyncPersistOnlyRegion(cache, diskProps);
try {
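// With async writes and a byte threshold larger than the data, successive
// operations on the same key are conflated in the buffer, so only the
// latest value for each key should reach disk.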
region.put("1", val);
region.put("1", new byte[10]);
region.put("2", val);
region.put("2", new byte[100]);
region.create("3", null);
region.put("3", new byte[10]);
region.create("4", null);
region.put("4", new byte[0]);
// tests for byte[][]
region.create("5", bb);
region.put("6", val);
region.put("6", bb);
region.create("7", null);
region.put("7", bb);
region.create("8", new byte[9]);
region.invalidate("8");
region.create("9", new byte[0]);
region.invalidate("9");
region.create("10", new byte[9]);
region.localInvalidate("10");
region.create("11", new byte[0]);
region.localInvalidate("11");
DiskRegion dr = ((LocalRegion) region).getDiskRegion();
dr.flushForTesting();
byte[] val_1 = ((byte[]) ((LocalRegion) region).getValueOnDisk("1"));
assertEquals(10, val_1.length);
byte[] val_2 = ((byte[]) ((LocalRegion) region).getValueOnDisk("2"));
assertEquals(100, val_2.length);
byte[] val_3 = ((byte[]) ((LocalRegion) region).getValueOnDisk("3"));
assertEquals(10, val_3.length);
byte[] val_4 = ((byte[]) ((LocalRegion) region).getValueOnDisk("4"));
assertEquals(0, val_4.length);
byte[][] val_5 = (byte[][]) ((LocalRegion) region).getValueOnDisk("5");
assertEquals(2, val_5.length);
assertEquals(5, val_5[0].length);
assertNull(val_5[1]);
byte[][] val_6 = (byte[][]) ((LocalRegion) region).getValueOnDisk("6");
assertEquals(2, val_6.length);
assertEquals(5, val_6[0].length);
assertNull(val_6[1]);
byte[][] val_7 = (byte[][]) ((LocalRegion) region).getValueOnDisk("7");
assertEquals(2, val_7.length);
assertEquals(5, val_7[0].length);
assertNull(val_7[1]);
Object val_8 = ((LocalRegion) region).getValueOnDisk("8");
assertEquals(Token.INVALID, val_8);
Object val_9 = ((LocalRegion) region).getValueOnDisk("9");
assertEquals(Token.INVALID, val_9);
Object val_10 = ((LocalRegion) region).getValueOnDisk("10");
assertEquals(Token.LOCAL_INVALID, val_10);
Object val_11 = ((LocalRegion) region).getValueOnDisk("11");
assertEquals(Token.LOCAL_INVALID, val_11);
} finally {
closeDown();
}
}
/**
* Tests the retrieval of an empty byte array when it is present in the async buffers
*
*/
@Test
public void testGetEmptyByteArrayInAsynchBuffer() {
final int MAX_OPLOG_SIZE = 1000;
diskProps.setMaxOplogSize(MAX_OPLOG_SIZE);
diskProps.setPersistBackup(true);
diskProps.setRolling(false);
diskProps.setSynchronous(false);
diskProps.setOverflow(false);
diskProps.setBytesThreshold(1500);
final byte[] val = new byte[50];
region = DiskRegionHelperFactory.getAsyncPersistOnlyRegion(cache, diskProps);
try {
region.put("1", val);
region.put("1", new byte[0]);
byte[] val_1 = ((byte[]) ((LocalRegion) region).getValueOnDiskOrBuffer("1"));
assertEquals(0, val_1.length);
} catch (Exception e) {
logWriter.error("Exception occurred", e);
fail("The test failed due to exception = " + e);
}
closeDown();
}
/**
* Tests the retrieval of an empty byte array in synchronous mode
*
*/
@Test
public void testGetEmptyByteArrayInSynchMode() {
final int MAX_OPLOG_SIZE = 1000;
diskProps.setMaxOplogSize(MAX_OPLOG_SIZE);
diskProps.setPersistBackup(true);
diskProps.setRolling(false);
diskProps.setSynchronous(true);
diskProps.setOverflow(false);
diskProps.setBytesThreshold(1500);
final byte[] val = new byte[50];
region = DiskRegionHelperFactory.getSyncPersistOnlyRegion(cache, diskProps, Scope.LOCAL);
try {
region.put("1", val);
region.put("1", new byte[0]);
byte[] val_1 = ((byte[]) ((LocalRegion) region).getValueOnDiskOrBuffer("1"));
assertEquals(0, val_1.length);
} catch (Exception e) {
logWriter.error("Exception occurred", e);
fail("The test failed due to exception = " + e);
}
closeDown();
}
/**
* Tests the bug which caused the oplog roller to attempt to roll a removed entry whose value
* is Token.Removed. This bug can occur if a remove operation causes oplog switching (hence the
* roller thread gets notified) and the roller thread obtains the iterator of the concurrent
* region map before the remove.
*
*/
@Test
public void testBug34702() {
final int MAX_OPLOG_SIZE = 500 * 2;
diskProps.setMaxOplogSize(MAX_OPLOG_SIZE);
diskProps.setPersistBackup(true);
diskProps.setRolling(true);
diskProps.setSynchronous(true);
diskProps.setOverflow(false);
final byte[] val = new byte[200];
proceed = false;
region = DiskRegionHelperFactory.getSyncPersistOnlyRegion(cache, diskProps, Scope.LOCAL);
region.put("key1", val);
region.put("key2", val);
LocalRegion.ISSUE_CALLBACKS_TO_CACHE_OBSERVER = true;
final CacheObserver old = CacheObserverHolder.setInstance(new CacheObserverAdapter() {
@Override
public void afterSettingOplogOffSet(long offset) {
((LocalRegion) region).getDiskRegion().forceRolling();
// Let the operation thread yield to the roller so that
// it is able to obtain the iterator of the concurrent region map
// and thus get the reference to the entry, which will contain
// the value Token.Removed: the entry, though removed from the
// concurrent map, will still be available to the roller.
Thread.yield();
// Sleep for some time
try {
Thread.sleep(5000);
} catch (InterruptedException e) {
testFailed = true;
failureCause = "No guarantee that test is succesful";
fail("No guarantee that test is succesful");
}
}
@Override
public void afterHavingCompacted() {
proceed = true;
synchronized (OplogJUnitTest.this) {
OplogJUnitTest.this.notify();
}
}
});
try {
region.destroy("key1");
region.destroy("key2");
} catch (Exception e1) {
LocalRegion.ISSUE_CALLBACKS_TO_CACHE_OBSERVER = false;
CacheObserverHolder.setInstance(old);
fail("Test failed as entry deletion threw exception. Exception = " + e1);
}
// Wait for some time and check whether the afterHavingCompacted callback
// was issued successfully.
if (!proceed) {
synchronized (this) {
if (!proceed) {
try {
this.wait(20000);
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
// The test will automatically fail due to proceed flag
}
}
}
}
assertFalse(failureCause, testFailed);
LocalRegion.ISSUE_CALLBACKS_TO_CACHE_OBSERVER = false;
CacheObserverHolder.setInstance(old);
if (!proceed) {
fail("Test failed as afterHavingCompacted callabck not issued even after sufficient wait");
}
closeDown();
}
/**
* Tests a potential deadlock situation where the operation causing a switch of Oplogs is
* waiting for the roller to free space. The problem can arise if the operation causing the
* Oplog switch is acting on an entry whose oplog ID already refers to the Oplog being
* switched. In that case, when the roller tries to roll the entries referencing the current
* oplog, it will not be able to acquire the lock on the entry because the switching thread has
* already locked it.
*
*/
@Test
public void testRollingDeadlockSituation() {
final int MAX_OPLOG_SIZE = 2000;
diskProps.setMaxOplogSize(MAX_OPLOG_SIZE);
diskProps.setPersistBackup(true);
diskProps.setRolling(true);
diskProps.setSynchronous(true);
diskProps.setOverflow(false);
diskProps.setDiskDirsAndSizes(new File[] {dirs[0]}, new int[] {1400});
final byte[] val = new byte[500];
proceed = false;
region = DiskRegionHelperFactory.getSyncPersistOnlyRegion(cache, diskProps, Scope.LOCAL);
region.put("key1", val);
region.put("key1", val);
try {
region.put("key1", val);
} catch (DiskAccessException dae) {
logWriter.error("Exception occurred", dae);
fail(
"Test failed as a DiskAccessException was encountered, whereas the operation should have proceeded without issue. Exception = "
+ dae);
}
}
/**
* Tests whether an empty byte array is correctly written to disk as a zero-value-length
* operation: the 4-byte field recording the value length is absent, and since the value length
* is zero no value bytes are added either. Similarly, during recovery from the HTree as well
* as the Oplog, the empty byte array should be read back correctly.
*
*/
@Test
public void testEmptyByteArrayPutAndRecovery() {
CacheObserver old = CacheObserverHolder.setInstance(new CacheObserverAdapter() {
@Override
public void afterConflation(ByteBuffer origBB, ByteBuffer conflatedBB) {
if ((2 + 4 + 1 + EntryEventImpl.serialize("key1").length) != origBB.capacity()) {
failureCause =
"For a backup region, addition of an empty array should result in an offset of 6 bytes, whereas the actual offset is "
+ origBB.capacity();
testFailed = true;
}
Assert.assertTrue(
"For a backup region, addition of an empty array should result in an offset of 6 bytes, whereas the actual offset is "
+ origBB.capacity(),
(2 + 4 + 1 + EntryEventImpl.serialize("key1").length) == origBB.capacity());
}
});
try {
final int MAX_OPLOG_SIZE = 2000;
diskProps.setMaxOplogSize(MAX_OPLOG_SIZE);
diskProps.setPersistBackup(true);
diskProps.setSynchronous(true);
diskProps.setOverflow(false);
diskProps.setDiskDirsAndSizes(new File[] {dirs[0]}, new int[] {1400});
final byte[] val = new byte[0];
LocalRegion.ISSUE_CALLBACKS_TO_CACHE_OBSERVER = true;
region = DiskRegionHelperFactory.getSyncPersistOnlyRegion(cache, diskProps, Scope.LOCAL);
region.put("key1", val);
LocalRegion.ISSUE_CALLBACKS_TO_CACHE_OBSERVER = false;
region.close();
region = DiskRegionHelperFactory.getSyncPersistOnlyRegion(cache, diskProps, Scope.LOCAL);
byte[] _val = (byte[]) region.get("key1");
assertTrue(
"value of key1 after restarting the region is not an empty byte array. This may indicate a problem in reading from the Oplog",
_val.length == 0);
if (this.logWriter.infoEnabled()) {
this.logWriter.info(
"After the first region close & reopen no problems were encountered, hence the Oplog has been read successfully.");
this.logWriter.info(
"Closing the region again without any operation done indicates that next time data will be loaded from the HTree.");
}
region.close();
region = DiskRegionHelperFactory.getSyncPersistOnlyRegion(cache, diskProps, Scope.LOCAL);
_val = (byte[]) region.get("key1");
assertTrue(
"value of key1 after restarting the region is not an empty byte array. This may indicate a problem in reading from the HTree",
_val.length == 0);
assertFalse(failureCause, testFailed);
// region.close();
} finally {
LocalRegion.ISSUE_CALLBACKS_TO_CACHE_OBSERVER = false;
CacheObserverHolder.setInstance(old);
}
}
/**
* Tests bug 35012, where a remove operation on a key goes unrecorded due to Oplog switching if
* the switch happens just after the remove operation has destroyed the in-memory entry and is
* about to acquire the read lock in DiskRegion to record it. If the Oplog has switched during
* that window, the bug appears.
*
*/
@Test
public void testBug35012() {
final int MAX_OPLOG_SIZE = 500;
diskProps.setMaxOplogSize(MAX_OPLOG_SIZE);
diskProps.setPersistBackup(true);
diskProps.setRolling(false);
diskProps.setSynchronous(true);
diskProps.setOverflow(false);
final byte[] val = new byte[200];
try {
region = DiskRegionHelperFactory.getSyncPersistOnlyRegion(cache, diskProps, Scope.LOCAL);
region.put("key1", val);
region.put("key2", val);
region.put("key3", val);
final Thread th = new Thread(() -> region.remove("key1"));
// main thread acquires the write lock
((LocalRegion) region).getDiskRegion().acquireWriteLock();
try {
th.start();
Thread.yield();
DiskRegion dr = ((LocalRegion) region).getDiskRegion();
dr.testHook_getChild().forceRolling(dr);
} finally {
((LocalRegion) region).getDiskRegion().releaseWriteLock();
}
ThreadUtils.join(th, 30 * 1000);
region.close();
region = DiskRegionHelperFactory.getSyncPersistOnlyRegion(cache, diskProps, Scope.LOCAL);
assertEquals(2, region.size());
} catch (Exception e) {
this.logWriter.error("Exception occurred ", e);
fail("The test could not be completed because of exception .Exception=" + e);
}
closeDown();
}
/**
* Tests that if buffer size & time are not set, the async writer gets awakened on a time basis
* with a default of 1 second
*
*/
@Test
public void testAsynchWriterAttribBehaviour1() throws Exception {
DiskStoreFactory dsf = cache.createDiskStoreFactory();
((DiskStoreFactoryImpl) dsf).setMaxOplogSizeInBytes(10000);
File dir = new File("testingDirectoryDefault");
dir.mkdir();
dir.deleteOnExit();
File[] dirs = {dir};
dsf.setDiskDirs(dirs);
RegionFactory<Object, Object> factory =
cache.createRegionFactory(RegionShortcut.REPLICATE_PERSISTENT);
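// t1 marks the time just before the disk store is created and the put is
// issued; goingToFlush() computes the elapsed delta used to verify the
// default 1-second flush interval.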
final long t1 = System.currentTimeMillis();
DiskStore ds = dsf.create("test");
factory.setDiskSynchronous(false);
factory.setDiskStoreName(ds.getName());
factory.setScope(Scope.LOCAL);
region = factory.create("test");
LocalRegion.ISSUE_CALLBACKS_TO_CACHE_OBSERVER = true;
CacheObserver old = CacheObserverHolder.setInstance(
new CacheObserverAdapter() {
private long t2;
@Override
public void goingToFlush() {
t2 = System.currentTimeMillis();
delta = t2 - t1;
LocalRegion.ISSUE_CALLBACKS_TO_CACHE_OBSERVER = false;
synchronized (OplogJUnitTest.this) {
OplogJUnitTest.this.notify();
}
}
});
region.put("key1", "111111111111");
synchronized (this) {
if (LocalRegion.ISSUE_CALLBACKS_TO_CACHE_OBSERVER) {
this.wait(10000);
assertFalse(LocalRegion.ISSUE_CALLBACKS_TO_CACHE_OBSERVER);
}
}
CacheObserverHolder.setInstance(old);
// Windows clock has an accuracy of 15 ms. Accounting for the same.
assertTrue("delta is in miilliseconds=" + delta, delta >= 985);
closeDown();
}
/**
* Tests that if buffer size is set but time is not, the async writer gets awakened on a
* buffer-size basis
*/
@Ignore("TODO:DARREL_DISABLE: test is disabled")
@Test
public void testAsynchWriterAttribBehaviour2() throws Exception {
DiskStoreFactory dsf = cache.createDiskStoreFactory();
((DiskStoreFactoryImpl) dsf).setMaxOplogSizeInBytes(10000);
dsf.setQueueSize(2);
File dir = new File("testingDirectoryDefault");
dir.mkdir();
dir.deleteOnExit();
File[] dirs = {dir};
dsf.setDiskDirs(dirs);
RegionFactory<Object, Object> factory =
cache.createRegionFactory(RegionShortcut.REPLICATE_PERSISTENT);
DiskStore ds = dsf.create("test");
factory.setDiskSynchronous(false);
factory.setDiskStoreName(ds.getName());
factory.setScope(Scope.LOCAL);
region = factory.create("test");
LocalRegion.ISSUE_CALLBACKS_TO_CACHE_OBSERVER = true;
CacheObserver old = CacheObserverHolder.setInstance(
new CacheObserverAdapter() {
@Override
public void goingToFlush() {
synchronized (OplogJUnitTest.this) {
LocalRegion.ISSUE_CALLBACKS_TO_CACHE_OBSERVER = false;
OplogJUnitTest.this.notify();
}
}
});
region.put("key1", new byte[25]);
Thread.sleep(1000);
assertTrue(LocalRegion.ISSUE_CALLBACKS_TO_CACHE_OBSERVER);
region.put("key2", new byte[25]);
synchronized (this) {
if (LocalRegion.ISSUE_CALLBACKS_TO_CACHE_OBSERVER) {
OplogJUnitTest.this.wait(10000);
assertFalse(LocalRegion.ISSUE_CALLBACKS_TO_CACHE_OBSERVER);
}
}
CacheObserverHolder.setInstance(old);
closeDown();
}
/**
* Tests that if buffer size & time interval are explicitly set to zero, then the flush occurs
* only due to an async force-flush or due to Oplog switching
*
*/
@Test
public void testAsynchWriterAttribBehaviour3() throws Exception {
DiskStoreFactory dsf = cache.createDiskStoreFactory();
((DiskStoreFactoryImpl) dsf).setMaxOplogSizeInBytes(500);
dsf.setQueueSize(0);
dsf.setTimeInterval(0);
File dir = new File("testingDirectoryDefault");
dir.mkdir();
dir.deleteOnExit();
File[] dirs = {dir};
dsf.setDiskDirs(dirs);
RegionFactory<Object, Object> factory =
cache.createRegionFactory(RegionShortcut.REPLICATE_PERSISTENT);
DiskStore ds = dsf.create("test");
factory.setDiskSynchronous(false);
factory.setDiskStoreName(ds.getName());
factory.setScope(Scope.LOCAL);
region = factory.create("test");
LocalRegion.ISSUE_CALLBACKS_TO_CACHE_OBSERVER = true;
CacheObserver old = CacheObserverHolder.setInstance(
new CacheObserverAdapter() {
@Override
public void goingToFlush() {
LocalRegion.ISSUE_CALLBACKS_TO_CACHE_OBSERVER = false;
synchronized (OplogJUnitTest.this) {
OplogJUnitTest.this.notify();
}
}
});
region.put("key1", new byte[100]);
region.put("key2", new byte[100]);
region.put("key3", new byte[100]);
region.put("key4", new byte[100]);
region.put("key5", new byte[100]);
Thread.sleep(1000);
assertTrue(LocalRegion.ISSUE_CALLBACKS_TO_CACHE_OBSERVER);
((LocalRegion) region).getDiskRegion().forceRolling();
synchronized (this) {
if (LocalRegion.ISSUE_CALLBACKS_TO_CACHE_OBSERVER) {
OplogJUnitTest.this.wait(10000);
}
}
assertFalse(LocalRegion.ISSUE_CALLBACKS_TO_CACHE_OBSERVER);
CacheObserverHolder.setInstance(old);
closeDown();
}
/**
* Tests that the byte buffer pool in async mode tries to limit the pool size
*
*/
@Test
public void testByteBufferPoolContainment() throws Exception {
diskProps.setPersistBackup(true);
diskProps.setRolling(false);
diskProps.setMaxOplogSize(1024 * 1024);
diskProps.setSynchronous(false);
diskProps.setOverflow(false);
diskProps.setBytesThreshold(10); // this is now item count
diskProps.setTimeInterval(0);
region = DiskRegionHelperFactory.getAsyncPersistOnlyRegion(cache, diskProps);
final byte[] val = new byte[1000];
LocalRegion.ISSUE_CALLBACKS_TO_CACHE_OBSERVER = true;
CacheObserverHolder.setInstance(new CacheObserverAdapter() {
@Override
public void goingToFlush() { // Delay flushing
assertEquals(10, region.size());
for (int i = 10; i < 20; ++i) {
region.put("" + i, val);
}
synchronized (OplogJUnitTest.this) {
OplogJUnitTest.this.notify();
LocalRegion.ISSUE_CALLBACKS_TO_CACHE_OBSERVER = false;
}
}
});
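// Ten 1000-byte puts fill the async queue to its 10-item threshold and
// trigger a flush; the observer above queues ten more entries during the
// flush to exercise the pool containment logic.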
for (int i = 0; i < 10; ++i) {
region.put("" + i, val);
}
synchronized (OplogJUnitTest.this) {
if (LocalRegion.ISSUE_CALLBACKS_TO_CACHE_OBSERVER) {
OplogJUnitTest.this.wait(9000);
assertEquals(false, LocalRegion.ISSUE_CALLBACKS_TO_CACHE_OBSERVER);
}
}
}
/**
* Tests that async stats are correctly updated
*/
@Test
public void testAsyncStats() throws InterruptedException {
diskProps.setBytesThreshold(101);
diskProps.setTimeInterval(1000000);
region = DiskRegionHelperFactory.getAsyncOverFlowAndPersistRegion(cache, diskProps);
final DiskStoreStats dss = ((LocalRegion) region).getDiskRegion().getDiskStore().getStats();
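// Per the assertions below, queueSize counts queued entries and flushes
// counts entries written, so 100 async puts queue 100 entries and each
// explicit flush drains the queue.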
assertEquals(0, dss.getQueueSize());
put100Int();
await()
.timeout(10, TimeUnit.SECONDS).untilAsserted(() -> assertEquals(100, dss.getQueueSize()));
assertEquals(0, dss.getFlushes());
DiskRegion diskRegion = ((LocalRegion) region).getDiskRegion();
diskRegion.getDiskStore().flush();
await()
.timeout(10, TimeUnit.SECONDS).untilAsserted(() -> assertEquals(0, dss.getQueueSize()));
await()
.timeout(10, TimeUnit.SECONDS).untilAsserted(() -> assertEquals(100, dss.getFlushes()));
put100Int();
await()
.timeout(10, TimeUnit.SECONDS).untilAsserted(() -> assertEquals(100, dss.getQueueSize()));
diskRegion.getDiskStore().flush();
await()
.timeout(10, TimeUnit.SECONDS).untilAsserted(() -> assertEquals(0, dss.getQueueSize()));
await()
.timeout(10, TimeUnit.SECONDS).untilAsserted(() -> assertEquals(200, dss.getFlushes()));
closeDown();
}
/**
* Tests delayed creation of DiskId in overflow-only mode
*
*/
@Test
public void testDelayedDiskIdCreationInOverflowOnlyMode() {
diskProps.setPersistBackup(true);
diskProps.setRolling(false);
diskProps.setMaxOplogSize(1024 * 1024);
diskProps.setSynchronous(false);
diskProps.setOverflow(true);
diskProps.setBytesThreshold(10000);
diskProps.setTimeInterval(0);
diskProps.setOverFlowCapacity(1);
region = DiskRegionHelperFactory.getAsyncOverFlowOnlyRegion(cache, diskProps);
final byte[] val = new byte[1000];
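// With an overflow capacity of 1, entry "1" stays in memory until the put
// of "2" evicts it; only at eviction time is its DiskId lazily created.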
region.put("1", val);
DiskEntry entry = (DiskEntry) ((LocalRegion) region).basicGetEntry("1");
assertTrue(entry instanceof AbstractDiskLRURegionEntry);
assertNull(entry.getDiskId());
region.put("2", val);
assertNotNull(entry.getDiskId());
entry = (DiskEntry) ((LocalRegion) region).basicGetEntry("2");
assertTrue(entry instanceof AbstractDiskLRURegionEntry);
assertNull(entry.getDiskId());
}
/**
* Tests immediate creation of DiskId in overflow-with-persistence mode
*
*/
@Test
public void testImmediateDiskIdCreationInOverflowWithPersistMode() {
diskProps.setPersistBackup(true);
diskProps.setRolling(false);
diskProps.setMaxOplogSize(1024 * 1024);
diskProps.setSynchronous(false);
diskProps.setOverflow(true);
diskProps.setBytesThreshold(10000);
diskProps.setTimeInterval(0);
diskProps.setOverFlowCapacity(1);
region = DiskRegionHelperFactory.getAsyncOverFlowAndPersistRegion(cache, diskProps);
final byte[] val = new byte[1000];
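// With persistence enabled, a DiskId is created eagerly when the entry is
// written, so it exists even before the entry is evicted.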
region.put("1", val);
DiskEntry entry = (DiskEntry) ((LocalRegion) region).basicGetEntry("1");
assertTrue(entry instanceof AbstractDiskLRURegionEntry);
assertNotNull(entry.getDiskId());
region.put("2", val);
assertNotNull(entry.getDiskId());
entry = (DiskEntry) ((LocalRegion) region).basicGetEntry("2");
assertTrue(entry instanceof AbstractDiskLRURegionEntry);
assertNotNull(entry.getDiskId());
}
/**
* An entry which is evicted to disk will have its "already written to disk" flag set
* appropriately
*
*/
@Test
public void testEntryAlreadyWrittenIsCorrectlyUnmarkedForOverflowOnly() throws Exception {
diskProps.setPersistBackup(false);
diskProps.setRolling(false);
diskProps.setMaxOplogSize(1024 * 1024);
diskProps.setSynchronous(true);
diskProps.setOverflow(true);
diskProps.setBytesThreshold(10000);
diskProps.setTimeInterval(0);
diskProps.setOverFlowCapacity(1);
region = DiskRegionHelperFactory.getSyncOverFlowOnlyRegion(cache, diskProps);
final byte[] val = new byte[1000];
region.put("1", val);
region.put("2", val);
// "1" should now be on disk
region.get("1");
// "2" should now be on disk
DiskEntry entry1 = (DiskEntry) ((LocalRegion) region).basicGetEntry("1");
DiskId did1 = entry1.getDiskId();
assertTrue(DiskId.isInstanceofOverflowIntOplogOffsetDiskId(did1));
assertFalse(did1.needsToBeWritten());
region.put("1", "3");
assertTrue(did1.needsToBeWritten());
region.put("2", val);
DiskEntry entry2 = (DiskEntry) ((LocalRegion) region).basicGetEntry("2");
DiskId did2 = entry2.getDiskId();
assertTrue(!did2.needsToBeWritten() || !did1.needsToBeWritten());
tearDown();
setUp();
diskProps.setPersistBackup(false);
diskProps.setRolling(false);
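// Re-run the same scenario with a max oplog size just past Integer.MAX_VALUE
// so that the DiskId implementation with a long (8-byte) oplog offset is
// exercised.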
long opsize = Integer.MAX_VALUE;
opsize += 100L;
diskProps.setMaxOplogSize(opsize);
diskProps.setSynchronous(true);
diskProps.setOverflow(true);
diskProps.setBytesThreshold(10000);
diskProps.setTimeInterval(0);
diskProps.setOverFlowCapacity(1);
region = DiskRegionHelperFactory.getSyncOverFlowOnlyRegion(cache, diskProps);
region.put("1", val);
region.put("2", val);
region.get("1");
entry1 = (DiskEntry) ((LocalRegion) region).basicGetEntry("1");
did1 = entry1.getDiskId();
assertTrue(DiskId.isInstanceofOverflowOnlyWithLongOffset(did1));
assertFalse(did1.needsToBeWritten());
region.put("1", "3");
assertTrue(did1.needsToBeWritten());
region.put("2", "3");
entry2 = (DiskEntry) ((LocalRegion) region).basicGetEntry("2");
did2 = entry2.getDiskId();
assertTrue(!did2.needsToBeWritten() || !did1.needsToBeWritten());
}
/**
* A persistent or overflow-with-persistence entry which is evicted to disk will have its
* "already written to disk" flag set appropriately
*
*/
@Test
public void testEntryAlreadyWrittenIsCorrectlyUnmarkedForOverflowWithPersistence() {
diskProps.setPersistBackup(true);
diskProps.setRolling(false);
diskProps.setMaxOplogSize(1024 * 1024);
diskProps.setSynchronous(true);
diskProps.setOverflow(true);
diskProps.setBytesThreshold(10000);
diskProps.setTimeInterval(0);
diskProps.setOverFlowCapacity(1);
region = DiskRegionHelperFactory.getSyncOverFlowAndPersistRegion(cache, diskProps);
final byte[] val = new byte[1000];
region.put("1", val);
DiskEntry entry1 = (DiskEntry) ((LocalRegion) region).basicGetEntry("1");
DiskId did1 = entry1.getDiskId();
assertTrue(DiskId.isInstanceofPersistIntOplogOffsetDiskId(did1));
assertFalse(did1.needsToBeWritten());
region.put("2", val);
assertFalse(did1.needsToBeWritten());
}
/**
* Tests the various DiskEntry.Helper APIs for correctness, since DiskId creation is now
* delayed and accessing the oplog key id will throw an UnsupportedOperationException
*/
@Test
public void testHelperAPIsForOverflowOnlyRegion() {
diskProps.setPersistBackup(false);
diskProps.setRolling(false);
diskProps.setMaxOplogSize(1024 * 1024);
diskProps.setSynchronous(true);
diskProps.setOverflow(true);
diskProps.setBytesThreshold(10000);
diskProps.setTimeInterval(0);
diskProps.setOverFlowCapacity(2);
region = DiskRegionHelperFactory.getSyncOverFlowOnlyRegion(cache, diskProps);
final byte[] val = new byte[1000];
DiskRegion dr = ((LocalRegion) region).getDiskRegion();
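// Overflow capacity is 2, so after three puts only entry "1" (the LRU
// victim) has been evicted; it alone should have a DiskId and an on-disk
// value.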
region.put("1", val);
region.put("2", val);
region.put("3", val);
DiskEntry entry1 = (DiskEntry) ((LocalRegion) region).basicGetEntry("1");
DiskEntry entry2 = (DiskEntry) ((LocalRegion) region).basicGetEntry("2");
DiskEntry entry3 = (DiskEntry) ((LocalRegion) region).basicGetEntry("3");
assertNull(entry2.getDiskId());
assertNull(entry3.getDiskId());
assertNotNull(entry1.getDiskId());
assertNull(DiskEntry.Helper.getValueOnDisk(entry3, dr));
assertNull(DiskEntry.Helper.getValueOnDisk(entry2, dr));
assertNotNull(DiskEntry.Helper.getValueOnDisk(entry1, dr));
assertNull(DiskEntry.Helper.getValueOnDisk(entry3, dr));
assertNull(DiskEntry.Helper.getValueOnDisk(entry2, dr));
assertNull(entry2.getDiskId());
assertNull(entry3.getDiskId());
assertNotNull(entry1.getDiskId());
assertNull(DiskEntry.Helper.getValueOnDiskOrBuffer(entry3, dr, (LocalRegion) region));
assertNull(DiskEntry.Helper.getValueOnDiskOrBuffer(entry2, dr, (LocalRegion) region));
assertNotNull(DiskEntry.Helper.getValueOnDiskOrBuffer(entry1, dr, (LocalRegion) region));
assertNull(DiskEntry.Helper.getValueOnDisk(entry3, dr));
assertNull(DiskEntry.Helper.getValueOnDisk(entry2, dr));
}
/**
* Tests the various DiskEntry.Helper APIs for correctness, since DiskId creation is now
* delayed and accessing the oplog key id will throw an UnsupportedOperationException
*/
@Test
public void testHelperAPIsForOverflowWithPersistenceRegion() {
helperAPIsForPersistenceWithOrWithoutOverflowRegion(true /* should overflow */);
}
/**
* Tests the various DiskEntry.Helper APIs for correctness, since DiskId creation is now
* delayed and accessing the oplog key id will throw an UnsupportedOperationException
*/
@Test
public void testHelperAPIsForPersistenceRegion() {
helperAPIsForPersistenceWithOrWithoutOverflowRegion(false /* should overflow */);
}
/**
* Tests the various DiskEntry.Helper APIs for correctness, since DiskId creation is now
* delayed and accessing the oplog key id will throw an UnsupportedOperationException
*/
private void helperAPIsForPersistenceWithOrWithoutOverflowRegion(boolean overflow) {
diskProps.setPersistBackup(true);
diskProps.setRolling(false);
diskProps.setMaxOplogSize(1024 * 1024);
diskProps.setSynchronous(true);
diskProps.setOverflow(overflow);
diskProps.setBytesThreshold(10000);
diskProps.setTimeInterval(0);
diskProps.setOverFlowCapacity(2);
region = DiskRegionHelperFactory.getSyncOverFlowAndPersistRegion(cache, diskProps);
final byte[] val = new byte[1000];
DiskRegion dr = ((LocalRegion) region).getDiskRegion();
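// With persistence, every entry is written through to disk, so all three
// entries should have DiskIds and on-disk values, both before and after
// the region is recovered.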
region.put("1", val);
region.put("2", val);
region.put("3", val);
DiskEntry entry1 = (DiskEntry) ((LocalRegion) region).basicGetEntry("1");
DiskEntry entry2 = (DiskEntry) ((LocalRegion) region).basicGetEntry("2");
DiskEntry entry3 = (DiskEntry) ((LocalRegion) region).basicGetEntry("3");
assertNotNull(entry2.getDiskId());
assertNotNull(entry3.getDiskId());
assertNotNull(entry1.getDiskId());
assertNotNull(DiskEntry.Helper.getValueOnDisk(entry3, dr));
assertNotNull(DiskEntry.Helper.getValueOnDisk(entry2, dr));
assertNotNull(DiskEntry.Helper.getValueOnDisk(entry1, dr));
assertNotNull(DiskEntry.Helper.getValueOnDiskOrBuffer(entry3, dr, (LocalRegion) region));
assertNotNull(DiskEntry.Helper.getValueOnDiskOrBuffer(entry2, dr, (LocalRegion) region));
assertNotNull(DiskEntry.Helper.getValueOnDiskOrBuffer(entry1, dr, (LocalRegion) region));
region.close();
region = DiskRegionHelperFactory.getSyncOverFlowAndPersistRegion(cache, diskProps);
dr = ((LocalRegion) region).getDiskRegion();
entry1 = (DiskEntry) ((LocalRegion) region).basicGetEntry("1");
entry2 = (DiskEntry) ((LocalRegion) region).basicGetEntry("2");
entry3 = (DiskEntry) ((LocalRegion) region).basicGetEntry("3");
assertNotNull(entry2.getDiskId());
assertNotNull(entry3.getDiskId());
assertNotNull(entry1.getDiskId());
assertNotNull(DiskEntry.Helper.getValueOnDisk(entry3, dr));
assertNotNull(DiskEntry.Helper.getValueOnDisk(entry2, dr));
assertNotNull(DiskEntry.Helper.getValueOnDisk(entry1, dr));
assertNotNull(DiskEntry.Helper.getValueOnDiskOrBuffer(entry3, dr, (LocalRegion) region));
assertNotNull(DiskEntry.Helper.getValueOnDiskOrBuffer(entry2, dr, (LocalRegion) region));
assertNotNull(DiskEntry.Helper.getValueOnDiskOrBuffer(entry1, dr, (LocalRegion) region));
}
/**
* Tests the condition when a 'put' is in progress and a concurrent 'clear' and 'put' (on the
* same key) occur. If, after the HTree reference was set (in the 'put'), the region got
* cleared (and the same key re-'put'), the entry would get recorded in the new Oplog without a
* corresponding create (because the Oplogs containing the create have already been deleted by
* the clear operation). This put should not proceed. Also, region creation after closing
* should not throw an exception.
*/
@Test
public void testPutClearPut() throws Exception {
try {
// Create a persist only region with rolling true
diskProps.setPersistBackup(true);
diskProps.setRolling(true);
diskProps.setMaxOplogSize(1024);
diskProps.setSynchronous(true);
this.proceed = false;
region = DiskRegionHelperFactory.getSyncPersistOnlyRegion(cache, diskProps, Scope.LOCAL);
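// The cache writer installed below starts a concurrent clear + re-put while
// the put of "value2" is still in progress on this thread.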
final Thread clearOp = new Thread(() -> {
try {
LocalRegion.ISSUE_CALLBACKS_TO_CACHE_OBSERVER = false;
region.clear();
region.put("key1", "value3");
} catch (Exception e) {
testFailed = true;
failureCause = "Encountered Exception=" + e;
}
});
region.getAttributesMutator().setCacheWriter(new CacheWriterAdapter() {
@Override
public void beforeUpdate(EntryEvent event) throws CacheWriterException {
clearOp.start();
}
});
try {
ThreadUtils.join(clearOp, 30 * 1000);
} catch (Exception e) {
testFailed = true;
failureCause = "Encountered Exception=" + e;
e.printStackTrace();
}
region.create("key1", "value1");
LocalRegion.ISSUE_CALLBACKS_TO_CACHE_OBSERVER = true;
region.put("key1", "value2");
if (!testFailed) {
region.close();
region = DiskRegionHelperFactory.getSyncPersistOnlyRegion(cache, diskProps, Scope.LOCAL);
} else {
fail(failureCause);
}
} finally {
testFailed = false;
proceed = false;
LocalRegion.ISSUE_CALLBACKS_TO_CACHE_OBSERVER = false;
}
}
/**
* Tests the condition when a 'put' on an already created entry and a concurrent 'clear' are
* happening. If, after the HTree reference was set (in the 'put'), the region got cleared (and
* the same key re-'put'), the entry actually becomes a create in the VM. The new Oplog should
* record it as a create even though the HTree reference in the ThreadLocal will not match the
* current HTree reference. The operation is valid and should get recorded in the Oplog.
*
*/
@Test
public void testPutClearCreate() throws Exception {
failure = false;
try {
// Create a persist only region with rolling true
diskProps.setPersistBackup(true);
diskProps.setRolling(true);
diskProps.setMaxOplogSize(1024);
diskProps.setSynchronous(true);
region = DiskRegionHelperFactory.getSyncPersistOnlyRegion(cache, diskProps, Scope.LOCAL);
region.create("key1", "value1");
LocalRegion.ISSUE_CALLBACKS_TO_CACHE_OBSERVER = true;
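// afterSettingDiskRef() fires in the middle of the put; clearing the region
// at that point turns the in-progress update into a create in the new
// region map, which the new Oplog must still record.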
CacheObserverHolder.setInstance(new CacheObserverAdapter() {
@Override
public void afterSettingDiskRef() {
Thread clearTh = new Thread(() -> region.clear());
clearTh.start();
try {
ThreadUtils.join(clearTh, 120 * 1000);
failure = clearTh.isAlive();
failureCause = "Clear Thread still running !";
} catch (Exception e) {
failure = true;
failureCause = e.toString();
}
}
});
region.put("key1", "value2");
LocalRegion.ISSUE_CALLBACKS_TO_CACHE_OBSERVER = false;
assertFalse(failureCause, failure);
assertEquals(1, region.size());
region.close();
region = DiskRegionHelperFactory.getSyncPersistOnlyRegion(cache, diskProps, Scope.LOCAL);
assertEquals(1, region.size());
assertEquals("value2", region.get("key1"));
} finally {
testFailed = false;
proceed = false;
LocalRegion.ISSUE_CALLBACKS_TO_CACHE_OBSERVER = false;
CacheObserverHolder.setInstance(new CacheObserverAdapter());
failure = false;
}
}
/**
* Tests that a transactional 'destroy' works correctly for a sync-overflow-only disk region
* entry
*/
@Test
public void testOverFlowOnlySyncDestroyTx() {
diskProps.setMaxOplogSize(20480);
diskProps.setOverFlowCapacity(1);
diskProps.setDiskDirs(dirs);
region = DiskRegionHelperFactory.getSyncOverFlowOnlyRegion(cache, diskProps);
assertNotNull(region);
region.put("key", "createValue");
region.put("key1", "createValue1");
cache.getCacheTransactionManager().begin();
region.destroy("key");
cache.getCacheTransactionManager().commit();
assertNull("The deleted entry should have been null",
((LocalRegion) region).entries.getEntry("key"));
}
/**
* Forces a recovery to follow the path of switchOutFilesForRecovery, ensuring that no
* IOExceptions result. This is also a regression test for bug 37682.
*/
@Test
public void testSwitchFilesForRecovery() throws Exception {
region = DiskRegionHelperFactory.getSyncPersistOnlyRegion(cache, null, Scope.LOCAL);
put100Int();
((LocalRegion) region).getDiskRegion().forceRolling();
Thread.sleep(2000);
put100Int();
int sizeOfRegion = region.size();
region.close();
region = DiskRegionHelperFactory.getSyncPersistOnlyRegion(cache, null, Scope.LOCAL);
if (sizeOfRegion != region.size()) {
fail(" Expected region size to be " + sizeOfRegion + " after recovery but it is "
+ region.size());
}
}
/**
* Tests that directory stats are correctly updated in the case of a single directory (for bug
* 37531)
*/
@Test
public void testPersist1DirStats() {
final AtomicBoolean freezeRoller = new AtomicBoolean();
CacheObserver old = CacheObserverHolder.setInstance(new CacheObserverAdapter() {
private volatile boolean didBeforeCall = false;
@Override
public void beforeGoingToCompact() {
this.didBeforeCall = true;
synchronized (freezeRoller) {
if (!assertDone) {
try {
// Here, we are not allowing the Roller thread to roll the old oplog into htree
while (!freezeRoller.get()) {
freezeRoller.wait();
}
freezeRoller.set(false);
} catch (InterruptedException e) {
fail("interrupted");
}
}
}
}
@Override
public void afterHavingCompacted() {
if (this.didBeforeCall) {
this.didBeforeCall = false;
LocalRegion.ISSUE_CALLBACKS_TO_CACHE_OBSERVER = false;
checkDiskStats();
}
}
});
try {
final int MAX_OPLOG_SIZE = 500;
diskProps.setMaxOplogSize(MAX_OPLOG_SIZE);
diskProps.setPersistBackup(true);
diskProps.setRolling(true);
diskProps.setSynchronous(true);
diskProps.setOverflow(false);
diskProps.setDiskDirsAndSizes(new File[] {dirs[0]}, new int[] {4000});
final byte[] val = new byte[200];
region = DiskRegionHelperFactory.getSyncPersistOnlyRegion(cache, diskProps, Scope.LOCAL);
LocalRegion.ISSUE_CALLBACKS_TO_CACHE_OBSERVER = true;
region.put("key1", val);
checkDiskStats();
region.put("key2", val);
checkDiskStats();
// This put will cause a switch as max-oplog size (500) will be exceeded (600)
region.put("key3", val);
synchronized (freezeRoller) {
checkDiskStats();
assertDone = true;
freezeRoller.set(true);
freezeRoller.notifyAll();
}
region.close();
closeDown();
// Stop rolling to get accurate estimates:
diskProps.setRolling(false);
region = DiskRegionHelperFactory.getSyncPersistOnlyRegion(cache, diskProps, Scope.LOCAL);
// On recreating the region after closing, old Oplog file gets rolled into htree
// "Disk space usage zero when region recreated"
checkDiskStats();
region.put("key4", val);
checkDiskStats();
region.put("key5", val);
checkDiskStats();
assertDone = false;
LocalRegion.ISSUE_CALLBACKS_TO_CACHE_OBSERVER = true;
region.put("key6", val);
// again we expect a switch in oplog here
synchronized (freezeRoller) {
checkDiskStats();
assertDone = true;
freezeRoller.set(true);
freezeRoller.notifyAll();
}
region.close();
} catch (Exception e) {
e.printStackTrace();
fail("Test failed due to exception" + e);
} finally {
LocalRegion.ISSUE_CALLBACKS_TO_CACHE_OBSERVER = false;
CacheObserverHolder.setInstance(old);
synchronized (freezeRoller) {
assertDone = true;
freezeRoller.set(true);
freezeRoller.notifyAll();
}
}
}
/**
* Tests reduction in size of disk stats when the oplog is rolled.
*/
@Test
public void testStatsSizeReductionOnRolling() throws Exception {
final int MAX_OPLOG_SIZE = 500 * 2;
diskProps.setMaxOplogSize(MAX_OPLOG_SIZE);
diskProps.setPersistBackup(true);
diskProps.setRolling(true);
diskProps.setCompactionThreshold(100);
diskProps.setSynchronous(true);
diskProps.setOverflow(false);
diskProps.setDiskDirsAndSizes(new File[] {dirs[0]}, new int[] {4000});
final byte[] val = new byte[333];
region = DiskRegionHelperFactory.getSyncPersistOnlyRegion(cache, diskProps, Scope.LOCAL);
final DiskRegion dr = ((LocalRegion) region).getDiskRegion();
// calculate sizes
final int extra_byte_num_per_entry =
InternalDataSerializer.calculateBytesForTSandDSID(getDSID((LocalRegion) region));
final int key3_size =
CompactOfflineDiskStoreJUnitTest.getSize4Create(extra_byte_num_per_entry, "key3", val);
final int tombstone_key1 =
CompactOfflineDiskStoreJUnitTest.getSize4TombstoneWithKey(extra_byte_num_per_entry, "key1");
final int tombstone_key2 =
CompactOfflineDiskStoreJUnitTest.getSize4TombstoneWithKey(extra_byte_num_per_entry, "key2");
CountDownLatch putsCompleted = new CountDownLatch(1);
// TODO: move static methods from CompactOfflineDiskStoreJUnitTest to shared util class
StatSizeTestCacheObserverAdapter testObserver = new StatSizeTestCacheObserverAdapter(dr,
key3_size, tombstone_key1, tombstone_key2, putsCompleted);
CacheObserver old = CacheObserverHolder.setInstance(testObserver);
try {
LocalRegion.ISSUE_CALLBACKS_TO_CACHE_OBSERVER = true;
cache.getLogger().info("putting key1");
region.put("key1", val);
checkDiskStats();
cache.getLogger().info("putting key2");
region.put("key2", val);
checkDiskStats();
cache.getLogger().info("removing key1");
region.remove("key1");
cache.getLogger().info("removing key2");
region.remove("key2");
// This put will cause a switch as max-oplog size (900) will be exceeded (999)
testObserver.setSwitchExpected();
cache.getLogger().info("putting key3");
region.put("key3", val);
putsCompleted.countDown();
cache.getLogger().info("waiting for compaction");
await().until(() -> testObserver.hasCompacted());
assertFalse(testObserver.exceptionOccured());
region.close();
} finally {
LocalRegion.ISSUE_CALLBACKS_TO_CACHE_OBSERVER = false;
CacheObserverHolder.setInstance(old);
}
}
@Test
public void testUnPreblowOnRegionCreate() throws Exception {
final int MAX_OPLOG_SIZE = 20000;
diskProps.setMaxOplogSize(MAX_OPLOG_SIZE);
diskProps.setPersistBackup(true);
diskProps.setRolling(true);
diskProps.setSynchronous(true);
diskProps.setOverflow(false);
diskProps.setDiskDirsAndSizes(new File[] {dirs[0]}, new int[] {40000});
region = DiskRegionHelperFactory.getSyncPersistOnlyRegion(cache, diskProps, Scope.LOCAL);
try {
LocalRegion.ISSUE_CALLBACKS_TO_CACHE_OBSERVER = true;
for (int i = 0; i < 10; ++i) {
region.put("key-" + i, "value-");
}
assertEquals(18000, getOplogFileSizeSum(dirs[0], ".crf"));
assertEquals(2000, getOplogFileSizeSum(dirs[0], ".drf"));
// make a copy of inflated crf. use this to replace compacted crf to
// simulate incomplete diskStore close
File[] files = dirs[0].listFiles();
for (File file : files) {
if (file.getName().endsWith(".crf") || file.getName().endsWith(".drf")) {
File inflated = new File(file.getAbsolutePath() + "_inflated");
FileUtils.copyFile(file, inflated);
}
}
cache.close();
assertTrue(500 > getOplogFileSizeSum(dirs[0], ".crf"));
assertTrue(100 > getOplogFileSizeSum(dirs[0], ".drf"));
// replace compacted crf with inflated crf and remove krf
files = dirs[0].listFiles();
for (File file : files) {
String name = file.getName();
if (name.endsWith(".krf") || name.endsWith(".crf") || name.endsWith(".drf")) {
file.delete();
}
}
for (File file : files) {
String name = file.getName();
if (name.endsWith("_inflated")) {
assertTrue(file.renameTo(new File(file.getAbsolutePath().replace("_inflated", ""))));
}
}
assertEquals(18000, getOplogFileSizeSum(dirs[0], ".crf"));
assertEquals(2000, getOplogFileSizeSum(dirs[0], ".drf"));
createCache();
region = DiskRegionHelperFactory.getSyncPersistOnlyRegion(cache, diskProps, Scope.LOCAL);
for (int i = 10; i < 20; ++i) {
region.put("key-" + i, "value-");
}
int sizeCrf = getOplogFileSizeSum(dirs[0], ".crf");
assertTrue("crf too big:" + sizeCrf, sizeCrf < 18000 + 500);
assertTrue("crf too small:" + sizeCrf, sizeCrf > 18000);
int sizeDrf = getOplogFileSizeSum(dirs[0], ".drf");
assertTrue("drf too big:" + sizeDrf, sizeDrf < 2000 + 100);
assertTrue("drf too small:" + sizeDrf, sizeDrf > 2000);
// test that region recovery does not cause unpreblow
region.close();
region = DiskRegionHelperFactory.getSyncPersistOnlyRegion(cache, diskProps, Scope.LOCAL);
assertEquals(sizeCrf, getOplogFileSizeSum(dirs[0], ".crf"));
assertEquals(sizeDrf, getOplogFileSizeSum(dirs[0], ".drf"));
} finally {
LocalRegion.ISSUE_CALLBACKS_TO_CACHE_OBSERVER = false;
}
}
private int getOplogFileSizeSum(File dir, String type) {
int sum = 0;
File[] files = dir.listFiles();
for (File file : files) {
String name = file.getName();
if (name.endsWith(type)) {
sum += file.length();
}
}
return sum;
}
@Test
public void testMagicSeqPresence() throws Exception {
final int MAX_OPLOG_SIZE = 200;
diskProps.setMaxOplogSize(MAX_OPLOG_SIZE);
diskProps.setPersistBackup(true);
diskProps.setRolling(true);
diskProps.setSynchronous(true);
diskProps.setOverflow(false);
diskProps.setDiskDirsAndSizes(new File[] {dirs[0]}, new int[] {4000});
region = DiskRegionHelperFactory.getSyncPersistOnlyRegion(cache, diskProps, Scope.LOCAL);
// 3 types of oplog files will be verified
verifyOplogHeader(dirs[0], ".if", ".crf", ".drf");
try {
LocalRegion.ISSUE_CALLBACKS_TO_CACHE_OBSERVER = true;
IntStream.range(0, 20).forEach(i -> region.put("key-" + i, "value-" + i));
// krf is created, so 4 types of oplog files will be verified
verifyOplogHeader(dirs[0], ".if", ".crf", ".drf", ".krf");
region.close();
region = DiskRegionHelperFactory.getSyncPersistOnlyRegion(cache, diskProps, Scope.LOCAL);
verifyOplogHeader(dirs[0], ".if", ".crf", ".drf", ".krf");
region.close();
} finally {
LocalRegion.ISSUE_CALLBACKS_TO_CACHE_OBSERVER = false;
}
}
private void verifyOplogHeader(File dir, String... oplogTypes) throws IOException {
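// wait until at least one file of each expected type exists; the krf in particular is
// written asynchronously after an oplog is rolled or closed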
await().until(() -> {
List<String> types = new ArrayList<>(Arrays.asList(oplogTypes));
Arrays.stream(dir.listFiles()).map(File::getName).map(f -> f.substring(f.indexOf(".")))
.forEach(types::remove);
return types.isEmpty();
});
File[] files = dir.listFiles();
HashSet<String> verified = new HashSet<>();
for (File file : files) {
String name = file.getName();
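// expected layout of the 8-byte magic-seq record: one magic byte, the 6-byte
// oplog-type sequence, and a trailing END_OF_RECORD byte (21)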
byte[] expect = new byte[Oplog.OPLOG_MAGIC_SEQ_REC_SIZE];
if (name.endsWith(".crf")) {
expect[0] = Oplog.OPLOG_MAGIC_SEQ_ID;
System.arraycopy(OPLOG_TYPE.CRF.getBytes(), 0, expect, 1, OPLOG_TYPE.getLen());
verified.add(".crf");
} else if (name.endsWith(".drf")) {
expect[0] = Oplog.OPLOG_MAGIC_SEQ_ID;
System.arraycopy(OPLOG_TYPE.DRF.getBytes(), 0, expect, 1, OPLOG_TYPE.getLen());
verified.add(".drf");
} else if (name.endsWith(".krf")) {
expect[0] = Oplog.OPLOG_MAGIC_SEQ_ID;
System.arraycopy(OPLOG_TYPE.KRF.getBytes(), 0, expect, 1, OPLOG_TYPE.getLen());
verified.add(".krf");
} else if (name.endsWith(".if")) {
expect[0] = DiskInitFile.OPLOG_MAGIC_SEQ_ID;
System.arraycopy(OPLOG_TYPE.IF.getBytes(), 0, expect, 1, OPLOG_TYPE.getLen());
verified.add(".if");
} else {
System.out.println("Ignored: " + file);
continue;
}
expect[expect.length - 1] = 21; // EndOfRecord
byte[] buf = new byte[Oplog.OPLOG_MAGIC_SEQ_REC_SIZE];
int count;
try (FileInputStream fis = new FileInputStream(file)) {
count = fis.read(buf, 0, Oplog.OPLOG_MAGIC_SEQ_REC_SIZE);
}
System.out.println("Verifying: " + file);
assertEquals("expected a read to return " + Oplog.OPLOG_MAGIC_SEQ_REC_SIZE
+ " but it returned " + count + " for file " + file, Oplog.OPLOG_MAGIC_SEQ_REC_SIZE, count);
assertTrue("unexpected magic-seq header in file " + file, Arrays.equals(expect, buf));
}
assertEquals(oplogTypes.length, verified.size());
}
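// Debugging aid, not used by the assertions above: a minimal sketch that dumps the first
// magic-seq record of an oplog file, assuming the 8-byte layout checked in
// verifyOplogHeader. The method name is illustrative only.
private static void dumpOplogHeader(File file) throws IOException {
byte[] header = new byte[Oplog.OPLOG_MAGIC_SEQ_REC_SIZE];
try (FileInputStream fis = new FileInputStream(file)) {
int read = fis.read(header);
System.out.println(file.getName() + " header (" + read + " bytes): "
+ Arrays.toString(header));
}
}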
/**
* Tests that, without rolling, the region's size on disk before close is the same after
* recreation.
*/
@Test
public void testSizeStatsAfterRecreationInAsynchMode() throws Exception {
final int MAX_OPLOG_SIZE = 1000;
diskProps.setMaxOplogSize(MAX_OPLOG_SIZE);
diskProps.setPersistBackup(true);
diskProps.setRolling(false);
diskProps.setSynchronous(false);
diskProps.setBytesThreshold(800);
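// in asynchronous mode, writes queue in memory and are flushed to disk once roughly
// 800 queued bytes accumulate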
diskProps.setOverflow(false);
diskProps.setDiskDirsAndSizes(new File[] {dirs[0], dirs[1]}, new int[] {4000, 4000});
final byte[] val = new byte[25];
region = DiskRegionHelperFactory.getAsyncPersistOnlyRegion(cache, diskProps);
DiskRegion dr = ((LocalRegion) region).getDiskRegion();
try {
LocalRegion.ISSUE_CALLBACKS_TO_CACHE_OBSERVER = true;
for (int i = 0; i < 42; ++i) {
region.put("key" + i, val);
}
// need to wait for writes to happen before getting size
dr.flushForTesting();
long size1 = 0;
for (DirectoryHolder dh : dr.getDirectories()) {
size1 += dh.getDirStatsDiskSpaceUsage();
}
System.out.println("Size before close = " + size1);
region.close();
diskProps.setSynchronous(true);
region = DiskRegionHelperFactory.getSyncPersistOnlyRegion(cache, diskProps, Scope.LOCAL);
dr = ((LocalRegion) region).getDiskRegion();
long size2 = 0;
for (DirectoryHolder dh : dr.getDirectories()) {
size2 += dh.getDirStatsDiskSpaceUsage();
}
System.out.println("Size after recreation= " + size2);
assertEquals(size1, size2);
region.close();
} finally {
LocalRegion.ISSUE_CALLBACKS_TO_CACHE_OBSERVER = false;
}
}
@Test
public void testAsynchModeStatsBehaviour() throws Exception {
final int MAX_OPLOG_SIZE = 1000;
diskProps.setMaxOplogSize(MAX_OPLOG_SIZE);
diskProps.setPersistBackup(true);
diskProps.setRolling(false);
diskProps.setSynchronous(false);
diskProps.setBytesThreshold(800);
diskProps.setTimeInterval(Long.MAX_VALUE);
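// effectively disables time-based flushing; with only four small puts (well under the
// 800-byte threshold) nothing reaches disk until flushForTesting below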
diskProps.setOverflow(false);
diskProps.setDiskDirsAndSizes(new File[] {dirs[0]}, new int[] {4000});
final byte[] val = new byte[25];
region = DiskRegionHelperFactory.getAsyncPersistOnlyRegion(cache, diskProps);
DiskRegion dr = ((LocalRegion) region).getDiskRegion();
try {
LocalRegion.ISSUE_CALLBACKS_TO_CACHE_OBSERVER = true;
for (int i = 0; i < 4; ++i) {
region.put("key" + i, val);
}
// This test now has a race condition in it since
// async puts no longer increment disk space.
// It is not until everything is flushed that we will know the disk size.
dr.flushForTesting();
checkDiskStats();
long size1 = 0;
for (DirectoryHolder dh : dr.getDirectories()) {
size1 += dh.getDirStatsDiskSpaceUsage();
}
System.out.println("Size before close = " + size1);
region.close();
diskProps.setSynchronous(true);
region = DiskRegionHelperFactory.getSyncPersistOnlyRegion(cache, diskProps, Scope.LOCAL);
dr = ((LocalRegion) region).getDiskRegion();
long size2 = 0;
for (DirectoryHolder dh : dr.getDirectories()) {
size2 += dh.getDirStatsDiskSpaceUsage();
}
System.out.println("Size after recreation= " + size2);
assertEquals(size1, size2);
region.close();
} finally {
LocalRegion.ISSUE_CALLBACKS_TO_CACHE_OBSERVER = false;
}
}
private long diskSpaceUsageStats() {
return ((LocalRegion) region).getDiskRegion().getInfoFileDir().getDirStatsDiskSpaceUsage();
}
private long calculatedDiskSpaceUsageStats() {
return oplogSize();
}
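// stats are updated asynchronously, so poll (up to 100 x 100ms) until the published
// stat agrees with the value computed from the oplogs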
private void checkDiskStats() {
long actualDiskStats = diskSpaceUsageStats();
long computedDiskStats = calculatedDiskSpaceUsageStats();
int tries = 0;
while (actualDiskStats != computedDiskStats && tries++ <= 100) {
// race conditions exist in which the stats change
try {
Thread.sleep(100);
} catch (InterruptedException ignore) {
}
actualDiskStats = diskSpaceUsageStats();
computedDiskStats = calculatedDiskSpaceUsageStats();
}
assertEquals(computedDiskStats, actualDiskStats);
}
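// expected disk usage: the disk store's undeletedOplogSize stat plus the current size
// of every live oplog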
private long oplogSize() {
long size = ((LocalRegion) region).getDiskRegion().getDiskStore().undeletedOplogSize.get();
Oplog[] opArray =
((LocalRegion) region).getDiskRegion().getDiskStore().getPersistentOplogs().getAllOplogs();
if (opArray != null) {
for (Oplog log : opArray) {
size += log.getOplogSize();
}
}
return size;
}
private int getDSID(LocalRegion lr) {
return lr.getDistributionManager().getDistributedSystemId();
}
private class StatSizeTestCacheObserverAdapter extends CacheObserverAdapter {
private final AtomicBoolean switchExpected = new AtomicBoolean(false);
private final DiskRegion dr;
private final AtomicBoolean hasCompacted = new AtomicBoolean(false);
private final int key3Size;
private final int tombstoneKey1;
private final int tombstoneKey2;
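// starts out true (pessimistic); only a successful size check in afterHavingCompacted
// clears it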
private final AtomicBoolean exceptionOccurred = new AtomicBoolean(true);
private volatile long spaceUsageBefore = -1;
private DirectoryHolder dh;
private final AtomicLong oplogsSize = new AtomicLong();
private final CountDownLatch putsCompleted;
StatSizeTestCacheObserverAdapter(DiskRegion dr, int key3Size, int tombstoneKey1,
int tombstoneKey2, CountDownLatch putsCompleted) {
this.dr = dr;
this.key3Size = key3Size;
this.tombstoneKey1 = tombstoneKey1;
this.tombstoneKey2 = tombstoneKey2;
this.putsCompleted = putsCompleted;
}
@Override
public void beforeSwitchingOplog() {
cache.getLogger().info("beforeSwitchingOplog");
if (!switchExpected.get()) {
fail("unexpected oplog switch");
}
if (spaceUsageBefore == -1) {
// only want to call this once; before the 1st oplog destroy
this.dh = dr.getNextDir();
this.spaceUsageBefore = this.dh.getDirStatsDiskSpaceUsage();
}
}
@Override
public void beforeDeletingCompactedOplog(Oplog oplog) {
cache.getLogger().info("beforeDeletingCompactedOplog");
oplogsSize.addAndGet(oplog.getOplogSize());
}
@Override
public void afterHavingCompacted() {
try {
putsCompleted.await(10, TimeUnit.SECONDS);
} catch (InterruptedException e) {
exceptionOccurred.set(true);
throw new RuntimeException(e);
}
cache.getLogger().info("afterHavingCompacted");
if (spaceUsageBefore > -1) {
hasCompacted.set(true);
long after = this.dh.getDirStatsDiskSpaceUsage();
// after compaction, in _2.crf, key3 is a create-entry and
// key1 and key2 are tombstones;
// _2.drf contains an rvvgc (RVV garbage-collection) record with drMap.size()==1
int expectedDrfSize = Oplog.OPLOG_DISK_STORE_REC_SIZE + Oplog.OPLOG_MAGIC_SEQ_REC_SIZE
+ Oplog.OPLOG_GEMFIRE_VERSION_REC_SIZE
+ CompactOfflineDiskStoreJUnitTest.getRVVSize(1, new int[] {0}, true);
int expectedCrfSize = Oplog.OPLOG_DISK_STORE_REC_SIZE + Oplog.OPLOG_MAGIC_SEQ_REC_SIZE
+ Oplog.OPLOG_GEMFIRE_VERSION_REC_SIZE
+ CompactOfflineDiskStoreJUnitTest.getRVVSize(1, new int[] {1}, false)
+ Oplog.OPLOG_NEW_ENTRY_BASE_REC_SIZE + key3Size + tombstoneKey1 + tombstoneKey2;
int oplog2Size = expectedDrfSize + expectedCrfSize;
if (after != oplog2Size) {
cache.getLogger().info("test failed before=" + spaceUsageBefore + " after=" + after
+ " expected=" + oplog2Size);
exceptionOccurred.set(true);
} else {
exceptionOccurred.set(false);
}
LocalRegion.ISSUE_CALLBACKS_TO_CACHE_OBSERVER = false;
}
}
boolean hasCompacted() {
return hasCompacted.get();
}
boolean exceptionOccured() {
return exceptionOccurred.get();
}
void setSwitchExpected() {
switchExpected.set(true);
}
}
}