blob: 459c229d22f5b02019d6e0f3eb99bbeb73ef0870 [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.ignite.cdc.kafka;
import java.io.File;
import java.io.FileOutputStream;
import java.io.PrintWriter;
import java.net.URL;
import java.nio.charset.StandardCharsets;
import java.nio.file.Files;
import java.nio.file.Paths;
import java.util.HashMap;
import java.util.Map;
import org.apache.ignite.configuration.IgniteConfiguration;
import org.apache.ignite.internal.IgniteInternalFuture;
import org.apache.ignite.startup.cmdline.CdcCommandLineStartup;
import static org.apache.ignite.cdc.kafka.KafkaToIgniteCdcStreamerConfiguration.DFLT_PARTS;
import static org.apache.ignite.spi.discovery.tcp.TcpDiscoverySpi.DFLT_PORT_RANGE;
import static org.apache.ignite.testframework.GridTestUtils.getFieldValue;
import static org.apache.ignite.testframework.GridTestUtils.runAsync;
/**
 * Variant of {@link CdcKafkaReplicationTest} that starts both replication applications through their
 * command-line entry points ({@link CdcCommandLineStartup} and {@link KafkaToIgniteCommandLineStartup}).
 * <p>
 * Each application is given the path of a temporary configuration file. That file is rendered from a classpath
 * template by replacing every <code>{NAME}</code> placeholder with the matching parameter value, where
 * {@code NAME} is one of the constants declared below.
 */
public class CdcKafkaReplicationAppsTest extends CdcKafkaReplicationTest {
    /** Placeholder: Ignite instance name. */
    public static final String INSTANCE_NAME = "INSTANCE_NAME";

    /** Placeholder: local discovery port of the destination node. */
    public static final String DISCO_PORT = "DISCO_PORT";

    /**
     * Placeholder: discovery port plus {@code DFLT_PORT_RANGE}.
     * NOTE(review): presumably the last port of the discovery range - confirm against the template.
     */
    public static final String DISCO_PORT_RANGE = "DISCO_PORT_RANGE";

    /** Placeholder: name of the replicated cache. */
    public static final String REPLICATED_CACHE = "REPLICATED_CACHE";

    /** Placeholder: Kafka topic name. */
    public static final String TOPIC = "TOPIC";

    /** Placeholder: consistent id of the source node. */
    public static final String CONSISTENT_ID = "CONSISTENT_ID";

    /** Placeholder: Kafka partitions count. */
    public static final String PARTS = "PARTS";

    /** Placeholder: first Kafka partition handled by the kafka-to-ignite application. */
    public static final String PARTS_FROM = "PARTS_FROM";

    /** Placeholder: upper bound of Kafka partitions handled by the kafka-to-ignite application. */
    public static final String PARTS_TO = "PARTS_TO";

    /** Placeholder: number of threads of the kafka-to-ignite application. */
    public static final String THREAD_CNT = "THREAD_CNT";

    /** Placeholder: maximum batch size of the ignite-to-kafka application. */
    public static final String MAX_BATCH_SIZE = "MAX_BATCH_SIZE";

    /** Placeholder: location of the Kafka properties file. */
    public static final String PROPS_PATH = "PROPS_PATH";

    /** {@code file://} location of the temporary Kafka properties file, created lazily by {@link #beforeTest()}. */
    private String kafkaPropsPath;

    /**
     * {@inheritDoc}
     * <p>
     * Additionally stores the Kafka properties into a temporary file once per test instance, so that the
     * rendered configs can reference it.
     */
    @Override protected void beforeTest() throws Exception {
        super.beforeTest();

        if (kafkaPropsPath == null) {
            // The suffix is used verbatim by createTempFile, so the dot has to be part of it.
            File file = File.createTempFile("kafka", ".properties");

            file.deleteOnExit();

            try (FileOutputStream fos = new FileOutputStream(file)) {
                kafkaProperties().store(fos, null);
            }

            kafkaPropsPath = "file://" + file.getAbsolutePath();
        }
    }

    /**
     * {@inheritDoc}
     * <p>
     * Renders {@code /replication/ignite-to-kafka.xml} and runs {@link CdcCommandLineStartup} with it asynchronously.
     */
    @Override protected IgniteInternalFuture<?> igniteToKafka(IgniteConfiguration igniteCfg, String topic, String cache) {
        Map<String, String> params = new HashMap<>();

        params.put(INSTANCE_NAME, igniteCfg.getIgniteInstanceName());
        params.put(REPLICATED_CACHE, cache);
        params.put(TOPIC, topic);
        params.put(CONSISTENT_ID, String.valueOf(igniteCfg.getConsistentId()));
        params.put(PARTS, Integer.toString(DFLT_PARTS));
        params.put(MAX_BATCH_SIZE, Integer.toString(KEYS_CNT));
        params.put(PROPS_PATH, kafkaPropsPath);

        return runAsync(
            () -> CdcCommandLineStartup.main(new String[] {prepareConfig("/replication/ignite-to-kafka.xml", params)})
        );
    }

    /**
     * {@inheritDoc}
     * <p>
     * Renders {@code /replication/kafka-to-ignite.xml} and runs {@link KafkaToIgniteCommandLineStartup} with it
     * asynchronously.
     */
    @Override protected IgniteInternalFuture<?> kafkaToIgnite(
        String cacheName,
        String topic,
        IgniteConfiguration igniteCfg,
        int partFrom,
        int partTo
    ) {
        Map<String, String> params = new HashMap<>();

        // The discovery SPI's local port is read reflectively from its "locPort" field.
        int discoPort = getFieldValue(igniteCfg.getDiscoverySpi(), "locPort");

        params.put(INSTANCE_NAME, igniteCfg.getIgniteInstanceName());
        params.put(DISCO_PORT, Integer.toString(discoPort));
        params.put(DISCO_PORT_RANGE, Integer.toString(discoPort + DFLT_PORT_RANGE));
        params.put(REPLICATED_CACHE, cacheName);
        params.put(TOPIC, topic);
        params.put(PROPS_PATH, kafkaPropsPath);
        params.put(PARTS_FROM, Integer.toString(partFrom));
        params.put(PARTS_TO, Integer.toString(partTo));

        // Integer division yields 0 for ranges narrower than three partitions, so clamp to at least one thread.
        params.put(THREAD_CNT, Integer.toString(Math.max(1, (partTo - partFrom) / 3)));

        return runAsync(
            () -> KafkaToIgniteCommandLineStartup.main(new String[] {prepareConfig("/replication/kafka-to-ignite.xml", params)})
        );
    }

    /**
     * Renders a configuration template into a temporary file.
     * <p>
     * Every occurrence of <code>{KEY}</code> in the template is replaced with the value mapped to {@code KEY}
     * in {@code params}. Each parameter is substituted in a single pass, so a value that happens to contain its
     * own placeholder cannot cause endless re-substitution.
     *
     * @param path Classpath location of the template, resolved relative to this class.
     * @param params Placeholder name to replacement value.
     * @return Absolute path of the rendered temporary file (deleted on JVM exit).
     * @throws IllegalArgumentException If the template is not found on the classpath.
     * @throws RuntimeException If reading the template or writing the rendered file fails.
     */
    private String prepareConfig(String path, Map<String, String> params) {
        URL tmpl = CdcKafkaReplicationAppsTest.class.getResource(path);

        if (tmpl == null)
            throw new IllegalArgumentException("Config template not found on classpath: " + path);

        try {
            // Explicit charset: the result must not depend on the platform default encoding.
            // Assumes the templates are UTF-8, which is the XML default when no encoding is declared.
            String cfg = new String(Files.readAllBytes(Paths.get(tmpl.toURI())), StandardCharsets.UTF_8);

            // String.replace already substitutes all occurrences, no outer loop is needed.
            for (Map.Entry<String, String> param : params.entrySet())
                cfg = cfg.replace('{' + param.getKey() + '}', param.getValue());

            File file = File.createTempFile("ignite-config", ".xml");

            file.deleteOnExit();

            try (PrintWriter out = new PrintWriter(file, StandardCharsets.UTF_8.name())) {
                out.print(cfg);

                // PrintWriter does not throw on write failures; checkError() flushes and reports them.
                if (out.checkError())
                    throw new IllegalStateException("Failed to write rendered config: " + file);
            }

            return file.getAbsolutePath();
        }
        catch (Exception e) {
            throw new RuntimeException("Failed to prepare config from template: " + path, e);
        }
    }
}