| /* |
| * Licensed to the Apache Software Foundation (ASF) under one |
| * or more contributor license agreements. See the NOTICE file |
| * distributed with this work for additional information |
| * regarding copyright ownership. The ASF licenses this file |
| * to you under the Apache License, Version 2.0 (the |
| * "License"); you may not use this file except in compliance |
| * with the License. You may obtain a copy of the License at |
| * |
| * http://www.apache.org/licenses/LICENSE-2.0 |
| * |
| * Unless required by applicable law or agreed to in writing, |
| * software distributed under the License is distributed on an |
| * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY |
| * KIND, either express or implied. See the License for the |
| * specific language governing permissions and limitations |
| * under the License. |
| */ |
| |
| package org.apache.druid.indexing.input; |
| |
| import com.fasterxml.jackson.annotation.JacksonInject; |
| import com.fasterxml.jackson.annotation.JsonCreator; |
| import com.fasterxml.jackson.annotation.JsonInclude; |
| import com.fasterxml.jackson.annotation.JsonProperty; |
| import com.google.common.base.Preconditions; |
| import com.google.common.collect.FluentIterable; |
| import com.google.common.collect.ImmutableList; |
| import com.google.common.collect.Iterators; |
| import org.apache.druid.client.coordinator.CoordinatorClient; |
| import org.apache.druid.data.input.AbstractInputSource; |
| import org.apache.druid.data.input.InputFileAttribute; |
| import org.apache.druid.data.input.InputFormat; |
| import org.apache.druid.data.input.InputRowSchema; |
| import org.apache.druid.data.input.InputSourceReader; |
| import org.apache.druid.data.input.InputSplit; |
| import org.apache.druid.data.input.MaxSizeSplitHintSpec; |
| import org.apache.druid.data.input.SegmentsSplitHintSpec; |
| import org.apache.druid.data.input.SplitHintSpec; |
| import org.apache.druid.data.input.impl.InputEntityIteratingReader; |
| import org.apache.druid.data.input.impl.SplittableInputSource; |
| import org.apache.druid.data.input.impl.TimestampSpec; |
| import org.apache.druid.indexing.common.RetryPolicy; |
| import org.apache.druid.indexing.common.RetryPolicyFactory; |
| import org.apache.druid.indexing.common.SegmentCacheManagerFactory; |
| import org.apache.druid.indexing.common.config.TaskConfig; |
| import org.apache.druid.indexing.firehose.WindowedSegmentId; |
| import org.apache.druid.java.util.common.IAE; |
| import org.apache.druid.java.util.common.ISE; |
| import org.apache.druid.java.util.common.guava.Comparators; |
| import org.apache.druid.java.util.common.logger.Logger; |
| import org.apache.druid.query.filter.DimFilter; |
| import org.apache.druid.segment.IndexIO; |
| import org.apache.druid.segment.column.ColumnHolder; |
| import org.apache.druid.segment.loading.SegmentCacheManager; |
| import org.apache.druid.timeline.DataSegment; |
| import org.apache.druid.timeline.TimelineObjectHolder; |
| import org.apache.druid.timeline.VersionedIntervalTimeline; |
| import org.apache.druid.timeline.partition.PartitionChunk; |
| import org.apache.druid.timeline.partition.PartitionHolder; |
| import org.apache.druid.utils.Streams; |
| import org.joda.time.Duration; |
| import org.joda.time.Interval; |
| |
| import javax.annotation.Nullable; |
| import java.io.File; |
| import java.util.ArrayList; |
| import java.util.Collection; |
| import java.util.Collections; |
| import java.util.Comparator; |
| import java.util.HashMap; |
| import java.util.Iterator; |
| import java.util.List; |
| import java.util.Map; |
| import java.util.Objects; |
| import java.util.SortedMap; |
| import java.util.TreeMap; |
| import java.util.concurrent.ThreadLocalRandom; |
| import java.util.stream.Stream; |
| |
| /** |
| * An {@link org.apache.druid.data.input.InputSource} that allows reading from Druid segments. |
| * |
| * Used internally by {@link org.apache.druid.indexing.common.task.CompactionTask}, and can also be used directly. |
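| * |
| * <p>A minimal spec sketch (the field names follow this class's JSON annotations; the type name, datasource, |
| * and interval shown here are illustrative only): |
| * |
| * <pre>{@code |
| * "inputSource": { |
| *   "type": "druid", |
| *   "dataSource": "wikipedia", |
| *   "interval": "2016-06-27/2016-06-28" |
| * } |
| * }</pre> |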
| */ |
| @JsonInclude(JsonInclude.Include.NON_NULL) |
| public class DruidInputSource extends AbstractInputSource implements SplittableInputSource<List<WindowedSegmentId>> |
| { |
| private static final Logger LOG = new Logger(DruidInputSource.class); |
| |
| /** |
| * Timestamp formats that the standard __time column can be parsed with. |
| */ |
| private static final List<String> STANDARD_TIME_COLUMN_FORMATS = ImmutableList.of("millis", "auto"); |
| |
| /** |
| * A Comparator that orders {@link WindowedSegmentId} primarily by segmentId (the part of the ordering that |
| * actually matters), then by interval count and by each interval in turn (arbitrary tiebreakers that exist only |
| * to make the ordering total, as {@link TreeMap} requires). |
| */ |
| private static final Comparator<WindowedSegmentId> WINDOWED_SEGMENT_ID_COMPARATOR = |
| Comparator.comparing(WindowedSegmentId::getSegmentId) |
| .thenComparing(windowedSegmentId -> windowedSegmentId.getIntervals().size()) |
| .thenComparing( |
| (WindowedSegmentId a, WindowedSegmentId b) -> { |
| // Same segmentId, same intervals list size. Compare each interval. |
| int cmp = 0; |
| |
| for (int i = 0; i < a.getIntervals().size(); i++) { |
| cmp = Comparators.intervalsByStartThenEnd() |
| .compare(a.getIntervals().get(i), b.getIntervals().get(i)); |
| |
| if (cmp != 0) { |
| return cmp; |
| } |
| } |
| |
| return cmp; |
| } |
| ); |
| |
| private final String dataSource; |
| // Exactly one of interval and segmentIds must be non-null. Typically 'interval' is specified directly |
| // by the user creating this input source, while 'segmentIds' is set on sub-tasks when the source is split |
| // for parallel batch ingestion. |
| @Nullable |
| private final Interval interval; |
| @Nullable |
| private final List<WindowedSegmentId> segmentIds; |
| private final DimFilter dimFilter; |
| private final IndexIO indexIO; |
| private final CoordinatorClient coordinatorClient; |
| private final SegmentCacheManagerFactory segmentCacheManagerFactory; |
| private final RetryPolicyFactory retryPolicyFactory; |
| private final TaskConfig taskConfig; |
| |
| /** |
| * Included for serde backwards-compatibility only. Not used. |
| */ |
| private final List<String> dimensions; |
| |
| /** |
| * Included for serde backwards-compatibility only. Not used. |
| */ |
| private final List<String> metrics; |
| |
| @JsonCreator |
| public DruidInputSource( |
| @JsonProperty("dataSource") final String dataSource, |
| @JsonProperty("interval") @Nullable Interval interval, |
| // Specifying "segments" is intended only for when this input source has split itself, |
| // not for direct end-user use. |
| @JsonProperty("segments") @Nullable List<WindowedSegmentId> segmentIds, |
| @JsonProperty("filter") DimFilter dimFilter, |
| @Nullable @JsonProperty("dimensions") List<String> dimensions, |
| @Nullable @JsonProperty("metrics") List<String> metrics, |
| @JacksonInject IndexIO indexIO, |
| @JacksonInject CoordinatorClient coordinatorClient, |
| @JacksonInject SegmentCacheManagerFactory segmentCacheManagerFactory, |
| @JacksonInject RetryPolicyFactory retryPolicyFactory, |
| @JacksonInject TaskConfig taskConfig |
| ) |
| { |
| Preconditions.checkNotNull(dataSource, "dataSource"); |
| if ((interval == null && segmentIds == null) || (interval != null && segmentIds != null)) { |
| throw new IAE("Specify exactly one of 'interval' and 'segments'"); |
| } |
| this.dataSource = dataSource; |
| this.interval = interval; |
| this.segmentIds = segmentIds; |
| this.dimFilter = dimFilter; |
| this.dimensions = dimensions; |
| this.metrics = metrics; |
| this.indexIO = Preconditions.checkNotNull(indexIO, "null IndexIO"); |
| this.coordinatorClient = Preconditions.checkNotNull(coordinatorClient, "null CoordinatorClient"); |
| this.segmentCacheManagerFactory = Preconditions.checkNotNull(segmentCacheManagerFactory, "null segmentCacheManagerFactory"); |
| this.retryPolicyFactory = Preconditions.checkNotNull(retryPolicyFactory, "null RetryPolicyFactory"); |
| this.taskConfig = Preconditions.checkNotNull(taskConfig, "null taskConfig"); |
| } |
| |
| @JsonProperty |
| public String getDataSource() |
| { |
| return dataSource; |
| } |
| |
| @Nullable |
| @JsonProperty |
| public Interval getInterval() |
| { |
| return interval; |
| } |
| |
| @Nullable |
| @JsonProperty("segments") |
| public List<WindowedSegmentId> getSegmentIds() |
| { |
| return segmentIds; |
| } |
| |
| @JsonProperty("filter") |
| public DimFilter getDimFilter() |
| { |
| return dimFilter; |
| } |
| |
| /** |
| * Included for serde backwards-compatibility only. Not used. |
| */ |
| @JsonProperty |
| public List<String> getDimensions() |
| { |
| return dimensions; |
| } |
| |
| /** |
| * Included for serde backwards-compatibility only. Not used. |
| */ |
| @JsonProperty |
| public List<String> getMetrics() |
| { |
| return metrics; |
| } |
| |
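| /** |
| * Creates a reader that iterates over each segment in the timeline for this input source, fetching segments |
| * via a {@link SegmentCacheManager} and reading them with {@link DruidSegmentInputFormat}. If |
| * {@link TaskConfig#isIgnoreTimestampSpecForDruidInputSource()} is set, the provided timestampSpec is replaced |
| * with one that reads the __time column directly. |
| */ |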
| @Override |
| protected InputSourceReader fixedFormatReader(InputRowSchema inputRowSchema, @Nullable File temporaryDirectory) |
| { |
| final SegmentCacheManager segmentCacheManager = segmentCacheManagerFactory.manufacturate(temporaryDirectory); |
| |
| final List<TimelineObjectHolder<String, DataSegment>> timeline = createTimeline(); |
| final Iterator<DruidSegmentInputEntity> entityIterator = FluentIterable |
| .from(timeline) |
| .transformAndConcat(holder -> { |
| //noinspection ConstantConditions |
| final PartitionHolder<DataSegment> partitionHolder = holder.getObject(); |
| //noinspection ConstantConditions |
| return FluentIterable |
| .from(partitionHolder) |
| .transform(chunk -> new DruidSegmentInputEntity(segmentCacheManager, chunk.getObject(), holder.getInterval())); |
| }).iterator(); |
| |
| final DruidSegmentInputFormat inputFormat = new DruidSegmentInputFormat(indexIO, dimFilter); |
| |
| final InputRowSchema inputRowSchemaToUse; |
| |
| if (taskConfig.isIgnoreTimestampSpecForDruidInputSource()) { |
| // Legacy compatibility mode; see https://github.com/apache/druid/pull/10267. |
| LOG.warn("Ignoring the provided timestampSpec and reading the __time column instead. To use timestampSpecs with " |
| + "the 'druid' input source, set druid.indexer.task.ignoreTimestampSpecForDruidInputSource to false."); |
| |
| inputRowSchemaToUse = new InputRowSchema( |
| new TimestampSpec(ColumnHolder.TIME_COLUMN_NAME, STANDARD_TIME_COLUMN_FORMATS.iterator().next(), null), |
| inputRowSchema.getDimensionsSpec(), |
| inputRowSchema.getColumnsFilter().plus(ColumnHolder.TIME_COLUMN_NAME) |
| ); |
| } else { |
| inputRowSchemaToUse = inputRowSchema; |
| } |
| |
| if (ColumnHolder.TIME_COLUMN_NAME.equals(inputRowSchemaToUse.getTimestampSpec().getTimestampColumn()) |
| && !STANDARD_TIME_COLUMN_FORMATS.contains(inputRowSchemaToUse.getTimestampSpec().getTimestampFormat())) { |
| // Slight chance the user did this intentionally, but not likely. Log a warning. |
| LOG.warn( |
| "The provided timestampSpec refers to the %s column without using one of the formats %s. To read the " |
| + "column as-is, switch to one of those formats.", |
| inputRowSchemaToUse.getTimestampSpec().getTimestampColumn(), |
| STANDARD_TIME_COLUMN_FORMATS |
| ); |
| } |
| |
| return new InputEntityIteratingReader( |
| inputRowSchemaToUse, |
| inputFormat, |
| entityIterator, |
| temporaryDirectory |
| ); |
| } |
| |
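| /** |
| * Resolves the timeline to read: from the explicit segment IDs when they are provided, otherwise by querying |
| * the Coordinator for the used segments overlapping {@link #interval}. |
| */ |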
| private List<TimelineObjectHolder<String, DataSegment>> createTimeline() |
| { |
| if (interval == null) { |
| return getTimelineForSegmentIds(coordinatorClient, dataSource, segmentIds); |
| } else { |
| return getTimelineForInterval(coordinatorClient, retryPolicyFactory, dataSource, interval); |
| } |
| } |
| |
| @Override |
| public Stream<InputSplit<List<WindowedSegmentId>>> createSplits( |
| InputFormat inputFormat, |
| @Nullable SplitHintSpec splitHintSpec |
| ) |
| { |
| // segmentIds is expected to be set by the supervisor task during parallel indexing. |
| // If it's non-null, the segments have already been split by the supervisor task and are not split further. |
| if (segmentIds == null) { |
| return Streams.sequentialStreamFrom( |
| createSplits( |
| coordinatorClient, |
| retryPolicyFactory, |
| dataSource, |
| interval, |
| splitHintSpec == null ? SplittableInputSource.DEFAULT_SPLIT_HINT_SPEC : splitHintSpec |
| ) |
| ); |
| } else { |
| return Stream.of(new InputSplit<>(segmentIds)); |
| } |
| } |
| |
| @Override |
| public int estimateNumSplits(InputFormat inputFormat, @Nullable SplitHintSpec splitHintSpec) |
| { |
| // segmentIds is expected to be set by the supervisor task during parallel indexing. |
| // If it's non-null, the segments have already been split by the supervisor task and are not split further. |
| if (segmentIds == null) { |
| return Iterators.size( |
| createSplits( |
| coordinatorClient, |
| retryPolicyFactory, |
| dataSource, |
| interval, |
| splitHintSpec == null ? SplittableInputSource.DEFAULT_SPLIT_HINT_SPEC : splitHintSpec |
| ) |
| ); |
| } else { |
| return 1; |
| } |
| } |
| |
| @Override |
| public SplittableInputSource<List<WindowedSegmentId>> withSplit(InputSplit<List<WindowedSegmentId>> split) |
| { |
| return new DruidInputSource( |
| dataSource, |
| null, |
| split.get(), |
| dimFilter, |
| dimensions, |
| metrics, |
| indexIO, |
| coordinatorClient, |
| segmentCacheManagerFactory, |
| retryPolicyFactory, |
| taskConfig |
| ); |
| } |
| |
| @Override |
| public boolean needsFormat() |
| { |
| return false; |
| } |
| |
| @Override |
| public boolean equals(Object o) |
| { |
| if (this == o) { |
| return true; |
| } |
| if (o == null || getClass() != o.getClass()) { |
| return false; |
| } |
| DruidInputSource that = (DruidInputSource) o; |
| return Objects.equals(dataSource, that.dataSource) |
| && Objects.equals(interval, that.interval) |
| && Objects.equals(segmentIds, that.segmentIds) |
| && Objects.equals(dimFilter, that.dimFilter) |
| && Objects.equals(dimensions, that.dimensions) |
| && Objects.equals(metrics, that.metrics); |
| } |
| |
| @Override |
| public int hashCode() |
| { |
| return Objects.hash(dataSource, interval, segmentIds, dimFilter, dimensions, metrics); |
| } |
| |
| @Override |
| public String toString() |
| { |
| return "DruidInputSource{" + |
| "dataSource='" + dataSource + '\'' + |
| ", interval=" + interval + |
| ", segmentIds=" + segmentIds + |
| ", dimFilter=" + dimFilter + |
| (dimensions != null ? ", dimensions=" + dimensions : "") + |
| (metrics != null ? ", metrics=" + metrics : "") + |
| '}'; |
| } |
| |
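| /** |
| * Creates input splits for the given datasource and interval by fetching the timeline from the Coordinator |
| * and grouping segments according to the given {@link SplitHintSpec}. A {@link SegmentsSplitHintSpec} is |
| * converted to an equivalent {@link MaxSizeSplitHintSpec}, since the grouping is ultimately size-based. |
| */ |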
| public static Iterator<InputSplit<List<WindowedSegmentId>>> createSplits( |
| CoordinatorClient coordinatorClient, |
| RetryPolicyFactory retryPolicyFactory, |
| String dataSource, |
| Interval interval, |
| SplitHintSpec splitHintSpec |
| ) |
| { |
| final SplitHintSpec convertedSplitHintSpec; |
| if (splitHintSpec instanceof SegmentsSplitHintSpec) { |
| final SegmentsSplitHintSpec segmentsSplitHintSpec = (SegmentsSplitHintSpec) splitHintSpec; |
| convertedSplitHintSpec = new MaxSizeSplitHintSpec( |
| segmentsSplitHintSpec.getMaxInputSegmentBytesPerTask(), |
| segmentsSplitHintSpec.getMaxNumSegments() |
| ); |
| } else { |
| convertedSplitHintSpec = splitHintSpec; |
| } |
| |
| final List<TimelineObjectHolder<String, DataSegment>> timelineSegments = getTimelineForInterval( |
| coordinatorClient, |
| retryPolicyFactory, |
| dataSource, |
| interval |
| ); |
| final Map<WindowedSegmentId, Long> segmentIdToSize = createWindowedSegmentIdFromTimeline(timelineSegments); |
| //noinspection ConstantConditions |
| return Iterators.transform( |
| convertedSplitHintSpec.split( |
| // segmentIdToSize is sorted by segment ID, which tends to group segments from the same time chunk into |
| // the same input split. |
| segmentIdToSize.keySet().iterator(), |
| segmentId -> new InputFileAttribute( |
| Preconditions.checkNotNull(segmentIdToSize.get(segmentId), "segment size for [%s]", segmentId) |
| ) |
| ), |
| InputSplit::new |
| ); |
| } |
| |
| /** |
| * Returns a map of {@link WindowedSegmentId} to size, sorted by {@link WindowedSegmentId#getSegmentId()}. |
| */ |
| private static SortedMap<WindowedSegmentId, Long> createWindowedSegmentIdFromTimeline( |
| List<TimelineObjectHolder<String, DataSegment>> timelineHolders |
| ) |
| { |
| Map<DataSegment, WindowedSegmentId> windowedSegmentIds = new HashMap<>(); |
| for (TimelineObjectHolder<String, DataSegment> holder : timelineHolders) { |
| for (PartitionChunk<DataSegment> chunk : holder.getObject()) { |
| windowedSegmentIds.computeIfAbsent( |
| chunk.getObject(), |
| segment -> new WindowedSegmentId(segment.getId().toString(), new ArrayList<>()) |
| ).addInterval(holder.getInterval()); |
| } |
| } |
| // It is important to create this sorted map only after windowedSegmentIds is completely filled: the |
| // WindowedSegmentId instances are mutated (intervals are added) while it is being built, and mutating the |
| // keys of a TreeMap after insertion would corrupt its ordering. |
| SortedMap<WindowedSegmentId, Long> segmentSizeMap = new TreeMap<>(WINDOWED_SEGMENT_ID_COMPARATOR); |
| windowedSegmentIds.forEach((segment, segmentId) -> segmentSizeMap.put(segmentId, segment.getSize())); |
| return segmentSizeMap; |
| } |
| |
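| /** |
| * Fetches the used segments overlapping the given interval from the Coordinator and returns their timeline, |
| * retrying (with jitter) according to the given {@link RetryPolicyFactory} until the policy gives up. |
| */ |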
| public static List<TimelineObjectHolder<String, DataSegment>> getTimelineForInterval( |
| CoordinatorClient coordinatorClient, |
| RetryPolicyFactory retryPolicyFactory, |
| String dataSource, |
| Interval interval |
| ) |
| { |
| Preconditions.checkNotNull(interval); |
| |
| // This call formerly went through the TaskActionClient, so for compatibility we keep the same retry |
| // configuration that TaskActionClient uses. |
| final RetryPolicy retryPolicy = retryPolicyFactory.makeRetryPolicy(); |
| Collection<DataSegment> usedSegments; |
| while (true) { |
| try { |
| usedSegments = coordinatorClient.fetchUsedSegmentsInDataSourceForIntervals( |
| dataSource, |
| Collections.singletonList(interval) |
| ); |
| break; |
| } |
| catch (Throwable e) { |
| LOG.warn(e, "Exception getting database segments"); |
| final Duration delay = retryPolicy.getAndIncrementRetryDelay(); |
| if (delay == null) { |
| throw e; |
| } else { |
| final long sleepTime = jitter(delay.getMillis()); |
| LOG.info("Will try again in [%s].", new Duration(sleepTime).toString()); |
| try { |
| Thread.sleep(sleepTime); |
| } |
| catch (InterruptedException e2) { |
| throw new RuntimeException(e2); |
| } |
| } |
| } |
| } |
| |
| return VersionedIntervalTimeline.forSegments(usedSegments).lookup(interval); |
| } |
| |
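| /** |
| * Builds a timeline from explicit {@link WindowedSegmentId}s, fetching each segment's metadata from the |
| * Coordinator. All segments that share an interval must also share a version, and distinct intervals must |
| * not overlap. |
| */ |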
| public static List<TimelineObjectHolder<String, DataSegment>> getTimelineForSegmentIds( |
| CoordinatorClient coordinatorClient, |
| String dataSource, |
| List<WindowedSegmentId> segmentIds |
| ) |
| { |
| final SortedMap<Interval, TimelineObjectHolder<String, DataSegment>> timeline = new TreeMap<>( |
| Comparators.intervalsByStartThenEnd() |
| ); |
| for (WindowedSegmentId windowedSegmentId : Preconditions.checkNotNull(segmentIds, "segmentIds")) { |
| final DataSegment segment = coordinatorClient.fetchUsedSegment( |
| dataSource, |
| windowedSegmentId.getSegmentId() |
| ); |
| for (Interval interval : windowedSegmentId.getIntervals()) { |
| final TimelineObjectHolder<String, DataSegment> existingHolder = timeline.get(interval); |
| if (existingHolder != null) { |
| if (!existingHolder.getVersion().equals(segment.getVersion())) { |
| throw new ISE("Timeline segments with the same interval should have the same version: " + |
| "existing version[%s] vs new segment[%s]", existingHolder.getVersion(), segment); |
| } |
| existingHolder.getObject().add(segment.getShardSpec().createChunk(segment)); |
| } else { |
| timeline.put( |
| interval, |
| new TimelineObjectHolder<>( |
| interval, |
| segment.getInterval(), |
| segment.getVersion(), |
| new PartitionHolder<>(segment.getShardSpec().createChunk(segment)) |
| ) |
| ); |
| } |
| } |
| } |
| |
| // Validate that no two distinct intervals overlap; multiple segments sharing exactly the same interval are |
| // fine, since they collapse into a single timeline entry above. |
| Interval lastInterval = null; |
| for (Interval interval : timeline.keySet()) { |
| if (lastInterval != null && interval.overlaps(lastInterval)) { |
| throw new IAE( |
| "Distinct intervals in input segments may not overlap: [%s] vs [%s]", |
| lastInterval, |
| interval |
| ); |
| } |
| lastInterval = interval; |
| } |
| |
| return new ArrayList<>(timeline.values()); |
| } |
| |
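| /** |
| * Adds Gaussian jitter (with a standard deviation of a quarter of the input) to a retry delay, clamping the |
| * result at zero. |
| */ |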
| private static long jitter(long input) |
| { |
| final double jitter = ThreadLocalRandom.current().nextGaussian() * input / 4.0; |
| long retval = input + (long) jitter; |
| return retval < 0 ? 0 : retval; |
| } |
| } |