blob: bb79369e2c3f4c388ad91e99683ebe1477c13620 [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one or more contributor license
* agreements. See the NOTICE file distributed with this work for additional information regarding
* copyright ownership. The ASF licenses this file to You under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance with the License. You may obtain a
* copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software distributed under the License
* is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
* or implied. See the License for the specific language governing permissions and limitations under
* the License.
*/
package org.apache.geode.internal.cache.wan.asyncqueue;
import static org.apache.geode.cache.RegionShortcut.PARTITION;
import static org.apache.geode.cache.RegionShortcut.REPLICATE;
import static org.apache.geode.cache.asyncqueue.internal.AsyncEventQueueFactoryImpl.DEFAULT_BATCH_TIME_INTERVAL;
import static org.apache.geode.test.awaitility.GeodeAwaitility.await;
import static org.apache.geode.test.dunit.VM.getCurrentVMNum;
import static org.apache.geode.test.dunit.VM.getVM;
import static org.assertj.core.api.Assertions.assertThat;
import static org.junit.Assert.assertNotNull;
import java.io.File;
import java.io.IOException;
import java.io.Serializable;
import java.io.UncheckedIOException;
import java.nio.file.Files;
import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Properties;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import org.junit.Before;
import org.junit.Ignore;
import org.junit.Rule;
import org.junit.Test;
import org.junit.experimental.categories.Category;
import org.junit.runner.RunWith;
import org.junit.runners.Parameterized;
import org.junit.runners.Parameterized.Parameter;
import org.junit.runners.Parameterized.Parameters;
import org.junit.runners.Parameterized.UseParametersRunnerFactory;
import org.apache.geode.cache.Declarable;
import org.apache.geode.cache.DiskStoreFactory;
import org.apache.geode.cache.EntryEvent;
import org.apache.geode.cache.PartitionAttributesFactory;
import org.apache.geode.cache.Region;
import org.apache.geode.cache.RegionFactory;
import org.apache.geode.cache.asyncqueue.AsyncEvent;
import org.apache.geode.cache.asyncqueue.AsyncEventListener;
import org.apache.geode.cache.asyncqueue.AsyncEventQueue;
import org.apache.geode.cache.asyncqueue.AsyncEventQueueFactory;
import org.apache.geode.cache.asyncqueue.internal.InternalAsyncEventQueue;
import org.apache.geode.cache.util.CacheListenerAdapter;
import org.apache.geode.distributed.ConfigurationProperties;
import org.apache.geode.internal.cache.InternalCache;
import org.apache.geode.internal.cache.RegionQueue;
import org.apache.geode.internal.cache.wan.InternalGatewaySender;
import org.apache.geode.test.dunit.VM;
import org.apache.geode.test.dunit.rules.CacheRule;
import org.apache.geode.test.dunit.rules.DistributedRule;
import org.apache.geode.test.junit.categories.AEQTest;
import org.apache.geode.test.junit.rules.serializable.SerializableTemporaryFolder;
import org.apache.geode.test.junit.runners.CategoryWithParameterizedRunnerFactory;
@Category(AEQTest.class)
@RunWith(Parameterized.class)
@UseParametersRunnerFactory(CategoryWithParameterizedRunnerFactory.class)
@SuppressWarnings("serial")
public class AsyncEventListenerDistributedTest implements Serializable {
@Parameter
public int dispatcherThreadCount;
@Parameters(name = "dispatcherThreadCount={0}")
public static Iterable<Integer> dispatcherThreadCounts() {
return Arrays.asList(1, 3);
}
@Rule
public DistributedRule distributedRule = new DistributedRule();
@Rule
public CacheRule cacheRule = new CacheRule();
@Rule
public SerializableTemporaryFolder temporaryFolder = new SerializableTemporaryFolder();
private String partitionedRegionName;
private String replicateRegionName;
private String asyncEventQueueId;
private VM vm0;
private VM vm1;
private VM vm2;
@Before
public void setUp() throws Exception {
vm0 = getVM(0);
vm1 = getVM(1);
vm2 = getVM(2);
String className = getClass().getSimpleName();
partitionedRegionName = className + "_PR";
replicateRegionName = className + "_RR";
asyncEventQueueId = className;
}
/**
* Override as needed to add to the configuration, such as off-heap-memory-size.
*/
protected Properties getDistributedSystemProperties() {
return new Properties();
}
/**
* Override as needed to add to the configuration, such as regionFactory.setOffHeap(boolean).
*/
protected RegionFactory<?, ?> configureRegion(RegionFactory<?, ?> regionFactory) {
return regionFactory;
}
@Test // serial, ReplicateRegion
public void testSerialAsyncEventQueueSize() {
vm0.invoke(this::createCache);
vm1.invoke(this::createCache);
vm2.invoke(this::createCache);
vm0.invoke(() -> createAsyncEventQueue(asyncEventQueueId, new SpyAsyncEventListener(), false,
100, dispatcherThreadCount, 100));
vm1.invoke(() -> createAsyncEventQueue(asyncEventQueueId, new SpyAsyncEventListener(), false,
100, dispatcherThreadCount, 100));
vm2.invoke(() -> createAsyncEventQueue(asyncEventQueueId, new SpyAsyncEventListener(), false,
100, dispatcherThreadCount, 100));
vm0.invoke(() -> createReplicateRegion(replicateRegionName, asyncEventQueueId));
vm1.invoke(() -> createReplicateRegion(replicateRegionName, asyncEventQueueId));
vm2.invoke(() -> createReplicateRegion(replicateRegionName, asyncEventQueueId));
vm0.invoke(() -> getInternalGatewaySender().pause());
vm1.invoke(() -> getInternalGatewaySender().pause());
vm2.invoke(() -> getInternalGatewaySender().pause());
vm0.invoke(this::waitForDispatcherToPause);
vm1.invoke(this::waitForDispatcherToPause);
vm2.invoke(this::waitForDispatcherToPause);
vm0.invoke(() -> doPuts(replicateRegionName, 1000));
assertThat(vm0.invoke(() -> getAsyncEventQueue().size())).isEqualTo(1000);
assertThat(vm1.invoke(() -> getAsyncEventQueue().size())).isEqualTo(1000);
assertThat(vm2.invoke(() -> getAsyncEventQueue().size())).isEqualTo(1000);
}
@Test // serial, ReplicateRegion
public void testSerialAsyncEventQueueStopStart() {
vm0.invoke(this::createCache);
vm1.invoke(this::createCache);
vm0.invoke(() -> createAsyncEventQueue(asyncEventQueueId, new SpyAsyncEventListener(), false,
100, dispatcherThreadCount, 100));
vm1.invoke(() -> createAsyncEventQueue(asyncEventQueueId, new SpyAsyncEventListener(), false,
100, dispatcherThreadCount, 100));
vm0.invoke(() -> createReplicateRegion(replicateRegionName, asyncEventQueueId));
vm1.invoke(() -> createReplicateRegion(replicateRegionName, asyncEventQueueId));
vm0.invoke(() -> getInternalGatewaySender().stop());
vm1.invoke(() -> getInternalGatewaySender().stop());
vm0.invoke(() -> doPuts(replicateRegionName, 10));
vm0.invoke(() -> getInternalGatewaySender().start());
vm1.invoke(() -> getInternalGatewaySender().start());
assertThat(vm0.invoke(() -> getAsyncEventQueue().size())).isEqualTo(0);
assertThat(vm1.invoke(() -> getAsyncEventQueue().size())).isEqualTo(0);
}
@Test // serial, ReplicateRegion
public void testReplicatedSerialAsyncEventQueue() {
vm0.invoke(this::createCache);
vm1.invoke(this::createCache);
vm2.invoke(this::createCache);
vm0.invoke(() -> createAsyncEventQueue(asyncEventQueueId, new SpyAsyncEventListener(), false,
100, dispatcherThreadCount, 100));
vm1.invoke(() -> createAsyncEventQueue(asyncEventQueueId, new SpyAsyncEventListener(), false,
100, dispatcherThreadCount, 100));
vm2.invoke(() -> createAsyncEventQueue(asyncEventQueueId, new SpyAsyncEventListener(), false,
100, dispatcherThreadCount, 100));
vm0.invoke(() -> createReplicateRegion(replicateRegionName, asyncEventQueueId));
vm1.invoke(() -> createReplicateRegion(replicateRegionName, asyncEventQueueId));
vm2.invoke(() -> createReplicateRegion(replicateRegionName, asyncEventQueueId));
vm0.invoke(() -> doPuts(replicateRegionName, 1000));
// primary sender
vm0.invoke(() -> waitForAsyncEventListenerWithEventsMapSize(1000));
// secondaries
vm1.invoke(() -> waitForAsyncEventListenerWithEventsMapSize(0));
vm2.invoke(() -> waitForAsyncEventListenerWithEventsMapSize(0));
}
@Test // serial, conflation, ReplicateRegion
public void testReplicatedSerialAsyncEventQueueWithConflationEnabled() {
vm0.invoke(this::createCache);
vm1.invoke(this::createCache);
vm2.invoke(this::createCache);
vm0.invoke(() -> createAsyncEventQueue(asyncEventQueueId, new SpyAsyncEventListener(), true,
100, dispatcherThreadCount, 100));
vm1.invoke(() -> createAsyncEventQueue(asyncEventQueueId, new SpyAsyncEventListener(), true,
100, dispatcherThreadCount, 100));
vm2.invoke(() -> createAsyncEventQueue(asyncEventQueueId, new SpyAsyncEventListener(), true,
100, dispatcherThreadCount, 100));
vm0.invoke(() -> createReplicateRegion(replicateRegionName, asyncEventQueueId));
vm1.invoke(() -> createReplicateRegion(replicateRegionName, asyncEventQueueId));
vm2.invoke(() -> createReplicateRegion(replicateRegionName, asyncEventQueueId));
vm0.invoke(() -> getInternalGatewaySender().pause());
vm1.invoke(() -> getInternalGatewaySender().pause());
vm2.invoke(() -> getInternalGatewaySender().pause());
vm0.invoke(this::waitForDispatcherToPause);
vm1.invoke(this::waitForDispatcherToPause);
vm2.invoke(this::waitForDispatcherToPause);
Map<Integer, Integer> keyValues = new HashMap<>();
for (int i = 0; i < 1000; i++) {
keyValues.put(i, i);
}
vm0.invoke(() -> {
putGivenKeyValue(replicateRegionName, keyValues);
waitForAsyncEventQueueSize(keyValues.size());
});
Map<Integer, String> updateKeyValues = new HashMap<>();
for (int i = 0; i < 500; i++) {
updateKeyValues.put(i, i + "_updated");
}
// Put the update events and check the queue size.
// There should be no conflation with the previous create events.
vm0.invoke(() -> {
putGivenKeyValue(replicateRegionName, updateKeyValues);
waitForAsyncEventQueueSize(keyValues.size() + updateKeyValues.size());
});
// Put the update events again and check the queue size.
// There should be conflation with the previous update events.
vm0.invoke(() -> {
putGivenKeyValue(replicateRegionName, updateKeyValues);
waitForAsyncEventQueueSize(keyValues.size() + updateKeyValues.size());
});
vm0.invoke(() -> getInternalGatewaySender().resume());
vm1.invoke(() -> getInternalGatewaySender().resume());
vm2.invoke(() -> getInternalGatewaySender().resume());
// primary sender
vm0.invoke(() -> waitForAsyncEventListenerWithEventsMapSize(1000));
// secondaries
vm1.invoke(() -> waitForAsyncEventListenerWithEventsMapSize(0));
vm2.invoke(() -> waitForAsyncEventListenerWithEventsMapSize(0));
}
@Test // serial, persistent, conflation, ReplicateRegion
public void testReplicatedSerialAsyncEventQueueWithPersistenceEnabled() {
vm0.invoke(this::createCache);
vm1.invoke(this::createCache);
vm2.invoke(this::createCache);
vm0.invoke(() -> createPersistentAsyncEventQueue(asyncEventQueueId, new SpyAsyncEventListener(),
true, 100, createDiskStoreName(asyncEventQueueId), false, dispatcherThreadCount, 100,
DEFAULT_BATCH_TIME_INTERVAL));
vm1.invoke(() -> createPersistentAsyncEventQueue(asyncEventQueueId, new SpyAsyncEventListener(),
true, 100, createDiskStoreName(asyncEventQueueId), false, dispatcherThreadCount, 100,
DEFAULT_BATCH_TIME_INTERVAL));
vm2.invoke(() -> createPersistentAsyncEventQueue(asyncEventQueueId, new SpyAsyncEventListener(),
true, 100, createDiskStoreName(asyncEventQueueId), false, dispatcherThreadCount, 100,
DEFAULT_BATCH_TIME_INTERVAL));
vm0.invoke(() -> createReplicateRegion(replicateRegionName, asyncEventQueueId));
vm1.invoke(() -> createReplicateRegion(replicateRegionName, asyncEventQueueId));
vm2.invoke(() -> createReplicateRegion(replicateRegionName, asyncEventQueueId));
vm0.invoke(() -> doPuts(replicateRegionName, 1000));
// primary sender
vm0.invoke(() -> waitForAsyncEventListenerWithEventsMapSize(1000));
// secondaries
vm1.invoke(() -> waitForAsyncEventListenerWithEventsMapSize(0));
vm2.invoke(() -> waitForAsyncEventListenerWithEventsMapSize(0));
}
@Test // serial, persistent, ReplicateRegion, IntegrationTest
public void testReplicatedSerialAsyncEventQueueWithPersistenceEnabled_Restart() {
vm0.invoke(() -> {
createCache();
createPersistentAsyncEventQueue(asyncEventQueueId, new SpyAsyncEventListener(), false, 100,
createDiskStoreName(asyncEventQueueId), true, dispatcherThreadCount, 100,
DEFAULT_BATCH_TIME_INTERVAL);
createReplicateRegion(replicateRegionName, asyncEventQueueId);
// pause async channel and then do the puts
getInternalGatewaySender().pause();
waitForDispatcherToPause();
doPuts(replicateRegionName, 1000);
// kill vm0 and rebuild
getCache().close();
createCache();
createPersistentAsyncEventQueue(asyncEventQueueId, new SpyAsyncEventListener(), false, 100,
createDiskStoreName(asyncEventQueueId), true, dispatcherThreadCount, 100,
DEFAULT_BATCH_TIME_INTERVAL);
createReplicateRegion(replicateRegionName, asyncEventQueueId);
// primary sender
waitForAsyncEventListenerWithEventsMapSize(1000);
});
}
/**
* There are 3 VMs in the site and the VM with primary sender is shut down.
*
* <p>
* TODO: fix this test
*/
@Ignore("TODO: Disabled for 52351")
@Test // serial, persistent, ReplicateRegion
public void testReplicatedSerialAsyncEventQueueWithPersistenceEnabled_Restart2() {
vm0.invoke(this::createCache);
vm1.invoke(this::createCache);
vm2.invoke(this::createCache);
vm0.invoke(() -> createPersistentAsyncEventQueue(asyncEventQueueId, new SpyAsyncEventListener(),
false, 100, createDiskStoreName(asyncEventQueueId), true, dispatcherThreadCount, 100,
DEFAULT_BATCH_TIME_INTERVAL));
vm1.invoke(() -> createPersistentAsyncEventQueue(asyncEventQueueId, new SpyAsyncEventListener(),
false, 100, createDiskStoreName(asyncEventQueueId), true, dispatcherThreadCount, 100,
DEFAULT_BATCH_TIME_INTERVAL));
vm2.invoke(() -> createPersistentAsyncEventQueue(asyncEventQueueId, new SpyAsyncEventListener(),
false, 100, createDiskStoreName(asyncEventQueueId), true, dispatcherThreadCount, 100,
DEFAULT_BATCH_TIME_INTERVAL));
vm0.invoke(() -> {
createReplicateRegion(replicateRegionName, asyncEventQueueId);
addClosingCacheListener(replicateRegionName, 900);
});
vm1.invoke(() -> createReplicateRegion(replicateRegionName, asyncEventQueueId));
vm2.invoke(() -> createReplicateRegion(replicateRegionName, asyncEventQueueId));
vm1.invoke(this::waitForSenderToBecomePrimary);
vm1.invoke(() -> doPuts(replicateRegionName, 2000));
vm1.invoke(this::waitForRegionQueuesToEmpty);
vm2.invoke(this::waitForRegionQueuesToEmpty);
int vm1size = vm1.invoke(() -> ((Map<?, ?>) getSpyAsyncEventListener().getEventsMap()).size());
int vm2size = vm2.invoke(() -> ((Map<?, ?>) getSpyAsyncEventListener().getEventsMap()).size());
// verify that there is no event loss (fails)
assertThat(vm1size + vm2size).isGreaterThanOrEqualTo(2000);
}
@Test
// See GEODE-7079: a NullPointerException was thrown whenever the queue was recovered from disk
// and the processor started dispatching events before the actual region was available.
public void replicatedRegionWithPersistentSerialAsyncEventQueueAndConflationEnabledShouldNotLooseEventsNorThrowNullPointerExceptionsWhenMemberIsRestartedWhileEventsAreStillOnTheQueue()
throws IOException {
// Custom Log File to manually search for exceptions.
File customLogFile = temporaryFolder.newFile("memberLog.log");
Properties dsProperties = getDistributedSystemProperties();
dsProperties.setProperty(ConfigurationProperties.LOG_FILE, customLogFile.getAbsolutePath());
// Create Region, AsyncEventQueue and Insert Some Entries.
vm0.invoke(() -> {
createCache();
// Large batch time interval and low batch size so no events are processed before the restart.
createPersistentAsyncEventQueue(asyncEventQueueId, new SpyAsyncEventListener(), true, 10,
createDiskStoreName(asyncEventQueueId), true, dispatcherThreadCount, 100, 120000);
createReplicateRegion(replicateRegionName, asyncEventQueueId);
doPuts(replicateRegionName, 5);
waitForAsyncEventQueueSize(5);
});
vm0.invoke(() -> {
// Restart the cache.
getCache().close();
cacheRule.createCache(dsProperties);
// Recover the queue from disk, reduce thresholds so processing starts right away.
SpyAsyncEventListener spyAsyncEventListener = new SpyAsyncEventListener();
createPersistentAsyncEventQueue(asyncEventQueueId, spyAsyncEventListener, true, 5,
createDiskStoreName(asyncEventQueueId), true, dispatcherThreadCount, 100,
DEFAULT_BATCH_TIME_INTERVAL);
waitForSenderToBecomePrimary();
// Wait for the processors to start.
await().until(() -> {
Set<Thread> threads = Thread.getAllStackTraces().keySet();
return threads
.stream()
.filter(t -> t.getName().contains("Processor for GatewaySender_AsyncEventQueue"))
.allMatch(Thread::isAlive);
});
// Create the region, processing will continue and no NPE should be thrown anymore.
createReplicateRegion(replicateRegionName, asyncEventQueueId);
waitForRegionQueuesToEmpty();
assertThat(spyAsyncEventListener.getEventsMap().size()).isEqualTo(5);
});
Files.lines(customLogFile.toPath()).forEach((line) -> assertThat(line)
.as("Dispatcher shouldn't have thrown any errors while processing batches")
.doesNotContain("An Exception occurred. The dispatcher will continue.")
.doesNotContain("java.lang.NullPointerException"));
}
@Test // serial, PartitionedRegion
public void testPartitionedSerialAsyncEventQueue() {
vm0.invoke(this::createCache);
vm1.invoke(this::createCache);
vm2.invoke(this::createCache);
vm0.invoke(() -> createAsyncEventQueue(asyncEventQueueId, new SpyAsyncEventListener(), false,
100, dispatcherThreadCount, 100));
vm1.invoke(() -> createAsyncEventQueue(asyncEventQueueId, new SpyAsyncEventListener(), false,
100, dispatcherThreadCount, 100));
vm2.invoke(() -> createAsyncEventQueue(asyncEventQueueId, new SpyAsyncEventListener(), false,
100, dispatcherThreadCount, 100));
vm0.invoke(() -> createPartitionedRegion(partitionedRegionName, asyncEventQueueId, 0, 16));
vm1.invoke(() -> createPartitionedRegion(partitionedRegionName, asyncEventQueueId, 0, 16));
vm2.invoke(() -> createPartitionedRegion(partitionedRegionName, asyncEventQueueId, 0, 16));
vm0.invoke(() -> doPuts(partitionedRegionName, 500));
vm1.invoke(() -> doPutsFrom(partitionedRegionName, 500, 1000));
// primary sender
vm0.invoke(() -> waitForAsyncEventListenerWithEventsMapSize(1000));
// secondaries
vm1.invoke(() -> waitForAsyncEventListenerWithEventsMapSize(0));
vm2.invoke(() -> waitForAsyncEventListenerWithEventsMapSize(0));
}
@Test // serial, conflation, PartitionedRegion
public void testPartitionedSerialAsyncEventQueueWithConflationEnabled() {
vm0.invoke(this::createCache);
vm1.invoke(this::createCache);
vm2.invoke(this::createCache);
vm0.invoke(() -> createAsyncEventQueue(asyncEventQueueId, new SpyAsyncEventListener(), true,
100, dispatcherThreadCount, 100));
vm1.invoke(() -> createAsyncEventQueue(asyncEventQueueId, new SpyAsyncEventListener(), true,
100, dispatcherThreadCount, 100));
vm2.invoke(() -> createAsyncEventQueue(asyncEventQueueId, new SpyAsyncEventListener(), true,
100, dispatcherThreadCount, 100));
vm0.invoke(() -> createPartitionedRegion(partitionedRegionName, asyncEventQueueId, 0, 16));
vm1.invoke(() -> createPartitionedRegion(partitionedRegionName, asyncEventQueueId, 0, 16));
vm2.invoke(() -> createPartitionedRegion(partitionedRegionName, asyncEventQueueId, 0, 16));
vm0.invoke(() -> getInternalGatewaySender().pause());
vm1.invoke(() -> getInternalGatewaySender().pause());
vm2.invoke(() -> getInternalGatewaySender().pause());
vm0.invoke(this::waitForDispatcherToPause);
vm1.invoke(this::waitForDispatcherToPause);
vm2.invoke(this::waitForDispatcherToPause);
Map<Integer, Integer> keyValues = new HashMap<>();
for (int i = 0; i < 1000; i++) {
keyValues.put(i, i);
}
vm0.invoke(() -> {
putGivenKeyValue(partitionedRegionName, keyValues);
waitForAsyncEventQueueSize(keyValues.size());
});
Map<Integer, String> updateKeyValues = new HashMap<>();
for (int i = 0; i < 500; i++) {
updateKeyValues.put(i, i + "_updated");
}
// Put the update events and check the queue size.
// There should be no conflation with the previous create events.
vm1.invoke(() -> {
putGivenKeyValue(partitionedRegionName, updateKeyValues);
waitForAsyncEventQueueSize(keyValues.size() + updateKeyValues.size());
});
// Put the update events again and check the queue size.
// There should be conflation with the previous update events.
vm1.invoke(() -> {
putGivenKeyValue(partitionedRegionName, updateKeyValues);
waitForAsyncEventQueueSize(keyValues.size() + updateKeyValues.size());
});
vm0.invoke(() -> getInternalGatewaySender().resume());
vm1.invoke(() -> getInternalGatewaySender().resume());
vm2.invoke(() -> getInternalGatewaySender().resume());
// primary sender
vm0.invoke(() -> waitForAsyncEventListenerWithEventsMapSize(1000));
// secondaries
vm1.invoke(() -> waitForAsyncEventListenerWithEventsMapSize(0));
vm2.invoke(() -> waitForAsyncEventListenerWithEventsMapSize(0));
}
/**
* Test configuration::
*
* Region: Partitioned WAN: Serial Region persistence enabled: false Async channel persistence
* enabled: true
*
* No VM is restarted.
*/
@Test // persistent, PartitionedRegion
public void testPartitionedSerialAsyncEventQueueWithPersistenceEnabled() {
vm0.invoke(this::createCache);
vm1.invoke(this::createCache);
vm2.invoke(this::createCache);
vm0.invoke(() -> createPersistentAsyncEventQueue(asyncEventQueueId, new SpyAsyncEventListener(),
false, 100, createDiskStoreName(asyncEventQueueId), false, dispatcherThreadCount, 100,
DEFAULT_BATCH_TIME_INTERVAL));
vm1.invoke(() -> createPersistentAsyncEventQueue(asyncEventQueueId, new SpyAsyncEventListener(),
false, 100, createDiskStoreName(asyncEventQueueId), false, dispatcherThreadCount, 100,
DEFAULT_BATCH_TIME_INTERVAL));
vm2.invoke(() -> createPersistentAsyncEventQueue(asyncEventQueueId, new SpyAsyncEventListener(),
false, 100, createDiskStoreName(asyncEventQueueId), false, dispatcherThreadCount, 100,
DEFAULT_BATCH_TIME_INTERVAL));
vm0.invoke(() -> createPartitionedRegion(partitionedRegionName, asyncEventQueueId, 0, 16));
vm1.invoke(() -> createPartitionedRegion(partitionedRegionName, asyncEventQueueId, 0, 16));
vm2.invoke(() -> createPartitionedRegion(partitionedRegionName, asyncEventQueueId, 0, 16));
vm0.invoke(() -> doPuts(partitionedRegionName, 500));
vm1.invoke(() -> doPutsFrom(partitionedRegionName, 500, 1000)); // hangs?
// primary sender
vm0.invoke(() -> waitForAsyncEventListenerWithEventsMapSize(1000));
// secondaries
vm1.invoke(() -> waitForAsyncEventListenerWithEventsMapSize(0));
vm2.invoke(() -> waitForAsyncEventListenerWithEventsMapSize(0));
}
@Test // persistent, PartitionedRegion, IntegrationTest
public void testPartitionedSerialAsyncEventQueueWithPersistenceEnabled_Restart() {
vm0.invoke(() -> {
createCache();
createPersistentAsyncEventQueue(asyncEventQueueId, new SpyAsyncEventListener(), false, 100,
createDiskStoreName(asyncEventQueueId), true, dispatcherThreadCount, 100,
DEFAULT_BATCH_TIME_INTERVAL);
createPartitionedRegion(partitionedRegionName, asyncEventQueueId, 0, 16);
// pause async channel and then do the puts
getInternalGatewaySender().pause();
waitForDispatcherToPause();
doPuts(partitionedRegionName, 1000);
getCache().close();
createCache();
createPersistentAsyncEventQueue(asyncEventQueueId, new SpyAsyncEventListener(), false, 100,
createDiskStoreName(asyncEventQueueId), true, dispatcherThreadCount, 100,
DEFAULT_BATCH_TIME_INTERVAL);
createPartitionedRegion(partitionedRegionName, asyncEventQueueId, 0, 16);
// primary sender
waitForAsyncEventListenerWithEventsMapSize(1000);
});
}
private InternalCache getCache() {
return cacheRule.getOrCreateCache(getDistributedSystemProperties());
}
private void createCache() {
cacheRule.createCache(getDistributedSystemProperties());
}
private void createPartitionedRegion(String regionName,
String asyncEventQueueId,
int redundantCopies,
int totalNumBuckets) {
assertThat(regionName).isNotEmpty();
assertThat(asyncEventQueueId).isNotEmpty();
PartitionAttributesFactory<?, ?> partitionAttributesFactory = new PartitionAttributesFactory();
partitionAttributesFactory.setRedundantCopies(redundantCopies);
partitionAttributesFactory.setTotalNumBuckets(totalNumBuckets);
RegionFactory<?, ?> regionFactory = getCache().createRegionFactory(PARTITION);
regionFactory.addAsyncEventQueueId(asyncEventQueueId);
regionFactory.setPartitionAttributes(partitionAttributesFactory.create());
configureRegion(regionFactory).create(regionName);
}
private void createReplicateRegion(String regionName, String asyncEventQueueId) {
assertThat(regionName).isNotEmpty();
assertThat(asyncEventQueueId).isNotEmpty();
RegionFactory<?, ?> regionFactory = getCache().createRegionFactory(REPLICATE);
regionFactory.addAsyncEventQueueId(asyncEventQueueId);
configureRegion(regionFactory).create(regionName);
}
private void createDiskStore(String diskStoreName, String asyncEventQueueId) {
assertThat(diskStoreName).isNotEmpty();
assertThat(asyncEventQueueId).isNotEmpty();
File directory = createDirectory(createDiskStoreName(asyncEventQueueId));
DiskStoreFactory diskStoreFactory = getCache().createDiskStoreFactory();
diskStoreFactory.setDiskDirs(new File[] {directory});
diskStoreFactory.create(diskStoreName);
}
private File createDirectory(String name) {
assertThat(name).isNotEmpty();
File directory = new File(temporaryFolder.getRoot(), name);
if (!directory.exists()) {
try {
return temporaryFolder.newFolder(name);
} catch (IOException e) {
throw new UncheckedIOException(e);
}
}
return directory;
}
private String createDiskStoreName(String asyncEventQueueId) {
assertThat(asyncEventQueueId).isNotEmpty();
return asyncEventQueueId + "_disk_" + getCurrentVMNum();
}
private void createAsyncEventQueue(String asyncEventQueueId,
AsyncEventListener asyncEventListener,
boolean isBatchConflationEnabled,
int batchSize,
int dispatcherThreads,
int maximumQueueMemory) {
assertThat(asyncEventQueueId).isNotEmpty();
assertThat(asyncEventListener).isNotNull();
AsyncEventQueueFactory asyncEventQueueFactory = getCache().createAsyncEventQueueFactory();
asyncEventQueueFactory.setBatchConflationEnabled(isBatchConflationEnabled);
asyncEventQueueFactory.setBatchSize(batchSize);
asyncEventQueueFactory.setDispatcherThreads(dispatcherThreads);
asyncEventQueueFactory.setMaximumQueueMemory(maximumQueueMemory);
asyncEventQueueFactory.setParallel(false);
asyncEventQueueFactory.setPersistent(false);
asyncEventQueueFactory.create(asyncEventQueueId, asyncEventListener);
}
private void createPersistentAsyncEventQueue(String asyncEventQueueId,
AsyncEventListener asyncEventListener,
boolean isBatchConflationEnabled,
int batchSize,
String diskStoreName,
boolean isDiskSynchronous,
int dispatcherThreads,
int maximumQueueMemory,
int batchTimeInterval) {
assertThat(asyncEventQueueId).isNotEmpty();
assertThat(asyncEventListener).isNotNull();
assertThat(diskStoreName).isNotEmpty();
createDiskStore(diskStoreName, asyncEventQueueId);
AsyncEventQueueFactory asyncEventQueueFactory = getCache().createAsyncEventQueueFactory();
asyncEventQueueFactory.setBatchConflationEnabled(isBatchConflationEnabled);
asyncEventQueueFactory.setBatchSize(batchSize);
asyncEventQueueFactory.setDiskStoreName(diskStoreName);
asyncEventQueueFactory.setDiskSynchronous(isDiskSynchronous);
asyncEventQueueFactory.setDispatcherThreads(dispatcherThreads);
asyncEventQueueFactory.setMaximumQueueMemory(maximumQueueMemory);
asyncEventQueueFactory.setBatchTimeInterval(batchTimeInterval);
asyncEventQueueFactory.setParallel(false);
asyncEventQueueFactory.setPersistent(true);
asyncEventQueueFactory.create(asyncEventQueueId, asyncEventListener);
}
private void addClosingCacheListener(String regionName, int closeAfterCreateKey) {
assertThat(regionName).isNotEmpty();
Region<Integer, Integer> region = getCache().getRegion(regionName);
assertNotNull(region);
CacheListenerAdapter<Integer, Integer> cacheListener =
new CacheListenerAdapter<Integer, Integer>() {
@Override
public void afterCreate(EntryEvent event) {
if ((Integer) event.getKey() == closeAfterCreateKey) {
getCache().close();
}
}
};
region.getAttributesMutator().addCacheListener(cacheListener);
}
private void doPuts(String regionName, int numPuts) {
Region<Integer, Integer> region = getCache().getRegion(regionName);
for (int i = 0; i < numPuts; i++) {
region.put(i, i);
}
}
private void doPutsFrom(String regionName, int from, int numPuts) {
Region<Integer, Integer> region = getCache().getRegion(regionName);
for (int i = from; i < numPuts; i++) {
region.put(i, i);
}
}
private void putGivenKeyValue(String regionName, Map<?, ?> keyValues) {
Region<Object, Object> region = getCache().getRegion(regionName);
for (Object key : keyValues.keySet()) {
region.put(key, keyValues.get(key));
}
}
private void assertRegionQueuesAreEmpty(InternalGatewaySender gatewaySender) {
Set<RegionQueue> regionQueues = gatewaySender.getQueues();
for (RegionQueue queue : regionQueues) {
assertThat(queue.size()).isZero();
}
}
private int getTotalRegionQueueSize() {
InternalGatewaySender gatewaySender = getInternalGatewaySender();
int totalSize = 0;
for (RegionQueue regionQueue : gatewaySender.getQueues()) {
totalSize += regionQueue.size();
}
return totalSize;
}
@SuppressWarnings("unchecked")
private void waitForAsyncEventListenerWithEventsMapSize(int expectedSize) {
await().untilAsserted(
() -> assertThat(getSpyAsyncEventListener().getEventsMap()).hasSize(expectedSize));
}
private void waitForAsyncEventQueueSize(int expectedRegionQueueSize) {
await().untilAsserted(
() -> assertThat(getTotalRegionQueueSize()).isEqualTo(expectedRegionQueueSize));
}
private void waitForDispatcherToPause() {
getInternalGatewaySender().getEventProcessor().waitForDispatcherToPause();
}
private void waitForRegionQueuesToEmpty() {
await()
.untilAsserted(() -> assertRegionQueuesAreEmpty(getInternalGatewaySender()));
}
private void waitForSenderToBecomePrimary() {
InternalGatewaySender gatewaySender = getInternalGatewaySender();
await().untilAsserted(
() -> assertThat(gatewaySender.isPrimary()).as("gatewaySender: " + gatewaySender).isTrue());
}
private InternalGatewaySender getInternalGatewaySender() {
InternalGatewaySender gatewaySender = getInternalAsyncEventQueue().getSender();
assertThat(gatewaySender).isNotNull();
return gatewaySender;
}
private SpyAsyncEventListener getSpyAsyncEventListener() {
return (SpyAsyncEventListener) getAsyncEventListener();
}
private AsyncEventListener getAsyncEventListener() {
AsyncEventListener asyncEventListener = getAsyncEventQueue().getAsyncEventListener();
assertThat(asyncEventListener).isNotNull();
return asyncEventListener;
}
private InternalAsyncEventQueue getInternalAsyncEventQueue() {
return (InternalAsyncEventQueue) getAsyncEventQueue();
}
private AsyncEventQueue getAsyncEventQueue() {
AsyncEventQueue value = null;
Set<AsyncEventQueue> asyncEventQueues = getCache().getAsyncEventQueues();
for (AsyncEventQueue asyncEventQueue : asyncEventQueues) {
if (asyncEventQueueId.equals(asyncEventQueue.getId())) {
value = asyncEventQueue;
}
}
assertThat(value).isNotNull();
return value;
}
private static class SpyAsyncEventListener<K, V> implements AsyncEventListener, Declarable {
private final Map<K, V> eventsMap = new ConcurrentHashMap<>();
Map<K, V> getEventsMap() {
assertThat(eventsMap).isNotNull();
return eventsMap;
}
@Override
@SuppressWarnings("unchecked")
public boolean processEvents(List<AsyncEvent> events) {
for (AsyncEvent<K, V> event : events) {
eventsMap.put(event.getKey(), event.getDeserializedValue());
}
return true;
}
}
}