| /* |
| * Licensed to the Apache Software Foundation (ASF) under one |
| * or more contributor license agreements. See the NOTICE file |
| * distributed with this work for additional information |
| * regarding copyright ownership. The ASF licenses this file |
| * to you under the Apache License, Version 2.0 (the |
| * "License"); you may not use this file except in compliance |
| * with the License. You may obtain a copy of the License at |
| * |
| * http://www.apache.org/licenses/LICENSE-2.0 |
| * |
| * Unless required by applicable law or agreed to in writing, |
| * software distributed under the License is distributed on an |
| * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY |
| * KIND, either express or implied. See the License for the |
| * specific language governing permissions and limitations |
| * under the License. |
| */ |
| |
| package org.apache.druid.indexing.seekablestream; |
| |
| import com.fasterxml.jackson.annotation.JsonCreator; |
| import com.fasterxml.jackson.annotation.JsonProperty; |
| import com.google.common.base.Preconditions; |
| import com.google.common.base.Supplier; |
| import com.google.common.collect.ImmutableMap; |
| import org.apache.druid.data.input.Committer; |
| import org.apache.druid.indexing.common.TaskToolbox; |
| import org.apache.druid.indexing.common.actions.SegmentTransactionalInsertAction; |
| import org.apache.druid.indexing.overlord.SegmentPublishResult; |
| import org.apache.druid.indexing.seekablestream.common.OrderedPartitionableRecord; |
| import org.apache.druid.indexing.seekablestream.common.OrderedSequenceNumber; |
| import org.apache.druid.java.util.common.ISE; |
| import org.apache.druid.java.util.emitter.EmittingLogger; |
| import org.apache.druid.segment.SegmentUtils; |
| import org.apache.druid.segment.realtime.appenderator.TransactionalSegmentPublisher; |
| import org.apache.druid.timeline.DataSegment; |
| |
| import javax.annotation.Nullable; |
| import java.io.IOException; |
| import java.util.Collections; |
| import java.util.HashMap; |
| import java.util.HashSet; |
| import java.util.Map; |
| import java.util.Set; |
| import java.util.concurrent.locks.ReentrantLock; |
| import java.util.function.BiFunction; |
| |
/**
 * Tracks the state of a single publishable "sequence" of records read by a
 * {@link SeekableStreamIndexTaskRunner}: the per-partition start offsets, the (possibly still
 * growing) per-partition end offsets, which partitions are still being read ("assignments"),
 * and whether the sequence has been checkpointed.
 *
 * <p>Instances round-trip through Jackson via the {@code @JsonCreator} constructor and the
 * {@code @JsonProperty} accessors below; renaming any of those properties would change the
 * serialized form.
 *
 * <p>Thread-safety: {@link #endOffsets} and {@link #checkpointed} are guarded by {@link #lock}
 * because they can be touched from both the main ingestion thread and the HTTP thread (see the
 * comment on {@code lock}). Other fields are either final/immutable or only documented as
 * accessed from the methods shown here.
 *
 * @param <PartitionIdType>    type identifying a stream partition
 * @param <SequenceOffsetType> type of a per-partition offset / sequence number
 */
public class SequenceMetadata<PartitionIdType, SequenceOffsetType>
{
  private static final EmittingLogger log = new EmittingLogger(SequenceMetadata.class);

  private final int sequenceId;
  private final String sequenceName;
  // Partitions whose start offset is exclusive: the record AT the start offset was already
  // consumed by a previous sequence and must be skipped (see canHandle()).
  private final Set<PartitionIdType> exclusiveStartPartitions;
  // Partitions this sequence is still expected to read from; recomputed by updateAssignments()
  // and drained to empty once every partition has reached its end offset.
  private final Set<PartitionIdType> assignments;
  // NOTE(review): always false when constructed via the visible @JsonCreator constructor;
  // presumably a sentinel instance is built elsewhere — can't tell from this file.
  private final boolean sentinel;
  /**
   * Lock for accessing {@link #endOffsets} and {@link #checkpointed}. This lock is required because
   * {@link #setEndOffsets} can be called by both the main thread and the HTTP thread.
   */
  private final ReentrantLock lock = new ReentrantLock();

