| /*========================================================================= |
| * Copyright (c) 2010-2014 Pivotal Software, Inc. All Rights Reserved. |
| * This product is protected by U.S. and international copyright |
| * and intellectual property laws. Pivotal products are covered by |
| * one or more patents listed at http://www.pivotal.io/patents. |
| *========================================================================= |
| */ |
| package com.gemstone.gemfire.internal.cache; |
| |
| import org.junit.After; |
| import org.junit.Test; |
| import org.junit.experimental.categories.Category; |
| |
| import static org.junit.Assert.*; |
| |
| import com.gemstone.gemfire.cache.Scope; |
| import com.gemstone.gemfire.test.junit.categories.IntegrationTest; |
| |
| /** |
| * The test will verify <br> |
| * 1. Multiple oplogs are being rolled at once <br> |
| * 2. The number of entries logged to the HTree takes entry creation into account |
| * |
| * @author Pratik Batra |
| */ |
| @Category(IntegrationTest.class) |
| public class MultipleOplogsRollingFeatureJUnitTest extends |
| DiskRegionTestingBase |
| { |
| |
| protected Object mutex = new Object(); |
| |
| protected boolean CALLBACK_SET = false; |
| |
| protected volatile boolean FLAG = false; |
| |
| DiskRegionProperties diskProps = new DiskRegionProperties(); |
| |
| @After |
| public void tearDown() throws Exception |
| { |
| LocalRegion.ISSUE_CALLBACKS_TO_CACHE_OBSERVER = false; |
| super.tearDown(); |
| diskProps.setDiskDirs(dirs); |
| } |
| |
| /** |
| * The test will verify <br> |
| * 1. Multiple oplogs are being rolled at once |
| * 2. The Number of entries are properly conflated |
| */ |
| @Test |
| public void testMultipleRolling() |
| { |
| System.setProperty("gemfire.MAX_OPLOGS_PER_COMPACTION", "17"); |
| try { |
| deleteFiles(); |
| diskProps.setMaxOplogSize(450); |
| diskProps.setCompactionThreshold(100); |
| region = DiskRegionHelperFactory.getSyncPersistOnlyRegion(cache, |
| diskProps, Scope.LOCAL); |
| assertNotNull(region); |
| DiskRegion diskRegion = ((LocalRegion)region).getDiskRegion(); |
| assertNotNull(diskRegion); |
| LocalRegion.ISSUE_CALLBACKS_TO_CACHE_OBSERVER = true; |
| CacheObserverHolder.setInstance(getCacheObserver()); |
| try { |
| |
| CALLBACK_SET = true; |
| |
| assertEquals(null, diskRegion.getOplogToBeCompacted()); |
| |
| logWriter.info("testMultipleRolling adding entry 1"); |
| addEntries(1 /* oplogNumber*/, 50 /* byte array size*/); |
| |
| ((LocalRegion)region).getDiskStore().forceCompaction(); |
| waitForCompactor(3000/*wait for forceRolling to finish */); |
| logWriter.info("testMultipleRolling after waitForCompactor"); |
| // the compactor copied two tombstone and 1 entry to oplog #2 |
| // The total oplog size will become 429, that why we need to |
| // set oplogmaxsize to be 450. After compaction, the size become 151 |
| // the compactor thread is now stuck waiting for mutex.notify |
| |
| // check the oplog rolling is null (since the compactor was able to delete it) |
| if (diskRegion.getOplogToBeCompacted() != null) { |
| logWriter.info("testMultipleRolling OplogToBeCompacted=" + java.util.Arrays.toString(diskRegion.getOplogToBeCompacted())); |
| } |
| assertEquals(null, diskRegion.getOplogToBeCompacted()); |
| |
| // update key3 with 360 bytes to cause it not fit in oplog #2 so it will create oplog #3 |
| // this does an update to key 3 (3 is no longer in oplog#2) |
| // So oplog#2 is EMPTY. |
| logWriter.info("testMultipleRolling adding entry 2"); |
| addEntries(2 /* oplogNumber*/, 360 /* byte array size*/); |
| |
| // #2 is now owned by the compactor so even though it is empty it |
| // will not be deleted until the compactor does so. |
| // only one oplog to be compacted because there's no drf |
| assertEquals(1, diskRegion.getOplogToBeCompacted().length); |
| |
| // This add will not fit in oplog #3 so it will create oplog #4 |
| // It does an update to key 3 (so 3 is no longer in oplog#3) |
| // which empties it out |
| logWriter.info("testMultipleRolling adding entry 3"); |
| addEntries(3 /* oplogNumber*/, 180 /* byte array size*/); |
| |
| // #3 is was ready to be compacted but since it was EMPTIED |
| // but because the compaction thread (the pool defaults to 1 thread) |
| // is hung it will not be deleted immedaitely. |
| assertEquals(2, diskRegion.getOplogToBeCompacted().length); |
| |
| // this add will fit in the current oplog |
| // Just added an update to oplog #4 |
| logWriter.info("testMultipleRolling adding entry 4"); |
| addEntries(4 /* oplogNumber*/, 1 /* byte array size*/); |
| |
| assertEquals(2, diskRegion.getOplogToBeCompacted().length); |
| logWriter.info("testMultipleRolling forceRolling"); |
| region.forceRolling(); |
| assertEquals(3, diskRegion.getOplogToBeCompacted().length); |
| |
| } finally { |
| synchronized (mutex) { |
| // let the compactor go |
| CALLBACK_SET = false; |
| FLAG = false; |
| logWriter.info("testMultipleRolling letting compactor go"); |
| mutex.notify(); |
| |
| } |
| } |
| |
| // let the main thread sleep so that rolling gets over |
| waitForCompactor(5000); |
| |
| assertTrue( |
| "Number of Oplogs to be rolled is not null : this is unexpected", |
| diskRegion.getOplogToBeCompacted() == null); |
| cache.close(); |
| cache = createCache(); |
| region = DiskRegionHelperFactory.getSyncPersistOnlyRegion(cache, |
| diskProps, Scope.LOCAL); |
| assertTrue("Recreated region size is not 1 ", region.size() == 1); |
| |
| closeDown(); |
| deleteFiles(); |
| } |
| catch (Exception ex) { |
| ex.printStackTrace(); |
| fail("testMultipleRolling: test failed due to " + ex); |
| } finally { |
| System.clearProperty("gemfire.MAX_OPLOGS_PER_COMPACTION"); |
| } |
| } |
| |
| private void waitForCompactor(long maxWaitingTime) |
| { |
| long maxWaitTime = maxWaitingTime; |
| long start = System.currentTimeMillis(); |
| while (!FLAG) { // wait until |
| // condition is met |
| assertTrue("Waited over " + maxWaitTime + "entry to get refreshed", |
| (System.currentTimeMillis() - start) < maxWaitTime); |
| try { |
| Thread.sleep(1); |
| |
| } |
| catch (InterruptedException ie) { |
| fail("Interrupted while waiting " + ie); |
| } |
| } |
| } |
| |
| private void addEntries(int opLogNum, int valueSize) |
| { |
| assertNotNull(region); |
| byte[] val = new byte[valueSize]; |
| for (int i = 0; i < valueSize; ++i) { |
| val[i] = (byte)i; |
| } |
| |
| // Creating opLog1 |
| if (opLogNum == 1) { |
| for (int i = 1; i < 4; i++) { |
| // create 3 entries |
| region.create(new Integer(i), val); |
| |
| } |
| // destroy Entry 1 and 2 |
| region.destroy(new Integer(1)); |
| region.destroy(new Integer(2)); |
| } |
| |
| else if (opLogNum == 2) { |
| // update Entry 3 |
| region.put(new Integer(3), val); |
| |
| } |
| |
| else if (opLogNum == 3) { |
| // update Entry 3 |
| region.put(new Integer(3), val); |
| |
| } |
| |
| else if (opLogNum == 4) { |
| // update Entry 3 |
| region.put(new Integer(3), val); |
| } |
| } |
| |
| private CacheObserver getCacheObserver() |
| { |
| return (new CacheObserverAdapter() { |
| |
| public void beforeGoingToCompact() |
| { |
| |
| if (logWriter.fineEnabled()) { |
| |
| logWriter.fine("In beforeGoingToCompact"); |
| } |
| |
| } |
| |
| public void afterHavingCompacted() |
| { |
| FLAG = true; |
| if (CALLBACK_SET) { |
| synchronized (mutex) { |
| try { |
| mutex.wait(); |
| } |
| catch (InterruptedException e) { |
| fail("interrupted"); |
| } |
| } |
| } |
| if (logWriter.fineEnabled()) { |
| |
| logWriter.fine("In afterHavingCompacted"); |
| } |
| |
| } |
| }); |
| |
| } |
| } |