blob: 7e46dc32fa9e5164cdb77874b410d0eb02c3f089 [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.ignite.cdc;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
import java.util.Collections;
import java.util.EnumSet;
import java.util.HashSet;
import java.util.List;
import java.util.function.Function;
import java.util.stream.IntStream;
import org.apache.ignite.IgniteCache;
import org.apache.ignite.cache.CacheAtomicityMode;
import org.apache.ignite.cache.CacheMode;
import org.apache.ignite.cache.query.SqlFieldsQuery;
import org.apache.ignite.cdc.conflictresolve.CacheVersionConflictResolverPluginProvider;
import org.apache.ignite.configuration.CacheConfiguration;
import org.apache.ignite.configuration.DataRegionConfiguration;
import org.apache.ignite.configuration.DataStorageConfiguration;
import org.apache.ignite.configuration.IgniteConfiguration;
import org.apache.ignite.internal.IgniteEx;
import org.apache.ignite.internal.IgniteInternalFuture;
import org.apache.ignite.internal.IgniteInterruptedCheckedException;
import org.apache.ignite.internal.util.lang.GridAbsPredicate;
import org.apache.ignite.internal.util.typedef.F;
import org.apache.ignite.lang.IgniteBiTuple;
import org.apache.ignite.spi.discovery.tcp.TcpDiscoverySpi;
import org.apache.ignite.spi.discovery.tcp.ipfinder.vm.TcpDiscoveryVmIpFinder;
import org.apache.ignite.testframework.junits.common.GridCommonAbstractTest;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.junit.runners.Parameterized;
import static org.apache.ignite.cache.CacheAtomicityMode.ATOMIC;
import static org.apache.ignite.cache.CacheAtomicityMode.TRANSACTIONAL;
import static org.apache.ignite.cache.CacheMode.PARTITIONED;
import static org.apache.ignite.cache.CacheMode.REPLICATED;
import static org.apache.ignite.cluster.ClusterState.ACTIVE;
import static org.apache.ignite.spi.communication.tcp.TcpCommunicationSpi.DFLT_PORT_RANGE;
import static org.apache.ignite.testframework.GridTestUtils.runAsync;
import static org.apache.ignite.testframework.GridTestUtils.waitForCondition;
/** */
@RunWith(Parameterized.class)
public abstract class AbstractReplicationTest extends GridCommonAbstractTest {
/** Cache atomicity mode. */
@Parameterized.Parameter
public CacheAtomicityMode atomicity;
/** Cache replication mode. */
@Parameterized.Parameter(1)
public CacheMode mode;
/** */
@Parameterized.Parameter(2)
public int backups;
/** @return Test parameters. */
@Parameterized.Parameters(name = "atomicity={0}, mode={1}, backupCnt={2}")
public static Collection<?> parameters() {
List<Object[]> params = new ArrayList<>();
for (CacheAtomicityMode atomicity : EnumSet.of(ATOMIC, TRANSACTIONAL)) {
for (CacheMode mode : EnumSet.of(PARTITIONED, REPLICATED)) {
for (int backups = 0; backups < 2; backups++) {
// backupCount ignored for REPLICATED caches.
if (backups > 0 && mode == REPLICATED)
continue;
params.add(new Object[] {atomicity, mode, backups});
}
}
}
return params;
}
/** */
public static final String ACTIVE_PASSIVE_CACHE = "active-passive-cache";
/** */
public static final String ACTIVE_ACTIVE_CACHE = "active-active-cache";
/** */
public static final String IGNORED_CACHE = "ignored-cache";
/** */
public static final byte SRC_CLUSTER_ID = 1;
/** */
public static final byte DEST_CLUSTER_ID = 2;
/** */
private enum WaitDataMode {
/** */
EXISTS,
/** */
REMOVED
}
/** */
public static final int KEYS_CNT = 500;
/** */
protected static IgniteEx[] srcCluster;
/** */
protected static IgniteEx[] destCluster;
/** */
protected static IgniteConfiguration[] srcClusterCliCfg;
/** */
protected static IgniteConfiguration[] destClusterCliCfg;
/** */
private int discoPort = TcpDiscoverySpi.DFLT_PORT;
/** */
private byte clusterId = SRC_CLUSTER_ID;
/** {@inheritDoc} */
@Override protected IgniteConfiguration getConfiguration(String igniteInstanceName) throws Exception {
IgniteConfiguration cfg = super.getConfiguration(igniteInstanceName)
.setDiscoverySpi(new TcpDiscoverySpi()
.setLocalPort(discoPort)
.setIpFinder(new TcpDiscoveryVmIpFinder() {{
setAddresses(Collections.singleton("127.0.0.1:" + discoPort + ".." + (discoPort + DFLT_PORT_RANGE)));
}}));
if (!cfg.isClientMode()) {
CacheVersionConflictResolverPluginProvider<?> cfgPlugin1 = new CacheVersionConflictResolverPluginProvider<>();
cfgPlugin1.setClusterId(clusterId);
cfgPlugin1.setCaches(new HashSet<>(Arrays.asList(ACTIVE_PASSIVE_CACHE, ACTIVE_ACTIVE_CACHE)));
cfgPlugin1.setConflictResolveField("reqId");
CacheVersionConflictResolverPluginProvider<?> cfgPlugin2 = new CacheVersionConflictResolverPluginProvider<>();
cfgPlugin2.setClusterId(clusterId);
cfgPlugin2.setCaches(new HashSet<>(Arrays.asList("T1")));
cfgPlugin2.setConflictResolveField("ID");
cfg.setPluginProviders(cfgPlugin1);
cfg.setDataStorageConfiguration(new DataStorageConfiguration()
.setDefaultDataRegionConfiguration(new DataRegionConfiguration()
.setPersistenceEnabled(true)));
cfg.getDataStorageConfiguration()
.setWalForceArchiveTimeout(5_000)
.setCdcEnabled(true);
cfg.setConsistentId(igniteInstanceName);
}
return cfg;
}
/** {@inheritDoc} */
@Override protected void beforeTest() throws Exception {
cleanPersistenceDir();
IgniteBiTuple<IgniteEx[], IgniteConfiguration[]> cluster = setupCluster("source", "src-cluster-client", 0);
srcCluster = cluster.get1();
srcClusterCliCfg = cluster.get2();
discoPort += DFLT_PORT_RANGE + 1;
clusterId = DEST_CLUSTER_ID;
cluster = setupCluster("destination", "dest-cluster-client", 2);
destCluster = cluster.get1();
destClusterCliCfg = cluster.get2();
String srcTag = srcCluster[0].cluster().tag();
String destTag = destCluster[0].cluster().tag();
assertNotNull(srcTag);
assertNotNull(destTag);
assertFalse(srcTag.equals(destTag));
}
/** */
private IgniteBiTuple<IgniteEx[], IgniteConfiguration[]> setupCluster(
String clusterTag,
String clientPrefix,
int idx
) throws Exception {
IgniteEx[] cluster = new IgniteEx[] {
startGrid(idx + 1),
startGrid(idx + 2)
};
IgniteConfiguration[] clusterCliCfg = new IgniteConfiguration[2];
for (int i = 0; i < 2; i++)
clusterCliCfg[i] = optimize(getConfiguration(clientPrefix + i).setClientMode(true));
cluster[0].cluster().state(ACTIVE);
cluster[0].cluster().tag(clusterTag);
return F.t(cluster, clusterCliCfg);
}
/** {@inheritDoc} */
@Override protected void afterTest() throws Exception {
stopAllGrids();
cleanPersistenceDir();
}
/** Active/Passive mode means changes made only in one cluster. */
@Test
public void testActivePassiveReplication() throws Exception {
List<IgniteInternalFuture<?>> futs = startActivePassiveCdc(ACTIVE_PASSIVE_CACHE);
try {
IgniteCache<Integer, ConflictResolvableTestData> destCache = createCache(destCluster[0], ACTIVE_PASSIVE_CACHE);
destCache.put(KEYS_CNT + 1, ConflictResolvableTestData.create());
destCache.remove(KEYS_CNT + 1);
// Updates for "ignored-cache" should be ignored because of CDC consume configuration.
runAsync(generateData(IGNORED_CACHE, srcCluster[srcCluster.length - 1], IntStream.range(0, KEYS_CNT)));
runAsync(generateData(ACTIVE_PASSIVE_CACHE, srcCluster[srcCluster.length - 1], IntStream.range(0, KEYS_CNT)));
IgniteCache<Integer, ConflictResolvableTestData> srcCache =
createCache(srcCluster[srcCluster.length - 1], ACTIVE_PASSIVE_CACHE);
waitForSameData(srcCache, destCache, KEYS_CNT, WaitDataMode.EXISTS, futs);
IntStream.range(0, KEYS_CNT).forEach(srcCache::remove);
waitForSameData(srcCache, destCache, KEYS_CNT, WaitDataMode.REMOVED, futs);
assertFalse(destCluster[0].cacheNames().contains(IGNORED_CACHE));
}
finally {
for (IgniteInternalFuture<?> fut : futs)
fut.cancel();
}
}
/** Active/Passive mode means changes made only in one cluster. */
@Test
public void testActivePassiveSqlDataReplication() throws Exception {
String createTbl = "CREATE TABLE T1(ID BIGINT PRIMARY KEY, NAME VARCHAR) WITH \"CACHE_NAME=T1,VALUE_TYPE=T1Type\"";
String insertQry = "INSERT INTO T1 VALUES(?, ?)";
String deleteQry = "DELETE FROM T1";
executeSql(srcCluster[0], createTbl);
executeSql(destCluster[0], createTbl);
executeSql(destCluster[0], insertQry, -1, "Name-1");
executeSql(destCluster[0], deleteQry);
IntStream.range(0, KEYS_CNT).forEach(id -> executeSql(srcCluster[0], insertQry, id, "Name" + id));
List<IgniteInternalFuture<?>> futs = startActivePassiveCdc("T1");
try {
Function<Integer, GridAbsPredicate> waitForTblSz = expSz -> () -> {
long cnt = (Long)executeSql(destCluster[0], "SELECT COUNT(*) FROM T1").get(0).get(0);
return cnt == expSz;
};
assertTrue(waitForCondition(waitForTblSz.apply(KEYS_CNT), getTestTimeout()));
List<List<?>> data = executeSql(destCluster[0], "SELECT ID, NAME FROM T1 ORDER BY ID");
for (int i = 0; i < KEYS_CNT; i++) {
assertEquals((long)i, data.get(i).get(0));
assertEquals("Name" + i, data.get(i).get(1));
}
executeSql(srcCluster[0], deleteQry);
assertTrue(waitForCondition(waitForTblSz.apply(0), getTestTimeout()));
}
finally {
for (IgniteInternalFuture<?> fut : futs)
fut.cancel();
}
}
/** Active/Active mode means changes made in both clusters. */
@Test
public void testActiveActiveReplication() throws Exception {
IgniteCache<Integer, ConflictResolvableTestData> srcCache = createCache(srcCluster[0], ACTIVE_ACTIVE_CACHE);
IgniteCache<Integer, ConflictResolvableTestData> destCache = createCache(destCluster[0], ACTIVE_ACTIVE_CACHE);
// Even keys goes to src cluster.
runAsync(generateData(ACTIVE_ACTIVE_CACHE, srcCluster[srcCluster.length - 1],
IntStream.range(0, KEYS_CNT).filter(i -> i % 2 == 0)));
// Odd keys goes to dest cluster.
runAsync(generateData(ACTIVE_ACTIVE_CACHE, destCluster[destCluster.length - 1],
IntStream.range(0, KEYS_CNT).filter(i -> i % 2 != 0)));
List<IgniteInternalFuture<?>> futs = startActiveActiveCdc();
try {
waitForSameData(srcCache, destCache, KEYS_CNT, WaitDataMode.EXISTS, futs);
runAsync(() -> IntStream.range(0, KEYS_CNT).filter(j -> j % 2 == 0).forEach(srcCache::remove));
runAsync(() -> IntStream.range(0, KEYS_CNT).filter(j -> j % 2 != 0).forEach(destCache::remove));
waitForSameData(srcCache, destCache, KEYS_CNT, WaitDataMode.REMOVED, futs);
}
finally {
for (IgniteInternalFuture<?> fut : futs)
fut.cancel();
}
}
/** */
public Runnable generateData(String cacheName, IgniteEx ign, IntStream keys) {
return () -> {
IgniteCache<Integer, ConflictResolvableTestData> cache = createCache(ign, cacheName);
keys.forEach(i -> cache.put(i, ConflictResolvableTestData.create()));
};
}
/** */
public void waitForSameData(
IgniteCache<Integer, ConflictResolvableTestData> src,
IgniteCache<Integer, ConflictResolvableTestData> dest,
int keysCnt,
WaitDataMode mode,
List<IgniteInternalFuture<?>> futs
) throws IgniteInterruptedCheckedException {
assertTrue(waitForCondition(() -> {
for (int i = 0; i < keysCnt; i++) {
if (mode == WaitDataMode.EXISTS) {
if (!src.containsKey(i) || !dest.containsKey(i))
return checkFuts(false, futs);
}
else if (mode == WaitDataMode.REMOVED) {
if (src.containsKey(i) || dest.containsKey(i))
return checkFuts(false, futs);
continue;
}
else
throw new IllegalArgumentException(mode + " not supported.");
ConflictResolvableTestData data = dest.get(i);
if (!data.equals(src.get(i)))
return checkFuts(false, futs);
}
return checkFuts(true, futs);
}, getTestTimeout()));
}
/** */
private boolean checkFuts(boolean res, List<IgniteInternalFuture<?>> futs) {
for (int i = 0; i < futs.size(); i++)
assertFalse("Fut " + i, futs.get(i).isDone());
return res;
}
/** */
private IgniteCache<Integer, ConflictResolvableTestData> createCache(IgniteEx ignite, String name) {
CacheConfiguration<Integer, ConflictResolvableTestData> ccfg = new CacheConfiguration<Integer, ConflictResolvableTestData>()
.setName(name)
.setCacheMode(mode)
.setAtomicityMode(atomicity);
if (mode != REPLICATED)
ccfg.setBackups(backups);
return ignite.getOrCreateCache(ccfg);
}
/** */
private List<List<?>> executeSql(IgniteEx node, String sqlText, Object... args) {
return node.context().query().querySqlFields(new SqlFieldsQuery(sqlText).setArgs(args), true).getAll();
}
/** */
protected abstract List<IgniteInternalFuture<?>> startActivePassiveCdc(String cache);
/** */
protected abstract List<IgniteInternalFuture<?>> startActiveActiveCdc();
}