  // Immutable snapshot of where each partition starts for this sequence.
  final Map<PartitionIdType, SequenceOffsetType> startOffsets;
  // Mutable: may be extended by setEndOffsets(); access is guarded by `lock`.
  final Map<PartitionIdType, SequenceOffsetType> endOffsets;

  // True once end offsets have been finalized for this sequence; guarded by `lock`.
  private boolean checkpointed;

  /**
   * Jackson-facing constructor.
   *
   * @param sequenceId               numeric id of this sequence within the task
   * @param sequenceName             human-readable name; must be non-null
   * @param startOffsets             per-partition start offsets; copied into an immutable map
   * @param endOffsets               per-partition end offsets; copied into a mutable map since
   *                                 they may be moved later via {@link #setEndOffsets}
   * @param checkpointed             whether this sequence has already been checkpointed
   * @param exclusiveStartPartitions partitions whose start offset is exclusive; null is treated
   *                                 as the empty set
   */
  @JsonCreator
  public SequenceMetadata(
      @JsonProperty("sequenceId") int sequenceId,
      @JsonProperty("sequenceName") String sequenceName,
      @JsonProperty("startOffsets") Map<PartitionIdType, SequenceOffsetType> startOffsets,
      @JsonProperty("endOffsets") Map<PartitionIdType, SequenceOffsetType> endOffsets,
      @JsonProperty("checkpointed") boolean checkpointed,
      @JsonProperty("exclusiveStartPartitions") Set<PartitionIdType> exclusiveStartPartitions
  )
  {
    Preconditions.checkNotNull(sequenceName);
    Preconditions.checkNotNull(startOffsets);
    Preconditions.checkNotNull(endOffsets);
    this.sequenceId = sequenceId;
    this.sequenceName = sequenceName;
    this.startOffsets = ImmutableMap.copyOf(startOffsets);
    this.endOffsets = new HashMap<>(endOffsets);
    // Initially assigned to every partition that has a start offset.
    this.assignments = new HashSet<>(startOffsets.keySet());
    this.checkpointed = checkpointed;
    this.sentinel = false;
    this.exclusiveStartPartitions = exclusiveStartPartitions == null
                                    ? Collections.emptySet()
                                    : exclusiveStartPartitions;
  }

  @JsonProperty
  public Set<PartitionIdType> getExclusiveStartPartitions()
  {
    return exclusiveStartPartitions;
  }

  @JsonProperty
  public int getSequenceId()
  {
    return sequenceId;
  }

  /**
   * Whether this sequence has been checkpointed. Read under {@link #lock} because the flag is
   * set by {@link #setEndOffsets} from another thread.
   */
  @JsonProperty
  public boolean isCheckpointed()
  {
    lock.lock();
    try {
      return checkpointed;
    }
    finally {
      lock.unlock();
    }
  }

  @JsonProperty
  public String getSequenceName()
  {
    return sequenceName;
  }

  @JsonProperty
  public Map<PartitionIdType, SequenceOffsetType> getStartOffsets()
  {
    return startOffsets;
  }

  /**
   * Returns the per-partition end offsets.
   *
   * <p>NOTE(review): this returns the internal mutable map itself, not a copy; the lock only
   * protects the read of the reference, not later use by the caller. Callers presumably treat
   * the result as read-only — confirm before changing.
   */
  @JsonProperty
  public Map<PartitionIdType, SequenceOffsetType> getEndOffsets()
  {
    lock.lock();
    try {
      return endOffsets;
    }
    finally {
      lock.unlock();
    }
  }

  @JsonProperty
  public boolean isSentinel()
  {
    return sentinel;
  }

  /**
   * Finalizes (or extends) the end offsets for this sequence and marks it checkpointed.
   * May be called from the main thread or the HTTP thread, hence the lock.
   */
  void setEndOffsets(Map<PartitionIdType, SequenceOffsetType> newEndOffsets)
  {
    lock.lock();
    try {
      endOffsets.putAll(newEndOffsets);
      checkpointed = true;
    }
    finally {
      lock.unlock();
    }
  }

