IGNITE-15064 Added tests for replicated caches (#67)
diff --git a/modules/cdc-ext/src/test/java/org/apache/ignite/cdc/AbstractReplicationTest.java b/modules/cdc-ext/src/test/java/org/apache/ignite/cdc/AbstractReplicationTest.java
index fb980ed..17468df 100644
--- a/modules/cdc-ext/src/test/java/org/apache/ignite/cdc/AbstractReplicationTest.java
+++ b/modules/cdc-ext/src/test/java/org/apache/ignite/cdc/AbstractReplicationTest.java
@@ -18,6 +18,7 @@
package org.apache.ignite.cdc;
import java.util.ArrayList;
+import java.util.Arrays;
import java.util.Collection;
import java.util.Collections;
import java.util.EnumSet;
@@ -26,7 +27,9 @@
import java.util.stream.IntStream;
import org.apache.ignite.IgniteCache;
import org.apache.ignite.cache.CacheAtomicityMode;
+import org.apache.ignite.cache.CacheMode;
import org.apache.ignite.cdc.conflictresolve.CacheVersionConflictResolverPluginProvider;
+import org.apache.ignite.configuration.CacheConfiguration;
import org.apache.ignite.configuration.DataRegionConfiguration;
import org.apache.ignite.configuration.DataStorageConfiguration;
import org.apache.ignite.configuration.IgniteConfiguration;
@@ -44,6 +47,8 @@
import static org.apache.ignite.cache.CacheAtomicityMode.ATOMIC;
import static org.apache.ignite.cache.CacheAtomicityMode.TRANSACTIONAL;
+import static org.apache.ignite.cache.CacheMode.PARTITIONED;
+import static org.apache.ignite.cache.CacheMode.REPLICATED;
import static org.apache.ignite.cluster.ClusterState.ACTIVE;
import static org.apache.ignite.spi.communication.tcp.TcpCommunicationSpi.DFLT_PORT_RANGE;
import static org.apache.ignite.testframework.GridTestUtils.runAsync;
@@ -52,22 +57,34 @@
/** */
@RunWith(Parameterized.class)
public abstract class AbstractReplicationTest extends GridCommonAbstractTest {
- /** Cache mode. */
+ /** Cache atomicity mode. */
@Parameterized.Parameter
- public CacheAtomicityMode cacheMode;
+ public CacheAtomicityMode atomicity;
+
+ /** Cache replication mode. */
+ @Parameterized.Parameter(1)
+ public CacheMode mode;
/** */
- @Parameterized.Parameter(1)
- public int backupCnt;
+ @Parameterized.Parameter(2)
+ public int backups;
/** @return Test parameters. */
- @Parameterized.Parameters(name = "cacheMode={0},backupCnt={1}")
+ @Parameterized.Parameters(name = "atomicity={0}, mode={1}, backupCnt={2}")
public static Collection<?> parameters() {
List<Object[]> params = new ArrayList<>();
- for (CacheAtomicityMode mode : EnumSet.of(ATOMIC, TRANSACTIONAL))
- for (int i = 0; i < 2; i++)
- params.add(new Object[] {mode, i});
+ for (CacheAtomicityMode atomicity : EnumSet.of(ATOMIC, TRANSACTIONAL)) {
+ for (CacheMode mode : EnumSet.of(PARTITIONED, REPLICATED)) {
+ for (int backups = 0; backups < 2; backups++) {
+ // backupCount ignored for REPLICATED caches.
+ if (backups > 0 && mode == REPLICATED)
+ continue;
+
+ params.add(new Object[] {atomicity, mode, backups});
+ }
+ }
+ }
return params;
}
@@ -97,7 +114,7 @@
}
/** */
- public static final int KEYS_CNT = 1000;
+ public static final int KEYS_CNT = 500;
/** */
protected static IgniteEx[] srcCluster;
@@ -130,7 +147,7 @@
CacheVersionConflictResolverPluginProvider<?> cfgPlugin = new CacheVersionConflictResolverPluginProvider<>();
cfgPlugin.setClusterId(clusterId);
- cfgPlugin.setCaches(new HashSet<>(Collections.singletonList(ACTIVE_ACTIVE_CACHE)));
+ cfgPlugin.setCaches(new HashSet<>(Arrays.asList(ACTIVE_PASSIVE_CACHE, ACTIVE_ACTIVE_CACHE)));
cfgPlugin.setConflictResolveField("reqId");
cfg.setPluginProviders(cfgPlugin);
@@ -175,7 +192,11 @@
}
/** */
- private IgniteBiTuple<IgniteEx[], IgniteConfiguration[]> setupCluster(String clusterTag, String clientPrefix, int idx) throws Exception {
+ private IgniteBiTuple<IgniteEx[], IgniteConfiguration[]> setupCluster(
+ String clusterTag,
+ String clientPrefix,
+ int idx
+ ) throws Exception {
IgniteEx[] cluster = new IgniteEx[] {
startGrid(idx + 1),
startGrid(idx + 2)
@@ -205,17 +226,17 @@
List<IgniteInternalFuture<?>> futs = startActivePassiveCdc();
try {
- IgniteCache<Integer, ConflictResolvableTestData> destCache = destCluster[0].createCache(ACTIVE_PASSIVE_CACHE);
+ IgniteCache<Integer, ConflictResolvableTestData> destCache = createCache(destCluster[0], ACTIVE_PASSIVE_CACHE);
- destCache.put(1, ConflictResolvableTestData.create());
- destCache.remove(1);
+ destCache.put(KEYS_CNT + 1, ConflictResolvableTestData.create());
+ destCache.remove(KEYS_CNT + 1);
// Updates for "ignored-cache" should be ignored because of CDC consume configuration.
runAsync(generateData(IGNORED_CACHE, srcCluster[srcCluster.length - 1], IntStream.range(0, KEYS_CNT)));
runAsync(generateData(ACTIVE_PASSIVE_CACHE, srcCluster[srcCluster.length - 1], IntStream.range(0, KEYS_CNT)));
IgniteCache<Integer, ConflictResolvableTestData> srcCache =
- srcCluster[srcCluster.length - 1].getOrCreateCache(ACTIVE_PASSIVE_CACHE);
+ createCache(srcCluster[srcCluster.length - 1], ACTIVE_PASSIVE_CACHE);
waitForSameData(srcCache, destCache, KEYS_CNT, WaitDataMode.EXISTS, futs);
@@ -234,8 +255,8 @@
/** Active/Active mode means changes made in both clusters. */
@Test
public void testActiveActiveReplication() throws Exception {
- IgniteCache<Integer, ConflictResolvableTestData> srcCache = srcCluster[0].getOrCreateCache(ACTIVE_ACTIVE_CACHE);
- IgniteCache<Integer, ConflictResolvableTestData> destCache = destCluster[0].getOrCreateCache(ACTIVE_ACTIVE_CACHE);
+ IgniteCache<Integer, ConflictResolvableTestData> srcCache = createCache(srcCluster[0], ACTIVE_ACTIVE_CACHE);
+ IgniteCache<Integer, ConflictResolvableTestData> destCache = createCache(destCluster[0], ACTIVE_ACTIVE_CACHE);
// Even keys goes to src cluster.
runAsync(generateData(ACTIVE_ACTIVE_CACHE, srcCluster[srcCluster.length - 1],
@@ -262,9 +283,9 @@
}
/** */
- public static Runnable generateData(String cacheName, IgniteEx ign, IntStream keys) {
+ public Runnable generateData(String cacheName, IgniteEx ign, IntStream keys) {
return () -> {
- IgniteCache<Integer, ConflictResolvableTestData> cache = ign.getOrCreateCache(cacheName);
+ IgniteCache<Integer, ConflictResolvableTestData> cache = createCache(ign, cacheName);
keys.forEach(i -> cache.put(i, ConflictResolvableTestData.create()));
};
@@ -312,6 +333,19 @@
}
/** */
+ private IgniteCache<Integer, ConflictResolvableTestData> createCache(IgniteEx ignite, String name) {
+ CacheConfiguration<Integer, ConflictResolvableTestData> ccfg = new CacheConfiguration<Integer, ConflictResolvableTestData>()
+ .setName(name)
+ .setCacheMode(mode)
+ .setAtomicityMode(atomicity);
+
+ if (mode != REPLICATED)
+ ccfg.setBackups(backups);
+
+ return ignite.getOrCreateCache(ccfg);
+ }
+
+ /** */
protected abstract List<IgniteInternalFuture<?>> startActivePassiveCdc();
/** */