| /* |
| * Licensed to the Apache Software Foundation (ASF) under one |
| * or more contributor license agreements. See the NOTICE file |
| * distributed with this work for additional information |
| * regarding copyright ownership. The ASF licenses this file |
| * to you under the Apache License, Version 2.0 (the |
| * "License"); you may not use this file except in compliance |
| * with the License. You may obtain a copy of the License at |
| * |
| * http://www.apache.org/licenses/LICENSE-2.0 |
| * |
| * Unless required by applicable law or agreed to in writing, |
| * software distributed under the License is distributed on an |
| * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY |
| * KIND, either express or implied. See the License for the |
| * specific language governing permissions and limitations |
| * under the License. |
| */ |
| |
| package org.apache.druid.indexing.seekablestream; |
| |
| import com.fasterxml.jackson.annotation.JsonCreator; |
| import com.fasterxml.jackson.annotation.JsonProperty; |
| import com.google.common.base.Preconditions; |
| |
| import javax.annotation.Nullable; |
| import java.util.Collections; |
| import java.util.Comparator; |
| import java.util.HashMap; |
| import java.util.HashSet; |
| import java.util.Map; |
| import java.util.Map.Entry; |
| import java.util.Objects; |
| import java.util.Set; |
| |
| /** |
| * Represents the start sequenceNumber per partition of a sequence. This class keeps an additional set of |
| * {@link #exclusivePartitions} for Kinesis indexing service in where each start offset can be either inclusive |
| * or exclusive. |
| */ |
| public class SeekableStreamStartSequenceNumbers<PartitionIdType, SequenceOffsetType> implements |
| SeekableStreamSequenceNumbers<PartitionIdType, SequenceOffsetType> |
| { |
| public static final String TYPE = "start"; |
| |
| // stream/topic |
| private final String stream; |
| // partitionId -> sequence number |
| private final Map<PartitionIdType, SequenceOffsetType> partitionSequenceNumberMap; |
| private final Set<PartitionIdType> exclusivePartitions; |
| |
| @JsonCreator |
| public SeekableStreamStartSequenceNumbers( |
| @JsonProperty("stream") final String stream, |
| // kept for backward compatibility |
| @JsonProperty("topic") final String topic, |
| @JsonProperty("partitionSequenceNumberMap") |
| final Map<PartitionIdType, SequenceOffsetType> partitionSequenceNumberMap, |
| // kept for backward compatibility |
| @JsonProperty("partitionOffsetMap") final Map<PartitionIdType, SequenceOffsetType> partitionOffsetMap, |
| @JsonProperty("exclusivePartitions") @Nullable final Set<PartitionIdType> exclusivePartitions |
| ) |
| { |
| this.stream = stream == null ? topic : stream; |
| this.partitionSequenceNumberMap = partitionOffsetMap == null ? partitionSequenceNumberMap : partitionOffsetMap; |
| |
| Preconditions.checkNotNull(this.stream, "stream"); |
| Preconditions.checkNotNull(this.partitionSequenceNumberMap, "partitionIdToSequenceNumberMap"); |
| |
| // exclusiveOffset can be null if this class is deserialized from metadata store. Note that only end offsets are |
| // stored in metadata store. |
| // The default is true because there was only Kafka indexing service before in which the end offset is always |
| // exclusive. |
| this.exclusivePartitions = exclusivePartitions == null ? Collections.emptySet() : exclusivePartitions; |
| } |
| |
| public SeekableStreamStartSequenceNumbers( |
| String stream, |
| Map<PartitionIdType, SequenceOffsetType> partitionSequenceNumberMap, |
| Set<PartitionIdType> exclusivePartitions |
| ) |
| { |
| this(stream, null, partitionSequenceNumberMap, null, exclusivePartitions); |
| } |
| |
| @Override |
| @JsonProperty |
| public String getStream() |
| { |
| return stream; |
| } |
| |
| /** |
| * Identical to {@link #getStream()}. Here for backwards compatibility, so a serialized |
| * SeekableStreamStartSequenceNumbers can be read by older Druid versions as a KafkaPartitions object. |
| */ |
| @JsonProperty |
| public String getTopic() |
| { |
| return stream; |
| } |
| |
| @Override |
| @JsonProperty |
| public Map<PartitionIdType, SequenceOffsetType> getPartitionSequenceNumberMap() |
| { |
| return partitionSequenceNumberMap; |
| } |
| |
| /** |
| * Identical to {@link #getPartitionSequenceNumberMap()} ()}. Here for backwards compatibility, so a serialized |
| * SeekableStreamStartSequenceNumbers can be read by older Druid versions as a KafkaPartitions object. |
| */ |
| @JsonProperty |
| public Map<PartitionIdType, SequenceOffsetType> getPartitionOffsetMap() |
| { |
| return partitionSequenceNumberMap; |
| } |
| |
| @Override |
| public SeekableStreamSequenceNumbers<PartitionIdType, SequenceOffsetType> plus( |
| SeekableStreamSequenceNumbers<PartitionIdType, SequenceOffsetType> other |
| ) |
| { |
| validateSequenceNumbersBaseType(other); |
| |
| final SeekableStreamStartSequenceNumbers<PartitionIdType, SequenceOffsetType> otherStart = |
| (SeekableStreamStartSequenceNumbers<PartitionIdType, SequenceOffsetType>) other; |
| |
| if (stream.equals(otherStart.stream)) { |
| // Same stream, merge sequences. |
| final Map<PartitionIdType, SequenceOffsetType> newMap = new HashMap<>(partitionSequenceNumberMap); |
| newMap.putAll(otherStart.partitionSequenceNumberMap); |
| |
| // A partition is exclusive if it's |
| // 1) exclusive in "this" and it's not in "other"'s partitionSequenceNumberMap or |
| // 2) exclusive in "other" |
| final Set<PartitionIdType> newExclusivePartitions = new HashSet<>(); |
| partitionSequenceNumberMap.forEach( |
| (partitionId, sequenceOffset) -> { |
| if (exclusivePartitions.contains(partitionId) |
| && !otherStart.partitionSequenceNumberMap.containsKey(partitionId)) { |
| newExclusivePartitions.add(partitionId); |
| } |
| } |
| ); |
| newExclusivePartitions.addAll(otherStart.exclusivePartitions); |
| |
| return new SeekableStreamStartSequenceNumbers<>( |
| stream, |
| newMap, |
| newExclusivePartitions |
| ); |
| } else { |
| // Different stream, prefer "other". |
| return other; |
| } |
| } |
| |
| @Override |
| public int compareTo(SeekableStreamSequenceNumbers<PartitionIdType, SequenceOffsetType> other, Comparator<SequenceOffsetType> comparator) |
| { |
| validateSequenceNumbersBaseType(other); |
| |
| final SeekableStreamStartSequenceNumbers<PartitionIdType, SequenceOffsetType> otherStart = |
| (SeekableStreamStartSequenceNumbers<PartitionIdType, SequenceOffsetType>) other; |
| |
| if (stream.equals(otherStart.stream)) { |
| //Same stream, compare the offset |
| boolean res = false; |
| for (Map.Entry<PartitionIdType, SequenceOffsetType> entry : partitionSequenceNumberMap.entrySet()) { |
| PartitionIdType partitionId = entry.getKey(); |
| SequenceOffsetType sequenceOffset = entry.getValue(); |
| if (otherStart.partitionSequenceNumberMap.get(partitionId) != null && comparator.compare(sequenceOffset, otherStart.partitionSequenceNumberMap.get(partitionId)) > 0) { |
| res = true; |
| break; |
| } |
| } |
| if (res) { |
| return 1; |
| } |
| } |
| return 0; |
| } |
| |
| @Override |
| public SeekableStreamSequenceNumbers<PartitionIdType, SequenceOffsetType> minus( |
| SeekableStreamSequenceNumbers<PartitionIdType, SequenceOffsetType> other |
| ) |
| { |
| validateSequenceNumbersBaseType(other); |
| |
| final SeekableStreamStartSequenceNumbers<PartitionIdType, SequenceOffsetType> otherStart = |
| (SeekableStreamStartSequenceNumbers<PartitionIdType, SequenceOffsetType>) other; |
| |
| if (stream.equals(otherStart.stream)) { |
| // Same stream, remove partitions present in "that" from "this" |
| final Map<PartitionIdType, SequenceOffsetType> newMap = new HashMap<>(); |
| final Set<PartitionIdType> newExclusivePartitions = new HashSet<>(); |
| |
| for (Entry<PartitionIdType, SequenceOffsetType> entry : partitionSequenceNumberMap.entrySet()) { |
| if (!otherStart.partitionSequenceNumberMap.containsKey(entry.getKey())) { |
| newMap.put(entry.getKey(), entry.getValue()); |
| // A partition is exclusive if it's exclusive in "this" and not in "other"'s partitionSequenceNumberMap |
| if (exclusivePartitions.contains(entry.getKey())) { |
| newExclusivePartitions.add(entry.getKey()); |
| } |
| } |
| } |
| |
| return new SeekableStreamStartSequenceNumbers<>( |
| stream, |
| newMap, |
| newExclusivePartitions |
| ); |
| } else { |
| // Different stream, prefer "this". |
| return this; |
| } |
| } |
| |
| @JsonProperty |
| public Set<PartitionIdType> getExclusivePartitions() |
| { |
| return exclusivePartitions; |
| } |
| |
| @Override |
| public boolean equals(Object o) |
| { |
| if (this == o) { |
| return true; |
| } |
| if (o == null || getClass() != o.getClass()) { |
| return false; |
| } |
| SeekableStreamStartSequenceNumbers<?, ?> that = (SeekableStreamStartSequenceNumbers<?, ?>) o; |
| return Objects.equals(stream, that.stream) && |
| Objects.equals(partitionSequenceNumberMap, that.partitionSequenceNumberMap) && |
| Objects.equals(exclusivePartitions, that.exclusivePartitions); |
| } |
| |
| @Override |
| public int hashCode() |
| { |
| return Objects.hash(stream, partitionSequenceNumberMap, exclusivePartitions); |
| } |
| |
| @Override |
| public String toString() |
| { |
| return "SeekableStreamStartSequenceNumbers{" + |
| "stream='" + stream + '\'' + |
| ", partitionSequenceNumberMap=" + partitionSequenceNumberMap + |
| ", exclusivePartitions=" + exclusivePartitions + |
| '}'; |
| } |
| } |