| <!-- |
| Licensed to the Apache Software Foundation (ASF) under one or more |
| contributor license agreements. See the NOTICE file distributed with |
| this work for additional information regarding copyright ownership. |
| The ASF licenses this file to You under the Apache License, Version 2.0 |
| (the "License"); you may not use this file except in compliance with |
| the License. You may obtain a copy of the License at |
| |
| http://www.apache.org/licenses/LICENSE-2.0 |
| |
| Unless required by applicable law or agreed to in writing, software |
| distributed under the License is distributed on an "AS IS" BASIS, |
| WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
| See the License for the specific language governing permissions and |
| limitations under the License. |
| --> |
| <!DOCTYPE html> |
| |
| |
| |
| <html lang="en"> |
| <head> |
| <!-- Global site tag (gtag.js) - Google Analytics |
| The "id" in the gtag.js loader URL and the ID passed to gtag('config', ...) must refer to the same property. |
| NOTE(review): Universal Analytics (UA-*) properties stopped processing data in July 2023; |
| confirm whether this should be replaced with a GA4 measurement ID (G-*). --> |
| <script async src="https://www.googletagmanager.com/gtag/js?id=UA-61232409-1"></script> |
| <script> |
| window.dataLayer = window.dataLayer || []; |
| function gtag(){dataLayer.push(arguments);} |
| gtag('js', new Date()); |
| |
| gtag('config', 'UA-61232409-1'); |
| </script> |
| |
| |
| |
| <meta charset="UTF-8"> |
| <title>Change Data Capture Extension | Ignite Documentation</title> |
| |
| <link rel="canonical" href="/docs/cdc/change-data-capture-extensions" /> |
| |
| |
| <link rel="stylesheet" href="/assets/css/styles.css?1658382975"> |
| <link rel="stylesheet" href="/assets/css/asciidoc-pygments.css"> |
| <link rel="shortcut icon" href="/favicon.ico"> |
| <meta name='viewport' content='width=device-width, height=device-height, initial-scale=1.0, minimum-scale=1.0'> |
| |
| <link rel="stylesheet" |
| href="https://cdnjs.cloudflare.com/ajax/libs/font-awesome/4.7.0/css/font-awesome.min.css"> |
| |
| <script type="text/javascript" src="https://ajax.googleapis.com/ajax/libs/jquery/1.7.1/jquery.min.js"></script> |
| <script type="text/javascript" src="/assets/js/jquery.swiftype.autocomplete.js?1658382975"></script> |
| <script type="text/javascript" src="/assets/js/anchor.min.js?1658382975"></script> |
| |
| |
| </head> |
| <body> |
| <!-- |
| Licensed to the Apache Software Foundation (ASF) under one or more |
| contributor license agreements. See the NOTICE file distributed with |
| this work for additional information regarding copyright ownership. |
| The ASF licenses this file to You under the Apache License, Version 2.0 |
| (the "License"); you may not use this file except in compliance with |
| the License. You may obtain a copy of the License at |
| |
| http://www.apache.org/licenses/LICENSE-2.0 |
| |
| Unless required by applicable law or agreed to in writing, software |
| distributed under the License is distributed on an "AS IS" BASIS, |
| WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
| See the License for the specific language governing permissions and |
| limitations under the License. |
| --> |
| |
| <header> |
| <!--#include virtual="/includes/promotion_banner.html" --> |
| <div class="container"> |
| <button type='button' class='menu' title='Docs menu'> |
| <img src="/assets/images/menu-icon.svg" width="18" height="12" alt="menu icon" /> |
| </button> |
| <div class='home'> |
| <a href="/" class='home' title='Apache Ignite home'> |
| <img src="/assets/images/apache_ignite_logo.svg" alt="Apache Ignite logo" width="103" height="36" > |
| </a> |
| </div> |
| <form class='search'> |
| <button class="search-close" type='button'><img src='/assets/images/cancel.svg' alt="close" width="10" height="10" /></button> |
| <input type="search" placeholder="Search…" id="search-input"> |
| </form> |
| <button type='button' class='top-nav-toggle'>⋮</button> |
| </div> |
| </header> |
| |
| |
| |
| <!-- |
| Licensed to the Apache Software Foundation (ASF) under one or more |
| contributor license agreements. See the NOTICE file distributed with |
| this work for additional information regarding copyright ownership. |
| The ASF licenses this file to You under the Apache License, Version 2.0 |
| (the "License"); you may not use this file except in compliance with |
| the License. You may obtain a copy of the License at |
| |
| http://www.apache.org/licenses/LICENSE-2.0 |
| |
| Unless required by applicable law or agreed to in writing, software |
| distributed under the License is distributed on an "AS IS" BASIS, |
| WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
| See the License for the specific language governing permissions and |
| limitations under the License. |
| --> |
| <link rel="stylesheet" href="/assets/css/docs.css"> |
| <section class='page-docs'> |
| <!-- |
| Licensed to the Apache Software Foundation (ASF) under one or more |
| contributor license agreements. See the NOTICE file distributed with |
| this work for additional information regarding copyright ownership. |
| The ASF licenses this file to You under the Apache License, Version 2.0 |
| (the "License"); you may not use this file except in compliance with |
| the License. You may obtain a copy of the License at |
| |
| http://www.apache.org/licenses/LICENSE-2.0 |
| |
| Unless required by applicable law or agreed to in writing, software |
| distributed under the License is distributed on an "AS IS" BASIS, |
| WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
| See the License for the specific language governing permissions and |
| limitations under the License. |
| --> |
| |
| |
| |
| |
| <nav class='left-nav' data-swiftype-index='false'> |
| |
| |
| <li> |
| |
| |
| <a href="/docs/extensions/aws/aws" class=''>Amazon S3 IP Finder</a> |
| |
| </li> |
| |
| <li> |
| |
| |
| <a href="/docs/extensions/camel/camel-streamer" class=''>Apache Camel Streamer</a> |
| |
| </li> |
| |
| <li> |
| |
| |
| <a href="/docs/extensions/flink/flink-streamer" class=''>Apache Flink Streamer</a> |
| |
| </li> |
| |
| <li> |
| |
| |
| <a href="/docs/extensions/flume/flume-sink" class=''>Apache Flume Sink</a> |
| |
| </li> |
| |
| <li> |
| |
| |
| <a href="/docs/extensions/azure/azure" class=''>Apache Ignite Azure Module</a> |
| |
| </li> |
| |
| <li> |
| |
| |
| <a href="/docs/extensions/gce/gce" class=''>Apache Ignite GCE Module</a> |
| |
| </li> |
| |
| <li> |
| |
| |
| <a href="/docs/extensions/pub-sub/pub-sub" class=''>Apache Ignite Pub/Sub Module</a> |
| |
| </li> |
| |
| <li> |
| |
| |
| <a href="/docs/extensions/spring/spring-boot" class=''>Apache Ignite and Spring Boot</a> |
| |
| </li> |
| |
| <li> |
| |
| |
| <a href="/docs/extensions/spring/spring-data" class=''>Apache Ignite and Spring Data</a> |
| |
| </li> |
| |
| <li> |
| |
| |
| <a href="/docs/extensions/spring/spring-caching" class=''>Apache Ignite and Spring Cache</a> |
| |
| </li> |
| |
| <li> |
| |
| |
| <a href="/docs/extensions/spring/spring-sessions" class=''>Apache Ignite and Spring Session</a> |
| |
| </li> |
| |
| <li> |
| |
| |
| <a href="/docs/extensions/spring/spring-tx" class=''>Apache Ignite and Spring Transactions</a> |
| |
| </li> |
| |
| <li> |
| |
| |
| <a href="/docs/extensions/kafka/kafka-streamer" class=''>Apache Kafka Streamer</a> |
| |
| </li> |
| |
| <li> |
| |
| |
| <a href="/docs/extensions/storm/storm-streamer" class=''>Apache Storm Streamer</a> |
| |
| </li> |
| |
| <li> |
| |
| |
| <a href="/docs/extensions/cdc/change-data-capture-extensions" class=''>Change Data Capture Extension</a> |
| |
| </li> |
| |
| <li> |
| |
| |
| <a href="/docs/extensions/jms/jms-streamer" class=''>JMS Streamer</a> |
| |
| </li> |
| |
| <li> |
| |
| |
| <a href="/docs/extensions/mqtt/mqtt-streamer" class=''>MQTT Streamer</a> |
| |
| </li> |
| |
| <li> |
| |
| |
| <a href="/docs/extensions/perf-statistics/performance-statistics" class=''>Performance Statistics Extension</a> |
| |
| </li> |
| |
| <li> |
| |
| |
| <a href="/docs/extensions/rocketmq/rocketmq-streamer" class=''>RocketMQ Streamer</a> |
| |
| </li> |
| |
| <li> |
| |
| |
| <a href="/docs/extensions/topology-validator/topology-validator" class=''>Topology Validator</a> |
| |
| </li> |
| |
| <li> |
| |
| |
| <a href="/docs/extensions/twitter/twitter-streamer" class=''>Twitter Streamer</a> |
| |
| </li> |
| |
| <li> |
| |
| |
| <a href="/docs/extensions/zeromq/zeromq-streamer" class=''>ZeroMQ Streamer</a> |
| |
| </li> |
| |
| <li> |
| |
| |
| <a href="/docs/extensions/zookeeper/zookeeper-ip" class=''>ZooKeeper IP Finder</a> |
| |
| </li> |
| |
| </nav> |
| <div class="left-nav__overlay"></div> |
| |
| |
| <article data-swiftype-index='true'> |
| <a class='edit-link' href="/_docs/cdc/change-data-capture-extensions.adoc" target="_blank">Edit</a> |
| |
| <h1>Change Data Capture Extension</h1> |
| |
| <div id="preamble"> |
| <div class="sectionbody"> |
| <div class="admonitionblock warning"> |
| <table> |
| <tr> |
| <td class="icon"> |
| <div class="title">Warning</div> |
| </td> |
| <td class="content"> |
| CDC is an experimental feature. Its API and design might change. |
| </td> |
| </tr> |
| </table> |
| </div> |
| </div> |
| </div> |
| <div class="sect1"> |
| <h2 id="overview">Overview</h2> |
| <div class="sectionbody"> |
| <div class="paragraph"> |
| <p>The <a href="https://github.com/apache/ignite-extensions/tree/master/modules/cdc-ext">Change Data Capture Extension</a> module provides two ways to set up cross-cluster replication based on CDC:</p> |
| </div> |
| <div class="olist arabic"> |
| <ol class="arabic"> |
| <li> |
| <p><a href="https://github.com/apache/ignite-extensions/blob/master/modules/cdc-ext/src/main/java/org/apache/ignite/cdc/IgniteToIgniteCdcStreamer.java">IgniteToIgniteCdcStreamer</a> streams changes to the destination cluster using a client node.</p> |
| </li> |
| <li> |
| <p><a href="https://github.com/apache/ignite-extensions/blob/master/modules/cdc-ext/src/main/java/org/apache/ignite/cdc/kafka/IgniteToKafkaCdcStreamer.java">IgniteToKafkaCdcStreamer</a> combined with <a href="https://github.com/apache/ignite-extensions/blob/master/modules/cdc-ext/src/main/java/org/apache/ignite/cdc/kafka/KafkaToIgniteCdcStreamer.java">KafkaToIgniteCdcStreamer</a> streams changes to the destination cluster using <a href="https://kafka.apache.org">Apache Kafka</a> as a transport.</p> |
| </li> |
| </ol> |
| </div> |
| <div class="admonitionblock note"> |
| <table> |
| <tr> |
| <td class="icon"> |
| <div class="title">Note</div> |
| </td> |
| <td class="content"> |
| A <a href="https://github.com/apache/ignite/blob/master/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/version/CacheVersionConflictResolver.java">CacheVersionConflictResolver</a> should be defined for each cache replicated between clusters. |
| </td> |
| </tr> |
| </table> |
| </div> |
| </div> |
| </div> |
| <div class="sect1"> |
| <h2 id="ignite-to-ignite-cdc-streamer">Ignite to Ignite CDC streamer</h2> |
| <div class="sectionbody"> |
| <div class="paragraph"> |
| <p>This streamer starts a client node that connects to the destination cluster. |
| After the connection is established, all changes captured by CDC are replicated to the destination cluster.</p> |
| </div> |
| <div class="admonitionblock note"> |
| <table> |
| <tr> |
| <td class="icon"> |
| <div class="title">Note</div> |
| </td> |
| <td class="content"> |
| Instances of <code>ignite-cdc.sh</code> with the configured streamer should be started on each server node of the source cluster to capture all changes. |
| </td> |
| </tr> |
| </table> |
| </div> |
| <div class="paragraph"> |
| <p><span class="image"><img src="/docs/images/CDC-design.svg" alt="CDC design"></span></p> |
| </div> |
| </div> |
| </div> |
| <div class="sect1"> |
| <h2 id="configuration">Configuration</h2> |
| <div class="sectionbody"> |
| <table class="tableblock frame-all grid-all stripes-even stretch"> |
| <colgroup> |
| <col style="width: 20%;"> |
| <col style="width: 45%;"> |
| <col style="width: 35%;"> |
| </colgroup> |
| <thead> |
| <tr> |
| <th class="tableblock halign-left valign-top">Name</th> |
| <th class="tableblock halign-left valign-top">Description</th> |
| <th class="tableblock halign-left valign-top">Default value</th> |
| </tr> |
| </thead> |
| <tbody> |
| <tr> |
| <td class="tableblock halign-left valign-top"><p class="tableblock"><code>caches</code></p></td> |
| <td class="tableblock halign-left valign-top"><p class="tableblock">Set of cache names to replicate.</p></td> |
| <td class="tableblock halign-left valign-top"><p class="tableblock">null</p></td> |
| </tr> |
| <tr> |
| <td class="tableblock halign-left valign-top"><p class="tableblock"><code>destinationIgniteConfiguration</code></p></td> |
| <td class="tableblock halign-left valign-top"><p class="tableblock">Ignite configuration of client nodes that will connect to destination cluster to replicate changes.</p></td> |
| <td class="tableblock halign-left valign-top"><p class="tableblock">null</p></td> |
| </tr> |
| <tr> |
| <td class="tableblock halign-left valign-top"><p class="tableblock"><code>onlyPrimary</code></p></td> |
| <td class="tableblock halign-left valign-top"><p class="tableblock">Flag to handle changes only on primary node.</p></td> |
| <td class="tableblock halign-left valign-top"><p class="tableblock"><code>false</code></p></td> |
| </tr> |
| <tr> |
| <td class="tableblock halign-left valign-top"><p class="tableblock"><code>maxBatchSize</code></p></td> |
| <td class="tableblock halign-left valign-top"><p class="tableblock">Maximum number of events to be sent to destination cluster in a single batch.</p></td> |
| <td class="tableblock halign-left valign-top"><p class="tableblock">1024</p></td> |
| </tr> |
| </tbody> |
| </table> |
| </div> |
| </div> |
| <div class="sect1"> |
| <h2 id="metrics">Metrics</h2> |
| <div class="sectionbody"> |
| <table class="tableblock frame-all grid-all stripes-even stretch"> |
| <colgroup> |
| <col style="width: 50%;"> |
| <col style="width: 50%;"> |
| </colgroup> |
| <thead> |
| <tr> |
| <th class="tableblock halign-left valign-top">Name</th> |
| <th class="tableblock halign-left valign-top">Description</th> |
| </tr> |
| </thead> |
| <tbody> |
| <tr> |
| <td class="tableblock halign-left valign-top"><p class="tableblock"><code>EventsCount</code></p></td> |
| <td class="tableblock halign-left valign-top"><p class="tableblock">Count of messages applied to destination cluster.</p></td> |
| </tr> |
| <tr> |
| <td class="tableblock halign-left valign-top"><p class="tableblock"><code>LastEventTime</code></p></td> |
| <td class="tableblock halign-left valign-top"><p class="tableblock">Timestamp of last applied event.</p></td> |
| </tr> |
| </tbody> |
| </table> |
| </div> |
| </div> |
| <div class="sect1"> |
| <h2 id="cdc-replication-using-kafka">CDC replication using Kafka</h2> |
| <div class="sectionbody"> |
| <div class="paragraph"> |
| <p>This replication approach requires setting up two applications:</p> |
| </div> |
| <div class="olist arabic"> |
| <ol class="arabic"> |
| <li> |
| <p><code>ignite-cdc.sh</code> with <code>org.apache.ignite.cdc.kafka.IgniteToKafkaCdcStreamer</code>, which captures changes from the source cluster and writes them to a Kafka topic.</p> |
| </li> |
| <li> |
| <p><code>kafka-to-ignite.sh</code>, which reads changes from the Kafka topic and then writes them to the destination cluster.</p> |
| </li> |
| </ol> |
| </div> |
| <div class="admonitionblock note"> |
| <table> |
| <tr> |
| <td class="icon"> |
| <div class="title">Note</div> |
| </td> |
| <td class="content"> |
| Instances of <code>ignite-cdc.sh</code> with the configured streamer should be started on each server node of the source cluster to capture all changes. |
| </td> |
| </tr> |
| </table> |
| </div> |
| <div class="sect2"> |
| <h3 id="ignitetokafkacdcstreamer-configuration">IgniteToKafkaCdcStreamer Configuration</h3> |
| <table class="tableblock frame-all grid-all stripes-even stretch"> |
| <colgroup> |
| <col style="width: 20%;"> |
| <col style="width: 45%;"> |
| <col style="width: 35%;"> |
| </colgroup> |
| <thead> |
| <tr> |
| <th class="tableblock halign-left valign-top">Name</th> |
| <th class="tableblock halign-left valign-top">Description</th> |
| <th class="tableblock halign-left valign-top">Default value</th> |
| </tr> |
| </thead> |
| <tbody> |
| <tr> |
| <td class="tableblock halign-left valign-top"><p class="tableblock"><code>caches</code></p></td> |
| <td class="tableblock halign-left valign-top"><p class="tableblock">Set of cache names to replicate.</p></td> |
| <td class="tableblock halign-left valign-top"><p class="tableblock">null</p></td> |
| </tr> |
| <tr> |
| <td class="tableblock halign-left valign-top"><p class="tableblock"><code>kafkaProperties</code></p></td> |
| <td class="tableblock halign-left valign-top"><p class="tableblock">Kafka producer properties.</p></td> |
| <td class="tableblock halign-left valign-top"><p class="tableblock">null</p></td> |
| </tr> |
| <tr> |
| <td class="tableblock halign-left valign-top"><p class="tableblock"><code>topic</code></p></td> |
| <td class="tableblock halign-left valign-top"><p class="tableblock">Name of the Kafka topic.</p></td> |
| <td class="tableblock halign-left valign-top"><p class="tableblock">null</p></td> |
| </tr> |
| <tr> |
| <td class="tableblock halign-left valign-top"><p class="tableblock"><code>kafkaParts</code></p></td> |
| <td class="tableblock halign-left valign-top"><p class="tableblock">Number of Kafka topic partitions.</p></td> |
| <td class="tableblock halign-left valign-top"><p class="tableblock">null</p></td> |
| </tr> |
| <tr> |
| <td class="tableblock halign-left valign-top"><p class="tableblock"><code>onlyPrimary</code></p></td> |
| <td class="tableblock halign-left valign-top"><p class="tableblock">Flag to handle changes only on primary node.</p></td> |
| <td class="tableblock halign-left valign-top"><p class="tableblock"><code>false</code></p></td> |
| </tr> |
| <tr> |
| <td class="tableblock halign-left valign-top"><p class="tableblock"><code>maxBatchSize</code></p></td> |
| <td class="tableblock halign-left valign-top"><p class="tableblock">Maximum size of concurrently produced Kafka records. When streamer reaches this number, it waits for Kafka acknowledgements, and then commits CDC offset.</p></td> |
| <td class="tableblock halign-left valign-top"><p class="tableblock"><code>1024</code></p></td> |
| </tr> |
| <tr> |
| <td class="tableblock halign-left valign-top"><p class="tableblock"><code>kafkaRequestTimeout</code></p></td> |
| <td class="tableblock halign-left valign-top"><p class="tableblock">Kafka request timeout in milliseconds.</p></td> |
| <td class="tableblock halign-left valign-top"><p class="tableblock"><code>3000</code></p></td> |
| </tr> |
| </tbody> |
| </table> |
| </div> |
| <div class="sect2"> |
| <h3 id="ignitetokafkacdcstreamer-metrics">IgniteToKafkaCdcStreamer Metrics</h3> |
| <table class="tableblock frame-all grid-all stripes-even stretch"> |
| <colgroup> |
| <col style="width: 50%;"> |
| <col style="width: 50%;"> |
| </colgroup> |
| <thead> |
| <tr> |
| <th class="tableblock halign-left valign-top">Name</th> |
| <th class="tableblock halign-left valign-top">Description</th> |
| </tr> |
| </thead> |
| <tbody> |
| <tr> |
| <td class="tableblock halign-left valign-top"><p class="tableblock"><code>EventsCount</code></p></td> |
| <td class="tableblock halign-left valign-top"><p class="tableblock">Count of messages sent to Kafka.</p></td> |
| </tr> |
| <tr> |
| <td class="tableblock halign-left valign-top"><p class="tableblock"><code>LastEventTime</code></p></td> |
| <td class="tableblock halign-left valign-top"><p class="tableblock">Timestamp of the last event sent to Kafka.</p></td> |
| </tr> |
| <tr> |
| <td class="tableblock halign-left valign-top"><p class="tableblock"><code>BytesSent</code></p></td> |
| <td class="tableblock halign-left valign-top"><p class="tableblock">Number of bytes sent to Kafka.</p></td> |
| </tr> |
| </tbody> |
| </table> |
| </div> |
| <div class="sect2"> |
| <h3 id="kafka-to-ignite-sh-application"><code>kafka-to-ignite.sh</code> application</h3> |
| <div class="paragraph"> |
| <p>This application should be started near the destination cluster. |
| <code>kafka-to-ignite.sh</code> reads CDC events from the Kafka topic and then applies them to the destination cluster.</p> |
| </div> |
| <div class="admonitionblock important"> |
| <table> |
| <tr> |
| <td class="icon"> |
| <div class="title">Important</div> |
| </td> |
| <td class="content"> |
| <code>kafka-to-ignite.sh</code> implements the fail-fast approach: it simply fails on any error. Automatic restarts should be configured using OS tools. |
| </td> |
| </tr> |
| </table> |
| </div> |
| <div class="paragraph"> |
| <p>The number of application instances does not correlate with the number of destination server nodes. |
| It should be just enough to handle the source cluster load. |
| Each application instance processes a configured subset of topic partitions to spread the load. |
| A <code>KafkaConsumer</code> is created for each partition to ensure fair reads.</p> |
| </div> |
| <div class="sect3"> |
| <h4 id="installation">Installation</h4> |
| <div class="olist arabic"> |
| <ol class="arabic"> |
| <li> |
| <p>Build <code>cdc-ext</code> module with maven:</p> |
| <div class="listingblock"> |
| <div class="content"> |
| <pre class="rouge highlight"><code data-lang="console"><span class="gp"> $</span>~/src/ignite-extensions/> mvn clean package <span class="nt">-DskipTests</span> |
| <span class="gp"> $</span>~/src/ignite-extensions/> <span class="nb">ls </span>modules/cdc-ext/target | <span class="nb">grep </span>zip |
| <span class="go">ignite-cdc-ext.zip</span></code></pre> |
| </div> |
| </div> |
| </li> |
| <li> |
| <p>Unpack <code>ignite-cdc-ext.zip</code> archive to <code>$IGNITE_HOME</code> folder.</p> |
| </li> |
| </ol> |
| </div> |
| <div class="paragraph"> |
| <p>Now you have an additional binary, <code>$IGNITE_HOME/bin/kafka-to-ignite.sh</code>, and the <code>$IGNITE_HOME/libs/ignite-cdc-ext</code> module.</p> |
| </div> |
| <div class="admonitionblock note"> |
| <table> |
| <tr> |
| <td class="icon"> |
| <div class="title">Note</div> |
| </td> |
| <td class="content"> |
| Enable the <code>ignite-cdc-ext</code> module to be able to run <code>kafka-to-ignite.sh</code>. |
| </td> |
| </tr> |
| </table> |
| </div> |
| </div> |
| <div class="sect3"> |
| <h4 id="configuration-2">Configuration</h4> |
| <div class="paragraph"> |
| <p>The application is configured using POJO classes or a Spring XML file, like a regular Ignite node. |
| The Kafka-to-Ignite configuration file should contain the following beans, which are loaded during startup:</p> |
| </div> |
| <div class="olist arabic"> |
| <ol class="arabic"> |
| <li> |
| <p><code>IgniteConfiguration</code> bean: Configuration of the client node that will connect to the destination cluster.</p> |
| </li> |
| <li> |
| <p><code>java.util.Properties</code> bean with the name <code>kafkaProperties</code>: Single Kafka consumer configuration.</p> |
| </li> |
| <li> |
| <p><code>org.apache.ignite.cdc.kafka.KafkaToIgniteCdcStreamerConfiguration</code> bean: Options specific to <code>kafka-to-ignite.sh</code> application.</p> |
| </li> |
| </ol> |
| </div> |
| <table class="tableblock frame-all grid-all stripes-even stretch"> |
| <colgroup> |
| <col style="width: 20%;"> |
| <col style="width: 45%;"> |
| <col style="width: 35%;"> |
| </colgroup> |
| <thead> |
| <tr> |
| <th class="tableblock halign-left valign-top">Name</th> |
| <th class="tableblock halign-left valign-top">Description</th> |
| <th class="tableblock halign-left valign-top">Default value</th> |
| </tr> |
| </thead> |
| <tbody> |
| <tr> |
| <td class="tableblock halign-left valign-top"><p class="tableblock"><code>caches</code></p></td> |
| <td class="tableblock halign-left valign-top"><p class="tableblock">Set of cache names to replicate.</p></td> |
| <td class="tableblock halign-left valign-top"><p class="tableblock">null</p></td> |
| </tr> |
| <tr> |
| <td class="tableblock halign-left valign-top"><p class="tableblock"><code>topic</code></p></td> |
| <td class="tableblock halign-left valign-top"><p class="tableblock">Name of the Kafka topic.</p></td> |
| <td class="tableblock halign-left valign-top"><p class="tableblock">null</p></td> |
| </tr> |
| <tr> |
| <td class="tableblock halign-left valign-top"><p class="tableblock"><code>kafkaPartsFrom</code></p></td> |
| <td class="tableblock halign-left valign-top"><p class="tableblock">Lower Kafka partition number (inclusive).</p></td> |
| <td class="tableblock halign-left valign-top"><p class="tableblock">-1</p></td> |
| </tr> |
| <tr> |
| <td class="tableblock halign-left valign-top"><p class="tableblock"><code>kafkaPartsTo</code></p></td> |
| <td class="tableblock halign-left valign-top"><p class="tableblock">Upper Kafka partition number (exclusive).</p></td> |
| <td class="tableblock halign-left valign-top"><p class="tableblock">-1</p></td> |
| </tr> |
| <tr> |
| <td class="tableblock halign-left valign-top"><p class="tableblock"><code>kafkaRequestTimeout</code></p></td> |
| <td class="tableblock halign-left valign-top"><p class="tableblock">Kafka request timeout in milliseconds.</p></td> |
| <td class="tableblock halign-left valign-top"><p class="tableblock"><code>3000</code></p></td> |
| </tr> |
| <tr> |
| <td class="tableblock halign-left valign-top"><p class="tableblock"><code>maxBatchSize</code></p></td> |
| <td class="tableblock halign-left valign-top"><p class="tableblock">Maximum number of events to be sent to destination cluster in a single batch.</p></td> |
| <td class="tableblock halign-left valign-top"><p class="tableblock">1024</p></td> |
| </tr> |
| <tr> |
| <td class="tableblock halign-left valign-top"><p class="tableblock"><code>threadCount</code></p></td> |
| <td class="tableblock halign-left valign-top"><p class="tableblock">Number of threads that process consumers. Each thread polls records from its dedicated partitions in a round-robin manner.</p></td> |
| <td class="tableblock halign-left valign-top"><p class="tableblock">16</p></td> |
| </tr> |
| </tbody> |
| </table> |
| </div> |
| <div class="sect3"> |
| <h4 id="logging">Logging</h4> |
| <div class="paragraph"> |
| <p><code>kafka-to-ignite.sh</code> uses the same logging configuration as the Ignite node does. The only difference is that the log is written to the "kafka-ignite-streamer.log" file.</p> |
| </div> |
| </div> |
| </div> |
| </div> |
| </div> |
| <div class="sect1"> |
| <h2 id="cacheversionconflictresolver-implementation">CacheVersionConflictResolver implementation</h2> |
| <div class="sectionbody"> |
| <div class="paragraph"> |
| <p>It is expected that CDC streamers will be configured with <code>onlyPrimary=false</code> in most real-world deployments to ensure fault tolerance. |
| This means that the streamer sends the same change <code>CacheConfiguration#backups</code> + 1 times. |
| At the same time, the same key can be updated concurrently in the replicated clusters. |
| <code>CacheVersionConflictResolver</code> is used by the Ignite node to select or merge the new (from the update request) and existing (stored in the cluster) entry versions. |
| The selected entry version is the one actually stored in the cluster.</p> |
| </div> |
| <div class="admonitionblock note"> |
| <table> |
| <tr> |
| <td class="icon"> |
| <div class="title">Note</div> |
| </td> |
| <td class="content"> |
| The default implementation only selects the correct entry and never merges. |
| </td> |
| </tr> |
| </table> |
| </div> |
| <div class="paragraph"> |
| <p><a href="https://github.com/apache/ignite/blob/master/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/version/CacheVersionConflictResolver.java">CacheVersionConflictResolver</a> should be defined for each cache replicated between clusters.</p> |
| </div> |
| <div class="paragraph"> |
| <p>A default <a href="https://github.com/apache/ignite-extensions/blob/master/modules/cdc-ext/src/main/java/org/apache/ignite/cdc/conflictresolve/CacheVersionConflictResolverImpl.java">implementation</a> is available in the cdc-ext module.</p> |
| </div> |
| <div class="sect2"> |
| <h3 id="configuration-3">Configuration</h3> |
| <table class="tableblock frame-all grid-all stripes-even stretch"> |
| <colgroup> |
| <col style="width: 20%;"> |
| <col style="width: 45%;"> |
| <col style="width: 35%;"> |
| </colgroup> |
| <thead> |
| <tr> |
| <th class="tableblock halign-left valign-top">Name</th> |
| <th class="tableblock halign-left valign-top">Description</th> |
| <th class="tableblock halign-left valign-top">Default value</th> |
| </tr> |
| </thead> |
| <tbody> |
| <tr> |
| <td class="tableblock halign-left valign-top"><p class="tableblock"><code>clusterId</code></p></td> |
| <td class="tableblock halign-left valign-top"><p class="tableblock">Local cluster id. Can be any value from 1 to 31.</p></td> |
| <td class="tableblock halign-left valign-top"><p class="tableblock">null</p></td> |
| </tr> |
| <tr> |
| <td class="tableblock halign-left valign-top"><p class="tableblock"><code>caches</code></p></td> |
| <td class="tableblock halign-left valign-top"><p class="tableblock">Set of cache names to handle with this plugin instance.</p></td> |
| <td class="tableblock halign-left valign-top"><p class="tableblock">null</p></td> |
| </tr> |
| <tr> |
| <td class="tableblock halign-left valign-top"><p class="tableblock"><code>conflictResolveField</code></p></td> |
| <td class="tableblock halign-left valign-top"><p class="tableblock">Value field to resolve conflict with. Optional. Field values must implement <code>java.lang.Comparable</code>.</p></td> |
| <td class="tableblock halign-left valign-top"><p class="tableblock">null</p></td> |
| </tr> |
| </tbody> |
| </table> |
| </div> |
| <div class="sect2"> |
| <h3 id="conflict-resolve-algorithm">Conflict resolve algorithm</h3> |
| <div class="paragraph"> |
| <p>Replicated changes contain some additional data: specifically, the entry version from the source cluster is supplied with the changed data. |
| The default conflict resolution algorithm is based on the entry version and <code>conflictResolveField</code>. |
| The conflict resolution field should contain a user-provided, monotonically increasing value such as a query id or a timestamp.</p> |
| </div> |
| <div class="olist arabic"> |
| <ol class="arabic"> |
| <li> |
| <p>Changes from the "local" cluster always win.</p> |
| </li> |
| <li> |
| <p>If both the old and the new entries are from the same cluster, version comparison is used to determine the order.</p> |
| </li> |
| <li> |
| <p>If <code>conflictResolveField</code> is provided, field value comparison is used to determine the order.</p> |
| </li> |
| <li> |
| <p>Otherwise, conflict resolution fails and the update is ignored.</p> |
| </li> |
| </ol> |
| </div> |
| </div> |
| <div class="sect2"> |
| <h3 id="configuration-example">Configuration example</h3> |
| <div class="paragraph"> |
| <p>Configuration is done via Ignite node plugin:</p> |
| </div> |
| <div class="listingblock"> |
| <div class="content"> |
| <pre class="rouge highlight"><code data-lang="xml"><span class="nt"><property</span> <span class="na">name=</span><span class="s">"pluginProviders"</span><span class="nt">></span> |
| <span class="nt"><bean</span> <span class="na">class=</span><span class="s">"org.apache.ignite.cdc.conflictresolve.CacheVersionConflictResolverPluginProvider"</span><span class="nt">></span> |
| <span class="nt"><property</span> <span class="na">name=</span><span class="s">"clusterId"</span> <span class="na">value=</span><span class="s">"1"</span> <span class="nt">/></span> |
| <span class="nt"><property</span> <span class="na">name=</span><span class="s">"caches"</span><span class="nt">></span> |
| <span class="nt"><util:list></span> |
| <span class="nt"><bean</span> <span class="na">class=</span><span class="s">"java.lang.String"</span><span class="nt">></span> |
| <span class="nt"><constructor-arg</span> <span class="na">type=</span><span class="s">"String"</span> <span class="na">value=</span><span class="s">"queryId"</span> <span class="nt">/></span> |
| <span class="nt"></bean></span> |
| <span class="nt"></util:list></span> |
| <span class="nt"></property></span> |
| <span class="nt"></bean></span> |
| <span class="nt"></property></span></code></pre> |
| </div> |
| </div> |
| </div> |
| </div> |
| </div> |
| <!-- |
| Licensed to the Apache Software Foundation (ASF) under one or more |
| contributor license agreements. See the NOTICE file distributed with |
| this work for additional information regarding copyright ownership. |
| The ASF licenses this file to You under the Apache License, Version 2.0 |
| (the "License"); you may not use this file except in compliance with |
| the License. You may obtain a copy of the License at |
| |
| http://www.apache.org/licenses/LICENSE-2.0 |
| |
| Unless required by applicable law or agreed to in writing, software |
| distributed under the License is distributed on an "AS IS" BASIS, |
| WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
| See the License for the specific language governing permissions and |
| limitations under the License. |
| --> |
| |
| <div class="copyright"> |
| © 2024 The Apache Software Foundation.<br/> |
| Apache, Apache Ignite, the Apache feather and the Apache Ignite logo are either registered trademarks or trademarks of The Apache Software Foundation. |
| |
| </div> |
| |
| </article> |
| <!-- |
| Licensed to the Apache Software Foundation (ASF) under one or more |
| contributor license agreements. See the NOTICE file distributed with |
| this work for additional information regarding copyright ownership. |
| The ASF licenses this file to You under the Apache License, Version 2.0 |
| (the "License"); you may not use this file except in compliance with |
| the License. You may obtain a copy of the License at |
| |
| http://www.apache.org/licenses/LICENSE-2.0 |
| |
| Unless required by applicable law or agreed to in writing, software |
| distributed under the License is distributed on an "AS IS" BASIS, |
| WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
| See the License for the specific language governing permissions and |
| limitations under the License. |
| --> |
| |
| <nav class="right-nav" data-swiftype-index='false'> |
| <ul class="sectlevel1"> |
| <li><a href="#overview">Overview</a></li> |
| <li><a href="#ignite-to-ignite-cdc-streamer">Ignite to Ignite CDC streamer</a></li> |
| <li><a href="#configuration">Configuration</a></li> |
| <li><a href="#metrics">Metrics</a></li> |
| <li><a href="#cdc-replication-using-kafka">CDC replication using Kafka</a> |
| <ul class="sectlevel2"> |
| <li><a href="#ignitetokafkacdcstreamer-configuration">IgniteToKafkaCdcStreamer Configuration</a></li> |
| <li><a href="#ignitetokafkacdcstreamer-metrics">IgniteToKafkaCdcStreamer Metrics</a></li> |
| <li><a href="#kafka-to-ignite-sh-application"><code>kafka-to-ignite.sh</code> application</a> |
| <ul class="sectlevel3"> |
| <li><a href="#installation">Installation</a></li> |
| <li><a href="#configuration-2">Configuration</a></li> |
| <li><a href="#logging">Logging</a></li> |
| </ul> |
| </li> |
| </ul> |
| </li> |
| <li><a href="#cacheversionconflictresolver-implementation">CacheVersionConflictResolver implementation</a> |
| <ul class="sectlevel2"> |
| <li><a href="#configuration-3">Configuration</a></li> |
| <li><a href="#conflict-resolve-algorithm">Conflict resolve algorithm</a></li> |
| <li><a href="#configuration-example">Configuration example</a></li> |
| </ul> |
| </li> |
| </ul> |
| <!-- |
| Licensed to the Apache Software Foundation (ASF) under one or more |
| contributor license agreements. See the NOTICE file distributed with |
| this work for additional information regarding copyright ownership. |
| The ASF licenses this file to You under the Apache License, Version 2.0 |
| (the "License"); you may not use this file except in compliance with |
| the License. You may obtain a copy of the License at |
| |
| http://www.apache.org/licenses/LICENSE-2.0 |
| |
| Unless required by applicable law or agreed to in writing, software |
| distributed under the License is distributed on an "AS IS" BASIS, |
| WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
| See the License for the specific language governing permissions and |
| limitations under the License. |
| --> |
| |
| |
| <footer> |
| </footer> |
| |
| </nav> |
| |
| </section> |
| <script type='module' src='/assets/js/code-copy-to-clipboard.js' async></script> |
| |
| <script> |
| // inits deep anchors -- needs to be done here because of https://www.bryanbraun.com/anchorjs/#dont-run-it-too-late |
| anchors.add('.page-docs h1, .page-docs h2, .page-docs h3:not(.discrete), .page-docs h4, .page-docs h5'); |
| anchors.options = { |
| placement: 'right', |
| visible: 'always' |
| }; |
| </script> |
| <script type='module' src='/assets/js/index.js?1658382975' async></script> |
| </body> |
| </html> |