/*
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements. See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership. The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License. You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package org.apache.flink.connector.kafka.testutils;

import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.api.connector.source.Boundedness;
import org.apache.flink.api.connector.source.Source;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.connector.kafka.dynamic.metadata.ClusterMetadata;
import org.apache.flink.connector.kafka.dynamic.metadata.KafkaStream;
import org.apache.flink.connector.kafka.dynamic.source.DynamicKafkaSource;
import org.apache.flink.connector.kafka.dynamic.source.DynamicKafkaSourceBuilder;
import org.apache.flink.connector.kafka.source.enumerator.initializer.OffsetsInitializer;
import org.apache.flink.connector.kafka.source.reader.deserializer.KafkaRecordDeserializationSchema;
import org.apache.flink.connector.testframe.external.ExternalSystemSplitDataWriter;
import org.apache.flink.connector.testframe.external.source.DataStreamSourceExternalContext;
import org.apache.flink.connector.testframe.external.source.TestingSourceSettings;
import org.apache.flink.core.testutils.CommonTestUtils;
import org.apache.flink.streaming.connectors.kafka.DynamicKafkaSourceTestHelper;
import org.apache.flink.streaming.connectors.kafka.KafkaTestEnvironmentImpl;

import com.google.common.collect.ImmutableMap;
import com.google.common.collect.ImmutableSet;
import org.apache.kafka.clients.CommonClientConfigs;
import org.apache.kafka.clients.admin.AdminClient;
import org.apache.kafka.clients.admin.TopicListing;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.serialization.StringDeserializer;
import org.apache.kafka.common.serialization.StringSerializer;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.net.URL;
import java.time.Duration;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Properties;
import java.util.Set;
import java.util.concurrent.ThreadLocalRandom;
import java.util.regex.Pattern;
import java.util.stream.Collectors;
import java.util.stream.IntStream;

/** An external context for the {@link DynamicKafkaSource} connector testing framework. */
public class DynamicKafkaSourceExternalContext implements DataStreamSourceExternalContext<String> {

    private static final Logger logger =
            LoggerFactory.getLogger(DynamicKafkaSourceExternalContext.class);

    private static final int NUM_TEST_RECORDS_PER_SPLIT = 10;
    private static final int NUM_PARTITIONS = 1;
    private static final Pattern STREAM_ID_PATTERN = Pattern.compile("stream-[0-9]+");

    private final List<URL> connectorJarPaths;
    private final Set<KafkaStream> kafkaStreams = new HashSet<>();
    private final Map<String, Properties> clusterPropertiesMap;
    private final List<SplitDataWriter> splitDataWriters = new ArrayList<>();

    // add a random suffix to alleviate race conditions with Kafka deleting topics
    private final long randomTopicSuffix;

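    /**
     * Creates the external context against two Kafka clusters. The bootstrap servers at index 0
     * and index 1 of {@code bootstrapServerList} are registered as "cluster0" and "cluster1",
     * respectively.
     */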
    public DynamicKafkaSourceExternalContext(
            List<String> bootstrapServerList, List<URL> connectorJarPaths) {
        this.connectorJarPaths = connectorJarPaths;

        Properties propertiesForCluster0 = new Properties();
        propertiesForCluster0.setProperty(
                CommonClientConfigs.BOOTSTRAP_SERVERS_CONFIG, bootstrapServerList.get(0));
        Properties propertiesForCluster1 = new Properties();
        propertiesForCluster1.setProperty(
                CommonClientConfigs.BOOTSTRAP_SERVERS_CONFIG, bootstrapServerList.get(1));

        this.clusterPropertiesMap =
                ImmutableMap.of(
                        "cluster0", propertiesForCluster0, "cluster1", propertiesForCluster1);
        this.randomTopicSuffix = ThreadLocalRandom.current().nextLong(0, Long.MAX_VALUE);
    }

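    /**
     * Builds a {@link DynamicKafkaSource} that subscribes to all streams matching {@link
     * #STREAM_ID_PATTERN} via a {@link MockKafkaMetadataService}. If the test requests a bounded
     * source, the source stops at the latest offsets.
     */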
    @Override
    public Source<String, ?, ?> createSource(TestingSourceSettings sourceSettings)
            throws UnsupportedOperationException {
        final DynamicKafkaSourceBuilder<String> builder = DynamicKafkaSource.builder();

        builder.setStreamPattern(STREAM_ID_PATTERN)
                .setKafkaMetadataService(new MockKafkaMetadataService(kafkaStreams))
                .setGroupId("DynamicKafkaSourceExternalContext")
                .setDeserializer(
                        KafkaRecordDeserializationSchema.valueOnly(StringDeserializer.class));

        if (sourceSettings.getBoundedness().equals(Boundedness.BOUNDED)) {
            builder.setBounded(OffsetsInitializer.latest());
        }

        return builder.build();
    }

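    /**
     * Registers a new Kafka stream backed by freshly created topics and returns a writer that
     * produces the test records for that split. The split index is derived from the number of
     * writers created so far.
     */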
    @Override
    public ExternalSystemSplitDataWriter<String> createSourceSplitDataWriter(
            TestingSourceSettings sourceSettings) {
        int suffix = splitDataWriters.size();
        List<Tuple2<String, String>> clusterTopics = setupSplits(String.valueOf(suffix));
        SplitDataWriter splitDataWriter = new SplitDataWriter(clusterPropertiesMap, clusterTopics);
        this.splitDataWriters.add(splitDataWriter);
        return splitDataWriter;
    }

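    /**
     * Creates the topics for a new {@link KafkaStream} on both clusters and adds the stream to the
     * set that backs the {@link MockKafkaMetadataService}.
     *
     * @return the (cluster, topic) pairs that were created
     */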
    private List<Tuple2<String, String>> setupSplits(String suffix) {
        KafkaStream kafkaStream = getKafkaStream(suffix + randomTopicSuffix);
        logger.info("Setting up splits for {}", kafkaStream);

        List<Tuple2<String, String>> clusterTopics =
                kafkaStream.getClusterMetadataMap().entrySet().stream()
                        .flatMap(
                                entry ->
                                        entry.getValue().getTopics().stream()
                                                .map(topic -> Tuple2.of(entry.getKey(), topic)))
                        .collect(Collectors.toList());

        for (Tuple2<String, String> clusterTopic : clusterTopics) {
            String cluster = clusterTopic.f0;
            String topic = clusterTopic.f1;
            KafkaTestEnvironmentImpl.createNewTopic(
                    topic, NUM_PARTITIONS, 1, clusterPropertiesMap.get(cluster));
        }

        kafkaStreams.add(kafkaStream);
        return clusterTopics;
    }

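    /**
     * Builds the {@link KafkaStream} descriptor for the given suffix: topics {@code topic0-} and
     * {@code topic1-} live on "cluster0", topics {@code topic2-} and {@code topic3-} on
     * "cluster1".
     */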
    private KafkaStream getKafkaStream(String suffix) {
        return new KafkaStream(
                "stream-" + suffix,
                ImmutableMap.of(
                        "cluster0",
                        new ClusterMetadata(
                                ImmutableSet.of("topic0-" + suffix, "topic1-" + suffix),
                                clusterPropertiesMap.get("cluster0")),
                        "cluster1",
                        new ClusterMetadata(
                                ImmutableSet.of("topic2-" + suffix, "topic3-" + suffix),
                                clusterPropertiesMap.get("cluster1"))));
    }

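    /**
     * Generates {@code NUM_TEST_RECORDS_PER_SPLIT * NUM_PARTITIONS} records per split; the record
     * values are simply the stringified indices, independent of the seed.
     */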
    @Override
    public List<String> generateTestData(
            TestingSourceSettings sourceSettings, int splitIndex, long seed) {
        return IntStream.range(0, NUM_TEST_RECORDS_PER_SPLIT * NUM_PARTITIONS)
                .boxed()
                .map(num -> Integer.toString(num))
                .collect(Collectors.toList());
    }

    @Override
    public TypeInformation<String> getProducedType() {
        return TypeInformation.of(String.class);
    }

    @Override
    public List<URL> getConnectorJarPaths() {
        return connectorJarPaths;
    }

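    /**
     * Deletes every topic that was created for the split data writers and waits until the deletion
     * is visible via {@link AdminClient#listTopics()} on the corresponding cluster.
     */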
    @Override
    public void close() throws Exception {
        // collect all topics that were created for the split data writers, grouped by cluster
        Map<String, List<String>> clusterTopics = new HashMap<>();
        for (SplitDataWriter splitDataWriter : splitDataWriters) {
            for (Tuple2<String, String> clusterTopic : splitDataWriter.getClusterTopics()) {
                clusterTopics
                        .computeIfAbsent(clusterTopic.f0, unused -> new ArrayList<>())
                        .add(clusterTopic.f1);
            }
        }

        for (Map.Entry<String, List<String>> entry : clusterTopics.entrySet()) {
            String cluster = entry.getKey();
            List<String> topics = entry.getValue();
            try (AdminClient adminClient = AdminClient.create(clusterPropertiesMap.get(cluster))) {
                adminClient.deleteTopics(topics).all().get();
                CommonTestUtils.waitUtil(
                        () -> {
                            try {
                                return adminClient.listTopics().listings().get().stream()
                                        .map(TopicListing::name)
                                        .noneMatch(topics::contains);
                            } catch (Exception e) {
                                logger.warn("Exception caught when listing Kafka topics", e);
                                return false;
                            }
                        },
                        Duration.ofSeconds(30),
                        String.format("Topics %s were not deleted within timeout", topics));
            }
            logger.info("Topics {} were deleted from {}", topics, cluster);
        }
    }

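    /**
     * Writes the test records of one split across the (cluster, topic) pairs that belong to the
     * split's {@link KafkaStream}.
     */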
    private static class SplitDataWriter implements ExternalSystemSplitDataWriter<String> {

        private final Map<String, Properties> clusterPropertiesMap;
        private final List<Tuple2<String, String>> clusterTopics;

        public SplitDataWriter(
                Map<String, Properties> clusterPropertiesMap,
                List<Tuple2<String, String>> clusterTopics) {
            this.clusterPropertiesMap = clusterPropertiesMap;
            this.clusterTopics = clusterTopics;
        }

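        /**
         * Distributes the records over the split's topics: each topic receives up to {@code
         * NUM_TEST_RECORDS_PER_SPLIT} records per partition until the record list is exhausted.
         */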
        @Override
        public void writeRecords(List<String> records) {
            int counter = 0;
            try {
                for (Tuple2<String, String> clusterTopic : clusterTopics) {
                    String cluster = clusterTopic.f0;
                    String topic = clusterTopic.f1;
                    List<ProducerRecord<String, String>> producerRecords = new ArrayList<>();
                    for (int j = 0; j < NUM_PARTITIONS; j++) {
                        for (int k = 0; k < NUM_TEST_RECORDS_PER_SPLIT; k++) {
                            if (records.size() <= counter) {
                                break;
                            }
                            producerRecords.add(
                                    new ProducerRecord<>(topic, j, null, records.get(counter++)));
                        }
                    }

                    logger.info("Writing producer records: {}", producerRecords);
                    DynamicKafkaSourceTestHelper.produceToKafka(
                            clusterPropertiesMap.get(cluster),
                            producerRecords,
                            StringSerializer.class,
                            StringSerializer.class);
                }
            } catch (Throwable e) {
                throw new RuntimeException("Failed to produce test data", e);
            }
        }

        @Override
        public void close() throws Exception {}

        public List<Tuple2<String, String>> getClusterTopics() {
            return clusterTopics;
        }
    }
}