/*
 * Licensed to the Apache Software Foundation (ASF) under one or more contributor license
 * agreements. See the NOTICE file distributed with this work for additional information regarding
 * copyright ownership. The ASF licenses this file to You under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance with the License. You may obtain a
 * copy of the License at
 *
 * http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software distributed under the License
 * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
 * or implied. See the License for the specific language governing permissions and limitations under
 * the License.
 */
package org.apache.geode.internal.cache.wan;

import static org.apache.geode.cache.Region.SEPARATOR;
import static org.apache.geode.distributed.ConfigurationProperties.CONSERVE_SOCKETS;
import static org.apache.geode.distributed.ConfigurationProperties.DISTRIBUTED_SYSTEM_ID;
import static org.apache.geode.distributed.ConfigurationProperties.GATEWAY_SSL_CIPHERS;
import static org.apache.geode.distributed.ConfigurationProperties.GATEWAY_SSL_ENABLED;
import static org.apache.geode.distributed.ConfigurationProperties.GATEWAY_SSL_KEYSTORE;
import static org.apache.geode.distributed.ConfigurationProperties.GATEWAY_SSL_KEYSTORE_PASSWORD;
import static org.apache.geode.distributed.ConfigurationProperties.GATEWAY_SSL_KEYSTORE_TYPE;
import static org.apache.geode.distributed.ConfigurationProperties.GATEWAY_SSL_PROTOCOLS;
import static org.apache.geode.distributed.ConfigurationProperties.GATEWAY_SSL_REQUIRE_AUTHENTICATION;
import static org.apache.geode.distributed.ConfigurationProperties.GATEWAY_SSL_TRUSTSTORE;
import static org.apache.geode.distributed.ConfigurationProperties.GATEWAY_SSL_TRUSTSTORE_PASSWORD;
import static org.apache.geode.distributed.ConfigurationProperties.JMX_MANAGER;
import static org.apache.geode.distributed.ConfigurationProperties.JMX_MANAGER_HTTP_PORT;
import static org.apache.geode.distributed.ConfigurationProperties.JMX_MANAGER_PORT;
import static org.apache.geode.distributed.ConfigurationProperties.JMX_MANAGER_START;
import static org.apache.geode.distributed.ConfigurationProperties.LOCATORS;
import static org.apache.geode.distributed.ConfigurationProperties.LOG_LEVEL;
import static org.apache.geode.distributed.ConfigurationProperties.MCAST_PORT;
import static org.apache.geode.distributed.ConfigurationProperties.OFF_HEAP_MEMORY_SIZE;
import static org.apache.geode.distributed.ConfigurationProperties.REMOTE_LOCATORS;
import static org.apache.geode.distributed.ConfigurationProperties.START_LOCATOR;
import static org.apache.geode.internal.AvailablePortHelper.getRandomAvailableTCPPort;
import static org.apache.geode.test.awaitility.GeodeAwaitility.await;
import static org.apache.geode.test.dunit.Host.getHost;
import static org.apache.geode.test.dunit.IgnoredException.addIgnoredException;
import static org.apache.geode.test.util.ResourceUtils.createTempFileFromResource;
import static org.assertj.core.api.Assertions.assertThat;
import static org.assertj.core.api.Assertions.fail;

import java.io.File;
import java.io.IOException;
import java.io.Serializable;
import java.net.InetSocketAddress;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.LinkedList;
import java.util.List;
import java.util.Map;
import java.util.Properties;
import java.util.Set;
import java.util.StringTokenizer;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.Callable;
import java.util.concurrent.ConcurrentSkipListSet;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.stream.Collectors;
import java.util.stream.Stream;

import javax.management.ObjectName;

import org.apache.commons.io.FileUtils;
import org.apache.logging.log4j.Logger;
import org.jetbrains.annotations.NotNull;
import org.junit.Before;
import org.junit.BeforeClass;
import org.junit.experimental.categories.Category;

import org.apache.geode.cache.AttributesMutator;
import org.apache.geode.cache.Cache;
import org.apache.geode.cache.CacheClosedException;
import org.apache.geode.cache.CacheFactory;
import org.apache.geode.cache.CacheListener;
import org.apache.geode.cache.CacheTransactionManager;
import org.apache.geode.cache.DataPolicy;
import org.apache.geode.cache.DiskStore;
import org.apache.geode.cache.DiskStoreFactory;
import org.apache.geode.cache.EntryEvent;
import org.apache.geode.cache.PartitionAttributesFactory;
import org.apache.geode.cache.Region;
import org.apache.geode.cache.RegionDestroyedException;
import org.apache.geode.cache.RegionFactory;
import org.apache.geode.cache.RegionShortcut;
import org.apache.geode.cache.Scope;
import org.apache.geode.cache.TransactionException;
import org.apache.geode.cache.asyncqueue.AsyncEventListener;
import org.apache.geode.cache.asyncqueue.AsyncEventQueue;
import org.apache.geode.cache.asyncqueue.AsyncEventQueueFactory;
import org.apache.geode.cache.asyncqueue.internal.AsyncEventQueueImpl;
import org.apache.geode.cache.client.ClientCache;
import org.apache.geode.cache.client.ClientCacheFactory;
import org.apache.geode.cache.client.ClientRegionShortcut;
import org.apache.geode.cache.client.PoolManager;
import org.apache.geode.cache.client.internal.ConnectionStats;
import org.apache.geode.cache.client.internal.LocatorDiscoveryCallbackAdapter;
import org.apache.geode.cache.client.internal.PoolImpl;
import org.apache.geode.cache.client.internal.locator.wan.LocatorMembershipListener;
import org.apache.geode.cache.persistence.PartitionOfflineException;
import org.apache.geode.cache.server.CacheServer;
import org.apache.geode.cache.util.CacheListenerAdapter;
import org.apache.geode.cache.wan.GatewayEventFilter;
import org.apache.geode.cache.wan.GatewayQueueEvent;
import org.apache.geode.cache.wan.GatewayReceiver;
import org.apache.geode.cache.wan.GatewayReceiverFactory;
import org.apache.geode.cache.wan.GatewaySender;
import org.apache.geode.cache.wan.GatewaySender.OrderPolicy;
import org.apache.geode.cache.wan.GatewaySenderFactory;
import org.apache.geode.cache.wan.GatewayTransportFilter;
import org.apache.geode.cache.wan.internal.GatewaySenderEventRemoteDispatcher;
import org.apache.geode.cache30.CacheTestCase;
import org.apache.geode.distributed.DistributedMember;
import org.apache.geode.distributed.Locator;
import org.apache.geode.distributed.internal.InternalDistributedSystem;
import org.apache.geode.distributed.internal.InternalLocator;
import org.apache.geode.distributed.internal.ServerLocation;
import org.apache.geode.distributed.internal.membership.InternalDistributedMember;
import org.apache.geode.internal.admin.remote.DistributionLocatorId;
import org.apache.geode.internal.cache.BucketRegion;
import org.apache.geode.internal.cache.CacheServerImpl;
import org.apache.geode.internal.cache.CustomerIDPartitionResolver;
import org.apache.geode.internal.cache.ForceReattemptException;
import org.apache.geode.internal.cache.GemFireCacheImpl;
import org.apache.geode.internal.cache.InternalCacheBuilder;
import org.apache.geode.internal.cache.PartitionedRegion;
import org.apache.geode.internal.cache.PoolStats;
import org.apache.geode.internal.cache.RegionQueue;
import org.apache.geode.internal.cache.execute.data.CustId;
import org.apache.geode.internal.cache.execute.data.Customer;
import org.apache.geode.internal.cache.execute.data.Order;
import org.apache.geode.internal.cache.execute.data.OrderId;
import org.apache.geode.internal.cache.execute.data.Shipment;
import org.apache.geode.internal.cache.execute.data.ShipmentId;
import org.apache.geode.internal.cache.partitioned.BecomePrimaryBucketMessage;
import org.apache.geode.internal.cache.partitioned.BecomePrimaryBucketMessage.BecomePrimaryBucketResponse;
import org.apache.geode.internal.cache.partitioned.PRLocallyDestroyedException;
import org.apache.geode.internal.cache.tier.sockets.CacheServerStats;
import org.apache.geode.internal.cache.tier.sockets.CacheServerTestUtil;
import org.apache.geode.internal.cache.wan.parallel.ConcurrentParallelGatewaySenderEventProcessor;
import org.apache.geode.internal.cache.wan.parallel.ConcurrentParallelGatewaySenderQueue;
import org.apache.geode.internal.cache.wan.parallel.ParallelGatewaySenderEventProcessor;
import org.apache.geode.internal.cache.wan.parallel.ParallelGatewaySenderQueue;
import org.apache.geode.internal.cache.wan.serial.ConcurrentSerialGatewaySenderEventProcessor;
import org.apache.geode.internal.cache.wan.serial.SerialGatewaySenderQueue;
import org.apache.geode.logging.internal.log4j.api.LogService;
import org.apache.geode.management.AsyncEventQueueMXBean;
import org.apache.geode.management.DistributedSystemMXBean;
import org.apache.geode.management.GatewayReceiverMXBean;
import org.apache.geode.management.GatewaySenderMXBean;
import org.apache.geode.management.MBeanUtil;
import org.apache.geode.management.ManagementService;
import org.apache.geode.management.RegionMXBean;
import org.apache.geode.management.internal.SystemManagementService;
import org.apache.geode.pdx.SimpleClass;
import org.apache.geode.pdx.SimpleClass1;
import org.apache.geode.test.dunit.Assert;
import org.apache.geode.test.dunit.AsyncInvocation;
import org.apache.geode.test.dunit.DistributedTestCase;
import org.apache.geode.test.dunit.Host;
import org.apache.geode.test.dunit.IgnoredException;
import org.apache.geode.test.dunit.Invoke;
import org.apache.geode.test.dunit.LogWriterUtils;
import org.apache.geode.test.dunit.SerializableRunnable;
import org.apache.geode.test.dunit.VM;
import org.apache.geode.test.dunit.WaitCriterion;
import org.apache.geode.test.dunit.internal.JUnit4DistributedTestCase;
import org.apache.geode.test.junit.categories.WanTest;

@SuppressWarnings("deprecation")
@Category({WanTest.class})
public class WANTestBase extends DistributedTestCase {

  protected static Cache cache;
  protected static Region<?, ?> region;

  protected static PartitionedRegion customerRegion;
  protected static PartitionedRegion orderRegion;
  protected static PartitionedRegion shipmentRegion;

  protected static final String customerRegionName = "CUSTOMER";
  protected static final String orderRegionName = "ORDER";
  protected static final String shipmentRegionName = "SHIPMENT";

  protected static VM vm0;
  protected static VM vm1;
  protected static VM vm2;
  protected static VM vm3;
  protected static VM vm4;
  protected static VM vm5;
  protected static VM vm6;
  protected static VM vm7;

  protected static QueueListener<Object, Object> listener1;
  protected static QueueListener<Object, Object> listener2;

  protected static AsyncEventListener eventListener1;
  protected static AsyncEventListener eventListener2;

  private static final long MAX_WAIT = 10000;

  protected static GatewayEventFilter eventFilter;

  protected static List<Integer> dispatcherThreads = new ArrayList<>(Arrays.asList(1, 3, 5));
  // this will be set for each test method run with one of the values from above list
  protected static int numDispatcherThreadsForTheRun = 1;

  private static final Logger logger = LogService.getLogger();

  @BeforeClass
  public static void beforeClassWANTestBase() {
    vm0 = getHost(0).getVM(0);
    vm1 = getHost(0).getVM(1);
    vm2 = getHost(0).getVM(2);
    vm3 = getHost(0).getVM(3);
    vm4 = getHost(0).getVM(4);
    vm5 = getHost(0).getVM(5);
    vm6 = getHost(0).getVM(6);
    vm7 = getHost(0).getVM(7);
  }

  @Before
  public void setUpWANTestBase() throws Exception {
    shuffleNumDispatcherThreads();
    Invoke.invokeInEveryVM(() -> setNumDispatcherThreadsForTheRun(dispatcherThreads.get(0)));
    addIgnoredException("Connection refused");
    addIgnoredException("Software caused connection abort");
    addIgnoredException("Connection reset");
    postSetUpWANTestBase();
  }

  protected void postSetUpWANTestBase() throws Exception {
    // nothing
  }

  public static void shuffleNumDispatcherThreads() {
    Collections.shuffle(dispatcherThreads);
  }

  public static void setNumDispatcherThreadsForTheRun(int numThreads) {
    numDispatcherThreadsForTheRun = numThreads;
  }

  public static void stopOldLocator() {
    if (Locator.hasLocator()) {
      Locator.getLocator().stop();
    }
  }

  public static void createLocator(int dsId, int port, Set<String> localLocatorsList,
      Set<String> remoteLocatorsList) {
    WANTestBase test = new WANTestBase();
    Properties props = test.getDistributedSystemProperties();
    props.setProperty(MCAST_PORT, "0");
    props.setProperty(DISTRIBUTED_SYSTEM_ID, "" + dsId);
    StringBuilder localLocatorBuffer = new StringBuilder(localLocatorsList.toString());
    localLocatorBuffer.deleteCharAt(0);
    localLocatorBuffer.deleteCharAt(localLocatorBuffer.lastIndexOf("]"));
    String localLocator = localLocatorBuffer.toString();
    localLocator = localLocator.replace(" ", "");

    props.setProperty(LOCATORS, localLocator);
    props.setProperty(START_LOCATOR,
        "localhost[" + port + "],server=true,peer=true,hostname-for-clients=localhost");
    StringBuilder remoteLocatorBuffer = new StringBuilder(remoteLocatorsList.toString());
    remoteLocatorBuffer.deleteCharAt(0);
    remoteLocatorBuffer.deleteCharAt(remoteLocatorBuffer.lastIndexOf("]"));
    String remoteLocator = remoteLocatorBuffer.toString();
    remoteLocator = remoteLocator.replace(" ", "");
    props.setProperty(REMOTE_LOCATORS, remoteLocator);
    test.startLocatorDistributedSystem(props);
  }

  private void startLocator(int dsId, int locatorPort, int startLocatorPort, int remoteLocPort,
      boolean startServerLocator) {
    Properties props = getDistributedSystemProperties();
    props.setProperty(MCAST_PORT, "0");
    props.setProperty(DISTRIBUTED_SYSTEM_ID, "" + dsId);
    props.setProperty(LOCATORS, "localhost[" + locatorPort + "]");
    props.setProperty(START_LOCATOR, "localhost[" + startLocatorPort + "],server="
        + startServerLocator + ",peer=true,hostname-for-clients=localhost");
    if (remoteLocPort != -1) {
      props.setProperty(REMOTE_LOCATORS, "localhost[" + remoteLocPort + "]");
    }
    startLocatorDistributedSystem(props);
  }

  private void startLocatorDistributedSystem(Properties props) {
    // Start start the locator with a LOCATOR_DM_TYPE and not a NORMAL_DM_TYPE
    System.setProperty(InternalLocator.FORCE_LOCATOR_DM_TYPE, "true");
    try {
      getSystem(props);
    } finally {
      System.clearProperty(InternalLocator.FORCE_LOCATOR_DM_TYPE);
    }
  }

  public static Integer createFirstLocatorWithDSId(int dsId) {
    stopOldLocator();
    WANTestBase test = new WANTestBase();
    int port = getRandomAvailableTCPPort();
    test.startLocator(dsId, port, port, -1, true);
    return port;
  }

  public static Integer createFirstPeerLocator(int dsId) {
    stopOldLocator();
    WANTestBase test = new WANTestBase();
    int port = getRandomAvailableTCPPort();
    test.startLocator(dsId, port, port, -1, false);
    return port;
  }

  public static Integer createSecondLocator(int dsId, int locatorPort) {
    stopOldLocator();
    WANTestBase test = new WANTestBase();
    int port = getRandomAvailableTCPPort();
    test.startLocator(dsId, locatorPort, port, -1, true);
    return port;
  }

  public static Integer createSecondPeerLocator(int dsId, int locatorPort) {
    stopOldLocator();
    WANTestBase test = new WANTestBase();
    int port = getRandomAvailableTCPPort();
    test.startLocator(dsId, locatorPort, port, -1, false);
    return port;
  }

  public static Integer createFirstRemoteLocator(int dsId, int remoteLocPort) {
    stopOldLocator();
    WANTestBase test = new WANTestBase();
    int port = getRandomAvailableTCPPort();
    test.startLocator(dsId, port, port, remoteLocPort, true);
    return port;
  }

  public static void bringBackLocatorOnOldPort(int dsId, int remoteLocPort, int oldPort) {
    WANTestBase test = new WANTestBase();
    test.startLocator(dsId, oldPort, oldPort, remoteLocPort, true);
  }


  public static Integer createFirstRemotePeerLocator(int dsId, int remoteLocPort) {
    stopOldLocator();
    WANTestBase test = new WANTestBase();
    int port = getRandomAvailableTCPPort();
    test.startLocator(dsId, port, port, remoteLocPort, false);
    return port;
  }

  public static Integer createSecondRemoteLocator(int dsId, int localPort, int remoteLocPort) {
    stopOldLocator();
    WANTestBase test = new WANTestBase();
    int port = getRandomAvailableTCPPort();
    test.startLocator(dsId, localPort, port, remoteLocPort, true);
    return port;
  }

  public static Integer createSecondRemoteLocatorWithAPI(int dsId, int localPort, int remoteLocPort,
      String hostnameForClients) throws IOException {
    stopOldLocator();
    WANTestBase test = new WANTestBase();
    Properties props = test.getDistributedSystemProperties();
    props.setProperty(MCAST_PORT, "0");
    props.setProperty(DISTRIBUTED_SYSTEM_ID, "" + dsId);
    props.setProperty(LOCATORS, "localhost[" + localPort + "]");
    props.setProperty(REMOTE_LOCATORS, "localhost[" + remoteLocPort + "]");
    Locator locator = Locator.startLocatorAndDS(0, null, null, props,
        true, true, hostnameForClients);
    return locator.getPort();
  }

  public static Integer createSecondRemotePeerLocator(int dsId, int localPort, int remoteLocPort) {
    stopOldLocator();
    WANTestBase test = new WANTestBase();
    int port = getRandomAvailableTCPPort();
    test.startLocator(dsId, localPort, port, remoteLocPort, false);
    return port;
  }

  public static int createReceiverInSecuredCache() {
    GatewayReceiverFactory fact = WANTestBase.cache.createGatewayReceiverFactory();
    int port = getRandomAvailableTCPPort();
    fact.setStartPort(port);
    fact.setEndPort(port);
    fact.setManualStart(true);
    GatewayReceiver receiver = fact.create();
    try {
      receiver.start();
    } catch (IOException e) {
      e.printStackTrace();
      fail("Failed to start GatewayReceiver on port " + port, e);
    }
    return port;
  }

  public static void createReplicatedRegion(String regionName, String senderIds, Boolean offHeap) {
    IgnoredException exp =
        addIgnoredException(ForceReattemptException.class.getName());
    IgnoredException exp1 =
        addIgnoredException(InterruptedException.class.getName());
    IgnoredException exp2 =
        addIgnoredException(GatewaySenderException.class.getName());
    try {
      RegionFactory<?, ?> fact = cache.createRegionFactory(RegionShortcut.REPLICATE);
      if (senderIds != null) {
        StringTokenizer tokenizer = new StringTokenizer(senderIds, ",");
        while (tokenizer.hasMoreTokens()) {
          String senderId = tokenizer.nextToken();
          fact.addGatewaySenderId(senderId);
        }
      }

      fact.setOffHeap(offHeap);
      fact.create(regionName);
    } finally {
      exp.remove();
      exp1.remove();
      exp2.remove();
    }
  }

  public static void createReplicatedProxyRegion(String regionName, String senderIds,
      Boolean offHeap) {
    IgnoredException exp =
        addIgnoredException(ForceReattemptException.class.getName());
    IgnoredException exp1 =
        addIgnoredException(InterruptedException.class.getName());
    IgnoredException exp2 =
        addIgnoredException(GatewaySenderException.class.getName());
    try {
      RegionFactory<?, ?> fact = cache.createRegionFactory(RegionShortcut.REPLICATE_PROXY);
      if (senderIds != null) {
        StringTokenizer tokenizer = new StringTokenizer(senderIds, ",");
        while (tokenizer.hasMoreTokens()) {
          String senderId = tokenizer.nextToken();
          fact.addGatewaySenderId(senderId);
        }
      }

      fact.setOffHeap(offHeap);
      fact.create(regionName);
    } finally {
      exp.remove();
      exp1.remove();
      exp2.remove();
    }
  }

  public static void createNormalRegion(String regionName, String senderIds) {
    RegionFactory<?, ?> fact = cache.createRegionFactory(RegionShortcut.LOCAL);
    if (senderIds != null) {
      StringTokenizer tokenizer = new StringTokenizer(senderIds, ",");
      while (tokenizer.hasMoreTokens()) {
        String senderId = tokenizer.nextToken();
        fact.addGatewaySenderId(senderId);
      }
    }

    fact.create(regionName);
  }

