/*
 * Licensed to the Apache Software Foundation (ASF) under one or more contributor license
 * agreements. See the NOTICE file distributed with this work for additional information regarding
 * copyright ownership. The ASF licenses this file to You under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance with the License. You may obtain a
 * copy of the License at
 *
 * http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software distributed under the License
 * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
 * or implied. See the License for the specific language governing permissions and limitations under
 * the License.
 */
package org.apache.geode.internal.cache;

import static org.apache.geode.distributed.ConfigurationProperties.DISTRIBUTED_SYSTEM_ID;
import static org.apache.geode.distributed.ConfigurationProperties.ENABLE_CLUSTER_CONFIGURATION;
import static org.apache.geode.distributed.ConfigurationProperties.LOCATORS;
import static org.apache.geode.distributed.ConfigurationProperties.LOG_LEVEL;
import static org.apache.geode.distributed.ConfigurationProperties.MCAST_PORT;
import static org.apache.geode.distributed.ConfigurationProperties.REMOTE_LOCATORS;
import static org.apache.geode.distributed.ConfigurationProperties.START_LOCATOR;
import static org.apache.geode.distributed.ConfigurationProperties.USE_CLUSTER_CONFIGURATION;
import static org.apache.geode.test.awaitility.GeodeAwaitility.await;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertNotNull;
import static org.junit.Assert.assertTrue;
import static org.junit.Assert.fail;

import java.io.File;
import java.io.IOException;
import java.net.InetSocketAddress;
import java.util.HashSet;
import java.util.List;
import java.util.Properties;
import java.util.Set;
import java.util.StringTokenizer;

import org.junit.Test;
import org.junit.experimental.categories.Category;

import org.apache.geode.cache.Cache;
import org.apache.geode.cache.CacheFactory;
import org.apache.geode.cache.DiskStore;
import org.apache.geode.cache.DiskStoreFactory;
import org.apache.geode.cache.EntryNotFoundException;
import org.apache.geode.cache.Operation;
import org.apache.geode.cache.PartitionAttributesFactory;
import org.apache.geode.cache.Region;
import org.apache.geode.cache.Region.Entry;
import org.apache.geode.cache.RegionFactory;
import org.apache.geode.cache.RegionShortcut;
import org.apache.geode.cache.client.internal.LocatorDiscoveryCallbackAdapter;
import org.apache.geode.cache.wan.GatewayEventFilter;
import org.apache.geode.cache.wan.GatewayReceiver;
import org.apache.geode.cache.wan.GatewayReceiverFactory;
import org.apache.geode.cache.wan.GatewaySender;
import org.apache.geode.cache.wan.GatewaySenderFactory;
import org.apache.geode.distributed.internal.InternalDistributedSystem;
import org.apache.geode.distributed.internal.InternalLocator;
import org.apache.geode.internal.AvailablePortHelper;
import org.apache.geode.internal.cache.partitioned.PRLocallyDestroyedException;
import org.apache.geode.internal.cache.versions.VersionSource;
import org.apache.geode.internal.cache.versions.VersionStamp;
import org.apache.geode.internal.cache.versions.VersionTag;
import org.apache.geode.internal.cache.wan.InternalGatewaySenderFactory;
import org.apache.geode.test.dunit.Host;
import org.apache.geode.test.dunit.IgnoredException;
import org.apache.geode.test.dunit.Invoke;
import org.apache.geode.test.dunit.LogWriterUtils;
import org.apache.geode.test.dunit.VM;
import org.apache.geode.test.dunit.internal.JUnit4DistributedTestCase;
import org.apache.geode.test.junit.categories.WanTest;

/**
 * @since GemFire 7.0.1
 */
@Category({WanTest.class})
public class UpdateVersionDUnitTest extends JUnit4DistributedTestCase {

  protected static final String regionName = "testRegion";
  protected static Cache cache;
  private static Set<IgnoredException> expectedExceptions = new HashSet<IgnoredException>();

  @Override
  public final void preTearDown() throws Exception {
    closeCache();
    Invoke.invokeInEveryVM(this::closeCache);
  }

