[FLINK-34466] Lineage interfaces for kafka connector (#130)
Signed-off-by: Pawel Leszczynski <leszczynski.pawel@gmail.com>
diff --git a/flink-connector-kafka/src/main/java/org/apache/flink/connector/kafka/lineage/DefaultKafkaDatasetFacet.java b/flink-connector-kafka/src/main/java/org/apache/flink/connector/kafka/lineage/DefaultKafkaDatasetFacet.java
new file mode 100644
index 0000000..e1c6823
--- /dev/null
+++ b/flink-connector-kafka/src/main/java/org/apache/flink/connector/kafka/lineage/DefaultKafkaDatasetFacet.java
@@ -0,0 +1,65 @@
+package org.apache.flink.connector.kafka.lineage;
+
+import org.apache.flink.annotation.PublicEvolving;
+import org.apache.flink.connector.kafka.source.KafkaPropertiesUtil;
+
+import java.util.Objects;
+import java.util.Properties;
+
+/** Default implementation of {@link KafkaDatasetFacet}. */
+@PublicEvolving
+public class DefaultKafkaDatasetFacet implements KafkaDatasetFacet {
+
+ public static final String KAFKA_FACET_NAME = "kafka";
+
+ private Properties properties;
+
+ private final KafkaDatasetIdentifier topicIdentifier;
+
+ public DefaultKafkaDatasetFacet(KafkaDatasetIdentifier topicIdentifier, Properties properties) {
+ this(topicIdentifier);
+
+ this.properties = new Properties();
+ KafkaPropertiesUtil.copyProperties(properties, this.properties);
+ }
+
+ public DefaultKafkaDatasetFacet(KafkaDatasetIdentifier topicIdentifier) {
+ this.topicIdentifier = topicIdentifier;
+ }
+
+ public void setProperties(Properties properties) {
+ this.properties = new Properties();
+ KafkaPropertiesUtil.copyProperties(properties, this.properties);
+ }
+
+ public Properties getProperties() {
+ return properties;
+ }
+
+ public KafkaDatasetIdentifier getTopicIdentifier() {
+ return topicIdentifier;
+ }
+
+ @Override
+ public boolean equals(Object o) {
+ if (this == o) {
+ return true;
+ }
+ if (o == null || getClass() != o.getClass()) {
+ return false;
+ }
+ DefaultKafkaDatasetFacet that = (DefaultKafkaDatasetFacet) o;
+ return Objects.equals(properties, that.properties)
+ && Objects.equals(topicIdentifier, that.topicIdentifier);
+ }
+
+ @Override
+ public int hashCode() {
+ return Objects.hash(properties, topicIdentifier);
+ }
+
+ @Override
+ public String name() {
+ return KAFKA_FACET_NAME;
+ }
+}
diff --git a/flink-connector-kafka/src/main/java/org/apache/flink/connector/kafka/lineage/DefaultKafkaDatasetIdentifier.java b/flink-connector-kafka/src/main/java/org/apache/flink/connector/kafka/lineage/DefaultKafkaDatasetIdentifier.java
new file mode 100644
index 0000000..cd97b7f
--- /dev/null
+++ b/flink-connector-kafka/src/main/java/org/apache/flink/connector/kafka/lineage/DefaultKafkaDatasetIdentifier.java
@@ -0,0 +1,59 @@
+package org.apache.flink.connector.kafka.lineage;
+
+import org.apache.flink.annotation.PublicEvolving;
+
+import javax.annotation.Nullable;
+
+import java.util.List;
+import java.util.Objects;
+import java.util.regex.Pattern;
+
+/** Default implementation of {@link KafkaDatasetIdentifier}. */
+@PublicEvolving
+public class DefaultKafkaDatasetIdentifier implements KafkaDatasetIdentifier {
+
+ @Nullable private final List<String> topics;
+ @Nullable private final Pattern topicPattern;
+
+ private DefaultKafkaDatasetIdentifier(
+ @Nullable List<String> fixedTopics, @Nullable Pattern topicPattern) {
+ this.topics = fixedTopics;
+ this.topicPattern = topicPattern;
+ }
+
+ public static DefaultKafkaDatasetIdentifier ofPattern(Pattern pattern) {
+ return new DefaultKafkaDatasetIdentifier(null, pattern);
+ }
+
+ public static DefaultKafkaDatasetIdentifier ofTopics(List<String> fixedTopics) {
+ return new DefaultKafkaDatasetIdentifier(fixedTopics, null);
+ }
+
+ @Nullable
+ public List<String> getTopics() {
+ return topics;
+ }
+
+ @Nullable
+ public Pattern getTopicPattern() {
+ return topicPattern;
+ }
+
+ @Override
+ public boolean equals(Object o) {
+ if (this == o) {
+ return true;
+ }
+ if (o == null || getClass() != o.getClass()) {
+ return false;
+ }
+ DefaultKafkaDatasetIdentifier that = (DefaultKafkaDatasetIdentifier) o;
+ return Objects.equals(topics, that.topics)
+ && Objects.equals(topicPattern, that.topicPattern);
+ }
+
+ @Override
+ public int hashCode() {
+ return Objects.hash(topics, topicPattern);
+ }
+}
diff --git a/flink-connector-kafka/src/main/java/org/apache/flink/connector/kafka/lineage/DefaultTypeDatasetFacet.java b/flink-connector-kafka/src/main/java/org/apache/flink/connector/kafka/lineage/DefaultTypeDatasetFacet.java
new file mode 100644
index 0000000..d9475d7
--- /dev/null
+++ b/flink-connector-kafka/src/main/java/org/apache/flink/connector/kafka/lineage/DefaultTypeDatasetFacet.java
@@ -0,0 +1,44 @@
+package org.apache.flink.connector.kafka.lineage;
+
+import org.apache.flink.annotation.PublicEvolving;
+import org.apache.flink.api.common.typeinfo.TypeInformation;
+
+import java.util.Objects;
+
+/** Default implementation of {@link TypeDatasetFacet}. */
+@PublicEvolving
+public class DefaultTypeDatasetFacet implements TypeDatasetFacet {
+
+ public static final String TYPE_FACET_NAME = "type";
+
+ private final TypeInformation typeInformation;
+
+ public DefaultTypeDatasetFacet(TypeInformation typeInformation) {
+ this.typeInformation = typeInformation;
+ }
+
+ public TypeInformation getTypeInformation() {
+ return typeInformation;
+ }
+
+ public boolean equals(Object o) {
+ if (this == o) {
+ return true;
+ }
+ if (o == null || getClass() != o.getClass()) {
+ return false;
+ }
+ DefaultTypeDatasetFacet that = (DefaultTypeDatasetFacet) o;
+ return Objects.equals(typeInformation, that.typeInformation);
+ }
+
+ @Override
+ public int hashCode() {
+ return Objects.hash(typeInformation);
+ }
+
+ @Override
+ public String name() {
+ return TYPE_FACET_NAME;
+ }
+}
diff --git a/flink-connector-kafka/src/main/java/org/apache/flink/connector/kafka/lineage/KafkaDatasetFacet.java b/flink-connector-kafka/src/main/java/org/apache/flink/connector/kafka/lineage/KafkaDatasetFacet.java
new file mode 100644
index 0000000..c0d3d0b
--- /dev/null
+++ b/flink-connector-kafka/src/main/java/org/apache/flink/connector/kafka/lineage/KafkaDatasetFacet.java
@@ -0,0 +1,16 @@
+package org.apache.flink.connector.kafka.lineage;
+
+import org.apache.flink.annotation.PublicEvolving;
+import org.apache.flink.streaming.api.lineage.LineageDatasetFacet;
+
+import java.util.Properties;
+
+/** Facet definition containing all Kafka-specific information about Kafka sources and sinks. */
+@PublicEvolving
+public interface KafkaDatasetFacet extends LineageDatasetFacet {
+ Properties getProperties();
+
+ KafkaDatasetIdentifier getTopicIdentifier();
+
+ void setProperties(Properties properties);
+}
diff --git a/flink-connector-kafka/src/main/java/org/apache/flink/connector/kafka/lineage/KafkaDatasetFacetProvider.java b/flink-connector-kafka/src/main/java/org/apache/flink/connector/kafka/lineage/KafkaDatasetFacetProvider.java
new file mode 100644
index 0000000..76fe41b
--- /dev/null
+++ b/flink-connector-kafka/src/main/java/org/apache/flink/connector/kafka/lineage/KafkaDatasetFacetProvider.java
@@ -0,0 +1,16 @@
+package org.apache.flink.connector.kafka.lineage;
+
+import org.apache.flink.annotation.PublicEvolving;
+
+import java.util.Optional;
+
+/** Contains a method to extract a {@link KafkaDatasetFacet}. */
+@PublicEvolving
+public interface KafkaDatasetFacetProvider {
+
+ /**
+ * Returns a Kafka dataset facet, or {@code Optional.empty()} if the implementing class is not
+ * able to identify a dataset.
+ */
+ Optional<KafkaDatasetFacet> getKafkaDatasetFacet();
+}
diff --git a/flink-connector-kafka/src/main/java/org/apache/flink/connector/kafka/lineage/KafkaDatasetIdentifier.java b/flink-connector-kafka/src/main/java/org/apache/flink/connector/kafka/lineage/KafkaDatasetIdentifier.java
new file mode 100644
index 0000000..19f7082
--- /dev/null
+++ b/flink-connector-kafka/src/main/java/org/apache/flink/connector/kafka/lineage/KafkaDatasetIdentifier.java
@@ -0,0 +1,30 @@
+package org.apache.flink.connector.kafka.lineage;
+
+import org.apache.flink.annotation.PublicEvolving;
+
+import javax.annotation.Nullable;
+
+import java.util.List;
+import java.util.Objects;
+import java.util.regex.Pattern;
+
+/** Kafka dataset identifier which can contain either a list of topics or a topic pattern. */
+@PublicEvolving
+public interface KafkaDatasetIdentifier {
+ @Nullable
+ List<String> getTopics();
+
+ @Nullable
+ Pattern getTopicPattern();
+
+ /**
+ * Returns the lineage dataset's name, which is the topic pattern if present, or a
+ * comma-separated list of topics otherwise.
+ */
+ default String toLineageName() {
+ if (getTopicPattern() != null) {
+ return getTopicPattern().toString();
+ }
+ return String.join(",", Objects.requireNonNull(getTopics()));
+ }
+}
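To illustrate the naming rule in {@code toLineageName()} above, a minimal sketch (class and topic names are illustrative, not part of this change):

    import java.util.Arrays;
    import java.util.regex.Pattern;

    import org.apache.flink.connector.kafka.lineage.DefaultKafkaDatasetIdentifier;

    class LineageNameExample {
        static void example() {
            // A fixed topic list is joined with commas:
            String fromTopics =
                    DefaultKafkaDatasetIdentifier.ofTopics(Arrays.asList("topic1", "topic2"))
                            .toLineageName(); // "topic1,topic2"

            // A pattern-based identifier uses the pattern string itself as the name:
            String fromPattern =
                    DefaultKafkaDatasetIdentifier.ofPattern(Pattern.compile("orders-.*"))
                            .toLineageName(); // "orders-.*"
        }
    }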
diff --git a/flink-connector-kafka/src/main/java/org/apache/flink/connector/kafka/lineage/KafkaDatasetIdentifierProvider.java b/flink-connector-kafka/src/main/java/org/apache/flink/connector/kafka/lineage/KafkaDatasetIdentifierProvider.java
new file mode 100644
index 0000000..1389fea
--- /dev/null
+++ b/flink-connector-kafka/src/main/java/org/apache/flink/connector/kafka/lineage/KafkaDatasetIdentifierProvider.java
@@ -0,0 +1,16 @@
+package org.apache.flink.connector.kafka.lineage;
+
+import org.apache.flink.annotation.PublicEvolving;
+
+import java.util.Optional;
+
+/** Contains a method which allows extracting a topic identifier. */
+@PublicEvolving
+public interface KafkaDatasetIdentifierProvider {
+
+ /**
+ * Returns a Kafka dataset identifier, or {@code Optional.empty()} if the implementing class is
+ * not able to extract a dataset identifier.
+ */
+ Optional<DefaultKafkaDatasetIdentifier> getDatasetIdentifier();
+}
diff --git a/flink-connector-kafka/src/main/java/org/apache/flink/connector/kafka/lineage/LineageUtil.java b/flink-connector-kafka/src/main/java/org/apache/flink/connector/kafka/lineage/LineageUtil.java
new file mode 100644
index 0000000..086303e
--- /dev/null
+++ b/flink-connector-kafka/src/main/java/org/apache/flink/connector/kafka/lineage/LineageUtil.java
@@ -0,0 +1,118 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+
+package org.apache.flink.connector.kafka.lineage;
+
+import org.apache.flink.api.connector.source.Boundedness;
+import org.apache.flink.streaming.api.lineage.LineageDataset;
+import org.apache.flink.streaming.api.lineage.LineageDatasetFacet;
+import org.apache.flink.streaming.api.lineage.LineageVertex;
+import org.apache.flink.streaming.api.lineage.SourceLineageVertex;
+
+import java.util.Collection;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Properties;
+import java.util.stream.Collectors;
+
+/** Utility class with methods for creating and managing lineage objects. */
+public class LineageUtil {
+
+ private static final String KAFKA_DATASET_PREFIX = "kafka://";
+ private static final String COMMA = ",";
+ private static final String SEMICOLON = ";";
+
+ public static LineageDataset datasetOf(String namespace, KafkaDatasetFacet kafkaDatasetFacet) {
+ return datasetOf(namespace, kafkaDatasetFacet, Collections.emptyList());
+ }
+
+ public static LineageDataset datasetOf(
+ String namespace, KafkaDatasetFacet kafkaDatasetFacet, TypeDatasetFacet typeFacet) {
+ return datasetOf(namespace, kafkaDatasetFacet, Collections.singletonList(typeFacet));
+ }
+
+ private static LineageDataset datasetOf(
+ String namespace,
+ KafkaDatasetFacet kafkaDatasetFacet,
+ List<LineageDatasetFacet> facets) {
+ return new LineageDataset() {
+ @Override
+ public String name() {
+ return kafkaDatasetFacet.getTopicIdentifier().toLineageName();
+ }
+
+ @Override
+ public String namespace() {
+ return namespace;
+ }
+
+ @Override
+ public Map<String, LineageDatasetFacet> facets() {
+ Map<String, LineageDatasetFacet> facetMap = new HashMap<>();
+ facetMap.put(DefaultKafkaDatasetFacet.KAFKA_FACET_NAME, kafkaDatasetFacet);
+ facetMap.putAll(
+ facets.stream()
+ .collect(
+ Collectors.toMap(LineageDatasetFacet::name, item -> item)));
+ return facetMap;
+ }
+ };
+ }
+
+ public static String namespaceOf(Properties properties) {
+ String bootstrapServers = properties.getProperty("bootstrap.servers");
+
+ if (bootstrapServers == null) {
+ return KAFKA_DATASET_PREFIX;
+ }
+
+ if (bootstrapServers.contains(COMMA)) {
+ bootstrapServers = bootstrapServers.split(COMMA)[0];
+ } else if (bootstrapServers.contains(SEMICOLON)) {
+ bootstrapServers = bootstrapServers.split(SEMICOLON)[0];
+ }
+
+ return KAFKA_DATASET_PREFIX + bootstrapServers;
+ }
+
+ public static SourceLineageVertex sourceLineageVertexOf(Collection<LineageDataset> datasets) {
+ return new SourceLineageVertex() {
+ @Override
+ public Boundedness boundedness() {
+ return Boundedness.CONTINUOUS_UNBOUNDED;
+ }
+
+ @Override
+ public List<LineageDataset> datasets() {
+ return datasets.stream().collect(Collectors.toList());
+ }
+ };
+ }
+
+ public static LineageVertex lineageVertexOf(Collection<LineageDataset> datasets) {
+ return new LineageVertex() {
+ @Override
+ public List<LineageDataset> datasets() {
+ return datasets.stream().collect(Collectors.toList());
+ }
+ };
+ }
+}
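A small sketch of how {@code namespaceOf} resolves a namespace from producer/consumer properties; broker addresses are illustrative assumptions, not part of this change:

    import java.util.Properties;

    import org.apache.flink.connector.kafka.lineage.LineageUtil;

    class NamespaceExample {
        static void example() {
            Properties props = new Properties();
            props.setProperty("bootstrap.servers", "broker1:9092,broker2:9092");

            // Only the first bootstrap server becomes part of the namespace:
            String namespace = LineageUtil.namespaceOf(props); // "kafka://broker1:9092"

            // Without "bootstrap.servers" the bare prefix is returned:
            String fallback = LineageUtil.namespaceOf(new Properties()); // "kafka://"
        }
    }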
diff --git a/flink-connector-kafka/src/main/java/org/apache/flink/connector/kafka/lineage/TypeDatasetFacet.java b/flink-connector-kafka/src/main/java/org/apache/flink/connector/kafka/lineage/TypeDatasetFacet.java
new file mode 100644
index 0000000..1e64f58
--- /dev/null
+++ b/flink-connector-kafka/src/main/java/org/apache/flink/connector/kafka/lineage/TypeDatasetFacet.java
@@ -0,0 +1,11 @@
+package org.apache.flink.connector.kafka.lineage;
+
+import org.apache.flink.annotation.PublicEvolving;
+import org.apache.flink.api.common.typeinfo.TypeInformation;
+import org.apache.flink.streaming.api.lineage.LineageDatasetFacet;
+
+/** Facet definition containing the type information of a source or sink. */
+@PublicEvolving
+public interface TypeDatasetFacet extends LineageDatasetFacet {
+ TypeInformation getTypeInformation();
+}
diff --git a/flink-connector-kafka/src/main/java/org/apache/flink/connector/kafka/lineage/TypeDatasetFacetProvider.java b/flink-connector-kafka/src/main/java/org/apache/flink/connector/kafka/lineage/TypeDatasetFacetProvider.java
new file mode 100644
index 0000000..016a1bb
--- /dev/null
+++ b/flink-connector-kafka/src/main/java/org/apache/flink/connector/kafka/lineage/TypeDatasetFacetProvider.java
@@ -0,0 +1,16 @@
+package org.apache.flink.connector.kafka.lineage;
+
+import org.apache.flink.annotation.PublicEvolving;
+
+import java.util.Optional;
+
+/** Contains a method to extract a {@link TypeDatasetFacet}. */
+@PublicEvolving
+public interface TypeDatasetFacetProvider {
+
+ /**
+ * Returns a type dataset facet, or {@code Optional.empty()} if the implementing class is not
+ * able to resolve the type.
+ */
+ Optional<TypeDatasetFacet> getTypeDatasetFacet();
+}
diff --git a/flink-connector-kafka/src/main/java/org/apache/flink/connector/kafka/sink/KafkaRecordSerializationSchema.java b/flink-connector-kafka/src/main/java/org/apache/flink/connector/kafka/sink/KafkaRecordSerializationSchema.java
index 9d081c7..f56a7da 100644
--- a/flink-connector-kafka/src/main/java/org/apache/flink/connector/kafka/sink/KafkaRecordSerializationSchema.java
+++ b/flink-connector-kafka/src/main/java/org/apache/flink/connector/kafka/sink/KafkaRecordSerializationSchema.java
@@ -29,7 +29,10 @@
/**
* A serialization schema which defines how to convert a value of type {@code T} to {@link
- * ProducerRecord}.
+ * ProducerRecord}. {@link org.apache.flink.connector.kafka.lineage.KafkaDatasetFacetProvider} can
+ * be implemented to provide Kafka-specific lineage metadata, while {@link
+ * org.apache.flink.connector.kafka.lineage.TypeDatasetFacetProvider} can be implemented to provide
+ * lineage metadata with type information.
*
* @param <T> the type of values being serialized
*/
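As a usage sketch of the providers mentioned in the Javadoc above, a custom record serialization schema could implement both interfaces roughly as follows (class name, topic, and the placeholder serialization logic are illustrative assumptions, not part of this change):

    import java.nio.charset.StandardCharsets;
    import java.util.Collections;
    import java.util.Optional;

    import org.apache.flink.api.common.typeinfo.TypeInformation;
    import org.apache.flink.connector.kafka.lineage.DefaultKafkaDatasetFacet;
    import org.apache.flink.connector.kafka.lineage.DefaultKafkaDatasetIdentifier;
    import org.apache.flink.connector.kafka.lineage.DefaultTypeDatasetFacet;
    import org.apache.flink.connector.kafka.lineage.KafkaDatasetFacet;
    import org.apache.flink.connector.kafka.lineage.KafkaDatasetFacetProvider;
    import org.apache.flink.connector.kafka.lineage.TypeDatasetFacet;
    import org.apache.flink.connector.kafka.lineage.TypeDatasetFacetProvider;
    import org.apache.flink.connector.kafka.sink.KafkaRecordSerializationSchema;

    import org.apache.kafka.clients.producer.ProducerRecord;

    class LineageAwareSerializationSchema
            implements KafkaRecordSerializationSchema<String>,
                    KafkaDatasetFacetProvider,
                    TypeDatasetFacetProvider {

        @Override
        public ProducerRecord<byte[], byte[]> serialize(
                String element, KafkaSinkContext context, Long timestamp) {
            // Placeholder serialization; a real schema would delegate to a SerializationSchema.
            return new ProducerRecord<>("orders", element.getBytes(StandardCharsets.UTF_8));
        }

        @Override
        public Optional<KafkaDatasetFacet> getKafkaDatasetFacet() {
            // Report the fixed target topic so KafkaSink can expose it as a lineage dataset.
            return Optional.of(
                    new DefaultKafkaDatasetFacet(
                            DefaultKafkaDatasetIdentifier.ofTopics(
                                    Collections.singletonList("orders"))));
        }

        @Override
        public Optional<TypeDatasetFacet> getTypeDatasetFacet() {
            // Report the element type written to Kafka.
            return Optional.of(new DefaultTypeDatasetFacet(TypeInformation.of(String.class)));
        }
    }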
diff --git a/flink-connector-kafka/src/main/java/org/apache/flink/connector/kafka/sink/KafkaRecordSerializationSchemaBuilder.java b/flink-connector-kafka/src/main/java/org/apache/flink/connector/kafka/sink/KafkaRecordSerializationSchemaBuilder.java
index e9fc413..0fba3a3 100644
--- a/flink-connector-kafka/src/main/java/org/apache/flink/connector/kafka/sink/KafkaRecordSerializationSchemaBuilder.java
+++ b/flink-connector-kafka/src/main/java/org/apache/flink/connector/kafka/sink/KafkaRecordSerializationSchemaBuilder.java
@@ -19,16 +19,32 @@
import org.apache.flink.annotation.PublicEvolving;
import org.apache.flink.api.common.serialization.SerializationSchema;
+import org.apache.flink.api.common.typeinfo.TypeInformation;
+import org.apache.flink.api.java.typeutils.ResultTypeQueryable;
+import org.apache.flink.connector.kafka.lineage.DefaultKafkaDatasetFacet;
+import org.apache.flink.connector.kafka.lineage.DefaultKafkaDatasetIdentifier;
+import org.apache.flink.connector.kafka.lineage.DefaultTypeDatasetFacet;
+import org.apache.flink.connector.kafka.lineage.KafkaDatasetFacet;
+import org.apache.flink.connector.kafka.lineage.KafkaDatasetFacetProvider;
+import org.apache.flink.connector.kafka.lineage.KafkaDatasetIdentifierProvider;
+import org.apache.flink.connector.kafka.lineage.TypeDatasetFacet;
+import org.apache.flink.connector.kafka.lineage.TypeDatasetFacetProvider;
import org.apache.flink.streaming.connectors.kafka.partitioner.FlinkKafkaPartitioner;
+import com.google.common.reflect.TypeToken;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.serialization.Serializer;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
import javax.annotation.Nullable;
import java.io.Serializable;
+import java.util.Collections;
import java.util.HashMap;
import java.util.Map;
+import java.util.Optional;
import java.util.OptionalInt;
import java.util.function.Function;
@@ -79,6 +95,7 @@
*/
@PublicEvolving
public class KafkaRecordSerializationSchemaBuilder<IN> {
+ private static final Logger LOG = LoggerFactory.getLogger(KafkaRecordSerializationSchemaBuilder.class);
@Nullable private Function<? super IN, String> topicSelector;
@Nullable private SerializationSchema<? super IN> valueSerializationSchema;
@@ -122,7 +139,8 @@
public KafkaRecordSerializationSchemaBuilder<IN> setTopic(String topic) {
checkState(this.topicSelector == null, "Topic selector already set.");
checkNotNull(topic);
- this.topicSelector = new CachingTopicSelector<>((e) -> topic);
+
+ this.topicSelector = new ConstantTopicSelector<>(topic);
return this;
}
@@ -283,7 +301,29 @@
checkState(keySerializationSchema == null, "Key serializer already set.");
}
- private static class CachingTopicSelector<IN> implements Function<IN, String>, Serializable {
+ private static class ConstantTopicSelector<IN>
+ implements Function<IN, String>, Serializable, KafkaDatasetIdentifierProvider {
+
+ private final String topic;
+
+ ConstantTopicSelector(String topic) {
+ this.topic = topic;
+ }
+
+ @Override
+ public String apply(IN in) {
+ return topic;
+ }
+
+ @Override
+ public Optional<DefaultKafkaDatasetIdentifier> getDatasetIdentifier() {
+ return Optional.of(
+ DefaultKafkaDatasetIdentifier.ofTopics(Collections.singletonList(topic)));
+ }
+ }
+
+ private static class CachingTopicSelector<IN>
+ implements Function<IN, String>, KafkaDatasetIdentifierProvider, Serializable {
private static final int CACHE_RESET_SIZE = 5;
private final Map<IN, String> cache;
@@ -303,10 +343,21 @@
}
return topic;
}
+
+ @Override
+ public Optional<DefaultKafkaDatasetIdentifier> getDatasetIdentifier() {
+ if (topicSelector instanceof KafkaDatasetIdentifierProvider) {
+ return ((KafkaDatasetIdentifierProvider) topicSelector).getDatasetIdentifier();
+ } else {
+ return Optional.empty();
+ }
+ }
}
private static class KafkaRecordSerializationSchemaWrapper<IN>
- implements KafkaRecordSerializationSchema<IN> {
+ implements KafkaDatasetFacetProvider,
+ KafkaRecordSerializationSchema<IN>,
+ TypeDatasetFacetProvider {
private final SerializationSchema<? super IN> valueSerializationSchema;
private final Function<? super IN, String> topicSelector;
private final KafkaPartitioner<? super IN> partitioner;
@@ -369,5 +420,46 @@
value,
headerProvider != null ? headerProvider.getHeaders(element) : null);
}
+
+ @Override
+ public Optional<KafkaDatasetFacet> getKafkaDatasetFacet() {
+ if (!(topicSelector instanceof KafkaDatasetIdentifierProvider)) {
+ LOG.info("Cannot identify topics. Not an TopicsIdentifierProvider");
+ return Optional.empty();
+ }
+
+ Optional<DefaultKafkaDatasetIdentifier> topicsIdentifier =
+ ((KafkaDatasetIdentifierProvider) (topicSelector)).getDatasetIdentifier();
+
+ if (!topicsIdentifier.isPresent()) {
+ LOG.info("No topics' identifiers provided");
+ return Optional.empty();
+ }
+
+ return Optional.of(new DefaultKafkaDatasetFacet(topicsIdentifier.get()));
+ }
+
+ @Override
+ public Optional<TypeDatasetFacet> getTypeDatasetFacet() {
+ if (this.valueSerializationSchema instanceof ResultTypeQueryable) {
+ return Optional.of(
+ new DefaultTypeDatasetFacet(
+ ((ResultTypeQueryable<?>) this.valueSerializationSchema)
+ .getProducedType()));
+ } else {
+ // resolve the element type from the SerializationSchema type parameter
+ TypeToken serializationSchemaType =
+ TypeToken.of(valueSerializationSchema.getClass());
+ Class parameterType =
+ serializationSchemaType
+ .resolveType(SerializationSchema.class.getTypeParameters()[0])
+ .getRawType();
+ if (parameterType != Object.class) {
+ return Optional.of(
+ new DefaultTypeDatasetFacet(TypeInformation.of(parameterType)));
+ }
+ }
+ return Optional.empty();
+ }
}
}
diff --git a/flink-connector-kafka/src/main/java/org/apache/flink/connector/kafka/sink/KafkaSink.java b/flink-connector-kafka/src/main/java/org/apache/flink/connector/kafka/sink/KafkaSink.java
index d5b1c37..d3d3c89 100644
--- a/flink-connector-kafka/src/main/java/org/apache/flink/connector/kafka/sink/KafkaSink.java
+++ b/flink-connector-kafka/src/main/java/org/apache/flink/connector/kafka/sink/KafkaSink.java
@@ -22,11 +22,22 @@
import org.apache.flink.annotation.VisibleForTesting;
import org.apache.flink.api.connector.sink2.Committer;
import org.apache.flink.connector.base.DeliveryGuarantee;
+import org.apache.flink.connector.kafka.lineage.KafkaDatasetFacet;
+import org.apache.flink.connector.kafka.lineage.KafkaDatasetFacetProvider;
+import org.apache.flink.connector.kafka.lineage.LineageUtil;
+import org.apache.flink.connector.kafka.lineage.TypeDatasetFacet;
+import org.apache.flink.connector.kafka.lineage.TypeDatasetFacetProvider;
import org.apache.flink.core.io.SimpleVersionedSerializer;
+import org.apache.flink.streaming.api.lineage.LineageVertex;
+import org.apache.flink.streaming.api.lineage.LineageVertexProvider;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
import java.io.IOException;
import java.util.Collection;
import java.util.Collections;
+import java.util.Optional;
import java.util.Properties;
/**
@@ -54,8 +65,9 @@
*/
@PublicEvolving
public class KafkaSink<IN>
- implements TwoPhaseCommittingStatefulSink<IN, KafkaWriterState, KafkaCommittable> {
-
+ implements LineageVertexProvider,
+ TwoPhaseCommittingStatefulSink<IN, KafkaWriterState, KafkaCommittable> {
+ private static final Logger LOG = LoggerFactory.getLogger(KafkaSink.class);
private final DeliveryGuarantee deliveryGuarantee;
private final KafkaRecordSerializationSchema<IN> recordSerializer;
@@ -132,4 +144,42 @@
protected Properties getKafkaProducerConfig() {
return kafkaProducerConfig;
}
+
+ @Override
+ public LineageVertex getLineageVertex() {
+ // enrich dataset facet with properties
+ Optional<KafkaDatasetFacet> kafkaDatasetFacet;
+ if (recordSerializer instanceof KafkaDatasetFacetProvider) {
+ kafkaDatasetFacet =
+ ((KafkaDatasetFacetProvider) recordSerializer).getKafkaDatasetFacet();
+
+ if (!kafkaDatasetFacet.isPresent()) {
+ LOG.info("Provider did not return kafka dataset facet");
+ return LineageUtil.sourceLineageVertexOf(Collections.emptyList());
+ }
+ kafkaDatasetFacet.get().setProperties(this.kafkaProducerConfig);
+ } else {
+ LOG.info(
+ "recordSerializer does not implement KafkaDatasetFacetProvider: {}",
+ recordSerializer);
+ return LineageUtil.sourceLineageVertexOf(Collections.emptyList());
+ }
+
+ String namespace = LineageUtil.namespaceOf(kafkaProducerConfig);
+
+ Optional<TypeDatasetFacet> typeDatasetFacet = Optional.empty();
+ if (recordSerializer instanceof TypeDatasetFacetProvider) {
+ typeDatasetFacet = ((TypeDatasetFacetProvider) recordSerializer).getTypeDatasetFacet();
+ }
+
+ if (typeDatasetFacet.isPresent()) {
+ return LineageUtil.sourceLineageVertexOf(
+ Collections.singleton(
+ LineageUtil.datasetOf(
+ namespace, kafkaDatasetFacet.get(), typeDatasetFacet.get())));
+ }
+
+ return LineageUtil.sourceLineageVertexOf(
+ Collections.singleton(LineageUtil.datasetOf(namespace, kafkaDatasetFacet.get())));
+ }
}
diff --git a/flink-connector-kafka/src/main/java/org/apache/flink/connector/kafka/source/KafkaSource.java b/flink-connector-kafka/src/main/java/org/apache/flink/connector/kafka/source/KafkaSource.java
index 54f5f85..3930275 100644
--- a/flink-connector-kafka/src/main/java/org/apache/flink/connector/kafka/source/KafkaSource.java
+++ b/flink-connector-kafka/src/main/java/org/apache/flink/connector/kafka/source/KafkaSource.java
@@ -33,6 +33,11 @@
import org.apache.flink.configuration.Configuration;
import org.apache.flink.connector.base.source.reader.RecordsWithSplitIds;
import org.apache.flink.connector.base.source.reader.synchronization.FutureCompletingBlockingQueue;
+import org.apache.flink.connector.kafka.lineage.DefaultKafkaDatasetFacet;
+import org.apache.flink.connector.kafka.lineage.DefaultKafkaDatasetIdentifier;
+import org.apache.flink.connector.kafka.lineage.DefaultTypeDatasetFacet;
+import org.apache.flink.connector.kafka.lineage.KafkaDatasetIdentifierProvider;
+import org.apache.flink.connector.kafka.lineage.LineageUtil;
import org.apache.flink.connector.kafka.source.enumerator.KafkaSourceEnumState;
import org.apache.flink.connector.kafka.source.enumerator.KafkaSourceEnumStateSerializer;
import org.apache.flink.connector.kafka.source.enumerator.KafkaSourceEnumerator;
@@ -48,15 +53,20 @@
import org.apache.flink.connector.kafka.source.split.KafkaPartitionSplitSerializer;
import org.apache.flink.core.io.SimpleVersionedSerializer;
import org.apache.flink.metrics.MetricGroup;
+import org.apache.flink.streaming.api.lineage.LineageVertexProvider;
+import org.apache.flink.streaming.api.lineage.SourceLineageVertex;
import org.apache.flink.util.UserCodeClassLoader;
import org.apache.flink.util.function.SerializableSupplier;
import org.apache.kafka.clients.consumer.ConsumerRecord;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
import javax.annotation.Nullable;
import java.io.IOException;
import java.util.Collection;
+import java.util.Collections;
import java.util.Optional;
import java.util.Properties;
import java.util.function.Consumer;
@@ -87,8 +97,10 @@
*/
@PublicEvolving
public class KafkaSource<OUT>
- implements Source<OUT, KafkaPartitionSplit, KafkaSourceEnumState>,
+ implements LineageVertexProvider,
+ Source<OUT, KafkaPartitionSplit, KafkaSourceEnumState>,
ResultTypeQueryable<OUT> {
+ private static final Logger LOG = LoggerFactory.getLogger(KafkaSource.class);
private static final long serialVersionUID = -8755372893283732098L;
// Users can choose only one of the following ways to specify the topics to consume from.
private final KafkaSubscriber subscriber;
@@ -251,4 +263,31 @@
OffsetsInitializer getStoppingOffsetsInitializer() {
return stoppingOffsetsInitializer;
}
+
+ @Override
+ public SourceLineageVertex getLineageVertex() {
+ if (!(subscriber instanceof KafkaDatasetIdentifierProvider)) {
+ LOG.info("unable to determine topic identifier");
+ return LineageUtil.sourceLineageVertexOf(Collections.emptyList());
+ }
+
+ Optional<DefaultKafkaDatasetIdentifier> topicsIdentifier =
+ ((KafkaDatasetIdentifierProvider) subscriber).getDatasetIdentifier();
+
+ if (!topicsIdentifier.isPresent()) {
+ LOG.info("No topics' identifier returned from subscriber");
+ return LineageUtil.sourceLineageVertexOf(Collections.emptyList());
+ }
+
+ DefaultKafkaDatasetFacet kafkaDatasetFacet =
+ new DefaultKafkaDatasetFacet(topicsIdentifier.get(), props);
+
+ String namespace = LineageUtil.namespaceOf(props);
+ return LineageUtil.sourceLineageVertexOf(
+ Collections.singletonList(
+ LineageUtil.datasetOf(
+ namespace,
+ kafkaDatasetFacet,
+ new DefaultTypeDatasetFacet(getProducedType()))));
+ }
}
diff --git a/flink-connector-kafka/src/main/java/org/apache/flink/connector/kafka/source/enumerator/subscriber/KafkaSubscriber.java b/flink-connector-kafka/src/main/java/org/apache/flink/connector/kafka/source/enumerator/subscriber/KafkaSubscriber.java
index 1b819fb..37de884 100644
--- a/flink-connector-kafka/src/main/java/org/apache/flink/connector/kafka/source/enumerator/subscriber/KafkaSubscriber.java
+++ b/flink-connector-kafka/src/main/java/org/apache/flink/connector/kafka/source/enumerator/subscriber/KafkaSubscriber.java
@@ -39,6 +39,10 @@
*
* <p>The KafkaSubscriber provides a unified interface for the Kafka source to support all these
* three types of subscribing mode.
+ *
+ * <p>A subscriber can additionally implement {@link
+ * org.apache.flink.connector.kafka.lineage.KafkaDatasetIdentifierProvider} to expose the
+ * subscribed topics as lineage metadata.
*/
@PublicEvolving
public interface KafkaSubscriber extends Serializable {
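As a sketch of the suggestion in the Javadoc above, a custom subscriber might expose its topics like this (class name and the empty partition lookup are illustrative placeholders, not part of this change):

    import java.util.Collections;
    import java.util.Optional;
    import java.util.Set;

    import org.apache.flink.connector.kafka.lineage.DefaultKafkaDatasetIdentifier;
    import org.apache.flink.connector.kafka.lineage.KafkaDatasetIdentifierProvider;
    import org.apache.flink.connector.kafka.source.enumerator.subscriber.KafkaSubscriber;

    import org.apache.kafka.clients.admin.AdminClient;
    import org.apache.kafka.common.TopicPartition;

    class SingleTopicSubscriber implements KafkaSubscriber, KafkaDatasetIdentifierProvider {

        private final String topic;

        SingleTopicSubscriber(String topic) {
            this.topic = topic;
        }

        @Override
        public Set<TopicPartition> getSubscribedTopicPartitions(AdminClient adminClient) {
            // Placeholder: a real subscriber would look up the topic's partitions via the admin client.
            return Collections.emptySet();
        }

        @Override
        public Optional<DefaultKafkaDatasetIdentifier> getDatasetIdentifier() {
            // Expose the subscribed topic so KafkaSource can report it as a lineage dataset.
            return Optional.of(
                    DefaultKafkaDatasetIdentifier.ofTopics(Collections.singletonList(topic)));
        }
    }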
diff --git a/flink-connector-kafka/src/main/java/org/apache/flink/connector/kafka/source/enumerator/subscriber/PartitionSetSubscriber.java b/flink-connector-kafka/src/main/java/org/apache/flink/connector/kafka/source/enumerator/subscriber/PartitionSetSubscriber.java
index 3423b0f..9cd50fb 100644
--- a/flink-connector-kafka/src/main/java/org/apache/flink/connector/kafka/source/enumerator/subscriber/PartitionSetSubscriber.java
+++ b/flink-connector-kafka/src/main/java/org/apache/flink/connector/kafka/source/enumerator/subscriber/PartitionSetSubscriber.java
@@ -18,6 +18,9 @@
package org.apache.flink.connector.kafka.source.enumerator.subscriber;
+import org.apache.flink.connector.kafka.lineage.DefaultKafkaDatasetIdentifier;
+import org.apache.flink.connector.kafka.lineage.KafkaDatasetIdentifierProvider;
+
import org.apache.kafka.clients.admin.AdminClient;
import org.apache.kafka.clients.admin.TopicDescription;
import org.apache.kafka.common.TopicPartition;
@@ -26,13 +29,14 @@
import java.util.HashSet;
import java.util.Map;
+import java.util.Optional;
import java.util.Set;
import java.util.stream.Collectors;
import static org.apache.flink.connector.kafka.source.enumerator.subscriber.KafkaSubscriberUtils.getTopicMetadata;
/** A subscriber for a partition set. */
-class PartitionSetSubscriber implements KafkaSubscriber {
+class PartitionSetSubscriber implements KafkaSubscriber, KafkaDatasetIdentifierProvider {
private static final long serialVersionUID = 390970375272146036L;
private static final Logger LOG = LoggerFactory.getLogger(PartitionSetSubscriber.class);
private final Set<TopicPartition> subscribedPartitions;
@@ -73,4 +77,14 @@
private boolean partitionExistsInTopic(TopicPartition partition, TopicDescription topic) {
return topic.partitions().size() > partition.partition();
}
+
+ @Override
+ public Optional<DefaultKafkaDatasetIdentifier> getDatasetIdentifier() {
+ return Optional.of(
+ DefaultKafkaDatasetIdentifier.ofTopics(
+ subscribedPartitions.stream()
+ .map(TopicPartition::topic)
+ .distinct()
+ .collect(Collectors.toList())));
+ }
}
diff --git a/flink-connector-kafka/src/main/java/org/apache/flink/connector/kafka/source/enumerator/subscriber/TopicListSubscriber.java b/flink-connector-kafka/src/main/java/org/apache/flink/connector/kafka/source/enumerator/subscriber/TopicListSubscriber.java
index b2ad844..e86ade0 100644
--- a/flink-connector-kafka/src/main/java/org/apache/flink/connector/kafka/source/enumerator/subscriber/TopicListSubscriber.java
+++ b/flink-connector-kafka/src/main/java/org/apache/flink/connector/kafka/source/enumerator/subscriber/TopicListSubscriber.java
@@ -18,6 +18,9 @@
package org.apache.flink.connector.kafka.source.enumerator.subscriber;
+import org.apache.flink.connector.kafka.lineage.DefaultKafkaDatasetIdentifier;
+import org.apache.flink.connector.kafka.lineage.KafkaDatasetIdentifierProvider;
+
import org.apache.kafka.clients.admin.AdminClient;
import org.apache.kafka.clients.admin.TopicDescription;
import org.apache.kafka.common.TopicPartition;
@@ -28,6 +31,7 @@
import java.util.HashSet;
import java.util.List;
import java.util.Map;
+import java.util.Optional;
import java.util.Set;
import static org.apache.flink.connector.kafka.source.enumerator.subscriber.KafkaSubscriberUtils.getTopicMetadata;
@@ -36,7 +40,7 @@
* A subscriber to a fixed list of topics. The subscribed topics must have existed in the Kafka
* cluster, otherwise an exception will be thrown.
*/
-class TopicListSubscriber implements KafkaSubscriber {
+class TopicListSubscriber implements KafkaSubscriber, KafkaDatasetIdentifierProvider {
private static final long serialVersionUID = -6917603843104947866L;
private static final Logger LOG = LoggerFactory.getLogger(TopicListSubscriber.class);
private final List<String> topics;
@@ -60,4 +64,9 @@
return subscribedPartitions;
}
+
+ @Override
+ public Optional<DefaultKafkaDatasetIdentifier> getDatasetIdentifier() {
+ return Optional.of(DefaultKafkaDatasetIdentifier.ofTopics(topics));
+ }
}
diff --git a/flink-connector-kafka/src/main/java/org/apache/flink/connector/kafka/source/enumerator/subscriber/TopicPatternSubscriber.java b/flink-connector-kafka/src/main/java/org/apache/flink/connector/kafka/source/enumerator/subscriber/TopicPatternSubscriber.java
index 985ca71..208959e 100644
--- a/flink-connector-kafka/src/main/java/org/apache/flink/connector/kafka/source/enumerator/subscriber/TopicPatternSubscriber.java
+++ b/flink-connector-kafka/src/main/java/org/apache/flink/connector/kafka/source/enumerator/subscriber/TopicPatternSubscriber.java
@@ -18,6 +18,9 @@
package org.apache.flink.connector.kafka.source.enumerator.subscriber;
+import org.apache.flink.connector.kafka.lineage.DefaultKafkaDatasetIdentifier;
+import org.apache.flink.connector.kafka.lineage.KafkaDatasetIdentifierProvider;
+
import org.apache.kafka.clients.admin.AdminClient;
import org.apache.kafka.clients.admin.TopicDescription;
import org.apache.kafka.common.TopicPartition;
@@ -27,13 +30,14 @@
import java.util.HashSet;
import java.util.Map;
+import java.util.Optional;
import java.util.Set;
import java.util.regex.Pattern;
import static org.apache.flink.connector.kafka.source.enumerator.subscriber.KafkaSubscriberUtils.getTopicMetadata;
/** A subscriber to a topic pattern. */
-class TopicPatternSubscriber implements KafkaSubscriber {
+class TopicPatternSubscriber implements KafkaSubscriber, KafkaDatasetIdentifierProvider {
private static final long serialVersionUID = -7471048577725467797L;
private static final Logger LOG = LoggerFactory.getLogger(TopicPatternSubscriber.class);
private final Pattern topicPattern;
@@ -60,4 +64,9 @@
return subscribedTopicPartitions;
}
+
+ @Override
+ public Optional<DefaultKafkaDatasetIdentifier> getDatasetIdentifier() {
+ return Optional.of(DefaultKafkaDatasetIdentifier.ofPattern(topicPattern));
+ }
}
diff --git a/flink-connector-kafka/src/test/java/org/apache/flink/connector/kafka/lineage/LineageUtilTest.java b/flink-connector-kafka/src/test/java/org/apache/flink/connector/kafka/lineage/LineageUtilTest.java
new file mode 100644
index 0000000..8693998
--- /dev/null
+++ b/flink-connector-kafka/src/test/java/org/apache/flink/connector/kafka/lineage/LineageUtilTest.java
@@ -0,0 +1,74 @@
+package org.apache.flink.connector.kafka.lineage;
+
+import org.apache.flink.api.connector.source.Boundedness;
+import org.apache.flink.streaming.api.lineage.LineageDataset;
+import org.apache.flink.streaming.api.lineage.LineageDatasetFacet;
+import org.apache.flink.streaming.api.lineage.SourceLineageVertex;
+
+import org.junit.jupiter.api.Test;
+
+import java.util.Collections;
+import java.util.Map;
+import java.util.Properties;
+
+import static org.assertj.core.api.Assertions.assertThat;
+
+/** Tests for {@link LineageUtil}. */
+public class LineageUtilTest {
+ @Test
+ public void testSourceLineageVertexOf() {
+ LineageDataset dataset = new TestingLineageDataset();
+ SourceLineageVertex sourceLineageVertex =
+ LineageUtil.sourceLineageVertexOf(Collections.singletonList(dataset));
+
+ assertThat(sourceLineageVertex.boundedness()).isEqualTo(Boundedness.CONTINUOUS_UNBOUNDED);
+ assertThat(sourceLineageVertex.datasets()).containsExactly(dataset);
+ }
+
+ @Test
+ public void testDatasetNamespaceOf() {
+ Properties properties = new Properties();
+ properties.put("bootstrap.servers", "my-kafka-host");
+
+ assertThat(LineageUtil.namespaceOf(properties)).isEqualTo("kafka://my-kafka-host");
+ }
+
+ @Test
+ public void testDatasetNamespaceOfWithSemicolon() {
+ Properties properties = new Properties();
+ properties.put("bootstrap.servers", "my-kafka-host1;my-kafka-host2");
+
+ assertThat(LineageUtil.namespaceOf(properties)).isEqualTo("kafka://my-kafka-host1");
+ }
+
+ @Test
+ public void testDatasetNamespaceOfWithComma() {
+ Properties properties = new Properties();
+ properties.put("bootstrap.servers", "my-kafka-host1,my-kafka-host2");
+
+ assertThat(LineageUtil.namespaceOf(properties)).isEqualTo("kafka://my-kafka-host1");
+ }
+
+ @Test
+ public void testDatasetNamespaceWhenNoBootstrapServersProperty() {
+ Properties properties = new Properties();
+ assertThat(LineageUtil.namespaceOf(properties)).isEqualTo("kafka://");
+ }
+
+ private static class TestingLineageDataset implements LineageDataset {
+ @Override
+ public String name() {
+ return null;
+ }
+
+ @Override
+ public String namespace() {
+ return null;
+ }
+
+ @Override
+ public Map<String, LineageDatasetFacet> facets() {
+ return null;
+ }
+ }
+}
diff --git a/flink-connector-kafka/src/test/java/org/apache/flink/connector/kafka/sink/KafkaRecordSerializationSchemaBuilderTest.java b/flink-connector-kafka/src/test/java/org/apache/flink/connector/kafka/sink/KafkaRecordSerializationSchemaBuilderTest.java
index 701f9c8..4d14372 100644
--- a/flink-connector-kafka/src/test/java/org/apache/flink/connector/kafka/sink/KafkaRecordSerializationSchemaBuilderTest.java
+++ b/flink-connector-kafka/src/test/java/org/apache/flink/connector/kafka/sink/KafkaRecordSerializationSchemaBuilderTest.java
@@ -19,6 +19,15 @@
import org.apache.flink.api.common.serialization.SerializationSchema;
import org.apache.flink.api.common.serialization.SimpleStringSchema;
+import org.apache.flink.api.common.typeinfo.BasicTypeInfo;
+import org.apache.flink.api.common.typeinfo.TypeInformation;
+import org.apache.flink.api.java.typeutils.ResultTypeQueryable;
+import org.apache.flink.connector.kafka.lineage.DefaultKafkaDatasetIdentifier;
+import org.apache.flink.connector.kafka.lineage.KafkaDatasetFacet;
+import org.apache.flink.connector.kafka.lineage.KafkaDatasetFacetProvider;
+import org.apache.flink.connector.kafka.lineage.KafkaDatasetIdentifierProvider;
+import org.apache.flink.connector.kafka.lineage.TypeDatasetFacet;
+import org.apache.flink.connector.kafka.lineage.TypeDatasetFacetProvider;
import org.apache.flink.connector.testutils.formats.DummyInitializationContext;
import org.apache.flink.streaming.connectors.kafka.partitioner.FlinkKafkaPartitioner;
import org.apache.flink.util.TestLogger;
@@ -31,6 +40,7 @@
import org.apache.kafka.common.serialization.Deserializer;
import org.apache.kafka.common.serialization.StringDeserializer;
import org.apache.kafka.common.serialization.StringSerializer;
+import org.assertj.core.api.InstanceOfAssertFactories;
import org.junit.Before;
import org.junit.Test;
@@ -40,6 +50,7 @@
import java.util.HashMap;
import java.util.List;
import java.util.Map;
+import java.util.Optional;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.function.Function;
@@ -53,6 +64,13 @@
private static Map<String, ?> configurableConfiguration;
private static Map<String, ?> configuration;
+
+ private interface TestingTopicSelector<T>
+ extends TopicSelector<T>, KafkaDatasetIdentifierProvider {}
+
+ private interface SerializationSchemaWithResultQueryable<T>
+ extends SerializationSchema<T>, ResultTypeQueryable<T> {}
+
private static boolean isKeySerializer;
@Before
@@ -256,6 +274,134 @@
assertThat(recordWithInvalidTimestamp.timestamp()).isNull();
}
+ @Test
+ public void testGetLineageDatasetFacetsWhenTopicSelectorIsNotKafkaDatasetIdentifierProvider() {
+ SerializationSchema<String> serializationSchema = new SimpleStringSchema();
+ KafkaRecordSerializationSchema<String> schema =
+ KafkaRecordSerializationSchema.builder()
+ .setTopicSelector((TopicSelector<Object>) o -> DEFAULT_TOPIC)
+ .setValueSerializationSchema(serializationSchema)
+ .setKeySerializationSchema(serializationSchema)
+ .build();
+
+ assertThat(schema)
+ .asInstanceOf(InstanceOfAssertFactories.type(KafkaDatasetFacetProvider.class))
+ .returns(Optional.empty(), KafkaDatasetFacetProvider::getKafkaDatasetFacet);
+ }
+
+ @Test
+ public void testGetLineageDatasetFacetsWhenNoTopicsIdentifiersFound() {
+ SerializationSchema<String> serializationSchema = new SimpleStringSchema();
+ KafkaRecordSerializationSchema<String> schema =
+ KafkaRecordSerializationSchema.builder()
+ .setTopicSelector(
+ new TestingTopicSelector<Object>() {
+ @Override
+ public Optional<DefaultKafkaDatasetIdentifier>
+ getDatasetIdentifier() {
+ return Optional.empty();
+ }
+
+ @Override
+ public String apply(Object o) {
+ return DEFAULT_TOPIC;
+ }
+ })
+ .setValueSerializationSchema(serializationSchema)
+ .setKeySerializationSchema(serializationSchema)
+ .build();
+ assertThat(schema)
+ .asInstanceOf(InstanceOfAssertFactories.type(KafkaDatasetFacetProvider.class))
+ .returns(Optional.empty(), KafkaDatasetFacetProvider::getKafkaDatasetFacet);
+ }
+
+ @Test
+ public void testGetLineageDatasetFacetsValueSerializationSchemaIsResultTypeQueryable() {
+ TypeInformation<String> stringTypeInformation = TypeInformation.of(String.class);
+ SerializationSchemaWithResultQueryable<String> serializationSchema =
+ new SerializationSchemaWithResultQueryable<String>() {
+
+ @Override
+ public TypeInformation<String> getProducedType() {
+ return stringTypeInformation;
+ }
+
+ @Override
+ public byte[] serialize(String o) {
+ return new byte[0];
+ }
+ };
+
+ KafkaRecordSerializationSchema<String> schema =
+ KafkaRecordSerializationSchema.builder()
+ .setTopicSelector(
+ new TestingTopicSelector<Object>() {
+ @Override
+ public Optional<DefaultKafkaDatasetIdentifier>
+ getDatasetIdentifier() {
+ return Optional.of(
+ DefaultKafkaDatasetIdentifier.ofTopics(
+ Arrays.asList("topic1", "topic2")));
+ }
+
+ @Override
+ public String apply(Object o) {
+ return DEFAULT_TOPIC;
+ }
+ })
+ .setValueSerializationSchema(serializationSchema)
+ .setKeySerializationSchema(serializationSchema)
+ .build();
+
+ Optional<KafkaDatasetFacet> kafkaDatasetFacet =
+ ((KafkaDatasetFacetProvider) schema).getKafkaDatasetFacet();
+
+ assertThat(kafkaDatasetFacet).isPresent();
+ assertThat(kafkaDatasetFacet.get().getTopicIdentifier().getTopics())
+ .containsExactly("topic1", "topic2");
+ assertThat(((TypeDatasetFacetProvider) schema).getTypeDatasetFacet())
+ .isPresent()
+ .get()
+ .extracting(TypeDatasetFacet::getTypeInformation)
+ .isEqualTo(stringTypeInformation);
+ }
+
+ @Test
+ public void testGetLineageDatasetFacets() {
+ KafkaRecordSerializationSchema<String> schema =
+ KafkaRecordSerializationSchema.builder()
+ .setTopicSelector(
+ new TestingTopicSelector<Object>() {
+ @Override
+ public Optional<DefaultKafkaDatasetIdentifier>
+ getDatasetIdentifier() {
+ return Optional.of(
+ DefaultKafkaDatasetIdentifier.ofTopics(
+ Arrays.asList("topic1", "topic2")));
+ }
+
+ @Override
+ public String apply(Object o) {
+ return DEFAULT_TOPIC;
+ }
+ })
+ .setValueSerializationSchema(new SimpleStringSchema())
+ .setKeySerializationSchema(new SimpleStringSchema())
+ .build();
+
+ Optional<KafkaDatasetFacet> kafkaDatasetFacet =
+ ((KafkaDatasetFacetProvider) schema).getKafkaDatasetFacet();
+
+ assertThat(kafkaDatasetFacet).isPresent();
+ assertThat(kafkaDatasetFacet.get().getTopicIdentifier().getTopics())
+ .containsExactly("topic1", "topic2");
+ assertThat(((TypeDatasetFacetProvider) schema).getTypeDatasetFacet())
+ .isPresent()
+ .get()
+ .extracting(TypeDatasetFacet::getTypeInformation)
+ .isEqualTo(BasicTypeInfo.STRING_TYPE_INFO);
+ }
+
private static void assertOnlyOneSerializerAllowed(
List<
Function<
diff --git a/flink-connector-kafka/src/test/java/org/apache/flink/connector/kafka/sink/KafkaSinkTest.java b/flink-connector-kafka/src/test/java/org/apache/flink/connector/kafka/sink/KafkaSinkTest.java
new file mode 100644
index 0000000..1efb6ec
--- /dev/null
+++ b/flink-connector-kafka/src/test/java/org/apache/flink/connector/kafka/sink/KafkaSinkTest.java
@@ -0,0 +1,144 @@
+package org.apache.flink.connector.kafka.sink;
+
+import org.apache.flink.api.common.typeinfo.TypeInformation;
+import org.apache.flink.connector.base.DeliveryGuarantee;
+import org.apache.flink.connector.kafka.lineage.DefaultKafkaDatasetFacet;
+import org.apache.flink.connector.kafka.lineage.DefaultKafkaDatasetIdentifier;
+import org.apache.flink.connector.kafka.lineage.DefaultTypeDatasetFacet;
+import org.apache.flink.connector.kafka.lineage.KafkaDatasetFacet;
+import org.apache.flink.connector.kafka.lineage.KafkaDatasetFacetProvider;
+import org.apache.flink.connector.kafka.lineage.TypeDatasetFacet;
+import org.apache.flink.connector.kafka.lineage.TypeDatasetFacetProvider;
+import org.apache.flink.streaming.api.lineage.LineageVertex;
+
+import org.apache.kafka.clients.producer.ProducerRecord;
+import javax.annotation.Nullable;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Test;
+
+import java.util.Collections;
+import java.util.Optional;
+import java.util.Properties;
+
+import static org.assertj.core.api.Assertions.assertThat;
+
+/** Tests for {@link KafkaSink}. */
+public class KafkaSinkTest {
+
+ Properties kafkaProperties;
+
+ @BeforeEach
+ void setup() {
+ kafkaProperties = new Properties();
+ kafkaProperties.put("bootstrap.servers", "host1;host2");
+ }
+
+ @Test
+ public void testGetLineageVertexWhenSerializerIsNotAKafkaDatasetFacetProvider() {
+ KafkaRecordSerializationSchema recordSerializer =
+ new KafkaRecordSerializationSchemaWithoutKafkaDatasetProvider();
+ KafkaSink sink =
+ new KafkaSink(
+ DeliveryGuarantee.EXACTLY_ONCE, new Properties(), "", recordSerializer);
+
+ assertThat(sink.getLineageVertex().datasets()).isEmpty();
+ }
+
+ @Test
+ public void testGetLineageVertexWhenNoKafkaDatasetFacetReturnedFromSerializer() {
+ KafkaRecordSerializationSchema recordSerializer =
+ new KafkaRecordSerializationSchemaWithEmptyKafkaDatasetProvider();
+
+ KafkaSink sink =
+ new KafkaSink(
+ DeliveryGuarantee.EXACTLY_ONCE, new Properties(), "", recordSerializer);
+
+ assertThat(sink.getLineageVertex().datasets()).isEmpty();
+ }
+
+ @Test
+ public void testGetLineageVertex() {
+ KafkaRecordSerializationSchema recordSerializer =
+ new TestingKafkaRecordSerializationSchema();
+
+ KafkaSink sink =
+ new KafkaSink(
+ DeliveryGuarantee.EXACTLY_ONCE, kafkaProperties, "", recordSerializer);
+
+ LineageVertex lineageVertex = sink.getLineageVertex();
+
+ assertThat(lineageVertex.datasets().get(0).namespace()).isEqualTo("kafka://host1");
+ assertThat(lineageVertex.datasets().get(0).name()).isEqualTo("topic1");
+
+ assertThat(
+ lineageVertex
+ .datasets()
+ .get(0)
+ .facets()
+ .get(DefaultKafkaDatasetFacet.KAFKA_FACET_NAME))
+ .hasFieldOrPropertyWithValue("properties", kafkaProperties)
+ .hasFieldOrPropertyWithValue(
+ "topicIdentifier",
+ DefaultKafkaDatasetIdentifier.ofTopics(
+ Collections.singletonList("topic1")));
+
+ assertThat(
+ lineageVertex
+ .datasets()
+ .get(0)
+ .facets()
+ .get(DefaultTypeDatasetFacet.TYPE_FACET_NAME))
+ .hasFieldOrPropertyWithValue("typeInformation", TypeInformation.of(String.class));
+ }
+
+ private static class KafkaRecordSerializationSchemaWithoutKafkaDatasetProvider
+ implements KafkaRecordSerializationSchema {
+ @Nullable
+ @Override
+ public ProducerRecord<byte[], byte[]> serialize(
+ Object element, KafkaSinkContext context, Long timestamp) {
+ return null;
+ }
+ }
+
+ private static class KafkaRecordSerializationSchemaWithEmptyKafkaDatasetProvider
+ implements KafkaRecordSerializationSchema, KafkaDatasetFacetProvider {
+ @Nullable
+ @Override
+ public ProducerRecord<byte[], byte[]> serialize(
+ Object element, KafkaSinkContext context, Long timestamp) {
+ return null;
+ }
+
+ @Override
+ public Optional<KafkaDatasetFacet> getKafkaDatasetFacet() {
+ return Optional.empty();
+ }
+ }
+
+ private static class TestingKafkaRecordSerializationSchema
+ implements KafkaRecordSerializationSchema,
+ KafkaDatasetFacetProvider,
+ TypeDatasetFacetProvider {
+
+ @Override
+ public Optional<KafkaDatasetFacet> getKafkaDatasetFacet() {
+ return Optional.of(
+ new DefaultKafkaDatasetFacet(
+ DefaultKafkaDatasetIdentifier.ofTopics(
+ Collections.singletonList("topic1"))));
+ }
+
+ @Nullable
+ @Override
+ public ProducerRecord<byte[], byte[]> serialize(
+ Object element, KafkaSinkContext context, Long timestamp) {
+ return null;
+ }
+
+ @Override
+ public Optional<TypeDatasetFacet> getTypeDatasetFacet() {
+ return Optional.of(new DefaultTypeDatasetFacet(TypeInformation.of(String.class)));
+ }
+ }
+}
diff --git a/flink-connector-kafka/src/test/java/org/apache/flink/connector/kafka/source/KafkaSourceTest.java b/flink-connector-kafka/src/test/java/org/apache/flink/connector/kafka/source/KafkaSourceTest.java
new file mode 100644
index 0000000..259668c
--- /dev/null
+++ b/flink-connector-kafka/src/test/java/org/apache/flink/connector/kafka/source/KafkaSourceTest.java
@@ -0,0 +1,183 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.connector.kafka.source;
+
+import org.apache.flink.api.common.typeinfo.TypeInformation;
+import org.apache.flink.connector.kafka.lineage.DefaultKafkaDatasetFacet;
+import org.apache.flink.connector.kafka.lineage.DefaultKafkaDatasetIdentifier;
+import org.apache.flink.connector.kafka.lineage.DefaultTypeDatasetFacet;
+import org.apache.flink.connector.kafka.lineage.KafkaDatasetIdentifierProvider;
+import org.apache.flink.connector.kafka.source.enumerator.initializer.OffsetsInitializer;
+import org.apache.flink.connector.kafka.source.enumerator.subscriber.KafkaSubscriber;
+import org.apache.flink.connector.kafka.source.reader.deserializer.KafkaRecordDeserializationSchema;
+import org.apache.flink.streaming.api.lineage.LineageDataset;
+import org.apache.flink.streaming.api.lineage.LineageVertex;
+import org.apache.flink.util.Collector;
+
+import org.apache.kafka.clients.admin.AdminClient;
+import org.apache.kafka.clients.consumer.ConsumerRecord;
+import org.apache.kafka.common.TopicPartition;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Test;
+
+import java.io.IOException;
+import java.util.Collections;
+import java.util.Optional;
+import java.util.Properties;
+import java.util.Set;
+
+import static org.assertj.core.api.Assertions.assertThat;
+
+/** Tests for {@link KafkaSource}. */
+public class KafkaSourceTest {
+ Properties kafkaProperties;
+
+ @BeforeEach
+ void setup() {
+ kafkaProperties = new Properties();
+ kafkaProperties.put("bootstrap.servers", "host1;host2");
+ }
+
+ @Test
+ public void testGetLineageVertexWhenSubscriberIsNotAKafkaDatasetIdentifierProvider() {
+ KafkaSource<String> source =
+ new KafkaSourceBuilder<String>()
+ .setKafkaSubscriber(
+ new KafkaSubscriber() {
+ @Override
+ public Set<TopicPartition> getSubscribedTopicPartitions(
+ AdminClient adminClient) {
+ return null;
+ }
+ })
+ .setProperties(kafkaProperties)
+ .setGroupId("")
+ .setDeserializer(
+ new KafkaRecordDeserializationSchema<String>() {
+ @Override
+ public TypeInformation<String> getProducedType() {
+ return null;
+ }
+
+ @Override
+ public void deserialize(
+ ConsumerRecord<byte[], byte[]> record,
+ Collector<String> out)
+ throws IOException {}
+ })
+ .setUnbounded(OffsetsInitializer.committedOffsets())
+ .build();
+
+ assertThat(source.getLineageVertex())
+ .extracting(LineageVertex::datasets)
+ .asList()
+ .isEmpty();
+ }
+
+ @Test
+ public void testGetLineageVertexWhenNoKafkaTopicsIdentifier() {
+ KafkaSource<String> source =
+ new KafkaSourceBuilder<String>()
+ .setKafkaSubscriber(
+ new TestingKafkaSubscriber() {
+ @Override
+ public Optional<DefaultKafkaDatasetIdentifier>
+ getDatasetIdentifier() {
+ return Optional.empty();
+ }
+ })
+ .setProperties(kafkaProperties)
+ .setGroupId("")
+ .setDeserializer(
+ new KafkaRecordDeserializationSchema<String>() {
+ @Override
+ public void deserialize(
+ ConsumerRecord<byte[], byte[]> record,
+ Collector<String> out)
+ throws IOException {}
+
+ @Override
+ public TypeInformation<String> getProducedType() {
+ return TypeInformation.of(String.class);
+ }
+ })
+ .setUnbounded(OffsetsInitializer.committedOffsets())
+ .build();
+ assertThat(source.getLineageVertex())
+ .extracting(LineageVertex::datasets)
+ .asList()
+ .isEmpty();
+ }
+
+ @Test
+ public void testGetLineageVertex() {
+ TypeInformation<String> typeInformation = TypeInformation.of(String.class);
+ KafkaSource<String> source =
+ new KafkaSourceBuilder<String>()
+ .setKafkaSubscriber(new TestingKafkaSubscriber())
+ .setProperties(kafkaProperties)
+ .setGroupId("")
+ .setDeserializer(
+ new KafkaRecordDeserializationSchema<String>() {
+ @Override
+ public void deserialize(
+ ConsumerRecord<byte[], byte[]> record,
+ Collector<String> out)
+ throws IOException {}
+
+ @Override
+ public TypeInformation<String> getProducedType() {
+ return typeInformation;
+ }
+ })
+ .setUnbounded(OffsetsInitializer.committedOffsets())
+ .build();
+
+ LineageVertex lineageVertex = source.getLineageVertex();
+ assertThat(lineageVertex.datasets()).hasSize(1);
+ LineageDataset dataset = lineageVertex.datasets().get(0);
+
+ assertThat(dataset.namespace()).isEqualTo("kafka://host1");
+ assertThat(dataset.name()).isEqualTo("topic1");
+
+ assertThat(dataset.facets()).containsKey(DefaultKafkaDatasetFacet.KAFKA_FACET_NAME);
+ DefaultKafkaDatasetFacet kafkaFacet =
+ (DefaultKafkaDatasetFacet)
+ dataset.facets().get(DefaultKafkaDatasetFacet.KAFKA_FACET_NAME);
+
+ assertThat(kafkaFacet.getProperties()).containsEntry("bootstrap.servers", "host1;host2");
+
+ assertThat(dataset.facets()).containsKey(DefaultTypeDatasetFacet.TYPE_FACET_NAME);
+ assertThat(dataset.facets().get(DefaultTypeDatasetFacet.TYPE_FACET_NAME))
+ .hasFieldOrPropertyWithValue("typeInformation", TypeInformation.of(String.class));
+ }
+
+ private static class TestingKafkaSubscriber
+ implements KafkaSubscriber, KafkaDatasetIdentifierProvider {
+ @Override
+ public Optional<DefaultKafkaDatasetIdentifier> getDatasetIdentifier() {
+ return Optional.of(
+ DefaultKafkaDatasetIdentifier.ofTopics(Collections.singletonList("topic1")));
+ }
+
+ @Override
+ public Set<TopicPartition> getSubscribedTopicPartitions(AdminClient adminClient) {
+ return null;
+ }
+ }
+}
diff --git a/flink-connector-kafka/src/test/java/org/apache/flink/connector/kafka/source/enumerator/subscriber/KafkaSubscriberTest.java b/flink-connector-kafka/src/test/java/org/apache/flink/connector/kafka/source/enumerator/subscriber/KafkaSubscriberTest.java
index 258c1c0..4c5a502 100644
--- a/flink-connector-kafka/src/test/java/org/apache/flink/connector/kafka/source/enumerator/subscriber/KafkaSubscriberTest.java
+++ b/flink-connector-kafka/src/test/java/org/apache/flink/connector/kafka/source/enumerator/subscriber/KafkaSubscriberTest.java
@@ -18,6 +18,8 @@
package org.apache.flink.connector.kafka.source.enumerator.subscriber;
+import org.apache.flink.connector.kafka.lineage.DefaultKafkaDatasetIdentifier;
+import org.apache.flink.connector.kafka.lineage.KafkaDatasetIdentifierProvider;
import org.apache.flink.connector.kafka.testutils.KafkaSourceTestEnv;
import org.apache.kafka.clients.admin.AdminClient;
@@ -71,6 +73,8 @@
new HashSet<>(KafkaSourceTestEnv.getPartitionsForTopics(topics));
assertThat(subscribedPartitions).isEqualTo(expectedSubscribedPartitions);
+ assertThat(((KafkaDatasetIdentifierProvider) subscriber).getDatasetIdentifier().get())
+ .isEqualTo(DefaultKafkaDatasetIdentifier.ofTopics(topics));
}
@Test
@@ -86,8 +90,8 @@
@Test
public void testTopicPatternSubscriber() {
- KafkaSubscriber subscriber =
- KafkaSubscriber.getTopicPatternSubscriber(Pattern.compile("pattern.*"));
+ Pattern pattern = Pattern.compile("pattern.*");
+ KafkaSubscriber subscriber = KafkaSubscriber.getTopicPatternSubscriber(pattern);
final Set<TopicPartition> subscribedPartitions =
subscriber.getSubscribedTopicPartitions(adminClient);
@@ -96,6 +100,8 @@
KafkaSourceTestEnv.getPartitionsForTopics(Collections.singleton(TOPIC2)));
assertThat(subscribedPartitions).isEqualTo(expectedSubscribedPartitions);
+ assertThat(((KafkaDatasetIdentifierProvider) subscriber).getDatasetIdentifier().get())
+ .isEqualTo(DefaultKafkaDatasetIdentifier.ofPattern(pattern));
}
@Test
@@ -111,6 +117,8 @@
subscriber.getSubscribedTopicPartitions(adminClient);
assertThat(subscribedPartitions).isEqualTo(partitions);
+ assertThat(((KafkaDatasetIdentifierProvider) subscriber).getDatasetIdentifier().get())
+ .isEqualTo(DefaultKafkaDatasetIdentifier.ofTopics(topics));
}
@Test