  public static void createPersistentReplicatedRegion(String regionName, String senderIds,
      Boolean offHeap) {
    RegionFactory<?, ?> fact = cache.createRegionFactory(RegionShortcut.REPLICATE_PERSISTENT);
    if (senderIds != null) {
      StringTokenizer tokenizer = new StringTokenizer(senderIds, ",");
      while (tokenizer.hasMoreTokens()) {
        String senderId = tokenizer.nextToken();
        fact.addGatewaySenderId(senderId);
      }
    }

    fact.setOffHeap(offHeap);
    fact.create(regionName);
  }

  public static void createReplicatedRegionWithAsyncEventQueue(String regionName,
      String asyncQueueIds, Boolean offHeap) {
    IgnoredException exp1 =
        addIgnoredException(ForceReattemptException.class.getName());
    try {
      RegionFactory<?, ?> fact = cache.createRegionFactory(RegionShortcut.REPLICATE);
      if (asyncQueueIds != null) {
        StringTokenizer tokenizer = new StringTokenizer(asyncQueueIds, ",");
        while (tokenizer.hasMoreTokens()) {
          String asyncQueueId = tokenizer.nextToken();
          fact.addAsyncEventQueueId(asyncQueueId);
        }
      }

      fact.setOffHeap(offHeap);
      fact.create(regionName);
    } finally {
      exp1.remove();
    }
  }

  public static void createReplicatedRegionWithSenderAndAsyncEventQueue(String regionName,
      String senderIds, String asyncChannelId, Boolean offHeap) {
    IgnoredException exp =
        addIgnoredException(ForceReattemptException.class.getName());
    try {
      RegionFactory<?, ?> fact = cache.createRegionFactory(RegionShortcut.REPLICATE);
      if (senderIds != null) {
        StringTokenizer tokenizer = new StringTokenizer(senderIds, ",");
        while (tokenizer.hasMoreTokens()) {
          String senderId = tokenizer.nextToken();
          fact.addGatewaySenderId(senderId);
        }
      }

      fact.setOffHeap(offHeap);
      fact.addAsyncEventQueueId(asyncChannelId);
      fact.create(regionName);
    } finally {
      exp.remove();
    }
  }

  public static void createReplicatedRegion(String regionName, String senderIds, Scope scope,
      DataPolicy policy, Boolean offHeap) {
    createReplicatedRegion(regionName, senderIds, scope, policy, offHeap, false, true);
  }

  public static void createReplicatedRegion(String regionName, String senderIds, Scope scope,
      DataPolicy policy, Boolean offHeap, boolean statisticsEnabled,
      boolean concurrencyChecksEnabled) {
    RegionFactory<?, ?> fact = cache.createRegionFactory();
    if (senderIds != null) {
      StringTokenizer tokenizer = new StringTokenizer(senderIds, ",");
      while (tokenizer.hasMoreTokens()) {
        String senderId = tokenizer.nextToken();
        fact.addGatewaySenderId(senderId);
      }
    }

    fact.setDataPolicy(policy);
    fact.setScope(scope);
    fact.setOffHeap(offHeap);
    fact.setStatisticsEnabled(statisticsEnabled);
    fact.setConcurrencyChecksEnabled(concurrencyChecksEnabled);
    fact.create(regionName);
  }

  public static void createAsyncEventQueue(String asyncChannelId, boolean isParallel,
      Integer maxMemory, Integer batchSize, boolean isConflation, boolean isPersistent,
      String diskStoreName, boolean isDiskSynchronous) {

    if (diskStoreName != null) {
      File directory = new File(
          asyncChannelId + "_disk_" + System.currentTimeMillis() + "_" + VM.getCurrentVMNum());
      directory.mkdir();
      File[] dirs1 = new File[] {directory};
      DiskStoreFactory dsf = cache.createDiskStoreFactory();
      dsf.setDiskDirs(dirs1);
      dsf.create(diskStoreName);
    }

    AsyncEventListener asyncEventListener = new MyAsyncEventListener();

    AsyncEventQueueFactory factory = cache.createAsyncEventQueueFactory();
    factory.setBatchSize(batchSize);
    factory.setPersistent(isPersistent);
    factory.setDiskStoreName(diskStoreName);
    factory.setDiskSynchronous(isDiskSynchronous);
    factory.setBatchConflationEnabled(isConflation);
    factory.setMaximumQueueMemory(maxMemory);
    factory.setParallel(isParallel);
    // set dispatcher threads
    factory.setDispatcherThreads(numDispatcherThreadsForTheRun);
    factory.create(asyncChannelId, asyncEventListener);
  }

  public static void createPartitionedRegion(String regionName, String senderIds,
      Integer redundantCopies, Integer totalNumBuckets, Boolean offHeap) {
    createPartitionedRegion(regionName, senderIds, redundantCopies, totalNumBuckets, offHeap,
        RegionShortcut.PARTITION);
  }

  public static void createPartitionedRegion(String regionName, String senderIds,
      Integer redundantCopies, Integer totalNumBuckets, Boolean offHeap, RegionShortcut shortcut) {
    createPartitionedRegion(regionName, senderIds, redundantCopies, totalNumBuckets, offHeap,
        shortcut, false, true);
  }

  public static void createPartitionedRegion(String regionName, String senderIds,
      Integer redundantCopies, Integer totalNumBuckets, Boolean offHeap, RegionShortcut shortcut,
      boolean statisticsEnabled, boolean concurrencyChecksEnabled) {
    IgnoredException exp =
        addIgnoredException(ForceReattemptException.class.getName());
    IgnoredException exp1 =
        addIgnoredException(PartitionOfflineException.class.getName());
    try {
      RegionFactory<?, ?> fact = cache.createRegionFactory(shortcut);
      if (senderIds != null) {
        StringTokenizer tokenizer = new StringTokenizer(senderIds, ",");
        while (tokenizer.hasMoreTokens()) {
          String senderId = tokenizer.nextToken();
          fact.addGatewaySenderId(senderId);
        }
      }
      PartitionAttributesFactory<?, ?> pfact = new PartitionAttributesFactory<>();
      pfact.setTotalNumBuckets(totalNumBuckets);
      pfact.setRedundantCopies(redundantCopies);
      pfact.setRecoveryDelay(0);
      fact.setPartitionAttributes(pfact.create());
      fact.setOffHeap(offHeap);
      fact.setStatisticsEnabled(statisticsEnabled);
      fact.setConcurrencyChecksEnabled(concurrencyChecksEnabled);
      fact.create(regionName);
    } finally {
      exp.remove();
      exp1.remove();
    }
  }

  // TODO:OFFHEAP: add offheap flavor
  public static void createPartitionedRegionWithPersistence(String regionName, String senderIds,
      Integer redundantCopies, Integer totalNumBuckets) {
    IgnoredException exp =
        addIgnoredException(ForceReattemptException.class.getName());
    IgnoredException exp1 =
        addIgnoredException(PartitionOfflineException.class.getName());
    try {
      RegionFactory<?, ?> fact = cache.createRegionFactory(RegionShortcut.PARTITION_PERSISTENT);
      if (senderIds != null) {
        StringTokenizer tokenizer = new StringTokenizer(senderIds, ",");
        while (tokenizer.hasMoreTokens()) {
          String senderId = tokenizer.nextToken();
          fact.addGatewaySenderId(senderId);
        }
      }
      PartitionAttributesFactory<?, ?> pfact = new PartitionAttributesFactory<>();
      pfact.setTotalNumBuckets(totalNumBuckets);
      pfact.setRedundantCopies(redundantCopies);
      pfact.setRecoveryDelay(0);
      fact.setPartitionAttributes(pfact.create());
      fact.create(regionName);
    } finally {
      exp.remove();
      exp1.remove();
    }
  }

  public static void createColocatedPartitionedRegion(String regionName, String senderIds,
      Integer redundantCopies, Integer totalNumBuckets, String colocatedWith) {
    IgnoredException exp =
        addIgnoredException(ForceReattemptException.class.getName());
    IgnoredException exp1 =
        addIgnoredException(PartitionOfflineException.class.getName());
    try {
      RegionFactory<?, ?> fact = cache.createRegionFactory(RegionShortcut.PARTITION);
      if (senderIds != null) {
        StringTokenizer tokenizer = new StringTokenizer(senderIds, ",");
        while (tokenizer.hasMoreTokens()) {
          String senderId = tokenizer.nextToken();
          fact.addGatewaySenderId(senderId);
        }
      }

      PartitionAttributesFactory<?, ?> pfact = new PartitionAttributesFactory<>();
      pfact.setTotalNumBuckets(totalNumBuckets);
      pfact.setRedundantCopies(redundantCopies);
      pfact.setRecoveryDelay(0);
      pfact.setColocatedWith(colocatedWith);
      fact.setPartitionAttributes(pfact.create());
      fact.create(regionName);
    } finally {
      exp.remove();
      exp1.remove();
    }
  }

  public static void addSenderThroughAttributesMutator(String regionName, String senderIds) {
    final Region<?, ?> r = cache.getRegion(SEPARATOR + regionName);
    AttributesMutator<?, ?> mutator = r.getAttributesMutator();
    mutator.addGatewaySenderId(senderIds);
  }

  public static void addAsyncEventQueueThroughAttributesMutator(String regionName, String queueId) {
    final Region<?, ?> r = cache.getRegion(SEPARATOR + regionName);
    AttributesMutator<?, ?> mutator = r.getAttributesMutator();
    mutator.addAsyncEventQueueId(queueId);
  }

  public static void createPartitionedRegionAsAccessor(String regionName, String senderIds,
      Integer redundantCopies, Integer totalNumBuckets) {
    RegionFactory<?, ?> fact = cache.createRegionFactory(RegionShortcut.PARTITION_PROXY);
    if (senderIds != null) {
      StringTokenizer tokenizer = new StringTokenizer(senderIds, ",");
      while (tokenizer.hasMoreTokens()) {
        String senderId = tokenizer.nextToken();
        fact.addGatewaySenderId(senderId);
      }
    }
    PartitionAttributesFactory<?, ?> pfact = new PartitionAttributesFactory<>();
    pfact.setTotalNumBuckets(totalNumBuckets);
    pfact.setRedundantCopies(redundantCopies);
    fact.setPartitionAttributes(pfact.create());
    fact.create(regionName);
  }

  public static void createPartitionedRegionWithSerialParallelSenderIds(String regionName,
      String serialSenderIds, String parallelSenderIds, String colocatedWith, Boolean offHeap) {
    RegionFactory<?, ?> fact = cache.createRegionFactory(RegionShortcut.PARTITION);
    if (serialSenderIds != null) {
      StringTokenizer tokenizer = new StringTokenizer(serialSenderIds, ",");
      while (tokenizer.hasMoreTokens()) {
        String senderId = tokenizer.nextToken();
        fact.addGatewaySenderId(senderId);
      }
    }
    if (parallelSenderIds != null) {
      StringTokenizer tokenizer = new StringTokenizer(parallelSenderIds, ",");
      while (tokenizer.hasMoreTokens()) {
        String senderId = tokenizer.nextToken();
        fact.addGatewaySenderId(senderId);
      }
    }
    PartitionAttributesFactory<?, ?> pfact = new PartitionAttributesFactory<>();
    pfact.setColocatedWith(colocatedWith);
    fact.setPartitionAttributes(pfact.create());
    fact.setOffHeap(offHeap);
    fact.create(regionName);
  }

  public static void createPersistentPartitionedRegion(String regionName, String senderIds,
      Integer redundantCopies, Integer totalNumBuckets, Boolean offHeap) {

    IgnoredException exp =
        addIgnoredException(ForceReattemptException.class.getName());
    IgnoredException exp1 =
        addIgnoredException(PartitionOfflineException.class.getName());
    try {

      RegionFactory<?, ?> fact = cache.createRegionFactory(RegionShortcut.PARTITION_PERSISTENT);
      if (senderIds != null) {
        StringTokenizer tokenizer = new StringTokenizer(senderIds, ",");
        while (tokenizer.hasMoreTokens()) {
          String senderId = tokenizer.nextToken();
          fact.addGatewaySenderId(senderId);
        }
      }
      PartitionAttributesFactory<?, ?> pfact = new PartitionAttributesFactory<>();
      pfact.setTotalNumBuckets(totalNumBuckets);
      pfact.setRedundantCopies(redundantCopies);
      fact.setPartitionAttributes(pfact.create());
      fact.setOffHeap(offHeap);
      fact.create(regionName);
    } finally {
      exp.remove();
      exp1.remove();
    }
  }

  public static void createCustomerOrderShipmentPartitionedRegion(String senderIds,
      Integer redundantCopies, Integer totalNumBuckets, Boolean offHeap) {
    createCustomerOrderShipmentPartitionedRegion(senderIds, redundantCopies, totalNumBuckets,
        offHeap, RegionShortcut.PARTITION);
  }

  public static void createCustomerOrderShipmentPartitionedRegion(String senderIds,
      Integer redundantCopies, Integer totalNumBuckets, Boolean offHeap,
      RegionShortcut regionShortcut) {
    IgnoredException exp =
        addIgnoredException(ForceReattemptException.class.getName());
    try {
      RegionFactory<?, ?> fact = cache.createRegionFactory(regionShortcut);
      if (senderIds != null) {
        StringTokenizer tokenizer = new StringTokenizer(senderIds, ",");
        while (tokenizer.hasMoreTokens()) {
          String senderId = tokenizer.nextToken();
          fact.addGatewaySenderId(senderId);
        }
      }

      PartitionAttributesFactory<Object, Object> paf = new PartitionAttributesFactory<>();
      paf.setRedundantCopies(redundantCopies).setTotalNumBuckets(totalNumBuckets)
          .setPartitionResolver(new CustomerIDPartitionResolver("CustomerIDPartitionResolver"));
      fact.setPartitionAttributes(paf.create());
      fact.setOffHeap(offHeap);
      customerRegion =
          (PartitionedRegion) fact.create(customerRegionName);
      logger.info("Partitioned Region CUSTOMER created Successfully :" + customerRegion.toString());

      paf = new PartitionAttributesFactory<>();
      paf.setRedundantCopies(redundantCopies).setTotalNumBuckets(totalNumBuckets)
          .setColocatedWith(customerRegionName)
          .setPartitionResolver(new CustomerIDPartitionResolver("CustomerIDPartitionResolver"));
      fact = cache.createRegionFactory(regionShortcut);
      if (senderIds != null) {
        StringTokenizer tokenizer = new StringTokenizer(senderIds, ",");
        while (tokenizer.hasMoreTokens()) {
          String senderId = tokenizer.nextToken();
          fact.addGatewaySenderId(senderId);
        }
      }
      fact.setPartitionAttributes(paf.create());
      fact.setOffHeap(offHeap);
      orderRegion =
          (PartitionedRegion) fact.create(orderRegionName);
      logger.info("Partitioned Region ORDER created Successfully :" + orderRegion.toString());

      paf = new PartitionAttributesFactory<>();
      paf.setRedundantCopies(redundantCopies).setTotalNumBuckets(totalNumBuckets)
          .setColocatedWith(orderRegionName)
          .setPartitionResolver(new CustomerIDPartitionResolver("CustomerIDPartitionResolver"));
      fact = cache.createRegionFactory(regionShortcut);
      if (senderIds != null) {
        StringTokenizer tokenizer = new StringTokenizer(senderIds, ",");
        while (tokenizer.hasMoreTokens()) {
          String senderId = tokenizer.nextToken();
          fact.addGatewaySenderId(senderId);
        }
      }
      fact.setPartitionAttributes(paf.create());
      fact.setOffHeap(offHeap);
      shipmentRegion =
          (PartitionedRegion) fact.create(shipmentRegionName);
      logger.info("Partitioned Region SHIPMENT created Successfully :" + shipmentRegion.toString());
    } finally {
      exp.remove();
    }
  }

  public static void createColocatedPartitionedRegions(String regionName, String senderIds,
      Integer redundantCopies, Integer totalNumBuckets, Boolean offHeap) {
    RegionFactory<?, ?> fact = cache.createRegionFactory(RegionShortcut.PARTITION);
    if (senderIds != null) {
      StringTokenizer tokenizer = new StringTokenizer(senderIds, ",");
      while (tokenizer.hasMoreTokens()) {
        String senderId = tokenizer.nextToken();
        fact.addGatewaySenderId(senderId);
      }
    }
    PartitionAttributesFactory<?, ?> pfact = new PartitionAttributesFactory<>();
    pfact.setTotalNumBuckets(totalNumBuckets);
    pfact.setRedundantCopies(redundantCopies);
    fact.setPartitionAttributes(pfact.create());
    fact.setOffHeap(offHeap);
    Region<?, ?> r = fact.create(regionName);

    pfact.setColocatedWith(r.getName());
    fact.setPartitionAttributes(pfact.create());
    fact.setOffHeap(offHeap);
    fact.create(regionName + "_child1");

    fact.create(regionName + "_child2");
  }

  public static void createColocatedPartitionedRegions2(String regionName, String senderIds,
      Integer redundantCopies, Integer totalNumBuckets, Boolean offHeap) {
    RegionFactory<?, ?> fact = cache.createRegionFactory(RegionShortcut.PARTITION);
    if (senderIds != null) {
      StringTokenizer tokenizer = new StringTokenizer(senderIds, ",");
      while (tokenizer.hasMoreTokens()) {
        String senderId = tokenizer.nextToken();
        fact.addGatewaySenderId(senderId);
      }
    }
    PartitionAttributesFactory<?, ?> pfact = new PartitionAttributesFactory<>();
    pfact.setTotalNumBuckets(totalNumBuckets);
    pfact.setRedundantCopies(redundantCopies);
    fact.setPartitionAttributes(pfact.create());
    fact.setOffHeap(offHeap);
    Region<?, ?> r = fact.create(regionName);

    fact = cache.createRegionFactory(RegionShortcut.PARTITION);
    pfact.setColocatedWith(r.getName());
    fact.setPartitionAttributes(pfact.create());
    fact.setOffHeap(offHeap);
    fact.create(regionName + "_child1");

    fact.create(regionName + "_child2");
  }

  public static void createCacheInVMs(Integer locatorPort, VM... vms) {
    for (VM vm : vms) {
      vm.invoke(() -> createCache(locatorPort));
    }
  }

  public static void addListenerToSleepAfterCreateEvent(int milliSeconds, final String regionName) {
    cache.getRegion(regionName).getAttributesMutator()
        .addCacheListener(new CacheListenerAdapter<Object, Object>() {
          @Override
          public void afterCreate(final EntryEvent<Object, Object> event) {
            try {
              Thread.sleep(milliSeconds);
            } catch (InterruptedException e) {
              Thread.currentThread().interrupt();
            }
          }
        });
  }

  public static void createCache(Integer locPort) {
    createCache(false, locPort);
  }

  public static void createManagementCache(Integer locPort) {
    createCache(true, locPort);
  }

  public static void createCacheConserveSockets(Boolean conserveSockets, Integer locPort) {
    WANTestBase test = new WANTestBase();
    Properties props = test.getDistributedSystemProperties();
    props.setProperty(MCAST_PORT, "0");
    props.setProperty(LOCATORS, "localhost[" + locPort + "]");
    props.setProperty(CONSERVE_SOCKETS, conserveSockets.toString());
    InternalDistributedSystem ds = test.getSystem(props);
    cache = CacheFactory.create(ds);
  }

  protected static void createCache(boolean management, Integer locPort) {
    WANTestBase test = new WANTestBase();
    Properties props = test.getDistributedSystemProperties();
    if (management) {
      props.setProperty(JMX_MANAGER, "true");
      props.setProperty(JMX_MANAGER_START, "false");
      props.setProperty(JMX_MANAGER_PORT, "0");
      props.setProperty(JMX_MANAGER_HTTP_PORT, "0");
    }
    props.setProperty(MCAST_PORT, "0");
    String logLevel = System.getProperty(LOG_LEVEL, "info");
    props.setProperty(LOG_LEVEL, logLevel);
    props.setProperty(LOCATORS, "localhost[" + locPort + "]");
    InternalDistributedSystem ds = test.getSystem(props);
    cache = CacheFactory.create(ds);
  }

  protected static void createCacheWithSSL(Integer locPort) {
    WANTestBase test = new WANTestBase();

    boolean gatewaySslenabled = true;
    String gatewaySslprotocols = "any";
    String gatewaySslciphers = "any";
    boolean gatewaySslRequireAuth = true;

    Properties gemFireProps = test.getDistributedSystemProperties();
    gemFireProps.put(LOG_LEVEL, LogWriterUtils.getDUnitLogLevel());
    gemFireProps.put(GATEWAY_SSL_ENABLED, String.valueOf(gatewaySslenabled));
    gemFireProps.put(GATEWAY_SSL_PROTOCOLS, gatewaySslprotocols);
    gemFireProps.put(GATEWAY_SSL_CIPHERS, gatewaySslciphers);
    gemFireProps.put(GATEWAY_SSL_REQUIRE_AUTHENTICATION, String.valueOf(gatewaySslRequireAuth));

    gemFireProps.put(GATEWAY_SSL_KEYSTORE_TYPE, "jks");
    // this uses the default.keystore which is all jdk compliant in geode-dunit module
    gemFireProps.put(GATEWAY_SSL_KEYSTORE, createTempFileFromResource(WANTestBase.class,
        "/org/apache/geode/cache/client/internal/default.keystore").getAbsolutePath());
    gemFireProps.put(GATEWAY_SSL_KEYSTORE_PASSWORD, "password");
    gemFireProps.put(GATEWAY_SSL_TRUSTSTORE, createTempFileFromResource(WANTestBase.class,
        "/org/apache/geode/cache/client/internal/default.keystore").getAbsolutePath());
    gemFireProps.put(GATEWAY_SSL_TRUSTSTORE_PASSWORD, "password");

    gemFireProps.setProperty(MCAST_PORT, "0");
    gemFireProps.setProperty(LOCATORS, "localhost[" + locPort + "]");

    logger.info("Starting cache ds with following properties \n" + gemFireProps);

    InternalDistributedSystem ds = test.getSystem(gemFireProps);
    cache = CacheFactory.create(ds);
  }

