| /* |
| * Licensed to the Apache Software Foundation (ASF) under one or more contributor license |
| * agreements. See the NOTICE file distributed with this work for additional information regarding |
| * copyright ownership. The ASF licenses this file to You under the Apache License, Version 2.0 (the |
| * "License"); you may not use this file except in compliance with the License. You may obtain a |
| * copy of the License at |
| * |
| * http://www.apache.org/licenses/LICENSE-2.0 |
| * |
| * Unless required by applicable law or agreed to in writing, software distributed under the License |
| * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express |
| * or implied. See the License for the specific language governing permissions and limitations under |
| * the License. |
| */ |
| package org.apache.geode.internal.cache; |
| |
| import static java.util.concurrent.TimeUnit.MINUTES; |
| import static org.apache.geode.cache.EvictionAction.OVERFLOW_TO_DISK; |
| import static org.apache.geode.cache.EvictionAttributes.DEFAULT_ENTRIES_MAXIMUM; |
| import static org.apache.geode.cache.EvictionAttributes.createLRUEntryAttributes; |
| import static org.apache.geode.cache.RegionShortcut.LOCAL; |
| import static org.apache.geode.distributed.ConfigurationProperties.LOCATORS; |
| import static org.apache.geode.distributed.ConfigurationProperties.MCAST_PORT; |
| import static org.apache.geode.test.awaitility.GeodeAwaitility.await; |
| import static org.apache.geode.test.dunit.IgnoredException.addIgnoredException; |
| import static org.assertj.core.api.Assertions.assertThat; |
| import static org.assertj.core.api.Assertions.catchThrowable; |
| |
| import java.io.File; |
| import java.io.IOException; |
| import java.io.UncheckedIOException; |
| import java.lang.reflect.Array; |
| import java.nio.ByteBuffer; |
| import java.nio.file.Files; |
| import java.nio.file.Path; |
| import java.util.ArrayList; |
| import java.util.Arrays; |
| import java.util.List; |
| import java.util.Properties; |
| import java.util.concurrent.CompletableFuture; |
| import java.util.concurrent.CountDownLatch; |
| import java.util.concurrent.ExecutionException; |
| import java.util.concurrent.Future; |
| import java.util.concurrent.TimeoutException; |
| import java.util.concurrent.atomic.AtomicBoolean; |
| import java.util.concurrent.atomic.AtomicReference; |
| |
| import org.junit.After; |
| import org.junit.Before; |
| import org.junit.Rule; |
| import org.junit.Test; |
| import org.junit.contrib.java.lang.system.RestoreSystemProperties; |
| import org.junit.rules.ErrorCollector; |
| import org.junit.rules.TemporaryFolder; |
| import org.junit.rules.TestName; |
| |
| import org.apache.geode.cache.CacheClosedException; |
| import org.apache.geode.cache.CacheFactory; |
| import org.apache.geode.cache.DataPolicy; |
| import org.apache.geode.cache.DiskAccessException; |
| import org.apache.geode.cache.DiskStore; |
| import org.apache.geode.cache.DiskStoreFactory; |
| import org.apache.geode.cache.EntryEvent; |
| import org.apache.geode.cache.EvictionAttributes; |
| import org.apache.geode.cache.Region; |
| import org.apache.geode.cache.RegionFactory; |
| import org.apache.geode.cache.server.CacheServer; |
| import org.apache.geode.cache.util.CacheListenerAdapter; |
| import org.apache.geode.cache.util.ObjectSizer; |
| import org.apache.geode.internal.cache.entries.DiskEntry; |
| import org.apache.geode.internal.cache.eviction.EvictionCounters; |
| import org.apache.geode.internal.cache.persistence.DiskRecoveryStore; |
| import org.apache.geode.internal.cache.persistence.UninterruptibleFileChannel; |
| import org.apache.geode.test.dunit.IgnoredException; |
| import org.apache.geode.test.dunit.Wait; |
| import org.apache.geode.test.junit.rules.ExecutorServiceRule; |
| |
| /** |
| * Integration tests covering some miscellaneous functionality of Disk Region. |
| */ |
| public class DiskRegionJUnitTest { |
| |
| private static final long MAX_OPLOG_SIZE_IN_BYTES = 1024 * 1024 * 1024 * 10L; |
| |
| private Properties config; |
| private InternalCache cache; |
| private EvictionAttributes heapEvictionAttributes; |
| |
| private File[] diskDirs; |
| private int[] diskDirSizes; |
| |
| private String uniqueName; |
| private String regionName; |
| private String diskStoreName; |
| |
| @Rule |
| public ErrorCollector errorCollector = new ErrorCollector(); |
| |
| @Rule |
| public ExecutorServiceRule executorServiceRule = new ExecutorServiceRule(); |
| |
| @Rule |
| public RestoreSystemProperties restoreSystemProperties = new RestoreSystemProperties(); |
| |
| @Rule |
| public TemporaryFolder temporaryFolder = new TemporaryFolder(); |
| |
| @Rule |
| public TestName testName = new TestName(); |
| |
| @Before |
| public void setUp() throws Exception { |
| uniqueName = getClass().getSimpleName() + "_" + testName.getMethodName(); |
| regionName = uniqueName + "_region"; |
| diskStoreName = uniqueName + "_diskStore"; |
| |
| config = new Properties(); |
| config.setProperty(MCAST_PORT, "0"); |
| config.setProperty(LOCATORS, ""); |
| |
| cache = (InternalCache) new CacheFactory(config).create(); |
| |
| heapEvictionAttributes = EvictionAttributes.createLRUHeapAttributes(ObjectSizer.DEFAULT, |
| OVERFLOW_TO_DISK); |
| |
| diskDirs = new File[4]; |
| diskDirs[0] = createDirectory(temporaryFolder.getRoot(), testName.getMethodName() + "1"); |
| diskDirs[1] = createDirectory(temporaryFolder.getRoot(), testName.getMethodName() + "2"); |
| diskDirs[2] = createDirectory(temporaryFolder.getRoot(), testName.getMethodName() + "3"); |
| diskDirs[3] = createDirectory(temporaryFolder.getRoot(), testName.getMethodName() + "4"); |
| |
| // set default values of disk dir sizes here |
| diskDirSizes = new int[4]; |
| diskDirSizes[0] = Integer.MAX_VALUE; |
| diskDirSizes[1] = Integer.MAX_VALUE; |
| diskDirSizes[2] = Integer.MAX_VALUE; |
| diskDirSizes[3] = Integer.MAX_VALUE; |
| |
| DiskStoreImpl.SET_IGNORE_PREALLOCATE = true; |
| } |
| |
| @After |
| public void tearDown() throws Exception { |
| try { |
| if (cache != null) { |
| cache.close(); |
| } |
| } finally { |
| CacheObserverHolder.setInstance(null); |
| DiskStoreImpl.SET_IGNORE_PREALLOCATE = false; |
| LocalRegion.ISSUE_CALLBACKS_TO_CACHE_OBSERVER = false; |
| } |
| } |
| |
| @Test |
| public void testRemoveCorrectlyRecorded() { |
| DiskRegionProperties diskRegionProperties = new DiskRegionProperties(); |
| diskRegionProperties.setOverflow(true); |
| diskRegionProperties.setOverFlowCapacity(1); |
| diskRegionProperties.setDiskDirs(diskDirs); |
| |
| DiskStoreFactory diskStoreFactory = toDiskStoreFactory(diskRegionProperties); |
| |
| createDiskStoreWithSizeInBytes(diskStoreName, diskStoreFactory); |
| |
| Region<String, String> region = |
| createRegion(regionName, diskStoreName, true, true, false, true, 1); |
| |
| region.put("1", "1"); |
| region.put("2", "2"); |
| region.put("3", "3"); |
| |
| AfterDestroyListener<String, String> cacheListener = new AfterDestroyListener<>(); |
| region.getAttributesMutator().addCacheListener(cacheListener); |
| |
| region.destroy("1"); |
| |
| // Make sure we don't get an old value when doing a destroy of an entry that overflowed to disk. |
| // If we do then we have hit bug 40795. |
| assertThat(cacheListener.getLastEvent()).isNotNull(); |
| assertThat(cacheListener.getLastEvent().getOldValue()).isNull(); |
| |
| assertThat(region.get("1")).isNull(); |
| |
| // does the following ever throw EntryNotFoundException? that would be ok |
| Object valueOnDisk = ((InternalRegion) region).getValueOnDisk("1"); |
| assertThat(valueOnDisk == null || valueOnDisk.equals(Token.TOMBSTONE)).isTrue(); |
| |
| region.close(); |
| |
| Region<String, String> region2 = |
| createRegion(regionName, diskStoreName, true, true, false, true, 1); |
| assertThat(region2.get("1")).isNull(); |
| } |
| |
| /** |
| * Tests if region overflows correctly and stats are create and updated correctly. |
| */ |
| @Test |
| public void testDiskRegionOverflow() { |
| DiskRegionProperties diskRegionProperties = new DiskRegionProperties(); |
| diskRegionProperties.setOverflow(true); |
| diskRegionProperties.setOverFlowCapacity(5); |
| diskRegionProperties.setDiskDirs(diskDirs); |
| |
| DiskStoreFactory diskStoreFactory = toDiskStoreFactory(diskRegionProperties); |
| |
| createDiskStoreWithSizeInBytes(diskStoreName, diskStoreFactory); |
| |
| Region<Integer, Object> region = |
| createRegion(regionName, diskStoreName, false, false, false, true, 5); |
| |
| DiskRegion diskRegion = getDiskRegion(region); |
| |
| DiskRegionStats diskStats = diskRegion.getStats(); |
| EvictionCounters evictionCounters = |
| ((InternalRegion) region).getEvictionController().getCounters(); |
| |
| assertThat(diskStats).isNotNull(); |
| assertThat(evictionCounters).isNotNull(); |
| |
| diskRegion.flushForTesting(); |
| |
| assertThat(diskStats.getWrites()).isEqualTo(0); |
| assertThat(diskStats.getReads()).isEqualTo(0); |
| assertThat(evictionCounters.getEvictions()).isEqualTo(0); |
| |
| // Put in larger stuff until we start evicting |
| int total; |
| for (total = 0; evictionCounters.getEvictions() <= 0; total++) { |
| int[] array = new int[250]; |
| array[0] = total; |
| region.put(total, array); |
| } |
| |
| diskRegion.flushForTesting(); |
| |
| assertThat(diskStats.getWrites()).isEqualTo(1); |
| assertThat(diskStats.getReads()).isEqualTo(0); |
| assertThat(diskStats.getNumEntriesInVM()).isEqualTo(total - 1); |
| assertThat(diskStats.getNumOverflowOnDisk()).isEqualTo(1); |
| assertThat(evictionCounters.getEvictions()).isEqualTo(1); |
| |
| Object value = region.get(0); |
| assertThat(value).isNotNull().isInstanceOf(int[].class); |
| assertThat(((int[]) value)[0]).isEqualTo(0); |
| |
| diskRegion.flushForTesting(); |
| |
| assertThat(diskStats.getWrites()).isEqualTo(2); |
| assertThat(diskStats.getReads()).isEqualTo(1); |
| assertThat(evictionCounters.getEvictions()).isEqualTo(2); |
| |
| for (int i = 0; i < total; i++) { |
| int[] array = (int[]) region.get(i); |
| assertThat(array).isNotNull().isInstanceOf(int[].class); |
| assertThat(array[0]).isEqualTo(i); |
| } |
| } |
| |
| /** |
| * test method for putting different objects and validating that they have been correctly put |
| */ |
| @Test |
| public void testDifferentObjectTypePuts() { |
| DiskRegionProperties diskRegionProperties = new DiskRegionProperties(); |
| diskRegionProperties.setOverflow(true); |
| diskRegionProperties.setOverFlowCapacity(100); |
| diskRegionProperties.setDiskDirs(diskDirs); |
| |
| DiskStoreFactory diskStoreFactory = toDiskStoreFactory(diskRegionProperties); |
| |
| createDiskStoreWithSizeInBytes(diskStoreName, diskStoreFactory); |
| |
| Region<String, Object> region = |
| createRegion(regionName, diskStoreName, true, false, false, true, 100); |
| |
| DiskRegion diskRegion = getDiskRegion(region); |
| |
| int total = 10; |
| for (int i = 0; i < total; i++) { |
| region.put(String.valueOf(i), String.valueOf(i)); |
| } |
| region.put("foobar", "junk"); |
| |
| region.localDestroy("foobar"); |
| |
| region.put("foobar2", "junk"); |
| diskRegion.flushForTesting(); |
| region.localDestroy("foobar2"); |
| |
| // test invalidate |
| region.put("invalid", "invalid"); |
| diskRegion.flushForTesting(); |
| region.invalidate("invalid"); |
| diskRegion.flushForTesting(); |
| assertThat(region.containsKey("invalid")).isTrue(); |
| assertThat(region.containsValueForKey("invalid")).isFalse(); |
| total++; |
| |
| // test local-invalidate |
| region.put("localinvalid", "localinvalid"); |
| diskRegion.flushForTesting(); |
| region.localInvalidate("localinvalid"); |
| diskRegion.flushForTesting(); |
| assertThat(region.containsKey("localinvalid")).isTrue(); |
| assertThat(region.containsValueForKey("localinvalid")).isFalse(); |
| total++; |
| |
| // test byte[] values |
| region.put("byteArray", new byte[0]); |
| diskRegion.flushForTesting(); |
| assertThatArrayEquals(new byte[0], region.get("byteArray")); |
| total++; |
| |
| // test modification |
| region.put("modified", "originalValue"); |
| diskRegion.flushForTesting(); |
| region.put("modified", "modified"); |
| diskRegion.flushForTesting(); |
| assertThat(region.get("modified")).isEqualTo("modified"); |
| total++; |
| |
| assertThat(region.size()).isEqualTo(total); |
| |
| cache.close(); |
| cache = (InternalCache) new CacheFactory(config).create(); |
| |
| diskStoreFactory = toDiskStoreFactory(diskRegionProperties); |
| |
| createDiskStoreWithSizeInBytes(diskStoreName, diskStoreFactory); |
| |
| region = createRegion(regionName, diskStoreName, true, false, false, true, 100); |
| |
| assertThat(region.size()).isEqualTo(total); |
| assertThat(region.containsKey("invalid")).isTrue(); |
| assertThat(region.get("invalid")).isNull(); |
| assertThat(region.containsValueForKey("invalid")).isFalse(); |
| |
| region.localDestroy("invalid"); |
| total--; |
| |
| assertThat(region.containsKey("localinvalid")).isTrue(); |
| assertThat(region.containsValueForKey("localinvalid")).isFalse(); |
| region.localDestroy("localinvalid"); |
| total--; |
| |
| assertThatArrayEquals(new byte[0], region.get("byteArray")); |
| region.localDestroy("byteArray"); |
| total--; |
| |
| assertThat(region.get("modified")).isEqualTo("modified"); |
| region.localDestroy("modified"); |
| total--; |
| |
| assertThat(region.size()).isEqualTo(total); |
| } |
| |
| @Test |
| public void testFaultingInRemovalFromAsyncBuffer() throws Exception { |
| DiskRegionProperties diskRegionProperties = new DiskRegionProperties(); |
| diskRegionProperties.setOverflow(true); |
| diskRegionProperties.setRolling(true); |
| diskRegionProperties.setOverFlowCapacity(100); |
| diskRegionProperties.setDiskDirs(diskDirs); |
| |
| DiskStoreFactory diskStoreFactory = toDiskStoreFactory(diskRegionProperties); |
| |
| createDiskStoreWithSizeInBytes(diskStoreName, diskStoreFactory); |
| |
| Region<Integer, Integer> region = |
| createRegion(regionName, diskStoreName, false, true, false, true, 100); |
| |
| CountDownLatch doLatch = new CountDownLatch(1); |
| DelayedGet get = new DelayedGet(doLatch, region); |
| |
| CompletableFuture<Void> getter1 = executorServiceRule.runAsync(get); |
| CompletableFuture<Void> getter2 = executorServiceRule.runAsync(get); |
| CompletableFuture<Void> getter3 = executorServiceRule.runAsync(get); |
| CompletableFuture<Void> getter4 = executorServiceRule.runAsync(get); |
| CompletableFuture<Void> getter5 = executorServiceRule.runAsync(get); |
| |
| for (int i = 0; i < 110; i++) { |
| region.put(i, i); |
| } |
| |
| doLatch.countDown(); |
| |
| CompletableFuture.allOf(getter1, getter2, getter3, getter4, getter5).get(2, MINUTES); |
| } |
| |
| /** |
| * DiskDirectoriesJUnitTest: |
| * |
| * This tests the potential deadlock situation if the region is created such that rolling is |
| * turned on but the Max directory space is less than or equal to the Max Oplog Size. In such |
| * situations , if during switch over , if the Oplog to be rolled is added after function call of |
| * obtaining nextDir , a dead lock occurs |
| */ |
| @Test |
| public void testSingleDirectoryNotHanging() { |
| DiskRegionProperties diskRegionProperties = new DiskRegionProperties(); |
| diskRegionProperties.setDiskDirsAndSizes(new File[] {diskDirs[0]}, new int[] {2048}); |
| diskRegionProperties.setMaxOplogSize(2097152); |
| diskRegionProperties.setRolling(true); |
| |
| DiskStoreFactory diskStoreFactory = toDiskStoreFactory(diskRegionProperties); |
| |
| createDiskStoreWithSizeInBytes(diskStoreName, diskStoreFactory); |
| |
| Region<String, byte[]> region = |
| createRegion(regionName, diskStoreName, true, true, false, false, 0); |
| |
| Puts puts = new Puts(region); |
| puts.performPuts(); |
| |
| assertThat(puts.putSuccessful(0)).as(" first put did not succeed").isTrue(); |
| assertThat(puts.putSuccessful(1)).as(" second put did not succeed").isTrue(); |
| assertThat(puts.putSuccessful(2)).as(" third put did not succeed").isTrue(); |
| |
| assertThat(puts.diskAccessExceptionOccurred()) |
| .as(" Exception was not supposed to occur but did occur").isFalse(); |
| } |
| |
| @Test |
| public void testOperationGreaterThanMaxOplogSize() { |
| DiskRegionProperties diskRegionProperties = new DiskRegionProperties(); |
| diskRegionProperties.setDiskDirs(diskDirs); |
| diskRegionProperties.setMaxOplogSize(512); |
| diskRegionProperties.setRolling(true); |
| |
| DiskStoreFactory diskStoreFactory = toDiskStoreFactory(diskRegionProperties); |
| |
| createDiskStoreWithSizeInBytes(diskStoreName, diskStoreFactory); |
| |
| Region<String, byte[]> region = |
| createRegion(regionName, diskStoreName, true, true, false, false, 0); |
| |
| Puts puts = new Puts(region); |
| puts.performPuts(); |
| |
| assertThat(puts.putSuccessful(0)).as(" first put did not succeed").isTrue(); |
| assertThat(puts.putSuccessful(1)).as(" second put did not succeed").isTrue(); |
| assertThat(puts.putSuccessful(2)).as(" third put did not succeed").isTrue(); |
| |
| assertThat(puts.diskAccessExceptionOccurred()) |
| .as(" Exception was not supposed to occur but did occur").isFalse(); |
| } |
| |
| /** |
| * As we have relaxed the constraint of max dir size |
| */ |
| @Test |
| public void testOperationGreaterThanMaxDirSize() { |
| int[] diskDirSizes = {1025, 1025, 1025, 1025}; |
| |
| DiskRegionProperties diskRegionProperties = new DiskRegionProperties(); |
| diskRegionProperties.setRegionName(regionName); |
| diskRegionProperties.setDiskDirsAndSizes(diskDirs, diskDirSizes); |
| diskRegionProperties.setMaxOplogSize(600); |
| diskRegionProperties.setRolling(false); |
| |
| DiskStoreFactory diskStoreFactory = toDiskStoreFactory(diskRegionProperties); |
| |
| DiskStore diskStore = createDiskStoreWithSizeInBytes(diskStoreName, diskStoreFactory); |
| |
| Region<String, byte[]> region = |
| createRegion(regionName, diskStoreName, true, true, false, false, 0); |
| |
| assertThat(diskStore.getDiskDirSizes()) |
| .as("expected=" + Arrays.toString(diskDirSizes) + " actual=" |
| + Arrays.toString(diskStore.getDiskDirSizes())) |
| .isEqualTo(diskDirSizes); |
| |
| Puts puts = new Puts(region, 1026); |
| puts.performPuts(); |
| |
| assertThat(puts.diskAccessExceptionOccurred()) |
| .as(" Exception was not supposed to occur but did occur").isTrue(); |
| |
| assertThat(puts.putSuccessful(0)).as(" first put did not succeed").isFalse(); |
| assertThat(puts.putSuccessful(1)).as(" second put did not succeed").isFalse(); |
| assertThat(puts.putSuccessful(2)).as(" third put did not succeed").isFalse(); |
| |
| // if the exception occurred then the region should be closed already |
| verifyClosedDueToDiskAccessException(region); |
| } |
| |
| /** |
| * Regression test for TRAC #42464: overfilling disk directory causes rampant oplog creation |
| * |
| * <p> |
| * When max-dir-size is exceeded and compaction is enabled we allow oplogs to keep getting |
| * created. Make sure that when they do they do not keep putting one op per oplog (which is caused |
| * by bug 42464). |
| */ |
| @Test |
| public void testBug42464() { |
| int[] dirSizes = new int[] {900}; |
| |
| DiskRegionProperties diskRegionProperties = new DiskRegionProperties(); |
| diskRegionProperties.setDiskDirsAndSizes(new File[] {diskDirs[0]}, dirSizes); |
| diskRegionProperties.setMaxOplogSize(500); |
| diskRegionProperties.setRolling(true); |
| diskRegionProperties.setOverFlowCapacity(1); |
| |
| DiskStoreFactory diskStoreFactory = toDiskStoreFactory(diskRegionProperties); |
| |
| DiskStore diskStore = createDiskStoreWithSizeInBytes(diskStoreName, diskStoreFactory, 500); |
| |
| Region<Integer, Object> region = |
| createRegion(regionName, diskStoreName, false, true, false, true, 1); |
| |
| assertThat(diskStore.getDiskDirSizes()).as("expected=" + Arrays.toString(dirSizes) + " actual=" |
| + Arrays.toString(diskStore.getDiskDirSizes())).isEqualTo(dirSizes); |
| |
| // One entry is kept in memory |
| // since the crf max is 500 we should only be able to have 4 entries. The |
| // 5th should not fit because of record overhead. |
| for (int i = 0; i <= 9; i++) { |
| region.getCache().getLogger().info("putting " + i); |
| region.put(i, new byte[101]); |
| } |
| |
| // At this point we should have two oplogs that are basically full |
| // (they should each contain 4 entries) and a third oplog that |
| // contains a single entry. But the 3rd one will end up also containing 4 |
| // entries. |
| // TODO what is the max size of this 3rd oplog's crf? The first two crfs |
| // will be close to 400 bytes each. So the max size of the 3rd oplog should |
| // be close to 100. |
| ArrayList<OverflowOplog> oplogs = toDiskStoreImpl(diskStore).testHookGetAllOverflowOplogs(); |
| assertThat(oplogs).hasSize(3); |
| |
| // TODO verify oplogs |
| // Now make sure that further oplogs can hold 4 entries |
| for (int j = 10; j <= 13; j++) { |
| region.put(j, new byte[101]); |
| } |
| oplogs = toDiskStoreImpl(diskStore).testHookGetAllOverflowOplogs(); |
| assertThat(oplogs).hasSize(4); |
| |
| // now remove all entries and make sure old oplogs go away |
| for (int i = 0; i <= 13; i++) { |
| region.remove(i); |
| } |
| |
| // give background compactor chance to remove oplogs |
| oplogs = toDiskStoreImpl(diskStore).testHookGetAllOverflowOplogs(); |
| int retryCount = 20; |
| while (oplogs.size() > 1 && retryCount > 0) { |
| Wait.pause(100); |
| oplogs = toDiskStoreImpl(diskStore).testHookGetAllOverflowOplogs(); |
| retryCount--; |
| } |
| assertThat(oplogs).hasSize(1); |
| } |
| |
| @Test |
| public void testSingleDirectorySizeViolation() { |
| DiskRegionProperties diskRegionProperties = new DiskRegionProperties(); |
| diskRegionProperties.setRegionName(regionName); |
| diskRegionProperties.setDiskDirsAndSizes(new File[] {diskDirs[0]}, new int[] {2048}); |
| diskRegionProperties.setMaxOplogSize(2097152); |
| diskRegionProperties.setRolling(false); |
| |
| DiskStoreFactory diskStoreFactory = toDiskStoreFactory(diskRegionProperties); |
| |
| createDiskStoreWithSizeInBytes(diskStoreName, diskStoreFactory); |
| |
| Region<String, byte[]> region = |
| createRegion(regionName, diskStoreName, true, true, false, false, 0); |
| |
| Puts puts = new Puts(region); |
| puts.performPuts(); |
| |
| assertThat(puts.putSuccessful(0)).as(" first put did not succeed").isTrue(); |
| assertThat(puts.putSuccessful(1)).as(" second put should have failed").isFalse(); |
| assertThat(puts.putSuccessful(2)).as(" third put should have failed").isFalse(); |
| |
| assertThat(puts.diskAccessExceptionOccurred()) |
| .as(" Exception was supposed to occur but did not occur").isTrue(); |
| |
| verifyClosedDueToDiskAccessException(region); |
| } |
| |
| /** |
| * DiskRegDiskAccessExceptionTest : Disk region test for DiskAccessException. |
| */ |
| @Test |
| public void testDiskFullExcep() { |
| int[] diskDirSizes = new int[4]; |
| diskDirSizes[0] = 2048 + 500; |
| diskDirSizes[1] = 2048 + 500; |
| diskDirSizes[2] = 2048 + 500; |
| diskDirSizes[3] = 2048 + 500; |
| |
| DiskRegionProperties diskRegionProperties = new DiskRegionProperties(); |
| diskRegionProperties.setDiskDirsAndSizes(diskDirs, diskDirSizes); |
| diskRegionProperties.setPersistBackup(true); |
| diskRegionProperties.setRolling(false); |
| diskRegionProperties.setMaxOplogSize(1000000000); |
| |
| DiskStoreFactory diskStoreFactory = toDiskStoreFactory(diskRegionProperties); |
| |
| createDiskStoreWithSizeInBytes(diskStoreName, diskStoreFactory, 1_000_000_000); |
| |
| Region<Object, Object> region = |
| createRegion(regionName, diskStoreName, true, true, false, false, 0); |
| |
| int[] actualDiskDirSizes = ((InternalRegion) region).getDiskDirSizes(); |
| |
| assertThat(actualDiskDirSizes[0]).isEqualTo(2048 + 500); |
| assertThat(actualDiskDirSizes[1]).isEqualTo(2048 + 500); |
| assertThat(actualDiskDirSizes[2]).isEqualTo(2048 + 500); |
| assertThat(actualDiskDirSizes[3]).isEqualTo(2048 + 500); |
| |
| // we have room for 2 values per dir |
| |
| byte[] value = new byte[1024]; |
| Arrays.fill(value, (byte) 77); |
| |
| for (int i = 0; i < 8; i++) { |
| region.put(String.valueOf(i), value); |
| } |
| |
| // we should have put 2 values in each dir so the next one should not fit |
| try (IgnoredException ie = addIgnoredException(DiskAccessException.class)) { |
| Throwable thrown = catchThrowable(() -> region.put("FULL", value)); |
| assertThat(thrown).isInstanceOf(DiskAccessException.class); |
| } |
| |
| verifyClosedDueToDiskAccessException(region); |
| } |
| |
| /** |
| * Make sure if compaction is enabled that we can exceed the disk dir limit |
| */ |
| @Test |
| public void testNoDiskFullExcep() { |
| int[] diskDirSizes = new int[4]; |
| diskDirSizes[0] = 2048 + 500; |
| diskDirSizes[1] = 2048 + 500; |
| diskDirSizes[2] = 2048 + 500; |
| diskDirSizes[3] = 2048 + 500; |
| |
| DiskRegionProperties diskRegionProperties = new DiskRegionProperties(); |
| diskRegionProperties.setDiskDirsAndSizes(diskDirs, diskDirSizes); |
| diskRegionProperties.setPersistBackup(true); |
| diskRegionProperties.setRolling(true); |
| diskRegionProperties.setMaxOplogSize(1000000000); |
| |
| DiskStoreFactory diskStoreFactory = toDiskStoreFactory(diskRegionProperties); |
| |
| createDiskStoreWithSizeInBytes(diskStoreName, diskStoreFactory, 1_000_000_000); |
| |
| Region<Object, Object> region = |
| createRegion(regionName, diskStoreName, true, true, false, false, 0); |
| |
| int[] actualDiskDirSizes = ((InternalRegion) region).getDiskDirSizes(); |
| |
| assertThat(actualDiskDirSizes[0]).isEqualTo(2048 + 500); |
| assertThat(actualDiskDirSizes[1]).isEqualTo(2048 + 500); |
| assertThat(actualDiskDirSizes[2]).isEqualTo(2048 + 500); |
| assertThat(actualDiskDirSizes[3]).isEqualTo(2048 + 500); |
| |
| // we have room for 2 values per dir |
| |
| byte[] value = new byte[1024]; |
| Arrays.fill(value, (byte) 77); |
| |
| for (int i = 0; i < 8; i++) { |
| region.put(String.valueOf(i), value); |
| } |
| |
| // we should have put 2 values in each dir so the next one should not fit but will be allowed |
| // because compaction is enabled. It should log a warning. |
| region.put("OK", value); |
| |
| assertThat(cache.isClosed()).isFalse(); |
| } |
| |
| /** |
| * DiskRegDiskAccessExceptionTest : Disk region test for DiskAccessException. |
| */ |
| @Test |
| public void testDiskFullExcepOverflowOnly() { |
| int[] diskDirSizes = new int[4]; |
| diskDirSizes[0] = 2048 + 500; |
| diskDirSizes[1] = 2048 + 500; |
| diskDirSizes[2] = 2048 + 500; |
| diskDirSizes[3] = 2048 + 500; |
| |
| DiskRegionProperties diskRegionProperties = new DiskRegionProperties(); |
| diskRegionProperties.setDiskDirsAndSizes(diskDirs, diskDirSizes); |
| diskRegionProperties.setPersistBackup(false); |
| diskRegionProperties.setRolling(false); |
| diskRegionProperties.setMaxOplogSize(1000000000); |
| diskRegionProperties.setOverFlowCapacity(1); |
| |
| DiskStoreFactory diskStoreFactory = toDiskStoreFactory(diskRegionProperties); |
| |
| createDiskStoreWithSizeInBytes(diskStoreName, diskStoreFactory, 1_000_000_000); |
| |
| Region<Object, Object> region = |
| createRegion(regionName, diskStoreName, false, true, false, true, 1); |
| |
| int[] actualDiskDirSizes = ((InternalRegion) region).getDiskDirSizes(); |
| |
| assertThat(actualDiskDirSizes[0]).isEqualTo(2048 + 500); |
| assertThat(actualDiskDirSizes[1]).isEqualTo(2048 + 500); |
| assertThat(actualDiskDirSizes[2]).isEqualTo(2048 + 500); |
| assertThat(actualDiskDirSizes[3]).isEqualTo(2048 + 500); |
| |
| // we have room for 2 values per dir |
| |
| byte[] value = new byte[1024]; |
| Arrays.fill(value, (byte) 77); |
| |
| // put a dummy value in since one value stays in memory |
| region.put("FIRST", value); |
| |
| for (int i = 0; i < 8; i++) { |
| region.put(String.valueOf(i), value); |
| } |
| |
| // we should have put 2 values in each dir so the next one should not fit |
| try (IgnoredException ie = addIgnoredException(DiskAccessException.class)) { |
| Throwable thrown = catchThrowable(() -> region.put("FULL", value)); |
| assertThat(thrown).isInstanceOf(DiskAccessException.class); |
| } |
| |
| verifyClosedDueToDiskAccessException(region); |
| } |
| |
| /** |
| * Make sure if compaction is enabled that we can exceed the disk dir limit |
| */ |
| @Test |
| public void testNoDiskFullExcepOverflowOnly() { |
| int[] diskDirSizes = new int[4]; |
| diskDirSizes[0] = 2048 + 500; |
| diskDirSizes[1] = 2048 + 500; |
| diskDirSizes[2] = 2048 + 500; |
| diskDirSizes[3] = 2048 + 500; |
| |
| DiskRegionProperties diskRegionProperties = new DiskRegionProperties(); |
| diskRegionProperties.setDiskDirsAndSizes(diskDirs, diskDirSizes); |
| diskRegionProperties.setPersistBackup(false); |
| diskRegionProperties.setRolling(true); |
| diskRegionProperties.setMaxOplogSize(1000000000); |
| diskRegionProperties.setOverFlowCapacity(1); |
| |
| DiskStoreFactory diskStoreFactory = toDiskStoreFactory(diskRegionProperties); |
| |
| createDiskStoreWithSizeInBytes(diskStoreName, diskStoreFactory, 1_000_000_000); |
| |
| Region<Object, Object> region = |
| createRegion(regionName, diskStoreName, false, true, false, true, 1); |
| |
| int[] actualDiskDirSizes = ((InternalRegion) region).getDiskDirSizes(); |
| |
| assertThat(actualDiskDirSizes[0]).isEqualTo(2048 + 500); |
| assertThat(actualDiskDirSizes[1]).isEqualTo(2048 + 500); |
| assertThat(actualDiskDirSizes[2]).isEqualTo(2048 + 500); |
| assertThat(actualDiskDirSizes[3]).isEqualTo(2048 + 500); |
| |
| // we have room for 2 values per dir |
| |
| byte[] value = new byte[1024]; |
| Arrays.fill(value, (byte) 77); |
| |
| // put a dummy value in since one value stays in memory |
| region.put("FIRST", value); |
| |
| for (int i = 0; i < 8; i++) { |
| region.put(String.valueOf(i), value); |
| } |
| |
| // we should have put 2 values in each dir so the next one should not fit |
| // but will be allowed because compaction is enabled. It should log a warning. |
| region.put("OK", value); |
| |
| assertThat(cache.isClosed()).isFalse(); |
| } |
| |
| /** |
| * DiskAccessException Test : Even if rolling doesn't free the space in stipulated time, the |
| * operation should not get stuck or see Exception |
| */ |
| @Test |
| public void testSyncModeAllowOperationToProceedEvenIfDiskSpaceIsNotSufficient() { |
| int[] diskDirSizes = {2048}; |
| |
| DiskRegionProperties diskRegionProperties = new DiskRegionProperties(); |
| diskRegionProperties.setDiskDirsAndSizes(new File[] {diskDirs[0]}, diskDirSizes); |
| diskRegionProperties.setPersistBackup(true); |
| diskRegionProperties.setRolling(true); |
| diskRegionProperties.setCompactionThreshold(100); |
| diskRegionProperties.setMaxOplogSize(100000000); |
| diskRegionProperties.setRegionName(regionName); |
| |
| DiskStoreFactory diskStoreFactory = toDiskStoreFactory(diskRegionProperties); |
| |
| createDiskStoreWithSizeInBytes(diskStoreName, diskStoreFactory, 100_000_000); |
| |
| Region<Integer, Object> region = |
| createRegion(regionName, diskStoreName, true, true, false, false, 0); |
| |
| int[] actualDiskSizes = ((InternalRegion) region).getDiskDirSizes(); |
| assertThat(actualDiskSizes).isEqualTo(diskDirSizes); |
| |
| // puts should not fail even after surpassing disk dir sizes |
| byte[] value = new byte[990]; |
| Arrays.fill(value, (byte) 77); |
| region.put(1, value); |
| region.put(1, value); |
| region.put(1, value); |
| |
| region.close(); |
| }// end of testSyncPersistRegionDAExp |
| |
| @Test |
| public void testAsyncModeAllowOperationToProceedEvenIfDiskSpaceIsNotSufficient() { |
| int[] dirSizes = {2048}; |
| |
| DiskRegionProperties diskRegionProperties = new DiskRegionProperties(); |
| diskRegionProperties.setDiskDirsAndSizes(new File[] {diskDirs[0]}, dirSizes); |
| diskRegionProperties.setPersistBackup(true); |
| diskRegionProperties.setRolling(true); |
| diskRegionProperties.setCompactionThreshold(100); |
| diskRegionProperties.setMaxOplogSize(100000000); |
| diskRegionProperties.setBytesThreshold(1000000); |
| diskRegionProperties.setTimeInterval(1500000); |
| diskRegionProperties.setRegionName(regionName); |
| |
| DiskStoreFactory diskStoreFactory = toDiskStoreFactory(diskRegionProperties); |
| |
| DiskStore diskStore = |
| createDiskStoreWithSizeInBytes(diskStoreName, diskStoreFactory, 100_000_000); |
| |
| Region<Object, Object> region = |
| createRegion(regionName, diskStoreName, true, false, false, false, 0); |
| |
| assertThat(diskStore.getDiskDirSizes()).isEqualTo(dirSizes); |
| |
| // puts should not fail even after surpassing disk dir sizes |
| byte[] value = new byte[990]; |
| Arrays.fill(value, (byte) 77); |
| region.put(0, value); |
| region.put(1, value); |
| region.put(2, value); |
| |
| region.close(); |
| }// end of testAsyncPersistRegionDAExp |
| |
| /** |
| * DiskRegGetInvalidEntryTest: get invalid entry should return null. |
| */ |
| @Test |
| public void testDiskGetInvalidEntry() { |
| DiskRegionProperties diskRegionProperties = new DiskRegionProperties(); |
| diskRegionProperties.setDiskDirsAndSizes(diskDirs, diskDirSizes); |
| diskRegionProperties.setRolling(false); |
| diskRegionProperties.setMaxOplogSize(1000000000); |
| diskRegionProperties.setBytesThreshold(1000000); |
| diskRegionProperties.setTimeInterval(1500000); |
| |
| DiskStoreFactory diskStoreFactory = toDiskStoreFactory(diskRegionProperties); |
| |
| createDiskStoreWithSizeInBytes(diskStoreName, diskStoreFactory, 1_000_000_000); |
| |
| Region<Object, Object> region = |
| createRegion(regionName, diskStoreName, false, false, false, true, 1000); |
| |
| byte[] value = new byte[1024]; |
| Arrays.fill(value, (byte) 77); |
| for (int i = 0; i < 10; i++) { |
| region.put("key" + i, value); |
| } |
| |
| // invalidate an entry |
| region.invalidate("key1"); |
| |
| // get the invalid entry and verify that the value returned is null |
| assertThat(region.get("key1")).isNull(); |
| |
| region.close(); // closes disk file which will flush all buffers |
| |
| }// end of DiskRegGetInvalidEntryTest |
| |
| /** |
| * DiskRegionByteArrayJUnitTest: A byte array as a value put in local persistent region ,when |
| * retrieved from the disk should be correctly presented as a byte array |
| */ |
| @Test |
| public void testDiskRegionByteArray() { |
| DiskRegionProperties diskRegionProperties = new DiskRegionProperties(); |
| diskRegionProperties.setPersistBackup(true); |
| diskRegionProperties.setDiskDirs(diskDirs); |
| |
| DiskStoreFactory diskStoreFactory = toDiskStoreFactory(diskRegionProperties); |
| |
| createDiskStoreWithSizeInBytes(diskStoreName, diskStoreFactory); |
| |
| Region<Object, Object> region = |
| createRegion(regionName, diskStoreName, true, true, false, false, 0); |
| |
| int ENTRY_SIZE = 1024; |
| int OP_COUNT = 10; |
| String key = "K"; |
| byte[] value = new byte[ENTRY_SIZE]; |
| Arrays.fill(value, (byte) 77); |
| |
| // put an entry |
| region.put(key, value); |
| |
| // put few more entries to write on disk |
| for (int i = 0; i < OP_COUNT; i++) { |
| region.put(i, value); |
| } |
| |
| // get from disk |
| DiskId diskId = ((DiskEntry) ((InternalRegion) region).basicGetEntry("K")).getDiskId(); |
| Object val = getDiskRegion(region).get(diskId); |
| |
| // verify that the value retrieved above represents byte array. |
| // verify the length of the byte[] |
| assertThat((byte[]) val).hasSize(1024); |
| |
| // verify that the retrieved byte[] equals to the value put initially. |
| byte[] x = (byte[]) val; |
| boolean result = false; |
| for (int i = 0; i < x.length; i++) { |
| result = x[i] == value[i]; |
| } |
| |
| assertThat(result).as("The val obtained from disk is not euqal to the value put initially") |
| .isTrue(); |
| }// end of DiskRegionByteArrayJUnitTest |
| |
| /** |
| * DiskRegionFactoryJUnitTest: Test for verifying DiskRegion or SimpleDiskRegion. |
| */ |
| @Test |
| public void testInstanceOfDiskRegion() { |
| DiskRegionProperties diskRegionProperties = new DiskRegionProperties(); |
| diskRegionProperties.setDiskDirs(diskDirs); // diskDirs is an array of four diskDirs |
| diskRegionProperties.setRolling(true); |
| |
| DiskStoreFactory diskStoreFactory2 = toDiskStoreFactory(diskRegionProperties); |
| |
| DiskStore diskStore = createDiskStoreWithSizeInBytes(diskStoreName, diskStoreFactory2); |
| |
| Region<?, ?> region = createRegion(regionName, diskStoreName, true, true, false, false, 0); |
| |
| region.destroyRegion(); |
| closeDiskStore(diskStore); |
| |
| diskRegionProperties.setDiskDirs(diskDirs); // diskDirs is an array of four diskDirs |
| diskRegionProperties.setRolling(false); |
| |
| DiskStoreFactory diskStoreFactory1 = toDiskStoreFactory(diskRegionProperties); |
| |
| createDiskStoreWithSizeInBytes(diskStoreName, diskStoreFactory1); |
| |
| region = createRegion(regionName, diskStoreName, true, true, false, false, 0); |
| |
| region.destroyRegion(); |
| closeDiskStore(diskStore); |
| |
| diskRegionProperties.setRolling(false); |
| diskRegionProperties.setDiskDirsAndSizes(new File[] {newFolder(uniqueName + "1")}, |
| new int[] {2048}); |
| |
| DiskStoreFactory diskStoreFactory = toDiskStoreFactory(diskRegionProperties); |
| |
| createDiskStoreWithSizeInBytes(diskStoreName, diskStoreFactory); |
| |
| region = createRegion(regionName, diskStoreName, true, true, false, false, 0); |
| |
| region.destroyRegion(); |
| closeDiskStore(diskStore); |
| |
| diskRegionProperties.setRolling(false); |
| diskRegionProperties.setMaxOplogSize(2048); |
| File dir = newFolder(uniqueName + "2"); |
| diskRegionProperties.setDiskDirsAndSizes(new File[] {dir}, new int[] {1024}); |
| |
| // TODO: test just rolls along? not sure of the value |
| } |
| |
| /** |
| * DiskRegionStatsJUnitTest : |
| */ |
| @Test |
| public void testStats() { |
| DiskRegionProperties diskRegionProperties = new DiskRegionProperties(); |
| diskRegionProperties.setDiskDirsAndSizes(new File[] {diskDirs[0]}, new int[] {diskDirSizes[0]}); |
| diskRegionProperties.setMaxOplogSize(2097152); |
| diskRegionProperties.setOverFlowCapacity(100); |
| diskRegionProperties.setRolling(true); |
| |
| DiskStoreFactory diskStoreFactory = toDiskStoreFactory(diskRegionProperties); |
| |
| createDiskStoreWithSizeInBytes(diskStoreName, diskStoreFactory, 2_097_152); |
| |
| int overflowCapacity = 100; |
| Region<Integer, Integer> region = |
| createRegion(regionName, diskStoreName, false, true, false, true, overflowCapacity); |
| |
| DiskRegionStats stats = getDiskRegion(region).getStats(); |
| |
| // TODO: there are no assertions for counter |
| int counter = 0; |
| for (int i = 0; i < 5000; i++) { |
| region.put(i, i); |
| region.put(i, i); |
| region.put(i, i); |
| |
| if (i > overflowCapacity + 5) { |
| region.get(++counter); |
| region.get(counter); |
| } |
| |
| if (i > overflowCapacity) { |
| assertThat(stats.getNumEntriesInVM()).isEqualTo(overflowCapacity); |
| assertThat(stats.getNumOverflowOnDisk() - 1) |
| .as(" number of entries on disk not corrected expected " + (i - overflowCapacity) |
| + " but is " + stats.getNumOverflowOnDisk()) |
| .isEqualTo(i - overflowCapacity); |
| } |
| } |
| }// end of testStats |
| |
| /** |
| * DiskRegOverflowOnlyNoFilesTest: Overflow only mode has no files of previous run, during startup |
| */ |
| @Test |
| public void testOverflowOnlyNoFiles() { |
| DiskRegionProperties diskRegionProperties = new DiskRegionProperties(); |
| diskRegionProperties.setTimeInterval(15000); |
| diskRegionProperties.setBytesThreshold(100000); |
| diskRegionProperties.setOverFlowCapacity(1000); |
| diskRegionProperties.setDiskDirs(diskDirs); |
| |
| DiskStoreFactory diskStoreFactory = toDiskStoreFactory(diskRegionProperties); |
| |
| createDiskStoreWithSizeInBytes(diskStoreName, diskStoreFactory); |
| |
| Region<Integer, Object> region = |
| createRegion(regionName, diskStoreName, false, false, false, true, 1_000); |
| |
| byte[] value = new byte[1024]; |
| Arrays.fill(value, (byte) 77); |
| |
| for (int i = 0; i < 100; i++) { |
| region.put(i, value); |
| } |
| |
| region.close(); // closes disk file which will flush all buffers |
| |
| // verify that there are no files in the disk dir |
| int fileCount = 0; |
| for (File diskDir : diskDirs) { |
| File[] files = diskDir.listFiles(); |
| fileCount += files.length; |
| } |
| |
| // since the diskStore has not been closed we expect two files: .lk and .if |
| assertThat(fileCount).isEqualTo(2); |
| cache.close(); |
| |
| // we now should only have zero |
| fileCount = 0; |
| for (File diskDir : diskDirs) { |
| File[] files = diskDir.listFiles(); |
| fileCount += files.length; |
| } |
| assertThat(fileCount).isEqualTo(0); |
| }// end of testOverflowOnlyNoFiles |
| |
| @Test |
| public void testPersistNoFiles() { |
| DiskRegionProperties diskRegionProperties = new DiskRegionProperties(); |
| diskRegionProperties.setOverflow(false); |
| diskRegionProperties.setRolling(false); |
| diskRegionProperties.setDiskDirs(diskDirs); |
| diskRegionProperties.setPersistBackup(true); |
| diskRegionProperties.setRegionName(regionName); |
| |
| DiskStoreFactory diskStoreFactory = toDiskStoreFactory(diskRegionProperties); |
| |
| createDiskStoreWithSizeInBytes(diskStoreName, diskStoreFactory); |
| |
| Region<Integer, Object> region = |
| createRegion(regionName, diskStoreName, true, true, false, false, 0); |
| |
| byte[] value = new byte[1024]; |
| Arrays.fill(value, (byte) 77); |
| |
| for (int i = 0; i < 100; i++) { |
| region.put(i, value); |
| } |
| |
| region.destroyRegion(); |
| |
| int fileCount = 0; |
| for (File diskDir : diskDirs) { |
| File[] files = diskDir.listFiles(); |
| fileCount += files.length; |
| } |
| |
| // since the diskStore has not been closed we expect four files: .lk and .if and a crf and a drf |
| assertThat(fileCount).isEqualTo(4); |
| |
| cache.close(); |
| |
| // we now should only have zero since the disk store had no regions remaining in it. |
| fileCount = 0; |
| for (File diskDir : diskDirs) { |
| File[] files = diskDir.listFiles(); |
| fileCount += files.length; |
| } |
| assertThat(fileCount).isEqualTo(0); |
| } |
| |
| /** |
| * Test to verify that DiskAccessException is not thrown if rolling has been enabled. The test |
| * configurations will cause the disk to go full and wait for the compactor to release space. A |
| * DiskAccessException should not be thrown by this test |
| */ |
| @Test |
| public void testDiskAccessExceptionNotThrown() { |
| File diskDir = newFolder(uniqueName); |
| |
| DiskRegionProperties diskRegionProperties = new DiskRegionProperties(); |
| diskRegionProperties.setDiskDirsAndSizes(new File[] {diskDir}, new int[] {10240}); |
| diskRegionProperties.setMaxOplogSize(1024); |
| diskRegionProperties.setRolling(true); |
| diskRegionProperties.setSynchronous(true); |
| |
| DiskStoreFactory diskStoreFactory = toDiskStoreFactory(diskRegionProperties); |
| |
| createDiskStoreWithSizeInBytes(diskStoreName, diskStoreFactory, 1024); |
| |
| Region<Integer, Object> region = |
| createRegion(regionName, diskStoreName, true, true, false, true, DEFAULT_ENTRIES_MAXIMUM); |
| |
| byte[] bytes = new byte[256]; |
| |
| for (int i = 0; i < 1500; i++) { |
| region.put(i % 10, bytes); |
| } |
| } |
| |
| /** |
| * Regression test for TRAC #37605: The entry which has already been removed from the system by |
| * clear operation may still get appended to the LRUList |
| * |
| * <p> |
| * If an entry which has just been written on the disk, sees clear just before updating the |
| * EvictionList, then that deleted entry should not go into the EvictionList |
| */ |
| @Test |
| public void testClearInteractionWithLRUList_Bug37605() throws Exception { |
| DiskRegionProperties diskRegionProperties = new DiskRegionProperties(); |
| diskRegionProperties.setOverflow(true); |
| diskRegionProperties.setOverFlowCapacity(1); |
| diskRegionProperties.setDiskDirs(diskDirs); |
| diskRegionProperties.setRegionName(regionName); |
| |
| DiskStoreFactory diskStoreFactory = toDiskStoreFactory(diskRegionProperties); |
| |
| createDiskStoreWithSizeInBytes(diskStoreName, diskStoreFactory, |
| diskRegionProperties.getMaxOplogSize()); |
| |
| Region<String, String> region = |
| createRegion(regionName, diskStoreName, true, true, false, true, 1); |
| |
| AtomicReference<Future<Void>> doClearFuture = new AtomicReference<>(); |
| region.getAttributesMutator().addCacheListener(new CacheListenerAdapter<String, String>() { |
| @Override |
| public void afterCreate(EntryEvent<String, String> event) { |
| doClearFuture.set(executorServiceRule.runAsync(() -> { |
| try { |
| region.clear(); |
| } catch (AssertionError | Exception e) { |
| errorCollector.addError(e); |
| } |
| })); |
| } |
| }); |
| |
| region.create("key1", "value1"); |
| awaitFuture(doClearFuture); |
| assertThat(region.isEmpty()).isTrue(); |
| |
| VMLRURegionMap vmlruRegionMap = getVMLRURegionMap(region); |
| assertThat(vmlruRegionMap.getEvictionList().getEvictableEntry()).isNull(); |
| } |
| |
| /** |
| * Regression test for TRAC #37606: A cleared entry may still get logged into the Oplog due to a |
| * very small window of race condition |
| * |
| * <p> |
| * As in the clear operation, previously the code was such that Htree Ref was first reset & then |
| * the underlying region map got cleared, it was possible for the create op to set the new Htree |
| * ref in thread local. Now if clear happened, the entry on which create op is going on was no |
| * longer valid, but we would not be able to detect the conflict. The fix was to first clear the |
| * region map & then reset the Htree Ref. |
| */ |
| @Test |
| public void testClearInteractionWithCreateOperation_Bug37606() throws Exception { |
| DiskRegionProperties diskRegionProperties = new DiskRegionProperties(); |
| diskRegionProperties.setOverflow(false); |
| diskRegionProperties.setRolling(false); |
| diskRegionProperties.setDiskDirs(diskDirs); |
| diskRegionProperties.setPersistBackup(true); |
| diskRegionProperties.setRegionName(regionName); |
| |
| DiskStoreFactory diskStoreFactory = toDiskStoreFactory(diskRegionProperties); |
| |
| createDiskStoreWithSizeInBytes(diskStoreName, diskStoreFactory); |
| |
| Region<String, String> region = |
| createRegion(regionName, diskStoreName, true, true, false, false, 0); |
| |
| AtomicReference<Future<Void>> doCreateFuture = new AtomicReference<>(); |
| LocalRegion.ISSUE_CALLBACKS_TO_CACHE_OBSERVER = true; |
| CacheObserverHolder.setInstance(new CacheObserverAdapter() { |
| @Override |
| public void beforeDiskClear() { |
| LocalRegion.ISSUE_CALLBACKS_TO_CACHE_OBSERVER = false; |
| doCreateFuture.set(executorServiceRule.runAsync(() -> { |
| try { |
| region.create("key1", "value1"); |
| } catch (AssertionError | Exception e) { |
| errorCollector.addError(e); |
| } |
| })); |
| } |
| }); |
| |
| region.clear(); |
| awaitFuture(doCreateFuture); |
| |
| // We expect 1 entry to exist, because the clear was triggered before the update |
| assertThat(region.size()).isEqualTo(1); |
| |
| // close and recreate region and the entry should still exist |
| region.close(); |
| Region<String, String> region2 = |
| createRegion(regionName, diskStoreName, true, true, false, false, 0); |
| assertThat(region2.size()).isEqualTo(1); |
| } |
| |
| /** |
| * Regression test for TRAC #37606: A cleared entry may still get logged into the Oplog due to a |
| * very small window of race condition |
| * |
| * <p> |
| * Similar test in case of 'update' |
| */ |
| @Test |
| public void testClearInteractionWithUpdateOperation_Bug37606() throws Exception { |
| DiskRegionProperties diskRegionProperties = new DiskRegionProperties(); |
| diskRegionProperties.setOverflow(false); |
| diskRegionProperties.setRolling(false); |
| diskRegionProperties.setDiskDirs(diskDirs); |
| diskRegionProperties.setPersistBackup(true); |
| diskRegionProperties.setRegionName(regionName); |
| |
| DiskStoreFactory diskStoreFactory = toDiskStoreFactory(diskRegionProperties); |
| |
| createDiskStoreWithSizeInBytes(diskStoreName, diskStoreFactory); |
| |
| Region<String, String> region = |
| createRegion(regionName, diskStoreName, true, true, false, false, 0); |
| |
| region.create("key1", "value1"); |
| |
| AtomicReference<Future<Void>> doPutFuture = new AtomicReference<>(); |
| LocalRegion.ISSUE_CALLBACKS_TO_CACHE_OBSERVER = true; |
| CacheObserverHolder.setInstance(new CacheObserverAdapter() { |
| @Override |
| public void beforeDiskClear() { |
| LocalRegion.ISSUE_CALLBACKS_TO_CACHE_OBSERVER = false; |
| doPutFuture.set(executorServiceRule.runAsync(() -> { |
| try { |
| region.put("key1", "value1"); |
| } catch (AssertionError | Exception e) { |
| errorCollector.addError(e); |
| } |
| })); |
| } |
| }); |
| |
| region.clear(); |
| awaitFuture(doPutFuture); |
| |
| // We expect 1 entry to exist, because the clear was triggered before the update |
| assertThat(region.size()).isEqualTo(1); |
| |
| // close and recreate region and the entry should still exist |
| region.close(); |
| Region<String, String> region2 = |
| createRegion(regionName, diskStoreName, true, true, false, false, 0); |
| assertThat(region2.size()).isEqualTo(1); |
| } |
| |
| /** |
| * If IOException occurs while updating an entry in a persist only sync mode, DiskAccessException |
| * should occur & region should be destroyed |
| */ |
| @Test |
| public void testEntryUpdateInSyncPersistOnlyForIOExceptionCase() throws Exception { |
| DiskRegionProperties diskRegionProperties = new DiskRegionProperties(); |
| diskRegionProperties.setRegionName(regionName); |
| diskRegionProperties.setOverflow(false); |
| diskRegionProperties.setRolling(false); |
| diskRegionProperties.setDiskDirs(diskDirs); |
| diskRegionProperties.setPersistBackup(true); |
| |
| DiskStoreFactory diskStoreFactory = toDiskStoreFactory(diskRegionProperties); |
| |
| createDiskStoreWithSizeInBytes(diskStoreName, diskStoreFactory); |
| |
| Region<String, String> region = |
| createRegion(regionName, diskStoreName, true, true, false, false, 0); |
| |
| region.create("key1", "value1"); |
| |
| closeOplogFileChannel(region); |
| |
| Throwable thrown = catchThrowable(() -> region.put("key1", "value2")); |
| assertThat(thrown).isInstanceOf(DiskAccessException.class); |
| |
| verifyClosedDueToDiskAccessException(region); |
| } |
| |
| /** |
| * If IOException occurs while updating an entry in a persist overflow sync mode, we should get |
| * DiskAccessException & region be destroyed |
| */ |
| @Test |
| public void testEntryUpdateInSyncOverFlowPersistOnlyForIOExceptionCase() throws Exception { |
| DiskRegionProperties diskRegionProperties = new DiskRegionProperties(); |
| diskRegionProperties.setRegionName(regionName); |
| diskRegionProperties.setOverflow(true); |
| diskRegionProperties.setRolling(false); |
| diskRegionProperties.setDiskDirs(diskDirs); |
| diskRegionProperties.setPersistBackup(true); |
| |
| DiskStoreFactory diskStoreFactory = toDiskStoreFactory(diskRegionProperties); |
| |
| createDiskStoreWithSizeInBytes(diskStoreName, diskStoreFactory); |
| |
| Region<String, String> region = |
| createRegion(regionName, diskStoreName, true, true, false, true, DEFAULT_ENTRIES_MAXIMUM); |
| |
| region.create("key1", "value1"); |
| |
| closeOplogFileChannel(region); |
| |
| Throwable thrown = catchThrowable(() -> region.put("key1", "value2")); |
| assertThat(thrown).isInstanceOf(DiskAccessException.class); |
| |
| verifyClosedDueToDiskAccessException(region); |
| } |
| |
| /** |
| * If IOException occurs while invalidating an entry in a persist only sync mode, |
| * DiskAccessException should occur & region should be destroyed |
| */ |
| @Test |
| public void testEntryInvalidateInSyncPersistOnlyForIOExceptionCase() throws Exception { |
| DiskRegionProperties diskRegionProperties = new DiskRegionProperties(); |
| diskRegionProperties.setRegionName(regionName); |
| diskRegionProperties.setOverflow(false); |
| diskRegionProperties.setRolling(false); |
| diskRegionProperties.setDiskDirs(diskDirs); |
| diskRegionProperties.setPersistBackup(true); |
| |
| DiskStoreFactory diskStoreFactory = toDiskStoreFactory(diskRegionProperties); |
| |
| createDiskStoreWithSizeInBytes(diskStoreName, diskStoreFactory); |
| |
| Region<String, String> region = |
| createRegion(regionName, diskStoreName, true, true, false, false, 0); |
| |
| region.create("key1", "value1"); |
| |
| closeOplogFileChannel(region); |
| |
| Throwable thrown = catchThrowable(() -> region.invalidate("key1")); |
| assertThat(thrown).isInstanceOf(DiskAccessException.class); |
| |
| verifyClosedDueToDiskAccessException(region); |
| } |
| |
| /** |
| * If IOException occurs while invalidating an entry in a persist overflow sync mode, |
| * DiskAccessException should occur & region should be destroyed |
| */ |
| @Test |
| public void testEntryInvalidateInSyncPersistOverflowForIOExceptionCase() throws Exception { |
| DiskRegionProperties diskRegionProperties = new DiskRegionProperties(); |
| diskRegionProperties.setRegionName(regionName); |
| diskRegionProperties.setOverflow(true); |
| diskRegionProperties.setRolling(false); |
| diskRegionProperties.setDiskDirs(diskDirs); |
| diskRegionProperties.setPersistBackup(true); |
| |
| DiskStoreFactory diskStoreFactory = toDiskStoreFactory(diskRegionProperties); |
| |
| createDiskStoreWithSizeInBytes(diskStoreName, diskStoreFactory); |
| |
| Region<String, String> region = |
| createRegion(regionName, diskStoreName, true, true, false, true, DEFAULT_ENTRIES_MAXIMUM); |
| |
| region.create("key1", "value1"); |
| |
| closeOplogFileChannel(region); |
| |
| Throwable thrown = catchThrowable(() -> region.invalidate("key1")); |
| assertThat(thrown).isInstanceOf(DiskAccessException.class); |
| |
| verifyClosedDueToDiskAccessException(region); |
| } |
| |
| /** |
| * If IOException occurs while creating an entry in a persist only sync mode, DiskAccessException |
| * should occur & region should be destroyed |
| */ |
| @Test |
| public void testEntryCreateInSyncPersistOnlyForIOExceptionCase() throws Exception { |
| DiskRegionProperties diskRegionProperties = new DiskRegionProperties(); |
| diskRegionProperties.setRegionName(regionName); |
| diskRegionProperties.setOverflow(false); |
| diskRegionProperties.setRolling(false); |
| diskRegionProperties.setDiskDirs(diskDirs); |
| diskRegionProperties.setPersistBackup(true); |
| |
| DiskStoreFactory diskStoreFactory = toDiskStoreFactory(diskRegionProperties); |
| |
| createDiskStoreWithSizeInBytes(diskStoreName, diskStoreFactory); |
| |
| Region<String, String> region = |
| createRegion(regionName, diskStoreName, true, true, false, false, 0); |
| |
| closeOplogFileChannel(region); |
| |
| Throwable thrown = catchThrowable(() -> region.create("key1", "value1")); |
| assertThat(thrown).isInstanceOf(DiskAccessException.class); |
| |
| verifyClosedDueToDiskAccessException(region); |
| } |
| |
| /** |
| * If IOException occurs while creating an entry in a persist overflow sync mode, |
| * DiskAccessException should occur & region should be destroyed |
| */ |
| @Test |
| public void testEntryCreateInSyncPersistOverflowForIOExceptionCase() throws Exception { |
| DiskRegionProperties diskRegionProperties = new DiskRegionProperties(); |
| diskRegionProperties.setRegionName(regionName); |
| diskRegionProperties.setOverflow(true); |
| diskRegionProperties.setRolling(false); |
| diskRegionProperties.setDiskDirs(diskDirs); |
| diskRegionProperties.setPersistBackup(true); |
| |
| DiskStoreFactory diskStoreFactory = toDiskStoreFactory(diskRegionProperties); |
| |
| createDiskStoreWithSizeInBytes(diskStoreName, diskStoreFactory); |
| |
| Region<String, String> region = |
| createRegion(regionName, diskStoreName, true, true, false, true, DEFAULT_ENTRIES_MAXIMUM); |
| |
| closeOplogFileChannel(region); |
| |
| Throwable thrown = catchThrowable(() -> region.create("key1", "value1")); |
| assertThat(thrown).isInstanceOf(DiskAccessException.class); |
| |
| verifyClosedDueToDiskAccessException(region); |
| } |
| |
| /** |
| * If IOException occurs while destroying an entry in a persist only sync mode, |
| * DiskAccessException should occur & region should be destroyed |
| */ |
| @Test |
| public void testEntryDestructionInSyncPersistOnlyForIOExceptionCase() { |
| DiskRegionProperties diskRegionProperties = new DiskRegionProperties(); |
| diskRegionProperties.setRegionName(regionName); |
| diskRegionProperties.setOverflow(false); |
| diskRegionProperties.setRolling(false); |
| diskRegionProperties.setDiskDirs(diskDirs); |
| diskRegionProperties.setPersistBackup(true); |
| |
| DiskStoreFactory diskStoreFactory = toDiskStoreFactory(diskRegionProperties); |
| |
| createDiskStoreWithSizeInBytes(diskStoreName, diskStoreFactory); |
| |
| Region<String, String> region = |
| createRegion(regionName, diskStoreName, true, true, false, false, 0); |
| |
| region.create("key1", "value1"); |
| |
| // Get the oplog handle & hence the underlying file & close it |
| getDiskRegion(region).testHook_getChild().testClose(); |
| |
| Throwable thrown = catchThrowable(() -> region.destroy("key1")); |
| assertThat(thrown).isInstanceOf(DiskAccessException.class); |
| |
| verifyClosedDueToDiskAccessException(region); |
| } |
| |
| /** |
| * If IOException occurs while destroying an entry in a persist overflow sync mode, |
| * DiskAccessException should occur & region should be destroyed |
| */ |
| @Test |
| public void testEntryDestructionInSyncPersistOverflowForIOExceptionCase() { |
| DiskRegionProperties diskRegionProperties = new DiskRegionProperties(); |
| diskRegionProperties.setRegionName(regionName); |
| diskRegionProperties.setOverflow(true); |
| diskRegionProperties.setRolling(false); |
| diskRegionProperties.setDiskDirs(diskDirs); |
| diskRegionProperties.setPersistBackup(true); |
| |
| DiskStoreFactory diskStoreFactory = toDiskStoreFactory(diskRegionProperties); |
| |
| createDiskStoreWithSizeInBytes(diskStoreName, diskStoreFactory); |
| |
| Region<String, String> region = |
| createRegion(regionName, diskStoreName, true, true, false, true, DEFAULT_ENTRIES_MAXIMUM); |
| |
| region.create("key1", "value1"); |
| |
| // Get the oplog handle & hence the underlying file & close it |
| getDiskRegion(region).testHook_getChild().testClose(); |
| |
| Throwable thrown = catchThrowable(() -> region.destroy("key1")); |
| assertThat(thrown).isInstanceOf(DiskAccessException.class); |
| |
| verifyClosedDueToDiskAccessException(region); |
| } |
| |
| /** |
| * If IOException occurs while updating an entry in a Overflow only sync mode, |
| * DiskAccessException should occur & region should be destroyed |
| */ |
| @Test |
| public void testEntryUpdateInSyncOverflowOnlyForIOExceptionCase() { |
| DiskRegionProperties diskRegionProperties = new DiskRegionProperties(); |
| diskRegionProperties.setRegionName(regionName); |
| diskRegionProperties.setOverflow(true); |
| diskRegionProperties.setRolling(false); |
| diskRegionProperties.setDiskDirs(diskDirs); |
| diskRegionProperties.setPersistBackup(false); |
| diskRegionProperties.setOverFlowCapacity(1); |
| |
| DiskStoreFactory diskStoreFactory = toDiskStoreFactory(diskRegionProperties); |
| |
| createDiskStoreWithSizeInBytes(diskStoreName, diskStoreFactory); |
| |
| Region<String, String> region = |
| createRegion(regionName, diskStoreName, false, true, false, true, 1); |
| |
| region.create("key1", "value1"); |
| region.create("key2", "value2"); |
| |
| getDiskRegion(region).testHookCloseAllOverflowChannels(); |
| |
| // Update key1, so that key2 goes on disk & encounters an exception |
| Throwable thrown = catchThrowable(() -> region.put("key1", "value1'")); |
| assertThat(thrown).isInstanceOf(DiskAccessException.class); |
| |
| verifyClosedDueToDiskAccessException(region); |
| } |
| |
| /** |
| * If IOException occurs while creating an entry in a Overflow only sync mode, |
| * DiskAccessException should occur & region should be destroyed |
| */ |
| @Test |
| public void testEntryCreateInSyncOverflowOnlyForIOExceptionCase() { |
| DiskRegionProperties diskRegionProperties = new DiskRegionProperties(); |
| diskRegionProperties.setRegionName(regionName); |
| diskRegionProperties.setOverflow(true); |
| diskRegionProperties.setRolling(false); |
| diskRegionProperties.setDiskDirs(diskDirs); |
| diskRegionProperties.setPersistBackup(false); |
| diskRegionProperties.setOverFlowCapacity(1); |
| |
| DiskStoreFactory diskStoreFactory = toDiskStoreFactory(diskRegionProperties); |
| |
| createDiskStoreWithSizeInBytes(diskStoreName, diskStoreFactory); |
| |
| Region<String, String> region = |
| createRegion(regionName, diskStoreName, false, true, false, true, 1); |
| |
| region.create("key1", "value1"); |
| region.create("key2", "value2"); |
| |
| getDiskRegion(region).testHookCloseAllOverflowChannels(); |
| |
| Throwable thrown = catchThrowable(() -> region.create("key3", "value3")); |
| assertThat(thrown).isInstanceOf(DiskAccessException.class); |
| |
| verifyClosedDueToDiskAccessException(region); |
| } |
| |
| /** |
| * A deletion of an entry in overflow only mode should not cause any eviction & hence no |
| * DiskAccessException |
| */ |
| @Test |
| public void testEntryDeletionInSyncOverflowOnlyForIOExceptionCase() { |
| DiskRegionProperties diskRegionProperties = new DiskRegionProperties(); |
| diskRegionProperties.setOverflow(true); |
| diskRegionProperties.setRolling(false); |
| diskRegionProperties.setDiskDirs(diskDirs); |
| diskRegionProperties.setPersistBackup(false); |
| diskRegionProperties.setOverFlowCapacity(1); |
| |
| DiskStoreFactory diskStoreFactory = toDiskStoreFactory(diskRegionProperties); |
| |
| createDiskStoreWithSizeInBytes(diskStoreName, diskStoreFactory); |
| |
| Region<String, String> region = |
| createRegion(regionName, diskStoreName, false, true, false, true, 1); |
| |
| region.create("key1", "value1"); |
| region.create("key2", "value2"); |
| region.create("key3", "value3"); |
| |
| getDiskRegion(region).testHookCloseAllOverflowChannels(); |
| |
| // Update key1, so that key2 goes on disk & encounters an exception |
| region.destroy("key1"); |
| region.destroy("key3"); |
| } |
| |
| /** |
| * If IOException occurs while updating an entry in an Asynch mode, DiskAccessException should |
| * occur & region should be destroyed |
| */ |
| @Test |
| public void testEntryUpdateInASyncPersistOnlyForIOExceptionCase() throws Exception { |
| DiskRegionProperties diskRegionProperties = new DiskRegionProperties(); |
| diskRegionProperties.setRegionName(regionName); |
| diskRegionProperties.setOverflow(true); |
| diskRegionProperties.setRolling(false); |
| diskRegionProperties.setBytesThreshold(48); |
| diskRegionProperties.setDiskDirs(diskDirs); |
| diskRegionProperties.setPersistBackup(true); |
| |
| DiskStoreFactory diskStoreFactory = toDiskStoreFactory(diskRegionProperties); |
| |
| createDiskStoreWithSizeInBytes(diskStoreName, diskStoreFactory); |
| |
| Region<Object, Object> region = |
| createRegion(regionName, diskStoreName, true, false, false, true, DEFAULT_ENTRIES_MAXIMUM); |
| |
| closeOplogFileChannel(region); |
| |
| region.create("key1", new byte[16]); |
| region.create("key2", new byte[16]); |
| |
| DiskRegion diskRegion = getDiskRegion(region); |
| diskRegion.flushForTesting(); |
| |
| verifyClosedDueToDiskAccessException(region); |
| } |
| |
| /** |
| * If IOException occurs while updating an entry in an already initialized DiskRegion ,then the |
| * cache servers should not be stopped , if any running as they are no clients connected to it. |
| */ |
| @Test |
| public void testBridgeServerStoppingInSyncPersistOnlyForIOExceptionCase() throws Exception { |
| DiskRegionProperties diskRegionProperties = new DiskRegionProperties(); |
| diskRegionProperties.setRegionName(regionName); |
| diskRegionProperties.setOverflow(true); |
| diskRegionProperties.setRolling(true); |
| diskRegionProperties.setDiskDirs(diskDirs); |
| diskRegionProperties.setPersistBackup(true); |
| |
| DiskStoreFactory diskStoreFactory = toDiskStoreFactory(diskRegionProperties); |
| |
| createDiskStoreWithSizeInBytes(diskStoreName, diskStoreFactory); |
| |
| Region<Object, Object> region = |
| createRegion(regionName, diskStoreName, true, true, false, true, DEFAULT_ENTRIES_MAXIMUM); |
| |
| CacheServer cacheServer = cache.addCacheServer(); |
| cacheServer.setPort(0); |
| cacheServer.start(); |
| |
| region.create("key1", new byte[16]); |
| region.create("key2", new byte[16]); |
| |
| closeOplogFileChannel(region); |
| |
| Throwable thrown = catchThrowable(() -> region.put("key2", new byte[16])); |
| assertThat(thrown).isInstanceOf(DiskAccessException.class); |
| |
| verifyClosedDueToDiskAccessException(region); |
| |
| // a disk access exception in a server should always stop the server |
| List<CacheServer> cacheServers = cache.getCacheServers(); |
| assertThat(cacheServers).isEmpty(); |
| } |
| |
| /** |
| * Regression test for TRAC #40250: If roller is active at the time of region.close it can end up |
| * writing a dummy byte & thus lose the original value |
| */ |
| @Test |
| public void testDummyByteBugDuringRegionClose_Bug40250() throws Exception { |
| DiskRegionProperties diskRegionProperties = new DiskRegionProperties(); |
| diskRegionProperties.setRegionName(regionName); |
| diskRegionProperties.setRolling(true); |
| diskRegionProperties.setCompactionThreshold(100); |
| diskRegionProperties.setDiskDirs(diskDirs); |
| diskRegionProperties.setPersistBackup(true); |
| |
| DiskStoreFactory diskStoreFactory = toDiskStoreFactory(diskRegionProperties); |
| |
| createDiskStoreWithSizeInBytes(diskStoreName, diskStoreFactory); |
| |
| Region<String, String> region = |
| createRegion(regionName, diskStoreName, true, true, false, false, 0); |
| |
| // create some string entries |
| for (int i = 0; i < 2; ++i) { |
| region.put(String.valueOf(i), String.valueOf(i)); |
| } |
| |
| AtomicReference<Future<Void>> closeRegionFuture = new AtomicReference<>(); |
| |
| LocalRegion.ISSUE_CALLBACKS_TO_CACHE_OBSERVER = true; |
| CacheObserverHolder.setInstance(new CacheObserverAdapter() { |
| @Override |
| public void beforeGoingToCompact() { |
| closeRegionFuture.set(executorServiceRule.runAsync(() -> { |
| try { |
| region.close(); |
| } catch (AssertionError | Exception e) { |
| errorCollector.addError(e); |
| } |
| })); |
| |
| // await until the roll flag DiskRegion.OplogCompactor is false (not currently exposed) |
| try { |
| Thread.sleep(8000); |
| } catch (AssertionError | Exception e) { |
| errorCollector.addError(e); |
| } |
| } |
| }); |
| |
| getDiskRegion(region).forceRolling(); |
| |
| awaitFuture(closeRegionFuture); |
| |
| // Restart the region |
| Region<String, String> region2 = |
| createRegion(regionName, diskStoreName, true, true, false, false, 0); |
| |
| for (int i = 0; i < 2; ++i) { |
| assertThat(region2.get(String.valueOf(i))).isEqualTo(String.valueOf(i)); |
| } |
| } |
| |
| /** |
| * If IOException occurs while initializing a region, then the cache servers should not be |
| * stopped |
| */ |
| @Test |
| public void testBridgeServerRunningInSyncPersistOnlyForIOExceptionCase() throws Exception { |
| DiskRegionProperties diskRegionProperties = new DiskRegionProperties(); |
| diskRegionProperties.setRegionName(regionName); |
| diskRegionProperties.setOverflow(true); |
| diskRegionProperties.setRolling(true); |
| diskRegionProperties.setDiskDirs(diskDirs); |
| diskRegionProperties.setPersistBackup(true); |
| diskRegionProperties.setMaxOplogSize(100000); // just needs to be bigger than 65550 |
| |
| DiskStoreFactory diskStoreFactory = toDiskStoreFactory(diskRegionProperties); |
| |
| createDiskStoreWithSizeInBytes(diskStoreName, diskStoreFactory, 100_000); |
| |
| Region<String, Object> region = |
| createRegion(regionName, diskStoreName, true, true, false, true, DEFAULT_ENTRIES_MAXIMUM); |
| |
| CacheServer cacheServer = cache.addCacheServer(); |
| cacheServer.setPort(0); |
| cacheServer.start(); |
| |
| region.create("key1", new byte[16]); |
| region.create("key2", new byte[16]); |
| |
| // Get the oplog file path |
| UninterruptibleFileChannel oplogFileChannel = |
| getDiskRegion(region).testHook_getChild().getFileChannel(); |
| |
| // corrupt the opfile |
| oplogFileChannel.position(2); |
| ByteBuffer byteBuffer = ByteBuffer.allocate(416); |
| for (int i = 0; i < 5; ++i) { |
| byteBuffer.putInt(i); |
| } |
| byteBuffer.flip(); |
| |
| // Corrupt the oplogFile |
| oplogFileChannel.write(byteBuffer); |
| |
| // Close the region |
| region.close(); |
| assertThat(region.isDestroyed()).isTrue(); |
| |
| Throwable thrown = catchThrowable(() -> createRegion(regionName, diskStoreName, true, true, |
| false, true, DEFAULT_ENTRIES_MAXIMUM)); |
| assertThat(thrown).isInstanceOf(DiskAccessException.class); |
| |
| List cacheServers = cache.getCacheServers(); |
| assertThat(cacheServers).isNotEmpty(); |
| } |
| |
| @Test |
| public void testEarlyTerminationOfCompactorByDefault() throws Exception { |
| DiskRegionProperties diskRegionProperties = new DiskRegionProperties(); |
| diskRegionProperties.setRegionName(regionName); |
| diskRegionProperties.setRolling(true); |
| diskRegionProperties.setCompactionThreshold(100); |
| diskRegionProperties.setDiskDirs(diskDirs); |
| diskRegionProperties.setMaxOplogSize(100); |
| diskRegionProperties.setPersistBackup(true); |
| |
| DiskStoreFactory diskStoreFactory = toDiskStoreFactory(diskRegionProperties); |
| |
| createDiskStoreWithSizeInBytes(diskStoreName, diskStoreFactory, 100); |
| |
| Region<String, String> region = |
| createRegion(regionName, diskStoreName, true, true, false, false, 0); |
| |
| AtomicReference<Future<Void>> closeRegionFuture = new AtomicReference<>(); |
| CountDownLatch closingRegionLatch = new CountDownLatch(1); |
| CountDownLatch allowCompactorLatch = new CountDownLatch(1); |
| |
| LocalRegion.ISSUE_CALLBACKS_TO_CACHE_OBSERVER = true; |
| CacheObserverHolder.setInstance(new CacheObserverAdapter() { |
| private final CountDownLatch compactorSignalledLatch = new CountDownLatch(1); |
| private final CountDownLatch compactorCompletedLatch = new CountDownLatch(1); |
| private volatile int oplogSizeBeforeRolling; |
| |
| @Override |
| public void beforeGoingToCompact() { |
| try { |
| // wait for operations to get over |
| awaitLatch(allowCompactorLatch); |
| |
| DiskRegion diskRegion = getDiskRegion(region); |
| oplogSizeBeforeRolling = diskRegion.getOplogIdToOplog().size(); |
| assertThat(oplogSizeBeforeRolling).isGreaterThan(0); |
| |
| closeRegionFuture.set(executorServiceRule.runAsync(() -> { |
| closingRegionLatch.countDown(); |
| DiskStoreImpl diskStoreImpl = getDiskStore(region); |
| region.close(); |
| diskStoreImpl.close(); |
| })); |
| |
| // wait for th to call afterSignallingCompactor |
| awaitLatch(compactorSignalledLatch); |
| } catch (AssertionError | Exception e) { |
| errorCollector.addError(e); |
| } |
| } |
| |
| @Override |
| public void afterSignallingCompactor() { |
| try { |
| compactorSignalledLatch.countDown(); |
| } catch (AssertionError | Exception e) { |
| errorCollector.addError(e); |
| } |
| } |
| |
| @Override |
| public void afterStoppingCompactor() { |
| try { |
| awaitLatch(compactorCompletedLatch); |
| } catch (AssertionError | Exception e) { |
| errorCollector.addError(e); |
| } |
| } |
| |
| @Override |
| public void afterHavingCompacted() { |
| try { |
| DiskRegion diskRegion = getDiskRegion(region); |
| assertThat(diskRegion.getOplogIdToOplog()).hasSize(oplogSizeBeforeRolling); |
| } catch (AssertionError | Exception e) { |
| errorCollector.addError(e); |
| } finally { |
| compactorCompletedLatch.countDown(); |
| } |
| } |
| }); |
| |
| // create some string entries |
| for (int i = 0; i < 100; ++i) { |
| region.put(String.valueOf(i), String.valueOf(i)); |
| } |
| |
| allowCompactorLatch.countDown(); |
| awaitLatch(closingRegionLatch); |
| awaitFuture(closeRegionFuture); |
| } |
| |
| @Test |
| public void throwsIllegalStateExceptionIfMissingOplogDrf() throws Exception { |
| DiskRegionProperties diskRegionProperties = new DiskRegionProperties(); |
| diskRegionProperties.setRegionName(regionName); |
| diskRegionProperties.setDiskDirs(diskDirs); |
| diskRegionProperties.setPersistBackup(true); |
| |
| DiskStoreFactory diskStoreFactory = toDiskStoreFactory(diskRegionProperties); |
| |
| DiskStore diskStore = createDiskStoreWithSizeInBytes(diskStoreName, diskStoreFactory); |
| |
| Region<Object, Object> region = |
| createRegion(regionName, diskStoreName, true, true, false, false, 0); |
| |
| region.close(); |
| closeDiskStore(diskStore); |
| |
| Path oplogFile = Files.list(diskDirs[0].toPath()) |
| .filter(path -> path.toString().endsWith(".drf")).findFirst().get(); |
| |
| Files.delete(oplogFile); |
| |
| String expectedMessage = |
| "The following required files could not be found: *.drf files with these ids:"; |
| |
| Throwable thrown = catchThrowable(() -> createDiskStoreWithSizeInBytes(diskStoreName, |
| toDiskStoreFactory(diskRegionProperties))); |
| assertThat(thrown).isInstanceOf(IllegalStateException.class) |
| .hasMessageContaining(expectedMessage).hasNoCause(); |
| } |
| |
| @Test |
| public void throwsIllegalStateExceptionIfMissingOplogCrf() throws Exception { |
| DiskRegionProperties diskRegionProperties = new DiskRegionProperties(); |
| diskRegionProperties.setRegionName(regionName); |
| diskRegionProperties.setDiskDirs(diskDirs); |
| diskRegionProperties.setPersistBackup(true); |
| |
| DiskStoreFactory diskStoreFactory = toDiskStoreFactory(diskRegionProperties); |
| |
| DiskStore diskStore = createDiskStoreWithSizeInBytes(diskStoreName, diskStoreFactory); |
| |
| Region<Object, Object> region = |
| createRegion(regionName, diskStoreName, true, true, false, false, 0); |
| |
| region.close(); |
| closeDiskStore(diskStore); |
| |
| Path oplogFile = Files.list(diskDirs[0].toPath()) |
| .filter(path -> path.toString().endsWith(".crf")).findFirst().get(); |
| |
| Files.delete(oplogFile); |
| |
| String expectedMessage = |
| "The following required files could not be found: *.crf files with these ids:"; |
| |
| Throwable thrown = catchThrowable(() -> createDiskStoreWithSizeInBytes(diskStoreName, |
| toDiskStoreFactory(diskRegionProperties))); |
| assertThat(thrown).isInstanceOf(IllegalStateException.class) |
| .hasMessageContaining(expectedMessage).hasNoCause(); |
| } |
| |
| @Test |
| public void testNoTerminationOfCompactorTillRollingCompleted() throws Exception { |
| System.setProperty(DiskStoreImpl.COMPLETE_COMPACTION_BEFORE_TERMINATION_PROPERTY_NAME, "true"); |
| |
| DiskRegionProperties diskRegionProperties = new DiskRegionProperties(); |
| diskRegionProperties.setRegionName(regionName); |
| diskRegionProperties.setRolling(true); |
| diskRegionProperties.setCompactionThreshold(100); |
| diskRegionProperties.setDiskDirs(diskDirs); |
| diskRegionProperties.setMaxOplogSize(100); |
| diskRegionProperties.setPersistBackup(true); |
| |
| DiskStoreFactory diskStoreFactory = toDiskStoreFactory(diskRegionProperties); |
| |
| createDiskStoreWithSizeInBytes(diskStoreName, diskStoreFactory, 100); |
| |
| Region<Object, Object> region = |
| createRegion(regionName, diskStoreName, true, true, false, false, 0); |
| |
| AtomicReference<Future<Void>> closeRegionFuture = new AtomicReference<>(); |
| CountDownLatch closeThreadStartedLatch = new CountDownLatch(1); |
| CountDownLatch allowCompactorLatch = new CountDownLatch(1); |
| |
| LocalRegion.ISSUE_CALLBACKS_TO_CACHE_OBSERVER = true; |
| CacheObserverHolder.setInstance(new CacheObserverAdapter() { |
| private final CountDownLatch compactorSignalledLatch = new CountDownLatch(1); |
| private final CountDownLatch compactorCompletedLatch = new CountDownLatch(1); |
| private volatile int oplogSizeBeforeRolling; |
| |
| @Override |
| public void beforeGoingToCompact() { |
| try { |
| // wait for operations to get over |
| awaitLatch(allowCompactorLatch); |
| |
| DiskRegion diskRegion = getDiskRegion(region); |
| oplogSizeBeforeRolling = diskRegion.getOplogIdToOplog().size(); |
| assertThat(oplogSizeBeforeRolling).isGreaterThan(0); |
| |
| closeRegionFuture.set(executorServiceRule.runAsync(() -> { |
| closeThreadStartedLatch.countDown(); |
| DiskStoreImpl diskStore = getDiskStore(region); |
| region.close(); |
| diskStore.close(); |
| })); |
| |
| } catch (AssertionError | Exception e) { |
| errorCollector.addError(e); |
| } |
| } |
| |
| @Override |
| public void afterSignallingCompactor() { |
| try { |
| compactorSignalledLatch.countDown(); |
| } catch (AssertionError | Exception e) { |
| errorCollector.addError(e); |
| } |
| } |
| |
| @Override |
| public void afterStoppingCompactor() { |
| try { |
| awaitLatch(compactorCompletedLatch); |
| } catch (AssertionError | Exception e) { |
| errorCollector.addError(e); |
| } |
| } |
| |
| @Override |
| public void afterHavingCompacted() { |
| try { |
| awaitLatch(compactorSignalledLatch); |
| DiskRegion diskRegion = getDiskRegion(region); |
| assertThat(diskRegion.getOplogIdToOplog()).hasSize(oplogSizeBeforeRolling); |
| } catch (AssertionError | Exception e) { |
| errorCollector.addError(e); |
| } finally { |
| compactorCompletedLatch.countDown(); |
| } |
| } |
| }); |
| |
| // create some string entries |
| for (int i = 0; i < 100; ++i) { |
| region.put(String.valueOf(i), String.valueOf(i)); |
| } |
| |
| allowCompactorLatch.countDown(); |
| awaitLatch(closeThreadStartedLatch); |
| awaitFuture(closeRegionFuture); |
| } |
| |
| /** |
| * Regression test for TRAC #40648: oplog rolling fails reading with Bad file descriptor |
| */ |
| @Test |
| public void testBug40648part1() { |
| DiskRegionProperties diskRegionProperties = new DiskRegionProperties(); |
| diskRegionProperties.setRegionName(regionName); |
| diskRegionProperties.setRolling(true); |
| diskRegionProperties.setDiskDirs(diskDirs); |
| diskRegionProperties.setMaxOplogSize(500 * 2); |
| diskRegionProperties.setOverFlowCapacity(1); |
| |
| DiskStoreFactory diskStoreFactory = toDiskStoreFactory(diskRegionProperties); |
| |
| createDiskStoreWithSizeInBytes(diskStoreName, diskStoreFactory, 1000); |
| |
| Region<Integer, Object> region = |
| createRegion(regionName, diskStoreName, true, true, false, true, 1); |
| |
| // long loop and no assertions just to verify nothing throws |
| |
| byte[] payload = new byte[100]; |
| for (int i = 0; i < 1000; i++) { |
| region.put(0, payload); |
| region.put(1, payload); |
| } |
| } |
| |
| /** |
| * Regression test for TRAC #40648: oplog rolling fails reading with Bad file descriptor |
| * |
| * <p> |
| * Same as part1 but no persistence. |
| */ |
| @Test |
| public void testBug40648part2() { |
| DiskRegionProperties diskRegionProperties = new DiskRegionProperties(); |
| diskRegionProperties.setRegionName(regionName); |
| diskRegionProperties.setRolling(true); |
| diskRegionProperties.setDiskDirs(diskDirs); |
| diskRegionProperties.setMaxOplogSize(500 * 2); |
| diskRegionProperties.setOverFlowCapacity(1); |
| |
| DiskStoreFactory diskStoreFactory = toDiskStoreFactory(diskRegionProperties); |
| |
| createDiskStoreWithSizeInBytes(diskStoreName, diskStoreFactory, 1000); |
| |
| Region<Integer, Object> region = |
| createRegion(regionName, diskStoreName, false, true, false, true, 1); |
| |
| // long loop and no assertions just to verify nothing throws |
| |
| byte[] payload = new byte[100]; |
| for (int i = 0; i < 1000; i++) { |
| region.put(0, payload); |
| region.put(1, payload); |
| } |
| } |
| |
| @Test |
| public void testForceCompactionDoesRoll() { |
| DiskRegionProperties diskRegionProperties = new DiskRegionProperties(); |
| diskRegionProperties.setRegionName(regionName); |
| diskRegionProperties.setRolling(false); |
| diskRegionProperties.setDiskDirs(diskDirs); |
| diskRegionProperties.setAllowForceCompaction(true); |
| diskRegionProperties.setPersistBackup(true); |
| |
| DiskStoreFactory diskStoreFactory = toDiskStoreFactory(diskRegionProperties); |
| |
| createDiskStoreWithSizeInBytes(diskStoreName, diskStoreFactory); |
| |
| Region<String, String> region = |
| createRegion(regionName, diskStoreName, true, true, false, false, 0); |
| |
| DiskStore diskStore = getDiskStore(region); |
| |
| assertThat(diskStore.forceCompaction()).isFalse(); |
| |
| region.put("key1", "value1"); |
| region.put("key2", "value2"); |
| |
| assertThat(diskStore.forceCompaction()).isFalse(); |
| |
| region.remove("key1"); |
| region.remove("key2"); |
| |
| // the following forceCompaction should go ahead and do a roll and compact it |
| boolean compacted = diskStore.forceCompaction(); |
| assertThat(compacted).isTrue(); |
| } |
| |
| /** |
| * Confirm that forceCompaction waits for the compaction to finish |
| */ |
| @Test |
| public void testNonDefaultCompaction() { |
| DiskRegionProperties diskRegionProperties = new DiskRegionProperties(); |
| diskRegionProperties.setRegionName(regionName); |
| diskRegionProperties.setRolling(false); |
| diskRegionProperties.setDiskDirs(diskDirs); |
| diskRegionProperties.setAllowForceCompaction(true); |
| diskRegionProperties.setPersistBackup(true); |
| diskRegionProperties.setCompactionThreshold(90); |
| |
| DiskStoreFactory diskStoreFactory = toDiskStoreFactory(diskRegionProperties); |
| |
| createDiskStoreWithSizeInBytes(diskStoreName, diskStoreFactory); |
| |
| Region<String, String> region = |
| createRegion(regionName, diskStoreName, true, true, false, false, 0); |
| |
| region.put("key1", "value1"); |
| region.put("key2", "value2"); |
| |
| // Only remove 1 of the entries. This wouldn't trigger compaction with the default threshold, |
| // since there are two entries. |
| region.remove("key1"); |
| |
| // the following forceCompaction should go ahead and do a roll and compact it |
| Oplog oplog = getDiskRegion(region).testHook_getChild(); |
| boolean compacted = getDiskStore(region).forceCompaction(); |
| |
| assertThat(oplog.testConfirmCompacted()).isTrue(); |
| assertThat(compacted).isTrue(); |
| } |
| |
| /** |
| * Confirm that forceCompaction waits for the compaction to finish |
| */ |
| @Test |
| public void testForceCompactionIsSync() { |
| DiskRegionProperties diskRegionProperties = new DiskRegionProperties(); |
| diskRegionProperties.setRegionName(regionName); |
| diskRegionProperties.setRolling(false); |
| diskRegionProperties.setDiskDirs(diskDirs); |
| diskRegionProperties.setAllowForceCompaction(true); |
| diskRegionProperties.setPersistBackup(true); |
| |
| DiskStoreFactory diskStoreFactory = toDiskStoreFactory(diskRegionProperties); |
| |
| createDiskStoreWithSizeInBytes(diskStoreName, diskStoreFactory); |
| |
| Region<String, String> region = |
| createRegion(regionName, diskStoreName, true, true, false, false, 0); |
| |
| region.put("key1", "value1"); |
| region.put("key2", "value2"); |
| region.remove("key1"); |
| region.remove("key2"); |
| |
| // now that it is compactable the following forceCompaction should |
| // go ahead and do a roll and compact it. |
| Oplog oplog = getDiskRegion(region).testHook_getChild(); |
| boolean compacted = getDiskStore(region).forceCompaction(); |
| |
| assertThat(oplog.testConfirmCompacted()).isTrue(); |
| assertThat(compacted).isTrue(); |
| |
| CachePerfStats stats = cache.getCachePerfStats(); |
| assertThat(stats.getDiskTasksWaiting()).isGreaterThanOrEqualTo(0); |
| } |
| |
| /** |
| * Regression test for TRAC #40876: diskReg tests (disk persistence and overflow-to-disk) fail |
| * when an invalidated entry is recovered from disk |
| */ |
| @Test |
| public void testBug40876() { |
| DiskRegionProperties diskRegionProperties = new DiskRegionProperties(); |
| diskRegionProperties.setRegionName(regionName); |
| diskRegionProperties.setRolling(false); |
| diskRegionProperties.setDiskDirs(diskDirs); |
| diskRegionProperties.setPersistBackup(true); |
| |
| DiskStoreFactory diskStoreFactory = toDiskStoreFactory(diskRegionProperties); |
| |
| createDiskStoreWithSizeInBytes(diskStoreName, diskStoreFactory); |
| |
| Region<String, String> region = |
| createRegion(regionName, diskStoreName, true, true, false, false, 0); |
| |
| region.put("key1", "value1"); |
| region.invalidate("key1"); |
| region.close(); |
| |
| region = createRegion(regionName, diskStoreName, true, true, false, false, 0); |
| |
| Object value = ((InternalPersistentRegion) region).getValueOnDiskOrBuffer("key1"); |
| assertThat(value).isEqualTo(Token.INVALID); |
| assertThat(region.containsValueForKey("key1")).isFalse(); |
| } |
| |
| /** |
| * Regression test for TRAC #41822: Only one disk directory is being used when multiple |
| * directories are specified |
| * |
| * <p> |
| * Make sure oplog created by recovery goes in the proper directory |
| */ |
| @Test |
| public void testBug41822() { |
| DiskRegionProperties diskRegionProperties = new DiskRegionProperties(); |
| diskRegionProperties.setRegionName(regionName); |
| diskRegionProperties.setRolling(false); |
| diskRegionProperties.setDiskDirs(diskDirs); |
| diskRegionProperties.setMaxOplogSize(500); |
| |
| DiskStoreFactory diskStoreFactory = toDiskStoreFactory(diskRegionProperties); |
| |
| DiskStore diskStore = createDiskStoreWithSizeInBytes(diskStoreName, diskStoreFactory, 500); |
| |
| Region<String, Object> region = |
| createRegion(regionName, diskStoreName, true, true, false, false, 0); |
| |
| byte[] payload = new byte[100]; |
| |
| region.put("key0", payload); |
| assertThat(getDiskRegion(region).testHook_getChild().getDirectoryHolder().getDir()) |
| .isEqualTo(diskDirs[0]); |
| |
| region.close(); |
| closeDiskStore(diskStore); |
| |
| diskStore = createDiskStoreWithSizeInBytes(diskStoreName, diskStoreFactory, 500); |
| region = createRegion(regionName, diskStoreName, true, true, false, false, 0); |
| region.put("key1", payload); |
| |
| assertThat(getDiskRegion(region).testHook_getChild().getDirectoryHolder().getDir()) |
| .isEqualTo(diskDirs[1]); |
| |
| region.close(); |
| closeDiskStore(diskStore); |
| |
| diskStore = createDiskStoreWithSizeInBytes(diskStoreName, diskStoreFactory, 500); |
| region = createRegion(regionName, diskStoreName, true, true, false, false, 0); |
| region.put("key2", payload); |
| |
| assertThat(getDiskRegion(region).testHook_getChild().getDirectoryHolder().getDir()) |
| .isEqualTo(diskDirs[2]); |
| |
| region.close(); |
| closeDiskStore(diskStore); |
| |
| diskStore = createDiskStoreWithSizeInBytes(diskStoreName, diskStoreFactory, 500); |
| region = createRegion(regionName, diskStoreName, true, true, false, false, 0); |
| region.put("key3", payload); |
| |
| assertThat(getDiskRegion(region).testHook_getChild().getDirectoryHolder().getDir()) |
| .isEqualTo(diskDirs[3]); |
| |
| region.close(); |
| closeDiskStore(diskStore); |
| |
| diskStore = createDiskStoreWithSizeInBytes(diskStoreName, diskStoreFactory, 500); |
| region = createRegion(regionName, diskStoreName, true, true, false, false, 0); |
| region.put("key4", payload); |
| |
| assertThat(getDiskRegion(region).testHook_getChild().getDirectoryHolder().getDir()) |
| .isEqualTo(diskDirs[0]); |
| } |
| |
| /** |
| * Regression test for TRAC #41770: ConcurrentRegionOperationsJUnitTest fails with duplicate |
| * create in an oplog |
| */ |
| @Test |
| public void testBug41770() throws Exception { |
| DiskRegionProperties diskRegionProperties = new DiskRegionProperties(); |
| diskRegionProperties.setRegionName(regionName); |
| diskRegionProperties.setOverflow(false); |
| diskRegionProperties.setRolling(false); |
| diskRegionProperties.setDiskDirs(diskDirs); |
| diskRegionProperties.setPersistBackup(true); |
| |
| DiskStoreFactory diskStoreFactory = toDiskStoreFactory(diskRegionProperties); |
| |
| createDiskStoreWithSizeInBytes(diskStoreName, diskStoreFactory); |
| |
| Region<String, String> region = |
| createRegion(regionName, diskStoreName, true, false, false, false, 0); |
| |
| // Install a listener then gets called when the async flusher threads finds an entry to flush |
| LocalRegion.ISSUE_CALLBACKS_TO_CACHE_OBSERVER = true; |
| DoRegionClearDuringDiskStoreFlush doRegionClearDuringDiskStoreFlush = |
| new DoRegionClearDuringDiskStoreFlush(region); |
| CacheObserverHolder.setInstance(doRegionClearDuringDiskStoreFlush); |
| |
| region.create("KEY", "VALUE1"); |
| getDiskStore(region).forceFlush(); |
| |
| doRegionClearDuringDiskStoreFlush.waitForCompletion(); |
| // we should now have two creates in our oplog. |
| |
| region.close(); |
| |
| // do a recovery it will fail with an assertion if this bug is not fixed |
| Throwable thrown = |
| catchThrowable(() -> createRegion(regionName, diskStoreName, true, false, false, false, 0)); |
| assertThat(thrown).isNull(); |
| } |
| |
| private void awaitLatch(CountDownLatch latch) throws InterruptedException { |
| assertThat(latch.await(5, MINUTES)).as("Timed out awaiting latch").isTrue(); |
| } |
| |
| private void awaitFuture(AtomicReference<Future<Void>> voidFuture) |
| throws InterruptedException, ExecutionException, TimeoutException { |
| await().until(() -> voidFuture.get() != null); |
| awaitFuture(voidFuture.get()); |
| } |
| |
| private void awaitFuture(Future<Void> voidFuture) |
| throws InterruptedException, ExecutionException, TimeoutException { |
| voidFuture.get(5, MINUTES); |
| } |
| |
| private File newFolder(String folder) { |
| try { |
| return temporaryFolder.newFolder(folder); |
| } catch (IOException e) { |
| throw new UncheckedIOException(e); |
| } |
| } |
| |
| private File createDirectory(File parentDirectory, String name) { |
| File file = new File(parentDirectory, name); |
| assertThat(file.mkdir()).isTrue(); |
| return file; |
| } |
| |
| private <K, V> Region<K, V> createRegion(String regionName, |
| String diskStoreName, |
| boolean isDataPolicyPersistent, |
| boolean isDataSynchronous, boolean isHeapEviction, |
| boolean isOverflowEviction, |
| int overflowCapacity) { |
| RegionFactory<K, V> regionFactory = cache.createRegionFactory(LOCAL); |
| |
| if (isDataPolicyPersistent) { |
| regionFactory.setDataPolicy(DataPolicy.PERSISTENT_REPLICATE); |
| } |
| |
| regionFactory.setDiskStoreName(diskStoreName); |
| regionFactory.setDiskSynchronous(isDataSynchronous); |
| |
| if (isHeapEviction) { |
| regionFactory.setEvictionAttributes(heapEvictionAttributes); |
| } else if (isOverflowEviction) { |
| regionFactory.setEvictionAttributes(createOverflowEvictionAttributes(overflowCapacity)); |
| } |
| |
| return regionFactory.create(regionName); |
| } |
| |
| private EvictionAttributes createOverflowEvictionAttributes(int overflowCapacity) { |
| return createLRUEntryAttributes(overflowCapacity, OVERFLOW_TO_DISK); |
| } |
| |
| private DiskStoreFactory toDiskStoreFactory(DiskRegionProperties diskRegionProperties) { |
| DiskStoreFactory diskStoreFactory = cache.createDiskStoreFactory(); |
| |
| diskStoreFactory.setAllowForceCompaction(diskRegionProperties.getAllowForceCompaction()); |
| diskStoreFactory.setAutoCompact(diskRegionProperties.isRolling()); |
| diskStoreFactory.setCompactionThreshold(diskRegionProperties.getCompactionThreshold()); |
| |
| if (diskRegionProperties.getDiskDirSizes() == null) { |
| int[] diskDirSizes = new int[diskRegionProperties.getDiskDirs().length]; |
| Arrays.fill(diskDirSizes, Integer.MAX_VALUE); |
| diskStoreFactory.setDiskDirsAndSizes(diskRegionProperties.getDiskDirs(), diskDirSizes); |
| } else { |
| diskStoreFactory.setDiskDirsAndSizes(diskRegionProperties.getDiskDirs(), |
| diskRegionProperties.getDiskDirSizes()); |
| } |
| |
| if (diskRegionProperties.getBytesThreshold() > Integer.MAX_VALUE) { |
| diskStoreFactory.setQueueSize(Integer.MAX_VALUE); |
| } else { |
| diskStoreFactory.setQueueSize((int) diskRegionProperties.getBytesThreshold()); |
| } |
| |
| if (diskRegionProperties.getTimeInterval() != -1) { |
| diskStoreFactory.setTimeInterval(diskRegionProperties.getTimeInterval()); |
| } |
| |
| return diskStoreFactory; |
| } |
| |
| private DiskStore createDiskStoreWithSizeInBytes(String diskStoreName, |
| DiskStoreFactory diskStoreFactory) { |
| return createDiskStoreWithSizeInBytes(diskStoreName, diskStoreFactory, MAX_OPLOG_SIZE_IN_BYTES); |
| } |
| |
| private DiskStore createDiskStoreWithSizeInBytes(String diskStoreName, |
| DiskStoreFactory diskStoreFactory, |
| long maxOplogSizeInBytes) { |
| ((DiskStoreFactoryImpl) diskStoreFactory).setMaxOplogSizeInBytes(maxOplogSizeInBytes); |
| ((DiskStoreFactoryImpl) diskStoreFactory).setDiskDirSizesUnit(DiskDirSizesUnit.BYTES); |
| return diskStoreFactory.create(diskStoreName); |
| } |
| |
| private DiskStoreImpl toDiskStoreImpl(DiskStore diskStore) { |
| return (DiskStoreImpl) diskStore; |
| } |
| |
| private DiskRegion getDiskRegion(Region region) { |
| return ((InternalRegion) region).getDiskRegion(); |
| } |
| |
| private DiskStoreImpl getDiskStore(Region region) { |
| return ((DiskRecoveryStore) region).getDiskStore(); |
| } |
| |
| private InternalRegion toInternalRegion(Region region) { |
| return (InternalRegion) region; |
| } |
| |
| private VMLRURegionMap getVMLRURegionMap(Region<?, ?> region) { |
| return (VMLRURegionMap) toInternalRegion(region).getRegionMap(); |
| } |
| |
| private void closeDiskStore(DiskStore diskStore) { |
| DiskStoreImpl internalDiskStore = (DiskStoreImpl) diskStore; |
| internalDiskStore.close(); |
| |
| cache.removeDiskStore(internalDiskStore); |
| } |
| |
| private void closeOplogFileChannel(Region<?, ?> region) throws IOException { |
| // Get the oplog handle & hence the underlying file & close it |
| UninterruptibleFileChannel oplogFileChannel = |
| getDiskRegion(region).testHook_getChild().getFileChannel(); |
| oplogFileChannel.close(); |
| } |
| |
| private void assertThatArrayEquals(Object expected, Object actual) { |
| assertThat(actual).isInstanceOf(expected.getClass()); |
| |
| int actualLength = Array.getLength(actual); |
| assertThat(actualLength).isEqualTo(Array.getLength(expected)); |
| |
| for (int i = 0; i < actualLength; i++) { |
| assertThat(Array.get(actual, i)).isEqualTo(Array.get(expected, i)); |
| } |
| } |
| |
| private void verifyClosedDueToDiskAccessException(Region<?, ?> region) { |
| await().until(() -> getDiskStore(region).isClosed()); |
| await().until(() -> cache.isClosed()); |
| Throwable thrown = catchThrowable(() -> cache.createRegionFactory().create(regionName)); |
| assertThat(thrown).isInstanceOf(CacheClosedException.class) |
| .hasCauseInstanceOf(DiskAccessException.class); |
| } |
| |
| private class DelayedGet implements Runnable { |
| |
| private final CountDownLatch goLatch; |
| private final Region<Integer, ?> region; |
| |
| DelayedGet(CountDownLatch goLatch, Region<Integer, ?> region) { |
| this.goLatch = goLatch; |
| this.region = region; |
| } |
| |
| @Override |
| public void run() { |
| try { |
| awaitLatch(goLatch); |
| region.get(0); |
| } catch (AssertionError | Exception e) { |
| errorCollector.addError(e); |
| } |
| } |
| } |
| |
| private static class AfterDestroyListener<K, V> extends CacheListenerAdapter<K, V> { |
| |
| private volatile EntryEvent<K, V> lastEvent; |
| |
| @Override |
| public void afterDestroy(EntryEvent<K, V> event) { |
| lastEvent = event; |
| } |
| |
| EntryEvent<K, V> getLastEvent() { |
| return lastEvent; |
| } |
| } |
| |
| private class Puts implements Runnable { |
| |
| private final int dataSize; |
| private final Region<String, byte[]> region; |
| private final AtomicBoolean[] putSuccessful; |
| |
| private volatile boolean diskAccessExceptionOccurred; |
| |
| Puts(Region<String, byte[]> region) { |
| this(region, 1024); |
| } |
| |
| Puts(Region<String, byte[]> region, int dataSize) { |
| this.region = region; |
| this.dataSize = dataSize; |
| putSuccessful = |
| new AtomicBoolean[] {new AtomicBoolean(), new AtomicBoolean(), new AtomicBoolean()}; |
| } |
| |
| @Override |
| public void run() { |
| try { |
| performPuts(); |
| } catch (AssertionError | Exception e) { |
| errorCollector.addError(e); |
| } |
| } |
| |
| boolean diskAccessExceptionOccurred() { |
| return diskAccessExceptionOccurred; |
| } |
| |
| boolean putSuccessful(int index) { |
| return putSuccessful[index].get(); |
| } |
| |
| void performPuts() { |
| diskAccessExceptionOccurred = false; |
| putSuccessful[0].set(false); |
| putSuccessful[1].set(false); |
| putSuccessful[2].set(false); |
| |
| try { |
| byte[] bytes = new byte[dataSize]; |
| region.put("1", bytes); |
| putSuccessful[0].set(true); |
| region.put("2", bytes); |
| putSuccessful[1].set(true); |
| region.put("3", bytes); |
| putSuccessful[2].set(true); |
| } catch (DiskAccessException e) { |
| diskAccessExceptionOccurred = true; |
| } |
| } |
| } |
| |
| /** |
| * Performs Region clear during DiskStore flush and then performs create after flushing. |
| * |
| * <p> |
| * TRAC #41770: ConcurrentRegionOperationsJUnitTest fails with duplicate create in an oplog |
| */ |
| private class DoRegionClearDuringDiskStoreFlush extends CacheObserverAdapter { |
| |
| private final CountDownLatch completedLatch = new CountDownLatch(1); |
| private final Region<String, String> region; |
| |
| private boolean cleared; |
| |
| DoRegionClearDuringDiskStoreFlush(Region<String, String> region) { |
| this.region = region; |
| } |
| |
| @Override |
| public synchronized void afterWritingBytes() { |
| try { |
| if (cleared) { |
| CacheObserverHolder.setInstance(null); |
| // now that the flusher finished the async create of VALUE1 do another create |
| region.create("KEY", "VALUE2"); |
| completedLatch.countDown(); |
| } |
| } catch (AssertionError | Exception e) { |
| errorCollector.addError(e); |
| } |
| } |
| |
| @Override |
| public synchronized void goingToFlush() { |
| try { |
| if (!cleared) { |
| // once the flusher is stuck in our listener do a region clear |
| region.clear(); |
| cleared = true; |
| } |
| } catch (AssertionError | Exception e) { |
| errorCollector.addError(e); |
| } |
| } |
| |
| synchronized void waitForCompletion() throws InterruptedException { |
| awaitLatch(completedLatch); |
| } |
| } |
| } |