blob: 70993d82a9b362f6514fae2697a5c52e6bb8efbf [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one or more contributor license
* agreements. See the NOTICE file distributed with this work for additional information regarding
* copyright ownership. The ASF licenses this file to You under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance with the License. You may obtain a
* copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software distributed under the License
* is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
* or implied. See the License for the specific language governing permissions and limitations under
* the License.
*/
package org.apache.geode.internal.cache.wan;
import static org.apache.geode.cache.Region.SEPARATOR;
import static org.apache.geode.distributed.ConfigurationProperties.DISTRIBUTED_SYSTEM_ID;
import static org.apache.geode.distributed.ConfigurationProperties.LOCATORS;
import static org.apache.geode.distributed.ConfigurationProperties.MCAST_PORT;
import static org.apache.geode.distributed.ConfigurationProperties.OFF_HEAP_MEMORY_SIZE;
import static org.apache.geode.distributed.ConfigurationProperties.REMOTE_LOCATORS;
import static org.apache.geode.distributed.ConfigurationProperties.START_LOCATOR;
import static org.apache.geode.test.awaitility.GeodeAwaitility.await;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertNotNull;
import static org.junit.Assert.assertTrue;
import static org.junit.Assert.fail;
import java.io.DataInput;
import java.io.DataOutput;
import java.io.File;
import java.io.IOException;
import java.net.InetSocketAddress;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Properties;
import java.util.Set;
import java.util.StringTokenizer;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.atomic.AtomicInteger;
import org.apache.geode.DataSerializable;
import org.apache.geode.DataSerializer;
import org.apache.geode.cache.AttributesFactory;
import org.apache.geode.cache.Cache;
import org.apache.geode.cache.CacheClosedException;
import org.apache.geode.cache.CacheFactory;
import org.apache.geode.cache.CacheLoader;
import org.apache.geode.cache.DataPolicy;
import org.apache.geode.cache.Declarable;
import org.apache.geode.cache.DiskStore;
import org.apache.geode.cache.DiskStoreFactory;
import org.apache.geode.cache.EntryEvent;
import org.apache.geode.cache.EntryOperation;
import org.apache.geode.cache.FixedPartitionAttributes;
import org.apache.geode.cache.FixedPartitionResolver;
import org.apache.geode.cache.LoaderHelper;
import org.apache.geode.cache.PartitionAttributesFactory;
import org.apache.geode.cache.Region;
import org.apache.geode.cache.RegionFactory;
import org.apache.geode.cache.Scope;
import org.apache.geode.cache.asyncqueue.AsyncEvent;
import org.apache.geode.cache.asyncqueue.AsyncEventListener;
import org.apache.geode.cache.asyncqueue.AsyncEventQueue;
import org.apache.geode.cache.asyncqueue.AsyncEventQueueFactory;
import org.apache.geode.cache.asyncqueue.internal.AsyncEventQueueImpl;
import org.apache.geode.cache.asyncqueue.internal.AsyncEventQueueStats;
import org.apache.geode.cache.client.internal.LocatorDiscoveryCallbackAdapter;
import org.apache.geode.cache.control.RebalanceFactory;
import org.apache.geode.cache.control.RebalanceOperation;
import org.apache.geode.cache.control.RebalanceResults;
import org.apache.geode.cache.control.ResourceManager;
import org.apache.geode.cache.partition.PartitionRegionHelper;
import org.apache.geode.cache.persistence.PartitionOfflineException;
import org.apache.geode.cache.util.CacheListenerAdapter;
import org.apache.geode.cache.wan.GatewayEventFilter;
import org.apache.geode.cache.wan.GatewayEventSubstitutionFilter;
import org.apache.geode.cache.wan.GatewayReceiver;
import org.apache.geode.cache.wan.GatewayReceiverFactory;
import org.apache.geode.cache.wan.GatewaySender;
import org.apache.geode.cache.wan.GatewaySender.OrderPolicy;
import org.apache.geode.cache.wan.GatewaySenderFactory;
import org.apache.geode.distributed.Locator;
import org.apache.geode.distributed.internal.InternalDistributedSystem;
import org.apache.geode.distributed.internal.InternalLocator;
import org.apache.geode.internal.AvailablePortHelper;
import org.apache.geode.internal.cache.ForceReattemptException;
import org.apache.geode.internal.cache.GemFireCacheImpl;
import org.apache.geode.internal.cache.PartitionedRegion;
import org.apache.geode.internal.cache.RegionQueue;
import org.apache.geode.internal.cache.control.InternalResourceManager;
import org.apache.geode.internal.cache.control.InternalResourceManager.ResourceObserver;
import org.apache.geode.internal.size.Sizeable;
import org.apache.geode.test.awaitility.GeodeAwaitility;
import org.apache.geode.test.dunit.Assert;
import org.apache.geode.test.dunit.Host;
import org.apache.geode.test.dunit.IgnoredException;
import org.apache.geode.test.dunit.Invoke;
import org.apache.geode.test.dunit.LogWriterUtils;
import org.apache.geode.test.dunit.VM;
import org.apache.geode.test.dunit.WaitCriterion;
import org.apache.geode.test.dunit.internal.JUnit4DistributedTestCase;
import org.apache.geode.test.dunit.rules.ClusterStartupRule;
import org.apache.geode.test.dunit.rules.DistributedRule;
/**
* @deprecated Please use {@link DistributedRule} and Geode User APIs or {@link ClusterStartupRule}
* instead.
*/
@Deprecated
public class AsyncEventQueueTestBase extends JUnit4DistributedTestCase {
// Cache created per dunit VM by createCache()/createCacheWithoutLocator(); shared by the
// static helpers below.
protected static Cache cache;
// The five dunit VMs wired up in preSetUp().
protected static VM vm0;
protected static VM vm1;
protected static VM vm2;
protected static VM vm3;
protected static VM vm4;
// Listener slot for subclasses; static so it is reachable from remote VM invocations.
protected static AsyncEventListener eventListener1;
// Maximum wait in ms. NOTE(review): not referenced in this chunk — confirm usage elsewhere.
private static final long MAX_WAIT = 60000;
// Last gateway event filter installed by createSender().
protected static GatewayEventFilter eventFilter;
protected static boolean destroyFlag = false;
// Candidate dispatcher-thread counts; shuffled in postSetUp() to vary coverage.
protected static List<Integer> dispatcherThreads = new ArrayList<>(Arrays.asList(1, 3, 5));
// this will be set for each test method run with one of the values from above
// list
protected static int numDispatcherThreadsForTheRun = 1;
public AsyncEventQueueTestBase() {
super();
}
@Override
public final void preSetUp() throws Exception {
  // Look up the five dunit VMs once per test; they are reused by every helper.
  final Host dunitHost = Host.getHost(0);
  vm0 = dunitHost.getVM(0);
  vm1 = dunitHost.getVM(1);
  vm2 = dunitHost.getVM(2);
  vm3 = dunitHost.getVM(3);
  vm4 = dunitHost.getVM(4);
}
@Override
public final void postSetUp() throws Exception {
  // this is done to vary the number of dispatchers for sender
  // during every test method run
  shuffleNumDispatcherThreads();
  // Propagate the chosen head-of-list value to every dunit VM so all members use the
  // same dispatcher-thread count for this test method.
  Invoke.invokeInEveryVM(() -> setNumDispatcherThreadsForTheRun(dispatcherThreads.get(0)));
}
/**
 * Randomizes the order of the candidate dispatcher-thread counts; postSetUp() then
 * distributes the new head of the list to every VM.
 */
public static void shuffleNumDispatcherThreads() {
  Collections.shuffle(dispatcherThreads);
}
/**
 * Records the dispatcher-thread count the current test run should use when the helpers in
 * this class create senders and async event queues in this VM.
 */
public static void setNumDispatcherThreadsForTheRun(int numThreads) {
  numDispatcherThreadsForTheRun = numThreads;
}
/**
 * Stops any locator already running in this VM, then starts the first locator of a
 * distributed system on a random TCP port.
 *
 * @param dsId distributed system id. NOTE(review): currently unused — the
 *        DISTRIBUTED_SYSTEM_ID property line is commented out below; confirm intentional.
 * @return the port the new locator listens on
 */
public static Integer createFirstLocatorWithDSId(int dsId) {
  if (Locator.hasLocator()) {
    Locator.getLocator().stop();
  }
  AsyncEventQueueTestBase test = new AsyncEventQueueTestBase();
  int port = AvailablePortHelper.getRandomAvailableTCPPort();
  Properties props = test.getDistributedSystemProperties();
  props.setProperty(MCAST_PORT, "0");
  // props.setProperty(DISTRIBUTED_SYSTEM_ID, "" + dsId);
  props.setProperty(LOCATORS, "localhost[" + port + "]");
  props.setProperty(START_LOCATOR,
      "localhost[" + port + "],server=true,peer=true,hostname-for-clients=localhost");
  test.startLocatorDistributedSystem(props);
  return port;
}
/**
 * Starts the first locator of a remote site, configured with the given distributed-system
 * id and pointing back at the other site's locator via REMOTE_LOCATORS.
 *
 * @param dsId distributed system id for this site
 * @param remoteLocPort port of the other site's locator
 * @return the port this locator listens on
 */
public static Integer createFirstRemoteLocator(int dsId, int remoteLocPort) {
  AsyncEventQueueTestBase test = new AsyncEventQueueTestBase();
  int port = AvailablePortHelper.getRandomAvailableTCPPort();
  Properties props = test.getDistributedSystemProperties();
  props.setProperty(MCAST_PORT, "0");
  props.setProperty(DISTRIBUTED_SYSTEM_ID, "" + dsId);
  props.setProperty(LOCATORS, "localhost[" + port + "]");
  props.setProperty(START_LOCATOR,
      "localhost[" + port + "],server=true,peer=true,hostname-for-clients=localhost");
  props.setProperty(REMOTE_LOCATORS, "localhost[" + remoteLocPort + "]");
  test.startLocatorDistributedSystem(props);
  return port;
}
/**
 * Connects this VM's distributed system as a locator member. The FORCE_LOCATOR_DM_TYPE
 * system property must be set before connecting and is cleared afterwards even when the
 * connect fails.
 */
private void startLocatorDistributedSystem(Properties props) {
  // Start the locator with a LOCATOR_DM_TYPE and not a NORMAL_DM_TYPE
  System.setProperty(InternalLocator.FORCE_LOCATOR_DM_TYPE, "true");
  try {
    getSystem(props);
  } finally {
    System.clearProperty(InternalLocator.FORCE_LOCATOR_DM_TYPE);
  }
}
/**
 * Creates a REPLICATE region wired to the given comma-separated async event queue ids.
 */
public static void createReplicatedRegionWithAsyncEventQueue(String regionName,
    String asyncQueueIds, Boolean offHeap) {
  // ForceReattemptException is expected noise during these tests; suppress it in the logs.
  IgnoredException ignoredReattempt =
      IgnoredException.addIgnoredException(ForceReattemptException.class.getName());
  try {
    AttributesFactory attrFactory = new AttributesFactory();
    addAsyncEventQueueIds(attrFactory, asyncQueueIds);
    attrFactory.setDataPolicy(DataPolicy.REPLICATE);
    attrFactory.setOffHeap(offHeap);
    Region region = cache.createRegionFactory(attrFactory.create()).create(regionName);
    assertNotNull(region);
  } finally {
    ignoredReattempt.remove();
  }
}
/**
 * Creates a REPLICATE region with a cache loader attached plus the given async event
 * queue ids, so queue behaviour on loader-supplied values can be exercised.
 */
public static void createReplicatedRegionWithCacheLoaderAndAsyncEventQueue(String regionName,
    String asyncQueueIds) {
  AttributesFactory attrFactory = new AttributesFactory();
  addAsyncEventQueueIds(attrFactory, asyncQueueIds);
  attrFactory.setDataPolicy(DataPolicy.REPLICATE);
  // Loader supplies values on cache misses.
  attrFactory.setCacheLoader(new MyCacheLoader());
  Region region = cache.createRegionFactory(attrFactory.create()).create(regionName);
  assertNotNull(region);
}
/**
 * Attaches each id from a comma-separated list of async event queue ids to the factory.
 * A null list means "no queues".
 */
private static void addAsyncEventQueueIds(AttributesFactory fact, String asyncQueueIds) {
  if (asyncQueueIds != null) {
    for (String asyncQueueId : asyncQueueIds.split(",")) {
      // split() can produce empty tokens (e.g. a leading comma); skip them, matching the
      // behavior of the StringTokenizer this replaced.
      if (!asyncQueueId.isEmpty()) {
        fact.addAsyncEventQueueId(asyncQueueId);
      }
    }
  }
}
/**
 * Creates a DISTRIBUTED_ACK REPLICATE region attached both to the given gateway senders
 * (comma-separated ids, may be null) and to one async event queue.
 */
public static void createReplicatedRegionWithSenderAndAsyncEventQueue(String regionName,
    String senderIds, String asyncChannelId, Boolean offHeap) {
  IgnoredException ignoredReattempt =
      IgnoredException.addIgnoredException(ForceReattemptException.class.getName());
  try {
    AttributesFactory attrs = new AttributesFactory();
    if (senderIds != null) {
      StringTokenizer st = new StringTokenizer(senderIds, ",");
      while (st.hasMoreTokens()) {
        attrs.addGatewaySenderId(st.nextToken());
      }
    }
    attrs.setDataPolicy(DataPolicy.REPLICATE);
    attrs.setOffHeap(offHeap);
    attrs.setScope(Scope.DISTRIBUTED_ACK);
    RegionFactory regionFactory = cache.createRegionFactory(attrs.create());
    regionFactory.addAsyncEventQueueId(asyncChannelId);
    assertNotNull(regionFactory.create(regionName));
  } finally {
    ignoredReattempt.remove();
  }
}
/**
 * Creates an async event queue with a default {@link MyAsyncEventListener}. Delegates to
 * the listener-taking overload.
 */
public static void createAsyncEventQueue(String asyncChannelId, boolean isParallel,
    Integer maxMemory, Integer batchSize, boolean isConflation, boolean isPersistent,
    String diskStoreName, boolean isDiskSynchronous) {
  createAsyncEventQueue(asyncChannelId, isParallel, maxMemory, batchSize, isConflation,
      isPersistent, diskStoreName, isDiskSynchronous, new MyAsyncEventListener());
}
/**
 * Creates an async event queue using the dispatcher-thread count chosen for this test run
 * (numDispatcherThreadsForTheRun). Delegates to the thread-count-taking overload.
 */
public static void createAsyncEventQueue(String asyncChannelId, boolean isParallel,
    Integer maxMemory, Integer batchSize, boolean isConflation, boolean isPersistent,
    String diskStoreName, boolean isDiskSynchronous,
    final AsyncEventListener asyncEventListener) {
  createAsyncEventQueue(asyncChannelId, isParallel, maxMemory, batchSize, isConflation,
      isPersistent, diskStoreName, isDiskSynchronous, numDispatcherThreadsForTheRun,
      asyncEventListener);
}
/**
 * Creates and registers an async event queue with the given listener, optionally backed by
 * a disk store, with explicit dispatcher-thread count.
 */
public static void createAsyncEventQueue(String asyncChannelId, boolean isParallel,
    Integer maxMemory, Integer batchSize, boolean isConflation, boolean isPersistent,
    String diskStoreName, boolean isDiskSynchronous, int numDispatcherThreads,
    final AsyncEventListener asyncEventListener) {
  createDiskStore(asyncChannelId, diskStoreName);
  AsyncEventQueueFactory factory = getInitialAsyncEventQueueFactory(isParallel, maxMemory,
      batchSize, isPersistent, diskStoreName);
  factory.setDiskSynchronous(isDiskSynchronous);
  factory.setBatchConflationEnabled(isConflation);
  // set dispatcher threads
  factory.setDispatcherThreads(numDispatcherThreads);
  // The queue registers itself with the cache on create(); the previously unused local
  // reference was removed.
  factory.create(asyncChannelId, asyncEventListener);
}
/**
 * Creates a disk store (when a name is supplied) in a fresh per-VM, per-timestamp
 * directory. A null diskStoreName means no disk store is needed.
 */
private static void createDiskStore(String asyncChannelId, String diskStoreName) {
  if (diskStoreName != null) {
    File directory = new File(
        asyncChannelId + "_disk_" + System.currentTimeMillis() + "_" + VM.getCurrentVMNum());
    directory.mkdir();
    DiskStoreFactory dsf = cache.createDiskStoreFactory();
    dsf.setDiskDirs(new File[] {directory});
    // The disk store registers itself with the cache on create(); the previously unused
    // local reference was removed.
    dsf.create(diskStoreName);
  }
}
/**
 * Creates an async event queue backed by a MyAsyncEventListener2 listener, using the
 * dispatcher-thread count chosen for this test run.
 */
public static void createAsyncEventQueueWithListener2(String asyncChannelId, boolean isParallel,
    Integer maxMemory, Integer batchSize, boolean isPersistent, String diskStoreName) {
  createDiskStore(asyncChannelId, diskStoreName);
  AsyncEventListener asyncEventListener = new MyAsyncEventListener2();
  AsyncEventQueueFactory factory = getInitialAsyncEventQueueFactory(isParallel, maxMemory,
      batchSize, isPersistent, diskStoreName);
  // set dispatcher threads
  factory.setDispatcherThreads(numDispatcherThreadsForTheRun);
  // The queue registers itself with the cache on create(); the previously unused local
  // reference was removed.
  factory.create(asyncChannelId, asyncEventListener);
}
/**
 * Creates an async event queue whose listener is instantiated reflectively from a simple
 * class name (resolved within this package), with no substitution filter. Delegates to the
 * filter-taking overload.
 */
public static void createAsyncEventQueue(String asyncChannelId, boolean isParallel,
    Integer maxMemory, Integer batchSize, boolean isConflation, boolean isPersistent,
    String diskStoreName, boolean isDiskSynchronous, String asyncListenerClass) throws Exception {
  createAsyncEventQueue(asyncChannelId, isParallel, maxMemory, batchSize, isConflation,
      isPersistent, diskStoreName, isDiskSynchronous, asyncListenerClass, null);
}
/**
 * Creates an async event queue whose listener (and optional substitution filter) are
 * instantiated reflectively from simple class names resolved within this package.
 *
 * @throws Exception if either class cannot be found or instantiated
 */
public static void createAsyncEventQueue(String asyncChannelId, boolean isParallel,
    Integer maxMemory, Integer batchSize, boolean isConflation, boolean isPersistent,
    String diskStoreName, boolean isDiskSynchronous, String asyncListenerClass,
    String substitutionFilterClass) throws Exception {
  createDiskStore(asyncChannelId, diskStoreName);
  AsyncEventQueueFactory factory = getInitialAsyncEventQueueFactory(isParallel, maxMemory,
      batchSize, isPersistent, diskStoreName);
  factory.setDiskSynchronous(isDiskSynchronous);
  factory.setBatchConflationEnabled(isConflation);
  if (substitutionFilterClass != null) {
    // Class.newInstance() is deprecated; invoke the no-arg constructor reflectively.
    factory.setGatewayEventSubstitutionListener(
        (GatewayEventSubstitutionFilter) getClass(substitutionFilterClass)
            .getDeclaredConstructor().newInstance());
  }
  // set dispatcher threads
  factory.setDispatcherThreads(numDispatcherThreadsForTheRun);
  factory.create(asyncChannelId,
      (AsyncEventListener) getClass(asyncListenerClass).getDeclaredConstructor().newInstance());
}
/**
 * Resolves a test listener/filter class by its simple name; all such classes live in the
 * org.apache.geode.internal.cache.wan package.
 *
 * @throws Exception (ClassNotFoundException) when no class with that name exists
 */
private static Class getClass(String simpleClassName) throws Exception {
  // The redundant null-initialized local was removed; just resolve and return.
  return Class.forName("org.apache.geode.internal.cache.wan." + simpleClassName);
}
/**
 * Creates an async event queue with a CustomAsyncEventListener and the default
 * gateway-sender dispatcher-thread count. Delegates to the thread-count-taking overload.
 */
public static void createAsyncEventQueueWithCustomListener(String asyncChannelId,
    boolean isParallel, Integer maxMemory, Integer batchSize, boolean isConflation,
    boolean isPersistent, String diskStoreName, boolean isDiskSynchronous) {
  createAsyncEventQueueWithCustomListener(asyncChannelId, isParallel, maxMemory, batchSize,
      isConflation, isPersistent, diskStoreName, isDiskSynchronous,
      GatewaySender.DEFAULT_DISPATCHER_THREADS);
}
/**
 * Creates an async event queue with a CustomAsyncEventListener and an explicit
 * dispatcher-thread count.
 *
 * NOTE(review): the isConflation and isDiskSynchronous parameters are accepted but never
 * applied to the factory — confirm whether that is intentional.
 */
public static void createAsyncEventQueueWithCustomListener(String asyncChannelId,
    boolean isParallel, Integer maxMemory, Integer batchSize, boolean isConflation,
    boolean isPersistent, String diskStoreName, boolean isDiskSynchronous, int nDispatchers) {
  IgnoredException exp =
      IgnoredException.addIgnoredException(ForceReattemptException.class.getName());
  try {
    createDiskStore(asyncChannelId, diskStoreName);
    AsyncEventQueueFactory factory = getInitialAsyncEventQueueFactory(isParallel, maxMemory,
        batchSize, isPersistent, diskStoreName);
    factory.setDispatcherThreads(nDispatchers);
    // The queue registers itself with the cache on create(); the previously unused local
    // reference was removed.
    factory.create(asyncChannelId, new CustomAsyncEventListener());
  } finally {
    exp.remove();
  }
}
/**
 * Builds the factory configuration shared by all createAsyncEventQueue* helpers:
 * batch size, persistence, disk store, max queue memory, and parallelism.
 */
private static AsyncEventQueueFactory getInitialAsyncEventQueueFactory(boolean isParallel,
    Integer maxMemory, Integer batchSize, boolean isPersistent, String diskStoreName) {
  AsyncEventQueueFactory queueFactory = cache.createAsyncEventQueueFactory();
  queueFactory.setBatchSize(batchSize);
  queueFactory.setPersistent(isPersistent);
  queueFactory.setDiskStoreName(diskStoreName);
  queueFactory.setMaximumQueueMemory(maxMemory);
  queueFactory.setParallel(isParallel);
  return queueFactory;
}
/**
 * Creates a concurrent async event queue with an explicit dispatcher-thread count and
 * order policy.
 */
public static void createConcurrentAsyncEventQueue(String asyncChannelId, boolean isParallel,
    Integer maxMemory, Integer batchSize, boolean isConflation, boolean isPersistent,
    String diskStoreName, boolean isDiskSynchronous, int dispatcherThreads, OrderPolicy policy) {
  createDiskStore(asyncChannelId, diskStoreName);
  AsyncEventListener asyncEventListener = new MyAsyncEventListener();
  AsyncEventQueueFactory factory = getInitialAsyncEventQueueFactory(isParallel, maxMemory,
      batchSize, isPersistent, diskStoreName);
  factory.setDiskSynchronous(isDiskSynchronous);
  factory.setBatchConflationEnabled(isConflation);
  // Fix: the dispatcherThreads parameter was previously never applied, so the queue always
  // ran with the default thread count and the order policy had no effect.
  factory.setDispatcherThreads(dispatcherThreads);
  factory.setOrderPolicy(policy);
  factory.create(asyncChannelId, asyncEventListener);
}
/**
 * Creates an async event queue whose (optional) disk store lives in a dedicated directory.
 *
 * @param diskStoreName directory name to use, or null to derive a unique per-VM name
 * @return the name of the directory backing the disk store
 */
public static String createAsyncEventQueueWithDiskStore(String asyncChannelId, boolean isParallel,
    Integer maxMemory, Integer batchSize, boolean isPersistent, String diskStoreName) {
  AsyncEventListener asyncEventListener = new MyAsyncEventListener();
  File persistentDirectory;
  if (diskStoreName == null) {
    // Derive a unique directory per VM and timestamp to avoid collisions between runs.
    persistentDirectory = new File(
        asyncChannelId + "_disk_" + System.currentTimeMillis() + "_" + VM.getCurrentVMNum());
  } else {
    persistentDirectory = new File(diskStoreName);
  }
  LogWriterUtils.getLogWriter().info("The ds is : " + persistentDirectory.getName());
  persistentDirectory.mkdir();
  DiskStoreFactory dsf = cache.createDiskStoreFactory();
  File[] dirs1 = new File[] {persistentDirectory};
  AsyncEventQueueFactory factory = cache.createAsyncEventQueueFactory();
  factory.setBatchSize(batchSize);
  factory.setParallel(isParallel);
  if (isPersistent) {
    factory.setPersistent(true);
    factory.setDiskStoreName(dsf.setDiskDirs(dirs1).create(asyncChannelId).getName());
  }
  factory.setMaximumQueueMemory(maxMemory);
  // set dispatcher threads
  factory.setDispatcherThreads(numDispatcherThreadsForTheRun);
  // The queue registers itself with the cache on create(); the previously unused local
  // reference was removed.
  factory.create(asyncChannelId, asyncEventListener);
  return persistentDirectory.getName();
}
/**
 * Pauses the gateway sender underlying the named async event queue.
 */
public static void pauseAsyncEventQueue(String asyncChannelId) {
  AsyncEventQueue theChannel = null;
  for (AsyncEventQueue asyncChannel : cache.getAsyncEventQueues()) {
    if (asyncChannelId.equals(asyncChannel.getId())) {
      theChannel = asyncChannel;
      break; // ids are unique; stop at the first match
    }
  }
  // Fail with a clear assertion instead of an NPE when no such queue exists.
  assertNotNull("No async event queue found with id " + asyncChannelId, theChannel);
  ((AsyncEventQueueImpl) theChannel).getSender().pause();
}
/**
 * Pauses the gateway sender underlying the named async event queue and blocks until its
 * dispatcher has actually paused.
 */
public static void pauseAsyncEventQueueAndWaitForDispatcherToPause(String asyncChannelId) {
  AsyncEventQueue theChannel = null;
  for (AsyncEventQueue asyncChannel : cache.getAsyncEventQueues()) {
    if (asyncChannelId.equals(asyncChannel.getId())) {
      theChannel = asyncChannel;
      break;
    }
  }
  // Fail with a clear assertion instead of an NPE when no such queue exists.
  assertNotNull("No async event queue found with id " + asyncChannelId, theChannel);
  ((AsyncEventQueueImpl) theChannel).getSender().pause();
  ((AsyncEventQueueImpl) theChannel).getSender().getEventProcessor()
      .waitForDispatcherToPause();
}
/**
 * Resumes the gateway sender underlying the named async event queue.
 */
public static void resumeAsyncEventQueue(String asyncQueueId) {
  AsyncEventQueue theQueue = null;
  for (AsyncEventQueue asyncChannel : cache.getAsyncEventQueues()) {
    if (asyncQueueId.equals(asyncChannel.getId())) {
      theQueue = asyncChannel;
      break; // ids are unique; stop at the first match
    }
  }
  // Fail with a clear assertion instead of an NPE when no such queue exists.
  assertNotNull("No async event queue found with id " + asyncQueueId, theQueue);
  ((AsyncEventQueueImpl) theQueue).getSender().resume();
}
/**
 * Polls until the named async event queue reaches the expected size.
 * (Parameter renamed from senderId for consistency: this is an async event queue id, not a
 * gateway sender id; callers are positional so this is backward-compatible.)
 */
public static void waitForAsyncEventQueueSize(String asyncQueueId, int numQueueEntries,
    boolean localSize) {
  await()
      .untilAsserted(() -> checkAsyncEventQueueSize(asyncQueueId, numQueueEntries, localSize));
}
/**
 * Asserts the total (cluster-wide, not local) size of the named async event queue.
 */
public static void checkAsyncEventQueueSize(String asyncQueueId, int numQueueEntries) {
  checkAsyncEventQueueSize(asyncQueueId, numQueueEntries, false);
}
/**
 * Asserts the size of the named async event queue. For a parallel sender the size is read
 * from the single backing queue region (optionally the local data only); for a serial
 * sender the sizes of all its region queues are summed.
 */
public static void checkAsyncEventQueueSize(String asyncQueueId, int numQueueEntries,
    boolean localSize) {
  AsyncEventQueueImpl aeq = (AsyncEventQueueImpl) cache.getAsyncEventQueue(asyncQueueId);
  GatewaySender sender = aeq.getSender();
  Set<RegionQueue> regionQueues = ((AbstractGatewaySender) sender).getQueues();
  if (sender.isParallel()) {
    Region queueRegion = regionQueues.iterator().next().getRegion();
    if (localSize) {
      queueRegion = PartitionRegionHelper.getLocalData(queueRegion);
    }
    assertEquals(numQueueEntries, queueRegion.size());
  } else {
    int total = 0;
    for (RegionQueue regionQueue : regionQueues) {
      total += regionQueue.size();
    }
    assertEquals(numQueueEntries, total);
  }
}
/**
 * Creates a partitioned region with the given redundancy and bucket count, attached to the
 * given gateway senders (comma-separated ids, may be null). Recovery delay is zero so
 * redundancy recovery starts immediately.
 */
public static void createPartitionedRegion(String regionName, String senderIds,
    Integer redundantCopies, Integer totalNumBuckets) {
  // Bucket moves and offline partitions are expected during these tests; keep the noise
  // out of the logs.
  IgnoredException ignoredReattempt =
      IgnoredException.addIgnoredException(ForceReattemptException.class.getName());
  IgnoredException ignoredOffline =
      IgnoredException.addIgnoredException(PartitionOfflineException.class.getName());
  try {
    AttributesFactory attrs = new AttributesFactory();
    if (senderIds != null) {
      StringTokenizer st = new StringTokenizer(senderIds, ",");
      while (st.hasMoreTokens()) {
        attrs.addGatewaySenderId(st.nextToken());
      }
    }
    PartitionAttributesFactory partitionAttrs = new PartitionAttributesFactory();
    partitionAttrs.setTotalNumBuckets(totalNumBuckets);
    partitionAttrs.setRedundantCopies(redundantCopies);
    partitionAttrs.setRecoveryDelay(0);
    attrs.setPartitionAttributes(partitionAttrs.create());
    assertNotNull(cache.createRegionFactory(attrs.create()).create(regionName));
  } finally {
    ignoredReattempt.remove();
    ignoredOffline.remove();
  }
}
/**
 * Creates a 16-bucket partitioned region wired to the given async event queue.
 */
public static void createPartitionedRegionWithAsyncEventQueue(String regionName,
    String asyncEventQueueId, Boolean offHeap) {
  IgnoredException ignoredReattempt =
      IgnoredException.addIgnoredException(ForceReattemptException.class.getName());
  IgnoredException ignoredOffline =
      IgnoredException.addIgnoredException(PartitionOfflineException.class.getName());
  try {
    PartitionAttributesFactory partitionAttrs = new PartitionAttributesFactory();
    partitionAttrs.setTotalNumBuckets(16);
    AttributesFactory attrs = new AttributesFactory();
    attrs.setPartitionAttributes(partitionAttrs.create());
    attrs.setOffHeap(offHeap);
    Region created = cache.createRegionFactory(attrs.create())
        .addAsyncEventQueueId(asyncEventQueueId).create(regionName);
    assertNotNull(created);
  } finally {
    ignoredReattempt.remove();
    ignoredOffline.remove();
  }
}
/**
 * Creates a 16-bucket fixed-partitioned region wired to the given async event queue. This
 * member hosts {@code partitionName} as a primary fixed partition and routes entries via a
 * MyFixedPartitionResolver over {@code allPartitions}.
 */
public static void createFixedPartitionedRegionWithAsyncEventQueue(String regionName,
    String asyncEventQueueId, String partitionName, final List<String> allPartitions,
    boolean offHeap) {
  IgnoredException ignoredReattempt =
      IgnoredException.addIgnoredException(ForceReattemptException.class.getName());
  IgnoredException ignoredOffline =
      IgnoredException.addIgnoredException(PartitionOfflineException.class.getName());
  try {
    PartitionAttributesFactory partitionAttrs = new PartitionAttributesFactory();
    partitionAttrs.setTotalNumBuckets(16);
    // true => this member is primary for the named fixed partition.
    partitionAttrs.addFixedPartitionAttributes(
        FixedPartitionAttributes.createFixedPartition(partitionName, true));
    partitionAttrs.setPartitionResolver(new MyFixedPartitionResolver(allPartitions));
    AttributesFactory attrs = new AttributesFactory();
    attrs.setPartitionAttributes(partitionAttrs.create());
    attrs.setOffHeap(offHeap);
    assertNotNull(cache.createRegionFactory(attrs.create())
        .addAsyncEventQueueId(asyncEventQueueId).create(regionName));
  } finally {
    ignoredReattempt.remove();
    ignoredOffline.remove();
  }
}
/**
 * Creates a partitioned region colocated with another region and wired to the given async
 * event queue.
 */
public static void createColocatedPartitionedRegionWithAsyncEventQueue(String regionName,
    String asyncEventQueueId, Integer totalNumBuckets, String colocatedWith) {
  IgnoredException ignoredReattempt =
      IgnoredException.addIgnoredException(ForceReattemptException.class.getName());
  IgnoredException ignoredOffline =
      IgnoredException.addIgnoredException(PartitionOfflineException.class.getName());
  try {
    PartitionAttributesFactory partitionAttrs = new PartitionAttributesFactory();
    partitionAttrs.setTotalNumBuckets(totalNumBuckets);
    // Place this region's buckets alongside the named region's buckets.
    partitionAttrs.setColocatedWith(colocatedWith);
    AttributesFactory attrs = new AttributesFactory();
    attrs.setPartitionAttributes(partitionAttrs.create());
    assertNotNull(cache.createRegionFactory(attrs.create())
        .addAsyncEventQueueId(asyncEventQueueId).create(regionName));
  } finally {
    ignoredReattempt.remove();
    ignoredOffline.remove();
  }
}
/**
 * Creates a 16-bucket partitioned region with a cache loader attached and wired to the
 * given async event queue, so queue behaviour on loader-supplied values can be tested.
 */
public static void createPartitionedRegionWithCacheLoaderAndAsyncQueue(String regionName,
    String asyncEventQueueId) {
  PartitionAttributesFactory partitionAttrs = new PartitionAttributesFactory();
  partitionAttrs.setTotalNumBuckets(16);
  AttributesFactory attrs = new AttributesFactory();
  attrs.setPartitionAttributes(partitionAttrs.create());
  // Loader supplies values on cache misses.
  attrs.setCacheLoader(new MyCacheLoader());
  assertNotNull(cache.createRegionFactory(attrs.create())
      .addAsyncEventQueueId(asyncEventQueueId).create(regionName));
}
/**
 * Create PartitionedRegion with 1 redundant copy and the given async event queue id, then
 * block until redundancy recovery has finished.
 */
public static void createPRWithRedundantCopyWithAsyncEventQueue(String regionName,
    String asyncEventQueueId, Boolean offHeap) throws InterruptedException {
  IgnoredException exp =
      IgnoredException.addIgnoredException(ForceReattemptException.class.getName());
  // Counted down once per recoveryFinished callback. A count of 2 presumably covers the
  // user region plus the AEQ shadow region — TODO(review): confirm.
  CountDownLatch recoveryDone = new CountDownLatch(2);
  ResourceObserver observer = new InternalResourceManager.ResourceObserverAdapter() {
    @Override
    public void recoveryFinished(Region region) {
      recoveryDone.countDown();
    }
  };
  // NOTE(review): the observer is never reset after this call; later recoveries in this VM
  // will still invoke this adapter.
  InternalResourceManager.setResourceObserver(observer);
  try {
    AttributesFactory fact = new AttributesFactory();
    PartitionAttributesFactory pfact = new PartitionAttributesFactory();
    pfact.setTotalNumBuckets(16);
    pfact.setRedundantCopies(1);
    fact.setPartitionAttributes(pfact.create());
    fact.setOffHeap(offHeap);
    Region r = cache.createRegionFactory(fact.create()).addAsyncEventQueueId(asyncEventQueueId)
        .create(regionName);
    assertNotNull(r);
    // Waits indefinitely for both recovery callbacks before returning.
    recoveryDone.await();
  } finally {
    exp.remove();
  }
}
/**
 * Creates a 16-bucket partitioned-region accessor (localMaxMemory == 0, so this member
 * hosts no buckets) wired to the given async event queue.
 */
public static void createPartitionedRegionAccessorWithAsyncEventQueue(String regionName,
    String asyncEventQueueId) {
  PartitionAttributesFactory partitionAttrs = new PartitionAttributesFactory();
  partitionAttrs.setTotalNumBuckets(16);
  partitionAttrs.setLocalMaxMemory(0);
  AttributesFactory attrs = new AttributesFactory();
  attrs.setPartitionAttributes(partitionAttrs.create());
  assertNotNull(cache.createRegionFactory(attrs.create())
      .addAsyncEventQueueId(asyncEventQueueId).create(regionName));
}
/**
 * Creates (and stores in the static {@code cache} field) a cache in this VM, connecting to
 * the locator on the given localhost port.
 */
protected static void createCache(Integer locPort) {
  AsyncEventQueueTestBase test = new AsyncEventQueueTestBase();
  Properties props = test.getDistributedSystemProperties();
  props.setProperty(MCAST_PORT, "0");
  props.setProperty(LOCATORS, "localhost[" + locPort + "]");
  InternalDistributedSystem ds = test.getSystem(props);
  cache = CacheFactory.create(ds);
}
/**
 * Creates (and stores in the static {@code cache} field) a cache in this VM that discovers
 * peers via multicast on the given port instead of a locator.
 */
public static void createCacheWithoutLocator(Integer mCastPort) {
  AsyncEventQueueTestBase test = new AsyncEventQueueTestBase();
  Properties props = test.getDistributedSystemProperties();
  props.setProperty(MCAST_PORT, "" + mCastPort);
  InternalDistributedSystem ds = test.getSystem(props);
  cache = CacheFactory.create(ds);
}
/**
 * Verifies the main statistics of the named async event queue: primary queue size (polled),
 * secondary queue size (polled, parallel queues only), events received/queued, and a lower
 * bound on events distributed.
 */
public static void checkAsyncEventQueueStats(String queueId, final int queueSize,
    int secondaryQueueSize, final int eventsReceived, final int eventsQueued,
    final int eventsDistributed) {
  AsyncEventQueue queue = null;
  boolean isParallel = false;
  for (AsyncEventQueue q : cache.getAsyncEventQueues()) {
    if (q.getId().equals(queueId)) {
      queue = q;
      // Only record parallelism of the matched queue (previously read from every queue
      // visited, which was misleading though harmless).
      isParallel = q.isParallel();
      break;
    }
  }
  // Fail with a clear assertion instead of an NPE when no such queue exists.
  assertNotNull("No async event queue found with id " + queueId, queue);
  final AsyncEventQueueStats statistics = ((AsyncEventQueueImpl) queue).getStatistics();
  await()
      .untilAsserted(
          () -> assertEquals("Expected queue entries: " + queueSize + " but actual entries: "
              + statistics.getEventQueueSize(), queueSize, statistics.getEventQueueSize()));
  if (isParallel) {
    await().untilAsserted(() -> assertEquals(
        "Expected events in the secondary queue is " + secondaryQueueSize + ", but actual is "
            + statistics.getSecondaryEventQueueSize(),
        secondaryQueueSize, statistics.getSecondaryEventQueueSize()));
  } else {
    // for serial queue, the secondary queue size stat is not used
    assertEquals(0, statistics.getSecondaryEventQueueSize());
  }
  assertEquals(queueSize, statistics.getEventQueueSize());
  assertEquals(eventsReceived, statistics.getEventsReceived());
  assertEquals(eventsQueued, statistics.getEventsQueued());
  // Fix: use assertTrue instead of the 'assert' keyword so the check runs even when JVM
  // assertions (-ea) are disabled.
  assertTrue(
      "Expected at least " + eventsDistributed + " events distributed but was "
          + statistics.getEventsDistributed(),
      statistics.getEventsDistributed() >= eventsDistributed);
}
/**
 * Verifies the number of events dropped by batch conflation on the named async event queue.
 */
public static void checkAsyncEventQueueConflatedStats(String asyncEventQueueId,
    final int eventsConflated) {
  AsyncEventQueue queue = cache.getAsyncEventQueues().stream()
      .filter(q -> q.getId().equals(asyncEventQueueId))
      .findFirst()
      .orElse(null);
  final AsyncEventQueueStats statistics = ((AsyncEventQueueImpl) queue).getStatistics();
  assertEquals(eventsConflated, statistics.getEventsNotQueuedConflated());
}
/**
 * Verifies failover accounting for the named async event queue: every received event must
 * be counted either as queued, or as an unprocessed token added/removed by the primary.
 */
public static void checkAsyncEventQueueStats_Failover(String asyncEventQueueId,
    final int eventsReceived) {
  AsyncEventQueue queue = cache.getAsyncEventQueues().stream()
      .filter(q -> q.getId().equals(asyncEventQueueId))
      .findFirst()
      .orElse(null);
  final AsyncEventQueueStats statistics = ((AsyncEventQueueImpl) queue).getStatistics();
  assertEquals(eventsReceived, statistics.getEventsReceived());
  assertEquals(eventsReceived,
      (statistics.getEventsQueued() + statistics.getUnprocessedTokensAddedByPrimary()
          + statistics.getUnprocessedEventsRemovedByPrimary()));
}
/**
 * Verifies that at least the given number of batches were distributed by the named async
 * event queue and that none were redistributed.
 */
public static void checkAsyncEventQueueBatchStats(String asyncQueueId, final int batches) {
  AsyncEventQueue queue = null;
  for (AsyncEventQueue q : cache.getAsyncEventQueues()) {
    if (q.getId().equals(asyncQueueId)) {
      queue = q;
      break;
    }
  }
  // Fail with a clear assertion instead of an NPE when no such queue exists.
  assertNotNull("No async event queue found with id " + asyncQueueId, queue);
  final AsyncEventQueueStats statistics = ((AsyncEventQueueImpl) queue).getStatistics();
  // Fix: use assertTrue instead of the 'assert' keyword so the check runs even when JVM
  // assertions (-ea) are disabled.
  assertTrue(
      "Expected at least " + batches + " batches distributed but was "
          + statistics.getBatchesDistributed(),
      statistics.getBatchesDistributed() >= batches);
  assertEquals(0, statistics.getBatchesRedistributed());
}
/**
 * Verifies the unprocessed-event bookkeeping of the named async event queue: both the
 * secondary-side and primary-side totals must equal the expected event count.
 */
public static void checkAsyncEventQueueUnprocessedStats(String asyncQueueId, int events) {
  AsyncEventQueue queue = cache.getAsyncEventQueues().stream()
      .filter(q -> q.getId().equals(asyncQueueId))
      .findFirst()
      .orElse(null);
  final AsyncEventQueueStats statistics = ((AsyncEventQueueImpl) queue).getStatistics();
  // Secondary-side accounting.
  assertEquals(events, (statistics.getUnprocessedEventsAddedBySecondary()
      + statistics.getUnprocessedTokensRemovedBySecondary()));
  // Primary-side accounting must agree with the same total.
  assertEquals(events, (statistics.getUnprocessedEventsRemovedByPrimary()
      + statistics.getUnprocessedTokensAddedByPrimary()));
}
/**
 * Toggles whether the named gateway sender drops a failed event from the queue rather than
 * retrying it. Fails the test if no sender with that id exists.
 */
public static void setRemoveFromQueueOnException(String senderId, boolean removeFromQueue) {
  GatewaySender sender = cache.getGatewaySenders().stream()
      .filter(s -> s.getId().equals(senderId))
      .findFirst()
      .orElse(null);
  assertNotNull(sender);
  ((AbstractGatewaySender) sender).setRemoveFromQueueOnException(removeFromQueue);
}
/**
 * Clears the remove-from-queue-on-exception flag on the named gateway sender.
 * Identical to {@code setRemoveFromQueueOnException(senderId, false)}; delegates to it to
 * avoid duplicating the sender-lookup logic.
 */
public static void unsetRemoveFromQueueOnException(String senderId) {
  setRemoveFromQueueOnException(senderId, false);
}
/**
 * Blocks until the gateway sender with the given id reports itself primary.
 */
public static void waitForSenderToBecomePrimary(String senderId) {
  Set<GatewaySender> senders = ((GemFireCacheImpl) cache).getAllGatewaySenders();
  final GatewaySender sender = getGatewaySenderById(senders, senderId);
  WaitCriterion wc = new WaitCriterion() {
    @Override
    public boolean done() {
      // sender is null when the id is unknown; the criterion then never passes and the
      // await below times out with the description message.
      return sender != null && ((AbstractGatewaySender) sender).isPrimary();
    }
    @Override
    public String description() {
      return "Expected sender primary state to be true but is false";
    }
  };
  GeodeAwaitility.await().untilAsserted(wc);
}
// Linear scan over the supplied senders; returns null when no sender carries
// the requested id (callers decide whether null is acceptable).
private static GatewaySender getGatewaySenderById(Set<GatewaySender> senders, String senderId) {
  for (final GatewaySender candidate : senders) {
    if (candidate.getId().equals(senderId)) {
      return candidate;
    }
  }
  return null;
}
/**
 * Creates a gateway sender named {@code dsName} targeting remote distributed system
 * {@code remoteDsId}.
 *
 * <p>
 * The previous implementation duplicated the entire configuration sequence in two
 * nearly identical parallel/serial branches; the only real difference was
 * {@code setParallel(true)}. The branches are merged: {@code setParallel(isParallel)}
 * covers both cases (the factory default is serial), and factory setter ordering is
 * not significant.
 *
 * @param isParallel true for a parallel sender, false for serial
 * @param isPersistent true to back the queue with the created disk store
 * @param filter optional event filter; also recorded in the static {@code eventFilter}
 * @param isManulaStart true to require an explicit start() call (typo preserved for callers)
 */
public static void createSender(String dsName, int remoteDsId, boolean isParallel,
    Integer maxMemory, Integer batchSize, boolean isConflation, boolean isPersistent,
    GatewayEventFilter filter, boolean isManulaStart) {
  final IgnoredException exln = IgnoredException.addIgnoredException("Could not connect");
  try {
    // Unique disk directory per sender/VM so concurrent test runs do not collide.
    File persistentDirectory =
        new File(dsName + "_disk_" + System.currentTimeMillis() + "_" + VM.getCurrentVMNum());
    persistentDirectory.mkdir();
    DiskStoreFactory dsf = cache.createDiskStoreFactory();
    File[] dirs1 = new File[] {persistentDirectory};
    GatewaySenderFactory gateway = cache.createGatewaySenderFactory();
    gateway.setParallel(isParallel);
    gateway.setMaximumQueueMemory(maxMemory);
    gateway.setBatchSize(batchSize);
    gateway.setManualStart(isManulaStart);
    // set dispatcher threads
    gateway.setDispatcherThreads(numDispatcherThreadsForTheRun);
    ((InternalGatewaySenderFactory) gateway)
        .setLocatorDiscoveryCallback(new MyLocatorCallback());
    if (filter != null) {
      eventFilter = filter;
      gateway.addGatewayEventFilter(filter);
    }
    gateway.setBatchConflationEnabled(isConflation);
    // A disk store is created in both the persistent and non-persistent cases
    // (matching the original behavior); persistence merely enables it for the queue.
    DiskStore store = dsf.setDiskDirs(dirs1).create(dsName);
    if (isPersistent) {
      gateway.setPersistenceEnabled(true);
    }
    gateway.setDiskStoreName(store.getName());
    gateway.create(dsName, remoteDsId);
  } finally {
    exln.remove();
  }
}
/**
 * Creates a cache connected to the locator at {@code locPort} and starts a gateway
 * receiver on a random available TCP port.
 *
 * @param locPort port of the locator this member should join through
 * @return the port the receiver listens on
 */
public static int createReceiver(int locPort) {
  AsyncEventQueueTestBase test = new AsyncEventQueueTestBase();
  Properties props = test.getDistributedSystemProperties();
  props.setProperty(MCAST_PORT, "0");
  props.setProperty(LOCATORS, "localhost[" + locPort + "]");
  InternalDistributedSystem ds = test.getSystem(props);
  cache = CacheFactory.create(ds);
  GatewayReceiverFactory fact = cache.createGatewayReceiverFactory();
  int port = AvailablePortHelper.getRandomAvailableTCPPort();
  // Start and end port are pinned to the same value so the receiver binds exactly this port.
  fact.setStartPort(port);
  fact.setEndPort(port);
  fact.setManualStart(true);
  GatewayReceiver receiver = fact.create();
  try {
    receiver.start();
  } catch (IOException e) {
    e.printStackTrace();
    fail("Test " + test.getName() + " failed to start GatewayRecevier on port " + port);
  }
  return port;
}
// Joins the segments with the platform separator, keeping a trailing separator
// (e.g. ["a","b"] -> "a/b/" on Unix), matching how callers compose directory paths.
public static String makePath(String[] strings) {
  StringBuilder path = new StringBuilder();
  for (int i = 0; i < strings.length; i++) {
    path.append(strings[i]).append(File.separator);
  }
  return path.toString();
}
/**
 * Do a rebalance and verify balance was improved. If evictionPercentage > 0 (the default) then we
 * have heapLRU and this can cause simulate and rebalance results to differ if eviction kicks in
 * between. (See BUG 44899).
 */
public static void doRebalance() {
  ResourceManager resMan = cache.getResourceManager();
  boolean heapEviction = (resMan.getEvictionHeapPercentage() > 0);
  RebalanceFactory factory = resMan.createRebalanceFactory();
  try {
    if (!heapEviction) {
      LogWriterUtils.getLogWriter().info("Calling rebalance simulate");
      RebalanceOperation simulateOp = factory.simulate();
      // getResults() blocks until the simulation completes; the result object itself
      // was never inspected, so the dead locals are dropped.
      simulateOp.getResults();
    }
    LogWriterUtils.getLogWriter().info("Starting rebalancing");
    RebalanceOperation rebalanceOp = factory.start();
    // Block until the rebalance finishes before returning control to the test.
    rebalanceOp.getResults();
  } catch (InterruptedException e) {
    // Restore the interrupt flag before failing so the thread's interrupted state
    // is observable by the test framework (previously it was swallowed).
    Thread.currentThread().interrupt();
    Assert.fail("Interrupted", e);
  }
}
// Puts Long keys 0..numPuts-1 mapped to themselves into the named region.
// Interrupted/GatewaySender exceptions are expected noise during sender kill tests.
public static void doPuts(String regionName, int numPuts) {
  IgnoredException exp1 =
      IgnoredException.addIgnoredException(InterruptedException.class.getName());
  IgnoredException exp2 =
      IgnoredException.addIgnoredException(GatewaySenderException.class.getName());
  try {
    Region region = cache.getRegion(SEPARATOR + regionName);
    assertNotNull(region);
    long key = 0;
    while (key < numPuts) {
      region.put(key, key);
      key++;
    }
  } finally {
    exp1.remove();
    exp2.remove();
  }
}
// Puts numPuts entries whose values are 1 MiB byte arrays, to quickly exercise
// queue-memory limits and overflow paths.
public static void doHeavyPuts(String regionName, int numPuts) {
  Region region = cache.getRegion(SEPARATOR + regionName);
  assertNotNull(region);
  for (long key = 0; key < numPuts; key++) {
    region.put(key, new byte[1024 * 1024]);
  }
}
/**
 * To be used for CacheLoader related tests
 */
public static void doGets(String regionName, int numGets) {
  Region region = cache.getRegion(SEPARATOR + regionName);
  assertNotNull(region);
  // Misses trigger the region's CacheLoader, which is the point of these tests.
  long key = 0;
  while (key < numGets) {
    region.get(key);
    key++;
  }
}
// Puts Long keys in the half-open range [from, numPuts).
// NOTE(review): numPuts is the exclusive upper key bound, not a count — confirm callers.
public static void doPutsFrom(String regionName, int from, int numPuts) {
  Region region = cache.getRegion(SEPARATOR + regionName);
  assertNotNull(region);
  for (long key = from; key < numPuts; key++) {
    region.put(key, key);
  }
}
// Issues numPuts putAll batches, each covering `size` consecutive Long keys,
// so batch i writes keys [size*i, size*(i+1)) all mapped to the batch index.
public static void doPutAll(String regionName, int numPuts, int size) {
  Region region = cache.getRegion(SEPARATOR + regionName);
  assertNotNull(region);
  for (long batch = 0; batch < numPuts; batch++) {
    Map batchMap = new HashMap();
    for (long offset = 0; offset < size; offset++) {
      batchMap.put((size * batch) + offset, batch);
    }
    region.putAll(batchMap, "putAllCallback");
  }
}
/**
 * Puts every entry of {@code keyValues} into the named region.
 *
 * @param regionName region name without the leading separator
 * @param keyValues entries to put, iterated in the map's iteration order
 */
public static void putGivenKeyValue(String regionName, Map keyValues) {
  Region r = cache.getRegion(SEPARATOR + regionName);
  assertNotNull(r);
  // Iterate entries directly instead of keySet()+get(), avoiding a second map
  // lookup per key (and a subtle mismatch if the map ever held null values).
  for (Object entryObj : keyValues.entrySet()) {
    Map.Entry entry = (Map.Entry) entryObj;
    r.put(entry.getKey(), entry.getValue());
  }
}
// Puts Long keys in [start, numPuts); CacheClosedException is expected noise
// while another member is being killed mid-run.
public static void doNextPuts(String regionName, int start, int numPuts) {
  IgnoredException exp =
      IgnoredException.addIgnoredException(CacheClosedException.class.getName());
  try {
    Region region = cache.getRegion(SEPARATOR + regionName);
    assertNotNull(region);
    for (long key = start; key < numPuts; key++) {
      region.put(key, key);
    }
  } finally {
    exp.remove();
  }
}
/**
 * Waits until the named region holds exactly {@code regionSize} keys, failing with a
 * descriptive message on timeout. ForceReattempt/CacheClosed exceptions are expected
 * noise while members are being recycled.
 */
public static void validateRegionSize(String regionName, final int regionSize) {
  IgnoredException exp =
      IgnoredException.addIgnoredException(ForceReattemptException.class.getName());
  IgnoredException exp1 =
      IgnoredException.addIgnoredException(CacheClosedException.class.getName());
  try {
    final Region r = cache.getRegion(SEPARATOR + regionName);
    assertNotNull(r);
    WaitCriterion wc = new WaitCriterion() {
      @Override
      public boolean done() {
        return r.keySet().size() == regionSize;
      }

      @Override
      public String description() {
        // Includes the full keyset to make off-by-a-few failures diagnosable.
        return "Expected region entries: " + regionSize + " but actual entries: "
            + r.keySet().size() + " present region keyset " + r.keySet();
      }
    };
    GeodeAwaitility.await().untilAsserted(wc);
  } finally {
    exp.remove();
    exp1.remove();
  }
}
/**
 * Validate whether all the attributes set on AsyncEventQueueFactory are set on the sender
 * underneath the AsyncEventQueue.
 */
public static void validateAsyncEventQueueAttributes(String asyncChannelId, int maxQueueMemory,
    int batchSize, int batchTimeInterval, boolean isPersistent, String diskStoreName,
    boolean isDiskSynchronous, boolean batchConflationEnabled) {
  AsyncEventQueue found = null;
  for (AsyncEventQueue candidate : cache.getAsyncEventQueues()) {
    if (asyncChannelId.equals(candidate.getId())) {
      found = candidate;
    }
  }
  // An AsyncEventQueue is implemented on top of a gateway sender; the factory
  // attributes land there, so that is where they are verified.
  GatewaySender underlying = ((AsyncEventQueueImpl) found).getSender();
  assertEquals("maxQueueMemory", maxQueueMemory, underlying.getMaximumQueueMemory());
  assertEquals("batchSize", batchSize, underlying.getBatchSize());
  assertEquals("batchTimeInterval", batchTimeInterval, underlying.getBatchTimeInterval());
  assertEquals("isPersistent", isPersistent, underlying.isPersistenceEnabled());
  assertEquals("diskStoreName", diskStoreName, underlying.getDiskStoreName());
  assertEquals("isDiskSynchronous", isDiskSynchronous, underlying.isDiskSynchronous());
  assertEquals("batchConflation", batchConflationEnabled, underlying.isBatchConflationEnabled());
}
/**
 * Validate whether all the attributes set on AsyncEventQueueFactory are set on the sender
 * underneath the AsyncEventQueue.
 */
public static void validateConcurrentAsyncEventQueueAttributes(String asyncChannelId,
    int maxQueueMemory, int batchSize, int batchTimeInterval, boolean isPersistent,
    String diskStoreName, boolean isDiskSynchronous, boolean batchConflationEnabled,
    int dispatcherThreads, OrderPolicy policy) {
  AsyncEventQueue found = null;
  for (AsyncEventQueue candidate : cache.getAsyncEventQueues()) {
    if (asyncChannelId.equals(candidate.getId())) {
      found = candidate;
    }
  }
  // Same checks as validateAsyncEventQueueAttributes, plus the concurrency knobs
  // (dispatcher thread count and ordering policy).
  GatewaySender underlying = ((AsyncEventQueueImpl) found).getSender();
  assertEquals("maxQueueMemory", maxQueueMemory, underlying.getMaximumQueueMemory());
  assertEquals("batchSize", batchSize, underlying.getBatchSize());
  assertEquals("batchTimeInterval", batchTimeInterval, underlying.getBatchTimeInterval());
  assertEquals("isPersistent", isPersistent, underlying.isPersistenceEnabled());
  assertEquals("diskStoreName", diskStoreName, underlying.getDiskStoreName());
  assertEquals("isDiskSynchronous", isDiskSynchronous, underlying.isDiskSynchronous());
  assertEquals("batchConflation", batchConflationEnabled, underlying.isBatchConflationEnabled());
  assertEquals("dispatcherThreads", dispatcherThreads, underlying.getDispatcherThreads());
  assertEquals("orderPolicy", policy, underlying.getOrderPolicy());
}
// Waits until the MyAsyncEventListener attached to the named queue has recorded
// exactly expectedSize events (one map entry per distinct key).
public static void validateAsyncEventListener(String asyncQueueId, final int expectedSize) {
  AsyncEventListener listener = null;
  for (AsyncEventQueue candidate : cache.getAsyncEventQueues()) {
    if (asyncQueueId.equals(candidate.getId())) {
      listener = candidate.getAsyncEventListener();
    }
  }
  final Map eventsMap = ((MyAsyncEventListener) listener).getEventsMap();
  assertNotNull(eventsMap);
  WaitCriterion criterion = new WaitCriterion() {
    @Override
    public boolean done() {
      return eventsMap.size() == expectedSize;
    }

    @Override
    public String description() {
      return "Expected map entries: " + expectedSize + " but actual entries: " + eventsMap.size();
    }
  };
  GeodeAwaitility.await().untilAsserted(criterion); // TODO:Yogs
}
/**
 * Waits until the MyAsyncEventListener_CacheLoader on the named queue has recorded
 * {@code expectedSize} events, then asserts every recorded event carries the expected
 * operation detail (load and/or putAll).
 */
public static void validateAsyncEventForOperationDetail(String asyncQueueId,
    final int expectedSize, boolean isLoad, boolean isPutAll) {
  AsyncEventListener theListener = null;
  Set<AsyncEventQueue> asyncEventQueues = cache.getAsyncEventQueues();
  for (AsyncEventQueue asyncQueue : asyncEventQueues) {
    if (asyncQueueId.equals(asyncQueue.getId())) {
      theListener = asyncQueue.getAsyncEventListener();
    }
  }
  final Map eventsMap = ((MyAsyncEventListener_CacheLoader) theListener).getEventsMap();
  assertNotNull(eventsMap);
  WaitCriterion wc = new WaitCriterion() {
    @Override
    public boolean done() {
      return eventsMap.size() == expectedSize;
    }

    @Override
    public String description() {
      return "Expected map entries: " + expectedSize + " but actual entries: " + eventsMap.size();
    }
  };
  GeodeAwaitility.await().untilAsserted(wc); // TODO:Yogs
  // Once the size has converged, check the operation detail on every recorded event.
  Collection values = eventsMap.values();
  for (final Object value : values) {
    AsyncEvent asyncEvent = (AsyncEvent) value;
    if (isLoad) {
      assertTrue(asyncEvent.getOperation().isLoad());
    }
    if (isPutAll) {
      assertTrue(asyncEvent.getOperation().isPutAll());
    }
  }
}
// Waits until the CustomAsyncEventListener on the named queue has recorded
// expectedSize events, then asserts each one was flagged as a possible duplicate
// (i.e. was redelivered after a failover).
public static void validateCustomAsyncEventListener(String asyncQueueId, final int expectedSize) {
  AsyncEventListener listener = null;
  for (AsyncEventQueue candidate : cache.getAsyncEventQueues()) {
    if (asyncQueueId.equals(candidate.getId())) {
      listener = candidate.getAsyncEventListener();
    }
  }
  final Map eventsMap = ((CustomAsyncEventListener) listener).getEventsMap();
  assertNotNull(eventsMap);
  WaitCriterion criterion = new WaitCriterion() {
    @Override
    public boolean done() {
      return eventsMap.size() == expectedSize;
    }

    @Override
    public String description() {
      return "Expected map entries: " + expectedSize + " but actual entries: " + eventsMap.size();
    }
  };
  GeodeAwaitility.await().untilAsserted(criterion); // TODO:Yogs
  for (final AsyncEvent event : (Iterable<AsyncEvent>) eventsMap.values()) {
    assertTrue("possibleDuplicate should be true for event: " + event,
        event.getPossibleDuplicate());
  }
}
/**
 * Blocks until every region queue behind the named async event queue is empty.
 *
 * <p>
 * For a parallel sender the queue set is snapshotted once up front; for a serial
 * sender it is re-fetched on every poll — NOTE(review): presumably because serial
 * queues can be re-created on failover; confirm before unifying the branches.
 */
public static void waitForAsyncQueueToGetEmpty(String asyncQueueId) {
  AsyncEventQueue theAsyncEventQueue = cache.getAsyncEventQueue(asyncQueueId);
  final GatewaySender sender = ((AsyncEventQueueImpl) theAsyncEventQueue).getSender();
  if (sender.isParallel()) {
    final Set<RegionQueue> queues = ((AbstractGatewaySender) sender).getQueues();
    WaitCriterion wc = new WaitCriterion() {
      @Override
      public boolean done() {
        // Total size across all partitioned region queues.
        int size = 0;
        for (RegionQueue q : queues) {
          size += q.size();
        }
        return size == 0;
      }

      @Override
      public String description() {
        int size = 0;
        for (RegionQueue q : queues) {
          size += q.size();
        }
        return "Expected queue size to be : " + 0 + " but actual entries: " + size;
      }
    };
    GeodeAwaitility.await().untilAsserted(wc);
  } else {
    WaitCriterion wc = new WaitCriterion() {
      @Override
      public boolean done() {
        // Serial path re-fetches the queue set on each poll.
        Set<RegionQueue> queues = ((AbstractGatewaySender) sender).getQueues();
        int size = 0;
        for (RegionQueue q : queues) {
          size += q.size();
        }
        return size == 0;
      }

      @Override
      public String description() {
        Set<RegionQueue> queues = ((AbstractGatewaySender) sender).getQueues();
        int size = 0;
        for (RegionQueue q : queues) {
          size += q.size();
        }
        return "Expected queue size to be : " + 0 + " but actual entries: " + size;
      }
    };
    GeodeAwaitility.await().untilAsserted(wc);
  }
}
/**
 * Asserts that, for each of the given buckets, the first {@code batchSize} events
 * recorded by the MyAsyncEventListener2 on the named queue were flagged as possible
 * duplicates (i.e. redelivered after a failover).
 */
public static void verifyAsyncEventListenerForPossibleDuplicates(String asyncEventQueueId,
    Set<Integer> bucketIds, int batchSize) {
  AsyncEventListener theListener = null;
  Set<AsyncEventQueue> asyncEventQueues = cache.getAsyncEventQueues();
  for (AsyncEventQueue asyncQueue : asyncEventQueues) {
    if (asyncEventQueueId.equals(asyncQueue.getId())) {
      theListener = asyncQueue.getAsyncEventListener();
    }
  }
  final Map<Integer, List<GatewaySenderEventImpl>> bucketToEventsMap =
      ((MyAsyncEventListener2) theListener).getBucketToEventsMap();
  assertNotNull(bucketToEventsMap);
  // The scenario only makes sense with events spread over more than one bucket.
  assertTrue(bucketIds.size() > 1);
  for (int bucketId : bucketIds) {
    List<GatewaySenderEventImpl> eventsForBucket = bucketToEventsMap.get(bucketId);
    LogWriterUtils.getLogWriter()
        .info("Events for bucket: " + bucketId + " is " + eventsForBucket);
    assertNotNull(eventsForBucket);
    // Only the first batchSize events per bucket are expected to be re-dispatched.
    for (int i = 0; i < batchSize; i++) {
      GatewaySenderEventImpl senderEvent = eventsForBucket.get(i);
      assertTrue(senderEvent.getPossibleDuplicate());
    }
  }
}
// Verifies the substitution filter ran once per event and that the listener saw
// the substituted "substituted_<key>" values rather than the originals.
public static void verifySubstitutionFilterInvocations(String asyncEventQueueId,
    int expectedNumInvocations) {
  AsyncEventQueue queue = cache.getAsyncEventQueue(asyncEventQueueId);
  assertNotNull(queue);
  // Verify the GatewayEventSubstitutionFilter has been invoked the appropriate number of times
  MyGatewayEventSubstitutionFilter substitutionFilter =
      (MyGatewayEventSubstitutionFilter) queue.getGatewayEventSubstitutionFilter();
  assertNotNull(substitutionFilter);
  assertEquals(expectedNumInvocations, substitutionFilter.getNumInvocations());
  // Verify the AsyncEventListener has received the substituted values
  MyAsyncEventListener recordingListener = (MyAsyncEventListener) queue.getAsyncEventListener();
  final Map recordedEvents = recordingListener.getEventsMap();
  assertNotNull(recordedEvents);
  assertEquals(expectedNumInvocations, recordedEvents.size());
  for (final Object entryObj : recordedEvents.entrySet()) {
    Map.Entry<Integer, String> entry = (Map.Entry<Integer, String>) entryObj;
    assertEquals(MyGatewayEventSubstitutionFilter.SUBSTITUTION_PREFIX + entry.getKey(),
        entry.getValue());
  }
}
// Verifies the sizeable substitution filter's substituted values were serialized
// (toData invoked) the expected number of times.
public static void verifySubstitutionFilterToDataInvocations(String asyncEventQueueId,
    int expectedToDataInvoations) {
  AsyncEventQueue queue = cache.getAsyncEventQueue(asyncEventQueueId);
  assertNotNull(queue);
  // Verify the GatewayEventSubstitutionFilter has been invoked the appropriate number of times
  SizeableGatewayEventSubstitutionFilter sizeableFilter =
      (SizeableGatewayEventSubstitutionFilter) queue.getGatewayEventSubstitutionFilter();
  assertNotNull(sizeableFilter);
  assertEquals(expectedToDataInvoations, sizeableFilter.getNumToDataInvocations());
}
/**
 * Returns the AsyncEventListener attached to the queue with the given id, or
 * {@code null} when no such queue exists in this cache.
 */
public static AsyncEventListener getAsyncEventListener(String asyncEventQueueId) {
  // The loop returns directly; the previous version declared an unused local.
  for (AsyncEventQueue asyncQueue : cache.getAsyncEventQueues()) {
    if (asyncEventQueueId.equals(asyncQueue.getId())) {
      return asyncQueue.getAsyncEventListener();
    }
  }
  return null;
}
// Returns the number of events the MyAsyncEventListener on the named queue has
// recorded so far (one entry per distinct key).
public static int getAsyncEventListenerMapSize(String asyncEventQueueId) {
  AsyncEventListener listener = getAsyncEventListener(asyncEventQueueId);
  final Map recordedEvents = ((MyAsyncEventListener) listener).getEventsMap();
  assertNotNull(recordedEvents);
  final int size = recordedEvents.size();
  LogWriterUtils.getLogWriter().info("The events map size is " + size);
  return size;
}
// Returns the current depth of the queue with the given id; fails the test when
// no matching queue is registered with this cache.
public static int getAsyncEventQueueSize(String asyncEventQueueId) {
  AsyncEventQueue matched = null;
  for (AsyncEventQueue candidate : cache.getAsyncEventQueues()) {
    if (asyncEventQueueId.equals(candidate.getId())) {
      matched = candidate;
    }
  }
  assertNotNull(matched);
  return matched.size();
}
// Resolves the named region and returns its full path; fails the test (rather
// than NPE-ing) when the region does not exist.
public static String getRegionFullPath(String regionName) {
  Region region = cache.getRegion(SEPARATOR + regionName);
  assertNotNull(region);
  return region.getFullPath();
}
// Returns the ids of all buckets whose primary copy is hosted on this member.
// NOTE(review): unlike most helpers here, the region is looked up WITHOUT the
// leading SEPARATOR — callers appear to pass the path themselves; confirm.
public static Set<Integer> getAllPrimaryBucketsOnTheNode(String regionName) {
  PartitionedRegion region = (PartitionedRegion) cache.getRegion(regionName);
  return region.getDataStore().getAllLocalPrimaryBucketIds();
}
/**
 * Installs a listener on the named region that simulates a member crash: when the
 * entry with key 900 is created, the cache is closed and the distributed system
 * disconnected, forcing failover mid-run.
 */
public static void addCacheListenerAndCloseCache(String regionName) {
  final Region region = cache.getRegion(SEPARATOR + regionName);
  assertNotNull(region);
  CacheListenerAdapter cl = new CacheListenerAdapter() {
    @Override
    public void afterCreate(EntryEvent event) {
      // Keys are Longs in these tests; 900 is the trigger key chosen by the scenario.
      if ((Long) event.getKey() == 900) {
        cache.getLogger().fine(" Gateway sender is killed by a test");
        cache.close();
        cache.getDistributedSystem().disconnect();
      }
    }
  };
  region.getAttributesMutator().addCacheListener(cl);
}
/**
 * Disconnects this member if it hosts the PRIMARY sender with the given id.
 *
 * @return {@code Boolean.TRUE} if this member was primary and was disconnected,
 *         {@code Boolean.FALSE} otherwise
 */
public static Boolean killSender(String senderId) {
  final IgnoredException exln = IgnoredException.addIgnoredException("Could not connect");
  IgnoredException exp =
      IgnoredException.addIgnoredException(CacheClosedException.class.getName());
  IgnoredException exp1 =
      IgnoredException.addIgnoredException(ForceReattemptException.class.getName());
  try {
    Set<GatewaySender> senders = cache.getGatewaySenders();
    AbstractGatewaySender sender = null;
    for (GatewaySender s : senders) {
      if (s.getId().equals(senderId)) {
        sender = (AbstractGatewaySender) s;
        break;
      }
    }
    // Fail with a clear assertion instead of an NPE when the id matches no sender.
    assertNotNull(sender);
    if (sender.isPrimary()) {
      LogWriterUtils.getLogWriter().info("Gateway sender is killed by a test");
      cache.getDistributedSystem().disconnect();
      return Boolean.TRUE;
    }
    return Boolean.FALSE;
  } finally {
    exp.remove();
    exp1.remove();
    exln.remove();
  }
}
/**
 * Disconnects this member if it hosts the PRIMARY async event queue with the given id.
 *
 * @return {@code Boolean.TRUE} if this member was primary and was disconnected,
 *         {@code Boolean.FALSE} otherwise
 */
public static Boolean killAsyncEventQueue(String asyncQueueId) {
  Set<AsyncEventQueue> queues = cache.getAsyncEventQueues();
  AsyncEventQueueImpl queue = null;
  for (AsyncEventQueue q : queues) {
    if (q.getId().equals(asyncQueueId)) {
      queue = (AsyncEventQueueImpl) q;
      break;
    }
  }
  // Fail with a clear assertion instead of an NPE when the id matches no queue.
  assertNotNull(queue);
  if (queue.isPrimary()) {
    LogWriterUtils.getLogWriter().info("AsyncEventQueue is killed by a test");
    cache.getDistributedSystem().disconnect();
    return Boolean.TRUE;
  }
  return Boolean.FALSE;
}
// Simulates an unconditional sender crash on this member: closes the cache and
// disconnects the distributed system so another member can take over as primary.
public static void killSender() {
  LogWriterUtils.getLogWriter().info("Gateway sender is going to be killed by a test");
  cache.close();
  cache.getDistributedSystem().disconnect();
  LogWriterUtils.getLogWriter().info("Gateway sender is killed by a test");
}
/**
 * Records locator discovery/removal notifications and lets tests block until a
 * specific locator has been seen or removed, with a millisecond timeout.
 */
public static class MyLocatorCallback extends LocatorDiscoveryCallbackAdapter {

  private final Set discoveredLocators = new HashSet();

  private final Set removedLocators = new HashSet();

  @Override
  public synchronized void locatorsDiscovered(List locators) {
    discoveredLocators.addAll(locators);
    notifyAll();
  }

  @Override
  public synchronized void locatorsRemoved(List locators) {
    removedLocators.addAll(locators);
    notifyAll();
  }

  public boolean waitForDiscovery(InetSocketAddress locator, long time)
      throws InterruptedException {
    return waitFor(discoveredLocators, locator, time);
  }

  public boolean waitForRemove(InetSocketAddress locator, long time) throws InterruptedException {
    return waitFor(removedLocators, locator, time);
  }

  /**
   * Waits up to {@code time} millis for {@code locator} to appear in {@code set}.
   *
   * <p>
   * Fix: the loop condition was {@code remaining >= 0}, so when the deadline was hit
   * exactly (or {@code time} was 0) the loop called {@code wait(0)}, which per
   * {@link Object#wait(long)} waits INDEFINITELY — hanging the test instead of
   * timing out. The condition is now {@code remaining > 0}.
   */
  private synchronized boolean waitFor(Set set, InetSocketAddress locator, long time)
      throws InterruptedException {
    long remaining = time;
    long endTime = System.currentTimeMillis() + time;
    while (!set.contains(locator) && remaining > 0) {
      wait(remaining);
      remaining = endTime - System.currentTimeMillis();
    }
    return set.contains(locator);
  }

  public synchronized Set getDiscovered() {
    return new HashSet(discoveredLocators);
  }

  public synchronized Set getRemoved() {
    return new HashSet(removedLocators);
  }
}
@Override
public final void postTearDown() throws Exception {
  // Clean this JVM first, then every remote test VM, so no member leaves a cache
  // or on-disk artifacts behind for the next test.
  cleanupVM();
  vm0.invoke(AsyncEventQueueTestBase::cleanupVM);
  vm1.invoke(AsyncEventQueueTestBase::cleanupVM);
  vm2.invoke(AsyncEventQueueTestBase::cleanupVM);
  vm3.invoke(AsyncEventQueueTestBase::cleanupVM);
  vm4.invoke(AsyncEventQueueTestBase::cleanupVM);
}
// Closes the cache (if any) and removes the disk directories created by this VM.
public static void cleanupVM() throws IOException {
  closeCache();
  JUnit4DistributedTestCase.cleanDiskDirs();
}
// Closes the static cache and disconnects its distributed system. When the cache
// is already gone, still disconnect any lingering distributed system this VM may
// hold (a fresh test instance is only a handle to query/disconnect it).
public static void closeCache() throws IOException {
  if (cache != null && !cache.isClosed()) {
    cache.close();
    cache.getDistributedSystem().disconnect();
    cache = null;
  } else {
    AsyncEventQueueTestBase test = new AsyncEventQueueTestBase();
    if (test.isConnectedToDS()) {
      test.getSystem().disconnect();
    }
  }
}
// Disconnects this VM's distributed system, which shuts down the locator it hosts.
public static void shutdownLocator() {
  AsyncEventQueueTestBase test = new AsyncEventQueueTestBase();
  test.getSystem().disconnect();
}
// Debug helper: dumps the contents of the static eventListener1's event map to the log.
public static void printEventListenerMap() {
  ((MyGatewaySenderEventListener) eventListener1).printMap();
}
/**
 * Returns the base distributed-system properties for every member created by this
 * test base, always reserving 300m of off-heap memory.
 */
@Override
public Properties getDistributedSystemProperties() {
  // For now all WANTestBase tests allocate off-heap memory even though
  // many of them never use it.
  // The problem is that WANTestBase has static methods that create instances
  // of WANTestBase (instead of instances of the subclass). So we can't override
  // this method so that only the off-heap subclasses allocate off heap memory.
  Properties props = new Properties();
  props.setProperty(OFF_HEAP_MEMORY_SIZE, "300m");
  return props;
}
/**
 * Returns true if the test should create off-heap regions. OffHeap tests should over-ride this
 * method and return false.
 *
 * <p>
 * NOTE(review): the sentence above looks inverted — this base returns {@code false}
 * and off-heap subclasses presumably return {@code true}; confirm against subclasses.
 */
public boolean isOffHeap() {
  return false;
}
/**
 * Fixed-partition resolver that spreads keys over a fixed list of partition names
 * by hash, and routes on the raw key.
 */
private static class MyFixedPartitionResolver implements FixedPartitionResolver {

  private final List<String> allPartitions;

  public MyFixedPartitionResolver(final List<String> allPartitions) {
    this.allPartitions = allPartitions;
  }

  @Override
  public String getPartitionName(final EntryOperation opDetails,
      @Deprecated final Set targetPartitions) {
    // abs() guards against negative hash codes; the remainder's magnitude is
    // always below allPartitions.size(), so abs cannot overflow here.
    final int slot = Math.abs(opDetails.getKey().hashCode() % allPartitions.size());
    return allPartitions.get(slot);
  }

  @Override
  public Object getRoutingObject(final EntryOperation opDetails) {
    return opDetails.getKey();
  }

  @Override
  public String getName() {
    return getClass().getName();
  }

  @Override
  public void close() {}
}
}
/**
 * Async event listener used by CacheLoader tests: records the last event seen for
 * each key so tests can inspect operation details afterwards.
 */
class MyAsyncEventListener_CacheLoader implements AsyncEventListener {

  // Concurrent because multiple dispatcher threads may call processEvents.
  private final Map eventsMap = new ConcurrentHashMap();

  @Override
  public boolean processEvents(List<AsyncEvent> events) {
    for (final AsyncEvent event : events) {
      eventsMap.put(event.getKey(), event);
    }
    // Always acknowledge the batch so it is not redelivered.
    return true;
  }

  public Map getEventsMap() {
    return eventsMap;
  }

  @Override
  public void close() {}
}
/**
 * Cache loader that synthesizes a deterministic string value from the Long key,
 * e.g. key 5 loads as "LoadedValue_5".
 */
class MyCacheLoader implements CacheLoader, Declarable {

  @Override
  public Object load(LoaderHelper helper) {
    final Long key = (Long) helper.getKey();
    return "LoadedValue" + "_" + key;
  }

  @Override
  public void close() {}

  @Override
  public void init(Properties props) {}
}
/**
 * Substitution filter whose substituted values are {@link GatewayEventSubstituteObject}s;
 * those objects report back here each time they are serialized, letting tests count
 * toData invocations.
 */
class SizeableGatewayEventSubstitutionFilter implements GatewayEventSubstitutionFilter, Declarable {

  // Incremented by GatewayEventSubstituteObject.toData, possibly from multiple threads.
  private final AtomicInteger numToDataInvocations = new AtomicInteger();

  protected static final String SUBSTITUTION_PREFIX = "substituted_";

  @Override
  public Object getSubstituteValue(EntryEvent event) {
    return new GatewayEventSubstituteObject(this, SUBSTITUTION_PREFIX + event.getKey());
  }

  @Override
  public void close() {}

  @Override
  public void init(Properties properties) {}

  protected void incNumToDataInvocations() {
    numToDataInvocations.incrementAndGet();
  }

  protected int getNumToDataInvocations() {
    return numToDataInvocations.get();
  }
}
/**
 * Serializable substitute value that notifies its owning filter every time it is
 * serialized, so tests can count toData invocations.
 */
class GatewayEventSubstituteObject implements DataSerializable, Sizeable {

  private String id;

  private final SizeableGatewayEventSubstitutionFilter filter;

  public GatewayEventSubstituteObject(SizeableGatewayEventSubstitutionFilter filter, String id) {
    this.filter = filter;
    this.id = id;
  }

  public String getId() {
    return id;
  }

  @Override
  public void toData(DataOutput out) throws IOException {
    // Side effect under test: record that this value was serialized.
    filter.incNumToDataInvocations();
    DataSerializer.writeString(id, out);
  }

  @Override
  public void fromData(DataInput in) throws IOException, ClassNotFoundException {
    id = DataSerializer.readString(in);
  }

  @Override
  public int getSizeInBytes() {
    // NOTE(review): reports zero size despite implementing Sizeable — presumably
    // deliberate so queue-memory accounting ignores these test values; confirm.
    return 0;
  }

  // Fix: annotated with @Override (it overrides Object.toString), matching the
  // rest of the class's style.
  @Override
  public String toString() {
    return getClass().getSimpleName() + "[" + "id="
        + id + "]";
  }
}
/**
 * Substitution filter that replaces each event's value with "substituted_<key>"
 * and counts how many times it was invoked.
 */
class MyGatewayEventSubstitutionFilter implements GatewayEventSubstitutionFilter, Declarable {

  // Atomic because dispatcher threads may substitute concurrently.
  private final AtomicInteger numInvocations = new AtomicInteger();

  protected static final String SUBSTITUTION_PREFIX = "substituted_";

  @Override
  public Object getSubstituteValue(EntryEvent event) {
    numInvocations.incrementAndGet();
    return SUBSTITUTION_PREFIX + event.getKey();
  }

  @Override
  public void close() {}

  @Override
  public void init(Properties properties) {}

  protected int getNumInvocations() {
    return numInvocations.get();
  }
}