  @Test
  public void testUpdateVersionAfterCreateWithSerialSender() {
    Host host = Host.getHost(0);
    VM vm0 = host.getVM(0); // locator site1
    VM vm1 = host.getVM(1); // server2 site1

    VM vm2 = host.getVM(2); // locator site2
    VM vm3 = host.getVM(3); // server1 site2
    VM vm4 = host.getVM(4); // server2 site2

    final String key = "key-1";

    // Site 1
    Integer lnPort = (Integer) vm0.invoke(() -> this.createFirstLocatorWithDSId(1));

    vm1.invoke(() -> this.createCache(lnPort));
    vm1.invoke(() -> this.createSender("ln1", 2, false, 10, 1, false, false, null, true));

    vm1.invoke(() -> this.createPartitionedRegion(regionName, "ln1", 1, 1));
    vm1.invoke(() -> this.startSender("ln1"));
    vm1.invoke(() -> this.waitForSenderRunningState("ln1"));

    // Site 2
    Integer nyPort = (Integer) vm2.invoke(() -> this.createFirstRemoteLocator(2, lnPort));
    Integer nyRecPort = (Integer) vm3.invoke(() -> this.createReceiver(nyPort));

    vm3.invoke(() -> this.createPartitionedRegion(regionName, "", 1, 1));
    vm4.invoke(() -> this.createCache(nyPort));
    vm4.invoke(() -> this.createPartitionedRegion(regionName, "", 1, 1));

    VersionTag localTag = vm1.invoke(() -> putEntryAndGetPartitionedRegionVersionTag(key));
    VersionTag remoteTag = vm4.invoke(() -> getPartitionedRegionVersionTag(key, localTag));

    assertEquals("Local and remote site have different timestamps", localTag.getVersionTimeStamp(),
        remoteTag.getVersionTimeStamp());
  }

  @Test
  public void testUpdateVersionAfterCreateWithSerialSenderOnDR() {
    Host host = Host.getHost(0);
    VM vm0 = host.getVM(0); // locator site1
    VM vm1 = host.getVM(1); // server1 site1

    VM vm2 = host.getVM(2); // locator site2
    VM vm3 = host.getVM(3); // server1 site2
    VM vm4 = host.getVM(4); // server2 site2

    final String key = "key-1";

    // Site 1
    Integer lnPort = (Integer) vm0.invoke(() -> this.createFirstLocatorWithDSId(1));

    vm1.invoke(() -> this.createCache(lnPort));
    vm1.invoke(() -> this.createSender("ln1", 2, false, 10, 1, false, false, null, true));

    vm1.invoke(() -> this.createReplicatedRegion(regionName, "ln1"));
    vm1.invoke(() -> this.startSender("ln1"));
    vm1.invoke(() -> this.waitForSenderRunningState("ln1"));

    // Site 2
    Integer nyPort = (Integer) vm2.invoke(() -> this.createFirstRemoteLocator(2, lnPort));
    Integer nyRecPort = (Integer) vm3.invoke(() -> this.createReceiver(nyPort));

    vm3.invoke(() -> this.createReplicatedRegion(regionName, ""));
    vm4.invoke(() -> this.createCache(nyPort));
    vm4.invoke(() -> this.createReplicatedRegion(regionName, ""));

    VersionTag localTag = vm1.invoke(() -> putEntryAndGetReplicatedRegionVersionTag(key));
    VersionTag remoteTag = vm4.invoke(() -> getReplicatedRegionVersionTag(key, localTag));

    assertEquals("Local and remote site have different timestamps", localTag.getVersionTimeStamp(),
        remoteTag.getVersionTimeStamp());
  }

  @Test
  public void testUpdateVersionAfterCreateWithParallelSender() {
    Host host = Host.getHost(0);
    VM vm0 = host.getVM(0); // locator site1
    VM vm1 = host.getVM(1); // server1 site1

    VM vm2 = host.getVM(2); // locator site2
    VM vm3 = host.getVM(3); // server1 site2
    VM vm4 = host.getVM(4); // server2 site2

    // Site 1
    Integer lnPort = vm0.invoke(() -> this.createFirstLocatorWithDSId(1));

    final String key = "key-1";

    vm1.invoke(() -> this.createCache(lnPort));
    vm1.invoke(() -> this.createSender("ln1", 2, true, 10, 1, false, false, null, true));

    vm1.invoke(() -> this.createPartitionedRegion(regionName, "ln1", 1, 1));
    vm1.invoke(() -> this.startSender("ln1"));
    vm1.invoke(() -> this.waitForSenderRunningState("ln1"));

    // Site 2
    Integer nyPort = (Integer) vm2.invoke(() -> this.createFirstRemoteLocator(2, lnPort));
    Integer nyRecPort = (Integer) vm3.invoke(() -> this.createReceiver(nyPort));

    vm3.invoke(() -> this.createPartitionedRegion(regionName, "", 1, 1));
    vm4.invoke(() -> this.createCache(nyPort));
    vm4.invoke(() -> this.createPartitionedRegion(regionName, "", 1, 1));

    VersionTag localTag = vm1.invoke(() -> putEntryAndGetPartitionedRegionVersionTag(key));
    VersionTag remoteTag = vm4.invoke(() -> getPartitionedRegionVersionTag(key, localTag));

    assertEquals("Local and remote site have different timestamps", localTag.getVersionTimeStamp(),
        remoteTag.getVersionTimeStamp());
  }