  public static void createCache_PDX(Integer locPort) {
    WANTestBase test = new WANTestBase();
    Properties props = test.getDistributedSystemProperties();
    props.setProperty(MCAST_PORT, "0");
    props.setProperty(LOCATORS, "localhost[" + locPort + "]");
    InternalDistributedSystem ds = test.getSystem(props);

    cache = new InternalCacheBuilder(props)
        .setPdxPersistent(true)
        .setPdxDiskStore("PDX_TEST")
        .setIsExistingOk(false)
        .create(ds);

    File pdxDir = new File(CacheTestCase.getDiskDir(), "pdx");
    DiskStoreFactory dsf = cache.createDiskStoreFactory();
    File[] dirs1 = new File[] {pdxDir};
    dsf.setDiskDirs(dirs1).setMaxOplogSize(1).create("PDX_TEST");
  }

  public static void createCache(Integer locPort1, Integer locPort2) {
    WANTestBase test = new WANTestBase();
    Properties props = test.getDistributedSystemProperties();
    props.setProperty(MCAST_PORT, "0");
    props.setProperty(LOCATORS, "localhost[" + locPort1 + "],localhost[" + locPort2 + "]");
    InternalDistributedSystem ds = test.getSystem(props);
    cache = CacheFactory.create(ds);
  }

  public static void createCacheWithoutLocator(Integer mCastPort) {
    WANTestBase test = new WANTestBase();
    Properties props = test.getDistributedSystemProperties();
    props.setProperty(MCAST_PORT, "" + mCastPort);
    InternalDistributedSystem ds = test.getSystem(props);
    cache = CacheFactory.create(ds);
  }

  /**
   * Method that creates a cache server
   *
   * @return Integer Port on which the server is started.
   */
  public static Integer createCacheServer() {
    CacheServer server1 = cache.addCacheServer();
    server1.setPort(0);
    try {
      server1.start();
    } catch (IOException e) {
      fail("Failed to start the Server", e);
    }
    assertThat(server1.isRunning()).isTrue();

    return server1.getPort();
  }

  /**
   * Returns a Map that contains the count for number of cache server and number of Receivers.
   *
   */
  public static Map<String, Integer> getCacheServers() {
    List<CacheServer> cacheServers = cache.getCacheServers();

    Map<String, Integer> cacheServersMap = new HashMap<>();
    int bridgeServerCounter = 0;
    int receiverServerCounter = 0;
    for (final CacheServer server : cacheServers) {
      CacheServerImpl cacheServer = (CacheServerImpl) server;
      if (cacheServer.getAcceptor().isGatewayReceiver()) {
        receiverServerCounter++;
      } else {
        bridgeServerCounter++;
      }
    }
    cacheServersMap.put("BridgeServer", bridgeServerCounter);
    cacheServersMap.put("ReceiverServer", receiverServerCounter);
    return cacheServersMap;
  }

  public static void startSenderInVMs(String senderId, VM... vms) {
    for (VM vm : vms) {
      vm.invoke(() -> startSender(senderId));
    }
  }

  public static void startSenderInVMsAsync(String senderId, VM... vms) {
    List<AsyncInvocation<?>> tasks = new LinkedList<>();
    for (VM vm : vms) {
      tasks.add(vm.invokeAsync(() -> startSender(senderId)));
    }
    for (AsyncInvocation<?> invocation : tasks) {
      try {
        invocation.await();
      } catch (InterruptedException e) {
        fail("Starting senders was interrupted");
      }
    }
  }


  public static void startSenderwithCleanQueuesInVMsAsync(String senderId, VM... vms) {
    List<AsyncInvocation<?>> tasks = new LinkedList<>();
    for (VM vm : vms) {
      tasks.add(vm.invokeAsync(() -> startSenderwithCleanQueues(senderId)));
    }
    for (AsyncInvocation<?> invocation : tasks) {
      try {
        invocation.await();
      } catch (InterruptedException e) {
        fail("Starting senders was interrupted");
      }
    }
  }

  public static void startSender(String senderId) {
    final IgnoredException exln = addIgnoredException("Could not connect");

    IgnoredException exp =
        addIgnoredException(ForceReattemptException.class.getName());
    IgnoredException exp1 =
        addIgnoredException(InterruptedException.class.getName());
    try {
      GatewaySender sender = getGatewaySender(senderId);
      sender.start();
    } finally {
      exp.remove();
      exp1.remove();
      exln.remove();
    }

  }

  public static void startSenderwithCleanQueues(String senderId) {
    final IgnoredException exln = addIgnoredException("Could not connect");

    IgnoredException exp =
        addIgnoredException(ForceReattemptException.class.getName());
    IgnoredException exp1 =
        addIgnoredException(InterruptedException.class.getName());
    try {
      GatewaySender sender = getGatewaySender(senderId);
      sender.startWithCleanQueue();
    } finally {
      exp.remove();
      exp1.remove();
      exln.remove();
    }

  }

  public static void enableConflation(String senderId) {
    AbstractGatewaySender sender = (AbstractGatewaySender) getGatewaySender(senderId);
    sender.test_setBatchConflationEnabled(true);
  }

  public static Map<String, Object> getSenderToReceiverConnectionInfo(String senderId) {
    GatewaySender sender = getGatewaySender(senderId);
    Map<String, Object> connectionInfo = null;
    if (!sender.isParallel() && ((AbstractGatewaySender) sender).isPrimary()) {
      connectionInfo = new HashMap<>();
      GatewaySenderEventDispatcher dispatcher =
          ((AbstractGatewaySender) sender).getEventProcessor().getDispatcher();
      if (dispatcher instanceof GatewaySenderEventRemoteDispatcher) {
        ServerLocation serverLocation =
            ((GatewaySenderEventRemoteDispatcher) dispatcher).getConnection(false).getServer();
        connectionInfo.put("serverHost", serverLocation.getHostName());
        connectionInfo.put("serverPort", serverLocation.getPort());

      }
    }
    return connectionInfo;
  }

  public static void movePrimary(final DistributedMember destination, final String regionName,
      final int bucketId) {
    PartitionedRegion region = (PartitionedRegion) cache.getRegion(regionName);

    BecomePrimaryBucketResponse response = BecomePrimaryBucketMessage
        .send((InternalDistributedMember) destination, region, bucketId, true);
    assertThat(response.waitForResponse()).isTrue();
  }

  public static int getSecondaryQueueSizeInStats(String senderId) {
    AbstractGatewaySender sender = (AbstractGatewaySender) cache.getGatewaySender(senderId);
    GatewaySenderStats statistics = sender.getStatistics();
    return statistics.getSecondaryEventQueueSize();
  }

  public static void checkQueueSizeInStats(String senderId, final int expectedQueueSize) {
    AbstractGatewaySender sender = (AbstractGatewaySender) cache.getGatewaySender(senderId);
    GatewaySenderStats statistics = sender.getStatistics();
    await().untilAsserted(() -> assertThat(statistics.getEventQueueSize())
        .isEqualTo(expectedQueueSize));
  }

  public static void checkConnectionStats(String senderId) {
    AbstractGatewaySender sender =
        (AbstractGatewaySender) CacheFactory.getAnyInstance().getGatewaySender(senderId);

    Collection<ConnectionStats> statsCollection =
        sender.getProxy().getEndpointManager().getAllStats().values();
    assertThat(statsCollection.iterator().next()).isNotNull();
  }

  public static List<Integer> getSenderStats(String senderId, final int expectedQueueSize) {
    AbstractGatewaySender sender = (AbstractGatewaySender) cache.getGatewaySender(senderId);
    GatewaySenderStats statistics = sender.getStatistics();
    if (expectedQueueSize != -1) {
      final RegionQueue regionQueue;
      regionQueue = sender.getQueues().toArray(new RegionQueue[1])[0];
      if (sender.isParallel()) {
        ConcurrentParallelGatewaySenderQueue parallelGatewaySenderQueue =
            (ConcurrentParallelGatewaySenderQueue) regionQueue;
        parallelGatewaySenderQueue.getRegions();
      }
      await().untilAsserted(() -> assertThat(regionQueue.size()).isEqualTo(expectedQueueSize));
    }
    ArrayList<Integer> stats = new ArrayList<>();
    stats.add(statistics.getEventQueueSize());
    stats.add(statistics.getEventsReceived());
    stats.add(statistics.getEventsQueued());
    stats.add(statistics.getEventsDistributed());
    stats.add(statistics.getBatchesDistributed());
    stats.add(statistics.getBatchesRedistributed());
    stats.add(statistics.getEventsFiltered());
    stats.add(statistics.getEventsNotQueuedConflated());
    stats.add(statistics.getEventsConflatedFromBatches());
    stats.add(statistics.getConflationIndexesMapSize());
    stats.add(statistics.getSecondaryEventQueueSize());
    stats.add(statistics.getEventsProcessedByPQRM());
    stats.add(statistics.getEventsExceedingAlertThreshold());
    stats.add((int) statistics.getBatchesWithIncompleteTransactions());
    return stats;
  }

  public static int getGatewaySenderPoolDisconnects(String senderId) {
    AbstractGatewaySender sender =
        (AbstractGatewaySender) CacheFactory.getAnyInstance().getGatewaySender(senderId);

    PoolStats poolStats = sender.getProxy().getStats();

    return poolStats.getDisConnects();
  }

  public static List<Integer> getSenderStatsForDroppedEvents(String senderId) {
    AbstractGatewaySender sender = (AbstractGatewaySender) cache.getGatewaySender(senderId);
    GatewaySenderStats statistics = sender.getStatistics();
    ArrayList<Integer> stats = new ArrayList<>();
    int eventNotQueued = statistics.getEventsDroppedDueToPrimarySenderNotRunning();
    if (eventNotQueued > 0) {
      logger
          .info("Found " + eventNotQueued + " events dropped due to primary sender is not running");
    }
    stats.add(eventNotQueued);
    stats.add(statistics.getEventsNotQueued());
    stats.add(statistics.getEventsNotQueuedConflated());
    return stats;
  }

  public static void checkQueueStats(String senderId, final int queueSize, final int eventsReceived,
      final int eventsQueued, final int eventsDistributed) {
    GatewaySenderStats statistics = getGatewaySenderStats(senderId);
    await().untilAsserted(() -> {
      assertThat(statistics.getEventQueueSize()).isEqualTo(queueSize);
      assertThat(statistics.getEventsReceived()).isEqualTo(eventsReceived);
      assertThat(statistics.getEventsQueued()).isEqualTo(eventsQueued);
      assertThat(statistics.getEventsDistributed()).isGreaterThanOrEqualTo(eventsDistributed);
    });
  }

  public static void checkGatewayReceiverStats(int processBatches, int eventsReceived,
      int creates) {
    checkGatewayReceiverStats(processBatches, eventsReceived, creates, false);
  }

  public static void checkGatewayReceiverStats(int processBatches, int eventsReceived,
      int creates, boolean isExact) {
    Set<GatewayReceiver> gatewayReceivers = cache.getGatewayReceivers();
    GatewayReceiver receiver = gatewayReceivers.iterator().next();
    CacheServerStats stats = ((CacheServerImpl) receiver.getServer()).getAcceptor().getStats();

    assertThat(stats instanceof GatewayReceiverStats).isTrue();
    GatewayReceiverStats gatewayReceiverStats = (GatewayReceiverStats) stats;
    if (isExact) {
      assertThat(gatewayReceiverStats.getProcessBatchRequests() == processBatches).isTrue();
    } else {
      assertThat(gatewayReceiverStats.getProcessBatchRequests() >= processBatches).isTrue();
    }
    assertThat(gatewayReceiverStats.getEventsReceived()).isEqualTo(eventsReceived);
    assertThat(gatewayReceiverStats.getCreateRequest()).isEqualTo(creates);
  }

  public static List<Long> getReceiverStats() {
    Set<GatewayReceiver> gatewayReceivers = cache.getGatewayReceivers();
    GatewayReceiver receiver = gatewayReceivers.iterator().next();
    CacheServerStats stats = ((CacheServerImpl) receiver.getServer()).getAcceptor().getStats();
    assertThat(stats instanceof GatewayReceiverStats).isTrue();
    GatewayReceiverStats gatewayReceiverStats = (GatewayReceiverStats) stats;
    List<Long> statsList = new ArrayList<>();
    statsList.add(gatewayReceiverStats.getEventsReceived());
    statsList.add(gatewayReceiverStats.getEventsRetried());
    statsList.add(gatewayReceiverStats.getProcessBatchRequests());
    statsList.add(gatewayReceiverStats.getDuplicateBatchesReceived());
    statsList.add(gatewayReceiverStats.getOutoforderBatchesReceived());
    statsList.add(gatewayReceiverStats.getEarlyAcks());
    statsList.add(gatewayReceiverStats.getExceptionsOccurred());
    return statsList;
  }

  public static void checkMinimumGatewayReceiverStats(int processBatches, int eventsReceived) {
    Set<GatewayReceiver> gatewayReceivers = cache.getGatewayReceivers();
    GatewayReceiver receiver = gatewayReceivers.iterator().next();
    CacheServerStats stats = ((CacheServerImpl) receiver.getServer()).getAcceptor().getStats();

    assertThat(stats instanceof GatewayReceiverStats).isTrue();
    GatewayReceiverStats gatewayReceiverStats = (GatewayReceiverStats) stats;
    assertThat(gatewayReceiverStats.getProcessBatchRequests() >= processBatches).isTrue();
    assertThat(gatewayReceiverStats.getEventsReceived() >= eventsReceived).isTrue();
  }

  public static void checkExceptionStats(int exceptionsOccurred) {
    Set<GatewayReceiver> gatewayReceivers = cache.getGatewayReceivers();
    GatewayReceiver receiver = gatewayReceivers.iterator().next();
    CacheServerStats stats = ((CacheServerImpl) receiver.getServer()).getAcceptor().getStats();

    assertThat(stats instanceof GatewayReceiverStats).isTrue();
    GatewayReceiverStats gatewayReceiverStats = (GatewayReceiverStats) stats;
    if (exceptionsOccurred == 0) {
      assertThat(gatewayReceiverStats.getExceptionsOccurred()).isEqualTo(exceptionsOccurred);
    } else {
      assertThat(gatewayReceiverStats.getExceptionsOccurred() >= exceptionsOccurred).isTrue();
    }
  }

  public static void checkGatewayReceiverStatsHA(int processBatches, int eventsReceived,
      int creates) {
    Set<GatewayReceiver> gatewayReceivers = cache.getGatewayReceivers();
    GatewayReceiver receiver = gatewayReceivers.iterator().next();
    CacheServerStats stats = ((CacheServerImpl) receiver.getServer()).getAcceptor().getStats();

    assertThat(stats instanceof GatewayReceiverStats).isTrue();
    GatewayReceiverStats gatewayReceiverStats = (GatewayReceiverStats) stats;
    assertThat(gatewayReceiverStats.getProcessBatchRequests() >= processBatches).isTrue();
    assertThat(gatewayReceiverStats.getEventsReceived() >= eventsReceived).isTrue();
    assertThat(gatewayReceiverStats.getCreateRequest() >= creates).isTrue();
  }

  public static void checkEventFilteredStats(String senderId, final int eventsFiltered) {
    GatewaySenderStats statistics = getGatewaySenderStats(senderId);
    await()
        .untilAsserted(() -> assertThat(statistics.getEventsFiltered()).isEqualTo(eventsFiltered));
  }

  public static void checkConflatedStats(String senderId, final int eventsConflated) {
    GatewaySenderStats statistics = getGatewaySenderStats(senderId);
    await().untilAsserted(
        () -> assertThat(statistics.getEventsNotQueuedConflated()).isEqualTo(eventsConflated));
  }

  public static void checkStats_Failover(String senderId, final int eventsReceived) {
    GatewaySenderStats statistics = getGatewaySenderStats(senderId);
    await().untilAsserted(() -> {
      assertThat(statistics.getEventsReceived()).isEqualTo(eventsReceived);
      assertThat((statistics.getEventsQueued() + statistics.getUnprocessedTokensAddedByPrimary()
          + statistics.getUnprocessedEventsRemovedByPrimary())).isEqualTo(eventsReceived);
    });
  }

  public static void checkBatchStats(String senderId, final int batches) {
    checkBatchStats(senderId, batches, false);
  }

  public static void checkBatchStats(String senderId, final int batches, boolean isExact) {
    GatewaySenderStats statistics = getGatewaySenderStats(senderId);
    await().untilAsserted(() -> {
      if (isExact) {
        assertThat(statistics.getBatchesDistributed()).isEqualTo(batches);
      } else {
        assertThat(statistics.getBatchesDistributed()).isGreaterThanOrEqualTo(batches);
      }
      assertThat(statistics.getBatchesRedistributed()).isEqualTo(0);
    });
  }

  public static void checkBatchStats(String senderId, final int batches,
      boolean isExact, final boolean batchesRedistributed) {
    GatewaySenderStats statistics = getGatewaySenderStats(senderId);
    await().untilAsserted(() -> {
      if (isExact) {
        assertThat(statistics.getBatchesDistributed()).isEqualTo(batches);
      } else {
        assertThat(statistics.getBatchesDistributed()).isGreaterThanOrEqualTo(batches);
      }
      assertThat((statistics.getBatchesRedistributed() > 0)).isEqualTo(batchesRedistributed);
    });
  }

  public static void checkBatchStats(String senderId, final boolean batchesDistributed,
      final boolean batchesRedistributed) {
    GatewaySenderStats statistics = getGatewaySenderStats(senderId);
    await().untilAsserted(() -> {
      assertThat((statistics.getBatchesDistributed() > 0)).isEqualTo(batchesDistributed);
      assertThat((statistics.getBatchesRedistributed() > 0)).isEqualTo(batchesRedistributed);
    });
  }

  public static void checkUnProcessedStats(String senderId, int events) {
    GatewaySenderStats statistics = getGatewaySenderStats(senderId);
    await().untilAsserted(() -> {
      assertThat((statistics.getUnprocessedEventsAddedBySecondary()
          + statistics.getUnprocessedTokensRemovedBySecondary())).isEqualTo(events);
      assertThat((statistics.getUnprocessedEventsRemovedByPrimary()
          + statistics.getUnprocessedTokensAddedByPrimary())).isEqualTo(events);
    });
  }

  public static GatewaySenderStats getGatewaySenderStats(String senderId) {
    GatewaySender sender = cache.getGatewaySender(senderId);
    return ((AbstractGatewaySender) sender).getStatistics();
  }

  public static void waitForSenderRunningState(String senderId) {
    final IgnoredException exln = addIgnoredException("Could not connect");
    try {
      Set<GatewaySender> senders = cache.getGatewaySenders();
      final GatewaySender sender = getGatewaySenderById(senders, senderId);
      await()
          .untilAsserted(
              () -> assertThat((sender != null && sender.isRunning())).as(
                  "Expected sender isRunning state to " + "be true but is false").isEqualTo(true));
    } finally {
      exln.remove();
    }
  }

  public static void waitForSenderNonRunningState(String senderId) {
    try (IgnoredException ignored = addIgnoredException("Could not connect")) {
      Set<GatewaySender> senders = cache.getGatewaySenders();
      final GatewaySender sender = getGatewaySenderById(senders, senderId);
      await()
          .untilAsserted(
              () -> assertThat((sender != null && !sender.isRunning())).as(
                  "Expected sender isRunning state to " + "be false but is true").isEqualTo(true));
    }
  }

  public static void waitForSenderToBecomePrimary(String senderId) {
    Set<GatewaySender> senders = ((GemFireCacheImpl) cache).getAllGatewaySenders();
    final GatewaySender sender = getGatewaySenderById(senders, senderId);
    await()
        .untilAsserted(
            () -> assertThat((sender != null && ((AbstractGatewaySender) sender).isPrimary())).as(
                "Expected sender primary state to " + "be true but is false").isEqualTo(true));
  }

  private static GatewaySender getGatewaySenderById(Set<GatewaySender> senders, String senderId) {
    for (GatewaySender s : senders) {
      if (s.getId().equals(senderId)) {
        return s;
      }
    }
    // if none of the senders matches with the supplied senderid, return null
    return null;
  }

  public static Map<String, List<?>> checkQueue() {
    HashMap<String, List<?>> listenerAttrs = new HashMap<>();
    listenerAttrs.put("Create", listener1.createList);
    listenerAttrs.put("Update", listener1.updateList);
    listenerAttrs.put("Destroy", listener1.destroyList);
    return listenerAttrs;
  }

