| /*========================================================================= |
| * Copyright (c) 2010-2014 Pivotal Software, Inc. All Rights Reserved. |
| * This product is protected by U.S. and international copyright |
| * and intellectual property laws. Pivotal products are covered by |
| * one or more patents listed at http://www.pivotal.io/patents. |
| *========================================================================= |
| */ |
| package com.gemstone.gemfire.internal.cache; |
| |
| import static org.junit.Assert.assertEquals; |
| import static org.junit.Assert.assertFalse; |
| import static org.junit.Assert.assertNotNull; |
| import static org.junit.Assert.assertNull; |
| import static org.junit.Assert.assertTrue; |
| import static org.junit.Assert.fail; |
| |
| import java.io.File; |
| import java.io.FileInputStream; |
| import java.io.IOException; |
| import java.nio.ByteBuffer; |
| import java.util.Arrays; |
| import java.util.HashSet; |
| import java.util.Random; |
| import java.util.concurrent.atomic.AtomicBoolean; |
| |
| import junit.framework.Assert; |
| |
| import org.apache.commons.io.FileUtils; |
| import org.junit.After; |
| import org.junit.Before; |
| import org.junit.Test; |
| import org.junit.experimental.categories.Category; |
| |
| import com.gemstone.gemfire.StatisticsFactory; |
| import com.gemstone.gemfire.cache.AttributesFactory; |
| import com.gemstone.gemfire.cache.CacheWriterException; |
| import com.gemstone.gemfire.cache.CommitConflictException; |
| import com.gemstone.gemfire.cache.DataPolicy; |
| import com.gemstone.gemfire.cache.DiskAccessException; |
| import com.gemstone.gemfire.cache.DiskStore; |
| import com.gemstone.gemfire.cache.DiskStoreFactory; |
| import com.gemstone.gemfire.cache.EntryEvent; |
| import com.gemstone.gemfire.cache.EntryNotFoundException; |
| import com.gemstone.gemfire.cache.Scope; |
| import com.gemstone.gemfire.cache.util.CacheWriterAdapter; |
| import com.gemstone.gemfire.internal.InternalDataSerializer; |
| import com.gemstone.gemfire.internal.cache.Oplog.OPLOG_TYPE; |
| import com.gemstone.gemfire.test.junit.categories.IntegrationTest; |
| |
| import dunit.DistributedTestCase; |
| import dunit.DistributedTestCase.WaitCriterion; |
| |
| /** |
| * Testing Oplog API's |
| * |
| * @author Asif |
| * @author Mitul |
| */ |
| @Category(IntegrationTest.class) |
| public class OplogJUnitTest extends DiskRegionTestingBase |
| { |
| /** |
| * Hand-off flag between a callback and the waiting test thread. |
| * NOTE(review): in the portion of the file reviewed here it is referenced |
| * only from commented-out tests - confirm whether it is still needed. |
| */ |
| boolean proceed = false; |
| |
| /** |
| * Disk region configuration shared by the tests; setUp() points it at the |
| * test directories and individual tests adjust it before creating a region. |
| */ |
| private final DiskRegionProperties diskProps = new DiskRegionProperties(); |
| |
| /** Operation code for a create. */ |
| static final int OP_CREATE = 1; |
| |
| /** Operation code for a modify (presumably an update of an existing entry). */ |
| static final int OP_MODIFY = 2; |
| |
| /** Operation code for a delete. */ |
| static final int OP_DEL = 3; |
| |
| /** Random source shared by the tests of this class. */ |
| protected volatile static Random random = new Random(); |
| |
| /** |
| * Running estimate of the oplog size, seeded with |
| * Oplog.OPLOG_NEW_ENTRY_BASE_REC_SIZE. |
| */ |
| protected long expectedOplogSize = Oplog.OPLOG_NEW_ENTRY_BASE_REC_SIZE; |
| |
| /** Counter of operations that completed successfully. */ |
| volatile int totalSuccessfulOperations = 0; |
| |
| /** Counter of create operations. */ |
| protected int numCreate = 0; |
| |
| /** Counter of modify operations (presumably; no use visible in the reviewed portion). */ |
| protected int numModify = 0; |
| |
| /** Counter of delete operations (presumably; no use visible in the reviewed portion). */ |
| protected int numDel = 0; |
| |
| /** NOTE(review): purpose not visible in the reviewed portion of the file - confirm. */ |
| protected long delta; |
| |
| /** |
| * Presumably records that at least one flush was observed. |
| * NOTE(review): no use visible in the reviewed portion - confirm. |
| */ |
| protected boolean flushOccuredAtleastOnce = false; |
| |
| /** |
| * Presumably signals that an assertion made on another thread has run. |
| * NOTE(review): no use visible in the reviewed portion - confirm. |
| */ |
| volatile protected boolean assertDone = false; |
| |
| /** |
| * Presumably a failure marker set by helper threads. |
| * NOTE(review): no use visible in the reviewed portion - confirm. |
| */ |
| boolean failure = false; |
| |
| /** The key for entry */ |
| static final String KEY = "KEY1"; |
| |
| /** The initial value for key */ |
| static final String OLD_VALUE = "VAL1"; |
| |
| /** The updated value for key */ |
| static final String NEW_VALUE = "VAL2"; |
| |
| /** The value read from cache using LocalRegion.getValueOnDiskOrBuffer API */ |
| static volatile String valueRead = null; |
| |
| /** Boolean to indicate test to proceed for validation */ |
| static volatile boolean proceedForValidation = false; |
| |
| /** |
| * Presumably holds the oplog roller thread captured by a test. |
| * NOTE(review): no use visible in the reviewed portion - confirm. |
| */ |
| protected volatile Thread rollerThread = null; |
| |
| /** |
| * Runs the base-class set-up, then points the shared disk properties at the |
| * inherited {@code dirs} array and switches on |
| * {@code DiskStoreImpl.SET_IGNORE_PREALLOCATE} for the test. |
| * The base-class call comes first; presumably it prepares {@code dirs} - |
| * confirm against DiskRegionTestingBase. |
| */ |
| @Override |
| @Before |
| public void setUp() throws Exception { |
| super.setUp(); |
| diskProps.setDiskDirs(dirs); |
| // static flag: must be switched off again once the test is over |
| DiskStoreImpl.SET_IGNORE_PREALLOCATE = true; |
| } |
| |
| @Override |
| @After |
| public void tearDown() throws Exception { |
| super.tearDown(); |
| DiskStoreImpl.SET_IGNORE_PREALLOCATE = false; |
| } |
| |
| /** |
|  * Test method for 'com.gemstone.gemfire.internal.cache.Oplog.isBackup()'. |
|  * |
|  * Creates, one after another, a synchronous overflow-and-persist region, an |
|  * overflow-only region and a persist-only region, and checks that the disk |
|  * region reports itself as a backup for the two persistent flavours only. |
|  */ |
| @Test |
| public void testIsBackup() |
| { |
|   boolean backup; |
| |
|   // overflow + persist: backup expected |
|   region = DiskRegionHelperFactory.getSyncOverFlowAndPersistRegion(cache, |
|       diskProps); |
|   backup = ((LocalRegion)region).getDiskRegion().isBackup(); |
|   assertTrue( |
|       "Test persist backup not being correctly set for overflow and persist", |
|       backup); |
|   closeDown(); |
| |
|   // overflow only: no backup expected |
|   region = DiskRegionHelperFactory |
|       .getSyncOverFlowOnlyRegion(cache, diskProps); |
|   backup = ((LocalRegion)region).getDiskRegion().isBackup(); |
|   assertFalse( |
|       "Test persist backup not being correctly set for overflow only mode", |
|       backup); |
|   closeDown(); |
| |
|   // persist only: backup expected |
|   region = DiskRegionHelperFactory.getSyncPersistOnlyRegion(cache, diskProps, |
|       Scope.LOCAL); |
|   backup = ((LocalRegion)region).getDiskRegion().isBackup(); |
|   assertTrue( |
|       "Test persist backup not being correctly set for persist only", |
|       backup); |
|   closeDown(); |
| } |
| |
| /* |
| * Test method for 'com.gemstone.gemfire.internal.cache.Oplog.useSyncWrites()' |
| */ |
| @Test |
| public void testUseSyncWrites() |
| { |
| boolean result; |
| diskProps.setSynchronous(true); |
| region = DiskRegionHelperFactory.getSyncOverFlowAndPersistRegion(cache, |
| diskProps); |
| result = ((LocalRegion)region).getAttributes().isDiskSynchronous(); |
| if (!result) { |
| fail("Synchronous is false when it is supposed to be true"); |
| } |
| closeDown(); |
| |
| region = DiskRegionHelperFactory |
| .getSyncOverFlowOnlyRegion(cache, diskProps); |
| |
| result = ((LocalRegion)region).getAttributes().isDiskSynchronous(); |
| if (!result) { |
| fail("Synchronous is false when it is supposed to be true"); |
| } |
| closeDown(); |
| |
| region = DiskRegionHelperFactory.getSyncPersistOnlyRegion(cache, diskProps, Scope.LOCAL); |
| |
| result = ((LocalRegion)region).getAttributes().isDiskSynchronous(); |
| if (!result) { |
| fail("Synchronous is false when it is supposed to be true"); |
| } |
| closeDown(); |
| |
| diskProps.setSynchronous(false); |
| region = DiskRegionHelperFactory.getAsyncOverFlowAndPersistRegion(cache, |
| diskProps); |
| |
| result = ((LocalRegion)region).getAttributes().isDiskSynchronous(); |
| if (result) { |
| fail("Synchronous is true when it is supposed to be false"); |
| } |
| |
| closeDown(); |
| region = DiskRegionHelperFactory.getAsyncOverFlowOnlyRegion(cache, |
| diskProps); |
| |
| result = ((LocalRegion)region).getAttributes().isDiskSynchronous(); |
| if (result) { |
| fail("Synchronous is true when it is supposed to be false"); |
| } |
| closeDown(); |
| |
| region = DiskRegionHelperFactory |
| .getAsyncPersistOnlyRegion(cache, diskProps); |
| |
| result = ((LocalRegion)region).getAttributes().isDiskSynchronous(); |
| if (result) { |
| fail("Synchronous is true when it is supposed to be false"); |
| } |
| closeDown(); |
| } |
| |
| // @todo port testBufferOperations |
| /** |
| * Asif: Tests the correct behaviour of attributes such as the byte threshold |
| * and the asynchronous writer thread's wait time. Test method for |
| * 'com.gemstone.gemfire.internal.cache.Oplog.bufferOperations()' |
| */ |
| // @Test |
| // public void testBufferOperations() |
| // { |
| // boolean result; |
| |
| // diskProps.setBytesThreshold(0); |
| // region = DiskRegionHelperFactory.getAsyncOverFlowAndPersistRegion(cache, |
| // diskProps); |
| // Oplog.WriterThread writer = ((LocalRegion)region).getDiskRegion() |
| // .getChild().getAsynchWriter(); |
| // long waitTime = writer.getAsynchThreadWaitTime(); |
| // long buffSize = writer.getBufferSize(); |
| // result = waitTime == writer.getDefaultAsynchThreadWaitTime() |
| // && buffSize == 0; |
| |
| // assertTrue("buffer operations is true when it is supposed to be false", |
| // result); |
| |
| // closeDown(); |
| |
| // region = DiskRegionHelperFactory.getAsyncOverFlowOnlyRegion(cache, |
| // diskProps); |
| // writer = ((LocalRegion)region).getDiskRegion().getChild().getAsynchWriter(); |
| // waitTime = writer.getAsynchThreadWaitTime(); |
| // buffSize = writer.getBufferSize(); |
| // result = waitTime == writer.getDefaultAsynchThreadWaitTime() |
| // && buffSize == 0; |
| |
| // assertTrue("buffer operations is true when it is supposed to be false", |
| // result); |
| // closeDown(); |
| |
| // region = DiskRegionHelperFactory |
| // .getAsyncPersistOnlyRegion(cache, diskProps); |
| // writer = ((LocalRegion)region).getDiskRegion().getChild().getAsynchWriter(); |
| // waitTime = writer.getAsynchThreadWaitTime(); |
| // buffSize = writer.getBufferSize(); |
| // result = waitTime == writer.getDefaultAsynchThreadWaitTime() |
| // && buffSize == 0; |
| |
| // assertTrue("buffer operations is true when it is supposed to be false", |
| // result); |
| |
| // closeDown(); |
| |
| // diskProps.setBytesThreshold(100); |
| |
| // region = DiskRegionHelperFactory.getAsyncOverFlowAndPersistRegion(cache, |
| // diskProps); |
| |
| // writer = ((LocalRegion)region).getDiskRegion().getChild().getAsynchWriter(); |
| // waitTime = writer.getAsynchThreadWaitTime(); |
| // buffSize = writer.getBufferSize(); |
| // result = waitTime <= 0 && buffSize > 0; |
| // assertTrue("bufferoperations is false when it is supposed to be true", |
| // result); |
| |
| // closeDown(); |
| |
| // region = DiskRegionHelperFactory.getAsyncOverFlowOnlyRegion(cache, |
| // diskProps); |
| |
| // writer = ((LocalRegion)region).getDiskRegion().getChild().getAsynchWriter(); |
| // waitTime = writer.getAsynchThreadWaitTime(); |
| // buffSize = writer.getBufferSize(); |
| // result = waitTime <= 0 && buffSize > 0; |
| // assertTrue("baufferoperations is false when it is supposed to be true", |
| // result); |
| |
| // closeDown(); |
| |
| // region = DiskRegionHelperFactory |
| // .getAsyncPersistOnlyRegion(cache, diskProps); |
| |
| // writer = ((LocalRegion)region).getDiskRegion().getChild().getAsynchWriter(); |
| // waitTime = writer.getAsynchThreadWaitTime(); |
| // buffSize = writer.getBufferSize(); |
| // result = waitTime <= 0 && buffSize > 0; |
| // assertTrue("baufferoperations is false when it is supposed to be true", |
| // result); |
| |
| // closeDown(); |
| // } |
| |
| /** |
| * Test method for 'com.gemstone.gemfire.internal.cache.Oplog.clear(File)' |
| */ |
| @Test |
| public void testClear() |
| { |
| region = DiskRegionHelperFactory.getSyncOverFlowAndPersistRegion(cache, |
| diskProps); |
| putTillOverFlow(region); |
| region.clear(); |
| region.close(); |
| region = DiskRegionHelperFactory.getSyncOverFlowAndPersistRegion(cache, |
| diskProps); |
| assertTrue(" failed in get OverflowAndPersist ", |
| region.get(new Integer(0)) == null); |
| closeDown(); |
| |
| region = DiskRegionHelperFactory |
| .getSyncOverFlowOnlyRegion(cache, diskProps); |
| putTillOverFlow(region); |
| region.clear(); |
| region.close(); |
| region = DiskRegionHelperFactory |
| .getSyncOverFlowOnlyRegion(cache, diskProps); |
| assertTrue(" failed in get OverflowOnly ", |
| region.get(new Integer(0)) == null); |
| closeDown(); |
| |
| region = DiskRegionHelperFactory.getSyncPersistOnlyRegion(cache, diskProps, Scope.LOCAL); |
| put100Int(); |
| region.clear(); |
| region.close(); |
| region = DiskRegionHelperFactory.getSyncPersistOnlyRegion(cache, diskProps, Scope.LOCAL); |
| assertTrue(" failed in get PersistOnly ", |
| region.get(new Integer(0)) == null); |
| closeDown(); |
| } |
| |
| /** |
| * Test method for 'com.gemstone.gemfire.internal.cache.Oplog.close()' |
| */ |
| @Test |
| public void testClose() |
| { |
| { |
| deleteFiles(); |
| region = DiskRegionHelperFactory.getSyncOverFlowAndPersistRegion(cache, |
| diskProps); |
| DiskRegion dr = ((LocalRegion)region).getDiskRegion(); |
| Oplog oplog = dr.testHook_getChild(); |
| long id = oplog.getOplogId(); |
| oplog.close(); |
| // lk should still exist since it locks DiskStore not just one oplog |
| //checkIfContainsFile(".lk"); |
| |
| StatisticsFactory factory = region.getCache().getDistributedSystem(); |
| Oplog newOplog = new Oplog(id, dr.getOplogSet(), new DirectoryHolder(factory, dirs[0], |
| 1000, 0)); |
| dr.getOplogSet().setChild(newOplog); |
| closeDown(); |
| } |
| { |
| deleteFiles(); |
| region = DiskRegionHelperFactory.getSyncOverFlowOnlyRegion(cache, |
| diskProps); |
| DiskRegion dr = ((LocalRegion)region).getDiskRegion(); |
| dr.testHookCloseAllOverflowOplogs(); |
| // lk should still exist since it locks DiskStore not just one oplog |
| //checkIfContainsFile(".lk"); |
| checkIfContainsFile("OVERFLOW"); |
| closeDown(); |
| } |
| { |
| deleteFiles(); |
| region = DiskRegionHelperFactory.getSyncPersistOnlyRegion(cache, |
| diskProps, Scope.LOCAL); |
| DiskRegion dr = ((LocalRegion)region).getDiskRegion(); |
| Oplog oplog = dr.testHook_getChild(); |
| long id = oplog.getOplogId(); |
| oplog.close(); |
| // lk should still exist since it locks DiskStore not just one oplog |
| //checkIfContainsFile(".lk"); |
| StatisticsFactory factory = region.getCache().getDistributedSystem(); |
| Oplog newOplog = new Oplog(id, dr.getOplogSet(), new DirectoryHolder(factory, dirs[0], |
| 1000, 2)); |
| dr.setChild(newOplog); |
| closeDown(); |
| } |
| |
| } |
| |
| @Override |
| protected void closeDown() { |
| DiskRegion dr = null; |
| if (region != null) { |
| dr = ((LocalRegion)region).getDiskRegion(); |
| } |
| super.closeDown(); |
| if (dr != null) { |
| dr.getDiskStore().close(); |
| ((LocalRegion) region).getGemFireCache().removeDiskStore(dr.getDiskStore()); |
| } |
| } |
| |
| |
| |
| /** |
|  * Fails the running test if any file directly inside one of the test |
|  * directories has an absolute path ending in the given suffix. |
|  * |
|  * Every entry of {@code dirs} is scanned (rather than a fixed count of |
|  * four), and a directory that cannot be listed is skipped instead of |
|  * causing a NullPointerException. |
|  * |
|  * @param fileExtension suffix that no remaining file path may end with |
|  */ |
| void checkIfContainsFile(String fileExtension) |
| { |
|   for (File dir : dirs) { |
|     File[] files = dir.listFiles(); |
|     if (files == null) { |
|       // listFiles() returns null when the path is not a directory or an |
|       // I/O error occurs; there is nothing to check in that case |
|       continue; |
|     } |
|     for (File file : files) { |
|       if (file.getAbsolutePath().endsWith(fileExtension)) { |
|         fail("file "+ file + " still exists after oplog.close()"); |
|       } |
|     } |
|   } |
| } |
| |
| /** |
| * Test method for 'com.gemstone.gemfire.internal.cache.Oplog.destroy()' |
| */ |
| @Test |
| public void testDestroy() |
| { |
| region = DiskRegionHelperFactory.getSyncOverFlowAndPersistRegion(cache, |
| diskProps); |
| put100Int(); |
| putTillOverFlow(region); |
| try { |
| region.destroy(new Integer(0)); |
| } |
| catch (EntryNotFoundException e1) { |
| logWriter.error("Exception occured", e1); |
| fail(" Entry not found when it was expected to be there"); |
| } |
| region.close(); |
| region = DiskRegionHelperFactory.getSyncOverFlowAndPersistRegion(cache, |
| diskProps); |
| assertTrue(" failed in get OverflowAndPersist ", |
| region.get(new Integer(0)) == null); |
| closeDown(); |
| |
| region = DiskRegionHelperFactory |
| .getSyncOverFlowOnlyRegion(cache, diskProps); |
| put100Int(); |
| putTillOverFlow(region); |
| try { |
| region.destroy(new Integer(0)); |
| } |
| catch (EntryNotFoundException e1) { |
| logWriter.error("Exception occured", e1); |
| fail(" Entry not found when it was expected to be there"); |
| } |
| region.close(); |
| region = DiskRegionHelperFactory |
| .getSyncOverFlowOnlyRegion(cache, diskProps); |
| assertTrue(" failed in get OverflowOnly ", |
| region.get(new Integer(0)) == null); |
| |
| closeDown(); |
| |
| region = DiskRegionHelperFactory.getSyncPersistOnlyRegion(cache, diskProps, Scope.LOCAL); |
| put100Int(); |
| try { |
| region.destroy(new Integer(0)); |
| } |
| catch (EntryNotFoundException e1) { |
| logWriter.error("Exception occured", e1); |
| fail(" Entry not found when it was expected to be there"); |
| } |
| region.close(); |
| region = DiskRegionHelperFactory.getSyncPersistOnlyRegion(cache, diskProps, Scope.LOCAL); |
| assertTrue(" failed in get PersistOnly ", |
| region.get(new Integer(0)) == null); |
| closeDown(); |
| |
| } |
| |
| /** |
|  * Test method for 'com.gemstone.gemfire.internal.cache.Oplog.remove(long)'. |
|  * |
|  * For each synchronous region flavour (overflow-and-persist, overflow-only, |
|  * persist-only) the region is populated, key 0 is removed, and the region is |
|  * closed and created again. Key 0 must be absent from the re-created region. |
|  * |
|  * The overflow-only and persist-only scenarios previously re-created the |
|  * region without checking it, so the removal was only verified in memory. |
|  * The in-memory checks are kept and the missing post-reopen checks added, |
|  * matching the first scenario. |
|  */ |
| @Test |
| public void testRemove() |
| { |
|   final Integer firstKey = Integer.valueOf(0); |
| |
|   // overflow + persist |
|   region = DiskRegionHelperFactory.getSyncOverFlowAndPersistRegion(cache, |
|       diskProps); |
|   putTillOverFlow(region); |
|   region.remove(firstKey); |
|   region.close(); |
|   region = DiskRegionHelperFactory.getSyncOverFlowAndPersistRegion(cache, |
|       diskProps); |
|   assertTrue(" failed in get OverflowAndPersist ", |
|       region.get(firstKey) == null); |
|   closeDown(); |
| |
|   // overflow only |
|   region = DiskRegionHelperFactory |
|       .getSyncOverFlowOnlyRegion(cache, diskProps); |
|   putTillOverFlow(region); |
|   region.remove(firstKey); |
|   assertTrue(" failed in get OverflowOnly ", |
|       region.get(firstKey) == null); |
|   region.close(); |
|   region = DiskRegionHelperFactory |
|       .getSyncOverFlowOnlyRegion(cache, diskProps); |
|   // the re-created region must not contain the removed key either |
|   assertTrue(" failed in get OverflowOnly after reopen ", |
|       region.get(firstKey) == null); |
|   closeDown(); |
| |
|   // persist only |
|   region = DiskRegionHelperFactory.getSyncPersistOnlyRegion(cache, diskProps, |
|       Scope.LOCAL); |
|   put100Int(); |
|   region.remove(firstKey); |
|   assertTrue(" failed in get PersistOnly ", |
|       region.get(firstKey) == null); |
|   region.close(); |
|   region = DiskRegionHelperFactory.getSyncPersistOnlyRegion(cache, diskProps, |
|       Scope.LOCAL); |
|   // the removal must also hold after the region is re-created |
|   assertTrue(" failed in get PersistOnly after reopen ", |
|       region.get(firstKey) == null); |
|   closeDown(); |
| } |
| |
| // @todo: port testByteBufferCreationForCreateModifyAndDeleteOperation |
| /** |
| * This tests the final ByteBuffer object that gets created for synch/Asynch |
| * operation for a create / modify & Delete operation |
| * |
| * @author Asif |
| */ |
| // @Test |
| // public void testByteBufferCreationForCreateModifyAndDeleteOperation() |
| // { |
| // // Asif First create a persist only disk region which is of aysnch |
| // // & switch of OplOg type |
| // diskProps.setMaxOplogSize(1000); |
| // diskProps.setBytesThreshold(500); |
| // diskProps.setPersistBackup(true); |
| // diskProps.setRolling(false); |
| // diskProps.setSynchronous(false); |
| // diskProps.setTimeInterval(-1); |
| // diskProps.setOverflow(false); |
| |
| // region = DiskRegionHelperFactory |
| // .getAsyncPersistOnlyRegion(cache, diskProps); |
| // byte[] val = new byte[10]; |
| // for (int i = 0; i < 10; ++i) { |
| // val[i] = (byte)i; |
| // } |
| // region.put(new Integer(1), val); |
| // DiskEntry entry = ((DiskEntry)((LocalRegion)region) |
| // .basicGetEntry(new Integer(1))); |
| // long opKey = entry.getDiskId().getKeyId(); |
| // // The final position in the Byte Buffer created in Asynch Op should be |
| // int createPos = 2 + 4 + val.length; |
| // if (opKey > Integer.MAX_VALUE) { |
| // createPos += 8; |
| // } |
| // else if (opKey > Short.MAX_VALUE) { |
| // createPos += 4; |
| // } |
| // else { |
| // createPos += 2; |
| // } |
| // createPos += 4; |
| // createPos += EntryEventImpl.serialize(new Integer(1)).length; |
| // DiskRegion dr = ((LocalRegion)region).getDiskRegion(); |
| // Oplog.WriterThread writer = dr.getChild().getAsynchWriter(); |
| // Oplog.AsyncOp asynchOp = writer |
| // .getAsynchOpForEntryFromPendingFlushMap(entry.getDiskId()); |
| // ByteBuffer bb = asynchOp.getByteBuffer(); |
| // assertTrue(createPos == bb.position()); |
| // assertTrue(bb.limit() == bb.capacity()); |
| // byte val1[] = new byte[20]; |
| // for (int i = 0; i < 20; ++i) { |
| // val1[i] = (byte)i; |
| // } |
| // region.put(new Integer(1), val1); |
| // bb = writer.getAsynchOpForEntryFromPendingFlushMap(entry.getDiskId()) |
| // .getByteBuffer(); |
| // createPos += 10; |
| // assertTrue(createPos == bb.position()); |
| // assertTrue(bb.limit() == bb.capacity()); |
| // byte val2[] = new byte[30]; |
| // for (int i = 0; i < 30; ++i) { |
| // val2[i] = (byte)i; |
| // } |
| // region.put(new Integer(1), val2); |
| // bb = writer.getAsynchOpForEntryFromPendingFlushMap(entry.getDiskId()) |
| // .getByteBuffer(); |
| // createPos += 10; |
| // assertTrue(createPos == bb.position()); |
| // assertTrue(bb.limit() == bb.capacity()); |
| // long opSizeBeforeCreateRemove = dr.getChild().getOplogSize(); |
| // long pendingFlushSize = dr.getChild().getAsynchWriter() |
| // .getCurrentBufferedBytesSize(); |
| // region.put(new Integer(2), val2); |
| // DiskEntry entry2 = ((DiskEntry)((LocalRegion)region) |
| // .basicGetEntry(new Integer(2))); |
| // bb = writer.getAsynchOpForEntryFromPendingFlushMap(entry2.getDiskId()) |
| // .getByteBuffer(); |
| // assertNotNull(bb); |
| // region.remove(new Integer(2)); |
| // assertNull(writer |
| // .getAsynchOpForEntryFromPendingFlushMap(entry2.getDiskId())); |
| // assertEquals(opSizeBeforeCreateRemove, dr.getChild().getOplogSize()); |
| // assertEquals(pendingFlushSize, dr.getChild().getAsynchWriter() |
| // .getCurrentBufferedBytesSize()); |
| |
| // closeDown(); |
| |
| // } |
| |
| /** |
|  * Tests whether the data is written in the right format on the disk. |
|  * |
|  * A 10-byte value is put into a synchronous persist-only region and read |
|  * back twice through the disk region: once with {@code getNoBuffer} and once |
|  * with {@code getBytesAndBitsWithoutLock} converted to an object. Both reads |
|  * must yield the bytes 0..9. After invalidating the entry, |
|  * {@code getNoBuffer} must return {@code Token.INVALID}. |
|  * |
|  * @author Asif |
|  */ |
| @Test |
| public void testFaultInOfValuesFromDisk() |
| { |
|   try { |
|     // synchronous, persist-only, non-rolling configuration |
|     diskProps.setMaxOplogSize(1000); |
|     diskProps.setPersistBackup(true); |
|     diskProps.setRolling(false); |
|     diskProps.setSynchronous(true); |
|     diskProps.setTimeInterval(-1); |
|     diskProps.setOverflow(false); |
| |
|     region = DiskRegionHelperFactory.getSyncPersistOnlyRegion(cache, |
|         diskProps, Scope.LOCAL); |
| |
|     final Integer key = Integer.valueOf(1); |
|     byte[] written = new byte[10]; |
|     for (int pos = 0; pos < 10; ++pos) { |
|       written[pos] = (byte)pos; |
|     } |
|     region.put(key, written); |
| |
|     DiskEntry diskEntry = (DiskEntry)((LocalRegion)region).basicGetEntry(key); |
|     DiskRegion diskRegion = ((LocalRegion)region).getDiskRegion(); |
| |
|     // first read path |
|     byte[] viaNoBuffer = (byte[])diskRegion.getNoBuffer(diskEntry.getDiskId()); |
|     checkFirstTenBytes(viaNoBuffer); |
| |
|     // second read path |
|     byte[] viaBytesAndBits = (byte[])DiskStoreImpl |
|         .convertBytesAndBitsIntoObject(diskRegion.getBytesAndBitsWithoutLock( |
|             diskEntry.getDiskId(), true, false)); |
|     checkFirstTenBytes(viaBytesAndBits); |
| |
|     region.invalidate(key); |
|     assertTrue(diskRegion.getNoBuffer(diskEntry.getDiskId()) == Token.INVALID); |
|   } |
|   catch (Exception unexpected) { |
|     logWriter.error("Exception occured", unexpected); |
|     fail(unexpected.toString()); |
|   } |
|   closeDown(); |
| } |
| |
| /** |
|  * Fails the test unless the first ten bytes of the array are 0, 1, ... 9. |
|  */ |
| private void checkFirstTenBytes(byte[] data) |
| { |
|   for (int pos = 0; pos < 10; ++pos) { |
|     if (data[pos] != (byte)pos) { |
|       fail("Test for fault in from disk failed"); |
|     } |
|   } |
| } |
| |
| // @todo port testAsynchWriterTerminationOnSwitch |
| /** |
| * Tests the termination of asynch writer for an Oplog after the switch has |
| * been made |
| * |
| * @author Asif |
| */ |
| // @Test |
| // public void testAsynchWriterTerminationOnSwitch() |
| // { |
| // // & switch of OplOg type |
| // diskProps.setMaxOplogSize(23); |
| // diskProps.setBytesThreshold(0); |
| // diskProps.setPersistBackup(true); |
| // diskProps.setRolling(false); |
| // diskProps.setSynchronous(false); |
| // diskProps.setTimeInterval(10000); |
| // diskProps.setOverflow(false); |
| // // diskProps.setDiskDirs(new File[]{new File("test1"), new |
| // // File("test2"), |
| // // new File("test3")}); |
| |
| // region = DiskRegionHelperFactory |
| // .getAsyncPersistOnlyRegion(cache, diskProps); |
| // DiskRegion dr = ((LocalRegion)region).getDiskRegion(); |
| // Oplog.WriterThread writer = dr.getChild().getAsynchWriter(); |
| // // Populate data just below the switch over threshhold |
| // byte[] val = new byte[5]; |
| // for (int i = 0; i < 5; ++i) { |
| // val[i] = (byte)i; |
| // } |
| |
| // region.put(new Integer(1), val); |
| |
| // DiskEntry entry = ((DiskEntry)((LocalRegion)region) |
| // .basicGetEntry(new Integer(1))); |
| // long opKey = entry.getDiskId().getKeyId(); |
| // // The final position in the Byte Buffer created in Asynch Op should be |
| // int createPos = 2 + 4 + val.length; |
| // if (opKey > Integer.MAX_VALUE) { |
| // createPos += 8; |
| // } |
| // else if (opKey > Short.MAX_VALUE) { |
| // createPos += 4; |
| // } |
| // else { |
| // createPos += 2; |
| // } |
| // createPos += 4; |
| // createPos += EntryEventImpl.serialize(new Integer(1)).length; |
| // assertTrue(createPos == 22); |
| // region.put(new Integer(2), val); |
| // DistributedTestCase.join(writer.getThread(), 10 * 1000, null); |
| // closeDown(); |
| // } |
| |
| /** |
| * Tests the original ByteBufferPool gets transferred to the new Oplog for |
| * synch mode |
| * |
| * @author Asif |
| */ |
| @Test |
| public void testByteBufferPoolTransferForSynchMode() |
| { |
| diskProps.setMaxOplogSize(1024); |
| diskProps.setBytesThreshold(0); |
| diskProps.setPersistBackup(true); |
| diskProps.setRolling(false); |
| diskProps.setSynchronous(true); |
| diskProps.setTimeInterval(10000); |
| diskProps.setOverflow(false); |
| // diskProps.setDiskDirs(new File[]{new File("test1"), new |
| // File("test2"), |
| // new File("test3")}); |
| |
| region = DiskRegionHelperFactory.getSyncPersistOnlyRegion(cache, diskProps, Scope.LOCAL); |
| DiskRegion dr = ((LocalRegion)region).getDiskRegion(); |
| //assertNull(dr.getChild().getAsynchWriter()); |
| // Populate data just below the switch over threshhold |
| byte[] val = new byte[5]; |
| for (int i = 0; i < 5; ++i) { |
| val[i] = (byte)i; |
| } |
| |
| region.put(new Integer(1), val); |
| |
| ((LocalRegion)region).basicGetEntry(new Integer(1)); |
| Oplog old = dr.testHook_getChild(); |
| ByteBuffer oldWriteBuf = old.getWriteBuf(); |
| region.forceRolling(); // start a new oplog |
| region.put(new Integer(2), val); |
| Oplog switched = dr.testHook_getChild(); |
| assertTrue(old != switched); |
| assertEquals(dr.getDiskStore().persistentOplogs.getChild(2), switched); |
| assertEquals(oldWriteBuf, switched.getWriteBuf()); |
| assertEquals(null, old.getWriteBuf()); |
| closeDown(); |
| |
| } |
| |
| // @todo port this test if needed. ByteBufferPool code is going to change |
| /** |
| * Tests the ByteBufferPool usage during asynch mode operation and ensures that |
| * a get operation does not see corrupted data due to the returning of a |
| * ByteBuffer to the pool. There are 5 pre-created pools in Oplog. Each pool |
| * has a size of 1. Out of the 5 pools, only one pool is used by the test. |
| * Thus there are 4 byte buffers which will always be free. Thus if the asynch |
| * writer initially had 8 byte buffers, only 4 will be released. |
| * |
| * @author Asif |
| */ |
| // @Test |
| // public void testByteBufferPoolUsageForAsynchMode() |
| // { |
| // final int PRCREATED_POOL_NUM = 5; |
| // try { |
| // // Asif First create a persist only disk region which is of aysnch |
| // // & switch of OplOg type |
| // diskProps.setMaxOplogSize(1000); |
| // diskProps.setPersistBackup(true); |
| // diskProps.setRolling(false); |
| // diskProps.setSynchronous(false); |
| // diskProps.setTimeInterval(-1); |
| // diskProps.setOverflow(false); |
| // final int byte_threshold = 500; |
| // diskProps.setBytesThreshold(byte_threshold); |
| // byte[] val = new byte[50]; |
| // region = DiskRegionHelperFactory.getAsyncPersistOnlyRegion(cache, |
| // diskProps); |
| // for (int i = 0; i < 50; ++i) { |
| // val[i] = (byte)i; |
| // } |
| // region.put(new Integer(1), val); |
| // final int singleOpSize = evaluateSizeOfOperationForPersist( |
| // new Integer(1), val, ((DiskEntry)((LocalRegion)region) |
| // .basicGetEntry(new Integer(1))).getDiskId(), OP_CREATE); |
| |
| // final int loopCount = byte_threshold / singleOpSize + 1; |
| // LocalRegion.ISSUE_CALLBACKS_TO_CACHE_OBSERVER = true; |
| |
| // final Thread th = new Thread(new Runnable() { |
| // public void run() |
| // { |
| // takeRecursiveLockOnAllEntries(1); |
| // DiskRegion dr = ((LocalRegion)region).getDiskRegion(); |
| // // Asif : Sleep for somemore time |
| // try { |
| // Thread.yield(); |
| // Thread.sleep(4000); |
| // } |
| // catch (InterruptedException ie) { |
| // logWriter.error("Exception occured", ie); |
| // failureCause = "No guarantee of vaildity of result hence failing. Exception = " |
| // + ie; |
| // testFailed = true; |
| // fail("No guarantee of vaildity of result hence failing. Exception = " |
| // + ie); |
| // } |
| |
| // // There shoudl beatleast one Pool which has active counts |
| // // as two |
| // Oplog.ByteBufferPool bbp = null; |
| // List pools = dr.getChild().getByteBufferPoolList(); |
| // Iterator itr = pools.iterator(); |
| // boolean found = false; |
| // while (itr.hasNext()) { |
| // bbp = (Oplog.ByteBufferPool)itr.next(); |
| // int len = bbp.getByteBufferHolderList().size(); |
| // if (len == (loopCount - (PRCREATED_POOL_NUM - 1))) { |
| // found = true; |
| // break; |
| // } |
| // } |
| |
| // if (!found) { |
| // testFailed = true; |
| // failureCause = "Test failed as the Asynch writer did not release ByetBuffer after get operation"; |
| // fail("Test failed as the Asynch writer did not release ByetBuffer after get operation"); |
| |
| // } |
| |
| // } |
| |
| // private void takeRecursiveLockOnAllEntries(int key) |
| // { |
| // // Get the DisKID |
| // DiskRegion dr = ((LocalRegion)region).getDiskRegion(); |
| // if (key > loopCount) { |
| // // Interrupt the writer thread so as to start releasing |
| // // bytebuffer to pool |
| // //dr.getChild().getAsynchWriter().interrupt(); |
| // // Sleep for a while & check the active ByteBuffer |
| // // count. |
| // // It should be two |
| // try { |
| // Thread.yield(); |
| // Thread.sleep(5000); |
| // } |
| // catch (InterruptedException ie) { |
| // logWriter.error("Exception occured", ie); |
| // failureCause = "No guarantee of vaildity of result hence failing. Exception = " |
| // + ie; |
| // testFailed = true; |
| // fail("No guarantee of vaildity of result hence failing. Exception = " |
| // + ie); |
| // } |
| // // Check the number of ByteBuffers in the pool. |
| // List pools = dr.getChild().getByteBufferPoolList(); |
| // // There shoudl beatleast one Pool which has active |
| // // counts as two |
| // Oplog.ByteBufferPool bbp = null; |
| // Iterator itr = pools.iterator(); |
| // boolean found = true; |
| // int len = -1; |
| // while (itr.hasNext()) { |
| // bbp = (Oplog.ByteBufferPool)itr.next(); |
| // len = bbp.getByteBufferHolderList().size(); |
| // if (len > 1) { |
| // found = false; |
| // break; |
| // } |
| // } |
| // if (!found) { |
| // failureCause = "Test failed as the Asynch writer released ByteBuffer before get operation. The length of byte buffer pool is found to be greater than 0. the length is" |
| // + len; |
| // testFailed = true; |
| // fail("Test failed as the Asynch writer released ByteBuffer before get operation"); |
| // } |
| // } |
| // else { |
| // DiskEntry entry = ((DiskEntry)((LocalRegion)region) |
| // .basicGetEntry(new Integer(key))); |
| // DiskId id = entry.getDiskId(); |
| |
| // synchronized (id) { |
| // takeRecursiveLockOnAllEntries(++key); |
| |
| // } |
| // } |
| // } |
| |
| // }); |
| |
| // CacheObserver old = CacheObserverHolder |
| // .setInstance(new CacheObserverAdapter() { |
| // public void afterWritingBytes() |
| // { |
| // // Asif Start a Thread & do a get in the thread without |
| // // releasing the |
| // // lock on dik ID |
| // th.start(); |
| // synchronized (OplogJUnitTest.this) { |
| // OplogJUnitTest.this.proceed = true; |
| // OplogJUnitTest.this.notify(); |
| // } |
| // try { |
| // th.join(30 * 1000); // Yes, really use Thread#join here |
| // fail("never interrupted"); |
| // } |
| // catch (InterruptedException ie) { |
| // // OK. Expected the interrupted Exception |
| // if (debug) |
| // System.out.println("Got the right exception"); |
| // } |
| |
| // } |
| // }); |
| |
| // int totalOpSize = singleOpSize; |
| // for (int j = 1; j < loopCount; ++j) { |
| // region.put(new Integer(j + 1), val); |
| // totalOpSize += evaluateSizeOfOperationForPersist(new Integer(j + 1), |
| // val, ((DiskEntry)((LocalRegion)region).basicGetEntry(new Integer( |
| // j + 1))).getDiskId(), OP_CREATE); |
| // } |
| // assertTrue(totalOpSize - byte_threshold <= singleOpSize); |
| |
| // if (!proceed) { |
| // synchronized (this) { |
| // if (!proceed) { |
| // this.wait(25000); |
| // if (!proceed) { |
| // fail("Test failed as no callback recieved from asynch writer"); |
| // } |
| // } |
| // } |
| // } |
| // DistributedTestCase.join(th, 30 * 1000, null); |
| // CacheObserverHolder.setInstance(old); |
| // } |
| // catch (Exception e) { |
| // logWriter.error("Exception occured", e); |
| // fail(e.toString()); |
| // } |
| // assertFalse(failureCause, testFailed); |
| // LocalRegion.ISSUE_CALLBACKS_TO_CACHE_OBSERVER = false; |
| // closeDown(); |
| // } |
| |
// given the new oplog record format, it is too hard for the test to calculate
// the expected size
| // /** |
| // * @author Asif |
| // */ |
| // @Test |
| // public void testSynchModeConcurrentOperations() |
| // { |
| // final Map map = new HashMap(); |
| // diskProps.setMaxOplogSize(1024 * 1024 * 20); |
| // diskProps.setPersistBackup(true); |
| // diskProps.setRolling(false); |
| // diskProps.setSynchronous(true); |
| // diskProps.setOverflow(false); |
| // final int THREAD_COUNT = 90; |
| |
| // final byte[] val = new byte[50]; |
| // region = DiskRegionHelperFactory.getSyncPersistOnlyRegion(cache, diskProps); |
| // for (int i = 1; i < 101; ++i) { |
| // map.put(new Integer(i), new Integer(i)); |
| // } |
| // Thread[] threads = new Thread[THREAD_COUNT]; |
| // for (int i = 0; i < THREAD_COUNT; ++i) { |
| // threads[i] = new Thread(new Runnable() { |
| |
| // public void run() |
| // { |
| // int sizeOfOp = 0; |
| // DiskId id = null; |
| // for (int j = 0; j < 50; ++j) { |
| // int keyNum = random.nextInt(10) + 1; |
| // Integer key = new Integer(keyNum); |
| // Integer intgr = (Integer)map.get(key); |
| // try { |
| // synchronized (intgr) { |
| |
| // region.create(key, val); |
| // DiskEntry entry = ((DiskEntry)((LocalRegion)region) |
| // .basicGetEntry(key)); |
| // id = entry.getDiskId(); |
| |
| // } |
| // sizeOfOp = OplogJUnitTest.evaluateSizeOfOperationForPersist(key, |
| // val, id, OP_CREATE); |
| // synchronized (OplogJUnitTest.this) { |
| // OplogJUnitTest.this.expectedOplogSize += sizeOfOp; |
| // ++OplogJUnitTest.this.totalSuccessfulOperations; |
| // ++OplogJUnitTest.this.numCreate; |
| // } |
| // } |
| // catch (EntryExistsException eee) { |
| // if (OplogJUnitTest.this.logWriter.finerEnabled()) { |
| // OplogJUnitTest.this.logWriter |
| // .finer("The entry already exists so this operation will not increase the size of oplog"); |
| // } |
| // } |
| // try { |
| // boolean isUpdate = false; |
| // synchronized (intgr) { |
| // isUpdate = region.containsKey(key); |
| // region.put(key, val); |
| // DiskEntry entry = ((DiskEntry)((LocalRegion)region) |
| // .basicGetEntry(key)); |
| // id = entry.getDiskId(); |
| // } |
| // sizeOfOp = OplogJUnitTest.evaluateSizeOfOperationForPersist(key, |
| // val, id, (isUpdate ? OP_MODIFY : OP_CREATE)); |
| // synchronized (OplogJUnitTest.this) { |
| // OplogJUnitTest.this.expectedOplogSize += sizeOfOp; |
| // ++OplogJUnitTest.this.totalSuccessfulOperations; |
| // if (!isUpdate) { |
| // ++OplogJUnitTest.this.numCreate; |
| // } |
| // else { |
| // ++OplogJUnitTest.this.numModify; |
| // } |
| // } |
| // } |
| // catch (EntryDestroyedException ede) { |
| // if (OplogJUnitTest.this.logWriter.finerEnabled()) { |
| // OplogJUnitTest.this.logWriter |
| // .finer("The entry already exists so this operation will not increase the size of oplog"); |
| // } |
| // } |
| |
| // boolean deleted = false; |
| // synchronized (intgr) { |
| // if (region.containsKey(key)) { |
| // DiskEntry entry = ((DiskEntry)((LocalRegion)region) |
| // .basicGetEntry(key)); |
| // id = entry.getDiskId(); |
| // region.remove(key); |
| // deleted = true; |
| // } |
| |
| // } |
| // if (deleted) { |
| // sizeOfOp = OplogJUnitTest.evaluateSizeOfOperationForPersist(key, |
| // null, id, OP_DEL); |
| // synchronized (OplogJUnitTest.this) { |
| // OplogJUnitTest.this.expectedOplogSize += sizeOfOp; |
| // ++OplogJUnitTest.this.totalSuccessfulOperations; |
| // ++OplogJUnitTest.this.numDel; |
| |
| // } |
| // } |
| |
| // } |
| |
| // } |
| |
| // }); |
| // threads[i].start(); |
| // } |
| |
| // for (int i = 0; i < THREAD_COUNT; ++i) { |
| // DistributedTestCase.join(threads[i], 30 * 1000, null); |
| // } |
| // long inMemOplogSize = 0; |
| // File opFile = null; |
| // try { |
| // opFile = ((LocalRegion)region).getDiskRegion().getChild().getOplogFile(); |
| // } |
| // catch (Exception e) { |
| // logWriter |
| // .error( |
| // "Exception in synching data present in the buffers of RandomAccessFile of Oplog, to the disk", |
| // e); |
| // fail("Test failed because synching of data present in buffer of RandomAccesFile "); |
| // } |
| // synchronized (opFile) { |
| // inMemOplogSize = ((LocalRegion)region).getDiskRegion().getChild().getOplogSize(); |
| // } |
| |
| // long actFileSize = 0; |
| // try { |
| |
| // actFileSize = ((LocalRegion)region).getDiskRegion().getChild().testGetOplogFileLength(); |
| // } |
| // catch (IOException e) { |
| |
| // fail("exception not expected" + e); |
| // fail("The test failed as the oplog could not eb synched to disk"); |
| // } |
| // assertEquals((this.numCreate + this.numDel + this.numModify), |
| // this.totalSuccessfulOperations); |
| // assertTrue(" The expected oplog size =" + inMemOplogSize |
| // + " Actual Oplog file size =" + actFileSize, |
| // inMemOplogSize == actFileSize); |
| // assertTrue(" The expected oplog size =" + this.expectedOplogSize |
| // + " In memeory Oplog size =" + inMemOplogSize, |
| // this.expectedOplogSize == inMemOplogSize); |
| // closeDown(); |
| |
| // } |
| |
| static int evaluateSizeOfOperationForPersist(Object key, byte[] val, |
| DiskId id, int OperationType) |
| { |
| int size = 1; |
| long opKey = id.getKeyId(); |
| switch (OperationType) { |
| case OP_CREATE: |
| size += 4 + EntryEventImpl.serialize(key).length + 1 + 4 + val.length; |
| break; |
| |
| case OP_MODIFY: |
| // @todo how do a know if the key needed to be serialized? |
| size += 1 + 4 + val.length + Oplog.bytesNeeded(Oplog.abs(opKey)); |
| break; |
| case OP_DEL: |
| size += Oplog.bytesNeeded(Oplog.abs(opKey)); |
| break; |
| } |
| return size; |
| |
| } |
| |
// given the new oplog record format, it is too hard for the test to calculate
// the expected size
| // /** |
| // * Tests whether the switching of Oplog happens correctly without size |
| // * violation in case of concurrent region operations for synch mode. |
| // */ |
| // @Test |
| // public void testSwitchingForConcurrentSynchedOperations() |
| // { |
| // final Map map = new HashMap(); |
| // final int MAX_OPLOG_SIZE = 500; |
| // diskProps.setMaxOplogSize(MAX_OPLOG_SIZE); |
| // diskProps.setPersistBackup(true); |
| // diskProps.setRolling(false); |
| // diskProps.setSynchronous(true); |
| // diskProps.setOverflow(false); |
| // final int THREAD_COUNT = 5; |
| // final byte[] val = new byte[50]; |
| // final byte[] uval = new byte[1]; |
| // region = DiskRegionHelperFactory.getSyncPersistOnlyRegion(cache, diskProps); |
| // for (int i = 1; i < 101; ++i) { |
| // map.put(new Integer(i), new Integer(i)); |
| // } |
| // final AI uniqueCtr = CFactory.createAI(); |
| // Thread[] threads = new Thread[THREAD_COUNT]; |
| // for (int i = 0; i < THREAD_COUNT; ++i) { |
| // threads[i] = new Thread(new Runnable() { |
| // public void run() |
| // { |
| // int sizeOfOp = 0; |
| // DiskId id = null; |
| // for (int j = 0; j < 50; ++j) { |
| // int keyNum = random.nextInt(10) + 1; |
| // Integer key = new Integer(keyNum); |
| // Integer intgr = (Integer)map.get(key); |
| // try { |
| // String uniqueKey = "UK" + uniqueCtr.incrementAndGet(); |
| // // since the files for "empty" oplogs now get cleaned up early |
| // // create a unique key to keep this oplog alive. |
| // region.create(uniqueKey, uval); |
| // DiskEntry uentry = ((DiskEntry)((LocalRegion)region) |
| // .basicGetEntry(uniqueKey)); |
| // sizeOfOp = OplogJUnitTest.evaluateSizeOfOperationForPersist(uniqueKey, uval, uentry.getDiskId(), OP_CREATE); |
| // synchronized (OplogJUnitTest.this) { |
| // OplogJUnitTest.this.expectedOplogSize += sizeOfOp; |
| // ++OplogJUnitTest.this.totalSuccessfulOperations; |
| // ++OplogJUnitTest.this.numCreate; |
| // } |
| |
| // synchronized (intgr) { |
| |
| // region.create(key, val); |
| // DiskEntry entry = ((DiskEntry)((LocalRegion)region) |
| // .basicGetEntry(key)); |
| // id = entry.getDiskId(); |
| |
| // } |
| // sizeOfOp = OplogJUnitTest.evaluateSizeOfOperationForPersist(key, |
| // val, id, OP_CREATE); |
| |
| // synchronized (OplogJUnitTest.this) { |
| // OplogJUnitTest.this.expectedOplogSize += sizeOfOp; |
| // ++OplogJUnitTest.this.totalSuccessfulOperations; |
| // ++OplogJUnitTest.this.numCreate; |
| // } |
| // } |
| // catch (EntryExistsException eee) { |
| // if (logWriter.finerEnabled()) { |
| // logWriter |
| // .finer("The entry already exists so this operation will not increase the size of oplog"); |
| // } |
| // } |
| // try { |
| // boolean isUpdate = false; |
| // synchronized (intgr) { |
| // isUpdate = region.containsKey(key) && region.get(key) != null |
| // && region.get(key) != Token.DESTROYED; |
| // region.put(key, val); |
| // DiskEntry entry = ((DiskEntry)((LocalRegion)region) |
| // .basicGetEntry(key)); |
| // id = entry.getDiskId(); |
| // } |
| // sizeOfOp = OplogJUnitTest.evaluateSizeOfOperationForPersist(key, |
| // val, id, (isUpdate ? OP_MODIFY : OP_CREATE)); |
| // synchronized (OplogJUnitTest.this) { |
| // OplogJUnitTest.this.expectedOplogSize += sizeOfOp; |
| // ++OplogJUnitTest.this.totalSuccessfulOperations; |
| // if (!isUpdate) { |
| // ++OplogJUnitTest.this.numCreate; |
| // } |
| // else { |
| // ++OplogJUnitTest.this.numModify; |
| // } |
| // } |
| // } |
| // catch (EntryDestroyedException ede) { |
| // if (logWriter.finerEnabled()) { |
| // logWriter |
| // .finer("The entry already exists so this operation will not increase the size of oplog"); |
| // } |
| // } |
| |
| // boolean deleted = false; |
| // synchronized (intgr) { |
| |
| // if (region.containsKey(key) && region.get(key) != null |
| // && region.get(key) != Token.DESTROYED) { |
| // DiskEntry entry = ((DiskEntry)((LocalRegion)region) |
| // .basicGetEntry(key)); |
| // id = entry.getDiskId(); |
| // region.remove(key); |
| // deleted = true; |
| // } |
| |
| // } |
| // if (deleted) { |
| // sizeOfOp = OplogJUnitTest.evaluateSizeOfOperationForPersist(key, |
| // null, id, OP_DEL); |
| // synchronized (OplogJUnitTest.this) { |
| // OplogJUnitTest.this.expectedOplogSize += sizeOfOp; |
| // ++OplogJUnitTest.this.totalSuccessfulOperations; |
| // ++OplogJUnitTest.this.numDel; |
| |
| // } |
| // } |
| |
| // } |
| |
| // } |
| |
| // }); |
| // threads[i].start(); |
| // } |
| |
| // for (int i = 0; i < THREAD_COUNT; ++i) { |
| // DistributedTestCase.join(threads[i], 30 * 1000, null); |
| // } |
| |
| // long currentOplogID = ((LocalRegion)region).getDiskRegion().getChild() |
| // .getOplogId(); |
| // assertTrue( |
| // " Switching did not happen, increase the iterations to insert more data ", |
| // currentOplogID > 1); |
| // long inMemOplogSize = 0; |
| |
| // for (int j = 1; j <= currentOplogID; ++j) { |
| |
| // Oplog oplog = ((LocalRegion)region).getDiskRegion().getChild(j); |
| // // if (j < currentOplogID) { |
| // // // oplogs are now closed to save memory and file descriptors |
| // // // once they are no longer needed |
| // // assertEquals(null, oplog); |
| // // } else { |
| // inMemOplogSize += oplog.getOplogSize(); |
| // logWriter.info(" Oplog size="+ oplog.getOplogSize() + " Max Oplog size acceptable="+MAX_OPLOG_SIZE ); |
| // assertTrue( |
| // " The max Oplog Size limit is violated when taken the inmemory oplog size", |
| // oplog.getOplogSize() <= MAX_OPLOG_SIZE); |
| |
| // // File opFile = null; |
| // try { |
| // oplog.getOplogFile(); |
| // } |
| // catch (Exception e) { |
| // logWriter |
| // .error( |
| // "Exception in synching data present in the buffers of RandomAccessFile of Oplog, to the disk", |
| // e); |
| // fail("Test failed because synching of data present in buffer of RandomAccesFile "); |
| // } |
| |
| // assertTrue( |
| // " The max Oplog Size limit is violated when taken the actual file size", |
| // oplog.getActualFileLength() <= MAX_OPLOG_SIZE); |
| // assertEquals(oplog.getOplogSize(), oplog.getActualFileLength()); |
| // // } |
| // } |
| |
| // inMemOplogSize += ((LocalRegion)region).getDiskRegion().getDiskStore().undeletedOplogSize.get(); |
| |
| // assertTrue(" The sum of all oplogs size as expected =" |
| // + this.expectedOplogSize + " Actual sizes of all oplogs =" |
| // + inMemOplogSize, this.expectedOplogSize == inMemOplogSize); |
| |
| // assertEquals((this.numCreate + this.numDel + this.numModify), |
| // this.totalSuccessfulOperations); |
| // closeDown(); |
| |
| // } |
| |
// given the new oplog record format, it is too hard for the test to calculate
// the expected size
| // /** |
| // * Tests whether the switching of Oplog happens correctly without size |
| // * violation in case of concurrent region operations for asynch mode. |
| // * |
| // * @author Asif |
| // */ |
| // @Test |
| // public void testSwitchingForConcurrentASynchedOperations() |
| // { |
| // final int MAX_OPLOG_SIZE = 500; |
| // diskProps.setMaxOplogSize(MAX_OPLOG_SIZE); |
| // diskProps.setPersistBackup(true); |
| // diskProps.setRolling(false); |
| // diskProps.setSynchronous(false); |
| // diskProps.setOverflow(false); |
| // diskProps.setBytesThreshold(100); |
| // final int THREAD_COUNT = 40; |
| // final byte[] val = new byte[50]; |
| // region = DiskRegionHelperFactory |
| // .getAsyncPersistOnlyRegion(cache, diskProps); |
| |
| // Thread[] threads = new Thread[THREAD_COUNT]; |
| // for (int i = 0; i < THREAD_COUNT; ++i) { |
| // final int threadNum = (i + 1); |
| // threads[i] = new Thread(new Runnable() { |
| // public void run() |
| // { |
| // int sizeOfOp = 0; |
| // DiskId id = null; |
| // try { |
| // region.create(new Integer(threadNum), val); |
| // } |
| |
| // catch (EntryExistsException e) { |
| // e.printStackTrace(); |
| // testFailed = true; |
| // failureCause = "Entry existed with key =" + threadNum; |
| // fail("Entry existed with key =" + threadNum); |
| // } |
| // DiskEntry entry = ((DiskEntry)((LocalRegion)region) |
| // .basicGetEntry(new Integer(threadNum))); |
| // id = entry.getDiskId(); |
| |
| // sizeOfOp = OplogJUnitTest.evaluateSizeOfOperationForPersist( |
| // new Integer(threadNum), val, id, OP_CREATE); |
| // synchronized (OplogJUnitTest.this) { |
| // OplogJUnitTest.this.expectedOplogSize += sizeOfOp; |
| // ++OplogJUnitTest.this.totalSuccessfulOperations; |
| // ++OplogJUnitTest.this.numCreate; |
| // } |
| |
| // } |
| |
| // }); |
| |
| // threads[i].start(); |
| // } |
| |
| // for (int i = 0; i < THREAD_COUNT; ++i) { |
| // DistributedTestCase.join(threads[i], 30 * 1000, null); |
| // } |
| |
| // long currentOplogID = ((LocalRegion)region).getDiskRegion().getChild() |
| // .getOplogId(); |
| // assertTrue( |
| // " Switching did not happen, increase the iterations to insert more data ", |
| // currentOplogID > 1); |
| // if (debug) |
| // System.out.print("Total number of oplogs created = " + currentOplogID); |
| // long inMemOplogSize = 0; |
| |
| // for (int j = 1; j <= currentOplogID; ++j) { |
| // Oplog oplog = ((LocalRegion)region).getDiskRegion().getChild(j); |
| // // if (j < currentOplogID) { |
| // // // oplogs are now closed to save memory and file descriptors |
| // // // once they are no longer needed |
| // // assertEquals(null, oplog); |
| // // } else { |
| // inMemOplogSize += oplog.getOplogSize(); |
| // //oplog.forceFlush(); |
| // assertTrue( |
| // " The max Oplog Size limit is violated when taken the inmemory oplog size", |
| // oplog.getOplogSize() <= MAX_OPLOG_SIZE); |
| // // File opFile = null; |
| // try { |
| // oplog.getOplogFile(); |
| // } |
| // catch (Exception e) { |
| // logWriter |
| // .error( |
| // "Exception in synching data present in the buffers of RandomAccessFile of Oplog, to the disk", |
| // e); |
| // fail("Test failed because synching of data present in buffer of RandomAccesFile "); |
| // } |
| // assertTrue( |
| // " The max Oplog Size limit is violated when taken the actual file size", |
| // oplog.getActualFileLength() <= MAX_OPLOG_SIZE); |
| // assertEquals(oplog.getOplogSize(), oplog.getActualFileLength()); |
| // // } |
| // } |
| |
| // inMemOplogSize += ((LocalRegion)region).getDiskRegion().getDiskStore().undeletedOplogSize.get(); |
| |
| // assertTrue(" The sum of all oplogs size as expected =" |
| // + this.expectedOplogSize + " Actual sizes of all oplogs =" |
| // + inMemOplogSize, this.expectedOplogSize == inMemOplogSize); |
| // assertEquals((this.numCreate + this.numDel + this.numModify), |
| // this.totalSuccessfulOperations); |
| // assertFalse(failureCause, testFailed); |
| // closeDown(); |
| |
| // } |
| |
| // /** |
| // * @author Asif |
| // */ |
| // @Test |
| // public void testAsyncWriterTerminationAfterSwitch() |
| // { |
| // final int MAX_OPLOG_SIZE = 500; |
| // diskProps.setMaxOplogSize(MAX_OPLOG_SIZE); |
| // diskProps.setPersistBackup(true); |
| // diskProps.setRolling(false); |
| // diskProps.setSynchronous(false); |
| // diskProps.setOverflow(false); |
| // diskProps.setBytesThreshold(100); |
| // final int THREAD_COUNT = 40; |
| // final byte[] val = new byte[50]; |
| // region = DiskRegionHelperFactory |
| // .getAsyncPersistOnlyRegion(cache, diskProps); |
| |
| // Thread[] threads = new Thread[THREAD_COUNT]; |
| // for (int i = 0; i < THREAD_COUNT; ++i) { |
| // final int threadNum = (i + 1); |
| // threads[i] = new Thread(new Runnable() { |
| // public void run() |
| // { |
| // int sizeOfOp = 0; |
| // DiskId id = null; |
| // try { |
| // region.create(new Integer(threadNum), val); |
| // } |
| |
| // catch (EntryExistsException e) { |
| // testFailed = true; |
| // failureCause = "Entry existed with key =" + threadNum; |
| // fail("Entry existed with key =" + threadNum); |
| // } |
| // DiskEntry entry = ((DiskEntry)((LocalRegion)region) |
| // .basicGetEntry(new Integer(threadNum))); |
| // id = entry.getDiskId(); |
| |
| // sizeOfOp = OplogJUnitTest.evaluateSizeOfOperationForPersist( |
| // new Integer(threadNum), val, id, OP_CREATE); |
| // synchronized (OplogJUnitTest.this) { |
| // OplogJUnitTest.this.expectedOplogSize += sizeOfOp; |
| // ++OplogJUnitTest.this.totalSuccessfulOperations; |
| // ++OplogJUnitTest.this.numCreate; |
| // } |
| |
| // } |
| |
| // }); |
| |
| // threads[i].start(); |
| // } |
| |
| // for (int i = 0; i < THREAD_COUNT; ++i) { |
| // DistributedTestCase.join(threads[i], 30 * 1000, null); |
| // } |
| |
| // long currentOplogID = ((LocalRegion)region).getDiskRegion().getChild() |
| // .getOplogId(); |
| // assertTrue( |
| // " Switching did not happen, increase the iterations to insert more data ", |
| // currentOplogID > 1); |
| |
| // for (int j = 1; j < currentOplogID; ++j) { |
| // Oplog oplog = ((LocalRegion)region).getDiskRegion().getChild(j); |
| // // if (oplog != null) { |
| // // DistributedTestCase.join(oplog.getAsynchWriter().getThread(), 10 * 1000, null); |
| // // } |
| // } |
| // assertFalse(failureCause, testFailed); |
| // closeDown(); |
| |
| // } |
| |
| // /** |
| // * @author Asif |
| // */ |
| // @Test |
| // public void testMultipleByteBuffersASynchOperations() |
| // { |
| // final int MAX_OPLOG_SIZE = 100000; |
| // diskProps.setMaxOplogSize(MAX_OPLOG_SIZE); |
| // diskProps.setPersistBackup(true); |
| // diskProps.setRolling(false); |
| // diskProps.setSynchronous(false); |
| // diskProps.setOverflow(false); |
| // diskProps.setBytesThreshold(1000); |
| // Oplog.testSetMaxByteBufferSize(100); |
| // LocalRegion.ISSUE_CALLBACKS_TO_CACHE_OBSERVER = true; |
| // final int OP_COUNT = 40; |
| // final byte[] val = new byte[50]; |
| // region = DiskRegionHelperFactory |
| // .getAsyncPersistOnlyRegion(cache, diskProps); |
| // CacheObserver old = CacheObserverHolder |
| // .setInstance(new CacheObserverAdapter() { |
| // public void afterWritingBytes() |
| // { |
| |
| // synchronized (OplogJUnitTest.this) { |
| // flushOccuredAtleastOnce = true; |
| // OplogJUnitTest.this.notify(); |
| // } |
| |
| // } |
| // }); |
| |
| // int sizeOfOp = 0; |
| // DiskId id = null; |
| // for (int i = 0; i < OP_COUNT; ++i) { |
| // try { |
| // region.create(new Integer(i), val); |
| // DiskEntry entry = ((DiskEntry)((LocalRegion)region) |
| // .basicGetEntry(new Integer(i))); |
| // id = entry.getDiskId(); |
| // sizeOfOp += evaluateSizeOfOperationForPersist(new Integer(i), val, id, |
| // OP_CREATE); |
| |
| // } |
| |
| // catch (EntryExistsException e) { |
| // fail("Entry existed with key =" + i); |
| // } |
| // } |
| // Oplog currOplog = ((LocalRegion)region).getDiskRegion().getChild(); |
| // long currentOplogID = currOplog.getOplogId(); |
| // long expectedSize = currOplog.getOplogSize(); |
| // // Ensure that now switching has happned during the operations |
| // assertEquals(1, currentOplogID); |
| // assertTrue( |
| // "The number of operations did not cause asynch writer to run atleast once , the expected file size = " |
| // + expectedSize, expectedSize > 1000); |
| // if (!flushOccuredAtleastOnce) { |
| // synchronized (this) { |
| // if (!flushOccuredAtleastOnce) { |
| // try { |
| // this.wait(20000); |
| // } |
| // catch (InterruptedException e) { |
| // fail("No guarantee as flushed occure deven once.Exception=" + e); |
| // } |
| // } |
| // } |
| // } |
| // if (!flushOccuredAtleastOnce) { |
| // fail("In the wait duration , flush did not occur even once. Try increasing the wait time"); |
| // } |
| // long actualFileSize = 0L; |
| |
| // try { |
| // actualFileSize = currOplog.getFileChannel().position(); |
| // } |
| // catch (IOException e) { |
| // fail(e.toString()); |
| // } |
| |
| // assertTrue( |
| // "The number of operations did not cause asynch writer to run atleast once as the actual file size = " |
| // + actualFileSize, actualFileSize >= 1000); |
| // //currOplog.forceFlush(); |
| // // File opFile = null; |
| // try { |
| // currOplog.getOplogFile(); |
| // } |
| // catch (Exception e) { |
| // logWriter |
| // .error( |
| // "Exception in synching data present in the buffers of RandomAccessFile of Oplog, to the disk", |
| // e); |
| // fail("Test failed because synching of data present in buffer of RandomAccesFile "); |
| // } |
| // actualFileSize = currOplog.getActualFileLength(); |
| // assertTrue( |
| // " The expected Oplog Size not equal to the actual file size. Expected size=" |
| // + expectedSize + " actual size = " + actualFileSize, |
| // expectedSize == actualFileSize); |
| // Oplog.testSetMaxByteBufferSize(Integer.MAX_VALUE); |
| // LocalRegion.ISSUE_CALLBACKS_TO_CACHE_OBSERVER = false; |
| // CacheObserverHolder.setInstance(old); |
| // closeDown(); |
| |
| // } |
| |
| /** |
| * Tests the bug which arises in case of asynch mode during oplog switching |
| * caused by conflation of create/destroy operation.The bug occurs if a create |
| * operation is followed by destroy but before destroy proceeds some other |
| * operation causes oplog switching |
| * |
| * @author Asif |
| */ |
| @Test |
| public void testBug34615() |
| { |
| final int MAX_OPLOG_SIZE = 100; |
| diskProps.setMaxOplogSize(MAX_OPLOG_SIZE); |
| diskProps.setPersistBackup(true); |
| diskProps.setRolling(false); |
| diskProps.setSynchronous(false); |
| diskProps.setOverflow(false); |
| diskProps.setBytesThreshold(150); |
| |
| LocalRegion.ISSUE_CALLBACKS_TO_CACHE_OBSERVER = true; |
| final byte[] val = new byte[50]; |
| region = DiskRegionHelperFactory |
| .getAsyncPersistOnlyRegion(cache, diskProps); |
| final CacheObserver old = CacheObserverHolder |
| .setInstance(new CacheObserverAdapter() { |
| @Override |
| public void afterConflation(ByteBuffer orig, ByteBuffer conflated) |
| { |
| Thread th = new Thread(new Runnable() { |
| public void run() |
| { |
| region.put("2", new byte[75]); |
| } |
| }); |
| assertNull(conflated); |
| th.start(); |
| DistributedTestCase.join(th, 30 * 1000, null); |
| LocalRegion.ISSUE_CALLBACKS_TO_CACHE_OBSERVER = false; |
| |
| } |
| }); |
| |
| region.put("1", val); |
| region.remove("1"); |
| assertFalse(failureCause, testFailed); |
| CacheObserverHolder.setInstance(old); |
| closeDown(); |
| |
| } |
| |
| /** |
| * @author Asif |
| */ |
| @Test |
| public void testConflation() throws Exception { |
| |
| final int MAX_OPLOG_SIZE = 1000; |
| diskProps.setMaxOplogSize(MAX_OPLOG_SIZE); |
| diskProps.setPersistBackup(true); |
| diskProps.setRolling(false); |
| diskProps.setSynchronous(false); |
| diskProps.setOverflow(false); |
| diskProps.setBytesThreshold(1500); |
| |
| final byte[] val = new byte[50]; |
| final byte[][] bb = new byte[2][]; |
| bb[0] = new byte[5]; |
| region = DiskRegionHelperFactory |
| .getAsyncPersistOnlyRegion(cache, diskProps); |
| try { |
| region.put("1", val); |
| region.put("1", new byte[10]); |
| region.put("2", val); |
| region.put("2", new byte[100]); |
| region.create("3", null); |
| region.put("3", new byte[10]); |
| region.create("4", null); |
| region.put("4", new byte[0]); |
| |
| // tests for byte[][] |
| region.create("5", bb); |
| region.put("6", val); |
| region.put("6", bb); |
| region.create("7", null); |
| region.put("7", bb); |
| |
| region.create("8", new byte[9]); |
| region.invalidate("8"); |
| region.create("9", new byte[0]); |
| region.invalidate("9"); |
| |
| region.create("10", new byte[9]); |
| region.localInvalidate("10"); |
| region.create("11", new byte[0]); |
| region.localInvalidate("11"); |
| |
| DiskRegion dr = ((LocalRegion)region).getDiskRegion(); |
| dr.flushForTesting(); |
| byte[] val_1 = ((byte[])((LocalRegion)region).getValueOnDisk("1")); |
| assertEquals(val_1.length, 10); |
| byte[] val_2 = ((byte[])((LocalRegion)region).getValueOnDisk("2")); |
| assertEquals(val_2.length, 100); |
| byte[] val_3 = ((byte[])((LocalRegion)region).getValueOnDisk("3")); |
| assertEquals(val_3.length, 10); |
| byte[] val_4 = ((byte[])((LocalRegion)region).getValueOnDisk("4")); |
| assertEquals(val_4.length, 0); |
| byte[][] val_5 = (byte[][])((LocalRegion)region).getValueOnDisk("5"); |
| assertEquals(val_5.length, 2); |
| assertEquals(val_5[0].length, 5); |
| assertNull(val_5[1]); |
| byte[][] val_6 = (byte[][])((LocalRegion)region).getValueOnDisk("6"); |
| assertEquals(val_6.length, 2); |
| assertEquals(val_6[0].length, 5); |
| assertNull(val_6[1]); |
| byte[][] val_7 = (byte[][])((LocalRegion)region).getValueOnDisk("7"); |
| assertEquals(val_7.length, 2); |
| assertEquals(val_7[0].length, 5); |
| assertNull(val_7[1]); |
| Object val_8 = ((LocalRegion)region).getValueOnDisk("8"); |
| assertEquals(val_8, Token.INVALID); |
| Object val_9 = ((LocalRegion)region).getValueOnDisk("9"); |
| assertEquals(val_9, Token.INVALID); |
| Object val_10 = ((LocalRegion)region).getValueOnDisk("10"); |
| assertEquals(val_10, Token.LOCAL_INVALID); |
| Object val_11 = ((LocalRegion)region).getValueOnDisk("11"); |
| assertEquals(val_11, Token.LOCAL_INVALID); |
| |
| } catch (Exception e) { |
| logWriter.error("Exception occured", e); |
| //fail("The test failed due to exception = " + e); |
| throw e; |
| } finally { |
| closeDown(); |
| } |
| } |
| |
| /** |
| * This tests the retrieval of empty byte array when present in asynch buffers |
| * |
| * @author Asif |
| */ |
| @Test |
| public void testGetEmptyByteArrayInAsynchBuffer() |
| { |
| final int MAX_OPLOG_SIZE = 1000; |
| diskProps.setMaxOplogSize(MAX_OPLOG_SIZE); |
| diskProps.setPersistBackup(true); |
| diskProps.setRolling(false); |
| diskProps.setSynchronous(false); |
| diskProps.setOverflow(false); |
| diskProps.setBytesThreshold(1500); |
| |
| final byte[] val = new byte[50]; |
| region = DiskRegionHelperFactory |
| .getAsyncPersistOnlyRegion(cache, diskProps); |
| try { |
| region.put("1", val); |
| region.put("1", new byte[0]); |
| byte[] val_1 = ((byte[])((LocalRegion)region).getValueOnDiskOrBuffer("1")); |
| assertEquals(val_1.length, 0); |
| } |
| catch (Exception e) { |
| logWriter.error("Exception occured", e); |
| fail("The test failed due to exception = " + e); |
| } |
| closeDown(); |
| } |
| |
| /** |
| * This tests the retrieval of empty byte array in synch mode |
| * |
| * @author Asif |
| */ |
| @Test |
| public void testGetEmptyByteArrayInSynchMode() |
| { |
| final int MAX_OPLOG_SIZE = 1000; |
| diskProps.setMaxOplogSize(MAX_OPLOG_SIZE); |
| diskProps.setPersistBackup(true); |
| diskProps.setRolling(false); |
| diskProps.setSynchronous(true); |
| diskProps.setOverflow(false); |
| diskProps.setBytesThreshold(1500); |
| |
| final byte[] val = new byte[50]; |
| region = DiskRegionHelperFactory.getSyncPersistOnlyRegion(cache, diskProps, Scope.LOCAL); |
| try { |
| region.put("1", val); |
| region.put("1", new byte[0]); |
| byte[] val_1 = ((byte[])((LocalRegion)region).getValueOnDiskOrBuffer("1")); |
| assertEquals(val_1.length, 0); |
| } |
| catch (Exception e) { |
| logWriter.error("Exception occured", e); |
| fail("The test failed due to exception = " + e); |
| } |
| closeDown(); |
| } |
| |
| /** |
| * This tests the bug which caused the oplogRoller to attempt to roll a |
| * removed entry whose value is Token.Removed This bug can occur if a remove |
| * operation causes oplog switching & hence roller thread gets notified, & the |
| * roller thread obtains the iterator of the concurrent region map before the |
| * remove |
| * |
| * @author Asif |
| */ |
| @Test |
| public void testBug34702() |
| { |
| final int MAX_OPLOG_SIZE = 500*2; |
| diskProps.setMaxOplogSize(MAX_OPLOG_SIZE); |
| diskProps.setPersistBackup(true); |
| diskProps.setRolling(true); |
| diskProps.setSynchronous(true); |
| diskProps.setOverflow(false); |
| final byte[] val = new byte[200]; |
| proceed = false; |
| |
| region = DiskRegionHelperFactory.getSyncPersistOnlyRegion(cache, diskProps, Scope.LOCAL); |
| region.put("key1", val); |
| region.put("key2", val); |
| LocalRegion.ISSUE_CALLBACKS_TO_CACHE_OBSERVER = true; |
| final CacheObserver old = CacheObserverHolder |
| .setInstance(new CacheObserverAdapter() { |
| |
| @Override |
| public void afterSettingOplogOffSet(long offset) |
| { |
| ((LocalRegion)region).getDiskRegion().forceRolling(); |
| // Let the operation thread yield to the Roller so that |
| // it is able to obtain the iterator of the concurrrent region map |
| // & thus get the reference to the entry which will contain |
| // value as Token.Removed as the entry though removed from |
| // concurrent |
| // map still will be available to the roller |
| Thread.yield(); |
| // Sleep for some time |
| try { |
| Thread.sleep(5000); |
| } |
| catch (InterruptedException e) { |
| testFailed = true; |
| failureCause = "No guarantee that test is succesful"; |
| fail("No guarantee that test is succesful"); |
| } |
| } |
| |
| @Override |
| public void afterHavingCompacted() |
| { |
| proceed = true; |
| synchronized (OplogJUnitTest.this) { |
| OplogJUnitTest.this.notify(); |
| } |
| } |
| }); |
| try { |
| region.destroy("key1"); |
| region.destroy("key2"); |
| } |
| catch (Exception e1) { |
| LocalRegion.ISSUE_CALLBACKS_TO_CACHE_OBSERVER = false; |
| CacheObserverHolder.setInstance(old); |
| fail("Test failed as entry deletion threw exception. Exception = " + e1); |
| } |
| // Wait for some time & check if the after having rolled callabck |
| // is issued sucessfully or not. |
| if (!proceed) { |
| synchronized (this) { |
| if (!proceed) { |
| try { |
| this.wait(20000); |
| } |
| catch (InterruptedException e) { |
| Thread.currentThread().interrupt(); |
| // The test will automatically fail due to proceed flag |
| } |
| } |
| } |
| } |
| assertFalse(failureCause, testFailed); |
| LocalRegion.ISSUE_CALLBACKS_TO_CACHE_OBSERVER = false; |
| CacheObserverHolder.setInstance(old); |
| |
| if (!proceed) { |
| fail("Test failed as afterHavingCompacted callabck not issued even after sufficient wait"); |
| } |
| closeDown(); |
| |
| } |
| |
| /** |
| * tests a potential deadlock situation if the operation causing a swithcing |
| * of Oplog is waiting for roller to free space. The problem can arise if the |
| * operation causing Oplog switching is going on an Entry , which already has |
| * its oplog ID referring to the Oplog being switched. In such case, when the |
| * roller will try to roll the entries referencing the current oplog , it will |
| * not be able to acquire the lock on the entry as the switching thread has |
| * already taken a lock on it. |
| * |
| * @author Asif |
| */ |
| @Test |
| public void testRollingDeadlockSituation() |
| { |
| final int MAX_OPLOG_SIZE = 2000; |
| diskProps.setMaxOplogSize(MAX_OPLOG_SIZE); |
| diskProps.setPersistBackup(true); |
| diskProps.setRolling(true); |
| diskProps.setSynchronous(true); |
| diskProps.setOverflow(false); |
| diskProps.setDiskDirsAndSizes(new File[] { dirs[0] }, new int[] { 1400 }); |
| final byte[] val = new byte[500]; |
| proceed = false; |
| region = DiskRegionHelperFactory.getSyncPersistOnlyRegion(cache, diskProps, Scope.LOCAL); |
| region.put("key1", val); |
| region.put("key1", val); |
| try { |
| region.put("key1", val); |
| } |
| catch (DiskAccessException dae) { |
| logWriter.error("Exception occured", dae); |
| fail("Test failed as DiskAccessException was encountered where as the operation should ideally have proceeded without issue . exception = " |
| + dae); |
| } |
| } |
| |
| /** |
| * This tests whether an empty byte array is correctly writtem to the disk as |
| * a zero value length operation & hence the 4 bytes field for recording the |
| * value length is absent & also since the value length is zero no byte for it |
| * should also get added. Similary during recover from HTree as well as Oplog , |
| * the empty byte array should be read correctly |
| * |
| * @author Asif |
| */ |
| @Test |
| public void testEmptyByteArrayPutAndRecovery() |
| { |
| CacheObserver old = CacheObserverHolder |
| .setInstance(new CacheObserverAdapter() { |
| @Override |
| public void afterConflation(ByteBuffer origBB, ByteBuffer conflatedBB) |
| { |
| if ((2 + 4 + 1 + EntryEventImpl.serialize("key1").length) != origBB |
| .capacity()) { |
| failureCause = "For a backup region, addition of an empty array should result in an offset of 6 bytes where as actual offset is =" |
| + origBB.capacity(); |
| testFailed = true; |
| } |
| Assert |
| .assertTrue( |
| "For a backup region, addition of an empty array should result in an offset of 6 bytes where as actual offset is =" |
| + origBB.capacity(), (2 + 4 + 1 + EntryEventImpl |
| .serialize("key1").length) == origBB.capacity()); |
| |
| } |
| }); |
| try { |
| final int MAX_OPLOG_SIZE = 2000; |
| diskProps.setMaxOplogSize(MAX_OPLOG_SIZE); |
| diskProps.setPersistBackup(true); |
| // diskProps.setRolling(true); |
| diskProps.setSynchronous(true); |
| diskProps.setOverflow(false); |
| diskProps.setDiskDirsAndSizes(new File[] { dirs[0] }, new int[] { 1400 }); |
| final byte[] val = new byte[0]; |
| LocalRegion.ISSUE_CALLBACKS_TO_CACHE_OBSERVER = true; |
| |
| region = DiskRegionHelperFactory.getSyncPersistOnlyRegion(cache, |
| diskProps, Scope.LOCAL); |
| region.put("key1", val); |
| LocalRegion.ISSUE_CALLBACKS_TO_CACHE_OBSERVER = false; |
| region.close(); |
| region = DiskRegionHelperFactory.getSyncPersistOnlyRegion(cache, |
| diskProps, Scope.LOCAL); |
| byte[] _val = (byte[])region.get("key1"); |
| assertTrue( |
| "value of key1 after restarting the region is not an empty byte array. This may indicate problem in reading from Oplog", |
| _val.length == 0); |
| if (this.logWriter.infoEnabled()) { |
| this.logWriter |
| .info("After first region close & opening again no problems encountered & hence Oplog has been read successfully."); |
| this.logWriter |
| .info("Closing the region again without any operation done, would indicate that next time data will be loaded from HTree ."); |
| } |
| region.close(); |
| region = DiskRegionHelperFactory.getSyncPersistOnlyRegion(cache, |
| diskProps, Scope.LOCAL); |
| _val = (byte[])region.get("key1"); |
| assertTrue( |
| "value of key1 after restarting the region is not an empty byte array. This may indicate problem in reading from HTRee", |
| _val.length == 0); |
| assertFalse(failureCause, testFailed); |
| // region.close(); |
| |
| } |
| finally { |
| LocalRegion.ISSUE_CALLBACKS_TO_CACHE_OBSERVER = false; |
| CacheObserverHolder.setInstance(old); |
| |
| } |
| } |
| |
| /** |
| * This is used to test bug 35012 where a remove operation on a key gets |
| * unrecorded due to switching of Oplog if it happens just after the remove |
| * operation has destroyed the in memory entry & is about to acquire the |
| * readlock in DiskRegion to record the same. If the Oplog has switched during |
| * that duration , the bug would appear |
| * |
| * @author Asif |
| */ |
| |
| @Test |
| public void testBug35012() |
| { |
| final int MAX_OPLOG_SIZE = 500; |
| diskProps.setMaxOplogSize(MAX_OPLOG_SIZE); |
| diskProps.setPersistBackup(true); |
| diskProps.setRolling(false); |
| diskProps.setSynchronous(true); |
| diskProps.setOverflow(false); |
| final byte[] val = new byte[200]; |
| try { |
| |
| region = DiskRegionHelperFactory.getSyncPersistOnlyRegion(cache, |
| diskProps, Scope.LOCAL); |
| |
| region.put("key1", val); |
| region.put("key2", val); |
| region.put("key3", val); |
| final Thread th = new Thread( |
| new Runnable() { |
| public void run() { |
| region.remove("key1"); |
| } |
| } |
| ); |
| // main thread acquires the write lock |
| ((LocalRegion)region).getDiskRegion().acquireWriteLock(); |
| try { |
| th.start(); |
| Thread.yield(); |
| DiskRegion dr = ((LocalRegion)region).getDiskRegion(); |
| dr.testHook_getChild().forceRolling(dr); |
| } |
| finally { |
| ((LocalRegion)region).getDiskRegion().releaseWriteLock(); |
| } |
| DistributedTestCase.join(th, 30 * 1000, null); |
| region.close(); |
| region = DiskRegionHelperFactory.getSyncPersistOnlyRegion(cache, |
| diskProps, Scope.LOCAL); |
| assertEquals(region.size(), 2); |
| } |
| catch (Exception e) { |
| this.logWriter.error("Exception occurred ", e); |
| fail("The test could not be completed because of exception .Exception=" |
| + e); |
| } |
| closeDown(); |
| |
| } |
| |
| /** |
| * Tests the various configurable parameters used by the ByteBufferPool . The |
| * behaviour of parameters is based on the mode of DiskRegion ( synch or |
| * asynch) . Pls refer to the class documentation ( Oplog.ByteBufferPool) for |
| * the exact behaviour of the class |
| * |
| * @author Asif |
| */ |
| // @Test |
| // public void testByteBufferPoolParameters() |
| // { |
| // // If the mode is asynch , the ByteBuffer obtained should e non direct else |
| // // direct |
| // final int MAX_OPLOG_SIZE = 500; |
| // diskProps.setMaxOplogSize(MAX_OPLOG_SIZE); |
| // diskProps.setPersistBackup(true); |
| // diskProps.setRolling(false); |
| // diskProps.setSynchronous(true); |
| // diskProps.setOverflow(false); |
| // region = DiskRegionHelperFactory.getSyncPersistOnlyRegion(cache, diskProps); |
| // List bbPools = ((LocalRegion)region).getDiskRegion().getChild() |
| // .getByteBufferPoolList(); |
| // ByteBuffer bb = ((Oplog.ByteBufferPool)bbPools.get(1)).getBufferFromPool(); |
| // assertTrue(" ByteBuffer is not of type direct", bb.isDirect()); |
| // region.destroyRegion(); |
| // diskProps.setMaxOplogSize(MAX_OPLOG_SIZE); |
| // diskProps.setPersistBackup(true); |
| // diskProps.setRolling(false); |
| // diskProps.setSynchronous(false); |
| // diskProps.setOverflow(false); |
| // region = DiskRegionHelperFactory |
| // .getAsyncPersistOnlyRegion(cache, diskProps); |
| // bbPools = ((LocalRegion)region).getDiskRegion().getChild() |
| // .getByteBufferPoolList(); |
| // bb = ((Oplog.ByteBufferPool)bbPools.get(1)).getBufferFromPool(); |
| // assertTrue(" ByteBuffer is not of type direct", bb.isDirect()); |
| // region.close(); |
| // // Test max pool limit & wait time ( valid only in synch mode). |
| // diskProps.setSynchronous(true); |
| // diskProps.setRegionName("testRegion"); |
| // System.setProperty("/testRegion_MAX_POOL_SIZE", "1"); |
| |
| // System.setProperty("/testRegion_WAIT_TIME", "4000"); |
| // region = DiskRegionHelperFactory.getSyncPersistOnlyRegion(cache, diskProps); |
| // bbPools = ((LocalRegion)region).getDiskRegion().getChild() |
| // .getByteBufferPoolList(); |
| // bb = ((Oplog.ByteBufferPool)bbPools.get(1)).getBufferFromPool(); |
| // assertTrue("Since the Pool has one Entry , it should be direct", bb |
| // .isDirect()); |
| |
| // long t1 = System.currentTimeMillis(); |
| // bb = ((Oplog.ByteBufferPool)bbPools.get(1)).getBufferFromPool(); |
| // long t2 = System.currentTimeMillis(); |
| // assertTrue( |
| // "Since the Pool should have been exhausted hence non direct byte buffer should have been returned", |
| // !bb.isDirect()); |
| // assertTrue("The wait time for ByteBuffer pool was not respected ", |
| // (t2 - t1) > 3000); |
| // region.close(); |
| // // // In case of asynch mode , the upper limit should not have been imposed |
| // // System.setProperty("/testRegion_MAX_POOL_SIZE", "1"); |
| // // System.setProperty("/testRegion_WAIT_TIME", "5000"); |
| // // diskProps.setSynchronous(false); |
| // // diskProps.setRegionName("testRegion"); |
| // // region = DiskRegionHelperFactory |
| // // .getAsyncPersistOnlyRegion(cache, diskProps); |
| // // bbPools = ((LocalRegion)region).getDiskRegion().getChild() |
| // // .getByteBufferPoolList(); |
| // // bb = ((Oplog.ByteBufferPool)bbPools.get(1)).getBufferFromPool(); |
| // // t1 = System.currentTimeMillis(); |
| // // bb = ((Oplog.ByteBufferPool)bbPools.get(1)).getBufferFromPool(); |
| // // t2 = System.currentTimeMillis(); |
| // // assertTrue( |
| // // "There should not have been any wait time " + (t2-t1) + " for ByteBuffer pool ", |
| // // (t2 - t1) / 1000 < 3); |
| // // region.close(); |
| // System.setProperty("/testRegion_MAX_POOL_SIZE", "2"); |
| // System.setProperty("/testRegion_WAIT_TIME", "5000"); |
| // diskProps.setSynchronous(true); |
| // diskProps.setRegionName("testRegion"); |
| // region = DiskRegionHelperFactory.getSyncPersistOnlyRegion(cache, diskProps); |
| // bbPools = ((LocalRegion)region).getDiskRegion().getChild() |
| // .getByteBufferPoolList(); |
| // Oplog.ByteBufferPool pool = (Oplog.ByteBufferPool)bbPools.get(1); |
| // ByteBuffer bb1 = pool.getBufferFromPool(); |
| // ByteBuffer bb2 = pool.getBufferFromPool(); |
| // assertEquals(2, pool.getTotalBuffers()); |
| // assertEquals(2, pool.getBuffersInUse()); |
| // ((LocalRegion)region).getDiskRegion().getChild().releaseBuffer(bb1); |
| // ((LocalRegion)region).getDiskRegion().getChild().releaseBuffer(bb2); |
| // assertEquals(0, pool.getBuffersInUse()); |
| // region.close(); |
| |
| // System.setProperty("/testRegion_MAX_POOL_SIZE", "1"); |
| // System.setProperty("/testRegion_WAIT_TIME", "1000"); |
| // diskProps.setSynchronous(true); |
| // diskProps.setRegionName("testRegion"); |
| // region = DiskRegionHelperFactory.getSyncPersistOnlyRegion(cache, diskProps); |
| // bbPools = ((LocalRegion)region).getDiskRegion().getChild() |
| // .getByteBufferPoolList(); |
| // pool = (Oplog.ByteBufferPool)bbPools.get(1); |
| // bb1 = pool.getBufferFromPool(); |
| // bb2 = pool.getBufferFromPool(); |
| // assertEquals(1, pool.getTotalBuffers()); |
| // assertEquals(1, pool.getBuffersInUse()); |
| // ((LocalRegion)region).getDiskRegion().getChild().releaseBuffer(bb1); |
| // ((LocalRegion)region).getDiskRegion().getChild().releaseBuffer(bb2); |
| // assertEquals(0, pool.getBuffersInUse()); |
| // closeDown(); |
| |
| // } |
| |
| /** |
| * Tests the ByteBuffer Pool operations for release of ByteBuffers in case the |
| * objects being put vary in size & hence use ByteBuffer Pools present at |
| * different indexes |
| * |
| * @author Asif |
| */ |
| // @Test |
| // public void testByteBufferPoolReleaseBugTest() |
| // { |
| |
| // diskProps.setPersistBackup(true); |
| // diskProps.setRolling(false); |
| // diskProps.setSynchronous(true); |
| // diskProps.setOverflow(false); |
| // System.setProperty("/testRegion_UNIT_BUFF_SIZE", "100"); |
| // System.setProperty("/testRegion_UNIT_BUFF_SIZE", "100"); |
| // System.setProperty("gemfire.log-level", getGemFireLogLevel()); |
| // region = DiskRegionHelperFactory.getSyncPersistOnlyRegion(cache, diskProps); |
| // region.put("key1", new byte[900]); |
| // region.put("key1", new byte[700]); |
| // closeDown(); |
| |
| // } |
| |
| /** |
| * Tests if buffer size & time are not set , the asynch writer gets awakened |
| * on time basis of default 1 second |
| * |
| * @author Asif |
| */ |
| @Test |
| public void testAsynchWriterAttribBehaviour1() |
| { |
| DiskStoreFactory dsf = cache.createDiskStoreFactory(); |
| ((DiskStoreFactoryImpl)dsf).setMaxOplogSizeInBytes(10000); |
| File dir = new File("testingDirectoryDefault"); |
| dir.mkdir(); |
| dir.deleteOnExit(); |
| File[] dirs = { dir }; |
| dsf.setDiskDirs(dirs); |
| AttributesFactory factory = new AttributesFactory(); |
| final long t1 = System.currentTimeMillis(); |
| DiskStore ds = dsf.create("test"); |
| factory.setDiskSynchronous(false); |
| factory.setDiskStoreName(ds.getName()); |
| factory.setDataPolicy(DataPolicy.PERSISTENT_REPLICATE); |
| factory.setScope(Scope.LOCAL); |
| try { |
| region = cache.createVMRegion("test", factory.createRegionAttributes()); |
| } |
| catch (Exception e1) { |
| logWriter.error("Test failed due to exception", e1); |
| fail("Test failed due to exception " + e1); |
| |
| } |
| |
| LocalRegion.ISSUE_CALLBACKS_TO_CACHE_OBSERVER = true; |
| CacheObserver old = CacheObserverHolder.setInstance( |
| |
| new CacheObserverAdapter() { |
| private long t2; |
| @Override |
| public void goingToFlush() |
| { |
| t2 = System.currentTimeMillis(); |
| delta = t2 - t1; |
| LocalRegion.ISSUE_CALLBACKS_TO_CACHE_OBSERVER = false; |
| synchronized (OplogJUnitTest.this) { |
| OplogJUnitTest.this.notify(); |
| } |
| } |
| }); |
| |
| region.put("key1", "111111111111"); |
| synchronized (this) { |
| if (LocalRegion.ISSUE_CALLBACKS_TO_CACHE_OBSERVER) { |
| try { |
| this.wait(10000); |
| } |
| catch (InterruptedException e) { |
| logWriter.error("Test failed due to exception", e); |
| fail("Test failed due to exception " + e); |
| |
| } |
| assertFalse(LocalRegion.ISSUE_CALLBACKS_TO_CACHE_OBSERVER); |
| } |
| } |
| CacheObserverHolder.setInstance(old); |
| // Windows clock has an accuracy of 15 ms. Accounting for the same. |
| assertTrue( |
| "delta is in miilliseconds=" + delta, |
| delta >= 985); |
| closeDown(); |
| } |
| |
| /** |
| * Tests if buffer size is set but time is not set , the asynch writer gets |
| * awakened on buffer size basis |
| * |
| * @author Asif |
| */ |
| public void DARREL_DISABLE_testAsynchWriterAttribBehaviour2() |
| { |
| DiskStoreFactory dsf = cache.createDiskStoreFactory(); |
| ((DiskStoreFactoryImpl)dsf).setMaxOplogSizeInBytes(10000); |
| dsf.setQueueSize(2); |
| File dir = new File("testingDirectoryDefault"); |
| dir.mkdir(); |
| dir.deleteOnExit(); |
| File[] dirs = { dir }; |
| dsf.setDiskDirs(dirs); |
| AttributesFactory factory = new AttributesFactory(); |
| DiskStore ds = dsf.create("test"); |
| factory.setDiskSynchronous(false); |
| factory.setDiskStoreName(ds.getName()); |
| factory.setDataPolicy(DataPolicy.PERSISTENT_REPLICATE); |
| factory.setScope(Scope.LOCAL); |
| try { |
| region = cache.createVMRegion("test", factory.createRegionAttributes()); |
| } |
| catch (Exception e1) { |
| logWriter.error("Test failed due to exception", e1); |
| fail("Test failed due to exception " + e1); |
| |
| } |
| |
| LocalRegion.ISSUE_CALLBACKS_TO_CACHE_OBSERVER = true; |
| CacheObserver old = CacheObserverHolder.setInstance( |
| |
| new CacheObserverAdapter() { |
| |
| @Override |
| public void goingToFlush() |
| { |
| synchronized (OplogJUnitTest.this) { |
| LocalRegion.ISSUE_CALLBACKS_TO_CACHE_OBSERVER = false; |
| OplogJUnitTest.this.notify(); |
| } |
| } |
| }); |
| |
| region.put("key1", new byte[25]); |
| try { |
| Thread.sleep(1000); |
| } |
| catch (InterruptedException e) { |
| logWriter.error("Test failed due to exception", e); |
| fail("Test failed due to exception " + e); |
| } |
| assertTrue(LocalRegion.ISSUE_CALLBACKS_TO_CACHE_OBSERVER); |
| region.put("key2", new byte[25]); |
| synchronized (this) { |
| if (LocalRegion.ISSUE_CALLBACKS_TO_CACHE_OBSERVER) { |
| try { |
| OplogJUnitTest.this.wait(10000); |
| assertFalse(LocalRegion.ISSUE_CALLBACKS_TO_CACHE_OBSERVER); |
| } |
| catch (InterruptedException e2) { |
| logWriter.error("Test failed due to exception", e2); |
| fail("Test failed due to exception " + e2); |
| } |
| } |
| } |
| CacheObserverHolder.setInstance(old); |
| closeDown(); |
| } |
| |
| /** |
| * Tests if buffer size & time interval are explicitly set to zero then the |
| * flush will occur due to asynchForceFlush or due to switching of Oplog |
| * |
| * @author Asif |
| */ |
| @Test |
| public void testAsynchWriterAttribBehaviour3() |
| { |
| DiskStoreFactory dsf = cache.createDiskStoreFactory(); |
| ((DiskStoreFactoryImpl)dsf).setMaxOplogSizeInBytes(500); |
| dsf.setQueueSize(0); |
| dsf.setTimeInterval(0); |
| File dir = new File("testingDirectoryDefault"); |
| dir.mkdir(); |
| dir.deleteOnExit(); |
| File[] dirs = { dir }; |
| dsf.setDiskDirs(dirs); |
| AttributesFactory factory = new AttributesFactory(); |
| DiskStore ds = dsf.create("test"); |
| factory.setDiskSynchronous(false); |
| factory.setDiskStoreName(ds.getName()); |
| factory.setDataPolicy(DataPolicy.PERSISTENT_REPLICATE); |
| factory.setScope(Scope.LOCAL); |
| try { |
| region = cache.createVMRegion("test", factory.createRegionAttributes()); |
| } |
| catch (Exception e1) { |
| logWriter.error("Test failed due to exception", e1); |
| fail("Test failed due to exception " + e1); |
| |
| } |
| |
| LocalRegion.ISSUE_CALLBACKS_TO_CACHE_OBSERVER = true; |
| CacheObserver old = CacheObserverHolder.setInstance( |
| |
| new CacheObserverAdapter() { |
| |
| @Override |
| public void goingToFlush() |
| { |
| LocalRegion.ISSUE_CALLBACKS_TO_CACHE_OBSERVER = false; |
| synchronized (OplogJUnitTest.this) { |
| OplogJUnitTest.this.notify(); |
| } |
| } |
| }); |
| try { |
| region.put("key1", new byte[100]); |
| region.put("key2", new byte[100]); |
| region.put("key3", new byte[100]); |
| region.put("key4", new byte[100]); |
| region.put("key5", new byte[100]); |
| Thread.sleep(1000); |
| assertTrue(LocalRegion.ISSUE_CALLBACKS_TO_CACHE_OBSERVER); |
| } |
| catch (Exception e) { |
| logWriter.error("Test failed due to exception", e); |
| fail("Test failed due to exception " + e); |
| } |
| region.forceRolling(); |
| synchronized (this) { |
| if (LocalRegion.ISSUE_CALLBACKS_TO_CACHE_OBSERVER) { |
| try { |
| OplogJUnitTest.this.wait(10000); |
| } |
| catch (InterruptedException e2) { |
| logWriter.error("Test failed due to exception", e2); |
| fail("Test failed due to exception " + e2); |
| } |
| } |
| } |
| assertFalse(LocalRegion.ISSUE_CALLBACKS_TO_CACHE_OBSERVER); |
| CacheObserverHolder.setInstance(old); |
| closeDown(); |
| } |
| |
| /** |
| * Tests if the preblowing of a file with size greater than the disk space |
| * available so that preblowing results in IOException , is able to recover |
| * without problem |
| * |
| * @author Asif |
| */ |
| //Now we preallocate spaces for if files and also crfs and drfs. So the below test is not valid |
| // any more. See revision: r42359 and r42320. So disabling this test. |
| public void _testPreblowErrorCondition() |
| { |
| DiskStoreFactory dsf = cache.createDiskStoreFactory(); |
| ((DiskStoreFactoryImpl)dsf).setMaxOplogSizeInBytes(100000000L * 1024L * 1024L * 1024L); |
| dsf.setAutoCompact(false); |
| File dir = new File("testingDirectoryDefault"); |
| dir.mkdir(); |
| dir.deleteOnExit(); |
| File[] dirs = { dir }; |
| int size[] = new int[] { Integer.MAX_VALUE }; |
| dsf.setDiskDirsAndSizes(dirs, size); |
| AttributesFactory factory = new AttributesFactory(); |
| logWriter.info("<ExpectedException action=add>" |
| + "Could not pregrow" |
| + "</ExpectedException>"); |
| try { |
| DiskStore ds = dsf.create("test"); |
| factory.setDiskStoreName(ds.getName()); |
| factory.setDiskSynchronous(true); |
| factory.setDataPolicy(DataPolicy.PERSISTENT_REPLICATE); |
| factory.setScope(Scope.LOCAL); |
| try { |
| region = cache.createVMRegion("test", factory.createRegionAttributes()); |
| } |
| catch (Exception e1) { |
| logWriter.error("Test failed due to exception", e1); |
| fail("Test failed due to exception " + e1); |
| |
| } |
| region.put("key1", new byte[900]); |
| byte[] val = null; |
| try { |
| val = (byte[])((LocalRegion)region).getValueOnDisk("key1"); |
| } |
| catch (Exception e) { |
| // TODO Auto-generated catch block |
| e.printStackTrace(); |
| fail(e.toString()); |
| } |
| assertTrue(val.length == 900); |
| } finally { |
| logWriter.info("<ExpectedException action=remove>" |
| + "Could not pregrow" |
| + "</ExpectedException>"); |
| } |
| closeDown(); |
| } |
| |
| /** |
| * Tests if the byte buffer pool in asynch mode tries to contain the pool size |
| * |
| * @author Asif |
| */ |
| @Test |
| public void testByteBufferPoolContainment() |
| { |
| |
| diskProps.setPersistBackup(true); |
| diskProps.setRolling(false); |
| diskProps.setMaxOplogSize(1024 * 1024); |
| diskProps.setSynchronous(false); |
| diskProps.setOverflow(false); |
| diskProps.setBytesThreshold(10); // this is now item count |
| diskProps.setTimeInterval(0); |
| region = DiskRegionHelperFactory |
| .getAsyncPersistOnlyRegion(cache, diskProps); |
| final byte[] val = new byte[1000]; |
| LocalRegion.ISSUE_CALLBACKS_TO_CACHE_OBSERVER = true; |
| |
| CacheObserverHolder |
| .setInstance(new CacheObserverAdapter() { |
| |
| @Override |
| public void goingToFlush() |
| { // Delay flushing |
| |
| assertEquals(10, region.size()); |
| for (int i = 10; i < 20; ++i) { |
| region.put("" + i, val); |
| } |
| synchronized (OplogJUnitTest.this) { |
| OplogJUnitTest.this.notify(); |
| LocalRegion.ISSUE_CALLBACKS_TO_CACHE_OBSERVER = false; |
| } |
| |
| } |
| }); |
| for (int i = 0; i < 10; ++i) { |
| region.put("" + i, val); |
| } |
| try { |
| synchronized (OplogJUnitTest.this) { |
| if (LocalRegion.ISSUE_CALLBACKS_TO_CACHE_OBSERVER) { |
| OplogJUnitTest.this.wait(9000); |
| assertEquals(false, LocalRegion.ISSUE_CALLBACKS_TO_CACHE_OBSERVER); |
| } |
| } |
| |
| } |
| catch (InterruptedException ie) { |
| fail("interrupted"); |
| } |
| //((LocalRegion)region).getDiskRegion().getChild().forceFlush(); |
| // int x = ((LocalRegion)region).getDiskRegion().getChild().getAsynchWriter() |
| // .getApproxFreeBuffers(); |
| // assertEquals(10, x); |
| } |
| |
| // we no longer have a pendingFlushMap |
| // /** |
| // * This test does the following: <br> |
| // * 1)Create a diskRegion with async mode and byte-threshold as 25 bytes. <br> |
| // * 2)Put an entry into the region such that the async-buffer is just over 25 |
| // * bytes and the writer-thread is invoked. <br> |
| // * 3)Using CacheObserver.afterSwitchingWriteAndFlushMaps callback, perform a |
| // * put on the same key just after the async writer thread swaps the |
| // * pendingFlushMap and pendingWriteMap for flushing. <br> |
| // * 4)Using CacheObserver.afterWritingBytes, read the value for key |
| // * (LocalRegion.getValueOnDiskOrBuffer) just after the async writer thread has |
| // * flushed to the disk. <br> |
| // * 5) Verify that the value read in step3 is same as the latest value. This |
| // * will ensure that the flushBufferToggle flag is functioning as expected ( It |
| // * prevents the writer thread from setting the oplog-offset in diskId if that |
| // * particular entry has been updated by a put-thread while the |
| // * async-writer-thread is flushing that entry.) |
| // * |
| // * @throws Exception |
| // */ |
| // @Test |
| // public void testFlushBufferToggleFlag() throws Exception |
| // { |
| // final int MAX_OPLOG_SIZE = 100000; |
| // diskProps.setMaxOplogSize(MAX_OPLOG_SIZE); |
| // diskProps.setPersistBackup(true); |
| // diskProps.setRolling(false); |
| // diskProps.setSynchronous(false); |
| // diskProps.setOverflow(false); |
| // diskProps.setBytesThreshold(25); |
| // LocalRegion.ISSUE_CALLBACKS_TO_CACHE_OBSERVER = true; |
| |
| // region = DiskRegionHelperFactory |
| // .getAsyncPersistOnlyRegion(cache, diskProps); |
| // CacheObserver old = CacheObserverHolder |
| // .setInstance(new CacheObserverAdapter() { |
| // public void afterWritingBytes() |
| // { |
| // LocalRegion localregion = (LocalRegion)region; |
| // try { |
| // valueRead = (String)localregion.getValueOnDiskOrBuffer(KEY); |
| // synchronized (OplogJUnitTest.class) { |
| // proceedForValidation = true; |
| // OplogJUnitTest.class.notify(); |
| // } |
| // } |
| // catch (EntryNotFoundException e) { |
| // e.printStackTrace(); |
| // } |
| // } |
| |
| // public void afterSwitchingWriteAndFlushMaps() |
| // { |
| // region.put(KEY, NEW_VALUE); |
| // } |
| |
| // }); |
| |
| // region.put(KEY, OLD_VALUE); |
| |
| // if (!proceedForValidation) { |
| // synchronized (OplogJUnitTest.class) { |
| // if (!proceedForValidation) { |
| // try { |
| // OplogJUnitTest.class.wait(9000); |
| // assertEquals(true, proceedForValidation); |
| // } |
| // catch (InterruptedException e) { |
| // fail("interrupted"); |
| // } |
| // } |
| // } |
| // } |
| |
| // cache.getLogger().info("valueRead : " + valueRead); |
| // assertEquals("valueRead is stale, doesnt match with latest PUT", NEW_VALUE, |
| // valueRead); |
| // LocalRegion.ISSUE_CALLBACKS_TO_CACHE_OBSERVER = false; |
| // CacheObserverHolder.setInstance(old); |
| // closeDown(); |
| // } |
| |
| /** |
| * tests async stats are correctly updated |
| */ |
| @Test |
| public void testAsyncStats() throws InterruptedException |
| { |
| diskProps.setBytesThreshold(101); |
| diskProps.setTimeInterval(1000000); |
| region = DiskRegionHelperFactory.getAsyncOverFlowAndPersistRegion(cache, |
| diskProps); |
| final DiskStoreStats dss = ((LocalRegion)region).getDiskRegion().getDiskStore().getStats(); |
| WaitCriterion evFull = new WaitCriterion() { |
| public boolean done() { |
| return dss.getQueueSize() == 100; |
| } |
| public String description() { |
| return null; |
| } |
| }; |
| WaitCriterion ev = new WaitCriterion() { |
| public boolean done() { |
| return dss.getQueueSize() == 0; |
| } |
| public String description() { |
| return null; |
| } |
| }; |
| WaitCriterion ev2 = new WaitCriterion() { |
| public boolean done() { |
| return dss.getFlushes() == 100; |
| } |
| public String description() { |
| return null; |
| } |
| }; |
| WaitCriterion ev3 = new WaitCriterion() { |
| public boolean done() { |
| return dss.getFlushes() == 200; |
| } |
| public String description() { |
| return null; |
| } |
| }; |
| |
| assertEquals(0, dss.getQueueSize()); |
| put100Int(); |
| DistributedTestCase.waitForCriterion(evFull, 2 * 1000, 200, true); |
| assertEquals(0, dss.getFlushes()); |
| region.writeToDisk(); |
| DistributedTestCase.waitForCriterion(ev, 2 * 1000, 200, true); |
| DistributedTestCase.waitForCriterion(ev2, 1000, 200, true); |
| put100Int(); |
| DistributedTestCase.waitForCriterion(evFull, 2 * 1000, 200, true); |
| region.writeToDisk(); |
| DistributedTestCase.waitForCriterion(ev, 2 * 1000, 200, true); |
| DistributedTestCase.waitForCriterion(ev3, 1000, 200, true); |
| closeDown(); |
| } |
| |
| /** |
| * Tests delayed creation of DiskID in overflow only mode |
| * |
| * @author Asif |
| */ |
| @Test |
| public void testDelayedDiskIdCreationInOverflowOnlyMode() |
| { |
| diskProps.setPersistBackup(true); |
| diskProps.setRolling(false); |
| diskProps.setMaxOplogSize(1024 * 1024); |
| diskProps.setSynchronous(false); |
| diskProps.setOverflow(true); |
| diskProps.setBytesThreshold(10000); |
| diskProps.setTimeInterval(0); |
| diskProps.setOverFlowCapacity(1); |
| region = DiskRegionHelperFactory.getAsyncOverFlowOnlyRegion(cache, |
| diskProps); |
| final byte[] val = new byte[1000]; |
| region.put("1", val); |
| DiskEntry entry = (DiskEntry)((LocalRegion)region).basicGetEntry("1"); |
| assertTrue(entry instanceof AbstractDiskLRURegionEntry); |
| assertNull(entry.getDiskId()); |
| region.put("2", val); |
| assertNotNull(entry.getDiskId()); |
| entry = (DiskEntry)((LocalRegion)region).basicGetEntry("2"); |
| assertTrue(entry instanceof AbstractDiskLRURegionEntry); |
| assertNull(entry.getDiskId()); |
| } |
| |
| /** |
| * Tests immediate creation of DiskID in overflow With Persistence mode |
| * |
| * @author Asif |
| */ |
| @Test |
| public void testImmediateDiskIdCreationInOverflowWithPersistMode() |
| { |
| diskProps.setPersistBackup(true); |
| diskProps.setRolling(false); |
| diskProps.setMaxOplogSize(1024 * 1024); |
| diskProps.setSynchronous(false); |
| diskProps.setOverflow(true); |
| diskProps.setBytesThreshold(10000); |
| diskProps.setTimeInterval(0); |
| diskProps.setOverFlowCapacity(1); |
| region = DiskRegionHelperFactory.getAsyncOverFlowAndPersistRegion(cache, |
| diskProps); |
| final byte[] val = new byte[1000]; |
| region.put("1", val); |
| DiskEntry entry = (DiskEntry)((LocalRegion)region).basicGetEntry("1"); |
| assertTrue(entry instanceof AbstractDiskLRURegionEntry); |
| assertNotNull(entry.getDiskId()); |
| region.put("2", val); |
| assertNotNull(entry.getDiskId()); |
| entry = (DiskEntry)((LocalRegion)region).basicGetEntry("2"); |
| assertTrue(entry instanceof AbstractDiskLRURegionEntry); |
| assertNotNull(entry.getDiskId()); |
| } |
| |
| /** |
| * An entry which is evicted to disk will have the flag already written to |
| * disk, appropriately set |
| * |
| * @author Asif |
| */ |
| @Test |
| public void testEntryAlreadyWrittenIsCorrectlyUnmarkedForOverflowOnly() |
| throws Exception |
| { |
| try { |
| diskProps.setPersistBackup(false); |
| diskProps.setRolling(false); |
| diskProps.setMaxOplogSize(1024 * 1024); |
| diskProps.setSynchronous(true); |
| diskProps.setOverflow(true); |
| diskProps.setBytesThreshold(10000); |
| diskProps.setTimeInterval(0); |
| diskProps.setOverFlowCapacity(1); |
| region = DiskRegionHelperFactory.getSyncOverFlowOnlyRegion(cache, |
| diskProps); |
| final byte[] val = new byte[1000]; |
| region.put("1", val); |
| region.put("2", val); |
| // "1" should now be on disk |
| region.get("1"); |
| // "2" should now be on disk |
| DiskEntry entry1 = (DiskEntry)((LocalRegion)region).basicGetEntry("1"); |
| DiskId did1 = entry1.getDiskId(); |
| DiskId.isInstanceofOverflowIntOplogOffsetDiskId(did1); |
| assertTrue(!did1.needsToBeWritten()); |
| region.put("1", "3"); |
| assertTrue(did1.needsToBeWritten()); |
| region.put("2", val); |
| DiskEntry entry2 = (DiskEntry)((LocalRegion)region).basicGetEntry("2"); |
| DiskId did2 = entry2.getDiskId(); |
| assertTrue(!did2.needsToBeWritten() || !did1.needsToBeWritten()); |
| tearDown(); |
| setUp(); |
| diskProps.setPersistBackup(false); |
| diskProps.setRolling(false); |
| long opsize = Integer.MAX_VALUE; |
| opsize += 100L; |
| diskProps.setMaxOplogSize(opsize); |
| diskProps.setSynchronous(true); |
| diskProps.setOverflow(true); |
| diskProps.setBytesThreshold(10000); |
| diskProps.setTimeInterval(0); |
| diskProps.setOverFlowCapacity(1); |
| region = DiskRegionHelperFactory.getSyncOverFlowOnlyRegion(cache, |
| diskProps); |
| region.put("1", val); |
| region.put("2", val); |
| region.get("1"); |
| entry1 = (DiskEntry)((LocalRegion)region).basicGetEntry("1"); |
| did1 = entry1.getDiskId(); |
| DiskId.isInstanceofOverflowOnlyWithLongOffset(did1); |
| assertTrue(!did1.needsToBeWritten()); |
| region.put("1", "3"); |
| assertTrue(did1.needsToBeWritten()); |
| region.put("2", "3"); |
| did2 = entry2.getDiskId(); |
| assertTrue(!did2.needsToBeWritten() || !did1.needsToBeWritten()); |
| } |
| catch (Exception e) { |
| e.printStackTrace(); |
| fail(e.toString()); |
| } |
| } |
| |
| /** |
| * Verifies that, for a synchronous overflow-with-persistence region, the |
| * DiskId of an entry is not flagged as needing to be written: first right |
| * after the entry's own put, and again after a second put is made while the |
| * overflow capacity is 1. |
| * |
| * @author Asif |
| */ |
| @Test |
| public void testEntryAlreadyWrittenIsCorrectlyUnmarkedForOverflowWithPersistence() |
| { |
|   diskProps.setPersistBackup(true); |
|   diskProps.setRolling(false); |
|   diskProps.setMaxOplogSize(1024 * 1024); |
|   diskProps.setSynchronous(true); |
|   diskProps.setOverflow(true); |
|   diskProps.setBytesThreshold(10000); |
|   diskProps.setTimeInterval(0); |
|   diskProps.setOverFlowCapacity(1); |
|   region = DiskRegionHelperFactory.getSyncOverFlowAndPersistRegion(cache, |
|       diskProps); |
| |
|   final byte[] payload = new byte[1000]; |
|   region.put("1", payload); |
| |
|   final LocalRegion localRegion = (LocalRegion)region; |
|   final DiskEntry firstEntry = (DiskEntry)localRegion.basicGetEntry("1"); |
|   final DiskId firstDiskId = firstEntry.getDiskId(); |
|   // NOTE(review): the result of this type check is not asserted - confirm |
|   // whether that is intentional. |
|   DiskId.isInstanceofPersistIntOplogOffsetDiskId(firstDiskId); |
|   assertFalse(firstDiskId.needsToBeWritten()); |
| |
|   region.put("2", payload); |
|   assertFalse(firstDiskId.needsToBeWritten()); |
| } |
| |
| /** |
| * Exercises DiskEntry.Helper.getValueOnDisk and getValueOnDiskOrBuffer on a |
| * synchronous overflow-only region with an overflow capacity of 2. After |
| * three puts only the entry for key "1" is expected to have a DiskId and a |
| * value reachable through the helper; keys "2" and "3" are expected to have |
| * neither. The original note for this test says DiskId creation is delayed |
| * and that accessing the oplog key id throws an unsupported-operation error. |
| */ |
| @Test |
| public void testHelperAPIsForOverflowOnlyRegion() |
| { |
|   diskProps.setPersistBackup(false); |
|   diskProps.setRolling(false); |
|   diskProps.setMaxOplogSize(1024 * 1024); |
|   diskProps.setSynchronous(true); |
|   diskProps.setOverflow(true); |
|   diskProps.setBytesThreshold(10000); |
|   diskProps.setTimeInterval(0); |
|   diskProps.setOverFlowCapacity(2); |
|   region = DiskRegionHelperFactory |
|       .getSyncOverFlowOnlyRegion(cache, diskProps); |
| |
|   final LocalRegion localRegion = (LocalRegion)region; |
|   final byte[] payload = new byte[1000]; |
|   final DiskRegion diskRegion = localRegion.getDiskRegion(); |
|   for (String key : new String[] { "1", "2", "3" }) { |
|     region.put(key, payload); |
|   } |
| |
|   final DiskEntry first = (DiskEntry)localRegion.basicGetEntry("1"); |
|   final DiskEntry second = (DiskEntry)localRegion.basicGetEntry("2"); |
|   final DiskEntry third = (DiskEntry)localRegion.basicGetEntry("3"); |
| |
|   // Round one: plain on-disk lookups. |
|   assertNull(second.getDiskId()); |
|   assertNull(third.getDiskId()); |
|   assertNotNull(first.getDiskId()); |
|   assertNull(DiskEntry.Helper.getValueOnDisk(third, diskRegion)); |
|   assertNull(DiskEntry.Helper.getValueOnDisk(second, diskRegion)); |
|   assertNotNull(DiskEntry.Helper.getValueOnDisk(first, diskRegion)); |
|   assertNull(DiskEntry.Helper.getValueOnDisk(third, diskRegion)); |
|   assertNull(DiskEntry.Helper.getValueOnDisk(second, diskRegion)); |
| |
|   // Round two: the lookups above must not have created any DiskId, and the |
|   // disk-or-buffer variant must agree with the disk-only variant. |
|   assertNull(second.getDiskId()); |
|   assertNull(third.getDiskId()); |
|   assertNotNull(first.getDiskId()); |
|   assertNull(DiskEntry.Helper.getValueOnDiskOrBuffer(third, diskRegion, localRegion)); |
|   assertNull(DiskEntry.Helper.getValueOnDiskOrBuffer(second, diskRegion, localRegion)); |
|   assertNotNull(DiskEntry.Helper.getValueOnDiskOrBuffer(first, diskRegion, localRegion)); |
|   assertNull(DiskEntry.Helper.getValueOnDisk(third, diskRegion)); |
|   assertNull(DiskEntry.Helper.getValueOnDisk(second, diskRegion)); |
| } |
| |
| /** |
| * Runs the shared DiskEntry.Helper checks in |
| * helperAPIsForPersistenceWithOrWithoutOverflowRegion with overflow |
| * switched on. |
| */ |
| @Test |
| public void testHelperAPIsForOverflowWithPersistenceRegion() |
| { |
|   final boolean shouldOverflow = true; |
|   helperAPIsForPersistenceWithOrWithoutOverflowRegion(shouldOverflow); |
| } |
| |
| /** |
| * Runs the shared DiskEntry.Helper checks in |
| * helperAPIsForPersistenceWithOrWithoutOverflowRegion with overflow |
| * switched off. |
| */ |
| @Test |
| public void testHelperAPIsForPersistenceRegion() |
| { |
|   final boolean shouldOverflow = false; |
|   helperAPIsForPersistenceWithOrWithoutOverflowRegion(shouldOverflow); |
| } |
| |
| /** |
| * Shared body for the DiskEntry.Helper tests on persistent regions. Puts |
| * three 1000-byte values, checks that every entry has a DiskId and a value |
| * reachable through getValueOnDisk and getValueOnDiskOrBuffer, then closes |
| * and recreates the region and repeats the same checks on the recovered |
| * entries. |
| * |
| * @param overflow value passed to diskProps.setOverflow before the region |
| *          is created |
| */ |
| private void helperAPIsForPersistenceWithOrWithoutOverflowRegion( |
|     boolean overflow) |
| { |
|   diskProps.setPersistBackup(true); |
|   diskProps.setRolling(false); |
|   diskProps.setMaxOplogSize(1024 * 1024); |
|   diskProps.setSynchronous(true); |
|   diskProps.setOverflow(overflow); |
|   diskProps.setBytesThreshold(10000); |
|   diskProps.setTimeInterval(0); |
|   diskProps.setOverFlowCapacity(2); |
|   region = DiskRegionHelperFactory.getSyncOverFlowAndPersistRegion(cache, |
|       diskProps); |
| |
|   final byte[] payload = new byte[1000]; |
|   LocalRegion localRegion = (LocalRegion)region; |
|   DiskRegion diskRegion = localRegion.getDiskRegion(); |
|   for (String key : new String[] { "1", "2", "3" }) { |
|     region.put(key, payload); |
|   } |
|   assertKeys1To3ReadableThroughDiskEntryHelper(localRegion, diskRegion); |
| |
|   // Reopen the region and make sure the recovered entries look the same. |
|   region.close(); |
|   region = DiskRegionHelperFactory.getSyncOverFlowAndPersistRegion(cache, |
|       diskProps); |
|   localRegion = (LocalRegion)region; |
|   diskRegion = localRegion.getDiskRegion(); |
|   assertKeys1To3ReadableThroughDiskEntryHelper(localRegion, diskRegion); |
| } |
| |
| /** |
| * Asserts that the entries for keys "1", "2" and "3" each have a DiskId and |
| * that both DiskEntry.Helper lookups return a non-null value for them. |
| * |
| * @param localRegion region whose entries are inspected |
| * @param diskRegion disk region handed to the DiskEntry.Helper lookups |
| */ |
| private void assertKeys1To3ReadableThroughDiskEntryHelper( |
|     LocalRegion localRegion, DiskRegion diskRegion) |
| { |
|   final DiskEntry first = (DiskEntry)localRegion.basicGetEntry("1"); |
|   final DiskEntry second = (DiskEntry)localRegion.basicGetEntry("2"); |
|   final DiskEntry third = (DiskEntry)localRegion.basicGetEntry("3"); |
| |
|   assertNotNull(second.getDiskId()); |
|   assertNotNull(third.getDiskId()); |
|   assertNotNull(first.getDiskId()); |
| |
|   final DiskEntry[] probeOrder = { third, second, first }; |
|   for (DiskEntry entry : probeOrder) { |
|     assertNotNull(DiskEntry.Helper.getValueOnDisk(entry, diskRegion)); |
|   } |
|   for (DiskEntry entry : probeOrder) { |
|     assertNotNull(DiskEntry.Helper.getValueOnDiskOrBuffer(entry, diskRegion, localRegion)); |
|   } |
| } |
| |
| // @todo this test is failing for some reason. Does it need to be fixed? |
| /** |
| * Bug test to reproduce bug 37261. The scenario which this test depicts |
| * is not actually the cause of bug 37261. This test validates the case where |
| * a synch persist-only entry1 is created in Oplog1. A put operation on entry2 |
| * causes the switch, but before Oplog1 is rolled, entry1 is modified so |
| * that it references Oplog2. Thus, in effect, the roller will skip rolling |
| * entry1 when rolling Oplog1. Now entry1 is deleted in Oplog2 and then a |
| * rolling happens. There should not be any error. |
| */ |
| // @Test |
| // public void testBug37261_1() |
| // { |
| // CacheObserver old = CacheObserverHolder.getInstance(); |
| // try { |
| // // Create a persist only region with rolling true |
| // diskProps.setPersistBackup(true); |
| // diskProps.setRolling(true); |
| // diskProps.setCompactionThreshold(100); |
| // diskProps.setMaxOplogSize(1024); |
| // diskProps.setSynchronous(true); |
| // this.proceed = false; |
| // region = DiskRegionHelperFactory.getSyncPersistOnlyRegion(cache, |
| // diskProps); |
| // // create an entry 1 in oplog1, |
| // region.put("key1", new byte[800]); |
| |
| // // Asif the second put will cause a switch to oplog 2 & also cause the |
| // // oplog1 |
| // // to be submitted to the roller |
| // LocalRegion.ISSUE_CALLBACKS_TO_CACHE_OBSERVER = true; |
| // CacheObserverHolder.setInstance(new CacheObserverAdapter() { |
| // public void beforeGoingToCompact() |
| // { |
| // // modify entry 1 so that it points to the new switched oplog |
| // Thread th = new Thread(new Runnable() { |
| // public void run() |
| // { |
| // region.put("key1", new byte[400]); |
| // } |
| // }); |
| // th.start(); |
| // try { |
| // DistributedTestCase.join(th, 30 * 1000, null); |
| // } |
| // catch (Exception e) { |
| // e.printStackTrace(); |
| // failureCause = e.toString(); |
| // failure = true; |
| // } |
| // } |
| |
| // public void afterHavingCompacted() |
| // { |
| // synchronized (OplogJUnitTest.this) { |
| // rollerThread = Thread.currentThread(); |
| // OplogJUnitTest.this.notify(); |
| // OplogJUnitTest.this.proceed = true; |
| // } |
| // } |
| // }); |
| // region.put("key2", new byte[300]); |
| // synchronized (this) { |
| // if (!this.proceed) { |
| // this.wait(15000); |
| // assertTrue(this.proceed); |
| // } |
| // } |
| // this.proceed = false; |
| // // Asif Delete the 1st entry |
| // region.destroy("key1"); |
| |
| // CacheObserverHolder.setInstance(new CacheObserverAdapter() { |
| // public void afterHavingCompacted() |
| // { |
| // synchronized (OplogJUnitTest.this) { |
| // OplogJUnitTest.this.notify(); |
| // OplogJUnitTest.this.proceed = true; |
| // } |
| // } |
| // }); |
| // // Coz another switch and wait till rolling done |
| // region.put("key2", new byte[900]); |
| |
| // synchronized (this) { |
| // if (!this.proceed) { |
| // this.wait(15000); |
| // assertFalse(this.proceed); |
| // } |
| // } |
| // // Check if the roller is stil alive |
| // assertTrue(rollerThread.isAlive()); |
| // } |
| // catch (Exception e) { |
| // e.printStackTrace(); |
| // fail("Test failed du toe xception" + e); |
| // } |
| // finally { |
| // CacheObserverHolder.setInstance(old); |
| // LocalRegion.ISSUE_CALLBACKS_TO_CACHE_OBSERVER = false; |
| // } |
| |
| // } |
| |
| /** |
| * Tests the condition when a 'put' is in progress and a concurrent 'clear' |
| * and 'put' (on the same key) occur. Thus if after the Htree ref was set (in |
| * 'put') the region got cleared (and the same key re-'put'), the entry will |
| * get recorded in the new Oplog without a corresponding create (because the |
| * Oplogs containing the create have already been deleted due to the clear |
| * operation). This put should not proceed. Also, region creation after |
| * closing should not give an exception. |
| * <p> |
| * The clear thread is started from the cache writer's beforeUpdate callback, |
| * i.e. while the update of "key1" is in flight. The test then waits for that |
| * thread before inspecting testFailed and before closing the region, so a |
| * failure recorded by the clear thread is not missed and the close does not |
| * race with it. |
| */ |
| @Test |
| public void testPutClearPut() |
| { |
|   try { |
|     // Create a persist only region with rolling true |
|     diskProps.setPersistBackup(true); |
|     diskProps.setRolling(true); |
|     diskProps.setMaxOplogSize(1024); |
|     diskProps.setSynchronous(true); |
|     this.proceed = false; |
|     region = DiskRegionHelperFactory.getSyncPersistOnlyRegion(cache, |
|         diskProps, Scope.LOCAL); |
|     final Thread clearOp = new Thread(new Runnable() { |
|       public void run() |
|       { |
|         try { |
|           LocalRegion.ISSUE_CALLBACKS_TO_CACHE_OBSERVER = false; |
|           region.clear(); |
|           region.put("key1", "value3"); |
|         } |
|         catch (Exception e) { |
|           testFailed = true; |
|           failureCause = "Encountered Exception=" + e; |
|         } |
|       } |
|     }); |
|     // A Thread can be started only once. The writer may be invoked again |
|     // (for example by the clear thread's own put if it turns out to be an |
|     // update), so make sure start() is attempted a single time. |
|     final AtomicBoolean clearOpStarted = new AtomicBoolean(false); |
|     region.getAttributesMutator().setCacheWriter(new CacheWriterAdapter() { |
|       @Override |
|       public void beforeUpdate(EntryEvent event) throws CacheWriterException { |
|         if (clearOpStarted.compareAndSet(false, true)) { |
|           clearOp.start(); |
|         } |
|       } |
|     }); |
|     region.create("key1", "value1"); |
|     LocalRegion.ISSUE_CALLBACKS_TO_CACHE_OBSERVER = true; |
|     // This update triggers beforeUpdate, which launches the clear thread. |
|     region.put("key1", "value2"); |
|     // Wait for the clear thread only now that it can actually have been |
|     // started; joining before the put returned immediately. |
|     try { |
|       DistributedTestCase.join(clearOp, 30 * 1000, null); |
|     } |
|     catch (Exception e) { |
|       testFailed = true; |
|       failureCause = "Encountered Exception=" + e; |
|       e.printStackTrace(); |
|     } |
|     if (!testFailed) { |
|       region.close(); |
|       region = DiskRegionHelperFactory.getSyncPersistOnlyRegion(cache, |
|           diskProps, Scope.LOCAL); |
|     }else { |
|       fail(failureCause); |
|     } |
|   } |
|   catch (Exception e) { |
|     e.printStackTrace(); |
|     fail("Test failed due to exception" + e); |
|   } |
|   finally { |
|     testFailed = false; |
|     proceed = false; |
|     LocalRegion.ISSUE_CALLBACKS_TO_CACHE_OBSERVER = false; |
|   } |
| } |
| |
| /** |
| * Tests the condition when a 'put' on an already created entry and a |
| * concurrent 'clear' are happening. If the region gets cleared after the |
| * HTree ref was set in 'put', the entry effectively becomes a create in the |
| * VM. The new Oplog should record it as a create even though the Htree ref |
| * held in the ThreadLocal no longer matches the current Htree ref; the |
| * operation is valid and should get recorded in the Oplog. |
| * <p> |
| * The clear is run on a separate thread from the afterSettingDiskRef |
| * observer callback and joined there. Afterwards the region must hold |
| * exactly one entry, both before and after being closed and recreated, and |
| * "key1" must map to "value2". |
| */ |
| @Test |
| public void testPutClearCreate() |
| { |
|   failure = false; |
|   try { |
|     // Persist-only region with rolling enabled |
|     diskProps.setPersistBackup(true); |
|     diskProps.setRolling(true); |
|     diskProps.setMaxOplogSize(1024); |
|     diskProps.setSynchronous(true); |
| |
|     region = DiskRegionHelperFactory.getSyncPersistOnlyRegion(cache, |
|         diskProps, Scope.LOCAL); |
| |
|     region.create("key1", "value1"); |
|     LocalRegion.ISSUE_CALLBACKS_TO_CACHE_OBSERVER = true; |
|     CacheObserverHolder.setInstance(new CacheObserverAdapter() { |
|       @Override |
|       public void afterSettingDiskRef() { |
|         final Runnable clearTask = new Runnable() { |
|           public void run() { |
|             region.clear(); |
|           } |
|         }; |
|         final Thread clearer = new Thread(clearTask); |
|         clearer.start(); |
|         try { |
|           DistributedTestCase.join(clearer, 120 * 1000, null); |
|           failure = clearer.isAlive(); |
|           failureCause = "Clear Thread still running !"; |
|         } |
|         catch (Exception joinProblem) { |
|           failure = true; |
|           failureCause = joinProblem.toString(); |
|         } |
|       } |
|     }); |
|     region.put("key1", "value2"); |
|     LocalRegion.ISSUE_CALLBACKS_TO_CACHE_OBSERVER = false; |
|     assertFalse(failureCause, failure); |
|     assertEquals(1, region.size()); |
| |
|     // Reopen and verify the recovered contents. |
|     region.close(); |
|     region = DiskRegionHelperFactory.getSyncPersistOnlyRegion(cache, |
|         diskProps, Scope.LOCAL); |
|     assertEquals(1, region.size()); |
|     assertEquals("value2", (String)region.get("key1")); |
|   } |
|   catch (Exception problem) { |
|     problem.printStackTrace(); |
|     fail("Test failed due to exception" + problem); |
|   } |
|   finally { |
|     testFailed = false; |
|     proceed = false; |
|     LocalRegion.ISSUE_CALLBACKS_TO_CACHE_OBSERVER = false; |
|     CacheObserverHolder.setInstance(new CacheObserverAdapter()); |
|     failure = false; |
|   } |
| } |
| |
| /** |
| * Checks that a transactional 'destroy' works on a sync overflow-only disk |
| * region: after the transaction commits, the region's entry map must no |
| * longer contain the destroyed key. |
| */ |
| @Test |
| public void testOverFlowOnlySyncDestroyTx() |
| { |
|   diskProps.setMaxOplogSize(20480); |
|   diskProps.setOverFlowCapacity(1); |
|   diskProps.setDiskDirs(dirs); |
|   region = DiskRegionHelperFactory.getSyncOverFlowOnlyRegion(cache, |
|       diskProps); |
|   assertNotNull(region); |
| |
|   region.put("key", "createValue"); |
|   region.put("key1", "createValue1"); |
|   try { |
|     cache.getCacheTransactionManager().begin(); |
|     region.destroy("key"); |
|     cache.getCacheTransactionManager().commit(); |
| |
|     final Object leftover = ((LocalRegion)region).entries.getEntry("key"); |
|     assertNull("The deleted entry should have been null", leftover); |
|   } |
|   catch (CommitConflictException conflict) { |
|     testFailed = true; |
|     fail("CommitConflitException encountered"); |
|   } |
|   catch (Exception problem) { |
|     problem.printStackTrace(); |
|     fail("Test failed due to exception" + problem); |
|   } |
| } |
| |
| /** |
| * Forces a recovery to follow the switchOutFilesForRecovery path and checks |
| * that recreating the region neither throws nor changes the region size. |
| * This is also a bug test for bug 37682. |
| * |
| * @throws Exception if the setup, the forced roll or the sleep fails |
| */ |
| @Test |
| public void testSwitchFilesForRecovery() throws Exception |
| { |
|   region = DiskRegionHelperFactory.getSyncPersistOnlyRegion(cache, null, Scope.LOCAL); |
|   put100Int(); |
|   region.forceRolling(); |
|   Thread.sleep(2000); |
|   put100Int(); |
| |
|   final int sizeBeforeClose = region.size(); |
|   region.close(); |
|   // Historical note: recovery used to be steered here through |
|   // NewLBHTreeDiskRegion.setJdbmexceptionOccuredToTrueForTesting. |
|   try { |
|     region = DiskRegionHelperFactory.getSyncPersistOnlyRegion(cache, null, Scope.LOCAL); |
|   } |
|   catch (Exception recreateFailure) { |
|     fail("failed in recreating region due to"+recreateFailure); |
|   } |
| |
|   final int sizeAfterRecovery = region.size(); |
|   if (sizeBeforeClose != sizeAfterRecovery) { |
|     fail(" Expected region size to be " + sizeBeforeClose |
|         + " after recovery but it is " + sizeAfterRecovery); |
|   } |
| } |
| |
| /** |
| * Tests that directory stats are correctly updated when the disk store uses |
| * a single directory (for bug 37531). |
| * <p> |
| * A CacheObserver is installed whose beforeGoingToCompact callback waits on |
| * the freezeRoller monitor until the test sets the flag (unless assertDone |
| * is already true). Its afterHavingCompacted callback, when it follows a |
| * beforeGoingToCompact call, switches observer callbacks off and calls |
| * checkDiskStats(). |
| * <p> |
| * The test configures a max oplog size of 500 and one directory of size |
| * 4000, puts three 200-byte values (calling checkDiskStats() after each), |
| * then sets assertDone and the freezeRoller flag and notifies the monitor. |
| * It then closes the region, calls closeDown(), disables rolling, recreates |
| * the region and repeats the same sequence with keys key4..key6. The finally |
| * block switches callbacks off, restores the previous observer and once more |
| * releases any callback that may still be waiting. |
| */ |
| @Test |
| public void testPersist1DirStats() |
| { |
| final AtomicBoolean freezeRoller = new AtomicBoolean(); |
| CacheObserver old = CacheObserverHolder |
| .setInstance(new CacheObserverAdapter() { |
| private volatile boolean didBeforeCall = false; |
| @Override |
| public void beforeGoingToCompact() { |
| this.didBeforeCall = true; |
| synchronized (freezeRoller) { |
| if (!assertDone) { |
| try { |
| // Here, we are not allowing the Roller thread to roll the old oplog into htree |
| while (!freezeRoller.get()) { |
| freezeRoller.wait(); |
| } |
| freezeRoller.set(false); |
| } |
| catch (InterruptedException e) { |
| fail("interrupted"); |
| } |
| } |
| } |
| } |
| @Override |
| public void afterHavingCompacted() { |
| if (this.didBeforeCall) { |
| this.didBeforeCall = false; |
| LocalRegion.ISSUE_CALLBACKS_TO_CACHE_OBSERVER = false; |
| //assertTrue("Assert failure for DSpaceUsage in afterHavingCompacted ", diskSpaceUsageStats() == calculatedDiskSpaceUsageStats()); |
| // what is the point of this assert? |
| checkDiskStats(); |
| } |
| } |
| }); |
| try { |
| final int MAX_OPLOG_SIZE = 500; |
| diskProps.setMaxOplogSize(MAX_OPLOG_SIZE); |
| diskProps.setPersistBackup(true); |
| diskProps.setRolling(true); |
| diskProps.setSynchronous(true); |
| diskProps.setOverflow(false); |
| diskProps.setDiskDirsAndSizes(new File[] { dirs[0] }, new int[] { 4000 }); |
| final byte[] val = new byte[200]; |
| region = DiskRegionHelperFactory.getSyncPersistOnlyRegion(cache, |
| diskProps, Scope.LOCAL); |
| LocalRegion.ISSUE_CALLBACKS_TO_CACHE_OBSERVER = true; |
| region.put("key1", val); |
| // Disk space should have changed due to 1 put |
| //assertTrue("stats did not increase after put 1 ", diskSpaceUsageStats() == calculatedDiskSpaceUsageStats()); |
| checkDiskStats(); |
| region.put("key2", val); |
| //assertTrue("stats did not increase after put 2", diskSpaceUsageStats() == calculatedDiskSpaceUsageStats()); |
| checkDiskStats(); |
| // This put will cause a switch as max-oplog size (500) will be exceeded (600) |
| region.put("key3", val); |
| synchronized (freezeRoller) { |
| //assertTrue("current disk space usage with Roller thread in wait and put key3 done is incorrect " + diskSpaceUsageStats() + " " + calculatedDiskSpaceUsageStats(), diskSpaceUsageStats()== calculatedDiskSpaceUsageStats()); |
| checkDiskStats(); |
| assertDone = true; |
| freezeRoller.set(true); |
| freezeRoller.notifyAll(); |
| } |
| |
| region.close(); |
| closeDown(); |
| // Stop rolling to get accurate estimates: |
| diskProps.setRolling(false); |
| |
| region = DiskRegionHelperFactory.getSyncPersistOnlyRegion(cache, |
| diskProps, Scope.LOCAL); |
| |
| // On recreating the region after closing, old Oplog file gets rolled into htree |
| // "Disk space usage zero when region recreated" |
| checkDiskStats(); |
| region.put("key4", val); |
| //assertTrue("stats did not increase after put 4", diskSpaceUsageStats() == calculatedDiskSpaceUsageStats()); |
| checkDiskStats(); |
| region.put("key5", val); |
| //assertTrue("stats did not increase after put 5", diskSpaceUsageStats() == calculatedDiskSpaceUsageStats()); |
| checkDiskStats(); |
| assertDone = false; |
| LocalRegion.ISSUE_CALLBACKS_TO_CACHE_OBSERVER = true; |
| region.put("key6", val); |
| // again we expect a switch in oplog here |
| synchronized (freezeRoller) { |
| //assertTrue("current disk space usage with Roller thread in wait and put key6 done is incorrect", diskSpaceUsageStats()== calculatedDiskSpaceUsageStats()); |
| checkDiskStats(); |
| assertDone = true; |
| freezeRoller.set(true); |
| freezeRoller.notifyAll(); |
| } |
| region.close(); |
| } |
| catch (Exception e) { |
| e.printStackTrace(); |
| fail("Test failed due to exception" + e); |
| } |
| finally { |
| LocalRegion.ISSUE_CALLBACKS_TO_CACHE_OBSERVER = false; |
| CacheObserverHolder.setInstance(old); |
| synchronized (freezeRoller) { |
| assertDone = true; |
| freezeRoller.set(true); |
| freezeRoller.notifyAll(); |
| } |
| } |
| } |
| |
| /** |
| * Tests reduction in size of disk stats |
| * when the oplog is rolled. |
| * <p> |
| * Two 333-byte values are put and then removed, and a third put is expected |
| * to switch oplogs; the switchExpected flag makes the observer fail on any |
| * switch seen before that point. At the first switch the observer remembers |
| * the DirectoryHolder returned by dr.getNextDir() together with its |
| * disk-space-usage stat. It adds up the size of every compacted oplog that |
| * is about to be deleted, and in afterHavingCompacted it compares the |
| * directory's disk-space-usage stat with an expected size built from the |
| * Oplog record-size constants, the RVV sizes, the key3 create record and the |
| * key1/key2 tombstones. The result is published through exceptionOccured |
| * and okToExit under the lock monitor. |
| * <p> |
| * The test thread waits on lock for up to 9 seconds and fails if the |
| * comparison did not run or did not match. The previous observer is restored |
| * in the finally block. |
| */ |
| @Test |
| public void testStatsSizeReductionOnRolling() throws Exception |
| { |
| final int MAX_OPLOG_SIZE = 500*2; |
| diskProps.setMaxOplogSize(MAX_OPLOG_SIZE); |
| diskProps.setPersistBackup(true); |
| diskProps.setRolling(true); |
| diskProps.setCompactionThreshold(100); |
| diskProps.setSynchronous(true); |
| diskProps.setOverflow(false); |
| diskProps.setDiskDirsAndSizes(new File[] { dirs[0] }, new int[] { 4000 }); |
| final byte[] val = new byte[333]; |
| region = DiskRegionHelperFactory.getSyncPersistOnlyRegion(cache, |
| diskProps, Scope.LOCAL); |
| final DiskRegion dr = ((LocalRegion)region).getDiskRegion(); |
| final Object lock = new Object(); |
| final boolean [] exceptionOccured = new boolean[] {true}; |
| final boolean [] okToExit = new boolean[] {false}; |
| final boolean [] switchExpected = new boolean[] {false}; |
| |
| // calculate sizes |
| final int extra_byte_num_per_entry = InternalDataSerializer.calculateBytesForTSandDSID(getDSID((LocalRegion)region)); |
| final int key3_size = DiskOfflineCompactionJUnitTest.getSize4Create(extra_byte_num_per_entry, "key3", val); |
| final int tombstone_key1 = DiskOfflineCompactionJUnitTest.getSize4TombstoneWithKey(extra_byte_num_per_entry, "key1"); |
| final int tombstone_key2 = DiskOfflineCompactionJUnitTest.getSize4TombstoneWithKey(extra_byte_num_per_entry, "key2"); |
| |
| CacheObserver old = CacheObserverHolder |
| .setInstance(new CacheObserverAdapter() { |
| private long before = -1; |
| private DirectoryHolder dh = null; |
| private long oplogsSize = 0; |
| |
| @Override |
| public void beforeSwitchingOplog() { |
| cache.getLogger().info("beforeSwitchingOplog"); |
| if (!switchExpected[0]) { |
| fail("unexpected oplog switch"); |
| } |
| if (before == -1) { |
| // only want to call this once; before the 1st oplog destroy |
| this.dh = dr.getNextDir(); |
| this.before = this.dh.getDirStatsDiskSpaceUsage(); |
| } |
| } |
| @Override |
| public void beforeDeletingCompactedOplog(Oplog oplog) |
| { |
| cache.getLogger().info("beforeDeletingCompactedOplog"); |
| oplogsSize += oplog.getOplogSize(); |
| } |
| @Override |
| public void afterHavingCompacted() { |
| cache.getLogger().info("afterHavingCompacted"); |
| if(before > -1) { |
| synchronized(lock) { |
| okToExit[0] = true; |
| long after = this.dh.getDirStatsDiskSpaceUsage(); |
| // after compaction, in _2.crf, key3 is an create-entry, |
| // key1 and key2 are tombstones. |
| // _2.drf contained a rvvgc with drMap.size()==1 |
| int expected_drf_size = |
| Oplog.OPLOG_DISK_STORE_REC_SIZE + |
| Oplog.OPLOG_MAGIC_SEQ_REC_SIZE + |
| Oplog.OPLOG_GEMFIRE_VERSION_REC_SIZE + |
| DiskOfflineCompactionJUnitTest.getRVVSize(1, new int[] {0}, true); |
| int expected_crf_size = |
| Oplog.OPLOG_DISK_STORE_REC_SIZE + |
| Oplog.OPLOG_MAGIC_SEQ_REC_SIZE + |
| Oplog.OPLOG_GEMFIRE_VERSION_REC_SIZE + |
| DiskOfflineCompactionJUnitTest.getRVVSize(1, new int[] {1}, false) + |
| Oplog.OPLOG_NEW_ENTRY_BASE_REC_SIZE + |
| key3_size + tombstone_key1 + tombstone_key2; |
| int oplog_2_size = expected_drf_size + expected_crf_size; |
| if (after != oplog_2_size) { |
| cache.getLogger().info("test failed before=" + before |
| + " after=" + after |
| + " oplogsSize=" + oplogsSize); |
| exceptionOccured[0] = true; |
| }else { |
| exceptionOccured[0] = false; |
| } |
| LocalRegion.ISSUE_CALLBACKS_TO_CACHE_OBSERVER = false; |
| lock.notify(); |
| } |
| } |
| } |
| }); |
| try { |
| |
| LocalRegion.ISSUE_CALLBACKS_TO_CACHE_OBSERVER = true; |
| cache.getLogger().info("putting key1"); |
| region.put("key1", val); |
| // Disk space should have changed due to 1 put |
| //assertTrue("stats did not increase after put 1 ", diskSpaceUsageStats() == calculatedDiskSpaceUsageStats()); |
| checkDiskStats(); |
| cache.getLogger().info("putting key2"); |
| region.put("key2", val); |
| //assertTrue("stats did not increase after put 2", diskSpaceUsageStats() == calculatedDiskSpaceUsageStats()); |
| checkDiskStats(); |
| |
| cache.getLogger().info("removing key1"); |
| region.remove("key1"); |
| cache.getLogger().info("removing key2"); |
| region.remove("key2"); |
| |
| // This put is expected to cause an oplog switch. NOTE(review): the original note cites max-oplog size (900) being exceeded (999), but MAX_OPLOG_SIZE above is 1000 - confirm the figures |
| switchExpected[0] = true; |
| cache.getLogger().info("putting key3"); |
| region.put("key3", val); |
| cache.getLogger().info("waiting for compaction"); |
| synchronized(lock) { |
| if (!okToExit[0]) { |
| lock.wait(9000); |
| assertTrue(okToExit[0]); |
| } |
| assertFalse(exceptionOccured[0]); |
| } |
| |
| region.close(); |
| } |
| finally { |
| LocalRegion.ISSUE_CALLBACKS_TO_CACHE_OBSERVER = false; |
| CacheObserverHolder.setInstance(old); |
| } |
| } |
| |
| // @todo this test is broken; size1 can keep changing since the roller will |
| // keep copying forward forever. Need to change it so copy forward oplogs |
| // will not be compacted so that size1 reaches a steady state |
| /** |
| * Tests stats verification with rolling enabled |
| */ |
| // @Test |
| // public void testSizeStatsAfterRecreationWithRollingEnabled() throws Exception |
| // { |
| // final int MAX_OPLOG_SIZE = 500; |
| // diskProps.setMaxOplogSize(MAX_OPLOG_SIZE); |
| // diskProps.setPersistBackup(true); |
| // diskProps.setRolling(true); |
| // diskProps.setCompactionThreshold(100); |
| // diskProps.setSynchronous(true); |
| // diskProps.setOverflow(false); |
| // diskProps.setDiskDirsAndSizes(new File[] { dirs[0] }, new int[] { 4000 }); |
| // final byte[] val = new byte[200]; |
| // region = DiskRegionHelperFactory.getSyncPersistOnlyRegion(cache, |
| // diskProps); |
| // final DiskRegion dr = ((LocalRegion)region).getDiskRegion(); |
| // final Object lock = new Object(); |
| // final boolean [] exceptionOccured = new boolean[] {true}; |
| // final boolean [] okToExit = new boolean[] {false}; |
| |
| // CacheObserver old = CacheObserverHolder |
| // .setInstance(new CacheObserverAdapter() { |
| // private long before = -1; |
| // public void beforeDeletingCompactedOplog(Oplog rolledOplog) |
| // { |
| // if (before == -1) { |
| // // only want to call this once; before the 1st oplog destroy |
| // before = dr.getNextDir().getDirStatsDiskSpaceUsage(); |
| // } |
| // } |
| // public void afterHavingCompacted() { |
| // if(before > -1) { |
| // synchronized(lock) { |
| // okToExit[0] = true; |
| // long after = dr.getNextDir().getDirStatsDiskSpaceUsage();; |
| // exceptionOccured[0] = false; |
| // LocalRegion.ISSUE_CALLBACKS_TO_CACHE_OBSERVER = false; |
| // lock.notify(); |
| // } |
| // } |
| // } |
| // }); |
| // try { |
| |
| // LocalRegion.ISSUE_CALLBACKS_TO_CACHE_OBSERVER = true; |
| // region.put("key1", val); |
| // region.put("key2", val); |
| // // This put will cause a switch as max-oplog size (500) will be exceeded (600) |
| // region.put("key3", val); |
| // synchronized(lock) { |
| // if (!okToExit[0]) { |
| // lock.wait(9000); |
| // assertTrue(okToExit[0]); |
| // } |
| // assertFalse(exceptionOccured[0]); |
| // } |
| // while (region.forceCompaction() != null) { |
| // // wait until no more oplogs to compact |
| // Thread.sleep(50); |
| // } |
| // long size1 =0; |
| // for(DirectoryHolder dh:dr.getDirectories()) { |
| // cache.getLogger().info(" dir=" + dh.getDir() |
| // + " size1=" + dh.getDirStatsDiskSpaceUsage()); |
| // size1 += dh.getDirStatsDiskSpaceUsage(); |
| // } |
| // System.out.println("Size before closing= "+ size1); |
| // region.close(); |
| // diskProps.setRolling(false); |
| // region = DiskRegionHelperFactory.getSyncPersistOnlyRegion(cache, |
| // diskProps); |
| |
| // long size2 =0; |
| // for(DirectoryHolder dh:((LocalRegion)region).getDiskRegion().getDirectories()) { |
| // cache.getLogger().info(" dir=" + dh.getDir() |
| // + " size2=" + dh.getDirStatsDiskSpaceUsage()); |
| // size2 += dh.getDirStatsDiskSpaceUsage(); |
| // } |
| // System.out.println("Size after recreation= "+ size2); |
| // assertEquals(size1, size2); |
| // region.close(); |
| |
| // } |
| // finally { |
| // LocalRegion.ISSUE_CALLBACKS_TO_CACHE_OBSERVER = false; |
| // CacheObserverHolder.setInstance(old); |
| // } |
| // } |
| |
| // This test is not valid. When listenForDataSerializeChanges is called |
| // it ALWAYS does vrecman writes and a commit. Look at saveInstantiators |
| // and saveDataSerializers to see these commit calls. |
| // These calls can cause the size of the files to change. |
| /** |
| * Checks that, with rolling disabled, the disk-space usage reported by the |
| * directory stats is the same before the region is closed and after it has |
| * been recreated. Eight 200-byte values are written across two directories |
| * with a max oplog size of 500. |
| * |
| * @throws Exception if region creation or any region operation fails |
| */ |
| @Test |
| public void testSizeStatsAfterRecreation() throws Exception |
| { |
|   final int maxOplogSize = 500; |
|   diskProps.setMaxOplogSize(maxOplogSize); |
|   diskProps.setPersistBackup(true); |
|   diskProps.setRolling(false); |
|   diskProps.setSynchronous(true); |
|   diskProps.setOverflow(false); |
|   diskProps.setDiskDirsAndSizes(new File[] { dirs[0],dirs[1] }, new int[] { 4000,4000 }); |
|   final byte[] payload = new byte[200]; |
|   region = DiskRegionHelperFactory.getSyncPersistOnlyRegion(cache, |
|       diskProps, Scope.LOCAL); |
|   DiskRegion diskRegion = ((LocalRegion)region).getDiskRegion(); |
| |
|   try { |
|     LocalRegion.ISSUE_CALLBACKS_TO_CACHE_OBSERVER = true; |
|     int index = 0; |
|     while (index < 8) { |
|       region.put("key"+index, payload); |
|       index++; |
|     } |
| |
|     long usageBeforeClose = 0; |
|     for (DirectoryHolder holder : diskRegion.getDirectories()) { |
|       usageBeforeClose += holder.getDirStatsDiskSpaceUsage(); |
|     } |
|     System.out.println("Size before close = "+ usageBeforeClose); |
| |
|     region.close(); |
|     region = DiskRegionHelperFactory.getSyncPersistOnlyRegion(cache, |
|         diskProps, Scope.LOCAL); |
|     diskRegion = ((LocalRegion)region).getDiskRegion(); |
| |
|     long usageAfterRecreation = 0; |
|     for (DirectoryHolder holder : diskRegion.getDirectories()) { |
|       usageAfterRecreation += holder.getDirStatsDiskSpaceUsage(); |
|     } |
|     System.out.println("Size after recreation= "+ usageAfterRecreation); |
| |
|     assertEquals(usageBeforeClose, usageAfterRecreation); |
|     region.close(); |
|   } |
|   finally { |
|     LocalRegion.ISSUE_CALLBACKS_TO_CACHE_OBSERVER = false; |
|   } |
| } |
| |
| /** |
| * Checks the on-disk size of the .crf and .drf files around cache close and |
| * region recreation. After ten puts the files are expected to total 18000 |
| * and 2000 bytes; after the cache is closed they must be below 500 and 100 |
| * bytes. Copies of the large files taken before the close are then put back |
| * (and the .krf removed) to simulate an incomplete disk store close. After |
| * the cache and region are recreated and ten more puts are made, the totals |
| * must be only slightly above 18000 and 2000, and a further close and |
| * recreation of the region must leave them unchanged. |
| * |
| * @throws Exception if a file copy, the cache or a region operation fails |
| */ |
| @Test |
| public void testUnPreblowOnRegionCreate() throws Exception { |
|   final int maxOplogSize = 20000; |
|   diskProps.setMaxOplogSize(maxOplogSize); |
|   diskProps.setPersistBackup(true); |
|   diskProps.setRolling(true); |
|   diskProps.setSynchronous(true); |
|   diskProps.setOverflow(false); |
|   diskProps.setDiskDirsAndSizes(new File[] { dirs[0] }, new int[] { 40000 }); |
|   region = DiskRegionHelperFactory.getSyncPersistOnlyRegion(cache, diskProps, |
|       Scope.LOCAL); |
| |
|   try { |
|     LocalRegion.ISSUE_CALLBACKS_TO_CACHE_OBSERVER = true; |
|     for (int key = 0; key < 10; key++) { |
|       region.put("key-" + key, "value-"); |
|     } |
| |
|     assertEquals(18000, getOplogFileSizeSum(dirs[0], ".crf")); |
|     assertEquals(2000, getOplogFileSizeSum(dirs[0], ".drf")); |
| |
|     // Keep copies of the inflated crf/drf; they later replace the compacted |
|     // files to simulate an incomplete diskStore close. |
|     File[] listing = dirs[0].listFiles(); |
|     for (int idx = 0; idx < listing.length; idx++) { |
|       final File candidate = listing[idx]; |
|       final String fileName = candidate.getName(); |
|       if (fileName.endsWith(".crf") || fileName.endsWith(".drf")) { |
|         FileUtils.copyFile(candidate, new File(candidate.getAbsolutePath() + "_inflated")); |
|       } |
|     } |
| |
|     cache.close(); |
|     assertTrue(getOplogFileSizeSum(dirs[0], ".crf") < 500); |
|     assertTrue(getOplogFileSizeSum(dirs[0], ".drf") < 100); |
| |
|     // Drop the compacted crf/drf and the krf, then move the inflated copies |
|     // back under their original names. |
|     listing = dirs[0].listFiles(); |
|     for (int idx = 0; idx < listing.length; idx++) { |
|       final String fileName = listing[idx].getName(); |
|       final boolean isOplogFile = fileName.endsWith(".krf") |
|           || fileName.endsWith(".crf") || fileName.endsWith(".drf"); |
|       if (isOplogFile) { |
|         listing[idx].delete(); |
|       } |
|     } |
|     for (int idx = 0; idx < listing.length; idx++) { |
|       final File candidate = listing[idx]; |
|       if (candidate.getName().endsWith("_inflated")) { |
|         final File restored = new File(candidate.getAbsolutePath().replace("_inflated", "")); |
|         assertTrue(candidate.renameTo(restored)); |
|       } |
|     } |
|     assertEquals(18000, getOplogFileSizeSum(dirs[0], ".crf")); |
|     assertEquals(2000, getOplogFileSizeSum(dirs[0], ".drf")); |
| |
|     createCache(); |
|     region = DiskRegionHelperFactory.getSyncPersistOnlyRegion(cache, |
|         diskProps, Scope.LOCAL); |
|     for (int key = 10; key < 20; key++) { |
|       region.put("key-" + key, "value-"); |
|     } |
| |
|     final int crfTotal = getOplogFileSizeSum(dirs[0], ".crf"); |
|     assertTrue("crf too big:" + crfTotal, crfTotal < 18000 + 500); |
|     assertTrue("crf too small:" + crfTotal, crfTotal > 18000); |
|     final int drfTotal = getOplogFileSizeSum(dirs[0], ".drf"); |
|     assertTrue("drf too big:" + drfTotal, drfTotal < 2000 + 100); |
|     assertTrue("drf too small:" + drfTotal, drfTotal > 2000); |
| |
|     // Region recovery alone must not cause an unpreblow. |
|     region.close(); |
|     region = DiskRegionHelperFactory.getSyncPersistOnlyRegion(cache, |
|         diskProps, Scope.LOCAL); |
| |
|     assertEquals(crfTotal, getOplogFileSizeSum(dirs[0], ".crf")); |
|     assertEquals(drfTotal, getOplogFileSizeSum(dirs[0], ".drf")); |
|   } finally { |
|     LocalRegion.ISSUE_CALLBACKS_TO_CACHE_OBSERVER = false; |
|   } |
| } |
| |
| private int getOplogFileSizeSum(File dir, String type) { |
| int sum = 0; |
| File[] files = dir.listFiles(); |
| for (File file : files) { |
| String name = file.getName(); |
| if (name.endsWith(type)) { |
| sum += file.length(); |
| } |
| } |
| return sum; |
| } |
| |
| // disabled - this test frequently fails. See bug #52213 |
| public void disabledtestMagicSeqPresence() throws Exception { |
| final int MAX_OPLOG_SIZE = 200; |
| diskProps.setMaxOplogSize(MAX_OPLOG_SIZE); |
| diskProps.setPersistBackup(true); |
| diskProps.setRolling(true); |
| diskProps.setSynchronous(true); |
| diskProps.setOverflow(false); |
| diskProps.setDiskDirsAndSizes(new File[] { dirs[0] }, new int[] { 4000 }); |
| region = DiskRegionHelperFactory.getSyncPersistOnlyRegion(cache, diskProps, |
| Scope.LOCAL); |
| |
| // at least 3 kinds of files will be verified |
| assertEquals(3, verifyOplogHeader(dirs[0])); |
| |
| try { |
| LocalRegion.ISSUE_CALLBACKS_TO_CACHE_OBSERVER = true; |
| for (int i = 0; i < 10; ++i) { |
| region.put("key-" + i, "value-"); |
| } |
| assertEquals(4, verifyOplogHeader(dirs[0])); |
| |
| region.close(); |
| region = DiskRegionHelperFactory.getSyncPersistOnlyRegion(cache, |
| diskProps, Scope.LOCAL); |
| |
| assertEquals(4, verifyOplogHeader(dirs[0])); |
| region.close(); |
| } finally { |
| LocalRegion.ISSUE_CALLBACKS_TO_CACHE_OBSERVER = false; |
| } |
| } |
| |
| /* |
| * returns number of types of files verified |
| */ |
| private int verifyOplogHeader(File dir) throws IOException { |
| File[] files = dir.listFiles(); |
| HashSet<String> verified = new HashSet<String>(); |
| for (File file : files) { |
| String name = file.getName(); |
| byte[] expect = new byte[Oplog.OPLOG_MAGIC_SEQ_REC_SIZE]; |
| if (name.endsWith(".crf")) { |
| expect[0] = Oplog.OPLOG_MAGIC_SEQ_ID; |
| System.arraycopy(OPLOG_TYPE.CRF.getBytes(), 0, expect, 1, OPLOG_TYPE.getLen()); |
| verified.add(".crf"); |
| } else if (name.endsWith(".drf")) { |
| expect[0] = Oplog.OPLOG_MAGIC_SEQ_ID; |
| System.arraycopy(OPLOG_TYPE.DRF.getBytes(), 0, expect, 1, OPLOG_TYPE.getLen()); |
| verified.add(".drf"); |
| } else if (name.endsWith(".krf")) { |
| expect[0] = Oplog.OPLOG_MAGIC_SEQ_ID; |
| System.arraycopy(OPLOG_TYPE.KRF.getBytes(), 0, expect, 1, OPLOG_TYPE.getLen()); |
| verified.add(".krf"); |
| } else if (name.endsWith(".if")) { |
| expect[0] = DiskInitFile.OPLOG_MAGIC_SEQ_ID; |
| System.arraycopy(OPLOG_TYPE.IF.getBytes(), 0, expect, 1, OPLOG_TYPE.getLen()); |
| verified.add(".if"); |
| } else { |
| System.out.println("Ignored: " + file); |
| continue; |
| } |
| expect[expect.length-1] = 21; // EndOfRecord |
| |
| byte[] buf = new byte[Oplog.OPLOG_MAGIC_SEQ_REC_SIZE]; |
| |
| FileInputStream fis = new FileInputStream(file); |
| int count = fis.read(buf, 0, 8); |
| fis.close(); |
| |
| System.out.println("Verifying: " + file); |
| assertEquals("expected a read to return 8 but it returned " + count + " for file " + file, 8, count); |
| assertTrue(Arrays.equals(expect, buf)); |
| } |
| return verified.size(); |
| } |
| |
| /** |
| * Tests if without rolling the region size before close is same as after |
| * recreation |
| */ |
| @Test |
| public void testSizeStatsAfterRecreationInAsynchMode() throws Exception |
| { |
| final int MAX_OPLOG_SIZE = 1000; |
| diskProps.setMaxOplogSize(MAX_OPLOG_SIZE); |
| diskProps.setPersistBackup(true); |
| diskProps.setRolling(false); |
| diskProps.setSynchronous(false); |
| diskProps.setBytesThreshold(800); |
| diskProps.setOverflow(false); |
| diskProps.setDiskDirsAndSizes(new File[] { dirs[0],dirs[1] }, new int[] { 4000,4000 }); |
| final byte[] val = new byte[25]; |
| region = DiskRegionHelperFactory.getAsyncPersistOnlyRegion(cache, |
| diskProps); |
| DiskRegion dr = ((LocalRegion)region).getDiskRegion(); |
| |
| try { |
| LocalRegion.ISSUE_CALLBACKS_TO_CACHE_OBSERVER = true; |
| for(int i = 0; i < 42;++i) { |
| region.put("key"+i, val); |
| } |
| // need to wait for writes to happen before getting size |
| dr.flushForTesting(); |
| long size1 =0; |
| for(DirectoryHolder dh:dr.getDirectories()) { |
| size1 += dh.getDirStatsDiskSpaceUsage(); |
| } |
| System.out.println("Size before close = "+ size1); |
| region.close(); |
| diskProps.setSynchronous(true); |
| region = DiskRegionHelperFactory.getSyncPersistOnlyRegion(cache, |
| diskProps, Scope.LOCAL); |
| dr = ((LocalRegion)region).getDiskRegion(); |
| long size2 =0; |
| for(DirectoryHolder dh:dr.getDirectories()) { |
| size2 += dh.getDirStatsDiskSpaceUsage(); |
| } |
| System.out.println("Size after recreation= "+ size2); |
| assertEquals(size1, size2); |
| region.close(); |
| } |
| finally { |
| LocalRegion.ISSUE_CALLBACKS_TO_CACHE_OBSERVER = false; |
| } |
| } |
| |
| |
| |
| @Test |
| public void testAsynchModeStatsBehaviour() throws Exception |
| { |
| final int MAX_OPLOG_SIZE = 1000; |
| diskProps.setMaxOplogSize(MAX_OPLOG_SIZE); |
| diskProps.setPersistBackup(true); |
| diskProps.setRolling(false); |
| diskProps.setSynchronous(false); |
| diskProps.setBytesThreshold(800); |
| diskProps.setTimeInterval(Long.MAX_VALUE); |
| diskProps.setOverflow(false); |
| diskProps.setDiskDirsAndSizes(new File[] { dirs[0] }, new int[] { 4000}); |
| final byte[] val = new byte[25]; |
| region = DiskRegionHelperFactory.getAsyncPersistOnlyRegion(cache, |
| diskProps); |
| DiskRegion dr = ((LocalRegion)region).getDiskRegion(); |
| |
| try { |
| LocalRegion.ISSUE_CALLBACKS_TO_CACHE_OBSERVER = true; |
| for(int i = 0; i < 4;++i) { |
| region.put("key"+i, val); |
| } |
| // This test now has a race condition in it since |
| // async puts no longer increment disk space. |
| // It is not until a everything is flushed that we will know the disk size. |
| dr.flushForTesting(); |
| |
| checkDiskStats(); |
| long size1 =0; |
| for(DirectoryHolder dh:dr.getDirectories()) { |
| size1 += dh.getDirStatsDiskSpaceUsage(); |
| } |
| System.out.println("Size before close = "+ size1); |
| region.close(); |
| diskProps.setSynchronous(true); |
| region = DiskRegionHelperFactory.getSyncPersistOnlyRegion(cache, |
| diskProps, Scope.LOCAL); |
| dr = ((LocalRegion)region).getDiskRegion(); |
| long size2 =0; |
| for(DirectoryHolder dh:dr.getDirectories()) { |
| size2 += dh.getDirStatsDiskSpaceUsage(); |
| } |
| System.out.println("Size after recreation= "+ size2); |
| assertEquals(size1, size2); |
| region.close(); |
| } |
| finally { |
| LocalRegion.ISSUE_CALLBACKS_TO_CACHE_OBSERVER = false; |
| } |
| } |
| |
| protected long diskSpaceUsageStats() { |
| return ((LocalRegion)region).getDiskRegion().getInfoFileDir().getDirStatsDiskSpaceUsage(); |
| } |
| |
| protected long calculatedDiskSpaceUsageStats() { |
| long oplogSize = oplogSize(); |
| // cache.getLogger().info(" oplogSize=" + oplogSize |
| // + " statSize=" + diskSpaceUsageStats()); |
| return oplogSize; |
| } |
| private void checkDiskStats() { |
| long actualDiskStats = diskSpaceUsageStats(); |
| long computedDiskStats = calculatedDiskSpaceUsageStats(); |
| int tries=0; |
| while (actualDiskStats != computedDiskStats && tries++ <= 100) { |
| // race conditions exist in which the stats change |
| try { Thread.sleep(100); } catch (InterruptedException ignore) {} |
| actualDiskStats = diskSpaceUsageStats(); |
| computedDiskStats = calculatedDiskSpaceUsageStats(); |
| } |
| assertEquals(computedDiskStats, actualDiskStats); |
| } |
| |
| private long oplogSize() { |
| long size = ((LocalRegion)region).getDiskRegion().getDiskStore().undeletedOplogSize.get(); |
| // cache.getLogger().info("undeletedOplogSize=" + size); |
| Oplog [] opArray = ((LocalRegion)region).getDiskRegion().getDiskStore().persistentOplogs.getAllOplogs(); |
| if((opArray != null) && (opArray.length != 0)){ |
| for (int j = 0; j < opArray.length; ++j) { |
| size += opArray[j].getOplogSize(); |
| // cache.getLogger().info("oplog#" + opArray[j].getOplogId() |
| // + ".size=" + opArray[j].getOplogSize()); |
| } |
| } |
| return size; |
| } |
| private int getDSID(LocalRegion lr) { |
| return lr.getDistributionManager().getDistributedSystemId(); |
| } |
| } |