IGNITE-15769 Fixed composition key error (#79)
diff --git a/modules/cdc-ext/src/main/java/org/apache/ignite/cdc/CdcEventsApplier.java b/modules/cdc-ext/src/main/java/org/apache/ignite/cdc/CdcEventsApplier.java
index 777b992..2120318 100644
--- a/modules/cdc-ext/src/main/java/org/apache/ignite/cdc/CdcEventsApplier.java
+++ b/modules/cdc-ext/src/main/java/org/apache/ignite/cdc/CdcEventsApplier.java
@@ -99,7 +99,12 @@
CacheEntryVersion order = evt.version();
- KeyCacheObject key = new KeyCacheObjectImpl(evt.key(), null, evt.partition());
+ KeyCacheObject key;
+
+ if (evt.key() instanceof KeyCacheObject)
+ key = (KeyCacheObject)evt.key();
+ else
+ key = new KeyCacheObjectImpl(evt.key(), null, evt.partition());
if (evt.value() != null) {
evtsApplied += applyIf(currCache, () -> isApplyBatch(updBatch, key), hasRemoves);
diff --git a/modules/cdc-ext/src/test/java/org/apache/ignite/cdc/AbstractReplicationTest.java b/modules/cdc-ext/src/test/java/org/apache/ignite/cdc/AbstractReplicationTest.java
index 6db6eb3..8438cf6 100644
--- a/modules/cdc-ext/src/test/java/org/apache/ignite/cdc/AbstractReplicationTest.java
+++ b/modules/cdc-ext/src/test/java/org/apache/ignite/cdc/AbstractReplicationTest.java
@@ -25,7 +25,9 @@
import java.util.EnumSet;
import java.util.HashSet;
import java.util.List;
+import java.util.function.BiConsumer;
import java.util.function.Function;
+import java.util.function.IntConsumer;
import java.util.stream.IntStream;
import javax.management.DynamicMBean;
import org.apache.ignite.IgniteCache;
@@ -288,6 +290,82 @@
}
}
+ /** Replication with complex SQL key. Data inserted via SQL. */
+ @Test
+ public void testActivePassiveReplicationComplexKeyWithSQL() throws Exception {
+ doTestActivePassiveSqlDataReplicationComplexKey("T2", (ignite, id) -> executeSql(
+ ignite,
+ "INSERT INTO T2 (ID, SUBID, NAME, ORGID) VALUES(?, ?, ?, ?)",
+ id,
+ "SUBID",
+ "Name" + id,
+ id * 42
+ ));
+ }
+
+ /** Replication with complex SQL key. Data inserted via key-value API. */
+ @Test
+ public void testActivePassiveReplicationComplexKeyWithKeyValue() throws Exception {
+ doTestActivePassiveSqlDataReplicationComplexKey("T3", (ignite, id) -> {
+ ignite.cache("T3").put(new TestKey(id, "SUBID"), new TestVal("Name" + id, id * 42));
+ });
+ }
+
+ /** */
+ public void doTestActivePassiveSqlDataReplicationComplexKey(String name, BiConsumer<IgniteEx, Integer> addData) throws Exception {
+ String createTbl = "CREATE TABLE IF NOT EXISTS " + name + "(" +
+ " ID INT NOT NULL, " +
+ " SUBID VARCHAR NOT NULL, " +
+ " NAME VARCHAR, " +
+ " ORGID INT, " +
+ " PRIMARY KEY (ID, SUBID))" +
+ " WITH \"CACHE_NAME=" + name + "," +
+ "KEY_TYPE=" + TestKey.class.getName() + "," +
+ "VALUE_TYPE=" + TestVal.class.getName() + "," +
+ "ATOMICITY=" + atomicity.name() + "\";";
+
+ executeSql(srcCluster[0], createTbl);
+ executeSql(destCluster[0], createTbl);
+
+ addData.accept(destCluster[0], -1);
+ executeSql(destCluster[0], "DELETE FROM " + name);
+
+ IntStream.range(0, KEYS_CNT).forEach(i -> addData.accept(srcCluster[0], i));
+
+ List<IgniteInternalFuture<?>> futs = startActivePassiveCdc(name);
+
+ try {
+ Function<Integer, GridAbsPredicate> waitForTblSz = expSz -> () -> {
+ long cnt = (Long)executeSql(destCluster[0], "SELECT COUNT(*) FROM " + name).get(0).get(0);
+
+ return cnt == expSz;
+ };
+
+ assertTrue(waitForCondition(waitForTblSz.apply(KEYS_CNT), getTestTimeout()));
+
+ checkMetrics();
+
+ List<List<?>> data = executeSql(destCluster[0], "SELECT ID, SUBID, NAME, ORGID FROM " + name + " ORDER BY ID");
+
+ for (int i = 0; i < KEYS_CNT; i++) {
+ assertEquals(i, data.get(i).get(0));
+ assertEquals("SUBID", data.get(i).get(1));
+ assertEquals("Name" + i, data.get(i).get(2));
+ assertEquals(i * 42, data.get(i).get(3));
+ }
+
+ executeSql(srcCluster[0], "DELETE FROM " + name);
+
+ assertTrue(waitForCondition(waitForTblSz.apply(0), getTestTimeout()));
+
+ checkMetrics();
+ }
+ finally {
+ for (IgniteInternalFuture<?> fut : futs)
+ fut.cancel();
+ }
+ }
+
/** Active/Passive mode means changes made only in one cluster. */
@Test
public void testActivePassiveSqlDataReplication() throws Exception {
@@ -491,4 +569,39 @@
assertNotNull(longMetric.apply(LAST_EVT_TIME));
assertNotNull(longMetric.apply(EVTS_CNT));
}
+
+
+ /** */
+ private static class TestKey {
+ /** Id. */
+ private final int id;
+
+ /** Sub id. */
+ private final String subId;
+
+ /** */
+ public TestKey(int id, String subId) {
+ this.id = id;
+ this.subId = subId;
+ }
+
+ public int getId() {
+ return id;
+ }
+ }
+
+ /** */
+ private static class TestVal {
+ /** Name. */
+ private final String name;
+
+ /** Org id. */
+ private final int orgId;
+
+ /** */
+ public TestVal(String name, int orgId) {
+ this.name = name;
+ this.orgId = orgId;
+ }
+ }
}
diff --git a/modules/cdc-ext/src/test/java/org/apache/ignite/cdc/kafka/CdcKafkaReplicationTest.java b/modules/cdc-ext/src/test/java/org/apache/ignite/cdc/kafka/CdcKafkaReplicationTest.java
index ce51a7b..8f1bb5c 100644
--- a/modules/cdc-ext/src/test/java/org/apache/ignite/cdc/kafka/CdcKafkaReplicationTest.java
+++ b/modules/cdc-ext/src/test/java/org/apache/ignite/cdc/kafka/CdcKafkaReplicationTest.java
@@ -21,7 +21,6 @@
import java.util.Collections;
import java.util.List;
import java.util.Properties;
-import java.util.Set;
import java.util.function.Function;
import org.apache.ignite.cdc.AbstractReplicationTest;
import org.apache.ignite.cdc.CdcConfiguration;
@@ -35,7 +34,6 @@
import org.apache.kafka.streams.integration.utils.EmbeddedKafkaCluster;
import static org.apache.ignite.cdc.kafka.KafkaToIgniteCdcStreamerConfiguration.DFLT_PARTS;
-import static org.apache.ignite.cdc.kafka.KafkaToIgniteCdcStreamerConfiguration.DFLT_TOPIC;
import static org.apache.ignite.testframework.GridTestUtils.runAsync;
import static org.apache.ignite.testframework.GridTestUtils.waitForCondition;
@@ -62,7 +60,6 @@
KAFKA.start();
}
- KAFKA.createTopic(DFLT_TOPIC, DFLT_PARTS, 1);
KAFKA.createTopic(SRC_DEST_TOPIC, DFLT_PARTS, 1);
KAFKA.createTopic(DEST_SRC_TOPIC, DFLT_PARTS, 1);
}
@@ -71,28 +68,38 @@
@Override protected void afterTest() throws Exception {
super.afterTest();
- KAFKA.deleteTopic(DFLT_TOPIC);
- KAFKA.deleteTopic(SRC_DEST_TOPIC);
- KAFKA.deleteTopic(DEST_SRC_TOPIC);
+ KAFKA.getAllTopicsInCluster().forEach(t -> {
+ try {
+ KAFKA.deleteTopic(t);
+ }
+ catch (InterruptedException e) {
+ throw new RuntimeException(e);
+ }
+ });
- waitForCondition(() -> {
- Set<String> topics = KAFKA.getAllTopicsInCluster();
-
- return !topics.contains(DFLT_TOPIC) && !topics.contains(SRC_DEST_TOPIC) && !topics.contains(DEST_SRC_TOPIC);
- }, getTestTimeout());
+ waitForCondition(() -> KAFKA.getAllTopicsInCluster().isEmpty(), getTestTimeout());
}
/** {@inheritDoc} */
@Override protected List<IgniteInternalFuture<?>> startActivePassiveCdc(String cache) {
+ try {
+ KAFKA.createTopic(cache, DFLT_PARTS, 1);
+
+ waitForCondition(() -> KAFKA.getAllTopicsInCluster().contains(cache), getTestTimeout());
+ }
+ catch (Exception e) {
+ throw new RuntimeException(e);
+ }
+
List<IgniteInternalFuture<?>> futs = new ArrayList<>();
for (IgniteEx ex : srcCluster)
- futs.add(igniteToKafka(ex.configuration(), DFLT_TOPIC, cache));
+ futs.add(igniteToKafka(ex.configuration(), cache, cache));
for (int i = 0; i < destCluster.length; i++) {
futs.add(kafkaToIgnite(
cache,
- DFLT_TOPIC,
+ cache,
destClusterCliCfg[i],
i * (DFLT_PARTS / 2),
(i + 1) * (DFLT_PARTS / 2)