/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.druid.server.coordinator.helper;
import com.google.common.collect.Iterables;
import com.google.inject.Inject;
import it.unimi.dsi.fastutil.objects.Object2LongMap;
import org.apache.druid.client.indexing.ClientCompactQuery;
import org.apache.druid.client.indexing.ClientCompactQueryTuningConfig;
import org.apache.druid.client.indexing.IndexingServiceClient;
import org.apache.druid.client.indexing.TaskPayloadResponse;
import org.apache.druid.indexer.TaskStatusPlus;
import org.apache.druid.java.util.common.ISE;
import org.apache.druid.java.util.common.JodaUtils;
import org.apache.druid.java.util.common.guava.Comparators;
import org.apache.druid.java.util.common.logger.Logger;
import org.apache.druid.server.coordinator.CoordinatorCompactionConfig;
import org.apache.druid.server.coordinator.CoordinatorStats;
import org.apache.druid.server.coordinator.DataSourceCompactionConfig;
import org.apache.druid.server.coordinator.DruidCoordinatorRuntimeParams;
import org.apache.druid.timeline.DataSegment;
import org.apache.druid.timeline.VersionedIntervalTimeline;
import org.joda.time.Interval;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.function.Function;
import java.util.stream.Collectors;
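/**
 * A coordinator helper that searches for segments needing compaction, using a
 * {@link NewestSegmentFirstPolicy}, and submits compaction tasks for them to the overlord,
 * up to the number of available compaction task slots.
 */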
public class DruidCoordinatorSegmentCompactor implements DruidCoordinatorHelper
{
static final String COMPACT_TASK_COUNT = "compactTaskCount";
static final String SEGMENT_SIZE_WAIT_COMPACT = "segmentSizeWaitCompact";
// Should be synced with CompactionTask.TYPE
private static final String COMPACT_TASK_TYPE = "compact";
private static final Logger LOG = new Logger(DruidCoordinatorSegmentCompactor.class);
private final CompactionSegmentSearchPolicy policy = new NewestSegmentFirstPolicy();
private final IndexingServiceClient indexingServiceClient;
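  // dataSource -> total byte size of the segments still waiting to be compacted,
  // as reported by the iterator of the last run. Null until the first run.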
private Object2LongMap<String> remainingSegmentSizeBytes;
@Inject
public DruidCoordinatorSegmentCompactor(IndexingServiceClient indexingServiceClient)
{
this.indexingServiceClient = indexingServiceClient;
}
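  /**
   * Looks at the current compaction configs and the in-flight compaction tasks, and submits
   * new compaction tasks if any task slots are available.
   */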
@Override
public DruidCoordinatorRuntimeParams run(DruidCoordinatorRuntimeParams params)
{
LOG.info("Run coordinator segment compactor");
final CoordinatorCompactionConfig dynamicConfig = params.getCoordinatorCompactionConfig();
final CoordinatorStats stats = new CoordinatorStats();
if (dynamicConfig.getMaxCompactionTaskSlots() > 0) {
Map<String, VersionedIntervalTimeline<String, DataSegment>> dataSources = params.getDataSources();
List<DataSourceCompactionConfig> compactionConfigList = dynamicConfig.getCompactionConfigs();
if (compactionConfigList != null && !compactionConfigList.isEmpty()) {
Map<String, DataSourceCompactionConfig> compactionConfigs = compactionConfigList
.stream()
.collect(Collectors.toMap(DataSourceCompactionConfig::getDataSource, Function.identity()));
final List<TaskStatusPlus> compactTasks = filterNonCompactTasks(
indexingServiceClient.getRunningTasks(),
indexingServiceClient.getPendingTasks(),
indexingServiceClient.getWaitingTasks()
);
        // dataSource -> list of intervals covered by the in-flight compact tasks
final Map<String, List<Interval>> compactTaskIntervals = new HashMap<>(compactionConfigList.size());
for (TaskStatusPlus status : compactTasks) {
final TaskPayloadResponse response = indexingServiceClient.getTaskPayload(status.getId());
if (response == null) {
throw new ISE("WTH? got a null paylord from overlord for task[%s]", status.getId());
}
if (COMPACT_TASK_TYPE.equals(response.getPayload().getType())) {
final ClientCompactQuery compactQuery = (ClientCompactQuery) response.getPayload();
final Interval interval = JodaUtils.umbrellaInterval(
compactQuery.getSegments()
.stream()
.map(DataSegment::getInterval)
.sorted(Comparators.intervalsByStartThenEnd())
.collect(Collectors.toList())
);
compactTaskIntervals.computeIfAbsent(status.getDataSource(), k -> new ArrayList<>()).add(interval);
} else {
throw new ISE("WTH? task[%s] is not a compactTask?", status.getId());
}
}
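        // Hand the in-flight intervals to the search policy so that it can skip segments
        // which are already being compacted.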
final CompactionSegmentIterator iterator = policy.reset(compactionConfigs, dataSources, compactTaskIntervals);
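        // The compaction task capacity is the smaller of the configured slot ratio applied to
        // the total worker capacity and the configured absolute maximum.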
final int compactionTaskCapacity = (int) Math.min(
indexingServiceClient.getTotalWorkerCapacity() * dynamicConfig.getCompactionTaskSlotRatio(),
dynamicConfig.getMaxCompactionTaskSlots()
);
final int numNonCompleteCompactionTasks = compactTasks.size();
final int numAvailableCompactionTaskSlots = numNonCompleteCompactionTasks > 0
? Math.max(0, compactionTaskCapacity - numNonCompleteCompactionTasks)
            // compactionTaskCapacity might be 0 if totalWorkerCapacity is low.
            // This guarantees that at least one slot is available if compaction is enabled
            // and no compaction task is currently running, pending, or waiting.
: Math.max(1, compactionTaskCapacity);
LOG.info(
"Found [%d] available task slots for compaction out of [%d] max compaction task capacity",
numAvailableCompactionTaskSlots,
compactionTaskCapacity
);
if (numAvailableCompactionTaskSlots > 0) {
stats.accumulate(doRun(compactionConfigs, numAvailableCompactionTaskSlots, iterator));
} else {
stats.accumulate(makeStats(0, iterator));
}
} else {
LOG.info("compactionConfig is empty. Skip.");
}
} else {
LOG.info("maxCompactionTaskSlots was set to 0. Skip compaction");
}
return params.buildFromExisting()
.withCoordinatorStats(stats)
.build();
}
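  /**
   * Merges the given task status lists, keeping only compaction tasks. Tasks of unknown (null)
   * type are conservatively kept as well; see the comment inside for the rationale.
   */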
@SafeVarargs
private static List<TaskStatusPlus> filterNonCompactTasks(List<TaskStatusPlus>...taskStatusStreams)
{
final List<TaskStatusPlus> allTaskStatusPlus = new ArrayList<>();
Arrays.stream(taskStatusStreams).forEach(allTaskStatusPlus::addAll);
return allTaskStatusPlus
.stream()
.filter(status -> {
final String taskType = status.getType();
          // taskType can be null if middleManagers are running an older version. Here, we conservatively
          // regard tasks of unknown taskType as compactionTasks, because it's important to never run more
          // compactionTasks than the configured limit at any time; exceeding it might impact ingestion
          // performance.
return taskType == null || COMPACT_TASK_TYPE.equals(taskType);
})
.collect(Collectors.toList());
}
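  /**
   * Submits one compaction task per candidate segment list from the iterator until the available
   * task slots are exhausted, and returns the stats of this run.
   */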
private CoordinatorStats doRun(
Map<String, DataSourceCompactionConfig> compactionConfigs,
int numAvailableCompactionTaskSlots,
CompactionSegmentIterator iterator
)
{
int numSubmittedTasks = 0;
for (; iterator.hasNext() && numSubmittedTasks < numAvailableCompactionTaskSlots; numSubmittedTasks++) {
final List<DataSegment> segmentsToCompact = iterator.next();
final String dataSourceName = segmentsToCompact.get(0).getDataSource();
if (segmentsToCompact.size() > 1) {
final DataSourceCompactionConfig config = compactionConfigs.get(dataSourceName);
        // Build the tuning config from the dataSource's compaction config and submit a compaction task.
final String taskId = indexingServiceClient.compactSegments(
segmentsToCompact,
config.isKeepSegmentGranularity(),
config.getTargetCompactionSizeBytes(),
config.getTaskPriority(),
ClientCompactQueryTuningConfig.from(config.getTuningConfig(), config.getMaxRowsPerSegment()),
config.getTaskContext()
);
LOG.info(
"Submitted a compactTask[%s] for segments %s",
taskId,
Iterables.transform(segmentsToCompact, DataSegment::getId)
);
} else if (segmentsToCompact.size() == 1) {
throw new ISE("Found one segments[%s] to compact", segmentsToCompact);
} else {
throw new ISE("Failed to find segments for dataSource[%s]", dataSourceName);
}
}
return makeStats(numSubmittedTasks, iterator);
}
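  /**
   * Builds the stats for this run: the number of submitted compaction tasks globally, and the
   * byte size of segments still waiting for compaction per dataSource.
   */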
private CoordinatorStats makeStats(int numCompactionTasks, CompactionSegmentIterator iterator)
{
final CoordinatorStats stats = new CoordinatorStats();
stats.addToGlobalStat(COMPACT_TASK_COUNT, numCompactionTasks);
remainingSegmentSizeBytes = iterator.remainingSegmentSizeBytes();
    remainingSegmentSizeBytes.object2LongEntrySet().fastForEach(
        entry -> {
          final String dataSource = entry.getKey();
          final long segmentSizeWaitCompact = entry.getLongValue();
          stats.addToDataSourceStat(SEGMENT_SIZE_WAIT_COMPACT, dataSource, segmentSizeWaitCompact);
}
);
return stats;
}
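  /**
   * Returns the total byte size of the segments still waiting to be compacted for the given
   * dataSource, or 0 if the compactor has not run yet or the dataSource is unknown.
   */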
  public long getRemainingSegmentSizeBytes(String dataSource)
  {
    // The map is null until the first run; report 0 in that case, which matches the fastutil
    // default return value for a missing key.
    return remainingSegmentSizeBytes == null ? 0 : remainingSegmentSizeBytes.getLong(dataSource);
  }
}