/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.druid.indexing.common.task.batch.parallel;
import com.fasterxml.jackson.annotation.JsonCreator;
import com.fasterxml.jackson.annotation.JsonIgnore;
import com.fasterxml.jackson.annotation.JsonProperty;
import com.fasterxml.jackson.jaxrs.smile.SmileMediaTypes;
import com.google.common.annotations.VisibleForTesting;
import com.google.common.base.Preconditions;
import com.google.common.base.Throwables;
import com.google.common.collect.ImmutableMap;
import com.google.common.collect.ImmutableSet;
import org.apache.datasketches.hll.HllSketch;
import org.apache.datasketches.hll.Union;
import org.apache.datasketches.memory.Memory;
import org.apache.druid.common.guava.FutureUtils;
import org.apache.druid.data.input.InputFormat;
import org.apache.druid.data.input.InputSource;
import org.apache.druid.indexer.IngestionState;
import org.apache.druid.indexer.TaskState;
import org.apache.druid.indexer.TaskStatus;
import org.apache.druid.indexer.partitions.DimensionRangePartitionsSpec;
import org.apache.druid.indexer.partitions.HashedPartitionsSpec;
import org.apache.druid.indexer.partitions.PartitionsSpec;
import org.apache.druid.indexer.report.IngestionStatsAndErrors;
import org.apache.druid.indexer.report.IngestionStatsAndErrorsTaskReport;
import org.apache.druid.indexer.report.TaskReport;
import org.apache.druid.indexing.common.TaskLockType;
import org.apache.druid.indexing.common.TaskToolbox;
import org.apache.druid.indexing.common.actions.TaskActionClient;
import org.apache.druid.indexing.common.task.AbstractBatchIndexTask;
import org.apache.druid.indexing.common.task.CurrentSubTaskHolder;
import org.apache.druid.indexing.common.task.IndexTask;
import org.apache.druid.indexing.common.task.IndexTask.IndexIngestionSpec;
import org.apache.druid.indexing.common.task.IndexTask.IndexTuningConfig;
import org.apache.druid.indexing.common.task.IndexTaskUtils;
import org.apache.druid.indexing.common.task.PendingSegmentAllocatingTask;
import org.apache.druid.indexing.common.task.Task;
import org.apache.druid.indexing.common.task.TaskResource;
import org.apache.druid.indexing.common.task.Tasks;
import org.apache.druid.indexing.common.task.batch.MaxAllowedLocksExceededException;
import org.apache.druid.indexing.common.task.batch.parallel.ParallelIndexTaskRunner.SubTaskSpecStatus;
import org.apache.druid.indexing.worker.shuffle.IntermediaryDataManager;
import org.apache.druid.java.util.common.ISE;
import org.apache.druid.java.util.common.Pair;
import org.apache.druid.java.util.common.StringUtils;
import org.apache.druid.java.util.common.granularity.Granularity;
import org.apache.druid.java.util.common.logger.Logger;
import org.apache.druid.rpc.HttpResponseException;
import org.apache.druid.rpc.indexing.OverlordClient;
import org.apache.druid.segment.incremental.ParseExceptionReport;
import org.apache.druid.segment.incremental.RowIngestionMeters;
import org.apache.druid.segment.incremental.RowIngestionMetersTotals;
import org.apache.druid.segment.incremental.SimpleRowIngestionMeters;
import org.apache.druid.segment.indexing.TuningConfig;
import org.apache.druid.segment.indexing.granularity.ArbitraryGranularitySpec;
import org.apache.druid.segment.indexing.granularity.GranularitySpec;
import org.apache.druid.segment.realtime.appenderator.SegmentIdWithShardSpec;
import org.apache.druid.segment.realtime.appenderator.TransactionalSegmentPublisher;
import org.apache.druid.segment.realtime.firehose.ChatHandler;
import org.apache.druid.segment.realtime.firehose.ChatHandlers;
import org.apache.druid.server.security.Action;
import org.apache.druid.server.security.AuthorizerMapper;
import org.apache.druid.server.security.Resource;
import org.apache.druid.server.security.ResourceAction;
import org.apache.druid.server.security.ResourceType;
import org.apache.druid.timeline.DataSegment;
import org.apache.druid.timeline.partition.BuildingShardSpec;
import org.apache.druid.timeline.partition.NumberedShardSpec;
import org.apache.druid.timeline.partition.PartitionBoundaries;
import org.apache.druid.utils.CollectionUtils;
import org.checkerframework.checker.nullness.qual.MonotonicNonNull;
import org.jboss.netty.handler.codec.http.HttpResponseStatus;
import org.joda.time.DateTime;
import org.joda.time.Interval;
import javax.annotation.Nonnull;
import javax.annotation.Nullable;
import javax.servlet.http.HttpServletRequest;
import javax.ws.rs.Consumes;
import javax.ws.rs.GET;
import javax.ws.rs.POST;
import javax.ws.rs.Path;
import javax.ws.rs.PathParam;
import javax.ws.rs.Produces;
import javax.ws.rs.QueryParam;
import javax.ws.rs.core.Context;
import javax.ws.rs.core.MediaType;
import javax.ws.rs.core.Response;
import javax.ws.rs.core.Response.Status;
import java.io.IOException;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.Comparator;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Map.Entry;
import java.util.Objects;
import java.util.Set;
import java.util.TreeMap;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ThreadLocalRandom;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.Function;
import java.util.stream.Collectors;
/**
* ParallelIndexSupervisorTask is capable of running multiple subTasks for parallel indexing. This is
* applicable if the input {@link InputSource} is splittable. While this task is running, it can submit
* multiple child tasks to overlords. This task succeeds only when all its child tasks succeed; otherwise it fails.
*
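* <p>
* For reference, a minimal {@code index_parallel} task spec has the following shape
* (illustrative values only; see the Druid ingestion docs for the full set of fields):
* <pre>{@code
* {
*   "type": "index_parallel",
*   "spec": {
*     "dataSchema": { "dataSource": "my_datasource", ... },
*     "ioConfig": {
*       "type": "index_parallel",
*       "inputSource": { "type": "local", ... },
*       "inputFormat": { "type": "json" }
*     },
*     "tuningConfig": {
*       "type": "index_parallel",
*       "maxNumConcurrentSubTasks": 4
*     }
*   }
* }
* }</pre>
*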
* @see ParallelIndexTaskRunner
*/
public class ParallelIndexSupervisorTask extends AbstractBatchIndexTask
implements ChatHandler, PendingSegmentAllocatingTask
{
public static final String TYPE = "index_parallel";
private static final Logger LOG = new Logger(ParallelIndexSupervisorTask.class);
private static final String TASK_PHASE_FAILURE_MSG = "Failed in phase[%s]. See task logs for details.";
// Sometimes Druid estimates one shard for hash partitioning even though conditions
// indicate that there ought to be more than one. We have not been able to reproduce
// this, but based on the code around where the following constant is used, one
// possibility is that the sketch's estimate is negative. If that happens, the code
// logs it and sets the estimate to the value of the following constant. There is no
// need to parameterize this value: if this happens it is a bug, and the new logging
// may provide evidence to help reproduce and fix it.
private static final long DEFAULT_NUM_SHARDS_WHEN_ESTIMATE_GOES_NEGATIVE = 7L;
private final ParallelIndexIngestionSpec ingestionSchema;
/**
* Base name for the {@link SubTaskSpec} ID.
* It is null for most task types, in which case {@link #getId()} is used as the base name.
* Only the compaction task can have a special base name.
*/
private final String baseSubtaskSpecName;
private final InputSource baseInputSource;
/**
* If intervals are missing in the granularitySpec, the parallel index task runs in "dynamic locking mode".
* In this mode, sub tasks ask for new locks whenever they see a new row that is not covered by existing locks.
* If this task is overwriting existing segments, we need to know in advance whether it is changing the
* segment granularity, so that we can choose the right lock type. However, if intervals are missing, we
* can't know the segment granularity of existing segments until the task has read all the data, because we
* don't know which segments are going to be overwritten. As a result, we assume that the segment granularity
* is going to change if intervals are missing, and force the use of timeChunk lock.
* <p>
* This variable is initialized in the constructor and used in {@link #run} to log that timeChunk lock was enforced
* in the task logs.
*/
private final boolean missingIntervalsInOverwriteMode;
private final long awaitSegmentAvailabilityTimeoutMillis;
@MonotonicNonNull
private AuthorizerMapper authorizerMapper;
/**
* A holder for the current phase runner (parallel mode) or index task (sequential mode).
* This variable is lazily initialized in {@link #initializeSubTaskCleaner}.
* Volatile since HTTP API calls can read this variable at any time while this task is running.
*/
@MonotonicNonNull
private volatile CurrentSubTaskHolder currentSubTaskHolder;
/**
* A variable to keep the given toolbox. This variable is lazily initialized in {@link #runTask}.
* Volatile since HTTP API calls can use this variable at any time while this task is running.
*/
@MonotonicNonNull
private volatile TaskToolbox toolbox;
/**
* Row stats for index generate phase of parallel indexing. This variable is
* lazily initialized in {@link #runTask}.
* Volatile since HTTP API calls can use this variable while this task is running.
*/
@MonotonicNonNull
private volatile Pair<Map<String, Object>, Map<String, Object>> indexGenerateRowStats;
private IngestionState ingestionState;
private TaskReport.ReportMap completionReports;
private Long segmentsRead;
private Long segmentsPublished;
private final boolean isCompactionTask;
@JsonCreator
public ParallelIndexSupervisorTask(
@JsonProperty("id") String id,
@JsonProperty("groupId") @Nullable String groupId,
@JsonProperty("resource") TaskResource taskResource,
@JsonProperty("spec") ParallelIndexIngestionSpec ingestionSchema,
@JsonProperty("context") Map<String, Object> context
)
{
this(id, groupId, taskResource, ingestionSchema, null, context, false);
}
public ParallelIndexSupervisorTask(
String id,
@Nullable String groupId,
TaskResource taskResource,
ParallelIndexIngestionSpec ingestionSchema,
@Nullable String baseSubtaskSpecName,
Map<String, Object> context,
boolean isCompactionTask
)
{
super(
getOrMakeId(id, TYPE, ingestionSchema.getDataSchema().getDataSource()),
groupId,
taskResource,
ingestionSchema.getDataSchema().getDataSource(),
context,
ingestionSchema.getTuningConfig().getMaxAllowedLockCount(),
computeBatchIngestionMode(ingestionSchema.getIOConfig())
);
this.ingestionSchema = ingestionSchema;
this.baseSubtaskSpecName = baseSubtaskSpecName == null ? getId() : baseSubtaskSpecName;
if (getIngestionMode() == IngestionMode.REPLACE &&
ingestionSchema.getDataSchema().getGranularitySpec().inputIntervals().isEmpty()) {
throw new ISE("GranularitySpec's intervals cannot be empty when using replace.");
}
if (isGuaranteedRollup(getIngestionMode(), ingestionSchema.getTuningConfig())) {
checkPartitionsSpecForForceGuaranteedRollup(ingestionSchema.getTuningConfig().getGivenOrDefaultPartitionsSpec());
}
this.baseInputSource = ingestionSchema.getIOConfig().getNonNullInputSource();
this.missingIntervalsInOverwriteMode = (getIngestionMode()
!= IngestionMode.APPEND)
&& ingestionSchema.getDataSchema()
.getGranularitySpec()
.inputIntervals()
.isEmpty();
if (missingIntervalsInOverwriteMode) {
addToContext(Tasks.FORCE_TIME_CHUNK_LOCK_KEY, true);
}
awaitSegmentAvailabilityTimeoutMillis = ingestionSchema.getTuningConfig().getAwaitSegmentAvailabilityTimeoutMillis();
this.ingestionState = IngestionState.NOT_STARTED;
this.isCompactionTask = isCompactionTask;
}
private static void checkPartitionsSpecForForceGuaranteedRollup(PartitionsSpec partitionsSpec)
{
if (!partitionsSpec.isForceGuaranteedRollupCompatible()) {
String incompatibilityMsg = partitionsSpec.getForceGuaranteedRollupIncompatiblityReason();
String msg = "forceGuaranteedRollup is incompatible with partitionsSpec: " + incompatibilityMsg;
throw new ISE(msg);
}
}
@Override
public String getType()
{
return TYPE;
}
@Nonnull
@JsonIgnore
@Override
public Set<ResourceAction> getInputSourceResources()
{
if (getIngestionSchema().getIOConfig().getFirehoseFactory() != null) {
throw getInputSecurityOnFirehoseUnsupportedError();
}
return getIngestionSchema().getIOConfig().getInputSource() != null ?
getIngestionSchema().getIOConfig().getInputSource().getTypes()
.stream()
.map(i -> new ResourceAction(new Resource(i, ResourceType.EXTERNAL), Action.READ))
.collect(Collectors.toSet()) :
ImmutableSet.of();
}
@Nullable
@JsonIgnore
public TaskReport.ReportMap getCompletionReports()
{
return completionReports;
}
@JsonProperty("spec")
public ParallelIndexIngestionSpec getIngestionSchema()
{
return ingestionSchema;
}
@VisibleForTesting
@Nullable
ParallelIndexTaskRunner getCurrentRunner()
{
if (isParallelMode()) {
return currentSubTaskHolder == null ? null : currentSubTaskHolder.getTask();
} else {
return null;
}
}
@Nullable
@VisibleForTesting
<T extends Task, R extends SubTaskReport> ParallelIndexTaskRunner<T, R> createRunner(
TaskToolbox toolbox,
Function<TaskToolbox, ParallelIndexTaskRunner<T, R>> runnerCreator
)
{
final ParallelIndexTaskRunner<T, R> newRunner = runnerCreator.apply(toolbox);
if (currentSubTaskHolder.setTask(newRunner)) {
return newRunner;
} else {
return null;
}
}
private static TaskState runNextPhase(@Nullable ParallelIndexTaskRunner nextPhaseRunner) throws Exception
{
if (nextPhaseRunner == null) {
LOG.info("Task is asked to stop. Finish as failed");
return TaskState.FAILED;
} else {
return nextPhaseRunner.run();
}
}
@VisibleForTesting
SinglePhaseParallelIndexTaskRunner createSinglePhaseTaskRunner(TaskToolbox toolbox)
{
return new SinglePhaseParallelIndexTaskRunner(
toolbox,
getId(),
getGroupId(),
baseSubtaskSpecName,
ingestionSchema,
getContext()
);
}
@VisibleForTesting
PartialDimensionCardinalityParallelIndexTaskRunner createPartialDimensionCardinalityRunner(TaskToolbox toolbox)
{
return new PartialDimensionCardinalityParallelIndexTaskRunner(
toolbox,
getId(),
getGroupId(),
baseSubtaskSpecName,
ingestionSchema,
getContext()
);
}
@VisibleForTesting
PartialHashSegmentGenerateParallelIndexTaskRunner createPartialHashSegmentGenerateRunner(
TaskToolbox toolbox,
ParallelIndexIngestionSpec ingestionSchema,
@Nullable Map<Interval, Integer> intervalToNumShardsOverride
)
{
return new PartialHashSegmentGenerateParallelIndexTaskRunner(
toolbox,
getId(),
getGroupId(),
baseSubtaskSpecName,
ingestionSchema,
getContext(),
intervalToNumShardsOverride
);
}
@VisibleForTesting
PartialDimensionDistributionParallelIndexTaskRunner createPartialDimensionDistributionRunner(TaskToolbox toolbox)
{
return new PartialDimensionDistributionParallelIndexTaskRunner(
toolbox,
getId(),
getGroupId(),
baseSubtaskSpecName,
ingestionSchema,
getContext()
);
}
@VisibleForTesting
PartialRangeSegmentGenerateParallelIndexTaskRunner createPartialRangeSegmentGenerateRunner(
TaskToolbox toolbox,
Map<Interval, PartitionBoundaries> intervalToPartitions,
ParallelIndexIngestionSpec ingestionSchema
)
{
return new PartialRangeSegmentGenerateParallelIndexTaskRunner(
toolbox,
getId(),
getGroupId(),
baseSubtaskSpecName,
ingestionSchema,
getContext(),
intervalToPartitions
);
}
@VisibleForTesting
PartialGenericSegmentMergeParallelIndexTaskRunner createPartialGenericSegmentMergeRunner(
TaskToolbox toolbox,
List<PartialSegmentMergeIOConfig> ioConfigs,
ParallelIndexIngestionSpec ingestionSchema
)
{
return new PartialGenericSegmentMergeParallelIndexTaskRunner(
toolbox,
getId(),
getGroupId(),
baseSubtaskSpecName,
ingestionSchema.getDataSchema(),
ioConfigs,
ingestionSchema.getTuningConfig(),
getContext()
);
}
@Override
public boolean isReady(TaskActionClient taskActionClient) throws Exception
{
return determineLockGranularityAndTryLock(
taskActionClient,
ingestionSchema.getDataSchema().getGranularitySpec().inputIntervals()
);
}
@Override
public List<DataSegment> findSegmentsToLock(TaskActionClient taskActionClient, List<Interval> intervals)
throws IOException
{
return findInputSegments(
getDataSource(),
taskActionClient,
intervals
);
}
@Override
public boolean requireLockExistingSegments()
{
return getIngestionMode() != IngestionMode.APPEND;
}
@Override
public boolean isPerfectRollup()
{
return isGuaranteedRollup(
getIngestionMode(),
getIngestionSchema().getTuningConfig()
);
}
@Override
public String getTaskAllocatorId()
{
return getGroupId();
}
@Nullable
@Override
public Granularity getSegmentGranularity()
{
final GranularitySpec granularitySpec = ingestionSchema.getDataSchema().getGranularitySpec();
if (granularitySpec instanceof ArbitraryGranularitySpec) {
return null;
} else {
return granularitySpec.getSegmentGranularity();
}
}
@Override
public TaskStatus runTask(TaskToolbox toolbox) throws Exception
{
if (ingestionSchema.getTuningConfig().getMaxSavedParseExceptions()
!= TuningConfig.DEFAULT_MAX_SAVED_PARSE_EXCEPTIONS) {
LOG.warn("maxSavedParseExceptions is not supported yet");
}
if (ingestionSchema.getTuningConfig().getMaxParseExceptions() != TuningConfig.DEFAULT_MAX_PARSE_EXCEPTIONS) {
LOG.warn("maxParseExceptions is not supported yet");
}
if (ingestionSchema.getTuningConfig().isLogParseExceptions() != TuningConfig.DEFAULT_LOG_PARSE_EXCEPTIONS) {
LOG.warn("logParseExceptions is not supported yet");
}
if (missingIntervalsInOverwriteMode) {
LOG.warn(
"Intervals are missing in granularitySpec while this task is potentially overwriting existing segments. "
+ "Forced to use timeChunk lock."
);
}
LOG.debug(
"Found chat handler of class[%s]",
Preconditions.checkNotNull(toolbox.getChatHandlerProvider(), "chatHandlerProvider").getClass().getName()
);
authorizerMapper = toolbox.getAuthorizerMapper();
toolbox.getChatHandlerProvider().register(getId(), this, false);
// the lineage-based segment allocation protocol must be used as the legacy protocol has a critical bug
// (see SinglePhaseParallelIndexTaskRunner.allocateNewSegment()). However, we tell subtasks to use
// the legacy protocol by default if it's not explicitly set in the taskContext here. This is to guarantee that
// every subtask uses the same protocol so that they can succeed during the replacing rolling upgrade.
// Once the Overlord is upgraded, it will set this context explicitly and new tasks will use the new protocol.
// See DefaultTaskConfig and TaskQueue.add().
addToContextIfAbsent(
SinglePhaseParallelIndexTaskRunner.CTX_USE_LINEAGE_BASED_SEGMENT_ALLOCATION_KEY,
SinglePhaseParallelIndexTaskRunner.LEGACY_DEFAULT_USE_LINEAGE_BASED_SEGMENT_ALLOCATION
);
try {
initializeSubTaskCleaner();
this.toolbox = toolbox;
if (isParallelMode()) {
// emit metric for parallel batch ingestion mode:
emitMetric(toolbox.getEmitter(), "ingest/count", 1);
if (isGuaranteedRollup(
getIngestionMode(),
ingestionSchema.getTuningConfig()
)) {
return runMultiPhaseParallel(toolbox);
} else {
return runSinglePhaseParallel(toolbox);
}
} else {
if (!baseInputSource.isSplittable()) {
LOG.warn(
"firehoseFactory[%s] is not splittable. Running sequentially.",
baseInputSource.getClass().getSimpleName()
);
} else if (ingestionSchema.getTuningConfig().getMaxNumConcurrentSubTasks() <= 1) {
LOG.warn(
"maxNumConcurrentSubTasks[%s] is less than or equal to 1. Running sequentially. "
+ "Please set maxNumConcurrentSubTasks to something higher than 1 if you want to run in parallel "
+ "ingestion mode.",
ingestionSchema.getTuningConfig().getMaxNumConcurrentSubTasks()
);
} else {
throw new ISE("Unknown reason for sequentail mode. Failing this task.");
}
return runSequential(toolbox);
}
}
finally {
ingestionState = IngestionState.COMPLETED;
toolbox.getChatHandlerProvider().unregister(getId());
}
}
private void initializeSubTaskCleaner()
{
if (isParallelMode()) {
currentSubTaskHolder = new CurrentSubTaskHolder((currentRunnerObject, taskConfig) -> {
final ParallelIndexTaskRunner runner = (ParallelIndexTaskRunner) currentRunnerObject;
runner.stopGracefully(null);
});
} else {
currentSubTaskHolder = new CurrentSubTaskHolder((taskObject, taskConfig) -> {
final IndexTask task = (IndexTask) taskObject;
task.stopGracefully(taskConfig);
});
}
registerResourceCloserOnAbnormalExit(currentSubTaskHolder);
}
/**
* Returns true if this task can run in the parallel mode with the given inputSource and tuningConfig.
* This method must be kept in sync with CompactSegments.isParallelMode(ClientCompactionTaskQueryTuningConfig).
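* <p>
* For example (illustrative): a splittable input source with {@code maxNumConcurrentSubTasks >= 2}
* runs in parallel mode, while a non-splittable input source always runs sequentially. Range
* partitioning ({@link DimensionRangePartitionsSpec}) only requires one concurrent subtask,
* since it is not implemented for the sequential path.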
*/
public static boolean isParallelMode(InputSource inputSource, @Nullable ParallelIndexTuningConfig tuningConfig)
{
if (null == tuningConfig) {
return false;
}
boolean useRangePartitions = useRangePartitions(tuningConfig);
// Range partitioning is not implemented for runSequential() (but hash partitioning is)
int minRequiredNumConcurrentSubTasks = useRangePartitions ? 1 : 2;
return inputSource.isSplittable() && tuningConfig.getMaxNumConcurrentSubTasks() >= minRequiredNumConcurrentSubTasks;
}
private static boolean useRangePartitions(ParallelIndexTuningConfig tuningConfig)
{
return tuningConfig.getGivenOrDefaultPartitionsSpec() instanceof DimensionRangePartitionsSpec;
}
private boolean isParallelMode()
{
return isParallelMode(baseInputSource, ingestionSchema.getTuningConfig());
}
/**
* Attempt to wait for indexed segments to become available on the cluster.
* @param reportsMap Map of subtask reports containing the published segments that we are going to wait for.
*/
private void waitForSegmentAvailability(Map<String, PushedSegmentsReport> reportsMap)
{
ArrayList<DataSegment> segmentsToWaitFor = new ArrayList<>();
reportsMap.values()
.forEach(report -> {
segmentsToWaitFor.addAll(report.getNewSegments());
});
waitForSegmentAvailability(
toolbox,
segmentsToWaitFor,
awaitSegmentAvailabilityTimeoutMillis
);
}
/**
* Run the single phase parallel indexing for best-effort rollup. In this mode, each sub task created by
* the supervisor task reads data and generates segments individually.
*/
private TaskStatus runSinglePhaseParallel(TaskToolbox toolbox) throws Exception
{
ingestionState = IngestionState.BUILD_SEGMENTS;
ParallelIndexTaskRunner<SinglePhaseSubTask, PushedSegmentsReport> parallelSinglePhaseRunner = createRunner(
toolbox,
this::createSinglePhaseTaskRunner
);
final TaskState state = runNextPhase(parallelSinglePhaseRunner);
TaskStatus taskStatus;
if (state.isSuccess()) {
//noinspection ConstantConditions
publishSegments(toolbox, parallelSinglePhaseRunner.getReports());
if (isCompactionTask) {
// Populate segmentsRead only for compaction tasks
segmentsRead = parallelSinglePhaseRunner.getReports()
.values()
.stream()
.mapToLong(report -> report.getOldSegments().size()).sum();
}
if (awaitSegmentAvailabilityTimeoutMillis > 0) {
waitForSegmentAvailability(parallelSinglePhaseRunner.getReports());
}
taskStatus = TaskStatus.success(getId());
} else {
// there is only success or failure after running....
Preconditions.checkState(state.isFailure(), "Unrecognized state after task is complete[%s]", state);
final String errorMessage;
if (parallelSinglePhaseRunner.getStopReason() != null) {
errorMessage = parallelSinglePhaseRunner.getStopReason();
} else {
errorMessage = StringUtils.format(
TASK_PHASE_FAILURE_MSG,
parallelSinglePhaseRunner.getName()
);
}
taskStatus = TaskStatus.failure(getId(), errorMessage);
}
updateAndWriteCompletionReports(taskStatus);
return taskStatus;
}
/**
* Run the multi phase parallel indexing for perfect rollup. In this mode, the parallel indexing is currently
* executed in two phases.
* <p>
* - In the first phase, each task partitions input data and stores those partitions in local storage.
* - The partition is created based on the segment granularity (primary partition key) and the partition dimension
* values in {@link PartitionsSpec} (secondary partition key).
* - Partitioned data is maintained by {@link IntermediaryDataManager}.
* - In the second phase, each task reads partitioned data from the intermediary data server (middleManager
* or indexer) and merges them to create the final segments.
*/
private TaskStatus runMultiPhaseParallel(TaskToolbox toolbox) throws Exception
{
return useRangePartitions(ingestionSchema.getTuningConfig())
? runRangePartitionMultiPhaseParallel(toolbox)
: runHashPartitionMultiPhaseParallel(toolbox);
}
private static ParallelIndexIngestionSpec rewriteIngestionSpecWithIntervalsIfMissing(
ParallelIndexIngestionSpec ingestionSchema,
Collection<Interval> intervals
)
{
if (ingestionSchema.getDataSchema().getGranularitySpec().inputIntervals().isEmpty()) {
return ingestionSchema
.withDataSchema(
ingestionSchema.getDataSchema().withGranularitySpec(
ingestionSchema
.getDataSchema()
.getGranularitySpec()
.withIntervals(new ArrayList<>(intervals))
)
);
} else {
return ingestionSchema;
}
}
@VisibleForTesting
TaskStatus runHashPartitionMultiPhaseParallel(TaskToolbox toolbox) throws Exception
{
TaskState state;
ParallelIndexIngestionSpec ingestionSchemaToUse = ingestionSchema;
if (!(ingestionSchema.getTuningConfig().getPartitionsSpec() instanceof HashedPartitionsSpec)) {
// only range and hash partitioning are supported for multiphase parallel ingestion, see runMultiPhaseParallel()
throw new ISE(
"forceGuaranteedRollup is set but partitionsSpec [%s] is not a single_dim or hash partition spec.",
ingestionSchema.getTuningConfig().getPartitionsSpec()
);
}
final Map<Interval, Integer> intervalToNumShards;
HashedPartitionsSpec partitionsSpec = (HashedPartitionsSpec) ingestionSchema.getTuningConfig().getPartitionsSpec();
final boolean needsInputSampling =
partitionsSpec.getNumShards() == null
|| ingestionSchemaToUse.getDataSchema().getGranularitySpec().inputIntervals().isEmpty();
if (needsInputSampling) {
// 0. need to determine intervals and numShards by scanning the data
LOG.info("Needs to determine intervals or numShards, beginning %s phase.", PartialDimensionCardinalityTask.TYPE);
ParallelIndexTaskRunner<PartialDimensionCardinalityTask, DimensionCardinalityReport> cardinalityRunner =
createRunner(
toolbox,
this::createPartialDimensionCardinalityRunner
);
state = runNextPhase(cardinalityRunner);
if (state.isFailure()) {
String errMsg = StringUtils.format(
TASK_PHASE_FAILURE_MSG,
cardinalityRunner.getName()
);
return TaskStatus.failure(getId(), errMsg);
}
if (cardinalityRunner.getReports().isEmpty()) {
String msg = "No valid rows for hash partitioning."
+ " All rows may have invalid timestamps or have been filtered out.";
LOG.warn(msg);
return TaskStatus.success(getId(), msg);
}
if (partitionsSpec.getNumShards() == null) {
int effectiveMaxRowsPerSegment = partitionsSpec.getMaxRowsPerSegment() == null
? PartitionsSpec.DEFAULT_MAX_ROWS_PER_SEGMENT
: partitionsSpec.getMaxRowsPerSegment();
LOG.info("effective maxRowsPerSegment is: " + effectiveMaxRowsPerSegment);
intervalToNumShards = determineNumShardsFromCardinalityReport(
cardinalityRunner.getReports().values(),
effectiveMaxRowsPerSegment
);
// This is for potential debugging in case we suspect bad estimation of cardinalities, etc.
LOG.debug("intervalToNumShards: %s", intervalToNumShards);
} else {
intervalToNumShards = CollectionUtils.mapValues(
mergeCardinalityReports(cardinalityRunner.getReports().values()),
k -> partitionsSpec.getNumShards()
);
}
ingestionSchemaToUse = rewriteIngestionSpecWithIntervalsIfMissing(
ingestionSchemaToUse,
intervalToNumShards.keySet()
);
} else {
// numShards will be determined in PartialHashSegmentGenerateTask
intervalToNumShards = null;
}
// 1. Partial segment generation phase
final ParallelIndexIngestionSpec segmentCreateIngestionSpec = ingestionSchemaToUse;
ParallelIndexTaskRunner<PartialHashSegmentGenerateTask, GeneratedPartitionsReport> indexingRunner =
createRunner(
toolbox,
f -> createPartialHashSegmentGenerateRunner(toolbox, segmentCreateIngestionSpec, intervalToNumShards)
);
state = runNextPhase(indexingRunner);
if (state.isFailure()) {
String errMsg = StringUtils.format(
TASK_PHASE_FAILURE_MSG,
indexingRunner.getName()
);
return TaskStatus.failure(getId(), errMsg);
}
indexGenerateRowStats = doGetRowStatsAndUnparseableEventsParallelMultiPhase(indexingRunner, true);
// 2. Partial segment merge phase
// partition (interval, partitionId) -> partition locations
Map<Partition, List<PartitionLocation>> partitionToLocations =
getPartitionToLocations(indexingRunner.getReports());
final List<PartialSegmentMergeIOConfig> ioConfigs = createGenericMergeIOConfigs(
ingestionSchema.getTuningConfig().getTotalNumMergeTasks(),
partitionToLocations
);
final ParallelIndexIngestionSpec segmentMergeIngestionSpec = ingestionSchemaToUse;
final ParallelIndexTaskRunner<PartialGenericSegmentMergeTask, PushedSegmentsReport> mergeRunner = createRunner(
toolbox,
tb -> createPartialGenericSegmentMergeRunner(tb, ioConfigs, segmentMergeIngestionSpec)
);
state = runNextPhase(mergeRunner);
TaskStatus taskStatus;
if (state.isSuccess()) {
//noinspection ConstantConditions
publishSegments(toolbox, mergeRunner.getReports());
if (awaitSegmentAvailabilityTimeoutMillis > 0) {
waitForSegmentAvailability(mergeRunner.getReports());
}
taskStatus = TaskStatus.success(getId());
} else {
// there is only success or failure after running....
Preconditions.checkState(state.isFailure(), "Unrecognized state after task is complete[%s]", state);
String errMsg = StringUtils.format(
TASK_PHASE_FAILURE_MSG,
mergeRunner.getName()
);
taskStatus = TaskStatus.failure(getId(), errMsg);
}
updateAndWriteCompletionReports(taskStatus);
return taskStatus;
}
@VisibleForTesting
TaskStatus runRangePartitionMultiPhaseParallel(TaskToolbox toolbox) throws Exception
{
ParallelIndexIngestionSpec ingestionSchemaToUse = ingestionSchema;
PartialDimensionDistributionParallelIndexTaskRunner distributionRunner =
(PartialDimensionDistributionParallelIndexTaskRunner)
createRunner(
toolbox,
this::createPartialDimensionDistributionRunner
);
TaskState distributionState = runNextPhase(distributionRunner);
if (distributionState.isFailure()) {
String errMsg = StringUtils.format(TASK_PHASE_FAILURE_MSG, distributionRunner.getName());
return TaskStatus.failure(getId(), errMsg);
}
// Get the partition boundaries for each interval
final Map<Interval, PartitionBoundaries> intervalToPartitions;
try {
intervalToPartitions = distributionRunner.getIntervalToPartitionBoundaries(
(DimensionRangePartitionsSpec) ingestionSchema.getTuningConfig().getGivenOrDefaultPartitionsSpec()
);
if (intervalToPartitions.isEmpty()) {
String msg = "No valid rows for range partitioning."
+ " All rows may have invalid timestamps or multiple dimension values.";
LOG.warn(msg);
return TaskStatus.success(getId(), msg);
}
}
catch (Exception e) {
String errorMsg = "Error creating partition boundaries.";
if (distributionRunner.getStopReason() != null) {
errorMsg += " " + distributionRunner.getStopReason();
}
LOG.error(e, errorMsg);
return TaskStatus.failure(getId(), errorMsg);
}
ingestionSchemaToUse = rewriteIngestionSpecWithIntervalsIfMissing(
ingestionSchemaToUse,
intervalToPartitions.keySet()
);
final ParallelIndexIngestionSpec segmentCreateIngestionSpec = ingestionSchemaToUse;
ParallelIndexTaskRunner<PartialRangeSegmentGenerateTask, GeneratedPartitionsReport> indexingRunner =
createRunner(
toolbox,
tb -> createPartialRangeSegmentGenerateRunner(tb, intervalToPartitions, segmentCreateIngestionSpec)
);
TaskState indexingState = runNextPhase(indexingRunner);
if (indexingState.isFailure()) {
String errMsg = StringUtils.format(
TASK_PHASE_FAILURE_MSG,
indexingRunner.getName()
);
return TaskStatus.failure(getId(), errMsg);
}
indexGenerateRowStats = doGetRowStatsAndUnparseableEventsParallelMultiPhase(indexingRunner, true);
// partition (interval, partitionId) -> partition locations
Map<Partition, List<PartitionLocation>> partitionToLocations =
getPartitionToLocations(indexingRunner.getReports());
final List<PartialSegmentMergeIOConfig> ioConfigs = createGenericMergeIOConfigs(
ingestionSchema.getTuningConfig().getTotalNumMergeTasks(),
partitionToLocations
);
final ParallelIndexIngestionSpec segmentMergeIngestionSpec = ingestionSchemaToUse;
ParallelIndexTaskRunner<PartialGenericSegmentMergeTask, PushedSegmentsReport> mergeRunner = createRunner(
toolbox,
tb -> createPartialGenericSegmentMergeRunner(tb, ioConfigs, segmentMergeIngestionSpec)
);
TaskState mergeState = runNextPhase(mergeRunner);
TaskStatus taskStatus;
if (mergeState.isSuccess()) {
publishSegments(toolbox, mergeRunner.getReports());
if (awaitSegmentAvailabilityTimeoutMillis > 0) {
waitForSegmentAvailability(mergeRunner.getReports());
}
taskStatus = TaskStatus.success(getId());
} else {
// there is only success or failure after running....
Preconditions.checkState(mergeState.isFailure(), "Unrecognized state after task is complete[%s]", mergeState);
String errMsg = StringUtils.format(
TASK_PHASE_FAILURE_MSG,
mergeRunner.getName()
);
taskStatus = TaskStatus.failure(getId(), errMsg);
}
updateAndWriteCompletionReports(taskStatus);
return taskStatus;
}
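/**
* Merges the per-subtask cardinality reports into a single HLL {@link Union} per interval.
* Each report carries serialized {@link HllSketch} bytes per interval; sketches belonging to
* the same interval are wrapped and unioned, so that {@link #computeIntervalToNumShards} can
* derive a shard count from the combined cardinality estimate.
*/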
private static Map<Interval, Union> mergeCardinalityReports(Collection<DimensionCardinalityReport> reports)
{
Map<Interval, Union> finalCollectors = new HashMap<>();
reports.forEach(report -> {
Map<Interval, byte[]> intervalToCardinality = report.getIntervalToCardinalities();
for (Map.Entry<Interval, byte[]> entry : intervalToCardinality.entrySet()) {
HllSketch entryHll = HllSketch.wrap(Memory.wrap(entry.getValue()));
finalCollectors.computeIfAbsent(
entry.getKey(),
k -> new Union(DimensionCardinalityReport.HLL_SKETCH_LOG_K)
).update(entryHll);
}
});
return finalCollectors;
}
@VisibleForTesting
public static Map<Interval, Integer> determineNumShardsFromCardinalityReport(
Collection<DimensionCardinalityReport> reports,
int maxRowsPerSegment
)
{
// aggregate all the sub-reports
Map<Interval, Union> finalCollectors = mergeCardinalityReports(reports);
return computeIntervalToNumShards(maxRowsPerSegment, finalCollectors);
}
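/**
* Computes the number of shards for each interval as
* {@code max(round(estimatedCardinality / maxRowsPerSegment), 1)}. For example, an estimated
* cardinality of 2,600,000 with {@code maxRowsPerSegment = 1,000,000} yields
* {@code round(2.6) = 3} shards. Non-positive estimates fall back to
* {@link #DEFAULT_NUM_SHARDS_WHEN_ESTIMATE_GOES_NEGATIVE}.
*/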
@Nonnull
@VisibleForTesting
static Map<Interval, Integer> computeIntervalToNumShards(
int maxRowsPerSegment,
Map<Interval, Union> finalCollectors
)
{
return CollectionUtils.mapValues(
finalCollectors,
union -> {
final double estimatedCardinality = union.getEstimate();
final long estimatedNumShards;
if (estimatedCardinality <= 0) {
estimatedNumShards = DEFAULT_NUM_SHARDS_WHEN_ESTIMATE_GOES_NEGATIVE;
LOG.warn("Estimated cardinality for union of estimates is zero or less: %.2f, setting num shards to %d",
estimatedCardinality, estimatedNumShards
);
} else {
// determine numShards based on maxRowsPerSegment and the cardinality
estimatedNumShards = Math.round(estimatedCardinality / maxRowsPerSegment);
}
LOG.info("estimatedNumShards %d given estimated cardinality %.2f and maxRowsPerSegment %d",
estimatedNumShards, estimatedCardinality, maxRowsPerSegment
);
// We have seen this in the wild in situations where more shards should have been created;
// if it happens, log it along with some information and context
if (estimatedNumShards == 1) {
LOG.info("estimatedNumShards is ONE (%d) given estimated cardinality %.2f and maxRowsPerSegment %d",
estimatedNumShards, estimatedCardinality, maxRowsPerSegment
);
}
try {
return Math.max(Math.toIntExact(estimatedNumShards), 1);
}
catch (ArithmeticException ae) {
throw new ISE("Estimated numShards [%s] exceeds integer bounds.", estimatedNumShards);
}
}
);
}
/**
* Creates a map from partition (interval + bucketId) to the corresponding
* PartitionLocations. Note that the bucketId may differ from the final
* partitionId (refer to {@link BuildingShardSpec} for more details).
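* <p>
* For example (illustrative): buckets (day1, bucket 0), (day1, bucket 3) and (day2, bucket 1)
* are assigned partitionIds 0, 1 and 0 respectively, because entries are sorted by
* (interval, bucketId) and the partitionId counter restarts at 0 for each new interval.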
*/
static Map<Partition, List<PartitionLocation>> getPartitionToLocations(
Map<String, GeneratedPartitionsReport> subTaskIdToReport
)
{
// Create a map from partition to list of reports (PartitionStat and subTaskId)
final Map<Partition, List<PartitionReport>> partitionToReports = new TreeMap<>(
// Sort by (interval, bucketId) to maintain order of partitionIds within interval
Comparator
.comparingLong((Partition partition) -> partition.getInterval().getStartMillis())
.thenComparingLong(partition -> partition.getInterval().getEndMillis())
.thenComparingInt(Partition::getBucketId)
);
subTaskIdToReport.forEach(
(subTaskId, report) -> report.getPartitionStats().forEach(
partitionStat -> partitionToReports
.computeIfAbsent(Partition.fromStat(partitionStat), p -> new ArrayList<>())
.add(new PartitionReport(subTaskId, partitionStat))
)
);
final Map<Partition, List<PartitionLocation>> partitionToLocations = new HashMap<>();
Interval prevInterval = null;
final AtomicInteger partitionId = new AtomicInteger(0);
for (Entry<Partition, List<PartitionReport>> entry : partitionToReports.entrySet()) {
final Partition partition = entry.getKey();
// Reset the partitionId if this is a new interval
Interval interval = partition.getInterval();
if (!interval.equals(prevInterval)) {
partitionId.set(0);
prevInterval = interval;
}
// Use any PartitionStat of this partition to create a shard spec
final List<PartitionReport> reportsOfPartition = entry.getValue();
final BuildingShardSpec<?> shardSpec = reportsOfPartition
.get(0).getPartitionStat().getSecondaryPartition()
.convert(partitionId.getAndIncrement());
// Create a PartitionLocation for each PartitionStat
List<PartitionLocation> locationsOfPartition = reportsOfPartition
.stream()
.map(report -> report.getPartitionStat().toPartitionLocation(report.getSubTaskId(), shardSpec))
.collect(Collectors.toList());
partitionToLocations.put(partition, locationsOfPartition);
}
return partitionToLocations;
}
private static List<PartialSegmentMergeIOConfig> createGenericMergeIOConfigs(
int totalNumMergeTasks,
Map<Partition, List<PartitionLocation>> partitionToLocations
)
{
return createMergeIOConfigs(
totalNumMergeTasks,
partitionToLocations,
PartialSegmentMergeIOConfig::new
);
}
@VisibleForTesting
static <M extends PartialSegmentMergeIOConfig, L extends PartitionLocation> List<M> createMergeIOConfigs(
int totalNumMergeTasks,
Map<Partition, List<L>> partitionToLocations,
Function<List<L>, M> createPartialSegmentMergeIOConfig
)
{
final int numMergeTasks = Math.min(totalNumMergeTasks, partitionToLocations.size());
LOG.info(
"Number of merge tasks is set to [%d] based on totalNumMergeTasks[%d] and number of partitions[%d]",
numMergeTasks,
totalNumMergeTasks,
partitionToLocations.size()
);
// Randomly shuffle partitionIds to evenly distribute partitions of potentially different sizes
// This will be improved once we collect partition stats properly.
// See PartitionStat in GeneratedPartitionsReport.
final List<Partition> partitions = new ArrayList<>(partitionToLocations.keySet());
Collections.shuffle(partitions, ThreadLocalRandom.current());
final List<M> assignedPartitionLocations = new ArrayList<>(numMergeTasks);
for (int i = 0; i < numMergeTasks; i++) {
Pair<Integer, Integer> partitionBoundaries = getPartitionBoundaries(i, partitions.size(), numMergeTasks);
final List<L> assignedToSameTask = partitions
.subList(partitionBoundaries.lhs, partitionBoundaries.rhs)
.stream()
.flatMap(partition -> partitionToLocations.get(partition).stream())
.collect(Collectors.toList());
assignedPartitionLocations.add(createPartialSegmentMergeIOConfig.apply(assignedToSameTask));
}
return assignedPartitionLocations;
}
/**
* Partition items into as evenly-sized splits as possible.
*
* @param index index of partition
* @param total number of items to partition
* @param splits number of desired partitions
* @return partition range: [lhs, rhs)
*/
private static Pair<Integer, Integer> getPartitionBoundaries(int index, int total, int splits)
{
int chunk = total / splits;
int remainder = total % splits;
// Distribute the remainder across the first few partitions. For example, total=8 and splits=5
// will give partitions of sizes (starting from index=0): 2, 2, 2, 1, 1
int start = index * chunk + (index < remainder ? index : remainder);
int stop = start + chunk + (index < remainder ? 1 : 0);
return Pair.of(start, stop);
}
private void publishSegments(
TaskToolbox toolbox,
Map<String, PushedSegmentsReport> reportsMap
)
throws IOException
{
final Set<DataSegment> oldSegments = new HashSet<>();
final Set<DataSegment> newSegments = new HashSet<>();
reportsMap
.values()
.forEach(report -> {
oldSegments.addAll(report.getOldSegments());
newSegments.addAll(report.getNewSegments());
});
final boolean storeCompactionState = getContextValue(
Tasks.STORE_COMPACTION_STATE_KEY,
Tasks.DEFAULT_STORE_COMPACTION_STATE
);
final Function<Set<DataSegment>, Set<DataSegment>> annotateFunction = addCompactionStateToSegments(
storeCompactionState,
toolbox,
ingestionSchema
);
Set<DataSegment> tombStones = Collections.emptySet();
if (getIngestionMode() == IngestionMode.REPLACE) {
TombstoneHelper tombstoneHelper = new TombstoneHelper(toolbox.getTaskActionClient());
List<Interval> tombstoneIntervals = tombstoneHelper.computeTombstoneIntervals(newSegments, ingestionSchema.getDataSchema());
if (!tombstoneIntervals.isEmpty()) {
Map<Interval, SegmentIdWithShardSpec> tombstonesAnShards = new HashMap<>();
for (Interval interval : tombstoneIntervals) {
SegmentIdWithShardSpec segmentIdWithShardSpec = allocateNewSegmentForTombstone(
ingestionSchema,
interval.getStart()
);
tombstonesAndShards.put(interval, segmentIdWithShardSpec);
}
tombStones = tombstoneHelper.computeTombstones(ingestionSchema.getDataSchema(), tombstonesAndShards);
// add tombstones
newSegments.addAll(tombStones);
LOG.debugSegments(tombStones, "To publish tombstones");
}
}
final TaskLockType taskLockType = getTaskLockHelper().getLockTypeToUse();
final TransactionalSegmentPublisher publisher =
(segmentsToBeOverwritten, segmentsToPublish, commitMetadata) -> toolbox.getTaskActionClient().submit(
buildPublishAction(segmentsToBeOverwritten, segmentsToPublish, taskLockType)
);
final boolean published =
newSegments.isEmpty()
|| publisher.publishSegments(oldSegments, newSegments, annotateFunction, null).isSuccess();
if (published) {
LOG.info("Published [%d] segments", newSegments.size());
// segment metrics:
emitMetric(toolbox.getEmitter(), "ingest/tombstones/count", tombStones.size());
emitMetric(toolbox.getEmitter(), "ingest/segments/count", newSegments.size());
} else {
throw new ISE("Failed to publish segments");
}
segmentsPublished = (long) newSegments.size();
}
private TaskStatus runSequential(TaskToolbox toolbox) throws Exception
{
IndexTask sequentialIndexTask = new IndexTask(
getId(),
getGroupId(),
getTaskResource(),
getDataSource(),
baseSubtaskSpecName,
new IndexIngestionSpec(
getIngestionSchema().getDataSchema(),
getIngestionSchema().getIOConfig(),
convertToIndexTuningConfig(getIngestionSchema().getTuningConfig())
),
getContext(),
getIngestionSchema().getTuningConfig().getMaxAllowedLockCount(),
// Don't run cleanup in the IndexTask since we are wrapping it in the ParallelIndexSupervisorTask
false
);
if (currentSubTaskHolder.setTask(sequentialIndexTask)
&& sequentialIndexTask.isReady(toolbox.getTaskActionClient())) {
TaskStatus status = sequentialIndexTask.run(toolbox);
completionReports = sequentialIndexTask.getCompletionReports();
writeCompletionReports();
return status;
} else {
String msg = "Task was asked to stop. Finish as failed";
LOG.info(msg);
return TaskStatus.failure(getId(), msg);
}
}
/**
* Generate an IngestionStatsAndErrorsTaskReport for the task.
*/
private TaskReport.ReportMap getTaskCompletionReports(TaskStatus taskStatus)
{
return buildIngestionStatsReport(
IngestionState.COMPLETED,
taskStatus.getErrorMsg(),
segmentsRead,
segmentsPublished
);
}
@Override
protected Map<String, Object> getTaskCompletionRowStats()
{
return doGetRowStatsAndUnparseableEvents(true, false).lhs;
}
@Override
protected Map<String, Object> getTaskCompletionUnparseableEvents()
{
return doGetRowStatsAndUnparseableEvents(true, true).rhs;
}
private void updateAndWriteCompletionReports(TaskStatus status)
{
completionReports = getTaskCompletionReports(status);
writeCompletionReports();
}
private void writeCompletionReports()
{
if (!isCompactionTask) {
toolbox.getTaskReportFileWriter().write(getId(), completionReports);
}
}
private static IndexTuningConfig convertToIndexTuningConfig(ParallelIndexTuningConfig tuningConfig)
{
return new IndexTuningConfig(
null,
null,
tuningConfig.getAppendableIndexSpec(),
tuningConfig.getMaxRowsInMemory(),
tuningConfig.getMaxBytesInMemory(),
tuningConfig.isSkipBytesInMemoryOverheadCheck(),
null,
null,
null,
null,
tuningConfig.getPartitionsSpec(),
tuningConfig.getIndexSpec(),
tuningConfig.getIndexSpecForIntermediatePersists(),
tuningConfig.getMaxPendingPersists(),
tuningConfig.isForceGuaranteedRollup(),
tuningConfig.isReportParseExceptions(),
null,
tuningConfig.getPushTimeout(),
tuningConfig.getSegmentWriteOutMediumFactory(),
tuningConfig.isLogParseExceptions(),
tuningConfig.getMaxParseExceptions(),
tuningConfig.getMaxSavedParseExceptions(),
tuningConfig.getMaxColumnsToMerge(),
tuningConfig.getAwaitSegmentAvailabilityTimeoutMillis(),
tuningConfig.getNumPersistThreads()
);
}
// Internal APIs
/**
* Allocate a new {@link SegmentIdWithShardSpec} for a request from {@link SinglePhaseSubTask}.
* The returned segment identifiers have a different {@code partitionNum} (and thus a different {@link NumberedShardSpec})
* per bucket interval.
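* <p>
* Depending on the {@code useLineageBasedSegmentAllocation} flag in the task context, the
* Smile-encoded request body is deserialized either as a {@link SegmentAllocationRequest}
* (lineage-based protocol) or as a plain {@link DateTime} timestamp (legacy protocol).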
*/
@POST
@Path("/segment/allocate")
@Produces(SmileMediaTypes.APPLICATION_JACKSON_SMILE)
@Consumes(SmileMediaTypes.APPLICATION_JACKSON_SMILE)
public Response allocateSegment(
Object param,
@Context final HttpServletRequest req
)
{
ChatHandlers.authorizationCheck(req, Action.READ, getDataSource(), authorizerMapper);
if (toolbox == null) {
return Response.status(Response.Status.SERVICE_UNAVAILABLE).entity("task is not running yet").build();
}
ParallelIndexTaskRunner runner = Preconditions.checkNotNull(getCurrentRunner(), "runner");
if (!(runner instanceof SinglePhaseParallelIndexTaskRunner)) {
throw new ISE(
"Expected [%s], but [%s] is in use",
SinglePhaseParallelIndexTaskRunner.class.getName(),
runner.getClass().getName()
);
}
// This context is set in the constructor of ParallelIndexSupervisorTask if it's not set by others.
final boolean useLineageBasedSegmentAllocation = Preconditions.checkNotNull(
getContextValue(SinglePhaseParallelIndexTaskRunner.CTX_USE_LINEAGE_BASED_SEGMENT_ALLOCATION_KEY),
"useLineageBasedSegmentAllocation in taskContext"
);
try {
final SegmentIdWithShardSpec segmentIdentifier;
if (useLineageBasedSegmentAllocation) {
SegmentAllocationRequest request = toolbox.getJsonMapper().convertValue(param, SegmentAllocationRequest.class);
segmentIdentifier = ((SinglePhaseParallelIndexTaskRunner) runner)
.allocateNewSegment(
getDataSource(),
request.getTimestamp(),
request.getSequenceName(),
request.getPrevSegmentId()
);
} else {
DateTime timestamp = toolbox.getJsonMapper().convertValue(param, DateTime.class);
segmentIdentifier = ((SinglePhaseParallelIndexTaskRunner) runner)
.allocateNewSegment(
getDataSource(),
timestamp
);
}
return Response.ok(toolbox.getJsonMapper().writeValueAsBytes(segmentIdentifier)).build();
}
catch (MaxAllowedLocksExceededException malee) {
getCurrentRunner().stopGracefully(malee.getMessage());
return Response.status(Response.Status.BAD_REQUEST).entity(malee.getMessage()).build();
}
catch (IOException | IllegalStateException e) {
return Response.serverError().entity(Throwables.getStackTraceAsString(e)).build();
}
catch (IllegalArgumentException e) {
return Response.status(Response.Status.BAD_REQUEST).entity(Throwables.getStackTraceAsString(e)).build();
}
}
static InputFormat getInputFormat(ParallelIndexIngestionSpec ingestionSchema)
{
return ingestionSchema.getIOConfig().getNonNullInputFormat();
}
/**
* Worker tasks spawned by the supervisor call this API to report the segments they generated and pushed.
*
* @see ParallelIndexSupervisorTaskClient#report(SubTaskReport)
*/
@POST
@Path("/report")
@Consumes(SmileMediaTypes.APPLICATION_JACKSON_SMILE)
public Response report(
SubTaskReport report,
@Context final HttpServletRequest req
)
{
ChatHandlers.authorizationCheck(
req,
Action.WRITE,
getDataSource(),
authorizerMapper
);
if (currentSubTaskHolder == null || currentSubTaskHolder.getTask() == null) {
return Response.status(Response.Status.SERVICE_UNAVAILABLE).entity("task is not running yet").build();
} else {
final ParallelIndexTaskRunner runner = currentSubTaskHolder.getTask();
//noinspection unchecked
runner.collectReport(report);
return Response.ok().build();
}
}
// External APIs to get running status
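// These status endpoints are served by this task's chat handler. On a typical deployment they
// are reachable on the peon (or indexer) running the supervisor task, e.g. (illustrative host,
// port and task ID):
//   GET http://PEON_HOST:8100/druid/worker/v1/chat/SUPERVISOR_TASK_ID/mode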
@GET
@Path("/mode")
@Produces(MediaType.APPLICATION_JSON)
public Response getMode(@Context final HttpServletRequest req)
{
IndexTaskUtils.datasourceAuthorizationCheck(req, Action.READ, getDataSource(), authorizerMapper);
return Response.ok(isParallelMode() ? "parallel" : "sequential").build();
}
@GET
@Path("/phase")
@Produces(MediaType.APPLICATION_JSON)
public Response getPhaseName(@Context final HttpServletRequest req)
{
IndexTaskUtils.datasourceAuthorizationCheck(req, Action.READ, getDataSource(), authorizerMapper);
if (isParallelMode()) {
final ParallelIndexTaskRunner runner = getCurrentRunner();
if (runner == null) {
return Response.status(Response.Status.SERVICE_UNAVAILABLE).entity("task is not running").build();
} else {
return Response.ok(runner.getName()).build();
}
} else {
return Response.status(Status.BAD_REQUEST).entity("task is running in the sequential mode").build();
}
}
@GET
@Path("/progress")
@Produces(MediaType.APPLICATION_JSON)
public Response getProgress(@Context final HttpServletRequest req)
{
IndexTaskUtils.datasourceAuthorizationCheck(req, Action.READ, getDataSource(), authorizerMapper);
final ParallelIndexTaskRunner currentRunner = getCurrentRunner();
if (currentRunner == null) {
return Response.status(Response.Status.SERVICE_UNAVAILABLE).entity("task is not running yet").build();
} else {
return Response.ok(currentRunner.getProgress()).build();
}
}
@GET
@Path("/subtasks/running")
@Produces(MediaType.APPLICATION_JSON)
public Response getRunningTasks(@Context final HttpServletRequest req)
{
IndexTaskUtils.datasourceAuthorizationCheck(req, Action.READ, getDataSource(), authorizerMapper);
final ParallelIndexTaskRunner currentRunner = getCurrentRunner();
if (currentRunner == null) {
return Response.status(Response.Status.SERVICE_UNAVAILABLE).entity("task is not running yet").build();
} else {
return Response.ok(currentRunner.getRunningTaskIds()).build();
}
}
@GET
@Path("/subtaskspecs")
@Produces(MediaType.APPLICATION_JSON)
public Response getSubTaskSpecs(@Context final HttpServletRequest req)
{
IndexTaskUtils.datasourceAuthorizationCheck(req, Action.READ, getDataSource(), authorizerMapper);
final ParallelIndexTaskRunner currentRunner = getCurrentRunner();
if (currentRunner == null) {
return Response.status(Response.Status.SERVICE_UNAVAILABLE).entity("task is not running yet").build();
} else {
return Response.ok(currentRunner.getSubTaskSpecs()).build();
}
}
@GET
@Path("/subtaskspecs/running")
@Produces(MediaType.APPLICATION_JSON)
public Response getRunningSubTaskSpecs(@Context final HttpServletRequest req)
{
IndexTaskUtils.datasourceAuthorizationCheck(req, Action.READ, getDataSource(), authorizerMapper);
final ParallelIndexTaskRunner currentRunner = getCurrentRunner();
if (currentRunner == null) {
return Response.status(Response.Status.SERVICE_UNAVAILABLE).entity("task is not running yet").build();
} else {
return Response.ok(currentRunner.getRunningSubTaskSpecs()).build();
}
}
@GET
@Path("/subtaskspecs/complete")
@Produces(MediaType.APPLICATION_JSON)
public Response getCompleteSubTaskSpecs(@Context final HttpServletRequest req)
{
IndexTaskUtils.datasourceAuthorizationCheck(req, Action.READ, getDataSource(), authorizerMapper);
final ParallelIndexTaskRunner currentRunner = getCurrentRunner();
if (currentRunner == null) {
return Response.status(Response.Status.SERVICE_UNAVAILABLE).entity("task is not running yet").build();
} else {
return Response.ok(currentRunner.getCompleteSubTaskSpecs()).build();
}
}
@GET
@Path("/subtaskspec/{id}")
@Produces(MediaType.APPLICATION_JSON)
public Response getSubTaskSpec(@PathParam("id") String id, @Context final HttpServletRequest req)
{
IndexTaskUtils.datasourceAuthorizationCheck(req, Action.READ, getDataSource(), authorizerMapper);
final ParallelIndexTaskRunner currentRunner = getCurrentRunner();
if (currentRunner == null) {
return Response.status(Response.Status.SERVICE_UNAVAILABLE).entity("task is not running yet").build();
} else {
final SubTaskSpec subTaskSpec = currentRunner.getSubTaskSpec(id);
if (subTaskSpec == null) {
return Response.status(Response.Status.NOT_FOUND).build();
} else {
return Response.ok(subTaskSpec).build();
}
}
}
@GET
@Path("/subtaskspec/{id}/state")
@Produces(MediaType.APPLICATION_JSON)
public Response getSubTaskState(@PathParam("id") String id, @Context final HttpServletRequest req)
{
IndexTaskUtils.datasourceAuthorizationCheck(req, Action.READ, getDataSource(), authorizerMapper);
final ParallelIndexTaskRunner currentRunner = getCurrentRunner();
if (currentRunner == null) {
return Response.status(Response.Status.SERVICE_UNAVAILABLE).entity("task is not running yet").build();
} else {
final SubTaskSpecStatus subTaskSpecStatus = currentRunner.getSubTaskState(id);
if (subTaskSpecStatus == null) {
return Response.status(Response.Status.NOT_FOUND).build();
} else {
return Response.ok(subTaskSpecStatus).build();
}
}
}
@GET
@Path("/subtaskspec/{id}/history")
@Produces(MediaType.APPLICATION_JSON)
public Response getCompleteSubTaskSpecAttemptHistory(
@PathParam("id") String id,
@Context final HttpServletRequest req
)
{
IndexTaskUtils.datasourceAuthorizationCheck(req, Action.READ, getDataSource(), authorizerMapper);
final ParallelIndexTaskRunner currentRunner = getCurrentRunner();
if (currentRunner == null) {
return Response.status(Response.Status.SERVICE_UNAVAILABLE).entity("task is not running yet").build();
} else {
final TaskHistory taskHistory = currentRunner.getCompleteSubTaskSpecAttemptHistory(id);
if (taskHistory == null) {
return Response.status(Status.NOT_FOUND).build();
} else {
return Response.ok(taskHistory.getAttemptHistory()).build();
}
}
}
private RowIngestionMetersTotals getTotalsFromBuildSegmentsRowStats(Object buildSegmentsRowStats)
{
if (buildSegmentsRowStats instanceof RowIngestionMetersTotals) {
// This case is for unit tests. Normally, when deserialized, the row stats will appear as a Map<String, Object>.
return (RowIngestionMetersTotals) buildSegmentsRowStats;
} else if (buildSegmentsRowStats instanceof Map) {
Map<String, Object> buildSegmentsRowStatsMap = (Map<String, Object>) buildSegmentsRowStats;
return new RowIngestionMetersTotals(
((Number) buildSegmentsRowStatsMap.get("processed")).longValue(),
((Number) buildSegmentsRowStatsMap.get("processedBytes")).longValue(),
((Number) buildSegmentsRowStatsMap.get("processedWithError")).longValue(),
((Number) buildSegmentsRowStatsMap.get("thrownAway")).longValue(),
((Number) buildSegmentsRowStatsMap.get("unparseable")).longValue()
);
} else {
// should never happen
throw new RuntimeException("Unrecognized buildSegmentsRowStats type: " + buildSegmentsRowStats.getClass());
}
}
private Pair<Map<String, Object>, Map<String, Object>> doGetRowStatsAndUnparseableEventsParallelSinglePhase(
SinglePhaseParallelIndexTaskRunner parallelSinglePhaseRunner,
boolean includeUnparseable
)
{
final SimpleRowIngestionMeters buildSegmentsRowStats = new SimpleRowIngestionMeters();
List<ParseExceptionReport> unparseableEvents = new ArrayList<>();
// Get stats from completed tasks
Map<String, PushedSegmentsReport> completedSubtaskReports = parallelSinglePhaseRunner.getReports();
for (PushedSegmentsReport pushedSegmentsReport : completedSubtaskReports.values()) {
TaskReport.ReportMap taskReport = pushedSegmentsReport.getTaskReport();
if (taskReport == null || taskReport.isEmpty()) {
LOG.warn("Received an empty report from subtask[%s]" + pushedSegmentsReport.getTaskId());
continue;
}
RowIngestionMetersTotals rowIngestionMetersTotals = getBuildSegmentsStatsFromTaskReport(
taskReport,
includeUnparseable ? unparseableEvents : null
);
buildSegmentsRowStats.addRowIngestionMetersTotals(rowIngestionMetersTotals);
}
RowIngestionMetersTotals rowStatsForRunningTasks = getRowStatsAndUnparseableEventsForRunningTasks(
parallelSinglePhaseRunner.getRunningTaskIds(),
unparseableEvents,
includeUnparseable
);
buildSegmentsRowStats.addRowIngestionMetersTotals(rowStatsForRunningTasks);
return createStatsAndErrorsReport(buildSegmentsRowStats.getTotals(), unparseableEvents);
}
private Pair<Map<String, Object>, Map<String, Object>> doGetRowStatsAndUnparseableEventsParallelMultiPhase(
ParallelIndexTaskRunner<?, ?> currentRunner,
boolean includeUnparseable
)
{
if (indexGenerateRowStats != null) {
return Pair.of(indexGenerateRowStats.lhs, includeUnparseable ? indexGenerateRowStats.rhs : ImmutableMap.of());
} else if (!currentRunner.getName().equals("partial segment generation")) {
return Pair.of(ImmutableMap.of(), ImmutableMap.of());
} else {
Map<String, GeneratedPartitionsReport> completedSubtaskReports =
(Map<String, GeneratedPartitionsReport>) currentRunner.getReports();
final SimpleRowIngestionMeters buildSegmentsRowStats = new SimpleRowIngestionMeters();
final List<ParseExceptionReport> unparseableEvents = new ArrayList<>();
long totalSegmentsRead = 0L;
for (GeneratedPartitionsReport generatedPartitionsReport : completedSubtaskReports.values()) {
TaskReport.ReportMap taskReport = generatedPartitionsReport.getTaskReport();
if (taskReport == null || taskReport.isEmpty()) {
LOG.warn("Received an empty report from subtask[%s]", generatedPartitionsReport.getTaskId());
continue;
}
RowIngestionMetersTotals rowStatsForCompletedTask =
getBuildSegmentsStatsFromTaskReport(taskReport, unparseableEvents);
buildSegmentsRowStats.addRowIngestionMetersTotals(rowStatsForCompletedTask);
final IngestionStatsAndErrors completedSubtaskPayload = ((IngestionStatsAndErrorsTaskReport)
taskReport.get(IngestionStatsAndErrorsTaskReport.REPORT_KEY)).getPayload();
final Long segmentsReadFromPartition = completedSubtaskPayload.getSegmentsRead();
if (segmentsReadFromPartition != null) {
totalSegmentsRead += segmentsReadFromPartition;
}
}
RowIngestionMetersTotals rowStatsForRunningTasks = getRowStatsAndUnparseableEventsForRunningTasks(
currentRunner.getRunningTaskIds(),
unparseableEvents,
includeUnparseable
);
buildSegmentsRowStats.addRowIngestionMetersTotals(rowStatsForRunningTasks);
if (totalSegmentsRead > 0) {
segmentsRead = totalSegmentsRead;
}
return createStatsAndErrorsReport(buildSegmentsRowStats.getTotals(), unparseableEvents);
}
}
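/**
 * Fetches live ingestion stats reports from the Overlord for each running subtask and sums their
 * "buildSegments" totals. Each per-task payload is expected to nest its stats roughly as sketched
 * below (values illustrative):
 *
 * <pre>{@code
 * {
 *   "rowStats": {
 *     "totals": {
 *       "buildSegments": { "processed": 100, ... }
 *     }
 *   },
 *   "unparseableEvents": {
 *     "buildSegments": [ ... ]
 *   }
 * }
 * }</pre>
 *
 * Subtasks that fail to return a report are skipped with a warning rather than failing the call.
 */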
private RowIngestionMetersTotals getRowStatsAndUnparseableEventsForRunningTasks(
Set<String> runningTaskIds,
List<ParseExceptionReport> unparseableEvents,
boolean includeUnparseable
)
{
final SimpleRowIngestionMeters buildSegmentsRowStats = new SimpleRowIngestionMeters();
for (String runningTaskId : runningTaskIds) {
try {
final TaskReport.ReportMap report = getTaskReport(toolbox.getOverlordClient(), runningTaskId);
if (report == null || report.isEmpty()) {
// task does not have a running report yet
continue;
}
final IngestionStatsAndErrorsTaskReport ingestionStatsReport
= (IngestionStatsAndErrorsTaskReport) report.get(IngestionStatsAndErrorsTaskReport.REPORT_KEY);
final IngestionStatsAndErrors payload = ingestionStatsReport.getPayload();
Map<String, Object> rowStats = payload.getRowStats();
Map<String, Object> totals = (Map<String, Object>) rowStats.get("totals");
Map<String, Object> buildSegments = (Map<String, Object>) totals.get(RowIngestionMeters.BUILD_SEGMENTS);
if (includeUnparseable) {
Map<String, Object> taskUnparseableEvents = payload.getUnparseableEvents();
List<ParseExceptionReport> buildSegmentsUnparseableEvents = (List<ParseExceptionReport>)
taskUnparseableEvents.get(RowIngestionMeters.BUILD_SEGMENTS);
unparseableEvents.addAll(buildSegmentsUnparseableEvents);
}
buildSegmentsRowStats.addRowIngestionMetersTotals(getTotalsFromBuildSegmentsRowStats(buildSegments));
}
catch (Exception e) {
LOG.warn(e, "Encountered exception when getting live subtask report for task: " + runningTaskId);
}
}
return buildSegmentsRowStats.getTotals();
}
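/**
 * Packages aggregated totals and unparseable events into the pair of maps returned by the
 * row-stats APIs: the left side carries {@code {"totals": {"buildSegments": <totals>}}} and the
 * right side carries {@code {"buildSegments": [<parse exception reports>]}}.
 */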
private Pair<Map<String, Object>, Map<String, Object>> createStatsAndErrorsReport(
RowIngestionMetersTotals rowStats,
List<ParseExceptionReport> unparseableEvents
)
{
Map<String, Object> rowStatsMap = new HashMap<>();
Map<String, Object> totalsMap = new HashMap<>();
totalsMap.put(RowIngestionMeters.BUILD_SEGMENTS, rowStats);
rowStatsMap.put("totals", totalsMap);
return Pair.of(rowStatsMap, ImmutableMap.of(RowIngestionMeters.BUILD_SEGMENTS, unparseableEvents));
}
private RowIngestionMetersTotals getBuildSegmentsStatsFromTaskReport(
TaskReport.ReportMap taskReport,
List<ParseExceptionReport> unparseableEvents
)
{
IngestionStatsAndErrorsTaskReport ingestionStatsAndErrorsReport = (IngestionStatsAndErrorsTaskReport)
taskReport.get(IngestionStatsAndErrorsTaskReport.REPORT_KEY);
IngestionStatsAndErrors reportData = ingestionStatsAndErrorsReport.getPayload();
RowIngestionMetersTotals totals = getTotalsFromBuildSegmentsRowStats(
reportData.getRowStats().get(RowIngestionMeters.BUILD_SEGMENTS)
);
if (unparseableEvents != null) {
unparseableEvents.addAll(
(List<ParseExceptionReport>)
reportData.getUnparseableEvents().get(RowIngestionMeters.BUILD_SEGMENTS)
);
}
return totals;
}
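/**
 * Routes a row-stats request to the appropriate collector: the multi-phase path for parallel runs
 * with guaranteed rollup, the single-phase path for other parallel runs, and the wrapped
 * sequential {@link IndexTask} otherwise. Returns empty maps if no runner has been set up yet.
 */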
private Pair<Map<String, Object>, Map<String, Object>> doGetRowStatsAndUnparseableEvents(
boolean isFullReport,
boolean includeUnparseable
)
{
if (currentSubTaskHolder == null) {
return Pair.of(ImmutableMap.of(), ImmutableMap.of());
}
Object currentRunner = currentSubTaskHolder.getTask();
if (currentRunner == null) {
return Pair.of(ImmutableMap.of(), ImmutableMap.of());
}
if (isParallelMode()) {
if (isGuaranteedRollup(
getIngestionMode(),
ingestionSchema.getTuningConfig()
)) {
return doGetRowStatsAndUnparseableEventsParallelMultiPhase(
(ParallelIndexTaskRunner<?, ?>) currentRunner,
includeUnparseable
);
} else {
return doGetRowStatsAndUnparseableEventsParallelSinglePhase(
(SinglePhaseParallelIndexTaskRunner) currentRunner,
includeUnparseable
);
}
} else {
IndexTask currentSequentialTask = (IndexTask) currentRunner;
return Pair.of(
currentSequentialTask.doGetRowStats(isFullReport),
currentSequentialTask.doGetUnparseableEvents(isFullReport)
);
}
}
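/**
 * Skips the usual batch-task cleanup when this supervisor task is embedded in a compaction task;
 * in that case the wrapping compaction task is assumed to drive the cleanup itself.
 */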
@Override
public void cleanUp(TaskToolbox toolbox, @Nullable TaskStatus taskStatus) throws Exception
{
if (!isCompactionTask) {
super.cleanUp(toolbox, taskStatus);
}
}
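/**
 * HTTP endpoint exposing live row stats. A hedged usage sketch, again assuming the chat handler
 * base path:
 *
 * <pre>{@code
 * GET /druid/worker/v1/chat/{supervisorTaskId}/rowStats?full
 * }</pre>
 *
 * Any non-null value of the {@code full} query parameter requests the full report.
 */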
@GET
@Path("/rowStats")
@Produces(MediaType.APPLICATION_JSON)
public Response getRowStats(
@Context final HttpServletRequest req,
@QueryParam("full") String full
)
{
IndexTaskUtils.datasourceAuthorizationCheck(req, Action.READ, getDataSource(), authorizerMapper);
return Response.ok(doGetRowStatsAndUnparseableEvents(full != null, false).lhs).build();
}
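/**
 * Builds the live ingestion stats report. In parallel mode the supervisor's own ingestion state
 * is reported; in sequential mode the wrapped {@link IndexTask}'s state is used when available.
 */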
@VisibleForTesting
TaskReport.ReportMap doGetLiveReports(boolean isFullReport)
{
Pair<Map<String, Object>, Map<String, Object>> rowStatsAndUnparsebleEvents =
doGetRowStatsAndUnparseableEvents(isFullReport, true);
// use the sequential task's ingestion state if we were running that mode
IngestionState ingestionStateForReport;
if (isParallelMode()) {
ingestionStateForReport = ingestionState;
} else {
IndexTask currentSequentialTask = currentSubTaskHolder.getTask();
ingestionStateForReport = currentSequentialTask == null
? ingestionState
: currentSequentialTask.getIngestionState();
}
return buildLiveIngestionStatsReport(
ingestionStateForReport,
rowStatsAndUnparseableEvents.rhs,
rowStatsAndUnparseableEvents.lhs
);
}
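/**
 * HTTP endpoint exposing the live ingestion report, including unparseable events. As with
 * {@link #getRowStats}, any non-null {@code full} query parameter requests the full report.
 */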
@GET
@Path("/liveReports")
@Produces(MediaType.APPLICATION_JSON)
public Response getLiveReports(
@Context final HttpServletRequest req,
@QueryParam("full") String full
)
{
IndexTaskUtils.datasourceAuthorizationCheck(req, Action.READ, getDataSource(), authorizerMapper);
return Response.ok(doGetLiveReports(full != null)).build();
}
/**
* Like {@link OverlordClient#taskReportAsMap}, but synchronous, and returns null instead of throwing an error if
* the server returns 404.
*/
@Nullable
@VisibleForTesting
static TaskReport.ReportMap getTaskReport(final OverlordClient overlordClient, final String taskId)
throws InterruptedException, ExecutionException
{
try {
return FutureUtils.get(overlordClient.taskReportAsMap(taskId), true);
}
catch (ExecutionException e) {
if (e.getCause() instanceof HttpResponseException &&
((HttpResponseException) e.getCause()).getResponse().getStatus().equals(HttpResponseStatus.NOT_FOUND)) {
return null;
} else {
throw e;
}
}
}
@VisibleForTesting
public void setToolbox(TaskToolbox toolbox)
{
this.toolbox = toolbox;
}
/**
* Represents a partition uniquely identified by an Interval and a bucketId.
*
* @see org.apache.druid.timeline.partition.BucketNumberedShardSpec
*/
static class Partition
{
final Interval interval;
final int bucketId;
private static Partition fromStat(PartitionStat partitionStat)
{
return new Partition(partitionStat.getInterval(), partitionStat.getBucketId());
}
Partition(Interval interval, int bucketId)
{
this.interval = interval;
this.bucketId = bucketId;
}
public int getBucketId()
{
return bucketId;
}
public Interval getInterval()
{
return interval;
}
@Override
public boolean equals(Object o)
{
if (this == o) {
return true;
}
if (o == null || getClass() != o.getClass()) {
return false;
}
Partition that = (Partition) o;
return getBucketId() == that.getBucketId()
&& Objects.equals(getInterval(), that.getInterval());
}
@Override
public int hashCode()
{
return Objects.hash(getInterval(), getBucketId());
}
@Override
public String toString()
{
return "Partition{" +
"interval=" + interval +
", bucketId=" + bucketId +
'}';
}
}
/**
* Encapsulates a {@link PartitionStat} and the subTaskId that generated it.
*/
private static class PartitionReport
{
private final PartitionStat partitionStat;
private final String subTaskId;
PartitionReport(String subTaskId, PartitionStat partitionStat)
{
this.subTaskId = subTaskId;
this.partitionStat = partitionStat;
}
String getSubTaskId()
{
return subTaskId;
}
PartitionStat getPartitionStat()
{
return partitionStat;
}
}
}