blob: 69e8a5fbad719070deaa7d52e00d956b2f712a7f [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.geode.internal.cache.wan.asyncqueue;
import static java.util.Collections.synchronizedSet;
import static java.util.concurrent.TimeUnit.MINUTES;
import static java.util.concurrent.TimeUnit.SECONDS;
import static java.util.stream.Collectors.toSet;
import static org.apache.geode.cache.FixedPartitionAttributes.createFixedPartition;
import static org.apache.geode.cache.RegionShortcut.PARTITION;
import static org.apache.geode.cache.RegionShortcut.REPLICATE;
import static org.apache.geode.test.awaitility.GeodeAwaitility.await;
import static org.apache.geode.test.dunit.VM.getCurrentVMNum;
import static org.apache.geode.test.dunit.VM.getVM;
import static org.assertj.core.api.Assertions.assertThat;
import static org.assertj.core.api.Assertions.assertThatThrownBy;
import java.io.File;
import java.io.IOException;
import java.io.Serializable;
import java.io.UncheckedIOException;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Properties;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeoutException;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.stream.IntStream;
import org.junit.Before;
import org.junit.Rule;
import org.junit.Test;
import org.junit.experimental.categories.Category;
import org.junit.runner.RunWith;
import org.junit.runners.Parameterized;
import org.junit.runners.Parameterized.Parameter;
import org.junit.runners.Parameterized.Parameters;
import org.junit.runners.Parameterized.UseParametersRunnerFactory;
import org.apache.geode.cache.Cache;
import org.apache.geode.cache.Declarable;
import org.apache.geode.cache.DiskStoreFactory;
import org.apache.geode.cache.EntryOperation;
import org.apache.geode.cache.FixedPartitionAttributes;
import org.apache.geode.cache.FixedPartitionResolver;
import org.apache.geode.cache.PartitionAttributesFactory;
import org.apache.geode.cache.PartitionResolver;
import org.apache.geode.cache.Region;
import org.apache.geode.cache.RegionFactory;
import org.apache.geode.cache.asyncqueue.AsyncEvent;
import org.apache.geode.cache.asyncqueue.AsyncEventListener;
import org.apache.geode.cache.asyncqueue.AsyncEventQueue;
import org.apache.geode.cache.asyncqueue.AsyncEventQueueFactory;
import org.apache.geode.cache.asyncqueue.internal.AsyncEventQueueStats;
import org.apache.geode.cache.asyncqueue.internal.InternalAsyncEventQueue;
import org.apache.geode.cache.control.RebalanceFactory;
import org.apache.geode.cache.control.RebalanceOperation;
import org.apache.geode.cache.control.RebalanceResults;
import org.apache.geode.cache.control.ResourceManager;
import org.apache.geode.cache.partition.PartitionRegionHelper;
import org.apache.geode.distributed.DistributedMember;
import org.apache.geode.distributed.internal.membership.InternalDistributedMember;
import org.apache.geode.internal.cache.InternalCache;
import org.apache.geode.internal.cache.PartitionedRegion;
import org.apache.geode.internal.cache.RegionQueue;
import org.apache.geode.internal.cache.control.InternalResourceManager;
import org.apache.geode.internal.cache.control.InternalResourceManager.ResourceObserver;
import org.apache.geode.internal.cache.control.InternalResourceManager.ResourceObserverAdapter;
import org.apache.geode.internal.cache.partitioned.BecomePrimaryBucketMessage;
import org.apache.geode.internal.cache.partitioned.BecomePrimaryBucketMessage.BecomePrimaryBucketResponse;
import org.apache.geode.internal.cache.wan.AsyncEventQueueConfigurationException;
import org.apache.geode.internal.cache.wan.GatewaySenderEventImpl;
import org.apache.geode.internal.cache.wan.InternalGatewaySender;
import org.apache.geode.test.dunit.VM;
import org.apache.geode.test.dunit.rules.CacheRule;
import org.apache.geode.test.dunit.rules.DistributedRestoreSystemProperties;
import org.apache.geode.test.dunit.rules.DistributedRule;
import org.apache.geode.test.junit.categories.AEQTest;
import org.apache.geode.test.junit.rules.serializable.SerializableTemporaryFolder;
import org.apache.geode.test.junit.runners.CategoryWithParameterizedRunnerFactory;
/**
* Extracted from {@link AsyncEventListenerDistributedTest}.
*/
@Category(AEQTest.class)
@RunWith(Parameterized.class)
@UseParametersRunnerFactory(CategoryWithParameterizedRunnerFactory.class)
@SuppressWarnings("serial")
public class ParallelAsyncEventListenerDistributedTest implements Serializable {
@Parameter // injected by the Parameterized runner from dispatcherThreadCounts(): 1 or 3
public int dispatcherThreadCount;
@Parameters(name = "dispatcherThreadCount={0}")
public static Iterable<Integer> dispatcherThreadCounts() {
  // Run the whole suite once with a single dispatcher thread and once with three.
  Integer[] counts = {1, 3};
  return Arrays.asList(counts);
}
@Rule
public DistributedRule distributedRule = new DistributedRule(); // provisions the DUnit VMs
@Rule
public CacheRule cacheRule = new CacheRule(); // manages per-VM cache lifecycle
@Rule
public DistributedRestoreSystemProperties restoreSystemProperties =
new DistributedRestoreSystemProperties(); // undoes system-property changes in every VM
@Rule
public SerializableTemporaryFolder temporaryFolder = new SerializableTemporaryFolder(); // temp dirs usable across VMs
// Names are derived from the test class's simple name in setUp().
private String partitionedRegionName;
private String childPartitionedRegionName;
private String replicateRegionName;
private String asyncEventQueueId;
// The four DUnit VMs participating in each test; assigned in setUp().
private VM vm0;
private VM vm1;
private VM vm2;
private VM vm3;
@Before
public void setUp() throws Exception {
  // Derive collision-free region/queue names from the concrete test class.
  String baseName = getClass().getSimpleName();
  partitionedRegionName = baseName + "_PR";
  childPartitionedRegionName = baseName + "_PR_child";
  replicateRegionName = baseName + "_RR";
  asyncEventQueueId = baseName;

  // Grab the four DUnit VMs every test uses.
  vm0 = getVM(0);
  vm1 = getVM(1);
  vm2 = getVM(2);
  vm3 = getVM(3);
}
/**
 * Hook for subclasses to contribute extra distributed-system configuration,
 * such as off-heap-memory-size. The base implementation adds nothing.
 */
protected Properties getDistributedSystemProperties() {
  Properties config = new Properties();
  return config;
}
/**
 * Hook for subclasses to customize the region factory before region creation,
 * such as regionFactory.setOffHeap(boolean). The base implementation is a pass-through.
 */
protected RegionFactory<?, ?> configureRegion(RegionFactory<?, ?> regionFactory) {
return regionFactory;
}
@Test // parallel, PartitionedRegion, FixedPartitionAttributes
public void testParallelAsyncEventQueueWithFixedPartition() {
  List<VM> members = Arrays.asList(vm0, vm1);
  for (VM member : members) {
    member.invoke(() -> createCache());
  }
  for (VM member : members) {
    member.invoke(() -> createAsyncEventQueue(asyncEventQueueId, new SpyAsyncEventListener(),
        false, 100, dispatcherThreadCount, 100, true));
  }

  // Each member hosts exactly one fixed partition of the same region.
  String partitionOne = "partition1";
  String partitionTwo = "partition2";
  List<String> allPartitions = Arrays.asList(partitionOne, partitionTwo);
  vm0.invoke(() -> createFixedPartitionedRegion(partitionedRegionName, asyncEventQueueId, 0, 16,
      partitionOne, new SimpleFixedPartitionResolver(allPartitions)));
  vm1.invoke(() -> createFixedPartitionedRegion(partitionedRegionName, asyncEventQueueId, 0, 16,
      partitionTwo, new SimpleFixedPartitionResolver(allPartitions)));

  vm0.invoke(() -> doPuts(partitionedRegionName, 256));

  for (VM member : members) {
    member.invoke(() -> waitForAsyncQueueToEmpty());
  }

  // The two listeners together must have observed every event exactly once.
  int sizeInVM0 = vm0.invoke(() -> getSpyAsyncEventListener().getEventsMap().size());
  int sizeInVM1 = vm1.invoke(() -> getSpyAsyncEventListener().getEventsMap().size());
  String description = "sizeInVM0=" + sizeInVM0 + ", sizeInVM1=" + sizeInVM1;
  assertThat(sizeInVM0 + sizeInVM1).as(description).isEqualTo(256);
}
@Test // parallel, conflation, ReplicateRegion, IntegrationTest
public void testParallelAsyncEventQueueWithReplicatedRegion() {
  // A parallel AEQ is only supported on partitioned regions; wiring it to a
  // replicate region must fail at region-creation time.
  vm0.invoke(() -> {
    createCache();
    createAsyncEventQueue(asyncEventQueueId, new SpyAsyncEventListener(), true, 100,
        dispatcherThreadCount, 100, true);

    String expectedMessage = "can not be used with replicated region";
    assertThatThrownBy(() -> createReplicateRegion(replicateRegionName, asyncEventQueueId))
        .isInstanceOf(AsyncEventQueueConfigurationException.class)
        .hasMessageContaining(expectedMessage);
  });
}
@Test // parallel, PartitionedRegion
public void testParallelAsyncEventQueue() {
  List<VM> members = Arrays.asList(vm0, vm1, vm2, vm3);
  for (VM member : members) {
    member.invoke(() -> createCache());
  }
  for (VM member : members) {
    member.invoke(() -> createAsyncEventQueue(asyncEventQueueId, new SpyAsyncEventListener(),
        false, 100, dispatcherThreadCount, 100, true));
  }
  for (VM member : members) {
    member.invoke(() -> createPartitionedRegion(partitionedRegionName, asyncEventQueueId, 0, 16));
  }

  vm0.invoke(() -> doPuts(partitionedRegionName, 256));

  for (VM member : members) {
    member.invoke(() -> waitForAsyncQueueToEmpty());
  }

  // Events are partitioned across the members; the union must cover all 256 puts.
  int sizeInVM0 = vm0.invoke(() -> getSpyAsyncEventListener().getEventsMap().size());
  int sizeInVM1 = vm1.invoke(() -> getSpyAsyncEventListener().getEventsMap().size());
  int sizeInVM2 = vm2.invoke(() -> getSpyAsyncEventListener().getEventsMap().size());
  int sizeInVM3 = vm3.invoke(() -> getSpyAsyncEventListener().getEventsMap().size());
  String description = "sizeInVM0=" + sizeInVM0 + ", sizeInVM1=" + sizeInVM1 + ", sizeInVM2="
      + sizeInVM2 + ", sizeInVM3=" + sizeInVM3;
  assertThat(sizeInVM0 + sizeInVM1 + sizeInVM2 + sizeInVM3).as(description).isEqualTo(256);
}
@Test // parallel, PartitionedRegion
public void testParallelAsyncEventQueueSize() {
  List<VM> members = Arrays.asList(vm0, vm1, vm2, vm3);
  for (VM member : members) {
    member.invoke(() -> createCache());
  }
  for (VM member : members) {
    member.invoke(() -> createAsyncEventQueue(asyncEventQueueId, new SpyAsyncEventListener(),
        false, 100, dispatcherThreadCount, 100, true));
  }
  for (VM member : members) {
    member.invoke(() -> createPartitionedRegion(partitionedRegionName, asyncEventQueueId, 0, 16));
  }

  // Pause dispatching everywhere so queued events accumulate instead of draining.
  for (VM member : members) {
    member.invoke(() -> getInternalGatewaySender().pause());
  }
  for (VM member : members) {
    member.invoke(() -> waitForDispatcherToPause());
  }

  vm0.invoke(() -> doPuts(partitionedRegionName, 1000));

  // With the dispatchers paused, both queried members must report all 1000 events.
  assertThat(vm0.invoke(() -> getAsyncEventQueue().size())).isEqualTo(1000);
  assertThat(vm1.invoke(() -> getAsyncEventQueue().size())).isEqualTo(1000);
}
@Test // parallel, conflation, PartitionedRegion
public void testParallelAsyncEventQueueWithConflationEnabled() {
  List<VM> members = Arrays.asList(vm0, vm1, vm2, vm3);
  for (VM member : members) {
    member.invoke(() -> createCache());
  }
  for (VM member : members) {
    member.invoke(() -> createAsyncEventQueue(asyncEventQueueId, new SpyAsyncEventListener(),
        true, 100, dispatcherThreadCount, 100, true));
  }
  for (VM member : members) {
    member.invoke(() -> createPartitionedRegion(partitionedRegionName, asyncEventQueueId, 0, 16));
  }

  // Pause dispatching so conflation can be observed in the parked queue.
  for (VM member : members) {
    member.invoke(() -> getInternalGatewaySender().pause());
  }
  for (VM member : members) {
    member.invoke(() -> waitForDispatcherToPause());
  }

  Map<Integer, Integer> initialData = new HashMap<>();
  IntStream.range(0, 1000).forEach(i -> initialData.put(i, i));

  vm0.invoke(() -> {
    putGivenKeyValue(partitionedRegionName, initialData);
    // NOTE(review): the bug47213 sibling test awaits this size rather than checking it
    // immediately — confirm an immediate validate is not racy here.
    validateParallelAsyncEventQueueSize(initialData.size());
  });

  Map<Integer, String> updates = new HashMap<>();
  IntStream.range(0, 500).forEach(i -> updates.put(i, i + "_updated"));

  vm0.invoke(() -> {
    putGivenKeyValue(partitionedRegionName, updates);
    // no conflation of creates
    waitForParallelAsyncEventQueueSize(initialData.size() + updates.size());

    putGivenKeyValue(partitionedRegionName, updates);
    // conflation of updates
    waitForParallelAsyncEventQueueSize(initialData.size() + updates.size());
  });

  for (VM member : members) {
    member.invoke(() -> getInternalGatewaySender().resume());
  }
  for (VM member : members) {
    member.invoke(() -> waitForAsyncQueueToEmpty());
  }

  // After conflation, one delivered event per distinct key.
  int sizeInVM0 = vm0.invoke(() -> getSpyAsyncEventListener().getEventsMap().size());
  int sizeInVM1 = vm1.invoke(() -> getSpyAsyncEventListener().getEventsMap().size());
  int sizeInVM2 = vm2.invoke(() -> getSpyAsyncEventListener().getEventsMap().size());
  int sizeInVM3 = vm3.invoke(() -> getSpyAsyncEventListener().getEventsMap().size());
  String description = "sizeInVM0=" + sizeInVM0 + ", sizeInVM1=" + sizeInVM1 + ", sizeInVM2="
      + sizeInVM2 + ", sizeInVM3=" + sizeInVM3;
  assertThat(sizeInVM0 + sizeInVM1 + sizeInVM2 + sizeInVM3).as(description)
      .isEqualTo(initialData.size());
}
/**
 * Added to reproduce defect #47213
 *
 * TRAC #47213:
 */
@Test // parallel, conflation, PartitionedRegion, RegressionTest
public void testParallelAsyncEventQueueWithConflationEnabled_bug47213() {
  List<VM> members = Arrays.asList(vm0, vm1, vm2, vm3);
  for (VM member : members) {
    member.invoke(() -> createCache());
  }
  for (VM member : members) {
    member.invoke(() -> createAsyncEventQueue(asyncEventQueueId, new SpyAsyncEventListener(),
        true, 100, dispatcherThreadCount, 100, true));
  }
  // Redundancy 1 this time; wait for redundancy recovery before proceeding.
  for (VM member : members) {
    member.invoke(() -> createPartitionedRegionAndAwaitRecovery(partitionedRegionName,
        asyncEventQueueId, 1, 16));
  }

  for (VM member : members) {
    member.invoke(() -> getInternalGatewaySender().pause());
  }
  for (VM member : members) {
    member.invoke(() -> waitForDispatcherToPause());
  }

  Map<Integer, Integer> initialData = new HashMap<>();
  IntStream.range(0, 1000).forEach(i -> initialData.put(i, i));

  vm0.invoke(() -> {
    putGivenKeyValue(partitionedRegionName, initialData);
    waitForParallelAsyncEventQueueSize(initialData.size());
  });

  Map<Integer, String> updates = new HashMap<>();
  IntStream.range(0, 500).forEach(i -> updates.put(i, i + "_updated"));

  vm0.invoke(() -> {
    // Two identical update rounds: the second should conflate away, leaving
    // 1000 creates plus one update per updated key in the queue.
    putGivenKeyValue(partitionedRegionName, updates);
    putGivenKeyValue(partitionedRegionName, updates);
    waitForParallelAsyncEventQueueSize(initialData.size() + updates.size());
  });

  for (VM member : members) {
    member.invoke(() -> getInternalGatewaySender().resume());
  }
  for (VM member : members) {
    member.invoke(() -> waitForAsyncQueueToEmpty());
  }

  int sizeInVM0 = vm0.invoke(() -> getSpyAsyncEventListener().getEventsMap().size());
  int sizeInVM1 = vm1.invoke(() -> getSpyAsyncEventListener().getEventsMap().size());
  int sizeInVM2 = vm2.invoke(() -> getSpyAsyncEventListener().getEventsMap().size());
  int sizeInVM3 = vm3.invoke(() -> getSpyAsyncEventListener().getEventsMap().size());
  String description = "sizeInVM0=" + sizeInVM0 + ", sizeInVM1=" + sizeInVM1 + ", sizeInVM2="
      + sizeInVM2 + ", sizeInVM3=" + sizeInVM3;
  assertThat(sizeInVM0 + sizeInVM1 + sizeInVM2 + sizeInVM3).as(description)
      .isEqualTo(initialData.size());
}
@Test // parallel, PartitionedRegion, accessor
public void testParallelAsyncEventQueueWithOneAccessor() {
  List<VM> members = Arrays.asList(vm0, vm1, vm2, vm3);
  List<VM> dataHosts = Arrays.asList(vm1, vm2, vm3);
  for (VM member : members) {
    member.invoke(() -> createCache());
  }
  for (VM member : members) {
    member.invoke(() -> createAsyncEventQueue(asyncEventQueueId, new SpyAsyncEventListener(),
        false, 100, dispatcherThreadCount, 100, true));
  }

  // vm0 is a pure accessor (hosts no buckets); the other three members hold the data.
  vm0.invoke(
      () -> createPartitionedRegionAccessor(partitionedRegionName, asyncEventQueueId, 0, 16));
  for (VM host : dataHosts) {
    host.invoke(() -> createPartitionedRegion(partitionedRegionName, asyncEventQueueId, 0, 16));
  }

  vm0.invoke(() -> doPuts(partitionedRegionName, 256));

  for (VM host : dataHosts) {
    host.invoke(() -> waitForAsyncQueueToEmpty());
  }

  // The accessor's listener sees nothing; the data hosts cover all 256 events.
  vm0.invoke(() -> validateSpyAsyncEventListenerEventsMap(0));
  int sizeInVM1 = vm1.invoke(() -> getSpyAsyncEventListener().getEventsMap().size());
  int sizeInVM2 = vm2.invoke(() -> getSpyAsyncEventListener().getEventsMap().size());
  int sizeInVM3 = vm3.invoke(() -> getSpyAsyncEventListener().getEventsMap().size());
  String description =
      "sizeInVM1=" + sizeInVM1 + ", sizeInVM2=" + sizeInVM2 + ", sizeInVM3=" + sizeInVM3;
  assertThat(sizeInVM1 + sizeInVM2 + sizeInVM3).as(description).isEqualTo(256);
}
@Test // parallel, persistent, PartitionedRegion
public void testParallelAsyncEventQueueWithPersistence() {
  List<VM> members = Arrays.asList(vm0, vm1, vm2, vm3);
  for (VM member : members) {
    member.invoke(() -> createCache());
  }
  // Persistent AEQ backed by a per-member disk store.
  for (VM member : members) {
    member.invoke(() -> createPersistentAsyncEventQueue(asyncEventQueueId,
        new SpyAsyncEventListener(), false, 100, createDiskStoreName(asyncEventQueueId), false,
        dispatcherThreadCount, 100, true, true));
  }
  for (VM member : members) {
    member.invoke(() -> createPartitionedRegion(partitionedRegionName, asyncEventQueueId, 0, 16));
  }

  vm0.invoke(() -> doPuts(partitionedRegionName, 256));

  for (VM member : members) {
    member.invoke(() -> waitForAsyncQueueToEmpty());
  }

  int sizeInVM0 = vm0.invoke(() -> getSpyAsyncEventListener().getEventsMap().size());
  int sizeInVM1 = vm1.invoke(() -> getSpyAsyncEventListener().getEventsMap().size());
  int sizeInVM2 = vm2.invoke(() -> getSpyAsyncEventListener().getEventsMap().size());
  int sizeInVM3 = vm3.invoke(() -> getSpyAsyncEventListener().getEventsMap().size());
  String description = "sizeInVM0=" + sizeInVM0 + ", sizeInVM1=" + sizeInVM1 + ", sizeInVM2="
      + sizeInVM2 + ", sizeInVM3=" + sizeInVM3;
  assertThat(sizeInVM0 + sizeInVM1 + sizeInVM2 + sizeInVM3).as(description).isEqualTo(256);
}
/**
 * Test case to test possibleDuplicates. vm0 & vm1 are hosting the PR. vm1 is killed so the
 * buckets hosted by it are shifted to vm0.
 */
@Test // parallel, PartitionedRegion, possibleDuplicates
public void testParallelAsyncEventQueueHA_Scenario1() throws InterruptedException {
  List<VM> members = Arrays.asList(vm0, vm1);
  for (VM member : members) {
    member.invoke(() -> createCache());
  }
  for (VM member : members) {
    member.invoke(() -> createAsyncEventQueue(asyncEventQueueId,
        new GatewaySenderAsyncEventListener(), false, 5, dispatcherThreadCount, 100, true));
  }
  for (VM member : members) {
    member.invoke(() -> createPartitionedRegionAndAwaitRecovery(partitionedRegionName,
        asyncEventQueueId, 1, 16));
  }
  for (VM member : members) {
    member.invoke(() -> getInternalGatewaySender().pause());
  }
  for (VM member : members) {
    member.invoke(() -> waitForDispatcherToPause());
  }

  vm0.invoke(() -> doPuts(partitionedRegionName, 80));

  // Primaries hosted on vm0 and vm1 respectively, before the failover.
  Set<Integer> primaryBucketsInVM0 =
      vm0.invoke(() -> getAllLocalPrimaryBucketIds(partitionedRegionName));
  Set<Integer> primaryBucketsInVM1 =
      vm1.invoke(() -> getAllLocalPrimaryBucketIds(partitionedRegionName));
  assertThat(primaryBucketsInVM0.size() + primaryBucketsInVM1.size()).isEqualTo(16);

  vm1.invoke(() -> getCache().close());

  vm0.invoke(() -> {
    // give some time for rebalancing to happen
    SECONDS.sleep(2); // TODO: change to await rebalancing to complete
    getInternalGatewaySender().resume();
    waitForAsyncQueueToEmpty();
    // Events for vm1's former primary buckets should arrive flagged as possible duplicates.
    validatePossibleDuplicateEvents(primaryBucketsInVM1, 5);
  });
}
/**
 * Test case to test possibleDuplicates. vm0 & vm1 are hosting the PR. vm1 is killed and
 * subsequently vm2 is brought up. Buckets are now rebalanced between vm0 & vm2.
 */
@Test // parallel, PartitionedRegion, possibleDuplicates
public void testParallelAsyncEventQueueHA_Scenario2() {
  List<VM> originalHosts = Arrays.asList(vm0, vm1);
  for (VM member : originalHosts) {
    member.invoke(() -> createCache());
  }
  for (VM member : originalHosts) {
    member.invoke(() -> createAsyncEventQueue(asyncEventQueueId,
        new GatewaySenderAsyncEventListener(), false, 5, dispatcherThreadCount, 100, true));
  }
  for (VM member : originalHosts) {
    member.invoke(() -> createPartitionedRegionAndAwaitRecovery(partitionedRegionName,
        asyncEventQueueId, 1, 16));
  }
  for (VM member : originalHosts) {
    member.invoke(() -> getInternalGatewaySender().pause());
  }

  vm0.invoke(() -> doPuts(partitionedRegionName, 80));

  Set<Integer> primariesOnVM1 =
      vm1.invoke(() -> getAllLocalPrimaryBucketIds(partitionedRegionName));
  assertThat(primariesOnVM1).isNotEmpty();

  // before shutdown vm1, both vm0 and vm1 should have 40 events in primary queue
  vm0.invoke(() -> validateAsyncEventQueueStats(40, 40, 80, 80, 0));
  vm1.invoke(() -> validateAsyncEventQueueStats(40, 40, 80, 80, 0));

  vm1.invoke(() -> getCache().close());

  vm2.invoke(() -> {
    createCache();
    createAsyncEventQueue(asyncEventQueueId, new GatewaySenderAsyncEventListener(), false, 5,
        dispatcherThreadCount, 100, true);
    // vm2 will move some primary buckets from vm0, but vm0's primary queue size did not reduce
    getInternalGatewaySender().pause();
    createPartitionedRegionAndAwaitRecovery(partitionedRegionName, asyncEventQueueId, 1, 16);
  });

  Set<Integer> primariesOnVM0 =
      vm0.invoke(() -> getAllLocalPrimaryBucketIds(partitionedRegionName));
  assertThat(primariesOnVM0).isNotEmpty();
  Set<Integer> primariesOnVM2 =
      vm2.invoke(() -> getAllLocalPrimaryBucketIds(partitionedRegionName));
  assertThat(primariesOnVM2).isNotEmpty();

  vm0.invoke(() -> validateAsyncEventQueueStats(40, 40, 80, 80, 0));
  vm2.invoke(() -> validateAsyncEventQueueStats(40, 40, 0, 0, 0));

  for (VM member : Arrays.asList(vm0, vm2)) {
    member.invoke(() -> getInternalGatewaySender().resume());
  }
  for (VM member : Arrays.asList(vm0, vm2)) {
    member.invoke(() -> waitForAsyncQueueToEmpty());
  }

  vm2.invoke(() -> validatePossibleDuplicateEvents(primariesOnVM2, 5));
}
/**
 * Test case to test possibleDuplicates. vm0 & vm1 are hosting the PR. vm2 is brought up and
 * rebalancing is triggered so the buckets get balanced among vm0, vm1 & vm2.
 */
@Test // parallel, PartitionedRegion, possibleDuplicates
public void testParallelAsyncEventQueueHA_Scenario3() {
  List<VM> initialHosts = Arrays.asList(vm0, vm1);
  for (VM member : initialHosts) {
    member.invoke(() -> createCache());
  }
  for (VM member : initialHosts) {
    member.invoke(() -> createAsyncEventQueue(asyncEventQueueId,
        new GatewaySenderAsyncEventListener(), false, 5, dispatcherThreadCount, 100, true));
  }
  for (VM member : initialHosts) {
    member.invoke(() -> createPartitionedRegionAndAwaitRecovery(partitionedRegionName,
        asyncEventQueueId, 1, 16));
  }
  for (VM member : initialHosts) {
    member.invoke(() -> getInternalGatewaySender().pause());
  }
  for (VM member : initialHosts) {
    member.invoke(() -> waitForDispatcherToPause());
  }

  vm0.invoke(() -> doPuts(partitionedRegionName, 80));

  // Bring up a third member and rebalance so buckets spread across all three.
  vm2.invoke(() -> {
    createCache();
    createAsyncEventQueue(asyncEventQueueId, new GatewaySenderAsyncEventListener(), false, 5,
        dispatcherThreadCount, 100, true);
    createPartitionedRegionAndAwaitRecovery(partitionedRegionName, asyncEventQueueId, 1, 16);
  });
  vm0.invoke(() -> doRebalance());

  // Primaries that ended up on vm2 after the rebalance.
  Set<Integer> primariesOnVM2 =
      vm2.invoke(() -> getAllLocalPrimaryBucketIds(partitionedRegionName));

  for (VM member : initialHosts) {
    member.invoke(() -> getInternalGatewaySender().resume());
  }
  vm0.invoke(() -> waitForAsyncQueueToEmpty());
  vm1.invoke(() -> waitForAsyncQueueToEmpty());
  vm2.invoke(() -> waitForAsyncQueueToEmpty());

  vm2.invoke(() -> validatePossibleDuplicateEvents(primariesOnVM2, 5));
}
/**
 * Added for defect #50364 Can't colocate region that has AEQ with a region that does not have
 * that same AEQ
 *
 * TRAC #50364:
 */
@Test // parallel, PartitionedRegion, colocated, RegressionTest, IntegrationTest
public void testParallelAsyncEventQueueAttachedToChildRegionButNotToParentRegion() {
  vm2.invoke(() -> {
    createCache();

    // The AEQ is attached only to the colocated child region, not to the parent.
    createAsyncEventQueue(asyncEventQueueId, new SpyAsyncEventListener(), false, 10,
        dispatcherThreadCount, 100, true);

    // Leader (parent) PR without the AEQ.
    createPartitionedRegionWithRecoveryDelay(partitionedRegionName, 0, 0, 100);

    // Colocated (child) PR carrying the AEQ.
    createColocatedPartitionedRegion(childPartitionedRegionName, asyncEventQueueId,
        0, 100, partitionedRegionName);

    doPuts(childPartitionedRegionName, 1000);

    waitForAsyncQueueToEmpty();

    // All 1000 puts on the child region must reach the listener.
    int deliveredEvents = getSpyAsyncEventListener().getEventsMap().size();
    assertThat(deliveredEvents).isEqualTo(1000);
  });
}
@Test // parallel, Rebalancing, PartitionedRegion
public void testParallelAsyncEventQueueMoveBucketAndMoveItBackDuringDispatching() {
  vm0.invoke(() -> createCache());
  vm1.invoke(() -> createCache());

  DistributedMember member0 =
      vm0.invoke(() -> getCache().getDistributedSystem().getDistributedMember());
  DistributedMember member1 =
      vm1.invoke(() -> getCache().getDistributedSystem().getDistributedMember());

  // Each listener pushes buckets toward the *other* member while events dispatch.
  vm0.invoke(() -> {
    createAsyncEventQueue(asyncEventQueueId,
        new BucketMovingAsyncEventListener(member1, getCache(), partitionedRegionName),
        false, 10, dispatcherThreadCount, 100, true);
    createPartitionedRegion(partitionedRegionName, asyncEventQueueId, 0, 16);
    getInternalGatewaySender().pause();
    doPuts(partitionedRegionName, 113);
  });
  vm1.invoke(() -> {
    createAsyncEventQueue(asyncEventQueueId,
        new BucketMovingAsyncEventListener(member0, getCache(), partitionedRegionName),
        false, 10, dispatcherThreadCount, 100, true);
    createPartitionedRegion(partitionedRegionName, asyncEventQueueId, 0, 16);
  });

  vm0.invoke(() -> getInternalGatewaySender().resume());
  vm0.invoke(() -> waitForAsyncQueueToEmpty());
  vm1.invoke(() -> waitForAsyncQueueToEmpty());

  // Despite buckets moving mid-dispatch, the union of keys seen must be exactly 0..112.
  Set<Object> keysSeen = new HashSet<>();
  keysSeen.addAll(vm0.invoke(() -> getBucketMovingAsyncEventListener().getKeysSeen()));
  keysSeen.addAll(vm1.invoke(() -> getBucketMovingAsyncEventListener().getKeysSeen()));
  Set<Integer> expectedKeys = IntStream.range(0, 113).boxed().collect(toSet());
  assertThat(keysSeen).isEqualTo(expectedKeys);

  // Both listeners must actually have triggered a bucket move.
  assertThat(vm0.invoke(() -> getBucketMovingAsyncEventListener().isMoved())).isTrue();
  vm1.invoke(() -> await()
      .untilAsserted(() -> assertThat(getBucketMovingAsyncEventListener().isMoved()).isTrue()));
}
@Test // parallel, possibleDuplicates, PartitionedRegion, try-finally
public void testParallelAsyncEventQueueWithPossibleDuplicateEvents() {
  // Keep primaries where they are created instead of moving them on startup.
  vm0.invoke(() -> setDisableMovePrimary());
  vm1.invoke(() -> setDisableMovePrimary());

  int entryCount = 30;

  vm0.invoke(() -> {
    // Member 1: cache, AEQ, region; puts happen here so it owns all primaries.
    createCache();
    createAsyncEventQueue(asyncEventQueueId, new PossibleDuplicateAsyncEventListener(),
        false, 1, dispatcherThreadCount, 100, true);
    createPartitionedRegionAndAwaitRecovery(partitionedRegionName, asyncEventQueueId, 1, 16);
    doPuts(partitionedRegionName, entryCount);
  });

  vm1.invoke(() -> {
    // Member 2: redundant copy with its dispatcher paused.
    createCache();
    createAsyncEventQueue(asyncEventQueueId, new PossibleDuplicateAsyncEventListener(),
        false, 1, dispatcherThreadCount, 100, true);
    createPartitionedRegionAndAwaitRecovery(partitionedRegionName, asyncEventQueueId, 1, 16);
    getInternalGatewaySender().pause();
  });

  // Closing member 1 fails all AEQ buckets over to member 2.
  vm0.invoke(() -> getCache().close());

  vm1.invoke(() -> {
    getInternalGatewaySender().resume();
    getPossibleDuplicateAsyncEventListener().startProcessingEvents();
    waitForAsyncQueueToEmpty();
    // The surviving member must process every event (as possible duplicates).
    validatePossibleDuplicateEvents(entryCount);
  });
}
// Verifies that a parallel AEQ delivers every queued event even when the listener forces the
// bucket's PRIMARY to move to the other member mid-dispatch (and that member moves it back).
// Uses a single bucket with one redundant copy so the primary transfer is deterministic.
@Test // parallel, PartitionedRegion, Rebalancing
public void testParallelAsyncEventQueueMovePrimaryAndMoveItBackDuringDispatching() {
vm0.invoke(() -> createCache());
vm1.invoke(() -> createCache());
// Capture each VM's member id so each listener knows where to send the primary.
DistributedMember memberInVM0 =
vm0.invoke(() -> getCache().getDistributedSystem().getDistributedMember());
DistributedMember memberInVM1 =
vm1.invoke(() -> getCache().getDistributedSystem().getDistributedMember());
vm0.invoke(() -> {
// Create a PR with 1 bucket in vm0. Pause the sender and put some data in it
createAsyncEventQueue(asyncEventQueueId, new PrimaryMovingAsyncEventListener(memberInVM1),
false, 10, dispatcherThreadCount, 100, true);
createPartitionedRegion(partitionedRegionName, asyncEventQueueId, 1, 1);
getInternalGatewaySender().pause();
doPuts(partitionedRegionName, 113);
});
vm1.invoke(() -> {
// Create the PR in vm1. This will create a redundant copy, but will be the secondary
createAsyncEventQueue(asyncEventQueueId, new PrimaryMovingAsyncEventListener(memberInVM0),
false, 10, dispatcherThreadCount, 100, true);
createPartitionedRegion(partitionedRegionName, asyncEventQueueId, 1, 1);
// do a rebalance just to make sure we have restored redundancy
getCache().getResourceManager().createRebalanceFactory().start().getResults();
});
// Resume the AEQ. This should trigger the primary to move to vm1, which will then move it back
vm0.invoke(() -> getInternalGatewaySender().resume());
vm0.invoke(() -> waitForPrimaryToMove());
vm1.invoke(() -> waitForPrimaryToMove());
vm0.invoke(() -> waitForAsyncQueueToEmpty());
vm1.invoke(() -> waitForAsyncQueueToEmpty());
// The union of keys seen by both listeners must be exactly the 113 keys that were put.
Set<Object> allKeys = new HashSet<>();
allKeys.addAll(vm0.invoke(() -> getPrimaryMovingAsyncEventListener().getKeysSeen()));
allKeys.addAll(vm1.invoke(() -> getPrimaryMovingAsyncEventListener().getKeysSeen()));
Set<Integer> expectedKeys = IntStream.range(0, 113).boxed().collect(toSet());
assertThat(allKeys).isEqualTo(expectedKeys);
}
// Disables Geode's move-primaries-on-startup behavior for this JVM so primaries stay where puts
// placed them. JVM-wide side effect: the property is never cleared by this class.
private void setDisableMovePrimary() {
System.setProperty("gemfire.DISABLE_MOVE_PRIMARIES_ON_STARTUP", "true");
}
// Returns this VM's cache, creating it with the test's distributed-system properties on first use.
private InternalCache getCache() {
return cacheRule.getOrCreateCache(getDistributedSystemProperties());
}
// Creates this VM's cache with the test's distributed-system properties.
private void createCache() {
cacheRule.createCache(getDistributedSystemProperties());
}
/**
 * Creates a partitioned region wired to the given async event queue.
 *
 * @param regionName name of the region to create; must be non-empty
 * @param asyncEventQueueId id of an existing AEQ to attach; must be non-empty
 * @param redundantCopies number of redundant bucket copies
 * @param totalNumBuckets total bucket count for the PR
 */
private void createPartitionedRegion(String regionName,
    String asyncEventQueueId,
    int redundantCopies,
    int totalNumBuckets) {
  assertThat(regionName).isNotEmpty();
  assertThat(asyncEventQueueId).isNotEmpty();
  // Diamond operator instead of the raw `new PartitionAttributesFactory()` — removes the
  // unchecked/raw-type warning without changing behavior.
  PartitionAttributesFactory<?, ?> partitionAttributesFactory = new PartitionAttributesFactory<>();
  partitionAttributesFactory.setRedundantCopies(redundantCopies);
  partitionAttributesFactory.setTotalNumBuckets(totalNumBuckets);
  RegionFactory<?, ?> regionFactory = getCache().createRegionFactory(PARTITION);
  regionFactory.addAsyncEventQueueId(asyncEventQueueId);
  regionFactory.setPartitionAttributes(partitionAttributesFactory.create());
  configureRegion(regionFactory).create(regionName);
}
/**
 * Creates an accessor (localMaxMemory = 0, so it hosts no buckets) for a partitioned region
 * wired to the given async event queue.
 *
 * @param regionName name of the region to create; must be non-empty
 * @param asyncEventQueueId id of an existing AEQ to attach; must be non-empty
 * @param redundantCopies number of redundant bucket copies
 * @param totalNumBuckets total bucket count for the PR
 */
private void createPartitionedRegionAccessor(String regionName,
    String asyncEventQueueId,
    int redundantCopies,
    int totalNumBuckets) {
  assertThat(regionName).isNotEmpty();
  assertThat(asyncEventQueueId).isNotEmpty();
  // Diamond operator instead of the raw constructor — removes the raw-type warning.
  PartitionAttributesFactory<?, ?> partitionAttributesFactory = new PartitionAttributesFactory<>();
  // localMaxMemory 0 makes this member a pure accessor.
  partitionAttributesFactory.setLocalMaxMemory(0);
  partitionAttributesFactory.setRedundantCopies(redundantCopies);
  partitionAttributesFactory.setTotalNumBuckets(totalNumBuckets);
  RegionFactory<?, ?> regionFactory = getCache().createRegionFactory(PARTITION);
  regionFactory.addAsyncEventQueueId(asyncEventQueueId);
  regionFactory.setPartitionAttributes(partitionAttributesFactory.create());
  configureRegion(regionFactory).create(regionName);
}
/**
 * Creates a partitioned region (no AEQ attached) with an explicit redundancy recovery delay.
 *
 * @param regionName name of the region to create; must be non-empty
 * @param recoveryDelay redundancy recovery delay in millis (-1 disables delayed recovery)
 * @param redundantCopies number of redundant bucket copies
 * @param totalNumBuckets total bucket count for the PR
 */
private void createPartitionedRegionWithRecoveryDelay(String regionName,
    long recoveryDelay,
    int redundantCopies,
    int totalNumBuckets) {
  assertThat(regionName).isNotEmpty();
  // Diamond operator instead of the raw constructor — removes the raw-type warning.
  PartitionAttributesFactory<?, ?> partitionAttributesFactory = new PartitionAttributesFactory<>();
  partitionAttributesFactory.setRecoveryDelay(recoveryDelay);
  partitionAttributesFactory.setRedundantCopies(redundantCopies);
  partitionAttributesFactory.setTotalNumBuckets(totalNumBuckets);
  RegionFactory<?, ?> regionFactory = getCache().createRegionFactory(PARTITION);
  regionFactory.setPartitionAttributes(partitionAttributesFactory.create());
  configureRegion(regionFactory).create(regionName);
}
/**
 * Creates a fixed-partitioned region (single primary fixed partition in this member) wired to
 * the given async event queue.
 *
 * @param regionName name of the region to create; must be non-empty
 * @param asyncEventQueueId id of an existing AEQ to attach; must be non-empty
 * @param redundantCopies number of redundant bucket copies
 * @param totalNumBuckets total bucket count for the PR
 * @param partitionName name of the fixed partition hosted (as primary) by this member
 * @param partitionResolver resolver mapping keys to fixed partitions
 */
private void createFixedPartitionedRegion(String regionName,
    String asyncEventQueueId,
    int redundantCopies,
    int totalNumBuckets,
    String partitionName,
    PartitionResolver partitionResolver) {
  assertThat(regionName).isNotEmpty();
  assertThat(asyncEventQueueId).isNotEmpty();
  assertThat(partitionName).isNotEmpty();
  assertThat(partitionResolver).isNotNull();
  // `true` => this member hosts the primary copy of the fixed partition.
  FixedPartitionAttributes fixedPartitionAttributes = createFixedPartition(partitionName, true);
  // Diamond operator instead of the raw constructor — removes the raw-type warning.
  PartitionAttributesFactory<?, ?> partitionAttributesFactory = new PartitionAttributesFactory<>();
  partitionAttributesFactory.addFixedPartitionAttributes(fixedPartitionAttributes);
  partitionAttributesFactory.setPartitionResolver(partitionResolver);
  partitionAttributesFactory.setRedundantCopies(redundantCopies);
  partitionAttributesFactory.setTotalNumBuckets(totalNumBuckets);
  RegionFactory<?, ?> regionFactory = getCache().createRegionFactory(PARTITION);
  regionFactory.addAsyncEventQueueId(asyncEventQueueId);
  regionFactory.setPartitionAttributes(partitionAttributesFactory.create());
  configureRegion(regionFactory).create(regionName);
}
/**
 * Creates a partitioned region colocated with an existing PR and wired to the given async
 * event queue.
 *
 * @param regionName name of the region to create; must be non-empty
 * @param asyncEventQueueId id of an existing AEQ to attach; must be non-empty
 * @param redundantCopies number of redundant bucket copies
 * @param totalNumBuckets total bucket count for the PR
 * @param colocatedWith name of the existing PR to colocate with; must be non-empty
 */
private void createColocatedPartitionedRegion(String regionName,
    String asyncEventQueueId,
    int redundantCopies,
    int totalNumBuckets,
    String colocatedWith) {
  assertThat(regionName).isNotEmpty();
  assertThat(asyncEventQueueId).isNotEmpty();
  assertThat(colocatedWith).isNotEmpty();
  // Diamond operator instead of the raw constructor — removes the raw-type warning.
  PartitionAttributesFactory<?, ?> partitionAttributesFactory = new PartitionAttributesFactory<>();
  partitionAttributesFactory.setColocatedWith(colocatedWith);
  partitionAttributesFactory.setRedundantCopies(redundantCopies);
  partitionAttributesFactory.setTotalNumBuckets(totalNumBuckets);
  RegionFactory<?, ?> regionFactory = getCache().createRegionFactory(PARTITION);
  regionFactory.addAsyncEventQueueId(asyncEventQueueId);
  regionFactory.setPartitionAttributes(partitionAttributesFactory.create());
  configureRegion(regionFactory).create(regionName);
}
/**
 * Creates a partitioned region and blocks until redundancy recovery has finished twice
 * (the latch counts two {@code recoveryFinished} callbacks).
 *
 * <p>BUGFIX: the result of {@code CountDownLatch.await(2, MINUTES)} was previously discarded,
 * so a timeout let the test proceed silently with recovery incomplete. The timeout now fails
 * fast with a clear message.
 *
 * @throws InterruptedException if interrupted while waiting for recovery
 */
private void createPartitionedRegionAndAwaitRecovery(String regionName,
    String asyncEventQueueId,
    int redundantCopies,
    int totalNumBuckets)
    throws InterruptedException {
  CountDownLatch recoveryDone = new CountDownLatch(2);
  ResourceObserver resourceObserver = new ResourceObserverAdapter() {
    @Override
    public void recoveryFinished(Region region) {
      recoveryDone.countDown();
    }
  };
  // NOTE(review): the observer is installed globally and never restored afterwards —
  // presumably acceptable because DUnit tears the VM down; confirm if tests share VMs.
  InternalResourceManager.setResourceObserver(resourceObserver);
  createPartitionedRegion(regionName, asyncEventQueueId, redundantCopies, totalNumBuckets);
  assertThat(recoveryDone.await(2, MINUTES))
      .as("timed out waiting for redundancy recovery of region " + regionName)
      .isTrue();
}
/**
 * Creates a REPLICATE region attached to the given async event queue.
 *
 * @param regionName name of the region to create; must be non-empty
 * @param asyncEventQueueId id of an existing AEQ to attach; must be non-empty
 */
private void createReplicateRegion(String regionName, String asyncEventQueueId) {
  assertThat(regionName).isNotEmpty();
  assertThat(asyncEventQueueId).isNotEmpty();
  RegionFactory<?, ?> factory = getCache().createRegionFactory(REPLICATE);
  factory.addAsyncEventQueueId(asyncEventQueueId);
  configureRegion(factory).create(regionName);
}
/**
 * Creates a disk store backed by a per-VM temporary directory derived from the AEQ id.
 *
 * @param diskStoreName name for the new disk store; must be non-empty
 * @param asyncEventQueueId AEQ id used to derive the backing directory name; must be non-empty
 */
private void createDiskStore(String diskStoreName, String asyncEventQueueId) {
  assertThat(diskStoreName).isNotEmpty();
  assertThat(asyncEventQueueId).isNotEmpty();
  File diskDir = createDirectory(createDiskStoreName(asyncEventQueueId));
  DiskStoreFactory factory = getCache().createDiskStoreFactory();
  factory.setDiskDirs(new File[] {diskDir});
  factory.create(diskStoreName);
}
/**
 * Returns a directory with the given name under the test's temporary folder, creating it if it
 * does not already exist.
 *
 * @param name directory name; must be non-empty
 * @return the existing or newly created directory
 * @throws UncheckedIOException if the folder cannot be created
 */
private File createDirectory(String name) {
  assertThat(name).isNotEmpty();
  File existing = new File(temporaryFolder.getRoot(), name);
  if (existing.exists()) {
    return existing;
  }
  try {
    return temporaryFolder.newFolder(name);
  } catch (IOException e) {
    throw new UncheckedIOException(e);
  }
}
/**
 * Derives a per-VM disk store name from the AEQ id (suffix includes the DUnit VM number so each
 * VM writes to its own directory).
 */
private String createDiskStoreName(String asyncEventQueueId) {
  assertThat(asyncEventQueueId).isNotEmpty();
  String vmSuffix = "_disk_" + getCurrentVMNum();
  return asyncEventQueueId + vmSuffix;
}
/**
 * Creates a persistent async event queue backed by a freshly created disk store.
 *
 * @param asyncEventQueueId id for the new AEQ; must be non-empty
 * @param asyncEventListener listener invoked for each dispatched batch; must be non-null
 * @param isBatchConflationEnabled whether to conflate events within a batch
 * @param batchSize number of events per dispatched batch
 * @param diskStoreName name for the backing disk store; must be non-empty
 * @param isDiskSynchronous whether queue writes hit disk synchronously
 * @param dispatcherThreads number of dispatcher threads
 * @param maximumQueueMemory maximum in-memory queue size (MB) before overflow
 * @param isParallel parallel (per-bucket) vs serial queue
 * @param isPersistent whether the queue survives member restarts
 */
private void createPersistentAsyncEventQueue(String asyncEventQueueId,
    AsyncEventListener asyncEventListener,
    boolean isBatchConflationEnabled,
    int batchSize,
    String diskStoreName,
    boolean isDiskSynchronous,
    int dispatcherThreads,
    int maximumQueueMemory,
    boolean isParallel,
    boolean isPersistent) {
  assertThat(asyncEventQueueId).isNotEmpty();
  assertThat(asyncEventListener).isNotNull();
  assertThat(diskStoreName).isNotEmpty();
  // The disk store must exist before the factory references it by name.
  createDiskStore(diskStoreName, asyncEventQueueId);
  AsyncEventQueueFactory factory = getCache().createAsyncEventQueueFactory();
  factory.setBatchConflationEnabled(isBatchConflationEnabled);
  factory.setBatchSize(batchSize);
  factory.setDiskStoreName(diskStoreName);
  factory.setDiskSynchronous(isDiskSynchronous);
  factory.setDispatcherThreads(dispatcherThreads);
  factory.setMaximumQueueMemory(maximumQueueMemory);
  factory.setParallel(isParallel);
  factory.setPersistent(isPersistent);
  factory.create(asyncEventQueueId, asyncEventListener);
}
/**
 * Creates a non-persistent async event queue with the given settings.
 *
 * @param asyncEventQueueId id for the new AEQ; must be non-empty
 * @param asyncEventListener listener invoked for each dispatched batch; must be non-null
 * @param isBatchConflationEnabled whether to conflate events within a batch
 * @param batchSize number of events per dispatched batch
 * @param dispatcherThreads number of dispatcher threads
 * @param maximumQueueMemory maximum in-memory queue size (MB) before overflow
 * @param isParallel parallel (per-bucket) vs serial queue
 */
private void createAsyncEventQueue(String asyncEventQueueId,
    AsyncEventListener asyncEventListener,
    boolean isBatchConflationEnabled,
    int batchSize,
    int dispatcherThreads,
    int maximumQueueMemory,
    boolean isParallel) {
  assertThat(asyncEventQueueId).isNotEmpty();
  assertThat(asyncEventListener).isNotNull();
  AsyncEventQueueFactory factory = getCache().createAsyncEventQueueFactory();
  factory.setBatchConflationEnabled(isBatchConflationEnabled);
  factory.setBatchSize(batchSize);
  factory.setDispatcherThreads(dispatcherThreads);
  factory.setMaximumQueueMemory(maximumQueueMemory);
  factory.setParallel(isParallel);
  factory.setPersistent(false);
  factory.create(asyncEventQueueId, asyncEventListener);
}
/**
 * Puts the identity mapping {@code i -> i} for keys {@code 0..numPuts-1} into the region.
 */
private void doPuts(String regionName, int numPuts) {
  Region<Integer, Integer> target = getCache().getRegion(regionName);
  IntStream.range(0, numPuts).forEach(key -> target.put(key, key));
}
/**
 * Runs a rebalance of this cache and waits up to two minutes for it to complete.
 *
 * @throws InterruptedException if interrupted while waiting
 * @throws TimeoutException if the rebalance does not finish within two minutes
 */
private void doRebalance() throws InterruptedException, TimeoutException {
  RebalanceOperation operation = getCache().getResourceManager()
      .createRebalanceFactory()
      .start();
  RebalanceResults results = operation.getResults(2, MINUTES);
  assertThat(results).isNotNull();
}
/**
 * Returns the ids of all buckets of the named PR for which this member is currently primary.
 */
private Set<Integer> getAllLocalPrimaryBucketIds(String regionName) {
  PartitionedRegion partitionedRegion = (PartitionedRegion) getCache().getRegion(regionName);
  PartitionedRegionDataStore dataStore = partitionedRegion.getDataStore();
  return dataStore.getAllLocalPrimaryBucketIds();
}
/**
 * Puts every entry of {@code keyValues} into the named region.
 *
 * @param regionName name of an existing region
 * @param keyValues entries to put; iterated in the map's own order
 */
private void putGivenKeyValue(String regionName, Map<Integer, ?> keyValues) {
  Region<Integer, Object> region = getCache().getRegion(regionName);
  // Iterate entries directly instead of keySet() + get(): one lookup per entry instead of two,
  // and no re-fetch race if the map were concurrently modified.
  for (Map.Entry<Integer, ?> entry : keyValues.entrySet()) {
    region.put(entry.getKey(), entry.getValue());
  }
}
/**
 * Returns true when every region queue of the gateway sender is empty (total size zero).
 *
 * @param gatewaySender sender whose queues to inspect; must be non-null
 */
private boolean areRegionQueuesEmpty(InternalGatewaySender gatewaySender) {
  assertThat(gatewaySender).isNotNull();
  int totalSize = gatewaySender.getQueues().stream()
      .mapToInt(RegionQueue::size)
      .sum();
  return totalSize == 0;
}
/**
 * Asserts the AEQ statistics eventually reach the expected values.
 *
 * <p>Queue sizes are awaited (they drain asynchronously); received/queued counters are checked
 * directly since all puts have completed by the time callers invoke this.
 *
 * @param queueSize expected primary queue size
 * @param secondaryQueueSize expected secondary queue size (parallel queues only)
 * @param eventsReceived expected events-received counter
 * @param eventsQueued expected events-queued counter
 * @param eventsDistributed minimum expected events-distributed counter (redelivery may exceed it)
 */
private void validateAsyncEventQueueStats(int queueSize,
    int secondaryQueueSize,
    int eventsReceived,
    int eventsQueued,
    int eventsDistributed) {
  InternalAsyncEventQueue asyncEventQueue = getInternalAsyncEventQueue();
  AsyncEventQueueStats asyncEventQueueStats = asyncEventQueue.getStatistics();
  assertThat(asyncEventQueueStats).isNotNull();
  await()
      .untilAsserted(
          () -> assertThat(asyncEventQueueStats.getEventQueueSize()).isEqualTo(queueSize));
  if (asyncEventQueue.isParallel()) {
    await()
        .untilAsserted(() -> assertThat(asyncEventQueueStats.getSecondaryEventQueueSize())
            .isEqualTo(secondaryQueueSize));
  } else {
    // for serial queue, secondaryQueueSize is not used
    assertThat(asyncEventQueueStats.getSecondaryEventQueueSize()).isEqualTo(0);
  }
  // BUGFIX(cleanup): removed a redundant re-assertion of getEventQueueSize() here — the awaited
  // assertion above already established it.
  assertThat(asyncEventQueueStats.getEventsReceived()).isEqualTo(eventsReceived);
  assertThat(asyncEventQueueStats.getEventsQueued()).isEqualTo(eventsQueued);
  assertThat(asyncEventQueueStats.getEventsDistributed())
      .isGreaterThanOrEqualTo(eventsDistributed);
}
/**
 * Asserts that the sender has exactly one region queue and that its backing region currently
 * holds the expected number of entries (no awaiting — a point-in-time check).
 */
private void validateParallelAsyncEventQueueSize(int expectedRegionQueueSize) {
  Set<RegionQueue> queues = getInternalGatewaySender().getQueues();
  assertThat(queues).isNotEmpty().hasSize(1);
  RegionQueue onlyQueue = queues.iterator().next();
  assertThat(onlyQueue.getRegion().size()).isEqualTo(expectedRegionQueueSize);
}
/**
 * Asserts the PossibleDuplicate listener saw exactly {@code numEvents} events and that every
 * one of them carried the possibleDuplicate flag.
 */
private void validatePossibleDuplicateEvents(int numEvents) {
  PossibleDuplicateAsyncEventListener listener = getPossibleDuplicateAsyncEventListener();
  assertThat(listener.getTotalEvents())
      .isEqualTo(listener.getTotalPossibleDuplicateEvents())
      .isEqualTo(numEvents);
}
/**
 * Asserts that each listed bucket received exactly one batch of {@code batchSize} events and
 * that every event in those batches is flagged possibleDuplicate.
 *
 * @param bucketIds buckets expected to have been dispatched; must contain more than one id
 * @param batchSize expected number of events recorded per bucket
 */
private void validatePossibleDuplicateEvents(Set<Integer> bucketIds, int batchSize) {
  assertThat(bucketIds.size()).isGreaterThan(1);
  Map<Integer, List<GatewaySenderEventImpl>> bucketToEventsMap =
      getGatewaySenderAsyncEventListener().getBucketToEventsMap();
  for (int bucketId : bucketIds) {
    List<GatewaySenderEventImpl> bucketEvents = bucketToEventsMap.get(bucketId);
    assertThat(bucketEvents).as("bucketToEventsMap: " + bucketToEventsMap).isNotNull()
        .hasSize(batchSize);
    for (GatewaySenderEventImpl bucketEvent : bucketEvents) {
      assertThat(bucketEvent.getPossibleDuplicate()).isTrue();
    }
  }
}
/**
 * Awaits until the spy listener has recorded exactly {@code expectedSize} key/value events.
 */
private void validateSpyAsyncEventListenerEventsMap(int expectedSize) {
  // BUGFIX(idiom): was `Map eventsMap = (Map<?, ?>) ...` — a raw-typed variable with a no-op
  // cast. A wildcard-typed variable expresses the same thing without the raw-type warning.
  Map<?, ?> eventsMap = getSpyAsyncEventListener().getEventsMap();
  await().untilAsserted(() -> assertThat(eventsMap).hasSize(expectedSize));
}
/**
 * Blocks (with Awaitility's default timeout) until every region queue of this member's sender
 * has drained to size zero.
 */
private void waitForAsyncQueueToEmpty() {
  InternalGatewaySender sender = getInternalGatewaySender();
  await().until(() -> areRegionQueuesEmpty(sender));
}
/**
 * Blocks until the sender's event processor acknowledges the pause request.
 */
private void waitForDispatcherToPause() {
  InternalGatewaySender sender = getInternalGatewaySender();
  sender.getEventProcessor().waitForDispatcherToPause();
}
/**
 * Awaits until the sender's single region queue holds exactly the expected number of entries.
 * Queue existence and size are re-checked on every poll, since the queue region can change as
 * events arrive or drain.
 */
private void waitForParallelAsyncEventQueueSize(int expectedRegionQueueSize) {
  InternalGatewaySender sender = getInternalGatewaySender();
  await().untilAsserted(() -> {
    Set<RegionQueue> queues = sender.getQueues();
    assertThat(queues).isNotEmpty().hasSize(1);
    RegionQueue onlyQueue = queues.iterator().next();
    assertThat(onlyQueue.getRegion().size()).isEqualTo(expectedRegionQueueSize);
  });
}
// Blocks until this member's PrimaryMovingAsyncEventListener reports that it has performed its
// primary move. The listener is re-fetched on every poll, so this also tolerates the queue being
// (re)created during the wait.
private void waitForPrimaryToMove() {
await().until(() -> getPrimaryMovingAsyncEventListener().isMoved());
}
/**
 * Returns the gateway sender backing this test's AEQ, asserting it exists.
 */
private InternalGatewaySender getInternalGatewaySender() {
  InternalGatewaySender sender = getInternalAsyncEventQueue().getSender();
  assertThat(sender).isNotNull();
  return sender;
}
/**
 * Returns this test's AEQ listener downcast to PrimaryMovingAsyncEventListener.
 */
private PrimaryMovingAsyncEventListener getPrimaryMovingAsyncEventListener() {
  AsyncEventListener listener = getAsyncEventListener();
  return (PrimaryMovingAsyncEventListener) listener;
}
/**
 * Returns this test's AEQ listener downcast to BucketMovingAsyncEventListener.
 */
private BucketMovingAsyncEventListener getBucketMovingAsyncEventListener() {
  AsyncEventListener listener = getAsyncEventListener();
  return (BucketMovingAsyncEventListener) listener;
}
/**
 * Returns this test's AEQ listener downcast to SpyAsyncEventListener.
 */
private SpyAsyncEventListener getSpyAsyncEventListener() {
  AsyncEventListener listener = getAsyncEventListener();
  return (SpyAsyncEventListener) listener;
}
/**
 * Returns this test's AEQ listener downcast to GatewaySenderAsyncEventListener.
 */
private GatewaySenderAsyncEventListener getGatewaySenderAsyncEventListener() {
  AsyncEventListener listener = getAsyncEventListener();
  return (GatewaySenderAsyncEventListener) listener;
}
/**
 * Returns this test's AEQ listener downcast to PossibleDuplicateAsyncEventListener.
 */
private PossibleDuplicateAsyncEventListener getPossibleDuplicateAsyncEventListener() {
  AsyncEventListener listener = getAsyncEventListener();
  return (PossibleDuplicateAsyncEventListener) listener;
}
/**
 * Returns the listener attached to this test's AEQ, asserting it exists.
 */
private AsyncEventListener getAsyncEventListener() {
  AsyncEventListener listener = getAsyncEventQueue().getAsyncEventListener();
  assertThat(listener).isNotNull();
  return listener;
}
/**
 * Returns this test's AEQ downcast to the internal interface (exposes the sender and stats).
 */
private InternalAsyncEventQueue getInternalAsyncEventQueue() {
  AsyncEventQueue queue = getAsyncEventQueue();
  return (InternalAsyncEventQueue) queue;
}
/**
 * Looks up this test's AEQ by id among the cache's queues, asserting it exists.
 * If the id somehow matched more than once, the last match wins (same as the original loop).
 */
private AsyncEventQueue getAsyncEventQueue() {
  AsyncEventQueue match = getCache().getAsyncEventQueues().stream()
      .filter(queue -> asyncEventQueueId.equals(queue.getId()))
      .reduce((first, second) -> second)
      .orElse(null);
  assertThat(match).isNotNull();
  return match;
}
/**
 * Base listener that, on its first batch, "moves" something (bucket or primary — defined by the
 * subclass) to a destination member and rejects the batch so it is redelivered. Every later
 * batch is accepted and its keys recorded.
 */
private abstract static class AbstractMovingAsyncEventListener implements AsyncEventListener {
  private final DistributedMember destination;
  // Synchronized set: batches may be delivered by multiple dispatcher threads.
  private final Set<Object> keysSeen = synchronizedSet(new HashSet<>());
  private volatile boolean moved;

  AbstractMovingAsyncEventListener(final DistributedMember destination) {
    this.destination = destination;
  }

  @Override
  public boolean processEvents(final List<AsyncEvent> events) {
    if (!moved) {
      // First batch: perform the move and report failure so the batch is redelivered.
      move(events.get(0));
      moved = true;
      return false;
    }
    for (AsyncEvent event : events) {
      keysSeen.add(event.getKey());
    }
    return true;
  }

  DistributedMember getDestination() {
    return destination;
  }

  Set<Object> getKeysSeen() {
    return keysSeen;
  }

  boolean isMoved() {
    return moved;
  }

  /** Moves the bucket/primary associated with {@code event} to {@link #getDestination()}. */
  abstract void move(AsyncEvent event);
}
/**
 * Moving listener whose first event triggers a bucket move: the bucket owning the event's key
 * is transferred from this member to the destination member.
 */
private static class BucketMovingAsyncEventListener extends AbstractMovingAsyncEventListener {
  private final Cache cache;
  private final String regionName;

  BucketMovingAsyncEventListener(final DistributedMember destination, final Cache cache,
      final String regionName) {
    super(destination);
    this.cache = cache;
    this.regionName = regionName;
  }

  @Override
  protected void move(final AsyncEvent event) {
    Region<Object, Object> dataRegion = cache.getRegion(regionName);
    DistributedMember self = cache.getDistributedSystem().getDistributedMember();
    PartitionRegionHelper.moveBucketByKey(dataRegion, self, getDestination(), event.getKey());
  }
}
/**
 * Listener that records every dispatched GatewaySenderEventImpl grouped by bucket id, for later
 * per-bucket assertions.
 */
private static class GatewaySenderAsyncEventListener implements AsyncEventListener {
  private final Map<Integer, List<GatewaySenderEventImpl>> bucketToEventsMap = new HashMap<>();

  // NOTE(review): readers call this without holding the listener's monitor while processEvents
  // is synchronized — callers presumably only read after dispatching has quiesced; confirm.
  Map<Integer, List<GatewaySenderEventImpl>> getBucketToEventsMap() {
    assertThat(bucketToEventsMap).isNotNull();
    return bucketToEventsMap;
  }

  @Override
  public synchronized boolean processEvents(List<AsyncEvent> events) {
    for (AsyncEvent event : events) {
      GatewaySenderEventImpl gatewayEvent = (GatewaySenderEventImpl) event;
      // computeIfAbsent replaces the original null-check/insert branching: one lookup,
      // identical resulting map contents.
      bucketToEventsMap
          .computeIfAbsent(gatewayEvent.getBucketId(), bucketId -> new ArrayList<>())
          .add(gatewayEvent);
    }
    return true;
  }
}
/**
 * Listener that holds every dispatcher thread on a latch until the test calls
 * {@link #startProcessingEvents()}, then counts total events and how many were flagged
 * possibleDuplicate.
 */
private static class PossibleDuplicateAsyncEventListener implements AsyncEventListener {
  private final CountDownLatch latch = new CountDownLatch(1);
  private final AtomicInteger numberOfEvents = new AtomicInteger();
  private final AtomicInteger numberOfPossibleDuplicateEvents = new AtomicInteger();

  private void process(final AsyncEvent<?, ?> event) {
    if (event.getPossibleDuplicate()) {
      incrementTotalPossibleDuplicateEvents();
    }
    incrementTotalEvents();
  }

  private void waitToStartProcessingEvents() throws InterruptedException {
    // BUGFIX: the await() result was previously discarded, so a 2-minute timeout let
    // processing begin silently before startProcessingEvents() was called. Fail fast instead.
    assertThat(latch.await(2, MINUTES))
        .as("timed out waiting for startProcessingEvents()")
        .isTrue();
  }

  private void incrementTotalEvents() {
    numberOfEvents.incrementAndGet();
  }

  private void incrementTotalPossibleDuplicateEvents() {
    numberOfPossibleDuplicateEvents.incrementAndGet();
  }

  /** Releases all dispatcher threads blocked in processEvents. */
  void startProcessingEvents() {
    latch.countDown();
  }

  int getTotalEvents() {
    return numberOfEvents.get();
  }

  int getTotalPossibleDuplicateEvents() {
    return numberOfPossibleDuplicateEvents.get();
  }

  @Override
  public boolean processEvents(final List<AsyncEvent> events) {
    try {
      waitToStartProcessingEvents();
    } catch (InterruptedException e) {
      // BUGFIX: restore the interrupt flag before propagating, so the dispatcher thread's
      // interrupted status is not silently swallowed.
      Thread.currentThread().interrupt();
      throw new AssertionError(e);
    }
    for (AsyncEvent<?, ?> event : events) {
      process(event);
    }
    return true;
  }
}
/**
 * Moving listener whose first event forces the destination member to become primary for the
 * bucket owning the event's key.
 */
private static class PrimaryMovingAsyncEventListener extends AbstractMovingAsyncEventListener {
  PrimaryMovingAsyncEventListener(final DistributedMember destination) {
    super(destination);
  }

  @Override
  protected void move(final AsyncEvent event) {
    PartitionedRegion dataRegion = (PartitionedRegion) event.getRegion();
    int bucketId = dataRegion.getKeyInfo(event.getKey()).getBucketId();
    // `true` => force the transfer even if rebalancing would not choose it.
    BecomePrimaryBucketResponse response = BecomePrimaryBucketMessage.send(
        (InternalDistributedMember) getDestination(), dataRegion, bucketId, true);
    assertThat(response).isNotNull();
    assertThat(response.waitForResponse()).isTrue();
  }
}
/**
 * Fixed-partition resolver that spreads keys across a fixed partition list by hash of the key.
 */
private static class SimpleFixedPartitionResolver implements FixedPartitionResolver {
  private final List<String> allPartitions;

  SimpleFixedPartitionResolver(final List<String> allPartitions) {
    this.allPartitions = allPartitions;
  }

  @Override
  public String getPartitionName(final EntryOperation opDetails,
      @Deprecated final Set targetPartitions) {
    // abs(hash % size) yields an index in [0, size); safe even for Integer.MIN_VALUE hashes
    // since |hash % size| < size.
    int index = Math.abs(opDetails.getKey().hashCode() % allPartitions.size());
    return allPartitions.get(index);
  }

  @Override
  public Object getRoutingObject(final EntryOperation opDetails) {
    return opDetails.getKey();
  }

  @Override
  public String getName() {
    return getClass().getName();
  }
}
/**
 * Listener that records every dispatched key/value pair into a concurrent map so tests can
 * assert on exactly which events were delivered.
 */
private static class SpyAsyncEventListener<K, V> implements AsyncEventListener, Declarable {
  // ConcurrentHashMap: batches may arrive on multiple dispatcher threads.
  private final Map<K, V> eventsMap = new ConcurrentHashMap<>();

  Map<K, V> getEventsMap() {
    assertThat(eventsMap).isNotNull();
    return eventsMap;
  }

  @Override
  public boolean processEvents(List<AsyncEvent> events) {
    for (AsyncEvent<K, V> dispatched : events) {
      eventsMap.put(dispatched.getKey(), dispatched.getDeserializedValue());
    }
    return true;
  }
}
}