IGNITE-15064 Added tests for replicated caches (#67)

diff --git a/modules/cdc-ext/src/test/java/org/apache/ignite/cdc/AbstractReplicationTest.java b/modules/cdc-ext/src/test/java/org/apache/ignite/cdc/AbstractReplicationTest.java
index fb980ed..17468df 100644
--- a/modules/cdc-ext/src/test/java/org/apache/ignite/cdc/AbstractReplicationTest.java
+++ b/modules/cdc-ext/src/test/java/org/apache/ignite/cdc/AbstractReplicationTest.java
@@ -18,6 +18,7 @@
 package org.apache.ignite.cdc;
 
 import java.util.ArrayList;
+import java.util.Arrays;
 import java.util.Collection;
 import java.util.Collections;
 import java.util.EnumSet;
@@ -26,7 +27,9 @@
 import java.util.stream.IntStream;
 import org.apache.ignite.IgniteCache;
 import org.apache.ignite.cache.CacheAtomicityMode;
+import org.apache.ignite.cache.CacheMode;
 import org.apache.ignite.cdc.conflictresolve.CacheVersionConflictResolverPluginProvider;
+import org.apache.ignite.configuration.CacheConfiguration;
 import org.apache.ignite.configuration.DataRegionConfiguration;
 import org.apache.ignite.configuration.DataStorageConfiguration;
 import org.apache.ignite.configuration.IgniteConfiguration;
@@ -44,6 +47,8 @@
 
 import static org.apache.ignite.cache.CacheAtomicityMode.ATOMIC;
 import static org.apache.ignite.cache.CacheAtomicityMode.TRANSACTIONAL;
+import static org.apache.ignite.cache.CacheMode.PARTITIONED;
+import static org.apache.ignite.cache.CacheMode.REPLICATED;
 import static org.apache.ignite.cluster.ClusterState.ACTIVE;
 import static org.apache.ignite.spi.communication.tcp.TcpCommunicationSpi.DFLT_PORT_RANGE;
 import static org.apache.ignite.testframework.GridTestUtils.runAsync;
@@ -52,22 +57,34 @@
 /** */
 @RunWith(Parameterized.class)
 public abstract class AbstractReplicationTest extends GridCommonAbstractTest {
-    /** Cache mode. */
+    /** Cache atomicity mode. */
     @Parameterized.Parameter
-    public CacheAtomicityMode cacheMode;
+    public CacheAtomicityMode atomicity;
+
+    /** Cache replication mode. */
+    @Parameterized.Parameter(1)
+    public CacheMode mode;
 
     /** */
-    @Parameterized.Parameter(1)
-    public int backupCnt;
+    @Parameterized.Parameter(2)
+    public int backups;
 
     /** @return Test parameters. */
-    @Parameterized.Parameters(name = "cacheMode={0},backupCnt={1}")
+    @Parameterized.Parameters(name = "atomicity={0}, mode={1}, backupCnt={2}")
     public static Collection<?> parameters() {
         List<Object[]> params = new ArrayList<>();
 
-        for (CacheAtomicityMode mode : EnumSet.of(ATOMIC, TRANSACTIONAL))
-            for (int i = 0; i < 2; i++)
-                params.add(new Object[] {mode, i});
+        for (CacheAtomicityMode atomicity : EnumSet.of(ATOMIC, TRANSACTIONAL)) {
+            for (CacheMode mode : EnumSet.of(PARTITIONED, REPLICATED)) {
+                for (int backups = 0; backups < 2; backups++) {
+                    // backupCount ignored for REPLICATED caches.
+                    if (backups > 0 && mode == REPLICATED)
+                        continue;
+
+                    params.add(new Object[] {atomicity, mode, backups});
+                }
+            }
+        }
 
         return params;
     }
@@ -97,7 +114,7 @@
     }
 
     /** */
-    public static final int KEYS_CNT = 1000;
+    public static final int KEYS_CNT = 500;
 
     /** */
     protected static IgniteEx[] srcCluster;
@@ -130,7 +147,7 @@
             CacheVersionConflictResolverPluginProvider<?> cfgPlugin = new CacheVersionConflictResolverPluginProvider<>();
 
             cfgPlugin.setClusterId(clusterId);
-            cfgPlugin.setCaches(new HashSet<>(Collections.singletonList(ACTIVE_ACTIVE_CACHE)));
+            cfgPlugin.setCaches(new HashSet<>(Arrays.asList(ACTIVE_PASSIVE_CACHE, ACTIVE_ACTIVE_CACHE)));
             cfgPlugin.setConflictResolveField("reqId");
 
             cfg.setPluginProviders(cfgPlugin);
@@ -175,7 +192,11 @@
     }
 
     /** */
