blob: 74e77192828ac7f9774c00b46241374702e25de6 [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.ignite.cdc.kafka;
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.Properties;
import java.util.Set;
import org.apache.ignite.cdc.AbstractReplicationTest;
import org.apache.ignite.cdc.CdcConfiguration;
import org.apache.ignite.configuration.IgniteConfiguration;
import org.apache.ignite.internal.IgniteEx;
import org.apache.ignite.internal.IgniteInternalFuture;
import org.apache.ignite.internal.cdc.CdcMain;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.streams.integration.utils.EmbeddedKafkaCluster;
import static org.apache.ignite.cdc.kafka.KafkaToIgniteCdcStreamerConfiguration.DFLT_PARTS;
import static org.apache.ignite.cdc.kafka.KafkaToIgniteCdcStreamerConfiguration.DFLT_TOPIC;
import static org.apache.ignite.testframework.GridTestUtils.runAsync;
import static org.apache.ignite.testframework.GridTestUtils.waitForCondition;
/**
 * Tests for CDC replication between Ignite clusters through Kafka.
 *
 * <p>An embedded Kafka cluster is started lazily before the first test and is shared by all tests
 * of this class. The topics used by the tests are created before and deleted after every test.
 */
public class CdcKafkaReplicationTest extends AbstractReplicationTest {
    /** Topic used in active-active mode for changes streamed from the source cluster. */
    public static final String SRC_DEST_TOPIC = "source-dest";

    /** Topic used in active-active mode for changes streamed from the destination cluster. */
    public static final String DEST_SRC_TOPIC = "dest-source";

    /** Embedded Kafka cluster shared by all tests, {@code null} until it has been successfully started. */
    private static EmbeddedKafkaCluster KAFKA = null;

    /** {@inheritDoc} */
    @Override protected void beforeTest() throws Exception {
        super.beforeTest();

        if (KAFKA == null) {
            // Publish the cluster only after a successful start: otherwise a failed start would leave
            // a non-null, non-started cluster that all subsequent tests would try to reuse.
            EmbeddedKafkaCluster kafka = new EmbeddedKafkaCluster(1);

            kafka.start();

            KAFKA = kafka;
        }

        KAFKA.createTopic(DFLT_TOPIC, DFLT_PARTS, 1);
        KAFKA.createTopic(SRC_DEST_TOPIC, DFLT_PARTS, 1);
        KAFKA.createTopic(DEST_SRC_TOPIC, DFLT_PARTS, 1);
    }

    /** {@inheritDoc} */
    @Override protected void afterTest() throws Exception {
        super.afterTest();

        // Kafka was never started (beforeTest() failed early): nothing to clean up,
        // and a NullPointerException here would only hide the original failure.
        if (KAFKA == null)
            return;

        KAFKA.deleteTopic(DFLT_TOPIC);
        KAFKA.deleteTopic(SRC_DEST_TOPIC);
        KAFKA.deleteTopic(DEST_SRC_TOPIC);

        // Wait until the topics are really gone, so the next test can create them again.
        boolean deleted = waitForCondition(() -> {
            Set<String> topics = KAFKA.getAllTopicsInCluster();

            return !topics.contains(DFLT_TOPIC)
                && !topics.contains(SRC_DEST_TOPIC)
                && !topics.contains(DEST_SRC_TOPIC);
        }, getTestTimeout());

        // Fail here with a clear message instead of letting the next test fail on topic creation.
        if (!deleted)
            throw new AssertionError("Failed to wait for deletion of Kafka topics.");
    }

    /** {@inheritDoc} */
    @Override protected List<IgniteInternalFuture<?>> startActivePassiveCdc() {
        List<IgniteInternalFuture<?>> futs = new ArrayList<>();

        for (IgniteEx ex : srcCluster)
            futs.add(igniteToKafka(ex.configuration(), DFLT_TOPIC, ACTIVE_PASSIVE_CACHE));

        int applierCnt = destCluster.length;

        // Split partitions [0, DFLT_PARTS) into contiguous ranges, one per destination node.
        // For two nodes and an even DFLT_PARTS this yields exactly the two halves of the topic.
        for (int i = 0; i < applierCnt; i++) {
            futs.add(kafkaToIgnite(
                ACTIVE_PASSIVE_CACHE,
                DFLT_TOPIC,
                destClusterCliCfg[i],
                i * DFLT_PARTS / applierCnt,
                (i + 1) * DFLT_PARTS / applierCnt
            ));
        }

        return futs;
    }

    /** {@inheritDoc} */
    @Override protected List<IgniteInternalFuture<?>> startActiveActiveCdc() {
        List<IgniteInternalFuture<?>> futs = new ArrayList<>();

        // Every cluster streams its own changes into a dedicated topic...
        for (IgniteEx ex : srcCluster)
            futs.add(igniteToKafka(ex.configuration(), SRC_DEST_TOPIC, ACTIVE_ACTIVE_CACHE));

        for (IgniteEx ex : destCluster)
            futs.add(igniteToKafka(ex.configuration(), DEST_SRC_TOPIC, ACTIVE_ACTIVE_CACHE));

        // ...and a single applier per direction reads all partitions of the opposite topic.
        futs.add(kafkaToIgnite(ACTIVE_ACTIVE_CACHE, SRC_DEST_TOPIC, destClusterCliCfg[0], 0, DFLT_PARTS));
        futs.add(kafkaToIgnite(ACTIVE_ACTIVE_CACHE, DEST_SRC_TOPIC, srcClusterCliCfg[0], 0, DFLT_PARTS));

        return futs;
    }

    /**
     * Asynchronously runs a {@link CdcMain} application with an {@link IgniteToKafkaCdcStreamer} consumer.
     *
     * @param igniteCfg Ignite configuration.
     * @param topic Kafka topic name.
     * @param cache Cache name to stream to kafka.
     * @return Future for Change Data Capture application.
     */
    protected IgniteInternalFuture<?> igniteToKafka(IgniteConfiguration igniteCfg, String topic, String cache) {
        return runAsync(() -> {
            IgniteToKafkaCdcStreamer cdcCnsmr = new IgniteToKafkaCdcStreamer(
                topic,
                DFLT_PARTS,
                Collections.singleton(cache),
                KEYS_CNT,
                false,
                kafkaProperties()
            );

            CdcConfiguration cdcCfg = new CdcConfiguration();

            cdcCfg.setConsumer(cdcCnsmr);

            new CdcMain(igniteCfg, null, cdcCfg).run();
        });
    }

    /**
     * Asynchronously runs a {@link KafkaToIgniteCdcStreamer} for the given cache, topic and partition range.
     *
     * @param cacheName Cache name.
     * @param topic Kafka topic name.
     * @param igniteCfg Ignite configuration.
     * @param fromPart Kafka partitions lower bound.
     * @param toPart Kafka partitions upper bound (presumably exclusive, callers pass {@code DFLT_PARTS} for
     *      the whole topic).
     * @return Future for the running {@link KafkaToIgniteCdcStreamer}.
     */
    protected IgniteInternalFuture<?> kafkaToIgnite(
        String cacheName,
        String topic,
        IgniteConfiguration igniteCfg,
        int fromPart,
        int toPart
    ) {
        KafkaToIgniteCdcStreamerConfiguration cfg = new KafkaToIgniteCdcStreamerConfiguration();

        cfg.setKafkaPartsFrom(fromPart);
        cfg.setKafkaPartsTo(toPart);

        // One thread per two partitions, but never zero: integer division yields 0 for a single-partition range.
        cfg.setThreadCount(Math.max(1, (toPart - fromPart) / 2));

        cfg.setCaches(Collections.singletonList(cacheName));
        cfg.setTopic(topic);

        return runAsync(new KafkaToIgniteCdcStreamer(igniteCfg, kafkaProperties(), cfg));
    }

    /**
     * Creates Kafka client properties pointing to the embedded Kafka cluster. The same properties
     * are passed both to {@link IgniteToKafkaCdcStreamer} and to {@link KafkaToIgniteCdcStreamer}.
     *
     * @return Kafka properties.
     */
    protected Properties kafkaProperties() {
        Properties props = new Properties();

        props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, KAFKA.bootstrapServers());
        props.put(ConsumerConfig.GROUP_ID_CONFIG, "kafka-to-ignite-applier");
        props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
        props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "false");
        props.put(ConsumerConfig.REQUEST_TIMEOUT_MS_CONFIG, "10000");

        return props;
    }
}