  /**
   * Recomputes {@link #assignments}: a partition stays assigned only while {@code moreToReadFn}
   * says its current offset has not yet reached this sequence's end offset for that partition.
   *
   * @param currOffsets  current read position per partition
   * @param moreToReadFn (currentOffset, endOffset) -> true if there is still more to read
   */
  void updateAssignments(
      Map<PartitionIdType, SequenceOffsetType> currOffsets,
      BiFunction<SequenceOffsetType, SequenceOffsetType, Boolean> moreToReadFn
  )
  {
    lock.lock();
    try {
      assignments.clear();
      currOffsets.forEach((key, value) -> {
        SequenceOffsetType endOffset = endOffsets.get(key);
        if (moreToReadFn.apply(value, endOffset)) {
          assignments.add(key);
        }
      });
    }
    finally {
      lock.unlock();
    }
  }

  /**
   * True while at least one partition still has records to read for this sequence.
   */
  boolean isOpen()
  {
    return !assignments.isEmpty();
  }

  /**
   * Decides whether {@code record} falls inside this sequence's offset range for its partition,
   * honoring the runner's end-offset exclusivity and this sequence's exclusive start partitions.
   *
   * @return true if this sequence should consume the record
   */
  boolean canHandle(
      SeekableStreamIndexTaskRunner<PartitionIdType, SequenceOffsetType> runner,
      OrderedPartitionableRecord<PartitionIdType, SequenceOffsetType> record
  )
  {
    lock.lock();
    try {
      final OrderedSequenceNumber<SequenceOffsetType> partitionEndOffset = runner.createSequenceNumber(endOffsets.get(record.getPartitionId()));
      final OrderedSequenceNumber<SequenceOffsetType> partitionStartOffset = runner.createSequenceNumber(startOffsets.get(
          record.getPartitionId()));
      final OrderedSequenceNumber<SequenceOffsetType> recordOffset = runner.createSequenceNumber(record.getSequenceNumber());
      // Cannot handle anything once closed, or if this partition is unknown to the sequence.
      if (!isOpen() || recordOffset == null || partitionEndOffset == null || partitionStartOffset == null) {
        return false;
      }
      boolean ret;
      if (!runner.isEndOffsetExclusive()) {
        // Inclusive endOffsets mean that we must skip the first record of any partition that has been read before.
        // For exclusive-start partitions the record must be strictly AFTER the start offset (compareTo >= 1);
        // otherwise at-or-after the start offset (compareTo >= 0).
        ret = recordOffset.compareTo(partitionStartOffset)
              >= (getExclusiveStartPartitions().contains(record.getPartitionId()) ? 1 : 0);
      } else {
        ret = recordOffset.compareTo(partitionStartOffset) >= 0;
      }

      // Upper bound: strict when the runner's end offsets are exclusive, inclusive otherwise.
      if (runner.isEndOffsetExclusive()) {
        ret &= recordOffset.compareTo(partitionEndOffset) < 0;
      } else {
        ret &= recordOffset.compareTo(partitionEndOffset) <= 0;
      }

      return ret;
    }
    finally {
      lock.unlock();
    }
  }

  @Override
  public String toString()
  {
    lock.lock();
    try {
      return "SequenceMetadata{" +
             "sequenceId=" + sequenceId +
             ", sequenceName='" + sequenceName + '\'' +
             ", assignments=" + assignments +
             ", startOffsets=" + startOffsets +
             ", exclusiveStartPartitions=" + exclusiveStartPartitions +
             ", endOffsets=" + endOffsets +
             ", sentinel=" + sentinel +
             ", checkpointed=" + checkpointed +
             '}';
    }
    finally {
      lock.unlock();
    }
  }

