/*
* Licensed to the Apache Software Foundation (ASF) under one or more contributor license
* agreements. See the NOTICE file distributed with this work for additional information regarding
* copyright ownership. The ASF licenses this file to You under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance with the License. You may obtain a
* copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software distributed under the License
* is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
* or implied. See the License for the specific language governing permissions and limitations under
* the License.
*/
package org.apache.geode.cache.wan;
import static org.apache.geode.distributed.ConfigurationProperties.DISTRIBUTED_SYSTEM_ID;
import static org.apache.geode.distributed.ConfigurationProperties.ENABLE_CLUSTER_CONFIGURATION;
import static org.apache.geode.distributed.ConfigurationProperties.JMX_MANAGER;
import static org.apache.geode.distributed.ConfigurationProperties.JMX_MANAGER_PORT;
import static org.apache.geode.distributed.ConfigurationProperties.JMX_MANAGER_START;
import static org.apache.geode.distributed.ConfigurationProperties.LOCATORS;
import static org.apache.geode.distributed.ConfigurationProperties.LOG_LEVEL;
import static org.apache.geode.distributed.ConfigurationProperties.MCAST_PORT;
import static org.apache.geode.distributed.ConfigurationProperties.REMOTE_LOCATORS;
import static org.apache.geode.distributed.ConfigurationProperties.USE_CLUSTER_CONFIGURATION;
import static org.apache.geode.test.awaitility.GeodeAwaitility.await;
import static org.junit.Assert.assertEquals;
import java.io.IOException;
import java.util.Collection;
import java.util.List;
import java.util.Properties;
import java.util.Set;
import java.util.concurrent.atomic.AtomicInteger;
import org.junit.Rule;
import org.junit.experimental.categories.Category;
import org.junit.runner.RunWith;
import org.junit.runners.Parameterized;
import org.apache.geode.cache.EntryEvent;
import org.apache.geode.cache.PartitionAttributesFactory;
import org.apache.geode.cache.Region;
import org.apache.geode.cache.RegionShortcut;
import org.apache.geode.cache.client.ClientCache;
import org.apache.geode.cache.client.ClientCacheFactory;
import org.apache.geode.cache.client.ClientRegionShortcut;
import org.apache.geode.cache.partition.PartitionRegionHelper;
import org.apache.geode.cache.server.CacheServer;
import org.apache.geode.cache.util.CacheListenerAdapter;
import org.apache.geode.distributed.Locator;
import org.apache.geode.distributed.internal.InternalLocator;
import org.apache.geode.internal.AvailablePortHelper;
import org.apache.geode.internal.cache.ForceReattemptException;
import org.apache.geode.internal.cache.wan.AbstractGatewaySender;
import org.apache.geode.internal.cache.wan.parallel.BatchRemovalThreadHelper;
import org.apache.geode.internal.cache.wan.parallel.ConcurrentParallelGatewaySenderQueue;
import org.apache.geode.internal.cache.wan.parallel.ParallelGatewaySenderQueue;
import org.apache.geode.management.internal.cli.i18n.CliStrings;
import org.apache.geode.management.internal.cli.util.CommandStringBuilder;
import org.apache.geode.test.dunit.Host;
import org.apache.geode.test.dunit.IgnoredException;
import org.apache.geode.test.dunit.VM;
import org.apache.geode.test.dunit.cache.internal.JUnit4CacheTestCase;
import org.apache.geode.test.dunit.internal.DUnitLauncher;
import org.apache.geode.test.junit.categories.WanTest;
import org.apache.geode.test.junit.rules.GfshCommandRule;
import org.apache.geode.test.junit.runners.CategoryWithParameterizedRunnerFactory;
import org.apache.geode.test.version.VersionManager;
@SuppressWarnings("ConstantConditions")
@Category(WanTest.class)
@RunWith(Parameterized.class)
@Parameterized.UseParametersRunnerFactory(CategoryWithParameterizedRunnerFactory.class)
public abstract class WANRollingUpgradeDUnitTest extends JUnit4CacheTestCase {
@Parameterized.Parameters(name = "from_v{0}")
  public static Collection<String> data() {
List<String> result = VersionManager.getInstance().getVersionsWithoutCurrent();
if (result.size() < 1) {
throw new RuntimeException("No older versions of Geode were found to test against");
} else {
System.out.println("running against these versions: " + result);
}
return result;
}
// the old version of Geode we're testing against
@Parameterized.Parameter
public String oldVersion;
@Rule
public transient GfshCommandRule gfsh = new GfshCommandRule();
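  /**
   * Convenience overload that starts a locator and its distributed system on the given port with
   * cluster configuration disabled. {@code remoteLocators} may be {@code null} when the locator
   * does not connect to a remote WAN site.
   */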
void startLocator(int port, int distributedSystemId, String locators,
String remoteLocators) throws IOException {
startLocator(port, distributedSystemId, locators,
remoteLocators, false);
}
void startLocator(int port, int distributedSystemId, String locators,
String remoteLocators, boolean enableClusterConfiguration) throws IOException {
Properties props = getLocatorProperties(distributedSystemId, locators, remoteLocators,
enableClusterConfiguration);
Locator.startLocatorAndDS(port, null, props);
}
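  /**
   * Starts a locator that also hosts a JMX manager on a randomly chosen available port and returns
   * that JMX port (typically so subclasses can connect the {@link GfshCommandRule} to the cluster).
   */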
int startLocatorWithJmxManager(int port, int distributedSystemId, String locators,
String remoteLocators) throws IOException {
Properties props = getLocatorProperties(distributedSystemId, locators, remoteLocators);
int jmxPort = AvailablePortHelper.getRandomAvailableTCPPort();
props.put(JMX_MANAGER_PORT, String.valueOf(jmxPort));
props.put(JMX_MANAGER, "true");
props.put(JMX_MANAGER_START, "true");
Locator.startLocatorAndDS(port, null, props);
return jmxPort;
}
private Properties getLocatorProperties(int distributedSystemId, String locators,
String remoteLocators) {
return getLocatorProperties(distributedSystemId, locators,
remoteLocators, false);
}
private Properties getLocatorProperties(int distributedSystemId, String locators,
String remoteLocators, boolean enableClusterConfiguration) {
Properties props = new Properties();
props.setProperty(MCAST_PORT, "0");
props.setProperty(DISTRIBUTED_SYSTEM_ID, String.valueOf(distributedSystemId));
props.setProperty(LOCATORS, locators);
if (remoteLocators != null) {
props.setProperty(REMOTE_LOCATORS, remoteLocators);
}
props.setProperty(LOG_LEVEL, DUnitLauncher.logLevel);
props.setProperty(ENABLE_CLUSTER_CONFIGURATION, String.valueOf(enableClusterConfiguration));
return props;
}
void stopLocator() {
InternalLocator.getLocator().stop();
}
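  /**
   * Simulates a rolling upgrade of a locator: stops the locator running in {@code rollLocator} and
   * restarts it on the current Geode version in the same VM slot, returning the new VM.
   */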
VM rollLocatorToCurrent(VM rollLocator, int port, int distributedSystemId,
String locators, String remoteLocators) {
return rollLocatorToCurrent(rollLocator, port, distributedSystemId,
locators, remoteLocators, false);
}
VM rollLocatorToCurrent(VM rollLocator, int port, int distributedSystemId,
String locators, String remoteLocators, boolean enableClusterConfiguration) {
rollLocator.invoke(() -> stopLocator());
VM newLocator = Host.getHost(0).getVM(VersionManager.CURRENT_VERSION, rollLocator.getId());
newLocator.invoke(() -> startLocator(port, distributedSystemId, locators, remoteLocators,
enableClusterConfiguration));
return newLocator;
}
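  /**
   * Simulates a rolling upgrade of a server: closes the cache in {@code oldServer}, restarts the
   * member on the current Geode version in the same VM slot, and recreates its cache server,
   * gateway sender, gateway receiver, and partitioned region.
   */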
VM rollStartAndConfigureServerToCurrent(VM oldServer, String locators,
int distributedSystem, String regionName, String senderId, int messageSyncInterval) {
oldServer.invoke(() -> closeCache());
VM rollServer = Host.getHost(0).getVM(VersionManager.CURRENT_VERSION, oldServer.getId());
startAndConfigureServers(rollServer, null, locators, distributedSystem, regionName, senderId,
messageSyncInterval);
return rollServer;
}
void startAndConfigureServers(VM server1, VM server2, String locators,
int distributedSystem, String regionName, String senderId, int messageSyncInterval) {
// Start and configure servers
// - Create Cache
// - Create CacheServer
// - Create GatewaySender
// - Create GatewayReceiver
// - Create Region
// Start and configure server 1
server1.invoke(() -> createCache(locators));
server1.invoke(() -> addCacheServer());
server1.invoke(() -> createGatewaySender(senderId, distributedSystem, messageSyncInterval));
server1.invoke(() -> createGatewayReceiver());
server1.invoke(() -> createPartitionedRegion(regionName, senderId));
// Start and configure server 2 if necessary
if (server2 != null) {
server2.invoke(() -> createCache(locators));
server2.invoke(() -> addCacheServer());
server2.invoke(() -> createGatewaySender(senderId, distributedSystem, messageSyncInterval));
server2.invoke(() -> createGatewayReceiver());
server2.invoke(() -> createPartitionedRegion(regionName, senderId));
}
}
void doClientPutsAndVerifyEvents(VM client, VM localServer1, VM localServer2,
VM remoteServer1, VM remoteServer2, String hostName, int locatorPort, String regionName,
int numPuts, String senderId, boolean primaryOnly) {
// Start client
client.invoke(() -> startClient(hostName, locatorPort, regionName));
// Do puts from client
client.invoke(() -> doPuts(regionName, numPuts));
// Wait for local site queues to be empty
localServer1.invoke(() -> waitForQueueRegionToCertainSize(senderId, 0, primaryOnly));
localServer2.invoke(() -> waitForQueueRegionToCertainSize(senderId, 0, primaryOnly));
// Verify remote site received events
int remoteServer1EventsReceived = remoteServer1.invoke(() -> getEventsReceived(regionName));
int remoteServer2EventsReceived = remoteServer2.invoke(() -> getEventsReceived(regionName));
assertEquals(numPuts, remoteServer1EventsReceived + remoteServer2EventsReceived);
// Clear events received in both sites
localServer1.invoke(() -> clearEventsReceived(regionName));
localServer2.invoke(() -> clearEventsReceived(regionName));
remoteServer1.invoke(() -> clearEventsReceived(regionName));
remoteServer2.invoke(() -> clearEventsReceived(regionName));
}
void stopSenderAndVerifyEvents(VM localServer1, VM localServer2, VM remoteServer1,
VM remoteServer2, String senderId, String regionName, int numPuts) {
// Verify the secondary events still exist
int localServer1QueueSize = localServer1.invoke(() -> getQueueRegionSize(senderId, false));
int localServer2QueueSize = localServer2.invoke(() -> getQueueRegionSize(senderId, false));
assertEquals(numPuts, localServer1QueueSize + localServer2QueueSize);
// Stop one sender
localServer1.invoke(() -> closeCache());
// Wait for the other sender's queue to be empty
localServer2.invoke(() -> waitForQueueRegionToCertainSize(senderId, 0, false));
    // Verify the remote site did not receive any events. The event counts were previously cleared
    // on all members, so there should be 0 events received on the remote site.
int remoteServer1EventsReceived = remoteServer1.invoke(() -> getEventsReceived(regionName));
int remoteServer2EventsReceived = remoteServer2.invoke(() -> getEventsReceived(regionName));
assertEquals(0, remoteServer1EventsReceived + remoteServer2EventsReceived);
}
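  /**
   * Builds the gfsh "create gateway-sender" command string for the given sender id and remote
   * distributed system id. The result is roughly of the form
   * {@code create gateway-sender --id=ln --remote-distributed-system-id=2} (illustrative values).
   */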
String getCreateGatewaySenderCommand(String id, int remoteDsId) {
CommandStringBuilder csb = new CommandStringBuilder(CliStrings.CREATE_GATEWAYSENDER);
csb.addOption(CliStrings.CREATE_GATEWAYSENDER__ID, id);
csb.addOption(CliStrings.CREATE_GATEWAYSENDER__REMOTEDISTRIBUTEDSYSTEMID,
String.valueOf(remoteDsId));
return csb.toString();
}
public void createCache(String locators) {
createCache(locators, false, false);
}
public void createCache(String locators, boolean enableClusterConfiguration,
boolean useClusterConfiguration) {
Properties props = new Properties();
props.setProperty(ENABLE_CLUSTER_CONFIGURATION, String.valueOf(enableClusterConfiguration));
props.setProperty(USE_CLUSTER_CONFIGURATION, String.valueOf(useClusterConfiguration));
props.setProperty(MCAST_PORT, "0");
props.setProperty(LOCATORS, locators);
props.setProperty(LOG_LEVEL, DUnitLauncher.logLevel);
getCache(props);
}
private void addCacheServer() throws Exception {
CacheServer server = getCache().addCacheServer();
int port = AvailablePortHelper.getRandomAvailablePortForDUnitSite();
server.setPort(port);
server.start();
}
protected void startClient(String hostName, int locatorPort, String regionName) {
ClientCacheFactory ccf = new ClientCacheFactory().addPoolLocator(hostName, locatorPort);
ClientCache cache = getClientCache(ccf);
cache.createClientRegionFactory(ClientRegionShortcut.PROXY).create(regionName);
}
void createGatewaySender(String id, int remoteDistributedSystemId,
int messageSyncInterval) {
    // The messageSyncInterval controls how often the BatchRemovalThread sends processed events
    // from the primary to the secondary. Setting it to a high value prevents the events from being
    // removed from the secondary.
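    // For example, a test that needs to assert on secondary queue contents (see
    // stopSenderAndVerifyEvents) can pass a large interval here and later call
    // resetAllMessageSyncIntervals to restore the default.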
BatchRemovalThreadHelper.setMessageSyncInterval(messageSyncInterval);
GatewaySenderFactory gsf = getCache().createGatewaySenderFactory();
gsf.setParallel(true);
gsf.create(id, remoteDistributedSystemId);
}
void resetAllMessageSyncIntervals(VM site1Server1, VM site1Server2, VM site2Server1,
VM site2Server2) {
site1Server1.invoke(() -> resetMessageSyncInterval());
site1Server2.invoke(() -> resetMessageSyncInterval());
site2Server1.invoke(() -> resetMessageSyncInterval());
site2Server2.invoke(() -> resetMessageSyncInterval());
}
private void resetMessageSyncInterval() {
BatchRemovalThreadHelper
.setMessageSyncInterval(ParallelGatewaySenderQueue.DEFAULT_MESSAGE_SYNC_INTERVAL);
}
void createGatewayReceiver() {
getCache().createGatewayReceiverFactory().create();
}
private void createPartitionedRegion(String regionName, String gatewaySenderId) {
PartitionAttributesFactory paf = new PartitionAttributesFactory();
paf.setRedundantCopies(1);
paf.setTotalNumBuckets(10);
getCache().createRegionFactory(RegionShortcut.PARTITION_REDUNDANT)
.addCacheListener(new EventCountCacheListener()).addGatewaySenderId(gatewaySenderId)
.setPartitionAttributes(paf.create()).create(regionName);
}
protected void doPuts(String regionName, int numPuts) {
Region region = getCache().getRegion(regionName);
for (int i = 0; i < numPuts; i++) {
region.put(i, i);
}
}
protected void pauseSender(String senderId) {
final IgnoredException exln = IgnoredException.addIgnoredException("Could not connect");
IgnoredException exp =
IgnoredException.addIgnoredException(ForceReattemptException.class.getName());
try {
Set<GatewaySender> senders = cache.getGatewaySenders();
GatewaySender sender = null;
for (GatewaySender s : senders) {
if (s.getId().equals(senderId)) {
sender = s;
break;
}
}
sender.pause();
((AbstractGatewaySender) sender).getEventProcessor().waitForDispatcherToPause();
} finally {
exp.remove();
exln.remove();
}
}
protected void resumeSender(String senderId) {
final IgnoredException exln = IgnoredException.addIgnoredException("Could not connect");
IgnoredException exp =
IgnoredException.addIgnoredException(ForceReattemptException.class.getName());
try {
Set<GatewaySender> senders = cache.getGatewaySenders();
GatewaySender sender = null;
for (GatewaySender s : senders) {
if (s.getId().equals(senderId)) {
sender = s;
break;
}
}
sender.resume();
} finally {
exp.remove();
exln.remove();
}
}
protected void waitForQueueRegionToCertainSize(String gatewaySenderId, int expectedSize,
boolean primaryOnly) {
await()
.until(() -> getQueueRegionSize(gatewaySenderId, primaryOnly) == expectedSize);
}
protected int getQueueRegionSize(String gatewaySenderId, boolean primaryOnly) {
    // This method currently only supports parallel senders. It gets the size of the local data set
    // from the underlying co-located queue region. Depending on the value of primaryOnly, it
    // returns either the local primary data set (primary buckets only) or the entire local data
    // set (primary and secondary buckets).
AbstractGatewaySender ags =
(AbstractGatewaySender) getCache().getGatewaySender(gatewaySenderId);
ConcurrentParallelGatewaySenderQueue prq =
(ConcurrentParallelGatewaySenderQueue) ags.getQueues().iterator().next();
Region region = prq.getRegion();
Region localDataSet = primaryOnly ? PartitionRegionHelper.getLocalPrimaryData(region)
: PartitionRegionHelper.getLocalData(region);
return localDataSet.size();
}
protected Integer getEventsReceived(String regionName) {
Region region = getCache().getRegion(regionName);
EventCountCacheListener cl =
(EventCountCacheListener) region.getAttributes().getCacheListener();
return cl.getEventsReceived();
}
protected void clearEventsReceived(String regionName) {
Region region = getCache().getRegion(regionName);
EventCountCacheListener cl =
(EventCountCacheListener) region.getAttributes().getCacheListener();
cl.clearEventsReceived();
}
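  /**
   * Cache listener that counts afterCreate and afterUpdate events, letting tests verify how many
   * events were applied on a member and reset the count between test phases.
   */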
static class EventCountCacheListener extends CacheListenerAdapter {
AtomicInteger eventsReceived = new AtomicInteger();
@Override
public void afterCreate(EntryEvent event) {
process(event);
}
@Override
public void afterUpdate(EntryEvent event) {
process(event);
}
void process(EntryEvent event) {
incrementEventsReceived();
}
int incrementEventsReceived() {
return this.eventsReceived.incrementAndGet();
}
int getEventsReceived() {
return this.eventsReceived.get();
}
void clearEventsReceived() {
this.eventsReceived.set(0);
}
}
}