Kafka with topicPattern can ignore old offsets spuriously (#16190)

* * fix

* * simplify

* * simplify tests

* * update matches function definition for Kafka Datasource Metadata

* * add matchesOld

* * override matches and plus for kafka based metadata / sequence numbers

* * implement minus
* add tests

* * fix failing tests

* * remove TODO comments

* * simplify and add comments

* * remove unused variable in tests

* * remove unneeded function

* * add serde tests

* * more stuff

* * address review comments

* * remove unneeded code.
diff --git a/extensions-core/kafka-indexing-service/src/main/java/org/apache/druid/indexing/kafka/KafkaDataSourceMetadata.java b/extensions-core/kafka-indexing-service/src/main/java/org/apache/druid/indexing/kafka/KafkaDataSourceMetadata.java
index a7b73f4..cfa3080 100644
--- a/extensions-core/kafka-indexing-service/src/main/java/org/apache/druid/indexing/kafka/KafkaDataSourceMetadata.java
+++ b/extensions-core/kafka-indexing-service/src/main/java/org/apache/druid/indexing/kafka/KafkaDataSourceMetadata.java
@@ -26,19 +26,41 @@
 import org.apache.druid.indexing.seekablestream.SeekableStreamDataSourceMetadata;
 import org.apache.druid.indexing.seekablestream.SeekableStreamEndSequenceNumbers;
 import org.apache.druid.indexing.seekablestream.SeekableStreamSequenceNumbers;
+import org.apache.druid.indexing.seekablestream.SeekableStreamStartSequenceNumbers;
 import org.apache.druid.java.util.common.IAE;
+import org.apache.druid.java.util.common.logger.Logger;
+import org.apache.druid.utils.CollectionUtils;
+import org.apache.kafka.common.TopicPartition;
 
 import java.util.Comparator;
+import java.util.Map;
 
 public class KafkaDataSourceMetadata extends SeekableStreamDataSourceMetadata<KafkaTopicPartition, Long> implements Comparable<KafkaDataSourceMetadata>
 {
+  private static final Logger LOGGER = new Logger(KafkaDataSourceMetadata.class);
 
   @JsonCreator
   public KafkaDataSourceMetadata(
       @JsonProperty("partitions") SeekableStreamSequenceNumbers<KafkaTopicPartition, Long> kafkaPartitions
   )
   {
-    super(kafkaPartitions);
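+    // Wrap the given sequence numbers in the Kafka-specific subclasses so that plus(), minus(),
+    // and matches() handle multi-topic (topicPattern) metadata correctly.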
+    super(kafkaPartitions == null
+        ? null
+        : kafkaPartitions instanceof SeekableStreamStartSequenceNumbers
+            ?
+            new KafkaSeekableStreamStartSequenceNumbers(
+                kafkaPartitions.getStream(),
+                ((SeekableStreamStartSequenceNumbers<KafkaTopicPartition, Long>) kafkaPartitions).getTopic(),
+                kafkaPartitions.getPartitionSequenceNumberMap(),
+                ((SeekableStreamStartSequenceNumbers<KafkaTopicPartition, Long>) kafkaPartitions).getPartitionOffsetMap(),
+                ((SeekableStreamStartSequenceNumbers<KafkaTopicPartition, Long>) kafkaPartitions).getExclusivePartitions()
+            )
+            : new KafkaSeekableStreamEndSequenceNumbers(
+                kafkaPartitions.getStream(),
+                ((SeekableStreamEndSequenceNumbers<KafkaTopicPartition, Long>) kafkaPartitions).getTopic(),
+                kafkaPartitions.getPartitionSequenceNumberMap(),
+                ((SeekableStreamEndSequenceNumbers<KafkaTopicPartition, Long>) kafkaPartitions).getPartitionOffsetMap()
+            ));
   }
 
   @Override
@@ -76,4 +98,71 @@
     }
     return getSeekableStreamSequenceNumbers().compareTo(other.getSeekableStreamSequenceNumbers(), Comparator.naturalOrder());
   }
+
+  @Override
+  public boolean matches(DataSourceMetadata other)
+  {
+    if (!getClass().equals(other.getClass())) {
+      return false;
+    }
+    KafkaDataSourceMetadata thisPlusOther = (KafkaDataSourceMetadata) plus(other);
+    if (thisPlusOther.equals(other.plus(this))) {
+      return true;
+    }
+
+    // check that thisPlusOther contains all metadata from other, and that there is no inconsistency or loss
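+    // e.g. stored single-topic metadata {foo: 0 -> 2} and computed multi-topic metadata
+    // {foo.*: foo-0 -> 2} merge to the same per-partition offsets in either order, so they match;
+    // if merging in either direction dropped or changed an offset, they would not.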
+    KafkaDataSourceMetadata otherMetadata = (KafkaDataSourceMetadata) other;
+    final SeekableStreamSequenceNumbers<KafkaTopicPartition, Long> otherSequenceNumbers = otherMetadata.getSeekableStreamSequenceNumbers();
+    if (!getSeekableStreamSequenceNumbers().isMultiTopicPartition() && !otherSequenceNumbers.isMultiTopicPartition()) {
+      return false;
+    }
+    final SeekableStreamSequenceNumbers<KafkaTopicPartition, Long> mergedSequenceNumbers = thisPlusOther.getSeekableStreamSequenceNumbers();
+
+    final Map<TopicPartition, Long> topicAndPartitionToSequenceNumber = CollectionUtils.mapKeys(
+        mergedSequenceNumbers.getPartitionSequenceNumberMap(),
+        k -> k.asTopicPartition(mergedSequenceNumbers.getStream())
+    );
+
+    boolean allOtherFoundAndConsistent = otherSequenceNumbers.getPartitionSequenceNumberMap().entrySet().stream().noneMatch(
+        e -> {
+          KafkaTopicPartition kafkaTopicPartition = e.getKey();
+          TopicPartition topicPartition = kafkaTopicPartition.asTopicPartition(otherSequenceNumbers.getStream());
+          Long sequenceOffset = topicAndPartitionToSequenceNumber.get(topicPartition);
+          long oldSequenceOffset = e.getValue();
+          if (sequenceOffset == null || !sequenceOffset.equals(oldSequenceOffset)) {
+            LOGGER.info(
+                "sequenceOffset found for currently computed and stored metadata does not match for "
+                + "topicPartition: [%s].  currentSequenceOffset: [%s], oldSequenceOffset: [%s]",
+                topicPartition,
+                sequenceOffset,
+                oldSequenceOffset
+            );
+            return true;
+          }
+          return false;
+        }
+    );
+
+    boolean allThisFoundAndConsistent = this.getSeekableStreamSequenceNumbers().getPartitionSequenceNumberMap().entrySet().stream().noneMatch(
+        e -> {
+          KafkaTopicPartition kafkaTopicPartition = e.getKey();
+          TopicPartition topicPartition = kafkaTopicPartition.asTopicPartition(this.getSeekableStreamSequenceNumbers().getStream());
+          Long oldSequenceOffset = topicAndPartitionToSequenceNumber.get(topicPartition);
+          long sequenceOffset = e.getValue();
+          if (oldSequenceOffset == null || !oldSequenceOffset.equals(sequenceOffset)) {
+            LOGGER.info(
+                "sequenceOffset found for currently computed and stored metadata does not match for "
+                + "topicPartition: [%s].  currentSequenceOffset: [%s], oldSequenceOffset: [%s]",
+                topicPartition,
+                sequenceOffset,
+                oldSequenceOffset
+            );
+            return true;
+          }
+          return false;
+        }
+    );
+
+    return allOtherFoundAndConsistent && allThisFoundAndConsistent;
+  }
 }
diff --git a/extensions-core/kafka-indexing-service/src/main/java/org/apache/druid/indexing/kafka/KafkaSeekableStreamEndSequenceNumbers.java b/extensions-core/kafka-indexing-service/src/main/java/org/apache/druid/indexing/kafka/KafkaSeekableStreamEndSequenceNumbers.java
new file mode 100644
index 0000000..5476f88
--- /dev/null
+++ b/extensions-core/kafka-indexing-service/src/main/java/org/apache/druid/indexing/kafka/KafkaSeekableStreamEndSequenceNumbers.java
@@ -0,0 +1,179 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.druid.indexing.kafka;
+
+import com.fasterxml.jackson.annotation.JsonTypeName;
+import org.apache.druid.data.input.kafka.KafkaTopicPartition;
+import org.apache.druid.indexing.seekablestream.SeekableStreamEndSequenceNumbers;
+import org.apache.druid.indexing.seekablestream.SeekableStreamSequenceNumbers;
+import org.apache.druid.utils.CollectionUtils;
+
+import java.util.HashMap;
+import java.util.Map;
+import java.util.regex.Pattern;
+import java.util.stream.Collectors;
+
+/**
+ * Represents the Kafka-based end sequenceNumber per partition of a sequence. This class is needed because
+ * of special handling that must be done for multi-topic partitions to ensure that offsets are preserved.
+ * <p>
+ * Do not register this class as a Jackson subtype of the base class. We want this class to be serialized,
+ * when written to the DB, as a {@link SeekableStreamEndSequenceNumbers}. Do not create instances of this class
+ * directly via the Jackson mapper.
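+ * e.g. since this class carries the base class's {@code @JsonTypeName}, serializing an instance is
+ * expected to produce the same JSON as the equivalent {@link SeekableStreamEndSequenceNumbers}.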
+ */
+@JsonTypeName(SeekableStreamEndSequenceNumbers.TYPE)
+public class KafkaSeekableStreamEndSequenceNumbers extends SeekableStreamEndSequenceNumbers<KafkaTopicPartition, Long>
+{
+  private final boolean isMultiTopicPartition;
+
+  public KafkaSeekableStreamEndSequenceNumbers(
+      final String stream,
+      // kept for backward compatibility
+      final String topic,
+      final Map<KafkaTopicPartition, Long> partitionSequenceNumberMap,
+      // kept for backward compatibility
+      final Map<KafkaTopicPartition, Long> partitionOffsetMap
+  )
+  {
+    super(stream, topic, partitionSequenceNumberMap, partitionOffsetMap);
+    // if the partitionSequenceNumberMap is empty we cannot tell whether this is a topicPattern, so default to single-topic.
+    isMultiTopicPartition = !partitionSequenceNumberMap.isEmpty() && partitionSequenceNumberMap.keySet()
+        .stream()
+        .findFirst()
+        .get()
+        .isMultiTopicPartition();
+  }
+
+  @Override
+  public boolean isMultiTopicPartition()
+  {
+    return isMultiTopicPartition;
+  }
+
+  @Override
+  public SeekableStreamSequenceNumbers<KafkaTopicPartition, Long> plus(
+      SeekableStreamSequenceNumbers<KafkaTopicPartition, Long> other
+  )
+  {
+    validateSequenceNumbersBaseType(other);
+
+    KafkaSeekableStreamEndSequenceNumbers that = (KafkaSeekableStreamEndSequenceNumbers) other;
+
+    if (!this.isMultiTopicPartition() && !that.isMultiTopicPartition()) {
+      return super.plus(other);
+    }
+
+    String thisTopic = getStream();
+    String thatTopic = that.getStream();
+    final Map<KafkaTopicPartition, Long> newMap;
+    if (!isMultiTopicPartition()) {
+      // going from topicPattern to single topic
+
+      // start with existing sequence numbers which in this case will be all single topic.
+      newMap = new HashMap<>(getPartitionSequenceNumberMap());
+
+      // add all sequence numbers from other where the topic name matches this topic. Transform to single topic
+      // as in this case we will be returning a single topic based sequence map.
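+      // e.g. merging {foo.*: foo-0 -> 2, foo2-0 -> 3} into {foo: 1 -> 4} filters out foo2-0 and
+      // yields {foo: 0 -> 2, 1 -> 4}.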
+      newMap.putAll(that.getPartitionSequenceNumberMap().entrySet().stream()
+          .filter(e -> {
+            if (e.getKey().topic().isPresent()) {
+              return e.getKey().topic().get().equals(thisTopic);
+            } else {
+              // this branch shouldn't really be hit since other should be multi-topic here, but adding this
+              // just in case.
+              return thatTopic.equals(thisTopic);
+            }
+          })
+          .collect(Collectors.toMap(
+              e -> new KafkaTopicPartition(false, thisTopic, e.getKey().partition()),
+              Map.Entry::getValue
+          )));
+    } else {
+      // going from single topic or topicPattern to topicPattern
+
+      // start with existing sequence numbers and transform them to multi-topic keys, as the returned
+      // sequence numbers will be multi-topic based.
+      newMap = CollectionUtils.mapKeys(
+          getPartitionSequenceNumberMap(),
+          k -> new KafkaTopicPartition(
+              true,
+              k.asTopicPartition(thisTopic).topic(),
+              k.partition()
+          )
+      );
+
+      // add all sequence numbers from other where the topic name matches the pattern of this topic regex. Transform to
+      // multi-topic as in this case we will be returning a multi-topic based sequence map.
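+      // e.g. merging {foo2: 0 -> 3} into {foo.*: foo-0 -> 2} yields {foo.*: foo-0 -> 2, foo2-0 -> 3},
+      // since "foo2" matches the pattern "foo.*".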
+      Pattern pattern = Pattern.compile(thisTopic);
+      newMap.putAll(that.getPartitionSequenceNumberMap().entrySet().stream()
+          .filter(e -> {
+            if (e.getKey().topic().isPresent()) {
+              return pattern.matcher(e.getKey().topic().get()).matches();
+            } else {
+              return pattern.matcher(thatTopic).matches();
+            }
+          })
+          .collect(Collectors.toMap(
+              e -> new KafkaTopicPartition(true, e.getKey().asTopicPartition(thatTopic).topic(), e.getKey().partition()),
+              Map.Entry::getValue
+          )));
+    }
+
+    return new SeekableStreamEndSequenceNumbers<>(getStream(), newMap);
+  }
+
+  @Override
+  public SeekableStreamSequenceNumbers<KafkaTopicPartition, Long> minus(
+      SeekableStreamSequenceNumbers<KafkaTopicPartition, Long> other
+  )
+  {
+    validateSequenceNumbersBaseType(other);
+
+    final KafkaSeekableStreamEndSequenceNumbers otherEnd =
+        (KafkaSeekableStreamEndSequenceNumbers) other;
+
+    if (!this.isMultiTopicPartition() && !otherEnd.isMultiTopicPartition()) {
+      return super.minus(other);
+    }
+
+    final Map<KafkaTopicPartition, Long> newMap = new HashMap<>();
+    String thatTopic = otherEnd.getStream();
+
+    // remove partitions present in "that" from "this", check for exact match, multi-topic match, or single-topic match
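+    // e.g. {foo.*: foo-0 -> 2, foo2-0 -> 3} minus {foo: 0 -> 5} drops foo-0 (single-topic match)
+    // and keeps foo2-0 -> 3.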
+    for (Map.Entry<KafkaTopicPartition, Long> entry : getPartitionSequenceNumberMap().entrySet()) {
+      String thisTopic = entry.getKey().asTopicPartition(getStream()).topic();
+      boolean otherContainsThis = otherEnd.getPartitionSequenceNumberMap().containsKey(entry.getKey());
+      boolean otherContainsThisMultiTopic = otherEnd.getPartitionSequenceNumberMap()
+          .containsKey(new KafkaTopicPartition(true, thisTopic, entry.getKey().partition()));
+      boolean otherContainsThisSingleTopic = (thatTopic.equals(thisTopic) && otherEnd.getPartitionSequenceNumberMap()
+          .containsKey(new KafkaTopicPartition(false, null, entry.getKey().partition())));
+      if (!otherContainsThis && !otherContainsThisMultiTopic && !otherContainsThisSingleTopic) {
+        newMap.put(entry.getKey(), entry.getValue());
+      }
+    }
+
+    return new SeekableStreamEndSequenceNumbers<>(
+        getStream(),
+        newMap
+    );
+  }
+}
diff --git a/extensions-core/kafka-indexing-service/src/main/java/org/apache/druid/indexing/kafka/KafkaSeekableStreamStartSequenceNumbers.java b/extensions-core/kafka-indexing-service/src/main/java/org/apache/druid/indexing/kafka/KafkaSeekableStreamStartSequenceNumbers.java
new file mode 100644
index 0000000..24a3e08
--- /dev/null
+++ b/extensions-core/kafka-indexing-service/src/main/java/org/apache/druid/indexing/kafka/KafkaSeekableStreamStartSequenceNumbers.java
@@ -0,0 +1,214 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.druid.indexing.kafka;
+
+import com.fasterxml.jackson.annotation.JsonTypeName;
+import org.apache.druid.data.input.kafka.KafkaTopicPartition;
+import org.apache.druid.indexing.seekablestream.SeekableStreamSequenceNumbers;
+import org.apache.druid.indexing.seekablestream.SeekableStreamStartSequenceNumbers;
+import org.apache.druid.utils.CollectionUtils;
+
+import javax.annotation.Nullable;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.Map;
+import java.util.Set;
+import java.util.regex.Pattern;
+import java.util.stream.Collectors;
+
+/**
+ * Represents the Kafka-based start sequenceNumber per partition of a sequence. This class is needed because
+ * of special handling that must be done for multi-topic partitions to ensure that offsets are preserved.
+ * <p>
+ * Do not register this class as a Jackson subtype of the base class. We want this class to be serialized,
+ * when written to the DB, as a {@link SeekableStreamStartSequenceNumbers}. Do not create instances of this class
+ * directly via the Jackson mapper.
+ */
+@JsonTypeName(SeekableStreamStartSequenceNumbers.TYPE)
+public class KafkaSeekableStreamStartSequenceNumbers extends SeekableStreamStartSequenceNumbers<KafkaTopicPartition, Long>
+{
+  private final boolean isMultiTopicPartition;
+
+  public KafkaSeekableStreamStartSequenceNumbers(
+      String stream,
+      String topic,
+      Map<KafkaTopicPartition, Long> partitionSequenceNumberMap,
+      Map<KafkaTopicPartition, Long> partitionOffsetMap,
+      @Nullable Set<KafkaTopicPartition> exclusivePartitions
+  )
+  {
+    super(stream, topic, partitionSequenceNumberMap, partitionOffsetMap, exclusivePartitions);
+    // if the partitionSequenceNumberMap is empty we cannot tell whether this is a topicPattern, so default to single-topic.
+    isMultiTopicPartition = !partitionSequenceNumberMap.isEmpty() && partitionSequenceNumberMap.keySet()
+        .stream()
+        .findFirst()
+        .get()
+        .isMultiTopicPartition();
+  }
+
+  @Override
+  public boolean isMultiTopicPartition()
+  {
+    return isMultiTopicPartition;
+  }
+
+  @Override
+  public SeekableStreamSequenceNumbers<KafkaTopicPartition, Long> plus(
+      SeekableStreamSequenceNumbers<KafkaTopicPartition, Long> other
+  )
+  {
+    validateSequenceNumbersBaseType(other);
+
+    KafkaSeekableStreamStartSequenceNumbers that = (KafkaSeekableStreamStartSequenceNumbers) other;
+
+    if (!this.isMultiTopicPartition() && !that.isMultiTopicPartition()) {
+      return super.plus(other);
+    }
+
+    String thisTopic = getStream();
+    String thatTopic = that.getStream();
+    final Map<KafkaTopicPartition, Long> newMap;
+    final Set<KafkaTopicPartition> newExclusivePartitions = new HashSet<>();
+    if (!isMultiTopicPartition()) {
+      // going from topicPattern to single topic
+
+      // start with existing sequence numbers which in this case will be all single topic.
+      newMap = new HashMap<>(getPartitionSequenceNumberMap());
+
+      // add all sequence numbers from other where the topic name matches this topic. Transform to single topic
+      // as in this case we will be returning a single topic based sequence map.
+      newMap.putAll(that.getPartitionSequenceNumberMap().entrySet().stream()
+          .filter(e -> {
+            if (e.getKey().topic().isPresent()) {
+              return e.getKey().topic().get().equals(thisTopic);
+            } else {
+              // this branch shouldn't really be hit since other should be multi-topic here, but adding this
+              // just in case.
+              return thatTopic.equals(thisTopic);
+            }
+          })
+          .collect(Collectors.toMap(
+              e -> new KafkaTopicPartition(false, thisTopic, e.getKey().partition()),
+              Map.Entry::getValue
+          )));
+
+      // A partition is exclusive if it's
+      // 1) exclusive in "this" and it's not in "other"'s partitionSequenceNumberMap or
+      // 2) exclusive in "other"
+      getPartitionSequenceNumberMap().forEach(
+          (partitionId, sequenceOffset) -> {
+            KafkaTopicPartition multiTopicPartitionIdToSearch = new KafkaTopicPartition(true, thisTopic, partitionId.partition());
+            if (getExclusivePartitions().contains(partitionId) && !that.getPartitionSequenceNumberMap().containsKey(multiTopicPartitionIdToSearch)) {
+              newExclusivePartitions.add(new KafkaTopicPartition(false, this.getStream(), partitionId.partition()));
+            }
+          }
+      );
+      newExclusivePartitions.addAll(that.getExclusivePartitions());
+    } else {
+      // going from single topic or topicPattern to topicPattern
+
+      // start with existing sequence numbers and transform them to multi-topic keys, as the returned
+      // sequence numbers will be multi-topic based.
+      newMap = CollectionUtils.mapKeys(
+          getPartitionSequenceNumberMap(),
+          k -> new KafkaTopicPartition(
+              true,
+              k.asTopicPartition(thisTopic).topic(),
+              k.partition()
+          )
+      );
+
+      // add all sequence numbers from other where the topic name matches the pattern of this topic regex. Transform to
+      // multi-topic as in this case we will be returning a multi-topic based sequence map.
+      Pattern pattern = Pattern.compile(thisTopic);
+      newMap.putAll(that.getPartitionSequenceNumberMap().entrySet().stream()
+          .filter(e -> {
+            if (e.getKey().topic().isPresent()) {
+              return pattern.matcher(e.getKey().topic().get()).matches();
+            } else {
+              return pattern.matcher(thatTopic).matches();
+            }
+          })
+          .collect(Collectors.toMap(
+              e -> new KafkaTopicPartition(true, e.getKey().asTopicPartition(thatTopic).topic(), e.getKey().partition()),
+              Map.Entry::getValue
+          )));
+
+      // A partition is exclusive if it's
+      // 1) exclusive in "this" and it's not in "other"'s partitionSequenceNumberMap or
+      // 2) exclusive in "other"
+      getPartitionSequenceNumberMap().forEach(
+          (partitionId, sequenceOffset) -> {
+            KafkaTopicPartition multiTopicPartitionIdToSearch = new KafkaTopicPartition(true, thisTopic, partitionId.partition());
+            boolean thatTopicMatchesThisTopicPattern = partitionId.topic().isPresent() ? pattern.matcher(partitionId.topic().get()).matches() : pattern.matcher(thatTopic).matches();
+            if (getExclusivePartitions().contains(partitionId) && (!thatTopicMatchesThisTopicPattern || !that.getPartitionSequenceNumberMap().containsKey(multiTopicPartitionIdToSearch))) {
+              newExclusivePartitions.add(new KafkaTopicPartition(true, this.getStream(), partitionId.partition()));
+            }
+          }
+      );
+      newExclusivePartitions.addAll(that.getExclusivePartitions());
+    }
+
+    return new SeekableStreamStartSequenceNumbers<>(getStream(), newMap, newExclusivePartitions);
+  }
+
+  @Override
+  public SeekableStreamSequenceNumbers<KafkaTopicPartition, Long> minus(
+      SeekableStreamSequenceNumbers<KafkaTopicPartition, Long> other
+  )
+  {
+    validateSequenceNumbersBaseType(other);
+
+    final KafkaSeekableStreamStartSequenceNumbers otherStart =
+        (KafkaSeekableStreamStartSequenceNumbers) other;
+
+    if (!this.isMultiTopicPartition() && !otherStart.isMultiTopicPartition()) {
+      return super.minus(other);
+    }
+
+    final Map<KafkaTopicPartition, Long> newMap = new HashMap<>();
+    final Set<KafkaTopicPartition> newExclusivePartitions = new HashSet<>();
+    String thatTopic = otherStart.getStream();
+
+    // remove partitions present in "that" from "this", check for exact match, multi-topic match, or single-topic match
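+    // e.g. {foo.*: foo-0 -> 2, foo2-0 -> 3} minus {foo2.*: foo2-0 -> 5} drops foo2-0 (exact match)
+    // and keeps foo-0 -> 2, along with its exclusivity flag if it was exclusive.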
+    for (Map.Entry<KafkaTopicPartition, Long> entry : getPartitionSequenceNumberMap().entrySet()) {
+      String thisTopic = entry.getKey().asTopicPartition(getStream()).topic();
+      boolean otherContainsThis = otherStart.getPartitionSequenceNumberMap().containsKey(entry.getKey());
+      boolean otherContainsThisMultiTopic = otherStart.getPartitionSequenceNumberMap()
+          .containsKey(new KafkaTopicPartition(true, thisTopic, entry.getKey().partition()));
+      boolean otherContainsThisSingleTopic = (thatTopic.equals(thisTopic) && otherStart.getPartitionSequenceNumberMap()
+          .containsKey(new KafkaTopicPartition(false, null, entry.getKey().partition())));
+      if (!otherContainsThis && !otherContainsThisMultiTopic && !otherContainsThisSingleTopic) {
+        newMap.put(entry.getKey(), entry.getValue());
+        // A partition is exclusive if it's exclusive in "this" and not in "other"'s partitionSequenceNumberMap
+        if (getExclusivePartitions().contains(entry.getKey())) {
+          newExclusivePartitions.add(entry.getKey());
+        }
+      }
+    }
+
+    return new SeekableStreamStartSequenceNumbers<>(
+        getStream(),
+        newMap,
+        newExclusivePartitions
+    );
+  }
+}
diff --git a/extensions-core/kafka-indexing-service/src/main/java/org/apache/druid/indexing/kafka/supervisor/KafkaSupervisor.java b/extensions-core/kafka-indexing-service/src/main/java/org/apache/druid/indexing/kafka/supervisor/KafkaSupervisor.java
index e9c6b83..aebacec 100644
--- a/extensions-core/kafka-indexing-service/src/main/java/org/apache/druid/indexing/kafka/supervisor/KafkaSupervisor.java
+++ b/extensions-core/kafka-indexing-service/src/main/java/org/apache/druid/indexing/kafka/supervisor/KafkaSupervisor.java
@@ -44,6 +44,7 @@
 import org.apache.druid.indexing.seekablestream.SeekableStreamIndexTask;
 import org.apache.druid.indexing.seekablestream.SeekableStreamIndexTaskIOConfig;
 import org.apache.druid.indexing.seekablestream.SeekableStreamIndexTaskTuningConfig;
+import org.apache.druid.indexing.seekablestream.SeekableStreamSequenceNumbers;
 import org.apache.druid.indexing.seekablestream.SeekableStreamStartSequenceNumbers;
 import org.apache.druid.indexing.seekablestream.common.OrderedSequenceNumber;
 import org.apache.druid.indexing.seekablestream.common.RecordSupplier;
@@ -63,12 +64,14 @@
 import java.util.ArrayList;
 import java.util.Collections;
 import java.util.HashMap;
+import java.util.HashSet;
 import java.util.List;
 import java.util.Map;
 import java.util.Map.Entry;
 import java.util.Optional;
 import java.util.Set;
 import java.util.TreeMap;
+import java.util.regex.Pattern;
 import java.util.stream.Collectors;
 
 /**
@@ -92,6 +95,7 @@
 
   private final ServiceEmitter emitter;
   private final DruidMonitorSchedulerConfig monitorSchedulerConfig;
+  private final Pattern pattern;
   private volatile Map<KafkaTopicPartition, Long> latestSequenceFromStream;
 
 
@@ -122,6 +126,7 @@
     this.spec = spec;
     this.emitter = spec.getEmitter();
     this.monitorSchedulerConfig = spec.getMonitorSchedulerConfig();
+    this.pattern = getIoConfig().isMultiTopic() ? Pattern.compile(getIoConfig().getStream()) : null;
   }
 
 
@@ -444,4 +449,79 @@
   {
     return spec.getTuningConfig();
   }
+
+  protected boolean isMultiTopic()
+  {
+    return getIoConfig().isMultiTopic() && pattern != null;
+  }
+
+  /**
+   * Gets the offsets as stored in the metadata store. The map returned will only contain
+   * offsets from topic partitions that match the current supervisor config stream. This
+   * override is needed because, in the multi-topic case, a user could have updated the supervisor
+   * config from single topic to multi-topic, where the new multi-topic pattern regex matches the
+   * old config's single topic. Without this override, the previously stored metadata for the single
+   * topic would be deemed different from the currently configured stream, and would not be included in
+   * the offset map returned. This implementation handles these cases appropriately.
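+   * <p>
+   * For example, if offsets were stored for single topic {@code foo} and the supervisor is later
+   * configured with topicPattern {@code foo.*}, the stored {@code foo} offsets are still returned,
+   * re-keyed as multi-topic partitions.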
+   *
+   * @return the previously stored offsets from metadata storage, possibly with offsets removed
+   * for topics that do not match the currently configured supervisor topic. Topic partition keys may also be
+   * updated to single topic or multi-topic depending on the supervisor config, as needed.
+   */
+  @Override
+  protected Map<KafkaTopicPartition, Long> getOffsetsFromMetadataStorage()
+  {
+    final DataSourceMetadata dataSourceMetadata = retrieveDataSourceMetadata();
+    if (checkSourceMetadataMatch(dataSourceMetadata)) {
+      @SuppressWarnings("unchecked")
+      SeekableStreamSequenceNumbers<KafkaTopicPartition, Long> partitions = ((KafkaDataSourceMetadata) dataSourceMetadata)
+          .getSeekableStreamSequenceNumbers();
+      if (partitions != null && partitions.getPartitionSequenceNumberMap() != null) {
+        Map<KafkaTopicPartition, Long> partitionOffsets = new HashMap<>();
+        Set<String> topicMisMatchLogged = new HashSet<>();
+        partitions.getPartitionSequenceNumberMap().forEach((kafkaTopicPartition, value) -> {
+          final String matchValue;
+          // previous offsets are from multi-topic config
+          if (kafkaTopicPartition.topic().isPresent()) {
+            matchValue = kafkaTopicPartition.topic().get();
+          } else {
+            // previous offsets are from single topic config
+            matchValue = partitions.getStream();
+          }
+
+          KafkaTopicPartition matchingTopicPartition = getMatchingKafkaTopicPartition(kafkaTopicPartition, matchValue);
+
+          if (matchingTopicPartition == null && !topicMisMatchLogged.contains(matchValue)) {
+            log.warn(
+                "Topic/stream in metadata storage [%s] doesn't match spec topic/stream [%s], ignoring stored sequences",
+                matchValue,
+                getIoConfig().getStream()
+            );
+            topicMisMatchLogged.add(matchValue);
+          }
+          if (matchingTopicPartition != null) {
+            partitionOffsets.put(matchingTopicPartition, value);
+          }
+        });
+        return partitionOffsets;
+      }
+    }
+
+    return Collections.emptyMap();
+  }
+
+  @Nullable
+  private KafkaTopicPartition getMatchingKafkaTopicPartition(
+      final KafkaTopicPartition kafkaTopicPartition,
+      final String streamMatchValue
+  )
+  {
+    final boolean match;
+
+    match = pattern != null
+        ? pattern.matcher(streamMatchValue).matches()
+        : getIoConfig().getStream().equals(streamMatchValue);
+
+    return match ? new KafkaTopicPartition(isMultiTopic(), streamMatchValue, kafkaTopicPartition.partition()) : null;
+  }
 }
diff --git a/extensions-core/kafka-indexing-service/src/test/java/org/apache/druid/indexing/kafka/KafkaDataSourceMetadataTest.java b/extensions-core/kafka-indexing-service/src/test/java/org/apache/druid/indexing/kafka/KafkaDataSourceMetadataTest.java
index 800a1fe..0b013b6 100644
--- a/extensions-core/kafka-indexing-service/src/test/java/org/apache/druid/indexing/kafka/KafkaDataSourceMetadataTest.java
+++ b/extensions-core/kafka-indexing-service/src/test/java/org/apache/druid/indexing/kafka/KafkaDataSourceMetadataTest.java
@@ -21,6 +21,7 @@
 
 import com.fasterxml.jackson.core.JsonProcessingException;
 import com.fasterxml.jackson.databind.ObjectMapper;
+import com.google.common.collect.ImmutableList;
 import com.google.common.collect.ImmutableMap;
 import com.google.common.collect.ImmutableSet;
 import com.google.inject.Injector;
@@ -36,17 +37,34 @@
 import org.junit.Assert;
 import org.junit.Test;
 
+import java.util.HashMap;
+import java.util.List;
 import java.util.Map;
+import java.util.regex.Pattern;
 
 public class KafkaDataSourceMetadataTest
 {
-  private static final KafkaDataSourceMetadata START0 = startMetadata(ImmutableMap.of());
-  private static final KafkaDataSourceMetadata START1 = startMetadata(ImmutableMap.of(0, 2L, 1, 3L));
-  private static final KafkaDataSourceMetadata START2 = startMetadata(ImmutableMap.of(0, 2L, 1, 4L, 2, 5L));
-  private static final KafkaDataSourceMetadata START3 = startMetadata(ImmutableMap.of(0, 2L, 2, 5L));
-  private static final KafkaDataSourceMetadata END0 = endMetadata(ImmutableMap.of());
-  private static final KafkaDataSourceMetadata END1 = endMetadata(ImmutableMap.of(0, 2L, 2, 5L));
-  private static final KafkaDataSourceMetadata END2 = endMetadata(ImmutableMap.of(0, 2L, 1, 4L));
+  private static final KafkaDataSourceMetadata START0 = startMetadata("foo", ImmutableMap.of());
+  private static final KafkaDataSourceMetadata START1 = startMetadata("foo", ImmutableMap.of(0, 2L, 1, 3L));
+  private static final KafkaDataSourceMetadata START2 = startMetadata("foo", ImmutableMap.of(0, 2L, 1, 4L, 2, 5L));
+  private static final KafkaDataSourceMetadata START3 = startMetadata("foo", ImmutableMap.of(0, 2L, 2, 5L));
+  private static final KafkaDataSourceMetadata START4 = startMetadataMultiTopic("foo.*", ImmutableList.of("foo"), ImmutableMap.of());
+  private static final KafkaDataSourceMetadata START5 = startMetadataMultiTopic("foo.*", ImmutableList.of("foo"), ImmutableMap.of(0, 2L, 1, 3L));
+  private static final KafkaDataSourceMetadata START6 = startMetadataMultiTopic("foo.*", ImmutableList.of("foo", "foo2"), ImmutableMap.of(0, 2L, 1, 3L));
+  private static final KafkaDataSourceMetadata START7 = startMetadataMultiTopic("foo.*", ImmutableList.of("foo", "foo2"), ImmutableMap.of(0, 2L, 2, 5L));
+  private static final KafkaDataSourceMetadata START8 = startMetadataMultiTopic("foo2.*", ImmutableList.of("foo2"), ImmutableMap.of(0, 2L, 2, 5L));
+  private static final KafkaDataSourceMetadata START9 = startMetadataMultiTopic("foo2.*", ImmutableList.of("foo2", "foo22"), ImmutableMap.of(0, 2L, 2, 5L));
+  private static final KafkaDataSourceMetadata END0 = endMetadata("foo", ImmutableMap.of());
+  private static final KafkaDataSourceMetadata END1 = endMetadata("foo", ImmutableMap.of(0, 2L, 2, 5L));
+  private static final KafkaDataSourceMetadata END2 = endMetadata("foo", ImmutableMap.of(0, 2L, 1, 4L));
+  private static final KafkaDataSourceMetadata END3 = endMetadata("foo", ImmutableMap.of(0, 2L, 1, 3L));
+  private static final KafkaDataSourceMetadata END4 = endMetadataMultiTopic("foo.*", ImmutableList.of("foo"), ImmutableMap.of());
+  private static final KafkaDataSourceMetadata END5 = endMetadataMultiTopic("foo.*", ImmutableList.of("foo"), ImmutableMap.of(0, 2L, 2, 5L));
+  private static final KafkaDataSourceMetadata END6 = endMetadataMultiTopic("foo.*", ImmutableList.of("foo"), ImmutableMap.of(0, 2L, 1, 4L));
+  private static final KafkaDataSourceMetadata END7 = endMetadataMultiTopic("foo.*", ImmutableList.of("foo", "foo2"), ImmutableMap.of(0, 2L, 2, 5L));
+  private static final KafkaDataSourceMetadata END8 = endMetadataMultiTopic("foo.*", ImmutableList.of("foo", "foo2"), ImmutableMap.of(0, 2L, 1, 4L));
+  private static final KafkaDataSourceMetadata END9 = endMetadataMultiTopic("foo2.*", ImmutableList.of("foo2"), ImmutableMap.of(0, 2L, 2, 5L));
+  private static final KafkaDataSourceMetadata END10 = endMetadataMultiTopic("foo2.*", ImmutableList.of("foo2", "foo22"), ImmutableMap.of(0, 2L, 2, 5L));
 
   @Test
   public void testMatches()
@@ -55,33 +73,332 @@
     Assert.assertTrue(START0.matches(START1));
     Assert.assertTrue(START0.matches(START2));
     Assert.assertTrue(START0.matches(START3));
+    // when sequence map is empty, we don't know whether it's multi-topic or not, so assume single topic to preserve existing behavior
+    Assert.assertFalse(START0.matches(START4));
+    Assert.assertTrue(START0.matches(START5));
+    // when merging, we lose the sequence numbers for topic foo2, so false
+    Assert.assertFalse(START0.matches(START6));
+    // when merging, we lose the sequence numbers for topic foo2, so false
+    Assert.assertFalse(START0.matches(START7));
+    // when merging, we lose the sequence numbers for topic foo2, so false
+    Assert.assertFalse(START0.matches(START8));
+    // when merging, we lose the sequence numbers for topics foo2 and foo22, so false
+    Assert.assertFalse(START0.matches(START9));
 
     Assert.assertTrue(START1.matches(START0));
     Assert.assertTrue(START1.matches(START1));
+    // sequence numbers for topic foo partition 1 are inconsistent, so false
     Assert.assertFalse(START1.matches(START2));
     Assert.assertTrue(START1.matches(START3));
+    // when sequence map is empty, we don't know whether it's multi-topic or not, so assume single topic to preserve existing behavior
+    Assert.assertFalse(START1.matches(START4));
+    Assert.assertTrue(START1.matches(START5));
+    // when merging, we lose the sequence numbers for topic foo2, and sequence numbers for topic foo-1 are inconsistent, so false
+    Assert.assertFalse(START1.matches(START6));
+    // when merging, we lose the sequence numbers for topic foo2, so false
+    Assert.assertFalse(START1.matches(START7));
+    // when merging, we lose the sequence numbers for topic foo2, so false
+    Assert.assertFalse(START1.matches(START8));
+    // when merging, we lose the sequence numbers for topics foo2 and foo22, so false
+    Assert.assertFalse(START1.matches(START9));
 
     Assert.assertTrue(START2.matches(START0));
     Assert.assertFalse(START2.matches(START1));
     Assert.assertTrue(START2.matches(START2));
     Assert.assertTrue(START2.matches(START3));
+    // when sequence map is empty, we don't know whether it's multi-topic or not, so assume single topic to preserve existing behavior
+    Assert.assertFalse(START2.matches(START4));
+    // when merging, sequence numbers for topic foo-1 are inconsistent, so false
+    Assert.assertFalse(START2.matches(START5));
+    // when merging, we lose the sequence numbers for topic foo2, and sequence numbers for topic foo-1 are inconsistent, so false
+    Assert.assertFalse(START2.matches(START6));
+    // when merging, we lose the sequence numbers for topic foo2, so false
+    Assert.assertFalse(START2.matches(START7));
 
     Assert.assertTrue(START3.matches(START0));
     Assert.assertTrue(START3.matches(START1));
     Assert.assertTrue(START3.matches(START2));
     Assert.assertTrue(START3.matches(START3));
+    // when sequence map is empty, we don't know whether it's multi-topic or not, so assume single topic to preserve existing behavior
+    Assert.assertFalse(START3.matches(START4));
+    Assert.assertTrue(START3.matches(START5));
+    // when merging, we lose the sequence numbers for topic foo2, so false
+    Assert.assertFalse(START3.matches(START6));
+    // when merging, we lose the sequence numbers for topic foo2, so false
+    Assert.assertFalse(START3.matches(START7));
+    // when merging, we lose the sequence numbers for topic foo2, so false
+    Assert.assertFalse(START3.matches(START8));
+    // when merging, we lose the sequence numbers for topics foo2 and foo22, so false
+    Assert.assertFalse(START3.matches(START9));
+
+    // when sequence map is empty, we don't know whether it's multi-topic or not, so assume single topic to preserve existing behavior
+    Assert.assertFalse(START4.matches(START0));
+    // when sequence map is empty, we don't know whether it's multi-topic or not, so assume single topic to preserve existing behavior
+    Assert.assertFalse(START4.matches(START1));
+    // when sequence map is empty, we don't know whether it's multi-topic or not, so assume single topic to preserve existing behavior
+    Assert.assertFalse(START4.matches(START2));
+    // when sequence map is empty, we don't know whether it's multi-topic or not, so assume single topic to preserve existing behavior
+    Assert.assertFalse(START4.matches(START3));
+    Assert.assertTrue(START4.matches(START4));
+    // when sequence map is empty, we don't know whether it's multi-topic or not, so assume single topic to preserve existing behavior
+    Assert.assertFalse(START4.matches(START5));
+    // when sequence map is empty, we don't know whether it's multi-topic or not, so assume single topic to preserve existing behavior
+    Assert.assertFalse(START4.matches(START6));
+    // when sequence map is empty, we don't know whether it's multi-topic or not, so assume single topic to preserve existing behavior
+    Assert.assertFalse(START4.matches(START7));
+    // when sequence map is empty, we don't know whether it's multi-topic or not, so assume single topic to preserve existing behavior
+    Assert.assertFalse(START4.matches(START8));
+    // when sequence map is empty, we don't know whether it's multi-topic or not, so assume single topic to preserve existing behavior
+    Assert.assertFalse(START4.matches(START9));
+
+    Assert.assertTrue(START5.matches(START0));
+    Assert.assertTrue(START5.matches(START1));
+    // when merging, the sequence numbers for topic foo-1 are inconsistent, so false
+    Assert.assertFalse(START5.matches(START2));
+    Assert.assertTrue(START5.matches(START3));
+    Assert.assertTrue(START5.matches(START4));
+    Assert.assertTrue(START5.matches(START5));
+    Assert.assertTrue(START5.matches(START6));
+    Assert.assertTrue(START5.matches(START7));
+    Assert.assertTrue(START5.matches(START8));
+    Assert.assertTrue(START5.matches(START9));
+
+    Assert.assertTrue(START6.matches(START0));
+    Assert.assertTrue(START6.matches(START1));
+    // when merging, the sequence numbers for topic foo-1 are inconsistent, so false
+    Assert.assertFalse(START6.matches(START2));
+    Assert.assertTrue(START6.matches(START3));
+    Assert.assertTrue(START6.matches(START4));
+    Assert.assertTrue(START6.matches(START5));
+    Assert.assertTrue(START6.matches(START6));
+    Assert.assertTrue(START6.matches(START7));
+    Assert.assertTrue(START6.matches(START8));
+    Assert.assertTrue(START6.matches(START9));
+
+    Assert.assertTrue(START7.matches(START0));
+    Assert.assertTrue(START7.matches(START1));
+    Assert.assertTrue(START7.matches(START2));
+    Assert.assertTrue(START7.matches(START3));
+    Assert.assertTrue(START7.matches(START4));
+    Assert.assertTrue(START7.matches(START5));
+    Assert.assertTrue(START7.matches(START6));
+    Assert.assertTrue(START7.matches(START7));
+    Assert.assertTrue(START7.matches(START8));
+    Assert.assertTrue(START7.matches(START9));
+
+    Assert.assertTrue(START8.matches(START0));
+    // when merging, we lose the sequence numbers for topic foo, so false
+    Assert.assertFalse(START8.matches(START1));
+    // when merging, we lose the sequence numbers for topic foo, so false
+    Assert.assertFalse(START8.matches(START2));
+    // when merging, we lose the sequence numbers for topic foo, so false
+    Assert.assertFalse(START8.matches(START3));
+    Assert.assertTrue(START8.matches(START4));
+    // when merging, we lose the sequence numbers for topic foo, so false
+    Assert.assertFalse(START8.matches(START5));
+    // when merging, we lose the sequence numbers for topic foo, so false
+    Assert.assertFalse(START8.matches(START6));
+    // when merging, we lose the sequence numbers for topic foo, so false
+    Assert.assertFalse(START8.matches(START7));
+    Assert.assertTrue(START8.matches(START8));
+    Assert.assertTrue(START8.matches(START9));
+
+    Assert.assertTrue(START9.matches(START0));
+    // when merging, we lose the sequence numbers for topic foo, so false
+    Assert.assertFalse(START9.matches(START1));
+    // when merging, we lose the sequence numbers for topic foo, so false
+    Assert.assertFalse(START9.matches(START2));
+    // when merging, we lose the sequence numbers for topic foo, so false
+    Assert.assertFalse(START9.matches(START3));
+    Assert.assertTrue(START9.matches(START4));
+    // when merging, we lose the sequence numbers for topic foo, so false
+    Assert.assertFalse(START9.matches(START5));
+    // when merging, we lose the sequence numbers for topic foo, so false
+    Assert.assertFalse(START9.matches(START6));
+    // when merging, we lose the sequence numbers for topic foo, so false
+    Assert.assertFalse(START9.matches(START7));
+    Assert.assertTrue(START9.matches(START8));
+    Assert.assertTrue(START9.matches(START9));
 
     Assert.assertTrue(END0.matches(END0));
     Assert.assertTrue(END0.matches(END1));
     Assert.assertTrue(END0.matches(END2));
+    Assert.assertTrue(END0.matches(END3));
+    // when sequence map is empty, we don't know whether it's multi-topic or not, so assume single topic to preserve existing behavior
+    Assert.assertFalse(END0.matches(END4));
+    Assert.assertTrue(END0.matches(END5));
+    Assert.assertTrue(END0.matches(END6));
+    // when merging, we lose the sequence numbers for topic foo2, so false
+    Assert.assertFalse(END0.matches(END7));
+    // when merging, we lose the sequence numbers for topic foo2, so false
+    Assert.assertFalse(END0.matches(END8));
+    // when merging, we lose the sequence numbers for topic foo2, so false
+    Assert.assertFalse(END0.matches(END9));
+    // when merging, we lose the sequence numbers for topic foo2, so false
+    Assert.assertFalse(END0.matches(END10));
 
     Assert.assertTrue(END1.matches(END0));
     Assert.assertTrue(END1.matches(END1));
     Assert.assertTrue(END1.matches(END2));
+    Assert.assertTrue(END1.matches(END3));
+    // when sequence map is empty, we don't know whether it's multi-topic or not, so assume single topic to preserve existing behavior
+    Assert.assertFalse(END1.matches(END4));
+    Assert.assertTrue(END1.matches(END5));
+    Assert.assertTrue(END1.matches(END6));
+    // when merging, we lose the sequence numbers for topic foo2, so false
+    Assert.assertFalse(END1.matches(END7));
+    // when merging, we lose the sequence numbers for topic foo2, so false
+    Assert.assertFalse(END1.matches(END8));
+    // when merging, we lose the sequence numbers for topic foo2, so false
+    Assert.assertFalse(END1.matches(END9));
+    // when merging, we lose the sequence numbers for topic foo2, so false
+    Assert.assertFalse(END1.matches(END10));
 
     Assert.assertTrue(END2.matches(END0));
     Assert.assertTrue(END2.matches(END1));
     Assert.assertTrue(END2.matches(END2));
+    // when merging, the sequence numbers for topic foo-1 are inconsistent, so false
+    Assert.assertFalse(END2.matches(END3));
+    // when sequence map is empty, we don't know whether it's multi-topic or not, so assume single topic to preserve existing behavior
+    Assert.assertFalse(END2.matches(END4));
+    Assert.assertTrue(END2.matches(END5));
+    Assert.assertTrue(END2.matches(END6));
+    // when merging, we lose the sequence numbers for topic foo2, so false
+    Assert.assertFalse(END2.matches(END7));
+    // when merging, we lose the sequence numbers for topic foo2, so false
+    Assert.assertFalse(END2.matches(END8));
+    // when merging, we lose the sequence numbers for topic foo2, so false
+    Assert.assertFalse(END2.matches(END9));
+    // when merging, we lose the sequence numbers for topic foo2, so false
+    Assert.assertFalse(END2.matches(END10));
+
+    Assert.assertTrue(END3.matches(END0));
+    Assert.assertTrue(END3.matches(END1));
+    // when merging, the sequence numbers for topic foo-1 are inconsistent, so false
+    Assert.assertFalse(END3.matches(END2));
+    Assert.assertTrue(END3.matches(END3));
+    // when sequence map is empty, we don't know whether it's multi-topic or not, so assume single topic to preserve existing behavior
+    Assert.assertFalse(END3.matches(END4));
+    Assert.assertTrue(END3.matches(END5));
+    // when merging, the sequence numbers for topic foo-1 are inconsistent, so false
+    Assert.assertFalse(END3.matches(END6));
+    // when merging, we lose the sequence numbers for topic foo2, so false
+    Assert.assertFalse(END3.matches(END7));
+    // when merging, we lose the sequence numbers for topic foo2, so false
+    Assert.assertFalse(END3.matches(END8));
+    // when merging, we lose the sequence numbers for topic foo2, so false
+    Assert.assertFalse(END3.matches(END9));
+    // when merging, we lose the sequence numbers for topic foo2, so false
+    Assert.assertFalse(END3.matches(END10));
+
+    // when sequence map is empty, we don't know whether it's multi-topic or not, so assume single topic to preserve existing behavior
+    Assert.assertFalse(END4.matches(END0));
+    // when sequence map is empty, we don't know whether it's multi-topic or not, so assume single topic to preserve existing behavior
+    Assert.assertFalse(END4.matches(END1));
+    // when sequence map is empty, we don't know whether it's multi-topic or not, so assume single topic to preserve existing behavior
+    Assert.assertFalse(END4.matches(END2));
+    // when sequence map is empty, we don't know whether it's multi-topic or not, so assume single topic to preserve existing behavior
+    Assert.assertFalse(END4.matches(END3));
+    Assert.assertTrue(END4.matches(END4));
+    // when sequence map is empty, we don't know whether it's multi-topic or not, so assume single topic to preserve existing behavior
+    Assert.assertFalse(END4.matches(END5));
+    // when sequence map is empty, we don't know whether it's multi-topic or not, so assume single topic to preserve existing behavior
+    Assert.assertFalse(END4.matches(END6));
+    // when sequence map is empty, we don't know whether it's multi-topic or not, so assume single topic to preserve existing behavior
+    Assert.assertFalse(END4.matches(END7));
+    // when sequence map is empty, we don't know whether it's multi-topic or not, so assume single topic to preserve existing behavior
+    Assert.assertFalse(END4.matches(END8));
+    // when sequence map is empty, we don't know whether it's multi-topic or not, so assume single topic to preserve existing behavior
+    Assert.assertFalse(END4.matches(END9));
+    // when sequence map is empty, we don't know whether it's multi-topic or not, so assume single topic to preserve existing behavior
+    Assert.assertFalse(END4.matches(END10));
+
+    Assert.assertTrue(END5.matches(END0));
+    Assert.assertTrue(END5.matches(END1));
+    Assert.assertTrue(END5.matches(END2));
+    Assert.assertTrue(END5.matches(END3));
+    Assert.assertTrue(END5.matches(END4));
+    Assert.assertTrue(END5.matches(END5));
+    Assert.assertTrue(END5.matches(END6));
+    Assert.assertTrue(END5.matches(END7));
+    Assert.assertTrue(END5.matches(END8));
+    Assert.assertTrue(END5.matches(END9));
+    Assert.assertTrue(END5.matches(END10));
+
+    Assert.assertTrue(END6.matches(END0));
+    Assert.assertTrue(END6.matches(END1));
+    Assert.assertTrue(END6.matches(END2));
+    // when merging, the sequence numbers for topic foo-1 are inconsistent, so false
+    Assert.assertFalse(END6.matches(END3));
+    Assert.assertTrue(END6.matches(END4));
+    Assert.assertTrue(END6.matches(END5));
+    Assert.assertTrue(END6.matches(END6));
+    Assert.assertTrue(END6.matches(END7));
+    Assert.assertTrue(END6.matches(END8));
+    Assert.assertTrue(END6.matches(END9));
+    Assert.assertTrue(END6.matches(END10));
+
+    Assert.assertTrue(END7.matches(END0));
+    Assert.assertTrue(END7.matches(END1));
+    Assert.assertTrue(END7.matches(END2));
+    Assert.assertTrue(END7.matches(END3));
+    Assert.assertTrue(END7.matches(END4));
+    Assert.assertTrue(END7.matches(END5));
+    Assert.assertTrue(END7.matches(END6));
+    Assert.assertTrue(END7.matches(END7));
+    Assert.assertTrue(END7.matches(END8));
+    Assert.assertTrue(END7.matches(END9));
+    Assert.assertTrue(END7.matches(END10));
+
+    Assert.assertTrue(END8.matches(END0));
+    Assert.assertTrue(END8.matches(END1));
+    Assert.assertTrue(END8.matches(END2));
+    // when merging, the sequence numbers for foo-1 and foo-2 are inconsistent, so false
+    Assert.assertFalse(END8.matches(END3));
+    Assert.assertTrue(END8.matches(END4));
+    Assert.assertTrue(END8.matches(END5));
+    Assert.assertTrue(END8.matches(END6));
+    Assert.assertTrue(END8.matches(END7));
+    Assert.assertTrue(END8.matches(END8));
+    Assert.assertTrue(END8.matches(END9));
+    Assert.assertTrue(END8.matches(END10));
+
+    Assert.assertTrue(END9.matches(END0));
+    // when merging, we lose the sequence numbers for topic foo2, so false
+    Assert.assertFalse(END9.matches(END1));
+    // when merging, we lose the sequence numbers for topic foo2, so false
+    Assert.assertFalse(END9.matches(END2));
+    // when merging, we lose the sequence numbers for topic foo2, so false
+    Assert.assertFalse(END9.matches(END3));
+    Assert.assertTrue(END9.matches(END4));
+    // when merging, we lose the sequence numbers for topic foo2, so false
+    Assert.assertFalse(END9.matches(END5));
+    // when merging, we lose the sequence numbers for topic foo2, so false
+    Assert.assertFalse(END9.matches(END6));
+    // when merging, we lose the sequence numbers for topic foo2, so false
+    Assert.assertFalse(END9.matches(END7));
+    // when merging, we lose the sequence numbers for topic foo2, so false
+    Assert.assertFalse(END9.matches(END8));
+    Assert.assertTrue(END9.matches(END9));
+    Assert.assertTrue(END9.matches(END10));
+
+    Assert.assertTrue(END10.matches(END0));
+    // when merging, we lose the sequence numbers for topic foo2, so false
+    Assert.assertFalse(END10.matches(END1));
+    // when merging, we lose the sequence numbers for topic foo2, so false
+    Assert.assertFalse(END10.matches(END2));
+    // when merging, we lose the sequence numbers for topic foo2, so false
+    Assert.assertFalse(END10.matches(END3));
+    Assert.assertTrue(END10.matches(END4));
+    // when merging, we lose the sequence numbers for topic foo2, so false
+    Assert.assertFalse(END10.matches(END5));
+    // when merging, we lose the sequence numbers for topic foo2, so false
+    Assert.assertFalse(END10.matches(END6));
+    // when merging, we lose the sequence numbers for topic foo2, so false
+    Assert.assertFalse(END10.matches(END7));
+    // when merging, we lose the sequence numbers for topic foo2, so false
+    Assert.assertFalse(END10.matches(END8));
+    Assert.assertTrue(END10.matches(END9));
+    Assert.assertTrue(END10.matches(END10));
   }
 
   @Test
@@ -91,6 +408,12 @@
     Assert.assertTrue(START1.isValidStart());
     Assert.assertTrue(START2.isValidStart());
     Assert.assertTrue(START3.isValidStart());
+    Assert.assertTrue(START4.isValidStart());
+    Assert.assertTrue(START5.isValidStart());
+    Assert.assertTrue(START6.isValidStart());
+    Assert.assertTrue(START7.isValidStart());
+    Assert.assertTrue(START8.isValidStart());
+    Assert.assertTrue(START9.isValidStart());
   }
 
   @Test
@@ -121,6 +444,85 @@
         START2.plus(START2)
     );
 
+    // START4's empty sequence map is treated as single topic; the streams differ, so the base plus() returns the other metadata
+    Assert.assertEquals(
+        START4,
+        START1.plus(START4)
+    );
+
+    Assert.assertEquals(
+        startMetadata(ImmutableMap.of(0, 2L, 1, 3L)),
+        START1.plus(START5)
+    );
+
+    Assert.assertEquals(
+        startMetadata(ImmutableMap.of(0, 2L, 1, 3L)),
+        START1.plus(START6)
+    );
+
+    Assert.assertEquals(
+        startMetadata(ImmutableMap.of(0, 2L, 1, 3L, 2, 5L)),
+        START1.plus(START7)
+    );
+
+    Assert.assertEquals(
+        startMetadata(ImmutableMap.of(0, 2L, 1, 3L, 2, 5L)),
+        START2.plus(START6)
+    );
+
+    // START4's empty sequence map is treated as single topic; the streams differ, so the base plus() returns the other metadata
+    Assert.assertEquals(
+        START0,
+        START4.plus(START0)
+    );
+
+    // the streams differ, so plus returns the other metadata as-is
+    Assert.assertEquals(
+        START1,
+        START4.plus(START1)
+    );
+
+    // when the sequence map is empty, we can't tell whether the metadata is multi-topic, so it is
+    // treated as single-topic to preserve existing behavior
+    Assert.assertEquals(
+        START4,
+        START4.plus(START5)
+    );
+
+    Assert.assertEquals(
+        START5,
+        START5.plus(START4)
+    );
+
+    Assert.assertEquals(
+        startMetadataMultiTopic("foo.*", ImmutableList.of("foo", "foo2"), ImmutableMap.of(0, 2L, 1, 3L)),
+        START5.plus(START6)
+    );
+
+    Assert.assertEquals(
+        startMetadataMultiTopic("foo.*", ImmutableList.of("foo", "foo2"), ImmutableMap.of(0, 2L, 2, 5L)),
+        START7.plus(START8)
+    );
+
+    Assert.assertEquals(
+        startMetadataMultiTopic("foo.*", ImmutableList.of("foo", "foo2", "foo22"), ImmutableMap.of(0, 2L, 2, 5L)),
+        START7.plus(START9)
+    );
+
+    Assert.assertEquals(
+        startMetadataMultiTopic("foo2.*", ImmutableList.of("foo2"), ImmutableMap.of(0, 2L, 2, 5L)),
+        START8.plus(START7)
+    );
+
+    Assert.assertEquals(
+        startMetadataMultiTopic("foo2.*", ImmutableList.of("foo2", "foo22"), ImmutableMap.of(0, 2L, 2, 5L)),
+        START8.plus(START9)
+    );
+
+    Assert.assertEquals(
+        startMetadataMultiTopic("foo2.*", ImmutableList.of("foo2", "foo22"), ImmutableMap.of(0, 2L, 2, 5L)),
+        START9.plus(START8)
+    );
+
     Assert.assertEquals(
         endMetadata(ImmutableMap.of(0, 2L, 2, 5L)),
         END0.plus(END1)
@@ -130,27 +532,137 @@
         endMetadata(ImmutableMap.of(0, 2L, 1, 4L, 2, 5L)),
         END1.plus(END2)
     );
+
+    // the streams differ, so plus returns the other metadata as-is
+    Assert.assertEquals(
+        END4,
+        END0.plus(END4)
+    );
+
+    // the streams differ, so plus returns the other metadata as-is
+    Assert.assertEquals(
+        END4,
+        END1.plus(END4)
+    );
+
+    // the streams differ, so plus returns the other metadata as-is
+    Assert.assertEquals(
+        END0,
+        END4.plus(END0)
+    );
+
+    // the streams differ, so plus returns the other metadata as-is
+    Assert.assertEquals(
+        END1,
+        END4.plus(END1)
+    );
+
+    Assert.assertEquals(
+        END3,
+        END2.plus(END3)
+    );
+
+    Assert.assertEquals(
+        END2,
+        END3.plus(END2)
+    );
+
+    // when the sequence map is empty, we can't tell whether the metadata is multi-topic, so it is
+    // treated as single-topic to preserve existing behavior
+    Assert.assertEquals(
+        END4,
+        END4.plus(END5)
+    );
+
+    Assert.assertEquals(
+        END5,
+        END5.plus(END4)
+    );
+
+    Assert.assertEquals(
+        endMetadataMultiTopic("foo.*", ImmutableList.of("foo", "foo2"), ImmutableMap.of(0, 2L, 2, 5L)),
+        END5.plus(END9)
+    );
+
+    Assert.assertEquals(
+        endMetadataMultiTopic("foo.*", ImmutableList.of("foo", "foo2", "foo22"), ImmutableMap.of(0, 2L, 2, 5L)),
+        END5.plus(END10)
+    );
+
+    Assert.assertEquals(
+        endMetadataMultiTopic("foo.*", ImmutableList.of("foo"), ImmutableMap.of(0, 2L, 1, 4L, 2, 5L)),
+        END5.plus(END6)
+    );
+
+    Assert.assertEquals(
+        endMetadataMultiTopic("foo.*", ImmutableList.of("foo", "foo2"), ImmutableMap.of(0, 2L, 2, 5L)),
+        END5.plus(END7)
+    );
+
+    Assert.assertEquals(
+        endMetadataMultiTopic("foo.*", ImmutableList.of("foo", "foo2"), ImmutableMap.of(0, 2L, 2, 5L)),
+        END7.plus(END5)
+    );
+
+    Assert.assertEquals(
+        endMetadataMultiTopic("foo2.*", ImmutableList.of("foo2"), ImmutableMap.of(0, 2L, 2, 5L)),
+        END9.plus(END5)
+    );
+
+    Assert.assertEquals(
+        endMetadataMultiTopic("foo2.*", ImmutableList.of("foo2", "foo22"), ImmutableMap.of(0, 2L, 2, 5L)),
+        END9.plus(END10)
+    );
+
+    Assert.assertEquals(
+        endMetadataMultiTopic("foo2.*", ImmutableList.of("foo2", "foo22"), ImmutableMap.of(0, 2L, 2, 5L)),
+        END10.plus(END9)
+    );
   }
 
   @Test
   public void testMinus()
   {
     Assert.assertEquals(
-        startMetadata(ImmutableMap.of(1, 3L)),
-        START1.minus(START3)
-    );
-
-    Assert.assertEquals(
         startMetadata(ImmutableMap.of()),
         START0.minus(START2)
     );
 
     Assert.assertEquals(
+        START0,
+        START0.minus(START4)
+    );
+
+    Assert.assertEquals(
         startMetadata(ImmutableMap.of()),
         START1.minus(START2)
     );
 
     Assert.assertEquals(
+        startMetadata(ImmutableMap.of(1, 3L)),
+        START1.minus(START3)
+    );
+
+    Assert.assertEquals(
+        START1,
+        START1.minus(START4)
+    );
+
+    Assert.assertEquals(
+        startMetadata("foo", ImmutableMap.of()),
+        START1.minus(START5)
+    );
+
+    Assert.assertEquals(
+        startMetadata("foo", ImmutableMap.of()),
+        START1.minus(START6)
+    );
+
+    Assert.assertEquals(
+        startMetadata("foo", ImmutableMap.of(1, 3L)),
+        START1.minus(START7)
+    );
+
+    Assert.assertEquals(
         startMetadata(ImmutableMap.of(2, 5L)),
         START2.minus(START1)
     );
@@ -161,6 +673,21 @@
     );
 
     Assert.assertEquals(
+        START4,
+        START4.minus(START0)
+    );
+
+    Assert.assertEquals(
+        START4,
+        START4.minus(START1)
+    );
+
+    Assert.assertEquals(
+        startMetadataMultiTopic("foo.*", ImmutableList.of("foo"), ImmutableMap.of()),
+        START5.minus(START1)
+    );
+
+    Assert.assertEquals(
         endMetadata(ImmutableMap.of(1, 4L)),
         END2.minus(END1)
     );
@@ -169,6 +696,101 @@
         endMetadata(ImmutableMap.of(2, 5L)),
         END1.minus(END2)
     );
+
+    Assert.assertEquals(
+        END0,
+        END0.minus(END4)
+    );
+
+    Assert.assertEquals(
+        END4,
+        END4.minus(END0)
+    );
+
+    Assert.assertEquals(
+        END1,
+        END1.minus(END4)
+    );
+
+    Assert.assertEquals(
+        END4,
+        END4.minus(END1)
+    );
+
+    Assert.assertEquals(
+        endMetadataMultiTopic("foo.*", ImmutableList.of("foo"), ImmutableMap.of()),
+        END5.minus(END1)
+    );
+
+    Assert.assertEquals(
+        endMetadataMultiTopic("foo.*", ImmutableList.of("foo"), ImmutableMap.of(0, 2L, 2, 5L)),
+        END5.minus(END4)
+    );
+
+    Assert.assertEquals(
+        endMetadataMultiTopic("foo.*", ImmutableList.of("foo"), ImmutableMap.of(2, 5L)),
+        END5.minus(END6)
+    );
+
+    Assert.assertEquals(
+        endMetadataMultiTopic("foo.*", ImmutableList.of("foo"), ImmutableMap.of(1, 4L)),
+        END6.minus(END5)
+    );
+
+    Assert.assertEquals(
+        endMetadataMultiTopic("foo.*", ImmutableList.of("foo"), ImmutableMap.of(1, 4L)),
+        END6.minus(END7)
+    );
+
+    Assert.assertEquals(
+        endMetadataMultiTopic("foo.*", ImmutableList.of("foo2"), ImmutableMap.of(0, 2L, 2, 5L)),
+        END7.minus(END5)
+    );
+
+    Assert.assertEquals(
+        endMetadataMultiTopic("foo.*", ImmutableList.of("foo", "foo2"), ImmutableMap.of(2, 5L)),
+        END7.minus(END8)
+    );
+
+    Assert.assertEquals(
+        endMetadataMultiTopic("foo.*", ImmutableList.of("foo"), ImmutableMap.of(0, 2L, 2, 5L)),
+        END7.minus(END9)
+    );
+
+    Assert.assertEquals(
+        endMetadataMultiTopic("foo.*", ImmutableList.of("foo"), ImmutableMap.of(0, 2L, 2, 5L)),
+        END7.minus(END10)
+    );
+
+    Assert.assertEquals(
+        END9,
+        END9.minus(END6)
+    );
+
+    Assert.assertEquals(
+        endMetadataMultiTopic("foo2.*", ImmutableList.of("foo2"), ImmutableMap.of()),
+        END9.minus(END7)
+    );
+
+    Assert.assertEquals(
+        endMetadataMultiTopic("foo2.*", ImmutableList.of("foo2"), ImmutableMap.of(2, 5L)),
+        END9.minus(END8)
+    );
+
+    Assert.assertEquals(
+        endMetadataMultiTopic("foo2.*", ImmutableList.of("foo2"), ImmutableMap.of()),
+        END9.minus(END9)
+    );
+
+    Assert.assertEquals(
+        endMetadataMultiTopic("foo2.*", ImmutableList.of("foo2"), ImmutableMap.of()),
+        END9.minus(END10)
+    );
+
+    Assert.assertEquals(
+        endMetadataMultiTopic("foo2.*", ImmutableList.of("foo22"), ImmutableMap.of(0, 2L, 2, 5L)),
+        END10.minus(END9)
+    );
   }
 
   @Test
@@ -204,19 +826,59 @@
 
   private static KafkaDataSourceMetadata startMetadata(Map<Integer, Long> offsets)
   {
+    return startMetadata("foo", offsets);
+  }
+
+  private static KafkaDataSourceMetadata startMetadata(
+      String topic,
+      Map<Integer, Long> offsets
+  )
+  {
     Map<KafkaTopicPartition, Long> newOffsets = CollectionUtils.mapKeys(
         offsets,
         k -> new KafkaTopicPartition(
             false,
-            "foo",
+            topic,
             k
         )
     );
-    return new KafkaDataSourceMetadata(new SeekableStreamStartSequenceNumbers<>("foo", newOffsets, ImmutableSet.of()));
+    return new KafkaDataSourceMetadata(new SeekableStreamStartSequenceNumbers<>(topic, newOffsets, ImmutableSet.of()));
   }
 
+  private static KafkaDataSourceMetadata startMetadataMultiTopic(
+      String topicPattern,
+      List<String> topics,
+      Map<Integer, Long> offsets
+  )
+  {
+    Assert.assertFalse(topics.isEmpty());
+    Pattern pattern = Pattern.compile(topicPattern);
+    Assert.assertTrue(topics.stream().allMatch(t -> pattern.matcher(t).matches()));
+    Map<KafkaTopicPartition, Long> newOffsets = new HashMap<>();
+    for (Map.Entry<Integer, Long> e : offsets.entrySet()) {
+      for (String topic : topics) {
+        newOffsets.put(
+            new KafkaTopicPartition(
+                true,
+                topic,
+                e.getKey()
+            ),
+            e.getValue()
+        );
+      }
+    }
+    return new KafkaDataSourceMetadata(new SeekableStreamStartSequenceNumbers<>(topicPattern, newOffsets, ImmutableSet.of()));
+  }
+
   private static KafkaDataSourceMetadata endMetadata(Map<Integer, Long> offsets)
   {
+    return endMetadata("foo", offsets);
+  }
+
+  private static KafkaDataSourceMetadata endMetadata(String topic, Map<Integer, Long> offsets)
+  {
     Map<KafkaTopicPartition, Long> newOffsets = CollectionUtils.mapKeys(
         offsets,
         k -> new KafkaTopicPartition(
@@ -225,7 +887,33 @@
             k
         )
     );
-    return new KafkaDataSourceMetadata(new SeekableStreamEndSequenceNumbers<>("foo", newOffsets));
+    return new KafkaDataSourceMetadata(new SeekableStreamEndSequenceNumbers<>(topic, newOffsets));
+  }
+
+  private static KafkaDataSourceMetadata endMetadataMultiTopic(
+      String topicPattern,
+      List<String> topics,
+      Map<Integer, Long> offsets
+  )
+  {
+    Assert.assertFalse(topics.isEmpty());
+    Pattern pattern = Pattern.compile(topicPattern);
+    Assert.assertTrue(topics.stream().allMatch(t -> pattern.matcher(t).matches()));
+    Map<KafkaTopicPartition, Long> newOffsets = new HashMap<>();
+    for (Map.Entry<Integer, Long> e : offsets.entrySet()) {
+      for (String topic : topics) {
+        newOffsets.put(
+            new KafkaTopicPartition(
+                true,
+                topic,
+                e.getKey()
+            ),
+            e.getValue()
+        );
+      }
+    }
+    return new KafkaDataSourceMetadata(new SeekableStreamEndSequenceNumbers<>(topicPattern, newOffsets));
   }
 
   private static ObjectMapper createObjectMapper()
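The foo2 comments in the matches tests above all point at one rule: matches(other) is derived from plus(), and a merge that silently drops a topic's offsets disqualifies the pair. A hedged sketch of that property, reusing the test's own startMetadata / startMetadataMultiTopic helpers (the concrete truth values depend on the PR's merge rules, so none are asserted here):

// Sketch: single-topic metadata for "foo" vs. pattern metadata covering foo and foo2.
KafkaDataSourceMetadata fooOnly = startMetadata(ImmutableMap.of(0, 2L, 1, 3L));
KafkaDataSourceMetadata fooAndFoo2 = startMetadataMultiTopic(
    "foo.*",
    ImmutableList.of("foo", "foo2"),
    ImmutableMap.of(0, 2L)
);

// matches() merges in both directions; if either merged result no longer covers
// the foo2 partitions carried by fooAndFoo2, the two metadata do not match.
DataSourceMetadata oneWay = fooOnly.plus(fooAndFoo2);
DataSourceMetadata otherWay = fooAndFoo2.plus(fooOnly);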
diff --git a/extensions-core/kafka-indexing-service/src/test/java/org/apache/druid/indexing/kafka/KafkaSeekableStreamEndSequenceNumbersTest.java b/extensions-core/kafka-indexing-service/src/test/java/org/apache/druid/indexing/kafka/KafkaSeekableStreamEndSequenceNumbersTest.java
new file mode 100644
index 0000000..e3b3cef
--- /dev/null
+++ b/extensions-core/kafka-indexing-service/src/test/java/org/apache/druid/indexing/kafka/KafkaSeekableStreamEndSequenceNumbersTest.java
@@ -0,0 +1,129 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.druid.indexing.kafka;
+
+import com.fasterxml.jackson.core.type.TypeReference;
+import com.fasterxml.jackson.databind.ObjectMapper;
+import com.fasterxml.jackson.databind.exc.InvalidTypeIdException;
+import com.google.common.collect.ImmutableMap;
+import com.google.inject.Injector;
+import com.google.inject.name.Names;
+import org.apache.druid.data.input.kafka.KafkaTopicPartition;
+import org.apache.druid.guice.StartupInjectorBuilder;
+import org.apache.druid.indexing.seekablestream.SeekableStreamEndSequenceNumbers;
+import org.apache.druid.initialization.CoreInjectorBuilder;
+import org.apache.druid.initialization.DruidModule;
+import org.apache.druid.java.util.common.jackson.JacksonUtils;
+import org.junit.Assert;
+import org.junit.Test;
+
+import java.util.Map;
+
+public class KafkaSeekableStreamEndSequenceNumbersTest
+{
+  private static final ObjectMapper OBJECT_MAPPER = createObjectMapper();
+
+  @Test
+  public void testSerde() throws Exception
+  {
+    final String stream = "theStream";
+    final Map<KafkaTopicPartition, Long> offsetMap = ImmutableMap.of(
+        new KafkaTopicPartition(false, null, 1), 2L,
+        new KafkaTopicPartition(false, null, 3), 4L
+    );
+
+    final KafkaSeekableStreamEndSequenceNumbers partitions = new KafkaSeekableStreamEndSequenceNumbers(
+        stream,
+        null,
+        offsetMap,
+        null
+    );
+    final String serializedString = OBJECT_MAPPER.writeValueAsString(partitions);
+
+    // Check round-trip.
+    final SeekableStreamEndSequenceNumbers<KafkaTopicPartition, Long> partitions2 = OBJECT_MAPPER.readValue(
+        serializedString,
+        new TypeReference<SeekableStreamEndSequenceNumbers<KafkaTopicPartition, Long>>() {}
+    );
+
+    Assert.assertEquals(
+        "Round trip",
+        partitions,
+        new KafkaSeekableStreamEndSequenceNumbers(partitions2.getStream(),
+            partitions2.getTopic(),
+            partitions2.getPartitionSequenceNumberMap(),
+            partitions2.getPartitionOffsetMap()
+        )
+    );
+
+    // Check backwards compatibility.
+    final Map<String, Object> asMap = OBJECT_MAPPER.readValue(
+        serializedString,
+        JacksonUtils.TYPE_REFERENCE_MAP_STRING_OBJECT
+    );
+
+    Assert.assertEquals(stream, asMap.get("stream"));
+    Assert.assertEquals(stream, asMap.get("topic"));
+
+    // Jackson deserializes the raw maps with String keys and Integer values, so convert before comparing.
+    Assert.assertEquals(
+        offsetMap,
+        OBJECT_MAPPER.convertValue(asMap.get("partitionSequenceNumberMap"), new TypeReference<Map<KafkaTopicPartition, Long>>() {})
+    );
+    Assert.assertEquals(
+        offsetMap,
+        OBJECT_MAPPER.convertValue(asMap.get("partitionOffsetMap"), new TypeReference<Map<KafkaTopicPartition, Long>>() {})
+    );
+
+    // Check that KafkaSeekableStreamEndSequenceNumbers is not registered with the mapper, so there is no
+    // possible collision when deserializing it from String / bytes.
+    boolean expectedExceptionThrown = false;
+    try {
+      OBJECT_MAPPER.readValue(
+          serializedString,
+          KafkaSeekableStreamEndSequenceNumbers.class
+      );
+    }
+    catch (InvalidTypeIdException e) {
+      expectedExceptionThrown = true;
+    }
+
+    Assert.assertTrue("KafkaSeekableStreamEndSequenceNumbers should not be registered type", expectedExceptionThrown);
+  }
+
+  private static ObjectMapper createObjectMapper()
+  {
+    DruidModule module = new KafkaIndexTaskModule();
+    final Injector injector = new CoreInjectorBuilder(new StartupInjectorBuilder().build())
+        .addModule(
+            binder -> {
+              binder.bindConstant().annotatedWith(Names.named("serviceName")).to("test");
+              binder.bindConstant().annotatedWith(Names.named("servicePort")).to(8000);
+              binder.bindConstant().annotatedWith(Names.named("tlsServicePort")).to(9000);
+            }
+        )
+        .build();
+
+    ObjectMapper objectMapper = injector.getInstance(ObjectMapper.class);
+    module.getJacksonModules().forEach(objectMapper::registerModule);
+    return objectMapper;
+  }
+}
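Because the wrapper deliberately stays unregistered, metadata read back from the store always materializes as the registered base type; the Kafka-aware wrapper is applied only when the metadata object is rebuilt. A hedged sketch of that consumer-side path (json stands for a previously persisted value, OBJECT_MAPPER as in the test above):

// Stored metadata deserializes to the registered base type...
SeekableStreamEndSequenceNumbers<KafkaTopicPartition, Long> fromStore = OBJECT_MAPPER.readValue(
    json,
    new TypeReference<SeekableStreamEndSequenceNumbers<KafkaTopicPartition, Long>>() {}
);
// ...and the constructor wraps it, restoring the Kafka-specific semantics.
KafkaDataSourceMetadata restored = new KafkaDataSourceMetadata(fromStore);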
diff --git a/extensions-core/kafka-indexing-service/src/test/java/org/apache/druid/indexing/kafka/KafkaSeekableStreamStartSequenceNumbersTest.java b/extensions-core/kafka-indexing-service/src/test/java/org/apache/druid/indexing/kafka/KafkaSeekableStreamStartSequenceNumbersTest.java
new file mode 100644
index 0000000..07eaced
--- /dev/null
+++ b/extensions-core/kafka-indexing-service/src/test/java/org/apache/druid/indexing/kafka/KafkaSeekableStreamStartSequenceNumbersTest.java
@@ -0,0 +1,141 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.druid.indexing.kafka;
+
+import com.fasterxml.jackson.core.type.TypeReference;
+import com.fasterxml.jackson.databind.ObjectMapper;
+import com.fasterxml.jackson.databind.exc.InvalidTypeIdException;
+import com.google.common.collect.ImmutableMap;
+import com.google.common.collect.ImmutableSet;
+import com.google.inject.Injector;
+import com.google.inject.name.Names;
+import org.apache.druid.data.input.kafka.KafkaTopicPartition;
+import org.apache.druid.guice.StartupInjectorBuilder;
+import org.apache.druid.indexing.seekablestream.SeekableStreamStartSequenceNumbers;
+import org.apache.druid.initialization.CoreInjectorBuilder;
+import org.apache.druid.initialization.DruidModule;
+import org.apache.druid.java.util.common.jackson.JacksonUtils;
+import org.junit.Assert;
+import org.junit.Test;
+
+import java.util.Map;
+import java.util.Set;
+
+public class KafkaSeekableStreamStartSequenceNumbersTest
+{
+  private static final ObjectMapper OBJECT_MAPPER = createObjectMapper();
+
+  @Test
+  public void testSerde() throws Exception
+  {
+    final String stream = "theStream";
+    final Map<KafkaTopicPartition, Long> offsetMap = ImmutableMap.of(
+        new KafkaTopicPartition(false, null, 1), 2L,
+        new KafkaTopicPartition(false, null, 3), 4L
+    );
+
+    Set<KafkaTopicPartition> exclusivePartitions = ImmutableSet.of(new KafkaTopicPartition(false, null, 1));
+
+    final KafkaSeekableStreamStartSequenceNumbers partitions = new KafkaSeekableStreamStartSequenceNumbers(
+        stream,
+        null,
+        offsetMap,
+        null,
+        exclusivePartitions
+    );
+    final String serializedString = OBJECT_MAPPER.writeValueAsString(partitions);
+
+    // Check round-trip.
+    final SeekableStreamStartSequenceNumbers<KafkaTopicPartition, Long> partitions2 = OBJECT_MAPPER.readValue(
+        serializedString,
+        new TypeReference<SeekableStreamStartSequenceNumbers<KafkaTopicPartition, Long>>() {}
+    );
+
+    Assert.assertEquals(
+        "Round trip",
+        partitions,
+        new KafkaSeekableStreamStartSequenceNumbers(
+            partitions2.getStream(),
+            partitions2.getTopic(),
+            partitions2.getPartitionSequenceNumberMap(),
+            partitions2.getPartitionOffsetMap(),
+            partitions2.getExclusivePartitions()
+        )
+    );
+
+    // Check backwards compatibility.
+    final Map<String, Object> asMap = OBJECT_MAPPER.readValue(
+        serializedString,
+        JacksonUtils.TYPE_REFERENCE_MAP_STRING_OBJECT
+    );
+
+    Assert.assertEquals(stream, asMap.get("stream"));
+    Assert.assertEquals(stream, asMap.get("topic"));
+
+    // Jackson deserializes the raw maps with String keys and Integer values, so convert before comparing.
+    Assert.assertEquals(
+        offsetMap,
+        OBJECT_MAPPER.convertValue(asMap.get("partitionSequenceNumberMap"), new TypeReference<Map<KafkaTopicPartition, Long>>() {})
+    );
+    Assert.assertEquals(
+        offsetMap,
+        OBJECT_MAPPER.convertValue(asMap.get("partitionOffsetMap"), new TypeReference<Map<KafkaTopicPartition, Long>>() {})
+    );
+
+    Assert.assertEquals(
+        exclusivePartitions,
+        OBJECT_MAPPER.convertValue(asMap.get("exclusivePartitions"), new TypeReference<Set<KafkaTopicPartition>>() {})
+    );
+
+    // Check that KafkaSeekableStreamStartSequenceNumbers is not registered with the mapper, so there is no
+    // possible collision when deserializing it from String / bytes.
+    boolean expectedExceptionThrown = false;
+    try {
+      OBJECT_MAPPER.readValue(
+          serializedString,
+          KafkaSeekableStreamStartSequenceNumbers.class
+      );
+    }
+    catch (InvalidTypeIdException e) {
+      expectedExceptionThrown = true;
+    }
+
+    Assert.assertTrue("KafkaSeekableStreamStartSequenceNumbers should not be registered type", expectedExceptionThrown);
+  }
+
+  private static ObjectMapper createObjectMapper()
+  {
+    DruidModule module = new KafkaIndexTaskModule();
+    final Injector injector = new CoreInjectorBuilder(new StartupInjectorBuilder().build())
+        .addModule(
+            binder -> {
+              binder.bindConstant().annotatedWith(Names.named("serviceName")).to("test");
+              binder.bindConstant().annotatedWith(Names.named("servicePort")).to(8000);
+              binder.bindConstant().annotatedWith(Names.named("tlsServicePort")).to(9000);
+            }
+        )
+        .build();
+
+    ObjectMapper objectMapper = injector.getInstance(ObjectMapper.class);
+    module.getJacksonModules().forEach(objectMapper::registerModule);
+    return objectMapper;
+  }
+}
diff --git a/extensions-core/kafka-indexing-service/src/test/java/org/apache/druid/indexing/kafka/supervisor/KafkaSupervisorTest.java b/extensions-core/kafka-indexing-service/src/test/java/org/apache/druid/indexing/kafka/supervisor/KafkaSupervisorTest.java
index e4b4b2f..a7d4778 100644
--- a/extensions-core/kafka-indexing-service/src/test/java/org/apache/druid/indexing/kafka/supervisor/KafkaSupervisorTest.java
+++ b/extensions-core/kafka-indexing-service/src/test/java/org/apache/druid/indexing/kafka/supervisor/KafkaSupervisorTest.java
@@ -166,6 +166,8 @@
   private SeekableStreamIndexTaskClient<KafkaTopicPartition, Long> taskClient;
   private TaskQueue taskQueue;
   private String topic;
+  private String topicPattern;
+  private boolean multiTopic;
   private RowIngestionMetersFactory rowIngestionMetersFactory;
   private StubServiceEmitter serviceEmitter;
   private SupervisorStateManagerConfig supervisorConfig;
@@ -174,7 +176,13 @@
   private static String getTopic()
   {
     //noinspection StringConcatenationMissingWhitespace
-    return TOPIC_PREFIX + topicPostfix++;
+    return TOPIC_PREFIX + topicPostfix;
+  }
+
+  private static String getTopicPattern()
+  {
+    //noinspection StringConcatenationMissingWhitespace
+    return TOPIC_PREFIX + topicPostfix + ".*";
   }
 
   @Parameterized.Parameters(name = "numThreads = {0}")
@@ -222,6 +230,9 @@
     taskQueue = createMock(TaskQueue.class);
 
     topic = getTopic();
+    topicPattern = getTopicPattern();
+    topicPostfix++;
+    multiTopic = false; // set to true within a test to exercise multi-topic ingestion
     rowIngestionMetersFactory = new TestUtils().getRowIngestionMetersFactory();
     serviceEmitter = new StubServiceEmitter("KafkaSupervisorTest", "localhost");
     EmittingLogger.registerEmitter(serviceEmitter);
@@ -906,11 +917,12 @@
   }
 
   /**
-   * Test generating the starting offsets from the partition data stored in druid_dataSource which contains the
-   * offsets of the last built segments.
+   * Test generating the starting offsets for a single-topic config from the partition data stored in
+   * druid_dataSource, which contains the offsets of the last built segments from a previous single-topic
+   * config whose topic matches the new configuration.
    */
   @Test
-  public void testDatasourceMetadata() throws Exception
+  public void testDatasourceMetadataSingleTopicToSingleTopicMatch() throws Exception
   {
     supervisor = getTestableSupervisor(1, 1, true, "PT1H", null, null);
     addSomeEvents(100);
@@ -948,6 +960,203 @@
     );
   }
 
+  /**
+   * Test generating the starting offsets for a multi-topic config from the partition data stored in
+   * druid_dataSource, which contains the offsets of the last built segments from a previous single-topic
+   * config whose topic matches the new configuration.
+   */
+  @Test
+  public void testDatasourceMetadataSingleTopicToMultiTopicMatch() throws Exception
+  {
+    multiTopic = true;
+    supervisor = getTestableSupervisor(1, 1, true, "PT1H", null, null);
+    addSomeEvents(100);
+
+    Capture<KafkaIndexTask> captured = Capture.newInstance();
+    EasyMock.expect(taskMaster.getTaskQueue()).andReturn(Optional.of(taskQueue)).anyTimes();
+    EasyMock.expect(taskMaster.getTaskRunner()).andReturn(Optional.absent()).anyTimes();
+    EasyMock.expect(taskStorage.getActiveTasksByDatasource(DATASOURCE)).andReturn(ImmutableList.of()).anyTimes();
+    ImmutableMap.Builder<KafkaTopicPartition, Long> partitionSequenceNumberMap = ImmutableMap.builder();
+    // these should match
+    partitionSequenceNumberMap.putAll(singlePartitionMap(topic, 0, 10L, 1, 20L, 2, 30L));
+    EasyMock.expect(indexerMetadataStorageCoordinator.retrieveDataSourceMetadata(DATASOURCE)).andReturn(
+        new KafkaDataSourceMetadata(
+            new SeekableStreamStartSequenceNumbers<>(topic, partitionSequenceNumberMap.build(), ImmutableSet.of())
+        )
+    ).anyTimes();
+    EasyMock.expect(taskQueue.add(EasyMock.capture(captured))).andReturn(true);
+    replayAll();
+
+    supervisor.start();
+    supervisor.runInternal();
+    verifyAll();
+
+    KafkaIndexTask task = captured.getValue();
+    KafkaIndexTaskIOConfig taskConfig = task.getIOConfig();
+    Assert.assertEquals("sequenceName-0", taskConfig.getBaseSequenceName());
+    Assert.assertEquals(
+        10L,
+        taskConfig.getStartSequenceNumbers().getPartitionSequenceNumberMap().get(new KafkaTopicPartition(true, topic, 0)).longValue()
+    );
+    Assert.assertEquals(
+        20L,
+        taskConfig.getStartSequenceNumbers().getPartitionSequenceNumberMap().get(new KafkaTopicPartition(true, topic, 1)).longValue()
+    );
+    Assert.assertEquals(
+        30L,
+        taskConfig.getStartSequenceNumbers().getPartitionSequenceNumberMap().get(new KafkaTopicPartition(true, topic, 2)).longValue()
+    );
+    Assert.assertEquals(3, taskConfig.getStartSequenceNumbers().getPartitionSequenceNumberMap().size());
+  }
+
+  /**
+   * Test generating the starting offsets for a multi-topic config from the partition data stored in
+   * druid_dataSource, which contains the offsets of the last built segments from a previous single-topic
+   * config whose topic does not match the new configuration.
+   */
+  @Test
+  public void testDatasourceMetadataSingleTopicToMultiTopicNotMatch() throws Exception
+  {
+    multiTopic = true;
+    supervisor = getTestableSupervisor(1, 1, true, "PT1H", null, null);
+    addSomeEvents(100);
+
+    Capture<KafkaIndexTask> captured = Capture.newInstance();
+    EasyMock.expect(taskMaster.getTaskQueue()).andReturn(Optional.of(taskQueue)).anyTimes();
+    EasyMock.expect(taskMaster.getTaskRunner()).andReturn(Optional.absent()).anyTimes();
+    EasyMock.expect(taskStorage.getActiveTasksByDatasource(DATASOURCE)).andReturn(ImmutableList.of()).anyTimes();
+    ImmutableMap.Builder<KafkaTopicPartition, Long> partitionSequenceNumberMap = ImmutableMap.builder();
+    // these should not match
+    partitionSequenceNumberMap.putAll(singlePartitionMap("notMatch", 0, 10L, 1, 20L, 2, 30L));
+    EasyMock.expect(indexerMetadataStorageCoordinator.retrieveDataSourceMetadata(DATASOURCE)).andReturn(
+        new KafkaDataSourceMetadata(
+            new SeekableStreamStartSequenceNumbers<>("notMatch", partitionSequenceNumberMap.build(), ImmutableSet.of())
+        )
+    ).anyTimes();
+    EasyMock.expect(taskQueue.add(EasyMock.capture(captured))).andReturn(true);
+    replayAll();
+
+    supervisor.start();
+    supervisor.runInternal();
+    verifyAll();
+
+    KafkaIndexTask task = captured.getValue();
+    KafkaIndexTaskIOConfig taskConfig = task.getIOConfig();
+    Assert.assertEquals("sequenceName-0", taskConfig.getBaseSequenceName());
+    Assert.assertEquals(
+        0L,
+        taskConfig.getStartSequenceNumbers().getPartitionSequenceNumberMap().get(new KafkaTopicPartition(true, topic, 0)).longValue()
+    );
+    Assert.assertEquals(
+        0L,
+        taskConfig.getStartSequenceNumbers().getPartitionSequenceNumberMap().get(new KafkaTopicPartition(true, topic, 1)).longValue()
+    );
+    Assert.assertEquals(
+        0L,
+        taskConfig.getStartSequenceNumbers().getPartitionSequenceNumberMap().get(new KafkaTopicPartition(true, topic, 2)).longValue()
+    );
+    Assert.assertEquals(3, taskConfig.getStartSequenceNumbers().getPartitionSequenceNumberMap().size());
+  }
+
+  /**
+   * Test generating the starting offsets for a single-topic config from the partition data stored in
+   * druid_dataSource, which contains the offsets of the last built segments from a previous multi-topic config.
+   */
+  @Test
+  public void testDatasourceMetadataMultiTopicToSingleTopic() throws Exception
+  {
+    supervisor = getTestableSupervisor(1, 1, true, "PT1H", null, null);
+    addSomeEvents(100);
+
+    Capture<KafkaIndexTask> captured = Capture.newInstance();
+    EasyMock.expect(taskMaster.getTaskQueue()).andReturn(Optional.of(taskQueue)).anyTimes();
+    EasyMock.expect(taskMaster.getTaskRunner()).andReturn(Optional.absent()).anyTimes();
+    EasyMock.expect(taskStorage.getActiveTasksByDatasource(DATASOURCE)).andReturn(ImmutableList.of()).anyTimes();
+    ImmutableMap.Builder<KafkaTopicPartition, Long> partitionSequenceNumberMap = ImmutableMap.builder();
+    // these should match
+    partitionSequenceNumberMap.putAll(multiTopicPartitionMap(topic, 0, 10L, 1, 20L, 2, 30L));
+    // these should not match
+    partitionSequenceNumberMap.putAll(multiTopicPartitionMap("notMatch", 0, 10L, 1, 20L, 2, 30L));
+    EasyMock.expect(indexerMetadataStorageCoordinator.retrieveDataSourceMetadata(DATASOURCE)).andReturn(
+        new KafkaDataSourceMetadata(
+            new SeekableStreamStartSequenceNumbers<>(topicPattern, partitionSequenceNumberMap.build(), ImmutableSet.of())
+        )
+    ).anyTimes();
+    EasyMock.expect(taskQueue.add(EasyMock.capture(captured))).andReturn(true);
+    replayAll();
+
+    supervisor.start();
+    supervisor.runInternal();
+    verifyAll();
+
+    KafkaIndexTask task = captured.getValue();
+    KafkaIndexTaskIOConfig taskConfig = task.getIOConfig();
+    Assert.assertEquals("sequenceName-0", taskConfig.getBaseSequenceName());
+    Assert.assertEquals(
+        10L,
+        taskConfig.getStartSequenceNumbers().getPartitionSequenceNumberMap().get(new KafkaTopicPartition(false, topic, 0)).longValue()
+    );
+    Assert.assertEquals(
+        20L,
+        taskConfig.getStartSequenceNumbers().getPartitionSequenceNumberMap().get(new KafkaTopicPartition(false, topic, 1)).longValue()
+    );
+    Assert.assertEquals(
+        30L,
+        taskConfig.getStartSequenceNumbers().getPartitionSequenceNumberMap().get(new KafkaTopicPartition(false, topic, 2)).longValue()
+    );
+    Assert.assertEquals(3, taskConfig.getStartSequenceNumbers().getPartitionSequenceNumberMap().size());
+  }
+
+  /**
+   * Test generating the starting offsets for a multi-topic config from the partition data stored in
+   * druid_dataSource, which contains the offsets of the last built segments from a previous multi-topic config.
+   */
+  @Test
+  public void testDatasourceMetadataMultiTopicToMultiTopic() throws Exception
+  {
+    multiTopic = true;
+    supervisor = getTestableSupervisor(1, 1, true, "PT1H", null, null);
+    addSomeEvents(100);
+
+    Capture<KafkaIndexTask> captured = Capture.newInstance();
+    EasyMock.expect(taskMaster.getTaskQueue()).andReturn(Optional.of(taskQueue)).anyTimes();
+    EasyMock.expect(taskMaster.getTaskRunner()).andReturn(Optional.absent()).anyTimes();
+    EasyMock.expect(taskStorage.getActiveTasksByDatasource(DATASOURCE)).andReturn(ImmutableList.of()).anyTimes();
+    ImmutableMap.Builder<KafkaTopicPartition, Long> partitionSequenceNumberMap = ImmutableMap.builder();
+    // these should match
+    partitionSequenceNumberMap.putAll(multiTopicPartitionMap(topic, 0, 10L, 1, 20L, 2, 30L));
+    // these should not match
+    partitionSequenceNumberMap.putAll(multiTopicPartitionMap("notMatch", 0, 10L, 1, 20L, 2, 30L));
+    EasyMock.expect(indexerMetadataStorageCoordinator.retrieveDataSourceMetadata(DATASOURCE)).andReturn(
+        new KafkaDataSourceMetadata(
+            new SeekableStreamStartSequenceNumbers<>(topic, partitionSequenceNumberMap.build(), ImmutableSet.of())
+        )
+    ).anyTimes();
+    EasyMock.expect(taskQueue.add(EasyMock.capture(captured))).andReturn(true);
+    replayAll();
+
+    supervisor.start();
+    supervisor.runInternal();
+    verifyAll();
+
+    KafkaIndexTask task = captured.getValue();
+    KafkaIndexTaskIOConfig taskConfig = task.getIOConfig();
+    Assert.assertEquals("sequenceName-0", taskConfig.getBaseSequenceName());
+    Assert.assertEquals(
+        10L,
+        taskConfig.getStartSequenceNumbers().getPartitionSequenceNumberMap().get(new KafkaTopicPartition(true, topic, 0)).longValue()
+    );
+    Assert.assertEquals(
+        20L,
+        taskConfig.getStartSequenceNumbers().getPartitionSequenceNumberMap().get(new KafkaTopicPartition(true, topic, 1)).longValue()
+    );
+    Assert.assertEquals(
+        30L,
+        taskConfig.getStartSequenceNumbers().getPartitionSequenceNumberMap().get(new KafkaTopicPartition(true, topic, 2)).longValue()
+    );
+    Assert.assertEquals(3, taskConfig.getStartSequenceNumbers().getPartitionSequenceNumberMap().size());
+  }
+
   @Test
   public void testBadMetadataOffsets() throws Exception
   {
@@ -4621,8 +4830,8 @@
     consumerProperties.put("myCustomKey", "myCustomValue");
     consumerProperties.put("bootstrap.servers", kafkaHost);
     KafkaSupervisorIOConfig kafkaSupervisorIOConfig = new KafkaSupervisorIOConfig(
-        topic,
-        null,
+        multiTopic ? null : topic,
+        multiTopic ? topicPattern : null,
         INPUT_FORMAT,
         replicas,
         taskCount,
@@ -5038,6 +5247,13 @@
                            offset2, new KafkaTopicPartition(false, topic, partition3), offset3);
   }
 
+  private static ImmutableMap<KafkaTopicPartition, Long> multiTopicPartitionMap(String topic, int partition1, long offset1,
+                                                                                int partition2, long offset2, int partition3, long offset3)
+  {
+    return ImmutableMap.of(new KafkaTopicPartition(true, topic, partition1), offset1, new KafkaTopicPartition(true, topic, partition2),
+        offset2, new KafkaTopicPartition(true, topic, partition3), offset3);
+  }
+
   private static class TestTaskRunnerWorkItem extends TaskRunnerWorkItem
   {
     private final String taskType;
@@ -5109,7 +5325,7 @@
       Deserializer valueDeserializerObject = new ByteArrayDeserializer();
       return new KafkaRecordSupplier(
           new KafkaConsumer<>(props, keyDeserializerObject, valueDeserializerObject),
-          false
+          getIoConfig().isMultiTopic()
       );
     }
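The singlePartitionMap and multiTopicPartitionMap helpers differ only in the boolean handed to KafkaTopicPartition. That flag records whether the partition was resolved through topicPattern, and it participates in key identity, which is why every test asserts against keys built with the flag matching its ingestion mode. A minimal illustration (behavior inferred from the assertions above):

// The same topic/partition pair under single-topic and pattern-based ingestion
// forms two distinct map keys, so offsets stored under one form are not
// visible under the other.
KafkaTopicPartition single = new KafkaTopicPartition(false, "topic", 0);
KafkaTopicPartition multi = new KafkaTopicPartition(true, "topic", 0);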
 
diff --git a/indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/SeekableStreamEndSequenceNumbers.java b/indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/SeekableStreamEndSequenceNumbers.java
index a95f4cc..de61d46 100644
--- a/indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/SeekableStreamEndSequenceNumbers.java
+++ b/indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/SeekableStreamEndSequenceNumbers.java
@@ -22,7 +22,6 @@
 import com.fasterxml.jackson.annotation.JsonCreator;
 import com.fasterxml.jackson.annotation.JsonProperty;
 import com.google.common.base.Preconditions;
-import org.apache.druid.java.util.common.IAE;
 
 import java.util.Collections;
 import java.util.Comparator;
@@ -47,6 +46,8 @@
 public class SeekableStreamEndSequenceNumbers<PartitionIdType, SequenceOffsetType> implements
     SeekableStreamSequenceNumbers<PartitionIdType, SequenceOffsetType>
 {
+  public static final String TYPE = "end";
+
   // stream/topic
   private final String stream;
   // partitionId -> sequence number
@@ -126,13 +127,7 @@
       SeekableStreamSequenceNumbers<PartitionIdType, SequenceOffsetType> other
   )
   {
-    if (this.getClass() != other.getClass()) {
-      throw new IAE(
-          "Expected instance of %s, got %s",
-          this.getClass().getName(),
-          other.getClass().getName()
-      );
-    }
+    validateSequenceNumbersBaseType(other);
 
     final SeekableStreamEndSequenceNumbers<PartitionIdType, SequenceOffsetType> otherEnd =
         (SeekableStreamEndSequenceNumbers<PartitionIdType, SequenceOffsetType>) other;
@@ -151,13 +146,7 @@
   @Override
   public int compareTo(SeekableStreamSequenceNumbers<PartitionIdType, SequenceOffsetType> other, Comparator<SequenceOffsetType> comparator)
   {
-    if (this.getClass() != other.getClass()) {
-      throw new IAE(
-          "Expected instance of %s, got %s",
-          this.getClass().getName(),
-          other.getClass().getName()
-      );
-    }
+    validateSequenceNumbersBaseType(other);
 
     final SeekableStreamEndSequenceNumbers<PartitionIdType, SequenceOffsetType> otherStart =
         (SeekableStreamEndSequenceNumbers<PartitionIdType, SequenceOffsetType>) other;
@@ -185,13 +174,7 @@
       SeekableStreamSequenceNumbers<PartitionIdType, SequenceOffsetType> other
   )
   {
-    if (this.getClass() != other.getClass()) {
-      throw new IAE(
-          "Expected instance of %s, got %s",
-          this.getClass().getName(),
-          other.getClass().getName()
-      );
-    }
+    validateSequenceNumbersBaseType(other);
 
     final SeekableStreamEndSequenceNumbers<PartitionIdType, SequenceOffsetType> otherEnd =
         (SeekableStreamEndSequenceNumbers<PartitionIdType, SequenceOffsetType>) other;
diff --git a/indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/SeekableStreamSequenceNumbers.java b/indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/SeekableStreamSequenceNumbers.java
index 44f1343..ad1801e 100644
--- a/indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/SeekableStreamSequenceNumbers.java
+++ b/indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/SeekableStreamSequenceNumbers.java
@@ -24,14 +24,15 @@
 import com.fasterxml.jackson.annotation.JsonTypeInfo;
 import com.fasterxml.jackson.annotation.JsonTypeInfo.Id;
 import org.apache.druid.indexing.overlord.DataSourceMetadata;
+import org.apache.druid.java.util.common.IAE;
 
 import java.util.Comparator;
 import java.util.Map;
 
 @JsonTypeInfo(use = Id.NAME, property = "type", defaultImpl = SeekableStreamEndSequenceNumbers.class)
 @JsonSubTypes({
-    @Type(name = "start", value = SeekableStreamStartSequenceNumbers.class),
-    @Type(name = "end", value = SeekableStreamEndSequenceNumbers.class)
+    @Type(name = SeekableStreamStartSequenceNumbers.TYPE, value = SeekableStreamStartSequenceNumbers.class),
+    @Type(name = SeekableStreamEndSequenceNumbers.TYPE, value = SeekableStreamEndSequenceNumbers.class)
 })
 public interface SeekableStreamSequenceNumbers<PartitionIdType, SequenceOffsetType>
 {
@@ -41,6 +42,29 @@
   String getStream();
 
   /**
+   * Returns whether the sequence number data may span multiple streams / topics.
+   */
+  default boolean isMultiTopicPartition()
+  {
+    return false;
+  }
+
+  /**
+   * Throws an exception if {@code other} is not of the same concrete class as this instance.
+   *
+   * @param other the other instance to compare
+   */
+  default void validateSequenceNumbersBaseType(SeekableStreamSequenceNumbers<PartitionIdType, SequenceOffsetType> other)
+  {
+    if (this.getClass() != other.getClass()) {
+      throw new IAE(
+          "Expected instance of %s, got %s",
+          this.getClass().getName(),
+          other.getClass().getName()
+      );
+    }
+  }
+
+  /**
    * Returns a map of partitionId -> sequenceNumber.
    */
   Map<PartitionIdType, SequenceOffsetType> getPartitionSequenceNumberMap();
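For context, a pattern-aware subclass is expected to flip the new isMultiTopicPartition default; a hedged sketch of such an override (the KafkaTopicPartition accessor name below is an assumption, not the PR's exact code):

@Override
public boolean isMultiTopicPartition()
{
  // The multi-topic flag travels with each KafkaTopicPartition key, so any key
  // carrying it marks the whole sequence-number set as multi-topic.
  return getPartitionSequenceNumberMap().keySet().stream()
      .anyMatch(KafkaTopicPartition::isMultiTopicPartition);
}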
diff --git a/indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/SeekableStreamStartSequenceNumbers.java b/indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/SeekableStreamStartSequenceNumbers.java
index b1a72ff..7d3eab9 100644
--- a/indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/SeekableStreamStartSequenceNumbers.java
+++ b/indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/SeekableStreamStartSequenceNumbers.java
@@ -22,7 +22,6 @@
 import com.fasterxml.jackson.annotation.JsonCreator;
 import com.fasterxml.jackson.annotation.JsonProperty;
 import com.google.common.base.Preconditions;
-import org.apache.druid.java.util.common.IAE;
 
 import javax.annotation.Nullable;
 import java.util.Collections;
@@ -42,6 +41,8 @@
 public class SeekableStreamStartSequenceNumbers<PartitionIdType, SequenceOffsetType> implements
     SeekableStreamSequenceNumbers<PartitionIdType, SequenceOffsetType>
 {
+  public static final String TYPE = "start";
+
   // stream/topic
   private final String stream;
   // partitionId -> sequence number
@@ -121,13 +122,7 @@
       SeekableStreamSequenceNumbers<PartitionIdType, SequenceOffsetType> other
   )
   {
-    if (this.getClass() != other.getClass()) {
-      throw new IAE(
-          "Expected instance of %s, got %s",
-          this.getClass().getName(),
-          other.getClass().getName()
-      );
-    }
+    validateSequenceNumbersBaseType(other);
 
     final SeekableStreamStartSequenceNumbers<PartitionIdType, SequenceOffsetType> otherStart =
         (SeekableStreamStartSequenceNumbers<PartitionIdType, SequenceOffsetType>) other;
@@ -165,13 +160,7 @@
   @Override
   public int compareTo(SeekableStreamSequenceNumbers<PartitionIdType, SequenceOffsetType> other, Comparator<SequenceOffsetType> comparator)
   {
-    if (this.getClass() != other.getClass()) {
-      throw new IAE(
-          "Expected instance of %s, got %s",
-          this.getClass().getName(),
-          other.getClass().getName()
-      );
-    }
+    validateSequenceNumbersBaseType(other);
 
     final SeekableStreamStartSequenceNumbers<PartitionIdType, SequenceOffsetType> otherStart =
         (SeekableStreamStartSequenceNumbers<PartitionIdType, SequenceOffsetType>) other;
@@ -199,13 +188,7 @@
       SeekableStreamSequenceNumbers<PartitionIdType, SequenceOffsetType> other
   )
   {
-    if (this.getClass() != other.getClass()) {
-      throw new IAE(
-          "Expected instance of %s, got %s",
-          this.getClass().getName(),
-          other.getClass().getName()
-      );
-    }
+    validateSequenceNumbersBaseType(other);
 
     final SeekableStreamStartSequenceNumbers<PartitionIdType, SequenceOffsetType> otherStart =
         (SeekableStreamStartSequenceNumbers<PartitionIdType, SequenceOffsetType>) other;
diff --git a/indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/supervisor/SeekableStreamSupervisor.java b/indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/supervisor/SeekableStreamSupervisor.java
index 59cd05b..f15a975 100644
--- a/indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/supervisor/SeekableStreamSupervisor.java
+++ b/indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/supervisor/SeekableStreamSupervisor.java
@@ -3863,10 +3863,9 @@
     }
   }
 
-  private Map<PartitionIdType, SequenceOffsetType> getOffsetsFromMetadataStorage()
+  protected Map<PartitionIdType, SequenceOffsetType> getOffsetsFromMetadataStorage()
   {
-    final DataSourceMetadata dataSourceMetadata = indexerMetadataStorageCoordinator.retrieveDataSourceMetadata(
-        dataSource);
+    final DataSourceMetadata dataSourceMetadata = retrieveDataSourceMetadata();
     if (dataSourceMetadata instanceof SeekableStreamDataSourceMetadata
         && checkSourceMetadataMatch(dataSourceMetadata)) {
       @SuppressWarnings("unchecked")
@@ -3889,6 +3888,11 @@
     return Collections.emptyMap();
   }
 
+  protected DataSourceMetadata retrieveDataSourceMetadata()
+  {
+    return indexerMetadataStorageCoordinator.retrieveDataSourceMetadata(dataSource);
+  }
+
   /**
    * Fetches the earliest or latest offset from the stream via the {@link RecordSupplier}
    */