| /* |
| * Licensed to the Apache Software Foundation (ASF) under one |
| * or more contributor license agreements. See the NOTICE file |
| * distributed with this work for additional information |
| * regarding copyright ownership. The ASF licenses this file |
| * to you under the Apache License, Version 2.0 (the |
| * "License"); you may not use this file except in compliance |
| * with the License. You may obtain a copy of the License at |
| * |
| * http://www.apache.org/licenses/LICENSE-2.0 |
| * |
| * Unless required by applicable law or agreed to in writing, |
| * software distributed under the License is distributed on an |
| * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY |
| * KIND, either express or implied. See the License for the |
| * specific language governing permissions and limitations |
| * under the License. |
| */ |
| |
| package org.apache.druid.indexing.common.task; |
| |
| import com.fasterxml.jackson.core.type.TypeReference; |
| import com.google.common.base.Optional; |
| import com.google.common.base.Preconditions; |
| import com.google.common.collect.ImmutableList; |
| import com.google.errorprone.annotations.concurrent.GuardedBy; |
| import org.apache.druid.client.indexing.ClientCompactionTaskTransformSpec; |
| import org.apache.druid.data.input.InputFormat; |
| import org.apache.druid.data.input.InputRow; |
| import org.apache.druid.data.input.InputSource; |
| import org.apache.druid.data.input.InputSourceReader; |
| import org.apache.druid.data.input.impl.DimensionsSpec; |
| import org.apache.druid.indexer.IngestionState; |
| import org.apache.druid.indexer.report.IngestionStatsAndErrors; |
| import org.apache.druid.indexer.report.IngestionStatsAndErrorsTaskReport; |
| import org.apache.druid.indexer.report.TaskContextReport; |
| import org.apache.druid.indexer.report.TaskReport; |
| import org.apache.druid.indexing.common.LockGranularity; |
| import org.apache.druid.indexing.common.TaskLock; |
| import org.apache.druid.indexing.common.TaskLockType; |
| import org.apache.druid.indexing.common.TaskToolbox; |
| import org.apache.druid.indexing.common.actions.LockListAction; |
| import org.apache.druid.indexing.common.actions.RetrieveUsedSegmentsAction; |
| import org.apache.druid.indexing.common.actions.SegmentTransactionalAppendAction; |
| import org.apache.druid.indexing.common.actions.SegmentTransactionalInsertAction; |
| import org.apache.druid.indexing.common.actions.SegmentTransactionalReplaceAction; |
| import org.apache.druid.indexing.common.actions.TaskAction; |
| import org.apache.druid.indexing.common.actions.TaskActionClient; |
| import org.apache.druid.indexing.common.actions.TimeChunkLockTryAcquireAction; |
| import org.apache.druid.indexing.common.config.TaskConfig; |
| import org.apache.druid.indexing.common.task.IndexTask.IndexTuningConfig; |
| import org.apache.druid.indexing.common.task.batch.MaxAllowedLocksExceededException; |
| import org.apache.druid.indexing.common.task.batch.parallel.ParallelIndexTuningConfig; |
| import org.apache.druid.indexing.input.InputRowSchemas; |
| import org.apache.druid.indexing.overlord.SegmentPublishResult; |
| import org.apache.druid.java.util.common.IAE; |
| import org.apache.druid.java.util.common.ISE; |
| import org.apache.druid.java.util.common.JodaUtils; |
| import org.apache.druid.java.util.common.NonnullPair; |
| import org.apache.druid.java.util.common.Pair; |
| import org.apache.druid.java.util.common.Stopwatch; |
| import org.apache.druid.java.util.common.concurrent.Execs; |
| import org.apache.druid.java.util.common.granularity.Granularity; |
| import org.apache.druid.java.util.common.granularity.GranularityType; |
| import org.apache.druid.java.util.common.granularity.IntervalsByGranularity; |
| import org.apache.druid.java.util.common.logger.Logger; |
| import org.apache.druid.java.util.emitter.service.ServiceMetricEvent; |
| import org.apache.druid.query.DruidMetrics; |
| import org.apache.druid.query.QueryContexts; |
| import org.apache.druid.query.SegmentDescriptor; |
| import org.apache.druid.segment.SegmentSchemaMapping; |
| import org.apache.druid.segment.handoff.SegmentHandoffNotifier; |
| import org.apache.druid.segment.incremental.ParseExceptionHandler; |
| import org.apache.druid.segment.incremental.RowIngestionMeters; |
| import org.apache.druid.segment.indexing.DataSchema; |
| import org.apache.druid.segment.indexing.IngestionSpec; |
| import org.apache.druid.segment.indexing.TuningConfig; |
| import org.apache.druid.segment.indexing.granularity.GranularitySpec; |
| import org.apache.druid.segment.realtime.appenderator.SegmentIdWithShardSpec; |
| import org.apache.druid.segment.transform.TransformSpec; |
| import org.apache.druid.timeline.CompactionState; |
| import org.apache.druid.timeline.DataSegment; |
| import org.apache.druid.timeline.Partitions; |
| import org.apache.druid.timeline.SegmentTimeline; |
| import org.apache.druid.timeline.partition.HashBasedNumberedShardSpec; |
| import org.apache.druid.timeline.partition.TombstoneShardSpec; |
| import org.joda.time.DateTime; |
| import org.joda.time.Interval; |
| import org.joda.time.Period; |
| |
| import javax.annotation.Nullable; |
| import java.io.File; |
| import java.io.IOException; |
| import java.util.ArrayList; |
| import java.util.Collections; |
| import java.util.HashMap; |
| import java.util.Iterator; |
| import java.util.List; |
| import java.util.Map; |
| import java.util.Set; |
| import java.util.concurrent.CountDownLatch; |
| import java.util.concurrent.ExecutorService; |
| import java.util.concurrent.TimeUnit; |
| import java.util.function.BiConsumer; |
| import java.util.function.Consumer; |
| import java.util.function.Function; |
| import java.util.function.Predicate; |
| import java.util.stream.Collectors; |
| |
| /** |
| * Abstract base class for batch tasks like {@link IndexTask}. |
| * Provides helper methods such as {@link #determineSegmentGranularity}, {@link #findInputSegments}, |
| * and {@link #determineLockGranularityAndTryLock} for finding input segments and acquiring task locks. |
| */ |
| public abstract class AbstractBatchIndexTask extends AbstractTask |
| { |
| private static final Logger log = new Logger(AbstractBatchIndexTask.class); |
| |
| private boolean segmentAvailabilityConfirmationCompleted = false; |
| private long segmentAvailabilityWaitTimeMs = 0L; |
| |
| @GuardedBy("this") |
| private final TaskResourceCleaner resourceCloserOnAbnormalExit = new TaskResourceCleaner(); |
| |
| @GuardedBy("this") |
| private boolean stopped = false; |
| |
| private TaskLockHelper taskLockHelper; |
| |
| private final int maxAllowedLockCount; |
| |
| private final Map<Interval, String> intervalToLockVersion = new HashMap<>(); |
| |
| protected AbstractBatchIndexTask(String id, String dataSource, Map<String, Object> context, IngestionMode ingestionMode) |
| { |
| super(id, dataSource, context, ingestionMode); |
| maxAllowedLockCount = -1; |
| } |
| |
| protected AbstractBatchIndexTask( |
| String id, |
| @Nullable String groupId, |
| @Nullable TaskResource taskResource, |
| String dataSource, |
| @Nullable Map<String, Object> context, |
| int maxAllowedLockCount, |
| IngestionMode ingestionMode |
| ) |
| { |
| super(id, groupId, taskResource, dataSource, context, ingestionMode); |
| this.maxAllowedLockCount = maxAllowedLockCount; |
| } |
| |
| /** |
| * Sets up this task before it runs. Checks whether the task has already been stopped and |
| * registers a cleaner that interrupts the thread running this task on abnormal exits. |
| * |
| * @see #runTask(TaskToolbox) |
| * @see #stopGracefully(TaskConfig) |
| */ |
| @Override |
| public String setup(TaskToolbox toolbox) throws Exception |
| { |
| if (taskLockHelper == null) { |
| // Subclasses generally use "isReady" to initialize the taskLockHelper. It's not guaranteed to be called before |
| // "run", and so we call it here to ensure it happens. |
| // |
| // We're only really calling it for its side effects, and we expect it to return "true". If it doesn't, something |
| // strange is going on, so bail out. |
| if (!isReady(toolbox.getTaskActionClient())) { |
| throw new ISE("Cannot start; not ready!"); |
| } |
| } |
| |
| synchronized (this) { |
| if (stopped) { |
| return "Attempting to run a task that has been stopped. See overlord & task logs for more details."; |
| } else { |
| // Register the cleaner to interrupt the current thread first. |
| // Since the resource closer cleans up the registered resources in LIFO order, |
| // this will be executed last on abnormal exits. |
| // The order sometimes matters. For example, Appenderator has two close methods, close() and closeNow(), and |
| // closeNow() is supposed to be called on abnormal exits. Interrupting the current thread could cause close() |
| // to be called indirectly, e.g., for Appenderators in try-with-resources. In this case, closeNow() should be |
| // called before the current thread is interrupted, so that subsequent close() calls can be ignored. |
| final Thread currentThread = Thread.currentThread(); |
| resourceCloserOnAbnormalExit.register(config -> currentThread.interrupt()); |
| } |
| } |
| return super.setup(toolbox); |
| } |
| |
| @Override |
| public void stopGracefully(TaskConfig taskConfig) |
| { |
| synchronized (this) { |
| stopped = true; |
| resourceCloserOnAbnormalExit.clean(taskConfig); |
| } |
| } |
| |
| /** |
| * Returns an {@link InputRow} iterator which iterates over an input source. |
| * The returned iterator filters out rows which don't satisfy the given filter or cannot be parsed properly. |
| * The returned iterator can throw {@link org.apache.druid.java.util.common.parsers.ParseException}s in |
| * {@link Iterator#hasNext()} when it hits {@link ParseExceptionHandler#maxAllowedParseExceptions}. |
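| * <p> |
| * A minimal usage sketch (variable names such as {@code tmpDir} and {@code meters} are illustrative only): |
| * <pre>{@code |
| * try (FilteringCloseableInputRowIterator rows = inputSourceReader( |
| *     tmpDir, dataSchema, inputSource, inputFormat, |
| *     allowNonNullRowsWithinInputIntervalsOf(dataSchema.getGranularitySpec()), |
| *     meters, parseExceptionHandler)) { |
| *   while (rows.hasNext()) { |
| *     final InputRow row = rows.next(); |
| *     // hand the row to segment generation |
| *   } |
| * } |
| * }</pre> |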
| */ |
| public static FilteringCloseableInputRowIterator inputSourceReader( |
| File tmpDir, |
| DataSchema dataSchema, |
| InputSource inputSource, |
| @Nullable InputFormat inputFormat, |
| Predicate<InputRow> rowFilter, |
| RowIngestionMeters ingestionMeters, |
| ParseExceptionHandler parseExceptionHandler |
| ) throws IOException |
| { |
| final InputSourceReader inputSourceReader = dataSchema.getTransformSpec().decorate( |
| inputSource.reader( |
| InputRowSchemas.fromDataSchema(dataSchema), |
| inputFormat, |
| tmpDir |
| ) |
| ); |
| return new FilteringCloseableInputRowIterator( |
| inputSourceReader.read(ingestionMeters), |
| rowFilter, |
| ingestionMeters, |
| parseExceptionHandler |
| ); |
| } |
| |
| /** |
| * Creates a predicate that is true for input rows which (a) are non-null and |
| * (b) can be bucketed into an interval using the given granularity spec. |
| * This predicate filters out all rows if the granularity spec has no |
| * input intervals. |
| */ |
| protected static Predicate<InputRow> allowNonNullRowsStrictlyWithinInputIntervalsOf( |
| GranularitySpec granularitySpec |
| ) |
| { |
| return inputRow -> |
| inputRow != null |
| && granularitySpec.bucketInterval(inputRow.getTimestamp()).isPresent(); |
| } |
| |
| /** |
| * Creates a predicate that is true for input rows which (a) are non-null and |
| * (b) can be bucketed into an interval using the given granularity spec. |
| * This predicate allows all non-null rows if the granularity spec has |
| * no input intervals. |
| */ |
| protected static Predicate<InputRow> allowNonNullRowsWithinInputIntervalsOf( |
| GranularitySpec granularitySpec |
| ) |
| { |
| return inputRow -> |
| inputRow != null |
| && (granularitySpec.inputIntervals().isEmpty() |
| || granularitySpec.bucketInterval(inputRow.getTimestamp()).isPresent()); |
| } |
| |
| /** |
| * Registers a resource cleaner which is executed on abnormal exits. |
| * |
| * @see Task#stopGracefully |
| */ |
| protected void registerResourceCloserOnAbnormalExit(Consumer<TaskConfig> cleaner) |
| { |
| synchronized (this) { |
| resourceCloserOnAbnormalExit.register(cleaner); |
| } |
| } |
| |
| /** |
| * Return true if this task can overwrite existing segments. |
| */ |
| public abstract boolean requireLockExistingSegments(); |
| |
| /** |
| * Find segments to lock in the given intervals. |
| * If this task intends to overwrite only some of the segments in those intervals, this method should return only |
| * those segments instead of all segments in those intervals. |
| */ |
| public abstract List<DataSegment> findSegmentsToLock(TaskActionClient taskActionClient, List<Interval> intervals) |
| throws IOException; |
| |
| /** |
| * Returns true if this task is in the perfect (guaranteed) rollup mode. |
| */ |
| public abstract boolean isPerfectRollup(); |
| |
| /** |
| * Returns the segmentGranularity defined in the ingestion spec. |
| */ |
| @Nullable |
| public abstract Granularity getSegmentGranularity(); |
| |
| @Override |
| public int getPriority() |
| { |
| return getContextValue(Tasks.PRIORITY_KEY, Tasks.DEFAULT_BATCH_INDEX_TASK_PRIORITY); |
| } |
| |
| public TaskLockHelper getTaskLockHelper() |
| { |
| return Preconditions.checkNotNull(taskLockHelper, "taskLockHelper is not initialized yet"); |
| } |
| |
| /** |
| * Attempts to acquire a lock that covers certain intervals. |
| * <p> |
| * Will look at {@link Tasks#FORCE_TIME_CHUNK_LOCK_KEY} to decide whether to acquire a time chunk or segment lock. |
| * <p> |
| * If {@link Tasks#FORCE_TIME_CHUNK_LOCK_KEY} is set, or if {@code intervals} is nonempty, then this method |
| * will initialize {@link #taskLockHelper} as a side effect. |
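| * <p> |
| * For example, a subclass's {@code isReady} implementation might simply delegate to this method (an |
| * illustrative sketch, where {@code intervals} stands for the intervals this task ingests into): |
| * <pre>{@code |
| * public boolean isReady(TaskActionClient taskActionClient) throws Exception |
| * { |
| *   return determineLockGranularityAndTryLock(taskActionClient, intervals); |
| * } |
| * }</pre> |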
| * |
| * @return whether the lock was acquired |
| */ |
| public boolean determineLockGranularityAndTryLock(TaskActionClient client, List<Interval> intervals) |
| throws IOException |
| { |
| final boolean forceTimeChunkLock = getContextValue( |
| Tasks.FORCE_TIME_CHUNK_LOCK_KEY, |
| Tasks.DEFAULT_FORCE_TIME_CHUNK_LOCK |
| ); |
| final IngestionMode ingestionMode = getIngestionMode(); |
| // Respect task context value most. |
| if (forceTimeChunkLock || ingestionMode == IngestionMode.REPLACE) { |
| log.info( |
| "Using time chunk lock since forceTimeChunkLock is [%s] and mode is [%s].", |
| forceTimeChunkLock, ingestionMode |
| ); |
| taskLockHelper = createLockHelper(LockGranularity.TIME_CHUNK); |
| if (!intervals.isEmpty()) { |
| return tryTimeChunkLock(client, intervals); |
| } else { |
| return true; |
| } |
| } else { |
| if (!intervals.isEmpty()) { |
| final LockGranularityDetermineResult result = determineSegmentGranularity(client, intervals); |
| taskLockHelper = createLockHelper(result.lockGranularity); |
| return tryLockWithDetermineResult(client, result); |
| } else { |
| // This branch is the only one that will not initialize taskLockHelper. |
| return true; |
| } |
| } |
| } |
| |
| /** |
| * Attempts to acquire a lock that covers certain segments. |
| * <p> |
| * Will look at {@link Tasks#FORCE_TIME_CHUNK_LOCK_KEY} to decide whether to acquire a time chunk or segment lock. |
| * <p> |
| * This method will initialize {@link #taskLockHelper} as a side effect. |
| * |
| * @return whether the lock was acquired |
| */ |
| boolean determineLockGranularityAndTryLockWithSegments( |
| TaskActionClient client, |
| List<DataSegment> segments, |
| BiConsumer<LockGranularity, List<DataSegment>> segmentCheckFunction |
| ) throws IOException |
| { |
| final boolean forceTimeChunkLock = getContextValue( |
| Tasks.FORCE_TIME_CHUNK_LOCK_KEY, |
| Tasks.DEFAULT_FORCE_TIME_CHUNK_LOCK |
| ); |
| |
| if (forceTimeChunkLock) { |
| log.info("[%s] is set to true in task context. Use timeChunk lock", Tasks.FORCE_TIME_CHUNK_LOCK_KEY); |
| taskLockHelper = createLockHelper(LockGranularity.TIME_CHUNK); |
| segmentCheckFunction.accept(LockGranularity.TIME_CHUNK, segments); |
| return tryTimeChunkLock( |
| client, |
| new ArrayList<>(segments.stream().map(DataSegment::getInterval).collect(Collectors.toSet())) |
| ); |
| } else { |
| final LockGranularityDetermineResult result = determineSegmentGranularity(segments); |
| taskLockHelper = createLockHelper(result.lockGranularity); |
| segmentCheckFunction.accept(result.lockGranularity, segments); |
| return tryLockWithDetermineResult(client, result); |
| } |
| } |
| |
| private LockGranularityDetermineResult determineSegmentGranularity(TaskActionClient client, List<Interval> intervals) |
| throws IOException |
| { |
| if (requireLockExistingSegments()) { |
| if (isPerfectRollup()) { |
| log.info("Using timeChunk lock for perfect rollup"); |
| return new LockGranularityDetermineResult(LockGranularity.TIME_CHUNK, intervals, null); |
| } else if (!intervals.isEmpty()) { |
| // This method finds segments falling in all given intervals and then tries to lock those segments. |
| // Thus, there might be a race between calling findSegmentsToLock() and determineSegmentGranularity(), |
| // i.e., a new segment can be added to the interval or an existing segment might be removed. |
| // Removed segments should be fine because indexing tasks would do nothing with removed segments. |
| // However, a task wouldn't know about new segments added after the findSegmentsToLock() call, so it may miss |
| // those segments. This is usually fine, but if you want to avoid it, you should use timeChunk lock instead. |
| return determineSegmentGranularity(findSegmentsToLock(client, intervals)); |
| } else { |
| log.info("Using segment lock for empty intervals"); |
| return new LockGranularityDetermineResult(LockGranularity.SEGMENT, null, Collections.emptyList()); |
| } |
| } else { |
| log.info("Using segment lock since we don't have to lock existing segments"); |
| return new LockGranularityDetermineResult(LockGranularity.SEGMENT, null, Collections.emptyList()); |
| } |
| } |
| |
| private boolean tryLockWithDetermineResult(TaskActionClient client, LockGranularityDetermineResult result) |
| throws IOException |
| { |
| if (result.lockGranularity == LockGranularity.TIME_CHUNK) { |
| return tryTimeChunkLock(client, Preconditions.checkNotNull(result.intervals, "intervals")); |
| } else { |
| return taskLockHelper.verifyAndLockExistingSegments( |
| client, |
| Preconditions.checkNotNull(result.segments, "segments") |
| ); |
| } |
| } |
| |
| /** |
| * Builds a TaskAction to publish segments based on the type of locks that this |
| * task acquires. |
| * |
| * @see #determineLockType |
| */ |
| protected TaskAction<SegmentPublishResult> buildPublishAction( |
| Set<DataSegment> segmentsToBeOverwritten, |
| Set<DataSegment> segmentsToPublish, |
| SegmentSchemaMapping segmentSchemaMapping, |
| TaskLockType lockType |
| ) |
| { |
| switch (lockType) { |
| case REPLACE: |
| return SegmentTransactionalReplaceAction.create(segmentsToPublish, segmentSchemaMapping); |
| case APPEND: |
| return SegmentTransactionalAppendAction.forSegments(segmentsToPublish, segmentSchemaMapping); |
| default: |
| return SegmentTransactionalInsertAction.overwriteAction( |
| segmentsToBeOverwritten, |
| segmentsToPublish, |
| segmentSchemaMapping |
| ); |
| } |
| } |
| |
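| /** |
| * Tries to acquire a time-chunk lock for each of the given intervals, after aligning them with the segment |
| * granularity (if any) and condensing them to avoid creating too many locks. |
| * |
| * @return true if all locks were acquired, false if any lock could not be acquired |
| * @throws MaxAllowedLocksExceededException if acquiring another lock would exceed {@code maxAllowedLockCount} |
| */ |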
| protected boolean tryTimeChunkLock(TaskActionClient client, List<Interval> intervals) throws IOException |
| { |
| // The given intervals are first converted to align with segment granularity. This is because, |
| // when an overwriting task finds a version for a given input row, it expects the interval |
| // associated with each version to be equal to or larger than the time bucket where the input row falls. |
| // See ParallelIndexSupervisorTask.findVersion(). |
| final Iterator<Interval> intervalIterator; |
| final Granularity segmentGranularity = getSegmentGranularity(); |
| if (segmentGranularity == null) { |
| intervalIterator = JodaUtils.condenseIntervals(intervals).iterator(); |
| } else { |
| IntervalsByGranularity intervalsByGranularity = new IntervalsByGranularity(intervals, segmentGranularity); |
| // The following condenses the intervals without materializing them: |
| intervalIterator = JodaUtils.condensedIntervalsIterator(intervalsByGranularity.granularityIntervalsIterator()); |
| } |
| |
| // Intervals are already condensed to avoid creating too many locks. |
| // Intervals are also sorted and thus it's safe to compare only the previous interval and current one for dedup. |
| Interval prev = null; |
| int locksAcquired = 0; |
| while (intervalIterator.hasNext()) { |
| final Interval cur = intervalIterator.next(); |
| if (prev != null && cur.equals(prev)) { |
| continue; |
| } |
| |
| if (maxAllowedLockCount >= 0 && locksAcquired >= maxAllowedLockCount) { |
| throw new MaxAllowedLocksExceededException(maxAllowedLockCount); |
| } |
| |
| prev = cur; |
| final TaskLockType taskLockType = determineLockType(LockGranularity.TIME_CHUNK); |
| final TaskLock lock = client.submit(new TimeChunkLockTryAcquireAction(taskLockType, cur)); |
| if (lock == null) { |
| return false; |
| } |
| lock.assertNotRevoked(); |
| locksAcquired++; |
| intervalToLockVersion.put(cur, lock.getVersion()); |
| } |
| return true; |
| } |
| |
| private TaskLockHelper createLockHelper(LockGranularity lockGranularity) |
| { |
| return new TaskLockHelper( |
| lockGranularity == LockGranularity.SEGMENT, |
| determineLockType(lockGranularity) |
| ); |
| } |
| |
| /** |
| * Determines the type of lock to use with the given lock granularity. |
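| * <p> |
| * The decision order, as implemented below: segment locks always use {@link TaskLockType#EXCLUSIVE}; if |
| * {@link Tasks#USE_CONCURRENT_LOCKS} resolves to true, APPEND ingestion uses {@link TaskLockType#APPEND} and any |
| * other mode uses {@link TaskLockType#REPLACE}; otherwise the context values of {@link Tasks#TASK_LOCK_TYPE} and |
| * {@link Tasks#USE_SHARED_LOCK} are honored, falling back to the default lock type when SHARED or APPEND is |
| * requested outside of APPEND ingestion. |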
| */ |
| private TaskLockType determineLockType(LockGranularity lockGranularity) |
| { |
| if (lockGranularity == LockGranularity.SEGMENT) { |
| return TaskLockType.EXCLUSIVE; |
| } |
| |
| final boolean useConcurrentLocks = QueryContexts.getAsBoolean( |
| Tasks.USE_CONCURRENT_LOCKS, |
| getContextValue(Tasks.USE_CONCURRENT_LOCKS), |
| Tasks.DEFAULT_USE_CONCURRENT_LOCKS |
| ); |
| final IngestionMode ingestionMode = getIngestionMode(); |
| if (useConcurrentLocks) { |
| return ingestionMode == IngestionMode.APPEND ? TaskLockType.APPEND : TaskLockType.REPLACE; |
| } |
| |
| final TaskLockType contextTaskLockType = QueryContexts.getAsEnum( |
| Tasks.TASK_LOCK_TYPE, |
| getContextValue(Tasks.TASK_LOCK_TYPE), |
| TaskLockType.class |
| ); |
| |
| final TaskLockType lockType; |
| if (contextTaskLockType == null) { |
| lockType = getContextValue(Tasks.USE_SHARED_LOCK, false) |
| ? TaskLockType.SHARED : TaskLockType.EXCLUSIVE; |
| } else { |
| lockType = contextTaskLockType; |
| } |
| |
| if ((lockType == TaskLockType.SHARED || lockType == TaskLockType.APPEND) |
| && ingestionMode != IngestionMode.APPEND) { |
| // Lock types SHARED and APPEND are allowed only in APPEND ingestion mode |
| return Tasks.DEFAULT_TASK_LOCK_TYPE; |
| } else { |
| return lockType; |
| } |
| } |
| |
| private LockGranularityDetermineResult determineSegmentGranularity(List<DataSegment> segments) |
| { |
| if (segments.isEmpty()) { |
| log.info("Using segment lock for empty segments"); |
| // Set useSegmentLock even though we don't acquire any locks here. |
| // Note that locks must be acquired before data ingestion if we are supposed to use a timeChunk lock. |
| return new LockGranularityDetermineResult(LockGranularity.SEGMENT, null, Collections.emptyList()); |
| } |
| |
| if (requireLockExistingSegments()) { |
| final Granularity granularityFromSegments = findGranularityFromSegments(segments); |
| @Nullable |
| final Granularity segmentGranularityFromSpec = getSegmentGranularity(); |
| final List<Interval> intervals = segments.stream().map(DataSegment::getInterval).collect(Collectors.toList()); |
| |
| if (granularityFromSegments == null |
| || segmentGranularityFromSpec != null |
| && (!granularityFromSegments.equals(segmentGranularityFromSpec) |
| || segments.stream() |
| .anyMatch(segment -> !segmentGranularityFromSpec.isAligned(segment.getInterval())))) { |
| // This case is one of the following: |
| // 1) Segments have different granularities. |
| // 2) The segment granularity in the ingestion spec is different from that of the existing segments. |
| // 3) Some existing segments are not aligned with the segment granularity in the ingestion spec. |
| log.info("Detected segmentGranularity change. Using timeChunk lock"); |
| return new LockGranularityDetermineResult(LockGranularity.TIME_CHUNK, intervals, null); |
| } else { |
| // Use segment lock |
| // Create a timeline to find latest segments only |
| final SegmentTimeline timeline = SegmentTimeline.forSegments( |
| segments |
| ); |
| |
| Set<DataSegment> segmentsToLock = timeline.findNonOvershadowedObjectsInInterval( |
| JodaUtils.umbrellaInterval(intervals), |
| Partitions.ONLY_COMPLETE |
| ); |
| log.info("No segmentGranularity change detected and it's not perfect rollup. Using segment lock"); |
| return new LockGranularityDetermineResult(LockGranularity.SEGMENT, null, new ArrayList<>(segmentsToLock)); |
| } |
| } else { |
| // Set useSegmentLock even though we don't acquire any locks here. |
| // Note that locks must be acquired before data ingestion if we are supposed to use a timeChunk lock. |
| log.info("Using segment lock since we don't have to lock existing segments"); |
| return new LockGranularityDetermineResult(LockGranularity.SEGMENT, null, Collections.emptyList()); |
| } |
| } |
| |
| /** |
| * We currently don't support appending perfectly rolled up segments. This might be supported in the future if there |
| * is a good use case. If we want to support appending perfectly rolled up segments, we need to fix some other places |
| * first. For example, {@link HashBasedNumberedShardSpec#getLookup} assumes that |
| * the start partition ID of the set of perfectly rolled up segments is 0. Instead it might need to store an ordinal |
| * in addition to the partition ID which represents the ordinal in the perfectly rolled up segment set. |
| */ |
| public static boolean isGuaranteedRollup( |
| IngestionMode ingestionMode, |
| IndexTuningConfig tuningConfig |
| ) |
| { |
| Preconditions.checkArgument( |
| !(ingestionMode == IngestionMode.APPEND && tuningConfig.isForceGuaranteedRollup()), |
| "Perfect rollup cannot be guaranteed when appending to existing dataSources" |
| ); |
| return tuningConfig.isForceGuaranteedRollup(); |
| } |
| |
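| /** |
| * Returns a function that annotates segments with the {@link CompactionState} derived from the given ingestion |
| * spec when {@code storeCompactionState} is true, or the identity function otherwise. Only the fields supported |
| * by auto compaction (e.g. the transform filter) are retained in the stored state. |
| */ |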
| public static Function<Set<DataSegment>, Set<DataSegment>> addCompactionStateToSegments( |
| boolean storeCompactionState, |
| TaskToolbox toolbox, |
| IngestionSpec ingestionSpec |
| ) |
| { |
| if (storeCompactionState) { |
| TuningConfig tuningConfig = ingestionSpec.getTuningConfig(); |
| GranularitySpec granularitySpec = ingestionSpec.getDataSchema().getGranularitySpec(); |
| // We do not need to store dimensionExclusions and spatialDimensions since auto compaction does not support them |
| DimensionsSpec dimensionsSpec = ingestionSpec.getDataSchema().getDimensionsSpec() == null |
| ? null |
| : new DimensionsSpec(ingestionSpec.getDataSchema().getDimensionsSpec().getDimensions()); |
| // We only need to store the filter since that is the only field auto compaction supports |
| Map<String, Object> transformSpec = ingestionSpec.getDataSchema().getTransformSpec() == null || TransformSpec.NONE.equals(ingestionSpec.getDataSchema().getTransformSpec()) |
| ? null |
| : new ClientCompactionTaskTransformSpec(ingestionSpec.getDataSchema().getTransformSpec().getFilter()).asMap(toolbox.getJsonMapper()); |
| List<Object> metricsSpec = ingestionSpec.getDataSchema().getAggregators() == null |
| ? null |
| : toolbox.getJsonMapper().convertValue(ingestionSpec.getDataSchema().getAggregators(), new TypeReference<List<Object>>() {}); |
| |
| return CompactionState.addCompactionStateToSegments( |
| tuningConfig.getPartitionsSpec(), |
| dimensionsSpec, |
| metricsSpec, |
| transformSpec, |
| tuningConfig.getIndexSpec().asMap(toolbox.getJsonMapper()), |
| granularitySpec.asMap(toolbox.getJsonMapper()) |
| ); |
| } else { |
| return Function.identity(); |
| } |
| } |
| |
| @Nullable |
| static Granularity findGranularityFromSegments(List<DataSegment> segments) |
| { |
| if (segments.isEmpty()) { |
| return null; |
| } |
| final Period firstSegmentPeriod = segments.get(0).getInterval().toPeriod(); |
| final boolean allHasSameGranularity = segments |
| .stream() |
| .allMatch(segment -> firstSegmentPeriod.equals(segment.getInterval().toPeriod())); |
| if (allHasSameGranularity) { |
| return GranularityType.fromPeriod(firstSegmentPeriod).getDefaultGranularity(); |
| } else { |
| return null; |
| } |
| } |
| |
| /** |
| * Finds the used segments of the given dataSource that fall in {@code intervalsToRead}. |
| * <p> |
| * This task will overwrite some segments with data read from an input source outside of Druid. |
| * As a result, only the segments falling in {@code intervalsToRead} should be locked. |
| * <p> |
| * The order of segments within the returned list is unspecified, but each segment is guaranteed to appear in the |
| * list only once. |
| */ |
| protected static List<DataSegment> findInputSegments( |
| String dataSource, |
| TaskActionClient actionClient, |
| List<Interval> intervalsToRead |
| ) throws IOException |
| { |
| return ImmutableList.copyOf( |
| actionClient.submit( |
| new RetrieveUsedSegmentsAction(dataSource, intervalsToRead) |
| ) |
| ); |
| } |
| |
| /** |
| * Waits for segments to become available on the cluster. If {@code waitTimeout} is reached, gives up on waiting. |
| * This is a QoS method that can be used to make batch ingestion tasks wait to finish until their ingested data is |
| * available on the cluster. Doing so gives an end user assurance that a successful task status means their data is |
| * available for querying. |
| * |
| * @return true if all segments became available before {@code waitTimeout} elapsed, otherwise false |
| */ |
| protected boolean waitForSegmentAvailability( |
| TaskToolbox toolbox, |
| List<DataSegment> segmentsToWaitFor, |
| long waitTimeout |
| ) |
| { |
| if (segmentsToWaitFor.isEmpty()) { |
| log.info("No segments to wait for availability."); |
| return true; |
| } else if (waitTimeout < 0) { |
| log.warn("Not waiting for segment availability as waitTimeout[%s] is less than zero.", waitTimeout); |
| return false; |
| } |
| log.info("Waiting for [%d] segments to be loaded by the cluster...", segmentsToWaitFor.size()); |
| final Stopwatch stopwatch = Stopwatch.createStarted(); |
| |
| try ( |
| SegmentHandoffNotifier notifier = toolbox.getSegmentHandoffNotifierFactory() |
| .createSegmentHandoffNotifier(segmentsToWaitFor.get(0).getDataSource()) |
| ) { |
| |
| final ExecutorService exec = Execs.directExecutor(); |
| final CountDownLatch doneSignal = new CountDownLatch(segmentsToWaitFor.size()); |
| |
| notifier.start(); |
| for (DataSegment s : segmentsToWaitFor) { |
| notifier.registerSegmentHandoffCallback( |
| new SegmentDescriptor(s.getInterval(), s.getVersion(), s.getShardSpec().getPartitionNum()), |
| exec, |
| () -> { |
| doneSignal.countDown(); |
| log.debug( |
| "Segment[%s] is now available, [%d] segments remaining.", |
| s.getId(), doneSignal.getCount() |
| ); |
| } |
| ); |
| } |
| segmentAvailabilityConfirmationCompleted = doneSignal.await(waitTimeout, TimeUnit.MILLISECONDS); |
| return segmentAvailabilityConfirmationCompleted; |
| } |
| catch (InterruptedException e) { |
| log.warn("Interrupted while waiting for segment availablity; Unable to confirm availability!"); |
| Thread.currentThread().interrupt(); |
| return false; |
| } |
| finally { |
| segmentAvailabilityWaitTimeMs = stopwatch.millisElapsed(); |
| toolbox.getEmitter().emit( |
| new ServiceMetricEvent.Builder() |
| .setDimension("dataSource", getDataSource()) |
| .setDimension("taskType", getType()) |
| .setDimension("taskId", getId()) |
| .setDimension("groupId", getGroupId()) |
| .setDimensionIfNotNull(DruidMetrics.TAGS, getContextValue(DruidMetrics.TAGS)) |
| .setDimension("segmentAvailabilityConfirmed", segmentAvailabilityConfirmationCompleted) |
| .setMetric("task/segmentAvailability/wait/time", segmentAvailabilityWaitTimeMs) |
| ); |
| } |
| } |
| |
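| /** |
| * Returns the lock version of the first entry in {@code versions} whose interval contains the given interval, |
| * or null if no such entry exists. |
| */ |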
| @Nullable |
| public static String findVersion(Map<Interval, String> versions, Interval interval) |
| { |
| return versions.entrySet().stream() |
| .filter(entry -> entry.getKey().contains(interval)) |
| .map(Map.Entry::getValue) |
| .findFirst() |
| .orElse(null); |
| } |
| |
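| /** |
| * Finds the interval bucket containing the given timestamp and the lock version to use for it. If the granularity |
| * spec has explicit input intervals, the version must already exist among the task's locks; otherwise a new |
| * time-chunk lock may be acquired for the computed interval. |
| */ |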
| public static NonnullPair<Interval, String> findIntervalAndVersion( |
| TaskToolbox toolbox, |
| IngestionSpec<?, ?> ingestionSpec, |
| DateTime timestamp, |
| TaskLockType taskLockType |
| ) throws IOException |
| { |
| // This method is called whenever subtasks need to allocate a new segment via the supervisor task. |
| // As a result, this code is never called in the Overlord. For now, using the materialized intervals |
| // here is acceptable from a performance standpoint. |
| GranularitySpec granularitySpec = ingestionSpec.getDataSchema().getGranularitySpec(); |
| final Set<Interval> materializedBucketIntervals = granularitySpec.materializedBucketIntervals(); |
| |
| // List locks whenever allocating a new segment because locks might be revoked and no longer valid. |
| final List<TaskLock> locks = toolbox |
| .getTaskActionClient() |
| .submit(new LockListAction()); |
| final TaskLock revokedLock = locks.stream().filter(TaskLock::isRevoked).findAny().orElse(null); |
| if (revokedLock != null) { |
| throw new ISE("Lock revoked: [%s]", revokedLock); |
| } |
| final Map<Interval, String> versions = locks.stream().collect( |
| Collectors.toMap(TaskLock::getInterval, TaskLock::getVersion) |
| ); |
| |
| final Interval interval; |
| final String version; |
| if (!materializedBucketIntervals.isEmpty()) { |
| // If granularity spec has explicit intervals, we just need to find the version associated to the interval. |
| // This is because we should have gotten all required locks up front when the task starts up. |
| final Optional<Interval> maybeInterval = granularitySpec.bucketInterval(timestamp); |
| if (!maybeInterval.isPresent()) { |
| throw new IAE("Could not find interval for timestamp [%s]", timestamp); |
| } |
| |
| interval = maybeInterval.get(); |
| if (!materializedBucketIntervals.contains(interval)) { |
| throw new ISE("Unspecified interval[%s] in granularitySpec[%s]", interval, granularitySpec); |
| } |
| |
| version = AbstractBatchIndexTask.findVersion(versions, interval); |
| if (version == null) { |
| throw new ISE("Cannot find a version for interval[%s]", interval); |
| } |
| } else { |
| // We don't have explicit intervals. We can use the segment granularity to figure out what |
| // interval we need, but we might not have already locked it. |
| interval = granularitySpec.getSegmentGranularity().bucket(timestamp); |
| final String existingLockVersion = AbstractBatchIndexTask.findVersion(versions, interval); |
| if (existingLockVersion == null) { |
| if (ingestionSpec.getTuningConfig() instanceof ParallelIndexTuningConfig) { |
| final int maxAllowedLockCount = ((ParallelIndexTuningConfig) ingestionSpec.getTuningConfig()) |
| .getMaxAllowedLockCount(); |
| if (maxAllowedLockCount >= 0 && locks.size() >= maxAllowedLockCount) { |
| throw new MaxAllowedLocksExceededException(maxAllowedLockCount); |
| } |
| } |
| // We don't have a lock for this interval, so we should lock it now. |
| final TaskLock lock = Preconditions.checkNotNull( |
| toolbox.getTaskActionClient().submit( |
| new TimeChunkLockTryAcquireAction(taskLockType, interval) |
| ), |
| "Cannot acquire a lock for interval[%s]", |
| interval |
| ); |
| lock.assertNotRevoked(); |
| version = lock.getVersion(); |
| } else { |
| version = existingLockVersion; |
| } |
| } |
| return new NonnullPair<>(interval, version); |
| } |
| |
| private static class LockGranularityDetermineResult |
| { |
| private final LockGranularity lockGranularity; |
| @Nullable |
| private final List<Interval> intervals; // null for segmentLock |
| @Nullable |
| private final List<DataSegment> segments; // null for timeChunkLock |
| |
| private LockGranularityDetermineResult( |
| LockGranularity lockGranularity, |
| @Nullable List<Interval> intervals, |
| @Nullable List<DataSegment> segments |
| ) |
| { |
| this.lockGranularity = lockGranularity; |
| this.intervals = intervals; |
| this.segments = segments; |
| } |
| } |
| |
| /** |
| * @return The interval and version containing the given timestamp if one exists, otherwise null. |
| */ |
| @Nullable |
| private Pair<Interval, String> lookupVersion(DateTime timestamp) |
| { |
| java.util.Optional<Map.Entry<Interval, String>> intervalAndVersion |
| = intervalToLockVersion.entrySet() |
| .stream() |
| .filter(e -> e.getKey().contains(timestamp)) |
| .findFirst(); |
| return intervalAndVersion.map( |
| entry -> new Pair<>(entry.getKey(), entry.getValue()) |
| ).orElse(null); |
| } |
| |
| protected SegmentIdWithShardSpec allocateNewSegmentForTombstone( |
| IngestionSpec ingestionSchema, |
| DateTime timestamp |
| ) |
| { |
| // Since tombstones are derived from inputIntervals, inputIntervals cannot be empty for replace, and locks are |
| // all acquired upfront, so the following lookup should always find a version. |
| Pair<Interval, String> intervalAndVersion = lookupVersion(timestamp); |
| return new SegmentIdWithShardSpec( |
| ingestionSchema.getDataSchema().getDataSource(), |
| intervalAndVersion.lhs, |
| intervalAndVersion.rhs, |
| new TombstoneShardSpec() |
| ); |
| } |
| |
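| /** |
| * Returns the row stats to include in the task completion report, or null if none are available. |
| * Subclasses that track row stats should override this. |
| */ |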
| @Nullable |
| protected Map<String, Object> getTaskCompletionRowStats() |
| { |
| return null; |
| } |
| |
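| /** |
| * Returns the unparseable events to include in the task completion report, or null if none are available. |
| * Subclasses that track unparseable events should override this. |
| */ |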
| @Nullable |
| protected Map<String, Object> getTaskCompletionUnparseableEvents() |
| { |
| return null; |
| } |
| |
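| /** |
| * Builds a report map containing a live (in-progress) {@link IngestionStatsAndErrorsTaskReport} built from the |
| * given ingestion state, unparseable events, and row stats, along with a {@link TaskContextReport}. |
| */ |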
| protected TaskReport.ReportMap buildLiveIngestionStatsReport( |
| IngestionState ingestionState, |
| Map<String, Object> unparseableEvents, |
| Map<String, Object> rowStats |
| ) |
| { |
| return TaskReport.buildTaskReports( |
| new IngestionStatsAndErrorsTaskReport( |
| getId(), |
| new IngestionStatsAndErrors( |
| ingestionState, |
| unparseableEvents, |
| rowStats, |
| null, |
| false, |
| 0L, |
| null, |
| null, |
| null |
| ) |
| ), |
| new TaskContextReport(getId(), getContext()) |
| ); |
| } |
| |
| /** |
| * Builds a report map containing an {@link IngestionStatsAndErrorsTaskReport} for this task, keyed by |
| * {@link IngestionStatsAndErrorsTaskReport#REPORT_KEY}, along with a {@link TaskContextReport}. |
| */ |
| protected TaskReport.ReportMap buildIngestionStatsReport( |
| IngestionState ingestionState, |
| String errorMessage, |
| Long segmentsRead, |
| Long segmentsPublished |
| ) |
| { |
| return TaskReport.buildTaskReports( |
| new IngestionStatsAndErrorsTaskReport( |
| getId(), |
| new IngestionStatsAndErrors( |
| ingestionState, |
| getTaskCompletionUnparseableEvents(), |
| getTaskCompletionRowStats(), |
| errorMessage, |
| segmentAvailabilityConfirmationCompleted, |
| segmentAvailabilityWaitTimeMs, |
| Collections.emptyMap(), |
| segmentsRead, |
| segmentsPublished |
| ) |
| ), |
| new TaskContextReport(getId(), getContext()) |
| ); |
| } |
| |
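| /** |
| * Returns true if build-segment stats should be included in the report, i.e., when a full report is requested or |
| * when ingestion has reached the BUILD_SEGMENTS or COMPLETED state. |
| */ |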
| protected static boolean addBuildSegmentStatsToReport(boolean isFullReport, IngestionState ingestionState) |
| { |
| return isFullReport |
| || ingestionState == IngestionState.BUILD_SEGMENTS |
| || ingestionState == IngestionState.COMPLETED; |
| } |
| |
| } |