IGNITE-14362: CDC Ignite to Kafka extension (#49)
diff --git a/.gitignore b/.gitignore
index 00dcb00..59df72e 100644
--- a/.gitignore
+++ b/.gitignore
@@ -25,6 +25,7 @@
.metadata
bin/
!/modules/performance-statistics-ext/bin
+!/modules/cdc-ext/bin
tmp/
*.tmp
*.bak
diff --git a/modules/cdc-ext/assembly/cdc-ext.xml b/modules/cdc-ext/assembly/cdc-ext.xml
new file mode 100644
index 0000000..baaee78
--- /dev/null
+++ b/modules/cdc-ext/assembly/cdc-ext.xml
@@ -0,0 +1,49 @@
+<?xml version="1.0" encoding="UTF-8"?>
+
+<!--
+ Licensed to the Apache Software Foundation (ASF) under one or more
+ contributor license agreements. See the NOTICE file distributed with
+ this work for additional information regarding copyright ownership.
+ The ASF licenses this file to You under the Apache License, Version 2.0
+ (the "License"); you may not use this file except in compliance with
+ the License. You may obtain a copy of the License at
+
+ http://www.apache.org/licenses/LICENSE-2.0
+
+ Unless required by applicable law or agreed to in writing, software
+ distributed under the License is distributed on an "AS IS" BASIS,
+ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ See the License for the specific language governing permissions and
+ limitations under the License.
+-->
+
+<assembly xmlns="http://maven.apache.org/plugins/maven-assembly-plugin/assembly/1.1.2"
+ xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+ xsi:schemaLocation="http://maven.apache.org/plugins/maven-assembly-plugin/assembly/1.1.2
+ http://maven.apache.org/xsd/assembly-1.1.2.xsd">
+ <id>cdc-ext</id>
+
+ <includeBaseDirectory>false</includeBaseDirectory>
+
+ <formats>
+ <format>zip</format>
+ </formats>
+
+ <fileSets>
+ <fileSet>
+ <directory>${project.build.directory}</directory>
+ <outputDirectory>/libs/optional/ignite-cdc-ext</outputDirectory>
+ <includes>
+ <include>${project.build.finalName}.${project.packaging}</include>
+ </includes>
+ </fileSet>
+ <fileSet>
+ <directory>${project.build.directory}/libs</directory>
+ <outputDirectory>/libs/optional/ignite-cdc-ext</outputDirectory>
+ </fileSet>
+ <fileSet>
+ <directory>${basedir}/bin</directory>
+ <outputDirectory>/bin</outputDirectory>
+ </fileSet>
+ </fileSets>
+</assembly>
diff --git a/modules/cdc-ext/bin/kafka-to-ignite.sh b/modules/cdc-ext/bin/kafka-to-ignite.sh
new file mode 100755
index 0000000..068a229
--- /dev/null
+++ b/modules/cdc-ext/bin/kafka-to-ignite.sh
@@ -0,0 +1,27 @@
+#!/usr/bin/env bash
+
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements. See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+export MAIN_CLASS="org.apache.ignite.cdc.kafka.KafkaToIgniteCommandLineStartup"
+
+if [ "${IGNITE_HOME:-}" = "" ];
+ then IGNITE_HOME_TMP="$(dirname "$(cd "$(dirname "$0")"; "pwd")")";
+ else IGNITE_HOME_TMP=${IGNITE_HOME};
+fi
+
+${IGNITE_HOME_TMP}/bin/ignite.sh "$@"
diff --git a/modules/cdc-ext/modules/core/src/test/config/log4j-test.xml b/modules/cdc-ext/modules/core/src/test/config/log4j-test.xml
new file mode 100644
index 0000000..cc2146e
--- /dev/null
+++ b/modules/cdc-ext/modules/core/src/test/config/log4j-test.xml
@@ -0,0 +1,57 @@
+<?xml version="1.0" encoding="UTF-8"?>
+
+<!--
+ Licensed to the Apache Software Foundation (ASF) under one or more
+ contributor license agreements. See the NOTICE file distributed with
+ this work for additional information regarding copyright ownership.
+ The ASF licenses this file to You under the Apache License, Version 2.0
+ (the "License"); you may not use this file except in compliance with
+ the License. You may obtain a copy of the License at
+
+ http://www.apache.org/licenses/LICENSE-2.0
+
+ Unless required by applicable law or agreed to in writing, software
+ distributed under the License is distributed on an "AS IS" BASIS,
+ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ See the License for the specific language governing permissions and
+ limitations under the License.
+-->
+
+<!DOCTYPE log4j:configuration PUBLIC "-//APACHE//DTD LOG4J 1.2//EN"
+ "http://logging.apache.org/log4j/1.2/apidocs/org/apache/log4j/xml/doc-files/log4j.dtd">
+<log4j:configuration xmlns:log4j="http://jakarta.apache.org/log4j/" debug="false">
+ <appender name="CONSOLE" class="org.apache.log4j.ConsoleAppender">
+ <param name="Target" value="System.out"/>
+ <param name="Threshold" value="DEBUG"/>
+
+ <layout class="org.apache.log4j.PatternLayout">
+ <param name="ConversionPattern" value="[%d{ISO8601}][%-5p][%t][%c{1}] %m%n"/>
+ </layout>
+
+ <filter class="org.apache.log4j.varia.LevelRangeFilter">
+ <param name="levelMin" value="DEBUG"/>
+ <param name="levelMax" value="INFO"/>
+ </filter>
+ </appender>
+
+ <appender name="CONSOLE_ERR" class="org.apache.log4j.ConsoleAppender">
+ <param name="Target" value="System.err"/>
+
+ <param name="Threshold" value="WARN"/>
+
+ <layout class="org.apache.log4j.PatternLayout">
+ <param name="ConversionPattern" value="[%d{ISO8601}][%-5p][%t][%c{1}] %m%n"/>
+ </layout>
+ </appender>
+
+ <category name="org">
+ <level value="INFO"/>
+ </category>
+
+ <root>
+ <level value="INFO"/>
+
+ <appender-ref ref="CONSOLE"/>
+ <appender-ref ref="CONSOLE_ERR"/>
+ </root>
+</log4j:configuration>
diff --git a/modules/cdc-ext/modules/core/src/test/config/tests.properties b/modules/cdc-ext/modules/core/src/test/config/tests.properties
new file mode 100644
index 0000000..228ed16
--- /dev/null
+++ b/modules/cdc-ext/modules/core/src/test/config/tests.properties
@@ -0,0 +1,16 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements. See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
\ No newline at end of file
diff --git a/modules/cdc-ext/pom.xml b/modules/cdc-ext/pom.xml
new file mode 100644
index 0000000..f85b1b8
--- /dev/null
+++ b/modules/cdc-ext/pom.xml
@@ -0,0 +1,157 @@
+<?xml version="1.0" encoding="UTF-8"?>
+
+<!--
+ Licensed to the Apache Software Foundation (ASF) under one or more
+ contributor license agreements. See the NOTICE file distributed with
+ this work for additional information regarding copyright ownership.
+ The ASF licenses this file to You under the Apache License, Version 2.0
+ (the "License"); you may not use this file except in compliance with
+ the License. You may obtain a copy of the License at
+
+ http://www.apache.org/licenses/LICENSE-2.0
+
+ Unless required by applicable law or agreed to in writing, software
+ distributed under the License is distributed on an "AS IS" BASIS,
+ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ See the License for the specific language governing permissions and
+ limitations under the License.
+-->
+
+<!--
+ POM file.
+-->
+<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
+ <modelVersion>4.0.0</modelVersion>
+
+ <parent>
+ <groupId>org.apache.ignite</groupId>
+ <artifactId>ignite-extensions-parent</artifactId>
+ <version>1</version>
+ <relativePath>../../parent</relativePath>
+ </parent>
+
+ <properties>
+ <kafka.version>2.7.0</kafka.version>
+ <test.containers.version>1.15.1</test.containers.version>
+ <slf4j.version>1.7.30</slf4j.version>
+ </properties>
+
+ <artifactId>ignite-cdc-ext</artifactId>
+ <version>1.0.0-SNAPSHOT</version>
+ <url>http://ignite.apache.org</url>
+
+ <dependencies>
+ <dependency>
+ <groupId>org.apache.ignite</groupId>
+ <artifactId>ignite-core</artifactId>
+ <version>${ignite.version}</version>
+ <scope>provided</scope>
+ </dependency>
+
+ <dependency>
+ <groupId>org.apache.ignite</groupId>
+ <artifactId>ignite-log4j</artifactId>
+ <version>${ignite.version}</version>
+ <scope>test</scope>
+ </dependency>
+
+ <dependency>
+ <groupId>org.apache.ignite</groupId>
+ <artifactId>ignite-spring</artifactId>
+ <version>${ignite.version}</version>
+ <scope>test</scope>
+ </dependency>
+
+ <dependency>
+ <groupId>org.apache.ignite</groupId>
+ <artifactId>ignite-core</artifactId>
+ <version>${ignite.version}</version>
+ <type>test-jar</type>
+ <scope>test</scope>
+ </dependency>
+
+ <dependency>
+ <groupId>org.apache.kafka</groupId>
+ <artifactId>kafka_2.12</artifactId>
+ <version>${kafka.version}</version>
+ </dependency>
+
+ <dependency>
+ <groupId>org.apache.kafka</groupId>
+ <artifactId>kafka_2.12</artifactId>
+ <version>${kafka.version}</version>
+ <classifier>test</classifier>
+ <scope>test</scope>
+ </dependency>
+
+ <dependency>
+ <groupId>org.apache.kafka</groupId>
+ <artifactId>kafka-streams</artifactId>
+ <version>${kafka.version}</version>
+ <scope>test</scope>
+ </dependency>
+
+ <dependency>
+ <groupId>org.apache.kafka</groupId>
+ <artifactId>kafka-streams</artifactId>
+ <version>${kafka.version}</version>
+ <classifier>test</classifier>
+ <scope>test</scope>
+ </dependency>
+
+ <dependency>
+ <groupId>org.apache.kafka</groupId>
+ <artifactId>kafka-clients</artifactId>
+ <version>${kafka.version}</version>
+ <classifier>test</classifier>
+ <scope>test</scope>
+ </dependency>
+
+ <dependency>
+ <groupId>org.slf4j</groupId>
+ <artifactId>slf4j-simple</artifactId>
+ <version>${slf4j.version}</version>
+ </dependency>
+ </dependencies>
+
+ <build>
+ <plugins>
+ <plugin>
+ <artifactId>maven-dependency-plugin</artifactId>
+ <executions>
+ <execution>
+ <phase>package</phase>
+ <goals>
+ <goal>copy-dependencies</goal>
+ </goals>
+ <configuration>
+ <outputDirectory>${project.build.directory}/libs</outputDirectory>
+ <includeScope>compile</includeScope>
+ <excludeArtifactIds>ignite-core,ignite-spring,ignite-shmem</excludeArtifactIds>
+ </configuration>
+ </execution>
+ </executions>
+ </plugin>
+ <plugin>
+ <artifactId>maven-assembly-plugin</artifactId>
+ <executions>
+ <execution>
+ <id>cdc-ext</id>
+ <goals>
+ <goal>single</goal>
+ </goals>
+ <phase>package</phase>
+ <configuration>
+ <appendAssemblyId>false</appendAssemblyId>
+ <descriptors>
+ <descriptor>assembly/cdc-ext.xml</descriptor>
+ </descriptors>
+ <finalName>ignite-cdc-ext</finalName>
+ <attach>false</attach>
+ </configuration>
+ </execution>
+ </executions>
+ </plugin>
+ </plugins>
+ </build>
+</project>
diff --git a/modules/cdc-ext/src/main/java/org/apache/ignite/cdc/CdcEventsApplier.java b/modules/cdc-ext/src/main/java/org/apache/ignite/cdc/CdcEventsApplier.java
new file mode 100644
index 0000000..a8fe3ff
--- /dev/null
+++ b/modules/cdc-ext/src/main/java/org/apache/ignite/cdc/CdcEventsApplier.java
@@ -0,0 +1,169 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.cdc;
+
+import java.util.HashMap;
+import java.util.Map;
+import java.util.concurrent.atomic.AtomicLong;
+import java.util.function.BooleanSupplier;
+import org.apache.ignite.IgniteCheckedException;
+import org.apache.ignite.IgniteLogger;
+import org.apache.ignite.binary.BinaryObject;
+import org.apache.ignite.cache.CacheEntryVersion;
+import org.apache.ignite.internal.IgniteEx;
+import org.apache.ignite.internal.processors.cache.CacheObject;
+import org.apache.ignite.internal.processors.cache.CacheObjectImpl;
+import org.apache.ignite.internal.processors.cache.IgniteInternalCache;
+import org.apache.ignite.internal.processors.cache.KeyCacheObject;
+import org.apache.ignite.internal.processors.cache.KeyCacheObjectImpl;
+import org.apache.ignite.internal.processors.cache.dr.GridCacheDrInfo;
+import org.apache.ignite.internal.processors.cache.version.GridCacheVersion;
+import org.apache.ignite.internal.util.typedef.F;
+import org.apache.ignite.internal.util.typedef.internal.CU;
+
+/**
+ * Contains logic to process {@link CdcEvent} and apply them to the provided by {@link #ignite()} cluster.
+ */
+public abstract class CdcEventsApplier {
+ /** Maximum batch size. */
+ private final int maxBatchSize;
+
+ /** Caches. */
+ private final Map<Integer, IgniteInternalCache<BinaryObject, BinaryObject>> ignCaches = new HashMap<>();
+
+ /** Update batch. */
+ private final Map<KeyCacheObject, GridCacheDrInfo> updBatch = new HashMap<>();
+
+ /** Remove batch. */
+ private final Map<KeyCacheObject, GridCacheVersion> rmvBatch = new HashMap<>();
+
+ /** */
+ private final BooleanSupplier hasUpdates = () -> !F.isEmpty(updBatch);
+
+ /** */
+ private final BooleanSupplier hasRemoves = () -> !F.isEmpty(rmvBatch);
+
+ /** */
+ protected final AtomicLong evtsApplied = new AtomicLong();
+
+ /**
+ * @param maxBatchSize Maximum batch size.
+ */
+ public CdcEventsApplier(int maxBatchSize) {
+ this.maxBatchSize = maxBatchSize;
+ }
+
+ /**
+ * @param evts Events to process.
+ * @throws IgniteCheckedException If failed.
+ */
+ protected void apply(Iterable<CdcEvent> evts) throws IgniteCheckedException {
+ IgniteInternalCache<BinaryObject, BinaryObject> currCache = null;
+
+ for (CdcEvent evt : evts) {
+ if (log().isDebugEnabled())
+ log().debug("Event received [key=" + evt.key() + ']');
+
+ IgniteInternalCache<BinaryObject, BinaryObject> cache = ignCaches.computeIfAbsent(evt.cacheId(), cacheId -> {
+ for (String cacheName : ignite().cacheNames()) {
+ if (CU.cacheId(cacheName) == cacheId) {
+ // IgniteEx#cachex(String) will return null if cache not initialized with regular Ignite#cache(String) call.
+ ignite().cache(cacheName);
+
+ return ignite().cachex(cacheName);
+ }
+ }
+
+ throw new IllegalStateException("Cache with id not found [cacheId=" + cacheId + ']');
+ });
+
+ if (cache != currCache) {
+ applyIf(currCache, hasUpdates, hasRemoves);
+
+ currCache = cache;
+ }
+
+ CacheEntryVersion order = evt.version();
+
+ KeyCacheObject key = new KeyCacheObjectImpl(evt.key(), null, evt.partition());
+
+ if (evt.value() != null) {
+ applyIf(currCache, () -> isApplyBatch(updBatch, key), hasRemoves);
+
+ CacheObject val = new CacheObjectImpl(evt.value(), null);
+
+ updBatch.put(key, new GridCacheDrInfo(val,
+ new GridCacheVersion(order.topologyVersion(), order.order(), order.nodeOrder(), order.clusterId())));
+ }
+ else {
+ applyIf(currCache, hasUpdates, () -> isApplyBatch(rmvBatch, key));
+
+ rmvBatch.put(key,
+ new GridCacheVersion(order.topologyVersion(), order.order(), order.nodeOrder(), order.clusterId()));
+ }
+
+ evtsApplied.incrementAndGet();
+ }
+
+ if (currCache != null)
+ applyIf(currCache, hasUpdates, hasRemoves);
+ }
+
+ /**
+ * Applies data from {@link #updBatch} or {@link #rmvBatch} to Ignite if required.
+ *
+ * @param cache Current cache.
+ * @param applyUpd Apply update batch flag supplier.
+ * @param applyRmv Apply remove batch flag supplier.
+ * @throws IgniteCheckedException In case of error.
+ */
+ private void applyIf(
+ IgniteInternalCache<BinaryObject, BinaryObject> cache,
+ BooleanSupplier applyUpd,
+ BooleanSupplier applyRmv
+ ) throws IgniteCheckedException {
+ if (applyUpd.getAsBoolean()) {
+ if (log().isDebugEnabled())
+ log().debug("Applying put batch [cache=" + cache.name() + ']');
+
+ cache.putAllConflict(updBatch);
+
+ updBatch.clear();
+ }
+
+ if (applyRmv.getAsBoolean()) {
+ if (log().isDebugEnabled())
+ log().debug("Applying remove batch [cache=" + cache.name() + ']');
+
+ cache.removeAllConflict(rmvBatch);
+
+ rmvBatch.clear();
+ }
+ }
+
+ /** @return {@code True} if update batch should be applied. */
+ private boolean isApplyBatch(Map<KeyCacheObject, ?> map, KeyCacheObject key) {
+ return map.size() >= maxBatchSize || map.containsKey(key);
+ }
+
+ /** @return Ignite instance. */
+ protected abstract IgniteEx ignite();
+
+ /** @return Logger. */
+ protected abstract IgniteLogger log();
+}
diff --git a/modules/cdc-ext/src/main/java/org/apache/ignite/cdc/IgniteToIgniteCdcStreamer.java b/modules/cdc-ext/src/main/java/org/apache/ignite/cdc/IgniteToIgniteCdcStreamer.java
new file mode 100644
index 0000000..b8840bb
--- /dev/null
+++ b/modules/cdc-ext/src/main/java/org/apache/ignite/cdc/IgniteToIgniteCdcStreamer.java
@@ -0,0 +1,128 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.cdc;
+
+import java.util.Iterator;
+import java.util.Set;
+import java.util.stream.Collectors;
+import org.apache.ignite.IgniteCheckedException;
+import org.apache.ignite.IgniteException;
+import org.apache.ignite.IgniteLogger;
+import org.apache.ignite.Ignition;
+import org.apache.ignite.cdc.conflictresolve.CacheVersionConflictResolverImpl;
+import org.apache.ignite.cdc.kafka.KafkaToIgniteCdcStreamer;
+import org.apache.ignite.configuration.IgniteConfiguration;
+import org.apache.ignite.internal.IgniteEx;
+import org.apache.ignite.internal.cdc.CdcMain;
+import org.apache.ignite.internal.util.typedef.F;
+import org.apache.ignite.internal.util.typedef.internal.CU;
+import org.apache.ignite.resources.LoggerResource;
+
+/**
+ * Change Data Consumer that streams all data changes to provided {@link #dest} Ignite cluster.
+ * Consumer will just fail in case of any error during write. Fail of consumer will lead to the fail of {@code ignite-cdc} application.
+ * It expected that {@code ignite-cdc} will be configured for automatic restarts with the OS tool to failover temporary errors
+ * such as Kafka unavailability or network issues.
+ *
+ * If you have plans to apply written messages to the other Ignite cluster in active-active manner,
+ * e.g. concurrent updates of the same entry in other cluster is possible,
+ * please, be aware of {@link CacheVersionConflictResolverImpl} conflict resolved.
+ * Configuration of {@link CacheVersionConflictResolverImpl} can be found in {@link KafkaToIgniteCdcStreamer} documentation.
+ *
+ * @see CdcMain
+ * @see CacheVersionConflictResolverImpl
+ */
+public class IgniteToIgniteCdcStreamer extends CdcEventsApplier implements CdcConsumer {
+ /** Destination cluster client configuration. */
+ private final IgniteConfiguration destIgniteCfg;
+
+ /** Handle only primary entry flag. */
+ private final boolean onlyPrimary;
+
+ /** Destination Ignite cluster client */
+ private IgniteEx dest;
+
+ /** Logger. */
+ @LoggerResource
+ private IgniteLogger log;
+
+ /** Cache IDs. */
+ private final Set<Integer> cachesIds;
+
+ /**
+ * @param destIgniteCfg Configuration of the destination Ignite node.
+ * @param onlyPrimary Only primary flag.
+ * @param caches Cache names.
+ * @param maxBatchSize Maximum batch size.
+ */
+ public IgniteToIgniteCdcStreamer(IgniteConfiguration destIgniteCfg, boolean onlyPrimary, Set<String> caches, int maxBatchSize) {
+ super(maxBatchSize);
+
+ this.destIgniteCfg = destIgniteCfg;
+ this.onlyPrimary = onlyPrimary;
+
+ cachesIds = caches.stream()
+ .mapToInt(CU::cacheId)
+ .boxed()
+ .collect(Collectors.toSet());
+ }
+
+ /** {@inheritDoc} */
+ @Override public void start() {
+ if (log.isInfoEnabled())
+ log.info("Ignite To Ignite Streamer [cacheIds=" + cachesIds + ']');
+
+ dest = (IgniteEx)Ignition.start(destIgniteCfg);
+ }
+
+ /** {@inheritDoc} */
+ @Override public boolean onEvents(Iterator<CdcEvent> evts) {
+ try {
+ apply(() -> F.iterator(
+ evts,
+ F.identity(),
+ true,
+ evt -> !onlyPrimary || evt.primary(),
+ evt -> F.isEmpty(cachesIds) || cachesIds.contains(evt.cacheId()),
+ evt -> evt.version().otherClusterVersion() == null));
+
+ if (log.isInfoEnabled())
+ log.info("Events applied [evtsApplied=" + evtsApplied.get() + ']');
+
+ return true;
+ }
+ catch (IgniteCheckedException e) {
+ throw new IgniteException(e);
+ }
+ }
+
+ /** {@inheritDoc} */
+ @Override public void stop() {
+ dest.close();
+ }
+
+ /** {@inheritDoc} */
+ @Override protected IgniteEx ignite() {
+ return dest;
+ }
+
+ /** {@inheritDoc} */
+ @Override protected IgniteLogger log() {
+ return log;
+ }
+}
diff --git a/modules/cdc-ext/src/main/java/org/apache/ignite/cdc/conflictresolve/CacheConflictResolutionManagerImpl.java b/modules/cdc-ext/src/main/java/org/apache/ignite/cdc/conflictresolve/CacheConflictResolutionManagerImpl.java
new file mode 100644
index 0000000..5227c56
--- /dev/null
+++ b/modules/cdc-ext/src/main/java/org/apache/ignite/cdc/conflictresolve/CacheConflictResolutionManagerImpl.java
@@ -0,0 +1,89 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.cdc.conflictresolve;
+
+import org.apache.ignite.internal.processors.cache.CacheConflictResolutionManager;
+import org.apache.ignite.internal.processors.cache.GridCacheContext;
+import org.apache.ignite.internal.processors.cache.version.CacheVersionConflictResolver;
+import org.apache.ignite.lang.IgniteFuture;
+
+/**
+ * Intermediate component to provide {@link CacheVersionConflictResolverImpl} for specific cache.
+ *
+ * @see CacheVersionConflictResolverImpl
+ * @see CacheVersionConflictResolver
+ */
+public class CacheConflictResolutionManagerImpl<K, V> implements CacheConflictResolutionManager<K, V> {
+ /**
+ * Field for conflict resolve.
+ * Value of this field will be used to compare two entries in case of conflicting changes.
+ * Note, values of this field must implement {@link Comparable}.
+ *
+ * @see CacheVersionConflictResolverImpl
+ */
+ private final String conflictResolveField;
+
+ /** Grid cache context. */
+ private GridCacheContext<K, V> cctx;
+
+ /**
+ * @param conflictResolveField Field to resolve conflicts.
+ */
+ public CacheConflictResolutionManagerImpl(String conflictResolveField) {
+ this.conflictResolveField = conflictResolveField;
+ }
+
+ /** {@inheritDoc} */
+ @Override public CacheVersionConflictResolver conflictResolver() {
+ return new CacheVersionConflictResolverImpl(
+ cctx.versions().dataCenterId(),
+ conflictResolveField,
+ cctx.logger(CacheVersionConflictResolverImpl.class)
+ );
+ }
+
+ /** {@inheritDoc} */
+ @Override public void start(GridCacheContext<K, V> cctx) {
+ this.cctx = cctx;
+ }
+
+ /** {@inheritDoc} */
+ @Override public void stop(boolean cancel, boolean destroy) {
+ // No-op.
+ }
+
+ /** {@inheritDoc} */
+ @Override public void onKernalStart() {
+ // No-op.
+ }
+
+ /** {@inheritDoc} */
+ @Override public void onKernalStop(boolean cancel) {
+ // No-op.
+ }
+
+ /** {@inheritDoc} */
+ @Override public void printMemoryStats() {
+ // No-op.
+ }
+
+ /** {@inheritDoc} */
+ @Override public void onDisconnected(IgniteFuture reconnectFut) {
+ // No-op.
+ }
+}
diff --git a/modules/cdc-ext/src/main/java/org/apache/ignite/cdc/conflictresolve/CacheVersionConflictResolverCachePluginProvider.java b/modules/cdc-ext/src/main/java/org/apache/ignite/cdc/conflictresolve/CacheVersionConflictResolverCachePluginProvider.java
new file mode 100644
index 0000000..5d152f9
--- /dev/null
+++ b/modules/cdc-ext/src/main/java/org/apache/ignite/cdc/conflictresolve/CacheVersionConflictResolverCachePluginProvider.java
@@ -0,0 +1,97 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.cdc.conflictresolve;
+
+import javax.cache.Cache;
+import org.apache.ignite.cluster.ClusterNode;
+import org.apache.ignite.configuration.CacheConfiguration;
+import org.apache.ignite.internal.processors.cache.CacheConflictResolutionManager;
+import org.apache.ignite.internal.processors.cache.version.CacheVersionConflictResolver;
+import org.apache.ignite.plugin.CachePluginConfiguration;
+import org.apache.ignite.plugin.CachePluginProvider;
+import org.jetbrains.annotations.Nullable;
+
+/**
+ * Intermediate component to provide {@link CacheConflictResolutionManagerImpl} for specific cache.
+ *
+ * @see CacheConflictResolutionManagerImpl
+ * @see CacheVersionConflictResolverImpl
+ * @see CacheVersionConflictResolver
+ */
+public class CacheVersionConflictResolverCachePluginProvider<K, V, C extends CachePluginConfiguration<K, V>>
+ implements CachePluginProvider<C> {
+ /**
+ * Field for conflict resolve.
+ * Value of this field will be used to compare two entries in case of conflicting changes.
+ * Note, values of this field must implement {@link Comparable}.
+ *
+ * @see CacheVersionConflictResolverImpl
+ */
+ private final String conflictResolveField;
+
+ /**
+ * @param conflictResolveField Field to resolve conflicts.
+ */
+ public CacheVersionConflictResolverCachePluginProvider(String conflictResolveField) {
+ this.conflictResolveField = conflictResolveField;
+ }
+
+ /** {@inheritDoc} */
+ @SuppressWarnings("unchecked")
+ @Nullable @Override public <T> T createComponent(Class<T> cls) {
+ if (cls.equals(CacheConflictResolutionManager.class))
+ return (T)new CacheConflictResolutionManagerImpl<>(conflictResolveField);
+
+ return null;
+ }
+
+ /** {@inheritDoc} */
+ @Override public void start() {
+ // No-op.
+ }
+
+ /** {@inheritDoc} */
+ @Override public void stop(boolean cancel) {
+ // No-op.
+ }
+
+ /** {@inheritDoc} */
+ @Override public void onIgniteStart() {
+ // No-op.
+ }
+
+ /** {@inheritDoc} */
+ @Override public void onIgniteStop(boolean cancel) {
+ // No-op.
+ }
+
+ /** {@inheritDoc} */
+ @Override public void validate() {
+ // No-op.
+ }
+
+ /** {@inheritDoc} */
+ @Override public void validateRemote(CacheConfiguration locCfg, CacheConfiguration rmtCfg, ClusterNode rmtNode) {
+ // No-op.
+ }
+
+ /** {@inheritDoc} */
+ @Nullable public <T, K2, V2> T unwrapCacheEntry(Cache.Entry<K2, V2> entry, Class<T> cls) {
+ return null;
+ }
+}
diff --git a/modules/cdc-ext/src/main/java/org/apache/ignite/cdc/conflictresolve/CacheVersionConflictResolverImpl.java b/modules/cdc-ext/src/main/java/org/apache/ignite/cdc/conflictresolve/CacheVersionConflictResolverImpl.java
new file mode 100644
index 0000000..a6551ea
--- /dev/null
+++ b/modules/cdc-ext/src/main/java/org/apache/ignite/cdc/conflictresolve/CacheVersionConflictResolverImpl.java
@@ -0,0 +1,152 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.cdc.conflictresolve;
+
+import org.apache.ignite.IgniteLogger;
+import org.apache.ignite.binary.BinaryObject;
+import org.apache.ignite.internal.processors.cache.CacheObjectValueContext;
+import org.apache.ignite.internal.processors.cache.version.CacheVersionConflictResolver;
+import org.apache.ignite.internal.processors.cache.version.GridCacheVersionConflictContext;
+import org.apache.ignite.internal.processors.cache.version.GridCacheVersionedEntryEx;
+import org.apache.ignite.internal.util.typedef.internal.U;
+
+/**
+ * This class implements simple conflict resolution algorithm.
+ * Algorithm decides which version of the entry should be used "new" or "old".
+ * The following steps performed:
+ * <ul>
+ * <li>If entry is freshly created then new version used - {@link GridCacheVersionedEntryEx#isStartVersion()}.</li>
+ * <li>If change made in this cluster then new version used - {@link GridCacheVersionedEntryEx#dataCenterId()}.</li>
+ * <li>If cluster of new entry equal to cluster of old entry
+ * then entry with the greater {@link GridCacheVersionedEntryEx#order()} used.</li>
+ * <li>If {@link #conflictResolveField} provided and field of new entry greater then new version used.</li>
+ * <li>If {@link #conflictResolveField} provided and field of old entry greater then old version used.</li>
+ * <li>Conflict can't be resolved. Update ignored. Old version used.</li>
+ * </ul>
+ */
+public class CacheVersionConflictResolverImpl implements CacheVersionConflictResolver {
+ /**
+ * Cluster id.
+ */
+ private final byte clusterId;
+
+ /**
+ * Field for conflict resolve.
+ * Value of this field will be used to compare two entries in case of conflicting changes.
+ * values of this field must implement {@link Comparable} interface.
+ * <pre><i>Note, value of this field used to resolve conflict for external updates only.</i>
+ *
+ * @see CacheVersionConflictResolverImpl
+ */
+ private final String conflictResolveField;
+
+ /** Logger. */
+ private final IgniteLogger log;
+
+ /** If {@code true} then conflict resolving with the value field enabled. */
+ private final boolean conflictResolveFieldEnabled;
+
+ /**
+ * @param clusterId Data center id.
+ * @param conflictResolveField Field to resolve conflicts.
+ * @param log Logger.
+ */
+ public CacheVersionConflictResolverImpl(byte clusterId, String conflictResolveField, IgniteLogger log) {
+ this.clusterId = clusterId;
+ this.conflictResolveField = conflictResolveField;
+ this.log = log;
+
+ conflictResolveFieldEnabled = conflictResolveField != null;
+ }
+
+ /** {@inheritDoc} */
+ @Override public <K, V> GridCacheVersionConflictContext<K, V> resolve(
+ CacheObjectValueContext ctx,
+ GridCacheVersionedEntryEx<K, V> oldEntry,
+ GridCacheVersionedEntryEx<K, V> newEntry,
+ boolean atomicVerComparator
+ ) {
+ GridCacheVersionConflictContext<K, V> res = new GridCacheVersionConflictContext<>(ctx, oldEntry, newEntry);
+
+ if (isUseNew(ctx, oldEntry, newEntry))
+ res.useNew();
+ else
+ res.useOld();
+
+ return res;
+ }
+
+ /**
+ * @param ctx Context.
+ * @param oldEntry Old entry.
+ * @param newEntry New entry.
+ * @param <K> Key type.
+ * @param <V> Key type.
+ * @return {@code True} is should use new entry.
+ */
+ @SuppressWarnings({"unchecked", "rawtypes"})
+ private <K, V> boolean isUseNew(
+ CacheObjectValueContext ctx,
+ GridCacheVersionedEntryEx<K, V> oldEntry,
+ GridCacheVersionedEntryEx<K, V> newEntry
+ ) {
+ if (newEntry.dataCenterId() == clusterId) // Update made on the local cluster always win.
+ return true;
+
+ if (oldEntry.isStartVersion()) // Entry absent (new entry).
+ return true;
+
+ if (oldEntry.dataCenterId() == newEntry.dataCenterId())
+ return newEntry.version().compareTo(oldEntry.version()) > 0; // New version from the same cluster.
+
+ if (conflictResolveFieldEnabled) {
+ Object oldVal = oldEntry.value(ctx);
+ Object newVal = newEntry.value(ctx);
+
+ if (oldVal != null && newVal != null) {
+ Comparable oldResolveField;
+ Comparable newResolveField;
+
+ try {
+ if (oldVal instanceof BinaryObject) {
+ oldResolveField = ((BinaryObject)oldVal).field(conflictResolveField);
+ newResolveField = ((BinaryObject)newVal).field(conflictResolveField);
+ }
+ else {
+ oldResolveField = U.field(oldVal, conflictResolveField);
+ newResolveField = U.field(newVal, conflictResolveField);
+ }
+
+ return oldResolveField.compareTo(newResolveField) < 0;
+ }
+ catch (Exception e) {
+ log.error(
+ "Error while resolving replication conflict. [field=" + conflictResolveField + ", key=" + newEntry.key() + ']',
+ e
+ );
+ }
+ }
+ }
+
+ log.error("Conflict can't be resolved, update ignored [key=" + newEntry.key() + ", fromCluster=" + newEntry.dataCenterId()
+ + ", toCluster=" + oldEntry.dataCenterId() + ']');
+
+ // Ignoring update.
+ return false;
+ }
+}
diff --git a/modules/cdc-ext/src/main/java/org/apache/ignite/cdc/conflictresolve/CacheVersionConflictResolverPluginProvider.java b/modules/cdc-ext/src/main/java/org/apache/ignite/cdc/conflictresolve/CacheVersionConflictResolverPluginProvider.java
new file mode 100644
index 0000000..bf702e1
--- /dev/null
+++ b/modules/cdc-ext/src/main/java/org/apache/ignite/cdc/conflictresolve/CacheVersionConflictResolverPluginProvider.java
@@ -0,0 +1,160 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.cdc.conflictresolve;
+
+import java.io.Serializable;
+import java.util.Set;
+import java.util.UUID;
+import org.apache.ignite.cluster.ClusterNode;
+import org.apache.ignite.internal.IgniteEx;
+import org.apache.ignite.internal.processors.cache.version.CacheVersionConflictResolver;
+import org.apache.ignite.plugin.CachePluginContext;
+import org.apache.ignite.plugin.CachePluginProvider;
+import org.apache.ignite.plugin.ExtensionRegistry;
+import org.apache.ignite.plugin.IgnitePlugin;
+import org.apache.ignite.plugin.PluginConfiguration;
+import org.apache.ignite.plugin.PluginContext;
+import org.apache.ignite.plugin.PluginProvider;
+import org.apache.ignite.plugin.PluginValidationException;
+import org.jetbrains.annotations.Nullable;
+
+/**
+ * Plugin to enable {@link CacheVersionConflictResolverImpl} for provided caches.
+ *
+ * @see CacheVersionConflictResolverImpl
+ * @see CacheVersionConflictResolver
+ */
+public class CacheVersionConflictResolverPluginProvider<C extends PluginConfiguration> implements PluginProvider<C> {
+ /** Plugin context. */
+ private PluginContext ctx;
+
+ /** Cluster id. */
+ private byte clusterId;
+
+ /** Cache names. */
+ private Set<String> caches;
+
+ /**
+ * Field for conflict resolve.
+ * Value of this field will be used to compare two entries in case of conflicting changes.
+ * Note, values of this field must implement {@link Comparable}.
+ *
+ * @see CacheVersionConflictResolverImpl
+ */
+ private String conflictResolveField;
+
+ /** Cache plugin provider. */
+ private CachePluginProvider<?> provider;
+
+ /** */
+ public CacheVersionConflictResolverPluginProvider() {
+ // No-op.
+ }
+
+ /** {@inheritDoc} */
+ @Override public String name() {
+ return "Cache version conflict resolver";
+ }
+
+ /** {@inheritDoc} */
+ @Override public String version() {
+ return "1.0.0-SNAPSHOT";
+ }
+
+ /** {@inheritDoc} */
+ @Override public String copyright() {
+ return "Apache Software Foundation";
+ }
+
+ /** {@inheritDoc} */
+ @Override public void initExtensions(PluginContext ctx, ExtensionRegistry registry) {
+ this.ctx = ctx;
+
+ this.provider = new CacheVersionConflictResolverCachePluginProvider<>(conflictResolveField);
+ }
+
+ /** {@inheritDoc} */
+ @Override public CachePluginProvider createCacheProvider(CachePluginContext ctx) {
+ if (caches.contains(ctx.igniteCacheConfiguration().getName()))
+ return provider;
+
+ return null;
+ }
+
+ /** {@inheritDoc} */
+ @Override public void onIgniteStart() {
+ IgniteEx ign = (IgniteEx)ctx.grid();
+
+ ign.context().cache().context().versions().dataCenterId(clusterId);
+ }
+
+ /** {@inheritDoc} */
+ @Override public IgnitePlugin plugin() {
+ return new IgnitePlugin() { /* No-op. */ };
+ }
+
+ /** @param clusterId Data center ID. */
+ public void setClusterId(byte clusterId) {
+ this.clusterId = clusterId;
+ }
+
+ /** @param caches Caches to replicate. */
+ public void setCaches(Set<String> caches) {
+ this.caches = caches;
+ }
+
+ /** @param conflictResolveField Field to resolve conflicts. */
+ public void setConflictResolveField(String conflictResolveField) {
+ this.conflictResolveField = conflictResolveField;
+ }
+
+ /** {@inheritDoc} */
+ @Override public void start(PluginContext ctx) {
+ // No-op.
+ }
+
+ /** {@inheritDoc} */
+ @Override public void stop(boolean cancel) {
+ // No-op.
+ }
+
+ /** {@inheritDoc} */
+ @Override public void onIgniteStop(boolean cancel) {
+ // No-op.
+ }
+
+ /** {@inheritDoc} */
+ @Override public @Nullable Serializable provideDiscoveryData(UUID nodeId) {
+ return null;
+ }
+
+ /** {@inheritDoc} */
+ @Override public void receiveDiscoveryData(UUID nodeId, Serializable data) {
+ // No-op.
+ }
+
+ /** {@inheritDoc} */
+ @Override public void validateNewNode(ClusterNode node) throws PluginValidationException {
+ // No-op.
+ }
+
+ /** {@inheritDoc} */
+ @Nullable public <T> T createComponent(PluginContext ctx, Class<T> cls) {
+ return null;
+ }
+}
diff --git a/modules/cdc-ext/src/main/java/org/apache/ignite/cdc/kafka/IgniteToKafkaCdcStreamer.java b/modules/cdc-ext/src/main/java/org/apache/ignite/cdc/kafka/IgniteToKafkaCdcStreamer.java
new file mode 100644
index 0000000..3f97ee8
--- /dev/null
+++ b/modules/cdc-ext/src/main/java/org/apache/ignite/cdc/kafka/IgniteToKafkaCdcStreamer.java
@@ -0,0 +1,205 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.cdc.kafka;
+
+import java.util.ArrayList;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Properties;
+import java.util.Set;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.Future;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.TimeoutException;
+import java.util.stream.Collectors;
+import org.apache.ignite.IgniteLogger;
+import org.apache.ignite.cdc.CdcConsumer;
+import org.apache.ignite.cdc.CdcEvent;
+import org.apache.ignite.cdc.conflictresolve.CacheVersionConflictResolverImpl;
+import org.apache.ignite.internal.cdc.CdcMain;
+import org.apache.ignite.internal.util.IgniteUtils;
+import org.apache.ignite.internal.util.typedef.internal.CU;
+import org.apache.ignite.resources.LoggerResource;
+import org.apache.kafka.clients.producer.KafkaProducer;
+import org.apache.kafka.clients.producer.ProducerRecord;
+import org.apache.kafka.clients.producer.RecordMetadata;
+import org.apache.kafka.common.serialization.ByteArraySerializer;
+import org.apache.kafka.common.serialization.IntegerSerializer;
+
+import static org.apache.kafka.clients.producer.ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG;
+import static org.apache.kafka.clients.producer.ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG;
+
+/**
+ * Change Data Consumer that streams all data changes to Kafka topic.
+ * {@link CdcEvent} spread across Kafka topic partitions with {@code {ignite_partition} % {kafka_topic_count}} formula.
+ * Consumer will just fail in case of any error during write. Fail of consumer will lead to the fail of {@code ignite-cdc} application.
+ * It expected that {@code ignite-cdc} will be configured for automatic restarts with the OS tool to failover temporary errors
+ * such as Kafka unavailability or network issues.
+ *
+ * If you have plans to apply written messages to the other Ignite cluster in active-active manner,
+ * e.g. concurrent updates of the same entry in other cluster is possible,
+ * please, be aware of {@link CacheVersionConflictResolverImpl} conflict resolved.
+ * Configuration of {@link CacheVersionConflictResolverImpl} can be found in {@link KafkaToIgniteCdcStreamer} documentation.
+ *
+ * @see CdcMain
+ * @see KafkaToIgniteCdcStreamer
+ * @see CacheVersionConflictResolverImpl
+ */
+public class IgniteToKafkaCdcStreamer implements CdcConsumer {
+ /** Default kafka request timeout in seconds. */
+ public static final int DFLT_REQ_TIMEOUT = 5;
+
+ /** Log. */
+ @LoggerResource
+ private IgniteLogger log;
+
+ /** Kafka producer to stream events. */
+ private KafkaProducer<Integer, byte[]> producer;
+
+ /** Handle only primary entry flag. */
+ private final boolean onlyPrimary;
+
+ /** Topic name. */
+ private final String topic;
+
+ /** Kafka topic partitions count. */
+ private final int kafkaParts;
+
+ /** Kafka properties. */
+ private final Properties kafkaProps;
+
+ /** Cache IDs. */
+ private final Set<Integer> cachesIds;
+
+ /** Max batch size. */
+ private final int maxBatchSize;
+
+ /** Count of sent messages. */
+ private long msgCnt;
+
+ /**
+ * @param topic Topic name.
+ * @param kafkaParts Kafka partitions count.
+ * @param caches Cache names.
+ * @param maxBatchSize Maximum size of records concurrently sent to Kafka.
+ * @param onlyPrimary If {@code true} then stream only events from primaries.
+ * @param kafkaProps Kafka properties.
+ */
+ public IgniteToKafkaCdcStreamer(
+ String topic,
+ int kafkaParts,
+ Set<String> caches,
+ int maxBatchSize,
+ boolean onlyPrimary,
+ Properties kafkaProps
+ ) {
+ assert caches != null && !caches.isEmpty();
+
+ this.topic = topic;
+ this.kafkaParts = kafkaParts;
+ this.onlyPrimary = onlyPrimary;
+ this.kafkaProps = kafkaProps;
+ this.maxBatchSize = maxBatchSize;
+
+ cachesIds = caches.stream()
+ .mapToInt(CU::cacheId)
+ .boxed()
+ .collect(Collectors.toSet());
+
+ kafkaProps.setProperty(KEY_SERIALIZER_CLASS_CONFIG, IntegerSerializer.class.getName());
+ kafkaProps.setProperty(VALUE_SERIALIZER_CLASS_CONFIG, ByteArraySerializer.class.getName());
+ }
+
+ /** {@inheritDoc} */
+ @Override public boolean onEvents(Iterator<CdcEvent> evts) {
+ List<Future<RecordMetadata>> futs = new ArrayList<>();
+
+ while (evts.hasNext() && futs.size() < maxBatchSize) {
+ CdcEvent evt = evts.next();
+
+ if (log.isDebugEnabled())
+ log.debug("Event received [evt=" + evt + ']');
+
+ if (onlyPrimary && !evt.primary()) {
+ if (log.isDebugEnabled())
+ log.debug("Event skipped because of primary flag [evt=" + evt + ']');
+
+ continue;
+ }
+
+ if (evt.version().otherClusterVersion() != null) {
+ if (log.isDebugEnabled()) {
+ log.debug("Event skipped because of version [evt=" + evt +
+ ", otherClusterVersion=" + evt.version().otherClusterVersion() + ']');
+ }
+
+ continue;
+ }
+
+ if (!cachesIds.isEmpty() && !cachesIds.contains(evt.cacheId())) {
+ if (log.isDebugEnabled())
+ log.debug("Event skipped because of cacheId [evt=" + evt + ']');
+
+ continue;
+ }
+
+ msgCnt++;
+
+ futs.add(producer.send(new ProducerRecord<>(
+ topic,
+ evt.partition() % kafkaParts,
+ evt.cacheId(),
+ IgniteUtils.toBytes(evt)
+ )));
+
+ if (log.isDebugEnabled())
+ log.debug("Event sent asynchronously [evt=" + evt + ']');
+ }
+
+ try {
+ for (Future<RecordMetadata> fut : futs)
+ fut.get(DFLT_REQ_TIMEOUT, TimeUnit.SECONDS);
+ }
+ catch (InterruptedException | ExecutionException | TimeoutException e) {
+ throw new RuntimeException(e);
+ }
+
+ if (log.isInfoEnabled())
+ log.info("Events processed [sentMessagesCount=" + msgCnt + ']');
+
+ return true;
+ }
+
+ /** {@inheritDoc} */
+ @Override public void start() {
+ try {
+ producer = new KafkaProducer<>(kafkaProps);
+
+ if (log.isInfoEnabled())
+ log.info("CDC Ignite To Kafka started [topic=" + topic + ", onlyPrimary=" + onlyPrimary + ", cacheIds=" + cachesIds + ']');
+ }
+ catch (Exception e) {
+ throw new RuntimeException(e);
+ }
+ }
+
+ /** {@inheritDoc} */
+ @Override public void stop() {
+ producer.close();
+ }
+}
diff --git a/modules/cdc-ext/src/main/java/org/apache/ignite/cdc/kafka/KafkaToIgniteCdcStreamer.java b/modules/cdc-ext/src/main/java/org/apache/ignite/cdc/kafka/KafkaToIgniteCdcStreamer.java
new file mode 100644
index 0000000..e62a6f4
--- /dev/null
+++ b/modules/cdc-ext/src/main/java/org/apache/ignite/cdc/kafka/KafkaToIgniteCdcStreamer.java
@@ -0,0 +1,256 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.cdc.kafka;
+
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Objects;
+import java.util.Properties;
+import java.util.Set;
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.stream.Collectors;
+import org.apache.ignite.IgniteException;
+import org.apache.ignite.IgniteLogger;
+import org.apache.ignite.Ignition;
+import org.apache.ignite.cdc.CdcEvent;
+import org.apache.ignite.cdc.conflictresolve.CacheConflictResolutionManagerImpl;
+import org.apache.ignite.cdc.conflictresolve.CacheVersionConflictResolverImpl;
+import org.apache.ignite.configuration.IgniteConfiguration;
+import org.apache.ignite.internal.GridLoggerProxy;
+import org.apache.ignite.internal.IgniteEx;
+import org.apache.ignite.internal.cdc.CdcMain;
+import org.apache.ignite.internal.util.typedef.F;
+import org.apache.ignite.internal.util.typedef.internal.CU;
+import org.apache.ignite.internal.util.typedef.internal.U;
+import org.apache.kafka.clients.consumer.ConsumerConfig;
+import org.apache.kafka.common.serialization.ByteArrayDeserializer;
+import org.apache.kafka.common.serialization.IntegerDeserializer;
+
+import static org.apache.ignite.internal.IgniteKernal.NL;
+import static org.apache.ignite.internal.IgniteKernal.SITE;
+import static org.apache.ignite.internal.IgniteVersionUtils.ACK_VER_STR;
+import static org.apache.ignite.internal.IgniteVersionUtils.COPYRIGHT;
+import static org.apache.kafka.clients.consumer.ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG;
+import static org.apache.kafka.clients.consumer.ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG;
+
+/**
+ * Main class of Kafka to Ignite application.
+ * This application is counterpart of {@link IgniteToKafkaCdcStreamer} Change Data Capture consumer.
+ * Application runs several {@link KafkaToIgniteCdcStreamerApplier} thread to read Kafka topic partitions
+ * and apply {@link CdcEvent} to Ignite.
+ * <p>
+ * Each applier receive even number of kafka topic partition to read.
+ * <p>
+ * In case of any error during read applier just fail. Fail of any applier will lead to the fail of whole application.
+ * It expected that application will be configured for automatic restarts with the OS tool to failover temporary errors
+ * such as Kafka or Ignite unavailability.
+ * <p>
+ * To resolve possible update conflicts (in case of concurrent update in source and destination Ignite clusters)
+ * real-world deployments should use some conflict resolver, for example {@link CacheVersionConflictResolverImpl}.
+ * Example of Ignite configuration with the conflict resolver:
+ * <pre>
+ * {@code
+ * CacheVersionConflictResolverCachePluginProvider conflictPlugin = new CacheVersionConflictResolverCachePluginProvider();
+ *
+ * conflictPlugin.setClusterId(clusterId); // Cluster id.
+ * conflictPlugin.setCaches(new HashSet<>(Arrays.asList("my-cache", "some-other-cache"))); // Caches to replicate.
+ *
+ * IgniteConfiguration cfg = ...;
+ *
+ * cfg.setPluginProviders(conflictPlugin);
+ * }
+ * </pre>
+ * Please, see {@link CacheConflictResolutionManagerImpl} for additional information.
+ *
+ * @see CdcMain
+ * @see IgniteToKafkaCdcStreamer
+ * @see CdcEvent
+ * @see KafkaToIgniteCdcStreamerApplier
+ * @see CacheConflictResolutionManagerImpl
+ */
+public class KafkaToIgniteCdcStreamer implements Runnable {
+ /** Ignite configuration. */
+ private final IgniteConfiguration igniteCfg;
+
+ /** Kafka consumer properties. */
+ private final Properties kafkaProps;
+
+ /** Streamer configuration. */
+ private final KafkaToIgniteCdcStreamerConfiguration streamerCfg;
+
+ /** Runners to run {@link KafkaToIgniteCdcStreamerApplier} instances. */
+ private final Thread[] runners;
+
+ /** Appliers. */
+ private final List<KafkaToIgniteCdcStreamerApplier> appliers;
+
+ /**
+ * @param igniteCfg Ignite configuration.
+ * @param kafkaProps Kafka properties.
+ * @param streamerCfg Streamer configuration.
+ */
+ public KafkaToIgniteCdcStreamer(
+ IgniteConfiguration igniteCfg,
+ Properties kafkaProps,
+ KafkaToIgniteCdcStreamerConfiguration streamerCfg
+ ) {
+ this.igniteCfg = igniteCfg;
+ this.kafkaProps = kafkaProps;
+ this.streamerCfg = streamerCfg;
+
+ appliers = new ArrayList<>(streamerCfg.getThreadCount());
+ runners = new Thread[streamerCfg.getThreadCount()];
+
+ if (!kafkaProps.containsKey(ConsumerConfig.GROUP_ID_CONFIG))
+ throw new IllegalArgumentException("Kafka properties don't contains " + ConsumerConfig.GROUP_ID_CONFIG);
+
+ kafkaProps.put(KEY_DESERIALIZER_CLASS_CONFIG, IntegerDeserializer.class.getName());
+ kafkaProps.put(VALUE_DESERIALIZER_CLASS_CONFIG, ByteArrayDeserializer.class.getName());
+ }
+
+ /** {@inheritDoc} */
+ @Override public void run() {
+ try {
+ runx();
+ }
+ catch (Exception e) {
+ throw new IgniteException(e);
+ }
+ }
+
+ /** */
+ private void runx() throws Exception {
+ U.initWorkDir(igniteCfg);
+
+ IgniteLogger log = U.initLogger(igniteCfg, "kafka-ignite-streamer");
+
+ igniteCfg.setGridLogger(log);
+
+ ackAsciiLogo(log);
+
+ try (IgniteEx ign = (IgniteEx)Ignition.start(igniteCfg)) {
+ AtomicBoolean stopped = new AtomicBoolean();
+
+ Set<Integer> caches = null;
+
+ if (!F.isEmpty(streamerCfg.getCaches())) {
+ caches = streamerCfg.getCaches().stream()
+ .peek(cache -> Objects.requireNonNull(ign.cache(cache), cache + " not exists!"))
+ .map(CU::cacheId).collect(Collectors.toSet());
+ }
+
+ int kafkaPartsFrom = streamerCfg.getKafkaPartsFrom();
+ int kafkaParts = streamerCfg.getKafkaPartsTo() - kafkaPartsFrom;
+ int threadCnt = streamerCfg.getThreadCount();
+
+ assert kafkaParts >= threadCnt
+ : "Threads count bigger then kafka partitions count [kafkaParts=" + kafkaParts + ",threadCount=" + threadCnt + ']';
+
+ int partPerApplier = kafkaParts / threadCnt;
+
+ for (int i = 0; i < threadCnt; i++) {
+ int from = i * partPerApplier;
+ int to = (i + 1) * partPerApplier;
+
+ if (i == threadCnt - 1)
+ to = kafkaParts;
+
+ KafkaToIgniteCdcStreamerApplier applier = new KafkaToIgniteCdcStreamerApplier(
+ ign,
+ log,
+ kafkaProps,
+ streamerCfg.getTopic(),
+ kafkaPartsFrom + from,
+ kafkaPartsFrom + to,
+ caches,
+ streamerCfg.getMaxBatchSize(),
+ stopped
+ );
+
+ appliers.add(applier);
+
+ runners[i] = new Thread(applier, "applier-thread-" + i);
+
+ runners[i].start();
+ }
+
+ try {
+ for (int i = 0; i < threadCnt; i ++)
+ runners[i].join();
+ }
+ catch (InterruptedException e) {
+ stopped.set(true);
+
+ appliers.forEach(U::closeQuiet);
+
+ log.warning("Kafka to Ignite streamer interrupted", e);
+ }
+ }
+ }
+
+ /** */
+ private void ackAsciiLogo(IgniteLogger log) {
+ String ver = "ver. " + ACK_VER_STR;
+
+ if (log.isInfoEnabled()) {
+ log.info(NL + NL +
+ ">>> __ _____ ______ _____ __________ __________ ________________" + NL +
+ ">>> / //_/ _ | / __/ //_/ _ | /_ __/ __ \\ / _/ ___/ |/ / _/_ __/ __/" + NL +
+ ">>> / ,< / __ |/ _// ,< / __ | / / / /_/ / _/ // (_ / // / / / / _/ " + NL +
+ ">>> /_/|_/_/ |_/_/ /_/|_/_/ |_| /_/ \\____/ /___/\\___/_/|_/___/ /_/ /___/ " + NL +
+ ">>> " + NL +
+ ">>> " + NL +
+ ">>> " + ver + NL +
+ ">>> " + COPYRIGHT + NL +
+ ">>> " + NL +
+ ">>> Ignite documentation: " + "http://" + SITE + NL +
+ ">>> Kafka topic: " + streamerCfg.getTopic() + NL +
+ ">>> Kafka partitions: " + streamerCfg.getKafkaPartsFrom() + "-" + streamerCfg.getKafkaPartsTo() + NL
+ );
+ }
+
+ if (log.isQuiet()) {
+ U.quiet(false,
+ " __ _____ ______ _____ __________ __________ ________________",
+ " / //_/ _ | / __/ //_/ _ | /_ __/ __ \\ / _/ ___/ |/ / _/_ __/ __/",
+ " / ,< / __ |/ _// ,< / __ | / / / /_/ / _/ // (_ / // / / / / _/ ",
+ "/_/|_/_/ |_/_/ /_/|_/_/ |_| /_/ \\____/ /___/\\___/_/|_/___/ /_/ /___/ ",
+ "",
+ ver,
+ COPYRIGHT,
+ "",
+ "Ignite documentation: " + "http://" + SITE,
+ "Kafka topic: " + streamerCfg.getTopic(),
+ "Kafka partitions: " + streamerCfg.getKafkaPartsFrom() + "-" + streamerCfg.getKafkaPartsTo(),
+ "",
+ "Quiet mode.");
+
+ String fileName = log.fileName();
+
+ if (fileName != null)
+ U.quiet(false, " ^-- Logging to file '" + fileName + '\'');
+
+ if (log instanceof GridLoggerProxy)
+ U.quiet(false, " ^-- Logging by '" + ((GridLoggerProxy)log).getLoggerInfo() + '\'');
+
+ U.quiet(false,
+ " ^-- To see **FULL** console log here add -DIGNITE_QUIET=false or \"-v\" to kafka-to-ignite.{sh|bat}",
+ "");
+ }
+ }
+}
diff --git a/modules/cdc-ext/src/main/java/org/apache/ignite/cdc/kafka/KafkaToIgniteCdcStreamerApplier.java b/modules/cdc-ext/src/main/java/org/apache/ignite/cdc/kafka/KafkaToIgniteCdcStreamerApplier.java
new file mode 100644
index 0000000..fbac6e0
--- /dev/null
+++ b/modules/cdc-ext/src/main/java/org/apache/ignite/cdc/kafka/KafkaToIgniteCdcStreamerApplier.java
@@ -0,0 +1,251 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.cdc.kafka;
+
+import java.io.ByteArrayInputStream;
+import java.io.IOException;
+import java.io.ObjectInputStream;
+import java.time.Duration;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Map;
+import java.util.Properties;
+import java.util.Set;
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.atomic.AtomicLong;
+import org.apache.ignite.IgniteCheckedException;
+import org.apache.ignite.IgniteException;
+import org.apache.ignite.IgniteLogger;
+import org.apache.ignite.cache.CacheEntryVersion;
+import org.apache.ignite.cdc.CdcEvent;
+import org.apache.ignite.cdc.CdcEventsApplier;
+import org.apache.ignite.internal.IgniteEx;
+import org.apache.ignite.internal.processors.cache.IgniteInternalCache;
+import org.apache.ignite.internal.processors.cache.version.CacheVersionConflictResolver;
+import org.apache.ignite.internal.processors.cache.version.GridCacheVersion;
+import org.apache.ignite.internal.util.typedef.F;
+import org.apache.ignite.internal.util.typedef.internal.S;
+import org.apache.ignite.internal.util.typedef.internal.U;
+import org.apache.kafka.clients.consumer.ConsumerRecord;
+import org.apache.kafka.clients.consumer.ConsumerRecords;
+import org.apache.kafka.clients.consumer.KafkaConsumer;
+import org.apache.kafka.common.TopicPartition;
+import org.apache.kafka.common.errors.WakeupException;
+
+/**
+ * Thread that polls message from the Kafka topic partitions and applies those messages to the Ignite caches.
+ * It expected that messages was written to the Kafka by the {@link IgniteToKafkaCdcStreamer} Change Data Capture consumer.
+ * <p>
+ * Each {@code Applier} receive set of Kafka topic partitions to read and caches to process.
+ * Applier creates consumer per partition because Kafka consumer reads not fair,
+ * consumer reads messages from specific partition while there is new messages in specific partition.
+ * See <a href=
+ * "https://cwiki.apache.org/confluence/display/KAFKA/KIP-387%3A+Fair+Message+Consumption+Across+Partitions+in+KafkaConsumer">KIP-387</a>
+ * and <a href="https://issues.apache.org/jira/browse/KAFKA-3932">KAFKA-3932</a> for further information.
+ * All consumers should belongs to the same consumer-group to ensure consistent reading.
+ * Applier polls messages from each consumer in round-robin fashion.
+ * <p>
+ * Messages applied to Ignite using {@link IgniteInternalCache#putAllConflict(Map)}, {@link IgniteInternalCache#removeAllConflict(Map)}
+ * these methods allows to provide {@link GridCacheVersion} of the entry to the Ignite so in case update conflicts they can be resolved
+ * by the {@link CacheVersionConflictResolver}.
+ * <p>
+ * In case of any error during read applier just fail.
+ * Fail of any applier will lead to the fail of {@link KafkaToIgniteCdcStreamer} application.
+ * It expected that application will be configured for automatic restarts with the OS tool to failover temporary errors
+ * such as Kafka or Ignite unavailability.
+ *
+ * @see KafkaToIgniteCdcStreamer
+ * @see IgniteToKafkaCdcStreamer
+ * @see IgniteInternalCache#putAllConflict(Map)
+ * @see IgniteInternalCache#removeAllConflict(Map)
+ * @see CacheVersionConflictResolver
+ * @see GridCacheVersion
+ * @see CdcEvent
+ * @see CacheEntryVersion
+ */
+class KafkaToIgniteCdcStreamerApplier extends CdcEventsApplier implements Runnable, AutoCloseable {
+ /** */
+ public static final int DFLT_REQ_TIMEOUT = 3;
+
+ /** Ignite instance. */
+ private final IgniteEx ign;
+
+ /** Log. */
+ private final IgniteLogger log;
+
+ /** Closed flag. Shared between all appliers. */
+ private final AtomicBoolean stopped;
+
+ /** Kafka properties. */
+ private final Properties kafkaProps;
+
+ /** Topic to read. */
+ private final String topic;
+
+ /** Lower kafka partition (inclusive). */
+ private final int kafkaPartFrom;
+
+ /** Higher kafka partition (exclusive). */
+ private final int kafkaPartTo;
+
+ /** Caches ids to read. */
+ private final Set<Integer> caches;
+
+ /** Consumers. */
+ private final List<KafkaConsumer<Integer, byte[]>> cnsmrs = new ArrayList<>();
+
+ /** */
+ private final AtomicLong rcvdEvts = new AtomicLong();
+
+ /**
+ * @param ign Ignite instance.
+ * @param log Logger.
+ * @param kafkaProps Kafka properties.
+ * @param topic Topic name.
+ * @param kafkaPartFrom Read from partition.
+ * @param kafkaPartTo Read to partition.
+ * @param caches Cache ids.
+ * @param maxBatchSize Maximum batch size.
+ * @param stopped Stopped flag.
+ */
+ public KafkaToIgniteCdcStreamerApplier(
+ IgniteEx ign,
+ IgniteLogger log,
+ Properties kafkaProps,
+ String topic,
+ int kafkaPartFrom,
+ int kafkaPartTo,
+ Set<Integer> caches,
+ int maxBatchSize,
+ AtomicBoolean stopped
+ ) {
+ super(maxBatchSize);
+
+ this.ign = ign;
+ this.kafkaProps = kafkaProps;
+ this.topic = topic;
+ this.kafkaPartFrom = kafkaPartFrom;
+ this.kafkaPartTo = kafkaPartTo;
+ this.caches = caches;
+ this.stopped = stopped;
+ this.log = log.getLogger(KafkaToIgniteCdcStreamerApplier.class);
+ }
+
+ /** {@inheritDoc} */
+ @Override public void run() {
+ U.setCurrentIgniteName(ign.name());
+
+ try {
+ for (int kafkaPart = kafkaPartFrom; kafkaPart < kafkaPartTo; kafkaPart++) {
+ KafkaConsumer<Integer, byte[]> cnsmr = new KafkaConsumer<>(kafkaProps);
+
+ cnsmr.assign(Collections.singleton(new TopicPartition(topic, kafkaPart)));
+
+ cnsmrs.add(cnsmr);
+ }
+
+ Iterator<KafkaConsumer<Integer, byte[]>> cnsmrIter = Collections.emptyIterator();
+
+ while (!stopped.get()) {
+ if (!cnsmrIter.hasNext())
+ cnsmrIter = cnsmrs.iterator();
+
+ poll(cnsmrIter.next());
+ }
+ }
+ catch (WakeupException e) {
+ if (!stopped.get())
+ log.error("Applier wakeup error!", e);
+ }
+ catch (Throwable e) {
+ log.error("Applier error!", e);
+
+ stopped.set(true);
+ }
+ finally {
+ for (KafkaConsumer<Integer, byte[]> consumer : cnsmrs) {
+ try {
+ consumer.close(Duration.ofSeconds(DFLT_REQ_TIMEOUT));
+ }
+ catch (Exception e) {
+ log.warning("Close error!", e);
+ }
+ }
+
+ cnsmrs.clear();
+ }
+
+ if (log.isInfoEnabled())
+ log.info(Thread.currentThread().getName() + " - stopped!");
+ }
+
+ /**
+ * Polls data from the specific consumer and applies it to the Ignite.
+ * @param cnsmr Data consumer.
+ */
+ private void poll(KafkaConsumer<Integer, byte[]> cnsmr) throws IgniteCheckedException {
+ ConsumerRecords<Integer, byte[]> recs = cnsmr.poll(Duration.ofSeconds(DFLT_REQ_TIMEOUT));
+
+ if (log.isDebugEnabled()) {
+ log.debug(
+ "Polled from consumer [assignments=" + cnsmr.assignment() + ",rcvdEvts=" + rcvdEvts.addAndGet(recs.count()) + ']'
+ );
+ }
+
+ apply(F.iterator(recs, this::deserialize, true, rec -> F.isEmpty(caches) || caches.contains(rec.key())));
+
+ cnsmr.commitSync(Duration.ofSeconds(DFLT_REQ_TIMEOUT));
+ }
+
+ /**
+ * @param rec Kafka record.
+ * @return CDC event.
+ */
+ private CdcEvent deserialize(ConsumerRecord<Integer, byte[]> rec) {
+ try (ObjectInputStream is = new ObjectInputStream(new ByteArrayInputStream(rec.value()))) {
+ return (CdcEvent)is.readObject();
+ }
+ catch (IOException | ClassNotFoundException e) {
+ throw new IgniteException(e);
+ }
+ }
+
+ /** {@inheritDoc} */
+ @Override public void close() {
+ log.warning("Close applier!");
+
+ cnsmrs.forEach(KafkaConsumer::wakeup);
+ }
+
+ /** {@inheritDoc} */
+ @Override protected IgniteEx ignite() {
+ return ign;
+ }
+
+ /** {@inheritDoc} */
+ @Override protected IgniteLogger log() {
+ return log;
+ }
+
+ /** {@inheritDoc} */
+ @Override public String toString() {
+ return S.toString(KafkaToIgniteCdcStreamerApplier.class, this);
+ }
+}
diff --git a/modules/cdc-ext/src/main/java/org/apache/ignite/cdc/kafka/KafkaToIgniteCdcStreamerConfiguration.java b/modules/cdc-ext/src/main/java/org/apache/ignite/cdc/kafka/KafkaToIgniteCdcStreamerConfiguration.java
new file mode 100644
index 0000000..1e9026c
--- /dev/null
+++ b/modules/cdc-ext/src/main/java/org/apache/ignite/cdc/kafka/KafkaToIgniteCdcStreamerConfiguration.java
@@ -0,0 +1,124 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.cdc.kafka;
+
+import java.util.Collection;
+import java.util.Map;
+import org.apache.ignite.internal.processors.cache.IgniteInternalCache;
+
+/**
+ * Configuration of {@link KafkaToIgniteCdcStreamer} application.
+ *
+ * @see KafkaToIgniteCdcStreamer
+ * @see KafkaToIgniteLoader
+ */
+public class KafkaToIgniteCdcStreamerConfiguration {
+ /** Default {@link #kafkaPartsTo} value. */
+ public static final int DFLT_PARTS = 16;
+
+ /** Default {@link #topic} value. */
+ public static final String DFLT_TOPIC = "ignite";
+
+ /** Default {@link #maxBatchSize} value. */
+ public static final int DFLT_MAX_BATCH_SIZE = 1024;
+
+ /** {@link KafkaToIgniteCdcStreamerApplier} thread count. */
+ private int threadCnt = DFLT_PARTS;
+
+ /** Topic name. */
+ private String topic = DFLT_TOPIC;
+
+ /** Kafka partitions lower bound (inclusive). */
+ private int kafkaPartsFrom = 0;
+
+ /** Kafka partitions higher bound (exclusive). */
+ private int kafkaPartsTo = DFLT_PARTS;
+
+ /**
+ * Maximum batch size to apply to Ignite.
+ *
+ * @see IgniteInternalCache#putAllConflict(Map)
+ * @see IgniteInternalCache#removeAllConflict(Map)
+ */
+ private int maxBatchSize = DFLT_MAX_BATCH_SIZE;
+
+ /**
+ * Cache names to process.
+ */
+ private Collection<String> caches;
+
+ /** */
+ public int getThreadCount() {
+ return threadCnt;
+ }
+
+ /** */
+ public void setThreadCount(int threadCnt) {
+ this.threadCnt = threadCnt;
+ }
+
+ /** */
+ public String getTopic() {
+ return topic;
+ }
+
+ /** */
+ public void setTopic(String topic) {
+ this.topic = topic;
+ }
+
+ /** */
+ public int getKafkaPartsFrom() {
+ return kafkaPartsFrom;
+ }
+
+ /** */
+ public void setKafkaPartsFrom(int kafkaPartsFrom) {
+ this.kafkaPartsFrom = kafkaPartsFrom;
+ }
+
+ /** */
+ public int getKafkaPartsTo() {
+ return kafkaPartsTo;
+ }
+
+ /** */
+ public void setKafkaPartsTo(int kafkaPartsTo) {
+ this.kafkaPartsTo = kafkaPartsTo;
+ }
+
+ /** */
+ public int getMaxBatchSize() {
+ return maxBatchSize;
+ }
+
+ /** */
+ public void setMaxBatchSize(int maxBatchSize) {
+ this.maxBatchSize = maxBatchSize;
+ }
+
+ /** */
+ public Collection<String> getCaches() {
+ return caches;
+ }
+
+ /** */
+ public void setCaches(Collection<String> caches) {
+ this.caches = caches;
+ }
+}
diff --git a/modules/cdc-ext/src/main/java/org/apache/ignite/cdc/kafka/KafkaToIgniteCommandLineStartup.java b/modules/cdc-ext/src/main/java/org/apache/ignite/cdc/kafka/KafkaToIgniteCommandLineStartup.java
new file mode 100644
index 0000000..c425664
--- /dev/null
+++ b/modules/cdc-ext/src/main/java/org/apache/ignite/cdc/kafka/KafkaToIgniteCommandLineStartup.java
@@ -0,0 +1,110 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.cdc.kafka;
+
+import org.apache.ignite.IgniteSystemProperties;
+import org.apache.ignite.internal.util.typedef.X;
+import org.jetbrains.annotations.Nullable;
+
+import static org.apache.ignite.cdc.kafka.KafkaToIgniteLoader.KAFKA_PROPERTIES;
+import static org.apache.ignite.internal.IgniteVersionUtils.ACK_VER_STR;
+import static org.apache.ignite.internal.IgniteVersionUtils.COPYRIGHT;
+import static org.apache.ignite.startup.cmdline.CommandLineStartup.isHelp;
+
+/**
+ * This class defines command-line {@link KafkaToIgniteCdcStreamer} startup. This startup can be used to start Ignite
+ * {@link KafkaToIgniteCdcStreamer} application outside of any hosting environment from command line.
+ * This startup is a Java application with {@link #main(String[])} method that accepts command line arguments.
+ * It accepts on parameter which is Ignite Spring XML configuration file path.
+ * You can run this class from command line without parameters to get help message.
+ */
+public class KafkaToIgniteCommandLineStartup {
+ /** Quite log flag. */
+ private static final boolean QUITE = IgniteSystemProperties.getBoolean(IgniteSystemProperties.IGNITE_QUIET);
+
+ /**
+ * Main entry point.
+ * @param args Command line arguments.
+ */
+ public static void main(String[] args) {
+ if (!QUITE) {
+ X.println("Kafka To Ignite Command Line Startup, ver. " + ACK_VER_STR);
+ X.println(COPYRIGHT);
+ X.println();
+ }
+
+ if (args.length > 1)
+ exit("Too many arguments.", true, -1);
+
+ if (args.length > 0 && isHelp(args[0]))
+ exit(null, true, 0);
+
+ if (args.length > 0 && args[0].isEmpty())
+ exit("Empty argument.", true, 1);
+
+ if (args.length > 0 && args[0].charAt(0) == '-')
+ exit("Invalid arguments: " + args[0], true, -1);
+
+ try {
+ KafkaToIgniteCdcStreamer streamer = KafkaToIgniteLoader.loadKafkaToIgniteStreamer(args[0]);
+
+ streamer.run();
+ }
+ catch (Throwable e) {
+ e.printStackTrace();
+
+ String note = "";
+
+ if (X.hasCause(e, ClassNotFoundException.class))
+ note = "\nNote! You may use 'USER_LIBS' environment variable to specify your classpath.";
+
+ exit("Failed to run app: " + e.getMessage() + note, false, -1);
+ }
+ }
+
+ /**
+ * Exists with optional error message, usage show and exit code.
+ *
+ * @param errMsg Optional error message.
+ * @param showUsage Whether or not to show usage information.
+ * @param exitCode Exit code.
+ */
+ private static void exit(@Nullable String errMsg, boolean showUsage, int exitCode) {
+ if (errMsg != null)
+ X.error(errMsg);
+
+ if (showUsage) {
+ X.error(
+ "Usage:",
+ " kafka-to-ignite.{sh|bat} [?]|[path]",
+ " Where:",
+ " ?, /help, -help, - show this message.",
+ " -v - verbose mode (quiet by default).",
+ " path - path to Spring XML configuration file.",
+ " Path can be absolute or relative to IGNITE_HOME.",
+ " ",
+ "Spring file should contain bean definition of 'org.apache.ignite.configuration.IgniteConfiguration' " +
+ "and 'org.apache.ignite.cdc.KafkaToIgniteCdcStreamerConfiguration' " +
+ "and bean of class 'java.util.Properties' with '" + KAFKA_PROPERTIES + "' name " +
+ "that contains properties to connect to Apache Kafka cluster. " +
+ "Note that bean will be fetched by the type and its ID is not used.");
+ }
+
+ System.exit(exitCode);
+ }
+}
diff --git a/modules/cdc-ext/src/main/java/org/apache/ignite/cdc/kafka/KafkaToIgniteLoader.java b/modules/cdc-ext/src/main/java/org/apache/ignite/cdc/kafka/KafkaToIgniteLoader.java
new file mode 100644
index 0000000..adfc113
--- /dev/null
+++ b/modules/cdc-ext/src/main/java/org/apache/ignite/cdc/kafka/KafkaToIgniteLoader.java
@@ -0,0 +1,75 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.cdc.kafka;
+
+import java.net.URL;
+import java.util.Collection;
+import java.util.Properties;
+import org.apache.ignite.IgniteCheckedException;
+import org.apache.ignite.configuration.IgniteConfiguration;
+import org.apache.ignite.internal.processors.resource.GridSpringResourceContext;
+import org.apache.ignite.internal.util.spring.IgniteSpringHelper;
+import org.apache.ignite.internal.util.typedef.internal.U;
+import org.apache.ignite.lang.IgniteBiTuple;
+
+import static org.apache.ignite.internal.IgniteComponentType.SPRING;
+
+/**
+ * Utility class to load {@link KafkaToIgniteCdcStreamer} from Spring XML configuration.
+ */
+public class KafkaToIgniteLoader {
+ /** Kafka properties bean name. */
+ public static final String KAFKA_PROPERTIES = "kafkaProperties";
+
+ /**
+ * Loads {@link KafkaToIgniteCdcStreamer} from XML configuration file.
+ * If load fails then error message wouldn't be null.
+ *
+ * @param springXmlPath Path to XML configuration file.
+ * @return {@code KafkaToIgniteCdcStreamer} instance.
+ * @throws IgniteCheckedException If failed.
+ */
+ public static KafkaToIgniteCdcStreamer loadKafkaToIgniteStreamer(String springXmlPath) throws IgniteCheckedException {
+ URL cfgUrl = U.resolveSpringUrl(springXmlPath);
+
+ IgniteSpringHelper spring = SPRING.create(false);
+
+ IgniteBiTuple<Collection<IgniteConfiguration>, ? extends GridSpringResourceContext> cfgTuple =
+ spring.loadConfigurations(cfgUrl);
+
+ if (cfgTuple.get1().size() > 1) {
+ throw new IgniteCheckedException(
+ "Exact 1 IgniteConfiguration should be defined. Found " + cfgTuple.get1().size()
+ );
+ }
+
+ IgniteBiTuple<Collection<KafkaToIgniteCdcStreamerConfiguration>, ? extends GridSpringResourceContext> k2iCfg =
+ spring.loadConfigurations(cfgUrl, KafkaToIgniteCdcStreamerConfiguration.class);
+
+ if (k2iCfg.get1().size() > 1) {
+ throw new IgniteCheckedException(
+ "Exact 1 KafkaToIgniteCdcStreamerConfiguration configuration should be defined. " +
+ "Found " + k2iCfg.get1().size()
+ );
+ }
+
+ Properties kafkaProps = spring.loadBean(cfgUrl, KAFKA_PROPERTIES);
+
+ return new KafkaToIgniteCdcStreamer(cfgTuple.get1().iterator().next(), kafkaProps, k2iCfg.get1().iterator().next());
+ }
+}
diff --git a/modules/cdc-ext/src/test/java/org/apache/ignite/cdc/AbstractReplicationTest.java b/modules/cdc-ext/src/test/java/org/apache/ignite/cdc/AbstractReplicationTest.java
new file mode 100644
index 0000000..fb980ed
--- /dev/null
+++ b/modules/cdc-ext/src/test/java/org/apache/ignite/cdc/AbstractReplicationTest.java
@@ -0,0 +1,319 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.cdc;
+
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.EnumSet;
+import java.util.HashSet;
+import java.util.List;
+import java.util.stream.IntStream;
+import org.apache.ignite.IgniteCache;
+import org.apache.ignite.cache.CacheAtomicityMode;
+import org.apache.ignite.cdc.conflictresolve.CacheVersionConflictResolverPluginProvider;
+import org.apache.ignite.configuration.DataRegionConfiguration;
+import org.apache.ignite.configuration.DataStorageConfiguration;
+import org.apache.ignite.configuration.IgniteConfiguration;
+import org.apache.ignite.internal.IgniteEx;
+import org.apache.ignite.internal.IgniteInternalFuture;
+import org.apache.ignite.internal.IgniteInterruptedCheckedException;
+import org.apache.ignite.internal.util.typedef.F;
+import org.apache.ignite.lang.IgniteBiTuple;
+import org.apache.ignite.spi.discovery.tcp.TcpDiscoverySpi;
+import org.apache.ignite.spi.discovery.tcp.ipfinder.vm.TcpDiscoveryVmIpFinder;
+import org.apache.ignite.testframework.junits.common.GridCommonAbstractTest;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.junit.runners.Parameterized;
+
+import static org.apache.ignite.cache.CacheAtomicityMode.ATOMIC;
+import static org.apache.ignite.cache.CacheAtomicityMode.TRANSACTIONAL;
+import static org.apache.ignite.cluster.ClusterState.ACTIVE;
+import static org.apache.ignite.spi.communication.tcp.TcpCommunicationSpi.DFLT_PORT_RANGE;
+import static org.apache.ignite.testframework.GridTestUtils.runAsync;
+import static org.apache.ignite.testframework.GridTestUtils.waitForCondition;
+
+/** */
+@RunWith(Parameterized.class)
+public abstract class AbstractReplicationTest extends GridCommonAbstractTest {
+ /** Cache mode. */
+ @Parameterized.Parameter
+ public CacheAtomicityMode cacheMode;
+
+ /** */
+ @Parameterized.Parameter(1)
+ public int backupCnt;
+
+ /** @return Test parameters. */
+ @Parameterized.Parameters(name = "cacheMode={0},backupCnt={1}")
+ public static Collection<?> parameters() {
+ List<Object[]> params = new ArrayList<>();
+
+ for (CacheAtomicityMode mode : EnumSet.of(ATOMIC, TRANSACTIONAL))
+ for (int i = 0; i < 2; i++)
+ params.add(new Object[] {mode, i});
+
+ return params;
+ }
+
+ /** */
+ public static final String ACTIVE_PASSIVE_CACHE = "active-passive-cache";
+
+ /** */
+ public static final String ACTIVE_ACTIVE_CACHE = "active-active-cache";
+
+ /** */
+ public static final String IGNORED_CACHE = "ignored-cache";
+
+ /** */
+ public static final byte SRC_CLUSTER_ID = 1;
+
+ /** */
+ public static final byte DEST_CLUSTER_ID = 2;
+
+ /** */
+ private enum WaitDataMode {
+ /** */
+ EXISTS,
+
+ /** */
+ REMOVED
+ }
+
+ /** */
+ public static final int KEYS_CNT = 1000;
+
+ /** */
+ protected static IgniteEx[] srcCluster;
+
+ /** */
+ protected static IgniteEx[] destCluster;
+
+ /** */
+ protected static IgniteConfiguration[] srcClusterCliCfg;
+
+ /** */
+ protected static IgniteConfiguration[] destClusterCliCfg;
+
+ /** */
+ private int discoPort = TcpDiscoverySpi.DFLT_PORT;
+
+ /** */
+ private byte clusterId = SRC_CLUSTER_ID;
+
+ /** {@inheritDoc} */
+ @Override protected IgniteConfiguration getConfiguration(String igniteInstanceName) throws Exception {
+ IgniteConfiguration cfg = super.getConfiguration(igniteInstanceName)
+ .setDiscoverySpi(new TcpDiscoverySpi()
+ .setLocalPort(discoPort)
+ .setIpFinder(new TcpDiscoveryVmIpFinder() {{
+ setAddresses(Collections.singleton("127.0.0.1:" + discoPort + ".." + (discoPort + DFLT_PORT_RANGE)));
+ }}));
+
+ if (!cfg.isClientMode()) {
+ CacheVersionConflictResolverPluginProvider<?> cfgPlugin = new CacheVersionConflictResolverPluginProvider<>();
+
+ cfgPlugin.setClusterId(clusterId);
+ cfgPlugin.setCaches(new HashSet<>(Collections.singletonList(ACTIVE_ACTIVE_CACHE)));
+ cfgPlugin.setConflictResolveField("reqId");
+
+ cfg.setPluginProviders(cfgPlugin);
+
+ cfg.setDataStorageConfiguration(new DataStorageConfiguration()
+ .setDefaultDataRegionConfiguration(new DataRegionConfiguration()
+ .setPersistenceEnabled(true)));
+
+ cfg.getDataStorageConfiguration()
+ .setWalForceArchiveTimeout(5_000)
+ .setCdcEnabled(true);
+
+ cfg.setConsistentId(igniteInstanceName);
+ }
+
+ return cfg;
+ }
+
+ /** {@inheritDoc} */
+ @Override protected void beforeTest() throws Exception {
+ cleanPersistenceDir();
+
+ IgniteBiTuple<IgniteEx[], IgniteConfiguration[]> cluster = setupCluster("source", "src-cluster-client", 0);
+
+ srcCluster = cluster.get1();
+ srcClusterCliCfg = cluster.get2();
+
+ discoPort += DFLT_PORT_RANGE + 1;
+ clusterId = DEST_CLUSTER_ID;
+
+ cluster = setupCluster("destination", "dest-cluster-client", 2);
+
+ destCluster = cluster.get1();
+ destClusterCliCfg = cluster.get2();
+
+ String srcTag = srcCluster[0].cluster().tag();
+ String destTag = destCluster[0].cluster().tag();
+
+ assertNotNull(srcTag);
+ assertNotNull(destTag);
+ assertFalse(srcTag.equals(destTag));
+ }
+
+ /** */
+ private IgniteBiTuple<IgniteEx[], IgniteConfiguration[]> setupCluster(String clusterTag, String clientPrefix, int idx) throws Exception {
+ IgniteEx[] cluster = new IgniteEx[] {
+ startGrid(idx + 1),
+ startGrid(idx + 2)
+ };
+
+ IgniteConfiguration[] clusterCliCfg = new IgniteConfiguration[2];
+
+ for (int i = 0; i < 2; i++)
+ clusterCliCfg[i] = optimize(getConfiguration(clientPrefix + i).setClientMode(true));
+
+ cluster[0].cluster().state(ACTIVE);
+ cluster[0].cluster().tag(clusterTag);
+
+ return F.t(cluster, clusterCliCfg);
+ }
+
+ /** {@inheritDoc} */
+ @Override protected void afterTest() throws Exception {
+ stopAllGrids();
+
+ cleanPersistenceDir();
+ }
+
+ /** Active/Passive mode means changes made only in one cluster. */
+ @Test
+ public void testActivePassiveReplication() throws Exception {
+ List<IgniteInternalFuture<?>> futs = startActivePassiveCdc();
+
+ try {
+ IgniteCache<Integer, ConflictResolvableTestData> destCache = destCluster[0].createCache(ACTIVE_PASSIVE_CACHE);
+
+ destCache.put(1, ConflictResolvableTestData.create());
+ destCache.remove(1);
+
+ // Updates for "ignored-cache" should be ignored because of CDC consume configuration.
+ runAsync(generateData(IGNORED_CACHE, srcCluster[srcCluster.length - 1], IntStream.range(0, KEYS_CNT)));
+ runAsync(generateData(ACTIVE_PASSIVE_CACHE, srcCluster[srcCluster.length - 1], IntStream.range(0, KEYS_CNT)));
+
+ IgniteCache<Integer, ConflictResolvableTestData> srcCache =
+ srcCluster[srcCluster.length - 1].getOrCreateCache(ACTIVE_PASSIVE_CACHE);
+
+ waitForSameData(srcCache, destCache, KEYS_CNT, WaitDataMode.EXISTS, futs);
+
+ IntStream.range(0, KEYS_CNT).forEach(srcCache::remove);
+
+ waitForSameData(srcCache, destCache, KEYS_CNT, WaitDataMode.REMOVED, futs);
+
+ assertFalse(destCluster[0].cacheNames().contains(IGNORED_CACHE));
+ }
+ finally {
+ for (IgniteInternalFuture<?> fut : futs)
+ fut.cancel();
+ }
+ }
+
+ /** Active/Active mode means changes made in both clusters. */
+ @Test
+ public void testActiveActiveReplication() throws Exception {
+ IgniteCache<Integer, ConflictResolvableTestData> srcCache = srcCluster[0].getOrCreateCache(ACTIVE_ACTIVE_CACHE);
+ IgniteCache<Integer, ConflictResolvableTestData> destCache = destCluster[0].getOrCreateCache(ACTIVE_ACTIVE_CACHE);
+
+ // Even keys goes to src cluster.
+ runAsync(generateData(ACTIVE_ACTIVE_CACHE, srcCluster[srcCluster.length - 1],
+ IntStream.range(0, KEYS_CNT).filter(i -> i % 2 == 0)));
+
+ // Odd keys goes to dest cluster.
+ runAsync(generateData(ACTIVE_ACTIVE_CACHE, destCluster[destCluster.length - 1],
+ IntStream.range(0, KEYS_CNT).filter(i -> i % 2 != 0)));
+
+ List<IgniteInternalFuture<?>> futs = startActiveActiveCdc();
+
+ try {
+ waitForSameData(srcCache, destCache, KEYS_CNT, WaitDataMode.EXISTS, futs);
+
+ runAsync(() -> IntStream.range(0, KEYS_CNT).filter(j -> j % 2 == 0).forEach(srcCache::remove));
+ runAsync(() -> IntStream.range(0, KEYS_CNT).filter(j -> j % 2 != 0).forEach(destCache::remove));
+
+ waitForSameData(srcCache, destCache, KEYS_CNT, WaitDataMode.REMOVED, futs);
+ }
+ finally {
+ for (IgniteInternalFuture<?> fut : futs)
+ fut.cancel();
+ }
+ }
+
+ /** */
+ public static Runnable generateData(String cacheName, IgniteEx ign, IntStream keys) {
+ return () -> {
+ IgniteCache<Integer, ConflictResolvableTestData> cache = ign.getOrCreateCache(cacheName);
+
+ keys.forEach(i -> cache.put(i, ConflictResolvableTestData.create()));
+ };
+ }
+
+ /** */
+ public void waitForSameData(
+ IgniteCache<Integer, ConflictResolvableTestData> src,
+ IgniteCache<Integer, ConflictResolvableTestData> dest,
+ int keysCnt,
+ WaitDataMode mode,
+ List<IgniteInternalFuture<?>> futs
+ ) throws IgniteInterruptedCheckedException {
+ assertTrue(waitForCondition(() -> {
+ for (int i = 0; i < keysCnt; i++) {
+ if (mode == WaitDataMode.EXISTS) {
+ if (!src.containsKey(i) || !dest.containsKey(i))
+ return checkFuts(false, futs);
+ }
+ else if (mode == WaitDataMode.REMOVED) {
+ if (src.containsKey(i) || dest.containsKey(i))
+ return checkFuts(false, futs);
+
+ continue;
+ }
+ else
+ throw new IllegalArgumentException(mode + " not supported.");
+
+ ConflictResolvableTestData data = dest.get(i);
+
+ if (!data.equals(src.get(i)))
+ return checkFuts(false, futs);
+ }
+
+ return checkFuts(true, futs);
+ }, getTestTimeout()));
+ }
+
+ /** */
+ private boolean checkFuts(boolean res, List<IgniteInternalFuture<?>> futs) {
+ for (int i = 0; i < futs.size(); i++)
+ assertFalse("Fut " + i, futs.get(i).isDone());
+
+ return res;
+ }
+
+ /** */
+ protected abstract List<IgniteInternalFuture<?>> startActivePassiveCdc();
+
+ /** */
+ protected abstract List<IgniteInternalFuture<?>> startActiveActiveCdc();
+}
diff --git a/modules/cdc-ext/src/test/java/org/apache/ignite/cdc/CacheConflictOperationsTest.java b/modules/cdc-ext/src/test/java/org/apache/ignite/cdc/CacheConflictOperationsTest.java
new file mode 100644
index 0000000..e6f199b
--- /dev/null
+++ b/modules/cdc-ext/src/test/java/org/apache/ignite/cdc/CacheConflictOperationsTest.java
@@ -0,0 +1,317 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.cdc;
+
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.EnumSet;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import org.apache.ignite.IgniteCache;
+import org.apache.ignite.IgniteCheckedException;
+import org.apache.ignite.binary.BinaryObject;
+import org.apache.ignite.cache.CacheAtomicityMode;
+import org.apache.ignite.cache.CacheEntry;
+import org.apache.ignite.cache.CacheEntryVersion;
+import org.apache.ignite.cdc.conflictresolve.CacheVersionConflictResolverPluginProvider;
+import org.apache.ignite.configuration.CacheConfiguration;
+import org.apache.ignite.configuration.IgniteConfiguration;
+import org.apache.ignite.internal.IgniteEx;
+import org.apache.ignite.internal.processors.cache.CacheObject;
+import org.apache.ignite.internal.processors.cache.CacheObjectImpl;
+import org.apache.ignite.internal.processors.cache.IgniteInternalCache;
+import org.apache.ignite.internal.processors.cache.KeyCacheObject;
+import org.apache.ignite.internal.processors.cache.KeyCacheObjectImpl;
+import org.apache.ignite.internal.processors.cache.dr.GridCacheDrInfo;
+import org.apache.ignite.internal.processors.cache.version.GridCacheVersion;
+import org.apache.ignite.testframework.junits.common.GridCommonAbstractTest;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.junit.runners.Parameterized;
+
+import static java.util.Collections.singletonMap;
+import static org.apache.ignite.cache.CacheAtomicityMode.ATOMIC;
+import static org.apache.ignite.cache.CacheAtomicityMode.TRANSACTIONAL;
+
+/**
+ * Cache conflict operations test.
+ */
+@RunWith(Parameterized.class)
+public class CacheConflictOperationsTest extends GridCommonAbstractTest {
+ /** Cache mode. */
+ @Parameterized.Parameter
+ public CacheAtomicityMode cacheMode;
+
+ /** Other cluster id. */
+ @Parameterized.Parameter(1)
+ public byte otherClusterId;
+
+ /** @return Test parameters. */
+ @Parameterized.Parameters(name = "cacheMode={0}, otherClusterId={1}")
+ public static Collection<?> parameters() {
+ List<Object[]> params = new ArrayList<>();
+
+ for (CacheAtomicityMode mode : EnumSet.of(ATOMIC, TRANSACTIONAL))
+ for (byte otherClusterId : new byte[] {FIRST_CLUSTER_ID, THIRD_CLUSTER_ID})
+ params.add(new Object[] {mode, otherClusterId});
+
+ return params;
+ }
+
+ /** */
+ private static IgniteCache<String, ConflictResolvableTestData> cache;
+
+ /** */
+ private static IgniteInternalCache<BinaryObject, BinaryObject> cachex;
+
+ /** */
+ private static IgniteEx client;
+
+ /** */
+ private static final byte FIRST_CLUSTER_ID = 1;
+
+ /** */
+ private static final byte SECOND_CLUSTER_ID = 2;
+
+ /** */
+ private static final byte THIRD_CLUSTER_ID = 3;
+
+ /** {@inheritDoc} */
+ @Override protected IgniteConfiguration getConfiguration(String igniteInstanceName) throws Exception {
+ CacheVersionConflictResolverPluginProvider<?> pluginCfg = new CacheVersionConflictResolverPluginProvider<>();
+
+ pluginCfg.setClusterId(SECOND_CLUSTER_ID);
+ pluginCfg.setCaches(new HashSet<>(Collections.singleton(DEFAULT_CACHE_NAME)));
+ pluginCfg.setConflictResolveField(conflictResolveField());
+
+ return super.getConfiguration(igniteInstanceName).setPluginProviders(pluginCfg);
+ }
+
+ /** {@inheritDoc} */
+ @Override protected void beforeTestsStarted() throws Exception {
+ startGrid(1);
+
+ client = startClientGrid(2);
+
+ cache = client.createCache(new CacheConfiguration<String, ConflictResolvableTestData>(DEFAULT_CACHE_NAME).setAtomicityMode(cacheMode));
+ cachex = client.cachex(DEFAULT_CACHE_NAME);
+ }
+
+ /** Tests that regular cache operations works with the conflict resolver when there is no update conflicts. */
+ @Test
+ public void testSimpleUpdates() {
+ String key = "UpdatesWithoutConflict";
+
+ put(key);
+ put(key);
+
+ remove(key);
+ }
+
+ /**
+ * Tests that {@code IgniteInternalCache#*AllConflict} cache operations works with the conflict resolver
+ * when there is no update conflicts.
+ */
+ @Test
+ public void testUpdatesFromOtherClusterWithoutConflict() throws Exception {
+ String key = key("UpdateFromOtherClusterWithoutConflict", otherClusterId);
+
+ putConflict(key, 1, true);
+
+ putConflict(key, 2, true);
+
+ removeConflict(key, 3, true);
+ }
+
+ /**
+ * Tests that {@code IgniteInternalCache#*AllConflict} cache operations works with the conflict resolver
+ * when there are update conflicts.
+ */
+ @Test
+ public void testUpdatesReorderFromOtherCluster() throws Exception {
+ String key = key("UpdateClusterUpdateReorder", otherClusterId);
+
+ putConflict(key, 2, true);
+
+ // Update with the equal or lower order should ignored.
+ putConflict(key, 2, false);
+ putConflict(key, 1, false);
+
+ // Remove with the equal or lower order should ignored.
+ removeConflict(key, 2, false);
+ removeConflict(key, 1, false);
+
+ // Remove with the higher order should succeed.
+ putConflict(key, 3, true);
+
+ key = key("UpdateClusterUpdateReorder2", otherClusterId);
+
+ int order = 1;
+
+ putConflict(key, new GridCacheVersion(2, order, 1, otherClusterId), true);
+
+ // Update with the equal or lower topVer should ignored.
+ putConflict(key, new GridCacheVersion(2, order, 1, otherClusterId), false);
+ putConflict(key, new GridCacheVersion(1, order, 1, otherClusterId), false);
+
+ // Remove with the equal or lower topVer should ignored.
+ removeConflict(key, new GridCacheVersion(2, order, 1, otherClusterId), false);
+ removeConflict(key, new GridCacheVersion(1, order, 1, otherClusterId), false);
+
+ // Remove with the higher topVer should succeed.
+ putConflict(key, new GridCacheVersion(3, order, 1, otherClusterId), true);
+
+ key = key("UpdateClusterUpdateReorder3", otherClusterId);
+
+ int topVer = 1;
+
+ putConflict(key, new GridCacheVersion(topVer, order, 2, otherClusterId), true);
+
+ // Update with the equal or lower nodeOrder should ignored.
+ putConflict(key, new GridCacheVersion(topVer, order, 2, otherClusterId), false);
+ putConflict(key, new GridCacheVersion(topVer, order, 1, otherClusterId), false);
+
+ // Remove with the equal or lower nodeOrder should ignored.
+ removeConflict(key, new GridCacheVersion(topVer, order, 2, otherClusterId), false);
+ removeConflict(key, new GridCacheVersion(topVer, order, 1, otherClusterId), false);
+
+ // Remove with the higher nodeOrder should succeed.
+ putConflict(key, new GridCacheVersion(topVer, order, 3, otherClusterId), true);
+ }
+
+ /** Tests cache operations for entry replicated from another cluster. */
+ @Test
+ public void testUpdatesConflict() throws Exception {
+ String key = key("UpdateThisClusterConflict0", otherClusterId);
+
+ putConflict(key, 1, true);
+
+ // Local remove for other cluster entry should succeed.
+ remove(key);
+
+ // Conflict replicated update should ignored.
+ // Resolve by field value not applicable because after remove operation "old" value doesn't exists.
+ putConflict(key, 2, false);
+
+ key = key("UpdateThisDCConflict1", otherClusterId);
+
+ putConflict(key, 3, true);
+
+ // Local update for other cluster entry should succeed.
+ put(key);
+
+ key = key("UpdateThisDCConflict2", otherClusterId);
+
+ put(key);
+
+ // Conflict replicated remove should ignored.
+ removeConflict(key, 4, false);
+
+ key = key("UpdateThisDCConflict3", otherClusterId);
+
+ put(key);
+
+ // Conflict replicated update succeed only if resolved by field.
+ putConflict(key, 5, conflictResolveField() != null);
+ }
+
+ /** */
+ private void put(String key) {
+ ConflictResolvableTestData newVal = ConflictResolvableTestData.create();
+
+ CacheEntry<String, ConflictResolvableTestData> oldEntry = cache.getEntry(key);
+
+ cache.put(key, newVal);
+
+ CacheEntry<String, ConflictResolvableTestData> newEntry = cache.getEntry(key);
+
+ assertNull(((CacheEntryVersion)newEntry.version()).otherClusterVersion());
+ assertEquals(newVal, cache.get(key));
+
+ if (oldEntry != null)
+ assertTrue(((CacheEntryVersion)oldEntry.version()).order() < ((CacheEntryVersion)newEntry.version()).order());
+ }
+
+ /** Puts entry via {@link IgniteInternalCache#putAllConflict(Map)}. */
+ private void putConflict(String k, long order, boolean success) throws IgniteCheckedException {
+ putConflict(k, new GridCacheVersion(1, order, 1, otherClusterId), success);
+ }
+
+ /** Puts entry via {@link IgniteInternalCache#putAllConflict(Map)}. */
+ private void putConflict(String k, GridCacheVersion newVer, boolean success) throws IgniteCheckedException {
+ CacheEntry<String, ConflictResolvableTestData> oldEntry = cache.getEntry(k);
+ ConflictResolvableTestData newVal = ConflictResolvableTestData.create();
+
+ KeyCacheObject key = new KeyCacheObjectImpl(k, null, cachex.context().affinity().partition(k));
+ CacheObject val = new CacheObjectImpl(client.binary().toBinary(newVal), null);
+
+ cachex.putAllConflict(singletonMap(key, new GridCacheDrInfo(val, newVer)));
+
+ if (success) {
+ assertEquals(newVer, ((CacheEntryVersion)cache.getEntry(k).version()).otherClusterVersion());
+ assertEquals(newVal, cache.get(k));
+ } else if (oldEntry != null) {
+ assertEquals(oldEntry.getValue(), cache.get(k));
+ assertEquals(oldEntry.version(), cache.getEntry(k).version());
+ }
+ }
+
+ /** */
+ private void remove(String key) {
+ assertTrue(cache.containsKey(key));
+
+ cache.remove(key);
+
+ assertFalse(cache.containsKey(key));
+ }
+
+ /** Removes entry via {@link IgniteInternalCache#removeAllConflict(Map)}. */
+ private void removeConflict(String k, long order, boolean success) throws IgniteCheckedException {
+ removeConflict(k, new GridCacheVersion(1, order, 1, otherClusterId), success);
+ }
+
+ /** Removes entry via {@link IgniteInternalCache#removeAllConflict(Map)}. */
+ private void removeConflict(String k, GridCacheVersion ver, boolean success) throws IgniteCheckedException {
+ assertTrue(cache.containsKey(k));
+
+ CacheEntry<String, ConflictResolvableTestData> oldEntry = cache.getEntry(k);
+
+ KeyCacheObject key = new KeyCacheObjectImpl(k, null, cachex.context().affinity().partition(k));
+
+ cachex.removeAllConflict(singletonMap(key, ver));
+
+ if (success)
+ assertFalse(cache.containsKey(k));
+ else if (oldEntry != null) {
+ assertEquals(oldEntry.getValue(), cache.get(k));
+ assertEquals(oldEntry.version(), cache.getEntry(k).version());
+ }
+ }
+
+ /** */
+ private String key(String key, byte otherClusterId) {
+ return key + otherClusterId + cacheMode;
+ }
+
+ /** */
+ protected String conflictResolveField() {
+ return null;
+ }
+}
diff --git a/modules/cdc-ext/src/test/java/org/apache/ignite/cdc/CacheConflictOperationsWithFieldTest.java b/modules/cdc-ext/src/test/java/org/apache/ignite/cdc/CacheConflictOperationsWithFieldTest.java
new file mode 100644
index 0000000..2bf4d81
--- /dev/null
+++ b/modules/cdc-ext/src/test/java/org/apache/ignite/cdc/CacheConflictOperationsWithFieldTest.java
@@ -0,0 +1,26 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.cdc;
+
+/** Cache conflict operations test. */
+public class CacheConflictOperationsWithFieldTest extends CacheConflictOperationsTest {
+ /** {@inheritDoc} */
+ @Override protected String conflictResolveField() {
+ return "reqId";
+ }
+}
diff --git a/modules/cdc-ext/src/test/java/org/apache/ignite/cdc/CdcIgniteToIgniteReplicationTest.java b/modules/cdc-ext/src/test/java/org/apache/ignite/cdc/CdcIgniteToIgniteReplicationTest.java
new file mode 100644
index 0000000..34082b5
--- /dev/null
+++ b/modules/cdc-ext/src/test/java/org/apache/ignite/cdc/CdcIgniteToIgniteReplicationTest.java
@@ -0,0 +1,69 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.cdc;
+
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.List;
+import org.apache.ignite.configuration.IgniteConfiguration;
+import org.apache.ignite.internal.IgniteInternalFuture;
+import org.apache.ignite.internal.cdc.CdcMain;
+
+import static org.apache.ignite.testframework.GridTestUtils.runAsync;
+
+/** */
+public class CdcIgniteToIgniteReplicationTest extends AbstractReplicationTest {
+ /** {@inheritDoc} */
+ @Override protected List<IgniteInternalFuture<?>> startActivePassiveCdc() {
+ List<IgniteInternalFuture<?>> futs = new ArrayList<>();
+
+ for (int i = 0; i < srcCluster.length; i++)
+ futs.add(igniteToIgnite(srcCluster[i].configuration(), destClusterCliCfg[i], ACTIVE_PASSIVE_CACHE));
+
+ return futs;
+ }
+
+ /** {@inheritDoc} */
+ @Override protected List<IgniteInternalFuture<?>> startActiveActiveCdc() {
+ List<IgniteInternalFuture<?>> futs = new ArrayList<>();
+
+ for (int i = 0; i < srcCluster.length; i++)
+ futs.add(igniteToIgnite(srcCluster[i].configuration(), destClusterCliCfg[i], ACTIVE_ACTIVE_CACHE));
+
+ for (int i = 0; i < destCluster.length; i++)
+ futs.add(igniteToIgnite(destCluster[i].configuration(), srcClusterCliCfg[i], ACTIVE_ACTIVE_CACHE));
+
+ return futs;
+ }
+
+ /**
+ * @param srcCfg Ignite source node configuration.
+ * @param destCfg Ignite destination cluster configuration.
+ * @param cache Cache name to stream to kafka.
+ * @return Future for Change Data Capture application.
+ */
+ protected IgniteInternalFuture<?> igniteToIgnite(IgniteConfiguration srcCfg, IgniteConfiguration destCfg, String cache) {
+ return runAsync(() -> {
+ CdcConfiguration cdcCfg = new CdcConfiguration();
+
+ cdcCfg.setConsumer(new IgniteToIgniteCdcStreamer(destCfg, false, Collections.singleton(cache), KEYS_CNT));
+
+ new CdcMain(srcCfg, null, cdcCfg).run();
+ });
+ }
+}
diff --git a/modules/cdc-ext/src/test/java/org/apache/ignite/cdc/ConflictResolvableTestData.java b/modules/cdc-ext/src/test/java/org/apache/ignite/cdc/ConflictResolvableTestData.java
new file mode 100644
index 0000000..d2006b3
--- /dev/null
+++ b/modules/cdc-ext/src/test/java/org/apache/ignite/cdc/ConflictResolvableTestData.java
@@ -0,0 +1,69 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.cdc;
+
+import java.util.Arrays;
+import java.util.Objects;
+import java.util.concurrent.ThreadLocalRandom;
+import java.util.concurrent.atomic.AtomicLong;
+
+/** */
+public class ConflictResolvableTestData {
+ /** */
+ public static final AtomicLong REQUEST_ID = new AtomicLong();
+
+ /** */
+ private final byte[] payload;
+
+ /** */
+ private final long reqId;
+
+ /** */
+ public ConflictResolvableTestData(byte[] payload, long reqId) {
+ this.payload = payload;
+ this.reqId = reqId;
+ }
+
+ /**
+ * @return Generated data object.
+ */
+ public static ConflictResolvableTestData create() {
+ byte[] payload = new byte[1024];
+
+ ThreadLocalRandom.current().nextBytes(payload);
+
+ return new ConflictResolvableTestData(payload, REQUEST_ID.incrementAndGet());
+ }
+
+ /** {@inheritDoc} */
+ @Override public boolean equals(Object o) {
+ if (this == o)
+ return true;
+ if (o == null || getClass() != o.getClass())
+ return false;
+ ConflictResolvableTestData data = (ConflictResolvableTestData)o;
+ return reqId == data.reqId && Arrays.equals(payload, data.payload);
+ }
+
+ /** {@inheritDoc} */
+ @Override public int hashCode() {
+ int result = Objects.hash(reqId);
+ result = 31 * result + Arrays.hashCode(payload);
+ return result;
+ }
+}
diff --git a/modules/cdc-ext/src/test/java/org/apache/ignite/cdc/IgniteCdcTestSuite.java b/modules/cdc-ext/src/test/java/org/apache/ignite/cdc/IgniteCdcTestSuite.java
new file mode 100644
index 0000000..d02fb1e
--- /dev/null
+++ b/modules/cdc-ext/src/test/java/org/apache/ignite/cdc/IgniteCdcTestSuite.java
@@ -0,0 +1,39 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.cdc;
+
+import org.apache.ignite.cdc.kafka.CdcKafkaReplicationAppsTest;
+import org.apache.ignite.cdc.kafka.CdcKafkaReplicationTest;
+import org.apache.ignite.cdc.kafka.KafkaToIgniteLoaderTest;
+import org.junit.runner.RunWith;
+import org.junit.runners.Suite;
+
+/**
+ * Cdc test suite.
+ */
+@RunWith(Suite.class)
+@Suite.SuiteClasses({
+ CacheConflictOperationsTest.class,
+ CacheConflictOperationsWithFieldTest.class,
+ CdcIgniteToIgniteReplicationTest.class,
+ KafkaToIgniteLoaderTest.class,
+ CdcKafkaReplicationTest.class,
+ CdcKafkaReplicationAppsTest.class
+})
+public class IgniteCdcTestSuite {
+}
diff --git a/modules/cdc-ext/src/test/java/org/apache/ignite/cdc/kafka/CdcKafkaReplicationAppsTest.java b/modules/cdc-ext/src/test/java/org/apache/ignite/cdc/kafka/CdcKafkaReplicationAppsTest.java
new file mode 100644
index 0000000..459c229
--- /dev/null
+++ b/modules/cdc-ext/src/test/java/org/apache/ignite/cdc/kafka/CdcKafkaReplicationAppsTest.java
@@ -0,0 +1,164 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.cdc.kafka;
+
+import java.io.File;
+import java.io.FileOutputStream;
+import java.io.PrintWriter;
+import java.nio.file.Files;
+import java.nio.file.Paths;
+import java.util.HashMap;
+import java.util.Map;
+import org.apache.ignite.configuration.IgniteConfiguration;
+import org.apache.ignite.internal.IgniteInternalFuture;
+import org.apache.ignite.startup.cmdline.CdcCommandLineStartup;
+
+import static org.apache.ignite.cdc.kafka.KafkaToIgniteCdcStreamerConfiguration.DFLT_PARTS;
+import static org.apache.ignite.spi.discovery.tcp.TcpDiscoverySpi.DFLT_PORT_RANGE;
+import static org.apache.ignite.testframework.GridTestUtils.getFieldValue;
+import static org.apache.ignite.testframework.GridTestUtils.runAsync;
+
+/** */
+public class CdcKafkaReplicationAppsTest extends CdcKafkaReplicationTest {
+ /** */
+ public static final String INSTANCE_NAME = "INSTANCE_NAME";
+
+ /** */
+ public static final String DISCO_PORT = "DISCO_PORT";
+
+ /** */
+ public static final String DISCO_PORT_RANGE = "DISCO_PORT_RANGE";
+
+ /** */
+ public static final String REPLICATED_CACHE = "REPLICATED_CACHE";
+
+ /** */
+ public static final String TOPIC = "TOPIC";
+
+ /** */
+ public static final String CONSISTENT_ID = "CONSISTENT_ID";
+
+ /** */
+ public static final String PARTS = "PARTS";
+
+ /** */
+ public static final String PARTS_FROM = "PARTS_FROM";
+
+ /** */
+ public static final String PARTS_TO = "PARTS_TO";
+
+ /** */
+ public static final String THREAD_CNT = "THREAD_CNT";
+
+ /** */
+ public static final String MAX_BATCH_SIZE = "MAX_BATCH_SIZE";
+
+ /** */
+ public static final String PROPS_PATH = "PROPS_PATH";
+
+ /** */
+ private String kafkaPropsPath = null;
+
+ /** {@inheritDoc} */
+ @Override protected void beforeTest() throws Exception {
+ super.beforeTest();
+
+ if (kafkaPropsPath == null) {
+ File file = File.createTempFile("kafka", "properties");
+
+ file.deleteOnExit();
+
+ try (FileOutputStream fos = new FileOutputStream(file)) {
+ kafkaProperties().store(fos, null);
+ }
+
+ kafkaPropsPath = "file://" + file.getAbsolutePath();
+ }
+ }
+
+ /** {@inheritDoc} */
+ @Override protected IgniteInternalFuture<?> igniteToKafka(IgniteConfiguration igniteCfg, String topic, String cache) {
+ Map<String, String> params = new HashMap<>();
+
+ params.put(INSTANCE_NAME, igniteCfg.getIgniteInstanceName());
+ params.put(REPLICATED_CACHE, cache);
+ params.put(TOPIC, topic);
+ params.put(CONSISTENT_ID, String.valueOf(igniteCfg.getConsistentId()));
+ params.put(PARTS, Integer.toString(DFLT_PARTS));
+ params.put(MAX_BATCH_SIZE, Integer.toString(KEYS_CNT));
+ params.put(PROPS_PATH, kafkaPropsPath);
+
+ return runAsync(
+ () -> CdcCommandLineStartup.main(new String[] {prepareConfig("/replication/ignite-to-kafka.xml", params)})
+ );
+ }
+
+ /** {@inheritDoc} */
+ @Override protected IgniteInternalFuture<?> kafkaToIgnite(
+ String cacheName,
+ String topic,
+ IgniteConfiguration igniteCfg,
+ int partFrom,
+ int partTo
+ ) {
+ Map<String, String> params = new HashMap<>();
+
+ int discoPort = getFieldValue(igniteCfg.getDiscoverySpi(), "locPort");
+
+ params.put(INSTANCE_NAME, igniteCfg.getIgniteInstanceName());
+ params.put(DISCO_PORT, Integer.toString(discoPort));
+ params.put(DISCO_PORT_RANGE, Integer.toString(discoPort + DFLT_PORT_RANGE));
+ params.put(REPLICATED_CACHE, cacheName);
+ params.put(TOPIC, topic);
+ params.put(PROPS_PATH, kafkaPropsPath);
+ params.put(PARTS_FROM, Integer.toString(partFrom));
+ params.put(PARTS_TO, Integer.toString(partTo));
+ params.put(THREAD_CNT, Integer.toString((partTo - partFrom) / 3));
+
+ return runAsync(
+ () -> KafkaToIgniteCommandLineStartup.main(new String[] {prepareConfig("/replication/kafka-to-ignite.xml", params)})
+ );
+ }
+
+ /** */
+ private String prepareConfig(String path, Map<String, String> params) {
+ try {
+ String cfg = new String(Files.readAllBytes(Paths.get(CdcKafkaReplicationAppsTest.class.getResource(path).toURI())));
+
+ for (String key : params.keySet()) {
+ String subst = '{' + key + '}';
+
+ while (cfg.contains(subst))
+ cfg = cfg.replace(subst, params.get(key));
+ }
+
+ File file = File.createTempFile("ignite-config", "xml");
+
+ file.deleteOnExit();
+
+ try (PrintWriter out = new PrintWriter(file)) {
+ out.print(cfg);
+ }
+
+ return file.getAbsolutePath();
+ }
+ catch (Exception e) {
+ throw new RuntimeException(e);
+ }
+ }
+}
\ No newline at end of file
diff --git a/modules/cdc-ext/src/test/java/org/apache/ignite/cdc/kafka/CdcKafkaReplicationTest.java b/modules/cdc-ext/src/test/java/org/apache/ignite/cdc/kafka/CdcKafkaReplicationTest.java
new file mode 100644
index 0000000..74e7719
--- /dev/null
+++ b/modules/cdc-ext/src/test/java/org/apache/ignite/cdc/kafka/CdcKafkaReplicationTest.java
@@ -0,0 +1,174 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.cdc.kafka;
+
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.List;
+import java.util.Properties;
+import java.util.Set;
+import org.apache.ignite.cdc.AbstractReplicationTest;
+import org.apache.ignite.cdc.CdcConfiguration;
+import org.apache.ignite.configuration.IgniteConfiguration;
+import org.apache.ignite.internal.IgniteEx;
+import org.apache.ignite.internal.IgniteInternalFuture;
+import org.apache.ignite.internal.cdc.CdcMain;
+import org.apache.kafka.clients.consumer.ConsumerConfig;
+import org.apache.kafka.streams.integration.utils.EmbeddedKafkaCluster;
+
+import static org.apache.ignite.cdc.kafka.KafkaToIgniteCdcStreamerConfiguration.DFLT_PARTS;
+import static org.apache.ignite.cdc.kafka.KafkaToIgniteCdcStreamerConfiguration.DFLT_TOPIC;
+import static org.apache.ignite.testframework.GridTestUtils.runAsync;
+import static org.apache.ignite.testframework.GridTestUtils.waitForCondition;
+
+/**
+ * Tests for kafka replication.
+ */
+public class CdcKafkaReplicationTest extends AbstractReplicationTest {
+ /** */
+ public static final String SRC_DEST_TOPIC = "source-dest";
+
+ /** */
+ public static final String DEST_SRC_TOPIC = "dest-source";
+
+ /** */
+ private static EmbeddedKafkaCluster KAFKA = null;
+
+ /** {@inheritDoc} */
+ @Override protected void beforeTest() throws Exception {
+ super.beforeTest();
+
+ if (KAFKA == null) {
+ KAFKA = new EmbeddedKafkaCluster(1);
+
+ KAFKA.start();
+ }
+
+ KAFKA.createTopic(DFLT_TOPIC, DFLT_PARTS, 1);
+ KAFKA.createTopic(SRC_DEST_TOPIC, DFLT_PARTS, 1);
+ KAFKA.createTopic(DEST_SRC_TOPIC, DFLT_PARTS, 1);
+ }
+
+ /** {@inheritDoc} */
+ @Override protected void afterTest() throws Exception {
+ super.afterTest();
+
+ KAFKA.deleteTopic(DFLT_TOPIC);
+ KAFKA.deleteTopic(SRC_DEST_TOPIC);
+ KAFKA.deleteTopic(DEST_SRC_TOPIC);
+
+ waitForCondition(() -> {
+ Set<String> topics = KAFKA.getAllTopicsInCluster();
+
+ return !topics.contains(DFLT_TOPIC) && !topics.contains(SRC_DEST_TOPIC) && !topics.contains(DEST_SRC_TOPIC);
+ }, getTestTimeout());
+ }
+
+ /** {@inheritDoc} */
+ @Override protected List<IgniteInternalFuture<?>> startActivePassiveCdc() {
+ List<IgniteInternalFuture<?>> futs = new ArrayList<>();
+
+ for (IgniteEx ex : srcCluster)
+ futs.add(igniteToKafka(ex.configuration(), DFLT_TOPIC, ACTIVE_PASSIVE_CACHE));
+
+ for (int i = 0; i < destCluster.length; i++) {
+ futs.add(kafkaToIgnite(
+ ACTIVE_PASSIVE_CACHE,
+ DFLT_TOPIC,
+ destClusterCliCfg[i],
+ i * (DFLT_PARTS / 2),
+ (i + 1) * (DFLT_PARTS / 2)
+ ));
+ }
+
+
+ return futs;
+ }
+
+ /** {@inheritDoc} */
+ @Override protected List<IgniteInternalFuture<?>> startActiveActiveCdc() {
+ List<IgniteInternalFuture<?>> futs = new ArrayList<>();
+
+ for (IgniteEx ex : srcCluster)
+ futs.add(igniteToKafka(ex.configuration(), SRC_DEST_TOPIC, ACTIVE_ACTIVE_CACHE));
+
+ for (IgniteEx ex : destCluster)
+ futs.add(igniteToKafka(ex.configuration(), DEST_SRC_TOPIC, ACTIVE_ACTIVE_CACHE));
+
+ futs.add(kafkaToIgnite(ACTIVE_ACTIVE_CACHE, SRC_DEST_TOPIC, destClusterCliCfg[0], 0, DFLT_PARTS));
+ futs.add(kafkaToIgnite(ACTIVE_ACTIVE_CACHE, DEST_SRC_TOPIC, srcClusterCliCfg[0], 0, DFLT_PARTS));
+
+ return futs;
+ }
+
+ /**
+ * @param igniteCfg Ignite configuration.
+ * @param topic Kafka topic name.
+ * @param cache Cache name to stream to kafka.
+ * @return Future for Change Data Capture application.
+ */
+ protected IgniteInternalFuture<?> igniteToKafka(IgniteConfiguration igniteCfg, String topic, String cache) {
+ return runAsync(() -> {
+ IgniteToKafkaCdcStreamer cdcCnsmr =
+ new IgniteToKafkaCdcStreamer(topic, DFLT_PARTS, Collections.singleton(cache), KEYS_CNT, false, kafkaProperties());
+
+ CdcConfiguration cdcCfg = new CdcConfiguration();
+
+ cdcCfg.setConsumer(cdcCnsmr);
+
+ new CdcMain(igniteCfg, null, cdcCfg).run();
+ });
+ }
+
+ /**
+ * @param cacheName Cache name.
+ * @param igniteCfg Ignite configuration.
+ * @return Future for runed {@link KafkaToIgniteCdcStreamer}.
+ */
+ protected IgniteInternalFuture<?> kafkaToIgnite(
+ String cacheName,
+ String topic,
+ IgniteConfiguration igniteCfg,
+ int fromPart,
+ int toPart
+ ) {
+ KafkaToIgniteCdcStreamerConfiguration cfg = new KafkaToIgniteCdcStreamerConfiguration();
+
+ cfg.setKafkaPartsFrom(fromPart);
+ cfg.setKafkaPartsTo(toPart);
+ cfg.setThreadCount((toPart - fromPart)/2);
+
+ cfg.setCaches(Collections.singletonList(cacheName));
+ cfg.setTopic(topic);
+
+ return runAsync(new KafkaToIgniteCdcStreamer(igniteCfg, kafkaProperties(), cfg));
+ }
+
+ /** */
+ protected Properties kafkaProperties() {
+ Properties props = new Properties();
+
+ props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, KAFKA.bootstrapServers());
+ props.put(ConsumerConfig.GROUP_ID_CONFIG, "kafka-to-ignite-applier");
+ props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
+ props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "false");
+ props.put(ConsumerConfig.REQUEST_TIMEOUT_MS_CONFIG, "10000");
+
+ return props;
+ }
+}
diff --git a/modules/cdc-ext/src/test/java/org/apache/ignite/cdc/kafka/KafkaToIgniteLoaderTest.java b/modules/cdc-ext/src/test/java/org/apache/ignite/cdc/kafka/KafkaToIgniteLoaderTest.java
new file mode 100644
index 0000000..413d105
--- /dev/null
+++ b/modules/cdc-ext/src/test/java/org/apache/ignite/cdc/kafka/KafkaToIgniteLoaderTest.java
@@ -0,0 +1,50 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.cdc.kafka;
+
+import org.apache.ignite.IgniteCheckedException;
+import org.apache.ignite.testframework.junits.common.GridCommonAbstractTest;
+import org.junit.Test;
+
+import static org.apache.ignite.cdc.kafka.KafkaToIgniteLoader.loadKafkaToIgniteStreamer;
+import static org.apache.ignite.testframework.GridTestUtils.assertThrows;
+
+/** Tests load {@link KafkaToIgniteCdcStreamer} from Srping xml file. */
+public class KafkaToIgniteLoaderTest extends GridCommonAbstractTest {
+ /** */
+ @Test
+ public void testLoadConfig() throws Exception {
+ assertThrows(
+ null,
+ () -> loadKafkaToIgniteStreamer("loader/kafka-to-ignite-double-ignite-cfg.xml"),
+ IgniteCheckedException.class,
+ "Exact 1 IgniteConfiguration should be defined. Found 2"
+ );
+
+ assertThrows(
+ null,
+ () -> loadKafkaToIgniteStreamer("loader/kafka-to-ignite-without-kafka-properties.xml"),
+ IgniteCheckedException.class,
+ "Spring bean with provided name doesn't exist"
+ );
+
+ KafkaToIgniteCdcStreamer streamer = loadKafkaToIgniteStreamer("loader/kafka-to-ignite-correct.xml");
+
+ assertNotNull(streamer);
+ }
+}
diff --git a/modules/cdc-ext/src/test/resources/loader/kafka-to-ignite-correct.xml b/modules/cdc-ext/src/test/resources/loader/kafka-to-ignite-correct.xml
new file mode 100644
index 0000000..d8a2efa
--- /dev/null
+++ b/modules/cdc-ext/src/test/resources/loader/kafka-to-ignite-correct.xml
@@ -0,0 +1,42 @@
+<?xml version="1.0" encoding="UTF-8"?>
+
+<!--
+ Licensed to the Apache Software Foundation (ASF) under one or more
+ contributor license agreements. See the NOTICE file distributed with
+ this work for additional information regarding copyright ownership.
+ The ASF licenses this file to You under the Apache License, Version 2.0
+ (the "License"); you may not use this file except in compliance with
+ the License. You may obtain a copy of the License at
+
+ http://www.apache.org/licenses/LICENSE-2.0
+
+ Unless required by applicable law or agreed to in writing, software
+ distributed under the License is distributed on an "AS IS" BASIS,
+ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ See the License for the specific language governing permissions and
+ limitations under the License.
+-->
+
+<beans xmlns="http://www.springframework.org/schema/beans"
+ xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+ xmlns:util="http://www.springframework.org/schema/util"
+ xsi:schemaLocation="
+ http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans.xsd
+ http://www.springframework.org/schema/util http://www.springframework.org/schema/util/spring-util.xsd">
+ <bean id="grid.cfg" class="org.apache.ignite.configuration.IgniteConfiguration">
+ <property name="dataStorageConfiguration">
+ <bean class="org.apache.ignite.configuration.DataStorageConfiguration">
+ <property name="defaultDataRegionConfiguration">
+ <bean class="org.apache.ignite.configuration.DataRegionConfiguration">
+ <property name="persistenceEnabled" value="true" />
+ </bean>
+ </property>
+ <property name="cdcEnabled" value="true" />
+ </bean>
+ </property>
+ </bean>
+
+ <bean id="streamer.cfg" class="org.apache.ignite.cdc.kafka.KafkaToIgniteCdcStreamerConfiguration" />
+
+ <util:properties id="kafkaProperties" location="loader/kafka.properties" />
+</beans>
\ No newline at end of file
diff --git a/modules/cdc-ext/src/test/resources/loader/kafka-to-ignite-double-ignite-cfg.xml b/modules/cdc-ext/src/test/resources/loader/kafka-to-ignite-double-ignite-cfg.xml
new file mode 100644
index 0000000..d7e7cb9
--- /dev/null
+++ b/modules/cdc-ext/src/test/resources/loader/kafka-to-ignite-double-ignite-cfg.xml
@@ -0,0 +1,38 @@
+<?xml version="1.0" encoding="UTF-8"?>
+
+<!--
+ Licensed to the Apache Software Foundation (ASF) under one or more
+ contributor license agreements. See the NOTICE file distributed with
+ this work for additional information regarding copyright ownership.
+ The ASF licenses this file to You under the Apache License, Version 2.0
+ (the "License"); you may not use this file except in compliance with
+ the License. You may obtain a copy of the License at
+
+ http://www.apache.org/licenses/LICENSE-2.0
+
+ Unless required by applicable law or agreed to in writing, software
+ distributed under the License is distributed on an "AS IS" BASIS,
+ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ See the License for the specific language governing permissions and
+ limitations under the License.
+-->
+
+<beans xmlns="http://www.springframework.org/schema/beans"
+ xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+ xsi:schemaLocation="
+ http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans.xsd">
+ <import resource="kafka-to-ignite-correct.xml" />
+
+ <bean id="grid.cfg2" class="org.apache.ignite.configuration.IgniteConfiguration">
+ <property name="dataStorageConfiguration">
+ <bean class="org.apache.ignite.configuration.DataStorageConfiguration">
+ <property name="defaultDataRegionConfiguration">
+ <bean class="org.apache.ignite.configuration.DataRegionConfiguration">
+ <property name="persistenceEnabled" value="true" />
+ </bean>
+ </property>
+ <property name="cdcEnabled" value="true" />
+ </bean>
+ </property>
+ </bean>
+</beans>
\ No newline at end of file
diff --git a/modules/cdc-ext/src/test/resources/loader/kafka-to-ignite-without-kafka-properties.xml b/modules/cdc-ext/src/test/resources/loader/kafka-to-ignite-without-kafka-properties.xml
new file mode 100644
index 0000000..3182771
--- /dev/null
+++ b/modules/cdc-ext/src/test/resources/loader/kafka-to-ignite-without-kafka-properties.xml
@@ -0,0 +1,38 @@
+<?xml version="1.0" encoding="UTF-8"?>
+
+<!--
+ Licensed to the Apache Software Foundation (ASF) under one or more
+ contributor license agreements. See the NOTICE file distributed with
+ this work for additional information regarding copyright ownership.
+ The ASF licenses this file to You under the Apache License, Version 2.0
+ (the "License"); you may not use this file except in compliance with
+ the License. You may obtain a copy of the License at
+
+ http://www.apache.org/licenses/LICENSE-2.0
+
+ Unless required by applicable law or agreed to in writing, software
+ distributed under the License is distributed on an "AS IS" BASIS,
+ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ See the License for the specific language governing permissions and
+ limitations under the License.
+-->
+
+<beans xmlns="http://www.springframework.org/schema/beans"
+ xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+ xsi:schemaLocation="
+ http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans.xsd">
+ <bean id="grid.cfg" class="org.apache.ignite.configuration.IgniteConfiguration">
+ <property name="dataStorageConfiguration">
+ <bean class="org.apache.ignite.configuration.DataStorageConfiguration">
+ <property name="defaultDataRegionConfiguration">
+ <bean class="org.apache.ignite.configuration.DataRegionConfiguration">
+ <property name="persistenceEnabled" value="true" />
+ </bean>
+ </property>
+ <property name="cdcEnabled" value="true" />
+ </bean>
+ </property>
+ </bean>
+
+ <bean id="streamer.cfg" class="org.apache.ignite.cdc.kafka.KafkaToIgniteCdcStreamerConfiguration" />
+</beans>
\ No newline at end of file
diff --git a/modules/cdc-ext/src/test/resources/loader/kafka.properties b/modules/cdc-ext/src/test/resources/loader/kafka.properties
new file mode 100644
index 0000000..58a46b9
--- /dev/null
+++ b/modules/cdc-ext/src/test/resources/loader/kafka.properties
@@ -0,0 +1,4 @@
+bootstrap.servers=127.0.0.1
+key.serializer=ru.SomeClass
+value.serializer=ru.SomeOtherClass
+group.id=my-group
\ No newline at end of file
diff --git a/modules/cdc-ext/src/test/resources/replication/ignite-to-kafka.xml b/modules/cdc-ext/src/test/resources/replication/ignite-to-kafka.xml
new file mode 100644
index 0000000..964daf0
--- /dev/null
+++ b/modules/cdc-ext/src/test/resources/replication/ignite-to-kafka.xml
@@ -0,0 +1,64 @@
+<?xml version="1.0" encoding="UTF-8"?>
+
+<!--
+ Licensed to the Apache Software Foundation (ASF) under one or more
+ contributor license agreements. See the NOTICE file distributed with
+ this work for additional information regarding copyright ownership.
+ The ASF licenses this file to You under the Apache License, Version 2.0
+ (the "License"); you may not use this file except in compliance with
+ the License. You may obtain a copy of the License at
+
+ http://www.apache.org/licenses/LICENSE-2.0
+
+ Unless required by applicable law or agreed to in writing, software
+ distributed under the License is distributed on an "AS IS" BASIS,
+ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ See the License for the specific language governing permissions and
+ limitations under the License.
+-->
+
+<beans xmlns="http://www.springframework.org/schema/beans"
+ xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+ xmlns:util="http://www.springframework.org/schema/util"
+ xsi:schemaLocation="
+ http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans.xsd
+ http://www.springframework.org/schema/util http://www.springframework.org/schema/util/spring-util.xsd">
+ <bean id="grid.cfg" class="org.apache.ignite.configuration.IgniteConfiguration">
+ <property name="igniteInstanceName" value="{INSTANCE_NAME}" />
+ <property name="peerClassLoadingEnabled" value="true" />
+ <property name="localHost" value="127.0.0.1" />
+ <property name="consistentId" value="{CONSISTENT_ID}" />
+
+ <property name="dataStorageConfiguration">
+ <bean class="org.apache.ignite.configuration.DataStorageConfiguration">
+ <property name="defaultDataRegionConfiguration">
+ <bean class="org.apache.ignite.configuration.DataRegionConfiguration">
+ <property name="persistenceEnabled" value="true" />
+ </bean>
+ </property>
+ <property name="cdcEnabled" value="true" />
+ </bean>
+ </property>
+ </bean>
+
+ <bean id="cdc.cfg" class="org.apache.ignite.cdc.CdcConfiguration">
+ <property name="consumer">
+ <bean class="org.apache.ignite.cdc.kafka.IgniteToKafkaCdcStreamer">
+ <constructor-arg name="topic" value="{TOPIC}" />
+ <constructor-arg name="kafkaParts" value="{PARTS}" />
+ <constructor-arg name="caches">
+ <util:list>
+ <bean class="java.lang.String">
+ <constructor-arg type="String" value="{REPLICATED_CACHE}" />
+ </bean>
+ </util:list>
+ </constructor-arg>
+ <constructor-arg name="maxBatchSize" value="{MAX_BATCH_SIZE}" />
+ <constructor-arg name="onlyPrimary" value="false" />
+ <constructor-arg name="kafkaProps" ref="kafkaProperties" />
+ </bean>
+ </property>
+ </bean>
+
+ <util:properties id="kafkaProperties" location="{PROPS_PATH}" />
+</beans>
diff --git a/modules/cdc-ext/src/test/resources/replication/kafka-to-ignite.xml b/modules/cdc-ext/src/test/resources/replication/kafka-to-ignite.xml
new file mode 100644
index 0000000..154b1b3
--- /dev/null
+++ b/modules/cdc-ext/src/test/resources/replication/kafka-to-ignite.xml
@@ -0,0 +1,61 @@
+<?xml version="1.0" encoding="UTF-8"?>
+
+<!--
+ Licensed to the Apache Software Foundation (ASF) under one or more
+ contributor license agreements. See the NOTICE file distributed with
+ this work for additional information regarding copyright ownership.
+ The ASF licenses this file to You under the Apache License, Version 2.0
+ (the "License"); you may not use this file except in compliance with
+ the License. You may obtain a copy of the License at
+
+ http://www.apache.org/licenses/LICENSE-2.0
+
+ Unless required by applicable law or agreed to in writing, software
+ distributed under the License is distributed on an "AS IS" BASIS,
+ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ See the License for the specific language governing permissions and
+ limitations under the License.
+-->
+
+<beans xmlns="http://www.springframework.org/schema/beans"
+ xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+ xmlns:util="http://www.springframework.org/schema/util"
+ xsi:schemaLocation="
+ http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans.xsd
+ http://www.springframework.org/schema/util http://www.springframework.org/schema/util/spring-util.xsd">
+ <bean id="grid.cfg" class="org.apache.ignite.configuration.IgniteConfiguration">
+ <property name="igniteInstanceName" value="{INSTANCE_NAME}" />
+ <property name="clientMode" value="true" />
+ <property name="peerClassLoadingEnabled" value="true" />
+ <property name="localHost" value="127.0.0.1" />
+
+ <property name="discoverySpi">
+ <bean class="org.apache.ignite.spi.discovery.tcp.TcpDiscoverySpi">
+ <property name="localPort" value="{DISCO_PORT}" />
+ <property name="ipFinder">
+ <bean class="org.apache.ignite.spi.discovery.tcp.ipfinder.vm.TcpDiscoveryVmIpFinder">
+ <property name="addresses"
+ value="127.0.0.1:{DISCO_PORT}..{DISCO_PORT_RANGE}" />
+ </bean>
+ </property>
+ <property name="joinTimeout" value="10000" />
+ </bean>
+ </property>
+ </bean>
+
+ <bean id="streamer.cfg" class="org.apache.ignite.cdc.kafka.KafkaToIgniteCdcStreamerConfiguration">
+ <property name="caches">
+ <util:list>
+ <bean class="java.lang.String">
+ <constructor-arg type="String" value="{REPLICATED_CACHE}" />
+ </bean>
+ </util:list>
+ </property>
+ <property name="topic" value="{TOPIC}" />
+ <property name="kafkaPartsFrom" value="{PARTS_FROM}"/>
+ <property name="kafkaPartsTo" value="{PARTS_TO}"/>
+ <property name="threadCount" value="{THREAD_CNT}"/>
+ </bean>
+
+ <util:properties id="kafkaProperties" location="{PROPS_PATH}" />
+</beans>
diff --git a/modules/performance-statistics-ext/pom.xml b/modules/performance-statistics-ext/pom.xml
index 01d7135..86c19c4 100644
--- a/modules/performance-statistics-ext/pom.xml
+++ b/modules/performance-statistics-ext/pom.xml
@@ -106,7 +106,6 @@
<plugins>
<plugin>
<artifactId>maven-assembly-plugin</artifactId>
- <version>2.4</version>
<executions>
<execution>
<!-- Create zip archive with UI resources. -->
diff --git a/parent/pom.xml b/parent/pom.xml
index 911558a..312d3a6 100644
--- a/parent/pom.xml
+++ b/parent/pom.xml
@@ -106,6 +106,7 @@
<lucene.bundle.version>7.4.0_1</lucene.bundle.version>
<lucene.version>7.4.0</lucene.version>
<lz4.version>1.5.0</lz4.version>
+ <maven.assembly.plugin.version>2.4</maven.assembly.plugin.version>
<maven.bundle.plugin.version>3.5.0</maven.bundle.plugin.version>
<maven.checkstyle.plugin.version>3.0.0</maven.checkstyle.plugin.version>
<checkstyle.puppycrawl.version>8.19</checkstyle.puppycrawl.version>
@@ -473,6 +474,10 @@
</execution>
</executions>
</plugin>
+ <plugin>
+ <artifactId>maven-assembly-plugin</artifactId>
+ <version>${maven.assembly.plugin.version}</version>
+ </plugin>
</plugins>
</pluginManagement>
diff --git a/pom.xml b/pom.xml
index 27249de..92019f1 100644
--- a/pom.xml
+++ b/pom.xml
@@ -63,6 +63,7 @@
<module>modules/spring-tx-ext</module>
<module>modules/spring-cache-ext</module>
<module>modules/spring-session-ext</module>
+ <module>modules/cdc-ext</module>
</modules>
<profiles>