  public static void checkQueueOnSecondary(final Map<String, List<?>> primaryUpdatesMap) {
    final HashMap<String, List<?>> secondaryUpdatesMap = new HashMap<>();
    secondaryUpdatesMap.put("Create", listener1.createList);
    secondaryUpdatesMap.put("Update", listener1.updateList);
    secondaryUpdatesMap.put("Destroy", listener1.destroyList);

    await().untilAsserted(() -> {
      secondaryUpdatesMap.put("Create", listener1.createList);
      secondaryUpdatesMap.put("Update", listener1.updateList);
      secondaryUpdatesMap.put("Destroy", listener1.destroyList);
      assertThat(secondaryUpdatesMap.equals(primaryUpdatesMap)).as(
          "Expected secondary map to be " + primaryUpdatesMap + " but it is " + secondaryUpdatesMap)
          .isEqualTo(true);
    });
  }

  public static Map<String, List<?>> checkQueue2() {
    HashMap<String, List<?>> listenerAttrs = new HashMap<>();
    listenerAttrs.put("Create", listener2.createList);
    listenerAttrs.put("Update", listener2.updateList);
    listenerAttrs.put("Destroy", listener2.destroyList);
    return listenerAttrs;
  }

  public static <K> Map<String, List<K>> checkPR(String regionName) {
    PartitionedRegion region = (PartitionedRegion) cache.getRegion(regionName);
    @SuppressWarnings("unchecked")
    QueueListener<K, ?> listener = (QueueListener<K, ?>) region.getCacheListener();

    HashMap<String, List<K>> listenerAttrs = new HashMap<>();
    listenerAttrs.put("Create", listener.createList);
    listenerAttrs.put("Update", listener.updateList);
    listenerAttrs.put("Destroy", listener.destroyList);
    return listenerAttrs;
  }

  public static <K> Map<String, List<K>> checkBR(String regionName, int numBuckets) {
    PartitionedRegion region = (PartitionedRegion) cache.getRegion(regionName);
    HashMap<String, List<K>> listenerAttrs = new HashMap<>();
    for (int i = 0; i < numBuckets; i++) {
      BucketRegion br = region.getBucketRegion(i);
      @SuppressWarnings("unchecked")
      QueueListener<K, ?> listener = (QueueListener<K, ?>) br.getCacheListener();
      listenerAttrs.put("Create" + i, listener.createList);
      listenerAttrs.put("Update" + i, listener.updateList);
      listenerAttrs.put("Destroy" + i, listener.destroyList);
    }
    return listenerAttrs;
  }

  public static <K> Map<String, List<K>> checkQueue_BR(String senderId, int numBuckets) {
    GatewaySender sender = getGatewaySender(senderId);
    RegionQueue parallelQueue =
        ((AbstractGatewaySender) sender).getQueues().toArray(new RegionQueue[1])[0];

    PartitionedRegion region = (PartitionedRegion) parallelQueue.getRegion();
    Map<String, List<K>> listenerAttrs = new HashMap<>();
    for (int i = 0; i < numBuckets; i++) {
      BucketRegion br = region.getBucketRegion(i);
      if (br != null) {
        @SuppressWarnings("unchecked")
        QueueListener<K, ?> listener = (QueueListener<K, ?>) br.getCacheListener();
        if (listener != null) {
          listenerAttrs.put("Create" + i, listener.createList);
          listenerAttrs.put("Update" + i, listener.updateList);
          listenerAttrs.put("Destroy" + i, listener.destroyList);
        }
      }
    }
    return listenerAttrs;
  }

  public static void addListenerOnBucketRegion(String regionName, int numBuckets) {
    WANTestBase test = new WANTestBase();
    test.addCacheListenerOnBucketRegion(regionName, numBuckets);
  }

  private void addCacheListenerOnBucketRegion(String regionName, int numBuckets) {
    PartitionedRegion region = (PartitionedRegion) cache.getRegion(regionName);
    for (int i = 0; i < numBuckets; i++) {
      BucketRegion br = region.getBucketRegion(i);
      @SuppressWarnings("unchecked")
      AttributesMutator<Object, Object> mutator = br.getAttributesMutator();
      listener1 = new QueueListener<>();
      mutator.addCacheListener(listener1);
    }
  }

  public static void addListenerOnQueueBucketRegion(String senderId, int numBuckets) {
    WANTestBase test = new WANTestBase();
    test.addCacheListenerOnQueueBucketRegion(senderId, numBuckets);
  }

  private void addCacheListenerOnQueueBucketRegion(String senderId, int numBuckets) {
    GatewaySender sender = getGatewaySender(senderId);
    RegionQueue parallelQueue =
        ((AbstractGatewaySender) sender).getQueues().toArray(new RegionQueue[1])[0];

    PartitionedRegion region = (PartitionedRegion) parallelQueue.getRegion();
    for (int i = 0; i < numBuckets; i++) {
      BucketRegion br = region.getBucketRegion(i);
      if (br != null) {
        @SuppressWarnings("unchecked")
        AttributesMutator<Object, Object> mutator = br.getAttributesMutator();
        CacheListener<Object, Object> listener = new QueueListener<>();
        mutator.addCacheListener(listener);
      }
    }

  }

  public static void addQueueListener(String senderId, boolean isParallel) {
    WANTestBase test = new WANTestBase();
    test.addCacheQueueListener(senderId, isParallel);
  }

  public static void addSecondQueueListener(String senderId, boolean isParallel) {
    WANTestBase test = new WANTestBase();
    test.addSecondCacheQueueListener(senderId, isParallel);
  }

  public static void addListenerOnRegion(String regionName) {
    WANTestBase test = new WANTestBase();
    test.addCacheListenerOnRegion(regionName);
  }

  private void addCacheListenerOnRegion(String regionName) {
    Region<Object, Object> region = cache.getRegion(regionName);
    AttributesMutator<Object, Object> mutator = region.getAttributesMutator();
    listener1 = new QueueListener<>();
    mutator.addCacheListener(listener1);
  }

  private void addCacheQueueListener(String senderId, boolean isParallel) {
    GatewaySender sender = getGatewaySender(senderId);
    listener1 = new QueueListener<>();
    if (!isParallel) {
      Set<RegionQueue> queues = ((AbstractGatewaySender) sender).getQueues();
      for (RegionQueue q : queues) {
        q.addCacheListener(listener1);
      }
    } else {
      RegionQueue parallelQueue =
          ((AbstractGatewaySender) sender).getQueues().toArray(new RegionQueue[1])[0];
      parallelQueue.addCacheListener(listener1);
    }
  }

  private void addSecondCacheQueueListener(String senderId, boolean isParallel) {
    GatewaySender sender = getGatewaySender(senderId);
    listener2 = new QueueListener<>();
    if (!isParallel) {
      Set<RegionQueue> queues = ((AbstractGatewaySender) sender).getQueues();
      for (RegionQueue q : queues) {
        q.addCacheListener(listener2);
      }
    } else {
      RegionQueue parallelQueue =
          ((AbstractGatewaySender) sender).getQueues().toArray(new RegionQueue[1])[0];
      parallelQueue.addCacheListener(listener2);
    }
  }

  public static void pauseSender(String senderId) {
    final IgnoredException exln = addIgnoredException("Could not connect");
    IgnoredException exp =
        addIgnoredException(ForceReattemptException.class.getName());
    try {
      GatewaySender sender = getGatewaySender(senderId);
      sender.pause();
      ((AbstractGatewaySender) sender).getEventProcessor().waitForDispatcherToPause();

    } finally {
      exp.remove();
      exln.remove();
    }
  }

  public static void resumeSender(String senderId) {
    final IgnoredException exln = addIgnoredException("Could not connect");
    IgnoredException exp =
        addIgnoredException(ForceReattemptException.class.getName());
    try {
      GatewaySender sender = getGatewaySender(senderId);
      sender.resume();
    } finally {
      exp.remove();
      exln.remove();
    }
  }

  public static void stopSenderInVMsAsync(String senderId, VM... vms) {
    List<AsyncInvocation<?>> tasks = new LinkedList<>();
    for (VM vm : vms) {
      tasks.add(vm.invokeAsync(() -> stopSender(senderId)));
    }
    for (AsyncInvocation<?> invocation : tasks) {
      try {
        invocation.await();
      } catch (InterruptedException e) {
        fail("Stopping senders was interrupted");
      }
    }
  }

  public static void stopSender(String senderId) {
    final IgnoredException exln = addIgnoredException("Could not connect");
    IgnoredException exp =
        addIgnoredException(ForceReattemptException.class.getName());
    try {
      GatewaySender sender = getGatewaySender(senderId);
      AbstractGatewaySenderEventProcessor eventProcessor = null;
      if (sender instanceof AbstractGatewaySender) {
        eventProcessor = ((AbstractGatewaySender) sender).getEventProcessor();
      }
      sender.stop();

      Set<RegionQueue> queues;
      if (eventProcessor instanceof ConcurrentSerialGatewaySenderEventProcessor) {
        queues = ((ConcurrentSerialGatewaySenderEventProcessor) eventProcessor).getQueues();
        for (RegionQueue queue : queues) {
          if (queue instanceof SerialGatewaySenderQueue) {
            assertThat(((SerialGatewaySenderQueue) queue).isRemovalThreadAlive()).isFalse();
          }
        }
      }
    } finally {
      exp.remove();
      exln.remove();
    }
  }

  public static void stopReceivers() {
    Set<GatewayReceiver> receivers = cache.getGatewayReceivers();
    for (GatewayReceiver receiver : receivers) {
      receiver.stop();
    }
  }

  public static void startReceivers() {
    Set<GatewayReceiver> receivers = cache.getGatewayReceivers();
    for (GatewayReceiver receiver : receivers) {
      try {
        receiver.start();
      } catch (IOException e) {
        throw new RuntimeException(e);
      }
    }
  }

  public static GatewaySenderFactory configureGateway(DiskStoreFactory dsf, File[] dirs1,
      String dsName, boolean isParallel, Integer maxMemory, Integer batchSize, boolean isConflation,
      boolean isPersistent, GatewayEventFilter filter, boolean isManualStart, int numDispatchers,
      OrderPolicy policy, int socketBufferSize) {

    InternalGatewaySenderFactory gateway =
        (InternalGatewaySenderFactory) cache.createGatewaySenderFactory();
    gateway.setParallel(isParallel);
    gateway.setMaximumQueueMemory(maxMemory);
    gateway.setBatchSize(batchSize);
    gateway.setBatchConflationEnabled(isConflation);
    gateway.setManualStart(isManualStart);
    gateway.setDispatcherThreads(numDispatchers);
    gateway.setOrderPolicy(policy);
    gateway.setLocatorDiscoveryCallback(new MyLocatorCallback());
    gateway.setSocketBufferSize(socketBufferSize);
    if (filter != null) {
      eventFilter = filter;
      gateway.addGatewayEventFilter(filter);
    }
    if (isPersistent) {
      gateway.setPersistenceEnabled(true);
      gateway.setDiskStoreName(dsf.setDiskDirs(dirs1).create(dsName).getName());
    } else {
      DiskStore store = dsf.setDiskDirs(dirs1).create(dsName);
      gateway.setDiskStoreName(store.getName());
    }
    return gateway;
  }

  public static void createSender(String dsName, int remoteDsId, boolean isParallel,
      Integer maxMemory, Integer batchSize, boolean isConflation, boolean isPersistent,
      GatewayEventFilter filter, boolean isManualStart) {
    createSender(dsName, remoteDsId, isParallel, maxMemory, batchSize, isConflation, isPersistent,
        filter, isManualStart, false);
  }

  public static void createSender(String dsName, int remoteDsId, boolean isParallel,
      Integer maxMemory, Integer batchSize, boolean isConflation, boolean isPersistent,
      GatewayEventFilter filter, boolean isManualStart, boolean groupTransactionEvents) {
    final IgnoredException exln = addIgnoredException("Could not connect");
    try {
      File persistentDirectory =
          new File(dsName + "_disk_" + System.currentTimeMillis() + "_" + VM.getCurrentVMNum());
      persistentDirectory.mkdir();
      DiskStoreFactory dsf = cache.createDiskStoreFactory();
      File[] dirs1 = new File[] {persistentDirectory};
      GatewaySenderFactory gateway = configureGateway(dsf, dirs1, dsName, isParallel, maxMemory,
          batchSize, isConflation, isPersistent, filter, isManualStart,
          numDispatcherThreadsForTheRun, GatewaySender.DEFAULT_ORDER_POLICY,
          GatewaySender.DEFAULT_SOCKET_BUFFER_SIZE);
      gateway.setGroupTransactionEvents(groupTransactionEvents);
      if (groupTransactionEvents && gateway instanceof InternalGatewaySenderFactory) {
        // Set a very high value to avoid flakiness in test cases
        ((InternalGatewaySenderFactory) gateway).setRetriesToGetTransactionEventsFromQueue(1000);
      }
      gateway.create(dsName, remoteDsId);
    } finally {
      exln.remove();
    }
  }

  public static void createSenderWithMultipleDispatchers(String dsName, int remoteDsId,
      boolean isParallel, Integer maxMemory, Integer batchSize, boolean isConflation,
      boolean isPersistent, GatewayEventFilter filter, boolean isManualStart, int numDispatchers,
      OrderPolicy orderPolicy) {
    createSenderWithMultipleDispatchers(dsName, remoteDsId,
        isParallel, maxMemory, batchSize, isConflation,
        isPersistent, filter, isManualStart, numDispatchers,
        orderPolicy, GatewaySender.DEFAULT_SOCKET_BUFFER_SIZE);
  }

  public static void createSenderWithMultipleDispatchers(String dsName, int remoteDsId,
      boolean isParallel, Integer maxMemory, Integer batchSize, boolean isConflation,
      boolean isPersistent, GatewayEventFilter filter, boolean isManualStart, int numDispatchers,
      OrderPolicy orderPolicy, int socketBufferSize) {
    final IgnoredException exln = addIgnoredException("Could not connect");
    try {
      File persistentDirectory =
          new File(dsName + "_disk_" + System.currentTimeMillis() + "_" + VM.getCurrentVMNum());
      persistentDirectory.mkdir();
      DiskStoreFactory dsf = cache.createDiskStoreFactory();
      File[] dirs1 = new File[] {persistentDirectory};
      GatewaySenderFactory gateway =
          configureGateway(dsf, dirs1, dsName, isParallel, maxMemory, batchSize, isConflation,
              isPersistent, filter, isManualStart, numDispatchers, orderPolicy, socketBufferSize);
      gateway.create(dsName, remoteDsId);

    } finally {
      exln.remove();
    }
  }

  public static void createSenderWithoutDiskStore(String dsName, int remoteDsId, Integer maxMemory,
      Integer batchSize, boolean isConflation, boolean isManualStart) {

    GatewaySenderFactory gateway = cache.createGatewaySenderFactory();
    gateway.setParallel(true);
    gateway.setMaximumQueueMemory(maxMemory);
    gateway.setBatchSize(batchSize);
    gateway.setManualStart(isManualStart);
    // set dispatcher threads
    gateway.setDispatcherThreads(numDispatcherThreadsForTheRun);
    gateway.setBatchConflationEnabled(isConflation);
    gateway.create(dsName, remoteDsId);
  }

  public static void createSenderAlertThresholdWithoutDiskStore(String dsName, int remoteDsId,
      Integer maxMemory,
      Integer batchSize, boolean isConflation, boolean isManualStart, int alertthreshold) {

    GatewaySenderFactory gateway = cache.createGatewaySenderFactory();
    gateway.setAlertThreshold(alertthreshold);
    gateway.setParallel(true);
    gateway.setMaximumQueueMemory(maxMemory);
    gateway.setBatchSize(batchSize);
    gateway.setManualStart(isManualStart);
    // set dispatcher threads
    gateway.setDispatcherThreads(numDispatcherThreadsForTheRun);
    gateway.setBatchConflationEnabled(isConflation);
    gateway.create(dsName, remoteDsId);
  }

  public static void createConcurrentSender(String dsName, int remoteDsId, boolean isParallel,
      Integer maxMemory, Integer batchSize, boolean isConflation, boolean isPersistent,
      GatewayEventFilter filter, boolean isManualStart, int concurrencyLevel, OrderPolicy policy) {

    File persistentDirectory =
        new File(dsName + "_disk_" + System.currentTimeMillis() + "_" + VM.getCurrentVMNum());
    persistentDirectory.mkdir();
    DiskStoreFactory dsf = cache.createDiskStoreFactory();
    File[] dirs1 = new File[] {persistentDirectory};
    GatewaySenderFactory gateway = configureGateway(dsf, dirs1, dsName, isParallel, maxMemory,
        batchSize, isConflation, isPersistent, filter, isManualStart, concurrencyLevel, policy,
        GatewaySender.DEFAULT_SOCKET_BUFFER_SIZE);
    gateway.create(dsName, remoteDsId);
  }

  public static void createSenderForValidations(String dsName, int remoteDsId, boolean isParallel,
      Integer alertThreshold, boolean isConflation, boolean isPersistent,
      List<GatewayEventFilter> eventFilters, List<GatewayTransportFilter> transportFilters,
      boolean isManualStart, boolean isDiskSync) {
    IgnoredException exp1 =
        addIgnoredException(RegionDestroyedException.class.getName());
    try {
      File persistentDirectory =
          new File(dsName + "_disk_" + System.currentTimeMillis() + "_" + VM.getCurrentVMNum());
      persistentDirectory.mkdir();
      DiskStoreFactory dsf = cache.createDiskStoreFactory();
      File[] dirs1 = new File[] {persistentDirectory};

      if (isParallel) {
        GatewaySenderFactory gateway = cache.createGatewaySenderFactory();
        gateway.setParallel(true);
        gateway.setAlertThreshold(alertThreshold);
        ((InternalGatewaySenderFactory) gateway)
            .setLocatorDiscoveryCallback(new MyLocatorCallback());
        if (eventFilters != null) {
          for (GatewayEventFilter filter : eventFilters) {
            gateway.addGatewayEventFilter(filter);
          }
        }
        if (transportFilters != null) {
          for (GatewayTransportFilter filter : transportFilters) {
            gateway.addGatewayTransportFilter(filter);
          }
        }
        if (isPersistent) {
          gateway.setPersistenceEnabled(true);
          gateway.setDiskStoreName(dsf.setDiskDirs(dirs1).create(dsName + "_Parallel").getName());
        } else {
          DiskStore store = dsf.setDiskDirs(dirs1).create(dsName + "_Parallel");
          gateway.setDiskStoreName(store.getName());
        }
        gateway.setDiskSynchronous(isDiskSync);
        gateway.setBatchConflationEnabled(isConflation);
        gateway.setManualStart(isManualStart);
        // set dispatcher threads
        gateway.setDispatcherThreads(numDispatcherThreadsForTheRun);
        gateway.create(dsName, remoteDsId);

      } else {
        GatewaySenderFactory gateway = cache.createGatewaySenderFactory();
        gateway.setAlertThreshold(alertThreshold);
        gateway.setManualStart(isManualStart);
        // set dispatcher threads
        gateway.setDispatcherThreads(numDispatcherThreadsForTheRun);
        ((InternalGatewaySenderFactory) gateway)
            .setLocatorDiscoveryCallback(new MyLocatorCallback());
        if (eventFilters != null) {
          for (GatewayEventFilter filter : eventFilters) {
            gateway.addGatewayEventFilter(filter);
          }
        }
        if (transportFilters != null) {
          for (GatewayTransportFilter filter : transportFilters) {
            gateway.addGatewayTransportFilter(filter);
          }
        }
        gateway.setBatchConflationEnabled(isConflation);
        if (isPersistent) {
          gateway.setPersistenceEnabled(true);
          gateway.setDiskStoreName(dsf.setDiskDirs(dirs1).create(dsName + "_Serial").getName());
        } else {
          DiskStore store = dsf.setDiskDirs(dirs1).create(dsName + "_Serial");
          gateway.setDiskStoreName(store.getName());
        }
        gateway.setDiskSynchronous(isDiskSync);
        gateway.create(dsName, remoteDsId);
      }
    } finally {
      exp1.remove();
    }
  }

