IGNITE-15769 Fixed composition key error (#79)

diff --git a/modules/cdc-ext/src/main/java/org/apache/ignite/cdc/CdcEventsApplier.java b/modules/cdc-ext/src/main/java/org/apache/ignite/cdc/CdcEventsApplier.java
index 777b992..2120318 100644
--- a/modules/cdc-ext/src/main/java/org/apache/ignite/cdc/CdcEventsApplier.java
+++ b/modules/cdc-ext/src/main/java/org/apache/ignite/cdc/CdcEventsApplier.java
@@ -99,7 +99,12 @@
 
             CacheEntryVersion order = evt.version();
 
-            KeyCacheObject key = new KeyCacheObjectImpl(evt.key(), null, evt.partition());
+            KeyCacheObject key;
+
+            if (evt.key() instanceof KeyCacheObject)
+                key = (KeyCacheObject)evt.key();
+            else
+                key = new KeyCacheObjectImpl(evt.key(), null, evt.partition());
 
             if (evt.value() != null) {
                 evtsApplied += applyIf(currCache, () -> isApplyBatch(updBatch, key), hasRemoves);
diff --git a/modules/cdc-ext/src/test/java/org/apache/ignite/cdc/AbstractReplicationTest.java b/modules/cdc-ext/src/test/java/org/apache/ignite/cdc/AbstractReplicationTest.java
index 6db6eb3..8438cf6 100644
--- a/modules/cdc-ext/src/test/java/org/apache/ignite/cdc/AbstractReplicationTest.java
+++ b/modules/cdc-ext/src/test/java/org/apache/ignite/cdc/AbstractReplicationTest.java
@@ -25,7 +25,9 @@
 import java.util.EnumSet;
 import java.util.HashSet;
 import java.util.List;
+import java.util.function.BiConsumer;
 import java.util.function.Function;
+import java.util.function.IntConsumer;
 import java.util.stream.IntStream;
 import javax.management.DynamicMBean;
 import org.apache.ignite.IgniteCache;
@@ -288,6 +290,82 @@
         }
     }
 
+    /** Replication with complex SQL key. Data inserted via SQL. */
+    @Test
+    public void testActivePassiveReplicationComplexKeyWithSQL() throws Exception {
+        doTestActivePassiveSqlDataReplicationComplexKey("T2", (ignite, id) -> executeSql(
+            ignite,
+            "INSERT INTO T2 (ID, SUBID, NAME, ORGID) VALUES(?, ?, ?, ?)",
+            id,
+            "SUBID",
+            "Name" + id,
+            id * 42
+        ));
+    }
+
+    /** Replication with complex SQL key. Data inserted via key-value API. */
+    @Test
+    public void testActivePassiveReplicationComplexKeyWithKeyValue() throws Exception {
+        doTestActivePassiveSqlDataReplicationComplexKey("T3", (ignite, id) -> {
+            ignite.cache("T3").put(new TestKey(id, "SUBID"), new TestVal("Name" + id, id * 42));
+        });
+    }
+
+    /** */
+    public void doTestActivePassiveSqlDataReplicationComplexKey(String name, BiConsumer<IgniteEx, Integer> addData) throws Exception {
+        String createTbl = "CREATE TABLE IF NOT EXISTS " + name + "(" +
+            "    ID INT NOT NULL, " +
+            "    SUBID VARCHAR NOT NULL, " +
+            "    NAME VARCHAR, " +
+            "    ORGID INT, " +
+            "    PRIMARY KEY (ID, SUBID))" +
+            "    WITH \"CACHE_NAME=" + name + "," +
+            "KEY_TYPE=" + TestKey.class.getName() + "," +
+            "VALUE_TYPE=" + TestVal.class.getName() + "," +
+            "ATOMICITY=" + atomicity.name() + "\";";
+
+        executeSql(srcCluster[0], createTbl);
+        executeSql(destCluster[0], createTbl);
+
+        addData.accept(destCluster[0], -1);
+        executeSql(destCluster[0], "DELETE FROM " + name);
+
+        IntStream.range(0, KEYS_CNT).forEach(i -> addData.accept(srcCluster[0], i));
+
+        List<IgniteInternalFuture<?>> futs = startActivePassiveCdc(name);
+
+        try {
+            Function<Integer, GridAbsPredicate> waitForTblSz = expSz -> () -> {
+                long cnt = (Long)executeSql(destCluster[0], "SELECT COUNT(*) FROM " + name).get(0).get(0);
+
+                return cnt == expSz;
+            };
+
+            assertTrue(waitForCondition(waitForTblSz.apply(KEYS_CNT), getTestTimeout()));
+
+            checkMetrics();
+
+            List<List<?>> data = executeSql(destCluster[0], "SELECT ID, SUBID, NAME, ORGID FROM " + name + " ORDER BY ID");
+
+            for (int i = 0; i < KEYS_CNT; i++) {
+                assertEquals(i, data.get(i).get(0));
+                assertEquals("SUBID", data.get(i).get(1));
+                assertEquals("Name" + i, data.get(i).get(2));
+                assertEquals(i * 42, data.get(i).get(3));
+            }
+
+            executeSql(srcCluster[0], "DELETE FROM " + name);
+
+            assertTrue(waitForCondition(waitForTblSz.apply(0), getTestTimeout()));
+
+            checkMetrics();
+        }
+        finally {
+            for (IgniteInternalFuture<?> fut : futs)
+                fut.cancel();
+        }
+    }
+
     /** Active/Passive mode means changes made only in one cluster. */
     @Test
     public void testActivePassiveSqlDataReplication() throws Exception {
@@ -491,4 +569,39 @@
         assertNotNull(longMetric.apply(LAST_EVT_TIME));
         assertNotNull(longMetric.apply(EVTS_CNT));
     }
