[FLINK-34466] Lineage interfaces for kafka connector (#130)

Signed-off-by: Pawel Leszczynski <leszczynski.pawel@gmail.com>
diff --git a/flink-connector-kafka/src/main/java/org/apache/flink/connector/kafka/lineage/DefaultKafkaDatasetFacet.java b/flink-connector-kafka/src/main/java/org/apache/flink/connector/kafka/lineage/DefaultKafkaDatasetFacet.java
new file mode 100644
index 0000000..e1c6823
--- /dev/null
+++ b/flink-connector-kafka/src/main/java/org/apache/flink/connector/kafka/lineage/DefaultKafkaDatasetFacet.java
@@ -0,0 +1,65 @@
+package org.apache.flink.connector.kafka.lineage;
+
+import org.apache.flink.annotation.PublicEvolving;
+import org.apache.flink.connector.kafka.source.KafkaPropertiesUtil;
+
+import java.util.Objects;
+import java.util.Properties;
+
+/** Default implementation of {@link KafkaDatasetFacet}. */
+@PublicEvolving
+public class DefaultKafkaDatasetFacet implements KafkaDatasetFacet {
+
+    public static final String KAFKA_FACET_NAME = "kafka";
+
+    private Properties properties;
+
+    private final KafkaDatasetIdentifier topicIdentifier;
+
+    public DefaultKafkaDatasetFacet(KafkaDatasetIdentifier topicIdentifier, Properties properties) {
+        this(topicIdentifier);
+
+        this.properties = new Properties();
+        KafkaPropertiesUtil.copyProperties(properties, this.properties);
+    }
+
+    public DefaultKafkaDatasetFacet(KafkaDatasetIdentifier topicIdentifier) {
+        this.topicIdentifier = topicIdentifier;
+    }
+
+    public void setProperties(Properties properties) {
+        this.properties = new Properties();
+        KafkaPropertiesUtil.copyProperties(properties, this.properties);
+    }
+
+    public Properties getProperties() {
+        return properties;
+    }
+
+    public KafkaDatasetIdentifier getTopicIdentifier() {
+        return topicIdentifier;
+    }
+
+    @Override
+    public boolean equals(Object o) {
+        if (this == o) {
+            return true;
+        }
+        if (o == null || getClass() != o.getClass()) {
+            return false;
+        }
+        DefaultKafkaDatasetFacet that = (DefaultKafkaDatasetFacet) o;
+        return Objects.equals(properties, that.properties)
+                && Objects.equals(topicIdentifier, that.topicIdentifier);
+    }
+
+    @Override
+    public int hashCode() {
+        return Objects.hash(properties, topicIdentifier);
+    }
+
+    @Override
+    public String name() {
+        return KAFKA_FACET_NAME;
+    }
+}
diff --git a/flink-connector-kafka/src/main/java/org/apache/flink/connector/kafka/lineage/DefaultKafkaDatasetIdentifier.java b/flink-connector-kafka/src/main/java/org/apache/flink/connector/kafka/lineage/DefaultKafkaDatasetIdentifier.java
new file mode 100644
index 0000000..cd97b7f
--- /dev/null
+++ b/flink-connector-kafka/src/main/java/org/apache/flink/connector/kafka/lineage/DefaultKafkaDatasetIdentifier.java
@@ -0,0 +1,59 @@
+package org.apache.flink.connector.kafka.lineage;
+
+import org.apache.flink.annotation.PublicEvolving;
+
+import javax.annotation.Nullable;
+
+import java.util.List;
+import java.util.Objects;
+import java.util.regex.Pattern;
+
+/** Default implementation of {@link KafkaDatasetIdentifier}. */
+@PublicEvolving
+public class DefaultKafkaDatasetIdentifier implements KafkaDatasetIdentifier {
+
+    @Nullable private final List<String> topics;
+    @Nullable private final Pattern topicPattern;
+
+    private DefaultKafkaDatasetIdentifier(
+            @Nullable List<String> fixedTopics, @Nullable Pattern topicPattern) {
+        this.topics = fixedTopics;
+        this.topicPattern = topicPattern;
+    }
+
+    public static DefaultKafkaDatasetIdentifier ofPattern(Pattern pattern) {
+        return new DefaultKafkaDatasetIdentifier(null, pattern);
+    }
+
+    public static DefaultKafkaDatasetIdentifier ofTopics(List<String> fixedTopics) {
+        return new DefaultKafkaDatasetIdentifier(fixedTopics, null);
+    }
+
+    @Nullable
+    public List<String> getTopics() {
+        return topics;
+    }
+
+    @Nullable
+    public Pattern getTopicPattern() {
+        return topicPattern;
+    }
+
+    @Override
+    public boolean equals(Object o) {
+        if (this == o) {
+            return true;
+        }
+        if (o == null || getClass() != o.getClass()) {
+            return false;
+        }
+        DefaultKafkaDatasetIdentifier that = (DefaultKafkaDatasetIdentifier) o;
+        return Objects.equals(topics, that.topics)
+                && Objects.equals(topicPattern, that.topicPattern);
+    }
+
+    @Override
+    public int hashCode() {
+        return Objects.hash(topics, topicPattern);
+    }
+}
diff --git a/flink-connector-kafka/src/main/java/org/apache/flink/connector/kafka/lineage/DefaultTypeDatasetFacet.java b/flink-connector-kafka/src/main/java/org/apache/flink/connector/kafka/lineage/DefaultTypeDatasetFacet.java
new file mode 100644
index 0000000..d9475d7
--- /dev/null
+++ b/flink-connector-kafka/src/main/java/org/apache/flink/connector/kafka/lineage/DefaultTypeDatasetFacet.java
@@ -0,0 +1,44 @@
+package org.apache.flink.connector.kafka.lineage;
+
+import org.apache.flink.annotation.PublicEvolving;
+import org.apache.flink.api.common.typeinfo.TypeInformation;
+
+import java.util.Objects;
+
+/** Default implementation of {@link TypeDatasetFacet}. */
+@PublicEvolving
+public class DefaultTypeDatasetFacet implements TypeDatasetFacet {
+
+    public static final String TYPE_FACET_NAME = "type";
+
+    private final TypeInformation typeInformation;
+
+    public DefaultTypeDatasetFacet(TypeInformation typeInformation) {
+        this.typeInformation = typeInformation;
+    }
+
+    public TypeInformation getTypeInformation() {
+        return typeInformation;
+    }
+
+    public boolean equals(Object o) {
+        if (this == o) {
+            return true;
+        }
+        if (o == null || getClass() != o.getClass()) {
+            return false;
+        }
+        DefaultTypeDatasetFacet that = (DefaultTypeDatasetFacet) o;
+        return Objects.equals(typeInformation, that.typeInformation);
+    }
+
+    @Override
+    public int hashCode() {
+        return Objects.hash(typeInformation);
+    }
+
+    @Override
+    public String name() {
+        return TYPE_FACET_NAME;
+    }
+}
diff --git a/flink-connector-kafka/src/main/java/org/apache/flink/connector/kafka/lineage/KafkaDatasetFacet.java b/flink-connector-kafka/src/main/java/org/apache/flink/connector/kafka/lineage/KafkaDatasetFacet.java
new file mode 100644
index 0000000..c0d3d0b
--- /dev/null
+++ b/flink-connector-kafka/src/main/java/org/apache/flink/connector/kafka/lineage/KafkaDatasetFacet.java
@@ -0,0 +1,16 @@
+package org.apache.flink.connector.kafka.lineage;
+
+import org.apache.flink.annotation.PublicEvolving;
+import org.apache.flink.streaming.api.lineage.LineageDatasetFacet;
+
+import java.util.Properties;
+
+/** Facet definition to contain all Kafka-specific information on Kafka sources and sinks. */
+@PublicEvolving
+public interface KafkaDatasetFacet extends LineageDatasetFacet {
+    Properties getProperties();
+
+    KafkaDatasetIdentifier getTopicIdentifier();
+
+    void setProperties(Properties properties);
+}
diff --git a/flink-connector-kafka/src/main/java/org/apache/flink/connector/kafka/lineage/KafkaDatasetFacetProvider.java b/flink-connector-kafka/src/main/java/org/apache/flink/connector/kafka/lineage/KafkaDatasetFacetProvider.java
new file mode 100644
index 0000000..76fe41b
--- /dev/null
+++ b/flink-connector-kafka/src/main/java/org/apache/flink/connector/kafka/lineage/KafkaDatasetFacetProvider.java
@@ -0,0 +1,16 @@
+package org.apache.flink.connector.kafka.lineage;
+
+import org.apache.flink.annotation.PublicEvolving;
+
+import java.util.Optional;
+
+/** Contains a method to extract a {@link KafkaDatasetFacet}. */
+@PublicEvolving
+public interface KafkaDatasetFacetProvider {
+
+    /**
+     * Returns a Kafka dataset facet or empty in case an implementing class is not able to identify
+     * a dataset.
+     */
+    Optional<KafkaDatasetFacet> getKafkaDatasetFacet();
+}
diff --git a/flink-connector-kafka/src/main/java/org/apache/flink/connector/kafka/lineage/KafkaDatasetIdentifier.java b/flink-connector-kafka/src/main/java/org/apache/flink/connector/kafka/lineage/KafkaDatasetIdentifier.java
new file mode 100644
index 0000000..19f7082
--- /dev/null
+++ b/flink-connector-kafka/src/main/java/org/apache/flink/connector/kafka/lineage/KafkaDatasetIdentifier.java
@@ -0,0 +1,30 @@
+package org.apache.flink.connector.kafka.lineage;
+
+import org.apache.flink.annotation.PublicEvolving;
+
+import javax.annotation.Nullable;
+
+import java.util.List;
+import java.util.Objects;
+import java.util.regex.Pattern;
+
+/** Kafka dataset identifier which can contain either a list of topics or a topic pattern. */
+@PublicEvolving
+public interface KafkaDatasetIdentifier {
+    @Nullable
+    List<String> getTopics();
+
+    @Nullable
+    Pattern getTopicPattern();
+
+    /**
+     * Returns the lineage dataset's name, which is the topic pattern if present, or a
+     * comma-separated list of topics otherwise.
+     */
+    default String toLineageName() {
+        if (getTopicPattern() != null) {
+            return getTopicPattern().toString();
+        }
+        return String.join(",", Objects.requireNonNull(getTopics()));
+    }
+}
diff --git a/flink-connector-kafka/src/main/java/org/apache/flink/connector/kafka/lineage/KafkaDatasetIdentifierProvider.java b/flink-connector-kafka/src/main/java/org/apache/flink/connector/kafka/lineage/KafkaDatasetIdentifierProvider.java
new file mode 100644
index 0000000..1389fea
--- /dev/null
+++ b/flink-connector-kafka/src/main/java/org/apache/flink/connector/kafka/lineage/KafkaDatasetIdentifierProvider.java
@@ -0,0 +1,16 @@
+package org.apache.flink.connector.kafka.lineage;
+
+import org.apache.flink.annotation.PublicEvolving;
+
+import java.util.Optional;
+
+/** Contains a method which allows extracting the topic identifier. */
+@PublicEvolving
+public interface KafkaDatasetIdentifierProvider {
+
+    /**
+     * Gets the Kafka dataset identifier, or empty in case the implementing class is not able to
+     * extract the dataset identifier.
+     */
+    Optional<DefaultKafkaDatasetIdentifier> getDatasetIdentifier();
+}
diff --git a/flink-connector-kafka/src/main/java/org/apache/flink/connector/kafka/lineage/LineageUtil.java b/flink-connector-kafka/src/main/java/org/apache/flink/connector/kafka/lineage/LineageUtil.java
new file mode 100644
index 0000000..086303e
--- /dev/null
+++ b/flink-connector-kafka/src/main/java/org/apache/flink/connector/kafka/lineage/LineageUtil.java
@@ -0,0 +1,118 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+
+package org.apache.flink.connector.kafka.lineage;
+
+import org.apache.flink.api.connector.source.Boundedness;
+import org.apache.flink.streaming.api.lineage.LineageDataset;
+import org.apache.flink.streaming.api.lineage.LineageDatasetFacet;
+import org.apache.flink.streaming.api.lineage.LineageVertex;
+import org.apache.flink.streaming.api.lineage.SourceLineageVertex;
+
+import java.util.Collection;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Properties;
+import java.util.stream.Collectors;
+
+/** Utility class with useful methods for managing lineage objects. */
+public class LineageUtil {
+
+    private static final String KAFKA_DATASET_PREFIX = "kafka://";
+    private static final String COMMA = ",";
+    private static final String SEMICOLON = ";";
+
+    public static LineageDataset datasetOf(String namespace, KafkaDatasetFacet kafkaDatasetFacet) {
+        return datasetOf(namespace, kafkaDatasetFacet, Collections.emptyList());
+    }
+
+    public static LineageDataset datasetOf(
+            String namespace, KafkaDatasetFacet kafkaDatasetFacet, TypeDatasetFacet typeFacet) {
+        return datasetOf(namespace, kafkaDatasetFacet, Collections.singletonList(typeFacet));
+    }
+
+    private static LineageDataset datasetOf(
+            String namespace,
+            KafkaDatasetFacet kafkaDatasetFacet,
+            List<LineageDatasetFacet> facets) {
+        return new LineageDataset() {
+            @Override
+            public String name() {
+                return kafkaDatasetFacet.getTopicIdentifier().toLineageName();
+            }
+
+            @Override
+            public String namespace() {
+                return namespace;
+            }
+
+            @Override
+            public Map<String, LineageDatasetFacet> facets() {
+                Map<String, LineageDatasetFacet> facetMap = new HashMap<>();
+                facetMap.put(DefaultKafkaDatasetFacet.KAFKA_FACET_NAME, kafkaDatasetFacet);
+                facetMap.putAll(
+                        facets.stream()
+                                .collect(
+                                        Collectors.toMap(LineageDatasetFacet::name, item -> item)));
+                return facetMap;
+            }
+        };
+    }
+
+    public static String namespaceOf(Properties properties) {
+        String bootstrapServers = properties.getProperty("bootstrap.servers");
+
+        if (bootstrapServers == null) {
+            return KAFKA_DATASET_PREFIX;
+        }
+
+        if (bootstrapServers.contains(COMMA)) {
+            bootstrapServers = bootstrapServers.split(COMMA)[0];
+        } else if (bootstrapServers.contains(SEMICOLON)) {
+            bootstrapServers = bootstrapServers.split(SEMICOLON)[0];
+        }
+
+        return KAFKA_DATASET_PREFIX + bootstrapServers;
+    }
+
+    public static SourceLineageVertex sourceLineageVertexOf(Collection<LineageDataset> datasets) {
+        return new SourceLineageVertex() {
+            @Override
+            public Boundedness boundedness() {
+                return Boundedness.CONTINUOUS_UNBOUNDED;
+            }
+
+            @Override
+            public List<LineageDataset> datasets() {
+                return datasets.stream().collect(Collectors.toList());
+            }
+        };
+    }
+
+    public static LineageVertex lineageVertexOf(Collection<LineageDataset> datasets) {
+        return new LineageVertex() {
+            @Override
+            public List<LineageDataset> datasets() {
+                return datasets.stream().collect(Collectors.toList());
+            }
+        };
+    }
+}
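
For reference, a minimal sketch of how these helpers compose (not part of the patch itself; the class name, broker addresses, and topic name below are illustrative):

    import java.util.Collections;
    import java.util.Properties;

    import org.apache.flink.connector.kafka.lineage.DefaultKafkaDatasetFacet;
    import org.apache.flink.connector.kafka.lineage.DefaultKafkaDatasetIdentifier;
    import org.apache.flink.connector.kafka.lineage.LineageUtil;
    import org.apache.flink.streaming.api.lineage.LineageDataset;
    import org.apache.flink.streaming.api.lineage.SourceLineageVertex;

    class LineageUtilSketch {
        static SourceLineageVertex ordersVertex() {
            Properties props = new Properties();
            props.setProperty("bootstrap.servers", "broker1:9092,broker2:9092");

            // Only the first bootstrap server is kept: "kafka://broker1:9092".
            String namespace = LineageUtil.namespaceOf(props);

            DefaultKafkaDatasetFacet facet =
                    new DefaultKafkaDatasetFacet(
                            DefaultKafkaDatasetIdentifier.ofTopics(
                                    Collections.singletonList("orders")),
                            props);

            LineageDataset dataset = LineageUtil.datasetOf(namespace, facet);
            return LineageUtil.sourceLineageVertexOf(Collections.singletonList(dataset));
        }
    }
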
diff --git a/flink-connector-kafka/src/main/java/org/apache/flink/connector/kafka/lineage/TypeDatasetFacet.java b/flink-connector-kafka/src/main/java/org/apache/flink/connector/kafka/lineage/TypeDatasetFacet.java
new file mode 100644
index 0000000..1e64f58
--- /dev/null
+++ b/flink-connector-kafka/src/main/java/org/apache/flink/connector/kafka/lineage/TypeDatasetFacet.java
@@ -0,0 +1,11 @@
+package org.apache.flink.connector.kafka.lineage;
+
+import org.apache.flink.annotation.PublicEvolving;
+import org.apache.flink.api.common.typeinfo.TypeInformation;
+import org.apache.flink.streaming.api.lineage.LineageDatasetFacet;
+
+/** Facet definition to contain type information of sources and sinks. */
+@PublicEvolving
+public interface TypeDatasetFacet extends LineageDatasetFacet {
+    TypeInformation getTypeInformation();
+}
diff --git a/flink-connector-kafka/src/main/java/org/apache/flink/connector/kafka/lineage/TypeDatasetFacetProvider.java b/flink-connector-kafka/src/main/java/org/apache/flink/connector/kafka/lineage/TypeDatasetFacetProvider.java
new file mode 100644
index 0000000..016a1bb
--- /dev/null
+++ b/flink-connector-kafka/src/main/java/org/apache/flink/connector/kafka/lineage/TypeDatasetFacetProvider.java
@@ -0,0 +1,16 @@
+package org.apache.flink.connector.kafka.lineage;
+
+import org.apache.flink.annotation.PublicEvolving;
+
+import java.util.Optional;
+
+/** Contains a method to extract a {@link TypeDatasetFacet}. */
+@PublicEvolving
+public interface TypeDatasetFacetProvider {
+
+    /**
+     * Returns a type dataset facet or empty in case an implementing class is not able to resolve
+     * the type.
+     */
+    Optional<TypeDatasetFacet> getTypeDatasetFacet();
+}
diff --git a/flink-connector-kafka/src/main/java/org/apache/flink/connector/kafka/sink/KafkaRecordSerializationSchema.java b/flink-connector-kafka/src/main/java/org/apache/flink/connector/kafka/sink/KafkaRecordSerializationSchema.java
index 9d081c7..f56a7da 100644
--- a/flink-connector-kafka/src/main/java/org/apache/flink/connector/kafka/sink/KafkaRecordSerializationSchema.java
+++ b/flink-connector-kafka/src/main/java/org/apache/flink/connector/kafka/sink/KafkaRecordSerializationSchema.java
@@ -29,7 +29,10 @@
 
 /**
  * A serialization schema which defines how to convert a value of type {@code T} to {@link
- * ProducerRecord}.
+ * ProducerRecord}. {@link org.apache.flink.connector.kafka.lineage.KafkaDatasetFacetProvider} can
+ * be implemented to provide Kafka specific lineage metadata, while {@link
+ * org.apache.flink.connector.kafka.lineage.TypeDatasetFacetProvider} can be implemented to provide
+ * lineage metadata with type information.
  *
  * @param <T> the type of values being serialized
  */
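
For illustration only (not part of this patch), a rough sketch of a schema that opts into lineage by implementing both provider interfaces; the class name, topic name, and value type are placeholders:

    import java.nio.charset.StandardCharsets;
    import java.util.Collections;
    import java.util.Optional;

    import org.apache.flink.api.common.typeinfo.Types;
    import org.apache.flink.connector.kafka.lineage.DefaultKafkaDatasetFacet;
    import org.apache.flink.connector.kafka.lineage.DefaultKafkaDatasetIdentifier;
    import org.apache.flink.connector.kafka.lineage.DefaultTypeDatasetFacet;
    import org.apache.flink.connector.kafka.lineage.KafkaDatasetFacet;
    import org.apache.flink.connector.kafka.lineage.KafkaDatasetFacetProvider;
    import org.apache.flink.connector.kafka.lineage.TypeDatasetFacet;
    import org.apache.flink.connector.kafka.lineage.TypeDatasetFacetProvider;
    import org.apache.flink.connector.kafka.sink.KafkaRecordSerializationSchema;

    import org.apache.kafka.clients.producer.ProducerRecord;

    class OrdersSerializationSchema
            implements KafkaRecordSerializationSchema<String>,
                    KafkaDatasetFacetProvider,
                    TypeDatasetFacetProvider {

        @Override
        public ProducerRecord<byte[], byte[]> serialize(
                String element, KafkaSinkContext context, Long timestamp) {
            // Write every element to a fixed "orders" topic.
            return new ProducerRecord<>("orders", element.getBytes(StandardCharsets.UTF_8));
        }

        @Override
        public Optional<KafkaDatasetFacet> getKafkaDatasetFacet() {
            // Expose the written topic; KafkaSink enriches the facet with producer properties.
            return Optional.of(
                    new DefaultKafkaDatasetFacet(
                            DefaultKafkaDatasetIdentifier.ofTopics(
                                    Collections.singletonList("orders"))));
        }

        @Override
        public Optional<TypeDatasetFacet> getTypeDatasetFacet() {
            // Expose the produced value type for the "type" facet.
            return Optional.of(new DefaultTypeDatasetFacet(Types.STRING));
        }
    }
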
diff --git a/flink-connector-kafka/src/main/java/org/apache/flink/connector/kafka/sink/KafkaRecordSerializationSchemaBuilder.java b/flink-connector-kafka/src/main/java/org/apache/flink/connector/kafka/sink/KafkaRecordSerializationSchemaBuilder.java
index e9fc413..0fba3a3 100644
--- a/flink-connector-kafka/src/main/java/org/apache/flink/connector/kafka/sink/KafkaRecordSerializationSchemaBuilder.java
+++ b/flink-connector-kafka/src/main/java/org/apache/flink/connector/kafka/sink/KafkaRecordSerializationSchemaBuilder.java
@@ -19,16 +19,32 @@
 
 import org.apache.flink.annotation.PublicEvolving;
 import org.apache.flink.api.common.serialization.SerializationSchema;
+import org.apache.flink.api.common.typeinfo.TypeInformation;
+import org.apache.flink.api.java.typeutils.ResultTypeQueryable;
+import org.apache.flink.connector.kafka.lineage.DefaultKafkaDatasetFacet;
+import org.apache.flink.connector.kafka.lineage.DefaultKafkaDatasetIdentifier;
+import org.apache.flink.connector.kafka.lineage.DefaultTypeDatasetFacet;
+import org.apache.flink.connector.kafka.lineage.KafkaDatasetFacet;
+import org.apache.flink.connector.kafka.lineage.KafkaDatasetFacetProvider;
+import org.apache.flink.connector.kafka.lineage.KafkaDatasetIdentifierProvider;
+import org.apache.flink.connector.kafka.lineage.TypeDatasetFacet;
+import org.apache.flink.connector.kafka.lineage.TypeDatasetFacetProvider;
+import org.apache.flink.connector.kafka.source.KafkaSource;
 import org.apache.flink.streaming.connectors.kafka.partitioner.FlinkKafkaPartitioner;
 
+import com.google.common.reflect.TypeToken;
 import org.apache.kafka.clients.producer.ProducerRecord;
 import org.apache.kafka.common.serialization.Serializer;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 
 import javax.annotation.Nullable;
 
 import java.io.Serializable;
+import java.util.Collections;
 import java.util.HashMap;
 import java.util.Map;
+import java.util.Optional;
 import java.util.OptionalInt;
 import java.util.function.Function;
 
@@ -79,6 +95,7 @@
  */
 @PublicEvolving
 public class KafkaRecordSerializationSchemaBuilder<IN> {
+    private static final Logger LOG = LoggerFactory.getLogger(KafkaRecordSerializationSchemaBuilder.class);
 
     @Nullable private Function<? super IN, String> topicSelector;
     @Nullable private SerializationSchema<? super IN> valueSerializationSchema;
@@ -122,7 +139,8 @@
     public KafkaRecordSerializationSchemaBuilder<IN> setTopic(String topic) {
         checkState(this.topicSelector == null, "Topic selector already set.");
         checkNotNull(topic);
-        this.topicSelector = new CachingTopicSelector<>((e) -> topic);
+
+        this.topicSelector = new ConstantTopicSelector<>(topic);
         return this;
     }
 
@@ -283,7 +301,29 @@
         checkState(keySerializationSchema == null, "Key serializer already set.");
     }
 
-    private static class CachingTopicSelector<IN> implements Function<IN, String>, Serializable {
+    private static class ConstantTopicSelector<IN>
+            implements Function<IN, String>, Serializable, KafkaDatasetIdentifierProvider {
+
+        private final String topic;
+
+        ConstantTopicSelector(String topic) {
+            this.topic = topic;
+        }
+
+        @Override
+        public String apply(IN in) {
+            return topic;
+        }
+
+        @Override
+        public Optional<DefaultKafkaDatasetIdentifier> getDatasetIdentifier() {
+            return Optional.of(
+                    DefaultKafkaDatasetIdentifier.ofTopics(Collections.singletonList(topic)));
+        }
+    }
+
+    private static class CachingTopicSelector<IN>
+            implements Function<IN, String>, KafkaDatasetIdentifierProvider, Serializable {
 
         private static final int CACHE_RESET_SIZE = 5;
         private final Map<IN, String> cache;
@@ -303,10 +343,21 @@
             }
             return topic;
         }
+
+        @Override
+        public Optional<DefaultKafkaDatasetIdentifier> getDatasetIdentifier() {
+            if (topicSelector instanceof KafkaDatasetIdentifierProvider) {
+                return ((KafkaDatasetIdentifierProvider) topicSelector).getDatasetIdentifier();
+            } else {
+                return Optional.empty();
+            }
+        }
     }
 
     private static class KafkaRecordSerializationSchemaWrapper<IN>
-            implements KafkaRecordSerializationSchema<IN> {
+            implements KafkaDatasetFacetProvider,
+                    KafkaRecordSerializationSchema<IN>,
+                    TypeDatasetFacetProvider {
         private final SerializationSchema<? super IN> valueSerializationSchema;
         private final Function<? super IN, String> topicSelector;
         private final KafkaPartitioner<? super IN> partitioner;
@@ -369,5 +420,46 @@
                     value,
                     headerProvider != null ? headerProvider.getHeaders(element) : null);
         }
+
+        @Override
+        public Optional<KafkaDatasetFacet> getKafkaDatasetFacet() {
+            if (!(topicSelector instanceof KafkaDatasetIdentifierProvider)) {
+                LOG.info("Cannot identify topics. Not an TopicsIdentifierProvider");
+                return Optional.empty();
+            }
+
+            Optional<DefaultKafkaDatasetIdentifier> topicsIdentifier =
+                    ((KafkaDatasetIdentifierProvider) (topicSelector)).getDatasetIdentifier();
+
+            if (!topicsIdentifier.isPresent()) {
+                LOG.info("No topics' identifiers provided");
+                return Optional.empty();
+            }
+
+            return Optional.of(new DefaultKafkaDatasetFacet(topicsIdentifier.get()));
+        }
+
+        @Override
+        public Optional<TypeDatasetFacet> getTypeDatasetFacet() {
+            if (this.valueSerializationSchema instanceof ResultTypeQueryable) {
+                return Optional.of(
+                        new DefaultTypeDatasetFacet(
+                                ((ResultTypeQueryable<?>) this.valueSerializationSchema)
+                                        .getProducedType()));
+            } else {
+                // gets type information from serialize method signature
+                TypeToken serializationSchemaType =
+                        TypeToken.of(valueSerializationSchema.getClass());
+                Class parameterType =
+                        serializationSchemaType
+                                .resolveType(SerializationSchema.class.getTypeParameters()[0])
+                                .getRawType();
+                if (parameterType != Object.class) {
+                    return Optional.of(
+                            new DefaultTypeDatasetFacet(TypeInformation.of(parameterType)));
+                }
+            }
+            return Optional.empty();
+        }
     }
 }
diff --git a/flink-connector-kafka/src/main/java/org/apache/flink/connector/kafka/sink/KafkaSink.java b/flink-connector-kafka/src/main/java/org/apache/flink/connector/kafka/sink/KafkaSink.java
index d5b1c37..d3d3c89 100644
--- a/flink-connector-kafka/src/main/java/org/apache/flink/connector/kafka/sink/KafkaSink.java
+++ b/flink-connector-kafka/src/main/java/org/apache/flink/connector/kafka/sink/KafkaSink.java
@@ -22,11 +22,22 @@
 import org.apache.flink.annotation.VisibleForTesting;
 import org.apache.flink.api.connector.sink2.Committer;
 import org.apache.flink.connector.base.DeliveryGuarantee;
+import org.apache.flink.connector.kafka.lineage.KafkaDatasetFacet;
+import org.apache.flink.connector.kafka.lineage.KafkaDatasetFacetProvider;
+import org.apache.flink.connector.kafka.lineage.LineageUtil;
+import org.apache.flink.connector.kafka.lineage.TypeDatasetFacet;
+import org.apache.flink.connector.kafka.lineage.TypeDatasetFacetProvider;
 import org.apache.flink.core.io.SimpleVersionedSerializer;
+import org.apache.flink.streaming.api.lineage.LineageVertex;
+import org.apache.flink.streaming.api.lineage.LineageVertexProvider;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 
 import java.io.IOException;
 import java.util.Collection;
 import java.util.Collections;
+import java.util.Optional;
 import java.util.Properties;
 
 /**
@@ -54,8 +65,9 @@
  */
 @PublicEvolving
 public class KafkaSink<IN>
-        implements TwoPhaseCommittingStatefulSink<IN, KafkaWriterState, KafkaCommittable> {
-
+        implements LineageVertexProvider,
+                TwoPhaseCommittingStatefulSink<IN, KafkaWriterState, KafkaCommittable> {
+    private static final Logger LOG = LoggerFactory.getLogger(KafkaSink.class);
     private final DeliveryGuarantee deliveryGuarantee;
 
     private final KafkaRecordSerializationSchema<IN> recordSerializer;
@@ -132,4 +144,42 @@
     protected Properties getKafkaProducerConfig() {
         return kafkaProducerConfig;
     }
+
+    @Override
+    public LineageVertex getLineageVertex() {
+        // enrich dataset facet with properties
+        Optional<KafkaDatasetFacet> kafkaDatasetFacet;
+        if (recordSerializer instanceof KafkaDatasetFacetProvider) {
+            kafkaDatasetFacet =
+                    ((KafkaDatasetFacetProvider) recordSerializer).getKafkaDatasetFacet();
+
+            if (!kafkaDatasetFacet.isPresent()) {
+                LOG.info("Provider did not return kafka dataset facet");
+                return LineageUtil.sourceLineageVertexOf(Collections.emptyList());
+            }
+            kafkaDatasetFacet.get().setProperties(this.kafkaProducerConfig);
+        } else {
+            LOG.info(
+                    "recordSerializer does not implement KafkaDatasetFacetProvider: {}",
+                    recordSerializer);
+            return LineageUtil.lineageVertexOf(Collections.emptyList());
+        }
+
+        String namespace = LineageUtil.namespaceOf(kafkaProducerConfig);
+
+        Optional<TypeDatasetFacet> typeDatasetFacet = Optional.empty();
+        if (recordSerializer instanceof TypeDatasetFacetProvider) {
+            typeDatasetFacet = ((TypeDatasetFacetProvider) recordSerializer).getTypeDatasetFacet();
+        }
+
+        if (typeDatasetFacet.isPresent()) {
+            return LineageUtil.lineageVertexOf(
+                    Collections.singleton(
+                            LineageUtil.datasetOf(
+                                    namespace, kafkaDatasetFacet.get(), typeDatasetFacet.get())));
+        }
+
+        return LineageUtil.lineageVertexOf(
+                Collections.singleton(LineageUtil.datasetOf(namespace, kafkaDatasetFacet.get())));
+    }
 }
diff --git a/flink-connector-kafka/src/main/java/org/apache/flink/connector/kafka/source/KafkaSource.java b/flink-connector-kafka/src/main/java/org/apache/flink/connector/kafka/source/KafkaSource.java
index 54f5f85..3930275 100644
--- a/flink-connector-kafka/src/main/java/org/apache/flink/connector/kafka/source/KafkaSource.java
+++ b/flink-connector-kafka/src/main/java/org/apache/flink/connector/kafka/source/KafkaSource.java
@@ -33,6 +33,11 @@
 import org.apache.flink.configuration.Configuration;
 import org.apache.flink.connector.base.source.reader.RecordsWithSplitIds;
 import org.apache.flink.connector.base.source.reader.synchronization.FutureCompletingBlockingQueue;
+import org.apache.flink.connector.kafka.lineage.DefaultKafkaDatasetFacet;
+import org.apache.flink.connector.kafka.lineage.DefaultKafkaDatasetIdentifier;
+import org.apache.flink.connector.kafka.lineage.DefaultTypeDatasetFacet;
+import org.apache.flink.connector.kafka.lineage.KafkaDatasetIdentifierProvider;
+import org.apache.flink.connector.kafka.lineage.LineageUtil;
 import org.apache.flink.connector.kafka.source.enumerator.KafkaSourceEnumState;
 import org.apache.flink.connector.kafka.source.enumerator.KafkaSourceEnumStateSerializer;
 import org.apache.flink.connector.kafka.source.enumerator.KafkaSourceEnumerator;
@@ -48,15 +53,20 @@
 import org.apache.flink.connector.kafka.source.split.KafkaPartitionSplitSerializer;
 import org.apache.flink.core.io.SimpleVersionedSerializer;
 import org.apache.flink.metrics.MetricGroup;
+import org.apache.flink.streaming.api.lineage.LineageVertexProvider;
+import org.apache.flink.streaming.api.lineage.SourceLineageVertex;
 import org.apache.flink.util.UserCodeClassLoader;
 import org.apache.flink.util.function.SerializableSupplier;
 
 import org.apache.kafka.clients.consumer.ConsumerRecord;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 
 import javax.annotation.Nullable;
 
 import java.io.IOException;
 import java.util.Collection;
+import java.util.Collections;
 import java.util.Optional;
 import java.util.Properties;
 import java.util.function.Consumer;
@@ -87,8 +97,10 @@
  */
 @PublicEvolving
 public class KafkaSource<OUT>
-        implements Source<OUT, KafkaPartitionSplit, KafkaSourceEnumState>,
+        implements LineageVertexProvider,
+                Source<OUT, KafkaPartitionSplit, KafkaSourceEnumState>,
                 ResultTypeQueryable<OUT> {
+    private static final Logger LOG = LoggerFactory.getLogger(KafkaSource.class);
     private static final long serialVersionUID = -8755372893283732098L;
     // Users can choose only one of the following ways to specify the topics to consume from.
     private final KafkaSubscriber subscriber;
@@ -251,4 +263,31 @@
     OffsetsInitializer getStoppingOffsetsInitializer() {
         return stoppingOffsetsInitializer;
     }
+
+    @Override
+    public SourceLineageVertex getLineageVertex() {
+        if (!(subscriber instanceof KafkaDatasetIdentifierProvider)) {
+            LOG.info("unable to determine topic identifier");
+            return LineageUtil.sourceLineageVertexOf(Collections.emptyList());
+        }
+
+        Optional<DefaultKafkaDatasetIdentifier> topicsIdentifier =
+                ((KafkaDatasetIdentifierProvider) subscriber).getDatasetIdentifier();
+
+        if (!topicsIdentifier.isPresent()) {
+            LOG.info("No topics' identifier returned from subscriber");
+            return LineageUtil.sourceLineageVertexOf(Collections.emptyList());
+        }
+
+        DefaultKafkaDatasetFacet kafkaDatasetFacet =
+                new DefaultKafkaDatasetFacet(topicsIdentifier.get(), props);
+
+        String namespace = LineageUtil.namespaceOf(props);
+        return LineageUtil.sourceLineageVertexOf(
+                Collections.singletonList(
+                        LineageUtil.datasetOf(
+                                namespace,
+                                kafkaDatasetFacet,
+                                new DefaultTypeDatasetFacet(getProducedType()))));
+    }
 }
diff --git a/flink-connector-kafka/src/main/java/org/apache/flink/connector/kafka/source/enumerator/subscriber/KafkaSubscriber.java b/flink-connector-kafka/src/main/java/org/apache/flink/connector/kafka/source/enumerator/subscriber/KafkaSubscriber.java
index 1b819fb..37de884 100644
--- a/flink-connector-kafka/src/main/java/org/apache/flink/connector/kafka/source/enumerator/subscriber/KafkaSubscriber.java
+++ b/flink-connector-kafka/src/main/java/org/apache/flink/connector/kafka/source/enumerator/subscriber/KafkaSubscriber.java
@@ -39,6 +39,10 @@
  *
  * <p>The KafkaSubscriber provides a unified interface for the Kafka source to support all these
  * three types of subscribing mode.
+ *
+ * <p>When implementing a subscriber, {@link
+ * org.apache.flink.connector.kafka.lineage.KafkaDatasetIdentifierProvider} can be implemented to
+ * provide lineage metadata containing the source topics.
  */
 @PublicEvolving
 public interface KafkaSubscriber extends Serializable {
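
For illustration only (not part of this patch), a minimal sketch of a custom subscriber that also exposes its topic for lineage; the class name, topic name, and the trivial partition discovery are placeholders:

    import java.util.Collections;
    import java.util.Optional;
    import java.util.Set;

    import org.apache.flink.connector.kafka.lineage.DefaultKafkaDatasetIdentifier;
    import org.apache.flink.connector.kafka.lineage.KafkaDatasetIdentifierProvider;
    import org.apache.flink.connector.kafka.source.enumerator.subscriber.KafkaSubscriber;

    import org.apache.kafka.clients.admin.AdminClient;
    import org.apache.kafka.common.TopicPartition;

    class SingleTopicSubscriber implements KafkaSubscriber, KafkaDatasetIdentifierProvider {

        private final String topic = "orders";

        @Override
        public Set<TopicPartition> getSubscribedTopicPartitions(AdminClient adminClient) {
            // A real subscriber would query the cluster metadata here.
            return Collections.singleton(new TopicPartition(topic, 0));
        }

        @Override
        public Optional<DefaultKafkaDatasetIdentifier> getDatasetIdentifier() {
            return Optional.of(
                    DefaultKafkaDatasetIdentifier.ofTopics(Collections.singletonList(topic)));
        }
    }
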
diff --git a/flink-connector-kafka/src/main/java/org/apache/flink/connector/kafka/source/enumerator/subscriber/PartitionSetSubscriber.java b/flink-connector-kafka/src/main/java/org/apache/flink/connector/kafka/source/enumerator/subscriber/PartitionSetSubscriber.java
index 3423b0f..9cd50fb 100644
--- a/flink-connector-kafka/src/main/java/org/apache/flink/connector/kafka/source/enumerator/subscriber/PartitionSetSubscriber.java
+++ b/flink-connector-kafka/src/main/java/org/apache/flink/connector/kafka/source/enumerator/subscriber/PartitionSetSubscriber.java
@@ -18,6 +18,9 @@
 
 package org.apache.flink.connector.kafka.source.enumerator.subscriber;
 
+import org.apache.flink.connector.kafka.lineage.DefaultKafkaDatasetIdentifier;
+import org.apache.flink.connector.kafka.lineage.KafkaDatasetIdentifierProvider;
+
 import org.apache.kafka.clients.admin.AdminClient;
 import org.apache.kafka.clients.admin.TopicDescription;
 import org.apache.kafka.common.TopicPartition;
@@ -26,13 +29,14 @@
 
 import java.util.HashSet;
 import java.util.Map;
+import java.util.Optional;
 import java.util.Set;
 import java.util.stream.Collectors;
 
 import static org.apache.flink.connector.kafka.source.enumerator.subscriber.KafkaSubscriberUtils.getTopicMetadata;
 
 /** A subscriber for a partition set. */
-class PartitionSetSubscriber implements KafkaSubscriber {
+class PartitionSetSubscriber implements KafkaSubscriber, KafkaDatasetIdentifierProvider {
     private static final long serialVersionUID = 390970375272146036L;
     private static final Logger LOG = LoggerFactory.getLogger(PartitionSetSubscriber.class);
     private final Set<TopicPartition> subscribedPartitions;
@@ -73,4 +77,14 @@
     private boolean partitionExistsInTopic(TopicPartition partition, TopicDescription topic) {
         return topic.partitions().size() > partition.partition();
     }
+
+    @Override
+    public Optional<DefaultKafkaDatasetIdentifier> getDatasetIdentifier() {
+        return Optional.of(
+                DefaultKafkaDatasetIdentifier.ofTopics(
+                        subscribedPartitions.stream()
+                                .map(TopicPartition::topic)
+                                .distinct()
+                                .collect(Collectors.toList())));
+    }
 }
diff --git a/flink-connector-kafka/src/main/java/org/apache/flink/connector/kafka/source/enumerator/subscriber/TopicListSubscriber.java b/flink-connector-kafka/src/main/java/org/apache/flink/connector/kafka/source/enumerator/subscriber/TopicListSubscriber.java
index b2ad844..e86ade0 100644
--- a/flink-connector-kafka/src/main/java/org/apache/flink/connector/kafka/source/enumerator/subscriber/TopicListSubscriber.java
+++ b/flink-connector-kafka/src/main/java/org/apache/flink/connector/kafka/source/enumerator/subscriber/TopicListSubscriber.java
@@ -18,6 +18,9 @@
 
 package org.apache.flink.connector.kafka.source.enumerator.subscriber;
 
+import org.apache.flink.connector.kafka.lineage.DefaultKafkaDatasetIdentifier;
+import org.apache.flink.connector.kafka.lineage.KafkaDatasetIdentifierProvider;
+
 import org.apache.kafka.clients.admin.AdminClient;
 import org.apache.kafka.clients.admin.TopicDescription;
 import org.apache.kafka.common.TopicPartition;
@@ -28,6 +31,7 @@
 import java.util.HashSet;
 import java.util.List;
 import java.util.Map;
+import java.util.Optional;
 import java.util.Set;
 
 import static org.apache.flink.connector.kafka.source.enumerator.subscriber.KafkaSubscriberUtils.getTopicMetadata;
@@ -36,7 +40,7 @@
  * A subscriber to a fixed list of topics. The subscribed topics must have existed in the Kafka
  * cluster, otherwise an exception will be thrown.
  */
-class TopicListSubscriber implements KafkaSubscriber {
+class TopicListSubscriber implements KafkaSubscriber, KafkaDatasetIdentifierProvider {
     private static final long serialVersionUID = -6917603843104947866L;
     private static final Logger LOG = LoggerFactory.getLogger(TopicListSubscriber.class);
     private final List<String> topics;
@@ -60,4 +64,9 @@
 
         return subscribedPartitions;
     }
+
+    @Override
+    public Optional<DefaultKafkaDatasetIdentifier> getDatasetIdentifier() {
+        return Optional.of(DefaultKafkaDatasetIdentifier.ofTopics(topics));
+    }
 }
diff --git a/flink-connector-kafka/src/main/java/org/apache/flink/connector/kafka/source/enumerator/subscriber/TopicPatternSubscriber.java b/flink-connector-kafka/src/main/java/org/apache/flink/connector/kafka/source/enumerator/subscriber/TopicPatternSubscriber.java
index 985ca71..208959e 100644
--- a/flink-connector-kafka/src/main/java/org/apache/flink/connector/kafka/source/enumerator/subscriber/TopicPatternSubscriber.java
+++ b/flink-connector-kafka/src/main/java/org/apache/flink/connector/kafka/source/enumerator/subscriber/TopicPatternSubscriber.java
@@ -18,6 +18,9 @@
 
 package org.apache.flink.connector.kafka.source.enumerator.subscriber;
 
+import org.apache.flink.connector.kafka.lineage.DefaultKafkaDatasetIdentifier;
+import org.apache.flink.connector.kafka.lineage.KafkaDatasetIdentifierProvider;
+
 import org.apache.kafka.clients.admin.AdminClient;
 import org.apache.kafka.clients.admin.TopicDescription;
 import org.apache.kafka.common.TopicPartition;
@@ -27,13 +30,14 @@
 
 import java.util.HashSet;
 import java.util.Map;
+import java.util.Optional;
 import java.util.Set;
 import java.util.regex.Pattern;
 
 import static org.apache.flink.connector.kafka.source.enumerator.subscriber.KafkaSubscriberUtils.getTopicMetadata;
 
 /** A subscriber to a topic pattern. */
-class TopicPatternSubscriber implements KafkaSubscriber {
+class TopicPatternSubscriber implements KafkaSubscriber, KafkaDatasetIdentifierProvider {
     private static final long serialVersionUID = -7471048577725467797L;
     private static final Logger LOG = LoggerFactory.getLogger(TopicPatternSubscriber.class);
     private final Pattern topicPattern;
@@ -60,4 +64,9 @@
 
         return subscribedTopicPartitions;
     }
+
+    @Override
+    public Optional<DefaultKafkaDatasetIdentifier> getDatasetIdentifier() {
+        return Optional.of(DefaultKafkaDatasetIdentifier.ofPattern(topicPattern));
+    }
 }
diff --git a/flink-connector-kafka/src/test/java/org/apache/flink/connector/kafka/lineage/LineageUtilTest.java b/flink-connector-kafka/src/test/java/org/apache/flink/connector/kafka/lineage/LineageUtilTest.java
new file mode 100644
index 0000000..8693998
--- /dev/null
+++ b/flink-connector-kafka/src/test/java/org/apache/flink/connector/kafka/lineage/LineageUtilTest.java
@@ -0,0 +1,74 @@
+package org.apache.flink.connector.kafka.lineage;
+
+import org.apache.flink.api.connector.source.Boundedness;
+import org.apache.flink.streaming.api.lineage.LineageDataset;
+import org.apache.flink.streaming.api.lineage.LineageDatasetFacet;
+import org.apache.flink.streaming.api.lineage.SourceLineageVertex;
+
+import org.junit.jupiter.api.Test;
+
+import java.util.Collections;
+import java.util.Map;
+import java.util.Properties;
+
+import static org.assertj.core.api.Assertions.assertThat;
+
+/** Tests for {@link LineageUtil}. */
+public class LineageUtilTest {
+    @Test
+    public void testSourceLineageVertexOf() {
+        LineageDataset dataset = new TestingLineageDataset();
+        SourceLineageVertex sourceLineageVertex =
+                LineageUtil.sourceLineageVertexOf(Collections.singletonList(dataset));
+
+        assertThat(sourceLineageVertex.boundedness()).isEqualTo(Boundedness.CONTINUOUS_UNBOUNDED);
+        assertThat(sourceLineageVertex.datasets()).containsExactly(dataset);
+    }
+
+    @Test
+    public void testDatasetNamespaceOf() {
+        Properties properties = new Properties();
+        properties.put("bootstrap.servers", "my-kafka-host");
+
+        assertThat(LineageUtil.namespaceOf(properties)).isEqualTo("kafka://my-kafka-host");
+    }
+
+    @Test
+    public void testDatasetNamespaceOfWithSemicolon() {
+        Properties properties = new Properties();
+        properties.put("bootstrap.servers", "my-kafka-host1;my-kafka-host2");
+
+        assertThat(LineageUtil.namespaceOf(properties)).isEqualTo("kafka://my-kafka-host1");
+    }
+
+    @Test
+    public void testDatasetNamespaceOfWithComma() {
+        Properties properties = new Properties();
+        properties.put("bootstrap.servers", "my-kafka-host1,my-kafka-host2");
+
+        assertThat(LineageUtil.namespaceOf(properties)).isEqualTo("kafka://my-kafka-host1");
+    }
+
+    @Test
+    public void testDatasetNamespaceWhenNoBootstrapServersProperty() {
+        Properties properties = new Properties();
+        assertThat(LineageUtil.namespaceOf(properties)).isEqualTo("kafka://");
+    }
+
+    private static class TestingLineageDataset implements LineageDataset {
+        @Override
+        public String name() {
+            return null;
+        }
+
+        @Override
+        public String namespace() {
+            return null;
+        }
+
+        @Override
+        public Map<String, LineageDatasetFacet> facets() {
+            return null;
+        }
+    }
+}
diff --git a/flink-connector-kafka/src/test/java/org/apache/flink/connector/kafka/sink/KafkaRecordSerializationSchemaBuilderTest.java b/flink-connector-kafka/src/test/java/org/apache/flink/connector/kafka/sink/KafkaRecordSerializationSchemaBuilderTest.java
index 701f9c8..4d14372 100644
--- a/flink-connector-kafka/src/test/java/org/apache/flink/connector/kafka/sink/KafkaRecordSerializationSchemaBuilderTest.java
+++ b/flink-connector-kafka/src/test/java/org/apache/flink/connector/kafka/sink/KafkaRecordSerializationSchemaBuilderTest.java
@@ -19,6 +19,15 @@
 
 import org.apache.flink.api.common.serialization.SerializationSchema;
 import org.apache.flink.api.common.serialization.SimpleStringSchema;
+import org.apache.flink.api.common.typeinfo.BasicTypeInfo;
+import org.apache.flink.api.common.typeinfo.TypeInformation;
+import org.apache.flink.api.java.typeutils.ResultTypeQueryable;
+import org.apache.flink.connector.kafka.lineage.DefaultKafkaDatasetIdentifier;
+import org.apache.flink.connector.kafka.lineage.KafkaDatasetFacet;
+import org.apache.flink.connector.kafka.lineage.KafkaDatasetFacetProvider;
+import org.apache.flink.connector.kafka.lineage.KafkaDatasetIdentifierProvider;
+import org.apache.flink.connector.kafka.lineage.TypeDatasetFacet;
+import org.apache.flink.connector.kafka.lineage.TypeDatasetFacetProvider;
 import org.apache.flink.connector.testutils.formats.DummyInitializationContext;
 import org.apache.flink.streaming.connectors.kafka.partitioner.FlinkKafkaPartitioner;
 import org.apache.flink.util.TestLogger;
@@ -31,6 +40,7 @@
 import org.apache.kafka.common.serialization.Deserializer;
 import org.apache.kafka.common.serialization.StringDeserializer;
 import org.apache.kafka.common.serialization.StringSerializer;
+import org.assertj.core.api.InstanceOfAssertFactories;
 import org.junit.Before;
 import org.junit.Test;
 
@@ -40,6 +50,7 @@
 import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
+import java.util.Optional;
 import java.util.concurrent.atomic.AtomicBoolean;
 import java.util.function.Function;
 
@@ -53,6 +64,13 @@
 
     private static Map<String, ?> configurableConfiguration;
     private static Map<String, ?> configuration;
+
+    private interface TestingTopicSelector<T>
+            extends TopicSelector<T>, KafkaDatasetIdentifierProvider {}
+
+    private interface SerializationSchemaWithResultQueryable<T>
+            extends SerializationSchema<T>, ResultTypeQueryable<T> {}
+
     private static boolean isKeySerializer;
 
     @Before
@@ -256,6 +274,134 @@
         assertThat(recordWithInvalidTimestamp.timestamp()).isNull();
     }
 
+    @Test
+    public void testGetLineageDatasetFacetsWhenTopicSelectorNotKafkaTopicsIdentifierProvider() {
+        SerializationSchema<String> serializationSchema = new SimpleStringSchema();
+        KafkaRecordSerializationSchema<String> schema =
+                KafkaRecordSerializationSchema.builder()
+                        .setTopicSelector((TopicSelector<Object>) o -> DEFAULT_TOPIC)
+                        .setValueSerializationSchema(serializationSchema)
+                        .setKeySerializationSchema(serializationSchema)
+                        .build();
+
+        assertThat(schema)
+                .asInstanceOf(InstanceOfAssertFactories.type(KafkaDatasetFacetProvider.class))
+                .returns(Optional.empty(), KafkaDatasetFacetProvider::getKafkaDatasetFacet);
+    }
+
+    @Test
+    public void testGetLineageDatasetFacetsWhenNoTopicsIdentifiersFound() {
+        SerializationSchema<String> serializationSchema = new SimpleStringSchema();
+        KafkaRecordSerializationSchema<String> schema =
+                KafkaRecordSerializationSchema.builder()
+                        .setTopicSelector(
+                                new TestingTopicSelector<Object>() {
+                                    @Override
+                                    public Optional<DefaultKafkaDatasetIdentifier>
+                                            getDatasetIdentifier() {
+                                        return Optional.empty();
+                                    }
+
+                                    @Override
+                                    public String apply(Object o) {
+                                        return DEFAULT_TOPIC;
+                                    }
+                                })
+                        .setValueSerializationSchema(serializationSchema)
+                        .setKeySerializationSchema(serializationSchema)
+                        .build();
+        assertThat(schema)
+                .asInstanceOf(InstanceOfAssertFactories.type(KafkaDatasetFacetProvider.class))
+                .returns(Optional.empty(), KafkaDatasetFacetProvider::getKafkaDatasetFacet);
+    }
+
+    @Test
+    public void testGetLineageDatasetFacetsValueSerializationSchemaIsResultTypeQueryable() {
+        TypeInformation<String> stringTypeInformation = TypeInformation.of(String.class);
+        SerializationSchemaWithResultQueryable<String> serializationSchema =
+                new SerializationSchemaWithResultQueryable<String>() {
+
+                    @Override
+                    public TypeInformation<String> getProducedType() {
+                        return stringTypeInformation;
+                    }
+
+                    @Override
+                    public byte[] serialize(String o) {
+                        return new byte[0];
+                    }
+                };
+
+        KafkaRecordSerializationSchema<String> schema =
+                KafkaRecordSerializationSchema.builder()
+                        .setTopicSelector(
+                                new TestingTopicSelector<Object>() {
+                                    @Override
+                                    public Optional<DefaultKafkaDatasetIdentifier>
+                                            getDatasetIdentifier() {
+                                        return Optional.of(
+                                                DefaultKafkaDatasetIdentifier.ofTopics(
+                                                        Arrays.asList("topic1", "topic2")));
+                                    }
+
+                                    @Override
+                                    public String apply(Object o) {
+                                        return DEFAULT_TOPIC;
+                                    }
+                                })
+                        .setValueSerializationSchema(serializationSchema)
+                        .setKeySerializationSchema(serializationSchema)
+                        .build();
+
+        Optional<KafkaDatasetFacet> kafkaDatasetFacet =
+                ((KafkaDatasetFacetProvider) schema).getKafkaDatasetFacet();
+
+        assertThat(kafkaDatasetFacet).isPresent();
+        assertThat(kafkaDatasetFacet.get().getTopicIdentifier().getTopics())
+                .containsExactly("topic1", "topic2");
+        assertThat(((TypeDatasetFacetProvider) schema).getTypeDatasetFacet())
+                .isPresent()
+                .get()
+                .extracting(TypeDatasetFacet::getTypeInformation)
+                .isEqualTo(stringTypeInformation);
+    }
+
+    @Test
+    public void testGetLineageDatasetFacets() {
+        KafkaRecordSerializationSchema<String> schema =
+                KafkaRecordSerializationSchema.builder()
+                        .setTopicSelector(
+                                new TestingTopicSelector<Object>() {
+                                    @Override
+                                    public Optional<DefaultKafkaDatasetIdentifier>
+                                            getDatasetIdentifier() {
+                                        return Optional.of(
+                                                DefaultKafkaDatasetIdentifier.ofTopics(
+                                                        Arrays.asList("topic1", "topic2")));
+                                    }
+
+                                    @Override
+                                    public String apply(Object o) {
+                                        return DEFAULT_TOPIC;
+                                    }
+                                })
+                        .setValueSerializationSchema(new SimpleStringSchema())
+                        .setKeySerializationSchema(new SimpleStringSchema())
+                        .build();
+
+        Optional<KafkaDatasetFacet> kafkaDatasetFacet =
+                ((KafkaDatasetFacetProvider) schema).getKafkaDatasetFacet();
+
+        assertThat(kafkaDatasetFacet).isPresent();
+        assertThat(kafkaDatasetFacet.get().getTopicIdentifier().getTopics())
+                .containsExactly("topic1", "topic2");
+        assertThat(((TypeDatasetFacetProvider) schema).getTypeDatasetFacet())
+                .isPresent()
+                .get()
+                .extracting(TypeDatasetFacet::getTypeInformation)
+                .isEqualTo(BasicTypeInfo.STRING_TYPE_INFO);
+    }
+
     private static void assertOnlyOneSerializerAllowed(
             List<
                             Function<
diff --git a/flink-connector-kafka/src/test/java/org/apache/flink/connector/kafka/sink/KafkaSinkTest.java b/flink-connector-kafka/src/test/java/org/apache/flink/connector/kafka/sink/KafkaSinkTest.java
new file mode 100644
index 0000000..1efb6ec
--- /dev/null
+++ b/flink-connector-kafka/src/test/java/org/apache/flink/connector/kafka/sink/KafkaSinkTest.java
@@ -0,0 +1,144 @@
+package org.apache.flink.connector.kafka.sink;
+
+import org.apache.flink.api.common.typeinfo.TypeInformation;
+import org.apache.flink.connector.base.DeliveryGuarantee;
+import org.apache.flink.connector.kafka.lineage.DefaultKafkaDatasetFacet;
+import org.apache.flink.connector.kafka.lineage.DefaultKafkaDatasetIdentifier;
+import org.apache.flink.connector.kafka.lineage.DefaultTypeDatasetFacet;
+import org.apache.flink.connector.kafka.lineage.KafkaDatasetFacet;
+import org.apache.flink.connector.kafka.lineage.KafkaDatasetFacetProvider;
+import org.apache.flink.connector.kafka.lineage.TypeDatasetFacet;
+import org.apache.flink.connector.kafka.lineage.TypeDatasetFacetProvider;
+import org.apache.flink.streaming.api.lineage.LineageVertex;
+
+import org.apache.kafka.clients.producer.ProducerRecord;
+import javax.annotation.Nullable;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Test;
+
+import java.util.Collections;
+import java.util.Optional;
+import java.util.Properties;
+
+import static org.assertj.core.api.Assertions.assertThat;
+
+/** Tests for {@link KafkaSink}. */
+public class KafkaSinkTest {
+
+    Properties kafkaProperties;
+
+    @BeforeEach
+    void setup() {
+        kafkaProperties = new Properties();
+        kafkaProperties.put("bootstrap.servers", "host1;host2");
+    }
+
+    @Test
+    public void testGetLineageVertexWhenSerializerNotAnKafkaDatasetFacetProvider() {
+        KafkaRecordSerializationSchema recordSerializer =
+                new KafkaRecordSerializationSchemaWithoutKafkaDatasetProvider();
+        KafkaSink sink =
+                new KafkaSink(
+                        DeliveryGuarantee.EXACTLY_ONCE, new Properties(), "", recordSerializer);
+
+        assertThat(sink.getLineageVertex().datasets()).isEmpty();
+    }
+
+    @Test
+    public void testGetLineageVertexWhenNoKafkaDatasetFacetReturnedFromSerializer() {
+        KafkaRecordSerializationSchema recordSerializer =
+                new KafkaRecordSerializationSchemaWithEmptyKafkaDatasetProvider();
+
+        KafkaSink sink =
+                new KafkaSink(
+                        DeliveryGuarantee.EXACTLY_ONCE, new Properties(), "", recordSerializer);
+
+        assertThat(sink.getLineageVertex().datasets()).isEmpty();
+    }
+
+    @Test
+    public void testGetLineageVertex() {
+        KafkaRecordSerializationSchema recordSerializer =
+                new TestingKafkaRecordSerializationSchema();
+
+        KafkaSink sink =
+                new KafkaSink(
+                        DeliveryGuarantee.EXACTLY_ONCE, kafkaProperties, "", recordSerializer);
+
+        LineageVertex lineageVertex = sink.getLineageVertex();
+
+        assertThat(lineageVertex.datasets().get(0).namespace()).isEqualTo("kafka://host1");
+        assertThat(lineageVertex.datasets().get(0).name()).isEqualTo("topic1");
+
+        assertThat(
+                        lineageVertex
+                                .datasets()
+                                .get(0)
+                                .facets()
+                                .get(DefaultKafkaDatasetFacet.KAFKA_FACET_NAME))
+                .hasFieldOrPropertyWithValue("properties", kafkaProperties)
+                .hasFieldOrPropertyWithValue(
+                        "topicIdentifier",
+                        DefaultKafkaDatasetIdentifier.ofTopics(
+                                Collections.singletonList("topic1")));
+
+        assertThat(
+                        lineageVertex
+                                .datasets()
+                                .get(0)
+                                .facets()
+                                .get(DefaultTypeDatasetFacet.TYPE_FACET_NAME))
+                .hasFieldOrPropertyWithValue("typeInformation", TypeInformation.of(String.class));
+    }
+
+    private static class KafkaRecordSerializationSchemaWithoutKafkaDatasetProvider
+            implements KafkaRecordSerializationSchema {
+        @Nullable
+        @Override
+        public ProducerRecord<byte[], byte[]> serialize(
+                Object element, KafkaSinkContext context, Long timestamp) {
+            return null;
+        }
+    }
+
+    private static class KafkaRecordSerializationSchemaWithEmptyKafkaDatasetProvider
+            implements KafkaRecordSerializationSchema, KafkaDatasetFacetProvider {
+        @Nullable
+        @Override
+        public ProducerRecord<byte[], byte[]> serialize(
+                Object element, KafkaSinkContext context, Long timestamp) {
+            return null;
+        }
+
+        @Override
+        public Optional<KafkaDatasetFacet> getKafkaDatasetFacet() {
+            return Optional.empty();
+        }
+    }
+
+    private static class TestingKafkaRecordSerializationSchema
+            implements KafkaRecordSerializationSchema,
+                    KafkaDatasetFacetProvider,
+                    TypeDatasetFacetProvider {
+
+        @Override
+        public Optional<KafkaDatasetFacet> getKafkaDatasetFacet() {
+            return Optional.of(
+                    new DefaultKafkaDatasetFacet(
+                            DefaultKafkaDatasetIdentifier.ofTopics(
+                                    Collections.singletonList("topic1"))));
+        }
+
+        @Nullable
+        @Override
+        public ProducerRecord<byte[], byte[]> serialize(
+                Object element, KafkaSinkContext context, Long timestamp) {
+            return null;
+        }
+
+        @Override
+        public Optional<TypeDatasetFacet> getTypeDatasetFacet() {
+            return Optional.of(new DefaultTypeDatasetFacet(TypeInformation.of(String.class)));
+        }
+    }
+}
diff --git a/flink-connector-kafka/src/test/java/org/apache/flink/connector/kafka/source/KafkaSourceTest.java b/flink-connector-kafka/src/test/java/org/apache/flink/connector/kafka/source/KafkaSourceTest.java
new file mode 100644
index 0000000..259668c
--- /dev/null
+++ b/flink-connector-kafka/src/test/java/org/apache/flink/connector/kafka/source/KafkaSourceTest.java
@@ -0,0 +1,183 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.connector.kafka.source;
+
+import org.apache.flink.api.common.typeinfo.TypeInformation;
+import org.apache.flink.connector.kafka.lineage.DefaultKafkaDatasetFacet;
+import org.apache.flink.connector.kafka.lineage.DefaultKafkaDatasetIdentifier;
+import org.apache.flink.connector.kafka.lineage.DefaultTypeDatasetFacet;
+import org.apache.flink.connector.kafka.lineage.KafkaDatasetIdentifierProvider;
+import org.apache.flink.connector.kafka.source.enumerator.initializer.OffsetsInitializer;
+import org.apache.flink.connector.kafka.source.enumerator.subscriber.KafkaSubscriber;
+import org.apache.flink.connector.kafka.source.reader.deserializer.KafkaRecordDeserializationSchema;
+import org.apache.flink.streaming.api.lineage.LineageDataset;
+import org.apache.flink.streaming.api.lineage.LineageVertex;
+import org.apache.flink.util.Collector;
+
+import org.apache.kafka.clients.admin.AdminClient;
+import org.apache.kafka.clients.consumer.ConsumerRecord;
+import org.apache.kafka.common.TopicPartition;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Test;
+
+import java.io.IOException;
+import java.util.Collections;
+import java.util.Optional;
+import java.util.Properties;
+import java.util.Set;
+
+import static org.assertj.core.api.Assertions.assertThat;
+
+/** Tests for {@link KafkaSource}. */
+public class KafkaSourceTest {
+    Properties kafkaProperties;
+
+    @BeforeEach
+    void setup() {
+        kafkaProperties = new Properties();
+        kafkaProperties.put("bootstrap.servers", "host1;host2");
+    }
+
+    @Test
+    public void testGetLineageVertexWhenSubscriberNotAKafkaDatasetIdentifierProvider() {
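+        // A plain KafkaSubscriber that does not implement KafkaDatasetIdentifierProvider should
+        // yield no lineage datasets.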
+        KafkaSource<String> source =
+                new KafkaSourceBuilder<String>()
+                        .setKafkaSubscriber(
+                                new KafkaSubscriber() {
+                                    @Override
+                                    public Set<TopicPartition> getSubscribedTopicPartitions(
+                                            AdminClient adminClient) {
+                                        return null;
+                                    }
+                                })
+                        .setProperties(kafkaProperties)
+                        .setGroupId("")
+                        .setDeserializer(
+                                new KafkaRecordDeserializationSchema<String>() {
+                                    @Override
+                                    public TypeInformation<String> getProducedType() {
+                                        return null;
+                                    }
+
+                                    @Override
+                                    public void deserialize(
+                                            ConsumerRecord<byte[], byte[]> record,
+                                            Collector<String> out)
+                                            throws IOException {}
+                                })
+                        .setUnbounded(OffsetsInitializer.committedOffsets())
+                        .build();
+
+        assertThat(source.getLineageVertex().datasets()).isEmpty();
+    }
+
+    @Test
+    public void testGetLineageVertexWhenNoKafkaTopicsIdentifier() {
+        KafkaSource<String> source =
+                new KafkaSourceBuilder<String>()
+                        .setKafkaSubscriber(
+                                new TestingKafkaSubscriber() {
+                                    @Override
+                                    public Optional<DefaultKafkaDatasetIdentifier>
+                                            getDatasetIdentifier() {
+                                        return Optional.empty();
+                                    }
+                                })
+                        .setProperties(kafkaProperties)
+                        .setGroupId("")
+                        .setDeserializer(
+                                new KafkaRecordDeserializationSchema<String>() {
+                                    @Override
+                                    public void deserialize(
+                                            ConsumerRecord<byte[], byte[]> record,
+                                            Collector<String> out)
+                                            throws IOException {}
+
+                                    @Override
+                                    public TypeInformation<String> getProducedType() {
+                                        return TypeInformation.of(String.class);
+                                    }
+                                })
+                        .setUnbounded(OffsetsInitializer.committedOffsets())
+                        .build();
+        assertThat(source.getLineageVertex().datasets()).isEmpty();
+    }
+
+    @Test
+    public void testGetLineageVertex() {
+        TypeInformation<String> typeInformation = TypeInformation.of(String.class);
+        KafkaSource<String> source =
+                new KafkaSourceBuilder<String>()
+                        .setKafkaSubscriber(new TestingKafkaSubscriber())
+                        .setProperties(kafkaProperties)
+                        .setGroupId("")
+                        .setDeserializer(
+                                new KafkaRecordDeserializationSchema<String>() {
+                                    @Override
+                                    public void deserialize(
+                                            ConsumerRecord<byte[], byte[]> record,
+                                            Collector<String> out)
+                                            throws IOException {}
+
+                                    @Override
+                                    public TypeInformation<String> getProducedType() {
+                                        return typeInformation;
+                                    }
+                                })
+                        .setUnbounded(OffsetsInitializer.committedOffsets())
+                        .build();
+
+        LineageVertex lineageVertex = source.getLineageVertex();
+        assertThat(lineageVertex.datasets()).hasSize(1);
+        LineageDataset dataset = lineageVertex.datasets().get(0);
+
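+        // As in the sink test, the namespace appears to be built from the first bootstrap
+        // server and the dataset name from the subscribed topic.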
+        assertThat(dataset.namespace()).isEqualTo("kafka://host1");
+        assertThat(dataset.name()).isEqualTo("topic1");
+
+        assertThat(dataset.facets()).containsKey(DefaultKafkaDatasetFacet.KAFKA_FACET_NAME);
+        DefaultKafkaDatasetFacet kafkaFacet =
+                (DefaultKafkaDatasetFacet)
+                        dataset.facets().get(DefaultKafkaDatasetFacet.KAFKA_FACET_NAME);
+
+        assertThat(kafkaFacet.getProperties()).containsEntry("bootstrap.servers", "host1;host2");
+
+        assertThat(dataset.facets()).containsKey(DefaultTypeDatasetFacet.TYPE_FACET_NAME);
+        assertThat(dataset.facets().get(DefaultTypeDatasetFacet.TYPE_FACET_NAME))
+                .hasFieldOrPropertyWithValue("typeInformation", TypeInformation.of(String.class));
+    }
+
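+    /** Subscriber that exposes a fixed-topic dataset identifier ("topic1") for lineage. */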
+    private static class TestingKafkaSubscriber
+            implements KafkaSubscriber, KafkaDatasetIdentifierProvider {
+        @Override
+        public Optional<DefaultKafkaDatasetIdentifier> getDatasetIdentifier() {
+            return Optional.of(
+                    DefaultKafkaDatasetIdentifier.ofTopics(Collections.singletonList("topic1")));
+        }
+
+        @Override
+        public Set<TopicPartition> getSubscribedTopicPartitions(AdminClient adminClient) {
+            return null;
+        }
+    }
+}
diff --git a/flink-connector-kafka/src/test/java/org/apache/flink/connector/kafka/source/enumerator/subscriber/KafkaSubscriberTest.java b/flink-connector-kafka/src/test/java/org/apache/flink/connector/kafka/source/enumerator/subscriber/KafkaSubscriberTest.java
index 258c1c0..4c5a502 100644
--- a/flink-connector-kafka/src/test/java/org/apache/flink/connector/kafka/source/enumerator/subscriber/KafkaSubscriberTest.java
+++ b/flink-connector-kafka/src/test/java/org/apache/flink/connector/kafka/source/enumerator/subscriber/KafkaSubscriberTest.java
@@ -18,6 +18,8 @@
 
 package org.apache.flink.connector.kafka.source.enumerator.subscriber;
 
+import org.apache.flink.connector.kafka.lineage.DefaultKafkaDatasetIdentifier;
+import org.apache.flink.connector.kafka.lineage.KafkaDatasetIdentifierProvider;
 import org.apache.flink.connector.kafka.testutils.KafkaSourceTestEnv;
 
 import org.apache.kafka.clients.admin.AdminClient;
@@ -71,6 +73,8 @@
                 new HashSet<>(KafkaSourceTestEnv.getPartitionsForTopics(topics));
 
         assertThat(subscribedPartitions).isEqualTo(expectedSubscribedPartitions);
+        assertThat(((KafkaDatasetIdentifierProvider) subscriber).getDatasetIdentifier().get())
+                .isEqualTo(DefaultKafkaDatasetIdentifier.ofTopics(topics));
     }
 
     @Test
@@ -86,8 +90,8 @@
 
     @Test
     public void testTopicPatternSubscriber() {
-        KafkaSubscriber subscriber =
-                KafkaSubscriber.getTopicPatternSubscriber(Pattern.compile("pattern.*"));
+        Pattern pattern = Pattern.compile("pattern.*");
+        KafkaSubscriber subscriber = KafkaSubscriber.getTopicPatternSubscriber(pattern);
         final Set<TopicPartition> subscribedPartitions =
                 subscriber.getSubscribedTopicPartitions(adminClient);
 
@@ -96,6 +100,8 @@
                         KafkaSourceTestEnv.getPartitionsForTopics(Collections.singleton(TOPIC2)));
 
         assertThat(subscribedPartitions).isEqualTo(expectedSubscribedPartitions);
+        assertThat(((KafkaDatasetIdentifierProvider) subscriber).getDatasetIdentifier().get())
+                .isEqualTo(DefaultKafkaDatasetIdentifier.ofPattern(pattern));
     }
 
     @Test
@@ -111,6 +117,8 @@
                 subscriber.getSubscribedTopicPartitions(adminClient);
 
         assertThat(subscribedPartitions).isEqualTo(partitions);
+        assertThat(((KafkaDatasetIdentifierProvider) subscriber).getDatasetIdentifier().get())
+                .isEqualTo(DefaultKafkaDatasetIdentifier.ofTopics(topics));
     }
 
     @Test