  public static String createSenderWithDiskStore(String dsName, int remoteDsId, boolean isParallel,
      Integer maxMemory, Integer batchSize, boolean isConflation, boolean isPersistent,
      GatewayEventFilter filter, String dsStore, boolean isManualStart) {
    final File persistentDirectory;
    if (dsStore == null) {
      persistentDirectory =
          new File(dsName + "_disk_" + System.currentTimeMillis() + "_" + VM.getCurrentVMNum());
    } else {
      persistentDirectory = new File(dsStore);
    }
    logger.info("The ds is : " + persistentDirectory.getName());

    persistentDirectory.mkdir();
    DiskStoreFactory dsf = cache.createDiskStoreFactory();
    File[] dirs1 = new File[] {persistentDirectory};

    if (isParallel) {
      GatewaySenderFactory gateway = cache.createGatewaySenderFactory();
      gateway.setParallel(true);
      gateway.setMaximumQueueMemory(maxMemory);
      gateway.setBatchSize(batchSize);
      gateway.setManualStart(isManualStart);
      // set dispatcher threads
      gateway.setDispatcherThreads(numDispatcherThreadsForTheRun);
      ((InternalGatewaySenderFactory) gateway).setLocatorDiscoveryCallback(new MyLocatorCallback());
      if (filter != null) {
        gateway.addGatewayEventFilter(filter);
      }
      if (isPersistent) {
        gateway.setPersistenceEnabled(true);
        String dsname = dsf.setDiskDirs(dirs1).create(dsName).getName();
        gateway.setDiskStoreName(dsname);
        logger.info("The DiskStoreName is : " + dsname);
      } else {
        DiskStore store = dsf.setDiskDirs(dirs1).create(dsName);
        gateway.setDiskStoreName(store.getName());
        logger.info("The ds is : " + store.getName());
      }
      gateway.setBatchConflationEnabled(isConflation);
      gateway.create(dsName, remoteDsId);

    } else {
      GatewaySenderFactory gateway = cache.createGatewaySenderFactory();
      gateway.setMaximumQueueMemory(maxMemory);
      gateway.setBatchSize(batchSize);
      gateway.setManualStart(isManualStart);
      // set dispatcher threads
      gateway.setDispatcherThreads(numDispatcherThreadsForTheRun);
      ((InternalGatewaySenderFactory) gateway).setLocatorDiscoveryCallback(new MyLocatorCallback());
      if (filter != null) {
        gateway.addGatewayEventFilter(filter);
      }
      gateway.setBatchConflationEnabled(isConflation);
      if (isPersistent) {
        gateway.setPersistenceEnabled(true);
        gateway.setDiskStoreName(dsf.setDiskDirs(dirs1).create(dsName).getName());
      } else {
        DiskStore store = dsf.setDiskDirs(dirs1).create(dsName);

        gateway.setDiskStoreName(store.getName());
      }
      gateway.create(dsName, remoteDsId);
    }
    return persistentDirectory.getName();
  }


  public static void createSenderWithListener(String dsName, int remoteDsName, boolean isParallel,
      Integer maxMemory, Integer batchSize, boolean isConflation, boolean isPersistent,
      GatewayEventFilter filter, boolean attachTwoListeners, boolean isManualStart) {
    File persistentDirectory =
        new File(dsName + "_disk_" + System.currentTimeMillis() + "_" + VM.getCurrentVMNum());
    persistentDirectory.mkdir();
    DiskStoreFactory dsf = cache.createDiskStoreFactory();
    File[] dirs1 = new File[] {persistentDirectory};

    if (isParallel) {
      GatewaySenderFactory gateway = cache.createGatewaySenderFactory();
      gateway.setParallel(true);
      gateway.setMaximumQueueMemory(maxMemory);
      gateway.setBatchSize(batchSize);
      gateway.setManualStart(isManualStart);
      // set dispatcher threads
      gateway.setDispatcherThreads(numDispatcherThreadsForTheRun);
      ((InternalGatewaySenderFactory) gateway).setLocatorDiscoveryCallback(new MyLocatorCallback());
      if (filter != null) {
        gateway.addGatewayEventFilter(filter);
      }
      if (isPersistent) {
        gateway.setPersistenceEnabled(true);
        gateway.setDiskStoreName(dsf.setDiskDirs(dirs1).create(dsName).getName());
      } else {
        DiskStore store = dsf.setDiskDirs(dirs1).create(dsName);
        gateway.setDiskStoreName(store.getName());
      }
      gateway.setBatchConflationEnabled(isConflation);
      gateway.create(dsName, remoteDsName);

    } else {
      GatewaySenderFactory gateway = cache.createGatewaySenderFactory();
      gateway.setMaximumQueueMemory(maxMemory);
      gateway.setBatchSize(batchSize);
      gateway.setManualStart(isManualStart);
      // set dispatcher threads
      gateway.setDispatcherThreads(numDispatcherThreadsForTheRun);
      ((InternalGatewaySenderFactory) gateway).setLocatorDiscoveryCallback(new MyLocatorCallback());
      if (filter != null) {
        gateway.addGatewayEventFilter(filter);
      }
      gateway.setBatchConflationEnabled(isConflation);
      if (isPersistent) {
        gateway.setPersistenceEnabled(true);
        gateway.setDiskStoreName(dsf.setDiskDirs(dirs1).create(dsName).getName());
      } else {
        DiskStore store = dsf.setDiskDirs(dirs1).create(dsName);
        gateway.setDiskStoreName(store.getName());
      }

      eventListener1 = new MyGatewaySenderEventListener();
      ((InternalGatewaySenderFactory) gateway).addAsyncEventListener(eventListener1);
      if (attachTwoListeners) {
        eventListener2 = new MyGatewaySenderEventListener2();
        ((InternalGatewaySenderFactory) gateway).addAsyncEventListener(eventListener2);
      }
      ((InternalGatewaySenderFactory) gateway).create(dsName);
    }
  }

  public static void createReceiverInVMs(int maximumTimeBetweenPings, VM... vms) {
    for (VM vm : vms) {
      vm.invoke(() -> createReceiverWithMaximumTimeBetweenPings(maximumTimeBetweenPings));
    }
  }


  public static void createReceiverInVMs(VM... vms) {
    createReceiverInVMs(-1, vms);
  }

  public static int createReceiver() {
    return createReceiverWithMaximumTimeBetweenPings(-1);
  }

  public static int createReceiverWithMaximumTimeBetweenPings(int maximumTimeBetweenPings) {
    GatewayReceiverFactory fact = cache.createGatewayReceiverFactory();
    int port = getRandomAvailableTCPPort();
    fact.setStartPort(port);
    fact.setEndPort(port);
    fact.setManualStart(true);
    if (maximumTimeBetweenPings > 0) {
      fact.setMaximumTimeBetweenPings(maximumTimeBetweenPings);
    }
    GatewayReceiver receiver = fact.create();
    try {
      receiver.start();
    } catch (IOException e) {
      e.printStackTrace();
      Assert.fail(
          "Test " + getTestMethodName() + " failed to start GatewayReceiver on port " + port, e);
    }
    return port;
  }

  public static int createReceiverWithSSL(int locPort) {
    WANTestBase test = new WANTestBase();

    Properties gemFireProps = test.getDistributedSystemProperties();

    gemFireProps.put(LOG_LEVEL, LogWriterUtils.getDUnitLogLevel());
    gemFireProps.put(GATEWAY_SSL_ENABLED, "true");
    gemFireProps.put(GATEWAY_SSL_PROTOCOLS, "any");
    gemFireProps.put(GATEWAY_SSL_CIPHERS, "any");
    gemFireProps.put(GATEWAY_SSL_REQUIRE_AUTHENTICATION, "true");

    gemFireProps.put(GATEWAY_SSL_KEYSTORE_TYPE, "jks");
    gemFireProps.put(GATEWAY_SSL_KEYSTORE, createTempFileFromResource(WANTestBase.class,
        "/org/apache/geode/cache/client/internal/default.keystore").getAbsolutePath());
    gemFireProps.put(GATEWAY_SSL_KEYSTORE_PASSWORD, "password");
    gemFireProps.put(GATEWAY_SSL_TRUSTSTORE, createTempFileFromResource(WANTestBase.class,
        "/org/apache/geode/cache/client/internal/default.keystore").getAbsolutePath());
    gemFireProps.put(GATEWAY_SSL_TRUSTSTORE_PASSWORD, "password");

    gemFireProps.setProperty(MCAST_PORT, "0");
    gemFireProps.setProperty(LOCATORS, "localhost[" + locPort + "]");

    logger.info("Starting cache ds with following properties \n" + gemFireProps);

    InternalDistributedSystem ds = test.getSystem(gemFireProps);
    cache = CacheFactory.create(ds);
    GatewayReceiverFactory fact = cache.createGatewayReceiverFactory();
    int port = getRandomAvailableTCPPort();
    fact.setStartPort(port);
    fact.setEndPort(port);
    fact.setManualStart(true);
    GatewayReceiver receiver = fact.create();
    try {
      receiver.start();
    } catch (IOException e) {
      e.printStackTrace();
      fail("Test " + test.getName() + " failed to start GatewayReceiver on port " + port);
    }
    return port;
  }

  public static void createReceiverAndServer(int locPort) {
    WANTestBase test = new WANTestBase();
    Properties props = test.getDistributedSystemProperties();
    props.setProperty(MCAST_PORT, "0");
    props.setProperty(LOCATORS, "localhost[" + locPort + "]");

    InternalDistributedSystem ds = test.getSystem(props);
    cache = CacheFactory.create(ds);
    GatewayReceiverFactory fact = cache.createGatewayReceiverFactory();
    int receiverPort = getRandomAvailableTCPPort();
    fact.setStartPort(receiverPort);
    fact.setEndPort(receiverPort);
    fact.setManualStart(true);
    GatewayReceiver receiver = fact.create();
    try {
      receiver.start();
    } catch (IOException e) {
      e.printStackTrace();
      fail("Test " + test.getName() + " failed to start GatewayReceiver on port " + receiverPort);
    }
    CacheServer server = cache.addCacheServer();
    int serverPort = getRandomAvailableTCPPort();
    server.setPort(serverPort);
    server.setHostnameForClients("localhost");
    try {
      server.start();
    } catch (IOException e) {
      fail("Failed to start server ", e);
    }
  }

  public static int createServer(int locPort) {
    return createServer(locPort, -1);
  }

  public static int createServer(int locPort, int maximumTimeBetweenPings) {
    WANTestBase test = new WANTestBase();
    Properties properties = test.getDistributedSystemProperties();
    return createServer(locPort, maximumTimeBetweenPings, properties);
  }

  public static int createServer(int locPort, int maximumTimeBetweenPings, Properties props) {
    WANTestBase test = new WANTestBase();
    props.setProperty(MCAST_PORT, "0");
    props.setProperty(LOCATORS, "localhost[" + locPort + "]");
    InternalDistributedSystem ds = test.getSystem(props);
    cache = CacheFactory.create(ds);

    CacheServer server = cache.addCacheServer();
    int port = getRandomAvailableTCPPort();
    server.setPort(port);
    server.setHostnameForClients("localhost");
    if (maximumTimeBetweenPings > 0) {
      server.setMaximumTimeBetweenPings(maximumTimeBetweenPings);
    }
    try {
      server.start();
    } catch (IOException e) {
      fail("Failed to start server ", e);
    }
    return port;
  }

  public static void createClientWithLocatorAndRegion(int port0, String host, String regionName,
      ClientRegionShortcut regionType) {
    cache = (Cache) new ClientCacheFactory().addPoolLocator(host, port0).create();

    ((ClientCache) cache).createClientRegionFactory(regionType)
        .create(regionName);
  }

  public static void createClientWithLocatorAndRegion(int port0, String host, String regionName) {
    createClientWithLocatorAndRegion(port0, host);

    RegionFactory<?, ?> factory = cache.createRegionFactory(RegionShortcut.LOCAL);
    factory.setPoolName("pool");

    region = factory.create(regionName);
    region.registerInterestForAllKeys();
    logger.info("Distributed Region " + regionName + " created Successfully :" + region.toString());
  }

  public static void createClientWithLocatorAndRegion(final int port0, final String host) {
    WANTestBase test = new WANTestBase();
    Properties props = test.getDistributedSystemProperties();
    props.setProperty(MCAST_PORT, "0");
    props.setProperty(LOCATORS, "");

    InternalDistributedSystem ds = test.getSystem(props);
    cache = CacheFactory.create(ds);

    assertThat(cache).isNotNull();
    CacheServerTestUtil.disableShufflingOfEndpoints();
    try {
      PoolManager.createFactory().addLocator(host, port0).setPingInterval(250)
          .setSubscriptionEnabled(true).setSubscriptionRedundancy(-1).setReadTimeout(20000)
          .setSocketBufferSize(1000).setMinConnections(6).setMaxConnections(10).setRetryAttempts(3)
          .create("pool");
    } finally {
      CacheServerTestUtil.enableShufflingOfEndpoints();
    }
  }

  public static int createReceiver_PDX(int locPort) {
    WANTestBase test = new WANTestBase();
    Properties props = test.getDistributedSystemProperties();
    props.setProperty(MCAST_PORT, "0");
    props.setProperty(LOCATORS, "localhost[" + locPort + "]");
    InternalDistributedSystem ds = test.getSystem(props);
    File pdxDir = new File(CacheTestCase.getDiskDir(), "pdx");

    cache = new InternalCacheBuilder(props)
        .setPdxPersistent(true)
        .setPdxDiskStore("pdxStore")
        .setIsExistingOk(false)
        .create(ds);

    cache.createDiskStoreFactory().setDiskDirs(new File[] {pdxDir}).setMaxOplogSize(1)
        .create("pdxStore");
    GatewayReceiverFactory fact = cache.createGatewayReceiverFactory();
    int port = getRandomAvailableTCPPort();
    fact.setStartPort(port);
    fact.setEndPort(port);
    fact.setManualStart(true);
    GatewayReceiver receiver = fact.create();
    try {
      receiver.start();
    } catch (IOException e) {
      e.printStackTrace();
      fail("Test " + test.getName() + " failed to start GatewayReceiver on port " + port);
    }
    return port;
  }

  public static void doDistTXPuts(String regionName, int numPuts) {
    CacheTransactionManager txMgr = cache.getCacheTransactionManager();
    txMgr.setDistributed(true);

    IgnoredException exp1 =
        addIgnoredException(InterruptedException.class.getName());
    IgnoredException exp2 =
        addIgnoredException(GatewaySenderException.class.getName());
    try {
      Region<Object, Object> r = cache.getRegion(SEPARATOR + regionName);
      for (long i = 1; i <= numPuts; i++) {
        txMgr.begin();
        r.put(i, i);
        txMgr.commit();
      }
    } finally {
      exp1.remove();
      exp2.remove();
    }
  }


  public static void doPuts(String regionName, int numPuts, Object value) {
    IgnoredException exp1 =
        addIgnoredException(InterruptedException.class.getName());
    IgnoredException exp2 =
        addIgnoredException(GatewaySenderException.class.getName());
    try {
      Region<Object, Object> r = cache.getRegion(SEPARATOR + regionName);
      for (long i = 0; i < numPuts; i++) {
        r.put(i, value);
      }
    } finally {
      exp1.remove();
      exp2.remove();
    }
  }

  public static void doPuts(String regionName, int numPuts) {
    IgnoredException exp1 =
        addIgnoredException(InterruptedException.class.getName());
    IgnoredException exp2 =
        addIgnoredException(GatewaySenderException.class.getName());
    try {
      Region<Object, Object> r = cache.getRegion(SEPARATOR + regionName);
      for (long i = 0; i < numPuts; i++) {
        r.put(i, "Value_" + i);
      }
    } finally {
      exp1.remove();
      exp2.remove();
    }
  }

  public static void doTxPuts(String regionName, int numPuts) {
    try (
        IgnoredException ignored = addIgnoredException(InterruptedException.class);
        IgnoredException ignored1 =
            addIgnoredException(GatewaySenderException.class)) {
      Region<Object, Object> r = cache.getRegion(SEPARATOR + regionName);
      for (long i = 0; i < numPuts; i++) {
        cache.getCacheTransactionManager().begin();
        r.put(i, "Value_" + i);
        cache.getCacheTransactionManager().commit();
      }
    }
  }

  public static void doPutsSameKey(String regionName, int numPuts, String key) {
    IgnoredException exp1 =
        addIgnoredException(InterruptedException.class.getName());
    IgnoredException exp2 =
        addIgnoredException(GatewaySenderException.class.getName());
    try {
      Region<Object, Object> r = cache.getRegion(SEPARATOR + regionName);
      for (long i = 0; i < numPuts; i++) {
        r.put(key, "Value_" + i);
      }
    } finally {
      exp1.remove();
      exp2.remove();
    }
  }


  public static void doPutsAfter300(String regionName, int numPuts) {
    Region<Object, Object> r = cache.getRegion(SEPARATOR + regionName);
    for (long i = 300; i < numPuts; i++) {
      r.put(i, "Value_" + i);
    }
  }

  public static void doPutsFrom(String regionName, int from, int numPuts) {
    Region<Object, Object> r = cache.getRegion(SEPARATOR + regionName);
    for (long i = from; i < numPuts; i++) {
      r.put(i, "Value_" + i);
    }
  }

  public static void doDestroys(String regionName, int keyNum) {
    Region<Object, Object> r = cache.getRegion(SEPARATOR + regionName);
    for (long i = 0; i < keyNum; i++) {
      r.destroy(i);
    }
  }

  public static void doPutAll(String regionName, int numPuts, int size) {
    Region<Long, String> r = cache.getRegion(SEPARATOR + regionName);
    for (long i = 0; i < numPuts; i++) {
      Map<Long, String> putAllMap = new HashMap<>();
      for (long j = 0; j < size; j++) {
        putAllMap.put((size * i) + j, "Value_" + i);
      }
      r.putAll(putAllMap, "putAllCallback");
      putAllMap.clear();
    }
  }


  public static void doPutsWithKeyAsString(String regionName, int numPuts) {
    Region<String, String> r = cache.getRegion(SEPARATOR + regionName);
    for (long i = 0; i < numPuts; i++) {
      r.put("Object_" + i, "Value_" + i);
    }
  }

  public static <K, V> void putGivenKeyValue(String regionName, Map<K, V> keyValues) {
    Region<K, V> r = cache.getRegion(SEPARATOR + regionName);
    for (K key : keyValues.keySet()) {
      r.put(key, keyValues.get(key));
    }
  }

  public static void doOrderAndShipmentPutsInsideTransactions(Map<Object, Object> keyValues,
      int eventsPerTransaction) {
    Region<Object, Object> orderRegion = cache.getRegion(orderRegionName);
    Region<Object, Object> shipmentRegion = cache.getRegion(shipmentRegionName);
    int eventInTransaction = 0;
    CacheTransactionManager cacheTransactionManager = cache.getCacheTransactionManager();
    for (Object key : keyValues.keySet()) {
      if (eventInTransaction == 0) {
        cacheTransactionManager.begin();
      }
      Region<Object, Object> r;
      if (key instanceof OrderId) {
        r = orderRegion;
      } else {
        r = shipmentRegion;
      }
      r.put(key, keyValues.get(key));
      if (++eventInTransaction == eventsPerTransaction) {
        cacheTransactionManager.commit();
        eventInTransaction = 0;
      }
    }
    if (eventInTransaction != 0) {
      cacheTransactionManager.commit();
    }
  }

  public static <K, V> void doPutsInsideTransactions(String regionName, Map<K, V> keyValues,
      int eventsPerTransaction) {
    Region<K, V> r = cache.getRegion(Region.SEPARATOR + regionName);
    int eventInTransaction = 0;
    CacheTransactionManager cacheTransactionManager = cache.getCacheTransactionManager();
    for (K key : keyValues.keySet()) {
      if (eventInTransaction == 0) {
        cacheTransactionManager.begin();
      }
      r.put(key, keyValues.get(key));
      if (++eventInTransaction == eventsPerTransaction) {
        cacheTransactionManager.commit();
        eventInTransaction = 0;
      }
    }
    if (eventInTransaction != 0) {
      cacheTransactionManager.commit();
    }
  }

  public static void destroyRegion(String regionName) {
    destroyRegion(regionName, -1);
  }

  public static void destroyRegion(String regionName, final int min) {
    final Region<?, ?> r = cache.getRegion(SEPARATOR + regionName);
    await().until(() -> r.size() > min);
    r.destroyRegion();
  }

  public static void localDestroyRegion(String regionName) {
    IgnoredException exp =
        addIgnoredException(PRLocallyDestroyedException.class.getName());
    try {
      Region<?, ?> r = cache.getRegion(SEPARATOR + regionName);
      r.localDestroyRegion();
    } finally {
      exp.remove();
    }
  }


  public static Map<CustId, Customer> putCustomerPartitionedRegion(int numPuts) {
    String valueSuffix = "";
    return putCustomerPartitionedRegion(numPuts, valueSuffix);
  }

  public static Map<CustId, Customer> updateCustomerPartitionedRegion(int numPuts) {
    String valueSuffix = "_update";
    return putCustomerPartitionedRegion(numPuts, valueSuffix);
  }

  protected static Map<CustId, Customer> putCustomerPartitionedRegion(int numPuts,
      String valueSuffix) {
    Map<CustId, Customer> custKeyValues = new HashMap<>();
    for (int i = 1; i <= numPuts; i++) {
      CustId custid = new CustId(i);
      Customer customer = new Customer("name" + i, "Address" + i + valueSuffix);
      try {
        customerRegion.put(custid, customer);
        assertThat(customerRegion.containsKey(custid)).isTrue();
        assertThat(customerRegion.get(custid)).isEqualTo(customer);
        custKeyValues.put(custid, customer);
      } catch (Exception e) {
        fail(
            "putCustomerPartitionedRegion : failed while doing put operation in CustomerPartitionedRegion ",
            e);
      }
      logger.info("Customer :- { " + custid + " : " + customer + " }");
    }
    return custKeyValues;
  }

  public static Map<OrderId, Order> putOrderPartitionedRegion(int numPuts) {
    Map<OrderId, Order> orderKeyValues = new HashMap<>();
    for (int i = 1; i <= numPuts; i++) {
      CustId custid = new CustId(i);
      int oid = i + 1;
      OrderId orderId = new OrderId(oid, custid);
      Order order = new Order("ORDER" + oid);
      try {
        orderRegion.put(orderId, order);
        orderKeyValues.put(orderId, order);
        assertThat(orderRegion.containsKey(orderId)).isTrue();
        assertThat(orderRegion.get(orderId)).isEqualTo(order);

      } catch (Exception e) {
        fail(
            "putOrderPartitionedRegion : failed while doing put operation in OrderPartitionedRegion ",
            e);
      }
      logger.info("Order :- { " + orderId + " : " + order + " }");
    }
    return orderKeyValues;
  }