  @Test
  public void testUpdateVersionAfterCreateWithConcurrentSerialSender() {
    Host host = Host.getHost(0);
    VM vm0 = host.getVM(0); // locator site1
    VM vm1 = host.getVM(1); // server1 site1

    VM vm2 = host.getVM(2); // locator site2
    VM vm3 = host.getVM(3); // server1 site2
    VM vm4 = host.getVM(4); // server2 site2

    // Site 1
    Integer lnPort = (Integer) vm0.invoke(() -> this.createFirstLocatorWithDSId(1));

    final String key = "key-1";

    vm1.invoke(() -> this.createCache(lnPort));
    vm1.invoke(
        () -> this.createConcurrentSender("ln1", 2, false, 10, 2, false, false, null, true, 2));

    vm1.invoke(() -> this.createPartitionedRegion(regionName, "ln1", 1, 1));
    vm1.invoke(() -> this.startSender("ln1"));
    vm1.invoke(() -> this.waitForSenderRunningState("ln1"));

    // Site 2
    Integer nyPort = (Integer) vm2.invoke(() -> this.createFirstRemoteLocator(2, lnPort));
    Integer nyRecPort = (Integer) vm3.invoke(() -> this.createReceiver(nyPort));

    vm3.invoke(() -> this.createPartitionedRegion(regionName, "", 1, 1));
    vm4.invoke(() -> this.createCache(nyPort));
    vm4.invoke(() -> this.createPartitionedRegion(regionName, "", 1, 1));

    VersionTag localTag = vm1.invoke(() -> putEntryAndGetPartitionedRegionVersionTag(key));
    VersionTag remoteTag = vm4.invoke(() -> getPartitionedRegionVersionTag(key, localTag));

    assertEquals("Local and remote site have different timestamps", localTag.getVersionTimeStamp(),
        remoteTag.getVersionTimeStamp());
  }

  private VersionTag putEntryAndGetReplicatedRegionVersionTag(String key) {
    Region region = cache.getRegion(regionName);
    assertTrue(region instanceof DistributedRegion);

    region.put(key, "value-1");
    region.put(key, "value-2");
    Entry entry = region.getEntry(key);
    assertTrue(entry instanceof NonTXEntry);
    RegionEntry regionEntry = ((NonTXEntry) entry).getRegionEntry();

    VersionStamp stamp = regionEntry.getVersionStamp();

    // Create a duplicate entry version tag from stamp with newer
    // time-stamp.
    VersionSource memberId = (VersionSource) cache.getDistributedSystem().getDistributedMember();
    VersionTag versionTag = VersionTag.create(memberId);

    int entryVersion = stamp.getEntryVersion() - 1;
    int dsid = stamp.getDistributedSystemId();

    // Increment the time by 1 in case the time is the same as the previous event.
    // The entry's version timestamp can be incremented by 1 in certain circumstances.
    // See AbstractRegionEntry.generateVersionTag.
    long time = System.currentTimeMillis() + 1;

    versionTag.setEntryVersion(entryVersion);
    versionTag.setDistributedSystemId(dsid);
    versionTag.setVersionTimeStamp(time);
    versionTag.setIsRemoteForTesting();

    EntryEventImpl event =
        createNewEvent((DistributedRegion) region, versionTag, entry.getKey(), "value-3");

    ((LocalRegion) region).basicUpdate(event, false, true, 0L, false);

    // Verify the new stamp
    entry = region.getEntry(key);
    assertTrue(entry instanceof NonTXEntry);
    regionEntry = ((NonTXEntry) entry).getRegionEntry();

    stamp = regionEntry.getVersionStamp();
    assertEquals("Time stamp did NOT get updated by UPDATE_VERSION operation on LocalRegion", time,
        stamp.getVersionTimeStamp());
    assertEquals(entryVersion + 1, stamp.getEntryVersion());
    assertEquals(dsid, stamp.getDistributedSystemId());

    return stamp.asVersionTag();
  }