+
+
+    /** */
+    private static class TestKey {
+        /** Id. */
+        private final int id;
+
+        /** Sub id. */
+        private final String subId;
+
+        /** */
+        public TestKey(int id, String subId) {
+            this.id = id;
+            this.subId = subId;
+        }
+
+        public int getId() {
+            return id;
+        }
+    }
+
+    /** */
+    private static class TestVal {
+        /** Name. */
+        private final String name;
+
+        /** Org id. */
+        private final int orgId;
+
+        /** */
+        public TestVal(String name, int orgId) {
+            this.name = name;
+            this.orgId = orgId;
+        }
+    }
 }
diff --git a/modules/cdc-ext/src/test/java/org/apache/ignite/cdc/kafka/CdcKafkaReplicationTest.java b/modules/cdc-ext/src/test/java/org/apache/ignite/cdc/kafka/CdcKafkaReplicationTest.java
index ce51a7b..8f1bb5c 100644
--- a/modules/cdc-ext/src/test/java/org/apache/ignite/cdc/kafka/CdcKafkaReplicationTest.java
+++ b/modules/cdc-ext/src/test/java/org/apache/ignite/cdc/kafka/CdcKafkaReplicationTest.java
@@ -21,7 +21,6 @@
 import java.util.Collections;
 import java.util.List;
 import java.util.Properties;
-import java.util.Set;
 import java.util.function.Function;
 import org.apache.ignite.cdc.AbstractReplicationTest;
 import org.apache.ignite.cdc.CdcConfiguration;
@@ -35,7 +34,6 @@
 import org.apache.kafka.streams.integration.utils.EmbeddedKafkaCluster;
 
 import static org.apache.ignite.cdc.kafka.KafkaToIgniteCdcStreamerConfiguration.DFLT_PARTS;
-import static org.apache.ignite.cdc.kafka.KafkaToIgniteCdcStreamerConfiguration.DFLT_TOPIC;
 import static org.apache.ignite.testframework.GridTestUtils.runAsync;
 import static org.apache.ignite.testframework.GridTestUtils.waitForCondition;
 
@@ -62,7 +60,6 @@
             KAFKA.start();
         }
 
-        KAFKA.createTopic(DFLT_TOPIC, DFLT_PARTS, 1);
         KAFKA.createTopic(SRC_DEST_TOPIC, DFLT_PARTS, 1);
         KAFKA.createTopic(DEST_SRC_TOPIC, DFLT_PARTS, 1);
     }
@@ -71,28 +68,38 @@
     @Override protected void afterTest() throws Exception {
         super.afterTest();
 
-        KAFKA.deleteTopic(DFLT_TOPIC);
-        KAFKA.deleteTopic(SRC_DEST_TOPIC);
-        KAFKA.deleteTopic(DEST_SRC_TOPIC);
+        KAFKA.getAllTopicsInCluster().forEach(t -> {
+            try {
+                KAFKA.deleteTopic(t);
+            }
+            catch (InterruptedException e) {
+                throw new RuntimeException(e);
+            }
+        });
 
-        waitForCondition(() -> {
-            Set<String> topics = KAFKA.getAllTopicsInCluster();
-
-            return !topics.contains(DFLT_TOPIC) && !topics.contains(SRC_DEST_TOPIC) && !topics.contains(DEST_SRC_TOPIC);
-        }, getTestTimeout());
+        waitForCondition(() -> KAFKA.getAllTopicsInCluster().isEmpty(), getTestTimeout());
     }
 
     /** {@inheritDoc} */
     @Override protected List<IgniteInternalFuture<?>> startActivePassiveCdc(String cache) {
+        try {
+            KAFKA.createTopic(cache, DFLT_PARTS, 1);
+
+            waitForCondition(() -> KAFKA.getAllTopicsInCluster().contains(cache), getTestTimeout());
+        }
+        catch (Exception e) {
+            throw new RuntimeException(e);
+        }
+
         List<IgniteInternalFuture<?>> futs = new ArrayList<>();
 
         for (IgniteEx ex : srcCluster)
-            futs.add(igniteToKafka(ex.configuration(), DFLT_TOPIC, cache));
+            futs.add(igniteToKafka(ex.configuration(), cache, cache));
 
         for (int i = 0; i < destCluster.length; i++) {
             futs.add(kafkaToIgnite(
                 cache,
-                DFLT_TOPIC,
+                cache,
                 destClusterCliCfg[i],
                 i * (DFLT_PARTS / 2),
                 (i + 1) * (DFLT_PARTS / 2)