| // Licensed to the Apache Software Foundation (ASF) under one or more |
| // contributor license agreements. See the NOTICE file distributed with |
| // this work for additional information regarding copyright ownership. |
| // The ASF licenses this file to You under the Apache License, Version 2.0 |
| // (the "License"); you may not use this file except in compliance with |
| // the License. You may obtain a copy of the License at |
| // |
| // http://www.apache.org/licenses/LICENSE-2.0 |
| // |
| // Unless required by applicable law or agreed to in writing, software |
| // distributed under the License is distributed on an "AS IS" BASIS, |
| // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
| // See the License for the specific language governing permissions and |
| // limitations under the License. |
| = Cross-cluster Replication with Kafka |
| |
| == CDC replication using Kafka |
| |
| This replication method requires setting up two applications: |
| |
| . `ignite-cdc.sh` with `org.apache.ignite.cdc.kafka.IgniteToKafkaCdcStreamer`, which captures changes from the source cluster and writes them to a Kafka topic. |
| . `kafka-to-ignite.sh`, which reads changes from the Kafka topic and writes them to the destination cluster. |
| |
| NOTE: To capture all changes, start an instance of `ignite-cdc.sh` with the configured streamer on each server node of the source cluster. |
| |
| IMPORTANT: CDC through Kafka requires the _metadata topic to have exactly one partition_ to guarantee sequential ordering. |
| |
| image:../../assets/images/integrations/CDC-ignite2kafka.svg[] |
| |
| === IgniteToKafkaCdcStreamer Configuration |
| |
| [cols="20%,45%,35%",opts="header"] |
| |=== |
| |Name |Description | Default value |
| | `caches` | Set of cache names to replicate. | null |
| | `kafkaProperties` | Kafka producer properties. | null |
| | `topic` | Name of the Kafka topic for CDC events. | null |
| | `kafkaPartitions` | Number of Kafka partitions in CDC events topic. | null |
| | `metadataTopic` | Name of topic for replication of BinaryTypes and TypeMappings. | null |
| | `onlyPrimary` | Flag to handle changes only on primary node. | `false` |
| | `maxBatchSize` | Maximum number of concurrently produced Kafka records. When the streamer reaches this number, it waits for Kafka acknowledgements and then commits the CDC offset. | `1024` |
| | `kafkaRequestTimeout` | Kafka request timeout in milliseconds. | `3000` |
| |=== |
| |
| * The `kafkaRequestTimeout` property sets how long `IgniteToKafkaCdcStreamer` waits for `KafkaProducer` to finish a request. |
| |
| NOTE: `kafkaRequestTimeout` should not be too low. If the wait time exceeds `kafkaRequestTimeout`, `IgniteToKafkaCdcStreamer` fails with a timeout error. |
| |
| * To specify `KafkaProducer` settings, use the `kafkaProperties` property. We suggest storing all the necessary configuration properties in a separate file and referencing it from the IgniteToKafkaCdcStreamer configuration '.xml' file. See the examples below. |
| |
| `kafka.properties` |
| ``` |
| bootstrap.servers=xxx.x.x.x:9092 |
| request.timeout.ms=10000 |
| ``` |
| |
| IgniteToKafkaCdcStreamer bean declaration in `ignite-to-kafka-streamer-config.xml` |
| ``` |
| <bean id="cdc.streamer" class="org.apache.ignite.cdc.kafka.IgniteToKafkaCdcStreamer"> |
| <property name="topic" value="${send_data_kafka_topic_name}"/> |
| <property name="metadataTopic" value="${send_metadata_kafka_topic_name}"/> |
| <property name="kafkaPartitions" value="${send_kafka_partitions}"/> |
| <property name="caches"> |
| <list> |
| <value>terminator</value> |
| </list> |
| </property> |
| <property name="onlyPrimary" value="false"/> |
| <property name="kafkaProperties" ref="kafkaProperties"/> |
| </bean> |
| |
| <util:properties id="kafkaProperties" location="file:kafka_properties_path/kafka.properties"/> |
| ``` |
| |
| |
| NOTE: The link:https://kafka.apache.org/documentation/#producerconfigs_request.timeout.ms[request.timeout.ms] Kafka producer property is mandatory for the streamer configuration. For more details, refer to the link:https://kafka.apache.org/documentation/#configuration[configuration] section of the official Kafka documentation. |
| |
| === IgniteToKafkaCdcStreamer Metrics |
| |
| [cols="30%,70%",opts="header"] |
| |=== |
| |Name |Description |
| | `EventsCount` | Count of messages applied to Kafka. |
| | `LastEventTime` | Timestamp of last applied event to Kafka. |
| | `TypesCount` | Count of binary types events applied to Kafka. |
| | `MappingsCount` | Count of mappings events applied to Kafka. |
| | `BytesSent` | Count of bytes sent to Kafka. |
| | `MarkersCount` | Count of metadata markers sent to Kafka. |
| |=== |
| |
| === `kafka-to-ignite.sh` application |
| |
| This application should be started near the destination cluster. |
| `kafka-to-ignite.sh` reads CDC events from the Kafka topic and applies them to the destination cluster. |
| |
| IMPORTANT: `kafka-to-ignite.sh` implements the fail-fast approach: it fails on any error. The restart procedure should be configured with OS tools. |
| |
| The number of application instances does not need to match the number of destination server nodes. |
| It only has to be high enough to process the load of the source cluster. |
| Each instance of the application processes a configured subset of topic partitions to spread the load. |
| A separate `KafkaConsumer` is created for each partition to ensure fair reads. |
| |
| ==== Configuration |
| |
| The application is configured with POJO classes or a Spring XML file, like a regular Ignite node. |
| The Kafka to Ignite configuration file should contain the following beans, which are loaded during startup: |
| |
| . One of the configuration beans to define a client type that will connect to the destination cluster: |
| - `IgniteConfiguration` bean: Configuration of a client node. |
| - `ClientConfiguration` bean: Configuration of a link:thin-clients/java-thin-client[Java Thin Client]. |
| . `java.util.Properties` bean with the name `kafkaProperties`: Single Kafka consumer configuration. |
| . `org.apache.ignite.cdc.kafka.KafkaToIgniteCdcStreamerConfiguration` bean: Options specific to `kafka-to-ignite.sh` application. |
| |
| [cols="25%,45%,30%",opts="header"] |
| |=== |
| |Name |Description | Default value |
| | `caches` | Set of cache names to replicate. | null |
| | `topic` | Name of the Kafka topic for CDC events. | null |
| | `kafkaPartsFrom` | Lower Kafka partitions number (inclusive) for CDC events topic. | -1 |
| | `kafkaPartsTo` | Upper Kafka partitions number (exclusive) for CDC events topic. | -1 |
| | `metadataTopic` | Name of topic for replication of BinaryTypes and TypeMappings. | null |
| | `metadataConsumerGroup` | Group for `KafkaConsumer`, which polls from metadata topic | ignite-metadata-update-<kafkaPartsFrom>-<kafkaPartsTo> |
| | `kafkaRequestTimeout` | Kafka request timeout in milliseconds. | `3000` |
| | `kafkaConsumerPollTimeout` | Kafka poll timeout in milliseconds. | `3000` |
| | `maxBatchSize` | Maximum number of events to be sent to destination cluster in a single batch. | 1024 |
| | `threadCount` | Number of threads that process consumers. Each thread polls records from its dedicated partitions in a round-robin manner. | 16 |
| |`metricRegistryName`| Name for metric registry. `org.apache.metricRegistryName.cdc.applier` | cdc-kafka-to-ignite |
| |=== |
| |
| * The `kafkaRequestTimeout` property is used as the timeout for `KafkaConsumer` methods (except for `KafkaConsumer#poll`). |
| |
| NOTE: `kafkaRequestTimeout` should not be too low, otherwise the application risks failing during method execution. |
| |
| * The `kafkaConsumerPollTimeout` property is used as the timeout for the `KafkaConsumer#poll` method. |
| |
| NOTE: A high `kafkaConsumerPollTimeout` value might greatly affect replication performance. Kafka topic partitions are distributed equally among threads (see `threadCount`). Each thread can poll only one partition at a time, so no other partition assigned to the same thread is polled until the current one has been handled. |
| |
| * To specify `KafkaConsumer` settings, use the `kafkaProperties` bean. We suggest storing all the necessary configuration properties in a separate file and referencing it from the KafkaToIgniteCdcStreamer configuration '.xml' file. See the examples below. |
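| |
| As a rough illustration of the note above, the fragment below sketches how partition range, thread count and poll timeout interact. The bean id and all values are assumptions chosen for the arithmetic, not recommended settings, and the fragment omits the other required properties such as `topic` and `caches`. |
| |
| ``` |
| <bean id="streamer.cfg" class="org.apache.ignite.cdc.kafka.KafkaToIgniteCdcStreamerConfiguration"> |
| <!-- Partitions 0 (inclusive) to 16 (exclusive) spread over 4 threads: 4 partitions per thread. --> |
| <property name="kafkaPartsFrom" value="0"/> |
| <property name="kafkaPartsTo" value="16"/> |
| <property name="threadCount" value="4"/> |
| <!-- Illustrative lower poll timeout (ms), so that a quiet partition holds its thread for less time. --> |
| <property name="kafkaConsumerPollTimeout" value="1000"/> |
| </bean> |
| ``` |
| |
| With these assumed values, a thread waiting on one partition delays the other three partitions assigned to it, which is why the poll timeout is worth tuning together with `threadCount`. |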
| |
| `kafka.properties` |
| ``` |
| bootstrap.servers=127.0.0.1:9092 |
| request.timeout.ms=10000 |
| group.id=kafka-to-ignite-dc1 |
| auto.offset.reset=earliest |
| enable.auto.commit=false |
| ``` |
| |
| Kafka properties bean declaration in `kafka-to-ignite-streamer-config.xml` |
| ``` |
| <util:properties id="kafkaProperties" location="file:kafka_properties_path/kafka.properties"/> |
| ``` |
| |
| |
| NOTE: link:https://kafka.apache.org/documentation/#consumerconfigs_request.timeout.ms[request.timeout.ms] Kafka consumer property is mandatory for streamer configuration. |
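| |
| Putting the pieces together, a `kafka-to-ignite-streamer-config.xml` file might look like the sketch below. It assumes a thin client connection; the bean ids, the address, the topic names and the partition range are placeholders for illustration only. |
| |
| ``` |
| <!-- Thin client connection to the destination cluster (placeholder address). --> |
| <bean id="clientCfg" class="org.apache.ignite.configuration.ClientConfiguration"> |
| <property name="addresses" value="127.0.0.1:10800"/> |
| </bean> |
| |
| <!-- Options specific to kafka-to-ignite.sh (placeholder topic names). --> |
| <bean id="streamer.cfg" class="org.apache.ignite.cdc.kafka.KafkaToIgniteCdcStreamerConfiguration"> |
| <property name="topic" value="cdc-events"/> |
| <property name="metadataTopic" value="cdc-metadata"/> |
| <!-- This instance reads partitions 0 (inclusive) to 8 (exclusive). --> |
| <property name="kafkaPartsFrom" value="0"/> |
| <property name="kafkaPartsTo" value="8"/> |
| <property name="caches"> |
| <list> |
| <value>terminator</value> |
| </list> |
| </property> |
| </bean> |
| |
| <!-- Single Kafka consumer configuration, loaded from the properties file. --> |
| <util:properties id="kafkaProperties" location="file:kafka_properties_path/kafka.properties"/> |
| ``` |
| |
| Under the same assumptions, a second instance configured with `kafkaPartsFrom=8` and `kafkaPartsTo=16` would take the remaining half of a 16-partition topic. |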
| |
| ==== Metrics |
| |
| [cols="35%,65%",opts="header"] |
| |=== |
| |Name |Description |
| | `EventsReceivedCount` | Count of events received from Kafka. |
| | `LastEventReceivedTime` | Timestamp of last received event from Kafka. |
| | `EventsSentCount` | Count of events sent to destination cluster. |
| | `LastBatchSentTime` | Timestamp of last sent batch to the destination cluster. |
| | `MarkersCount` | Count of metadata markers received from Kafka. |
| |=== |
| |
| ==== Logging |
| |
| `kafka-to-ignite.sh` uses the same logging configuration as an Ignite node. The only difference is that the log is written to the `kafka-ignite-streamer.log` file. |
| |
| == Fault tolerance |
| It is expected that CDC streamers will be configured with `onlyPrimary=false` in most real-world deployments to ensure fault tolerance. |
| This means that each change is sent `CacheConfiguration#backups` + 1 times: once for the primary copy and once for each backup copy. |
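| |
| For example, with a source cache declared as in the sketch below (the cache name and backup count are illustrative assumptions), each change would be sent to Kafka 2 + 1 = 3 times when `onlyPrimary=false`. |
| |
| ``` |
| <bean class="org.apache.ignite.configuration.CacheConfiguration"> |
| <property name="name" value="terminator"/> |
| <!-- backups=2: one primary copy plus two backup copies, so three sends per change. --> |
| <property name="backups" value="2"/> |
| </bean> |
| ``` |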