/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.flink.streaming.connectors.kafka;

import org.apache.flink.connector.kafka.dynamic.metadata.ClusterMetadata;
import org.apache.flink.connector.kafka.dynamic.metadata.KafkaStream;
import org.apache.flink.connector.kafka.dynamic.source.MetadataUpdateEvent;

import org.apache.kafka.clients.producer.Callback;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.header.internals.RecordHeader;
import org.apache.kafka.common.serialization.IntegerSerializer;
import org.apache.kafka.common.serialization.Serializer;
import org.apache.kafka.common.serialization.StringSerializer;

import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Properties;
import java.util.Set;
import java.util.concurrent.atomic.AtomicReference;

/**
 * Brings up multiple Kafka clusters and provides utilities to set up test topics and test data.
 *
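 * <p>A minimal usage sketch (the topic name and counts are illustrative):
 *
 * <pre>{@code
 * DynamicKafkaSourceTestHelper.setup();
 * DynamicKafkaSourceTestHelper.createTopic("test-topic", 2);
 * DynamicKafkaSourceTestHelper.produceToKafka("test-topic", 2, 10);
 * // ... exercise the DynamicKafkaSource against both clusters ...
 * DynamicKafkaSourceTestHelper.tearDown();
 * }</pre>
 */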
public class DynamicKafkaSourceTestHelper extends KafkaTestBase {

    public static final int NUM_KAFKA_CLUSTERS = 2;

    public static void setup() throws Throwable {
setNumKafkaClusters(NUM_KAFKA_CLUSTERS);
prepare();
    }

    public static void tearDown() throws Exception {
shutDownServices();
    }

    public static KafkaClusterTestEnvMetadata getKafkaClusterTestEnvMetadata(int kafkaClusterIdx) {
return kafkaClusters.get(kafkaClusterIdx);
    }

    public static MetadataUpdateEvent getMetadataUpdateEvent(String topic) {
return new MetadataUpdateEvent(Collections.singleton(getKafkaStream(topic)));
    }

    public static String getKafkaClusterId(int kafkaClusterIdx) {
return kafkaClusters.get(kafkaClusterIdx).getKafkaClusterId();
    }

    /**
     * A stream is a logical topic that spans multiple Kafka clusters. The returned {@link
     * KafkaStream} maps each cluster id to the {@link ClusterMetadata} that covers the topic on
     * that cluster.
     *
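     * <p>With the two test clusters used here, the result has roughly this shape (the cluster ids
     * are illustrative):
     *
     * <pre>{@code
     * KafkaStream("topic", {
     *     "kafka-cluster-0" -> ClusterMetadata({"topic"}, standardPropertiesOfCluster0),
     *     "kafka-cluster-1" -> ClusterMetadata({"topic"}, standardPropertiesOfCluster1)
     * })
     * }</pre>
     */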
public static KafkaStream getKafkaStream(String topic) {
Map<String, ClusterMetadata> clusterMetadataMap = new HashMap<>();
for (int i = 0; i < NUM_KAFKA_CLUSTERS; i++) {
KafkaClusterTestEnvMetadata kafkaClusterTestEnvMetadata =
getKafkaClusterTestEnvMetadata(i);
Set<String> topics = new HashSet<>();
topics.add(topic);
ClusterMetadata clusterMetadata =
new ClusterMetadata(
topics, kafkaClusterTestEnvMetadata.getStandardProperties());
clusterMetadataMap.put(
kafkaClusterTestEnvMetadata.getKafkaClusterId(), clusterMetadata);
}
return new KafkaStream(topic, clusterMetadataMap);
    }

    public static void createTopic(String topic, int numPartitions, int replicationFactor) {
for (int i = 0; i < NUM_KAFKA_CLUSTERS; i++) {
createTopic(i, topic, numPartitions, replicationFactor);
}
    }

    public static void createTopic(String topic, int numPartitions) {
createTopic(topic, numPartitions, 1);
    }

    public static void createTopic(int kafkaClusterIdx, String topic, int numPartitions) {
createTopic(kafkaClusterIdx, topic, numPartitions, 1);
    }

    private static void createTopic(
            int kafkaClusterIdx, String topic, int numPartitions, int replicationFactor) {
kafkaClusters
.get(kafkaClusterIdx)
.getKafkaTestEnvironment()
.createTestTopic(topic, numPartitions, replicationFactor);
    }

    /**
     * Produces records with values in the range [0, NUM_KAFKA_CLUSTERS * numPartitions *
     * numRecordsPerSplit) to the specified topic, spread across all clusters.
     *
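     * <p>For example, with 2 clusters, 2 partitions, and 3 records per split, values 0-5 go to
     * the first cluster and 6-11 to the second; every record carries a "flink.kafka-cluster-name"
     * header identifying the cluster it was written to.
     */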
public static List<ProducerRecord<String, Integer>> produceToKafka(
String topic, int numPartitions, int numRecordsPerSplit) throws Throwable {
List<ProducerRecord<String, Integer>> records = new ArrayList<>();
int counter = 0;
for (int kafkaClusterIdx = 0; kafkaClusterIdx < NUM_KAFKA_CLUSTERS; kafkaClusterIdx++) {
String kafkaClusterId = getKafkaClusterId(kafkaClusterIdx);
List<ProducerRecord<String, Integer>> recordsForCluster = new ArrayList<>();
for (int part = 0; part < numPartitions; part++) {
for (int i = 0; i < numRecordsPerSplit; i++) {
recordsForCluster.add(
new ProducerRecord<>(
topic,
part,
topic + "-" + part,
counter++,
Collections.singleton(
new RecordHeader(
"flink.kafka-cluster-name",
kafkaClusterId.getBytes(
StandardCharsets.UTF_8)))));
}
}
produceToKafka(kafkaClusterIdx, recordsForCluster);
records.addAll(recordsForCluster);
}
return records;
    }

    /**
     * Produces records with values in the range [recordValueStartingOffset,
     * recordValueStartingOffset + numPartitions * numRecordsPerSplit) to the specified topic on
     * the given cluster, and returns the exclusive end of that range, i.e. the next unused value.
     */
public static int produceToKafka(
int kafkaClusterIdx,
String topic,
int numPartitions,
int numRecordsPerSplit,
int recordValueStartingOffset)
throws Throwable {
int counter = recordValueStartingOffset;
String kafkaClusterId = getKafkaClusterId(kafkaClusterIdx);
List<ProducerRecord<String, Integer>> recordsForCluster = new ArrayList<>();
for (int part = 0; part < numPartitions; part++) {
for (int i = 0; i < numRecordsPerSplit; i++) {
recordsForCluster.add(
new ProducerRecord<>(
topic,
part,
topic + "-" + part,
counter++,
Collections.singleton(
new RecordHeader(
"flink.kafka-cluster-name",
kafkaClusterId.getBytes(StandardCharsets.UTF_8)))));
}
}
produceToKafka(kafkaClusterIdx, recordsForCluster);
return counter;
    }

    public static void produceToKafka(
int kafkaClusterIdx, Collection<ProducerRecord<String, Integer>> records)
throws Throwable {
produceToKafka(kafkaClusterIdx, records, StringSerializer.class, IntegerSerializer.class);
    }

    public static <K, V> void produceToKafka(
            int kafkaClusterIdx,
            Collection<ProducerRecord<K, V>> records,
            Class<? extends Serializer<K>> keySerializerClass,
            Class<? extends Serializer<V>> valueSerializerClass)
            throws Throwable {
        produceToKafka(
                kafkaClusters.get(kafkaClusterIdx).getStandardProperties(),
                records,
                keySerializerClass,
                valueSerializerClass);
    }

    public static <K, V> void produceToKafka(
            Properties clusterProperties,
            Collection<ProducerRecord<K, V>> records,
            Class<? extends Serializer<K>> keySerializerClass,
            Class<? extends Serializer<V>> valueSerializerClass)
            throws Throwable {
Properties props = new Properties();
props.putAll(clusterProperties);
props.setProperty(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, keySerializerClass.getName());
props.setProperty(
ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, valueSerializerClass.getName());
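        // Capture the first asynchronous send failure; any later failures are attached to it as
        // suppressed exceptions so that no error is silently dropped.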
AtomicReference<Throwable> sendingError = new AtomicReference<>();
Callback callback =
(metadata, exception) -> {
if (exception != null) {
if (!sendingError.compareAndSet(null, exception)) {
sendingError.get().addSuppressed(exception);
}
}
};
try (KafkaProducer<K, V> producer = new KafkaProducer<>(props)) {
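            // send(...).get() blocks until each record is acknowledged, so failures surface
            // immediately and records are written in order.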
for (ProducerRecord<K, V> record : records) {
producer.send(record, callback).get();
}
}
if (sendingError.get() != null) {
throw sendingError.get();
}
}
}