/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.druid.indexing.common.task;
import com.google.common.base.Optional;
import com.google.common.base.Preconditions;
import com.google.common.collect.ImmutableList;
import com.google.common.collect.Iterables;
import org.apache.druid.data.input.FirehoseFactory;
import org.apache.druid.data.input.InputFormat;
import org.apache.druid.data.input.InputRow;
import org.apache.druid.data.input.InputRowSchema;
import org.apache.druid.data.input.InputSource;
import org.apache.druid.data.input.InputSourceReader;
import org.apache.druid.indexer.TaskStatus;
import org.apache.druid.indexing.common.LockGranularity;
import org.apache.druid.indexing.common.TaskLock;
import org.apache.druid.indexing.common.TaskLockType;
import org.apache.druid.indexing.common.TaskToolbox;
import org.apache.druid.indexing.common.actions.RetrieveUsedSegmentsAction;
import org.apache.druid.indexing.common.actions.TaskActionClient;
import org.apache.druid.indexing.common.actions.TimeChunkLockTryAcquireAction;
import org.apache.druid.indexing.common.config.TaskConfig;
import org.apache.druid.indexing.common.task.IndexTask.IndexIOConfig;
import org.apache.druid.indexing.common.task.IndexTask.IndexTuningConfig;
import org.apache.druid.indexing.firehose.IngestSegmentFirehoseFactory;
import org.apache.druid.indexing.firehose.WindowedSegmentId;
import org.apache.druid.indexing.overlord.Segments;
import org.apache.druid.java.util.common.JodaUtils;
import org.apache.druid.java.util.common.granularity.Granularity;
import org.apache.druid.java.util.common.granularity.GranularityType;
import org.apache.druid.java.util.common.logger.Logger;
import org.apache.druid.query.aggregation.AggregatorFactory;
import org.apache.druid.segment.incremental.ParseExceptionHandler;
import org.apache.druid.segment.incremental.RowIngestionMeters;
import org.apache.druid.segment.indexing.DataSchema;
import org.apache.druid.segment.indexing.granularity.GranularitySpec;
import org.apache.druid.timeline.CompactionState;
import org.apache.druid.timeline.DataSegment;
import org.apache.druid.timeline.Partitions;
import org.apache.druid.timeline.VersionedIntervalTimeline;
import org.apache.druid.timeline.partition.HashBasedNumberedShardSpec;
import org.joda.time.Interval;
import org.joda.time.Period;
import javax.annotation.Nullable;
import javax.annotation.concurrent.GuardedBy;
import java.io.File;
import java.io.IOException;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
import java.util.Collections;
import java.util.HashSet;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.function.BiConsumer;
import java.util.function.Consumer;
import java.util.function.Function;
import java.util.function.Predicate;
import java.util.stream.Collectors;
/**
* Abstract class for batch tasks like {@link IndexTask}.
* Provides some methods such as {@link #determineSegmentGranularity}, {@link #findInputSegments},
* and {@link #determineLockGranularityandTryLock} for easily acquiring task locks.
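*
* <p>A minimal, illustrative sketch (not a real subclass) of how a concrete batch task might use these helpers:
* it would typically try to acquire locks in {@code isReady} before the task starts running, and then do the
* actual work in {@link #runTask}. The {@code ingestionSchema} field below is a placeholder.
* <pre>{@code
* public boolean isReady(TaskActionClient taskActionClient) throws Exception
* {
*   // Acquire time chunk or segment locks for the intervals defined in the ingestion spec.
*   return determineLockGranularityAndTryLock(
*       taskActionClient,
*       ingestionSchema.getDataSchema().getGranularitySpec()
*   );
* }
* }</pre>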
*/
public abstract class AbstractBatchIndexTask extends AbstractTask
{
private static final Logger log = new Logger(AbstractBatchIndexTask.class);
@GuardedBy("this")
private final TaskResourceCleaner resourceCloserOnAbnormalExit = new TaskResourceCleaner();
@GuardedBy("this")
private boolean stopped = false;
private TaskLockHelper taskLockHelper;
protected AbstractBatchIndexTask(String id, String dataSource, Map<String, Object> context)
{
super(id, dataSource, context);
}
protected AbstractBatchIndexTask(
String id,
@Nullable String groupId,
@Nullable TaskResource taskResource,
String dataSource,
@Nullable Map<String, Object> context
)
{
super(id, groupId, taskResource, dataSource, context);
}
/**
* Runs this task. Before running, this method checks whether the task has already been stopped and, if not,
* registers a cleaner that interrupts the thread running this task on abnormal exits.
*
* @see #runTask(TaskToolbox)
* @see #stopGracefully(TaskConfig)
*/
@Override
public TaskStatus run(TaskToolbox toolbox) throws Exception
{
synchronized (this) {
if (stopped) {
return TaskStatus.failure(getId());
} else {
// Register the cleaner to interrupt the current thread first.
// Since the resource closer cleans up the registered resources in LIFO order,
// this will be executed last on abnormal exits.
// The order is sometimes important. For example, Appenderator has two methods of close() and closeNow(), and
// closeNow() is supposed to be called on abnormal exits. Interrupting the current thread could cause close()
// to be called indirectly, e.g., for Appenderators in try-with-resources. In this case, closeNow() should be
// called before the current thread is interrupted, so that subsequent close() calls can be ignored.
final Thread currentThread = Thread.currentThread();
resourceCloserOnAbnormalExit.register(config -> currentThread.interrupt());
}
}
return runTask(toolbox);
}
@Override
public void stopGracefully(TaskConfig taskConfig)
{
synchronized (this) {
stopped = true;
resourceCloserOnAbnormalExit.clean(taskConfig);
}
}
/**
* Returns an {@link InputRow} iterator which iterates over an input source.
* The returned iterator filters out rows which don't satisfy the given filter or cannot be parsed properly.
* The returned iterator can throw {@link org.apache.druid.java.util.common.parsers.ParseException}s in
* {@link Iterator#hasNext()} when it hits {@link ParseExceptionHandler#maxAllowedParseExceptions}.
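*
* <p>A minimal usage sketch (assuming placeholder {@code tmpDir}, {@code inputSource}, {@code inputFormat},
* {@code ingestionMeters}, and {@code parseExceptionHandler} objects; real call sites live in the concrete tasks):
* <pre>{@code
* try (FilteringCloseableInputRowIterator iterator = inputSourceReader(
*     tmpDir,
*     dataSchema,
*     inputSource,
*     inputFormat,
*     defaultRowFilter(dataSchema.getGranularitySpec()),
*     ingestionMeters,
*     parseExceptionHandler
* )) {
*   while (iterator.hasNext()) {
*     final InputRow row = iterator.next();
*     // process the parsed row
*   }
* }
* }</pre>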
*/
public static FilteringCloseableInputRowIterator inputSourceReader(
File tmpDir,
DataSchema dataSchema,
InputSource inputSource,
@Nullable InputFormat inputFormat,
Predicate<InputRow> rowFilter,
RowIngestionMeters ingestionMeters,
ParseExceptionHandler parseExceptionHandler
) throws IOException
{
final List<String> metricsNames = Arrays.stream(dataSchema.getAggregators())
.map(AggregatorFactory::getName)
.collect(Collectors.toList());
final InputSourceReader inputSourceReader = dataSchema.getTransformSpec().decorate(
inputSource.reader(
new InputRowSchema(
dataSchema.getTimestampSpec(),
dataSchema.getDimensionsSpec(),
metricsNames
),
inputFormat,
tmpDir
)
);
return new FilteringCloseableInputRowIterator(
inputSourceReader.read(),
rowFilter,
ingestionMeters,
parseExceptionHandler
);
}
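/**
* Returns the default row filter: it drops null rows and rows whose timestamps don't fall within any bucket
* interval of the given granularitySpec.
*/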
protected static Predicate<InputRow> defaultRowFilter(GranularitySpec granularitySpec)
{
return inputRow -> {
if (inputRow == null) {
return false;
}
final Optional<Interval> optInterval = granularitySpec.bucketInterval(inputRow.getTimestamp());
return optInterval.isPresent();
};
}
/**
* Registers a resource cleaner which is executed on abnormal exits.
*
* @see Task#stopGracefully
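*
* <p>For example (illustrative; {@code myAppenderator} is a placeholder for a resource opened in
* {@link #runTask} whose shutdown method throws no checked exceptions):
* <pre>{@code
* registerResourceCloserOnAbnormalExit(taskConfig -> myAppenderator.closeNow());
* }</pre>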
*/
protected void registerResourceCloserOnAbnormalExit(Consumer<TaskConfig> cleaner)
{
synchronized (this) {
resourceCloserOnAbnormalExit.register(cleaner);
}
}
/**
* The method to actually process this task. This method is executed in {@link #run(TaskToolbox)}.
*/
public abstract TaskStatus runTask(TaskToolbox toolbox) throws Exception;
/**
* Returns true if this task can overwrite existing segments.
*/
public abstract boolean requireLockExistingSegments();
/**
* Find segments to lock in the given intervals.
* If this task intends to overwrite only some segments in those intervals, this method should return only those
* segments rather than all segments in the intervals.
*/
public abstract List<DataSegment> findSegmentsToLock(TaskActionClient taskActionClient, List<Interval> intervals)
throws IOException;
/**
* Returns true if this task is in the perfect (guaranteed) rollup mode.
*/
public abstract boolean isPerfectRollup();
/**
* Returns the segmentGranularity defined in the ingestion spec.
*/
@Nullable
public abstract Granularity getSegmentGranularity();
@Override
public int getPriority()
{
return getContextValue(Tasks.PRIORITY_KEY, Tasks.DEFAULT_BATCH_INDEX_TASK_PRIORITY);
}
public TaskLockHelper getTaskLockHelper()
{
return Preconditions.checkNotNull(taskLockHelper, "taskLockHelper is not initialized yet");
}
/**
* Determine lockGranularity to use and try to acquire necessary locks.
* This method respects the value of 'forceTimeChunkLock' in task context.
* If it's set to false or missing, this method checks if this task can use segmentLock.
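*
* <p>For example, setting {@code "forceTimeChunkLock": false} in the task context (see
* {@link Tasks#FORCE_TIME_CHUNK_LOCK_KEY}) lets this method consider segment locks; when the key is missing,
* {@link Tasks#DEFAULT_FORCE_TIME_CHUNK_LOCK} is used.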
*/
protected boolean determineLockGranularityAndTryLock(
TaskActionClient client,
GranularitySpec granularitySpec
) throws IOException
{
final List<Interval> intervals = granularitySpec.bucketIntervals().isPresent()
? new ArrayList<>(granularitySpec.bucketIntervals().get())
: Collections.emptyList();
return determineLockGranularityandTryLock(client, intervals);
}
boolean determineLockGranularityandTryLock(TaskActionClient client, List<Interval> intervals) throws IOException
{
final boolean forceTimeChunkLock = getContextValue(
Tasks.FORCE_TIME_CHUNK_LOCK_KEY,
Tasks.DEFAULT_FORCE_TIME_CHUNK_LOCK
);
// The task context value takes the highest precedence.
if (forceTimeChunkLock) {
log.info("[%s] is set to true in task context. Use timeChunk lock", Tasks.FORCE_TIME_CHUNK_LOCK_KEY);
taskLockHelper = new TaskLockHelper(false);
if (!intervals.isEmpty()) {
return tryTimeChunkLock(client, intervals);
} else {
return true;
}
} else {
if (!intervals.isEmpty()) {
final LockGranularityDetermineResult result = determineSegmentGranularity(client, intervals);
taskLockHelper = new TaskLockHelper(result.lockGranularity == LockGranularity.SEGMENT);
return tryLockWithDetermineResult(client, result);
} else {
return true;
}
}
}
boolean determineLockGranularityandTryLockWithSegments(
TaskActionClient client,
List<DataSegment> segments,
BiConsumer<LockGranularity, List<DataSegment>> segmentCheckFunction
) throws IOException
{
final boolean forceTimeChunkLock = getContextValue(
Tasks.FORCE_TIME_CHUNK_LOCK_KEY,
Tasks.DEFAULT_FORCE_TIME_CHUNK_LOCK
);
if (forceTimeChunkLock) {
log.info("[%s] is set to true in task context. Use timeChunk lock", Tasks.FORCE_TIME_CHUNK_LOCK_KEY);
taskLockHelper = new TaskLockHelper(false);
segmentCheckFunction.accept(LockGranularity.TIME_CHUNK, segments);
return tryTimeChunkLock(
client,
new ArrayList<>(segments.stream().map(DataSegment::getInterval).collect(Collectors.toSet()))
);
} else {
final LockGranularityDetermineResult result = determineSegmentGranularity(segments);
taskLockHelper = new TaskLockHelper(result.lockGranularity == LockGranularity.SEGMENT);
segmentCheckFunction.accept(result.lockGranularity, segments);
return tryLockWithDetermineResult(client, result);
}
}
private LockGranularityDetermineResult determineSegmentGranularity(TaskActionClient client, List<Interval> intervals)
throws IOException
{
if (requireLockExistingSegments()) {
if (isPerfectRollup()) {
log.info("Using timeChunk lock for perfect rollup");
return new LockGranularityDetermineResult(LockGranularity.TIME_CHUNK, intervals, null);
} else if (!intervals.isEmpty()) {
// This method finds segments falling within the given intervals and then tries to lock those segments.
// Thus, there might be a race between calling findSegmentsToLock() and determineSegmentGranularity(),
// i.e., a new segment can be added to an interval or an existing segment might be removed.
// Removed segments should be fine because indexing tasks would do nothing with removed segments.
// However, tasks wouldn't know about new segments added after the findSegmentsToLock() call, so they may miss
// those segments. This is usually fine, but if you want to avoid this, you should use timeChunk lock instead.
return determineSegmentGranularity(findSegmentsToLock(client, intervals));
} else {
log.info("Using segment lock for empty intervals");
return new LockGranularityDetermineResult(LockGranularity.SEGMENT, null, Collections.emptyList());
}
} else {
log.info("Using segment lock since we don't have to lock existing segments");
return new LockGranularityDetermineResult(LockGranularity.SEGMENT, null, Collections.emptyList());
}
}
private boolean tryLockWithDetermineResult(TaskActionClient client, LockGranularityDetermineResult result)
throws IOException
{
if (result.lockGranularity == LockGranularity.TIME_CHUNK) {
return tryTimeChunkLock(client, Preconditions.checkNotNull(result.intervals, "intervals"));
} else {
return taskLockHelper.verifyAndLockExistingSegments(
client,
Preconditions.checkNotNull(result.segments, "segments")
);
}
}
protected boolean tryTimeChunkLock(TaskActionClient client, List<Interval> intervals) throws IOException
{
// In this case, the intervals to lock must be aligned with segmentGranularity if it's defined
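// For example (illustrative), with DAY segmentGranularity a condensed interval of 2020-01-01/2020-01-03 is
// expanded into two day-long intervals, and a time chunk lock is acquired for each of them below.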
final Set<Interval> uniqueIntervals = new HashSet<>();
for (Interval interval : JodaUtils.condenseIntervals(intervals)) {
final Granularity segmentGranularity = getSegmentGranularity();
if (segmentGranularity == null) {
uniqueIntervals.add(interval);
} else {
Iterables.addAll(uniqueIntervals, segmentGranularity.getIterable(interval));
}
}
for (Interval interval : uniqueIntervals) {
final TaskLock lock = client.submit(new TimeChunkLockTryAcquireAction(TaskLockType.EXCLUSIVE, interval));
if (lock == null) {
return false;
}
}
return true;
}
private LockGranularityDetermineResult determineSegmentGranularity(List<DataSegment> segments)
{
if (segments.isEmpty()) {
log.info("Using segment lock for empty segments");
// Set useSegmentLock even though we don't acquire any locks here.
// Note that we must acquire a lock before data ingestion if we are supposed to use a timeChunk lock.
return new LockGranularityDetermineResult(LockGranularity.SEGMENT, null, Collections.emptyList());
}
if (requireLockExistingSegments()) {
final Granularity granularityFromSegments = findGranularityFromSegments(segments);
@Nullable
final Granularity segmentGranularityFromSpec = getSegmentGranularity();
final List<Interval> intervals = segments.stream().map(DataSegment::getInterval).collect(Collectors.toList());
if (granularityFromSegments == null
|| segmentGranularityFromSpec != null
&& (!granularityFromSegments.equals(segmentGranularityFromSpec)
|| segments.stream().anyMatch(segment -> !segmentGranularityFromSpec.isAligned(segment.getInterval())))) {
// This case is one of the following:
// 1) Segments have different granularities.
// 2) Segment granularity in the ingestion spec is different from that of the existing segments.
// 3) Some existing segments are not aligned with the segment granularity in the ingestion spec.
log.info("Detected segmentGranularity change. Using timeChunk lock");
return new LockGranularityDetermineResult(LockGranularity.TIME_CHUNK, intervals, null);
} else {
// Use segment lock
// Create a timeline to find latest segments only
final VersionedIntervalTimeline<String, DataSegment> timeline = VersionedIntervalTimeline.forSegments(
segments
);
Set<DataSegment> segmentsToLock = timeline.findNonOvershadowedObjectsInInterval(
JodaUtils.umbrellaInterval(intervals),
Partitions.ONLY_COMPLETE
);
log.info("No segmentGranularity change detected and it's not perfect rollup. Using segment lock");
return new LockGranularityDetermineResult(LockGranularity.SEGMENT, null, new ArrayList<>(segmentsToLock));
}
} else {
// Set useSegmentLock even though we don't acquire any locks here.
// Note that we must acquire a lock before data ingestion if we are supposed to use a timeChunk lock.
log.info("Using segment lock since we don't have to lock existing segments");
return new LockGranularityDetermineResult(LockGranularity.SEGMENT, null, Collections.emptyList());
}
}
/**
* We currently don't support appending perfectly rolled up segments. This might be supported in the future if there
* is a good use case. If we want to support appending perfectly rolled up segments, we need to fix some other places
* first. For example, {@link HashBasedNumberedShardSpec#getLookup} assumes that
* the start partition ID of the set of perfectly rolled up segments is 0. It might instead need to store, in addition
* to the partition ID, an ordinal representing the segment's position within the perfectly rolled up segment set.
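*
* <p>Note that this method also validates the configuration: it throws an exception if
* {@code forceGuaranteedRollup} is set together with {@code appendToExisting}.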
*/
public static boolean isGuaranteedRollup(IndexIOConfig ioConfig, IndexTuningConfig tuningConfig)
{
Preconditions.checkArgument(
!tuningConfig.isForceGuaranteedRollup() || !ioConfig.isAppendToExisting(),
"Perfect rollup cannot be guaranteed when appending to existing dataSources"
);
return tuningConfig.isForceGuaranteedRollup();
}
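/**
* Returns a function annotating segments with the last {@link CompactionState}, built from the partitionsSpec and
* indexSpec of the given tuningConfig, when {@code storeCompactionState} is true. Otherwise, returns the identity
* function.
*/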
public static Function<Set<DataSegment>, Set<DataSegment>> compactionStateAnnotateFunction(
boolean storeCompactionState,
TaskToolbox toolbox,
IndexTuningConfig tuningConfig
)
{
if (storeCompactionState) {
final Map<String, Object> indexSpecMap = tuningConfig.getIndexSpec().asMap(toolbox.getJsonMapper());
final CompactionState compactionState = new CompactionState(tuningConfig.getPartitionsSpec(), indexSpecMap);
return segments -> segments
.stream()
.map(s -> s.withLastCompactionState(compactionState))
.collect(Collectors.toSet());
} else {
return Function.identity();
}
}
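/**
* Returns the granularity shared by all given segments, inferred from the period of each segment's interval.
* Returns null if the list is empty or if the segments don't all have intervals of the same period.
*/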
@Nullable
static Granularity findGranularityFromSegments(List<DataSegment> segments)
{
if (segments.isEmpty()) {
return null;
}
final Period firstSegmentPeriod = segments.get(0).getInterval().toPeriod();
final boolean allHasSameGranularity = segments
.stream()
.allMatch(segment -> firstSegmentPeriod.equals(segment.getInterval().toPeriod()));
if (allHasSameGranularity) {
return GranularityType.fromPeriod(firstSegmentPeriod).getDefaultGranularity();
} else {
return null;
}
}
/**
* If the given firehoseFactory is {@link IngestSegmentFirehoseFactory}, then this method finds the segments to lock
* from the firehoseFactory. This is because those segments will be read by this task regardless of which segments
* would be filtered by intervalsToRead, so they need to be locked.
*
* However, if firehoseFactory is not IngestSegmentFirehoseFactory, this task will overwrite some segments
* with data read from an input source outside of Druid. As a result, only the segments falling within intervalsToRead
* should be locked.
*
* The order of segments within the returned list is unspecified, but each segment is guaranteed to appear in the list
* only once.
*/
protected static List<DataSegment> findInputSegments(
String dataSource,
TaskActionClient actionClient,
List<Interval> intervalsToRead,
FirehoseFactory firehoseFactory
) throws IOException
{
if (firehoseFactory instanceof IngestSegmentFirehoseFactory) {
// intervalsToRead is ignored here.
final List<WindowedSegmentId> inputSegments = ((IngestSegmentFirehoseFactory) firehoseFactory).getSegments();
if (inputSegments == null) {
final Interval inputInterval = Preconditions.checkNotNull(
((IngestSegmentFirehoseFactory) firehoseFactory).getInterval(),
"input interval"
);
return ImmutableList.copyOf(
actionClient.submit(
new RetrieveUsedSegmentsAction(dataSource, inputInterval, null, Segments.ONLY_VISIBLE)
)
);
} else {
final List<String> inputSegmentIds =
inputSegments.stream().map(WindowedSegmentId::getSegmentId).collect(Collectors.toList());
final Collection<DataSegment> dataSegmentsInIntervals = actionClient.submit(
new RetrieveUsedSegmentsAction(
dataSource,
null,
inputSegments.stream()
.flatMap(windowedSegmentId -> windowedSegmentId.getIntervals().stream())
.collect(Collectors.toSet()),
Segments.ONLY_VISIBLE
)
);
return dataSegmentsInIntervals.stream()
.filter(segment -> inputSegmentIds.contains(segment.getId().toString()))
.collect(Collectors.toList());
}
} else {
return ImmutableList.copyOf(
actionClient.submit(
new RetrieveUsedSegmentsAction(dataSource, null, intervalsToRead, Segments.ONLY_VISIBLE)
)
);
}
}
private static class LockGranularityDetermineResult
{
private final LockGranularity lockGranularity;
@Nullable
private final List<Interval> intervals; // null for segmentLock
@Nullable
private final List<DataSegment> segments; // null for timeChunkLock
private LockGranularityDetermineResult(
LockGranularity lockGranularity,
@Nullable List<Interval> intervals,
@Nullable List<DataSegment> segments
)
{
this.lockGranularity = lockGranularity;
this.intervals = intervals;
this.segments = segments;
}
}
}