  public static Map<CustId, Order> putOrderPartitionedRegionUsingCustId(int numPuts) {
    Map<CustId, Order> orderKeyValues = new HashMap<>();
    for (int i = 1; i <= numPuts; i++) {
      CustId custid = new CustId(i);
      Order order = new Order("ORDER" + i);
      try {
        orderRegion.put(custid, order);
        orderKeyValues.put(custid, order);
        assertThat(orderRegion.containsKey(custid)).isTrue();
        assertThat(orderRegion.get(custid)).isEqualTo(order);

      } catch (Exception e) {
        fail(
            "putOrderPartitionedRegionUsingCustId : failed while doing put operation in OrderPartitionedRegion ",
            e);
      }
      logger.info("Order :- { " + custid + " : " + order + " }");
    }
    return orderKeyValues;
  }

  public static Map<OrderId, Order> updateOrderPartitionedRegion(int numPuts) {
    Map<OrderId, Order> orderKeyValues = new HashMap<>();
    for (int i = 1; i <= numPuts; i++) {
      CustId custid = new CustId(i);
      int oid = i + 1;
      OrderId orderId = new OrderId(oid, custid);
      Order order = new Order("ORDER" + oid + "_update");
      try {
        orderRegion.put(orderId, order);
        orderKeyValues.put(orderId, order);
        assertThat(orderRegion.containsKey(orderId)).isTrue();
        assertThat(orderRegion.get(orderId)).isEqualTo(order);

      } catch (Exception e) {
        fail(
            "updateOrderPartitionedRegion : failed while doing put operation in OrderPartitionedRegion ",
            e);
      }
      logger.info("Order :- { " + orderId + " : " + order + " }");
    }
    return orderKeyValues;
  }

  public static Map<CustId, Order> updateOrderPartitionedRegionUsingCustId(int numPuts) {
    Map<CustId, Order> orderKeyValues = new HashMap<>();
    for (int i = 1; i <= numPuts; i++) {
      CustId custid = new CustId(i);
      Order order = new Order("ORDER" + i + "_update");
      try {
        orderRegion.put(custid, order);
        assertThat(orderRegion.containsKey(custid)).isTrue();
        assertThat(orderRegion.get(custid)).isEqualTo(order);
        orderKeyValues.put(custid, order);
      } catch (Exception e) {
        fail(
            "updateOrderPartitionedRegionUsingCustId : failed while doing put operation in OrderPartitionedRegion ",
            e);
      }
      logger.info("Order :- { " + custid + " : " + order + " }");
    }
    return orderKeyValues;
  }

  public static Map<ShipmentId, Shipment> putShipmentPartitionedRegion(int numPuts) {
    Map<ShipmentId, Shipment> shipmentKeyValue = new HashMap<>();
    for (int i = 1; i <= numPuts; i++) {
      CustId custid = new CustId(i);
      int oid = i + 1;
      OrderId orderId = new OrderId(oid, custid);
      int sid = oid + 1;
      ShipmentId shipmentId = new ShipmentId(sid, orderId);
      Shipment shipment = new Shipment("Shipment" + sid);
      try {
        shipmentRegion.put(shipmentId, shipment);
        assertThat(shipmentRegion.containsKey(shipmentId)).isTrue();
        assertThat(shipmentRegion.get(shipmentId)).isEqualTo(shipment);
        shipmentKeyValue.put(shipmentId, shipment);
      } catch (Exception e) {
        fail(
            "putShipmentPartitionedRegion : failed while doing put operation in ShipmentPartitionedRegion ",
            e);
      }
      logger.info("Shipment :- { " + shipmentId + " : " + shipment + " }");
    }
    return shipmentKeyValue;
  }

  public static void putColocatedPartitionedRegion(int numPuts) {
    for (int i = 1; i <= numPuts; i++) {
      CustId custid = new CustId(i);
      Customer customer = new Customer("Customer" + custid, "Address" + custid);
      customerRegion.put(custid, customer);
      int oid = i + 1;
      OrderId orderId = new OrderId(oid, custid);
      Order order = new Order("Order" + orderId);
      orderRegion.put(orderId, order);
      int sid = oid + 1;
      ShipmentId shipmentId = new ShipmentId(sid, orderId);
      Shipment shipment = new Shipment("Shipment" + sid);
      shipmentRegion.put(shipmentId, shipment);
    }
  }

  public static Map<CustId, Shipment> putShipmentPartitionedRegionUsingCustId(int numPuts) {
    Map<CustId, Shipment> shipmentKeyValue = new HashMap<>();
    for (int i = 1; i <= numPuts; i++) {
      CustId custid = new CustId(i);
      Shipment shipment = new Shipment("Shipment" + i);
      try {
        shipmentRegion.put(custid, shipment);
        assertThat(shipmentRegion.containsKey(custid)).isTrue();
        assertThat(shipmentRegion.get(custid)).isEqualTo(shipment);
        shipmentKeyValue.put(custid, shipment);
      } catch (Exception e) {
        fail(
            "putShipmentPartitionedRegionUsingCustId : failed while doing put operation in ShipmentPartitionedRegion ",
            e);
      }
      logger.info("Shipment :- { " + custid + " : " + shipment + " }");
    }
    return shipmentKeyValue;
  }

  public static Map<ShipmentId, Shipment> updateShipmentPartitionedRegion(int numPuts) {
    Map<ShipmentId, Shipment> shipmentKeyValue = new HashMap<>();
    for (int i = 1; i <= numPuts; i++) {
      CustId custid = new CustId(i);
      int oid = i + 1;
      OrderId orderId = new OrderId(oid, custid);
      int sid = oid + 1;
      ShipmentId shipmentId = new ShipmentId(sid, orderId);
      Shipment shipment = new Shipment("Shipment" + sid + "_update");
      try {
        shipmentRegion.put(shipmentId, shipment);
        assertThat(shipmentRegion.containsKey(shipmentId)).isTrue();
        assertThat(shipmentRegion.get(shipmentId)).isEqualTo(shipment);
        shipmentKeyValue.put(shipmentId, shipment);
      } catch (Exception e) {
        fail(
            "updateShipmentPartitionedRegion : failed while doing put operation in ShipmentPartitionedRegion ",
            e);
      }
      logger.info("Shipment :- { " + shipmentId + " : " + shipment + " }");
    }
    return shipmentKeyValue;
  }

  public static Map<CustId, Shipment> updateShipmentPartitionedRegionUsingCustId(int numPuts) {
    Map<CustId, Shipment> shipmentKeyValue = new HashMap<>();
    for (int i = 1; i <= numPuts; i++) {
      CustId custid = new CustId(i);
      Shipment shipment = new Shipment("Shipment" + i + "_update");
      try {
        shipmentRegion.put(custid, shipment);
        assertThat(shipmentRegion.containsKey(custid)).isTrue();
        assertThat(shipmentRegion.get(custid)).isEqualTo(shipment);
        shipmentKeyValue.put(custid, shipment);
      } catch (Exception e) {
        fail(
            "updateShipmentPartitionedRegionUsingCustId : failed while doing put operation in ShipmentPartitionedRegion ",
            e);
      }
      logger.info("Shipment :- { " + custid + " : " + shipment + " }");
    }
    return shipmentKeyValue;
  }

  public static void doPutsPDXSerializable(String regionName, int numPuts) {
    Region<Object, Object> r = cache.getRegion(SEPARATOR + regionName);
    for (int i = 0; i < numPuts; i++) {
      r.put("Key_" + i, new SimpleClass(i, (byte) i));
    }
  }

  public static void doPutsPDXSerializable2(String regionName, int numPuts) {
    Region<Object, Object> r = cache.getRegion(SEPARATOR + regionName);
    for (int i = 0; i < numPuts; i++) {
      r.put("Key_" + i, new SimpleClass1(false, (short) i, "" + i, i, "" + i, "" + i, i, i));
    }
  }


  public static void doTxPuts(String regionName) {
    Region<Object, Object> r = cache.getRegion(SEPARATOR + regionName);
    CacheTransactionManager mgr = cache.getCacheTransactionManager();

    mgr.begin();
    r.put(0, 0);
    r.put(100, 100);
    r.put(200, 200);
    mgr.commit();
  }

  public static void doTxPutsWithRetryIfError(String regionName, final long putsPerTransaction,
      final long transactions, long offset) {
    Region<Object, Object> r = cache.getRegion(Region.SEPARATOR + regionName);

    long keyOffset = offset * ((putsPerTransaction + (10 * transactions)) * 100);
    long j = 0;
    CacheTransactionManager mgr = cache.getCacheTransactionManager();
    for (int i = 0; i < transactions; i++) {
      boolean done = false;
      do {
        try {
          mgr.begin();
          for (j = 0; j < putsPerTransaction; j++) {
            long key = keyOffset + ((j + (10L * i)) * 100);
            String value = "Value_" + key;
            r.put(key, value);
          }
          mgr.commit();
          done = true;
        } catch (TransactionException e) {
          logger.info("Something went wrong with transaction [{},{}]. Retrying. Error: {}", i, j,
              e.getMessage());
          e.printStackTrace();
        } catch (IllegalStateException e1) {
          logger.info("Something went wrong with transaction [{},{}]. Retrying. Error: {}", i, j,
              e1.getMessage());
          e1.printStackTrace();
          try {
            mgr.rollback();
            logger.info("Rolled back transaction [{},{}]. Retrying. Error: {}", i, j,
                e1.getMessage());
          } catch (Exception e2) {
            logger.info(
                "Something went wrong when rolling back transaction [{},{}]. Retrying transaction. Error: {}",
                i, j, e2.getMessage());
            e2.printStackTrace();
          }
        }
      } while (!done);
    }
  }

  public static void doNextPuts(String regionName, int start, int numPuts) {
    IgnoredException exp =
        addIgnoredException(CacheClosedException.class.getName());
    try {
      Region<Object, Object> r = cache.getRegion(SEPARATOR + regionName);
      for (long i = start; i < numPuts; i++) {
        r.put(i, i);
      }
    } finally {
      exp.remove();
    }
  }

  public static void checkQueueSize(String senderId, int numQueueEntries) {
    await()
        .untilAsserted(() -> testQueueSize(senderId, numQueueEntries));
  }

  public static void testQueueSize(String senderId, int numQueueEntries) {
    GatewaySender sender = cache.getGatewaySender(senderId);
    if (sender.isParallel()) {
      int totalSize = 0;
      Set<RegionQueue> queues = ((AbstractGatewaySender) sender).getQueues();
      for (RegionQueue q : queues) {
        ConcurrentParallelGatewaySenderQueue prQ = (ConcurrentParallelGatewaySenderQueue) q;
        totalSize += prQ.size();
      }
      assertThat(totalSize).isEqualTo(numQueueEntries);
    } else {
      Set<RegionQueue> queues = ((AbstractGatewaySender) sender).getQueues();
      int size = 0;
      for (RegionQueue q : queues) {
        size += q.size();
      }
      assertThat(size).isEqualTo(numQueueEntries);
    }
  }

  /**
   * To be used only for ParallelGatewaySender.
   *
   * @param senderId Id of the ParallelGatewaySender
   * @param numQueueEntries Expected number of ParallelGatewaySenderQueue entries
   */
  public static void checkPRQLocalSize(String senderId, final int numQueueEntries) {
    GatewaySender sender = null;
    for (GatewaySender s : cache.getGatewaySenders()) {
      if (s.getId().equals(senderId)) {
        sender = s;
        break;
      }
    }
    assertThat(sender).isNotNull();

    if (sender.isParallel()) {
      final Set<RegionQueue> queues = ((AbstractGatewaySender) sender).getQueues();
      await().untilAsserted(() -> {
        int size = 0;
        for (RegionQueue q : queues) {
          ConcurrentParallelGatewaySenderQueue prQ = (ConcurrentParallelGatewaySenderQueue) q;
          size += prQ.localSize();
        }
        assertThat(size).as(
            " Expected local queue entries: " + numQueueEntries + " but actual entries: " + size)
            .isEqualTo(numQueueEntries);
      });
    }
  }

  /**
   * To be used only for ParallelGatewaySender.
   *
   * @param senderId Id of the ParallelGatewaySender
   */
  public static int getPRQLocalSize(String senderId) {
    GatewaySender sender = null;
    for (GatewaySender s : cache.getGatewaySenders()) {
      if (s.getId().equals(senderId)) {
        sender = s;
        break;
      }
    }
    assertThat(sender).isNotNull();

    if (sender.isParallel()) {
      int totalSize = 0;
      Set<RegionQueue> queues = ((AbstractGatewaySender) sender).getQueues();
      for (RegionQueue q : queues) {
        ConcurrentParallelGatewaySenderQueue prQ = (ConcurrentParallelGatewaySenderQueue) q;
        totalSize += prQ.localSize();
      }
      return totalSize;
    }
    return -1;
  }

  public static void doMultiThreadedPuts(String regionName, int numPuts) {
    final AtomicInteger ai = new AtomicInteger(-1);
    final ExecutorService execService = Executors.newFixedThreadPool(5, new ThreadFactory() {
      final AtomicInteger threadNum = new AtomicInteger();

      @Override
      public Thread newThread(final @NotNull Runnable r) {
        Thread result = new Thread(r, "Client Put Thread-" + threadNum.incrementAndGet());
        result.setDaemon(true);
        return result;
      }
    });

    final Region<Integer, Integer> r = cache.getRegion(SEPARATOR + regionName);

    List<Callable<Object>> tasks = new ArrayList<>();
    for (long i = 0; i < 5; i++) {
      tasks.add(new PutTask(r, ai, numPuts));
    }

    try {
      List<Future<Object>> l = execService.invokeAll(tasks);
      for (Future<Object> f : l) {
        f.get();
      }
    } catch (InterruptedException | ExecutionException e1) {
      // TODO: eats exception
      e1.printStackTrace();
    }

    execService.shutdown();
  }

  public static void validateRegionSize(String regionName, final int regionSize) {
    IgnoredException exp =
        addIgnoredException(ForceReattemptException.class.getName());
    IgnoredException exp1 =
        addIgnoredException(CacheClosedException.class.getName());
    try {
      final Region<?, ?> r = cache.getRegion(SEPARATOR + regionName);
      if (regionSize != r.keySet().size()) {
        await()
            .untilAsserted(() -> assertThat(r.keySet().size()).as(
                "Expected region entries: " + regionSize + " but actual entries: "
                    + r.keySet().size() + " present region keyset " + r.keySet())
                .isEqualTo(regionSize));
      }
    } finally {
      exp.remove();
      exp1.remove();
    }
  }

  public List<Object> getKeys(String regionName) {
    final Region<?, ?> r = cache.getRegion(SEPARATOR + regionName);
    return new ArrayList<>(r.keySet());
  }

  public void checkEqualRegionData(String regionName, VM vm1, VM vm2,
      boolean concurrencyChecksEnabled) {
    assertThat(vm1.invoke(() -> getRegionSize(regionName)))
        .isEqualTo(vm2.invoke(() -> getRegionSize(regionName)));
    Map<?, ?> regionData1 = vm1.invoke(() -> getRegionData(regionName));
    Map<?, ?> regionData2 = vm2.invoke(() -> getRegionData(regionName));
    assertThat(regionData1).isEqualTo(regionData2);

    if (concurrencyChecksEnabled) {
      Map<?, ?> regionKeysTimestamps1 = vm1.invoke(() -> getKeysTimestamps(regionName));
      Map<?, ?> regionKeysTimestamps2 = vm2.invoke(() -> getKeysTimestamps(regionName));
      assertThat(regionKeysTimestamps1).isEqualTo(regionKeysTimestamps2);
    }
  }

  private <K, V> Map<K, V> getRegionData(String regionName) {
    final Region<K, V> region = cache.getRegion(SEPARATOR + regionName);
    Map<K, V> map = new HashMap<>();
    for (K key : region.keySet()) {
      map.put(key, region.get(key));
    }
    return map;
  }

  private <K> Map<K, Long> getKeysTimestamps(String regionName) {
    final Region<K, ?> region = cache.getRegion(SEPARATOR + regionName);
    Map<K, Long> map = new HashMap<>();
    for (K key : region.keySet()) {
      map.put(key, getTimestampForEntry(key, regionName));
    }
    return map;
  }

  public static Object getValueForEntry(long key, String regionName) {
    final Region<?, ?> r = cache.getRegion(SEPARATOR + regionName);
    return r.get(key);
  }

  public static long getTimestampForEntry(Object key, String regionName) {
    final Region<?, ?> r = cache.getRegion(SEPARATOR + regionName);
    if (r.getEntry(key) == null) {
      return 0;
    }
    return r.getEntry(key).getStatistics().getLastModifiedTime();
  }

  public static void validateAsyncEventListener(String asyncQueueId, final int expectedSize) {
    AsyncEventListener theListener = null;

    Set<AsyncEventQueue> asyncEventQueues = cache.getAsyncEventQueues();
    for (AsyncEventQueue asyncQueue : asyncEventQueues) {
      if (asyncQueueId.equals(asyncQueue.getId())) {
        theListener = asyncQueue.getAsyncEventListener();
      }
    }
    assertThat(theListener).isNotNull();

    final Map<?, ?> eventsMap = ((MyAsyncEventListener) theListener).getEventsMap();
    await()
        .untilAsserted(() -> assertThat(eventsMap.size()).as(
            "Expected map entries: " + expectedSize + " but actual entries: " + eventsMap.size())
            .isEqualTo(expectedSize));
  }

  public static void waitForAsyncQueueToGetEmpty(String asyncQueueId) {
    AsyncEventQueue theAsyncEventQueue = null;

    Set<AsyncEventQueue> asyncEventChannels = cache.getAsyncEventQueues();
    for (AsyncEventQueue asyncChannel : asyncEventChannels) {
      if (asyncQueueId.equals(asyncChannel.getId())) {
        theAsyncEventQueue = asyncChannel;
      }
    }

    assertThat(theAsyncEventQueue).isNotNull();

    final GatewaySender sender = ((AsyncEventQueueImpl) theAsyncEventQueue).getSender();

    if (sender.isParallel()) {
      final Set<RegionQueue> queues = ((AbstractGatewaySender) sender).getQueues();
      await().untilAsserted(() -> {
        int size = 0;
        for (RegionQueue q : queues) {
          size += q.size();
        }
        assertThat(size).as("Expected queue size to be : " + 0 + " but actual entries: " + size)
            .isEqualTo(0);
      });
    } else {
      await().untilAsserted(() -> {
        Set<RegionQueue> queues = ((AbstractGatewaySender) sender).getQueues();
        int size = 0;
        for (RegionQueue q : queues) {
          size += q.size();
        }
        assertThat(size).as("Expected queue size to be : " + 0 + " but actual entries: " + size)
            .isEqualTo(0);
      });
    }
  }

  public static int getAsyncEventListenerMapSize(String asyncEventQueueId) {
    AsyncEventListener theListener = null;

    Set<AsyncEventQueue> asyncEventQueues = cache.getAsyncEventQueues();
    for (AsyncEventQueue asyncQueue : asyncEventQueues) {
      if (asyncEventQueueId.equals(asyncQueue.getId())) {
        theListener = asyncQueue.getAsyncEventListener();
      }
    }
    assertThat(theListener).isNotNull();

    final Map<?, ?> eventsMap = ((MyAsyncEventListener) theListener).getEventsMap();
    logger.info("The events map size is " + eventsMap.size());
    return eventsMap.size();
  }

  public static void validateRegionSize_PDX(String regionName, final int regionSize) {
    final Region<?, ?> r = cache.getRegion(SEPARATOR + regionName);
    await().untilAsserted(() -> {
      assertThat((regionSize <= r.keySet().size())).as(
          "Expected region entries: " + regionSize + " but actual entries: "
              + r.keySet().size() + " present region keyset " + r.keySet())
          .isEqualTo(true);
      assertThat((regionSize == r.size())).as(
          "Expected region size: " + regionSize + " but actual size: " + r.size()).isEqualTo(true);
    });
    for (int i = 0; i < regionSize; i++) {
      final int temp = i;
      await()
          .untilAsserted(() -> assertThat(r.get("Key_" + temp)).as(
              "keySet = " + r.keySet() + " values() = " + r.values() + "Region Size = " + r.size())
              .isEqualTo(new SimpleClass(temp, (byte) temp)));
    }
  }

  public static void validateRegionSizeOnly_PDX(String regionName, final int regionSize) {
    final Region<?, ?> r = cache.getRegion(SEPARATOR + regionName);
    await()
        .untilAsserted(
            () -> assertThat((regionSize <= r.keySet().size())).as(
                "Expected region entries: " + regionSize + " but actual entries: "
                    + r.keySet().size() + " present region keyset " + r.keySet())
                .isEqualTo(true));
  }

  public static void validateQueueSizeStat(String id, final int queueSize) {
    final AbstractGatewaySender sender = (AbstractGatewaySender) cache.getGatewaySender(id);
    await()
        .untilAsserted(() -> assertThat(sender.getEventQueueSize()).isEqualTo(queueSize));
    assertThat(sender.getEventQueueSize()).isEqualTo(queueSize);
  }

