// Licensed to the Apache Software Foundation (ASF) under one or more
// contributor license agreements. See the NOTICE file distributed with
// this work for additional information regarding copyright ownership.
// The ASF licenses this file to You under the Apache License, Version 2.0
// (the "License"); you may not use this file except in compliance with
// the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
= Cross-cluster Replication with Kafka
== CDC replication using Kafka
Replicating changes between clusters through Kafka requires setting up two applications:
. `ignite-cdc.sh` with `org.apache.ignite.cdc.kafka.IgniteToKafkaCdcStreamer`, which captures changes from the source cluster and writes them to a Kafka topic.
. `kafka-to-ignite.sh`, which reads changes from the Kafka topic and writes them to the destination cluster.
NOTE: Start an instance of `ignite-cdc.sh` with the configured streamer on each server node of the source cluster to capture all changes.
IMPORTANT: CDC through Kafka requires a _metadata topic with exactly one partition_ to guarantee sequential ordering.
image:../../assets/images/integrations/CDC-ignite2kafka.svg[]
=== IgniteToKafkaCdcStreamer Configuration
[cols="20%,45%,35%",opts="header"]
|===
|Name |Description | Default value
| `caches` | Set of cache names to replicate. | null
| `kafkaProperties` | Kafka producer properties. | null
| `topic` | Name of the Kafka topic for CDC events. | null
| `kafkaPartitions` | Number of Kafka partitions in CDC events topic. | null
| `metadataTopic` | Name of topic for replication of BinaryTypes and TypeMappings. | null
| `onlyPrimary` | Flag to handle changes only on primary node. | `false`
| `maxBatchSize` | Maximum number of concurrently produced Kafka records. When the streamer reaches this number, it waits for Kafka acknowledgements and then commits the CDC offset. | `1024`
| `kafkaRequestTimeout` | Kafka request timeout in milliseconds. | `3000`
|===
* The `kafkaRequestTimeout` property sets how long `IgniteToKafkaCdcStreamer` waits for `KafkaProducer` to finish a request.
NOTE: `kafkaRequestTimeout` should not be too low. If the wait time exceeds `kafkaRequestTimeout`, `IgniteToKafkaCdcStreamer` fails with a timeout error.
* To specify `KafkaProducer` settings, use the `kafkaProperties` property. We suggest storing all the necessary configuration properties in a separate file and referencing it from the `IgniteToKafkaCdcStreamer` configuration `.xml` file. See the examples below.
`kafka.properties`
```
bootstrap.servers=xxx.x.x.x:9092
request.timeout.ms=10000
```
IgniteToKafkaCdcStreamer bean declaration in `ignite-to-kafka-streamer-config.xml`
```
<bean id="cdc.streamer" class="org.apache.ignite.cdc.kafka.IgniteToKafkaCdcStreamer">
    <property name="topic" value="${send_data_kafka_topic_name}"/>
    <property name="metadataTopic" value="${send_metadata_kafka_topic_name}"/>
    <property name="kafkaPartitions" value="${send_kafka_partitions}"/>
    <property name="caches">
        <list>
            <value>terminator</value>
        </list>
    </property>
    <property name="onlyPrimary" value="false"/>
    <property name="kafkaProperties" ref="kafkaProperties"/>
</bean>

<util:properties id="kafkaProperties" location="file:kafka_properties_path/kafka.properties"/>
```
NOTE: The link:https://kafka.apache.org/documentation/#producerconfigs_request.timeout.ms[request.timeout.ms] Kafka producer property is mandatory for streamer configuration. For more details, refer to the link:https://kafka.apache.org/documentation/#configuration[configuration]
section of the official Kafka documentation.
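
For context, `ignite-cdc.sh` needs to be told which consumer to run. The fragment below is a minimal sketch of that wiring, assuming the `org.apache.ignite.cdc.CdcConfiguration` bean from Ignite core and its `consumer` property. Neither is described on this page, so check them against the CDC documentation for your Ignite version. The bean id `cdc.cfg` is hypothetical.

```xml
<!-- Assumed wiring: registers the cdc.streamer bean declared above as the CDC consumer. -->
<bean id="cdc.cfg" class="org.apache.ignite.cdc.CdcConfiguration">
    <property name="consumer" ref="cdc.streamer"/>
</bean>
```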
=== IgniteToKafkaCdcStreamer Metrics
[cols="30%,70%",opts="header"]
|===
|Name |Description
| `EventsCount` | Count of messages applied to Kafka.
| `LastEventTime` | Timestamp of last applied event to Kafka.
| `TypesCount` | Count of binary types events applied to Kafka.
| `MappingsCount` | Count of mappings events applied to Kafka.
| `BytesSent` | Count of bytes sent to Kafka.
| `MarkersCount` | Count of metadata markers sent to Kafka.
|===
=== `kafka-to-ignite.sh` application
This application should be started near the destination cluster.
`kafka-to-ignite.sh` reads CDC events from the Kafka topic and applies them to the destination cluster.
IMPORTANT: `kafka-to-ignite.sh` implements the fail-fast approach: it fails on any error. Configure the restart procedure with OS tools.
The number of application instances does not have to correlate with the number of destination server nodes.
It only needs to be sufficient to process the source cluster load.
Each instance processes a configured subset of topic partitions to spread the load.
A `KafkaConsumer` is created for each partition to ensure fair reads.
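
As a sketch of how the load might be split, the fragment below shows two `kafka-to-ignite.sh` instances dividing one topic between them with the `kafkaPartsFrom` (inclusive) and `kafkaPartsTo` (exclusive) options from the configuration table. The 16-partition topic, the bean id `streamer.cfg`, and the topic name `dc1_to_dc2` are assumptions made for illustration. Each bean belongs in the configuration file of its own instance.

```xml
<!-- Configuration file of instance 1: partitions 0..7 of a hypothetical 16-partition topic. -->
<!-- Other options (caches, metadataTopic, and so on) are omitted for brevity. -->
<bean id="streamer.cfg" class="org.apache.ignite.cdc.kafka.KafkaToIgniteCdcStreamerConfiguration">
    <property name="topic" value="dc1_to_dc2"/>
    <property name="kafkaPartsFrom" value="0"/> <!-- inclusive -->
    <property name="kafkaPartsTo" value="8"/>   <!-- exclusive -->
</bean>

<!-- Configuration file of instance 2: partitions 8..15 of the same topic. -->
<bean id="streamer.cfg" class="org.apache.ignite.cdc.kafka.KafkaToIgniteCdcStreamerConfiguration">
    <property name="topic" value="dc1_to_dc2"/>
    <property name="kafkaPartsFrom" value="8"/>
    <property name="kafkaPartsTo" value="16"/>
</bean>
```

The two ranges are adjacent and do not overlap, so every partition is read by exactly one instance.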
==== Configuration
Configure the application using POJO classes or a Spring XML file, as for a regular Ignite node.
The Kafka to Ignite configuration file should contain the following beans, which are loaded during startup:
. One of the configuration beans to define a client type that will connect to the destination cluster:
- `IgniteConfiguration` bean: Configuration of a client node.
- `ClientConfiguration` bean: Configuration of a link:thin-clients/java-thin-client[Java Thin Client].
. `java.util.Properties` bean with the name `kafkaProperties`: Single Kafka consumer configuration.
. `org.apache.ignite.cdc.kafka.KafkaToIgniteCdcStreamerConfiguration` bean: Options specific to `kafka-to-ignite.sh` application.
[cols="25%,45%,30%",opts="header"]
|===
|Name |Description | Default value
| `caches` | Set of cache names to replicate. | null
| `topic` | Name of the Kafka topic for CDC events. | null
| `kafkaPartsFrom` | Lower Kafka partition number (inclusive) for CDC events topic. | -1
| `kafkaPartsTo` | Upper Kafka partition number (exclusive) for CDC events topic. | -1
| `metadataTopic` | Name of topic for replication of BinaryTypes and TypeMappings. | null
| `metadataConsumerGroup` | Group for `KafkaConsumer`, which polls from metadata topic | ignite-metadata-update-<kafkaPartsFrom>-<kafkaPartsTo>
| `kafkaRequestTimeout` | Kafka request timeout in milliseconds. | `3000`
| `kafkaConsumerPollTimeout` | Kafka poll timeout in milliseconds. | `3000`
| `maxBatchSize` | Maximum number of events to be sent to destination cluster in a single batch. | 1024
| `threadCount` | Number of threads that process consumers. Each thread polls records from its dedicated partitions in a round-robin manner. | 16
|`metricRegistryName`| Name for metric registry. `org.apache.metricRegistryName.cdc.applier` | cdc-kafka-to-ignite
|===
* The `kafkaRequestTimeout` property is used as the timeout for `KafkaConsumer` methods (except for `KafkaConsumer#poll`).
NOTE: `kafkaRequestTimeout` should not be too low; otherwise, the application risks failing on method execution.
* The `kafkaConsumerPollTimeout` property is used as the timeout for the `KafkaConsumer#poll` method.
NOTE: A high `kafkaConsumerPollTimeout` setting might greatly affect replication performance. Kafka topic partitions are equally distributed among threads (see `threadCount`). Each thread can poll only one partition at a time, so no other partition assigned to the same thread is polled until the current one has been handled.
* To specify `KafkaConsumer` settings, use the `kafkaProperties` bean. Store all the necessary configuration properties in a separate file and reference it from the KafkaToIgniteCdcStreamer configuration `.xml` file. See the examples below.
`kafka.properties`
```
bootstrap.servers=127.0.0.1:9092
request.timeout.ms=10000
group.id=kafka-to-ignite-dc1
auto.offset.reset=earliest
enable.auto.commit=false
```
Kafka properties bean declaration in `kafka-to-ignite-streamer-config.xml`
```
<util:properties id="kafkaProperties" location="file:kafka_properties_path/kafka.properties"/>
```
NOTE: The link:https://kafka.apache.org/documentation/#consumerconfigs_request.timeout.ms[request.timeout.ms] Kafka consumer property is mandatory for streamer configuration.
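
Putting the three required beans together, a complete configuration file might look like the sketch below. It is illustrative, not a reference configuration: the bean ids `ignite.cfg` and `streamer.cfg`, the topic names, the partition range, and the thread count are assumptions. The cache name `terminator` is reused from the streamer example, and discovery settings for the client node are omitted.

```xml
<?xml version="1.0" encoding="UTF-8"?>
<beans xmlns="http://www.springframework.org/schema/beans"
       xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
       xmlns:util="http://www.springframework.org/schema/util"
       xsi:schemaLocation="
        http://www.springframework.org/schema/beans
        http://www.springframework.org/schema/beans/spring-beans.xsd
        http://www.springframework.org/schema/util
        http://www.springframework.org/schema/util/spring-util.xsd">

    <!-- 1. Client type: a client node in this sketch (discovery settings omitted). -->
    <bean id="ignite.cfg" class="org.apache.ignite.configuration.IgniteConfiguration">
        <property name="clientMode" value="true"/>
    </bean>

    <!-- 2. Kafka consumer settings; the bean must be named kafkaProperties. -->
    <util:properties id="kafkaProperties" location="file:kafka_properties_path/kafka.properties"/>

    <!-- 3. Options specific to kafka-to-ignite.sh (values are illustrative). -->
    <bean id="streamer.cfg" class="org.apache.ignite.cdc.kafka.KafkaToIgniteCdcStreamerConfiguration">
        <property name="caches">
            <list>
                <value>terminator</value>
            </list>
        </property>
        <property name="topic" value="dc1_to_dc2"/>
        <property name="metadataTopic" value="metadata_from_dc1"/>
        <!-- This instance reads partitions 0..15. -->
        <property name="kafkaPartsFrom" value="0"/>
        <property name="kafkaPartsTo" value="16"/>
        <!-- 16 partitions over 4 threads: each thread polls 4 partitions in turn. -->
        <property name="threadCount" value="4"/>
        <property name="kafkaRequestTimeout" value="3000"/>
        <property name="kafkaConsumerPollTimeout" value="3000"/>
        <property name="maxBatchSize" value="1024"/>
    </bean>
</beans>
```

With fewer threads than partitions, as here, the poll timeout matters more: a thread waiting on an idle partition delays the other partitions assigned to it.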
==== Metrics
[cols="35%,65%",opts="header"]
|===
|Name |Description
| `EventsReceivedCount` | Count of events received from Kafka.
| `LastEventReceivedTime` | Timestamp of last received event from Kafka.
| `EventsSentCount` | Count of events sent to destination cluster.
| `LastBatchSentTime` | Timestamp of last sent batch to the destination cluster.
| `MarkersCount` | Count of metadata markers received from Kafka.
|===
==== Logging
`kafka-to-ignite.sh` uses the same logging configuration as an Ignite node. The only difference is that the log is written to the `kafka-ignite-streamer.log` file.
== Fault tolerance
It is expected that CDC streamers will be configured with `onlyPrimary=false` in most real-world deployments to ensure fault tolerance.
This means that each change is sent to Kafka `CacheConfiguration#backups` + 1 times.
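
To make the arithmetic concrete, here is a small sketch of a source-cluster cache configuration. The cache name `terminator` is reused from the streamer example, and the `backups` value of 2 is an illustrative assumption. With `onlyPrimary=false`, each update to this cache would be sent to Kafka 2 + 1 = 3 times: once by the streamer on the node holding the primary copy and once by the streamer on each node holding a backup copy.

```xml
<bean class="org.apache.ignite.configuration.CacheConfiguration">
    <property name="name" value="terminator"/>
    <!-- Illustrative value: 2 backup copies, so every entry is stored 3 times in total. -->
    <property name="backups" value="2"/>
</bean>
```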