blob: 5e754578cb6c47547494d81449a4e766e5e3b869 [file] [log] [blame]
<!--
Licensed to the Apache Software Foundation (ASF) under one or more
contributor license agreements. See the NOTICE file distributed with
this work for additional information regarding copyright ownership.
The ASF licenses this file to You under the Apache License, Version 2.0
(the "License"); you may not use this file except in compliance with
the License. You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
-->
<!DOCTYPE html>
<html lang="en">
<head>
<!-- Global site tag (gtag.js) - Google Analytics -->
<script async src="https://www.googletagmanager.com/gtag/js?id=UA-1382082-1"></script>
<script>
window.dataLayer = window.dataLayer || [];
function gtag(){dataLayer.push(arguments);}
gtag('js', new Date());
gtag('config', 'UA-61232409-1');
</script>
<meta charset="UTF-8">
<title>Change Data Capture Extension | Ignite Documentation</title>
<link rel="canonical" href="/docs/cdc/change-data-capture-extensions" />
<link rel="stylesheet" href="/assets/css/styles.css?1658382975">
<link rel="stylesheet" href="/assets/css/asciidoc-pygments.css">
<link rel="shortcut icon" href="/favicon.ico">
<meta name='viewport' content='width=device-width, height=device-height, initial-scale=1.0, minimum-scale=1.0'>
<link rel="stylesheet"
href="https://cdnjs.cloudflare.com/ajax/libs/font-awesome/4.7.0/css/font-awesome.min.css">
<script type="text/javascript" src="https://ajax.googleapis.com/ajax/libs/jquery/1.7.1/jquery.min.js"></script>
<script type="text/javascript" src="/assets/js/jquery.swiftype.autocomplete.js?1658382975"></script>
<script type="text/javascript" src="/assets/js/anchor.min.js?1658382975"></script>
</head>
<body>
<!--
Licensed to the Apache Software Foundation (ASF) under one or more
contributor license agreements. See the NOTICE file distributed with
this work for additional information regarding copyright ownership.
The ASF licenses this file to You under the Apache License, Version 2.0
(the "License"); you may not use this file except in compliance with
the License. You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
-->
<!--header>
<button type='button' class='menu' title='Docs menu'>
<img src="/assets/images/menu-icon.svg"/>
</button>
<nav>
</nav>
<form class='search'>
<button class="search-close" type='button'><img src='/assets/images/cancel.svg'></button>
<input type="search" placeholder="Search…" id="search-input">
</form>
<button type='button' class='search-toggle'><img src='/assets/images/search.svg'></button>
<button type='button' class='top-nav-toggle'>⋮</button>
<a href="https://github.com/ignite" title='GitHub' class='github' target="_blank">
<img src="/assets/images/github-gray.svg" alt="GitHub logo">
</a>
</header-->
<!--
Licensed to the Apache Software Foundation (ASF) under one or more
contributor license agreements. See the NOTICE file distributed with
this work for additional information regarding copyright ownership.
The ASF licenses this file to You under the Apache License, Version 2.0
(the "License"); you may not use this file except in compliance with
the License. You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
-->
<link rel="stylesheet" href="/assets/css/docs.css">
<section class='page-docs'>
<!--
Licensed to the Apache Software Foundation (ASF) under one or more
contributor license agreements. See the NOTICE file distributed with
this work for additional information regarding copyright ownership.
The ASF licenses this file to You under the Apache License, Version 2.0
(the "License"); you may not use this file except in compliance with
the License. You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
-->
<nav class='left-nav' data-swiftype-index='false'>
<li>
<a href="/docs/extensions/aws/aws" class=''>Amazon S3 IP Finder</a>
</li>
<li>
<a href="/docs/extensions/camel/camel-streamer" class=''>Apache Camel Streamer</a>
</li>
<li>
<a href="/docs/extensions/flink/flink-streamer" class=''>Apache Flink Streamer</a>
</li>
<li>
<a href="/docs/extensions/flume/flume-sink" class=''>Apache Flume Sink</a>
</li>
<li>
<a href="/docs/extensions/azure/azure" class=''>Apache Ignite Azure Module</a>
</li>
<li>
<a href="/docs/extensions/gce/gce" class=''>Apache Ignite GCE Module</a>
</li>
<li>
<a href="/docs/extensions/pub-sub/pub-sub" class=''>Apache Ignite Pub/Sub Module</a>
</li>
<li>
<a href="/docs/extensions/spring/spring-boot" class=''>Apache Ignite and Spring Boot</a>
</li>
<li>
<a href="/docs/extensions/spring/spring-data" class=''>Apache Ignite and Spring Data</a>
</li>
<li>
<a href="/docs/extensions/spring/spring-caching" class=''>Apache Ignite and Spring Cache</a>
</li>
<li>
<a href="/docs/extensions/spring/spring-sessions" class=''>Apache Ignite and Spring Session</a>
</li>
<li>
<a href="/docs/extensions/spring/spring-tx" class=''>Apache Ignite and Spring Transactions</a>
</li>
<li>
<a href="/docs/extensions/kafka/kafka-streamer" class=''>Apache Kafka Streamer</a>
</li>
<li>
<a href="/docs/extensions/storm/storm-streamer" class=''>Apache Storm Streamer</a>
</li>
<li>
<a href="/docs/extensions/cdc/change-data-capture-extensions" class=''>Change Data Capture Extension</a>
</li>
<li>
<a href="/docs/extensions/jms/jms-streamer" class=''>JMS Streamer</a>
</li>
<li>
<a href="/docs/extensions/mqtt/mqtt-streamer" class=''>MQTT Streamer</a>
</li>
<li>
<a href="/docs/extensions/perf-statistics/performance-statistics" class=''>Performance Statistics Extension</a>
</li>
<li>
<a href="/docs/extensions/rocketmq/rocketmq-streamer" class=''>RocketMQ Streamer</a>
</li>
<li>
<a href="/docs/extensions/topology-validator/topology-validator" class=''>Topology Validator</a>
</li>
<li>
<a href="/docs/extensions/twitter/twitter-streamer" class=''>Twitter Streamer</a>
</li>
<li>
<a href="/docs/extensions/zeromq/zeromq-streamer" class=''>ZeroMQ Streamer</a>
</li>
<li>
<a href="/docs/extensions/zookeeper/zookeeper-ip" class=''>ZooKeeper IP Finder</a>
</li>
</nav>
<div class="left-nav__overlay"></div>
<article data-swiftype-index='true'>
<a class='edit-link' href="/_docs/cdc/change-data-capture-extensions.adoc" target="_blank">Edit</a>
<h1>Change Data Capture Extension</h1>
<div id="preamble">
<div class="sectionbody">
<div class="admonitionblock warning">
<table>
<tr>
<td class="icon">
<div class="title">Warning</div>
</td>
<td class="content">
CDC is an experimental feature. API or design architecture might be changed.
</td>
</tr>
</table>
</div>
</div>
</div>
<div class="sect1">
<h2 id="overview">Overview</h2>
<div class="sectionbody">
<div class="paragraph">
<p><a href="https://github.com/apache/ignite-extensions/tree/master/modules/cdc-ext">Change Data Capture Extension</a> module provides two ways to set up cross cluster replication based on CDC.</p>
</div>
<div class="olist arabic">
<ol class="arabic">
<li>
<p><a href="https://github.com/apache/ignite-extensions/blob/master/modules/cdc-ext/src/main/java/org/apache/ignite/cdc/IgniteToIgniteCdcStreamer.java">Ignite2IgniteCdcStreamer</a> - streams changes to destination cluster using client node.</p>
</li>
<li>
<p><a href="https://github.com/apache/ignite-extensions/blob/master/modules/cdc-ext/src/main/java/org/apache/ignite/cdc/kafka/IgniteToKafkaCdcStreamer.java">Ignite2KafkaCdcStreamer</a> combined with <a href="https://github.com/apache/ignite-extensions/blob/master/modules/cdc-ext/src/main/java/org/apache/ignite/cdc/kafka/KafkaToIgniteCdcStreamer.java">KafkaToIgniteCdcStreamer</a> streams changes to destination cluster using <a href="https://kafka.apache.org">Apache Kafka</a> as a transport.</p>
</li>
</ol>
</div>
<div class="admonitionblock note">
<table>
<tr>
<td class="icon">
<div class="title">Note</div>
</td>
<td class="content">
For each cache replicated between clusters <a href="https://github.com/apache/ignite/blob/master/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/version/CacheVersionConflictResolver.java">CacheVersionConflictResolver</a> should be defined.
</td>
</tr>
</table>
</div>
</div>
</div>
<div class="sect1">
<h2 id="ignite-to-ignite-cdc-streamer">Ignite to Ignite CDC streamer</h2>
<div class="sectionbody">
<div class="paragraph">
<p>This streamer starts client node which connects to destination cluster.
After connection is established, all changes captured by CDC will be replicated to destination cluster.</p>
</div>
<div class="admonitionblock note">
<table>
<tr>
<td class="icon">
<div class="title">Note</div>
</td>
<td class="content">
Instances of <code>ignite-cdc.sh</code> with configured streamer should be started on each server node of source cluster to capture all changes.
</td>
</tr>
</table>
</div>
<div class="paragraph">
<p><span class="image"><img src="/docs/images/CDC-design.svg" alt="CDC design"></span></p>
</div>
</div>
</div>
<div class="sect1">
<h2 id="configuration">Configuration</h2>
<div class="sectionbody">
<table class="tableblock frame-all grid-all stripes-even stretch">
<colgroup>
<col style="width: 20%;">
<col style="width: 45%;">
<col style="width: 35%;">
</colgroup>
<thead>
<tr>
<th class="tableblock halign-left valign-top">Name</th>
<th class="tableblock halign-left valign-top">Description</th>
<th class="tableblock halign-left valign-top">Default value</th>
</tr>
</thead>
<tbody>
<tr>
<td class="tableblock halign-left valign-top"><p class="tableblock"><code>caches</code></p></td>
<td class="tableblock halign-left valign-top"><p class="tableblock">Set of cache names to replicate.</p></td>
<td class="tableblock halign-left valign-top"><p class="tableblock">null</p></td>
</tr>
<tr>
<td class="tableblock halign-left valign-top"><p class="tableblock"><code>destinationIgniteConfiguration</code></p></td>
<td class="tableblock halign-left valign-top"><p class="tableblock">Ignite configuration of client nodes that will connect to destination cluster to replicate changes.</p></td>
<td class="tableblock halign-left valign-top"><p class="tableblock">null</p></td>
</tr>
<tr>
<td class="tableblock halign-left valign-top"><p class="tableblock"><code>onlyPrimary</code></p></td>
<td class="tableblock halign-left valign-top"><p class="tableblock">Flag to handle changes only on primary node.</p></td>
<td class="tableblock halign-left valign-top"><p class="tableblock"><code>false</code></p></td>
</tr>
<tr>
<td class="tableblock halign-left valign-top"><p class="tableblock"><code>maxBatchSize</code></p></td>
<td class="tableblock halign-left valign-top"><p class="tableblock">Maximum number of events to be sent to destination cluster in a single batch.</p></td>
<td class="tableblock halign-left valign-top"><p class="tableblock">1024</p></td>
</tr>
</tbody>
</table>
</div>
</div>
<div class="sect1">
<h2 id="metrics">Metrics</h2>
<div class="sectionbody">
<table class="tableblock frame-all grid-all stripes-even stretch">
<colgroup>
<col style="width: 50%;">
<col style="width: 50%;">
</colgroup>
<tbody>
<tr>
<td class="tableblock halign-left valign-top"><p class="tableblock">Name</p></td>
<td class="tableblock halign-left valign-top"><p class="tableblock">Description</p></td>
</tr>
<tr>
<td class="tableblock halign-left valign-top"><p class="tableblock"><code>EventsCount</code></p></td>
<td class="tableblock halign-left valign-top"><p class="tableblock">Count of messages applied to destination cluster.</p></td>
</tr>
<tr>
<td class="tableblock halign-left valign-top"><p class="tableblock"><code>LastEventTime</code></p></td>
<td class="tableblock halign-left valign-top"><p class="tableblock">Timestamp of last applied event.</p></td>
</tr>
</tbody>
</table>
</div>
</div>
<div class="sect1">
<h2 id="cdc-replication-using-kafka">CDC replication using Kafka</h2>
<div class="sectionbody">
<div class="paragraph">
<p>This way to replicate changes between clusters requires setting up two applications:</p>
</div>
<div class="olist arabic">
<ol class="arabic">
<li>
<p><code>ignite-cdc.sh</code> with <code>org.apache.ignite.cdc.kafka.IgniteToKafkaCdcStreamer</code> that will capture changes from source cluster and write it to Kafka topic.</p>
</li>
<li>
<p><code>kafka-to-ignite.sh</code> that will read changes from Kafka topic and then write them to destination cluster.</p>
</li>
</ol>
</div>
<div class="admonitionblock note">
<table>
<tr>
<td class="icon">
<div class="title">Note</div>
</td>
<td class="content">
Instances of <code>ignite-cdc.sh</code> with configured streamer should be started on each server node of source cluster to capture all changes.
</td>
</tr>
</table>
</div>
<div class="sect2">
<h3 id="ignitetokafkacdcstreamer-configuration">IgniteToKafkaCdcStreamer Configuration</h3>
<table class="tableblock frame-all grid-all stripes-even stretch">
<colgroup>
<col style="width: 20%;">
<col style="width: 45%;">
<col style="width: 35%;">
</colgroup>
<thead>
<tr>
<th class="tableblock halign-left valign-top">Name</th>
<th class="tableblock halign-left valign-top">Description</th>
<th class="tableblock halign-left valign-top">Default value</th>
</tr>
</thead>
<tbody>
<tr>
<td class="tableblock halign-left valign-top"><p class="tableblock"><code>caches</code></p></td>
<td class="tableblock halign-left valign-top"><p class="tableblock">Set of cache names to replicate.</p></td>
<td class="tableblock halign-left valign-top"><p class="tableblock">null</p></td>
</tr>
<tr>
<td class="tableblock halign-left valign-top"><p class="tableblock"><code>kafkaProperties</code></p></td>
<td class="tableblock halign-left valign-top"><p class="tableblock">Kafka producer properties.</p></td>
<td class="tableblock halign-left valign-top"><p class="tableblock">null</p></td>
</tr>
<tr>
<td class="tableblock halign-left valign-top"><p class="tableblock"><code>topic</code></p></td>
<td class="tableblock halign-left valign-top"><p class="tableblock">Name of the Kafka topic.</p></td>
<td class="tableblock halign-left valign-top"><p class="tableblock">null</p></td>
</tr>
<tr>
<td class="tableblock halign-left valign-top"><p class="tableblock"><code>kafkaParts</code></p></td>
<td class="tableblock halign-left valign-top"><p class="tableblock">Number of Kafka topic partitions.</p></td>
<td class="tableblock halign-left valign-top"><p class="tableblock">null</p></td>
</tr>
<tr>
<td class="tableblock halign-left valign-top"><p class="tableblock"><code>onlyPrimary</code></p></td>
<td class="tableblock halign-left valign-top"><p class="tableblock">Flag to handle changes only on primary node.</p></td>
<td class="tableblock halign-left valign-top"><p class="tableblock"><code>false</code></p></td>
</tr>
<tr>
<td class="tableblock halign-left valign-top"><p class="tableblock"><code>maxBatchSize</code></p></td>
<td class="tableblock halign-left valign-top"><p class="tableblock">Maximum size of concurrently produced Kafka records. When streamer reaches this number, it waits for Kafka acknowledgements, and then commits CDC offset.</p></td>
<td class="tableblock halign-left valign-top"><p class="tableblock"><code>1024</code></p></td>
</tr>
<tr>
<td class="tableblock halign-left valign-top"><p class="tableblock"><code>kafkaRequestTimeout</code></p></td>
<td class="tableblock halign-left valign-top"><p class="tableblock">Kafka request timeout in milliseconds.</p></td>
<td class="tableblock halign-left valign-top"><p class="tableblock"><code>3000</code></p></td>
</tr>
</tbody>
</table>
</div>
<div class="sect2">
<h3 id="ignitetokafkacdcstreamer-metrics">IgniteToKafkaCdcStreamer Metrics</h3>
<table class="tableblock frame-all grid-all stripes-even stretch">
<colgroup>
<col style="width: 50%;">
<col style="width: 50%;">
</colgroup>
<tbody>
<tr>
<td class="tableblock halign-left valign-top"><p class="tableblock">Name</p></td>
<td class="tableblock halign-left valign-top"><p class="tableblock">Description</p></td>
</tr>
<tr>
<td class="tableblock halign-left valign-top"><p class="tableblock"><code>EventsCount</code></p></td>
<td class="tableblock halign-left valign-top"><p class="tableblock">Count of messages applied to destination cluster.</p></td>
</tr>
<tr>
<td class="tableblock halign-left valign-top"><p class="tableblock"><code>LastEventTime</code></p></td>
<td class="tableblock halign-left valign-top"><p class="tableblock">Timestamp of last applied event.</p></td>
</tr>
<tr>
<td class="tableblock halign-left valign-top"><p class="tableblock"><code>BytesSent</code></p></td>
<td class="tableblock halign-left valign-top"><p class="tableblock">Number of bytes send to Kafka.</p></td>
</tr>
</tbody>
</table>
</div>
<div class="sect2">
<h3 id="kafka-to-ignite-sh-application"><code>kafka-to-ignite.sh</code> application</h3>
<div class="paragraph">
<p>This application should be started near the destination cluster.
<code>kafka-to-ignite.sh</code> will read CDC events from Kafka topic and then apply them to destination cluster.</p>
</div>
<div class="admonitionblock important">
<table>
<tr>
<td class="icon">
<div class="title">Important</div>
</td>
<td class="content">
<code>kafka-to-ignite.sh</code> implements the fail-fast approach. It just fails in case of any error. The restart procedure should be configured with the OS tools.
</td>
</tr>
</table>
</div>
<div class="paragraph">
<p>Count of instances of the application does not corellate to the count of destination server nodes.
It should be just enough to process source cluster load.
Each instance of application will process configured subset of topic partitions to spread the load.
<code>KafkaConsumer</code> for each partition will be created to ensure fair reads.</p>
</div>
<div class="sect3">
<h4 id="installation">Installation</h4>
<div class="olist arabic">
<ol class="arabic">
<li>
<p>Build <code>cdc-ext</code> module with maven:</p>
<div class="listingblock">
<div class="content">
<pre class="rouge highlight"><code data-lang="console"><span class="gp"> $</span>~/src/ignite-extensions/&gt; mvn clean package <span class="nt">-DskipTests</span>
<span class="gp"> $</span>~/src/ignite-extensions/&gt; <span class="nb">ls </span>modules/cdc-ext/target | <span class="nb">grep </span>zip
<span class="go">ignite-cdc-ext.zip</span></code></pre>
</div>
</div>
</li>
<li>
<p>Unpack <code>ignite-cdc-ext.zip</code> archive to <code>$IGNITE_HOME</code> folder.</p>
</li>
</ol>
</div>
<div class="paragraph">
<p>Now, you have additional binary <code>$IGNITE_HOME/bin/kafka-to-ignite.sh</code> and <code>$IGNITE_HOME/libs/ignite-cdc-ext</code> module.</p>
</div>
<div class="admonitionblock note">
<table>
<tr>
<td class="icon">
<div class="title">Note</div>
</td>
<td class="content">
Please, enable <code>ignite-cdc-ext</code> to be able to run <code>kafka-to-ignite.sh</code>.
</td>
</tr>
</table>
</div>
</div>
<div class="sect3">
<h4 id="configuration-2">Configuration</h4>
<div class="paragraph">
<p>Application configuration should be done using POJO classes or Spring xml file like regular Ignite node configuration.
Kafka to ignite configuration file should contain the following beans that will be loaded during startup:</p>
</div>
<div class="olist arabic">
<ol class="arabic">
<li>
<p><code>IgniteConfiguration</code> bean: Configuration of the client node that will connect to the destination cluster.</p>
</li>
<li>
<p><code>java.util.Properties</code> bean with the name <code>kafkaProperties</code>: Single Kafka consumer configuration.</p>
</li>
<li>
<p><code>org.apache.ignite.cdc.kafka.KafkaToIgniteCdcStreamerConfiguration</code> bean: Options specific to <code>kafka-to-ignite.sh</code> application.</p>
</li>
</ol>
</div>
<table class="tableblock frame-all grid-all stripes-even stretch">
<colgroup>
<col style="width: 20%;">
<col style="width: 45%;">
<col style="width: 35%;">
</colgroup>
<thead>
<tr>
<th class="tableblock halign-left valign-top">Name</th>
<th class="tableblock halign-left valign-top">Description</th>
<th class="tableblock halign-left valign-top">Default value</th>
</tr>
</thead>
<tbody>
<tr>
<td class="tableblock halign-left valign-top"><p class="tableblock"><code>caches</code></p></td>
<td class="tableblock halign-left valign-top"><p class="tableblock">Set of cache names to replicate.</p></td>
<td class="tableblock halign-left valign-top"><p class="tableblock">null</p></td>
</tr>
<tr>
<td class="tableblock halign-left valign-top"><p class="tableblock"><code>topic</code></p></td>
<td class="tableblock halign-left valign-top"><p class="tableblock">Name of the Kafka topic.</p></td>
<td class="tableblock halign-left valign-top"><p class="tableblock">null</p></td>
</tr>
<tr>
<td class="tableblock halign-left valign-top"><p class="tableblock"><code>kafkaPartsFrom</code></p></td>
<td class="tableblock halign-left valign-top"><p class="tableblock">Lower Kafka partitions number (inclusive).</p></td>
<td class="tableblock halign-left valign-top"><p class="tableblock">-1</p></td>
</tr>
<tr>
<td class="tableblock halign-left valign-top"><p class="tableblock"><code>kafkaPartsTo</code></p></td>
<td class="tableblock halign-left valign-top"><p class="tableblock">Lower Kafka partitions number (exclusive).</p></td>
<td class="tableblock halign-left valign-top"><p class="tableblock">-1</p></td>
</tr>
<tr>
<td class="tableblock halign-left valign-top"><p class="tableblock"><code>kafkaRequestTimeout</code></p></td>
<td class="tableblock halign-left valign-top"><p class="tableblock">Kafka request timeout in milliseconds.</p></td>
<td class="tableblock halign-left valign-top"><p class="tableblock"><code>3000</code></p></td>
</tr>
<tr>
<td class="tableblock halign-left valign-top"><p class="tableblock"><code>maxBatchSize</code></p></td>
<td class="tableblock halign-left valign-top"><p class="tableblock">Maximum number of events to be sent to destination cluster in a single batch.</p></td>
<td class="tableblock halign-left valign-top"><p class="tableblock">1024</p></td>
</tr>
<tr>
<td class="tableblock halign-left valign-top"><p class="tableblock"><code>threadCount</code></p></td>
<td class="tableblock halign-left valign-top"><p class="tableblock">Count of threads to proceed consumers. Each thread poll records from dedicated partitions in round-robin manner.</p></td>
<td class="tableblock halign-left valign-top"><p class="tableblock">16</p></td>
</tr>
</tbody>
</table>
</div>
<div class="sect3">
<h4 id="logging">Logging</h4>
<div class="paragraph">
<p><code>kakfa-to-ignite.sh</code> uses the same logging configuration as the Ignite node does. The only difference is that the log is written in the "kafka-ignite-streamer.log" file.</p>
</div>
</div>
</div>
</div>
</div>
<div class="sect1">
<h2 id="cacheversionconflictresolver-implementation">CacheVersionConflictResolver implementation</h2>
<div class="sectionbody">
<div class="paragraph">
<p>It expected that CDC streamers will be configured with the <code>onlyPrimary=false</code> in most real-world deployments to ensure fault-tolerance.
That means streamer will send the same change several times equal to <code>CacheConfiguration#backups</code> + 1.
At the same time concurrent updates of the same key can be done in replicated clusters.
<code>CacheVersionConflictResolver</code> used by Ignite node to selects or merge new (from update request) and existing (stored in the cluster) entry versions.
Selected entry version will be actually stored in the cluster.</p>
</div>
<div class="admonitionblock note">
<table>
<tr>
<td class="icon">
<div class="title">Note</div>
</td>
<td class="content">
Default implementation only select correct entry and never merge.
</td>
</tr>
</table>
</div>
<div class="paragraph">
<p><a href="https://github.com/apache/ignite/blob/master/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/version/CacheVersionConflictResolver.java">CacheVersionConflictResolver</a> should be defined for each cache replicated between clusters.</p>
</div>
<div class="paragraph">
<p>Default <a href="https://github.com/apache/ignite-extensions/blob/master/modules/cdc-ext/src/main/java/org/apache/ignite/cdc/conflictresolve/CacheVersionConflictResolverImpl.java">implementation</a> is available in cdc-ext.</p>
</div>
<div class="sect2">
<h3 id="configuration-3">Configuration</h3>
<table class="tableblock frame-all grid-all stripes-even stretch">
<colgroup>
<col style="width: 20%;">
<col style="width: 45%;">
<col style="width: 35%;">
</colgroup>
<thead>
<tr>
<th class="tableblock halign-left valign-top">Name</th>
<th class="tableblock halign-left valign-top">Description</th>
<th class="tableblock halign-left valign-top">Default value</th>
</tr>
</thead>
<tbody>
<tr>
<td class="tableblock halign-left valign-top"><p class="tableblock"><code>clusterId</code></p></td>
<td class="tableblock halign-left valign-top"><p class="tableblock">Local cluster id. Can be any value from 1 to 31.</p></td>
<td class="tableblock halign-left valign-top"><p class="tableblock">null</p></td>
</tr>
<tr>
<td class="tableblock halign-left valign-top"><p class="tableblock"><code>caches</code></p></td>
<td class="tableblock halign-left valign-top"><p class="tableblock">Set of cache names to handle with this plugin instance.</p></td>
<td class="tableblock halign-left valign-top"><p class="tableblock">null</p></td>
</tr>
<tr>
<td class="tableblock halign-left valign-top"><p class="tableblock"><code>conflictResolveField</code></p></td>
<td class="tableblock halign-left valign-top"><p class="tableblock">Value field to resolve conflict with. Optional. Field values must implement <code>java.lang.Comparable</code>.</p></td>
<td class="tableblock halign-left valign-top"><p class="tableblock">null</p></td>
</tr>
</tbody>
</table>
</div>
<div class="sect2">
<h3 id="conflict-resolve-algorithm">Conflict resolve algorithm</h3>
<div class="paragraph">
<p>Replicated changes contain some additional data. Specifically, entry version from source cluster supplied with the changed data.
Default conflict resolve algorithm based on entry version and <code>conflictResolveField</code>.
Conflict resolution field should contain user provided monotonically increasing value such as query id or timestamp.</p>
</div>
<div class="olist arabic">
<ol class="arabic">
<li>
<p>Changes from the "local" cluster always win.</p>
</li>
<li>
<p>If both old and new entry from the same cluster version comparison used to determine order.</p>
</li>
<li>
<p>If <code>conflictResolveField</code> if provided then field values comparison used to determine order.</p>
</li>
<li>
<p>Conflict resolution failed. Update will be ignored.</p>
</li>
</ol>
</div>
</div>
<div class="sect2">
<h3 id="configuration-example">Configuration example</h3>
<div class="paragraph">
<p>Configuration is done via Ignite node plugin:</p>
</div>
<div class="listingblock">
<div class="content">
<pre class="rouge highlight"><code data-lang="xml"><span class="nt">&lt;property</span> <span class="na">name=</span><span class="s">"pluginProviders"</span><span class="nt">&gt;</span>
<span class="nt">&lt;bean</span> <span class="na">class=</span><span class="s">"org.apache.ignite.cdc.conflictresolve.CacheVersionConflictResolverPluginProvider"</span><span class="nt">&gt;</span>
<span class="nt">&lt;property</span> <span class="na">name=</span><span class="s">"clusterId"</span> <span class="na">value=</span><span class="s">"1"</span> <span class="nt">/&gt;</span>
<span class="nt">&lt;property</span> <span class="na">name=</span><span class="s">"caches"</span><span class="nt">&gt;</span>
<span class="nt">&lt;util:list&gt;</span>
<span class="nt">&lt;bean</span> <span class="na">class=</span><span class="s">"java.lang.String"</span><span class="nt">&gt;</span>
<span class="nt">&lt;constructor-arg</span> <span class="na">type=</span><span class="s">"String"</span> <span class="na">value=</span><span class="s">"queryId"</span> <span class="nt">/&gt;</span>
<span class="nt">&lt;/bean&gt;</span>
<span class="nt">&lt;/util:list&gt;</span>
<span class="nt">&lt;/property&gt;</span>
<span class="nt">&lt;/bean&gt;</span>
<span class="nt">&lt;/property&gt;</span></code></pre>
</div>
</div>
</div>
</div>
</div>
<!--
Licensed to the Apache Software Foundation (ASF) under one or more
contributor license agreements. See the NOTICE file distributed with
this work for additional information regarding copyright ownership.
The ASF licenses this file to You under the Apache License, Version 2.0
(the "License"); you may not use this file except in compliance with
the License. You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
-->
<div class="copyright">
© 2022 The Apache Software Foundation.<br/>
Apache, Apache Ignite, the Apache feather and the Apache Ignite logo are either registered trademarks or trademarks of The Apache Software Foundation.
</div>
</article>
<!--
Licensed to the Apache Software Foundation (ASF) under one or more
contributor license agreements. See the NOTICE file distributed with
this work for additional information regarding copyright ownership.
The ASF licenses this file to You under the Apache License, Version 2.0
(the "License"); you may not use this file except in compliance with
the License. You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
-->
<nav class="right-nav" data-swiftype-index='false'>
<ul class="sectlevel1">
<li><a href="#overview">Overview</a></li>
<li><a href="#ignite-to-ignite-cdc-streamer">Ignite to Ignite CDC streamer</a></li>
<li><a href="#configuration">Configuration</a></li>
<li><a href="#metrics">Metrics</a></li>
<li><a href="#cdc-replication-using-kafka">CDC replication using Kafka</a>
<ul class="sectlevel2">
<li><a href="#ignitetokafkacdcstreamer-configuration">IgniteToKafkaCdcStreamer Configuration</a></li>
<li><a href="#ignitetokafkacdcstreamer-metrics">IgniteToKafkaCdcStreamer Metrics</a></li>
<li><a href="#kafka-to-ignite-sh-application"><code>kafka-to-ignite.sh</code> application</a>
<ul class="sectlevel3">
<li><a href="#installation">Installation</a></li>
<li><a href="#configuration-2">Configuration</a></li>
<li><a href="#logging">Logging</a></li>
</ul>
</li>
</ul>
</li>
<li><a href="#cacheversionconflictresolver-implementation">CacheVersionConflictResolver implementation</a>
<ul class="sectlevel2">
<li><a href="#configuration-3">Configuration</a></li>
<li><a href="#conflict-resolve-algorithm">Conflict resolve algorithm</a></li>
<li><a href="#configuration-example">Configuration example</a></li>
</ul>
</li>
</ul>
<!--
Licensed to the Apache Software Foundation (ASF) under one or more
contributor license agreements. See the NOTICE file distributed with
this work for additional information regarding copyright ownership.
The ASF licenses this file to You under the Apache License, Version 2.0
(the "License"); you may not use this file except in compliance with
the License. You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
-->
<footer>
</footer>
</nav>
</section>
<script type='module' src='/assets/js/code-copy-to-clipboard.js' async></script>
<script>
// inits deep anchors -- needs to be done here because of https://www.bryanbraun.com/anchorjs/#dont-run-it-too-late
anchors.add('.page-docs h1, .page-docs h2, .page-docs h3:not(.discrete), .page-docs h4, .page-docs h5');
anchors.options = {
placement: 'right',
visible: 'always'
};
</script>
</body>
<script type='module' src='/assets/js/index.js?1658382975' async></script>
</html>