  public static void validateSecondaryQueueSizeStat(String id, final int queueSize) {
    final AbstractGatewaySender sender = (AbstractGatewaySender) cache.getGatewaySender(id);
    await()
        .untilAsserted(() -> assertThat(sender.getStatistics().getUnprocessedEventMapSize()).as(
            "Expected unprocessedEventMap is drained but actual is "
                + sender.getStatistics().getUnprocessedEventMapSize())
            .isEqualTo(queueSize));
    assertThat(sender.getStatistics().getUnprocessedEventMapSize()).isEqualTo(queueSize);
  }

  /**
   * This method is specifically written for pause and stop operations. This method validates that
   * the region size remains same for at least minimum number of verification attempts and also it
   * remains below a specified limit value. This validation will suffice for testing of pause/stop
   * operations.
   *
   */
  public static void validateRegionSizeRemainsSame(String regionName, final int regionSizeLimit) {
    final Region<?, ?> r = cache.getRegion(SEPARATOR + regionName);
    WaitCriterion wc = new WaitCriterion() {
      final int MIN_VERIFICATION_RUNS = 20;
      int sameRegionSizeCounter = 0;
      long previousSize = -1;

      @Override
      public boolean done() {
        if (r.keySet().size() == previousSize) {
          sameRegionSizeCounter++;
          int s = r.keySet().size();
          // if the sameRegionSizeCounter exceeds the minimum verification runs and regionSize is
          // below specified limit, then return true
          return sameRegionSizeCounter >= MIN_VERIFICATION_RUNS && s <= regionSizeLimit;

        } else { // current regionSize is not same as recorded previous regionSize
          previousSize = r.keySet().size(); // update the previousSize variable with current region
                                            // size
          sameRegionSizeCounter = 0;// reset the sameRegionSizeCounter
          return false;
        }
      }

      @Override
      public String description() {
        return "Expected region size to remain same below a specified limit but actual region size does not remain same or exceeded the specified limit "
            + sameRegionSizeCounter + " :regionSize " + previousSize;
      }
    };
    await().untilAsserted(wc);
  }

  public static String getRegionFullPath(String regionName) {
    final Region<?, ?> r = cache.getRegion(SEPARATOR + regionName);
    return r.getFullPath();
  }

  public static Integer getRegionSize(String regionName) {
    final Region<?, ?> r = cache.getRegion(SEPARATOR + regionName);
    return r.keySet().size();
  }

  public static <K, V> void validateRegionContents(String regionName, final Map<K, V> keyValues) {
    final Region<K, V> r = cache.getRegion(SEPARATOR + regionName);
    await().untilAsserted(() -> {
      boolean matchFlag = true;
      for (K key : keyValues.keySet()) {
        if (!r.get(key).equals(keyValues.get(key))) {
          logger.info("The values are for key " + "  " + key + " " + r.get(key) + " in the map "
              + keyValues.get(key));
          matchFlag = false;
        }
      }
      assertThat(matchFlag).as("Expected region entries doesn't match").isEqualTo(true);
    });
  }



  public static void doHeavyPuts(String regionName, int numPuts) {
    Region<Object, Object> r = cache.getRegion(SEPARATOR + regionName);
    // GatewaySender.DEFAULT_BATCH_SIZE * OBJECT_SIZE should be more than MAXIMUM_QUEUE_MEMORY
    // to guarantee overflow
    for (long i = 0; i < numPuts; i++) {
      r.put(i, new byte[1024 * 1024]);
    }
  }

  public static void addCacheListenerAndDestroyRegion(String regionName) {
    final Region<Long, Object> region = cache.getRegion(SEPARATOR + regionName);
    CacheListenerAdapter<Long, Object> cl = new CacheListenerAdapter<Long, Object>() {
      @Override
      public void afterCreate(EntryEvent<Long, Object> event) {
        if (event.getKey() == 99) {
          region.destroyRegion();
        }
      }
    };
    region.getAttributesMutator().addCacheListener(cl);
  }

  public static Boolean killSender(String senderId) {
    final IgnoredException exln = addIgnoredException("Could not connect");
    IgnoredException exp =
        addIgnoredException(CacheClosedException.class.getName());
    IgnoredException exp1 =
        addIgnoredException(ForceReattemptException.class.getName());
    try {
      AbstractGatewaySender sender = (AbstractGatewaySender) getGatewaySender(senderId);
      if (sender.isPrimary()) {
        logger.info("Gateway sender is killed by a test");
        cache.getDistributedSystem().disconnect();
        return Boolean.TRUE;
      }
      return Boolean.FALSE;
    } finally {
      exp.remove();
      exp1.remove();
      exln.remove();
    }
  }

  public static void killSender() {
    logger.info("Gateway sender is going to be killed by a test");
    cache.close();
    cache.getDistributedSystem().disconnect();
    logger.info("Gateway sender is killed by a test");
  }

  public static void checkAllSiteMetaData(
      Map<Integer, Set<InetSocketAddress>> dsIdToLocatorAddresses, final int siteSizeToCheck) {
    List<Locator> locatorsConfigured = Locator.getLocators();
    Locator locator = locatorsConfigured.get(0);
    await().untilAsserted(() -> {
      Map<Integer, Set<DistributionLocatorId>> allSiteMetaData =
          ((InternalLocator) locator).getLocatorMembershipListener().getAllLocatorsInfo();
      assertThat(allSiteMetaData.size()).isEqualTo(siteSizeToCheck);
      for (Map.Entry<Integer, Set<InetSocketAddress>> entry : dsIdToLocatorAddresses.entrySet()) {
        Set<DistributionLocatorId> foundLocatorIds = allSiteMetaData.get(entry.getKey());
        Set<InetSocketAddress> expectedLocators = entry.getValue();
        final Set<InetSocketAddress> foundLocators = foundLocatorIds.stream()
            .map(distributionLocatorId -> new InetSocketAddress(
                distributionLocatorId.getHostnameForClients(), distributionLocatorId.getPort()))
            .collect(Collectors.toSet());
        assertThat(foundLocators).isEqualTo(expectedLocators);
      }
    });
  }

  public static Long checkAllSiteMetaDataFor3Sites(final Map<Integer, Set<String>> dsVsPort) {
    await()
        .untilAsserted(
            () -> assertThat((getSystemStatic() != null)).as("System is not initialized")
                .isEqualTo(true));
    List<Locator> locatorsConfigured = Locator.getLocators();
    Locator locator = locatorsConfigured.get(0);
    LocatorMembershipListener listener = ((InternalLocator) locator).getLocatorMembershipListener();
    assertThat(listener)
        .as("No locator membership listener available. WAN is likely not enabled. Is this test in the WAN project?")
        .isNotNull();
    final Map<Integer, Set<DistributionLocatorId>> allSiteMetaData = listener.getAllLocatorsInfo();
    System.out.println("allSiteMetaData : " + allSiteMetaData);

    await().untilAsserted(() -> {
      assertThat((dsVsPort.size() == allSiteMetaData.size())).isEqualTo(true);
      boolean completeFlag = true;
      for (Map.Entry<Integer, Set<String>> entry : dsVsPort.entrySet()) {
        Set<DistributionLocatorId> locators = allSiteMetaData.get(entry.getKey());
        for (String locatorInMetaData : entry.getValue()) {
          DistributionLocatorId locatorId = DistributionLocatorId.unmarshal(locatorInMetaData);
          if (!locators.contains(locatorId)) {
            completeFlag = false;
            break;
          }
        }
        if (!completeFlag) {
          break;
        }
      }
      assertThat(completeFlag).as(
          "Expected site Metadata: " + dsVsPort + " but actual meta data: " + allSiteMetaData)
          .isEqualTo(true);
    });
    return System.currentTimeMillis();
  }

  public static void putRemoteSiteLocators(int remoteDsId, Set<String> remoteLocators) {
    List<Locator> locatorsConfigured = Locator.getLocators();
    Locator locator = locatorsConfigured.get(0);
    if (remoteLocators != null) {
      // Add fake remote locators to the locators map
      ((InternalLocator) locator).getLocatorMembershipListener().getAllServerLocatorsInfo()
          .put(remoteDsId, remoteLocators);
    }
  }

  public static void checkLocatorsinSender(String senderId, InetSocketAddress locatorToWaitFor)
      throws InterruptedException {

    GatewaySender sender = getGatewaySender(senderId);

    MyLocatorCallback callback =
        (MyLocatorCallback) ((AbstractGatewaySender) sender).getLocatorDiscoveryCallback();

    boolean discovered = callback.waitForDiscovery(locatorToWaitFor, MAX_WAIT);
    assertThat(discovered).as("Waited " + MAX_WAIT + " for " + locatorToWaitFor
        + " to be discovered on client. List is now: " + callback.getDiscovered()).isTrue();
  }

  public static void validateQueueContents(final String senderId, final int regionSize) {
    IgnoredException exp1 =
        addIgnoredException(InterruptedException.class.getName());
    IgnoredException exp2 =
        addIgnoredException(GatewaySenderException.class.getName());
    try {
      GatewaySender sender = getGatewaySender(senderId);

      if (!sender.isParallel()) {
        final Set<RegionQueue> queues = ((AbstractGatewaySender) sender).getQueues();
        await().untilAsserted(() -> {
          int size = 0;
          for (RegionQueue q : queues) {
            size += q.size();
          }
          assertThat(size).as(
              "Expected queue entries: " + regionSize + " but actual entries: " + size)
              .isEqualTo(regionSize);
        });
      } else if (sender.isParallel()) {
        final RegionQueue regionQueue;
        regionQueue = ((AbstractGatewaySender) sender).getQueues().toArray(new RegionQueue[1])[0];
        await().untilAsserted(() -> assertThat(regionQueue.size()).as(
            "Expected queue entries: " + regionSize + " but actual entries: " + regionQueue.size())
            .isEqualTo(regionSize));
      }
    } finally {
      exp1.remove();
      exp2.remove();
    }

  }

  public static String displayQueueContent(final RegionQueue queue) {
    if (queue instanceof ParallelGatewaySenderQueue) {
      ParallelGatewaySenderQueue pgsq = (ParallelGatewaySenderQueue) queue;
      return pgsq.displayContent();
    } else if (queue instanceof ConcurrentParallelGatewaySenderQueue) {
      ConcurrentParallelGatewaySenderQueue pgsq = (ConcurrentParallelGatewaySenderQueue) queue;
      return pgsq.displayContent();
    }
    return null;
  }

  public static String displaySerialQueueContent(final AbstractGatewaySender sender) {
    StringBuilder message = new StringBuilder();
    message.append("Is Primary: ").append(sender.isPrimary()).append(", ").append("Queue Size: ")
        .append(sender.getEventQueueSize());

    if (sender.getQueues() != null) {
      message.append(", ").append("Queue Count: ").append(sender.getQueues().size());
      Stream<Object> stream = sender.getQueues().stream()
          .map(regionQueue -> ((SerialGatewaySenderQueue) regionQueue).displayContent());

      List<Object> list = stream.collect(Collectors.toList());
      message.append(", ").append("Keys: ").append(list);
    }

    AbstractGatewaySenderEventProcessor abstractProcessor = sender.getEventProcessor();
    if (abstractProcessor == null) {
      message.append(", ").append("Null Event Processor: ");
    }
    if (!sender.isPrimary()) {
      message.append("\n").append("Unprocessed Events: ")
          .append(abstractProcessor.printUnprocessedEvents()).append("\n");
      message.append("\n").append("Unprocessed Tokens: ")
          .append(abstractProcessor.printUnprocessedTokens()).append("\n");
    }

    return message.toString();
  }

  public static Integer getQueueContentSize(final String senderId) {
    return getQueueContentSize(senderId, false);
  }

  public static Integer getQueueContentSize(final String senderId, boolean includeSecondary) {
    GatewaySender sender = getGatewaySender(senderId);

    if (!sender.isParallel()) {
      // if sender is serial, the queues will be all primary or all secondary at one member
      final Set<RegionQueue> queues = ((AbstractGatewaySender) sender).getQueues();
      int size = 0;
      for (RegionQueue q : queues) {
        size += q.size();
      }
      return size;
    } else if (sender.isParallel()) {
      RegionQueue regionQueue =
          ((AbstractGatewaySender) sender).getQueues().toArray(new RegionQueue[1])[0];
      if (regionQueue instanceof ConcurrentParallelGatewaySenderQueue) {
        return ((ConcurrentParallelGatewaySenderQueue) regionQueue).localSize(includeSecondary);
      } else if (regionQueue instanceof ParallelGatewaySenderQueue) {
        return ((ParallelGatewaySenderQueue) regionQueue).localSize(includeSecondary);
      } else {
        fail("Not implemented yet");
      }
    }
    fail("Not yet implemented?");
    return 0;
  }

  public static void validateParallelSenderQueueBucketSize(final String senderId,
      final int bucketSize) {
    GatewaySender sender = getGatewaySender(senderId);
    RegionQueue regionQueue =
        ((AbstractGatewaySender) sender).getQueues().toArray(new RegionQueue[1])[0];
    Set<BucketRegion> buckets = ((PartitionedRegion) regionQueue.getRegion()).getDataStore()
        .getAllLocalPrimaryBucketRegions();
    for (BucketRegion bucket : buckets) {
      assertThat(bucket.keySet().size()).as(
          "Expected bucket entries for bucket " + bucket.getId() + " is different than actual.")
          .isEqualTo(bucketSize);
    }
  }

  public static void validateParallelSenderQueueAllBucketsDrained(final String senderId) {
    IgnoredException exp =
        addIgnoredException(RegionDestroyedException.class.getName());
    IgnoredException exp1 =
        addIgnoredException(ForceReattemptException.class.getName());
    try {
      GatewaySender sender = getGatewaySender(senderId);
      final AbstractGatewaySender abstractSender = (AbstractGatewaySender) sender;
      RegionQueue queue = abstractSender.getEventProcessor().queue;
      await().untilAsserted(() -> assertThat(abstractSender.getEventQueueSize()).as(
          "Expected events in all primary queues are drained but actual is "
              + abstractSender.getEventQueueSize() + ". Queue content is: "
              + displayQueueContent(queue))
          .isEqualTo(0));
      assertThat(abstractSender.getEventQueueSize()).as(
          "Expected events in all primary queues after drain is 0").isEqualTo(0);
      await().untilAsserted(() -> assertThat(abstractSender.getSecondaryEventQueueSize()).as(
          "Expected events in all secondary queues are drained but actual is "
              + abstractSender.getSecondaryEventQueueSize() + ". Queue content is: "
              + displayQueueContent(queue))
          .isEqualTo(0));
      assertThat(abstractSender.getSecondaryEventQueueSize()).as(
          "Expected events in all secondary queues after drain is 0").isEqualTo(0);
    } finally {
      exp.remove();
      exp1.remove();
    }

  }

  public static Integer validateAfterAck(final String senderId) {
    GatewaySender sender = getGatewaySender(senderId);

    final MyGatewayEventFilter_AfterAck filter =
        (MyGatewayEventFilter_AfterAck) sender.getGatewayEventFilters().get(0);
    return filter.getAckList().size();
  }

  public static int verifyAndGetEventsDispatchedByConcurrentDispatchers(final String senderId) {
    GatewaySender sender = getGatewaySender(senderId);
    ConcurrentParallelGatewaySenderEventProcessor cProc =
        (ConcurrentParallelGatewaySenderEventProcessor) ((AbstractGatewaySender) sender)
            .getEventProcessor();
    if (cProc == null) {
      return 0;
    }

    int totalDispatched = 0;
    for (ParallelGatewaySenderEventProcessor lProc : cProc.getProcessors()) {
      totalDispatched += lProc.getNumEventsDispatched();
    }
    assertThat(totalDispatched > 0).isTrue();
    return totalDispatched;
  }

  public static Long getNumberOfEntriesOverflownToDisk(final String senderId) {
    GatewaySender sender = getGatewaySender(senderId);

    long numEntries = 0;
    if (sender.isParallel()) {
      RegionQueue regionQueue;
      regionQueue = ((AbstractGatewaySender) sender).getQueues().toArray(new RegionQueue[1])[0];
      numEntries = ((ConcurrentParallelGatewaySenderQueue) regionQueue)
          .getNumEntriesOverflowOnDiskTestOnly();
    }
    return numEntries;
  }

  public static Long getNumberOfEntriesInVM(final String senderId) {
    GatewaySender sender = getGatewaySender(senderId);
    RegionQueue regionQueue;
    long numEntries = 0;
    if (sender.isParallel()) {
      regionQueue = ((AbstractGatewaySender) sender).getQueues().toArray(new RegionQueue[1])[0];
      numEntries = ((ConcurrentParallelGatewaySenderQueue) regionQueue).getNumEntriesInVMTestOnly();
    }
    return numEntries;
  }

  public static int getNumOfPossibleDuplicateEvents(final String senderId) {
    GatewaySender sender = getGatewaySender(senderId);
    RegionQueue regionQueue;
    int numEntries = 0;
    if (sender.isParallel()) {
      regionQueue = ((AbstractGatewaySender) sender).getQueues().toArray(new RegionQueue[1])[0];
      numEntries =
          ((ConcurrentParallelGatewaySenderQueue) regionQueue).getNumOfPossibleDuplicateEvents();
    }
    return numEntries;
  }

  public static void verifyTmpDroppedEventSize(String senderId, int size) {
    GatewaySender sender = getGatewaySender(senderId);

    AbstractGatewaySender ags = (AbstractGatewaySender) sender;
    await().untilAsserted(() -> assertThat(ags.getTmpDroppedEventSize()).as(
        "Expected tmpDroppedEvents size: " + size
            + " but actual size: " + ags.getTmpDroppedEventSize())
        .isEqualTo(size));
  }

  /**
   * Checks that the bucketToTempQueueMap for a partitioned region
   * that holds events for buckets that are not available locally, is empty.
   */
  public static void validateEmptyBucketToTempQueueMap(String senderId) {
    GatewaySender sender = getGatewaySender(senderId);

    int size = 0;
    Set<RegionQueue> queues = ((AbstractGatewaySender) sender).getQueues();
    for (Object queue : queues) {
      PartitionedRegion region =
          (PartitionedRegion) ((ConcurrentParallelGatewaySenderQueue) queue).getRegion();
      int buckets = region.getTotalNumberOfBuckets();
      for (int bucket = 0; bucket < buckets; bucket++) {
        BlockingQueue<GatewaySenderEventImpl> newQueue =
            ((ConcurrentParallelGatewaySenderQueue) queue).getBucketTmpQueue(bucket);
        if (newQueue != null) {
          size += newQueue.size();
        }
      }
    }

    final int finalSize = size;
    assertThat(finalSize).as("Expected elements in TempQueueMap: " + 0
        + " but actual size: " + finalSize).isEqualTo(0);

  }

  private static GatewaySender getGatewaySender(String senderId) {
    Set<GatewaySender> senders = cache.getGatewaySenders();
    GatewaySender sender = null;
    for (GatewaySender s : senders) {
      if (s.getId().equals(senderId)) {
        sender = s;
        break;
      }
    }
    return sender;
  }

  public static void verifyQueueSize(String senderId, int size) {
    GatewaySender sender = getGatewaySender(senderId);

    if (!sender.isParallel()) {
      final Set<RegionQueue> queues = ((AbstractGatewaySender) sender).getQueues();
      int queueSize = 0;
      for (RegionQueue q : queues) {
        queueSize += q.size();
      }

      assertThat(queueSize).as("verifyQueueSize failed for sender " + senderId).isEqualTo(size);
    } else if (sender.isParallel()) {
      RegionQueue regionQueue =
          ((AbstractGatewaySender) sender).getQueues().toArray(new RegionQueue[1])[0];
      assertThat(regionQueue.size()).as("verifyQueueSize failed for sender " + senderId)
          .isEqualTo(size);
    }
  }

  public static void verifyRegionQueueNotEmpty(String senderId) {
    GatewaySender sender = getGatewaySender(senderId);

    if (!sender.isParallel()) {
      final Set<RegionQueue> queues = ((AbstractGatewaySender) sender).getQueues();
      int queueSize = 0;
      for (RegionQueue q : queues) {
        queueSize += q.size();
      }
      assertThat(queues.size() > 0).isTrue();
      assertThat(queueSize > 0).isTrue();
    } else if (sender.isParallel()) {
      RegionQueue regionQueue =
          ((AbstractGatewaySender) sender).getQueues().toArray(new RegionQueue[1])[0];
      assertThat(regionQueue.size() > 0).isTrue();
    }
  }

  public static void waitForConcurrentSerialSenderQueueToDrain(String senderId) {
    Set<GatewaySender> senders = cache.getGatewaySenders();
    GatewaySender sender =
        senders.stream().filter(s -> s.getId().equals(senderId)).findFirst().get();

    await().untilAsserted(() -> {
      Set<RegionQueue> queues = ((AbstractGatewaySender) sender).getQueues();
      for (RegionQueue q : queues) {
        assertThat(q.size()).isEqualTo(0);
      }
    });
  }

  /**
   * Test methods for sender operations
   *
   */
  public static void verifySenderPausedState(String senderId) {
    GatewaySender sender = cache.getGatewaySender(senderId);
    assertThat(sender.isPaused()).isTrue();
  }

  public static void verifySenderResumedState(String senderId) {
    GatewaySender sender = cache.getGatewaySender(senderId);
    assertThat(sender.isPaused()).isFalse();
    assertThat(sender.isRunning()).isTrue();
  }

