blob: ce6aee98af3548f26bd0ea0b455d6933dfffc819 [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.druid.indexing.common.task.batch.parallel;
import com.google.common.annotations.VisibleForTesting;
import org.apache.commons.lang3.mutable.MutableObject;
import org.apache.druid.data.input.FirehoseFactory;
import org.apache.druid.data.input.InputSource;
import org.apache.druid.data.input.InputSplit;
import org.apache.druid.data.input.impl.SplittableInputSource;
import org.apache.druid.indexing.common.Counters;
import org.apache.druid.indexing.common.LockGranularity;
import org.apache.druid.indexing.common.TaskLockType;
import org.apache.druid.indexing.common.TaskToolbox;
import org.apache.druid.indexing.common.actions.TaskLocks;
import org.apache.druid.indexing.common.task.AbstractBatchIndexTask;
import org.apache.druid.indexing.common.task.Task;
import org.apache.druid.indexing.common.task.batch.parallel.TaskMonitor.SubTaskCompleteEvent;
import org.apache.druid.java.util.common.ISE;
import org.apache.druid.java.util.common.NonnullPair;
import org.apache.druid.segment.metadata.CentralizedDatasourceSchemaConfig;
import org.apache.druid.segment.realtime.appenderator.SegmentIdWithShardSpec;
import org.apache.druid.timeline.SegmentId;
import org.apache.druid.timeline.partition.BuildingNumberedShardSpec;
import org.joda.time.DateTime;
import org.joda.time.Interval;
import javax.annotation.Nullable;
import java.io.IOException;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.AtomicInteger;
/**
* An implementation of {@link ParallelIndexTaskRunner} to support best-effort roll-up. This runner can submit and
* monitor multiple {@link SinglePhaseSubTask}s.
* <p>
* As its name indicates, distributed indexing is done in a single phase, i.e., without shuffling intermediate data. As
* a result, this task can't be used for perfect rollup.
*/
public class SinglePhaseParallelIndexTaskRunner extends ParallelIndexPhaseRunner<SinglePhaseSubTask, PushedSegmentsReport>
{
  /**
   * A flag to determine what protocol to use for segment allocation. The Overlord sets this context explicitly
   * for all tasks to use the lineage-based protocol in 0.22 or later.
   */
  public static final String CTX_USE_LINEAGE_BASED_SEGMENT_ALLOCATION_KEY = "useLineageBasedSegmentAllocation";

  /**
   * A legacy default for {@link #CTX_USE_LINEAGE_BASED_SEGMENT_ALLOCATION_KEY} when the Overlord is running on
   * 0.21 or earlier.
   *
   * The new lineage-based segment allocation protocol must be used as the legacy protocol has a critical bug.
   * However, we tell subtasks to use the legacy protocol by default unless it is explicitly set in the taskContext.
   * This is to guarantee that every subtask uses the same protocol during the replacing rolling upgrade so that
   * batch tasks that are already running can continue. Once the upgrade is done, the Overlord will set this context
   * explicitly for all tasks to use the new protocol.
   *
   * @deprecated only kept for rolling upgrades from 0.21 or earlier; use
   * {@link #DEFAULT_USE_LINEAGE_BASED_SEGMENT_ALLOCATION} instead.
   * @see SinglePhaseParallelIndexTaskRunner#allocateNewSegment(String, DateTime)
   * @see org.apache.druid.indexing.overlord.TaskQueue#add(Task)
   * @see #DEFAULT_USE_LINEAGE_BASED_SEGMENT_ALLOCATION
   */
  @Deprecated
  static final boolean LEGACY_DEFAULT_USE_LINEAGE_BASED_SEGMENT_ALLOCATION = false;

  /**
   * A new default for {@link #CTX_USE_LINEAGE_BASED_SEGMENT_ALLOCATION_KEY} when the Overlord is running on 0.22
   * or later. The new lineage-based segment allocation protocol must be used to ensure data correctness.
   *
   * @see SinglePhaseParallelIndexTaskRunner#allocateNewSegment(String, DateTime, String, String)
   */
  public static final boolean DEFAULT_USE_LINEAGE_BASED_SEGMENT_ALLOCATION = true;

  private static final String PHASE_NAME = "segment generation";

  /**
   * interval -> next partitionId to hand out in that interval. Shared by both segment allocation protocols.
   */
  private final ConcurrentHashMap<Interval, AtomicInteger> partitionNumCountersPerInterval = new ConcurrentHashMap<>();

  /**
   * sequenceName -> segmentIds allocated for that sequence, in allocation order. Used by the lineage-based
   * protocol to hand out the same segmentIds again for the same sequence. An entry is removed once the subtask
   * spec whose ID equals the sequenceName completes successfully.
   */
  private final ConcurrentHashMap<String, List<String>> sequenceToSegmentIds = new ConcurrentHashMap<>();

  private final ParallelIndexIngestionSpec ingestionSchema;
  private final SplittableInputSource<?> baseInputSource;

  // NOTE(review): assigned by the constructor but not read anywhere in this class.
  private final CentralizedDatasourceSchemaConfig centralizedDatasourceSchemaConfig;

  SinglePhaseParallelIndexTaskRunner(
      TaskToolbox toolbox,
      String taskId,
      String groupId,
      String baseSubtaskSpecName,
      ParallelIndexIngestionSpec ingestionSchema,
      Map<String, Object> context,
      CentralizedDatasourceSchemaConfig centralizedDatasourceSchemaConfig
  )
  {
    super(
        toolbox,
        taskId,
        groupId,
        baseSubtaskSpecName,
        ingestionSchema.getTuningConfig(),
        context
    );
    this.ingestionSchema = ingestionSchema;
    // This runner creates one subtask per split of the input source, so the source must be splittable;
    // the cast fails immediately if it is not.
    this.baseInputSource = (SplittableInputSource<?>) ingestionSchema.getIOConfig().getNonNullInputSource(toolbox);
    this.centralizedDatasourceSchemaConfig = centralizedDatasourceSchemaConfig;
  }

  /**
   * Test-only convenience constructor that uses the taskId as the base subtask spec name.
   */
  @VisibleForTesting
  SinglePhaseParallelIndexTaskRunner(
      TaskToolbox toolbox,
      String taskId,
      String groupId,
      ParallelIndexIngestionSpec ingestionSchema,
      Map<String, Object> context,
      CentralizedDatasourceSchemaConfig centralizedDatasourceSchemaConfig
  )
  {
    this(toolbox, taskId, groupId, taskId, ingestionSchema, context, centralizedDatasourceSchemaConfig);
  }

  @Override
  public String getName()
  {
    return PHASE_NAME;
  }

  @VisibleForTesting
  ParallelIndexIngestionSpec getIngestionSchema()
  {
    return ingestionSchema;
  }

  /**
   * Creates one {@link SinglePhaseSubTaskSpec} per input split produced by the base input source.
   */
  @VisibleForTesting
  @Override
  Iterator<SubTaskSpec<SinglePhaseSubTask>> subTaskSpecIterator() throws IOException
  {
    return baseInputSource.createSplits(
        ingestionSchema.getIOConfig().getInputFormat(),
        getTuningConfig().getSplitHintSpec()
    ).map(this::newTaskSpec).iterator();
  }

  /**
   * Estimates the number of subtasks as the estimated number of input splits.
   */
  @Override
  int estimateTotalNumSubTasks() throws IOException
  {
    return baseInputSource.estimateNumSplits(
        ingestionSchema.getIOConfig().getInputFormat(),
        getTuningConfig().getSplitHintSpec()
    );
  }

  /**
   * Builds the spec of a subtask that ingests exactly the given split of the base input source.
   *
   * @param split the input split assigned to the new subtask
   * @return a spec with a new ID ({@code <baseSubtaskSpecName>_<nextSpecId>}) and a copy of this runner's context
   */
  @VisibleForTesting
  SubTaskSpec<SinglePhaseSubTask> newTaskSpec(InputSplit split)
  {
    // No firehose is used here: each subtask reads its own split of the input source. The typed local
    // (rather than a bare null literal) keeps the ParallelIndexIOConfig constructor call unambiguous.
    final FirehoseFactory firehoseFactory = null;
    final InputSource inputSource = baseInputSource.withSplit(split);
    // Each subtask gets its own mutable copy of the context.
    final Map<String, Object> subtaskContext = new HashMap<>(getContext());
    return new SinglePhaseSubTaskSpec(
        getBaseSubtaskSpecName() + "_" + getAndIncrementNextSpecId(),
        getGroupId(),
        getTaskId(),
        new ParallelIndexIngestionSpec(
            ingestionSchema.getDataSchema(),
            new ParallelIndexIOConfig(
                firehoseFactory,
                inputSource,
                ingestionSchema.getIOConfig().getInputFormat(),
                ingestionSchema.getIOConfig().isAppendToExisting(),
                ingestionSchema.getIOConfig().isDropExisting()
            ),
            ingestionSchema.getTuningConfig()
        ),
        subtaskContext,
        split
    );
  }

  /**
   * Allocates a new segment for the given timestamp using the legacy protocol, which simply hands out the next
   * partitionId of the time chunk.
   *
   * This method has a bug: transient task failures or network errors can create non-contiguous partitionIds in
   * time chunks. When this happens, the segments this method creates will never become queryable (see
   * {@link org.apache.druid.timeline.partition.PartitionHolder#isComplete()}). As a result, this method is
   * deprecated in favor of {@link #allocateNewSegment(String, DateTime, String, String)}. However, we keep this
   * method to support rolling upgrade without downtime of batch ingestion where you can have mixed versions of
   * middleManagers/indexers.
   *
   * @deprecated use {@link #allocateNewSegment(String, DateTime, String, String)} instead.
   */
  @Deprecated
  public SegmentIdWithShardSpec allocateNewSegment(String dataSource, DateTime timestamp) throws IOException
  {
    return newSegmentIdWithNextPartitionNum(dataSource, findIntervalAndVersion(timestamp));
  }

  /**
   * Allocate a new segment for the given timestamp locally. This method is called when dynamic partitioning is used
   * and {@link LockGranularity} is {@code TIME_CHUNK}.
   *
   * The allocation algorithm is similar to the Overlord-based segment allocation. It keeps the segment allocation
   * history per sequenceName. If the prevSegmentId is found in the segment allocation history, this method
   * returns the next segmentId right after the prevSegmentId in the history. Since the sequenceName is unique
   * per {@link SubTaskSpec} (it is the ID of subtaskSpec), this algorithm guarantees that the same set of segmentIds
   * are created in the same order for the same subtaskSpec.
   *
   * @param dataSource    dataSource of the segment to allocate
   * @param timestamp     timestamp used to find the interval and version of a newly allocated segment
   * @param sequenceName  sequence whose allocation history is consulted and extended
   * @param prevSegmentId segmentId previously returned for this sequence, or null for the first allocation
   *
   * @return the next segmentId of the sequence
   *
   * @throws ISE if prevSegmentId is non-null but the sequence has no history, or the history does not contain it,
   *             or a recorded segmentId cannot be parsed
   * @see org.apache.druid.metadata.IndexerSQLMetadataStorageCoordinator#allocatePendingSegmentWithSegmentLineageCheck
   */
  public SegmentIdWithShardSpec allocateNewSegment(
      String dataSource,
      DateTime timestamp,
      String sequenceName,
      @Nullable String prevSegmentId
  ) throws IOException
  {
    final NonnullPair<Interval, String> intervalAndVersion = findIntervalAndVersion(timestamp);
    final MutableObject<SegmentIdWithShardSpec> segmentIdHolder = new MutableObject<>();

    // ConcurrentHashMap#compute runs atomically per key, so concurrent allocations for the same sequence
    // are serialized and read/extend a consistent history.
    sequenceToSegmentIds.compute(sequenceName, (key, allocatedSegmentIds) -> {
      final List<String> segmentIds;
      final int prevSegmentIdIndex;
      if (prevSegmentId == null) {
        // Start from the head of the (possibly not yet existing) history.
        segmentIds = allocatedSegmentIds == null ? new ArrayList<>() : allocatedSegmentIds;
        prevSegmentIdIndex = -1;
      } else {
        if (allocatedSegmentIds == null) {
          throw new ISE("Can't find previous segmentIds for sequence[%s]", sequenceName);
        }
        segmentIds = allocatedSegmentIds;
        prevSegmentIdIndex = segmentIds.indexOf(prevSegmentId);
        if (prevSegmentIdIndex == -1) {
          throw new ISE(
              "Can't find previously allocated segmentId[%s] for sequence[%s]",
              prevSegmentId,
              sequenceName
          );
        }
      }

      final int nextSegmentIdIndex = prevSegmentIdIndex + 1;
      final SegmentIdWithShardSpec newSegmentId;
      if (nextSegmentIdIndex < segmentIds.size()) {
        // This allocation was already made for this sequence (presumably a retry of the same subtaskSpec):
        // return the recorded segmentId instead of allocating a new one.
        newSegmentId = toSegmentIdWithShardSpec(dataSource, segmentIds.get(nextSegmentIdIndex));
      } else {
        newSegmentId = newSegmentIdWithNextPartitionNum(dataSource, intervalAndVersion);
        segmentIds.add(newSegmentId.toString());
      }
      segmentIdHolder.setValue(newSegmentId);
      return segmentIds;
    });
    return segmentIdHolder.getValue();
  }

  /**
   * Creates a segmentId in the given interval and version, using the next partitionId of that interval taken
   * from (and advanced in) {@link #partitionNumCountersPerInterval} via {@link Counters#getAndIncrementInt}.
   */
  private SegmentIdWithShardSpec newSegmentIdWithNextPartitionNum(
      String dataSource,
      NonnullPair<Interval, String> intervalAndVersion
  )
  {
    final int partitionNum = Counters.getAndIncrementInt(partitionNumCountersPerInterval, intervalAndVersion.lhs);
    return new SegmentIdWithShardSpec(
        dataSource,
        intervalAndVersion.lhs,
        intervalAndVersion.rhs,
        new BuildingNumberedShardSpec(partitionNum)
    );
  }

  /**
   * Converts a segmentId string recorded in {@link #sequenceToSegmentIds} back into a
   * {@link SegmentIdWithShardSpec} with a {@link BuildingNumberedShardSpec}.
   *
   * @throws ISE if the string cannot be parsed as a segmentId of the given dataSource
   */
  private static SegmentIdWithShardSpec toSegmentIdWithShardSpec(String dataSource, String segmentIdString)
  {
    final SegmentId segmentId = SegmentId.tryParse(dataSource, segmentIdString);
    if (segmentId == null) {
      throw new ISE("Illegal segmentId format [%s]", segmentIdString);
    }
    return new SegmentIdWithShardSpec(
        segmentId.getDataSource(),
        segmentId.getInterval(),
        segmentId.getVersion(),
        new BuildingNumberedShardSpec(segmentId.getPartitionNum())
    );
  }

  /**
   * Finds the interval and version for a segment containing the given timestamp, using the lock type
   * that {@link TaskLocks#determineLockTypeForAppend} derives from this runner's context.
   */
  NonnullPair<Interval, String> findIntervalAndVersion(DateTime timestamp) throws IOException
  {
    final TaskLockType taskLockType = TaskLocks.determineLockTypeForAppend(getContext());
    return AbstractBatchIndexTask.findIntervalAndVersion(getToolbox(), ingestionSchema, timestamp, taskLockType);
  }

  /**
   * Returns a callback that drops the segment allocation history of a subtask spec once it succeeds.
   */
  @Override
  public Runnable getSubtaskCompletionCallback(SubTaskCompleteEvent<?> event)
  {
    return () -> {
      // History is only dropped on success; presumably a failed spec may be retried and must then be able to
      // get back the same segmentIds from allocateNewSegment.
      if (event.getLastState().isSuccess()) {
        sequenceToSegmentIds.remove(event.getSpec().getId());
      }
    };
  }
}