  private VersionTag putEntryAndGetPartitionedRegionVersionTag(String key) {
    Region region = cache.getRegion(regionName);
    assertTrue(region instanceof PartitionedRegion);

    region.put(key, "value-1");
    region.put(key, "value-2");
    Entry entry = region.getEntry(key);
    assertTrue(entry instanceof EntrySnapshot);
    RegionEntry regionEntry = ((EntrySnapshot) entry).getRegionEntry();

    VersionStamp stamp = regionEntry.getVersionStamp();

    // Create a duplicate entry version tag from stamp with newer
    // time-stamp.
    VersionSource memberId = (VersionSource) cache.getDistributedSystem().getDistributedMember();
    VersionTag versionTag = VersionTag.create(memberId);

    int entryVersion = stamp.getEntryVersion() - 1;
    int dsid = stamp.getDistributedSystemId();

    // Increment the time by 1 in case the time is the same as the previous event.
    // The entry's version timestamp can be incremented by 1 in certain circumstances.
    // See AbstractRegionEntry.generateVersionTag.
    long time = System.currentTimeMillis() + 1;

    versionTag.setEntryVersion(entryVersion);
    versionTag.setDistributedSystemId(dsid);
    versionTag.setVersionTimeStamp(time);
    versionTag.setIsRemoteForTesting();

    EntryEventImpl event =
        createNewEvent((PartitionedRegion) region, versionTag, entry.getKey(), "value-3");

    ((LocalRegion) region).basicUpdate(event, false, true, 0L, false);

    // Verify the new stamp
    entry = region.getEntry(key);
    assertTrue(entry instanceof EntrySnapshot);
    regionEntry = ((EntrySnapshot) entry).getRegionEntry();

    stamp = regionEntry.getVersionStamp();
    assertEquals("Time stamp did NOT get updated by UPDATE_VERSION operation on LocalRegion", time,
        stamp.getVersionTimeStamp());
    assertEquals(++entryVersion, stamp.getEntryVersion());
    assertEquals(dsid, stamp.getDistributedSystemId());

    return stamp.asVersionTag();
  }

  private VersionTag getReplicatedRegionVersionTag(final String key, final VersionTag localTag) {
    final Region region = cache.getRegion(regionName);

    await().until(() -> region.getEntry(key) != null);

    await().until(() -> {
      Entry entry = region.getEntry(key);
      assertTrue(entry instanceof NonTXEntry);
      RegionEntry regionEntry = ((NonTXEntry) entry).getRegionEntry();
      return regionEntry.getVersionStamp().getVersionTimeStamp() == localTag.getVersionTimeStamp();
    });

    Entry entry = region.getEntry(key);
    assertTrue(entry instanceof NonTXEntry);
    RegionEntry regionEntry = ((NonTXEntry) entry).getRegionEntry();

    VersionStamp stamp = regionEntry.getVersionStamp();

    return stamp.asVersionTag();
  }

  private VersionTag getPartitionedRegionVersionTag(final String key, final VersionTag localTag) {
    final PartitionedRegion region = (PartitionedRegion) cache.getRegion(regionName);

    await().until(() -> {
      Entry<?, ?> entry = null;
      try {
        entry = region.getDataStore().getEntryLocally(0, key, false, false);
      } catch (EntryNotFoundException | ForceReattemptException e) {
        // expected
      } catch (PRLocallyDestroyedException e) {
        throw new RuntimeException("unexpected exception", e);
      }
      if (entry != null) {
        LogWriterUtils.getLogWriter().info("found entry " + entry);
      }
      return (entry != null);
    });

    await().until(() -> {
      Entry entry = region.getEntry(key);
      assertTrue(entry instanceof EntrySnapshot);
      RegionEntry regionEntry = ((EntrySnapshot) entry).getRegionEntry();
      return regionEntry.getVersionStamp().getVersionTimeStamp() == localTag.getVersionTimeStamp();
    });

    Entry entry = region.getEntry(key);
    assertTrue(entry instanceof EntrySnapshot);
    RegionEntry regionEntry = ((EntrySnapshot) entry).getRegionEntry();

    VersionStamp stamp = regionEntry.getVersionStamp();

    return stamp.asVersionTag();
  }

