/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.druid.indexing.common.task;
import com.fasterxml.jackson.annotation.JacksonInject;
import com.fasterxml.jackson.annotation.JsonCreator;
import com.fasterxml.jackson.annotation.JsonIgnore;
import com.fasterxml.jackson.annotation.JsonInclude;
import com.fasterxml.jackson.annotation.JsonInclude.Include;
import com.fasterxml.jackson.annotation.JsonProperty;
import com.google.common.annotations.VisibleForTesting;
import com.google.common.base.Preconditions;
import com.google.common.base.Verify;
import com.google.common.collect.BiMap;
import com.google.common.collect.HashBiMap;
import com.google.common.collect.ImmutableList;
import com.google.common.collect.ImmutableSet;
import com.google.common.collect.Iterables;
import com.google.common.collect.Lists;
import org.apache.druid.client.coordinator.CoordinatorClient;
import org.apache.druid.client.indexing.ClientCompactionTaskGranularitySpec;
import org.apache.druid.client.indexing.ClientCompactionTaskQuery;
import org.apache.druid.client.indexing.ClientCompactionTaskTransformSpec;
import org.apache.druid.collections.ResourceHolder;
import org.apache.druid.data.input.SplitHintSpec;
import org.apache.druid.data.input.impl.DimensionSchema;
import org.apache.druid.data.input.impl.DimensionsSpec;
import org.apache.druid.data.input.impl.TimestampSpec;
import org.apache.druid.indexer.Checks;
import org.apache.druid.indexer.Property;
import org.apache.druid.indexer.TaskStatus;
import org.apache.druid.indexer.partitions.DynamicPartitionsSpec;
import org.apache.druid.indexer.partitions.PartitionsSpec;
import org.apache.druid.indexer.report.TaskReport;
import org.apache.druid.indexing.common.LockGranularity;
import org.apache.druid.indexing.common.RetryPolicyFactory;
import org.apache.druid.indexing.common.SegmentCacheManagerFactory;
import org.apache.druid.indexing.common.TaskToolbox;
import org.apache.druid.indexing.common.actions.RetrieveUsedSegmentsAction;
import org.apache.druid.indexing.common.actions.TaskActionClient;
import org.apache.druid.indexing.common.task.IndexTask.IndexTuningConfig;
import org.apache.druid.indexing.common.task.batch.parallel.ParallelIndexIOConfig;
import org.apache.druid.indexing.common.task.batch.parallel.ParallelIndexIngestionSpec;
import org.apache.druid.indexing.common.task.batch.parallel.ParallelIndexSupervisorTask;
import org.apache.druid.indexing.common.task.batch.parallel.ParallelIndexTuningConfig;
import org.apache.druid.indexing.input.DruidInputSource;
import org.apache.druid.java.util.common.IAE;
import org.apache.druid.java.util.common.ISE;
import org.apache.druid.java.util.common.Intervals;
import org.apache.druid.java.util.common.JodaUtils;
import org.apache.druid.java.util.common.NonnullPair;
import org.apache.druid.java.util.common.Pair;
import org.apache.druid.java.util.common.RE;
import org.apache.druid.java.util.common.StringUtils;
import org.apache.druid.java.util.common.granularity.Granularity;
import org.apache.druid.java.util.common.granularity.GranularityType;
import org.apache.druid.java.util.common.guava.Comparators;
import org.apache.druid.java.util.common.io.Closer;
import org.apache.druid.java.util.common.logger.Logger;
import org.apache.druid.java.util.emitter.service.ServiceEmitter;
import org.apache.druid.java.util.emitter.service.ServiceMetricEvent;
import org.apache.druid.query.aggregation.AggregatorFactory;
import org.apache.druid.segment.DimensionHandler;
import org.apache.druid.segment.IndexIO;
import org.apache.druid.segment.IndexSpec;
import org.apache.druid.segment.QueryableIndex;
import org.apache.druid.segment.column.ColumnHolder;
import org.apache.druid.segment.incremental.AppendableIndexSpec;
import org.apache.druid.segment.indexing.DataSchema;
import org.apache.druid.segment.indexing.TuningConfig;
import org.apache.druid.segment.indexing.granularity.GranularitySpec;
import org.apache.druid.segment.indexing.granularity.UniformGranularitySpec;
import org.apache.druid.segment.loading.SegmentCacheManager;
import org.apache.druid.segment.realtime.appenderator.AppenderatorsManager;
import org.apache.druid.segment.transform.TransformSpec;
import org.apache.druid.segment.writeout.SegmentWriteOutMediumFactory;
import org.apache.druid.server.coordinator.duty.CompactSegments;
import org.apache.druid.server.security.ResourceAction;
import org.apache.druid.timeline.DataSegment;
import org.apache.druid.timeline.SegmentTimeline;
import org.apache.druid.timeline.TimelineObjectHolder;
import org.apache.druid.timeline.VersionedIntervalTimeline;
import org.apache.druid.utils.CollectionUtils;
import org.joda.time.Duration;
import org.joda.time.Interval;
import javax.annotation.Nonnull;
import javax.annotation.Nullable;
import java.io.File;
import java.io.IOException;
import java.time.Clock;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.Comparator;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Map.Entry;
import java.util.Optional;
import java.util.Set;
import java.util.TreeMap;
import java.util.function.Supplier;
import java.util.stream.Collectors;
import java.util.stream.IntStream;
/**
* The client representation of this task is {@link ClientCompactionTaskQuery}. JSON
* serialization fields of this class must correspond to those of {@link
* ClientCompactionTaskQuery}.
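*
* For illustration only (the datasource name and interval below are hypothetical), a minimal task
* spec using the deprecated top-level {@code interval} field might look like:
* <pre>
* {
*   "type": "compact",
*   "dataSource": "wikipedia",
*   "interval": "2020-01-01/2020-02-01"
* }
* </pre>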
*/
public class CompactionTask extends AbstractBatchIndexTask implements PendingSegmentAllocatingTask
{
private static final Logger log = new Logger(CompactionTask.class);
private static final Clock UTC_CLOCK = Clock.systemUTC();
/**
* The CompactionTask creates and runs multiple IndexTask instances. When the {@link AppenderatorsManager}
* is asked to clean up, it does so on a per-task basis keyed by task ID. However, the subtask IDs of the
* CompactionTask are not externally visible. This context flag is used to ensure that all the appenderators
created for the CompactionTask's subtasks are tracked under the ID of the parent CompactionTask.
* The CompactionTask may change in the future and no longer require this behavior (e.g., reusing the same
* Appenderator across subtasks, or allowing the subtasks to use the same ID). The CompactionTask is also the only
* task type that currently creates multiple appenderators. Thus, a context flag is used to handle this case
* instead of a more general approach such as new methods on the Task interface.
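*
* Concretely, {@link #createContextForSubtask} puts this task's ID into each subtask's context under
* this key (e.g. {@code "appenderatorTrackingTaskId": "<compaction task id>"}), so all appenderators
* created by the subtasks are tracked, and later cleaned up, under the parent compaction task's ID.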
*/
public static final String CTX_KEY_APPENDERATOR_TRACKING_TASK_ID = "appenderatorTrackingTaskId";
private static final String TYPE = "compact";
private static final boolean STORE_COMPACTION_STATE = true;
static {
Verify.verify(TYPE.equals(CompactSegments.COMPACTION_TASK_TYPE));
}
private final CompactionIOConfig ioConfig;
@Nullable
private final DimensionsSpec dimensionsSpec;
@Nullable
private final ClientCompactionTaskTransformSpec transformSpec;
@Nullable
private final AggregatorFactory[] metricsSpec;
@Nullable
private final ClientCompactionTaskGranularitySpec granularitySpec;
@Nullable
private final CompactionTuningConfig tuningConfig;
@JsonIgnore
private final SegmentProvider segmentProvider;
@JsonIgnore
private final PartitionConfigurationManager partitionConfigurationManager;
@JsonIgnore
private final SegmentCacheManagerFactory segmentCacheManagerFactory;
@JsonIgnore
private final CurrentSubTaskHolder currentSubTaskHolder = new CurrentSubTaskHolder(
(taskObject, config) -> {
final ParallelIndexSupervisorTask indexTask = (ParallelIndexSupervisorTask) taskObject;
indexTask.stopGracefully(config);
}
);
@JsonCreator
public CompactionTask(
@JsonProperty("id") @Nullable final String id,
@JsonProperty("resource") @Nullable final TaskResource taskResource,
@JsonProperty("dataSource") final String dataSource,
@JsonProperty("interval") @Deprecated @Nullable final Interval interval,
@JsonProperty("segments") @Deprecated @Nullable final List<DataSegment> segments,
@JsonProperty("ioConfig") @Nullable CompactionIOConfig ioConfig,
@JsonProperty("dimensions") @Nullable final DimensionsSpec dimensions,
@JsonProperty("dimensionsSpec") @Nullable final DimensionsSpec dimensionsSpec,
@JsonProperty("transformSpec") @Nullable final ClientCompactionTaskTransformSpec transformSpec,
@JsonProperty("metricsSpec") @Nullable final AggregatorFactory[] metricsSpec,
@JsonProperty("segmentGranularity") @Deprecated @Nullable final Granularity segmentGranularity,
@JsonProperty("granularitySpec") @Nullable final ClientCompactionTaskGranularitySpec granularitySpec,
@JsonProperty("tuningConfig") @Nullable final TuningConfig tuningConfig,
@JsonProperty("context") @Nullable final Map<String, Object> context,
@JacksonInject SegmentCacheManagerFactory segmentCacheManagerFactory
)
{
super(
getOrMakeId(id, TYPE, dataSource),
null,
taskResource,
dataSource,
context,
-1,
computeCompactionIngestionMode(ioConfig)
);
Checks.checkOneNotNullOrEmpty(
ImmutableList.of(
new Property<>("ioConfig", ioConfig),
new Property<>("interval", interval),
new Property<>("segments", segments)
)
);
if (ioConfig != null) {
this.ioConfig = ioConfig;
} else if (interval != null) {
this.ioConfig = new CompactionIOConfig(new CompactionIntervalSpec(interval, null), false, null);
} else {
// We already checked segments is not null or empty above.
//noinspection ConstantConditions
this.ioConfig = new CompactionIOConfig(SpecificSegmentsSpec.fromSegments(segments), false, null);
}
this.dimensionsSpec = dimensionsSpec == null ? dimensions : dimensionsSpec;
this.transformSpec = transformSpec;
this.metricsSpec = metricsSpec;
// Prior to apache/druid#10843, users could only specify segmentGranularity via the top-level
// `segmentGranularity` field. Users should now prefer `granularitySpec`.
// If users accidentally specify both and the values conflict, fail instead of silently
// picking one or the other.
if (granularitySpec != null
&& segmentGranularity != null
&& !segmentGranularity.equals(granularitySpec.getSegmentGranularity())) {
throw new IAE(StringUtils.format(
"Conflicting segment granularities found %s(segmentGranularity) and %s(granularitySpec.segmentGranularity).\n"
+ "Remove `segmentGranularity` and set the `granularitySpec.segmentGranularity` to the expected granularity",
segmentGranularity,
granularitySpec.getSegmentGranularity()
));
}
if (granularitySpec == null && segmentGranularity != null) {
this.granularitySpec = new ClientCompactionTaskGranularitySpec(segmentGranularity, null, null);
} else {
this.granularitySpec = granularitySpec;
}
this.tuningConfig = tuningConfig != null ? getTuningConfig(tuningConfig) : null;
this.segmentProvider = new SegmentProvider(dataSource, this.ioConfig.getInputSpec());
this.partitionConfigurationManager = new PartitionConfigurationManager(this.tuningConfig);
this.segmentCacheManagerFactory = segmentCacheManagerFactory;
}
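/**
* Converts a user-provided {@link TuningConfig} into a {@link CompactionTuningConfig}. Both
* {@link ParallelIndexTuningConfig} and {@link IndexTuningConfig} are accepted for backward
* compatibility; deprecated or inapplicable fields (e.g. {@code targetPartitionSize},
* {@code maxNumSubTasks}) are passed as null. Any other tuningConfig type is rejected.
*/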
@VisibleForTesting
static CompactionTuningConfig getTuningConfig(TuningConfig tuningConfig)
{
if (tuningConfig instanceof CompactionTuningConfig) {
return (CompactionTuningConfig) tuningConfig;
} else if (tuningConfig instanceof ParallelIndexTuningConfig) {
final ParallelIndexTuningConfig parallelIndexTuningConfig = (ParallelIndexTuningConfig) tuningConfig;
return new CompactionTuningConfig(
null, // targetPartitionSize (deprecated)
parallelIndexTuningConfig.getMaxRowsPerSegment(),
parallelIndexTuningConfig.getAppendableIndexSpec(),
parallelIndexTuningConfig.getMaxRowsInMemory(),
parallelIndexTuningConfig.getMaxBytesInMemory(),
parallelIndexTuningConfig.isSkipBytesInMemoryOverheadCheck(),
parallelIndexTuningConfig.getMaxTotalRows(),
parallelIndexTuningConfig.getNumShards(),
parallelIndexTuningConfig.getSplitHintSpec(),
parallelIndexTuningConfig.getPartitionsSpec(),
parallelIndexTuningConfig.getIndexSpec(),
parallelIndexTuningConfig.getIndexSpecForIntermediatePersists(),
parallelIndexTuningConfig.getMaxPendingPersists(),
parallelIndexTuningConfig.isForceGuaranteedRollup(),
parallelIndexTuningConfig.isReportParseExceptions(),
parallelIndexTuningConfig.getPushTimeout(),
parallelIndexTuningConfig.getSegmentWriteOutMediumFactory(),
null, // maxNumSubTasks (deprecated)
parallelIndexTuningConfig.getMaxNumConcurrentSubTasks(),
parallelIndexTuningConfig.getMaxRetry(),
parallelIndexTuningConfig.getTaskStatusCheckPeriodMs(),
parallelIndexTuningConfig.getChatHandlerTimeout(),
parallelIndexTuningConfig.getChatHandlerNumRetries(),
parallelIndexTuningConfig.getMaxNumSegmentsToMerge(),
parallelIndexTuningConfig.getTotalNumMergeTasks(),
parallelIndexTuningConfig.isLogParseExceptions(),
parallelIndexTuningConfig.getMaxParseExceptions(),
parallelIndexTuningConfig.getMaxSavedParseExceptions(),
parallelIndexTuningConfig.getMaxColumnsToMerge(),
parallelIndexTuningConfig.getAwaitSegmentAvailabilityTimeoutMillis(),
parallelIndexTuningConfig.getNumPersistThreads()
);
} else if (tuningConfig instanceof IndexTuningConfig) {
final IndexTuningConfig indexTuningConfig = (IndexTuningConfig) tuningConfig;
return new CompactionTuningConfig(
null, // targetPartitionSize (deprecated)
indexTuningConfig.getMaxRowsPerSegment(),
indexTuningConfig.getAppendableIndexSpec(),
indexTuningConfig.getMaxRowsInMemory(),
indexTuningConfig.getMaxBytesInMemory(),
indexTuningConfig.isSkipBytesInMemoryOverheadCheck(),
indexTuningConfig.getMaxTotalRows(),
indexTuningConfig.getNumShards(),
null, // splitHintSpec (not available in IndexTuningConfig)
indexTuningConfig.getPartitionsSpec(),
indexTuningConfig.getIndexSpec(),
indexTuningConfig.getIndexSpecForIntermediatePersists(),
indexTuningConfig.getMaxPendingPersists(),
indexTuningConfig.isForceGuaranteedRollup(),
indexTuningConfig.isReportParseExceptions(),
indexTuningConfig.getPushTimeout(),
indexTuningConfig.getSegmentWriteOutMediumFactory(),
null, // maxNumSubTasks (deprecated)
null, // maxNumConcurrentSubTasks
null, // maxRetry
null, // taskStatusCheckPeriodMs
null, // chatHandlerTimeout
null, // chatHandlerNumRetries
null, // maxNumSegmentsToMerge
null, // totalNumMergeTasks
indexTuningConfig.isLogParseExceptions(),
indexTuningConfig.getMaxParseExceptions(),
indexTuningConfig.getMaxSavedParseExceptions(),
indexTuningConfig.getMaxColumnsToMerge(),
indexTuningConfig.getAwaitSegmentAvailabilityTimeoutMillis(),
indexTuningConfig.getNumPersistThreads()
);
} else {
throw new ISE(
"Unknown tuningConfig type: [%s], Must be in [%s, %s, %s]",
tuningConfig.getClass().getName(),
CompactionTuningConfig.class.getName(),
ParallelIndexTuningConfig.class.getName(),
IndexTuningConfig.class.getName()
);
}
}
@VisibleForTesting
public CurrentSubTaskHolder getCurrentSubTaskHolder()
{
return currentSubTaskHolder;
}
@JsonProperty
public CompactionIOConfig getIoConfig()
{
return ioConfig;
}
@JsonProperty
@Nullable
public DimensionsSpec getDimensionsSpec()
{
return dimensionsSpec;
}
@JsonProperty
@Nullable
public ClientCompactionTaskTransformSpec getTransformSpec()
{
return transformSpec;
}
@JsonProperty
@Nullable
public AggregatorFactory[] getMetricsSpec()
{
return metricsSpec;
}
@JsonInclude(Include.NON_NULL)
@JsonProperty
@Nullable
@Override
public Granularity getSegmentGranularity()
{
return granularitySpec == null ? null : granularitySpec.getSegmentGranularity();
}
@JsonProperty
@Nullable
public ClientCompactionTaskGranularitySpec getGranularitySpec()
{
return granularitySpec;
}
@Nullable
@JsonProperty
public ParallelIndexTuningConfig getTuningConfig()
{
return tuningConfig;
}
@Override
public String getType()
{
return TYPE;
}
@Override
public String getTaskAllocatorId()
{
return getGroupId();
}
@Nonnull
@JsonIgnore
@Override
public Set<ResourceAction> getInputSourceResources()
{
return ImmutableSet.of();
}
@Override
public int getPriority()
{
return getContextValue(Tasks.PRIORITY_KEY, Tasks.DEFAULT_MERGE_TASK_PRIORITY);
}
@Override
public boolean isReady(TaskActionClient taskActionClient) throws Exception
{
final List<DataSegment> segments = segmentProvider.findSegments(taskActionClient);
return determineLockGranularityAndTryLockWithSegments(taskActionClient, segments, segmentProvider::checkSegments);
}
@Override
public boolean requireLockExistingSegments()
{
return true;
}
@Override
public List<DataSegment> findSegmentsToLock(TaskActionClient taskActionClient, List<Interval> intervals)
throws IOException
{
return ImmutableList.copyOf(
taskActionClient.submit(new RetrieveUsedSegmentsAction(getDataSource(), intervals))
);
}
@Override
public boolean isPerfectRollup()
{
return tuningConfig != null && tuningConfig.isForceGuaranteedRollup();
}
@VisibleForTesting
void emitCompactIngestionModeMetrics(
ServiceEmitter emitter,
boolean isDropExisting
)
{
if (emitter == null) {
return;
}
emitMetric(emitter, "ingest/count", 1);
}
@Override
public TaskStatus runTask(TaskToolbox toolbox) throws Exception
{
// emit metric for compact ingestion mode:
emitCompactIngestionModeMetrics(toolbox.getEmitter(), ioConfig.isDropExisting());
final List<ParallelIndexIngestionSpec> ingestionSpecs = createIngestionSchema(
UTC_CLOCK,
toolbox,
getTaskLockHelper().getLockGranularityToUse(),
ioConfig,
segmentProvider,
partitionConfigurationManager,
dimensionsSpec,
transformSpec,
metricsSpec,
granularitySpec,
toolbox.getCoordinatorClient(),
segmentCacheManagerFactory,
getMetricBuilder()
);
final List<ParallelIndexSupervisorTask> indexTaskSpecs = IntStream
.range(0, ingestionSpecs.size())
.mapToObj(i -> {
// The ID of SubtaskSpecs is used as the base sequenceName in segment allocation protocol.
// The indexing tasks generated by the compaction task should use different sequenceNames
// so that they can allocate valid segment IDs with no duplication.
ParallelIndexIngestionSpec ingestionSpec = ingestionSpecs.get(i);
final String baseSequenceName = createIndexTaskSpecId(i);
return newTask(baseSequenceName, ingestionSpec);
})
.collect(Collectors.toList());
if (indexTaskSpecs.isEmpty()) {
String msg = StringUtils.format(
"Can't find segments from inputSpec[%s], nothing to do.",
ioConfig.getInputSpec()
);
log.warn(msg);
return TaskStatus.failure(getId(), msg);
} else {
registerResourceCloserOnAbnormalExit(currentSubTaskHolder);
final int totalNumSpecs = indexTaskSpecs.size();
log.info("Generated [%d] compaction task specs", totalNumSpecs);
int failCnt = 0;
final TaskReport.ReportMap completionReports = new TaskReport.ReportMap();
for (int i = 0; i < indexTaskSpecs.size(); i++) {
ParallelIndexSupervisorTask eachSpec = indexTaskSpecs.get(i);
final String json = toolbox.getJsonMapper().writerWithDefaultPrettyPrinter().writeValueAsString(eachSpec);
if (!currentSubTaskHolder.setTask(eachSpec)) {
String errMsg = "Task was asked to stop. Finish as failed.";
log.info(errMsg);
return TaskStatus.failure(getId(), errMsg);
}
try {
if (eachSpec.isReady(toolbox.getTaskActionClient())) {
log.info("Running indexSpec: " + json);
final TaskStatus eachResult = eachSpec.run(toolbox);
if (!eachResult.isSuccess()) {
failCnt++;
log.warn("Failed to run indexSpec: [%s].\nTrying the next indexSpec.", json);
}
String reportKeySuffix = "_" + i;
Optional.ofNullable(eachSpec.getCompletionReports()).ifPresent(
reports -> completionReports.putAll(
CollectionUtils.mapKeys(reports, key -> key + reportKeySuffix)
)
);
} else {
failCnt++;
log.warn("indexSpec is not ready: [%s].\nTrying the next indexSpec.", json);
}
}
catch (Exception e) {
failCnt++;
log.warn(e, "Failed to run indexSpec: [%s].\nTrying the next indexSpec.", json);
}
}
String msg = StringUtils.format("Ran [%d] specs, [%d] succeeded, [%d] failed",
totalNumSpecs, totalNumSpecs - failCnt, failCnt
);
toolbox.getTaskReportFileWriter().write(getId(), completionReports);
log.info(msg);
return failCnt == 0 ? TaskStatus.success(getId()) : TaskStatus.failure(getId(), msg);
}
}
@VisibleForTesting
ParallelIndexSupervisorTask newTask(String baseSequenceName, ParallelIndexIngestionSpec ingestionSpec)
{
return new ParallelIndexSupervisorTask(
getId(),
getGroupId(),
getTaskResource(),
ingestionSpec,
baseSequenceName,
createContextForSubtask(),
true
);
}
@VisibleForTesting
Map<String, Object> createContextForSubtask()
{
final Map<String, Object> newContext = new HashMap<>(getContext());
newContext.put(CTX_KEY_APPENDERATOR_TRACKING_TASK_ID, getId());
newContext.putIfAbsent(CompactSegments.STORE_COMPACTION_STATE_KEY, STORE_COMPACTION_STATE);
// Set the priority of the compaction task.
newContext.put(Tasks.PRIORITY_KEY, getPriority());
return newContext;
}
private String createIndexTaskSpecId(int i)
{
return StringUtils.format("%s_%d", getId(), i);
}
/**
* Generate {@link ParallelIndexIngestionSpec} from input segments.
*
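* When {@code granularitySpec.segmentGranularity} is not set, one spec is created per group of
* overlapping input intervals, each using the interval's period as its segment granularity. When it
* is set, a single spec covering the umbrella interval of all input segments is created.
*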
* @return an empty list if no input segments exist. Otherwise, the list of generated ingestionSpecs.
*/
@VisibleForTesting
static List<ParallelIndexIngestionSpec> createIngestionSchema(
final Clock clock,
final TaskToolbox toolbox,
final LockGranularity lockGranularityInUse,
final CompactionIOConfig ioConfig,
final SegmentProvider segmentProvider,
final PartitionConfigurationManager partitionConfigurationManager,
@Nullable final DimensionsSpec dimensionsSpec,
@Nullable final ClientCompactionTaskTransformSpec transformSpec,
@Nullable final AggregatorFactory[] metricsSpec,
@Nullable final ClientCompactionTaskGranularitySpec granularitySpec,
final CoordinatorClient coordinatorClient,
final SegmentCacheManagerFactory segmentCacheManagerFactory,
final ServiceMetricEvent.Builder metricBuilder
) throws IOException
{
final List<TimelineObjectHolder<String, DataSegment>> timelineSegments = retrieveRelevantTimelineHolders(
toolbox,
segmentProvider,
lockGranularityInUse
);
if (timelineSegments.isEmpty()) {
return Collections.emptyList();
}
final CompactionTuningConfig compactionTuningConfig = partitionConfigurationManager.computeTuningConfig();
if (granularitySpec == null || granularitySpec.getSegmentGranularity() == null) {
final List<ParallelIndexIngestionSpec> specs = new ArrayList<>();
// original granularity
final Map<Interval, List<DataSegment>> intervalToSegments = new TreeMap<>(
Comparators.intervalsByStartThenEnd()
);
for (final DataSegment dataSegment : VersionedIntervalTimeline.getAllObjects(timelineSegments)) {
intervalToSegments.computeIfAbsent(dataSegment.getInterval(), k -> new ArrayList<>())
.add(dataSegment);
}
// Unify overlapping intervals so that overlapping segments are compacted in the same indexSpec.
List<NonnullPair<Interval, List<DataSegment>>> intervalToSegmentsUnified = new ArrayList<>();
Interval union = null;
List<DataSegment> segments = new ArrayList<>();
for (Entry<Interval, List<DataSegment>> entry : intervalToSegments.entrySet()) {
Interval cur = entry.getKey();
if (union == null) {
union = cur;
segments.addAll(entry.getValue());
} else if (union.overlaps(cur)) {
union = Intervals.utc(union.getStartMillis(), Math.max(union.getEndMillis(), cur.getEndMillis()));
segments.addAll(entry.getValue());
} else {
intervalToSegmentsUnified.add(new NonnullPair<>(union, segments));
union = cur;
segments = new ArrayList<>(entry.getValue());
}
}
intervalToSegmentsUnified.add(new NonnullPair<>(union, segments));
for (NonnullPair<Interval, List<DataSegment>> entry : intervalToSegmentsUnified) {
final Interval interval = entry.lhs;
final List<DataSegment> segmentsToCompact = entry.rhs;
// If granularitySpec is null, create a new one with the computed segmentGranularity.
// Otherwise, override its segmentGranularity with the computed value.
Granularity segmentGranularityToUse = GranularityType.fromPeriod(interval.toPeriod()).getDefaultGranularity();
final DataSchema dataSchema = createDataSchema(
clock,
toolbox.getEmitter(),
metricBuilder,
segmentProvider.dataSource,
interval,
lazyFetchSegments(segmentsToCompact, toolbox.getSegmentCacheManager(), toolbox.getIndexIO()),
dimensionsSpec,
transformSpec,
metricsSpec,
granularitySpec == null
? new ClientCompactionTaskGranularitySpec(segmentGranularityToUse, null, null)
: granularitySpec.withSegmentGranularity(segmentGranularityToUse)
);
specs.add(
new ParallelIndexIngestionSpec(
dataSchema,
createIoConfig(
toolbox,
dataSchema,
interval,
coordinatorClient,
segmentCacheManagerFactory,
ioConfig
),
compactionTuningConfig
)
);
}
return specs;
} else {
// given segment granularity
final DataSchema dataSchema = createDataSchema(
clock,
toolbox.getEmitter(),
metricBuilder,
segmentProvider.dataSource,
JodaUtils.umbrellaInterval(
Iterables.transform(
VersionedIntervalTimeline.getAllObjects(timelineSegments),
DataSegment::getInterval
)
),
lazyFetchSegments(
VersionedIntervalTimeline.getAllObjects(timelineSegments),
toolbox.getSegmentCacheManager(),
toolbox.getIndexIO()
),
dimensionsSpec,
transformSpec,
metricsSpec,
granularitySpec
);
return Collections.singletonList(
new ParallelIndexIngestionSpec(
dataSchema,
createIoConfig(
toolbox,
dataSchema,
segmentProvider.interval,
coordinatorClient,
segmentCacheManagerFactory,
ioConfig
),
compactionTuningConfig
)
);
}
}
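/**
* Creates the {@link ParallelIndexIOConfig} for a compaction interval, reading the existing segments
* back through a {@link DruidInputSource}. Unless {@code allowNonAlignedInterval} is set, the
* interval must line up with the target segmentGranularity: for example, with DAY granularity the
* interval 2020-01-01/2020-01-03 is aligned, while 2020-01-01T06:00/2020-01-02 is not and is rejected.
*/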
private static ParallelIndexIOConfig createIoConfig(
TaskToolbox toolbox,
DataSchema dataSchema,
Interval interval,
CoordinatorClient coordinatorClient,
SegmentCacheManagerFactory segmentCacheManagerFactory,
CompactionIOConfig compactionIOConfig
)
{
if (!compactionIOConfig.isAllowNonAlignedInterval()) {
// Validate interval alignment.
final Granularity segmentGranularity = dataSchema.getGranularitySpec().getSegmentGranularity();
final Interval widenedInterval = Intervals.utc(
segmentGranularity.bucketStart(interval.getStart()).getMillis(),
segmentGranularity.bucketEnd(interval.getEnd().minus(1)).getMillis()
);
if (!interval.equals(widenedInterval)) {
throw new IAE(
"Interval[%s] to compact is not aligned with segmentGranularity[%s]",
interval,
segmentGranularity
);
}
}
return new ParallelIndexIOConfig(
null,
new DruidInputSource(
dataSchema.getDataSource(),
interval,
null,
null,
null,
null,
toolbox.getIndexIO(),
coordinatorClient,
segmentCacheManagerFactory,
toolbox.getConfig()
).withTaskToolbox(toolbox),
null,
false,
compactionIOConfig.isDropExisting()
);
}
private static List<TimelineObjectHolder<String, DataSegment>> retrieveRelevantTimelineHolders(
TaskToolbox toolbox,
SegmentProvider segmentProvider,
LockGranularity lockGranularityInUse
) throws IOException
{
final List<DataSegment> usedSegments =
segmentProvider.findSegments(toolbox.getTaskActionClient());
segmentProvider.checkSegments(lockGranularityInUse, usedSegments);
final List<TimelineObjectHolder<String, DataSegment>> timelineSegments = SegmentTimeline
.forSegments(usedSegments)
.lookup(segmentProvider.interval);
return timelineSegments;
}
private static DataSchema createDataSchema(
Clock clock,
ServiceEmitter emitter,
ServiceMetricEvent.Builder metricBuilder,
String dataSource,
Interval totalInterval,
Iterable<Pair<DataSegment, Supplier<ResourceHolder<QueryableIndex>>>> segments,
@Nullable DimensionsSpec dimensionsSpec,
@Nullable ClientCompactionTaskTransformSpec transformSpec,
@Nullable AggregatorFactory[] metricsSpec,
@Nonnull ClientCompactionTaskGranularitySpec granularitySpec
)
{
// Check index metadata & decide which values to propagate (i.e. carry over) for rollup & queryGranularity
final ExistingSegmentAnalyzer existingSegmentAnalyzer = new ExistingSegmentAnalyzer(
segments,
granularitySpec.isRollup() == null,
granularitySpec.getQueryGranularity() == null,
dimensionsSpec == null,
metricsSpec == null
);
long start = clock.millis();
try {
existingSegmentAnalyzer.fetchAndProcessIfNeeded();
}
finally {
if (emitter != null) {
emitter.emit(metricBuilder.setMetric("compact/segmentAnalyzer/fetchAndProcessMillis", clock.millis() - start));
}
}
final Granularity queryGranularityToUse;
if (granularitySpec.getQueryGranularity() == null) {
queryGranularityToUse = existingSegmentAnalyzer.getQueryGranularity();
log.info("Generate compaction task spec with segments original query granularity [%s]", queryGranularityToUse);
} else {
queryGranularityToUse = granularitySpec.getQueryGranularity();
log.info(
"Generate compaction task spec with new query granularity overrided from input [%s]",
queryGranularityToUse
);
}
final GranularitySpec uniformGranularitySpec = new UniformGranularitySpec(
Preconditions.checkNotNull(granularitySpec.getSegmentGranularity()),
queryGranularityToUse,
granularitySpec.isRollup() == null ? existingSegmentAnalyzer.getRollup() : granularitySpec.isRollup(),
Collections.singletonList(totalInterval)
);
// find unique dimensions
final DimensionsSpec finalDimensionsSpec;
if (dimensionsSpec == null) {
finalDimensionsSpec = existingSegmentAnalyzer.getDimensionsSpec();
} else {
finalDimensionsSpec = dimensionsSpec;
}
final AggregatorFactory[] finalMetricsSpec;
if (metricsSpec == null) {
finalMetricsSpec = existingSegmentAnalyzer.getMetricsSpec();
} else {
finalMetricsSpec = metricsSpec;
}
return new DataSchema(
dataSource,
new TimestampSpec(ColumnHolder.TIME_COLUMN_NAME, "millis", null),
finalDimensionsSpec,
finalMetricsSpec,
uniformGranularitySpec,
transformSpec == null ? null : new TransformSpec(transformSpec.getFilter(), null)
);
}
/**
* Lazily fetch and load {@link QueryableIndex}, skipping tombstones.
*/
private static Iterable<Pair<DataSegment, Supplier<ResourceHolder<QueryableIndex>>>> lazyFetchSegments(
Iterable<DataSegment> dataSegments,
SegmentCacheManager segmentCacheManager,
IndexIO indexIO
)
{
return Iterables.transform(
Iterables.filter(dataSegments, dataSegment -> !dataSegment.isTombstone()),
dataSegment -> fetchSegment(dataSegment, segmentCacheManager, indexIO)
);
}
// Broken out into a separate function because some tools can't infer the
// pair type, but if the type is given explicitly, IntelliJ inspections raise
// an error. Creating a function keeps everyone happy.
private static Pair<DataSegment, Supplier<ResourceHolder<QueryableIndex>>> fetchSegment(
DataSegment dataSegment,
SegmentCacheManager segmentCacheManager,
IndexIO indexIO
)
{
return Pair.of(
dataSegment,
() -> {
try {
final Closer closer = Closer.create();
final File file = segmentCacheManager.getSegmentFiles(dataSegment);
closer.register(() -> segmentCacheManager.cleanup(dataSegment));
final QueryableIndex queryableIndex = closer.register(indexIO.loadIndex(file));
return new ResourceHolder<QueryableIndex>()
{
@Override
public QueryableIndex get()
{
return queryableIndex;
}
@Override
public void close()
{
try {
closer.close();
}
catch (IOException e) {
throw new RuntimeException(e);
}
}
};
}
catch (Exception e) {
throw new RuntimeException(e);
}
}
);
}
/**
* Class for fetching and analyzing existing segments in order to generate reingestion specs.
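*
* Depending on which settings were left unspecified by the user, the analyzer derives rollup,
* queryGranularity, dimensionsSpec, and/or metricsSpec from the metadata of the segments being
* compacted; segments are inspected newest-first so that more recent schema changes take precedence.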
*/
static class ExistingSegmentAnalyzer
{
private final Iterable<Pair<DataSegment, Supplier<ResourceHolder<QueryableIndex>>>> segmentsIterable;
private final boolean needRollup;
private final boolean needQueryGranularity;
private final boolean needDimensionsSpec;
private final boolean needMetricsSpec;
// For processRollup:
private boolean rollup = true;
// For processQueryGranularity:
private Granularity queryGranularity;
// For processDimensionsSpec:
private final BiMap<String, Integer> uniqueDims = HashBiMap.create();
private final Map<String, DimensionSchema> dimensionSchemaMap = new HashMap<>();
// For processMetricsSpec:
private final Set<List<AggregatorFactory>> aggregatorFactoryLists = new HashSet<>();
ExistingSegmentAnalyzer(
final Iterable<Pair<DataSegment, Supplier<ResourceHolder<QueryableIndex>>>> segmentsIterable,
final boolean needRollup,
final boolean needQueryGranularity,
final boolean needDimensionsSpec,
final boolean needMetricsSpec
)
{
this.segmentsIterable = segmentsIterable;
this.needRollup = needRollup;
this.needQueryGranularity = needQueryGranularity;
this.needDimensionsSpec = needDimensionsSpec;
this.needMetricsSpec = needMetricsSpec;
}
public void fetchAndProcessIfNeeded()
{
if (!needRollup && !needQueryGranularity && !needDimensionsSpec && !needMetricsSpec) {
// Nothing to do; short-circuit and don't fetch segments.
return;
}
final List<Pair<DataSegment, Supplier<ResourceHolder<QueryableIndex>>>> segments = sortSegmentsListNewestFirst();
for (Pair<DataSegment, Supplier<ResourceHolder<QueryableIndex>>> segmentPair : segments) {
final DataSegment dataSegment = segmentPair.lhs;
try (final ResourceHolder<QueryableIndex> queryableIndexHolder = segmentPair.rhs.get()) {
final QueryableIndex index = queryableIndexHolder.get();
if (index != null) { // Avoid tombstones (null QueryableIndex)
if (index.getMetadata() == null) {
throw new RE(
"Index metadata doesn't exist for segment [%s]. Try providing explicit rollup, "
+ "queryGranularity, dimensionsSpec, and metricsSpec.", dataSegment.getId()
);
}
processRollup(index);
processQueryGranularity(index);
processDimensionsSpec(index);
processMetricsSpec(index);
}
}
}
}
public Boolean getRollup()
{
if (!needRollup) {
throw new ISE("Not computing rollup");
}
return rollup;
}
public Granularity getQueryGranularity()
{
if (!needQueryGranularity) {
throw new ISE("Not computing queryGranularity");
}
return queryGranularity;
}
public DimensionsSpec getDimensionsSpec()
{
if (!needDimensionsSpec) {
throw new ISE("Not computing dimensionsSpec");
}
final BiMap<Integer, String> orderedDims = uniqueDims.inverse();
final List<DimensionSchema> dimensionSchemas =
IntStream.range(0, orderedDims.size())
.mapToObj(i -> {
final String dimName = orderedDims.get(i);
return Preconditions.checkNotNull(
dimensionSchemaMap.get(dimName),
"Cannot find dimension[%s] from dimensionSchemaMap",
dimName
);
})
.collect(Collectors.toList());
return new DimensionsSpec(dimensionSchemas);
}
public AggregatorFactory[] getMetricsSpec()
{
if (!needMetricsSpec) {
throw new ISE("Not computing metricsSpec");
}
if (aggregatorFactoryLists.isEmpty()) {
return new AggregatorFactory[0];
}
final AggregatorFactory[] mergedAggregators = AggregatorFactory.mergeAggregators(
aggregatorFactoryLists.stream()
.map(xs -> xs.toArray(new AggregatorFactory[0]))
.collect(Collectors.toList())
);
if (mergedAggregators == null) {
throw new ISE(
"Failed to merge existing aggregators when generating metricsSpec; "
+ "try providing explicit metricsSpec"
);
}
return mergedAggregators;
}
/**
* Sort {@link #segmentsIterable} in order, such that we look at later segments prior to earlier ones. Useful when
* analyzing dimensions, as it allows us to take the latest value we see, and therefore prefer types from more
* recent segments, if there was a change.
*
* Returns a List copy of the original Iterable.
*/
private List<Pair<DataSegment, Supplier<ResourceHolder<QueryableIndex>>>> sortSegmentsListNewestFirst()
{
final List<Pair<DataSegment, Supplier<ResourceHolder<QueryableIndex>>>> segments =
Lists.newArrayList(segmentsIterable);
segments.sort(
Comparator.comparing(
o -> o.lhs.getInterval(),
Comparators.intervalsByStartThenEnd().reversed()
)
);
return segments;
}
private void processRollup(final QueryableIndex index)
{
if (!needRollup) {
return;
}
// Carry-overs (i.e., query granularity & rollup) are valid iff they are the same in every segment.
// Pick the rollup value only if all segments being compacted have the same non-null value; otherwise set it to false.
final Boolean isIndexRollup = index.getMetadata().isRollup();
rollup = rollup && Boolean.valueOf(true).equals(isIndexRollup);
}
private void processQueryGranularity(final QueryableIndex index)
{
if (!needQueryGranularity) {
return;
}
// Pick the finer, non-null, of the query granularities of the segments being compacted
Granularity current = index.getMetadata().getQueryGranularity();
queryGranularity = compareWithCurrent(queryGranularity, current);
}
private void processDimensionsSpec(final QueryableIndex index)
{
if (!needDimensionsSpec) {
return;
}
final Map<String, DimensionHandler> dimensionHandlerMap = index.getDimensionHandlers();
for (String dimension : index.getAvailableDimensions()) {
final ColumnHolder columnHolder = Preconditions.checkNotNull(
index.getColumnHolder(dimension),
"Cannot find column for dimension[%s]",
dimension
);
if (!uniqueDims.containsKey(dimension)) {
Preconditions.checkNotNull(
dimensionHandlerMap.get(dimension),
"Cannot find dimensionHandler for dimension[%s]",
dimension
);
uniqueDims.put(dimension, uniqueDims.size());
dimensionSchemaMap.put(
dimension,
columnHolder.getColumnFormat().getColumnSchema(dimension)
);
}
}
}
private void processMetricsSpec(final QueryableIndex index)
{
if (!needMetricsSpec) {
return;
}
final AggregatorFactory[] aggregators = index.getMetadata().getAggregators();
if (aggregators != null) {
// aggregatorFactoryLists is a Set: we don't want to store tons of copies of the same aggregator lists from
// different segments.
aggregatorFactoryLists.add(Arrays.asList(aggregators));
}
}
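/**
* Returns the finer of the two granularities, never replacing a non-null value with null:
* {@code current} wins only when it is non-null and strictly finer than the value seen so far.
*/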
static Granularity compareWithCurrent(Granularity queryGranularity, Granularity current)
{
if (queryGranularity == null && current != null) {
queryGranularity = current;
} else if (queryGranularity != null
&& current != null
&& Granularity.IS_FINER_THAN.compare(current, queryGranularity) < 0) {
queryGranularity = current;
}
// we never propagate nulls when there is at least one non-null granularity thus
// do nothing for the case queryGranularity != null && current == null
return queryGranularity;
}
}
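/**
* Finds the segments to compact for the configured datasource and input spec, and verifies that the
* set of segments has not changed (e.g., via new ingestion or unpublishing) between task submission
* and execution.
*/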
@VisibleForTesting
static class SegmentProvider
{
private final String dataSource;
private final CompactionInputSpec inputSpec;
private final Interval interval;
SegmentProvider(String dataSource, CompactionInputSpec inputSpec)
{
this.dataSource = Preconditions.checkNotNull(dataSource);
this.inputSpec = inputSpec;
this.interval = inputSpec.findInterval(dataSource);
}
List<DataSegment> findSegments(TaskActionClient actionClient) throws IOException
{
return new ArrayList<>(
actionClient.submit(
new RetrieveUsedSegmentsAction(dataSource, ImmutableList.of(interval))
)
);
}
void checkSegments(LockGranularity lockGranularityInUse, List<DataSegment> latestSegments)
{
if (latestSegments.isEmpty()) {
throw new ISE("No segments found for compaction. Please check that datasource name and interval are correct.");
}
if (!inputSpec.validateSegments(lockGranularityInUse, latestSegments)) {
throw new ISE(
"Specified segments in the spec are different from the current used segments. "
+ "Possibly new segments would have been added or some segments have been unpublished."
);
}
}
}
@VisibleForTesting
static class PartitionConfigurationManager
{
@Nullable
private final CompactionTuningConfig tuningConfig;
PartitionConfigurationManager(@Nullable CompactionTuningConfig tuningConfig)
{
this.tuningConfig = tuningConfig;
}
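/**
* Returns the tuning config to use for the generated index tasks, falling back to
* {@link CompactionTuningConfig#defaultConfig()} when none was provided. For dynamic partitioning,
* a default {@code maxTotalRows} is applied if the user did not set one.
*/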
@Nullable
CompactionTuningConfig computeTuningConfig()
{
CompactionTuningConfig newTuningConfig = tuningConfig == null
? CompactionTuningConfig.defaultConfig()
: tuningConfig;
PartitionsSpec partitionsSpec = newTuningConfig.getGivenOrDefaultPartitionsSpec();
if (partitionsSpec instanceof DynamicPartitionsSpec) {
final DynamicPartitionsSpec dynamicPartitionsSpec = (DynamicPartitionsSpec) partitionsSpec;
partitionsSpec = new DynamicPartitionsSpec(
dynamicPartitionsSpec.getMaxRowsPerSegment(),
dynamicPartitionsSpec.getMaxTotalRowsOr(DynamicPartitionsSpec.DEFAULT_COMPACTION_MAX_TOTAL_ROWS)
);
}
return newTuningConfig.withPartitionsSpec(partitionsSpec);
}
}
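/**
* Convenience builder for constructing a {@link CompactionTask}. A minimal, illustrative usage
* (the datasource name and interval are hypothetical):
* <pre>{@code
* CompactionTask task = new CompactionTask.Builder("wikipedia", segmentCacheManagerFactory, retryPolicyFactory)
*     .interval(Intervals.of("2020-01-01/2020-02-01"))
*     .build();
* }</pre>
*/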
public static class Builder
{
private final String dataSource;
private final SegmentCacheManagerFactory segmentCacheManagerFactory;
private final RetryPolicyFactory retryPolicyFactory;
private CompactionIOConfig ioConfig;
@Nullable
private DimensionsSpec dimensionsSpec;
@Nullable
private ClientCompactionTaskTransformSpec transformSpec;
@Nullable
private AggregatorFactory[] metricsSpec;
@Nullable
private Granularity segmentGranularity;
@Nullable
private ClientCompactionTaskGranularitySpec granularitySpec;
@Nullable
private TuningConfig tuningConfig;
@Nullable
private Map<String, Object> context;
public Builder(
String dataSource,
SegmentCacheManagerFactory segmentCacheManagerFactory,
RetryPolicyFactory retryPolicyFactory
)
{
this.dataSource = dataSource;
this.segmentCacheManagerFactory = segmentCacheManagerFactory;
this.retryPolicyFactory = retryPolicyFactory;
}
public Builder interval(Interval interval)
{
return inputSpec(new CompactionIntervalSpec(interval, null));
}
public Builder segments(List<DataSegment> segments)
{
return inputSpec(SpecificSegmentsSpec.fromSegments(segments));
}
public Builder ioConfig(CompactionIOConfig ioConfig)
{
this.ioConfig = ioConfig;
return this;
}
public Builder inputSpec(CompactionInputSpec inputSpec)
{
this.ioConfig = new CompactionIOConfig(inputSpec, false, null);
return this;
}
public Builder inputSpec(CompactionInputSpec inputSpec, Boolean dropExisting)
{
this.ioConfig = new CompactionIOConfig(inputSpec, false, dropExisting);
return this;
}
public Builder dimensionsSpec(DimensionsSpec dimensionsSpec)
{
this.dimensionsSpec = dimensionsSpec;
return this;
}
public Builder transformSpec(ClientCompactionTaskTransformSpec transformSpec)
{
this.transformSpec = transformSpec;
return this;
}
public Builder metricsSpec(AggregatorFactory[] metricsSpec)
{
this.metricsSpec = metricsSpec;
return this;
}
public Builder segmentGranularity(Granularity segmentGranularity)
{
this.segmentGranularity = segmentGranularity;
return this;
}
public Builder granularitySpec(ClientCompactionTaskGranularitySpec granularitySpec)
{
this.granularitySpec = granularitySpec;
return this;
}
public Builder tuningConfig(TuningConfig tuningConfig)
{
this.tuningConfig = tuningConfig;
return this;
}
public Builder context(Map<String, Object> context)
{
this.context = context;
return this;
}
public CompactionTask build()
{
return new CompactionTask(
null,
null,
dataSource,
null,
null,
ioConfig,
null,
dimensionsSpec,
transformSpec,
metricsSpec,
segmentGranularity,
granularitySpec,
tuningConfig,
context,
segmentCacheManagerFactory
);
}
}
/**
* Compaction Task Tuning Config.
*
* An extension of ParallelIndexTuningConfig. As of now, all this TuningConfig
* does is fail if it contains an `awaitSegmentAvailabilityTimeoutMillis` that is != 0,
* since waiting for segment availability is not supported for Compaction Tasks.
*/
public static class CompactionTuningConfig extends ParallelIndexTuningConfig
{
public static final String TYPE = "compaction";
public static CompactionTuningConfig defaultConfig()
{
return new CompactionTuningConfig(
null,
null,
null,
null,
null,
null,
null,
null,
null,
null,
null,
null,
null,
null,
null,
null,
null,
null,
null,
null,
null,
null,
null,
null,
null,
null,
null,
null,
null,
0L, // awaitSegmentAvailabilityTimeoutMillis must be 0 for compaction tasks
null
);
}
@JsonCreator
public CompactionTuningConfig(
@JsonProperty("targetPartitionSize") @Deprecated @Nullable Integer targetPartitionSize,
@JsonProperty("maxRowsPerSegment") @Deprecated @Nullable Integer maxRowsPerSegment,
@JsonProperty("appendableIndexSpec") @Nullable AppendableIndexSpec appendableIndexSpec,
@JsonProperty("maxRowsInMemory") @Nullable Integer maxRowsInMemory,
@JsonProperty("maxBytesInMemory") @Nullable Long maxBytesInMemory,
@JsonProperty("skipBytesInMemoryOverheadCheck") @Nullable Boolean skipBytesInMemoryOverheadCheck,
@JsonProperty("maxTotalRows") @Deprecated @Nullable Long maxTotalRows,
@JsonProperty("numShards") @Deprecated @Nullable Integer numShards,
@JsonProperty("splitHintSpec") @Nullable SplitHintSpec splitHintSpec,
@JsonProperty("partitionsSpec") @Nullable PartitionsSpec partitionsSpec,
@JsonProperty("indexSpec") @Nullable IndexSpec indexSpec,
@JsonProperty("indexSpecForIntermediatePersists") @Nullable IndexSpec indexSpecForIntermediatePersists,
@JsonProperty("maxPendingPersists") @Nullable Integer maxPendingPersists,
@JsonProperty("forceGuaranteedRollup") @Nullable Boolean forceGuaranteedRollup,
@JsonProperty("reportParseExceptions") @Nullable Boolean reportParseExceptions,
@JsonProperty("pushTimeout") @Nullable Long pushTimeout,
@JsonProperty("segmentWriteOutMediumFactory") @Nullable
SegmentWriteOutMediumFactory segmentWriteOutMediumFactory,
@JsonProperty("maxNumSubTasks") @Deprecated @Nullable Integer maxNumSubTasks,
@JsonProperty("maxNumConcurrentSubTasks") @Nullable Integer maxNumConcurrentSubTasks,
@JsonProperty("maxRetry") @Nullable Integer maxRetry,
@JsonProperty("taskStatusCheckPeriodMs") @Nullable Long taskStatusCheckPeriodMs,
@JsonProperty("chatHandlerTimeout") @Nullable Duration chatHandlerTimeout,
@JsonProperty("chatHandlerNumRetries") @Nullable Integer chatHandlerNumRetries,
@JsonProperty("maxNumSegmentsToMerge") @Nullable Integer maxNumSegmentsToMerge,
@JsonProperty("totalNumMergeTasks") @Nullable Integer totalNumMergeTasks,
@JsonProperty("logParseExceptions") @Nullable Boolean logParseExceptions,
@JsonProperty("maxParseExceptions") @Nullable Integer maxParseExceptions,
@JsonProperty("maxSavedParseExceptions") @Nullable Integer maxSavedParseExceptions,
@JsonProperty("maxColumnsToMerge") @Nullable Integer maxColumnsToMerge,
@JsonProperty("awaitSegmentAvailabilityTimeoutMillis") @Nullable Long awaitSegmentAvailabilityTimeoutMillis,
@JsonProperty("numPersistThreads") @Nullable Integer numPersistThreads
)
{
super(
targetPartitionSize,
maxRowsPerSegment,
appendableIndexSpec,
maxRowsInMemory,
maxBytesInMemory,
skipBytesInMemoryOverheadCheck,
maxTotalRows,
numShards,
splitHintSpec,
partitionsSpec,
indexSpec,
indexSpecForIntermediatePersists,
maxPendingPersists,
forceGuaranteedRollup,
reportParseExceptions,
pushTimeout,
segmentWriteOutMediumFactory,
maxNumSubTasks,
maxNumConcurrentSubTasks,
maxRetry,
taskStatusCheckPeriodMs,
chatHandlerTimeout,
chatHandlerNumRetries,
maxNumSegmentsToMerge,
totalNumMergeTasks,
logParseExceptions,
maxParseExceptions,
maxSavedParseExceptions,
maxColumnsToMerge,
awaitSegmentAvailabilityTimeoutMillis,
null,
numPersistThreads
);
Preconditions.checkArgument(
awaitSegmentAvailabilityTimeoutMillis == null || awaitSegmentAvailabilityTimeoutMillis == 0,
"awaitSegmentAvailabilityTimeoutMillis is not supported for Compcation Task"
);
}
@Override
public CompactionTuningConfig withPartitionsSpec(PartitionsSpec partitionsSpec)
{
return new CompactionTuningConfig(
null,
null,
getAppendableIndexSpec(),
getMaxRowsInMemory(),
getMaxBytesInMemory(),
isSkipBytesInMemoryOverheadCheck(),
null,
null,
getSplitHintSpec(),
partitionsSpec,
getIndexSpec(),
getIndexSpecForIntermediatePersists(),
getMaxPendingPersists(),
isForceGuaranteedRollup(),
isReportParseExceptions(),
getPushTimeout(),
getSegmentWriteOutMediumFactory(),
null,
getMaxNumConcurrentSubTasks(),
getMaxRetry(),
getTaskStatusCheckPeriodMs(),
getChatHandlerTimeout(),
getChatHandlerNumRetries(),
getMaxNumSegmentsToMerge(),
getTotalNumMergeTasks(),
isLogParseExceptions(),
getMaxParseExceptions(),
getMaxSavedParseExceptions(),
getMaxColumnsToMerge(),
getAwaitSegmentAvailabilityTimeoutMillis(),
getNumPersistThreads()
);
}
}
}