blob: e62a6f43fc4af2a864f34f5d96b9626ecbbf833e [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.ignite.cdc.kafka;
import java.util.ArrayList;
import java.util.List;
import java.util.Objects;
import java.util.Properties;
import java.util.Set;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.stream.Collectors;
import org.apache.ignite.IgniteException;
import org.apache.ignite.IgniteLogger;
import org.apache.ignite.Ignition;
import org.apache.ignite.cdc.CdcEvent;
import org.apache.ignite.cdc.conflictresolve.CacheConflictResolutionManagerImpl;
import org.apache.ignite.cdc.conflictresolve.CacheVersionConflictResolverImpl;
import org.apache.ignite.configuration.IgniteConfiguration;
import org.apache.ignite.internal.GridLoggerProxy;
import org.apache.ignite.internal.IgniteEx;
import org.apache.ignite.internal.cdc.CdcMain;
import org.apache.ignite.internal.util.typedef.F;
import org.apache.ignite.internal.util.typedef.internal.CU;
import org.apache.ignite.internal.util.typedef.internal.U;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.common.serialization.ByteArrayDeserializer;
import org.apache.kafka.common.serialization.IntegerDeserializer;
import static org.apache.ignite.internal.IgniteKernal.NL;
import static org.apache.ignite.internal.IgniteKernal.SITE;
import static org.apache.ignite.internal.IgniteVersionUtils.ACK_VER_STR;
import static org.apache.ignite.internal.IgniteVersionUtils.COPYRIGHT;
import static org.apache.kafka.clients.consumer.ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG;
import static org.apache.kafka.clients.consumer.ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG;
/**
* Main class of Kafka to Ignite application.
* This application is counterpart of {@link IgniteToKafkaCdcStreamer} Change Data Capture consumer.
* Application runs several {@link KafkaToIgniteCdcStreamerApplier} thread to read Kafka topic partitions
* and apply {@link CdcEvent} to Ignite.
* <p>
 * Each applier receives an equal share of the Kafka topic partitions to read.
* <p>
 * In case of any error during read the applier simply fails. The failure of any applier leads to the failure
 * of the whole application. It is expected that the application will be configured for automatic restart with
 * an OS tool in order to fail over temporary errors such as Kafka or Ignite unavailability.
* <p>
* To resolve possible update conflicts (in case of concurrent update in source and destination Ignite clusters)
* real-world deployments should use some conflict resolver, for example {@link CacheVersionConflictResolverImpl}.
* Example of Ignite configuration with the conflict resolver:
* <pre>
* {@code
* CacheVersionConflictResolverCachePluginProvider conflictPlugin = new CacheVersionConflictResolverCachePluginProvider();
*
* conflictPlugin.setClusterId(clusterId); // Cluster id.
* conflictPlugin.setCaches(new HashSet<>(Arrays.asList("my-cache", "some-other-cache"))); // Caches to replicate.
*
* IgniteConfiguration cfg = ...;
*
* cfg.setPluginProviders(conflictPlugin);
* }
* </pre>
* Please, see {@link CacheConflictResolutionManagerImpl} for additional information.
*
* @see CdcMain
* @see IgniteToKafkaCdcStreamer
* @see CdcEvent
* @see KafkaToIgniteCdcStreamerApplier
* @see CacheConflictResolutionManagerImpl
*/
public class KafkaToIgniteCdcStreamer implements Runnable {
    /** Ignite configuration. */
    private final IgniteConfiguration igniteCfg;

    /** Kafka consumer properties. */
    private final Properties kafkaProps;

    /** Streamer configuration. */
    private final KafkaToIgniteCdcStreamerConfiguration streamerCfg;

    /** Runners to run {@link KafkaToIgniteCdcStreamerApplier} instances. */
    private final Thread[] runners;

    /** Appliers. */
    private final List<KafkaToIgniteCdcStreamerApplier> appliers;

    /**
     * @param igniteCfg Ignite configuration.
     * @param kafkaProps Kafka properties. Note, key and value deserializer entries are overwritten by this
     *      constructor, because the wire format produced by {@link IgniteToKafkaCdcStreamer} is fixed.
     * @param streamerCfg Streamer configuration.
     * @throws IllegalArgumentException If {@code kafkaProps} misses the mandatory
     *      {@link ConsumerConfig#GROUP_ID_CONFIG} property.
     */
    public KafkaToIgniteCdcStreamer(
        IgniteConfiguration igniteCfg,
        Properties kafkaProps,
        KafkaToIgniteCdcStreamerConfiguration streamerCfg
    ) {
        this.igniteCfg = igniteCfg;
        this.kafkaProps = kafkaProps;
        this.streamerCfg = streamerCfg;

        appliers = new ArrayList<>(streamerCfg.getThreadCount());
        runners = new Thread[streamerCfg.getThreadCount()];

        if (!kafkaProps.containsKey(ConsumerConfig.GROUP_ID_CONFIG))
            throw new IllegalArgumentException("Kafka properties don't contain " + ConsumerConfig.GROUP_ID_CONFIG);

        // Deserializers are dictated by the producer side wire format, so user provided values are overridden.
        kafkaProps.put(KEY_DESERIALIZER_CLASS_CONFIG, IntegerDeserializer.class.getName());
        kafkaProps.put(VALUE_DESERIALIZER_CLASS_CONFIG, ByteArrayDeserializer.class.getName());
    }

    /** {@inheritDoc} */
    @Override public void run() {
        try {
            runx();
        }
        catch (Exception e) {
            throw new IgniteException(e);
        }
    }

    /**
     * Starts an Ignite node, distributes Kafka partitions between {@link KafkaToIgniteCdcStreamerApplier}
     * threads and blocks until all appliers complete or the current thread is interrupted.
     *
     * @throws Exception In case of any error during startup or apply.
     */
    private void runx() throws Exception {
        U.initWorkDir(igniteCfg);

        IgniteLogger log = U.initLogger(igniteCfg, "kafka-ignite-streamer");

        igniteCfg.setGridLogger(log);

        ackAsciiLogo(log);

        // Node is closed (and appliers' destination disappears) when this try block exits.
        try (IgniteEx ign = (IgniteEx)Ignition.start(igniteCfg)) {
            AtomicBoolean stopped = new AtomicBoolean();

            Set<Integer> caches = null;

            if (!F.isEmpty(streamerCfg.getCaches())) {
                // Fail fast if a configured cache doesn't exist on the destination cluster.
                caches = streamerCfg.getCaches().stream()
                    .peek(cache -> Objects.requireNonNull(ign.cache(cache), cache + " not exists!"))
                    .map(CU::cacheId).collect(Collectors.toSet());
            }

            int kafkaPartsFrom = streamerCfg.getKafkaPartsFrom();
            int kafkaParts = streamerCfg.getKafkaPartsTo() - kafkaPartsFrom;
            int threadCnt = streamerCfg.getThreadCount();

            assert kafkaParts >= threadCnt
                : "Threads count must not be greater than Kafka partitions count [kafkaParts=" + kafkaParts +
                ",threadCount=" + threadCnt + ']';

            int partPerApplier = kafkaParts / threadCnt;

            for (int i = 0; i < threadCnt; i++) {
                int from = i * partPerApplier;
                int to = (i + 1) * partPerApplier;

                // The last applier also takes the remainder of the integer division.
                if (i == threadCnt - 1)
                    to = kafkaParts;

                KafkaToIgniteCdcStreamerApplier applier = new KafkaToIgniteCdcStreamerApplier(
                    ign,
                    log,
                    kafkaProps,
                    streamerCfg.getTopic(),
                    kafkaPartsFrom + from,
                    kafkaPartsFrom + to,
                    caches,
                    streamerCfg.getMaxBatchSize(),
                    stopped
                );

                appliers.add(applier);

                runners[i] = new Thread(applier, "applier-thread-" + i);
                runners[i].start();
            }

            try {
                for (int i = 0; i < threadCnt; i++)
                    runners[i].join();
            }
            catch (InterruptedException e) {
                // Signal appliers to stop and release their Kafka consumers.
                stopped.set(true);

                appliers.forEach(U::closeQuiet);

                log.warning("Kafka to Ignite streamer interrupted", e);

                // Restore the interrupt status so callers above can observe the interruption.
                Thread.currentThread().interrupt();
            }
        }
    }

    /**
     * Prints the application banner together with the effective topic and partition range,
     * either to the configured logger or to the console in quiet mode.
     *
     * @param log Logger.
     */
    private void ackAsciiLogo(IgniteLogger log) {
        String ver = "ver. " + ACK_VER_STR;

        if (log.isInfoEnabled()) {
            log.info(NL + NL +
                ">>> __ _____ ______ _____ __________ __________ ________________" + NL +
                ">>> / //_/ _ | / __/ //_/ _ | /_ __/ __ \\ / _/ ___/ |/ / _/_ __/ __/" + NL +
                ">>> / ,< / __ |/ _// ,< / __ | / / / /_/ / _/ // (_ / // / / / / _/ " + NL +
                ">>> /_/|_/_/ |_/_/ /_/|_/_/ |_| /_/ \\____/ /___/\\___/_/|_/___/ /_/ /___/ " + NL +
                ">>> " + NL +
                ">>> " + NL +
                ">>> " + ver + NL +
                ">>> " + COPYRIGHT + NL +
                ">>> " + NL +
                ">>> Ignite documentation: " + "http://" + SITE + NL +
                ">>> Kafka topic: " + streamerCfg.getTopic() + NL +
                ">>> Kafka partitions: " + streamerCfg.getKafkaPartsFrom() + "-" + streamerCfg.getKafkaPartsTo() + NL
            );
        }

        if (log.isQuiet()) {
            U.quiet(false,
                " __ _____ ______ _____ __________ __________ ________________",
                " / //_/ _ | / __/ //_/ _ | /_ __/ __ \\ / _/ ___/ |/ / _/_ __/ __/",
                " / ,< / __ |/ _// ,< / __ | / / / /_/ / _/ // (_ / // / / / / _/ ",
                "/_/|_/_/ |_/_/ /_/|_/_/ |_| /_/ \\____/ /___/\\___/_/|_/___/ /_/ /___/ ",
                "",
                ver,
                COPYRIGHT,
                "",
                "Ignite documentation: " + "http://" + SITE,
                "Kafka topic: " + streamerCfg.getTopic(),
                "Kafka partitions: " + streamerCfg.getKafkaPartsFrom() + "-" + streamerCfg.getKafkaPartsTo(),
                "",
                "Quiet mode.");

            String fileName = log.fileName();

            if (fileName != null)
                U.quiet(false, " ^-- Logging to file '" + fileName + '\'');

            if (log instanceof GridLoggerProxy)
                U.quiet(false, " ^-- Logging by '" + ((GridLoggerProxy)log).getLoggerInfo() + '\'');

            U.quiet(false,
                " ^-- To see **FULL** console log here add -DIGNITE_QUIET=false or \"-v\" to kafka-to-ignite.{sh|bat}",
                "");
        }
    }
}