  private VersionTagHolder createNewEvent(LocalRegion region, VersionTag tag, Object key,
      Object value) {
    VersionTagHolder updateEvent = new VersionTagHolder(tag);
    updateEvent.setOperation(Operation.UPDATE);
    updateEvent.setRegion(region);
    if (region instanceof PartitionedRegion) {
      updateEvent.setKeyInfo(((PartitionedRegion) region).getKeyInfo(key));
    } else {
      updateEvent.setKeyInfo(new KeyInfo(key, value, null));
    }
    updateEvent.setNewValue(value);
    updateEvent.setGenerateCallbacks(true);
    updateEvent.distributedMember = region.getSystem().getDistributedMember();
    updateEvent.setNewEventId(region.getSystem());
    return updateEvent;
  }

  /*
   * Helper Methods
   */

  private void createCache(Integer locPort) {
    UpdateVersionDUnitTest test = new UpdateVersionDUnitTest();
    Properties props = test.getDistributedSystemProperties();
    props.setProperty(MCAST_PORT, "0");
    props.setProperty(LOCATORS, "localhost[" + locPort + "]");
    props.setProperty(LOG_LEVEL, LogWriterUtils.getDUnitLogLevel());
    props.setProperty(ENABLE_CLUSTER_CONFIGURATION, "false");
    props.setProperty(USE_CLUSTER_CONFIGURATION, "false");
    InternalDistributedSystem ds = test.getSystem(props);
    cache = CacheFactory.create(ds);
    IgnoredException ex =
        new IgnoredException("could not get remote locator information for remote site");
    cache.getLogger().info(ex.getAddMessage());
    expectedExceptions.add(ex);
    ex = new IgnoredException("Pool ln1 is not available");
    cache.getLogger().info(ex.getAddMessage());
    expectedExceptions.add(ex);
  }

  private void closeCache() {
    if (cache != null && !cache.isClosed()) {
      for (IgnoredException expectedException : expectedExceptions) {
        cache.getLogger().info(expectedException.getRemoveMessage());
      }
      expectedExceptions.clear();
      cache.getDistributedSystem().disconnect();
      cache.close();
    }
    cache = null;
  }

  public void createSender(String dsName, int remoteDsId, boolean isParallel, Integer maxMemory,
      Integer batchSize, boolean isConflation, boolean isPersistent, GatewayEventFilter filter,
      boolean isManualStart) {
    File persistentDirectory =
        new File(dsName + "_disk_" + System.currentTimeMillis() + "_" + VM.getCurrentVMNum());
    persistentDirectory.mkdir();
    DiskStoreFactory dsf = cache.createDiskStoreFactory();
    File[] dirs1 = new File[] {persistentDirectory};
    if (isParallel) {
      GatewaySenderFactory gateway = cache.createGatewaySenderFactory();
      gateway.setParallel(true);
      gateway.setMaximumQueueMemory(maxMemory);
      gateway.setBatchSize(batchSize);
      gateway.setManualStart(isManualStart);
      ((InternalGatewaySenderFactory) gateway).setLocatorDiscoveryCallback(new MyLocatorCallback());
      if (filter != null) {
        gateway.addGatewayEventFilter(filter);
      }
      if (isPersistent) {
        gateway.setPersistenceEnabled(true);
        gateway.setDiskStoreName(dsf.setDiskDirs(dirs1).create(dsName).getName());
      } else {
        DiskStore store = dsf.setDiskDirs(dirs1).create(dsName);
        gateway.setDiskStoreName(store.getName());
      }
      gateway.setBatchConflationEnabled(isConflation);
      gateway.create(dsName, remoteDsId);

    } else {
      GatewaySenderFactory gateway = cache.createGatewaySenderFactory();
      gateway.setMaximumQueueMemory(maxMemory);
      gateway.setBatchSize(batchSize);
      gateway.setManualStart(isManualStart);
      ((InternalGatewaySenderFactory) gateway).setLocatorDiscoveryCallback(new MyLocatorCallback());
      if (filter != null) {
        gateway.addGatewayEventFilter(filter);
      }
      gateway.setBatchConflationEnabled(isConflation);
      if (isPersistent) {
        gateway.setPersistenceEnabled(true);
        gateway.setDiskStoreName(dsf.setDiskDirs(dirs1).create(dsName).getName());
      } else {
        DiskStore store = dsf.setDiskDirs(dirs1).create(dsName);
        gateway.setDiskStoreName(store.getName());
      }
      gateway.create(dsName, remoteDsId);
    }
  }

