blob: c4a8399b13347955911ce03a3bea21d5a7dbc523 [file] [log] [blame]
/**
* Licensed to the Apache Software Foundation (ASF) under one or more contributor license
* agreements. See the NOTICE file distributed with this work for additional information regarding
* copyright ownership. The ASF licenses this file to You under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance with the License. You may obtain a
* copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software distributed under the License
* is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
* or implied. See the License for the specific language governing permissions and limitations under
* the License.
*/
package org.apache.geode.internal.cache;
import static org.apache.geode.internal.AvailablePortHelper.getRandomAvailableTCPPort;
import java.lang.reflect.Field;
import java.lang.reflect.Modifier;
import java.util.Arrays;
import java.util.Collection;
import java.util.List;
import java.util.Properties;
import org.junit.experimental.categories.Category;
import org.junit.runner.RunWith;
import org.junit.runners.Parameterized;
import org.apache.geode.cache.CacheFactory;
import org.apache.geode.cache.CacheTransactionManager;
import org.apache.geode.cache.DataPolicy;
import org.apache.geode.cache.EntryOperation;
import org.apache.geode.cache.PartitionAttributes;
import org.apache.geode.cache.PartitionAttributesFactory;
import org.apache.geode.cache.PartitionResolver;
import org.apache.geode.cache.Region;
import org.apache.geode.cache.RegionFactory;
import org.apache.geode.cache.RegionShortcut;
import org.apache.geode.cache.client.ClientCacheFactory;
import org.apache.geode.cache.client.ClientRegionFactory;
import org.apache.geode.cache.client.ClientRegionShortcut;
import org.apache.geode.cache.client.Pool;
import org.apache.geode.cache.client.PoolFactory;
import org.apache.geode.cache.client.PoolManager;
import org.apache.geode.cache.server.CacheServer;
import org.apache.geode.distributed.DistributedSystem;
import org.apache.geode.test.dunit.Host;
import org.apache.geode.test.dunit.VM;
import org.apache.geode.test.dunit.internal.JUnit4DistributedTestCase;
import org.apache.geode.test.junit.categories.BackwardCompatibilityTest;
import org.apache.geode.test.junit.runners.CategoryWithParameterizedRunnerFactory;
import org.apache.geode.test.version.VersionManager;
/**
* This test class tests the communication pattern of a transaction for replicate and partition
* region. There are five VMs that two CacheServers, CacheServer with Pool and two clients. The
* communicate of a transaction is the following three patterns. In addition, this test class also
* tests transactions from older version clients to check backwards compatibility.<br>
* <ol>
* <li>CacheServer (transaction)-> CacheServer</li>
* <li>Client (transaction)-> CacheServer</li>
* <li>Old version client (transaction)-> CacheServer</li>
* <li>CacheServer via Pool (transaction)-> CacheServer</li>
* </ol>
*/
@Category(BackwardCompatibilityTest.class)
@RunWith(Parameterized.class)
@Parameterized.UseParametersRunnerFactory(CategoryWithParameterizedRunnerFactory.class)
@SuppressWarnings("serial")
public abstract class TxCommitMessageBCTestBase extends JUnit4DistributedTestCase {
@Parameterized.Parameter
public String testVersion;
@Parameterized.Parameters
public static Collection<String> data() {
List<String> result = VersionManager.getInstance().getVersionsWithoutCurrent();
if (result.size() < 1) {
throw new RuntimeException("No older versions of Geode were found to test against");
} else {
System.out.println("running against these versions: " + result);
}
return result;
}
protected static VM server1 = null;
protected static VM server2 = null;
protected static VM server3 = null;
protected static VM client = null;
protected static VM oldClient = null;
protected static GemFireCacheImpl cache;
protected static final String REPLICATE_REGION_NAME =
TxCommitMessageBCTestBase.class.getSimpleName() + "_ReplicateRegion";
protected static final String PARTITION_REGION_NAME =
TxCommitMessageBCTestBase.class.getSimpleName() + "_PartitionRegion";
protected static String KEY1 = "KEY1";
protected static String KEY2 = "KEY2";
@Override
public final void postSetUp() throws Exception {
disconnectFromDS();
Host host = Host.getHost(0);
server1 = host.getVM(0); // server
server2 = host.getVM(1); // server
server3 = host.getVM(2); // server with pool
client = host.getVM(testVersion, 3); // client
oldClient = host.getVM(4); // client old version
int port1 =
server1.invoke(() -> TxCommitMessageBCTestBase.createServerCache());
int port2 =
server2.invoke(() -> TxCommitMessageBCTestBase.createServerCache());
server3.invoke(() -> TxCommitMessageBCTestBase
.createServerCacheWithPool(host.getHostName(), new Integer[] {port1, port2}));
client.invoke(() -> TxCommitMessageBCTestBase
.createClientCache(host.getHostName(), new Integer[] {port1, port2}));
oldClient.invoke(() -> TxCommitMessageBCTestBase
.createClientCache(host.getHostName(), new Integer[] {port1, port2}));
}
@Override
public final void preTearDown() throws Exception {
closeCache();
server1.invoke(() -> TxCommitMessageBCTestBase.closeCache());
server2.invoke(() -> TxCommitMessageBCTestBase.closeCache());
server3.invoke(() -> TxCommitMessageBCTestBase.closeCache());
client.invoke(() -> TxCommitMessageBCTestBase.closeCache());
oldClient.invoke(() -> TxCommitMessageBCTestBase.closeCache());
}
public static void closeCache() throws Exception {
if (cache != null) {
cache.close();
}
}
@SuppressWarnings("deprecation")
public static int createServerCache() throws Exception {
Properties props = new Properties();
TxCommitMessageBCTestBase test =
new TxCommitMessageBCTestBase() {};
DistributedSystem ds = test.getSystem(props);
ds.disconnect();
cache = (GemFireCacheImpl) CacheFactory.create(test.getSystem());
RegionFactory<String, String> rf1 = cache.createRegionFactory(RegionShortcut.REPLICATE);
rf1.create(REPLICATE_REGION_NAME);
PartitionAttributesFactory<String, Integer> paf2 = new PartitionAttributesFactory<>();
PartitionAttributes<String, Integer> pa2 =
paf2.setRedundantCopies(1).setPartitionResolver(new PartitionResolver<String, Integer>() {
@Override
public Object getRoutingObject(EntryOperation<String, Integer> opDetails) {
return opDetails.getKey().substring(0, 3);
}
@Override
public String getName() {
return getClass().getSimpleName();
}
@Override
public void close() {}
}).create();
RegionFactory<String, String> rf2 =
cache.createRegionFactory(RegionShortcut.PARTITION_REDUNDANT);
rf2.setPartitionAttributes(pa2);
rf2.create(PARTITION_REGION_NAME);
CacheServer server = cache.addCacheServer();
server.setPort(getRandomAvailableTCPPort());
server.start();
return server.getPort();
}
@SuppressWarnings("deprecation")
public static void createServerCacheWithPool(String hostName, Integer[] ports) throws Exception {
Properties props = new Properties();
TxCommitMessageBCTestBase test =
new TxCommitMessageBCTestBase() {};
DistributedSystem ds = test.getSystem(props);
ds.disconnect();
cache = (GemFireCacheImpl) CacheFactory.create(test.getSystem());
String poolName = "ClientPool";
PoolFactory pf = PoolManager.createFactory().setSubscriptionEnabled(true);
for (int port : ports) {
pf.addServer(hostName, port);
}
Pool pool = pf.create(poolName);
RegionFactory<String, Integer> rf1 = cache.createRegionFactory(RegionShortcut.LOCAL);
rf1.setDataPolicy(DataPolicy.EMPTY);
rf1.setPoolName(pool.getName());
Region<String, Integer> region1 = rf1.create(REPLICATE_REGION_NAME);
region1.registerInterest("ALL_KEYS");
RegionFactory<String, Integer> rf2 = cache.createRegionFactory(RegionShortcut.LOCAL);
rf2.setDataPolicy(DataPolicy.EMPTY);
rf2.setPoolName(pool.getName());
Region<String, Integer> region2 = rf2.create(PARTITION_REGION_NAME);
region2.registerInterest("ALL_KEYS");
}
@SuppressWarnings("deprecation")
public static void createClientCache(String hostName, Integer[] ports) throws Exception {
Properties props = new Properties();
DistributedSystem ds = new TxCommitMessageBCTestBase() {}.getSystem(props);
ds.disconnect();
ClientCacheFactory ccf = new ClientCacheFactory(props);
ccf.setPoolSubscriptionEnabled(true);
for (int port : ports) {
ccf.addPoolServer(hostName, port);
}
cache = (GemFireCacheImpl) ccf.create();
ClientRegionFactory<String, Integer> crf1 =
cache.createClientRegionFactory(ClientRegionShortcut.CACHING_PROXY);
Region<String, Integer> region1 = crf1.create(REPLICATE_REGION_NAME);
region1.registerInterest("ALL_KEYS");
ClientRegionFactory<String, Integer> crf2 =
cache.createClientRegionFactory(ClientRegionShortcut.CACHING_PROXY);
Region<String, Integer> region2 = crf2.create(PARTITION_REGION_NAME);
region2.registerInterest("ALL_KEYS");
}
@SuppressWarnings("unchecked")
public static void doTxPuts(String regionName) throws Exception {
Region<String, Integer> region = cache.getRegion(regionName);
CacheTransactionManager txMngr = cache.getCacheTransactionManager();
txMngr.begin();
Integer value1 = region.get(KEY1);
if (value1 == null) {
value1 = 1;
} else {
value1++;
}
region.put(KEY1, value1);
Integer value2 = region.get(KEY2);
if (value2 == null) {
value2 = 1000;
} else {
value2++;
}
region.put(KEY2, value2);
txMngr.commit();
}
@SuppressWarnings("unchecked")
public static void doTxPutsBoth(String regionNameReplicate, String regionNamePartition)
throws Exception {
Region<String, Integer> regionReplicate = cache.getRegion(regionNameReplicate);
Region<String, Integer> regionPartition = cache.getRegion(regionNamePartition);
CacheTransactionManager txMngr = cache.getCacheTransactionManager();
txMngr.begin();
Integer valPart1 = regionPartition.get(KEY1);
if (valPart1 == null) {
valPart1 = 1500;
} else {
valPart1++;
}
regionPartition.put(KEY1, valPart1);
Integer valPart2 = regionPartition.get(KEY2);
if (valPart2 == null) {
valPart2 = 2000;
} else {
valPart2++;
}
regionPartition.put(KEY2, valPart2);
Integer valRepl1 = regionReplicate.get(KEY1);
if (valRepl1 == null) {
valRepl1 = 500;
} else {
valRepl1++;
}
regionReplicate.put(KEY1, valRepl1);
Integer valRepl2 = regionReplicate.get(KEY2);
if (valRepl2 == null) {
valRepl2 = 1000;
} else {
valRepl2++;
}
regionReplicate.put(KEY2, valRepl2);
txMngr.commit();
}
@SuppressWarnings("unchecked")
public static List<Integer> doGets(String regionName) throws Exception {
Region<String, Integer> region = cache.getRegion(regionName);
Integer value1 = region.get(KEY1);
Integer value2 = region.get(KEY2);
return Arrays.asList(value1, value2);
}
protected static void setVersion(String field, Object value) throws Exception {
Field targetField = value.getClass().getDeclaredField(field);
Field modifiersField = Field.class.getDeclaredField("modifiers");
modifiersField.setAccessible(true);
modifiersField.set(targetField,
targetField.getModifiers() & ~Modifier.PRIVATE & ~Modifier.FINAL);
targetField.set(null, value);
}
}