  /**
   * Builds a {@link Committer} supplier for this sequence. The committer's metadata combines
   * {@code lastPersistedOffsets} (merged, taking the max per partition, with this sequence's
   * end offsets) for persistence, and this sequence's end offsets for publishing.
   *
   * <p>The committer may only be used after {@link #updateAssignments} has drained
   * {@link #assignments}, i.e. once every record up to the end offsets has been consumed
   * (enforced by the checkState below).
   *
   * @param runner               the task runner, used to compare offsets
   * @param stream               the stream these offsets belong to
   * @param lastPersistedOffsets mutated in place: updated to the per-partition max of itself
   *                             and this sequence's end offsets
   */
  Supplier<Committer> getCommitterSupplier(
      SeekableStreamIndexTaskRunner<PartitionIdType, SequenceOffsetType> runner,
      String stream,
      Map<PartitionIdType, SequenceOffsetType> lastPersistedOffsets
  )
  {
    // Set up committer.
    return () ->
        new Committer()
        {
          @Override
          public Object getMetadata()
          {
            lock.lock();

            try {
              Preconditions.checkState(
                  assignments.isEmpty(),
                  "This committer can be used only once all the records till sequences [%s] have been consumed, also make"
                  + " sure to call updateAssignments before using this committer",
                  endOffsets
              );


              // merge endOffsets for this sequence with globally lastPersistedOffsets
              // This is done because this committer would be persisting only sub set of segments
              // corresponding to the current sequence. Generally, lastPersistedOffsets should already
              // cover endOffsets but just to be sure take max of sequences and persist that
              for (Map.Entry<PartitionIdType, SequenceOffsetType> partitionOffset : endOffsets.entrySet()) {
                SequenceOffsetType newOffsets = partitionOffset.getValue();
                if (lastPersistedOffsets.containsKey(partitionOffset.getKey())
                    && runner.createSequenceNumber(lastPersistedOffsets.get(partitionOffset.getKey()))
                             .compareTo(runner.createSequenceNumber(newOffsets)) > 0) {
                  // The globally persisted offset is already ahead of this sequence's end; keep it.
                  newOffsets = lastPersistedOffsets.get(partitionOffset.getKey());
                }
                lastPersistedOffsets.put(
                    partitionOffset.getKey(),
                    newOffsets
                );
              }

              // Publish metadata can be different from persist metadata as we are going to publish only
              // subset of segments
              return ImmutableMap.of(
                  SeekableStreamIndexTaskRunner.METADATA_NEXT_PARTITIONS,
                  new SeekableStreamStartSequenceNumbers<>(stream, lastPersistedOffsets, exclusiveStartPartitions),
                  SeekableStreamIndexTaskRunner.METADATA_PUBLISH_PARTITIONS,
                  new SeekableStreamEndSequenceNumbers<>(stream, endOffsets)
              );
            }
            finally {
              lock.unlock();
            }
          }

          @Override
          public void run()
          {
            // Do nothing.
          }
        };

  }

  /**
   * Creates the publisher used to transactionally commit this sequence's segments (or, for an
   * empty segment set, just its offset metadata) via the task action client.
   */
  TransactionalSegmentPublisher createPublisher(
      SeekableStreamIndexTaskRunner<PartitionIdType, SequenceOffsetType> runner,
      TaskToolbox toolbox,
      boolean useTransaction
  )
  {
    return new SequenceMetadataTransactionalSegmentPublisher(
        runner,
        toolbox,
        useTransaction
    );
  }

