/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.ignite.cdc.kafka;

import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
import java.util.Properties;
import java.util.Set;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.stream.Collectors;
import org.apache.ignite.IgniteLogger;
import org.apache.ignite.cdc.CdcConsumer;
import org.apache.ignite.cdc.CdcEvent;
import org.apache.ignite.cdc.conflictresolve.CacheVersionConflictResolverImpl;
import org.apache.ignite.internal.cdc.CdcMain;
import org.apache.ignite.internal.processors.metric.MetricRegistry;
import org.apache.ignite.internal.processors.metric.impl.AtomicLongMetric;
import org.apache.ignite.internal.util.IgniteUtils;
import org.apache.ignite.internal.util.typedef.internal.CU;
import org.apache.ignite.resources.LoggerResource;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.clients.producer.RecordMetadata;
import org.apache.kafka.common.serialization.ByteArraySerializer;
import org.apache.kafka.common.serialization.IntegerSerializer;

import static org.apache.ignite.cdc.IgniteToIgniteCdcStreamer.EVTS_CNT;
import static org.apache.ignite.cdc.IgniteToIgniteCdcStreamer.EVTS_CNT_DESC;
import static org.apache.ignite.cdc.IgniteToIgniteCdcStreamer.LAST_EVT_TIME;
import static org.apache.ignite.cdc.IgniteToIgniteCdcStreamer.LAST_EVT_TIME_DESC;
import static org.apache.kafka.clients.producer.ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG;
import static org.apache.kafka.clients.producer.ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG;

/**
* Change Data Capture consumer that streams all data changes to a Kafka topic.
* {@link CdcEvent}s are spread across the Kafka topic partitions using the {@code {ignite_partition} % {kafka_partition_count}} formula.
* The consumer fails on any error during a write. A consumer failure leads to a failure of the {@code ignite-cdc} application.
* It is expected that {@code ignite-cdc} is configured for automatic restart with an OS tool,
* so that temporary errors such as Kafka unavailability or network issues are handled by a restart.
*
* If you plan to apply the written messages to another Ignite cluster in an active-active manner,
* i.e. when concurrent updates of the same entry in both clusters are possible,
* please be aware of the {@link CacheVersionConflictResolverImpl} conflict resolver.
* Configuration of {@link CacheVersionConflictResolverImpl} can be found in the {@link KafkaToIgniteCdcStreamer} documentation.
*
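* A minimal configuration sketch (the topic name, partition count, cache name, and batch size
* below are illustrative placeholders, not recommended values):
* <pre>{@code
* Properties props = new Properties();
* props.setProperty("bootstrap.servers", "localhost:9092");
*
* CdcConsumer streamer = new IgniteToKafkaCdcStreamer(
*     "ignite-cdc",                           // Kafka topic name.
*     16,                                     // Kafka topic partitions count.
*     Collections.singleton("my-cache"),      // Names of caches to stream.
*     1024,                                   // Max number of concurrently sent records.
*     true,                                   // Stream only events from primaries.
*     props
* );
* }</pre>
*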
* @see CdcMain
* @see KafkaToIgniteCdcStreamer
* @see CacheVersionConflictResolverImpl
*/
public class IgniteToKafkaCdcStreamer implements CdcConsumer {
/** Default Kafka request timeout in seconds. */
public static final int DFLT_REQ_TIMEOUT = 5;

/** Bytes sent metric name. */
public static final String BYTES_SENT = "BytesSent";

/** Bytes sent metric description. */
public static final String BYTES_SENT_DESCRIPTION = "Count of bytes sent.";

/** Log. */
@LoggerResource
private IgniteLogger log;

/** Kafka producer to stream events. */
private KafkaProducer<Integer, byte[]> producer;

/** Handle only primary entry flag. */
private final boolean onlyPrimary;

/** Topic name. */
private final String topic;

/** Kafka topic partitions count. */
private final int kafkaParts;

/** Kafka properties. */
private final Properties kafkaProps;

/** IDs of caches to stream; events from other caches are skipped. */
private final Set<Integer> cachesIds;

/** Max batch size. */
private final int maxBatchSize;

/** Timestamp of the last sent message. */
private AtomicLongMetric lastMsgTs;

/** Count of bytes sent to Kafka. */
private AtomicLongMetric bytesSnt;

/** Count of sent messages. */
private AtomicLongMetric msgsSnt;

/**
* @param topic Topic name.
* @param kafkaParts Kafka partitions count.
* @param caches Cache names.
* @param maxBatchSize Maximum number of records concurrently sent to Kafka.
* @param onlyPrimary If {@code true} then stream only events from primaries.
* @param kafkaProps Kafka properties.
*/
public IgniteToKafkaCdcStreamer(
String topic,
int kafkaParts,
Set<String> caches,
int maxBatchSize,
boolean onlyPrimary,
Properties kafkaProps
) {
assert caches != null && !caches.isEmpty();
this.topic = topic;
this.kafkaParts = kafkaParts;
this.onlyPrimary = onlyPrimary;
this.kafkaProps = kafkaProps;
this.maxBatchSize = maxBatchSize;
cachesIds = caches.stream()
.mapToInt(CU::cacheId)
.boxed()
.collect(Collectors.toSet());
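// The serializers are fixed by the streamer: the record key is the cache ID, the value is the serialized CdcEvent.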
kafkaProps.setProperty(KEY_SERIALIZER_CLASS_CONFIG, IntegerSerializer.class.getName());
kafkaProps.setProperty(VALUE_SERIALIZER_CLASS_CONFIG, ByteArraySerializer.class.getName());
}

/** {@inheritDoc} */
@Override public boolean onEvents(Iterator<CdcEvent> evts) {
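// Collect futures of asynchronous sends; at most maxBatchSize records are put in flight per call.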
List<Future<RecordMetadata>> futs = new ArrayList<>();
while (evts.hasNext() && futs.size() < maxBatchSize) {
CdcEvent evt = evts.next();
if (log.isDebugEnabled())
log.debug("Event received [evt=" + evt + ']');
if (onlyPrimary && !evt.primary()) {
if (log.isDebugEnabled())
log.debug("Event skipped because of primary flag [evt=" + evt + ']');
continue;
}
if (evt.version().otherClusterVersion() != null) {
if (log.isDebugEnabled()) {
log.debug("Event skipped because of version [evt=" + evt +
", otherClusterVersion=" + evt.version().otherClusterVersion() + ']');
}
continue;
}
if (!cachesIds.isEmpty() && !cachesIds.contains(evt.cacheId())) {
if (log.isDebugEnabled())
log.debug("Event skipped because of cacheId [evt=" + evt + ']');
continue;
}
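// Serialize the event and send it asynchronously; the target Kafka partition is derived from the Ignite partition.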
byte[] bytes = IgniteUtils.toBytes(evt);
bytesSnt.add(bytes.length);
futs.add(producer.send(new ProducerRecord<>(
topic,
evt.partition() % kafkaParts,
evt.cacheId(),
bytes
)));
if (log.isDebugEnabled())
log.debug("Event sent asynchronously [evt=" + evt + ']');
}
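// Wait for every in-flight send to be acknowledged before reporting success; any failure
// propagates and stops the application (see the class javadoc for the restart strategy).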
if (!futs.isEmpty()) {
try {
for (Future<RecordMetadata> fut : futs)
fut.get(DFLT_REQ_TIMEOUT, TimeUnit.SECONDS);
msgsSnt.add(futs.size());
lastMsgTs.value(System.currentTimeMillis());
}
catch (ExecutionException | TimeoutException e) {
throw new RuntimeException(e);
}
catch (InterruptedException e) {
// Restore the interrupt flag before failing, so callers can observe the interruption.
Thread.currentThread().interrupt();
throw new RuntimeException(e);
}
if (log.isInfoEnabled())
log.info("Events processed [sentMessagesCount=" + msgsSnt.value() + ']');
}
return true;
}

/** {@inheritDoc} */
@Override public void start(MetricRegistry mreg) {
try {
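// Create the producer eagerly; invalid Kafka configuration surfaces here as an exception.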
producer = new KafkaProducer<>(kafkaProps);
if (log.isInfoEnabled())
log.info("CDC Ignite To Kafka started [topic=" + topic + ", onlyPrimary=" + onlyPrimary + ", cacheIds=" + cachesIds + ']');
}
catch (Exception e) {
throw new RuntimeException(e);
}
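// Register the streamer metrics; event count and last event time share names with IgniteToIgniteCdcStreamer.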
this.msgsSnt = mreg.longMetric(EVTS_CNT, EVTS_CNT_DESC);
this.lastMsgTs = mreg.longMetric(LAST_EVT_TIME, LAST_EVT_TIME_DESC);
this.bytesSnt = mreg.longMetric(BYTES_SENT, BYTES_SENT_DESCRIPTION);
}

/** {@inheritDoc} */
@Override public void stop() {
producer.close();
}
}