Kafka with topicPattern can ignore old offsets spuriously (#16190)
* fix
* simplify
* simplify tests
* update matches function definition for Kafka Datasource Metadata
* add matchesOld
* override matches and plus for kafka based metadata / sequence numbers
* implement minus
* add tests
* fix failing tests
* remove TODO comments
* simplify and add comments
* remove unused variable in tests
* remove unneeded function
* add serde tests
* more stuff
* address review comments
* remove unneeded code
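
For context, a minimal sketch of the scenario this patch fixes, using the classes touched by the diff below (imports elided; the topic names and offsets are hypothetical):

```java
// A supervisor previously ran against the single topic "foo" and stored
// offsets for it. The spec is then updated to use topicPattern "foo.*".
KafkaDataSourceMetadata stored = new KafkaDataSourceMetadata(
    new SeekableStreamEndSequenceNumbers<>(
        "foo",
        ImmutableMap.of(new KafkaTopicPartition(false, "foo", 0), 10L)
    )
);
KafkaDataSourceMetadata current = new KafkaDataSourceMetadata(
    new SeekableStreamEndSequenceNumbers<>(
        "foo.*",
        ImmutableMap.of(new KafkaTopicPartition(true, "foo", 0), 10L)
    )
);
// Before this patch, matches() effectively compared the raw maps, whose
// stream names and keys differ, so the stored offsets were spuriously
// ignored. With the overrides below, the merged metadata is checked for
// per-topic-partition consistency instead, and this now returns true.
current.matches(stored);
```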
diff --git a/extensions-core/kafka-indexing-service/src/main/java/org/apache/druid/indexing/kafka/KafkaDataSourceMetadata.java b/extensions-core/kafka-indexing-service/src/main/java/org/apache/druid/indexing/kafka/KafkaDataSourceMetadata.java
index a7b73f4..cfa3080 100644
--- a/extensions-core/kafka-indexing-service/src/main/java/org/apache/druid/indexing/kafka/KafkaDataSourceMetadata.java
+++ b/extensions-core/kafka-indexing-service/src/main/java/org/apache/druid/indexing/kafka/KafkaDataSourceMetadata.java
@@ -26,19 +26,41 @@
import org.apache.druid.indexing.seekablestream.SeekableStreamDataSourceMetadata;
import org.apache.druid.indexing.seekablestream.SeekableStreamEndSequenceNumbers;
import org.apache.druid.indexing.seekablestream.SeekableStreamSequenceNumbers;
+import org.apache.druid.indexing.seekablestream.SeekableStreamStartSequenceNumbers;
import org.apache.druid.java.util.common.IAE;
+import org.apache.druid.java.util.common.logger.Logger;
+import org.apache.druid.utils.CollectionUtils;
+import org.apache.kafka.common.TopicPartition;
import java.util.Comparator;
+import java.util.Map;
public class KafkaDataSourceMetadata extends SeekableStreamDataSourceMetadata<KafkaTopicPartition, Long> implements Comparable<KafkaDataSourceMetadata>
{
+ private static final Logger LOGGER = new Logger(KafkaDataSourceMetadata.class);
@JsonCreator
public KafkaDataSourceMetadata(
@JsonProperty("partitions") SeekableStreamSequenceNumbers<KafkaTopicPartition, Long> kafkaPartitions
)
{
- super(kafkaPartitions);
+ // wrap the deserialized sequence numbers in Kafka-specific subclasses so that
+ // plus/minus/matches can handle multi-topic (topicPattern) offsets correctly
+ super(kafkaPartitions == null
+ ? null
+ : kafkaPartitions instanceof SeekableStreamStartSequenceNumbers
+ ?
+ new KafkaSeekableStreamStartSequenceNumbers(
+ kafkaPartitions.getStream(),
+ ((SeekableStreamStartSequenceNumbers<KafkaTopicPartition, Long>) kafkaPartitions).getTopic(),
+ kafkaPartitions.getPartitionSequenceNumberMap(),
+ ((SeekableStreamStartSequenceNumbers<KafkaTopicPartition, Long>) kafkaPartitions).getPartitionOffsetMap(),
+ ((SeekableStreamStartSequenceNumbers<KafkaTopicPartition, Long>) kafkaPartitions).getExclusivePartitions()
+ )
+ : new KafkaSeekableStreamEndSequenceNumbers(
+ kafkaPartitions.getStream(),
+ ((SeekableStreamEndSequenceNumbers<KafkaTopicPartition, Long>) kafkaPartitions).getTopic(),
+ kafkaPartitions.getPartitionSequenceNumberMap(),
+ ((SeekableStreamEndSequenceNumbers<KafkaTopicPartition, Long>) kafkaPartitions).getPartitionOffsetMap()
+ ));
}
@Override
@@ -76,4 +98,71 @@
}
return getSeekableStreamSequenceNumbers().compareTo(other.getSeekableStreamSequenceNumbers(), Comparator.naturalOrder());
}
+
+ @Override
+ public boolean matches(DataSourceMetadata other)
+ {
+ if (!getClass().equals(other.getClass())) {
+ return false;
+ }
+ KafkaDataSourceMetadata thisPlusOther = (KafkaDataSourceMetadata) plus(other);
+ if (thisPlusOther.equals(other.plus(this))) {
+ return true;
+ }
+
+ // check that thisPlusOther contains all metadata from other, and that there is no inconsistency or loss
+ KafkaDataSourceMetadata otherMetadata = (KafkaDataSourceMetadata) other;
+ final SeekableStreamSequenceNumbers<KafkaTopicPartition, Long> otherSequenceNumbers = otherMetadata.getSeekableStreamSequenceNumbers();
+ if (!getSeekableStreamSequenceNumbers().isMultiTopicPartition() && !otherSequenceNumbers.isMultiTopicPartition()) {
+ return false;
+ }
+ final SeekableStreamSequenceNumbers<KafkaTopicPartition, Long> mergedSequenceNumbers = thisPlusOther.getSeekableStreamSequenceNumbers();
+
+ final Map<TopicPartition, Long> topicAndPartitionToSequenceNumber = CollectionUtils.mapKeys(
+ mergedSequenceNumbers.getPartitionSequenceNumberMap(),
+ k -> k.asTopicPartition(mergedSequenceNumbers.getStream())
+ );
+
+ boolean allOtherFoundAndConsistent = otherSequenceNumbers.getPartitionSequenceNumberMap().entrySet().stream().noneMatch(
+ e -> {
+ KafkaTopicPartition kafkaTopicPartition = e.getKey();
+ TopicPartition topicPartition = kafkaTopicPartition.asTopicPartition(otherSequenceNumbers.getStream());
+ Long sequenceOffset = topicAndPartitionToSequenceNumber.get(topicPartition);
+ long oldSequenceOffset = e.getValue();
+ if (sequenceOffset == null || !sequenceOffset.equals(oldSequenceOffset)) {
+ LOGGER.info(
+ "sequenceOffset found for currently computed and stored metadata does not match for "
+ + "topicPartition: [%s]. currentSequenceOffset: [%s], oldSequenceOffset: [%s]",
+ topicPartition,
+ sequenceOffset,
+ oldSequenceOffset
+ );
+ return true;
+ }
+ return false;
+ }
+ );
+
+ boolean allThisFoundAndConsistent = this.getSeekableStreamSequenceNumbers().getPartitionSequenceNumberMap().entrySet().stream().noneMatch(
+ e -> {
+ KafkaTopicPartition kafkaTopicPartition = e.getKey();
+ TopicPartition topicPartition = kafkaTopicPartition.asTopicPartition(this.getSeekableStreamSequenceNumbers().getStream());
+ Long oldSequenceOffset = topicAndPartitionToSequenceNumber.get(topicPartition);
+ long sequenceOffset = e.getValue();
+ if (oldSequenceOffset == null || !oldSequenceOffset.equals(sequenceOffset)) {
+ LOGGER.info(
+ "sequenceOffset found for currently computed and stored metadata does not match for "
+ + "topicPartition: [%s]. currentSequenceOffset: [%s], oldSequenceOffset: [%s]",
+ topicPartition,
+ sequenceOffset,
+ oldSequenceOffset
+ );
+ return true;
+ }
+ return false;
+ }
+ );
+
+ return allOtherFoundAndConsistent && allThisFoundAndConsistent;
+ }
}
diff --git a/extensions-core/kafka-indexing-service/src/main/java/org/apache/druid/indexing/kafka/KafkaSeekableStreamEndSequenceNumbers.java b/extensions-core/kafka-indexing-service/src/main/java/org/apache/druid/indexing/kafka/KafkaSeekableStreamEndSequenceNumbers.java
new file mode 100644
index 0000000..5476f88
--- /dev/null
+++ b/extensions-core/kafka-indexing-service/src/main/java/org/apache/druid/indexing/kafka/KafkaSeekableStreamEndSequenceNumbers.java
@@ -0,0 +1,179 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.druid.indexing.kafka;
+
+import com.fasterxml.jackson.annotation.JsonTypeName;
+import org.apache.druid.data.input.kafka.KafkaTopicPartition;
+import org.apache.druid.indexing.seekablestream.SeekableStreamEndSequenceNumbers;
+import org.apache.druid.indexing.seekablestream.SeekableStreamSequenceNumbers;
+import org.apache.druid.utils.CollectionUtils;
+
+import java.util.HashMap;
+import java.util.Map;
+import java.util.regex.Pattern;
+import java.util.stream.Collectors;
+
+/**
+ * Represents the kafka based end sequenceNumber per partition of a sequence. This class is needed because
+ * of special handling that must be done for multi-topic partitions to ensure that offsets are preserved.
+ * <p>
+ * Do not register this class as a subtype of base class in Jackson. We want this class to be serialized
+ * when written to DB as a {@link SeekableStreamEndSequenceNumbers}. Do not create instances of this class
+ * directly from jackson mapper.
+ */
+@JsonTypeName(SeekableStreamEndSequenceNumbers.TYPE)
+public class KafkaSeekableStreamEndSequenceNumbers extends SeekableStreamEndSequenceNumbers<KafkaTopicPartition, Long>
+{
+ private final boolean isMultiTopicPartition;
+
+ public KafkaSeekableStreamEndSequenceNumbers(
+ final String stream,
+ // kept for backward compatibility
+ final String topic,
+ final Map<KafkaTopicPartition, Long> partitionSequenceNumberMap,
+ // kept for backward compatibility
+ final Map<KafkaTopicPartition, Long> partitionOffsetMap
+ )
+ {
+ super(stream, topic, partitionSequenceNumberMap, partitionOffsetMap);
+ // if the partitionSequenceNumberMap is empty, there is no way to tell whether it is
+ // topicPattern based, so assume single topic to preserve existing behavior
+ isMultiTopicPartition = !partitionSequenceNumberMap.isEmpty() && partitionSequenceNumberMap.keySet()
+ .stream()
+ .findFirst()
+ .get()
+ .isMultiTopicPartition();
+ }
+
+ @Override
+ public boolean isMultiTopicPartition()
+ {
+ return isMultiTopicPartition;
+ }
+
+ @Override
+ public SeekableStreamSequenceNumbers<KafkaTopicPartition, Long> plus(
+ SeekableStreamSequenceNumbers<KafkaTopicPartition, Long> other
+ )
+ {
+ validateSequenceNumbersBaseType(other);
+
+ KafkaSeekableStreamEndSequenceNumbers that = (KafkaSeekableStreamEndSequenceNumbers) other;
+
+ if (!this.isMultiTopicPartition() && !that.isMultiTopicPartition()) {
+ return super.plus(other);
+ }
+
+ String thisTopic = getStream();
+ String thatTopic = that.getStream();
+ final Map<KafkaTopicPartition, Long> newMap;
+ if (!isMultiTopicPartition()) {
+ // going from topicPattern to single topic
+
+ // start with existing sequence numbers which in this case will be all single topic.
+ newMap = new HashMap<>(getPartitionSequenceNumberMap());
+
+ // add all sequence numbers from other where the topic name matches this topic. Transform to single topic
+ // as in this case we will be returning a single topic based sequence map.
+ newMap.putAll(that.getPartitionSequenceNumberMap().entrySet().stream()
+ .filter(e -> {
+ if (e.getKey().topic().isPresent()) {
+ return e.getKey().topic().get().equals(thisTopic);
+ } else {
+ // this branch shouldn't really be hit since other should be multi-topic here, but adding this
+ // just in case.
+ return thatTopic.equals(thisTopic);
+ }
+ })
+ .collect(Collectors.toMap(
+ e -> new KafkaTopicPartition(false, thisTopic, e.getKey().partition()),
+ Map.Entry::getValue
+ )));
+ } else {
+ // going from single topic or topicPattern to topicPattern
+
+ // start with existing sequence numbers and transform them to multi-topic keys, as the returned
+ // sequence numbers will be multi-topic based.
+ newMap = CollectionUtils.mapKeys(
+ getPartitionSequenceNumberMap(),
+ k -> new KafkaTopicPartition(
+ true,
+ k.asTopicPartition(thisTopic).topic(),
+ k.partition()
+ )
+ );
+
+ // add all sequence numbers from other where the topic name matches the pattern of this topic regex. Transform to
+ // multi-topic as in this case we will be returning a multi-topic based sequence map.
+ Pattern pattern = Pattern.compile(thisTopic);
+ newMap.putAll(that.getPartitionSequenceNumberMap().entrySet().stream()
+ .filter(e -> {
+ if (e.getKey().topic().isPresent()) {
+ return pattern.matcher(e.getKey().topic().get()).matches();
+ } else {
+ return pattern.matcher(thatTopic).matches();
+ }
+ })
+ .collect(Collectors.toMap(
+ e -> new KafkaTopicPartition(true, e.getKey().asTopicPartition(thatTopic).topic(), e.getKey().partition()),
+ Map.Entry::getValue
+ )));
+ }
+
+ return new SeekableStreamEndSequenceNumbers<>(getStream(), newMap);
+ }
+
+ @Override
+ public SeekableStreamSequenceNumbers<KafkaTopicPartition, Long> minus(
+ SeekableStreamSequenceNumbers<KafkaTopicPartition, Long> other
+ )
+ {
+ validateSequenceNumbersBaseType(other);
+
+ final KafkaSeekableStreamEndSequenceNumbers otherEnd =
+ (KafkaSeekableStreamEndSequenceNumbers) other;
+
+ if (!this.isMultiTopicPartition() && !otherEnd.isMultiTopicPartition()) {
+ return super.minus(other);
+ }
+
+ final Map<KafkaTopicPartition, Long> newMap = new HashMap<>();
+ String thatTopic = otherEnd.getStream();
+
+ // remove partitions present in "that" from "this", check for exact match, multi-topic match, or single-topic match
+ for (Map.Entry<KafkaTopicPartition, Long> entry : getPartitionSequenceNumberMap().entrySet()) {
+ String thisTopic = entry.getKey().asTopicPartition(getStream()).topic();
+ boolean otherContainsThis = otherEnd.getPartitionSequenceNumberMap().containsKey(entry.getKey());
+ boolean otherContainsThisMultiTopic = otherEnd.getPartitionSequenceNumberMap()
+ .containsKey(new KafkaTopicPartition(true, thisTopic, entry.getKey().partition()));
+ boolean otherContainsThisSingleTopic = (thatTopic.equals(thisTopic) && otherEnd.getPartitionSequenceNumberMap()
+ .containsKey(new KafkaTopicPartition(false, null, entry.getKey().partition())));
+ if (!otherContainsThis && !otherContainsThisMultiTopic && !otherContainsThisSingleTopic) {
+ newMap.put(entry.getKey(), entry.getValue());
+ }
+ }
+
+ return new SeekableStreamEndSequenceNumbers<>(
+ getStream(),
+ newMap
+ );
+ }
+}
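To make the merge direction concrete, a sketch of how this plus() behaves when the callee is pattern based (a fragment with imports elided; topic names and offsets are hypothetical, and nulls stand in for the legacy topic/offset-map constructor arguments):

```java
// this: pattern-based offsets for "foo.*", already tracking topic "foo"
KafkaSeekableStreamEndSequenceNumbers patternBased = new KafkaSeekableStreamEndSequenceNumbers(
    "foo.*", null,
    ImmutableMap.of(new KafkaTopicPartition(true, "foo", 0), 10L),
    null
);
// other: single-topic offsets for "foo2", which matches the pattern
KafkaSeekableStreamEndSequenceNumbers singleTopic = new KafkaSeekableStreamEndSequenceNumbers(
    "foo2", null,
    ImmutableMap.of(new KafkaTopicPartition(false, "foo2", 1), 20L),
    null
);
// The result keeps stream "foo.*" and rewrites the single-topic key to a
// multi-topic key, so offsets for both topics survive the merge:
// {foo-0=10, foo2-1=20}
patternBased.plus(singleTopic);
```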
diff --git a/extensions-core/kafka-indexing-service/src/main/java/org/apache/druid/indexing/kafka/KafkaSeekableStreamStartSequenceNumbers.java b/extensions-core/kafka-indexing-service/src/main/java/org/apache/druid/indexing/kafka/KafkaSeekableStreamStartSequenceNumbers.java
new file mode 100644
index 0000000..24a3e08
--- /dev/null
+++ b/extensions-core/kafka-indexing-service/src/main/java/org/apache/druid/indexing/kafka/KafkaSeekableStreamStartSequenceNumbers.java
@@ -0,0 +1,214 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.druid.indexing.kafka;
+
+import com.fasterxml.jackson.annotation.JsonTypeName;
+import org.apache.druid.data.input.kafka.KafkaTopicPartition;
+import org.apache.druid.indexing.seekablestream.SeekableStreamSequenceNumbers;
+import org.apache.druid.indexing.seekablestream.SeekableStreamStartSequenceNumbers;
+import org.apache.druid.utils.CollectionUtils;
+
+import javax.annotation.Nullable;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.Map;
+import java.util.Set;
+import java.util.regex.Pattern;
+import java.util.stream.Collectors;
+
+/**
+ * Represents the kafka based start sequenceNumber per partition of a sequence. This class is needed because
+ * of special handling that must be done for multi-topic partitions to ensure that offsets are preserved.
+ * <p>
+ * Do not register this class as a subtype of base class in Jackson. We want this class to be serialized
+ * when written to DB as a {@link SeekableStreamStartSequenceNumbers}. Do not create instances of this class
+ * directly from jackson mapper.
+ */
+@JsonTypeName(SeekableStreamStartSequenceNumbers.TYPE)
+public class KafkaSeekableStreamStartSequenceNumbers extends SeekableStreamStartSequenceNumbers<KafkaTopicPartition, Long>
+{
+ private final boolean isMultiTopicPartition;
+
+ public KafkaSeekableStreamStartSequenceNumbers(
+ String stream,
+ String topic,
+ Map<KafkaTopicPartition, Long> partitionSequenceNumberMap,
+ Map<KafkaTopicPartition, Long> partitionOffsetMap,
+ @Nullable Set<KafkaTopicPartition> exclusivePartitions
+ )
+ {
+ super(stream, topic, partitionSequenceNumberMap, partitionOffsetMap, exclusivePartitions);
+ // if the partitionSequenceNumberMap is empty, there is no way to tell whether it is
+ // topicPattern based, so assume single topic to preserve existing behavior
+ isMultiTopicPartition = !partitionSequenceNumberMap.isEmpty() && partitionSequenceNumberMap.keySet()
+ .stream()
+ .findFirst()
+ .get()
+ .isMultiTopicPartition();
+ }
+
+ @Override
+ public boolean isMultiTopicPartition()
+ {
+ return isMultiTopicPartition;
+ }
+
+ @Override
+ public SeekableStreamSequenceNumbers<KafkaTopicPartition, Long> plus(
+ SeekableStreamSequenceNumbers<KafkaTopicPartition, Long> other
+ )
+ {
+ validateSequenceNumbersBaseType(other);
+
+ KafkaSeekableStreamStartSequenceNumbers that = (KafkaSeekableStreamStartSequenceNumbers) other;
+
+ if (!this.isMultiTopicPartition() && !that.isMultiTopicPartition()) {
+ return super.plus(other);
+ }
+
+
+ String thisTopic = getStream();
+ String thatTopic = that.getStream();
+ final Map<KafkaTopicPartition, Long> newMap;
+ final Set<KafkaTopicPartition> newExclusivePartitions = new HashSet<>();
+ if (!isMultiTopicPartition()) {
+ // going from topicPattern to single topic
+
+ // start with existing sequence numbers which in this case will be all single topic.
+ newMap = new HashMap<>(getPartitionSequenceNumberMap());
+
+ // add all sequence numbers from other where the topic name matches this topic. Transform to single topic
+ // as in this case we will be returning a single topic based sequence map.
+ newMap.putAll(that.getPartitionSequenceNumberMap().entrySet().stream()
+ .filter(e -> {
+ if (e.getKey().topic().isPresent()) {
+ return e.getKey().topic().get().equals(thisTopic);
+ } else {
+ // this branch shouldn't really be hit since other should be multi-topic here, but adding this
+ // just in case.
+ return thatTopic.equals(thisTopic);
+ }
+ })
+ .collect(Collectors.toMap(
+ e -> new KafkaTopicPartition(false, thisTopic, e.getKey().partition()),
+ Map.Entry::getValue
+ )));
+
+ // A partition is exclusive if it's
+ // 1) exclusive in "this" and it's not in "other"'s partitionSequenceNumberMap or
+ // 2) exclusive in "other"
+ getPartitionSequenceNumberMap().forEach(
+ (partitionId, sequenceOffset) -> {
+ KafkaTopicPartition multiTopicPartitionIdToSearch = new KafkaTopicPartition(true, thisTopic, partitionId.partition());
+ if (getExclusivePartitions().contains(partitionId) && !that.getPartitionSequenceNumberMap().containsKey(multiTopicPartitionIdToSearch)) {
+ newExclusivePartitions.add(new KafkaTopicPartition(false, this.getStream(), partitionId.partition()));
+ }
+ }
+ );
+ newExclusivePartitions.addAll(that.getExclusivePartitions());
+ } else {
+ // going from single topic or topicPattern to topicPattern
+
+ // start with existing sequence numbers and transform them to multi-topic keys, as the returned
+ // sequence numbers will be multi-topic based.
+ newMap = CollectionUtils.mapKeys(
+ getPartitionSequenceNumberMap(),
+ k -> new KafkaTopicPartition(
+ true,
+ k.asTopicPartition(thisTopic).topic(),
+ k.partition()
+ )
+ );
+
+ // add all sequence numbers from other where the topic name matches the pattern of this topic regex. Transform to
+ // multi-topic as in this case we will be returning a multi-topic based sequence map.
+ Pattern pattern = Pattern.compile(thisTopic);
+ newMap.putAll(that.getPartitionSequenceNumberMap().entrySet().stream()
+ .filter(e -> {
+ if (e.getKey().topic().isPresent()) {
+ return pattern.matcher(e.getKey().topic().get()).matches();
+ } else {
+ return pattern.matcher(thatTopic).matches();
+ }
+ })
+ .collect(Collectors.toMap(
+ e -> new KafkaTopicPartition(true, e.getKey().asTopicPartition(thatTopic).topic(), e.getKey().partition()),
+ Map.Entry::getValue
+ )));
+
+ // A partition is exclusive if it's
+ // 1) exclusive in "this" and it's not in "other"'s partitionSequenceNumberMap or
+ // 2) exclusive in "other"
+ getPartitionSequenceNumberMap().forEach(
+ (partitionId, sequenceOffset) -> {
+ KafkaTopicPartition multiTopicPartitionIdToSearch = new KafkaTopicPartition(true, thisTopic, partitionId.partition());
+ boolean thatTopicMatchesThisTopicPattern = partitionId.topic().isPresent() ? pattern.matcher(partitionId.topic().get()).matches() : pattern.matcher(thatTopic).matches();
+ if (getExclusivePartitions().contains(partitionId) && (!thatTopicMatchesThisTopicPattern || !that.getPartitionSequenceNumberMap().containsKey(multiTopicPartitionIdToSearch))) {
+ newExclusivePartitions.add(new KafkaTopicPartition(true, this.getStream(), partitionId.partition()));
+ }
+ }
+ );
+ newExclusivePartitions.addAll(that.getExclusivePartitions());
+ }
+
+ return new SeekableStreamStartSequenceNumbers<>(getStream(), newMap, newExclusivePartitions);
+ }
+
+ @Override
+ public SeekableStreamSequenceNumbers<KafkaTopicPartition, Long> minus(
+ SeekableStreamSequenceNumbers<KafkaTopicPartition, Long> other
+ )
+ {
+ validateSequenceNumbersBaseType(other);
+
+ final KafkaSeekableStreamStartSequenceNumbers otherStart =
+ (KafkaSeekableStreamStartSequenceNumbers) other;
+
+ if (!this.isMultiTopicPartition() && !otherStart.isMultiTopicPartition()) {
+ return super.minus(other);
+ }
+
+ final Map<KafkaTopicPartition, Long> newMap = new HashMap<>();
+ final Set<KafkaTopicPartition> newExclusivePartitions = new HashSet<>();
+ String thatTopic = otherStart.getStream();
+
+ // remove partitions present in "that" from "this", check for exact match, multi-topic match, or single-topic match
+ for (Map.Entry<KafkaTopicPartition, Long> entry : getPartitionSequenceNumberMap().entrySet()) {
+ String thisTopic = entry.getKey().asTopicPartition(getStream()).topic();
+ boolean otherContainsThis = otherStart.getPartitionSequenceNumberMap().containsKey(entry.getKey());
+ boolean otherContainsThisMultiTopic = otherStart.getPartitionSequenceNumberMap()
+ .containsKey(new KafkaTopicPartition(true, thisTopic, entry.getKey().partition()));
+ boolean otherContainsThisSingleTopic = (thatTopic.equals(thisTopic) && otherStart.getPartitionSequenceNumberMap()
+ .containsKey(new KafkaTopicPartition(false, null, entry.getKey().partition())));
+ if (!otherContainsThis && !otherContainsThisMultiTopic && !otherContainsThisSingleTopic) {
+ newMap.put(entry.getKey(), entry.getValue());
+ // A partition is exclusive if it's exclusive in "this" and not in "other"'s partitionSequenceNumberMap
+ if (getExclusivePartitions().contains(entry.getKey())) {
+ newExclusivePartitions.add(entry.getKey());
+ }
+ }
+ }
+
+ return new SeekableStreamStartSequenceNumbers<>(
+ getStream(),
+ newMap,
+ newExclusivePartitions
+ );
+ }
+}
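Similarly, a sketch of minus() removing a single-topic entry from a pattern-based start map (a fragment with imports elided; values hypothetical, with empty exclusive-partition sets for brevity):

```java
// this: pattern-based start offsets tracking foo-0 and foo2-1
KafkaSeekableStreamStartSequenceNumbers patternBased = new KafkaSeekableStreamStartSequenceNumbers(
    "foo.*", null,
    ImmutableMap.of(
        new KafkaTopicPartition(true, "foo", 0), 10L,
        new KafkaTopicPartition(true, "foo2", 1), 20L
    ),
    null,
    ImmutableSet.of()
);
// other: single-topic start offsets for foo-0
KafkaSeekableStreamStartSequenceNumbers singleTopic = new KafkaSeekableStreamStartSequenceNumbers(
    "foo", null,
    ImmutableMap.of(new KafkaTopicPartition(false, "foo", 0), 10L),
    null,
    ImmutableSet.of()
);
// foo-0 is removed via the single-topic match ("foo" equals the entry's
// resolved topic), leaving {foo2-1=20} under stream "foo.*".
patternBased.minus(singleTopic);
```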
diff --git a/extensions-core/kafka-indexing-service/src/main/java/org/apache/druid/indexing/kafka/supervisor/KafkaSupervisor.java b/extensions-core/kafka-indexing-service/src/main/java/org/apache/druid/indexing/kafka/supervisor/KafkaSupervisor.java
index e9c6b83..aebacec 100644
--- a/extensions-core/kafka-indexing-service/src/main/java/org/apache/druid/indexing/kafka/supervisor/KafkaSupervisor.java
+++ b/extensions-core/kafka-indexing-service/src/main/java/org/apache/druid/indexing/kafka/supervisor/KafkaSupervisor.java
@@ -44,6 +44,7 @@
import org.apache.druid.indexing.seekablestream.SeekableStreamIndexTask;
import org.apache.druid.indexing.seekablestream.SeekableStreamIndexTaskIOConfig;
import org.apache.druid.indexing.seekablestream.SeekableStreamIndexTaskTuningConfig;
+import org.apache.druid.indexing.seekablestream.SeekableStreamSequenceNumbers;
import org.apache.druid.indexing.seekablestream.SeekableStreamStartSequenceNumbers;
import org.apache.druid.indexing.seekablestream.common.OrderedSequenceNumber;
import org.apache.druid.indexing.seekablestream.common.RecordSupplier;
@@ -63,12 +64,14 @@
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
+import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Map.Entry;
import java.util.Optional;
import java.util.Set;
import java.util.TreeMap;
+import java.util.regex.Pattern;
import java.util.stream.Collectors;
/**
@@ -92,6 +95,7 @@
private final ServiceEmitter emitter;
private final DruidMonitorSchedulerConfig monitorSchedulerConfig;
+ private final Pattern pattern;
private volatile Map<KafkaTopicPartition, Long> latestSequenceFromStream;
@@ -122,6 +126,7 @@
this.spec = spec;
this.emitter = spec.getEmitter();
this.monitorSchedulerConfig = spec.getMonitorSchedulerConfig();
+ this.pattern = getIoConfig().isMultiTopic() ? Pattern.compile(getIoConfig().getStream()) : null;
}
@@ -444,4 +449,79 @@
{
return spec.getTuningConfig();
}
+
+ protected boolean isMultiTopic()
+ {
+ return getIoConfig().isMultiTopic() && pattern != null;
+ }
+
+ /**
+ * Gets the offsets as stored in the metadata store. The map returned will only contain
+ * offsets from topic partitions that match the current supervisor config stream. This
+ * override is needed because in the case of multi-topic, a user could have updated the supervisor
+ * config from single topic to multi-topic, where the new multi-topic pattern regex matches the
+ * old config single topic. Without this override, the previously stored metadata for the single
+ * topic would be deemed different from the currently configured stream and would not be included in
+ * the offset map returned. This implementation handles these cases appropriately.
+ *
+ * @return the previously stored offsets from metadata storage, possibly updated with offsets removed
+ * for topics that do not match the currently configured supervisor topic. Topic partition keys may also be
+ * updated to single topic or multi-topic depending on the supervisor config, as needed.
+ */
+ @Override
+ protected Map<KafkaTopicPartition, Long> getOffsetsFromMetadataStorage()
+ {
+ final DataSourceMetadata dataSourceMetadata = retrieveDataSourceMetadata();
+ if (checkSourceMetadataMatch(dataSourceMetadata)) {
+ @SuppressWarnings("unchecked")
+ SeekableStreamSequenceNumbers<KafkaTopicPartition, Long> partitions = ((KafkaDataSourceMetadata) dataSourceMetadata)
+ .getSeekableStreamSequenceNumbers();
+ if (partitions != null && partitions.getPartitionSequenceNumberMap() != null) {
+ Map<KafkaTopicPartition, Long> partitionOffsets = new HashMap<>();
+ Set<String> topicMisMatchLogged = new HashSet<>();
+ partitions.getPartitionSequenceNumberMap().forEach((kafkaTopicPartition, value) -> {
+ final String matchValue;
+ // previous offsets are from multi-topic config
+ if (kafkaTopicPartition.topic().isPresent()) {
+ matchValue = kafkaTopicPartition.topic().get();
+ } else {
+ // previous offsets are from single topic config
+ matchValue = partitions.getStream();
+ }
+
+ KafkaTopicPartition matchingTopicPartition = getMatchingKafkaTopicPartition(kafkaTopicPartition, matchValue);
+
+ if (matchingTopicPartition == null && !topicMisMatchLogged.contains(matchValue)) {
+ log.warn(
+ "Topic/stream in metadata storage [%s] doesn't match spec topic/stream [%s], ignoring stored sequences",
+ matchValue,
+ getIoConfig().getStream()
+ );
+ topicMisMatchLogged.add(matchValue);
+ }
+ if (matchingTopicPartition != null) {
+ partitionOffsets.put(matchingTopicPartition, value);
+ }
+ });
+ return partitionOffsets;
+ }
+ }
+
+ return Collections.emptyMap();
+ }
+
+ @Nullable
+ private KafkaTopicPartition getMatchingKafkaTopicPartition(
+ final KafkaTopicPartition kafkaTopicPartition,
+ final String streamMatchValue
+ )
+ {
+ final boolean match;
+
+ match = pattern != null
+ ? pattern.matcher(streamMatchValue).matches()
+ : getIoConfig().getStream().equals(streamMatchValue);
+
+ return match ? new KafkaTopicPartition(isMultiTopic(), streamMatchValue, kafkaTopicPartition.partition()) : null;
+ }
}
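The remapping in getOffsetsFromMetadataStorage() boils down to the following check, shown here as a standalone sketch rather than the full supervisor wiring (imports elided; topic names hypothetical):

```java
// Stored metadata came from a single-topic config for "foo"; the spec now
// uses topicPattern "foo.*". Each stored key is re-keyed against the new config.
Pattern pattern = Pattern.compile("foo.*");
KafkaTopicPartition stored = new KafkaTopicPartition(false, "foo", 0);
// single-topic keys carry no topic of their own, so fall back to the stored stream
String matchValue = stored.topic().orElse("foo");
if (pattern.matcher(matchValue).matches()) {
  // re-key as a multi-topic partition so it lines up with the new spec
  KafkaTopicPartition rekeyed = new KafkaTopicPartition(true, matchValue, stored.partition());
}
```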
diff --git a/extensions-core/kafka-indexing-service/src/test/java/org/apache/druid/indexing/kafka/KafkaDataSourceMetadataTest.java b/extensions-core/kafka-indexing-service/src/test/java/org/apache/druid/indexing/kafka/KafkaDataSourceMetadataTest.java
index 800a1fe..0b013b6 100644
--- a/extensions-core/kafka-indexing-service/src/test/java/org/apache/druid/indexing/kafka/KafkaDataSourceMetadataTest.java
+++ b/extensions-core/kafka-indexing-service/src/test/java/org/apache/druid/indexing/kafka/KafkaDataSourceMetadataTest.java
@@ -21,6 +21,7 @@
import com.fasterxml.jackson.core.JsonProcessingException;
import com.fasterxml.jackson.databind.ObjectMapper;
+import com.google.common.collect.ImmutableList;
import com.google.common.collect.ImmutableMap;
import com.google.common.collect.ImmutableSet;
import com.google.inject.Injector;
@@ -36,17 +37,34 @@
import org.junit.Assert;
import org.junit.Test;
+import java.util.HashMap;
+import java.util.List;
import java.util.Map;
+import java.util.regex.Pattern;
public class KafkaDataSourceMetadataTest
{
- private static final KafkaDataSourceMetadata START0 = startMetadata(ImmutableMap.of());
- private static final KafkaDataSourceMetadata START1 = startMetadata(ImmutableMap.of(0, 2L, 1, 3L));
- private static final KafkaDataSourceMetadata START2 = startMetadata(ImmutableMap.of(0, 2L, 1, 4L, 2, 5L));
- private static final KafkaDataSourceMetadata START3 = startMetadata(ImmutableMap.of(0, 2L, 2, 5L));
- private static final KafkaDataSourceMetadata END0 = endMetadata(ImmutableMap.of());
- private static final KafkaDataSourceMetadata END1 = endMetadata(ImmutableMap.of(0, 2L, 2, 5L));
- private static final KafkaDataSourceMetadata END2 = endMetadata(ImmutableMap.of(0, 2L, 1, 4L));
+ private static final KafkaDataSourceMetadata START0 = startMetadata("foo", ImmutableMap.of());
+ private static final KafkaDataSourceMetadata START1 = startMetadata("foo", ImmutableMap.of(0, 2L, 1, 3L));
+ private static final KafkaDataSourceMetadata START2 = startMetadata("foo", ImmutableMap.of(0, 2L, 1, 4L, 2, 5L));
+ private static final KafkaDataSourceMetadata START3 = startMetadata("foo", ImmutableMap.of(0, 2L, 2, 5L));
+ private static final KafkaDataSourceMetadata START4 = startMetadataMultiTopic("foo.*", ImmutableList.of("foo"), ImmutableMap.of());
+ private static final KafkaDataSourceMetadata START5 = startMetadataMultiTopic("foo.*", ImmutableList.of("foo"), ImmutableMap.of(0, 2L, 1, 3L));
+ private static final KafkaDataSourceMetadata START6 = startMetadataMultiTopic("foo.*", ImmutableList.of("foo", "foo2"), ImmutableMap.of(0, 2L, 1, 3L));
+ private static final KafkaDataSourceMetadata START7 = startMetadataMultiTopic("foo.*", ImmutableList.of("foo", "foo2"), ImmutableMap.of(0, 2L, 2, 5L));
+ private static final KafkaDataSourceMetadata START8 = startMetadataMultiTopic("foo2.*", ImmutableList.of("foo2"), ImmutableMap.of(0, 2L, 2, 5L));
+ private static final KafkaDataSourceMetadata START9 = startMetadataMultiTopic("foo2.*", ImmutableList.of("foo2", "foo22"), ImmutableMap.of(0, 2L, 2, 5L));
+ private static final KafkaDataSourceMetadata END0 = endMetadata("foo", ImmutableMap.of());
+ private static final KafkaDataSourceMetadata END1 = endMetadata("foo", ImmutableMap.of(0, 2L, 2, 5L));
+ private static final KafkaDataSourceMetadata END2 = endMetadata("foo", ImmutableMap.of(0, 2L, 1, 4L));
+ private static final KafkaDataSourceMetadata END3 = endMetadata("foo", ImmutableMap.of(0, 2L, 1, 3L));
+ private static final KafkaDataSourceMetadata END4 = endMetadataMultiTopic("foo.*", ImmutableList.of("foo"), ImmutableMap.of());
+ private static final KafkaDataSourceMetadata END5 = endMetadataMultiTopic("foo.*", ImmutableList.of("foo"), ImmutableMap.of(0, 2L, 2, 5L));
+ private static final KafkaDataSourceMetadata END6 = endMetadataMultiTopic("foo.*", ImmutableList.of("foo"), ImmutableMap.of(0, 2L, 1, 4L));
+ private static final KafkaDataSourceMetadata END7 = endMetadataMultiTopic("foo.*", ImmutableList.of("foo", "foo2"), ImmutableMap.of(0, 2L, 2, 5L));
+ private static final KafkaDataSourceMetadata END8 = endMetadataMultiTopic("foo.*", ImmutableList.of("foo", "foo2"), ImmutableMap.of(0, 2L, 1, 4L));
+ private static final KafkaDataSourceMetadata END9 = endMetadataMultiTopic("foo2.*", ImmutableList.of("foo2"), ImmutableMap.of(0, 2L, 2, 5L));
+ private static final KafkaDataSourceMetadata END10 = endMetadataMultiTopic("foo2.*", ImmutableList.of("foo2", "foo22"), ImmutableMap.of(0, 2L, 2, 5L));
@Test
public void testMatches()
@@ -55,33 +73,332 @@
Assert.assertTrue(START0.matches(START1));
Assert.assertTrue(START0.matches(START2));
Assert.assertTrue(START0.matches(START3));
+ // when sequence map is empty, we don't know whether it's multi-topic or not, so assume single topic to preserve existing behavior
+ Assert.assertFalse(START0.matches(START4));
+ Assert.assertTrue(START0.matches(START5));
+ // when merging, we lose the sequence numbers for topic foo2, so false
+ Assert.assertFalse(START0.matches(START6));
+ // when merging, we lose the sequence numbers for topic foo2, so false
+ Assert.assertFalse(START0.matches(START7));
+ // when merging, we lose the sequence numbers for topic foo2, so false
+ Assert.assertFalse(START0.matches(START8));
+ // when merging, we lose the sequence numbers for topics foo2 and foo22, so false
+ Assert.assertFalse(START0.matches(START9));
Assert.assertTrue(START1.matches(START0));
Assert.assertTrue(START1.matches(START1));
+ // sequence numbers for topic foo partition 1 are inconsistent, so false
Assert.assertFalse(START1.matches(START2));
Assert.assertTrue(START1.matches(START3));
+ // when sequence map is empty, we don't know whether it's multi-topic or not, so assume single topic to preserve existing behavior
+ Assert.assertFalse(START1.matches(START4));
+ Assert.assertTrue(START1.matches(START5));
+ // when merging, we lose the sequence numbers for topic foo2, and sequence numbers for topic foo-1 are inconsistent, so false
+ Assert.assertFalse(START1.matches(START6));
+ // when merging, we lose the sequence numbers for topic foo2, so false
+ Assert.assertFalse(START1.matches(START7));
+ // when merging, we lose the sequence numbers for topic foo2, so false
+ Assert.assertFalse(START1.matches(START8));
+ // when merging, we lose the sequence numbers for topics foo2 and foo22, so false
+ Assert.assertFalse(START1.matches(START9));
Assert.assertTrue(START2.matches(START0));
Assert.assertFalse(START2.matches(START1));
Assert.assertTrue(START2.matches(START2));
Assert.assertTrue(START2.matches(START3));
+ // when sequence map is empty, we don't know whether it's multi-topic or not, so assume single topic to preserve existing behavior
+ Assert.assertFalse(START2.matches(START4));
+ // when merging, sequence numbers for topic foo-1 are inconsistent, so false
+ Assert.assertFalse(START2.matches(START5));
+ // when merging, we lose the sequence numbers for topic foo2, and sequence numbers for topic foo-1 are inconsistent, so false
+ Assert.assertFalse(START2.matches(START6));
+ // when merging, we lose the sequence numbers for topic foo2, so false
+ Assert.assertFalse(START2.matches(START7));
Assert.assertTrue(START3.matches(START0));
Assert.assertTrue(START3.matches(START1));
Assert.assertTrue(START3.matches(START2));
Assert.assertTrue(START3.matches(START3));
+ // when sequence map is empty, we don't know whether it's multi-topic or not, so assume single topic to preserve existing behavior
+ Assert.assertFalse(START3.matches(START4));
+ Assert.assertTrue(START3.matches(START5));
+ // when merging, we lose the sequence numbers for topic foo2, so false
+ Assert.assertFalse(START3.matches(START6));
+ // when merging, we lose the sequence numbers for topic foo2, so false
+ Assert.assertFalse(START3.matches(START7));
+ // when merging, we lose the sequence numbers for topic foo2, so false
+ Assert.assertFalse(START3.matches(START8));
+ // when merging, we lose the sequence numbers for topics foo2 and foo22, so false
+ Assert.assertFalse(START3.matches(START9));
+
+ // when sequence map is empty, we don't know whether it's multi-topic or not, so assume single topic to preserve existing behavior
+ Assert.assertFalse(START4.matches(START0));
+ Assert.assertFalse(START4.matches(START1));
+ Assert.assertFalse(START4.matches(START2));
+ Assert.assertFalse(START4.matches(START3));
+ Assert.assertTrue(START4.matches(START4));
+ Assert.assertFalse(START4.matches(START5));
+ Assert.assertFalse(START4.matches(START6));
+ Assert.assertFalse(START4.matches(START7));
+ Assert.assertFalse(START4.matches(START8));
+ Assert.assertFalse(START4.matches(START9));
+
+ Assert.assertTrue(START5.matches(START0));
+ Assert.assertTrue(START5.matches(START1));
+ // when merging, the sequence numbers for topic foo-1 are inconsistent, so false
+ Assert.assertFalse(START5.matches(START2));
+ Assert.assertTrue(START5.matches(START3));
+ Assert.assertTrue(START5.matches(START4));
+ Assert.assertTrue(START5.matches(START5));
+ Assert.assertTrue(START5.matches(START6));
+ Assert.assertTrue(START5.matches(START7));
+ Assert.assertTrue(START5.matches(START8));
+ Assert.assertTrue(START5.matches(START9));
+
+ Assert.assertTrue(START6.matches(START0));
+ Assert.assertTrue(START6.matches(START1));
+ // when merging, the sequence numbers for topic foo-1 are inconsistent, so false
+ Assert.assertFalse(START6.matches(START2));
+ Assert.assertTrue(START6.matches(START3));
+ Assert.assertTrue(START6.matches(START4));
+ Assert.assertTrue(START6.matches(START5));
+ Assert.assertTrue(START6.matches(START6));
+ Assert.assertTrue(START6.matches(START7));
+ Assert.assertTrue(START6.matches(START8));
+ Assert.assertTrue(START6.matches(START9));
+
+ Assert.assertTrue(START7.matches(START0));
+ Assert.assertTrue(START7.matches(START1));
+ Assert.assertTrue(START7.matches(START2));
+ Assert.assertTrue(START7.matches(START3));
+ Assert.assertTrue(START7.matches(START4));
+ Assert.assertTrue(START7.matches(START5));
+ Assert.assertTrue(START7.matches(START6));
+ Assert.assertTrue(START7.matches(START7));
+ Assert.assertTrue(START7.matches(START8));
+ Assert.assertTrue(START7.matches(START9));
+
+ Assert.assertTrue(START8.matches(START0));
+ // when merging, we lose the sequence numbers for topic foo, so false
+ Assert.assertFalse(START8.matches(START1));
+ // when merging, we lose the sequence numbers for topic foo, so false
+ Assert.assertFalse(START8.matches(START2));
+ // when merging, we lose the sequence numbers for topic foo, so false
+ Assert.assertFalse(START8.matches(START3));
+ Assert.assertTrue(START8.matches(START4));
+ // when merging, we lose the sequence numbers for topic foo, so false
+ Assert.assertFalse(START8.matches(START5));
+ // when merging, we lose the sequence numbers for topic foo, so false
+ Assert.assertFalse(START8.matches(START6));
+ // when merging, we lose the sequence numbers for topic foo, so false
+ Assert.assertFalse(START8.matches(START7));
+ Assert.assertTrue(START8.matches(START8));
+ Assert.assertTrue(START8.matches(START9));
+
+ Assert.assertTrue(START9.matches(START0));
+ // when merging, we lose the sequence numbers for topic foo, so false
+ Assert.assertFalse(START9.matches(START1));
+ // when merging, we lose the sequence numbers for topic foo, so false
+ Assert.assertFalse(START9.matches(START2));
+ // when merging, we lose the sequence numbers for topic foo, so false
+ Assert.assertFalse(START9.matches(START3));
+ Assert.assertTrue(START9.matches(START4));
+ // when merging, we lose the sequence numbers for topic foo, so false
+ Assert.assertFalse(START9.matches(START5));
+ // when merging, we lose the sequence numbers for topic foo, so false
+ Assert.assertFalse(START9.matches(START6));
+ // when merging, we lose the sequence numbers for topic foo, so false
+ Assert.assertFalse(START9.matches(START7));
+ Assert.assertTrue(START9.matches(START8));
+ Assert.assertTrue(START9.matches(START9));
Assert.assertTrue(END0.matches(END0));
Assert.assertTrue(END0.matches(END1));
Assert.assertTrue(END0.matches(END2));
+ Assert.assertTrue(END0.matches(END3));
+ // when sequence map is empty, we don't know whether it's multi-topic or not, so assume single topic to preserve existing behavior
+ Assert.assertFalse(END0.matches(END4));
+ Assert.assertTrue(END0.matches(END5));
+ Assert.assertTrue(END0.matches(END6));
+ // when merging, we lose the sequence numbers for topic foo2, so false
+ Assert.assertFalse(END0.matches(END7));
+ // when merging, we lose the sequence numbers for topic foo2, so false
+ Assert.assertFalse(END0.matches(END8));
+ // when merging, we lose the sequence numbers for topic foo2, so false
+ Assert.assertFalse(END0.matches(END9));
+ // when merging, we lose the sequence numbers for topic foo2, so false
+ Assert.assertFalse(END0.matches(END10));
Assert.assertTrue(END1.matches(END0));
Assert.assertTrue(END1.matches(END1));
Assert.assertTrue(END1.matches(END2));
+ Assert.assertTrue(END1.matches(END3));
+ // when sequence map is empty, we don't know whether it's multi-topic or not, so assume single topic to preserve existing behavior
+ Assert.assertFalse(END1.matches(END4));
+ Assert.assertTrue(END1.matches(END5));
+ Assert.assertTrue(END1.matches(END6));
+ // when merging, we lose the sequence numbers for topic foo2, so false
+ Assert.assertFalse(END1.matches(END7));
+ // when merging, we lose the sequence numbers for topic foo2, so false
+ Assert.assertFalse(END1.matches(END8));
+ // when merging, we lose the sequence numbers for topic foo2, so false
+ Assert.assertFalse(END1.matches(END9));
+ // when merging, we lose the sequence numbers for topic foo2, so false
+ Assert.assertFalse(END1.matches(END10));
Assert.assertTrue(END2.matches(END0));
Assert.assertTrue(END2.matches(END1));
Assert.assertTrue(END2.matches(END2));
+ // when merging, the sequence numbers for topic foo-1 are inconsistent, so false
+ Assert.assertFalse(END2.matches(END3));
+ // when sequence map is empty, we don't know whether it's multi-topic or not, so assume single topic to preserve existing behavior
+ Assert.assertFalse(END2.matches(END4));
+ Assert.assertTrue(END2.matches(END5));
+ Assert.assertTrue(END2.matches(END6));
+ // when merging, we lose the sequence numbers for topic foo2, so false
+ Assert.assertFalse(END2.matches(END7));
+ // when merging, we lose the sequence numbers for topic foo2, so false
+ Assert.assertFalse(END2.matches(END8));
+ // when merging, we lose the sequence numbers for topic foo2, so false
+ Assert.assertFalse(END2.matches(END9));
+ // when merging, we lose the sequence numbers for topic foo2, so false
+ Assert.assertFalse(END2.matches(END10));
+
+ Assert.assertTrue(END3.matches(END0));
+ Assert.assertTrue(END3.matches(END1));
+ // when merging, the sequence numbers for topic foo-1 are inconsistent, so false
+ Assert.assertFalse(END3.matches(END2));
+ Assert.assertTrue(END3.matches(END3));
+ // when sequence map is empty, we don't know whether it's multi-topic or not, so assume single topic to preserve existing behavior
+ Assert.assertFalse(END3.matches(END4));
+ Assert.assertTrue(END3.matches(END5));
+ // when merging, the sequence numbers for topic foo-1 are inconsistent, so false
+ Assert.assertFalse(END3.matches(END6));
+ // when merging, we lose the sequence numbers for topic foo2, so false
+ Assert.assertFalse(END3.matches(END7));
+ // when merging, we lose the sequence numbers for topic foo2, so false
+ Assert.assertFalse(END3.matches(END8));
+ // when merging, we lose the sequence numbers for topic foo2, so false
+ Assert.assertFalse(END3.matches(END9));
+ // when merging, we lose the sequence numbers for topic foo2, so false
+ Assert.assertFalse(END3.matches(END10));
+
+ // when sequence map is empty, we don't know whether it's multi-topic or not, so assume single topic to preserve existing behavior
+ Assert.assertFalse(END4.matches(END0));
+ Assert.assertFalse(END4.matches(END1));
+ Assert.assertFalse(END4.matches(END2));
+ Assert.assertFalse(END4.matches(END3));
+ Assert.assertTrue(END4.matches(END4));
+ Assert.assertFalse(END4.matches(END5));
+ Assert.assertFalse(END4.matches(END6));
+ Assert.assertFalse(END4.matches(END7));
+ Assert.assertFalse(END4.matches(END8));
+ Assert.assertFalse(END4.matches(END9));
+ Assert.assertFalse(END4.matches(END10));
+
+ Assert.assertTrue(END5.matches(END0));
+ Assert.assertTrue(END5.matches(END1));
+ Assert.assertTrue(END5.matches(END2));
+ Assert.assertTrue(END5.matches(END3));
+ Assert.assertTrue(END5.matches(END4));
+ Assert.assertTrue(END5.matches(END5));
+ Assert.assertTrue(END5.matches(END6));
+ Assert.assertTrue(END5.matches(END7));
+ Assert.assertTrue(END5.matches(END8));
+ Assert.assertTrue(END5.matches(END9));
+ Assert.assertTrue(END5.matches(END10));
+
+ Assert.assertTrue(END6.matches(END0));
+ Assert.assertTrue(END6.matches(END1));
+ Assert.assertTrue(END6.matches(END2));
+ // when merging, the sequence numbers for topic foo-1 are inconsistent, so false
+ Assert.assertFalse(END6.matches(END3));
+ Assert.assertTrue(END6.matches(END4));
+ Assert.assertTrue(END6.matches(END5));
+ Assert.assertTrue(END6.matches(END6));
+ Assert.assertTrue(END6.matches(END7));
+ Assert.assertTrue(END6.matches(END8));
+ Assert.assertTrue(END6.matches(END9));
+ Assert.assertTrue(END6.matches(END10));
+
+ Assert.assertTrue(END7.matches(END0));
+ Assert.assertTrue(END7.matches(END1));
+ Assert.assertTrue(END7.matches(END2));
+ Assert.assertTrue(END7.matches(END3));
+ Assert.assertTrue(END7.matches(END4));
+ Assert.assertTrue(END7.matches(END5));
+ Assert.assertTrue(END7.matches(END6));
+ Assert.assertTrue(END7.matches(END7));
+ Assert.assertTrue(END7.matches(END8));
+ Assert.assertTrue(END7.matches(END9));
+ Assert.assertTrue(END7.matches(END10));
+
+ Assert.assertTrue(END8.matches(END0));
+ Assert.assertTrue(END8.matches(END1));
+ Assert.assertTrue(END8.matches(END2));
+ // when merging, the sequence numbers for topic foo partition 1 are inconsistent, so false
+ Assert.assertFalse(END8.matches(END3));
+ Assert.assertTrue(END8.matches(END4));
+ Assert.assertTrue(END8.matches(END5));
+ Assert.assertTrue(END8.matches(END6));
+ Assert.assertTrue(END8.matches(END7));
+ Assert.assertTrue(END8.matches(END8));
+ Assert.assertTrue(END8.matches(END9));
+ Assert.assertTrue(END8.matches(END10));
+
+ Assert.assertTrue(END9.matches(END0));
+ // when merging, we lose the sequence numbers for topic foo, so false
+ Assert.assertFalse(END9.matches(END1));
+ // when merging, we lose the sequence numbers for topic foo, so false
+ Assert.assertFalse(END9.matches(END2));
+ // when merging, we lose the sequence numbers for topic foo, so false
+ Assert.assertFalse(END9.matches(END3));
+ Assert.assertTrue(END9.matches(END4));
+ // when merging, we lose the sequence numbers for topic foo, so false
+ Assert.assertFalse(END9.matches(END5));
+ // when merging, we lose the sequence numbers for topic foo, so false
+ Assert.assertFalse(END9.matches(END6));
+ // when merging, we lose the sequence numbers for topic foo, so false
+ Assert.assertFalse(END9.matches(END7));
+ // when merging, we lose the sequence numbers for topic foo, so false
+ Assert.assertFalse(END9.matches(END8));
+ Assert.assertTrue(END9.matches(END9));
+ Assert.assertTrue(END9.matches(END10));
+
+ Assert.assertTrue(END10.matches(END0));
+ // when merging, we lose the sequence numbers for topic foo, so false
+ Assert.assertFalse(END10.matches(END1));
+ // when merging, we lose the sequence numbers for topic foo, so false
+ Assert.assertFalse(END10.matches(END2));
+ // when merging, we lose the sequence numbers for topic foo, so false
+ Assert.assertFalse(END10.matches(END3));
+ Assert.assertTrue(END10.matches(END4));
+ // when merging, we lose the sequence numbers for topic foo, so false
+ Assert.assertFalse(END10.matches(END5));
+ // when merging, we lose the sequence numbers for topic foo, so false
+ Assert.assertFalse(END10.matches(END6));
+ // when merging, we lose the sequence numbers for topic foo, so false
+ Assert.assertFalse(END10.matches(END7));
+ // when merging, we lose the sequence numbers for topic foo, so false
+ Assert.assertFalse(END10.matches(END8));
+ Assert.assertTrue(END10.matches(END9));
+ Assert.assertTrue(END10.matches(END10));
}
@Test
@@ -91,6 +408,12 @@
Assert.assertTrue(START1.isValidStart());
Assert.assertTrue(START2.isValidStart());
Assert.assertTrue(START3.isValidStart());
+ Assert.assertTrue(START4.isValidStart());
+ Assert.assertTrue(START5.isValidStart());
+ Assert.assertTrue(START6.isValidStart());
+ Assert.assertTrue(START7.isValidStart());
+ Assert.assertTrue(START8.isValidStart());
+ Assert.assertTrue(START9.isValidStart());
}
@Test
@@ -121,6 +444,85 @@
START2.plus(START2)
);
+ // START4's sequence map is empty, so it is treated as single topic; the streams
+ // ("foo" vs "foo.*") differ, so plus simply returns the other metadata
+ Assert.assertEquals(
+ START4,
+ START1.plus(START4)
+ );
+
+ Assert.assertEquals(
+ startMetadata(ImmutableMap.of(0, 2L, 1, 3L)),
+ START1.plus(START5)
+ );
+
+ Assert.assertEquals(
+ startMetadata(ImmutableMap.of(0, 2L, 1, 3L)),
+ START1.plus(START6)
+ );
+
+ Assert.assertEquals(
+ startMetadata(ImmutableMap.of(0, 2L, 1, 3L, 2, 5L)),
+ START1.plus(START7)
+ );
+
+ Assert.assertEquals(
+ startMetadata(ImmutableMap.of(0, 2L, 1, 3L, 2, 5L)),
+ START2.plus(START6)
+ );
+
+ // START4 is treated as single topic (empty sequence map) and the streams differ,
+ // so plus simply returns the other metadata
+ Assert.assertEquals(
+ START0,
+ START4.plus(START0)
+ );
+
+ // START4 is treated as single topic (empty sequence map) and the streams differ,
+ // so plus simply returns the other metadata
+ Assert.assertEquals(
+ START1,
+ START4.plus(START1)
+ );
+
+ // when sequence map is empty, we don't know whether it's multi-topic or not, so assume single topic to preserve existing behavior
+ Assert.assertEquals(
+ START4,
+ START4.plus(START5)
+ );
+
+ Assert.assertEquals(
+ START5,
+ START5.plus(START4)
+ );
+
+ Assert.assertEquals(
+ startMetadataMultiTopic("foo.*", ImmutableList.of("foo", "foo2"), ImmutableMap.of(0, 2L, 1, 3L)),
+ START5.plus(START6)
+ );
+
+ Assert.assertEquals(
+ startMetadataMultiTopic("foo.*", ImmutableList.of("foo", "foo2"), ImmutableMap.of(0, 2L, 2, 5L)),
+ START7.plus(START8)
+ );
+
+ Assert.assertEquals(
+ startMetadataMultiTopic("foo.*", ImmutableList.of("foo", "foo2", "foo22"), ImmutableMap.of(0, 2L, 2, 5L)),
+ START7.plus(START9)
+ );
+
+ Assert.assertEquals(
+ startMetadataMultiTopic("foo2.*", ImmutableList.of("foo2"), ImmutableMap.of(0, 2L, 2, 5L)),
+ START8.plus(START7)
+ );
+
+ Assert.assertEquals(
+ startMetadataMultiTopic("foo2.*", ImmutableList.of("foo2", "foo22"), ImmutableMap.of(0, 2L, 2, 5L)),
+ START8.plus(START9)
+ );
+
+ Assert.assertEquals(
+ startMetadataMultiTopic("foo2.*", ImmutableList.of("foo2", "foo22"), ImmutableMap.of(0, 2L, 2, 5L)),
+ START9.plus(START8)
+ );
+
Assert.assertEquals(
endMetadata(ImmutableMap.of(0, 2L, 2, 5L)),
END0.plus(END1)
@@ -130,27 +532,137 @@
endMetadata(ImmutableMap.of(0, 2L, 1, 4L, 2, 5L)),
END1.plus(END2)
);
+
+ // both sequence maps are empty, so both are treated as single topic; the streams
+ // differ, so plus simply returns the other metadata
+ Assert.assertEquals(
+ END4,
+ END0.plus(END4)
+ );
+
+ // END4's empty sequence map is treated as single topic; the streams differ,
+ // so plus simply returns the other metadata
+ Assert.assertEquals(
+ END4,
+ END1.plus(END4)
+ );
+
+ // END4's empty sequence map is treated as single topic; the streams differ,
+ // so plus simply returns the other metadata
+ Assert.assertEquals(
+ END0,
+ END4.plus(END0)
+ );
+
+ // END4's empty sequence map is treated as single topic; the streams differ,
+ // so plus simply returns the other metadata
+ Assert.assertEquals(
+ END1,
+ END4.plus(END1)
+ );
+
+ Assert.assertEquals(
+ END3,
+ END2.plus(END3)
+ );
+
+ Assert.assertEquals(
+ END2,
+ END3.plus(END2)
+ );
+
+ // when sequence map is empty, we don't know whether it's multi-topic or not, so assume single topic to preserve existing behavior
+ Assert.assertEquals(
+ END4,
+ END4.plus(END5)
+ );
+
+ Assert.assertEquals(
+ END5,
+ END5.plus(END4)
+ );
+
+ Assert.assertEquals(
+ endMetadataMultiTopic("foo.*", ImmutableList.of("foo", "foo2"), ImmutableMap.of(0, 2L, 2, 5L)),
+ END5.plus(END9)
+ );
+
+ Assert.assertEquals(
+ endMetadataMultiTopic("foo.*", ImmutableList.of("foo", "foo2", "foo22"), ImmutableMap.of(0, 2L, 2, 5L)),
+ END5.plus(END10)
+ );
+
+ Assert.assertEquals(
+ endMetadataMultiTopic("foo.*", ImmutableList.of("foo"), ImmutableMap.of(0, 2L, 1, 4L, 2, 5L)),
+ END5.plus(END6)
+ );
+
+ Assert.assertEquals(
+ endMetadataMultiTopic("foo.*", ImmutableList.of("foo", "foo2"), ImmutableMap.of(0, 2L, 2, 5L)),
+ END5.plus(END7)
+ );
+
+ Assert.assertEquals(
+ endMetadataMultiTopic("foo.*", ImmutableList.of("foo", "foo2"), ImmutableMap.of(0, 2L, 2, 5L)),
+ END7.plus(END5)
+ );
+
+ Assert.assertEquals(
+ endMetadataMultiTopic("foo2.*", ImmutableList.of("foo2"), ImmutableMap.of(0, 2L, 2, 5L)),
+ END9.plus(END5)
+ );
+
+ Assert.assertEquals(
+ endMetadataMultiTopic("foo2.*", ImmutableList.of("foo2", "foo22"), ImmutableMap.of(0, 2L, 2, 5L)),
+ END9.plus(END10)
+ );
+
+ Assert.assertEquals(
+ endMetadataMultiTopic("foo2.*", ImmutableList.of("foo2", "foo22"), ImmutableMap.of(0, 2L, 2, 5L)),
+ END10.plus(END9)
+ );
}
@Test
public void testMinus()
{
Assert.assertEquals(
- startMetadata(ImmutableMap.of(1, 3L)),
- START1.minus(START3)
- );
-
- Assert.assertEquals(
startMetadata(ImmutableMap.of()),
START0.minus(START2)
);
Assert.assertEquals(
+ START0,
+ START0.minus(START4)
+ );
+
+ Assert.assertEquals(
startMetadata(ImmutableMap.of()),
START1.minus(START2)
);
Assert.assertEquals(
+ startMetadata(ImmutableMap.of(1, 3L)),
+ START1.minus(START3)
+ );
+
+ Assert.assertEquals(
+ START1,
+ START1.minus(START4)
+ );
+
+ Assert.assertEquals(
+ startMetadata("foo", ImmutableMap.of()),
+ START1.minus(START5)
+ );
+
+ Assert.assertEquals(
+ startMetadata("foo", ImmutableMap.of()),
+ START1.minus(START6)
+ );
+
+ Assert.assertEquals(
+ startMetadata("foo", ImmutableMap.of(1, 3L)),
+ START1.minus(START7)
+ );
+
+ Assert.assertEquals(
startMetadata(ImmutableMap.of(2, 5L)),
START2.minus(START1)
);
@@ -161,6 +673,21 @@
);
Assert.assertEquals(
+ START4,
+ START4.minus(START0)
+ );
+
+ Assert.assertEquals(
+ START4,
+ START4.minus(START1)
+ );
+
+ Assert.assertEquals(
+ startMetadataMultiTopic("foo.*", ImmutableList.of("foo"), ImmutableMap.of()),
+ START5.minus(START1)
+ );
+
+ Assert.assertEquals(
endMetadata(ImmutableMap.of(1, 4L)),
END2.minus(END1)
);
@@ -169,6 +696,101 @@
endMetadata(ImmutableMap.of(2, 5L)),
END1.minus(END2)
);
+
+ Assert.assertEquals(
+ END0,
+ END0.minus(END4)
+ );
+
+ Assert.assertEquals(
+ END4,
+ END4.minus(END0)
+ );
+
+ Assert.assertEquals(
+ END1,
+ END1.minus(END4)
+ );
+
+ Assert.assertEquals(
+ END4,
+ END4.minus(END1)
+ );
+
+ Assert.assertEquals(
+ endMetadataMultiTopic("foo.*", ImmutableList.of("foo"), ImmutableMap.of()),
+ END5.minus(END1)
+ );
+
+ Assert.assertEquals(
+ endMetadataMultiTopic("foo.*", ImmutableList.of("foo"), ImmutableMap.of(0, 2L, 2, 5L)),
+ END5.minus(END4)
+ );
+
+ Assert.assertEquals(
+ endMetadataMultiTopic("foo.*", ImmutableList.of("foo"), ImmutableMap.of(2, 5L)),
+ END5.minus(END6)
+ );
+
+ Assert.assertEquals(
+ endMetadataMultiTopic("foo.*", ImmutableList.of("foo"), ImmutableMap.of(1, 4L)),
+ END6.minus(END5)
+ );
+
+ Assert.assertEquals(
+ endMetadataMultiTopic("foo.*", ImmutableList.of("foo"), ImmutableMap.of(1, 4L)),
+ END6.minus(END7)
+ );
+
+ Assert.assertEquals(
+ endMetadataMultiTopic("foo.*", ImmutableList.of("foo2"), ImmutableMap.of(0, 2L, 2, 5L)),
+ END7.minus(END5)
+ );
+
+ Assert.assertEquals(
+ endMetadataMultiTopic("foo.*", ImmutableList.of("foo", "foo2"), ImmutableMap.of(2, 5L)),
+ END7.minus(END8)
+ );
+
+ Assert.assertEquals(
+ endMetadataMultiTopic("foo.*", ImmutableList.of("foo"), ImmutableMap.of(0, 2L, 2, 5L)),
+ END7.minus(END9)
+ );
+
+ Assert.assertEquals(
+ endMetadataMultiTopic("foo.*", ImmutableList.of("foo"), ImmutableMap.of(0, 2L, 2, 5L)),
+ END7.minus(END10)
+ );
+
+ Assert.assertEquals(
+ END9,
+ END9.minus(END6)
+ );
+
+ Assert.assertEquals(
+ endMetadataMultiTopic("foo2.*", ImmutableList.of("foo2"), ImmutableMap.of()),
+ END9.minus(END7)
+ );
+
+ Assert.assertEquals(
+ endMetadataMultiTopic("foo2.*", ImmutableList.of("foo2"), ImmutableMap.of(2, 5L)),
+ END9.minus(END8)
+ );
+
+ Assert.assertEquals(
+ endMetadataMultiTopic("foo2.*", ImmutableList.of("foo2"), ImmutableMap.of()),
+ END9.minus(END9)
+ );
+
+ Assert.assertEquals(
+ endMetadataMultiTopic("foo2.*", ImmutableList.of("foo2"), ImmutableMap.of()),
+ END9.minus(END10)
+ );
+
+ Assert.assertEquals(
+ endMetadataMultiTopic("foo2.*", ImmutableList.of("foo22"), ImmutableMap.of(0, 2L, 2, 5L)),
+ END10.minus(END9)
+ );
}
@Test
@@ -204,19 +826,59 @@
private static KafkaDataSourceMetadata startMetadata(Map<Integer, Long> offsets)
{
+ return startMetadata("foo", offsets);
+ }
+
+ private static KafkaDataSourceMetadata startMetadata(
+ String topic,
+ Map<Integer, Long> offsets
+ )
+ {
Map<KafkaTopicPartition, Long> newOffsets = CollectionUtils.mapKeys(
offsets,
k -> new KafkaTopicPartition(
false,
- "foo",
+ topic,
k
)
);
- return new KafkaDataSourceMetadata(new SeekableStreamStartSequenceNumbers<>("foo", newOffsets, ImmutableSet.of()));
+ return new KafkaDataSourceMetadata(new SeekableStreamStartSequenceNumbers<>(topic, newOffsets, ImmutableSet.of()));
}
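+
+ // builds start metadata whose stream is a topic pattern: every partition id in the offsets map is
+ // expanded across all of the given topics, e.g. (illustrative values) topics ["foo", "foo2"] with
+ // offsets {0: 2} yield the multi-topic partitions (foo, 0) -> 2 and (foo2, 0) -> 2 under pattern "foo.*"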
+ private static KafkaDataSourceMetadata startMetadataMultiTopic(
+ String topicPattern,
+ List<String> topics,
+ Map<Integer, Long> offsets
+ )
+ {
+ Assert.assertFalse(topics.isEmpty());
+ Pattern pattern = Pattern.compile(topicPattern);
+ Assert.assertTrue(topics.stream().allMatch(t -> pattern.matcher(t).matches()));
+ Map<KafkaTopicPartition, Long> newOffsets = new HashMap<>();
+ for (Map.Entry<Integer, Long> e : offsets.entrySet()) {
+ for (String topic : topics) {
+ newOffsets.put(
+ new KafkaTopicPartition(
+ true,
+ topic,
+ e.getKey()
+ ),
+ e.getValue()
+ );
+ }
+ }
+ return new KafkaDataSourceMetadata(new SeekableStreamStartSequenceNumbers<>(topicPattern, newOffsets, ImmutableSet.of()));
+ }
+
private static KafkaDataSourceMetadata endMetadata(Map<Integer, Long> offsets)
{
+ return endMetadata("foo", offsets);
+ }
+
+ private static KafkaDataSourceMetadata endMetadata(String topic, Map<Integer, Long> offsets)
+ {
Map<KafkaTopicPartition, Long> newOffsets = CollectionUtils.mapKeys(
offsets,
k -> new KafkaTopicPartition(
@@ -225,7 +887,33 @@
k
)
);
- return new KafkaDataSourceMetadata(new SeekableStreamEndSequenceNumbers<>("foo", newOffsets));
+ return new KafkaDataSourceMetadata(new SeekableStreamEndSequenceNumbers<>(topic, newOffsets));
+ }
+
+ private static KafkaDataSourceMetadata endMetadataMultiTopic(
+ String topicPattern,
+ List<String> topics,
+ Map<Integer, Long> offsets
+ )
+ {
+ Assert.assertFalse(topics.isEmpty());
+ Pattern pattern = Pattern.compile(topicPattern);
+ Assert.assertTrue(topics.stream().allMatch(t -> pattern.matcher(t).matches()));
+ Map<KafkaTopicPartition, Long> newOffsets = new HashMap<>();
+ for (Map.Entry<Integer, Long> e : offsets.entrySet()) {
+ for (String topic : topics) {
+ newOffsets.put(
+ new KafkaTopicPartition(
+ true,
+ topic,
+ e.getKey()
+ ),
+ e.getValue()
+ );
+ }
+ }
+ return new KafkaDataSourceMetadata(new SeekableStreamEndSequenceNumbers<>(topicPattern, newOffsets));
}
private static ObjectMapper createObjectMapper()
diff --git a/extensions-core/kafka-indexing-service/src/test/java/org/apache/druid/indexing/kafka/KafkaSeekableStreamEndSequenceNumbersTest.java b/extensions-core/kafka-indexing-service/src/test/java/org/apache/druid/indexing/kafka/KafkaSeekableStreamEndSequenceNumbersTest.java
new file mode 100644
index 0000000..e3b3cef
--- /dev/null
+++ b/extensions-core/kafka-indexing-service/src/test/java/org/apache/druid/indexing/kafka/KafkaSeekableStreamEndSequenceNumbersTest.java
@@ -0,0 +1,129 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.druid.indexing.kafka;
+
+import com.fasterxml.jackson.core.type.TypeReference;
+import com.fasterxml.jackson.databind.ObjectMapper;
+import com.fasterxml.jackson.databind.exc.InvalidTypeIdException;
+import com.google.common.collect.ImmutableMap;
+import com.google.inject.Injector;
+import com.google.inject.name.Names;
+import org.apache.druid.data.input.kafka.KafkaTopicPartition;
+import org.apache.druid.guice.StartupInjectorBuilder;
+import org.apache.druid.indexing.seekablestream.SeekableStreamEndSequenceNumbers;
+import org.apache.druid.initialization.CoreInjectorBuilder;
+import org.apache.druid.initialization.DruidModule;
+import org.apache.druid.java.util.common.jackson.JacksonUtils;
+import org.junit.Assert;
+import org.junit.Test;
+
+import java.util.Map;
+
+public class KafkaSeekableStreamEndSequenceNumbersTest
+{
+
+ private static final ObjectMapper OBJECT_MAPPER = createObjectMapper();
+
+ @Test
+ public void testSerde() throws Exception
+ {
+ final String stream = "theStream";
+ final Map<KafkaTopicPartition, Long> offsetMap = ImmutableMap.of(
+ new KafkaTopicPartition(false, null, 1), 2L,
+ new KafkaTopicPartition(false, null, 3), 4L
+ );
+
+ final KafkaSeekableStreamEndSequenceNumbers partitions = new KafkaSeekableStreamEndSequenceNumbers(
+ stream,
+ null,
+ offsetMap,
+ null
+ );
+ final String serializedString = OBJECT_MAPPER.writeValueAsString(partitions);
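+
+ // rough sketch of the expected serialized form (field order and exact key rendering may vary):
+ // {"type":"end","stream":"theStream","topic":"theStream","partitionSequenceNumberMap":{...},"partitionOffsetMap":{...}}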
+
+ // Check round-trip.
+ final SeekableStreamEndSequenceNumbers<KafkaTopicPartition, Long> partitions2 = OBJECT_MAPPER.readValue(
+ serializedString,
+ new TypeReference<SeekableStreamEndSequenceNumbers<KafkaTopicPartition, Long>>() {}
+ );
+
+ Assert.assertEquals(
+ "Round trip",
+ partitions,
+ new KafkaSeekableStreamEndSequenceNumbers(
+ partitions2.getStream(),
+ partitions2.getTopic(),
+ partitions2.getPartitionSequenceNumberMap(),
+ partitions2.getPartitionOffsetMap()
+ )
+ );
+
+ // Check backwards compatibility.
+ final Map<String, Object> asMap = OBJECT_MAPPER.readValue(
+ serializedString,
+ JacksonUtils.TYPE_REFERENCE_MAP_STRING_OBJECT
+ );
+
+ Assert.assertEquals(stream, asMap.get("stream"));
+ Assert.assertEquals(stream, asMap.get("topic"));
+
+ // Jackson deserializes the nested maps with String keys and Integer values, so convert them back before comparing.
+ Assert.assertEquals(
+ offsetMap,
+ OBJECT_MAPPER.convertValue(asMap.get("partitionSequenceNumberMap"), new TypeReference<Map<KafkaTopicPartition, Long>>() {})
+ );
+ Assert.assertEquals(
+ offsetMap,
+ OBJECT_MAPPER.convertValue(asMap.get("partitionOffsetMap"), new TypeReference<Map<KafkaTopicPartition, Long>>() {})
+ );
+
+ // check that KafkaSeekableStreamEndSequenceNumbers is not registered with the mapper, so there is no
+ // possible collision when deserializing it from String / bytes
+ boolean expectedExceptionThrown = false;
+ try {
+ OBJECT_MAPPER.readValue(
+ serializedString,
+ KafkaSeekableStreamEndSequenceNumbers.class
+ );
+ }
+ catch (InvalidTypeIdException e) {
+ expectedExceptionThrown = true;
+ }
+
+ Assert.assertTrue("KafkaSeekableStreamEndSequenceNumbers should not be registered type", expectedExceptionThrown);
+ }
+
+ private static ObjectMapper createObjectMapper()
+ {
+ DruidModule module = new KafkaIndexTaskModule();
+ final Injector injector = new CoreInjectorBuilder(new StartupInjectorBuilder().build())
+ .addModule(
+ binder -> {
+ binder.bindConstant().annotatedWith(Names.named("serviceName")).to("test");
+ binder.bindConstant().annotatedWith(Names.named("servicePort")).to(8000);
+ binder.bindConstant().annotatedWith(Names.named("tlsServicePort")).to(9000);
+ }
+ )
+ .build();
+
+ ObjectMapper objectMapper = injector.getInstance(ObjectMapper.class);
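+ // register the Kafka extension's Jackson modules so that KafkaTopicPartition keys and the Kafka
+ // sequence-number types serialize the same way they do in a real deployment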
+ module.getJacksonModules().forEach(objectMapper::registerModule);
+ return objectMapper;
+ }
+}
diff --git a/extensions-core/kafka-indexing-service/src/test/java/org/apache/druid/indexing/kafka/KafkaSeekableStreamStartSequenceNumbersTest.java b/extensions-core/kafka-indexing-service/src/test/java/org/apache/druid/indexing/kafka/KafkaSeekableStreamStartSequenceNumbersTest.java
new file mode 100644
index 0000000..07eaced
--- /dev/null
+++ b/extensions-core/kafka-indexing-service/src/test/java/org/apache/druid/indexing/kafka/KafkaSeekableStreamStartSequenceNumbersTest.java
@@ -0,0 +1,141 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.druid.indexing.kafka;
+
+import com.fasterxml.jackson.core.type.TypeReference;
+import com.fasterxml.jackson.databind.ObjectMapper;
+import com.fasterxml.jackson.databind.exc.InvalidTypeIdException;
+import com.google.common.collect.ImmutableMap;
+import com.google.common.collect.ImmutableSet;
+import com.google.inject.Injector;
+import com.google.inject.name.Names;
+import org.apache.druid.data.input.kafka.KafkaTopicPartition;
+import org.apache.druid.guice.StartupInjectorBuilder;
+import org.apache.druid.indexing.seekablestream.SeekableStreamStartSequenceNumbers;
+import org.apache.druid.initialization.CoreInjectorBuilder;
+import org.apache.druid.initialization.DruidModule;
+import org.apache.druid.java.util.common.jackson.JacksonUtils;
+import org.junit.Assert;
+import org.junit.Test;
+
+import java.util.Map;
+import java.util.Set;
+
+public class KafkaSeekableStreamStartSequenceNumbersTest
+{
+ private static final ObjectMapper OBJECT_MAPPER = createObjectMapper();
+
+ @Test
+ public void testSerde() throws Exception
+ {
+ final String stream = "theStream";
+ final Map<KafkaTopicPartition, Long> offsetMap = ImmutableMap.of(
+ new KafkaTopicPartition(false, null, 1), 2L,
+ new KafkaTopicPartition(false, null, 3), 4L
+ );
+
+ Set<KafkaTopicPartition> exclusivePartitions = ImmutableSet.of(new KafkaTopicPartition(false, null, 1));
+
+ final KafkaSeekableStreamStartSequenceNumbers partitions = new KafkaSeekableStreamStartSequenceNumbers(
+ stream,
+ null,
+ offsetMap,
+ null,
+ exclusivePartitions
+ );
+ final String serializedString = OBJECT_MAPPER.writeValueAsString(partitions);
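+
+ // rough sketch of the expected serialized form (field order and exact key rendering may vary):
+ // {"type":"start","stream":"theStream","topic":"theStream","partitionSequenceNumberMap":{...},"partitionOffsetMap":{...},"exclusivePartitions":[...]}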
+
+ // Check round-trip.
+ final SeekableStreamStartSequenceNumbers<KafkaTopicPartition, Long> partitions2 = OBJECT_MAPPER.readValue(
+ serializedString,
+ new TypeReference<SeekableStreamStartSequenceNumbers<KafkaTopicPartition, Long>>() {}
+ );
+
+ Assert.assertEquals(
+ "Round trip",
+ partitions,
+ new KafkaSeekableStreamStartSequenceNumbers(
+ partitions2.getStream(),
+ partitions2.getTopic(),
+ partitions2.getPartitionSequenceNumberMap(),
+ partitions2.getPartitionOffsetMap(),
+ partitions2.getExclusivePartitions()
+ )
+ );
+
+ // Check backwards compatibility.
+ final Map<String, Object> asMap = OBJECT_MAPPER.readValue(
+ serializedString,
+ JacksonUtils.TYPE_REFERENCE_MAP_STRING_OBJECT
+ );
+
+ Assert.assertEquals(stream, asMap.get("stream"));
+ Assert.assertEquals(stream, asMap.get("topic"));
+
+ // Jackson deserializes the nested maps with String keys and Integer values, so convert them back before comparing.
+ Assert.assertEquals(
+ offsetMap,
+ OBJECT_MAPPER.convertValue(asMap.get("partitionSequenceNumberMap"), new TypeReference<Map<KafkaTopicPartition, Long>>() {})
+ );
+ Assert.assertEquals(
+ offsetMap,
+ OBJECT_MAPPER.convertValue(asMap.get("partitionOffsetMap"), new TypeReference<Map<KafkaTopicPartition, Long>>() {})
+ );
+
+ Assert.assertEquals(
+ exclusivePartitions,
+ OBJECT_MAPPER.convertValue(asMap.get("exclusivePartitions"), new TypeReference<Set<KafkaTopicPartition>>() {})
+ );
+
+ // check that KafkaSeekableStreamStartSequenceNumbers is not registered with the mapper, so there is no
+ // possible collision when deserializing it from String / bytes
+ boolean expectedExceptionThrown = false;
+ try {
+ OBJECT_MAPPER.readValue(
+ serializedString,
+ KafkaSeekableStreamStartSequenceNumbers.class
+ );
+ }
+ catch (InvalidTypeIdException e) {
+ expectedExceptionThrown = true;
+ }
+
+ Assert.assertTrue("KafkaSeekableStreamStartSequenceNumbers should not be registered type", expectedExceptionThrown);
+ }
+
+ private static ObjectMapper createObjectMapper()
+ {
+ DruidModule module = new KafkaIndexTaskModule();
+ final Injector injector = new CoreInjectorBuilder(new StartupInjectorBuilder().build())
+ .addModule(
+ binder -> {
+ binder.bindConstant().annotatedWith(Names.named("serviceName")).to("test");
+ binder.bindConstant().annotatedWith(Names.named("servicePort")).to(8000);
+ binder.bindConstant().annotatedWith(Names.named("tlsServicePort")).to(9000);
+ }
+ )
+ .build();
+
+ ObjectMapper objectMapper = injector.getInstance(ObjectMapper.class);
+ module.getJacksonModules().forEach(objectMapper::registerModule);
+ return objectMapper;
+ }
+}
diff --git a/extensions-core/kafka-indexing-service/src/test/java/org/apache/druid/indexing/kafka/supervisor/KafkaSupervisorTest.java b/extensions-core/kafka-indexing-service/src/test/java/org/apache/druid/indexing/kafka/supervisor/KafkaSupervisorTest.java
index e4b4b2f..a7d4778 100644
--- a/extensions-core/kafka-indexing-service/src/test/java/org/apache/druid/indexing/kafka/supervisor/KafkaSupervisorTest.java
+++ b/extensions-core/kafka-indexing-service/src/test/java/org/apache/druid/indexing/kafka/supervisor/KafkaSupervisorTest.java
@@ -166,6 +166,8 @@
private SeekableStreamIndexTaskClient<KafkaTopicPartition, Long> taskClient;
private TaskQueue taskQueue;
private String topic;
+ private String topicPattern;
+ private boolean multiTopic;
private RowIngestionMetersFactory rowIngestionMetersFactory;
private StubServiceEmitter serviceEmitter;
private SupervisorStateManagerConfig supervisorConfig;
@@ -174,7 +176,13 @@
private static String getTopic()
{
//noinspection StringConcatenationMissingWhitespace
- return TOPIC_PREFIX + topicPostfix++;
+ return TOPIC_PREFIX + topicPostfix;
+ }
+
+ private static String getTopicPattern()
+ {
+ //noinspection StringConcatenationMissingWhitespace
+ return TOPIC_PREFIX + topicPostfix + ".*";
}
@Parameterized.Parameters(name = "numThreads = {0}")
@@ -222,6 +230,9 @@
taskQueue = createMock(TaskQueue.class);
topic = getTopic();
+ topicPattern = getTopicPattern();
+ topicPostfix++;
+ multiTopic = false; // set to true within a test to exercise the multi-topic path
rowIngestionMetersFactory = new TestUtils().getRowIngestionMetersFactory();
serviceEmitter = new StubServiceEmitter("KafkaSupervisorTest", "localhost");
EmittingLogger.registerEmitter(serviceEmitter);
@@ -906,11 +917,12 @@
}
/**
- * Test generating the starting offsets from the partition data stored in druid_dataSource which contains the
- * offsets of the last built segments.
+ * Test generating the starting offsets for a single-topic config from the partition data stored in
+ * druid_dataSource, which contains the offsets of the last built segments from a previous single-topic
+ * config whose topic matches the new configuration.
*/
@Test
- public void testDatasourceMetadata() throws Exception
+ public void testDatasourceMetadataSingleTopicToSingleTopicMatch() throws Exception
{
supervisor = getTestableSupervisor(1, 1, true, "PT1H", null, null);
addSomeEvents(100);
@@ -948,6 +960,203 @@
);
}
+ /**
+ * Test generating the starting offsets for a multi-topic config from the partition data stored in
+ * druid_dataSource, which contains the offsets of the last built segments from a previous single-topic
+ * config whose topic matches the new configuration.
+ */
+ @Test
+ public void testDatasourceMetadataSingleTopicToMultiTopicMatch() throws Exception
+ {
+ multiTopic = true;
+ supervisor = getTestableSupervisor(1, 1, true, "PT1H", null, null);
+ addSomeEvents(100);
+
+ Capture<KafkaIndexTask> captured = Capture.newInstance();
+ EasyMock.expect(taskMaster.getTaskQueue()).andReturn(Optional.of(taskQueue)).anyTimes();
+ EasyMock.expect(taskMaster.getTaskRunner()).andReturn(Optional.absent()).anyTimes();
+ EasyMock.expect(taskStorage.getActiveTasksByDatasource(DATASOURCE)).andReturn(ImmutableList.of()).anyTimes();
+ ImmutableMap.Builder<KafkaTopicPartition, Long> partitionSequenceNumberMap = ImmutableMap.builder();
+ // these should match
+ partitionSequenceNumberMap.putAll(singlePartitionMap(topic, 0, 10L, 1, 20L, 2, 30L));
+ EasyMock.expect(indexerMetadataStorageCoordinator.retrieveDataSourceMetadata(DATASOURCE)).andReturn(
+ new KafkaDataSourceMetadata(
+ new SeekableStreamStartSequenceNumbers<>(topic, partitionSequenceNumberMap.build(), ImmutableSet.of())
+ )
+ ).anyTimes();
+ EasyMock.expect(taskQueue.add(EasyMock.capture(captured))).andReturn(true);
+ replayAll();
+
+ supervisor.start();
+ supervisor.runInternal();
+ verifyAll();
+
+ KafkaIndexTask task = captured.getValue();
+ KafkaIndexTaskIOConfig taskConfig = task.getIOConfig();
+ Assert.assertEquals("sequenceName-0", taskConfig.getBaseSequenceName());
+ Assert.assertEquals(
+ 10L,
+ taskConfig.getStartSequenceNumbers().getPartitionSequenceNumberMap().get(new KafkaTopicPartition(true, topic, 0)).longValue()
+ );
+ Assert.assertEquals(
+ 20L,
+ taskConfig.getStartSequenceNumbers().getPartitionSequenceNumberMap().get(new KafkaTopicPartition(true, topic, 1)).longValue()
+ );
+ Assert.assertEquals(
+ 30L,
+ taskConfig.getStartSequenceNumbers().getPartitionSequenceNumberMap().get(new KafkaTopicPartition(true, topic, 2)).longValue()
+ );
+ Assert.assertEquals(3, taskConfig.getStartSequenceNumbers().getPartitionSequenceNumberMap().size());
+ }
+
+ /**
+ * Test generating the starting offsets for a multi-topic config from the partition data stored in
+ * druid_dataSource, which contains the offsets of the last built segments from a previous single-topic
+ * config whose topic does not match the new configuration.
+ */
+ @Test
+ public void testDatasourceMetadataSingleTopicToMultiTopicNotMatch() throws Exception
+ {
+ multiTopic = true;
+ supervisor = getTestableSupervisor(1, 1, true, "PT1H", null, null);
+ addSomeEvents(100);
+
+ Capture<KafkaIndexTask> captured = Capture.newInstance();
+ EasyMock.expect(taskMaster.getTaskQueue()).andReturn(Optional.of(taskQueue)).anyTimes();
+ EasyMock.expect(taskMaster.getTaskRunner()).andReturn(Optional.absent()).anyTimes();
+ EasyMock.expect(taskStorage.getActiveTasksByDatasource(DATASOURCE)).andReturn(ImmutableList.of()).anyTimes();
+ ImmutableMap.Builder<KafkaTopicPartition, Long> partitionSequenceNumberMap = ImmutableMap.builder();
+ // these should not match
+ partitionSequenceNumberMap.putAll(singlePartitionMap("notMatch", 0, 10L, 1, 20L, 2, 30L));
+ EasyMock.expect(indexerMetadataStorageCoordinator.retrieveDataSourceMetadata(DATASOURCE)).andReturn(
+ new KafkaDataSourceMetadata(
+ new SeekableStreamStartSequenceNumbers<>("notMatch", partitionSequenceNumberMap.build(), ImmutableSet.of())
+ )
+ ).anyTimes();
+ EasyMock.expect(taskQueue.add(EasyMock.capture(captured))).andReturn(true);
+ replayAll();
+
+ supervisor.start();
+ supervisor.runInternal();
+ verifyAll();
+
+ KafkaIndexTask task = captured.getValue();
+ KafkaIndexTaskIOConfig taskConfig = task.getIOConfig();
+ Assert.assertEquals("sequenceName-0", taskConfig.getBaseSequenceName());
+ Assert.assertEquals(
+ 0L,
+ taskConfig.getStartSequenceNumbers().getPartitionSequenceNumberMap().get(new KafkaTopicPartition(true, topic, 0)).longValue()
+ );
+ Assert.assertEquals(
+ 0L,
+ taskConfig.getStartSequenceNumbers().getPartitionSequenceNumberMap().get(new KafkaTopicPartition(true, topic, 1)).longValue()
+ );
+ Assert.assertEquals(
+ 0L,
+ taskConfig.getStartSequenceNumbers().getPartitionSequenceNumberMap().get(new KafkaTopicPartition(true, topic, 2)).longValue()
+ );
+ Assert.assertEquals(3, taskConfig.getStartSequenceNumbers().getPartitionSequenceNumberMap().size());
+ }
+
+ /**
+ * Test generating the starting offsets for a single-topic config from the partition data stored in
+ * druid_dataSource, which contains the offsets of the last built segments from a previous multi-topic config.
+ */
+ @Test
+ public void testDatasourceMetadataMultiTopicToSingleTopic() throws Exception
+ {
+ supervisor = getTestableSupervisor(1, 1, true, "PT1H", null, null);
+ addSomeEvents(100);
+
+ Capture<KafkaIndexTask> captured = Capture.newInstance();
+ EasyMock.expect(taskMaster.getTaskQueue()).andReturn(Optional.of(taskQueue)).anyTimes();
+ EasyMock.expect(taskMaster.getTaskRunner()).andReturn(Optional.absent()).anyTimes();
+ EasyMock.expect(taskStorage.getActiveTasksByDatasource(DATASOURCE)).andReturn(ImmutableList.of()).anyTimes();
+ ImmutableMap.Builder<KafkaTopicPartition, Long> partitionSequenceNumberMap = ImmutableMap.builder();
+ // these should match
+ partitionSequenceNumberMap.putAll(multiTopicPartitionMap(topic, 0, 10L, 1, 20L, 2, 30L));
+ // these should not match
+ partitionSequenceNumberMap.putAll(multiTopicPartitionMap("notMatch", 0, 10L, 1, 20L, 2, 30L));
+ EasyMock.expect(indexerMetadataStorageCoordinator.retrieveDataSourceMetadata(DATASOURCE)).andReturn(
+ new KafkaDataSourceMetadata(
+ new SeekableStreamStartSequenceNumbers<>(topicPattern, partitionSequenceNumberMap.build(), ImmutableSet.of())
+ )
+ ).anyTimes();
+ EasyMock.expect(taskQueue.add(EasyMock.capture(captured))).andReturn(true);
+ replayAll();
+
+ supervisor.start();
+ supervisor.runInternal();
+ verifyAll();
+
+ KafkaIndexTask task = captured.getValue();
+ KafkaIndexTaskIOConfig taskConfig = task.getIOConfig();
+ Assert.assertEquals("sequenceName-0", taskConfig.getBaseSequenceName());
+ Assert.assertEquals(
+ 10L,
+ taskConfig.getStartSequenceNumbers().getPartitionSequenceNumberMap().get(new KafkaTopicPartition(false, topic, 0)).longValue()
+ );
+ Assert.assertEquals(
+ 20L,
+ taskConfig.getStartSequenceNumbers().getPartitionSequenceNumberMap().get(new KafkaTopicPartition(false, topic, 1)).longValue()
+ );
+ Assert.assertEquals(
+ 30L,
+ taskConfig.getStartSequenceNumbers().getPartitionSequenceNumberMap().get(new KafkaTopicPartition(false, topic, 2)).longValue()
+ );
+ Assert.assertEquals(3, taskConfig.getStartSequenceNumbers().getPartitionSequenceNumberMap().size());
+ }
+
+ /**
+ * Test generating the starting offsets for a multi-topic config from the partition data stored in
+ * druid_dataSource, which contains the offsets of the last built segments from a previous multi-topic config.
+ */
+ @Test
+ public void testDatasourceMetadataMultiTopicToMultiTopic() throws Exception
+ {
+ multiTopic = true;
+ supervisor = getTestableSupervisor(1, 1, true, "PT1H", null, null);
+ addSomeEvents(100);
+
+ Capture<KafkaIndexTask> captured = Capture.newInstance();
+ EasyMock.expect(taskMaster.getTaskQueue()).andReturn(Optional.of(taskQueue)).anyTimes();
+ EasyMock.expect(taskMaster.getTaskRunner()).andReturn(Optional.absent()).anyTimes();
+ EasyMock.expect(taskStorage.getActiveTasksByDatasource(DATASOURCE)).andReturn(ImmutableList.of()).anyTimes();
+ ImmutableMap.Builder<KafkaTopicPartition, Long> partitionSequenceNumberMap = ImmutableMap.builder();
+ // these should match
+ partitionSequenceNumberMap.putAll(multiTopicPartitionMap(topic, 0, 10L, 1, 20L, 2, 30L));
+ // these should not match
+ partitionSequenceNumberMap.putAll(multiTopicPartitionMap("notMatch", 0, 10L, 1, 20L, 2, 30L));
+ EasyMock.expect(indexerMetadataStorageCoordinator.retrieveDataSourceMetadata(DATASOURCE)).andReturn(
+ new KafkaDataSourceMetadata(
+ new SeekableStreamStartSequenceNumbers<>(topicPattern, partitionSequenceNumberMap.build(), ImmutableSet.of())
+ )
+ ).anyTimes();
+ EasyMock.expect(taskQueue.add(EasyMock.capture(captured))).andReturn(true);
+ replayAll();
+
+ supervisor.start();
+ supervisor.runInternal();
+ verifyAll();
+
+ KafkaIndexTask task = captured.getValue();
+ KafkaIndexTaskIOConfig taskConfig = task.getIOConfig();
+ Assert.assertEquals("sequenceName-0", taskConfig.getBaseSequenceName());
+ Assert.assertEquals(
+ 10L,
+ taskConfig.getStartSequenceNumbers().getPartitionSequenceNumberMap().get(new KafkaTopicPartition(true, topic, 0)).longValue()
+ );
+ Assert.assertEquals(
+ 20L,
+ taskConfig.getStartSequenceNumbers().getPartitionSequenceNumberMap().get(new KafkaTopicPartition(true, topic, 1)).longValue()
+ );
+ Assert.assertEquals(
+ 30L,
+ taskConfig.getStartSequenceNumbers().getPartitionSequenceNumberMap().get(new KafkaTopicPartition(true, topic, 2)).longValue()
+ );
+ Assert.assertEquals(3, taskConfig.getStartSequenceNumbers().getPartitionSequenceNumberMap().size());
+ }
+
@Test
public void testBadMetadataOffsets() throws Exception
{
@@ -4621,8 +4830,8 @@
consumerProperties.put("myCustomKey", "myCustomValue");
consumerProperties.put("bootstrap.servers", kafkaHost);
KafkaSupervisorIOConfig kafkaSupervisorIOConfig = new KafkaSupervisorIOConfig(
- topic,
- null,
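+ // exactly one of topic / topicPattern is expected to be set on the IO config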
+ multiTopic ? null : topic,
+ multiTopic ? topicPattern : null,
INPUT_FORMAT,
replicas,
taskCount,
@@ -5038,6 +5247,13 @@
offset2, new KafkaTopicPartition(false, topic, partition3), offset3);
}
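+
+ // like singlePartitionMap above, but the keys are multi-topic partitions, e.g. (illustrative)
+ // multiTopicPartitionMap("foo", 0, 10L, 1, 20L, 2, 30L) -> { (foo, 0): 10, (foo, 1): 20, (foo, 2): 30 }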
+ private static ImmutableMap<KafkaTopicPartition, Long> multiTopicPartitionMap(String topic, int partition1, long offset1,
+ int partition2, long offset2, int partition3, long offset3)
+ {
+ return ImmutableMap.of(new KafkaTopicPartition(true, topic, partition1), offset1, new KafkaTopicPartition(true, topic, partition2),
+ offset2, new KafkaTopicPartition(true, topic, partition3), offset3);
+ }
+
private static class TestTaskRunnerWorkItem extends TaskRunnerWorkItem
{
private final String taskType;
@@ -5109,7 +5325,7 @@
Deserializer valueDeserializerObject = new ByteArrayDeserializer();
return new KafkaRecordSupplier(
new KafkaConsumer<>(props, keyDeserializerObject, valueDeserializerObject),
- false
+ getIoConfig().isMultiTopic()
);
}
diff --git a/indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/SeekableStreamEndSequenceNumbers.java b/indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/SeekableStreamEndSequenceNumbers.java
index a95f4cc..de61d46 100644
--- a/indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/SeekableStreamEndSequenceNumbers.java
+++ b/indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/SeekableStreamEndSequenceNumbers.java
@@ -22,7 +22,6 @@
import com.fasterxml.jackson.annotation.JsonCreator;
import com.fasterxml.jackson.annotation.JsonProperty;
import com.google.common.base.Preconditions;
-import org.apache.druid.java.util.common.IAE;
import java.util.Collections;
import java.util.Comparator;
@@ -47,6 +46,8 @@
public class SeekableStreamEndSequenceNumbers<PartitionIdType, SequenceOffsetType> implements
SeekableStreamSequenceNumbers<PartitionIdType, SequenceOffsetType>
{
+ public static final String TYPE = "end";
+
// stream/topic
private final String stream;
// partitionId -> sequence number
@@ -126,13 +127,7 @@
SeekableStreamSequenceNumbers<PartitionIdType, SequenceOffsetType> other
)
{
- if (this.getClass() != other.getClass()) {
- throw new IAE(
- "Expected instance of %s, got %s",
- this.getClass().getName(),
- other.getClass().getName()
- );
- }
+ validateSequenceNumbersBaseType(other);
final SeekableStreamEndSequenceNumbers<PartitionIdType, SequenceOffsetType> otherEnd =
(SeekableStreamEndSequenceNumbers<PartitionIdType, SequenceOffsetType>) other;
@@ -151,13 +146,7 @@
@Override
public int compareTo(SeekableStreamSequenceNumbers<PartitionIdType, SequenceOffsetType> other, Comparator<SequenceOffsetType> comparator)
{
- if (this.getClass() != other.getClass()) {
- throw new IAE(
- "Expected instance of %s, got %s",
- this.getClass().getName(),
- other.getClass().getName()
- );
- }
+ validateSequenceNumbersBaseType(other);
final SeekableStreamEndSequenceNumbers<PartitionIdType, SequenceOffsetType> otherStart =
(SeekableStreamEndSequenceNumbers<PartitionIdType, SequenceOffsetType>) other;
@@ -185,13 +174,7 @@
SeekableStreamSequenceNumbers<PartitionIdType, SequenceOffsetType> other
)
{
- if (this.getClass() != other.getClass()) {
- throw new IAE(
- "Expected instance of %s, got %s",
- this.getClass().getName(),
- other.getClass().getName()
- );
- }
+ validateSequenceNumbersBaseType(other);
final SeekableStreamEndSequenceNumbers<PartitionIdType, SequenceOffsetType> otherEnd =
(SeekableStreamEndSequenceNumbers<PartitionIdType, SequenceOffsetType>) other;
diff --git a/indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/SeekableStreamSequenceNumbers.java b/indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/SeekableStreamSequenceNumbers.java
index 44f1343..ad1801e 100644
--- a/indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/SeekableStreamSequenceNumbers.java
+++ b/indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/SeekableStreamSequenceNumbers.java
@@ -24,14 +24,15 @@
import com.fasterxml.jackson.annotation.JsonTypeInfo;
import com.fasterxml.jackson.annotation.JsonTypeInfo.Id;
import org.apache.druid.indexing.overlord.DataSourceMetadata;
+import org.apache.druid.java.util.common.IAE;
import java.util.Comparator;
import java.util.Map;
@JsonTypeInfo(use = Id.NAME, property = "type", defaultImpl = SeekableStreamEndSequenceNumbers.class)
@JsonSubTypes({
- @Type(name = "start", value = SeekableStreamStartSequenceNumbers.class),
- @Type(name = "end", value = SeekableStreamEndSequenceNumbers.class)
+ @Type(name = SeekableStreamStartSequenceNumbers.TYPE, value = SeekableStreamStartSequenceNumbers.class),
+ @Type(name = SeekableStreamEndSequenceNumbers.TYPE, value = SeekableStreamEndSequenceNumbers.class)
})
public interface SeekableStreamSequenceNumbers<PartitionIdType, SequenceOffsetType>
{
@@ -41,6 +42,29 @@
String getStream();
/**
+ * Returns whether this sequence number data may span multiple streams / topics.
+ */
+ default boolean isMultiTopicPartition()
+ {
+ return false;
+ }
+
+ /**
+ * Throws {@link IAE} if {@code other} is not of the same class as this instance.
+ *
+ * @param other the other instance to compare.
+ */
+ default void validateSequenceNumbersBaseType(SeekableStreamSequenceNumbers<PartitionIdType, SequenceOffsetType> other)
+ {
+ if (this.getClass() != other.getClass()) {
+ throw new IAE(
+ "Expected instance of %s, got %s",
+ this.getClass().getName(),
+ other.getClass().getName()
+ );
+ }
+ }
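+
+ // callers validate and then downcast, e.g. (sketch mirroring the SeekableStream*SequenceNumbers
+ // classes in this patch):
+ // validateSequenceNumbersBaseType(other);
+ // final SeekableStreamEndSequenceNumbers<PartitionIdType, SequenceOffsetType> otherEnd =
+ // (SeekableStreamEndSequenceNumbers<PartitionIdType, SequenceOffsetType>) other;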
+
+ /**
* Returns a map of partitionId -> sequenceNumber.
*/
Map<PartitionIdType, SequenceOffsetType> getPartitionSequenceNumberMap();
diff --git a/indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/SeekableStreamStartSequenceNumbers.java b/indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/SeekableStreamStartSequenceNumbers.java
index b1a72ff..7d3eab9 100644
--- a/indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/SeekableStreamStartSequenceNumbers.java
+++ b/indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/SeekableStreamStartSequenceNumbers.java
@@ -22,7 +22,6 @@
import com.fasterxml.jackson.annotation.JsonCreator;
import com.fasterxml.jackson.annotation.JsonProperty;
import com.google.common.base.Preconditions;
-import org.apache.druid.java.util.common.IAE;
import javax.annotation.Nullable;
import java.util.Collections;
@@ -42,6 +41,8 @@
public class SeekableStreamStartSequenceNumbers<PartitionIdType, SequenceOffsetType> implements
SeekableStreamSequenceNumbers<PartitionIdType, SequenceOffsetType>
{
+ public static final String TYPE = "start";
+
// stream/topic
private final String stream;
// partitionId -> sequence number
@@ -121,13 +122,7 @@
SeekableStreamSequenceNumbers<PartitionIdType, SequenceOffsetType> other
)
{
- if (this.getClass() != other.getClass()) {
- throw new IAE(
- "Expected instance of %s, got %s",
- this.getClass().getName(),
- other.getClass().getName()
- );
- }
+ validateSequenceNumbersBaseType(other);
final SeekableStreamStartSequenceNumbers<PartitionIdType, SequenceOffsetType> otherStart =
(SeekableStreamStartSequenceNumbers<PartitionIdType, SequenceOffsetType>) other;
@@ -165,13 +160,7 @@
@Override
public int compareTo(SeekableStreamSequenceNumbers<PartitionIdType, SequenceOffsetType> other, Comparator<SequenceOffsetType> comparator)
{
- if (this.getClass() != other.getClass()) {
- throw new IAE(
- "Expected instance of %s, got %s",
- this.getClass().getName(),
- other.getClass().getName()
- );
- }
+ validateSequenceNumbersBaseType(other);
final SeekableStreamStartSequenceNumbers<PartitionIdType, SequenceOffsetType> otherStart =
(SeekableStreamStartSequenceNumbers<PartitionIdType, SequenceOffsetType>) other;
@@ -199,13 +188,7 @@
SeekableStreamSequenceNumbers<PartitionIdType, SequenceOffsetType> other
)
{
- if (this.getClass() != other.getClass()) {
- throw new IAE(
- "Expected instance of %s, got %s",
- this.getClass().getName(),
- other.getClass().getName()
- );
- }
+ validateSequenceNumbersBaseType(other);
final SeekableStreamStartSequenceNumbers<PartitionIdType, SequenceOffsetType> otherStart =
(SeekableStreamStartSequenceNumbers<PartitionIdType, SequenceOffsetType>) other;
diff --git a/indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/supervisor/SeekableStreamSupervisor.java b/indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/supervisor/SeekableStreamSupervisor.java
index 59cd05b..f15a975 100644
--- a/indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/supervisor/SeekableStreamSupervisor.java
+++ b/indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/supervisor/SeekableStreamSupervisor.java
@@ -3863,10 +3863,9 @@
}
}
- private Map<PartitionIdType, SequenceOffsetType> getOffsetsFromMetadataStorage()
+ protected Map<PartitionIdType, SequenceOffsetType> getOffsetsFromMetadataStorage()
{
- final DataSourceMetadata dataSourceMetadata = indexerMetadataStorageCoordinator.retrieveDataSourceMetadata(
- dataSource);
+ final DataSourceMetadata dataSourceMetadata = retrieveDataSourceMetadata();
if (dataSourceMetadata instanceof SeekableStreamDataSourceMetadata
&& checkSourceMetadataMatch(dataSourceMetadata)) {
@SuppressWarnings("unchecked")
@@ -3889,6 +3888,11 @@
return Collections.emptyMap();
}
+ protected DataSourceMetadata retrieveDataSourceMetadata()
+ {
+ return indexerMetadataStorageCoordinator.retrieveDataSourceMetadata(dataSource);
+ }
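+
+ // getOffsetsFromMetadataStorage() and retrieveDataSourceMetadata() are protected rather than private
+ // so that stream-specific supervisors can override the metadata lookup; a hypothetical sketch
+ // (transformStoredMetadata is an assumed helper, not part of this change):
+ // @Override
+ // protected DataSourceMetadata retrieveDataSourceMetadata()
+ // {
+ // return transformStoredMetadata(super.retrieveDataSourceMetadata());
+ // }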
+
/**
* Fetches the earliest or latest offset from the stream via the {@link RecordSupplier}
*/