/*
* Licensed to the Apache Software Foundation (ASF) under one or more contributor license
* agreements. See the NOTICE file distributed with this work for additional information regarding
* copyright ownership. The ASF licenses this file to You under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance with the License. You may obtain a
* copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software distributed under the License
* is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
* or implied. See the License for the specific language governing permissions and limitations under
* the License.
*/
package org.apache.geode.internal.cache;
import static java.util.Arrays.asList;
import static org.apache.geode.distributed.ConfigurationProperties.OFF_HEAP_MEMORY_SIZE;
import static org.apache.geode.test.awaitility.GeodeAwaitility.await;
import static org.apache.geode.test.dunit.IgnoredException.addIgnoredException;
import static org.apache.geode.test.dunit.VM.getVM;
import static org.apache.geode.test.dunit.VM.getVMId;
import static org.apache.geode.test.dunit.rules.DistributedRule.getDistributedSystemProperties;
import static org.assertj.core.api.Assertions.assertThat;
import static org.assertj.core.api.Assertions.fail;
import static org.assertj.core.api.Assumptions.assumeThat;
import static org.mockito.Mockito.mock;
import java.io.File;
import java.io.Serializable;
import java.util.Arrays;
import java.util.Properties;
import java.util.concurrent.atomic.AtomicReference;
import java.util.function.Supplier;
import junitparams.Parameters;
import junitparams.naming.TestCaseName;
import org.junit.After;
import org.junit.Before;
import org.junit.Ignore;
import org.junit.Rule;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.apache.geode.cache.Cache;
import org.apache.geode.cache.CacheFactory;
import org.apache.geode.cache.DataPolicy;
import org.apache.geode.cache.EvictionAction;
import org.apache.geode.cache.EvictionAttributes;
import org.apache.geode.cache.InterestPolicy;
import org.apache.geode.cache.Region;
import org.apache.geode.cache.Region.Entry;
import org.apache.geode.cache.RegionDestroyedException;
import org.apache.geode.cache.RegionFactory;
import org.apache.geode.cache.Scope;
import org.apache.geode.cache.SubscriptionAttributes;
import org.apache.geode.compression.Compressor;
import org.apache.geode.compression.SnappyCompressor;
import org.apache.geode.internal.cache.control.InternalResourceManager;
import org.apache.geode.test.dunit.AsyncInvocation;
import org.apache.geode.test.dunit.VM;
import org.apache.geode.test.dunit.rules.CacheXmlRule;
import org.apache.geode.test.dunit.rules.DistributedRule;
import org.apache.geode.test.junit.rules.serializable.SerializableTemporaryFolder;
import org.apache.geode.test.junit.rules.serializable.SerializableTestName;
import org.apache.geode.test.junit.runners.GeodeParamsRunner;
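/**
 * Distributed tests for destroying or invalidating a region while another member is performing
 * getInitialImage (GII). Each test is parameterized by a {@link CacheDefinition} (heap or
 * off-heap) and a {@link RegionDefinition}; the parameter pairs in getDefinitionParameters()
 * correspond to the original per-configuration DUnit test classes named in the comments there.
 */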
@RunWith(GeodeParamsRunner.class)
@SuppressWarnings("serial")
public class DestroyRegionDuringGIIDistributedTest implements Serializable {
private static final int CHUNK_SIZE = 500 * 1024;
private static final int ENTRIES_COUNT = 1000;
private static final int VALUE_SIZE = CHUNK_SIZE * 10 / ENTRIES_COUNT;
private static final InternalCache DUMMY_CACHE = mock(InternalCache.class);
private static final Runnable DUMMY_RUNNABLE = () -> {
};
private static final AtomicReference<CacheDefinition> CACHE_DEFINITION = new AtomicReference<>();
private static final AtomicReference<Runnable> TEAR_DOWN = new AtomicReference<>();
private static final AtomicReference<InternalCache> CACHE = new AtomicReference<>(DUMMY_CACHE);
private static final AtomicReference<File> DISK_DIR = new AtomicReference<>();
private final byte[][] values = new byte[ENTRIES_COUNT][];
private String rootRegionName;
private String regionName;
private VM vm0;
private VM vm1;
private VM vm2;
private VM vm3;
@Rule
public DistributedRule distributedRule = new DistributedRule();
@Rule
public SerializableTemporaryFolder temporaryFolder = new SerializableTemporaryFolder();
@Rule
public SerializableTestName testName = new SerializableTestName();
@Rule
public CacheXmlRule cacheXmlRule = new CacheXmlRule()
.cacheBuilder(() -> CACHE_DEFINITION.get().createCache());
@Before
public void setUp() {
vm0 = getVM(0);
vm1 = getVM(1);
vm2 = getVM(2);
vm3 = getVM(3);
regionName = getUniqueName() + "_region";
rootRegionName = getUniqueName() + "_rootRegion";
for (int i = 0; i < ENTRIES_COUNT; i++) {
values[i] = new byte[VALUE_SIZE];
Arrays.fill(values[i], (byte) 0x42);
}
for (VM memberVM : asList(vm0, vm1, vm2, vm3)) {
memberVM.invoke(() -> {
TEAR_DOWN.set(DUMMY_RUNNABLE);
CACHE.set(DUMMY_CACHE);
DISK_DIR.set(temporaryFolder.newFolder("diskDir-" + getVMId()).getAbsoluteFile());
});
}
}
@After
public void tearDown() {
for (VM vm : asList(vm0, vm1, vm2, vm3)) {
vm.invoke(() -> {
TEAR_DOWN.getAndSet(DUMMY_RUNNABLE).run();
InternalResourceManager.setResourceObserver(null);
closeCache();
CACHE_DEFINITION.set(null);
DISK_DIR.set(null);
InitialImageOperation.slowImageProcessing = 0;
});
}
}
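  /**
   * JUnitParams parameters: each entry pairs a CacheDefinition name with a RegionDefinition name.
   * The comment above each entry names the original DUnit test class that exercised the same
   * configuration.
   */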
@SuppressWarnings("unused")
private Object[] getDefinitionParameters() {
return new Object[] {
/* DistributedAckRegionDUnitTest */
new Object[] {"HEAP", "DISTRIBUTED_ACK"},
/* DistributedAckRegionOffHeapDUnitTest */
new Object[] {"OFF_HEAP", "DISTRIBUTED_ACK"},
/* DistributedAckRegionCCEDUnitTest CONSERVE_SOCKETS="false" */
new Object[] {"HEAP", "DISTRIBUTED_ACK_CCE"},
/* DistributedAckRegionCCEOffHeapDUnitTest */
new Object[] {"OFF_HEAP", "DISTRIBUTED_ACK_CCE"},
/* DistributedAckRegionCompressionDUnitTest */
new Object[] {"HEAP", "DISTRIBUTED_ACK_COMPRESSION"},
/* DistributedAckOverflowRegionCCEDUnitTest */
new Object[] {"HEAP", "DISTRIBUTED_ACK_EVICTION_OVERFLOW_CCE"},
/* DistributedAckOverflowRegionCCEOffHeapDUnitTest */
new Object[] {"OFF_HEAP", "DISTRIBUTED_ACK_EVICTION_OVERFLOW_CCE"},
/* DistributedNoAckRegionDUnitTest */
new Object[] {"HEAP", "DISTRIBUTED_NO_ACK"},
/* DistributedNoAckRegionOffHeapDUnitTest */
new Object[] {"OFF_HEAP", "DISTRIBUTED_NO_ACK"},
/* DistributedNoAckRegionCCEDUnitTest */
new Object[] {"HEAP", "DISTRIBUTED_NO_ACK_CCE"},
/* DistributedNoAckRegionCCEOffHeapDUnitTest */
new Object[] {"OFF_HEAP", "DISTRIBUTED_NO_ACK_CCE"},
/* DiskDistributedNoAckAsyncRegionDUnitTest */
new Object[] {"OFF_HEAP", "DISTRIBUTED_NO_ACK_PERSISTENT_REPLICATE_ASYNC"},
/* DistributedAckPersistentRegionCCEDUnitTest */
new Object[] {"HEAP", "DISTRIBUTED_NO_ACK_PERSISTENT_REPLICATE_CCE"},
/* DistributedAckPersistentRegionCCEOffHeapDUnitTest */
new Object[] {"OFF_HEAP", "DISTRIBUTED_NO_ACK_PERSISTENT_REPLICATE_CCE"},
/* DiskDistributedNoAckAsyncOverflowRegionDUnitTest */
new Object[] {"OFF_HEAP",
"DISTRIBUTED_NO_ACK_PERSISTENT_REPLICATE_EVICTION_OVERFLOW_ASYNC"},
/* DiskDistributedNoAckSyncOverflowRegionDUnitTest */
new Object[] {"OFF_HEAP", "DISTRIBUTED_NO_ACK_PERSISTENT_REPLICATE_EVICTION_OVERFLOW_SYNC"},
/* GlobalRegionDUnitTest */
new Object[] {"HEAP", "GLOBAL"},
/* GlobalRegionOffHeapDUnitTest */
new Object[] {"OFF_HEAP", "GLOBAL"},
/* GlobalRegionCCEDUnitTest CONSERVE_SOCKETS="false" */
new Object[] {"HEAP", "GLOBAL_CCE"},
/* GlobalRegionCCEOffHeapDUnitTest */
new Object[] {"OFF_HEAP", "GLOBAL_CCE"},
/* PartitionedRegionDUnitTest */
new Object[] {"HEAP", "PARTITIONED_REGION"},
/* PartitionedRegionOffHeapDUnitTest */
new Object[] {"OFF_HEAP", "PARTITIONED_REGION"},
/* PartitionedRegionCompressionDUnitTest */
new Object[] {"HEAP", "PARTITIONED_REGION_COMPRESSION"},
};
}
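  /**
   * Destroys the region in vm0 while vm2 is performing getInitialImage from it, after vm0 has
   * applied a series of updates, invalidates, and destroys. Verifies that vm2's region creation
   * completes and that, except for global scope, the region ends up destroyed in vm2 as well.
   */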
@Test
@Parameters(method = "getDefinitionParameters")
@TestCaseName("{method}_{0}_{1}")
public void testNBRegionDestructionDuringGetInitialImage(CacheDefinition cacheDefinition,
RegionDefinition regionDefinition) throws Exception {
for (VM vm : asList(vm0, vm1, vm2, vm3)) {
vm.invoke(() -> CACHE_DEFINITION.set(cacheDefinition));
}
assumeThat(regionDefinition.supportsReplication()).isTrue();
vm0.invoke(() -> {
cacheDefinition.createCache();
createRootRegion(getCache().createRegionFactory()
.setDataPolicy(DataPolicy.EMPTY)
.setScope(Scope.DISTRIBUTED_ACK));
regionDefinition.createRegionFactory(getCache())
.setOffHeap(cacheDefinition.isOffHeap())
.create(regionName);
// reset slow
InitialImageOperation.slowImageProcessing = 0;
Region<Integer, byte[]> region = getCache().getRegion(regionName);
for (int i = 0; i < ENTRIES_COUNT; i++) {
region.put(i, values[i]);
}
assertThat(region.keySet().size()).isEqualTo(ENTRIES_COUNT);
});
// start asynchronous process that does updates to the data
AsyncInvocation<Void> updateDataInVM0 = vm0.invokeAsync(() -> {
await().until(() -> getCache().getCachePerfStats().getGetInitialImagesCompleted() < 1);
Region<Object, Object> region = getCache().getRegion(regionName);
// wait for profile of getInitialImage cache to show up
awaitRegionProfiles(region, 1);
// since we want to force a GII while updates are flying, make sure the other VM gets its
// CreateRegionResponse and starts its GII before falling into the update loop
doOperationsForDestroy(region);
region.destroyRegion();
flushRootRegion();
});
addIgnoredException(RegionDestroyedException.class);
// in the meantime, do the get initial image in vm2
AsyncInvocation<Void> getInitialImageInVM2 = vm2.invokeAsync(() -> {
if (!regionDefinition.getScope().isGlobal()) {
InitialImageOperation.slowImageProcessing = 200;
}
cacheXmlRule.beginCacheXml();
      // root region must be DACK because it's used to sync up async subregions
createRootRegion(cacheXmlRule.getCache().createRegionFactory()
.setDataPolicy(DataPolicy.NORMAL)
.setScope(Scope.DISTRIBUTED_ACK)
.setSubscriptionAttributes(new SubscriptionAttributes(InterestPolicy.ALL)));
regionDefinition.createRegionFactory(cacheXmlRule.getCache())
.setDataPolicy(DataPolicy.REPLICATE)
.setOffHeap(cacheDefinition.isOffHeap())
.create(regionName);
cacheXmlRule.finishCacheXml(regionName);
// reset slow
InitialImageOperation.slowImageProcessing = 0;
// if global scope, the region doesn't get destroyed until after region creation
await().until(
() -> getCache().getRegion(regionName) == null || regionDefinition.getScope().isGlobal());
});
if (regionDefinition.getScope().isGlobal()) {
// wait for nonblocking operations to complete
updateDataInVM0.await();
vm2.invoke(() -> InitialImageOperation.slowImageProcessing = 0);
}
// wait for GII to complete
getInitialImageInVM2.await();
if (regionDefinition.getScope().isGlobal()) {
// wait for nonblocking operations to complete
updateDataInVM0.await();
}
}
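  /**
   * Runs updates, invalidates, and destroys in vm0, including a region-wide invalidation, while
   * vm2 is performing getInitialImage from it, then validates vm2's entries and checks that a
   * reasonable number of updates overlapped the getInitialImage.
   */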
@Ignore("TODO: test is disabled for 51542")
@Test
@Parameters(method = "getDefinitionParameters")
@TestCaseName("{method}_{0}_{1}")
public void testNBRegionInvalidationDuringGetInitialImage(CacheDefinition cacheDefinition,
RegionDefinition regionDefinition)
throws Exception {
for (VM vm : asList(vm0, vm1, vm2, vm3)) {
vm.invoke(() -> CACHE_DEFINITION.set(cacheDefinition));
}
assumeThat(regionDefinition.supportsReplication()).isTrue();
assumeThat(regionDefinition.getScope().isDistributedNoAck()).isFalse();
vm0.invoke(() -> {
cacheDefinition.createCache();
      // root region must be DACK because it's used to sync up async subregions
createRootRegion(getCache().createRegionFactory()
.setDataPolicy(DataPolicy.NORMAL)
.setScope(Scope.DISTRIBUTED_ACK));
regionDefinition.createRegionFactory(getCache())
.setDataPolicy(DataPolicy.REPLICATE)
.setOffHeap(cacheDefinition.isOffHeap())
.create(regionName);
// reset slow
InitialImageOperation.slowImageProcessing = 0;
Region<Integer, byte[]> region = getCache().getRegion(regionName);
for (int i = 0; i < ENTRIES_COUNT; i++) {
region.put(i, values[i]);
}
assertThat(region.keySet().size()).isEqualTo(ENTRIES_COUNT);
});
// start asynchronous process that does updates to the data
AsyncInvocation<Void> updateDataInVM0 = vm0.invokeAsync("Do Nonblocking Operations", () -> {
Region<Object, Object> region = getCache().getRegion(regionName);
// wait for profile of getInitialImage cache to show up
awaitRegionProfiles(region, 1);
doOperationsForInvalidate(region);
flushRootRegion();
});
// in the meantime, do the get initial image in vm2
// slow down image processing to make it more likely to get async updates
if (!regionDefinition.getScope().isGlobal()) {
vm2.invoke("Set slow image processing", () -> {
// make sure the cache is set up before turning on slow image processing
getRootRegion();
// if this is a no_ack test, then we need to slow down more because of the pauses in the
// nonblocking operations
InitialImageOperation.slowImageProcessing = 100;
});
}
AsyncInvocation<Void> getInitialImageInVM2 = vm2.invokeAsync("Create Mirrored Region", () -> {
cacheXmlRule.beginCacheXml();
      // root region must be DACK because it's used to sync up async subregions
createRootRegion(cacheXmlRule.getCache().createRegionFactory()
.setDataPolicy(DataPolicy.NORMAL)
.setScope(Scope.DISTRIBUTED_ACK)
.setSubscriptionAttributes(new SubscriptionAttributes(InterestPolicy.ALL)));
regionDefinition.createRegionFactory(cacheXmlRule.getCache())
.setDataPolicy(DataPolicy.REPLICATE)
.setOffHeap(cacheDefinition.isOffHeap())
.create(regionName);
cacheXmlRule.finishCacheXml(regionName);
// reset slow
InitialImageOperation.slowImageProcessing = 0;
});
if (!regionDefinition.getScope().isGlobal()) {
// wait for nonblocking operations to complete
updateDataInVM0.await();
vm2.invoke(() -> InitialImageOperation.slowImageProcessing = 0);
}
// wait for GII to complete
getInitialImageInVM2.await();
long giiCompletedMillis = System.currentTimeMillis();
if (regionDefinition.getScope().isGlobal()) {
// wait for nonblocking operations to complete
updateDataInVM0.await();
}
    // Locally destroy the region in vm0 so we know that its entries are not found by a netSearch
vm0.invoke(() -> getCache().getRegion(regionName).localDestroyRegion());
    // await inside vm2 so that no-ack regions have time to process all updates before validating
vm2.invoke(() -> {
await().untilAsserted(() -> {
Region<Object, Object> region = getCache().getRegion(regionName);
// expected entry count (subtract entries destroyed)
int entryCount = ENTRIES_COUNT - ENTRIES_COUNT / 6;
assertThat(region.entrySet(false)).hasSize(entryCount);
});
});
vm2.invoke(() -> {
Region<Integer, Object> region = getCache().getRegion(regionName);
// expected entry count (subtract entries destroyed)
int entryCount = ENTRIES_COUNT - ENTRIES_COUNT / 6;
assertThat(region.entrySet(false).size()).isEqualTo(entryCount);
// determine how many entries were updated before getInitialImage was complete
      int entriesUpdatedConcurrently =
          validateEntriesForInvalidate(giiCompletedMillis, region, 0);
      // the thresholds below are heuristic: for global scope most updates should have blocked,
      // otherwise at least some updates should have run concurrently with getInitialImage
if (regionDefinition.getScope().isGlobal()) {
assertThat(entriesUpdatedConcurrently < 300)
.as("Too many concurrent updates when expected to block: " + entriesUpdatedConcurrently)
.isTrue();
} else {
        assertThat(entriesUpdatedConcurrently >= 30)
            .as("Not enough updates occurred concurrently with getInitialImage: " +
                entriesUpdatedConcurrently + " entries out of " + entryCount +
                " were updated concurrently with getInitialImage, but at least 30 were expected")
            .isTrue();
}
});
}
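  /**
   * Applies updates, invalidates, and destroys to the odd keys below 301 while the other member
   * is expected to be running getInitialImage.
   */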
private void doOperationsForDestroy(Region<Object, Object> region) {
    // operate on every odd entry with a different value, alternating between updates,
    // invalidates, and destroys. These operations are likely to be nonblocking if a sufficient
    // number of updates get through before the get initial image is complete.
for (int i = 1; i < 301; i += 2) {
Object key = i;
switch (i % 6) {
case 1: // UPDATE
          // use the current timestamp so we know when it happened; we could have used last
          // modification timestamps, but this works without enabling statistics
region.put(key, System.currentTimeMillis());
break;
case 3: // INVALIDATE
region.invalidate(key);
if (region.getAttributes().getScope().isDistributedAck()) {
assertThat(region.get(key)).isNull();
}
break;
case 5: // DESTROY
region.destroy(key);
if (region.getAttributes().getScope().isDistributedAck()) {
assertThat(region.get(key)).isNull();
}
break;
default:
fail("Unknown region operation: " + i);
break;
}
}
}
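  /**
   * Applies updates, invalidates, and destroys to every odd key, performing a region-wide
   * invalidation when key 301 is reached.
   */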
private void doOperationsForInvalidate(Region<Object, Object> region) {
    // operate on every odd entry with a different value, alternating between updates,
    // invalidates, and destroys. These operations are likely to be nonblocking if a sufficient
    // number of updates get through before the get initial image is complete.
for (int i = 1; i < ENTRIES_COUNT; i += 2) {
// at magical number 301, do a region invalidation, then continue as before
if (i == 301) {
// wait for previous updates to be processed
flushIfNecessary(region);
region.invalidateRegion();
flushIfNecessary(region);
}
Object key = i;
switch (i % 6) {
case 1: // UPDATE
          // use the current timestamp so we know when it happened; we could have used last
          // modification timestamps, but this works without enabling statistics
region.put(key, System.currentTimeMillis());
break;
case 3: // INVALIDATE
region.invalidate(key);
if (region.getAttributes().getScope().isDistributedAck()) {
// do a nonblocking netSearch
assertThat(region.get(key)).isNull();
}
break;
case 5: // DESTROY
region.destroy(key);
if (region.getAttributes().getScope().isDistributedAck()) {
// do a nonblocking netSearch
assertThat(region.get(key)).isNull();
}
break;
default:
fail("Unknown region operation: " + i);
break;
}
}
}
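  /**
   * Validates the entries in the getInitialImage region after doOperationsForInvalidate has run:
   * destroyed keys must be absent, updated keys at or above 301 must hold a Long timestamp, and
   * every other key must be present with a null value. Returns {@code entriesUpdatedConcurrently}
   * plus the number of updates whose timestamps precede {@code giiCompletedMillis}, i.e. updates
   * that were applied while getInitialImage was still running.
   */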
private int validateEntriesForInvalidate(long giiCompletedMillis, Region<Integer, Object> region,
int entriesUpdatedConcurrently) {
for (int i = 0; i < ENTRIES_COUNT; i++) {
Entry<Integer, Object> entry = region.getEntry(i);
if (i < 301) {
if (i % 6 == 5) {
// destroyed
assertThat(entry).as("Entry for #" + i).isNull();
} else {
assertThat(entry).as("Entry for #" + i).isNotNull();
assertThat(entry.getValue()).isNull();
}
continue;
}
switch (i % 6) {
// even keys are originals
case 0:
case 2:
case 4:
assertThat(entry).as("Entry for #" + i).isNotNull();
assertThat(entry.getValue()).isNull();
break;
case 1: // updated
assertThat(entry).as("Entry for #" + i).isNotNull();
assertThat(entry.getValue()).isNotNull().isInstanceOf(Long.class);
long timestamp = (long) entry.getValue();
if (timestamp < giiCompletedMillis) {
entriesUpdatedConcurrently++;
}
break;
case 3: // invalidated
assertThat(entry).as("Entry for #" + i).isNotNull();
assertThat(entry.getValue()).isNull();
break;
case 5: // destroyed
assertThat(entry).as("Entry for #" + i).isNull();
break;
default:
fail("unexpected modulus result: " + i % 6);
break;
}
}
return entriesUpdatedConcurrently;
}
private String getUniqueName() {
return getClass().getSimpleName() + "_" + testName.getMethodName();
}
private InternalCache getCache() {
return CACHE.get();
}
private void closeCache() {
CACHE.getAndSet(DUMMY_CACHE).close();
}
private static File[] getDiskDirs() {
return new File[] {DISK_DIR.get()};
}
private void createRootRegion(RegionFactory regionFactory) {
regionFactory.create(rootRegionName);
}
private Region<String, String> getRootRegion() {
return getCache().getRegion(rootRegionName);
}
private void flushRootRegion() {
    // now do a put on our DACK root region which will not complete until it is processed
    // on the other side, which means everything done before this point has been processed
Region<String, String> rootRegion = getRootRegion();
if (rootRegion != null) {
rootRegion.put("DONE", "FLUSH_OPS");
}
}
  /**
   * Make sure all messages done on the given region have been processed on the remote side.
   */
  private void flushIfNecessary(Region<?, ?> region) {
    // only needed for no-ack regions
  }
  /**
   * Waits until the region's CacheDistributionAdvisor reports the expected number of replicate
   * profiles, i.e. until the profile of the getInitialImage cache has shown up.
   */
private void awaitRegionProfiles(Region<?, ?> region, int expectedProfileCount) {
CacheDistributionAdvisor regionAdvisor =
((CacheDistributionAdvisee) region).getCacheDistributionAdvisor();
await("Awaiting '" + expectedProfileCount + "' profiles for '" + region + "'")
.untilAsserted(
() -> assertThat(regionAdvisor.adviseReplicates()).hasSize(expectedProfileCount));
}
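  /**
   * Cache configurations used as the first test parameter: HEAP uses the default distributed
   * system properties, while OFF_HEAP enables 10m of off-heap memory and checks for orphaned
   * off-heap values during tear-down.
   */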
private enum CacheDefinition {
    HEAP(() -> getDistributedSystemProperties(), () -> {
      // nothing
    }),
OFF_HEAP(() -> {
Properties properties = getDistributedSystemProperties();
properties.setProperty(OFF_HEAP_MEMORY_SIZE, "10m");
return properties;
}, () -> {
OffHeapTestUtil.checkOrphans(CACHE.get());
});
private final Supplier<Properties> configSupplier;
private final Runnable tearDown;
CacheDefinition(Supplier<Properties> configSupplier, Runnable tearDown) {
this.configSupplier = configSupplier;
this.tearDown = tearDown;
}
private void createCache() {
TEAR_DOWN.set(tearDown);
CACHE.set((InternalCache) new CacheFactory(configSupplier.get()).create());
}
private boolean isOffHeap() {
return OFF_HEAP == this;
}
}
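  /**
   * Region configurations used as the second test parameter. Each constant captures the scope,
   * data policy, eviction, compression, disk, and concurrency-check settings used by one of the
   * original per-configuration DUnit tests.
   */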
private enum RegionDefinition {
DISTRIBUTED_ACK(Scope.DISTRIBUTED_ACK, DataPolicy.PRELOADED, NO_EVICTION, NULL_COMPRESSOR,
NULL_DISK_STORE_NAME, CONCURRENCY_CHECKS_DISABLED, NO_DISK, SUPPORTS_REPLICATION,
SUPPORTS_TRANSACTIONS),
DISTRIBUTED_ACK_CCE(Scope.DISTRIBUTED_ACK, DataPolicy.REPLICATE, NO_EVICTION, NULL_COMPRESSOR,
NULL_DISK_STORE_NAME, CONCURRENCY_CHECKS_ENABLED, NO_DISK, SUPPORTS_REPLICATION,
SUPPORTS_TRANSACTIONS),
DISTRIBUTED_ACK_COMPRESSION(Scope.DISTRIBUTED_ACK, DataPolicy.PRELOADED, NO_EVICTION,
SNAPPY_COMPRESSOR, NULL_DISK_STORE_NAME, CONCURRENCY_CHECKS_DISABLED, NO_DISK,
SUPPORTS_REPLICATION, SUPPORTS_TRANSACTIONS),
DISTRIBUTED_ACK_EVICTION_OVERFLOW_CCE(Scope.DISTRIBUTED_ACK, DataPolicy.REPLICATE,
EVICTION_OVERFLOW_TO_DISK, NULL_COMPRESSOR, NULL_DISK_STORE_NAME,
CONCURRENCY_CHECKS_ENABLED, NO_DISK, SUPPORTS_REPLICATION, SUPPORTS_TRANSACTIONS),
DISTRIBUTED_NO_ACK(Scope.DISTRIBUTED_NO_ACK, DataPolicy.PRELOADED, NO_EVICTION, NULL_COMPRESSOR,
NULL_DISK_STORE_NAME, CONCURRENCY_CHECKS_DISABLED, NO_DISK, SUPPORTS_REPLICATION,
SUPPORTS_TRANSACTIONS),
DISTRIBUTED_NO_ACK_CCE(Scope.DISTRIBUTED_ACK, DataPolicy.REPLICATE, NO_EVICTION,
NULL_COMPRESSOR, NULL_DISK_STORE_NAME, CONCURRENCY_CHECKS_ENABLED, NO_DISK,
SUPPORTS_REPLICATION, SUPPORTS_TRANSACTIONS),
DISTRIBUTED_NO_ACK_PERSISTENT_REPLICATE_ASYNC(Scope.DISTRIBUTED_NO_ACK,
DataPolicy.PERSISTENT_REPLICATE, NO_EVICTION, NULL_COMPRESSOR, NULL_DISK_STORE_NAME,
CONCURRENCY_CHECKS_ENABLED, NO_DISK, SUPPORTS_REPLICATION, SUPPORTS_TRANSACTIONS),
DISTRIBUTED_NO_ACK_PERSISTENT_REPLICATE_CCE(Scope.DISTRIBUTED_NO_ACK,
DataPolicy.PERSISTENT_REPLICATE, NO_EVICTION, NULL_COMPRESSOR, NULL_DISK_STORE_NAME,
CONCURRENCY_CHECKS_ENABLED, NO_DISK, SUPPORTS_REPLICATION, SUPPORTS_TRANSACTIONS),
DISTRIBUTED_NO_ACK_PERSISTENT_REPLICATE_EVICTION_OVERFLOW_ASYNC(Scope.DISTRIBUTED_NO_ACK,
DataPolicy.PERSISTENT_REPLICATE, EVICTION_OVERFLOW_TO_DISK, NULL_COMPRESSOR,
DISK_STORE_NAME, CONCURRENCY_CHECKS_ENABLED, DISK_ASYNCHRONOUS, SUPPORTS_REPLICATION,
SUPPORTS_TRANSACTIONS),
DISTRIBUTED_NO_ACK_PERSISTENT_REPLICATE_EVICTION_OVERFLOW_SYNC(Scope.DISTRIBUTED_NO_ACK,
DataPolicy.PERSISTENT_REPLICATE, EVICTION_OVERFLOW_TO_DISK, NULL_COMPRESSOR,
DISK_STORE_NAME, CONCURRENCY_CHECKS_ENABLED, DISK_SYNCHRONOUS, SUPPORTS_REPLICATION,
SUPPORTS_TRANSACTIONS),
GLOBAL(Scope.GLOBAL, DataPolicy.PRELOADED, NO_EVICTION, NULL_COMPRESSOR, NULL_DISK_STORE_NAME,
CONCURRENCY_CHECKS_DISABLED, NO_DISK, SUPPORTS_REPLICATION, SUPPORTS_TRANSACTIONS),
GLOBAL_CCE(Scope.GLOBAL, DataPolicy.REPLICATE, NO_EVICTION, NULL_COMPRESSOR,
NULL_DISK_STORE_NAME, CONCURRENCY_CHECKS_ENABLED, NO_DISK, SUPPORTS_REPLICATION,
SUPPORTS_TRANSACTIONS),
PARTITIONED_REGION(Scope.DISTRIBUTED_ACK, DataPolicy.PRELOADED, NO_EVICTION, NULL_COMPRESSOR,
NULL_DISK_STORE_NAME, CONCURRENCY_CHECKS_DISABLED, NO_DISK, NO_REPLICATION,
NO_TRANSACTIONS),
PARTITIONED_REGION_COMPRESSION(Scope.DISTRIBUTED_ACK, DataPolicy.PRELOADED, NO_EVICTION,
SNAPPY_COMPRESSOR, NULL_DISK_STORE_NAME, CONCURRENCY_CHECKS_DISABLED, NO_DISK,
NO_REPLICATION, NO_TRANSACTIONS);
private final Scope scope;
private final DataPolicy dataPolicy;
private final EvictionAttributes evictionAttributes;
private final String diskStoreName;
private final boolean concurrencyChecksEnabled;
private final boolean diskSynchronous;
private final boolean supportsReplication;
private final boolean supportsTransactions;
private final Compressor compressor;
RegionDefinition(Scope scope,
DataPolicy dataPolicy,
EvictionAttributes evictionAttributes,
Compressor compressor,
String diskStoreName,
boolean concurrencyChecksEnabled,
boolean diskSynchronous,
boolean supportsReplication,
boolean supportsTransactions) {
this.scope = scope;
this.dataPolicy = dataPolicy;
this.evictionAttributes = evictionAttributes;
this.compressor = compressor;
this.diskStoreName = diskStoreName;
this.concurrencyChecksEnabled = concurrencyChecksEnabled;
this.diskSynchronous = diskSynchronous;
this.supportsReplication = supportsReplication;
this.supportsTransactions = supportsTransactions;
}
private <K, V> RegionFactory<K, V> createRegionFactory(Cache cache) {
RegionFactory<K, V> regionFactory = cache.<K, V>createRegionFactory()
.setScope(scope)
.setDataPolicy(dataPolicy)
.setCompressor(compressor)
.setConcurrencyChecksEnabled(concurrencyChecksEnabled)
.setEvictionAttributes(evictionAttributes);
if (diskStoreName != null) {
cache.createDiskStoreFactory()
.setDiskDirs(getDiskDirs())
.setQueueSize(0)
.setTimeInterval(1000)
.create(diskStoreName);
regionFactory
.setDiskStoreName(diskStoreName)
.setDiskSynchronous(diskSynchronous);
}
return regionFactory;
}
private Scope getScope() {
return scope;
}
private DataPolicy getDataPolicy() {
return dataPolicy;
}
private boolean hasCompression() {
return compressor != null;
}
private boolean hasConcurrencyChecksEnabled() {
return concurrencyChecksEnabled;
}
private boolean supportsReplication() {
return supportsReplication;
}
private boolean supportsTransactions() {
return supportsTransactions;
}
}
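  // Named values referenced by the RegionDefinition constants to keep their declarations readable.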
private static final EvictionAttributes EVICTION_OVERFLOW_TO_DISK =
EvictionAttributes.createLRUEntryAttributes(5, EvictionAction.OVERFLOW_TO_DISK);
private static final EvictionAttributes NO_EVICTION = new EvictionAttributesImpl();
private static final Compressor SNAPPY_COMPRESSOR = new SnappyCompressor();
private static final Compressor NULL_COMPRESSOR = null;
private static final String DISK_STORE_NAME = "diskStore";
private static final String NULL_DISK_STORE_NAME = null;
private static final boolean CONCURRENCY_CHECKS_ENABLED = true;
private static final boolean CONCURRENCY_CHECKS_DISABLED = false;
private static final boolean DISK_SYNCHRONOUS = true;
private static final boolean DISK_ASYNCHRONOUS = false;
private static final boolean NO_DISK = false;
private static final boolean SUPPORTS_REPLICATION = true;
private static final boolean NO_REPLICATION = false;
private static final boolean SUPPORTS_TRANSACTIONS = true;
private static final boolean NO_TRANSACTIONS = false;
}