-    private IgniteBiTuple<IgniteEx[], IgniteConfiguration[]> setupCluster(String clusterTag, String clientPrefix, int idx) throws Exception {
+    private IgniteBiTuple<IgniteEx[], IgniteConfiguration[]> setupCluster(
+        String clusterTag,
+        String clientPrefix,
+        int idx
+    ) throws Exception {
         IgniteEx[] cluster = new IgniteEx[] {
             startGrid(idx + 1),
             startGrid(idx + 2)
@@ -205,17 +226,17 @@
         List<IgniteInternalFuture<?>> futs = startActivePassiveCdc();
 
         try {
-            IgniteCache<Integer, ConflictResolvableTestData> destCache = destCluster[0].createCache(ACTIVE_PASSIVE_CACHE);
+            IgniteCache<Integer, ConflictResolvableTestData> destCache = createCache(destCluster[0], ACTIVE_PASSIVE_CACHE);
 
-            destCache.put(1, ConflictResolvableTestData.create());
-            destCache.remove(1);
+            destCache.put(KEYS_CNT + 1, ConflictResolvableTestData.create());
+            destCache.remove(KEYS_CNT + 1);
 
             // Updates for "ignored-cache" should be ignored because of CDC consume configuration.
             runAsync(generateData(IGNORED_CACHE, srcCluster[srcCluster.length - 1], IntStream.range(0, KEYS_CNT)));
             runAsync(generateData(ACTIVE_PASSIVE_CACHE, srcCluster[srcCluster.length - 1], IntStream.range(0, KEYS_CNT)));
 
             IgniteCache<Integer, ConflictResolvableTestData> srcCache =
-                srcCluster[srcCluster.length - 1].getOrCreateCache(ACTIVE_PASSIVE_CACHE);
+                createCache(srcCluster[srcCluster.length - 1], ACTIVE_PASSIVE_CACHE);
 
             waitForSameData(srcCache, destCache, KEYS_CNT, WaitDataMode.EXISTS, futs);
 
@@ -234,8 +255,8 @@
     /** Active/Active mode means changes made in both clusters. */
     @Test
     public void testActiveActiveReplication() throws Exception {
-        IgniteCache<Integer, ConflictResolvableTestData> srcCache = srcCluster[0].getOrCreateCache(ACTIVE_ACTIVE_CACHE);
-        IgniteCache<Integer, ConflictResolvableTestData> destCache = destCluster[0].getOrCreateCache(ACTIVE_ACTIVE_CACHE);
+        IgniteCache<Integer, ConflictResolvableTestData> srcCache = createCache(srcCluster[0], ACTIVE_ACTIVE_CACHE);
+        IgniteCache<Integer, ConflictResolvableTestData> destCache = createCache(destCluster[0], ACTIVE_ACTIVE_CACHE);
 
         // Even keys goes to src cluster.
         runAsync(generateData(ACTIVE_ACTIVE_CACHE, srcCluster[srcCluster.length - 1],
@@ -262,9 +283,9 @@
     }
 
     /** */
-    public static Runnable generateData(String cacheName, IgniteEx ign, IntStream keys) {
+    public Runnable generateData(String cacheName, IgniteEx ign, IntStream keys) {
         return () -> {
-            IgniteCache<Integer, ConflictResolvableTestData> cache = ign.getOrCreateCache(cacheName);
+            IgniteCache<Integer, ConflictResolvableTestData> cache = createCache(ign, cacheName);
 
             keys.forEach(i -> cache.put(i, ConflictResolvableTestData.create()));
         };
@@ -312,6 +333,19 @@
     }
 
     /** */
+    private IgniteCache<Integer, ConflictResolvableTestData> createCache(IgniteEx ignite, String name) {
+        CacheConfiguration<Integer, ConflictResolvableTestData> ccfg = new CacheConfiguration<Integer, ConflictResolvableTestData>()
+            .setName(name)
+            .setCacheMode(mode)
+            .setAtomicityMode(atomicity);
+
+        if (mode != REPLICATED)
+            ccfg.setBackups(backups);
+
+        return ignite.getOrCreateCache(ccfg);
+    }
+
+    /** */
     protected abstract List<IgniteInternalFuture<?>> startActivePassiveCdc();
 
     /** */