  private void createPartitionedRegion(String regionName, String senderIds, Integer redundantCopies,
      Integer totalNumBuckets) {
    RegionFactory fact = cache.createRegionFactory(RegionShortcut.PARTITION);

    if (senderIds != null) {
      StringTokenizer tokenizer = new StringTokenizer(senderIds, ",");
      while (tokenizer.hasMoreTokens()) {
        String senderId = tokenizer.nextToken();
        fact.addGatewaySenderId(senderId);
      }
    }

    PartitionAttributesFactory pFact = new PartitionAttributesFactory();
    pFact.setTotalNumBuckets(totalNumBuckets);
    pFact.setRedundantCopies(redundantCopies);
    pFact.setRecoveryDelay(0);
    fact.setPartitionAttributes(pFact.create());
    Region r = fact.create(regionName);
    assertNotNull(r);
  }

  private void createReplicatedRegion(String regionName, String senderIds) {
    RegionFactory fact = cache.createRegionFactory(RegionShortcut.REPLICATE);

    if (senderIds != null) {
      StringTokenizer tokenizer = new StringTokenizer(senderIds, ",");
      while (tokenizer.hasMoreTokens()) {
        String senderId = tokenizer.nextToken();
        fact.addGatewaySenderId(senderId);
      }
    }

    Region r = fact.create(regionName);
    assertNotNull(r);
  }

  private void waitForSenderRunningState(String senderId) {
    GatewaySender sender = cache.getGatewaySender(senderId);
    await()
        .until(() -> sender != null && sender.isRunning());
  }

  private Integer createFirstRemoteLocator(int dsId, int remoteLocPort) {
    UpdateVersionDUnitTest test = new UpdateVersionDUnitTest();
    int port = AvailablePortHelper.getRandomAvailablePortForDUnitSite();
    Properties props = test.getDistributedSystemProperties();
    props.setProperty(MCAST_PORT, "0");
    props.setProperty(DISTRIBUTED_SYSTEM_ID, "" + dsId);
    props.setProperty(LOCATORS, "localhost[" + port + "]");
    props.setProperty(START_LOCATOR,
        "localhost[" + port + "],server=true,peer=true,hostname-for-clients=localhost");
    props.setProperty(REMOTE_LOCATORS, "localhost[" + remoteLocPort + "]");
    props.setProperty(USE_CLUSTER_CONFIGURATION, "false");
    props.setProperty(ENABLE_CLUSTER_CONFIGURATION, "false");
    startLocatorDistributedSystem(props);
    return port;
  }

  private void startLocatorDistributedSystem(Properties props) {
    // Start start the locator with a LOCATOR_DM_TYPE and not a NORMAL_DM_TYPE
    System.setProperty(InternalLocator.FORCE_LOCATOR_DM_TYPE, "true");
    try {
      getSystem(props);
    } finally {
      System.clearProperty(InternalLocator.FORCE_LOCATOR_DM_TYPE);
    }
  }

  private void createConcurrentSender(String dsName, int remoteDsId, boolean isParallel,
      Integer maxMemory, Integer batchSize, boolean isConflation, boolean isPersistent,
      GatewayEventFilter filter, boolean isManualStart, int concurrencyLevel) {
    File persistentDirectory =
        new File(dsName + "_disk_" + System.currentTimeMillis() + "_" + VM.getCurrentVMNum());
    persistentDirectory.mkdir();
    DiskStoreFactory dsf = cache.createDiskStoreFactory();
    File[] dirs1 = new File[] {persistentDirectory};

    if (isParallel) {
      GatewaySenderFactory gateway = cache.createGatewaySenderFactory();
      gateway.setParallel(true);
      gateway.setMaximumQueueMemory(maxMemory);
      gateway.setBatchSize(batchSize);
      gateway.setManualStart(isManualStart);
      ((InternalGatewaySenderFactory) gateway).setLocatorDiscoveryCallback(new MyLocatorCallback());
      if (filter != null) {
        gateway.addGatewayEventFilter(filter);
      }
      if (isPersistent) {
        gateway.setPersistenceEnabled(true);
        gateway.setDiskStoreName(dsf.setDiskDirs(dirs1).create(dsName).getName());
      } else {
        DiskStore store = dsf.setDiskDirs(dirs1).create(dsName);
        gateway.setDiskStoreName(store.getName());
      }
      gateway.setBatchConflationEnabled(isConflation);
      gateway.create(dsName, remoteDsId);

    } else {
      GatewaySenderFactory gateway = cache.createGatewaySenderFactory();
      gateway.setMaximumQueueMemory(maxMemory);
      gateway.setBatchSize(batchSize);
      gateway.setManualStart(isManualStart);
      ((InternalGatewaySenderFactory) gateway).setLocatorDiscoveryCallback(new MyLocatorCallback());
      if (filter != null) {
        gateway.addGatewayEventFilter(filter);
      }
      gateway.setBatchConflationEnabled(isConflation);
      if (isPersistent) {
        gateway.setPersistenceEnabled(true);
        gateway.setDiskStoreName(dsf.setDiskDirs(dirs1).create(dsName).getName());
      } else {
        DiskStore store = dsf.setDiskDirs(dirs1).create(dsName);
        gateway.setDiskStoreName(store.getName());
      }
      gateway.setDispatcherThreads(concurrencyLevel);
      gateway.create(dsName, remoteDsId);
    }
  }

