blob: bd0b429fa14a5913f871ff7270103d327d21a09f [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one or more contributor license
* agreements. See the NOTICE file distributed with this work for additional information regarding
* copyright ownership. The ASF licenses this file to You under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance with the License. You may obtain a
* copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software distributed under the License
* is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
* or implied. See the License for the specific language governing permissions and limitations under
* the License.
*/
package org.apache.geode.internal.cache;
import static org.apache.geode.distributed.ConfigurationProperties.DISTRIBUTED_SYSTEM_ID;
import static org.apache.geode.distributed.ConfigurationProperties.ENABLE_CLUSTER_CONFIGURATION;
import static org.apache.geode.distributed.ConfigurationProperties.LOCATORS;
import static org.apache.geode.distributed.ConfigurationProperties.LOG_LEVEL;
import static org.apache.geode.distributed.ConfigurationProperties.MCAST_PORT;
import static org.apache.geode.distributed.ConfigurationProperties.REMOTE_LOCATORS;
import static org.apache.geode.distributed.ConfigurationProperties.START_LOCATOR;
import static org.apache.geode.distributed.ConfigurationProperties.USE_CLUSTER_CONFIGURATION;
import static org.apache.geode.test.awaitility.GeodeAwaitility.await;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertNotNull;
import static org.junit.Assert.assertTrue;
import static org.junit.Assert.fail;
import java.io.File;
import java.io.IOException;
import java.net.InetSocketAddress;
import java.util.HashSet;
import java.util.List;
import java.util.Properties;
import java.util.Set;
import java.util.StringTokenizer;
import org.junit.Test;
import org.junit.experimental.categories.Category;
import org.apache.geode.cache.AttributesFactory;
import org.apache.geode.cache.Cache;
import org.apache.geode.cache.CacheFactory;
import org.apache.geode.cache.DataPolicy;
import org.apache.geode.cache.DiskStore;
import org.apache.geode.cache.DiskStoreFactory;
import org.apache.geode.cache.EntryNotFoundException;
import org.apache.geode.cache.Operation;
import org.apache.geode.cache.PartitionAttributesFactory;
import org.apache.geode.cache.Region;
import org.apache.geode.cache.Region.Entry;
import org.apache.geode.cache.Scope;
import org.apache.geode.cache.client.internal.LocatorDiscoveryCallbackAdapter;
import org.apache.geode.cache.wan.GatewayEventFilter;
import org.apache.geode.cache.wan.GatewayReceiver;
import org.apache.geode.cache.wan.GatewayReceiverFactory;
import org.apache.geode.cache.wan.GatewaySender;
import org.apache.geode.cache.wan.GatewaySenderFactory;
import org.apache.geode.distributed.internal.InternalDistributedSystem;
import org.apache.geode.distributed.internal.InternalLocator;
import org.apache.geode.internal.AvailablePortHelper;
import org.apache.geode.internal.cache.partitioned.PRLocallyDestroyedException;
import org.apache.geode.internal.cache.versions.VersionSource;
import org.apache.geode.internal.cache.versions.VersionStamp;
import org.apache.geode.internal.cache.versions.VersionTag;
import org.apache.geode.internal.cache.wan.InternalGatewaySenderFactory;
import org.apache.geode.test.dunit.Host;
import org.apache.geode.test.dunit.IgnoredException;
import org.apache.geode.test.dunit.Invoke;
import org.apache.geode.test.dunit.LogWriterUtils;
import org.apache.geode.test.dunit.VM;
import org.apache.geode.test.dunit.internal.JUnit4DistributedTestCase;
import org.apache.geode.test.junit.categories.WanTest;
/**
* @since GemFire 7.0.1
*/
@Category({WanTest.class})
public class UpdateVersionDUnitTest extends JUnit4DistributedTestCase {
protected static final String regionName = "testRegion";
protected static Cache cache;
private static Set<IgnoredException> expectedExceptions = new HashSet<IgnoredException>();
@Override
public final void preTearDown() throws Exception {
closeCache();
Invoke.invokeInEveryVM(this::closeCache);
}
@Test
public void testUpdateVersionAfterCreateWithSerialSender() {
Host host = Host.getHost(0);
VM vm0 = host.getVM(0); // locator site1
VM vm1 = host.getVM(1); // server2 site1
VM vm2 = host.getVM(2); // locator site2
VM vm3 = host.getVM(3); // server1 site2
VM vm4 = host.getVM(4); // server2 site2
final String key = "key-1";
// Site 1
Integer lnPort = (Integer) vm0.invoke(() -> this.createFirstLocatorWithDSId(1));
vm1.invoke(() -> this.createCache(lnPort));
vm1.invoke(() -> this.createSender("ln1", 2, false, 10, 1, false, false, null, true));
vm1.invoke(() -> this.createPartitionedRegion(regionName, "ln1", 1, 1));
vm1.invoke(() -> this.startSender("ln1"));
vm1.invoke(() -> this.waitForSenderRunningState("ln1"));
// Site 2
Integer nyPort = (Integer) vm2.invoke(() -> this.createFirstRemoteLocator(2, lnPort));
Integer nyRecPort = (Integer) vm3.invoke(() -> this.createReceiver(nyPort));
vm3.invoke(() -> this.createPartitionedRegion(regionName, "", 1, 1));
vm4.invoke(() -> this.createCache(nyPort));
vm4.invoke(() -> this.createPartitionedRegion(regionName, "", 1, 1));
VersionTag localTag = vm1.invoke(() -> putEntryAndGetPartitionedRegionVersionTag(key));
VersionTag remoteTag = vm4.invoke(() -> getPartitionedRegionVersionTag(key, localTag));
assertEquals("Local and remote site have different timestamps", localTag.getVersionTimeStamp(),
remoteTag.getVersionTimeStamp());
}
@Test
public void testUpdateVersionAfterCreateWithSerialSenderOnDR() {
Host host = Host.getHost(0);
VM vm0 = host.getVM(0); // locator site1
VM vm1 = host.getVM(1); // server1 site1
VM vm2 = host.getVM(2); // locator site2
VM vm3 = host.getVM(3); // server1 site2
VM vm4 = host.getVM(4); // server2 site2
final String key = "key-1";
// Site 1
Integer lnPort = (Integer) vm0.invoke(() -> this.createFirstLocatorWithDSId(1));
vm1.invoke(() -> this.createCache(lnPort));
vm1.invoke(() -> this.createSender("ln1", 2, false, 10, 1, false, false, null, true));
vm1.invoke(() -> this.createReplicatedRegion(regionName, "ln1"));
vm1.invoke(() -> this.startSender("ln1"));
vm1.invoke(() -> this.waitForSenderRunningState("ln1"));
// Site 2
Integer nyPort = (Integer) vm2.invoke(() -> this.createFirstRemoteLocator(2, lnPort));
Integer nyRecPort = (Integer) vm3.invoke(() -> this.createReceiver(nyPort));
vm3.invoke(() -> this.createReplicatedRegion(regionName, ""));
vm4.invoke(() -> this.createCache(nyPort));
vm4.invoke(() -> this.createReplicatedRegion(regionName, ""));
VersionTag localTag = vm1.invoke(() -> putEntryAndGetReplicatedRegionVersionTag(key));
VersionTag remoteTag = vm4.invoke(() -> getReplicatedRegionVersionTag(key, localTag));
assertEquals("Local and remote site have different timestamps", localTag.getVersionTimeStamp(),
remoteTag.getVersionTimeStamp());
}
@Test
public void testUpdateVersionAfterCreateWithParallelSender() {
Host host = Host.getHost(0);
VM vm0 = host.getVM(0); // locator site1
VM vm1 = host.getVM(1); // server1 site1
VM vm2 = host.getVM(2); // locator site2
VM vm3 = host.getVM(3); // server1 site2
VM vm4 = host.getVM(4); // server2 site2
// Site 1
Integer lnPort = vm0.invoke(() -> this.createFirstLocatorWithDSId(1));
final String key = "key-1";
vm1.invoke(() -> this.createCache(lnPort));
vm1.invoke(() -> this.createSender("ln1", 2, true, 10, 1, false, false, null, true));
vm1.invoke(() -> this.createPartitionedRegion(regionName, "ln1", 1, 1));
vm1.invoke(() -> this.startSender("ln1"));
vm1.invoke(() -> this.waitForSenderRunningState("ln1"));
// Site 2
Integer nyPort = (Integer) vm2.invoke(() -> this.createFirstRemoteLocator(2, lnPort));
Integer nyRecPort = (Integer) vm3.invoke(() -> this.createReceiver(nyPort));
vm3.invoke(() -> this.createPartitionedRegion(regionName, "", 1, 1));
vm4.invoke(() -> this.createCache(nyPort));
vm4.invoke(() -> this.createPartitionedRegion(regionName, "", 1, 1));
VersionTag localTag = vm1.invoke(() -> putEntryAndGetPartitionedRegionVersionTag(key));
VersionTag remoteTag = vm4.invoke(() -> getPartitionedRegionVersionTag(key, localTag));
assertEquals("Local and remote site have different timestamps", localTag.getVersionTimeStamp(),
remoteTag.getVersionTimeStamp());
}
@Test
public void testUpdateVersionAfterCreateWithConcurrentSerialSender() {
Host host = Host.getHost(0);
VM vm0 = host.getVM(0); // locator site1
VM vm1 = host.getVM(1); // server1 site1
VM vm2 = host.getVM(2); // locator site2
VM vm3 = host.getVM(3); // server1 site2
VM vm4 = host.getVM(4); // server2 site2
// Site 1
Integer lnPort = (Integer) vm0.invoke(() -> this.createFirstLocatorWithDSId(1));
final String key = "key-1";
vm1.invoke(() -> this.createCache(lnPort));
vm1.invoke(
() -> this.createConcurrentSender("ln1", 2, false, 10, 2, false, false, null, true, 2));
vm1.invoke(() -> this.createPartitionedRegion(regionName, "ln1", 1, 1));
vm1.invoke(() -> this.startSender("ln1"));
vm1.invoke(() -> this.waitForSenderRunningState("ln1"));
// Site 2
Integer nyPort = (Integer) vm2.invoke(() -> this.createFirstRemoteLocator(2, lnPort));
Integer nyRecPort = (Integer) vm3.invoke(() -> this.createReceiver(nyPort));
vm3.invoke(() -> this.createPartitionedRegion(regionName, "", 1, 1));
vm4.invoke(() -> this.createCache(nyPort));
vm4.invoke(() -> this.createPartitionedRegion(regionName, "", 1, 1));
VersionTag localTag = vm1.invoke(() -> putEntryAndGetPartitionedRegionVersionTag(key));
VersionTag remoteTag = vm4.invoke(() -> getPartitionedRegionVersionTag(key, localTag));
assertEquals("Local and remote site have different timestamps", localTag.getVersionTimeStamp(),
remoteTag.getVersionTimeStamp());
}
private VersionTag putEntryAndGetReplicatedRegionVersionTag(String key) {
Region region = cache.getRegion(regionName);
assertTrue(region instanceof DistributedRegion);
region.put(key, "value-1");
region.put(key, "value-2");
Entry entry = region.getEntry(key);
assertTrue(entry instanceof NonTXEntry);
RegionEntry regionEntry = ((NonTXEntry) entry).getRegionEntry();
VersionStamp stamp = regionEntry.getVersionStamp();
// Create a duplicate entry version tag from stamp with newer
// time-stamp.
VersionSource memberId = (VersionSource) cache.getDistributedSystem().getDistributedMember();
VersionTag versionTag = VersionTag.create(memberId);
int entryVersion = stamp.getEntryVersion() - 1;
int dsid = stamp.getDistributedSystemId();
// Increment the time by 1 in case the time is the same as the previous event.
// The entry's version timestamp can be incremented by 1 in certain circumstances.
// See AbstractRegionEntry.generateVersionTag.
long time = System.currentTimeMillis() + 1;
versionTag.setEntryVersion(entryVersion);
versionTag.setDistributedSystemId(dsid);
versionTag.setVersionTimeStamp(time);
versionTag.setIsRemoteForTesting();
EntryEventImpl event =
createNewEvent((DistributedRegion) region, versionTag, entry.getKey(), "value-3");
((LocalRegion) region).basicUpdate(event, false, true, 0L, false);
// Verify the new stamp
entry = region.getEntry(key);
assertTrue(entry instanceof NonTXEntry);
regionEntry = ((NonTXEntry) entry).getRegionEntry();
stamp = regionEntry.getVersionStamp();
assertEquals("Time stamp did NOT get updated by UPDATE_VERSION operation on LocalRegion", time,
stamp.getVersionTimeStamp());
assertEquals(entryVersion + 1, stamp.getEntryVersion());
assertEquals(dsid, stamp.getDistributedSystemId());
return stamp.asVersionTag();
}
private VersionTag putEntryAndGetPartitionedRegionVersionTag(String key) {
Region region = cache.getRegion(regionName);
assertTrue(region instanceof PartitionedRegion);
region.put(key, "value-1");
region.put(key, "value-2");
Entry entry = region.getEntry(key);
assertTrue(entry instanceof EntrySnapshot);
RegionEntry regionEntry = ((EntrySnapshot) entry).getRegionEntry();
VersionStamp stamp = regionEntry.getVersionStamp();
// Create a duplicate entry version tag from stamp with newer
// time-stamp.
VersionSource memberId = (VersionSource) cache.getDistributedSystem().getDistributedMember();
VersionTag versionTag = VersionTag.create(memberId);
int entryVersion = stamp.getEntryVersion() - 1;
int dsid = stamp.getDistributedSystemId();
// Increment the time by 1 in case the time is the same as the previous event.
// The entry's version timestamp can be incremented by 1 in certain circumstances.
// See AbstractRegionEntry.generateVersionTag.
long time = System.currentTimeMillis() + 1;
versionTag.setEntryVersion(entryVersion);
versionTag.setDistributedSystemId(dsid);
versionTag.setVersionTimeStamp(time);
versionTag.setIsRemoteForTesting();
EntryEventImpl event =
createNewEvent((PartitionedRegion) region, versionTag, entry.getKey(), "value-3");
((LocalRegion) region).basicUpdate(event, false, true, 0L, false);
// Verify the new stamp
entry = region.getEntry(key);
assertTrue(entry instanceof EntrySnapshot);
regionEntry = ((EntrySnapshot) entry).getRegionEntry();
stamp = regionEntry.getVersionStamp();
assertEquals("Time stamp did NOT get updated by UPDATE_VERSION operation on LocalRegion", time,
stamp.getVersionTimeStamp());
assertEquals(++entryVersion, stamp.getEntryVersion());
assertEquals(dsid, stamp.getDistributedSystemId());
return stamp.asVersionTag();
}
private VersionTag getReplicatedRegionVersionTag(final String key, final VersionTag localTag) {
final Region region = cache.getRegion(regionName);
await().until(() -> region.getEntry(key) != null);
await().until(() -> {
Entry entry = region.getEntry(key);
assertTrue(entry instanceof NonTXEntry);
RegionEntry regionEntry = ((NonTXEntry) entry).getRegionEntry();
return regionEntry.getVersionStamp().getVersionTimeStamp() == localTag.getVersionTimeStamp();
});
Entry entry = region.getEntry(key);
assertTrue(entry instanceof NonTXEntry);
RegionEntry regionEntry = ((NonTXEntry) entry).getRegionEntry();
VersionStamp stamp = regionEntry.getVersionStamp();
return stamp.asVersionTag();
}
private VersionTag getPartitionedRegionVersionTag(final String key, final VersionTag localTag) {
final PartitionedRegion region = (PartitionedRegion) cache.getRegion(regionName);
await().until(() -> {
Entry<?, ?> entry = null;
try {
entry = region.getDataStore().getEntryLocally(0, key, false, false);
} catch (EntryNotFoundException | ForceReattemptException e) {
// expected
} catch (PRLocallyDestroyedException e) {
throw new RuntimeException("unexpected exception", e);
}
if (entry != null) {
LogWriterUtils.getLogWriter().info("found entry " + entry);
}
return (entry != null);
});
await().until(() -> {
Entry entry = region.getEntry(key);
assertTrue(entry instanceof EntrySnapshot);
RegionEntry regionEntry = ((EntrySnapshot) entry).getRegionEntry();
return regionEntry.getVersionStamp().getVersionTimeStamp() == localTag.getVersionTimeStamp();
});
Entry entry = region.getEntry(key);
assertTrue(entry instanceof EntrySnapshot);
RegionEntry regionEntry = ((EntrySnapshot) entry).getRegionEntry();
VersionStamp stamp = regionEntry.getVersionStamp();
return stamp.asVersionTag();
}
private VersionTagHolder createNewEvent(LocalRegion region, VersionTag tag, Object key,
Object value) {
VersionTagHolder updateEvent = new VersionTagHolder(tag);
updateEvent.setOperation(Operation.UPDATE);
updateEvent.setRegion(region);
if (region instanceof PartitionedRegion) {
updateEvent.setKeyInfo(((PartitionedRegion) region).getKeyInfo(key));
} else {
updateEvent.setKeyInfo(new KeyInfo(key, value, null));
}
updateEvent.setNewValue(value);
updateEvent.setGenerateCallbacks(true);
updateEvent.distributedMember = region.getSystem().getDistributedMember();
updateEvent.setNewEventId(region.getSystem());
return updateEvent;
}
/*
* Helper Methods
*/
private void createCache(Integer locPort) {
UpdateVersionDUnitTest test = new UpdateVersionDUnitTest();
Properties props = test.getDistributedSystemProperties();
props.setProperty(MCAST_PORT, "0");
props.setProperty(LOCATORS, "localhost[" + locPort + "]");
props.setProperty(LOG_LEVEL, LogWriterUtils.getDUnitLogLevel());
props.setProperty(ENABLE_CLUSTER_CONFIGURATION, "false");
props.setProperty(USE_CLUSTER_CONFIGURATION, "false");
InternalDistributedSystem ds = test.getSystem(props);
cache = CacheFactory.create(ds);
IgnoredException ex =
new IgnoredException("could not get remote locator information for remote site");
cache.getLogger().info(ex.getAddMessage());
expectedExceptions.add(ex);
ex = new IgnoredException("Pool ln1 is not available");
cache.getLogger().info(ex.getAddMessage());
expectedExceptions.add(ex);
}
private void closeCache() {
if (cache != null && !cache.isClosed()) {
for (IgnoredException expectedException : expectedExceptions) {
cache.getLogger().info(expectedException.getRemoveMessage());
}
expectedExceptions.clear();
cache.getDistributedSystem().disconnect();
cache.close();
}
cache = null;
}
public void createSender(String dsName, int remoteDsId, boolean isParallel, Integer maxMemory,
Integer batchSize, boolean isConflation, boolean isPersistent, GatewayEventFilter filter,
boolean isManualStart) {
File persistentDirectory =
new File(dsName + "_disk_" + System.currentTimeMillis() + "_" + VM.getCurrentVMNum());
persistentDirectory.mkdir();
DiskStoreFactory dsf = cache.createDiskStoreFactory();
File[] dirs1 = new File[] {persistentDirectory};
if (isParallel) {
GatewaySenderFactory gateway = cache.createGatewaySenderFactory();
gateway.setParallel(true);
gateway.setMaximumQueueMemory(maxMemory);
gateway.setBatchSize(batchSize);
gateway.setManualStart(isManualStart);
((InternalGatewaySenderFactory) gateway).setLocatorDiscoveryCallback(new MyLocatorCallback());
if (filter != null) {
gateway.addGatewayEventFilter(filter);
}
if (isPersistent) {
gateway.setPersistenceEnabled(true);
gateway.setDiskStoreName(dsf.setDiskDirs(dirs1).create(dsName).getName());
} else {
DiskStore store = dsf.setDiskDirs(dirs1).create(dsName);
gateway.setDiskStoreName(store.getName());
}
gateway.setBatchConflationEnabled(isConflation);
gateway.create(dsName, remoteDsId);
} else {
GatewaySenderFactory gateway = cache.createGatewaySenderFactory();
gateway.setMaximumQueueMemory(maxMemory);
gateway.setBatchSize(batchSize);
gateway.setManualStart(isManualStart);
((InternalGatewaySenderFactory) gateway).setLocatorDiscoveryCallback(new MyLocatorCallback());
if (filter != null) {
gateway.addGatewayEventFilter(filter);
}
gateway.setBatchConflationEnabled(isConflation);
if (isPersistent) {
gateway.setPersistenceEnabled(true);
gateway.setDiskStoreName(dsf.setDiskDirs(dirs1).create(dsName).getName());
} else {
DiskStore store = dsf.setDiskDirs(dirs1).create(dsName);
gateway.setDiskStoreName(store.getName());
}
gateway.create(dsName, remoteDsId);
}
}
private void createPartitionedRegion(String regionName, String senderIds, Integer redundantCopies,
Integer totalNumBuckets) {
AttributesFactory fact = new AttributesFactory();
if (senderIds != null) {
StringTokenizer tokenizer = new StringTokenizer(senderIds, ",");
while (tokenizer.hasMoreTokens()) {
String senderId = tokenizer.nextToken();
fact.addGatewaySenderId(senderId);
}
}
PartitionAttributesFactory pFact = new PartitionAttributesFactory();
pFact.setTotalNumBuckets(totalNumBuckets);
pFact.setRedundantCopies(redundantCopies);
pFact.setRecoveryDelay(0);
fact.setPartitionAttributes(pFact.create());
Region r = cache.createRegionFactory(fact.create()).create(regionName);
assertNotNull(r);
}
private void createReplicatedRegion(String regionName, String senderIds) {
AttributesFactory fact = new AttributesFactory();
if (senderIds != null) {
StringTokenizer tokenizer = new StringTokenizer(senderIds, ",");
while (tokenizer.hasMoreTokens()) {
String senderId = tokenizer.nextToken();
fact.addGatewaySenderId(senderId);
}
}
fact.setDataPolicy(DataPolicy.REPLICATE);
fact.setScope(Scope.DISTRIBUTED_ACK);
Region r = cache.createRegionFactory(fact.create()).create(regionName);
assertNotNull(r);
}
private void waitForSenderRunningState(String senderId) {
GatewaySender sender = cache.getGatewaySender(senderId);
await()
.until(() -> sender != null && sender.isRunning());
}
private Integer createFirstRemoteLocator(int dsId, int remoteLocPort) {
UpdateVersionDUnitTest test = new UpdateVersionDUnitTest();
int port = AvailablePortHelper.getRandomAvailablePortForDUnitSite();
Properties props = test.getDistributedSystemProperties();
props.setProperty(MCAST_PORT, "0");
props.setProperty(DISTRIBUTED_SYSTEM_ID, "" + dsId);
props.setProperty(LOCATORS, "localhost[" + port + "]");
props.setProperty(START_LOCATOR,
"localhost[" + port + "],server=true,peer=true,hostname-for-clients=localhost");
props.setProperty(REMOTE_LOCATORS, "localhost[" + remoteLocPort + "]");
props.setProperty(USE_CLUSTER_CONFIGURATION, "false");
props.setProperty(ENABLE_CLUSTER_CONFIGURATION, "false");
startLocatorDistributedSystem(props);
return port;
}
private void startLocatorDistributedSystem(Properties props) {
// Start start the locator with a LOCATOR_DM_TYPE and not a NORMAL_DM_TYPE
System.setProperty(InternalLocator.FORCE_LOCATOR_DM_TYPE, "true");
try {
getSystem(props);
} finally {
System.clearProperty(InternalLocator.FORCE_LOCATOR_DM_TYPE);
}
}
private void createConcurrentSender(String dsName, int remoteDsId, boolean isParallel,
Integer maxMemory, Integer batchSize, boolean isConflation, boolean isPersistent,
GatewayEventFilter filter, boolean isManualStart, int concurrencyLevel) {
File persistentDirectory =
new File(dsName + "_disk_" + System.currentTimeMillis() + "_" + VM.getCurrentVMNum());
persistentDirectory.mkdir();
DiskStoreFactory dsf = cache.createDiskStoreFactory();
File[] dirs1 = new File[] {persistentDirectory};
if (isParallel) {
GatewaySenderFactory gateway = cache.createGatewaySenderFactory();
gateway.setParallel(true);
gateway.setMaximumQueueMemory(maxMemory);
gateway.setBatchSize(batchSize);
gateway.setManualStart(isManualStart);
((InternalGatewaySenderFactory) gateway).setLocatorDiscoveryCallback(new MyLocatorCallback());
if (filter != null) {
gateway.addGatewayEventFilter(filter);
}
if (isPersistent) {
gateway.setPersistenceEnabled(true);
gateway.setDiskStoreName(dsf.setDiskDirs(dirs1).create(dsName).getName());
} else {
DiskStore store = dsf.setDiskDirs(dirs1).create(dsName);
gateway.setDiskStoreName(store.getName());
}
gateway.setBatchConflationEnabled(isConflation);
gateway.create(dsName, remoteDsId);
} else {
GatewaySenderFactory gateway = cache.createGatewaySenderFactory();
gateway.setMaximumQueueMemory(maxMemory);
gateway.setBatchSize(batchSize);
gateway.setManualStart(isManualStart);
((InternalGatewaySenderFactory) gateway).setLocatorDiscoveryCallback(new MyLocatorCallback());
if (filter != null) {
gateway.addGatewayEventFilter(filter);
}
gateway.setBatchConflationEnabled(isConflation);
if (isPersistent) {
gateway.setPersistenceEnabled(true);
gateway.setDiskStoreName(dsf.setDiskDirs(dirs1).create(dsName).getName());
} else {
DiskStore store = dsf.setDiskDirs(dirs1).create(dsName);
gateway.setDiskStoreName(store.getName());
}
gateway.setDispatcherThreads(concurrencyLevel);
gateway.create(dsName, remoteDsId);
}
}
private int createReceiver(int locPort) {
UpdateVersionDUnitTest test = new UpdateVersionDUnitTest();
Properties props = test.getDistributedSystemProperties();
props.setProperty(MCAST_PORT, "0");
props.setProperty(LOCATORS, "localhost[" + locPort + "]");
InternalDistributedSystem ds = test.getSystem(props);
cache = CacheFactory.create(ds);
GatewayReceiverFactory fact = cache.createGatewayReceiverFactory();
int port = AvailablePortHelper.getRandomAvailablePortForDUnitSite();
fact.setStartPort(port);
fact.setEndPort(port);
GatewayReceiver receiver = fact.create();
try {
receiver.start();
} catch (IOException e) {
e.printStackTrace();
fail("Test " + test.getName() + " failed to start GatewayReceiver on port " + port);
}
return port;
}
private void startSender(String senderId) {
Set<GatewaySender> senders = cache.getGatewaySenders();
for (GatewaySender sender : senders) {
if (sender.getId().equals(senderId)) {
sender.start();
break;
}
}
}
protected static class MyLocatorCallback extends LocatorDiscoveryCallbackAdapter {
private final Set discoveredLocators = new HashSet();
private final Set removedLocators = new HashSet();
@Override
public synchronized void locatorsDiscovered(List locators) {
discoveredLocators.addAll(locators);
notifyAll();
}
@Override
public synchronized void locatorsRemoved(List locators) {
removedLocators.addAll(locators);
notifyAll();
}
public boolean waitForDiscovery(InetSocketAddress locator, long time)
throws InterruptedException {
return waitFor(discoveredLocators, locator, time);
}
public boolean waitForRemove(InetSocketAddress locator, long time) throws InterruptedException {
return waitFor(removedLocators, locator, time);
}
private synchronized boolean waitFor(Set set, InetSocketAddress locator, long time)
throws InterruptedException {
long remaining = time;
long endTime = System.currentTimeMillis() + time;
while (!set.contains(locator) && remaining >= 0) {
wait(remaining);
remaining = endTime - System.currentTimeMillis();
}
return set.contains(locator);
}
public synchronized Set getDiscovered() {
return new HashSet(discoveredLocators);
}
public synchronized Set getRemoved() {
return new HashSet(removedLocators);
}
}
private Integer createFirstLocatorWithDSId(int dsId) {
UpdateVersionDUnitTest test = new UpdateVersionDUnitTest();
int port = AvailablePortHelper.getRandomAvailablePortForDUnitSite();
Properties props = test.getDistributedSystemProperties();
props.setProperty(MCAST_PORT, "0");
props.setProperty(DISTRIBUTED_SYSTEM_ID, "" + dsId);
props.setProperty(LOCATORS, "localhost[" + port + "]");
props.setProperty(ENABLE_CLUSTER_CONFIGURATION, "false");
props.setProperty(USE_CLUSTER_CONFIGURATION, "false");
props.setProperty(START_LOCATOR,
"localhost[" + port + "],server=true,peer=true,hostname-for-clients=localhost");
startLocatorDistributedSystem(props);
return port;
}
}