  public static void verifySenderStoppedState(String senderId) {
    GatewaySender sender = cache.getGatewaySender(senderId);
    assertThat(sender.isRunning()).isFalse();
  }

  public static void verifySenderRunningState(String senderId) {
    GatewaySender sender = cache.getGatewaySender(senderId);
    assertThat(sender.isRunning()).isTrue();
  }

  public static void verifySenderConnectedState(String senderId, boolean shouldBeConnected) {
    AbstractGatewaySender sender = (AbstractGatewaySender) cache.getGatewaySender(senderId);
    if (shouldBeConnected) {
      assertThat(sender.getEventProcessor().getDispatcher().isConnectedToRemote()).isTrue();
    } else {
      assertThat(sender.getEventProcessor().getDispatcher().isConnectedToRemote()).isFalse();
    }
  }

  public static void verifyPool(String senderId, boolean poolShouldExist,
      int expectedPoolLocatorsSize) {
    AbstractGatewaySender sender = (AbstractGatewaySender) cache.getGatewaySender(senderId);
    PoolImpl pool = sender.getProxy();
    if (poolShouldExist) {
      assertThat(pool.getLocators().size()).isEqualTo(expectedPoolLocatorsSize);
    } else {
      assertThat(pool).isNull();
    }
  }

  public static void removeSenderFromTheRegion(String senderId, String regionName) {
    Region<?, ?> region = cache.getRegion(regionName);
    region.getAttributesMutator().removeGatewaySenderId(senderId);
  }

  public static void destroySender(String senderId) {
    GatewaySender sender = getGatewaySender(senderId);
    sender.destroy();
  }

  public static void verifySenderDestroyed(String senderId, boolean isParallel) {
    AbstractGatewaySender sender = (AbstractGatewaySender) getGatewaySender(senderId);
    assertThat(sender).isNull();

    final String queueRegionNameSuffix;
    if (isParallel) {
      queueRegionNameSuffix = ParallelGatewaySenderQueue.QSTRING;
    } else {
      queueRegionNameSuffix = "_SERIAL_GATEWAY_SENDER_QUEUE";
    }

    assertThat(((GemFireCacheImpl) cache).getAllRegions()).allSatisfy(
        r -> assertThat(r.getName())
            .as("Region underlying the sender is not destroyed.")
            .doesNotContain(senderId + queueRegionNameSuffix));
  }

  public static void destroyAsyncEventQueue(String id) {
    AsyncEventQueueImpl aeq = (AsyncEventQueueImpl) cache.getAsyncEventQueue(id);
    aeq.destroy();
  }

  protected static void verifyListenerEvents(final long expectedNumEvents) {
    await()
        .until(() -> listener1.getNumEvents() == expectedNumEvents);
  }

  protected Integer[] createLNAndNYLocators() {
    Integer lnPort = vm0.invoke(() -> createFirstLocatorWithDSId(1));
    Integer nyPort = vm1.invoke(() -> createFirstRemoteLocator(2, lnPort));
    return new Integer[] {lnPort, nyPort};
  }

  protected void validateRegionSizes(String regionName, int expectedRegionSize, VM... vms) {
    for (VM vm : vms) {
      vm.invoke(() -> validateRegionSize(regionName, expectedRegionSize));
    }
  }

  public static class MyLocatorCallback extends LocatorDiscoveryCallbackAdapter {

    private final Set<InetSocketAddress> discoveredLocators = new HashSet<>();

    private final Set<InetSocketAddress> removedLocators = new HashSet<>();

    @SuppressWarnings("unchecked")
    @Override
    public synchronized void locatorsDiscovered(@SuppressWarnings("rawtypes") List locators) {
      discoveredLocators.addAll(locators);
      notifyAll();
    }

    @SuppressWarnings("unchecked")
    @Override
    public synchronized void locatorsRemoved(@SuppressWarnings("rawtypes") List locators) {
      removedLocators.addAll(locators);
      notifyAll();
    }

    public boolean waitForDiscovery(InetSocketAddress locator, long time)
        throws InterruptedException {
      return waitFor(discoveredLocators, locator, time);
    }


    private synchronized boolean waitFor(Set<InetSocketAddress> set, InetSocketAddress locator,
        long time)
        throws InterruptedException {
      long remaining = time;
      long endTime = System.currentTimeMillis() + time;
      while (!set.contains(locator) && remaining >= 0) {
        wait(remaining);
        remaining = endTime - System.currentTimeMillis();
      }
      return set.contains(locator);
    }

    public synchronized Set<InetSocketAddress> getDiscovered() {
      return new HashSet<>(discoveredLocators);
    }

  }

  protected static class PutTask implements Callable<Object> {
    private final Region<Integer, Integer> region;

    private final AtomicInteger key_value;

    private final int numPuts;

    public PutTask(Region<Integer, Integer> region, AtomicInteger key_value, int numPuts) {
      this.region = region;
      this.key_value = key_value;
      this.numPuts = numPuts;
    }

    @Override
    public Object call() {
      while (true) {
        int key = key_value.incrementAndGet();
        if (key < numPuts) {
          region.put(key, key);
        } else {
          break;
        }
      }
      return null;
    }
  }

  public static class MyGatewayEventFilter implements GatewayEventFilter, Serializable {

    String Id = "MyGatewayEventFilter";

    boolean beforeEnqueueInvoked;
    boolean beforeTransmitInvoked;
    boolean afterAckInvoked;

    public MyGatewayEventFilter() {}

    @Override
    public boolean beforeEnqueue(@SuppressWarnings("rawtypes") GatewayQueueEvent event) {
      beforeEnqueueInvoked = true;
      return !((Long) event.getKey() >= 500 && (Long) event.getKey() < 600);
    }

    @Override
    public boolean beforeTransmit(@SuppressWarnings("rawtypes") GatewayQueueEvent event) {
      beforeTransmitInvoked = true;
      return !((Long) event.getKey() >= 600 && (Long) event.getKey() < 700);
    }

    @Override
    public void close() {}

    public String toString() {
      return Id;
    }

    @Override
    public void afterAcknowledgement(@SuppressWarnings("rawtypes") GatewayQueueEvent event) {
      afterAckInvoked = true;
    }

    public boolean equals(Object obj) {
      if (this == obj) {
        return true;
      }
      if (!(obj instanceof MyGatewayEventFilter)) {
        return false;
      }
      MyGatewayEventFilter filter = (MyGatewayEventFilter) obj;
      return Id.equals(filter.Id);
    }
  }

  public static class MyGatewayEventFilter_AfterAck implements GatewayEventFilter, Serializable {

    String Id = "MyGatewayEventFilter_AfterAck";

    ConcurrentSkipListSet<Long> ackList = new ConcurrentSkipListSet<>();

    public MyGatewayEventFilter_AfterAck() {}

    @Override
    public boolean beforeEnqueue(@SuppressWarnings("rawtypes") GatewayQueueEvent event) {
      return true;
    }

    @Override
    public boolean beforeTransmit(@SuppressWarnings("rawtypes") GatewayQueueEvent event) {
      return true;
    }

    @Override
    public void close() {}

    public String toString() {
      return Id;
    }

    @Override
    public void afterAcknowledgement(@SuppressWarnings("rawtypes") GatewayQueueEvent event) {
      ackList.add((Long) event.getKey());
    }

    public Set<Long> getAckList() {
      return ackList;
    }

    public boolean equals(Object obj) {
      if (this == obj) {
        return true;
      }
      if (!(obj instanceof MyGatewayEventFilter)) {
        return false;
      }
      MyGatewayEventFilter filter = (MyGatewayEventFilter) obj;
      return Id.equals(filter.Id);
    }
  }

  public static class PDXGatewayEventFilter implements GatewayEventFilter, Serializable {

    String Id = "PDXGatewayEventFilter";

    public int beforeEnqueueInvoked;
    public int beforeTransmitInvoked;
    public int afterAckInvoked;

    public PDXGatewayEventFilter() {}

    @Override
    public boolean beforeEnqueue(@SuppressWarnings("rawtypes") GatewayQueueEvent event) {
      System.out.println("Invoked enqueue for " + event);
      beforeEnqueueInvoked++;
      return true;
    }

    @Override
    public boolean beforeTransmit(@SuppressWarnings("rawtypes") GatewayQueueEvent event) {
      System.out.println("Invoked transmit for " + event);
      beforeTransmitInvoked++;
      return true;
    }

    @Override
    public void close() {}

    public String toString() {
      return Id;
    }

    @Override
    public void afterAcknowledgement(@SuppressWarnings("rawtypes") GatewayQueueEvent event) {
      System.out.println("Invoked afterAck for " + event);
      afterAckInvoked++;
    }

    public boolean equals(Object obj) {
      if (this == obj) {
        return true;
      }
      if (!(obj instanceof MyGatewayEventFilter)) {
        return false;
      }
      MyGatewayEventFilter filter = (MyGatewayEventFilter) obj;
      return Id.equals(filter.Id);
    }
  }

  @Override
  public final void preTearDown() throws Exception {
    cleanupVM();
    List<AsyncInvocation<?>> invocations = new ArrayList<>();
    final Host host = getHost(0);
    for (int i = 0; i < host.getVMCount(); i++) {
      invocations.add(host.getVM(i).invokeAsync(WANTestBase::cleanupVM));
    }
    for (AsyncInvocation<?> invocation : invocations) {
      invocation.await();
    }
  }

  public static void cleanupVM() {
    if (Locator.hasLocator()) {
      Locator.getLocator().stop();
    }
    closeCache();
    JUnit4DistributedTestCase.cleanDiskDirs();
  }

  public static void closeCache() {
    if (cache != null && !cache.isClosed()) {
      cache.close();
      cache.getDistributedSystem().disconnect();
      cache = null;
    } else {
      WANTestBase test = new WANTestBase();
      if (test.isConnectedToDS()) {
        test.getSystem().disconnect();
      }
    }
  }

  public static void deletePDXDir() throws IOException {
    File pdxDir = new File(CacheTestCase.getDiskDir(), "pdx");
    FileUtils.deleteDirectory(pdxDir);
  }

  public static void shutdownLocator() {
    WANTestBase test = new WANTestBase();
    test.getSystem().disconnect();
  }

  public static void printEventListenerMap() {
    ((MyGatewaySenderEventListener) eventListener1).printMap();
  }

  @Override
  public Properties getDistributedSystemProperties() {
    // For now all WANTestBase tests allocate off-heap memory even though
    // many of them never use it.
    // The problem is that WANTestBase has static methods that create instances
    // of WANTestBase (instead of instances of the subclass). So we can't override
    // this method so that only the off-heap subclasses allocate off heap memory.
    Properties props = new Properties();
    props.setProperty(OFF_HEAP_MEMORY_SIZE, "200m");

    return props;
  }

  /**
   * Returns true if the test should create off-heap regions. OffHeap tests should over-ride this
   * method and return false.
   */
  public boolean isOffHeap() {
    return false;
  }

  /**
   * Checks whether a Async Queue MBean is created or not
   *
   * @param vm reference to VM
   */
  public static void checkAsyncQueueMBean(final VM vm, final boolean shouldExist) {
    SerializableRunnable checkAsyncQueueMBean =
        new SerializableRunnable("Check Async Queue MBean") {
          @Override
          public void run() {
            ManagementService service = ManagementService.getManagementService(cache);
            AsyncEventQueueMXBean bean = service.getLocalAsyncEventQueueMXBean("pn");
            if (shouldExist) {
              assertThat(bean).isNotNull();
            } else {
              assertThat(bean).isNull();
            }
          }
        };
    vm.invoke(checkAsyncQueueMBean);
  }

  /**
   * Checks Proxy GatewayReceiver
   *
   * @param vm reference to VM
   */
  public static void checkProxyReceiver(final VM vm, final DistributedMember senderMember) {
    SerializableRunnable checkProxySender = new SerializableRunnable("Check Proxy Receiver") {
      @Override
      public void run() {
        ManagementService service = ManagementService.getManagementService(cache);
        GatewayReceiverMXBean bean = null;
        try {
          bean = MBeanUtil.getGatewayReceiverMbeanProxy(senderMember);
        } catch (Exception e) {
          fail("Could not obtain Sender Proxy in desired time " + e);
        }
        assertThat(bean).isNotNull();
        final ObjectName receiverMBeanName = service.getGatewayReceiverMBeanName(senderMember);
        try {
          MBeanUtil.printBeanDetails(receiverMBeanName);
        } catch (Exception e) {
          fail("Error while Printing Bean Details " + e);
        }

      }
    };
    vm.invoke(checkProxySender);
  }

  /**
   * Checks Proxy GatewaySender
   *
   * @param vm reference to VM
   */
  public static void checkProxySender(final VM vm, final DistributedMember senderMember) {
    SerializableRunnable checkProxySender = new SerializableRunnable("Check Proxy Sender") {
      @Override
      public void run() {
        ManagementService service = ManagementService.getManagementService(cache);
        GatewaySenderMXBean bean = null;
        try {
          bean = MBeanUtil.getGatewaySenderMbeanProxy(senderMember, "pn");
        } catch (Exception e) {
          fail("Could not obtain Sender Proxy in desired time " + e);
        }
        assertThat(bean).isNotNull();
        final ObjectName senderMBeanName = service.getGatewaySenderMBeanName(senderMember, "pn");
        try {
          MBeanUtil.printBeanDetails(senderMBeanName);
        } catch (Exception e) {
          fail("Error while Printing Bean Details " + e);
        }

        if (service.isManager()) {
          DistributedSystemMXBean dsBean = service.getDistributedSystemMXBean();
          await().untilAsserted(() -> {
            Map<String, Boolean> dsMap = dsBean.viewRemoteClusterStatus();
            assertThat(dsMap).doesNotContainValue(false);
            // dsMap.entrySet().forEach(entry -> assertThat(entry.getValue()).as(
            // "Should be true " + entry.getKey()).isTrue());
          });
        }

      }
    };
    vm.invoke(checkProxySender);
  }

  /**
   * Checks whether a GatewayReceiverMBean is created or not
   *
   * @param vm reference to VM
   */
  public static void checkReceiverMBean(final VM vm) {
    SerializableRunnable checkMBean = new SerializableRunnable("Check Receiver MBean") {
      @Override
      public void run() {
        ManagementService service = ManagementService.getManagementService(cache);
        GatewayReceiverMXBean bean = service.getLocalGatewayReceiverMXBean();
        assertThat(bean).isNotNull();
      }
    };
    vm.invoke(checkMBean);
  }

  public static void checkReceiverNavigationAPIS(final VM vm,
      final DistributedMember receiverMember) {
    SerializableRunnable checkNavigationAPIS =
        new SerializableRunnable("Check Receiver Navigation APIs") {
          @Override
          public void run() {
            ManagementService service = ManagementService.getManagementService(cache);
            DistributedSystemMXBean bean = service.getDistributedSystemMXBean();
            ObjectName expectedName = service.getGatewayReceiverMBeanName(receiverMember);
            try {
              await("wait for member to be added to DistributedSystemBridge membership list")
                  .untilAsserted(
                      () -> assertThat(bean.fetchGatewayReceiverObjectName(receiverMember.getId()))
                          .isNotNull());
              ObjectName actualName = bean.fetchGatewayReceiverObjectName(receiverMember.getId());
              assertThat(actualName).isEqualTo(expectedName);
            } catch (Exception e) {
              fail("Receiver Navigation Failed " + e);
            }

            assertThat(bean.listGatewayReceiverObjectNames().length).isEqualTo(1);

          }
        };
    vm.invoke(checkNavigationAPIS);
  }

  /**
   * Checks whether a GatewayReceiverMBean is created or not
   *
   * @param vm reference to VM
   */
  public static void checkSenderMBean(final VM vm, final String regionPath, boolean connected) {
    SerializableRunnable checkMBean = new SerializableRunnable("Check Sender MBean") {
      @Override
      public void run() {
        ManagementService service = ManagementService.getManagementService(cache);

        GatewaySenderMXBean bean = service.getLocalGatewaySenderMXBean("pn");
        assertThat(bean).isNotNull();
        await()
            .untilAsserted(() -> assertThat(bean.isConnected()).isEqualTo(connected));

        ObjectName regionBeanName = service.getRegionMBeanName(
            cache.getDistributedSystem().getDistributedMember(), SEPARATOR + regionPath);
        RegionMXBean rBean = service.getMBeanInstance(regionBeanName, RegionMXBean.class);
        assertThat(rBean.isGatewayEnabled()).isTrue();


      }
    };
    vm.invoke(checkMBean);
  }

  public static void checkSenderNavigationAPIS(final VM vm, final DistributedMember senderMember) {
    SerializableRunnable checkNavigationAPIS =
        new SerializableRunnable("Check Sender Navigation APIs") {
          @Override
          public void run() {
            ManagementService service = ManagementService.getManagementService(cache);
            DistributedSystemMXBean bean = service.getDistributedSystemMXBean();
            ObjectName expectedName = service.getGatewaySenderMBeanName(senderMember, "pn");
            try {
              ObjectName actualName = bean.fetchGatewaySenderObjectName(senderMember.getId(), "pn");
              assertThat(actualName).isEqualTo(expectedName);
            } catch (Exception e) {
              fail("Sender Navigation Failed " + e);
            }

            assertThat(bean.listGatewaySenderObjectNames().length).isEqualTo(2);
            try {
              assertThat(bean.listGatewaySenderObjectNames(senderMember.getId()).length).isEqualTo(
                  1);
            } catch (Exception e) {
              fail("Sender Navigation Failed " + e);
            }

          }
        };
    vm.invoke(checkNavigationAPIS);
  }

  /**
   * start a gateway sender
   *
   * @param vm reference to VM
   */
  public static void startGatewaySender(final VM vm) {
    SerializableRunnable stopGatewaySender = new SerializableRunnable("Start Gateway Sender") {
      @Override
      public void run() {
        ManagementService service = ManagementService.getManagementService(cache);
        GatewaySenderMXBean bean = service.getLocalGatewaySenderMXBean("pn");
        bean.start();
        assertThat(bean.isRunning()).isTrue();
      }
    };
    vm.invoke(stopGatewaySender);
  }

  /**
   * stops a gateway sender
   *
   * @param vm reference to VM
   */
  public static void stopGatewaySender(final VM vm) {
    SerializableRunnable stopGatewaySender = new SerializableRunnable("Stop Gateway Sender") {
      @Override
      public void run() {
        ManagementService service = ManagementService.getManagementService(cache);
        GatewaySenderMXBean bean = service.getLocalGatewaySenderMXBean("pn");
        bean.stop();
        assertThat(bean.isRunning()).isFalse();
      }
    };
    vm.invoke(stopGatewaySender);
  }

  /**
   * Checks Proxy Async Queue
   *
   * @param vm reference to VM
   */
  public static void checkProxyAsyncQueue(final VM vm, final DistributedMember senderMember,
      final boolean shouldExist) {
    SerializableRunnable checkProxyAsyncQueue =
        new SerializableRunnable("Check Proxy Async Queue") {
          @Override
          public void run() {
            SystemManagementService service =
                (SystemManagementService) ManagementService.getManagementService(cache);
            final ObjectName queueMBeanName =
                service.getAsyncEventQueueMBeanName(senderMember, "pn");
            AsyncEventQueueMXBean bean = null;
            if (shouldExist) {
              // Verify the MBean proxy exists
              try {
                bean = MBeanUtil.getAsyncEventQueueMBeanProxy(senderMember, "pn");
              } catch (Exception e) {
                fail("Could not obtain Sender Proxy in desired time " + e);
              }
              assertThat(bean).isNotNull();

              try {
                MBeanUtil.printBeanDetails(queueMBeanName);
              } catch (Exception e) {
                fail("Error while Printing Bean Details " + e);
              }
            } else {
              // Verify the MBean proxy doesn't exist
              bean = service.getMBeanProxy(queueMBeanName, AsyncEventQueueMXBean.class);
              assertThat(bean).isNull();
            }
          }
        };
    vm.invoke(checkProxyAsyncQueue);
  }

  public static DistributedMember getMember() {
    return ((GemFireCacheImpl) cache).getMyId();
  }

  public static ManagementService getManagementService() {
    return ManagementService.getManagementService(cache);
  }

  /**
   * Checks Proxy GatewaySender
   *
   * @param vm reference to VM
   */
  public static void checkRemoteClusterStatus(final VM vm, final DistributedMember senderMember) {
    SerializableRunnable checkProxySender = new SerializableRunnable("DS Map Size") {
      @Override
      public void run() {
        await().untilAsserted(() -> {
          final ManagementService service = ManagementService.getManagementService(cache);
          final DistributedSystemMXBean dsBean = service.getDistributedSystemMXBean();
          assertThat(dsBean != null).as(
              "Failed while waiting for getDistributedSystemMXBean to complete and get results")
              .isEqualTo(true);
        });
        ManagementService service = ManagementService.getManagementService(cache);
        final DistributedSystemMXBean dsBean = service.getDistributedSystemMXBean();
        Map<String, Boolean> dsMap = dsBean.viewRemoteClusterStatus();
        logger.info("Ds Map is: " + dsMap.size());
        assertThat(dsMap.size() > 0).isEqualTo(true);
      }
    };
    vm.invoke(checkProxySender);
  }


}