  private int createReceiver(int locPort) {
    UpdateVersionDUnitTest test = new UpdateVersionDUnitTest();
    Properties props = test.getDistributedSystemProperties();
    props.setProperty(MCAST_PORT, "0");
    props.setProperty(LOCATORS, "localhost[" + locPort + "]");

    InternalDistributedSystem ds = test.getSystem(props);
    cache = CacheFactory.create(ds);
    GatewayReceiverFactory fact = cache.createGatewayReceiverFactory();
    int port = AvailablePortHelper.getRandomAvailablePortForDUnitSite();
    fact.setStartPort(port);
    fact.setEndPort(port);
    GatewayReceiver receiver = fact.create();
    try {
      receiver.start();
    } catch (IOException e) {
      e.printStackTrace();
      fail("Test " + test.getName() + " failed to start GatewayReceiver on port " + port);
    }
    return port;
  }

  private void startSender(String senderId) {
    Set<GatewaySender> senders = cache.getGatewaySenders();
    for (GatewaySender sender : senders) {
      if (sender.getId().equals(senderId)) {
        sender.start();
        break;
      }
    }
  }

  protected static class MyLocatorCallback extends LocatorDiscoveryCallbackAdapter {

    private final Set discoveredLocators = new HashSet();

    private final Set removedLocators = new HashSet();

    @Override
    public synchronized void locatorsDiscovered(List locators) {
      discoveredLocators.addAll(locators);
      notifyAll();
    }

    @Override
    public synchronized void locatorsRemoved(List locators) {
      removedLocators.addAll(locators);
      notifyAll();
    }

    public boolean waitForDiscovery(InetSocketAddress locator, long time)
        throws InterruptedException {
      return waitFor(discoveredLocators, locator, time);
    }

    public boolean waitForRemove(InetSocketAddress locator, long time) throws InterruptedException {
      return waitFor(removedLocators, locator, time);
    }

    private synchronized boolean waitFor(Set set, InetSocketAddress locator, long time)
        throws InterruptedException {
      long remaining = time;
      long endTime = System.currentTimeMillis() + time;
      while (!set.contains(locator) && remaining >= 0) {
        wait(remaining);
        remaining = endTime - System.currentTimeMillis();
      }
      return set.contains(locator);
    }

    public synchronized Set getDiscovered() {
      return new HashSet(discoveredLocators);
    }

    public synchronized Set getRemoved() {
      return new HashSet(removedLocators);
    }
  }

  private Integer createFirstLocatorWithDSId(int dsId) {
    UpdateVersionDUnitTest test = new UpdateVersionDUnitTest();
    int port = AvailablePortHelper.getRandomAvailablePortForDUnitSite();
    Properties props = test.getDistributedSystemProperties();
    props.setProperty(MCAST_PORT, "0");
    props.setProperty(DISTRIBUTED_SYSTEM_ID, "" + dsId);
    props.setProperty(LOCATORS, "localhost[" + port + "]");
    props.setProperty(ENABLE_CLUSTER_CONFIGURATION, "false");
    props.setProperty(USE_CLUSTER_CONFIGURATION, "false");
    props.setProperty(START_LOCATOR,
        "localhost[" + port + "],server=true,peer=true,hostname-for-clients=localhost");
    startLocatorDistributedSystem(props);
    return port;
  }
}
