/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.druid.indexing.common.task;
import com.fasterxml.jackson.annotation.JacksonInject;
import com.fasterxml.jackson.annotation.JsonCreator;
import com.fasterxml.jackson.annotation.JsonIgnore;
import com.fasterxml.jackson.annotation.JsonProperty;
import com.google.common.annotations.VisibleForTesting;
import com.google.common.base.Preconditions;
import com.google.common.collect.BiMap;
import com.google.common.collect.HashBiMap;
import com.google.common.collect.ImmutableList;
import com.google.common.collect.Lists;
import org.apache.curator.shaded.com.google.common.base.Verify;
import org.apache.druid.client.coordinator.CoordinatorClient;
import org.apache.druid.client.indexing.ClientCompactionTaskQuery;
import org.apache.druid.data.input.impl.DimensionSchema;
import org.apache.druid.data.input.impl.DimensionSchema.MultiValueHandling;
import org.apache.druid.data.input.impl.DimensionsSpec;
import org.apache.druid.data.input.impl.DoubleDimensionSchema;
import org.apache.druid.data.input.impl.FloatDimensionSchema;
import org.apache.druid.data.input.impl.LongDimensionSchema;
import org.apache.druid.data.input.impl.StringDimensionSchema;
import org.apache.druid.data.input.impl.TimestampSpec;
import org.apache.druid.indexer.Checks;
import org.apache.druid.indexer.Property;
import org.apache.druid.indexer.TaskStatus;
import org.apache.druid.indexer.partitions.DynamicPartitionsSpec;
import org.apache.druid.indexer.partitions.PartitionsSpec;
import org.apache.druid.indexing.common.LockGranularity;
import org.apache.druid.indexing.common.RetryPolicyFactory;
import org.apache.druid.indexing.common.SegmentLoaderFactory;
import org.apache.druid.indexing.common.TaskToolbox;
import org.apache.druid.indexing.common.actions.RetrieveUsedSegmentsAction;
import org.apache.druid.indexing.common.actions.TaskActionClient;
import org.apache.druid.indexing.common.task.IndexTask.IndexTuningConfig;
import org.apache.druid.indexing.common.task.batch.parallel.ParallelIndexIOConfig;
import org.apache.druid.indexing.common.task.batch.parallel.ParallelIndexIngestionSpec;
import org.apache.druid.indexing.common.task.batch.parallel.ParallelIndexSupervisorTask;
import org.apache.druid.indexing.common.task.batch.parallel.ParallelIndexTuningConfig;
import org.apache.druid.indexing.input.DruidInputSource;
import org.apache.druid.indexing.overlord.Segments;
import org.apache.druid.java.util.common.ISE;
import org.apache.druid.java.util.common.Intervals;
import org.apache.druid.java.util.common.JodaUtils;
import org.apache.druid.java.util.common.NonnullPair;
import org.apache.druid.java.util.common.RE;
import org.apache.druid.java.util.common.StringUtils;
import org.apache.druid.java.util.common.granularity.Granularities;
import org.apache.druid.java.util.common.granularity.Granularity;
import org.apache.druid.java.util.common.granularity.GranularityType;
import org.apache.druid.java.util.common.guava.Comparators;
import org.apache.druid.java.util.common.logger.Logger;
import org.apache.druid.query.aggregation.AggregatorFactory;
import org.apache.druid.segment.DimensionHandler;
import org.apache.druid.segment.IndexIO;
import org.apache.druid.segment.QueryableIndex;
import org.apache.druid.segment.column.ColumnHolder;
import org.apache.druid.segment.column.ValueType;
import org.apache.druid.segment.indexing.DataSchema;
import org.apache.druid.segment.indexing.TuningConfig;
import org.apache.druid.segment.indexing.granularity.GranularitySpec;
import org.apache.druid.segment.indexing.granularity.UniformGranularitySpec;
import org.apache.druid.segment.loading.SegmentLoadingException;
import org.apache.druid.segment.realtime.appenderator.AppenderatorsManager;
import org.apache.druid.server.coordinator.duty.CompactSegments;
import org.apache.druid.timeline.DataSegment;
import org.apache.druid.timeline.TimelineObjectHolder;
import org.apache.druid.timeline.VersionedIntervalTimeline;
import org.apache.druid.timeline.partition.PartitionChunk;
import org.apache.druid.timeline.partition.PartitionHolder;
import org.joda.time.Interval;
import javax.annotation.Nullable;
import java.io.File;
import java.io.IOException;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Map.Entry;
import java.util.TreeMap;
import java.util.stream.Collectors;
import java.util.stream.IntStream;
/**
* The client representation of this task is {@link ClientCompactionTaskQuery}. JSON
* serialization fields of this class must correspond to those of {@link
* ClientCompactionTaskQuery}.
*/
public class CompactionTask extends AbstractBatchIndexTask
{
private static final Logger log = new Logger(CompactionTask.class);
/**
* The CompactionTask creates and runs multiple IndexTask instances. When the {@link AppenderatorsManager}
* is asked to clean up, it does so on a per-task basis keyed by task ID. However, the subtask IDs of the
* CompactionTask are not externally visible. This context flag is used to ensure that all the appenderators
* created for the CompactionTask's subtasks are tracked under the ID of the parent CompactionTask.
* The CompactionTask may change in the future and no longer require this behavior (e.g., reusing the same
* Appenderator across subtasks, or allowing the subtasks to use the same ID). The CompactionTask is also the only
* task type that currently creates multiple appenderators. Thus, a context flag is used to handle this case
* instead of a more general approach such as new methods on the Task interface.
*/
public static final String CTX_KEY_APPENDERATOR_TRACKING_TASK_ID = "appenderatorTrackingTaskId";
private static final String TYPE = "compact";
private static final boolean STORE_COMPACTION_STATE = true;
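// Sanity check: the task type defined here must stay in sync with the type used by the coordinator's
// CompactSegments duty.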
static {
Verify.verify(TYPE.equals(CompactSegments.COMPACTION_TASK_TYPE));
}
private final CompactionIOConfig ioConfig;
@Nullable
private final DimensionsSpec dimensionsSpec;
@Nullable
private final AggregatorFactory[] metricsSpec;
@Nullable
private final Granularity segmentGranularity;
@Nullable
private final ParallelIndexTuningConfig tuningConfig;
@JsonIgnore
private final SegmentProvider segmentProvider;
@JsonIgnore
private final PartitionConfigurationManager partitionConfigurationManager;
@JsonIgnore
private final SegmentLoaderFactory segmentLoaderFactory;
@JsonIgnore
private final RetryPolicyFactory retryPolicyFactory;
@JsonIgnore
private final CurrentSubTaskHolder currentSubTaskHolder = new CurrentSubTaskHolder(
(taskObject, config) -> {
final ParallelIndexSupervisorTask indexTask = (ParallelIndexSupervisorTask) taskObject;
indexTask.stopGracefully(config);
}
);
@JsonCreator
public CompactionTask(
@JsonProperty("id") @Nullable final String id,
@JsonProperty("resource") @Nullable final TaskResource taskResource,
@JsonProperty("dataSource") final String dataSource,
@JsonProperty("interval") @Deprecated @Nullable final Interval interval,
@JsonProperty("segments") @Deprecated @Nullable final List<DataSegment> segments,
@JsonProperty("ioConfig") @Nullable CompactionIOConfig ioConfig,
@JsonProperty("dimensions") @Nullable final DimensionsSpec dimensions,
@JsonProperty("dimensionsSpec") @Nullable final DimensionsSpec dimensionsSpec,
@JsonProperty("metricsSpec") @Nullable final AggregatorFactory[] metricsSpec,
@JsonProperty("segmentGranularity") @Nullable final Granularity segmentGranularity,
@JsonProperty("tuningConfig") @Nullable final TuningConfig tuningConfig,
@JsonProperty("context") @Nullable final Map<String, Object> context,
@JacksonInject SegmentLoaderFactory segmentLoaderFactory,
@JacksonInject RetryPolicyFactory retryPolicyFactory
)
{
super(getOrMakeId(id, TYPE, dataSource), null, taskResource, dataSource, context);
Checks.checkOneNotNullOrEmpty(
ImmutableList.of(
new Property<>("ioConfig", ioConfig),
new Property<>("interval", interval),
new Property<>("segments", segments)
)
);
if (ioConfig != null) {
this.ioConfig = ioConfig;
} else if (interval != null) {
this.ioConfig = new CompactionIOConfig(new CompactionIntervalSpec(interval, null));
} else {
// We already checked segments is not null or empty above.
//noinspection ConstantConditions
this.ioConfig = new CompactionIOConfig(SpecificSegmentsSpec.fromSegments(segments));
}
this.dimensionsSpec = dimensionsSpec == null ? dimensions : dimensionsSpec;
this.metricsSpec = metricsSpec;
this.segmentGranularity = segmentGranularity;
this.tuningConfig = tuningConfig != null ? getTuningConfig(tuningConfig) : null;
this.segmentProvider = new SegmentProvider(dataSource, this.ioConfig.getInputSpec());
this.partitionConfigurationManager = new PartitionConfigurationManager(this.tuningConfig);
this.segmentLoaderFactory = segmentLoaderFactory;
this.retryPolicyFactory = retryPolicyFactory;
}
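/**
 * Converts the given {@link TuningConfig} into a {@link ParallelIndexTuningConfig}. A
 * {@link ParallelIndexTuningConfig} is returned as-is; an {@link IndexTuningConfig} is translated field by field.
 * Any other type is rejected with an exception.
 */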
@VisibleForTesting
static ParallelIndexTuningConfig getTuningConfig(TuningConfig tuningConfig)
{
if (tuningConfig instanceof ParallelIndexTuningConfig) {
return (ParallelIndexTuningConfig) tuningConfig;
} else if (tuningConfig instanceof IndexTuningConfig) {
final IndexTuningConfig indexTuningConfig = (IndexTuningConfig) tuningConfig;
return new ParallelIndexTuningConfig(
null,
indexTuningConfig.getMaxRowsPerSegment(),
indexTuningConfig.getMaxRowsPerSegment(),
indexTuningConfig.getMaxBytesInMemory(),
indexTuningConfig.getMaxTotalRows(),
indexTuningConfig.getNumShards(),
null,
indexTuningConfig.getPartitionsSpec(),
indexTuningConfig.getIndexSpec(),
indexTuningConfig.getIndexSpecForIntermediatePersists(),
indexTuningConfig.getMaxPendingPersists(),
indexTuningConfig.isForceGuaranteedRollup(),
indexTuningConfig.isReportParseExceptions(),
indexTuningConfig.getPushTimeout(),
indexTuningConfig.getSegmentWriteOutMediumFactory(),
null,
null,
null,
null,
null,
null,
null,
null,
indexTuningConfig.isLogParseExceptions(),
indexTuningConfig.getMaxParseExceptions(),
indexTuningConfig.getMaxSavedParseExceptions()
);
} else {
throw new ISE(
"Unknown tuningConfig type: [%s], Must be either [%s] or [%s]",
tuningConfig.getClass().getName(),
ParallelIndexTuningConfig.class.getName(),
IndexTuningConfig.class.getName()
);
}
}
@VisibleForTesting
public CurrentSubTaskHolder getCurrentSubTaskHolder()
{
return currentSubTaskHolder;
}
@JsonProperty
public CompactionIOConfig getIoConfig()
{
return ioConfig;
}
@JsonProperty
@Nullable
public DimensionsSpec getDimensionsSpec()
{
return dimensionsSpec;
}
@JsonProperty
@Nullable
public AggregatorFactory[] getMetricsSpec()
{
return metricsSpec;
}
@JsonProperty
@Nullable
@Override
public Granularity getSegmentGranularity()
{
return segmentGranularity;
}
@Nullable
@JsonProperty
public ParallelIndexTuningConfig getTuningConfig()
{
return tuningConfig;
}
@Override
public String getType()
{
return TYPE;
}
@Override
public int getPriority()
{
return getContextValue(Tasks.PRIORITY_KEY, Tasks.DEFAULT_MERGE_TASK_PRIORITY);
}
@Override
public boolean isReady(TaskActionClient taskActionClient) throws Exception
{
final List<DataSegment> segments = segmentProvider.findSegments(taskActionClient);
return determineLockGranularityandTryLockWithSegments(taskActionClient, segments, segmentProvider::checkSegments);
}
@Override
public boolean requireLockExistingSegments()
{
return true;
}
@Override
public List<DataSegment> findSegmentsToLock(TaskActionClient taskActionClient, List<Interval> intervals)
throws IOException
{
return ImmutableList.copyOf(
taskActionClient.submit(new RetrieveUsedSegmentsAction(getDataSource(), null, intervals, Segments.ONLY_VISIBLE))
);
}
@Override
public boolean isPerfectRollup()
{
return tuningConfig != null && tuningConfig.isForceGuaranteedRollup();
}
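/**
 * Generates ingestion specs from the input segments and runs one {@link ParallelIndexSupervisorTask} per spec,
 * sequentially. The compaction task succeeds only if every generated spec succeeds; if no segments are found for
 * the input spec, the task fails with nothing to do.
 */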
@Override
public TaskStatus runTask(TaskToolbox toolbox) throws Exception
{
final List<ParallelIndexIngestionSpec> ingestionSpecs = createIngestionSchema(
toolbox,
getTaskLockHelper().getLockGranularityToUse(),
segmentProvider,
partitionConfigurationManager,
dimensionsSpec,
metricsSpec,
segmentGranularity,
toolbox.getCoordinatorClient(),
segmentLoaderFactory,
retryPolicyFactory
);
final List<ParallelIndexSupervisorTask> indexTaskSpecs = IntStream
.range(0, ingestionSpecs.size())
.mapToObj(i -> {
// taskId is used for different purposes in parallel indexing and local indexing.
// In parallel indexing, it's the taskId of the supervisor task. This supervisor taskId must be
// a valid taskId to communicate with subtasks properly. We use the ID of the compaction task in this case.
//
// In local indexing, it's used as the sequence name for Appenderator. Even though a compaction task can run
// multiple index tasks (one per interval), the appenderator is not shared by those tasks. Each task creates
// a new Appenderator on its own instead. As a result, they should use different sequence names to allocate
// new segmentIds properly. See IndexerSQLMetadataStorageCoordinator.allocatePendingSegments() for details.
// In this case, we use different fake IDs for each created index task.
final String subtaskId = tuningConfig == null || tuningConfig.getMaxNumConcurrentSubTasks() == 1
? createIndexTaskSpecId(i)
: getId();
return newTask(subtaskId, ingestionSpecs.get(i));
})
.collect(Collectors.toList());
if (indexTaskSpecs.isEmpty()) {
log.warn("Can't find segments from inputSpec[%s], nothing to do.", ioConfig.getInputSpec());
return TaskStatus.failure(getId());
} else {
registerResourceCloserOnAbnormalExit(currentSubTaskHolder);
final int totalNumSpecs = indexTaskSpecs.size();
log.info("Generated [%d] compaction task specs", totalNumSpecs);
int failCnt = 0;
for (ParallelIndexSupervisorTask eachSpec : indexTaskSpecs) {
final String json = toolbox.getJsonMapper().writerWithDefaultPrettyPrinter().writeValueAsString(eachSpec);
if (!currentSubTaskHolder.setTask(eachSpec)) {
log.info("Task is asked to stop. Finish as failed.");
return TaskStatus.failure(getId());
}
try {
if (eachSpec.isReady(toolbox.getTaskActionClient())) {
log.info("Running indexSpec: " + json);
final TaskStatus eachResult = eachSpec.run(toolbox);
if (!eachResult.isSuccess()) {
failCnt++;
log.warn("Failed to run indexSpec: [%s].\nTrying the next indexSpec.", json);
}
} else {
failCnt++;
log.warn("indexSpec is not ready: [%s].\nTrying the next indexSpec.", json);
}
}
catch (Exception e) {
failCnt++;
log.warn(e, "Failed to run indexSpec: [%s].\nTrying the next indexSpec.", json);
}
}
log.info("Run [%d] specs, [%d] succeeded, [%d] failed", totalNumSpecs, totalNumSpecs - failCnt, failCnt);
return failCnt == 0 ? TaskStatus.success(getId()) : TaskStatus.failure(getId());
}
}
@VisibleForTesting
ParallelIndexSupervisorTask newTask(String taskId, ParallelIndexIngestionSpec ingestionSpec)
{
return new ParallelIndexSupervisorTask(
taskId,
getGroupId(),
getTaskResource(),
ingestionSpec,
createContextForSubtask()
);
}
@VisibleForTesting
Map<String, Object> createContextForSubtask()
{
final Map<String, Object> newContext = new HashMap<>(getContext());
newContext.put(CTX_KEY_APPENDERATOR_TRACKING_TASK_ID, getId());
newContext.putIfAbsent(CompactSegments.STORE_COMPACTION_STATE_KEY, STORE_COMPACTION_STATE);
// Set the priority of the compaction task.
newContext.put(Tasks.PRIORITY_KEY, getPriority());
return newContext;
}
private String createIndexTaskSpecId(int i)
{
return StringUtils.format("%s_%d", getId(), i);
}
/**
* Generates {@link ParallelIndexIngestionSpec}s from the input segments.
*
* @return an empty list if no input segments exist. Otherwise, the generated ingestion specs.
*/
@VisibleForTesting
static List<ParallelIndexIngestionSpec> createIngestionSchema(
final TaskToolbox toolbox,
final LockGranularity lockGranularityInUse,
final SegmentProvider segmentProvider,
final PartitionConfigurationManager partitionConfigurationManager,
@Nullable final DimensionsSpec dimensionsSpec,
@Nullable final AggregatorFactory[] metricsSpec,
@Nullable final Granularity segmentGranularity,
final CoordinatorClient coordinatorClient,
final SegmentLoaderFactory segmentLoaderFactory,
final RetryPolicyFactory retryPolicyFactory
) throws IOException, SegmentLoadingException
{
NonnullPair<Map<DataSegment, File>, List<TimelineObjectHolder<String, DataSegment>>> pair = prepareSegments(
toolbox,
segmentProvider,
lockGranularityInUse
);
final Map<DataSegment, File> segmentFileMap = pair.lhs;
final List<TimelineObjectHolder<String, DataSegment>> timelineSegments = pair.rhs;
if (timelineSegments.isEmpty()) {
return Collections.emptyList();
}
// find metadata for interval
// queryableIndexAndSegments is sorted by the interval of the dataSegment
final List<NonnullPair<QueryableIndex, DataSegment>> queryableIndexAndSegments = loadSegments(
timelineSegments,
segmentFileMap,
toolbox.getIndexIO()
);
final ParallelIndexTuningConfig compactionTuningConfig = partitionConfigurationManager.computeTuningConfig();
if (segmentGranularity == null) {
// original granularity
final Map<Interval, List<NonnullPair<QueryableIndex, DataSegment>>> intervalToSegments = new TreeMap<>(
Comparators.intervalsByStartThenEnd()
);
queryableIndexAndSegments.forEach(
p -> intervalToSegments.computeIfAbsent(p.rhs.getInterval(), k -> new ArrayList<>())
.add(p)
);
// Unify overlapping intervals so that overlapping segments are compacted in the same indexSpec.
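// For example, [2020-01-01/2020-01-02] and [2020-01-01T12:00/2020-01-03] overlap, so they are unified into
// [2020-01-01/2020-01-03].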
List<NonnullPair<Interval, List<NonnullPair<QueryableIndex, DataSegment>>>> intervalToSegmentsUnified =
new ArrayList<>();
Interval union = null;
List<NonnullPair<QueryableIndex, DataSegment>> segments = new ArrayList<>();
for (Entry<Interval, List<NonnullPair<QueryableIndex, DataSegment>>> entry : intervalToSegments.entrySet()) {
Interval cur = entry.getKey();
if (union == null) {
union = cur;
segments.addAll(entry.getValue());
} else if (union.overlaps(cur)) {
union = Intervals.utc(union.getStartMillis(), Math.max(union.getEndMillis(), cur.getEndMillis()));
segments.addAll(entry.getValue());
} else {
intervalToSegmentsUnified.add(new NonnullPair<>(union, segments));
union = cur;
segments = new ArrayList<>(entry.getValue());
}
}
intervalToSegmentsUnified.add(new NonnullPair<>(union, segments));
final List<ParallelIndexIngestionSpec> specs = new ArrayList<>(intervalToSegmentsUnified.size());
for (NonnullPair<Interval, List<NonnullPair<QueryableIndex, DataSegment>>> entry : intervalToSegmentsUnified) {
final Interval interval = entry.lhs;
final List<NonnullPair<QueryableIndex, DataSegment>> segmentsToCompact = entry.rhs;
final DataSchema dataSchema = createDataSchema(
segmentProvider.dataSource,
segmentsToCompact,
dimensionsSpec,
metricsSpec,
GranularityType.fromPeriod(interval.toPeriod()).getDefaultGranularity()
);
specs.add(
new ParallelIndexIngestionSpec(
dataSchema,
createIoConfig(
toolbox,
dataSchema,
interval,
coordinatorClient,
segmentLoaderFactory,
retryPolicyFactory
),
compactionTuningConfig
)
);
}
return specs;
} else {
// given segment granularity
final DataSchema dataSchema = createDataSchema(
segmentProvider.dataSource,
queryableIndexAndSegments,
dimensionsSpec,
metricsSpec,
segmentGranularity
);
return Collections.singletonList(
new ParallelIndexIngestionSpec(
dataSchema,
createIoConfig(
toolbox,
dataSchema,
segmentProvider.interval,
coordinatorClient,
segmentLoaderFactory,
retryPolicyFactory
),
compactionTuningConfig
)
);
}
}
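/**
 * Creates a {@link ParallelIndexIOConfig} that reads the given interval of the dataSource back through
 * {@link DruidInputSource}, restricted to the dimensions and metrics of the given {@link DataSchema}.
 */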
private static ParallelIndexIOConfig createIoConfig(
TaskToolbox toolbox,
DataSchema dataSchema,
Interval interval,
CoordinatorClient coordinatorClient,
SegmentLoaderFactory segmentLoaderFactory,
RetryPolicyFactory retryPolicyFactory
)
{
return new ParallelIndexIOConfig(
null,
new DruidInputSource(
dataSchema.getDataSource(),
interval,
null,
null,
dataSchema.getDimensionsSpec().getDimensionNames(),
Arrays.stream(dataSchema.getAggregators()).map(AggregatorFactory::getName).collect(Collectors.toList()),
toolbox.getIndexIO(),
coordinatorClient,
segmentLoaderFactory,
retryPolicyFactory
),
null,
false
);
}
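/**
 * Finds the segments to compact, validates them against the lock granularity in use, fetches the segment files
 * locally, and returns the fetched files together with the timeline lookup for the compaction interval.
 */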
private static NonnullPair<Map<DataSegment, File>, List<TimelineObjectHolder<String, DataSegment>>> prepareSegments(
TaskToolbox toolbox,
SegmentProvider segmentProvider,
LockGranularity lockGranularityInUse
) throws IOException, SegmentLoadingException
{
final List<DataSegment> usedSegments = segmentProvider.findSegments(toolbox.getTaskActionClient());
segmentProvider.checkSegments(lockGranularityInUse, usedSegments);
final Map<DataSegment, File> segmentFileMap = toolbox.fetchSegments(usedSegments);
final List<TimelineObjectHolder<String, DataSegment>> timelineSegments = VersionedIntervalTimeline
.forSegments(usedSegments)
.lookup(segmentProvider.interval);
return new NonnullPair<>(segmentFileMap, timelineSegments);
}
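/**
 * Creates the {@link DataSchema} for the compacted segments. Rollup is enabled only if it is enabled for every
 * input segment. Dimensions and metrics come from the given specs when provided; otherwise they are derived from
 * the existing segments.
 */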
private static DataSchema createDataSchema(
String dataSource,
List<NonnullPair<QueryableIndex, DataSegment>> queryableIndexAndSegments,
@Nullable DimensionsSpec dimensionsSpec,
@Nullable AggregatorFactory[] metricsSpec,
Granularity segmentGranularity
)
{
// check index metadata
for (NonnullPair<QueryableIndex, DataSegment> pair : queryableIndexAndSegments) {
final QueryableIndex index = pair.lhs;
if (index.getMetadata() == null) {
throw new RE("Index metadata doesn't exist for segment[%s]", pair.rhs.getId());
}
}
// find granularity spec
// set rollup only if rollup is set for all segments
final boolean rollup = queryableIndexAndSegments.stream().allMatch(pair -> {
// We have already checked getMetadata() doesn't return null
final Boolean isRollup = pair.lhs.getMetadata().isRollup();
return isRollup != null && isRollup;
});
final Interval totalInterval = JodaUtils.umbrellaInterval(
queryableIndexAndSegments.stream().map(p -> p.rhs.getInterval()).collect(Collectors.toList())
);
final GranularitySpec granularitySpec = new UniformGranularitySpec(
Preconditions.checkNotNull(segmentGranularity),
Granularities.NONE,
rollup,
Collections.singletonList(totalInterval)
);
// find unique dimensions
final DimensionsSpec finalDimensionsSpec = dimensionsSpec == null
? createDimensionsSpec(queryableIndexAndSegments)
: dimensionsSpec;
final AggregatorFactory[] finalMetricsSpec = metricsSpec == null
? createMetricsSpec(queryableIndexAndSegments)
: convertToCombiningFactories(metricsSpec);
return new DataSchema(
dataSource,
new TimestampSpec(null, null, null),
finalDimensionsSpec,
finalMetricsSpec,
granularitySpec,
null
);
}
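/**
 * Merges the aggregators stored in the metadata of the given segments into a single metrics spec. Throws an
 * exception if the aggregators cannot be merged.
 */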
private static AggregatorFactory[] createMetricsSpec(
List<NonnullPair<QueryableIndex, DataSegment>> queryableIndexAndSegments
)
{
final List<AggregatorFactory[]> aggregatorFactories = queryableIndexAndSegments
.stream()
.map(pair -> pair.lhs.getMetadata().getAggregators()) // We have already done null check on index.getMetadata()
.collect(Collectors.toList());
final AggregatorFactory[] mergedAggregators = AggregatorFactory.mergeAggregators(aggregatorFactories);
if (mergedAggregators == null) {
throw new ISE("Failed to merge aggregators[%s]", aggregatorFactories);
}
return mergedAggregators;
}
private static AggregatorFactory[] convertToCombiningFactories(AggregatorFactory[] metricsSpec)
{
return Arrays.stream(metricsSpec)
.map(AggregatorFactory::getCombiningFactory)
.toArray(AggregatorFactory[]::new);
}
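/**
 * Builds a {@link DimensionsSpec} from the dimensions of the given segments, preferring the dimension order and
 * types found in the most recent segments.
 */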
private static DimensionsSpec createDimensionsSpec(List<NonnullPair<QueryableIndex, DataSegment>> queryableIndices)
{
final BiMap<String, Integer> uniqueDims = HashBiMap.create();
final Map<String, DimensionSchema> dimensionSchemaMap = new HashMap<>();
// Here, we try to retain the order of dimensions as they were specified since the order of dimensions may be
// optimized for performance.
// Dimensions are extracted from the most recent segments to the oldest ones because recent segments are likely to
// be queried more frequently, so performance should be optimized for recent segments rather than old ones.
// sort timelineSegments in order of interval, see https://github.com/apache/druid/pull/9905
queryableIndices.sort(
(o1, o2) -> Comparators.intervalsByStartThenEnd().compare(o1.rhs.getInterval(), o2.rhs.getInterval())
);
int index = 0;
for (NonnullPair<QueryableIndex, DataSegment> pair : Lists.reverse(queryableIndices)) {
final QueryableIndex queryableIndex = pair.lhs;
final Map<String, DimensionHandler> dimensionHandlerMap = queryableIndex.getDimensionHandlers();
for (String dimension : queryableIndex.getAvailableDimensions()) {
final ColumnHolder columnHolder = Preconditions.checkNotNull(
queryableIndex.getColumnHolder(dimension),
"Cannot find column for dimension[%s]",
dimension
);
if (!uniqueDims.containsKey(dimension)) {
final DimensionHandler dimensionHandler = Preconditions.checkNotNull(
dimensionHandlerMap.get(dimension),
"Cannot find dimensionHandler for dimension[%s]",
dimension
);
uniqueDims.put(dimension, index++);
dimensionSchemaMap.put(
dimension,
createDimensionSchema(
columnHolder.getCapabilities().getType(),
dimension,
dimensionHandler.getMultivalueHandling(),
columnHolder.getCapabilities().hasBitmapIndexes()
)
);
}
}
}
final BiMap<Integer, String> orderedDims = uniqueDims.inverse();
final List<DimensionSchema> dimensionSchemas = IntStream.range(0, orderedDims.size())
.mapToObj(i -> {
final String dimName = orderedDims.get(i);
return Preconditions.checkNotNull(
dimensionSchemaMap.get(dimName),
"Cannot find dimension[%s] from dimensionSchemaMap",
dimName
);
})
.collect(Collectors.toList());
return new DimensionsSpec(dimensionSchemas, null, null);
}
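/**
 * Loads a {@link QueryableIndex} for every segment in the given timeline using the pre-fetched segment files.
 */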
private static List<NonnullPair<QueryableIndex, DataSegment>> loadSegments(
List<TimelineObjectHolder<String, DataSegment>> timelineObjectHolders,
Map<DataSegment, File> segmentFileMap,
IndexIO indexIO
) throws IOException
{
final List<NonnullPair<QueryableIndex, DataSegment>> segments = new ArrayList<>();
for (TimelineObjectHolder<String, DataSegment> timelineObjectHolder : timelineObjectHolders) {
final PartitionHolder<DataSegment> partitionHolder = timelineObjectHolder.getObject();
for (PartitionChunk<DataSegment> chunk : partitionHolder) {
final DataSegment segment = chunk.getObject();
final QueryableIndex queryableIndex = indexIO.loadIndex(
Preconditions.checkNotNull(segmentFileMap.get(segment), "File for segment %s", segment.getId())
);
segments.add(new NonnullPair<>(queryableIndex, segment));
}
}
return segments;
}
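/**
 * Creates a {@link DimensionSchema} for the given value type. Multi-value handling is currently supported only
 * for string dimensions.
 */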
private static DimensionSchema createDimensionSchema(
ValueType type,
String name,
MultiValueHandling multiValueHandling,
boolean hasBitmapIndexes
)
{
switch (type) {
case FLOAT:
Preconditions.checkArgument(
multiValueHandling == null,
"multi-value dimension [%s] is not supported for float type yet",
name
);
return new FloatDimensionSchema(name);
case LONG:
Preconditions.checkArgument(
multiValueHandling == null,
"multi-value dimension [%s] is not supported for long type yet",
name
);
return new LongDimensionSchema(name);
case DOUBLE:
Preconditions.checkArgument(
multiValueHandling == null,
"multi-value dimension [%s] is not supported for double type yet",
name
);
return new DoubleDimensionSchema(name);
case STRING:
return new StringDimensionSchema(name, multiValueHandling, hasBitmapIndexes);
default:
throw new ISE("Unsupported value type[%s] for dimension[%s]", type, name);
}
}
@VisibleForTesting
static class SegmentProvider
{
private final String dataSource;
private final CompactionInputSpec inputSpec;
private final Interval interval;
SegmentProvider(String dataSource, CompactionInputSpec inputSpec)
{
this.dataSource = Preconditions.checkNotNull(dataSource);
this.inputSpec = inputSpec;
this.interval = inputSpec.findInterval(dataSource);
}
List<DataSegment> findSegments(TaskActionClient actionClient) throws IOException
{
return new ArrayList<>(
actionClient.submit(new RetrieveUsedSegmentsAction(dataSource, interval, null, Segments.ONLY_VISIBLE))
);
}
void checkSegments(LockGranularity lockGranularityInUse, List<DataSegment> latestSegments)
{
if (!inputSpec.validateSegments(lockGranularityInUse, latestSegments)) {
throw new ISE(
"Specified segments in the spec are different from the current used segments. "
+ "Possibly new segments would have been added or some segments have been unpublished."
);
}
}
}
@VisibleForTesting
static class PartitionConfigurationManager
{
@Nullable
private final ParallelIndexTuningConfig tuningConfig;
PartitionConfigurationManager(@Nullable ParallelIndexTuningConfig tuningConfig)
{
this.tuningConfig = tuningConfig;
}
@Nullable
ParallelIndexTuningConfig computeTuningConfig()
{
ParallelIndexTuningConfig newTuningConfig = tuningConfig == null
? ParallelIndexTuningConfig.defaultConfig()
: tuningConfig;
PartitionsSpec partitionsSpec = newTuningConfig.getGivenOrDefaultPartitionsSpec();
if (partitionsSpec instanceof DynamicPartitionsSpec) {
final DynamicPartitionsSpec dynamicPartitionsSpec = (DynamicPartitionsSpec) partitionsSpec;
partitionsSpec = new DynamicPartitionsSpec(
dynamicPartitionsSpec.getMaxRowsPerSegment(),
// Set maxTotalRows to Long.MAX_VALUE so that the computed maxRowsPerSegment is respected.
// If maxTotalRows is too small, the compaction task can generate small segments that need to be
// compacted again, which in turn makes auto compaction get stuck in the same interval.
dynamicPartitionsSpec.getMaxTotalRowsOr(Long.MAX_VALUE)
);
}
return newTuningConfig.withPartitionsSpec(partitionsSpec);
}
}
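/**
 * Builder for {@link CompactionTask}. A minimal usage sketch; the dataSource name, interval, and the injected
 * {@code segmentLoaderFactory}, {@code retryPolicyFactory}, and {@code tuningConfig} below are illustrative
 * placeholders:
 *
 * <pre>{@code
 * CompactionTask task = new CompactionTask.Builder("wikipedia", segmentLoaderFactory, retryPolicyFactory)
 *     .interval(Intervals.of("2020-01-01/2020-02-01"))
 *     .tuningConfig(tuningConfig)
 *     .build();
 * }</pre>
 */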
public static class Builder
{
private final String dataSource;
private final SegmentLoaderFactory segmentLoaderFactory;
private final RetryPolicyFactory retryPolicyFactory;
private CompactionIOConfig ioConfig;
@Nullable
private DimensionsSpec dimensionsSpec;
@Nullable
private AggregatorFactory[] metricsSpec;
@Nullable
private Granularity segmentGranularity;
@Nullable
private TuningConfig tuningConfig;
@Nullable
private Map<String, Object> context;
public Builder(
String dataSource,
SegmentLoaderFactory segmentLoaderFactory,
RetryPolicyFactory retryPolicyFactory
)
{
this.dataSource = dataSource;
this.segmentLoaderFactory = segmentLoaderFactory;
this.retryPolicyFactory = retryPolicyFactory;
}
public Builder interval(Interval interval)
{
return inputSpec(new CompactionIntervalSpec(interval, null));
}
public Builder segments(List<DataSegment> segments)
{
return inputSpec(SpecificSegmentsSpec.fromSegments(segments));
}
public Builder inputSpec(CompactionInputSpec inputSpec)
{
this.ioConfig = new CompactionIOConfig(inputSpec);
return this;
}
public Builder dimensionsSpec(DimensionsSpec dimensionsSpec)
{
this.dimensionsSpec = dimensionsSpec;
return this;
}
public Builder metricsSpec(AggregatorFactory[] metricsSpec)
{
this.metricsSpec = metricsSpec;
return this;
}
public Builder segmentGranularity(Granularity segmentGranularity)
{
this.segmentGranularity = segmentGranularity;
return this;
}
public Builder tuningConfig(TuningConfig tuningConfig)
{
this.tuningConfig = tuningConfig;
return this;
}
public Builder context(Map<String, Object> context)
{
this.context = context;
return this;
}
public CompactionTask build()
{
return new CompactionTask(
null,
null,
dataSource,
null,
null,
ioConfig,
null,
dimensionsSpec,
metricsSpec,
segmentGranularity,
tuningConfig,
context,
segmentLoaderFactory,
retryPolicyFactory
);
}
}
}