  /**
   * {@link TransactionalSegmentPublisher} bound to the enclosing {@link SequenceMetadata}:
   * validates that the commit metadata matches this sequence's end offsets, then submits the
   * appropriate {@link SegmentTransactionalInsertAction} (append, metadata-only, or
   * non-transactional append depending on the segment set and {@code useTransaction}).
   */
  private class SequenceMetadataTransactionalSegmentPublisher
      implements TransactionalSegmentPublisher
  {
    private final SeekableStreamIndexTaskRunner<PartitionIdType, SequenceOffsetType> runner;
    private final TaskToolbox toolbox;
    // When false, segments are appended without start/end metadata (no transactional check).
    private final boolean useTransaction;

    public SequenceMetadataTransactionalSegmentPublisher(
        SeekableStreamIndexTaskRunner<PartitionIdType, SequenceOffsetType> runner,
        TaskToolbox toolbox,
        boolean useTransaction
    )
    {
      this.runner = runner;
      this.toolbox = toolbox;
      this.useTransaction = useTransaction;
    }

    /**
     * Publishes {@code segmentsToPush} together with this sequence's offset metadata.
     *
     * @param mustBeNullOrEmptySegments must be null or empty — stream ingestion only appends,
     *                                  never overwrites
     * @param segmentsToPush            segments to commit; may be empty, in which case only the
     *                                  offset metadata is committed (and even that is skipped if
     *                                  the offsets did not change)
     * @param commitMetadata            metadata produced by the committer above; must be non-null
     *                                  and must match this sequence's end offsets
     * @throws ISE        if overwrite segments are present or the metadata does not match
     * @throws IOException on task action failure
     */
    @Override
    public SegmentPublishResult publishAnnotatedSegments(
        @Nullable Set<DataSegment> mustBeNullOrEmptySegments,
        Set<DataSegment> segmentsToPush,
        @Nullable Object commitMetadata
    ) throws IOException
    {
      if (mustBeNullOrEmptySegments != null && !mustBeNullOrEmptySegments.isEmpty()) {
        throw new ISE(
            "Stream ingestion task unexpectedly attempted to overwrite segments: %s",
            SegmentUtils.commaSeparatedIdentifiers(mustBeNullOrEmptySegments)
        );
      }
      final Map commitMetaMap = (Map) Preconditions.checkNotNull(commitMetadata, "commitMetadata");
      final SeekableStreamEndSequenceNumbers<PartitionIdType, SequenceOffsetType> finalPartitions =
          runner.deserializePartitionsFromMetadata(
              toolbox.getJsonMapper(),
              commitMetaMap.get(SeekableStreamIndexTaskRunner.METADATA_PUBLISH_PARTITIONS)
          );

      // Sanity check, we should only be publishing things that match our desired end state.
      if (!getEndOffsets().equals(finalPartitions.getPartitionSequenceNumberMap())) {
        throw new ISE(
            "Driver for sequence[%s] attempted to publish invalid metadata[%s].",
            SequenceMetadata.this.toString(),
            commitMetadata
        );
      }

      final SegmentTransactionalInsertAction action;

      if (segmentsToPush.isEmpty()) {
        // If a task ingested no data but made progress reading through its assigned partitions,
        // we publish no segments but still need to update the supervisor with the current offsets
        SeekableStreamSequenceNumbers<PartitionIdType, SequenceOffsetType> startPartitions =
            new SeekableStreamStartSequenceNumbers<>(
                finalPartitions.getStream(),
                getStartOffsets(),
                exclusiveStartPartitions
            );
        if (isMetadataUnchanged(startPartitions, finalPartitions)) {
          // if we created no segments and didn't change any offsets, just do nothing and return.
          log.info(
              "With empty segment set, start offsets [%s] and end offsets [%s] are the same, skipping metadata commit.",
              startPartitions,
              finalPartitions
          );
          return SegmentPublishResult.ok(segmentsToPush);
        } else {
          log.info(
              "With empty segment set, start offsets [%s] and end offsets [%s] changed, committing new metadata.",
              startPartitions,
              finalPartitions
          );
          action = SegmentTransactionalInsertAction.commitMetadataOnlyAction(
              runner.getAppenderator().getDataSource(),
              runner.createDataSourceMetadata(startPartitions),
              runner.createDataSourceMetadata(finalPartitions)
          );
        }
      } else if (useTransaction) {
        // Transactional append: commit segments atomically with the start -> end metadata swap.
        action = SegmentTransactionalInsertAction.appendAction(
            segmentsToPush,
            runner.createDataSourceMetadata(
                new SeekableStreamStartSequenceNumbers<>(
                    finalPartitions.getStream(),
                    getStartOffsets(),
                    exclusiveStartPartitions
                )
            ),
            runner.createDataSourceMetadata(finalPartitions)
        );
      } else {
        // Non-transactional: append segments without any metadata compare-and-swap.
        action = SegmentTransactionalInsertAction.appendAction(segmentsToPush, null, null);
      }

      return toolbox.getTaskActionClient().submit(action);
    }

    @Override
    public boolean supportsEmptyPublish()
    {
      return true;
    }
  }

  /**
   * True when the start and end sequence numbers carry identical per-partition offset maps,
   * i.e. the task made no reading progress and a metadata commit can be skipped.
   */
  private boolean isMetadataUnchanged(
      SeekableStreamSequenceNumbers<PartitionIdType, SequenceOffsetType> startSequenceNumbers,
      SeekableStreamSequenceNumbers<PartitionIdType, SequenceOffsetType> endSequenceNumbers
  )
  {
    Map<PartitionIdType, SequenceOffsetType> startMap = startSequenceNumbers.getPartitionSequenceNumberMap();
    Map<PartitionIdType, SequenceOffsetType> endMap = endSequenceNumbers.getPartitionSequenceNumberMap();
    return startMap.equals(endMap);
  }
}