blob: acb2aebe7cab22d6d9b547fce257d28ff65e1321 [file] [log] [blame]
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* See the License for the specific language governing permissions and
* limitations under the License.
package org.apache.ignite.cdc.kafka;
import java.time.Duration;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Properties;
import java.util.Set;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicLong;
import org.apache.ignite.IgniteCheckedException;
import org.apache.ignite.IgniteException;
import org.apache.ignite.IgniteLogger;
import org.apache.ignite.cache.CacheEntryVersion;
import org.apache.ignite.cdc.CdcEvent;
import org.apache.ignite.cdc.CdcEventsApplier;
import org.apache.ignite.internal.IgniteEx;
import org.apache.ignite.internal.processors.cache.IgniteInternalCache;
import org.apache.ignite.internal.processors.cache.version.CacheVersionConflictResolver;
import org.apache.ignite.internal.processors.cache.version.GridCacheVersion;
import org.apache.ignite.internal.util.typedef.F;
import org.apache.ignite.internal.util.typedef.internal.S;
import org.apache.ignite.internal.util.typedef.internal.U;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.errors.WakeupException;
import static org.apache.ignite.cdc.kafka.IgniteToKafkaCdcStreamer.META_UPDATE_MARKER;
* Thread that polls message from the Kafka topic partitions and applies those messages to the Ignite caches.
* It expected that messages was written to the Kafka by the {@link IgniteToKafkaCdcStreamer} Change Data Capture consumer.
* <p>
* Each {@code Applier} receive set of Kafka topic partitions to read and caches to process.
* Applier creates consumer per partition because Kafka consumer reads not fair,
* consumer reads messages from specific partition while there is new messages in specific partition.
* See <a href=
* "">KIP-387</a>
* and <a href="">KAFKA-3932</a> for further information.
* All consumers should belongs to the same consumer-group to ensure consistent reading.
* Applier polls messages from each consumer in round-robin fashion.
* <p>
* Messages applied to Ignite using {@link IgniteInternalCache#putAllConflict(Map)}, {@link IgniteInternalCache#removeAllConflict(Map)}
* these methods allows to provide {@link GridCacheVersion} of the entry to the Ignite so in case update conflicts they can be resolved
* by the {@link CacheVersionConflictResolver}.
* <p>
* In case of any error during read applier just fail.
* Fail of any applier will lead to the fail of {@link KafkaToIgniteCdcStreamer} application.
* It expected that application will be configured for automatic restarts with the OS tool to failover temporary errors
* such as Kafka or Ignite unavailability.
* @see KafkaToIgniteCdcStreamer
* @see IgniteToKafkaCdcStreamer
* @see IgniteInternalCache#putAllConflict(Map)
* @see IgniteInternalCache#removeAllConflict(Map)
* @see CacheVersionConflictResolver
* @see GridCacheVersion
* @see CdcEvent
* @see CacheEntryVersion
class KafkaToIgniteCdcStreamerApplier extends CdcEventsApplier implements Runnable, AutoCloseable {
/** Ignite instance. */
private final IgniteEx ign;
/** Log. */
private final IgniteLogger log;
/** Closed flag. Shared between all appliers. */
private final AtomicBoolean stopped;
/** Kafka properties. */
private final Properties kafkaProps;
/** Topic to read. */
private final String topic;
/** Lower kafka partition (inclusive). */
private final int kafkaPartFrom;
/** Higher kafka partition (exclusive). */
private final int kafkaPartTo;
/** Caches ids to read. */
private final Set<Integer> caches;
/** The maximum time to complete Kafka related requests, in milliseconds. */
private final long kafkaReqTimeout;
/** Metadata updater. */
private final KafkaToIgniteMetadataUpdater metaUpdr;
/** Consumers. */
private final List<KafkaConsumer<Integer, byte[]>> cnsmrs = new ArrayList<>();
/** */
private final AtomicLong rcvdEvts = new AtomicLong();
* @param ign Ignite instance.
* @param log Logger.
* @param kafkaProps Kafka properties.
* @param topic Topic name.
* @param kafkaPartFrom Read from partition.
* @param kafkaPartTo Read to partition.
* @param caches Cache ids.
* @param maxBatchSize Maximum batch size.
* @param kafkaReqTimeout The maximum time to complete Kafka related requests, in milliseconds.
* @param metaUpdr Metadata updater.
* @param stopped Stopped flag.
public KafkaToIgniteCdcStreamerApplier(
IgniteEx ign,
IgniteLogger log,
Properties kafkaProps,
String topic,
int kafkaPartFrom,
int kafkaPartTo,
Set<Integer> caches,
int maxBatchSize,
long kafkaReqTimeout,
KafkaToIgniteMetadataUpdater metaUpdr,
AtomicBoolean stopped
) {
this.ign = ign;
this.kafkaProps = kafkaProps;
this.topic = topic;
this.kafkaPartFrom = kafkaPartFrom;
this.kafkaPartTo = kafkaPartTo;
this.caches = caches;
this.kafkaReqTimeout = kafkaReqTimeout;
this.metaUpdr = metaUpdr;
this.stopped = stopped;
this.log = log.getLogger(KafkaToIgniteCdcStreamerApplier.class);
/** {@inheritDoc} */
@Override public void run() {
try {
for (int kafkaPart = kafkaPartFrom; kafkaPart < kafkaPartTo; kafkaPart++) {
KafkaConsumer<Integer, byte[]> cnsmr = new KafkaConsumer<>(kafkaProps);
cnsmr.assign(Collections.singleton(new TopicPartition(topic, kafkaPart)));
Iterator<KafkaConsumer<Integer, byte[]>> cnsmrIter = Collections.emptyIterator();
while (!stopped.get()) {
if (!cnsmrIter.hasNext())
cnsmrIter = cnsmrs.iterator();
catch (WakeupException e) {
if (!stopped.get())
log.error("Applier wakeup error!", e);
catch (Throwable e) {
log.error("Applier error!", e);
finally {
for (KafkaConsumer<Integer, byte[]> consumer : cnsmrs) {
try {
catch (Exception e) {
log.warning("Close error!", e);
if (log.isInfoEnabled()) + " - stopped!");
* Polls data from the specific consumer and applies it to the Ignite.
* @param cnsmr Data consumer.
private void poll(KafkaConsumer<Integer, byte[]> cnsmr) throws IgniteCheckedException {
ConsumerRecords<Integer, byte[]> recs = cnsmr.poll(Duration.ofMillis(kafkaReqTimeout));
if (log.isInfoEnabled()) {
"Polled from consumer [assignments=" + cnsmr.assignment() +
", cnt=" + recs.count() +
", rcvdEvts=" + rcvdEvts.addAndGet(recs.count()) + ']'
apply(F.iterator(recs, this::deserialize, true, this::filterAndPossiblyUpdateMetadata));
* Filter out {@link CdcEvent} records.
* Updates metadata in case update metadata marker found.
* @param rec Record to filter.
* @return {@code True} if record should be pushed down.
private boolean filterAndPossiblyUpdateMetadata(ConsumerRecord<Integer, byte[]> rec) {
byte[] val = rec.value();
if (rec.key() == null && Arrays.equals(val, META_UPDATE_MARKER)) {
return false;
return F.isEmpty(caches) || caches.contains(rec.key());
* @param rec Kafka record.
* @return CDC event.
private CdcEvent deserialize(ConsumerRecord<Integer, byte[]> rec) {
try (ObjectInputStream is = new ObjectInputStream(new ByteArrayInputStream(rec.value()))) {
return (CdcEvent)is.readObject();
catch (IOException | ClassNotFoundException e) {
throw new IgniteException(e);
/** {@inheritDoc} */
@Override public void close() {
log.warning("Close applier!");
/** {@inheritDoc} */
@Override protected IgniteEx ignite() {
return ign;
/** {@inheritDoc} */
@Override protected IgniteLogger log() {
return log;
/** {@inheritDoc} */
@Override public String toString() {
return S.toString(KafkaToIgniteCdcStreamerApplier.class, this);