/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.druid.indexing.common.task.batch.parallel;
import com.fasterxml.jackson.annotation.JsonCreator;
import com.fasterxml.jackson.annotation.JsonProperty;
import com.fasterxml.jackson.jaxrs.smile.SmileMediaTypes;
import com.google.common.annotations.VisibleForTesting;
import com.google.common.base.Optional;
import com.google.common.base.Preconditions;
import com.google.common.base.Throwables;
import com.google.common.collect.ArrayListMultimap;
import com.google.common.collect.Multimap;
import it.unimi.dsi.fastutil.objects.Object2IntMap;
import it.unimi.dsi.fastutil.objects.Object2IntOpenHashMap;
import org.apache.datasketches.hll.HllSketch;
import org.apache.datasketches.hll.Union;
import org.apache.datasketches.memory.Memory;
import org.apache.druid.data.input.FiniteFirehoseFactory;
import org.apache.druid.data.input.InputFormat;
import org.apache.druid.data.input.InputSource;
import org.apache.druid.indexer.TaskState;
import org.apache.druid.indexer.TaskStatus;
import org.apache.druid.indexer.partitions.HashedPartitionsSpec;
import org.apache.druid.indexer.partitions.PartitionsSpec;
import org.apache.druid.indexer.partitions.SingleDimensionPartitionsSpec;
import org.apache.druid.indexing.common.Counters;
import org.apache.druid.indexing.common.TaskLock;
import org.apache.druid.indexing.common.TaskLockType;
import org.apache.druid.indexing.common.TaskToolbox;
import org.apache.druid.indexing.common.actions.LockListAction;
import org.apache.druid.indexing.common.actions.SegmentTransactionalInsertAction;
import org.apache.druid.indexing.common.actions.TaskActionClient;
import org.apache.druid.indexing.common.actions.TimeChunkLockTryAcquireAction;
import org.apache.druid.indexing.common.task.AbstractBatchIndexTask;
import org.apache.druid.indexing.common.task.CurrentSubTaskHolder;
import org.apache.druid.indexing.common.task.IndexTask;
import org.apache.druid.indexing.common.task.IndexTask.IndexIngestionSpec;
import org.apache.druid.indexing.common.task.IndexTask.IndexTuningConfig;
import org.apache.druid.indexing.common.task.IndexTaskUtils;
import org.apache.druid.indexing.common.task.Task;
import org.apache.druid.indexing.common.task.TaskResource;
import org.apache.druid.indexing.common.task.Tasks;
import org.apache.druid.indexing.common.task.batch.parallel.ParallelIndexTaskRunner.SubTaskSpecStatus;
import org.apache.druid.indexing.common.task.batch.parallel.distribution.StringDistribution;
import org.apache.druid.indexing.common.task.batch.parallel.distribution.StringDistributionMerger;
import org.apache.druid.indexing.common.task.batch.parallel.distribution.StringSketchMerger;
import org.apache.druid.java.util.common.IAE;
import org.apache.druid.java.util.common.ISE;
import org.apache.druid.java.util.common.Pair;
import org.apache.druid.java.util.common.granularity.Granularity;
import org.apache.druid.java.util.common.logger.Logger;
import org.apache.druid.segment.indexing.TuningConfig;
import org.apache.druid.segment.indexing.granularity.ArbitraryGranularitySpec;
import org.apache.druid.segment.indexing.granularity.GranularitySpec;
import org.apache.druid.segment.realtime.appenderator.SegmentIdWithShardSpec;
import org.apache.druid.segment.realtime.appenderator.TransactionalSegmentPublisher;
import org.apache.druid.segment.realtime.firehose.ChatHandler;
import org.apache.druid.segment.realtime.firehose.ChatHandlers;
import org.apache.druid.server.security.Action;
import org.apache.druid.server.security.AuthorizerMapper;
import org.apache.druid.timeline.DataSegment;
import org.apache.druid.timeline.partition.BuildingNumberedShardSpec;
import org.apache.druid.timeline.partition.BuildingShardSpec;
import org.apache.druid.timeline.partition.NumberedShardSpec;
import org.apache.druid.timeline.partition.PartitionBoundaries;
import org.apache.druid.utils.CollectionUtils;
import org.checkerframework.checker.nullness.qual.MonotonicNonNull;
import org.joda.time.DateTime;
import org.joda.time.Interval;
import javax.annotation.Nullable;
import javax.servlet.http.HttpServletRequest;
import javax.ws.rs.Consumes;
import javax.ws.rs.GET;
import javax.ws.rs.POST;
import javax.ws.rs.Path;
import javax.ws.rs.PathParam;
import javax.ws.rs.Produces;
import javax.ws.rs.core.Context;
import javax.ws.rs.core.MediaType;
import javax.ws.rs.core.Response;
import javax.ws.rs.core.Response.Status;
import java.io.IOException;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Map.Entry;
import java.util.Set;
import java.util.SortedSet;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ThreadLocalRandom;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.BiFunction;
import java.util.function.Function;
import java.util.stream.Collectors;
/**
* ParallelIndexSupervisorTask is capable of running multiple subtasks for parallel indexing. This is
* applicable if the input {@link FiniteFirehoseFactory} or {@link InputSource} is splittable. While this task
* is running, it can submit multiple child tasks to the Overlord. This task succeeds only when all its child
* tasks succeed; otherwise it fails.
*
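* <p>A task of this type is submitted as a JSON payload. A minimal sketch follows; the field values are
* illustrative assumptions, not defaults:
* <pre>{@code
* {
*   "type": "index_parallel",
*   "spec": {
*     "dataSchema": { ... },
*     "ioConfig": {
*       "type": "index_parallel",
*       "inputSource": { ... },
*       "inputFormat": { ... }
*     },
*     "tuningConfig": {
*       "type": "index_parallel",
*       "maxNumConcurrentSubTasks": 4
*     }
*   }
* }
* }</pre>
*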
* @see ParallelIndexTaskRunner
*/
public class ParallelIndexSupervisorTask extends AbstractBatchIndexTask implements ChatHandler
{
public static final String TYPE = "index_parallel";
private static final Logger LOG = new Logger(ParallelIndexSupervisorTask.class);
private final ParallelIndexIngestionSpec ingestionSchema;
private final InputSource baseInputSource;
/**
* If intervals are missing in the granularitySpec, the parallel index task runs in "dynamic locking mode".
* In this mode, subtasks request new locks whenever they see a new row that is not covered by existing locks.
* If this task is overwriting existing segments, we must know in advance whether it will change the segment
* granularity, so that we can choose the proper lock type. However, if intervals are missing, we can't know
* the segment granularity of existing segments until the task has read all data, because we don't know which
* segments are going to be overwritten. As a result, we assume that the segment granularity is going to change
* if intervals are missing and force the use of timeChunk locks.
*
* This variable is initialized in the constructor and used in {@link #run} to log that timeChunk lock was enforced
* in the task logs.
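*
* <p>For example (illustrative values), a granularitySpec with explicit intervals looks like:
* <pre>{@code
* "granularitySpec": {
*   "type": "uniform",
*   "segmentGranularity": "DAY",
*   "intervals": ["2020-01-01/2020-01-02"]
* }
* }</pre>
* When "intervals" is omitted, the behavior described above applies.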
*/
private final boolean missingIntervalsInOverwriteMode;
private final ConcurrentHashMap<Interval, AtomicInteger> partitionNumCountersPerInterval = new ConcurrentHashMap<>();
@MonotonicNonNull
private AuthorizerMapper authorizerMapper;
/**
* A holder for the current phase runner (parallel mode) or index task (sequential mode).
* This variable is lazily initialized in {@link #initializeSubTaskCleaner}.
* Volatile since HTTP API calls can read this variable at any time while this task is running.
*/
@MonotonicNonNull
private volatile CurrentSubTaskHolder currentSubTaskHolder;
/**
* Keeps the given toolbox. This variable is lazily initialized in {@link #runTask}.
* Volatile since HTTP API calls can use this variable at any time while this task is running.
*/
@MonotonicNonNull
private volatile TaskToolbox toolbox;
@JsonCreator
public ParallelIndexSupervisorTask(
@JsonProperty("id") String id,
@JsonProperty("groupId") @Nullable String groupId,
@JsonProperty("resource") TaskResource taskResource,
@JsonProperty("spec") ParallelIndexIngestionSpec ingestionSchema,
@JsonProperty("context") Map<String, Object> context
)
{
super(
getOrMakeId(id, TYPE, ingestionSchema.getDataSchema().getDataSource()),
groupId,
taskResource,
ingestionSchema.getDataSchema().getDataSource(),
context
);
this.ingestionSchema = ingestionSchema;
if (isGuaranteedRollup(ingestionSchema.getIOConfig(), ingestionSchema.getTuningConfig())) {
checkPartitionsSpecForForceGuaranteedRollup(ingestionSchema.getTuningConfig().getGivenOrDefaultPartitionsSpec());
if (ingestionSchema.getDataSchema().getGranularitySpec().inputIntervals().isEmpty()) {
throw new ISE("forceGuaranteedRollup is set but intervals is missing in granularitySpec");
}
}
this.baseInputSource = ingestionSchema.getIOConfig().getNonNullInputSource(
ingestionSchema.getDataSchema().getParser()
);
this.missingIntervalsInOverwriteMode = !ingestionSchema.getIOConfig().isAppendToExisting()
&& !ingestionSchema.getDataSchema()
.getGranularitySpec()
.bucketIntervals()
.isPresent();
if (missingIntervalsInOverwriteMode) {
addToContext(Tasks.FORCE_TIME_CHUNK_LOCK_KEY, true);
}
}
private static void checkPartitionsSpecForForceGuaranteedRollup(PartitionsSpec partitionsSpec)
{
if (!partitionsSpec.isForceGuaranteedRollupCompatible()) {
String incompatibilityMsg = partitionsSpec.getForceGuaranteedRollupIncompatiblityReason();
String msg = "forceGuaranteedRollup is incompatible with partitionsSpec: " + incompatibilityMsg;
throw new ISE(msg);
}
}
@Override
public String getType()
{
return TYPE;
}
@JsonProperty("spec")
public ParallelIndexIngestionSpec getIngestionSchema()
{
return ingestionSchema;
}
@VisibleForTesting
@Nullable
ParallelIndexTaskRunner getCurrentRunner()
{
if (isParallelMode()) {
return currentSubTaskHolder == null ? null : currentSubTaskHolder.getTask();
} else {
return null;
}
}
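/**
* Create a new phase runner and register it in {@link #currentSubTaskHolder}. Returns null if the holder
* rejects the new runner, which means this task has been asked to stop.
*/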
@Nullable
private <T extends Task, R extends SubTaskReport> ParallelIndexTaskRunner<T, R> createRunner(
TaskToolbox toolbox,
Function<TaskToolbox, ParallelIndexTaskRunner<T, R>> runnerCreator
)
{
final ParallelIndexTaskRunner<T, R> newRunner = runnerCreator.apply(toolbox);
if (currentSubTaskHolder.setTask(newRunner)) {
return newRunner;
} else {
return null;
}
}
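/**
* Run the given phase runner. A null runner means {@link #createRunner} was rejected because the task was
* asked to stop, so it is treated as a failure.
*/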
private static TaskState runNextPhase(@Nullable ParallelIndexTaskRunner nextPhaseRunner) throws Exception
{
if (nextPhaseRunner == null) {
LOG.info("Task is asked to stop. Finish as failed");
return TaskState.FAILED;
} else {
return nextPhaseRunner.run();
}
}
@VisibleForTesting
SinglePhaseParallelIndexTaskRunner createSinglePhaseTaskRunner(TaskToolbox toolbox)
{
return new SinglePhaseParallelIndexTaskRunner(
toolbox,
getId(),
getGroupId(),
ingestionSchema,
getContext()
);
}
@VisibleForTesting
PartialDimensionCardinalityParallelIndexTaskRunner createPartialDimensionCardinalityRunner(TaskToolbox toolbox)
{
return new PartialDimensionCardinalityParallelIndexTaskRunner(
toolbox,
getId(),
getGroupId(),
ingestionSchema,
getContext()
);
}
@VisibleForTesting
PartialHashSegmentGenerateParallelIndexTaskRunner createPartialHashSegmentGenerateRunner(
TaskToolbox toolbox,
Integer numShardsOverride
)
{
return new PartialHashSegmentGenerateParallelIndexTaskRunner(
toolbox,
getId(),
getGroupId(),
ingestionSchema,
getContext(),
numShardsOverride
);
}
@VisibleForTesting
PartialDimensionDistributionParallelIndexTaskRunner createPartialDimensionDistributionRunner(TaskToolbox toolbox)
{
return new PartialDimensionDistributionParallelIndexTaskRunner(
toolbox,
getId(),
getGroupId(),
ingestionSchema,
getContext()
);
}
@VisibleForTesting
PartialRangeSegmentGenerateParallelIndexTaskRunner createPartialRangeSegmentGenerateRunner(
TaskToolbox toolbox,
Map<Interval, PartitionBoundaries> intervalToPartitions
)
{
return new PartialRangeSegmentGenerateParallelIndexTaskRunner(
toolbox,
getId(),
getGroupId(),
ingestionSchema,
getContext(),
intervalToPartitions
);
}
@VisibleForTesting
PartialGenericSegmentMergeParallelIndexTaskRunner createPartialGenericSegmentMergeRunner(
TaskToolbox toolbox,
List<PartialGenericSegmentMergeIOConfig> ioConfigs
)
{
return new PartialGenericSegmentMergeParallelIndexTaskRunner(
toolbox,
getId(),
getGroupId(),
getIngestionSchema().getDataSchema(),
ioConfigs,
getIngestionSchema().getTuningConfig(),
getContext()
);
}
@Override
public boolean isReady(TaskActionClient taskActionClient) throws Exception
{
return determineLockGranularityAndTryLock(taskActionClient, ingestionSchema.getDataSchema().getGranularitySpec());
}
@Override
public List<DataSegment> findSegmentsToLock(TaskActionClient taskActionClient, List<Interval> intervals)
throws IOException
{
return findInputSegments(
getDataSource(),
taskActionClient,
intervals,
ingestionSchema.getIOConfig().getFirehoseFactory()
);
}
@Override
public boolean requireLockExistingSegments()
{
return !ingestionSchema.getIOConfig().isAppendToExisting();
}
@Override
public boolean isPerfectRollup()
{
return isGuaranteedRollup(getIngestionSchema().getIOConfig(), getIngestionSchema().getTuningConfig());
}
@Nullable
@Override
public Granularity getSegmentGranularity()
{
final GranularitySpec granularitySpec = ingestionSchema.getDataSchema().getGranularitySpec();
if (granularitySpec instanceof ArbitraryGranularitySpec) {
return null;
} else {
return granularitySpec.getSegmentGranularity();
}
}
@Override
public TaskStatus runTask(TaskToolbox toolbox) throws Exception
{
if (ingestionSchema.getTuningConfig().getMaxSavedParseExceptions()
!= TuningConfig.DEFAULT_MAX_SAVED_PARSE_EXCEPTIONS) {
LOG.warn("maxSavedParseExceptions is not supported yet");
}
if (ingestionSchema.getTuningConfig().getMaxParseExceptions() != TuningConfig.DEFAULT_MAX_PARSE_EXCEPTIONS) {
LOG.warn("maxParseExceptions is not supported yet");
}
if (ingestionSchema.getTuningConfig().isLogParseExceptions() != TuningConfig.DEFAULT_LOG_PARSE_EXCEPTIONS) {
LOG.warn("logParseExceptions is not supported yet");
}
if (missingIntervalsInOverwriteMode) {
LOG.warn(
"Intervals are missing in granularitySpec while this task is potentially overwriting existing segments. "
+ "Forced to use timeChunk lock."
);
}
LOG.debug(
"Found chat handler of class[%s]",
Preconditions.checkNotNull(toolbox.getChatHandlerProvider(), "chatHandlerProvider").getClass().getName()
);
authorizerMapper = toolbox.getAuthorizerMapper();
toolbox.getChatHandlerProvider().register(getId(), this, false);
try {
initializeSubTaskCleaner();
if (isParallelMode()) {
this.toolbox = toolbox;
if (isGuaranteedRollup(ingestionSchema.getIOConfig(), ingestionSchema.getTuningConfig())) {
return runMultiPhaseParallel(toolbox);
} else {
return runSinglePhaseParallel(toolbox);
}
} else {
if (!baseInputSource.isSplittable()) {
LOG.warn(
"firehoseFactory[%s] is not splittable. Running sequentially.",
baseInputSource.getClass().getSimpleName()
);
} else if (ingestionSchema.getTuningConfig().getMaxNumConcurrentSubTasks() <= 1) {
LOG.warn(
"maxNumConcurrentSubTasks[%s] is less than or equal to 1. Running sequentially. "
+ "Please set maxNumConcurrentSubTasks to something higher than 1 if you want to run in parallel "
+ "ingestion mode.",
ingestionSchema.getTuningConfig().getMaxNumConcurrentSubTasks()
);
} else {
throw new ISE("Unknown reason for sequentail mode. Failing this task.");
}
return runSequential(toolbox);
}
}
finally {
toolbox.getChatHandlerProvider().unregister(getId());
}
}
private void initializeSubTaskCleaner()
{
if (isParallelMode()) {
currentSubTaskHolder = new CurrentSubTaskHolder((currentRunnerObject, taskConfig) -> {
final ParallelIndexTaskRunner runner = (ParallelIndexTaskRunner) currentRunnerObject;
runner.stopGracefully();
});
} else {
currentSubTaskHolder = new CurrentSubTaskHolder((taskObject, taskConfig) -> {
final IndexTask task = (IndexTask) taskObject;
task.stopGracefully(taskConfig);
});
}
registerResourceCloserOnAbnormalExit(currentSubTaskHolder);
}
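/**
* Decide whether this task should run in parallel mode. Parallel mode requires a splittable input source and a
* sufficiently large maxNumConcurrentSubTasks: hash partitioning needs at least 2 concurrent subtasks because
* {@link #runSequential} supports it as a fallback, while range partitioning has no sequential fallback and
* therefore runs in parallel mode even when maxNumConcurrentSubTasks is 1.
*/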
public static boolean isParallelMode(InputSource inputSource, @Nullable ParallelIndexTuningConfig tuningConfig)
{
if (null == tuningConfig) {
return false;
}
boolean useRangePartitions = useRangePartitions(tuningConfig);
// Range partitioning is not implemented for runSequential() (but hash partitioning is)
int minRequiredNumConcurrentSubTasks = useRangePartitions ? 1 : 2;
return inputSource.isSplittable() && tuningConfig.getMaxNumConcurrentSubTasks() >= minRequiredNumConcurrentSubTasks;
}
private static boolean useRangePartitions(ParallelIndexTuningConfig tuningConfig)
{
return tuningConfig.getGivenOrDefaultPartitionsSpec() instanceof SingleDimensionPartitionsSpec;
}
private boolean isParallelMode()
{
return isParallelMode(baseInputSource, ingestionSchema.getTuningConfig());
}
/**
* Run single-phase parallel indexing for best-effort rollup. In this mode, each subtask created by
* the supervisor task reads data and generates segments individually.
*/
private TaskStatus runSinglePhaseParallel(TaskToolbox toolbox) throws Exception
{
final ParallelIndexTaskRunner<SinglePhaseSubTask, PushedSegmentsReport> runner = createRunner(
toolbox,
this::createSinglePhaseTaskRunner
);
final TaskState state = runNextPhase(runner);
if (state.isSuccess()) {
//noinspection ConstantConditions
publishSegments(toolbox, runner.getReports());
}
return TaskStatus.fromCode(getId(), state);
}
/**
* Run multi-phase parallel indexing for perfect rollup. In this mode, parallel indexing is currently
* executed in two phases.
*
* - In the first phase, each task partitions input data and stores those partitions in local storage.
* - The partition is created based on the segment granularity (primary partition key) and the partition dimension
* values in {@link PartitionsSpec} (secondary partition key).
* - Partitioned data is maintained by {@link org.apache.druid.indexing.worker.IntermediaryDataManager}.
* - In the second phase, each task reads partitioned data from the intermediary data server (middleManager
* or indexer) and merges them to create the final segments.
*/
private TaskStatus runMultiPhaseParallel(TaskToolbox toolbox) throws Exception
{
return useRangePartitions(ingestionSchema.getTuningConfig())
? runRangePartitionMultiPhaseParallel(toolbox)
: runHashPartitionMultiPhaseParallel(toolbox);
}
private TaskStatus runHashPartitionMultiPhaseParallel(TaskToolbox toolbox) throws Exception
{
TaskState state;
if (!(ingestionSchema.getTuningConfig().getPartitionsSpec() instanceof HashedPartitionsSpec)) {
// only range and hash partitioning are supported for multiphase parallel ingestion, see runMultiPhaseParallel()
throw new ISE(
"forceGuaranteedRollup is set but partitionsSpec [%s] is not a single_dim or hash partition spec.",
ingestionSchema.getTuningConfig().getPartitionsSpec()
);
}
final Integer numShardsOverride;
HashedPartitionsSpec partitionsSpec = (HashedPartitionsSpec) ingestionSchema.getTuningConfig().getPartitionsSpec();
if (partitionsSpec.getNumShards() == null) {
// 0. need to determine numShards by scanning the data
LOG.info("numShards is unspecified, beginning %s phase.", PartialDimensionCardinalityTask.TYPE);
ParallelIndexTaskRunner<PartialDimensionCardinalityTask, DimensionCardinalityReport> cardinalityRunner =
createRunner(
toolbox,
this::createPartialDimensionCardinalityRunner
);
if (cardinalityRunner == null) {
throw new ISE("Could not create cardinality runner for hash partitioning.");
}
state = runNextPhase(cardinalityRunner);
if (state.isFailure()) {
return TaskStatus.failure(getId());
}
int effectiveMaxRowsPerSegment = partitionsSpec.getMaxRowsPerSegment() == null
? PartitionsSpec.DEFAULT_MAX_ROWS_PER_SEGMENT
: partitionsSpec.getMaxRowsPerSegment();
LOG.info("effective maxRowsPerSegment is: " + effectiveMaxRowsPerSegment);
if (cardinalityRunner.getReports() == null) {
throw new ISE("Could not determine cardinalities for hash partitioning.");
}
numShardsOverride = determineNumShardsFromCardinalityReport(
cardinalityRunner.getReports().values(),
effectiveMaxRowsPerSegment
);
LOG.info("Automatically determined numShards: " + numShardsOverride);
} else {
numShardsOverride = null;
}
// 1. Partial segment generation phase
ParallelIndexTaskRunner<PartialHashSegmentGenerateTask, GeneratedPartitionsReport<GenericPartitionStat>> indexingRunner =
createRunner(
toolbox,
f -> createPartialHashSegmentGenerateRunner(toolbox, numShardsOverride)
);
state = runNextPhase(indexingRunner);
if (state.isFailure()) {
return TaskStatus.failure(getId());
}
// 2. Partial segment merge phase
// partition (interval, partitionId) -> partition locations
Map<Pair<Interval, Integer>, List<GenericPartitionLocation>> partitionToLocations =
groupGenericPartitionLocationsPerPartition(indexingRunner.getReports());
final List<PartialGenericSegmentMergeIOConfig> ioConfigs = createGenericMergeIOConfigs(
ingestionSchema.getTuningConfig().getTotalNumMergeTasks(),
partitionToLocations
);
final ParallelIndexTaskRunner<PartialGenericSegmentMergeTask, PushedSegmentsReport> mergeRunner = createRunner(
toolbox,
tb -> createPartialGenericSegmentMergeRunner(tb, ioConfigs)
);
state = runNextPhase(mergeRunner);
if (state.isSuccess()) {
//noinspection ConstantConditions
publishSegments(toolbox, mergeRunner.getReports());
}
return TaskStatus.fromCode(getId(), state);
}
private TaskStatus runRangePartitionMultiPhaseParallel(TaskToolbox toolbox) throws Exception
{
ParallelIndexTaskRunner<PartialDimensionDistributionTask, DimensionDistributionReport> distributionRunner =
createRunner(
toolbox,
this::createPartialDimensionDistributionRunner
);
TaskState distributionState = runNextPhase(distributionRunner);
if (distributionState.isFailure()) {
return TaskStatus.failure(getId(), PartialDimensionDistributionTask.TYPE + " failed");
}
Map<Interval, PartitionBoundaries> intervalToPartitions =
determineAllRangePartitions(distributionRunner.getReports().values());
if (intervalToPartitions.isEmpty()) {
String msg = "No valid rows for single dimension partitioning."
+ " All rows may have invalid timestamps or multiple dimension values.";
LOG.warn(msg);
return TaskStatus.success(getId(), msg);
}
ParallelIndexTaskRunner<PartialRangeSegmentGenerateTask, GeneratedPartitionsReport<GenericPartitionStat>> indexingRunner =
createRunner(toolbox, tb -> createPartialRangeSegmentGenerateRunner(tb, intervalToPartitions));
TaskState indexingState = runNextPhase(indexingRunner);
if (indexingState.isFailure()) {
return TaskStatus.failure(getId(), PartialRangeSegmentGenerateTask.TYPE + " failed");
}
// partition (interval, partitionId) -> partition locations
Map<Pair<Interval, Integer>, List<GenericPartitionLocation>> partitionToLocations =
groupGenericPartitionLocationsPerPartition(indexingRunner.getReports());
final List<PartialGenericSegmentMergeIOConfig> ioConfigs = createGenericMergeIOConfigs(
ingestionSchema.getTuningConfig().getTotalNumMergeTasks(),
partitionToLocations
);
ParallelIndexTaskRunner<PartialGenericSegmentMergeTask, PushedSegmentsReport> mergeRunner = createRunner(
toolbox,
tb -> createPartialGenericSegmentMergeRunner(tb, ioConfigs)
);
TaskState mergeState = runNextPhase(mergeRunner);
if (mergeState.isSuccess()) {
publishSegments(toolbox, mergeRunner.getReports());
}
return TaskStatus.fromCode(getId(), mergeState);
}
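/**
* Merge the per-interval HLL sketches from the given cardinality reports and compute the hash partition count
* as the ceiling of (highest per-interval cardinality / maxRowsPerSegment). For example (illustrative numbers),
* an estimated max cardinality of 5,000,000 with maxRowsPerSegment = 3,000,000 yields numShards = 2.
*/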
@VisibleForTesting
public static int determineNumShardsFromCardinalityReport(
Collection<DimensionCardinalityReport> reports,
int maxRowsPerSegment
)
{
// aggregate all the sub-reports
Map<Interval, Union> finalCollectors = new HashMap<>();
reports.forEach(report -> {
Map<Interval, byte[]> intervalToCardinality = report.getIntervalToCardinalities();
for (Map.Entry<Interval, byte[]> entry : intervalToCardinality.entrySet()) {
Union union = finalCollectors.computeIfAbsent(
entry.getKey(),
key -> new Union(DimensionCardinalityReport.HLL_SKETCH_LOG_K)
);
HllSketch entryHll = HllSketch.wrap(Memory.wrap(entry.getValue()));
union.update(entryHll);
}
});
// determine the highest cardinality in any interval
long maxCardinality = 0;
for (Union union : finalCollectors.values()) {
maxCardinality = Math.max(maxCardinality, (long) union.getEstimate());
}
LOG.info("Estimated max cardinality: " + maxCardinality);
// determine numShards based on maxRowsPerSegment and the highest per-interval cardinality
long numShards = maxCardinality / maxRowsPerSegment;
if (maxCardinality % maxRowsPerSegment != 0) {
// if there's a remainder add 1 so we stay under maxRowsPerSegment
numShards += 1;
}
try {
return Math.toIntExact(numShards);
}
catch (ArithmeticException ae) {
throw new ISE("Estimated numShards [%s] exceeds integer bounds.", numShards);
}
}
private Map<Interval, PartitionBoundaries> determineAllRangePartitions(Collection<DimensionDistributionReport> reports)
{
Multimap<Interval, StringDistribution> intervalToDistributions = ArrayListMultimap.create();
reports.forEach(report -> {
Map<Interval, StringDistribution> intervalToDistribution = report.getIntervalToDistribution();
intervalToDistribution.forEach(intervalToDistributions::put);
});
return CollectionUtils.mapValues(intervalToDistributions.asMap(), this::determineRangePartition);
}
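/**
* Merge the string distributions collected for a single interval and compute range partition boundaries, using
* targetRowsPerSegment when it is set and maxRowsPerSegment otherwise.
*/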
private PartitionBoundaries determineRangePartition(Collection<StringDistribution> distributions)
{
StringDistributionMerger distributionMerger = new StringSketchMerger();
distributions.forEach(distributionMerger::merge);
StringDistribution mergedDistribution = distributionMerger.getResult();
SingleDimensionPartitionsSpec partitionsSpec =
(SingleDimensionPartitionsSpec) ingestionSchema.getTuningConfig().getGivenOrDefaultPartitionsSpec();
final PartitionBoundaries partitions;
Integer targetRowsPerSegment = partitionsSpec.getTargetRowsPerSegment();
if (targetRowsPerSegment == null) {
partitions = mergedDistribution.getEvenPartitionsByMaxSize(partitionsSpec.getMaxRowsPerSegment());
} else {
partitions = mergedDistribution.getEvenPartitionsByTargetSize(targetRowsPerSegment);
}
return partitions;
}
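/**
* Group the partition locations reported by subtasks by (interval, bucketId) pair. The partitionId of each
* shard spec is assigned lazily per interval so that the resulting core partitionIds are packed (0, 1, 2, ...);
* see the Javadoc of BucketNumberedShardSpec for details.
*/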
private static Map<Pair<Interval, Integer>, List<GenericPartitionLocation>> groupGenericPartitionLocationsPerPartition(
Map<String, GeneratedPartitionsReport<GenericPartitionStat>> subTaskIdToReport
)
{
final Map<Pair<Interval, Integer>, BuildingShardSpec<?>> intervalAndIntegerToShardSpec = new HashMap<>();
final Object2IntMap<Interval> intervalToNextPartitionId = new Object2IntOpenHashMap<>();
final BiFunction<String, GenericPartitionStat, GenericPartitionLocation> createPartitionLocationFunction =
(subtaskId, partitionStat) -> {
final BuildingShardSpec<?> shardSpec = intervalAndIntegerToShardSpec.computeIfAbsent(
Pair.of(partitionStat.getInterval(), partitionStat.getBucketId()),
key -> {
// Lazily determine the partitionId to create packed partitionIds for the core partitions.
// See the Javadoc of BucketNumberedShardSpec for details.
final int partitionId = intervalToNextPartitionId.computeInt(
partitionStat.getInterval(),
((interval, nextPartitionId) -> nextPartitionId == null ? 0 : nextPartitionId + 1)
);
return partitionStat.getSecondaryPartition().convert(partitionId);
}
);
return new GenericPartitionLocation(
partitionStat.getTaskExecutorHost(),
partitionStat.getTaskExecutorPort(),
partitionStat.isUseHttps(),
subtaskId,
partitionStat.getInterval(),
shardSpec
);
};
return groupPartitionLocationsPerPartition(subTaskIdToReport, createPartitionLocationFunction);
}
private static <S extends PartitionStat, L extends PartitionLocation>
Map<Pair<Interval, Integer>, List<L>> groupPartitionLocationsPerPartition(
Map<String, ? extends GeneratedPartitionsReport<S>> subTaskIdToReport,
BiFunction<String, S, L> createPartitionLocationFunction
)
{
// partition (interval, partitionId) -> partition locations
final Map<Pair<Interval, Integer>, List<L>> partitionToLocations = new HashMap<>();
for (Entry<String, ? extends GeneratedPartitionsReport<S>> entry : subTaskIdToReport.entrySet()) {
final String subTaskId = entry.getKey();
final GeneratedPartitionsReport<S> report = entry.getValue();
for (S partitionStat : report.getPartitionStats()) {
final List<L> locationsOfSamePartition = partitionToLocations.computeIfAbsent(
Pair.of(partitionStat.getInterval(), partitionStat.getBucketId()),
k -> new ArrayList<>()
);
locationsOfSamePartition.add(createPartitionLocationFunction.apply(subTaskId, partitionStat));
}
}
return partitionToLocations;
}
private static List<PartialGenericSegmentMergeIOConfig> createGenericMergeIOConfigs(
int totalNumMergeTasks,
Map<Pair<Interval, Integer>, List<GenericPartitionLocation>> partitionToLocations
)
{
return createMergeIOConfigs(
totalNumMergeTasks,
partitionToLocations,
PartialGenericSegmentMergeIOConfig::new
);
}
@VisibleForTesting
static <M extends PartialSegmentMergeIOConfig, L extends PartitionLocation> List<M> createMergeIOConfigs(
int totalNumMergeTasks,
Map<Pair<Interval, Integer>, List<L>> partitionToLocations,
Function<List<L>, M> createPartialSegmentMergeIOConfig
)
{
final int numMergeTasks = Math.min(totalNumMergeTasks, partitionToLocations.size());
LOG.info(
"Number of merge tasks is set to [%d] based on totalNumMergeTasks[%d] and number of partitions[%d]",
numMergeTasks,
totalNumMergeTasks,
partitionToLocations.size()
);
// Randomly shuffle partitionIds to evenly distribute partitions of potentially different sizes
// This will be improved once we collect partition stats properly.
// See PartitionStat in GeneratedPartitionsReport.
final List<Pair<Interval, Integer>> partitions = new ArrayList<>(partitionToLocations.keySet());
Collections.shuffle(partitions, ThreadLocalRandom.current());
final List<M> assignedPartitionLocations = new ArrayList<>(numMergeTasks);
for (int i = 0; i < numMergeTasks; i++) {
Pair<Integer, Integer> partitionBoundaries = getPartitionBoundaries(i, partitions.size(), numMergeTasks);
final List<L> assignedToSameTask = partitions
.subList(partitionBoundaries.lhs, partitionBoundaries.rhs)
.stream()
.flatMap(intervalAndPartitionId -> partitionToLocations.get(intervalAndPartitionId).stream())
.collect(Collectors.toList());
assignedPartitionLocations.add(createPartialSegmentMergeIOConfig.apply(assignedToSameTask));
}
return assignedPartitionLocations;
}
/**
* Partition items into as evenly-sized splits as possible.
*
* @param index index of partition
* @param total number of items to partition
* @param splits number of desired partitions
*
* @return partition range: [lhs, rhs)
*/
private static Pair<Integer, Integer> getPartitionBoundaries(int index, int total, int splits)
{
int chunk = total / splits;
int remainder = total % splits;
// Distribute the remainder across the first few partitions. For example, total=8 and splits=5 gives partitions
// of sizes (starting from i=0): 2, 2, 2, 1, 1
int start = index * chunk + (index < remainder ? index : remainder);
int stop = start + chunk + (index < remainder ? 1 : 0);
return Pair.of(start, stop);
}
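/**
* Publish the segments collected from the given subtask reports in a single transactional overwrite. An empty
* set of new segments is treated as a successful publish.
*/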
private void publishSegments(
TaskToolbox toolbox,
Map<String, PushedSegmentsReport> reportsMap
)
throws IOException
{
final Set<DataSegment> oldSegments = new HashSet<>();
final Set<DataSegment> newSegments = new HashSet<>();
reportsMap
.values()
.forEach(report -> {
oldSegments.addAll(report.getOldSegments());
newSegments.addAll(report.getNewSegments());
});
final boolean storeCompactionState = getContextValue(
Tasks.STORE_COMPACTION_STATE_KEY,
Tasks.DEFAULT_STORE_COMPACTION_STATE
);
final Function<Set<DataSegment>, Set<DataSegment>> annotateFunction = compactionStateAnnotateFunction(
storeCompactionState,
toolbox,
ingestionSchema.getTuningConfig()
);
final TransactionalSegmentPublisher publisher = (segmentsToBeOverwritten, segmentsToPublish, commitMetadata) ->
toolbox.getTaskActionClient().submit(
SegmentTransactionalInsertAction.overwriteAction(segmentsToBeOverwritten, segmentsToPublish)
);
final boolean published = newSegments.isEmpty()
|| publisher.publishSegments(oldSegments, newSegments, annotateFunction, null)
.isSuccess();
if (published) {
LOG.info("Published [%d] segments", newSegments.size());
} else {
throw new ISE("Failed to publish segments");
}
}
private TaskStatus runSequential(TaskToolbox toolbox) throws Exception
{
final IndexTask indexTask = new IndexTask(
getId(),
getGroupId(),
getTaskResource(),
getDataSource(),
new IndexIngestionSpec(
getIngestionSchema().getDataSchema(),
getIngestionSchema().getIOConfig(),
convertToIndexTuningConfig(getIngestionSchema().getTuningConfig())
),
getContext()
);
if (currentSubTaskHolder.setTask(indexTask) && indexTask.isReady(toolbox.getTaskActionClient())) {
return indexTask.run(toolbox);
} else {
LOG.info("Task is asked to stop. Finish as failed");
return TaskStatus.failure(getId());
}
}
private static IndexTuningConfig convertToIndexTuningConfig(ParallelIndexTuningConfig tuningConfig)
{
return new IndexTuningConfig(
null,
null,
tuningConfig.getMaxRowsInMemory(),
tuningConfig.getMaxBytesInMemory(),
null,
null,
null,
null,
tuningConfig.getPartitionsSpec(),
tuningConfig.getIndexSpec(),
tuningConfig.getIndexSpecForIntermediatePersists(),
tuningConfig.getMaxPendingPersists(),
tuningConfig.isForceGuaranteedRollup(),
tuningConfig.isReportParseExceptions(),
null,
tuningConfig.getPushTimeout(),
tuningConfig.getSegmentWriteOutMediumFactory(),
tuningConfig.isLogParseExceptions(),
tuningConfig.getMaxParseExceptions(),
tuningConfig.getMaxSavedParseExceptions()
);
}
// Internal APIs
/**
* Allocate a new {@link SegmentIdWithShardSpec} for a request from {@link SinglePhaseSubTask}.
* Each returned segment identifier has a distinct {@code partitionNum} (and thereby a distinct
* {@link NumberedShardSpec}) per bucket interval.
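*
* <p>Subtasks call this endpoint through the supervisor task's chat handler (conventionally served under
* {@code /druid/worker/v1/chat/{taskId}}; the exact base path depends on the chat handler resource), posting a
* Smile-serialized {@link DateTime} and receiving a Smile-serialized {@link SegmentIdWithShardSpec} back.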
*/
@POST
@Path("/segment/allocate")
@Produces(SmileMediaTypes.APPLICATION_JACKSON_SMILE)
@Consumes(SmileMediaTypes.APPLICATION_JACKSON_SMILE)
public Response allocateSegment(DateTime timestamp, @Context final HttpServletRequest req)
{
ChatHandlers.authorizationCheck(req, Action.READ, getDataSource(), authorizerMapper);
if (toolbox == null) {
return Response.status(Response.Status.SERVICE_UNAVAILABLE).entity("task is not running yet").build();
}
try {
final SegmentIdWithShardSpec segmentIdentifier = allocateNewSegment(timestamp);
return Response.ok(toolbox.getJsonMapper().writeValueAsBytes(segmentIdentifier)).build();
}
catch (IOException | IllegalStateException e) {
return Response.serverError().entity(Throwables.getStackTraceAsString(e)).build();
}
catch (IllegalArgumentException e) {
return Response.status(Response.Status.BAD_REQUEST).entity(Throwables.getStackTraceAsString(e)).build();
}
}
/**
* Allocate a new segment for the given timestamp locally.
* Since the segments returned by this method overwrite any existing segments, this method should be called only
* when the {@link org.apache.druid.indexing.common.LockGranularity} is {@code TIME_CHUNK}.
*/
@VisibleForTesting
SegmentIdWithShardSpec allocateNewSegment(DateTime timestamp) throws IOException
{
final String dataSource = getDataSource();
final GranularitySpec granularitySpec = getIngestionSchema().getDataSchema().getGranularitySpec();
final Optional<SortedSet<Interval>> bucketIntervals = granularitySpec.bucketIntervals();
// List the locks on every allocation because locks might have been revoked and may no longer be valid.
final List<TaskLock> locks = toolbox
.getTaskActionClient()
.submit(new LockListAction());
final TaskLock revokedLock = locks.stream().filter(TaskLock::isRevoked).findAny().orElse(null);
if (revokedLock != null) {
throw new ISE("Lock revoked: [%s]", revokedLock);
}
final Map<Interval, String> versions = locks
.stream()
.collect(Collectors.toMap(TaskLock::getInterval, TaskLock::getVersion));
Interval interval;
String version;
if (bucketIntervals.isPresent()) {
// If the granularity spec has explicit intervals, we just need to find the version associated with the interval.
// This is because we should have gotten all required locks up front when the task starts up.
final Optional<Interval> maybeInterval = granularitySpec.bucketInterval(timestamp);
if (!maybeInterval.isPresent()) {
throw new IAE("Could not find interval for timestamp [%s]", timestamp);
}
interval = maybeInterval.get();
if (!bucketIntervals.get().contains(interval)) {
throw new ISE("Unspecified interval[%s] in granularitySpec[%s]", interval, granularitySpec);
}
version = findVersion(versions, interval);
if (version == null) {
throw new ISE("Cannot find a version for interval[%s]", interval);
}
} else {
// We don't have explicit intervals. We can use the segment granularity to figure out what
// interval we need, but we might not have already locked it.
interval = granularitySpec.getSegmentGranularity().bucket(timestamp);
version = findVersion(versions, interval);
if (version == null) {
// We don't have a lock for this interval, so we should lock it now.
final TaskLock lock = Preconditions.checkNotNull(
toolbox.getTaskActionClient().submit(
new TimeChunkLockTryAcquireAction(TaskLockType.EXCLUSIVE, interval)
),
"Cannot acquire a lock for interval[%s]",
interval
);
version = lock.getVersion();
}
}
final int partitionNum = Counters.getAndIncrementInt(partitionNumCountersPerInterval, interval);
return new SegmentIdWithShardSpec(
dataSource,
interval,
version,
new BuildingNumberedShardSpec(partitionNum)
);
}
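/**
* Find the version of a held lock whose interval contains the given interval, or return null if no lock
* covers it.
*/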
@Nullable
public static String findVersion(Map<Interval, String> versions, Interval interval)
{
return versions.entrySet().stream()
.filter(entry -> entry.getKey().contains(interval))
.map(Entry::getValue)
.findFirst()
.orElse(null);
}
static InputFormat getInputFormat(ParallelIndexIngestionSpec ingestionSchema)
{
return ingestionSchema.getIOConfig().getNonNullInputFormat();
}
/**
* Worker tasks spawned by the supervisor call this API to report the segments they generated and pushed.
*
* @see ParallelIndexSupervisorTaskClient#report(String, SubTaskReport)
*/
@POST
@Path("/report")
@Consumes(SmileMediaTypes.APPLICATION_JACKSON_SMILE)
public Response report(
SubTaskReport report,
@Context final HttpServletRequest req
)
{
ChatHandlers.authorizationCheck(
req,
Action.WRITE,
getDataSource(),
authorizerMapper
);
if (currentSubTaskHolder == null || currentSubTaskHolder.getTask() == null) {
return Response.status(Response.Status.SERVICE_UNAVAILABLE).entity("task is not running yet").build();
} else {
final ParallelIndexTaskRunner runner = currentSubTaskHolder.getTask();
//noinspection unchecked
runner.collectReport(report);
return Response.ok().build();
}
}
// External APIs to get running status
@GET
@Path("/mode")
@Produces(MediaType.APPLICATION_JSON)
public Response getMode(@Context final HttpServletRequest req)
{
IndexTaskUtils.datasourceAuthorizationCheck(req, Action.READ, getDataSource(), authorizerMapper);
return Response.ok(isParallelMode() ? "parallel" : "sequential").build();
}
@GET
@Path("/phase")
@Produces(MediaType.APPLICATION_JSON)
public Response getPhaseName(@Context final HttpServletRequest req)
{
IndexTaskUtils.datasourceAuthorizationCheck(req, Action.READ, getDataSource(), authorizerMapper);
if (isParallelMode()) {
final ParallelIndexTaskRunner runner = getCurrentRunner();
if (runner == null) {
return Response.status(Response.Status.SERVICE_UNAVAILABLE).entity("task is not running").build();
} else {
return Response.ok(runner.getName()).build();
}
} else {
return Response.status(Status.BAD_REQUEST).entity("task is running in sequential mode").build();
}
}
@GET
@Path("/progress")
@Produces(MediaType.APPLICATION_JSON)
public Response getProgress(@Context final HttpServletRequest req)
{
IndexTaskUtils.datasourceAuthorizationCheck(req, Action.READ, getDataSource(), authorizerMapper);
final ParallelIndexTaskRunner currentRunner = getCurrentRunner();
if (currentRunner == null) {
return Response.status(Response.Status.SERVICE_UNAVAILABLE).entity("task is not running yet").build();
} else {
return Response.ok(currentRunner.getProgress()).build();
}
}
@GET
@Path("/subtasks/running")
@Produces(MediaType.APPLICATION_JSON)
public Response getRunningTasks(@Context final HttpServletRequest req)
{
IndexTaskUtils.datasourceAuthorizationCheck(req, Action.READ, getDataSource(), authorizerMapper);
final ParallelIndexTaskRunner currentRunner = getCurrentRunner();
if (currentRunner == null) {
return Response.status(Response.Status.SERVICE_UNAVAILABLE).entity("task is not running yet").build();
} else {
return Response.ok(currentRunner.getRunningTaskIds()).build();
}
}
@GET
@Path("/subtaskspecs")
@Produces(MediaType.APPLICATION_JSON)
public Response getSubTaskSpecs(@Context final HttpServletRequest req)
{
IndexTaskUtils.datasourceAuthorizationCheck(req, Action.READ, getDataSource(), authorizerMapper);
final ParallelIndexTaskRunner currentRunner = getCurrentRunner();
if (currentRunner == null) {
return Response.status(Response.Status.SERVICE_UNAVAILABLE).entity("task is not running yet").build();
} else {
return Response.ok(currentRunner.getSubTaskSpecs()).build();
}
}
@GET
@Path("/subtaskspecs/running")
@Produces(MediaType.APPLICATION_JSON)
public Response getRunningSubTaskSpecs(@Context final HttpServletRequest req)
{
IndexTaskUtils.datasourceAuthorizationCheck(req, Action.READ, getDataSource(), authorizerMapper);
final ParallelIndexTaskRunner currentRunner = getCurrentRunner();
if (currentRunner == null) {
return Response.status(Response.Status.SERVICE_UNAVAILABLE).entity("task is not running yet").build();
} else {
return Response.ok(currentRunner.getRunningSubTaskSpecs()).build();
}
}
@GET
@Path("/subtaskspecs/complete")
@Produces(MediaType.APPLICATION_JSON)
public Response getCompleteSubTaskSpecs(@Context final HttpServletRequest req)
{
IndexTaskUtils.datasourceAuthorizationCheck(req, Action.READ, getDataSource(), authorizerMapper);
final ParallelIndexTaskRunner currentRunner = getCurrentRunner();
if (currentRunner == null) {
return Response.status(Response.Status.SERVICE_UNAVAILABLE).entity("task is not running yet").build();
} else {
return Response.ok(currentRunner.getCompleteSubTaskSpecs()).build();
}
}
@GET
@Path("/subtaskspec/{id}")
@Produces(MediaType.APPLICATION_JSON)
public Response getSubTaskSpec(@PathParam("id") String id, @Context final HttpServletRequest req)
{
IndexTaskUtils.datasourceAuthorizationCheck(req, Action.READ, getDataSource(), authorizerMapper);
final ParallelIndexTaskRunner currentRunner = getCurrentRunner();
if (currentRunner == null) {
return Response.status(Response.Status.SERVICE_UNAVAILABLE).entity("task is not running yet").build();
} else {
final SubTaskSpec subTaskSpec = currentRunner.getSubTaskSpec(id);
if (subTaskSpec == null) {
return Response.status(Response.Status.NOT_FOUND).build();
} else {
return Response.ok(subTaskSpec).build();
}
}
}
@GET
@Path("/subtaskspec/{id}/state")
@Produces(MediaType.APPLICATION_JSON)
public Response getSubTaskState(@PathParam("id") String id, @Context final HttpServletRequest req)
{
IndexTaskUtils.datasourceAuthorizationCheck(req, Action.READ, getDataSource(), authorizerMapper);
final ParallelIndexTaskRunner currentRunner = getCurrentRunner();
if (currentRunner == null) {
return Response.status(Response.Status.SERVICE_UNAVAILABLE).entity("task is not running yet").build();
} else {
final SubTaskSpecStatus subTaskSpecStatus = currentRunner.getSubTaskState(id);
if (subTaskSpecStatus == null) {
return Response.status(Response.Status.NOT_FOUND).build();
} else {
return Response.ok(subTaskSpecStatus).build();
}
}
}
@GET
@Path("/subtaskspec/{id}/history")
@Produces(MediaType.APPLICATION_JSON)
public Response getCompleteSubTaskSpecAttemptHistory(
@PathParam("id") String id,
@Context final HttpServletRequest req
)
{
IndexTaskUtils.datasourceAuthorizationCheck(req, Action.READ, getDataSource(), authorizerMapper);
final ParallelIndexTaskRunner currentRunner = getCurrentRunner();
if (currentRunner == null) {
return Response.status(Response.Status.SERVICE_UNAVAILABLE).entity("task is not running yet").build();
} else {
final TaskHistory taskHistory = currentRunner.getCompleteSubTaskSpecAttemptHistory(id);
if (taskHistory == null) {
return Response.status(Status.NOT_FOUND).build();
} else {
return Response.ok(taskHistory.getAttemptHistory()).build();
}
}
}
}