Auto-compaction snapshot status API (#10371)
* Auto-compaction snapshot API
* Auto-compaction snapshot API
* Auto-compaction snapshot API
* Auto-compaction snapshot API
* Auto-compaction snapshot API
* Auto-compaction snapshot API
* Auto-compaction snapshot API
* fix when not all compacted segments are iterated
* add unit tests
* add unit tests
* add unit tests
* add unit tests
* add unit tests
* add unit tests
* add some tests to make code cov happy
* address comments
* address comments
* address comments
* address comments
* make code coverage happy
* address comments
* address comments
* address comments
* address comments
diff --git a/server/src/main/java/org/apache/druid/server/coordinator/AutoCompactionSnapshot.java b/server/src/main/java/org/apache/druid/server/coordinator/AutoCompactionSnapshot.java
new file mode 100644
index 0000000..e39fde9
--- /dev/null
+++ b/server/src/main/java/org/apache/druid/server/coordinator/AutoCompactionSnapshot.java
@@ -0,0 +1,296 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.druid.server.coordinator;
+
+import com.fasterxml.jackson.annotation.JsonCreator;
+import com.fasterxml.jackson.annotation.JsonProperty;
+import org.apache.druid.java.util.common.ISE;
+
+import javax.validation.constraints.NotNull;
+import java.util.Objects;
+
+public class AutoCompactionSnapshot
+{
+ public enum AutoCompactionScheduleStatus
+ {
+ NOT_ENABLED,
+ RUNNING
+ }
+
+ @JsonProperty
+ private String dataSource;
+ @JsonProperty
+ private AutoCompactionScheduleStatus scheduleStatus;
+ @JsonProperty
+ private long bytesAwaitingCompaction;
+ @JsonProperty
+ private long bytesCompacted;
+ @JsonProperty
+ private long bytesSkipped;
+ @JsonProperty
+ private long segmentCountAwaitingCompaction;
+ @JsonProperty
+ private long segmentCountCompacted;
+ @JsonProperty
+ private long segmentCountSkipped;
+ @JsonProperty
+ private long intervalCountAwaitingCompaction;
+ @JsonProperty
+ private long intervalCountCompacted;
+ @JsonProperty
+ private long intervalCountSkipped;
+
+ @JsonCreator
+ public AutoCompactionSnapshot(
+ @JsonProperty @NotNull String dataSource,
+ @JsonProperty @NotNull AutoCompactionScheduleStatus scheduleStatus,
+ @JsonProperty long bytesAwaitingCompaction,
+ @JsonProperty long bytesCompacted,
+ @JsonProperty long bytesSkipped,
+ @JsonProperty long segmentCountAwaitingCompaction,
+ @JsonProperty long segmentCountCompacted,
+ @JsonProperty long segmentCountSkipped,
+ @JsonProperty long intervalCountAwaitingCompaction,
+ @JsonProperty long intervalCountCompacted,
+ @JsonProperty long intervalCountSkipped
+ )
+ {
+ this.dataSource = dataSource;
+ this.scheduleStatus = scheduleStatus;
+ this.bytesAwaitingCompaction = bytesAwaitingCompaction;
+ this.bytesCompacted = bytesCompacted;
+ this.bytesSkipped = bytesSkipped;
+ this.segmentCountAwaitingCompaction = segmentCountAwaitingCompaction;
+ this.segmentCountCompacted = segmentCountCompacted;
+ this.segmentCountSkipped = segmentCountSkipped;
+ this.intervalCountAwaitingCompaction = intervalCountAwaitingCompaction;
+ this.intervalCountCompacted = intervalCountCompacted;
+ this.intervalCountSkipped = intervalCountSkipped;
+ }
+
+ @NotNull
+ public String getDataSource()
+ {
+ return dataSource;
+ }
+
+ @NotNull
+ public AutoCompactionScheduleStatus getScheduleStatus()
+ {
+ return scheduleStatus;
+ }
+
+ public long getBytesAwaitingCompaction()
+ {
+ return bytesAwaitingCompaction;
+ }
+
+ public long getBytesCompacted()
+ {
+ return bytesCompacted;
+ }
+
+ public long getBytesSkipped()
+ {
+ return bytesSkipped;
+ }
+
+ public long getSegmentCountAwaitingCompaction()
+ {
+ return segmentCountAwaitingCompaction;
+ }
+
+ public long getSegmentCountCompacted()
+ {
+ return segmentCountCompacted;
+ }
+
+ public long getSegmentCountSkipped()
+ {
+ return segmentCountSkipped;
+ }
+
+ public long getIntervalCountAwaitingCompaction()
+ {
+ return intervalCountAwaitingCompaction;
+ }
+
+ public long getIntervalCountCompacted()
+ {
+ return intervalCountCompacted;
+ }
+
+ public long getIntervalCountSkipped()
+ {
+ return intervalCountSkipped;
+ }
+
+ @Override
+ public boolean equals(Object o)
+ {
+ if (this == o) {
+ return true;
+ }
+ if (o == null || getClass() != o.getClass()) {
+ return false;
+ }
+ AutoCompactionSnapshot that = (AutoCompactionSnapshot) o;
+ return bytesAwaitingCompaction == that.bytesAwaitingCompaction &&
+ bytesCompacted == that.bytesCompacted &&
+ bytesSkipped == that.bytesSkipped &&
+ segmentCountAwaitingCompaction == that.segmentCountAwaitingCompaction &&
+ segmentCountCompacted == that.segmentCountCompacted &&
+ segmentCountSkipped == that.segmentCountSkipped &&
+ intervalCountAwaitingCompaction == that.intervalCountAwaitingCompaction &&
+ intervalCountCompacted == that.intervalCountCompacted &&
+ intervalCountSkipped == that.intervalCountSkipped &&
+ dataSource.equals(that.dataSource) &&
+ scheduleStatus == that.scheduleStatus;
+ }
+
+ @Override
+ public int hashCode()
+ {
+ return Objects.hash(
+ dataSource,
+ scheduleStatus,
+ bytesAwaitingCompaction,
+ bytesCompacted,
+ bytesSkipped,
+ segmentCountAwaitingCompaction,
+ segmentCountCompacted,
+ segmentCountSkipped,
+ intervalCountAwaitingCompaction,
+ intervalCountCompacted,
+ intervalCountSkipped
+ );
+ }
+
+ public static class Builder
+ {
+ private String dataSource;
+ private AutoCompactionScheduleStatus scheduleStatus;
+ private long bytesAwaitingCompaction;
+ private long bytesCompacted;
+ private long bytesSkipped;
+ private long segmentCountAwaitingCompaction;
+ private long segmentCountCompacted;
+ private long segmentCountSkipped;
+ private long intervalCountAwaitingCompaction;
+ private long intervalCountCompacted;
+ private long intervalCountSkipped;
+
+
+ public Builder(
+ @NotNull String dataSource,
+ @NotNull AutoCompactionScheduleStatus scheduleStatus
+ )
+ {
+ this.dataSource = dataSource;
+ this.scheduleStatus = scheduleStatus;
+ this.bytesAwaitingCompaction = 0;
+ this.bytesCompacted = 0;
+ this.bytesSkipped = 0;
+ this.segmentCountAwaitingCompaction = 0;
+ this.segmentCountCompacted = 0;
+ this.segmentCountSkipped = 0;
+ this.intervalCountAwaitingCompaction = 0;
+ this.intervalCountCompacted = 0;
+ this.intervalCountSkipped = 0;
+ }
+
+ public Builder incrementBytesAwaitingCompaction(long incrementValue)
+ {
+ this.bytesAwaitingCompaction = this.bytesAwaitingCompaction + incrementValue;
+ return this;
+ }
+
+ public Builder incrementBytesCompacted(long incrementValue)
+ {
+ this.bytesCompacted = this.bytesCompacted + incrementValue;
+ return this;
+ }
+
+ public Builder incrementSegmentCountAwaitingCompaction(long incrementValue)
+ {
+ this.segmentCountAwaitingCompaction = this.segmentCountAwaitingCompaction + incrementValue;
+ return this;
+ }
+
+ public Builder incrementSegmentCountCompacted(long incrementValue)
+ {
+ this.segmentCountCompacted = this.segmentCountCompacted + incrementValue;
+ return this;
+ }
+
+ public Builder incrementIntervalCountAwaitingCompaction(long incrementValue)
+ {
+ this.intervalCountAwaitingCompaction = this.intervalCountAwaitingCompaction + incrementValue;
+ return this;
+ }
+
+ public Builder incrementIntervalCountCompacted(long incrementValue)
+ {
+ this.intervalCountCompacted = this.intervalCountCompacted + incrementValue;
+ return this;
+ }
+
+ public Builder incrementBytesSkipped(long incrementValue)
+ {
+ this.bytesSkipped = this.bytesSkipped + incrementValue;
+ return this;
+ }
+
+ public Builder incrementSegmentCountSkipped(long incrementValue)
+ {
+ this.segmentCountSkipped = this.segmentCountSkipped + incrementValue;
+ return this;
+ }
+
+ public Builder incrementIntervalCountSkipped(long incrementValue)
+ {
+ this.intervalCountSkipped = this.intervalCountSkipped + incrementValue;
+ return this;
+ }
+
+ public AutoCompactionSnapshot build()
+ {
+ if (dataSource == null || dataSource.isEmpty()) {
+ throw new ISE("Invalid dataSource name");
+ }
+ if (scheduleStatus == null) {
+ throw new ISE("scheduleStatus cannot be null");
+ }
+ return new AutoCompactionSnapshot(
+ dataSource,
+ scheduleStatus,
+ bytesAwaitingCompaction,
+ bytesCompacted,
+ bytesSkipped,
+ segmentCountAwaitingCompaction,
+ segmentCountCompacted,
+ segmentCountSkipped,
+ intervalCountAwaitingCompaction,
+ intervalCountCompacted,
+ intervalCountSkipped
+ );
+ }
+ }
+}
diff --git a/server/src/main/java/org/apache/druid/server/coordinator/CompactionStatistics.java b/server/src/main/java/org/apache/druid/server/coordinator/CompactionStatistics.java
new file mode 100644
index 0000000..676630e
--- /dev/null
+++ b/server/src/main/java/org/apache/druid/server/coordinator/CompactionStatistics.java
@@ -0,0 +1,73 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.druid.server.coordinator;
+
+public class CompactionStatistics
+{
+ private long byteSum;
+ private long segmentNumberCountSum;
+ private long segmentIntervalCountSum;
+
+ public CompactionStatistics(
+ long byteSum,
+ long segmentNumberCountSum,
+ long segmentIntervalCountSum
+ )
+ {
+ this.byteSum = byteSum;
+ this.segmentNumberCountSum = segmentNumberCountSum;
+ this.segmentIntervalCountSum = segmentIntervalCountSum;
+ }
+
+ public static CompactionStatistics initializeCompactionStatistics()
+ {
+ return new CompactionStatistics(0, 0, 0);
+ }
+
+ public long getByteSum()
+ {
+ return byteSum;
+ }
+
+ public long getSegmentNumberCountSum()
+ {
+ return segmentNumberCountSum;
+ }
+
+ public long getSegmentIntervalCountSum()
+ {
+ return segmentIntervalCountSum;
+ }
+
+ public void incrementCompactedByte(long incrementValue)
+ {
+ byteSum = byteSum + incrementValue;
+ }
+
+ public void incrementCompactedSegments(long incrementValue)
+ {
+ segmentNumberCountSum = segmentNumberCountSum + incrementValue;
+ }
+
+ public void incrementCompactedIntervals(long incrementValue)
+ {
+ segmentIntervalCountSum = segmentIntervalCountSum + incrementValue;
+ }
+}
diff --git a/server/src/main/java/org/apache/druid/server/coordinator/DruidCoordinator.java b/server/src/main/java/org/apache/druid/server/coordinator/DruidCoordinator.java
index a406761..3300e59 100644
--- a/server/src/main/java/org/apache/druid/server/coordinator/DruidCoordinator.java
+++ b/server/src/main/java/org/apache/druid/server/coordinator/DruidCoordinator.java
@@ -367,6 +367,17 @@
return compactSegments.getTotalSizeOfSegmentsAwaitingCompaction(dataSource);
}
+ @Nullable
+ public AutoCompactionSnapshot getAutoCompactionSnapshotForDataSource(String dataSource)
+ {
+ return compactSegments.getAutoCompactionSnapshot(dataSource);
+ }
+
+ public Map<String, AutoCompactionSnapshot> getAutoCompactionSnapshot()
+ {
+ return compactSegments.getAutoCompactionSnapshot();
+ }
+
public CoordinatorDynamicConfig getDynamicConfigs()
{
return CoordinatorDynamicConfig.current(configManager);
@@ -589,7 +600,11 @@
}
for (final Pair<? extends DutiesRunnable, Duration> dutiesRunnable : dutiesRunnables) {
- ScheduledExecutors.scheduleWithFixedDelay(
+ // CompactSegmentsDuty can takes a non trival amount of time to complete.
+ // Hence, we schedule at fixed rate to make sure the other tasks still run at approximately every
+ // config.getCoordinatorIndexingPeriod() period. Note that cautious should be taken
+ // if setting config.getCoordinatorIndexingPeriod() lower than the default value.
+ ScheduledExecutors.scheduleAtFixedRate(
exec,
config.getCoordinatorStartDelay(),
dutiesRunnable.rhs,
@@ -657,8 +672,9 @@
{
List<CoordinatorDuty> duties = new ArrayList<>();
duties.add(new LogUsedSegments());
- duties.addAll(makeCompactSegmentsDuty());
duties.addAll(indexingServiceDuties);
+ // CompactSegmentsDuty should be the last duty as it can take a long time to complete
+ duties.addAll(makeCompactSegmentsDuty());
log.debug(
"Done making indexing service duties %s",
diff --git a/server/src/main/java/org/apache/druid/server/coordinator/duty/CompactSegments.java b/server/src/main/java/org/apache/druid/server/coordinator/duty/CompactSegments.java
index 876f4b6..50b31ca 100644
--- a/server/src/main/java/org/apache/druid/server/coordinator/duty/CompactSegments.java
+++ b/server/src/main/java/org/apache/druid/server/coordinator/duty/CompactSegments.java
@@ -22,7 +22,6 @@
import com.fasterxml.jackson.databind.ObjectMapper;
import com.google.common.collect.Maps;
import com.google.inject.Inject;
-import it.unimi.dsi.fastutil.objects.Object2LongOpenHashMap;
import org.apache.druid.client.indexing.ClientCompactionTaskQuery;
import org.apache.druid.client.indexing.ClientCompactionTaskQueryTuningConfig;
import org.apache.druid.client.indexing.IndexingServiceClient;
@@ -30,6 +29,8 @@
import org.apache.druid.indexer.TaskStatusPlus;
import org.apache.druid.java.util.common.ISE;
import org.apache.druid.java.util.common.logger.Logger;
+import org.apache.druid.server.coordinator.AutoCompactionSnapshot;
+import org.apache.druid.server.coordinator.CompactionStatistics;
import org.apache.druid.server.coordinator.CoordinatorCompactionConfig;
import org.apache.druid.server.coordinator.CoordinatorStats;
import org.apache.druid.server.coordinator.DataSourceCompactionConfig;
@@ -43,13 +44,28 @@
import java.util.HashMap;
import java.util.List;
import java.util.Map;
+import java.util.Set;
+import java.util.concurrent.atomic.AtomicReference;
import java.util.function.Function;
import java.util.stream.Collectors;
public class CompactSegments implements CoordinatorDuty
{
static final String COMPACTION_TASK_COUNT = "compactTaskCount";
- static final String TOTAL_SIZE_OF_SEGMENTS_AWAITING_COMPACTION = "segmentSizeWaitCompact";
+ static final String AVAILABLE_COMPACTION_TASK_SLOT = "availableCompactionTaskSlot";
+ static final String MAX_COMPACTION_TASK_SLOT = "maxCompactionTaskSlot";
+
+ static final String TOTAL_SIZE_OF_SEGMENTS_SKIPPED = "segmentSizeSkippedCompact";
+ static final String TOTAL_COUNT_OF_SEGMENTS_SKIPPED = "segmentCountSkippedCompact";
+ static final String TOTAL_INTERVAL_OF_SEGMENTS_SKIPPED = "segmentIntervalSkippedCompact";
+
+ static final String TOTAL_SIZE_OF_SEGMENTS_AWAITING = "segmentSizeWaitCompact";
+ static final String TOTAL_COUNT_OF_SEGMENTS_AWAITING = "segmentCountWaitCompact";
+ static final String TOTAL_INTERVAL_OF_SEGMENTS_AWAITING = "segmentIntervalWaitCompact";
+
+ static final String TOTAL_SIZE_OF_SEGMENTS_COMPACTED = "segmentSizeCompacted";
+ static final String TOTAL_COUNT_OF_SEGMENTS_COMPACTED = "segmentCountCompacted";
+ static final String TOTAL_INTERVAL_OF_SEGMENTS_COMPACTED = "segmentIntervalCompacted";
/** Must be synced with org.apache.druid.indexing.common.task.CompactionTask.TYPE. */
public static final String COMPACTION_TASK_TYPE = "compact";
@@ -61,7 +77,9 @@
private final CompactionSegmentSearchPolicy policy;
private final IndexingServiceClient indexingServiceClient;
- private Object2LongOpenHashMap<String> totalSizesOfSegmentsAwaitingCompactionPerDataSource;
+ // This variable is updated by the Coordinator thread executing duties and
+ // read by HTTP threads processing Coordinator API calls.
+ private final AtomicReference<Map<String, AutoCompactionSnapshot>> autoCompactionSnapshotPerDataSource = new AtomicReference<>();
@Inject
public CompactSegments(
@@ -71,6 +89,7 @@
{
this.policy = new NewestSegmentFirstPolicy(objectMapper);
this.indexingServiceClient = indexingServiceClient;
+ autoCompactionSnapshotPerDataSource.set(new HashMap<>());
}
@Override
@@ -80,12 +99,12 @@
final CoordinatorCompactionConfig dynamicConfig = params.getCoordinatorCompactionConfig();
final CoordinatorStats stats = new CoordinatorStats();
-
+ final Map<String, AutoCompactionSnapshot.Builder> currentRunAutoCompactionSnapshotBuilders = new HashMap<>();
+ List<DataSourceCompactionConfig> compactionConfigList = dynamicConfig.getCompactionConfigs();
+ updateAutoCompactionSnapshot(compactionConfigList, currentRunAutoCompactionSnapshotBuilders);
if (dynamicConfig.getMaxCompactionTaskSlots() > 0) {
Map<String, VersionedIntervalTimeline<String, DataSegment>> dataSources =
params.getUsedSegmentsTimelinesPerDataSource();
- List<DataSourceCompactionConfig> compactionConfigList = dynamicConfig.getCompactionConfigs();
-
if (compactionConfigList != null && !compactionConfigList.isEmpty()) {
Map<String, DataSourceCompactionConfig> compactionConfigs = compactionConfigList
.stream()
@@ -136,16 +155,23 @@
numAvailableCompactionTaskSlots,
compactionTaskCapacity
);
+ stats.addToGlobalStat(AVAILABLE_COMPACTION_TASK_SLOT, numAvailableCompactionTaskSlots);
+ stats.addToGlobalStat(MAX_COMPACTION_TASK_SLOT, compactionTaskCapacity);
+
if (numAvailableCompactionTaskSlots > 0) {
- stats.accumulate(doRun(compactionConfigs, numAvailableCompactionTaskSlots, iterator));
+ stats.accumulate(
+ doRun(compactionConfigs, currentRunAutoCompactionSnapshotBuilders, numAvailableCompactionTaskSlots, iterator)
+ );
} else {
- stats.accumulate(makeStats(0, iterator));
+ stats.accumulate(makeStats(currentRunAutoCompactionSnapshotBuilders, 0, iterator));
}
} else {
LOG.info("compactionConfig is empty. Skip.");
+ updateAutoCompactionSnapshotWhenNoCompactTaskScheduled(currentRunAutoCompactionSnapshotBuilders);
}
} else {
LOG.info("maxCompactionTaskSlots was set to 0. Skip compaction");
+ updateAutoCompactionSnapshotWhenNoCompactTaskScheduled(currentRunAutoCompactionSnapshotBuilders);
}
return params.buildFromExisting()
@@ -172,6 +198,33 @@
}
}
+ private void updateAutoCompactionSnapshot(
+ List<DataSourceCompactionConfig> compactionConfigList,
+ Map<String, AutoCompactionSnapshot.Builder> currentRunAutoCompactionSnapshotBuilders)
+ {
+
+ Set<String> enabledDatasources = compactionConfigList.stream()
+ .map(dataSourceCompactionConfig -> dataSourceCompactionConfig.getDataSource())
+ .collect(Collectors.toSet());
+ // Update AutoCompactionScheduleStatus for dataSource that now has auto compaction disabled
+ for (Map.Entry<String, AutoCompactionSnapshot> snapshot : autoCompactionSnapshotPerDataSource.get().entrySet()) {
+ if (!enabledDatasources.contains(snapshot.getKey())) {
+ currentRunAutoCompactionSnapshotBuilders.computeIfAbsent(
+ snapshot.getKey(),
+ k -> new AutoCompactionSnapshot.Builder(k, AutoCompactionSnapshot.AutoCompactionScheduleStatus.NOT_ENABLED)
+ );
+ }
+ }
+
+ // Create and Update snapshot for dataSource that has auto compaction enabled
+ for (String compactionConfigDataSource : enabledDatasources) {
+ currentRunAutoCompactionSnapshotBuilders.computeIfAbsent(
+ compactionConfigDataSource,
+ k -> new AutoCompactionSnapshot.Builder(k, AutoCompactionSnapshot.AutoCompactionScheduleStatus.RUNNING)
+ );
+ }
+ }
+
private static List<TaskStatusPlus> filterNonCompactionTasks(List<TaskStatusPlus> taskStatuses)
{
return taskStatuses
@@ -189,6 +242,7 @@
private CoordinatorStats doRun(
Map<String, DataSourceCompactionConfig> compactionConfigs,
+ Map<String, AutoCompactionSnapshot.Builder> currentRunAutoCompactionSnapshotBuilders,
int numAvailableCompactionTaskSlots,
CompactionSegmentIterator iterator
)
@@ -200,6 +254,12 @@
if (!segmentsToCompact.isEmpty()) {
final String dataSourceName = segmentsToCompact.get(0).getDataSource();
+ // As these segments will be compacted, we will aggregates the statistic to the Compacted statistics
+ AutoCompactionSnapshot.Builder snapshotBuilder = currentRunAutoCompactionSnapshotBuilders.get(dataSourceName);
+ snapshotBuilder.incrementBytesCompacted(segmentsToCompact.stream().mapToLong(DataSegment::getSize).sum());
+ snapshotBuilder.incrementIntervalCountCompacted(segmentsToCompact.stream().map(DataSegment::getInterval).distinct().count());
+ snapshotBuilder.incrementSegmentCountCompacted(segmentsToCompact.size());
+
final DataSourceCompactionConfig config = compactionConfigs.get(dataSourceName);
// make tuningConfig
final String taskId = indexingServiceClient.compactSegments(
@@ -209,6 +269,7 @@
ClientCompactionTaskQueryTuningConfig.from(config.getTuningConfig(), config.getMaxRowsPerSegment()),
newAutoCompactionContext(config.getTaskContext())
);
+
LOG.info(
"Submitted a compactionTask[%s] for %s segments",
taskId,
@@ -222,7 +283,7 @@
}
}
- return makeStats(numSubmittedTasks, iterator);
+ return makeStats(currentRunAutoCompactionSnapshotBuilders, numSubmittedTasks, iterator);
}
private Map<String, Object> newAutoCompactionContext(@Nullable Map<String, Object> configuredContext)
@@ -234,29 +295,174 @@
return newContext;
}
- private CoordinatorStats makeStats(int numCompactionTasks, CompactionSegmentIterator iterator)
+ /**
+ * This method can be use to atomically update the snapshots in {@code autoCompactionSnapshotPerDataSource} when
+ * no compaction task is schedule in this run. Currently, this method does not update compaction statistics
+ * (bytes, interval count, segment count, etc) since we skip iterating through the segments and cannot get an update
+ * on those statistics. Thus, this method only updates the schedule status and task list (compaction statistics
+ * remains the same as the previous snapshot).
+ */
+ private void updateAutoCompactionSnapshotWhenNoCompactTaskScheduled(
+ Map<String, AutoCompactionSnapshot.Builder> currentRunAutoCompactionSnapshotBuilders
+ )
{
+ Map<String, AutoCompactionSnapshot> previousSnapshots = autoCompactionSnapshotPerDataSource.get();
+ for (Map.Entry<String, AutoCompactionSnapshot.Builder> autoCompactionSnapshotBuilderEntry : currentRunAutoCompactionSnapshotBuilders.entrySet()) {
+ final String dataSource = autoCompactionSnapshotBuilderEntry.getKey();
+ AutoCompactionSnapshot previousSnapshot = previousSnapshots.get(dataSource);
+ if (previousSnapshot != null) {
+ autoCompactionSnapshotBuilderEntry.getValue().incrementBytesAwaitingCompaction(previousSnapshot.getBytesAwaitingCompaction());
+ autoCompactionSnapshotBuilderEntry.getValue().incrementBytesCompacted(previousSnapshot.getBytesCompacted());
+ autoCompactionSnapshotBuilderEntry.getValue().incrementBytesSkipped(previousSnapshot.getBytesSkipped());
+ autoCompactionSnapshotBuilderEntry.getValue().incrementSegmentCountAwaitingCompaction(previousSnapshot.getSegmentCountAwaitingCompaction());
+ autoCompactionSnapshotBuilderEntry.getValue().incrementSegmentCountCompacted(previousSnapshot.getSegmentCountCompacted());
+ autoCompactionSnapshotBuilderEntry.getValue().incrementSegmentCountSkipped(previousSnapshot.getSegmentCountSkipped());
+ autoCompactionSnapshotBuilderEntry.getValue().incrementIntervalCountAwaitingCompaction(previousSnapshot.getIntervalCountAwaitingCompaction());
+ autoCompactionSnapshotBuilderEntry.getValue().incrementIntervalCountCompacted(previousSnapshot.getIntervalCountCompacted());
+ autoCompactionSnapshotBuilderEntry.getValue().incrementIntervalCountSkipped(previousSnapshot.getIntervalCountSkipped());
+ }
+ }
+
+ Map<String, AutoCompactionSnapshot> currentAutoCompactionSnapshotPerDataSource = Maps.transformValues(
+ currentRunAutoCompactionSnapshotBuilders,
+ AutoCompactionSnapshot.Builder::build
+ );
+ // Atomic update of autoCompactionSnapshotPerDataSource with the latest from this coordinator run
+ autoCompactionSnapshotPerDataSource.set(currentAutoCompactionSnapshotPerDataSource);
+ }
+
+ private CoordinatorStats makeStats(
+ Map<String, AutoCompactionSnapshot.Builder> currentRunAutoCompactionSnapshotBuilders,
+ int numCompactionTasks,
+ CompactionSegmentIterator iterator
+ )
+ {
+ final Map<String, AutoCompactionSnapshot> currentAutoCompactionSnapshotPerDataSource = new HashMap<>();
final CoordinatorStats stats = new CoordinatorStats();
stats.addToGlobalStat(COMPACTION_TASK_COUNT, numCompactionTasks);
- totalSizesOfSegmentsAwaitingCompactionPerDataSource = iterator.totalRemainingSegmentsSizeBytes();
- totalSizesOfSegmentsAwaitingCompactionPerDataSource.object2LongEntrySet().fastForEach(
- entry -> {
- final String dataSource = entry.getKey();
- final long totalSizeOfSegmentsAwaitingCompaction = entry.getLongValue();
- stats.addToDataSourceStat(
- TOTAL_SIZE_OF_SEGMENTS_AWAITING_COMPACTION,
- dataSource,
- totalSizeOfSegmentsAwaitingCompaction
- );
- }
- );
+
+ // Iterate through all the remaining segments in the iterator.
+ // As these segments could be compacted but were not compacted due to lack of task slot, we will aggregates
+ // the statistic to the AwaitingCompaction statistics
+ while (iterator.hasNext()) {
+ final List<DataSegment> segmentsToCompact = iterator.next();
+ if (!segmentsToCompact.isEmpty()) {
+ final String dataSourceName = segmentsToCompact.get(0).getDataSource();
+ AutoCompactionSnapshot.Builder snapshotBuilder = currentRunAutoCompactionSnapshotBuilders.get(dataSourceName);
+ snapshotBuilder.incrementBytesAwaitingCompaction(
+ segmentsToCompact.stream()
+ .mapToLong(DataSegment::getSize)
+ .sum()
+ );
+ snapshotBuilder.incrementIntervalCountAwaitingCompaction(
+ segmentsToCompact.stream()
+ .map(DataSegment::getInterval)
+ .distinct()
+ .count()
+ );
+ snapshotBuilder.incrementSegmentCountAwaitingCompaction(segmentsToCompact.size());
+ }
+ }
+
+ // Statistics of all segments considered compacted after this run
+ Map<String, CompactionStatistics> allCompactedStatistics = iterator.totalCompactedStatistics();
+ // Statistics of all segments considered skipped after this run
+ Map<String, CompactionStatistics> allSkippedStatistics = iterator.totalSkippedStatistics();
+
+ for (Map.Entry<String, AutoCompactionSnapshot.Builder> autoCompactionSnapshotBuilderEntry : currentRunAutoCompactionSnapshotBuilders.entrySet()) {
+ final String dataSource = autoCompactionSnapshotBuilderEntry.getKey();
+ final AutoCompactionSnapshot.Builder builder = autoCompactionSnapshotBuilderEntry.getValue();
+ CompactionStatistics dataSourceCompactedStatistics = allCompactedStatistics.get(dataSource);
+ CompactionStatistics dataSourceSkippedStatistics = allSkippedStatistics.get(dataSource);
+
+ if (dataSourceCompactedStatistics != null) {
+ builder.incrementBytesCompacted(dataSourceCompactedStatistics.getByteSum());
+ builder.incrementSegmentCountCompacted(dataSourceCompactedStatistics.getSegmentNumberCountSum());
+ builder.incrementIntervalCountCompacted(dataSourceCompactedStatistics.getSegmentIntervalCountSum());
+ }
+
+ if (dataSourceSkippedStatistics != null) {
+ builder.incrementBytesSkipped(dataSourceSkippedStatistics.getByteSum());
+ builder.incrementSegmentCountSkipped(dataSourceSkippedStatistics.getSegmentNumberCountSum());
+ builder.incrementIntervalCountSkipped(dataSourceSkippedStatistics.getSegmentIntervalCountSum());
+ }
+
+ // Build the complete snapshot for the datasource
+ AutoCompactionSnapshot autoCompactionSnapshot = builder.build();
+ currentAutoCompactionSnapshotPerDataSource.put(dataSource, autoCompactionSnapshot);
+
+ // Use the complete snapshot to emits metrics
+ stats.addToDataSourceStat(
+ TOTAL_SIZE_OF_SEGMENTS_AWAITING,
+ dataSource,
+ autoCompactionSnapshot.getBytesAwaitingCompaction()
+ );
+ stats.addToDataSourceStat(
+ TOTAL_COUNT_OF_SEGMENTS_AWAITING,
+ dataSource,
+ autoCompactionSnapshot.getSegmentCountAwaitingCompaction()
+ );
+ stats.addToDataSourceStat(
+ TOTAL_INTERVAL_OF_SEGMENTS_AWAITING,
+ dataSource,
+ autoCompactionSnapshot.getIntervalCountAwaitingCompaction()
+ );
+ stats.addToDataSourceStat(
+ TOTAL_SIZE_OF_SEGMENTS_COMPACTED,
+ dataSource,
+ autoCompactionSnapshot.getBytesCompacted()
+ );
+ stats.addToDataSourceStat(
+ TOTAL_COUNT_OF_SEGMENTS_COMPACTED,
+ dataSource,
+ autoCompactionSnapshot.getSegmentCountCompacted()
+ );
+ stats.addToDataSourceStat(
+ TOTAL_INTERVAL_OF_SEGMENTS_COMPACTED,
+ dataSource,
+ autoCompactionSnapshot.getIntervalCountCompacted()
+ );
+ stats.addToDataSourceStat(
+ TOTAL_SIZE_OF_SEGMENTS_SKIPPED,
+ dataSource,
+ autoCompactionSnapshot.getBytesSkipped()
+ );
+ stats.addToDataSourceStat(
+ TOTAL_COUNT_OF_SEGMENTS_SKIPPED,
+ dataSource,
+ autoCompactionSnapshot.getSegmentCountSkipped()
+ );
+ stats.addToDataSourceStat(
+ TOTAL_INTERVAL_OF_SEGMENTS_SKIPPED,
+ dataSource,
+ autoCompactionSnapshot.getIntervalCountSkipped()
+ );
+ }
+
+ // Atomic update of autoCompactionSnapshotPerDataSource with the latest from this coordinator run
+ autoCompactionSnapshotPerDataSource.set(currentAutoCompactionSnapshotPerDataSource);
+
return stats;
}
- @SuppressWarnings("deprecation") // Intentionally using boxing get() to return null if dataSource is unknown
@Nullable
public Long getTotalSizeOfSegmentsAwaitingCompaction(String dataSource)
{
- return totalSizesOfSegmentsAwaitingCompactionPerDataSource.get(dataSource);
+ AutoCompactionSnapshot autoCompactionSnapshot = autoCompactionSnapshotPerDataSource.get().get(dataSource);
+ if (autoCompactionSnapshot == null) {
+ return null;
+ }
+ return autoCompactionSnapshot.getBytesAwaitingCompaction();
+ }
+
+ @Nullable
+ public AutoCompactionSnapshot getAutoCompactionSnapshot(String dataSource)
+ {
+ return autoCompactionSnapshotPerDataSource.get().get(dataSource);
+ }
+
+ public Map<String, AutoCompactionSnapshot> getAutoCompactionSnapshot()
+ {
+ return autoCompactionSnapshotPerDataSource.get();
}
}
diff --git a/server/src/main/java/org/apache/druid/server/coordinator/duty/CompactionSegmentIterator.java b/server/src/main/java/org/apache/druid/server/coordinator/duty/CompactionSegmentIterator.java
index 0e692f2..64f5c16 100644
--- a/server/src/main/java/org/apache/druid/server/coordinator/duty/CompactionSegmentIterator.java
+++ b/server/src/main/java/org/apache/druid/server/coordinator/duty/CompactionSegmentIterator.java
@@ -19,11 +19,12 @@
package org.apache.druid.server.coordinator.duty;
-import it.unimi.dsi.fastutil.objects.Object2LongOpenHashMap;
+import org.apache.druid.server.coordinator.CompactionStatistics;
import org.apache.druid.timeline.DataSegment;
import java.util.Iterator;
import java.util.List;
+import java.util.Map;
/**
* Segments in the lists which are the elements of this iterator are sorted according to the natural segment order
@@ -31,10 +32,20 @@
*/
public interface CompactionSegmentIterator extends Iterator<List<DataSegment>>
{
- long UNKNOWN_TOTAL_REMAINING_SEGMENTS_SIZE = -1L;
/**
- * Return a map of (dataSource, total size of remaining segments) for all dataSources.
- * This method should consider all segments except the segments returned by {@link #next()}.
+ * Return a map of dataSourceName to CompactionStatistics.
+ * This method returns the aggregated statistics of segments that was already compacted and does not need to be compacted
+ * again. Hence, segment that were not returned by the {@link Iterator#next()} becuase it does not needs compaction.
+ * Note that the aggregations returned by this method is only up to the current point of the iterator being iterated.
*/
- Object2LongOpenHashMap<String> totalRemainingSegmentsSizeBytes();
+ Map<String, CompactionStatistics> totalCompactedStatistics();
+
+ /**
+ * Return a map of dataSourceName to CompactionStatistics.
+ * This method returns the aggregated statistics of segments that was skipped as it cannot be compacted.
+ * Hence, segment that were not returned by the {@link Iterator#next()} becuase it cannot be compacted.
+ * Note that the aggregations returned by this method is only up to the current point of the iterator being iterated.
+ */
+ Map<String, CompactionStatistics> totalSkippedStatistics();
+
}
diff --git a/server/src/main/java/org/apache/druid/server/coordinator/duty/EmitClusterStatsAndMetrics.java b/server/src/main/java/org/apache/druid/server/coordinator/duty/EmitClusterStatsAndMetrics.java
index 658017a..70bfb04 100644
--- a/server/src/main/java/org/apache/druid/server/coordinator/duty/EmitClusterStatsAndMetrics.java
+++ b/server/src/main/java/org/apache/druid/server/coordinator/duty/EmitClusterStatsAndMetrics.java
@@ -301,8 +301,33 @@
)
);
+ emitter.emit(
+ new ServiceMetricEvent.Builder().build(
+ "compactTask/maxSlot/count",
+ stats.getGlobalStat(CompactSegments.MAX_COMPACTION_TASK_SLOT)
+ )
+ );
+
+ emitter.emit(
+ new ServiceMetricEvent.Builder().build(
+ "compactTask/availableSlot/count",
+ stats.getGlobalStat(CompactSegments.AVAILABLE_COMPACTION_TASK_SLOT)
+ )
+ );
+
stats.forEachDataSourceStat(
- "segmentsWaitCompact",
+ CompactSegments.TOTAL_SIZE_OF_SEGMENTS_AWAITING,
+ (final String dataSource, final long count) -> {
+ emitter.emit(
+ new ServiceMetricEvent.Builder()
+ .setDimension(DruidMetrics.DATASOURCE, dataSource)
+ .build("segment/waitCompact/bytes", count)
+ );
+ }
+ );
+
+ stats.forEachDataSourceStat(
+ CompactSegments.TOTAL_COUNT_OF_SEGMENTS_AWAITING,
(final String dataSource, final long count) -> {
emitter.emit(
new ServiceMetricEvent.Builder()
@@ -312,6 +337,83 @@
}
);
+ stats.forEachDataSourceStat(
+ CompactSegments.TOTAL_INTERVAL_OF_SEGMENTS_AWAITING,
+ (final String dataSource, final long count) -> {
+ emitter.emit(
+ new ServiceMetricEvent.Builder()
+ .setDimension(DruidMetrics.DATASOURCE, dataSource)
+ .build("interval/waitCompact/count", count)
+ );
+ }
+ );
+
+ stats.forEachDataSourceStat(
+ CompactSegments.TOTAL_SIZE_OF_SEGMENTS_SKIPPED,
+ (final String dataSource, final long count) -> {
+ emitter.emit(
+ new ServiceMetricEvent.Builder()
+ .setDimension(DruidMetrics.DATASOURCE, dataSource)
+ .build("segment/skipCompact/bytes", count)
+ );
+ }
+ );
+
+ stats.forEachDataSourceStat(
+ CompactSegments.TOTAL_COUNT_OF_SEGMENTS_SKIPPED,
+ (final String dataSource, final long count) -> {
+ emitter.emit(
+ new ServiceMetricEvent.Builder()
+ .setDimension(DruidMetrics.DATASOURCE, dataSource)
+ .build("segment/skipCompact/count", count)
+ );
+ }
+ );
+
+ stats.forEachDataSourceStat(
+ CompactSegments.TOTAL_INTERVAL_OF_SEGMENTS_SKIPPED,
+ (final String dataSource, final long count) -> {
+ emitter.emit(
+ new ServiceMetricEvent.Builder()
+ .setDimension(DruidMetrics.DATASOURCE, dataSource)
+ .build("interval/skipCompact/count", count)
+ );
+ }
+ );
+
+ stats.forEachDataSourceStat(
+ CompactSegments.TOTAL_SIZE_OF_SEGMENTS_COMPACTED,
+ (final String dataSource, final long count) -> {
+ emitter.emit(
+ new ServiceMetricEvent.Builder()
+ .setDimension(DruidMetrics.DATASOURCE, dataSource)
+ .build("segment/compacted/bytes", count)
+ );
+ }
+ );
+
+ stats.forEachDataSourceStat(
+ CompactSegments.TOTAL_COUNT_OF_SEGMENTS_COMPACTED,
+ (final String dataSource, final long count) -> {
+ emitter.emit(
+ new ServiceMetricEvent.Builder()
+ .setDimension(DruidMetrics.DATASOURCE, dataSource)
+ .build("segment/compacted/count", count)
+ );
+ }
+ );
+
+ stats.forEachDataSourceStat(
+ CompactSegments.TOTAL_INTERVAL_OF_SEGMENTS_COMPACTED,
+ (final String dataSource, final long count) -> {
+ emitter.emit(
+ new ServiceMetricEvent.Builder()
+ .setDimension(DruidMetrics.DATASOURCE, dataSource)
+ .build("interval/compacted/count", count)
+ );
+ }
+ );
+
// Emit segment metrics
params.getUsedSegmentsTimelinesPerDataSource().forEach(
(String dataSource, VersionedIntervalTimeline<String, DataSegment> dataSourceWithUsedSegments) -> {
diff --git a/server/src/main/java/org/apache/druid/server/coordinator/duty/NewestSegmentFirstIterator.java b/server/src/main/java/org/apache/druid/server/coordinator/duty/NewestSegmentFirstIterator.java
index f335455..c03c4b4 100644
--- a/server/src/main/java/org/apache/druid/server/coordinator/duty/NewestSegmentFirstIterator.java
+++ b/server/src/main/java/org/apache/druid/server/coordinator/duty/NewestSegmentFirstIterator.java
@@ -22,9 +22,7 @@
import com.fasterxml.jackson.databind.ObjectMapper;
import com.google.common.annotations.VisibleForTesting;
import com.google.common.base.Preconditions;
-import com.google.common.collect.FluentIterable;
import com.google.common.collect.Maps;
-import it.unimi.dsi.fastutil.objects.Object2LongOpenHashMap;
import org.apache.druid.client.indexing.ClientCompactionTaskQueryTuningConfig;
import org.apache.druid.indexer.partitions.DynamicPartitionsSpec;
import org.apache.druid.indexer.partitions.PartitionsSpec;
@@ -35,6 +33,7 @@
import org.apache.druid.java.util.common.logger.Logger;
import org.apache.druid.segment.IndexSpec;
import org.apache.druid.segment.SegmentUtils;
+import org.apache.druid.server.coordinator.CompactionStatistics;
import org.apache.druid.server.coordinator.DataSourceCompactionConfig;
import org.apache.druid.timeline.CompactionState;
import org.apache.druid.timeline.DataSegment;
@@ -51,6 +50,7 @@
import java.util.ArrayList;
import java.util.Collections;
import java.util.Comparator;
+import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
@@ -68,7 +68,8 @@
private final ObjectMapper objectMapper;
private final Map<String, DataSourceCompactionConfig> compactionConfigs;
- private final Map<String, VersionedIntervalTimeline<String, DataSegment>> dataSources;
+ private final Map<String, CompactionStatistics> compactedSegments = new HashMap<>();
+ private final Map<String, CompactionStatistics> skippedSegments = new HashMap<>();
// dataSource -> intervalToFind
// searchIntervals keeps track of the current state of which interval should be considered to search segments to
@@ -88,7 +89,6 @@
{
this.objectMapper = objectMapper;
this.compactionConfigs = compactionConfigs;
- this.dataSources = dataSources;
this.timelineIterators = Maps.newHashMapWithExpectedSize(dataSources.size());
dataSources.forEach((String dataSource, VersionedIntervalTimeline<String, DataSegment> timeline) -> {
@@ -112,27 +112,15 @@
}
@Override
- public Object2LongOpenHashMap<String> totalRemainingSegmentsSizeBytes()
+ public Map<String, CompactionStatistics> totalCompactedStatistics()
{
- final Object2LongOpenHashMap<String> resultMap = new Object2LongOpenHashMap<>();
- resultMap.defaultReturnValue(UNKNOWN_TOTAL_REMAINING_SEGMENTS_SIZE);
- for (QueueEntry entry : queue) {
- final VersionedIntervalTimeline<String, DataSegment> timeline = dataSources.get(entry.getDataSource());
- final Interval interval = new Interval(timeline.first().getInterval().getStart(), entry.interval.getEnd());
+ return compactedSegments;
+ }
- final List<TimelineObjectHolder<String, DataSegment>> holders = timeline.lookup(interval);
-
- long size = 0;
- for (DataSegment segment : FluentIterable
- .from(holders)
- .transformAndConcat(TimelineObjectHolder::getObject)
- .transform(PartitionChunk::getObject)) {
- size += segment.getSize();
- }
-
- resultMap.put(entry.getDataSource(), size);
- }
- return resultMap;
+ @Override
+ public Map<String, CompactionStatistics> totalSkippedStatistics()
+ {
+ return skippedSegments;
}
@Override
@@ -159,6 +147,7 @@
Preconditions.checkState(!resultSegments.isEmpty(), "Queue entry must not be empty");
final String dataSource = resultSegments.get(0).getDataSource();
+
updateQueue(dataSource, compactionConfigs.get(dataSource));
return resultSegments;
@@ -181,6 +170,7 @@
}
final SegmentsToCompact segmentsToCompact = findSegmentsToCompact(
+ dataSourceName,
compactibleTimelineObjectHolderCursor,
config
);
@@ -336,6 +326,7 @@
* @return segments to compact
*/
private SegmentsToCompact findSegmentsToCompact(
+ final String dataSourceName,
final CompactibleTimelineObjectHolderCursor compactibleTimelineObjectHolderCursor,
final DataSourceCompactionConfig config
)
@@ -355,7 +346,13 @@
if (isCompactibleSize && needsCompaction) {
return candidates;
} else {
- if (!isCompactibleSize) {
+ if (!needsCompaction) {
+ // Collect statistic for segments that is already compacted
+ collectSegmentStatistics(compactedSegments, dataSourceName, candidates);
+ } else {
+ // Collect statistic for segments that is skipped
+ // Note that if segments does not need compaction then we do not double count here
+ collectSegmentStatistics(skippedSegments, dataSourceName, candidates);
log.warn(
"total segment size[%d] for datasource[%s] and interval[%s] is larger than inputSegmentSize[%d]."
+ " Continue to the next interval.",
@@ -374,6 +371,20 @@
return new SegmentsToCompact();
}
+ private void collectSegmentStatistics(
+ Map<String, CompactionStatistics> statisticsMap,
+ String dataSourceName,
+ SegmentsToCompact segments)
+ {
+ CompactionStatistics statistics = statisticsMap.computeIfAbsent(
+ dataSourceName,
+ v -> CompactionStatistics.initializeCompactionStatistics()
+ );
+ statistics.incrementCompactedByte(segments.getTotalSize());
+ statistics.incrementCompactedIntervals(segments.getNumberOfIntervals());
+ statistics.incrementCompactedSegments(segments.getNumberOfSegments());
+ }
+
/**
* Returns the initial searchInterval which is {@code (timeline.first().start, timeline.last().end - skipOffset)}.
*
@@ -563,6 +574,16 @@
return totalSize;
}
+ private long getNumberOfSegments()
+ {
+ return segments.size();
+ }
+
+ private long getNumberOfIntervals()
+ {
+ return segments.stream().map(DataSegment::getInterval).distinct().count();
+ }
+
@Override
public String toString()
{
diff --git a/server/src/main/java/org/apache/druid/server/http/CompactionResource.java b/server/src/main/java/org/apache/druid/server/http/CompactionResource.java
index fd4cf4f..8c99514 100644
--- a/server/src/main/java/org/apache/druid/server/http/CompactionResource.java
+++ b/server/src/main/java/org/apache/druid/server/http/CompactionResource.java
@@ -20,9 +20,11 @@
package org.apache.druid.server.http;
import com.google.common.annotations.VisibleForTesting;
+import com.google.common.collect.ImmutableList;
import com.google.common.collect.ImmutableMap;
import com.google.inject.Inject;
import com.sun.jersey.spi.container.ResourceFilters;
+import org.apache.druid.server.coordinator.AutoCompactionSnapshot;
import org.apache.druid.server.coordinator.DruidCoordinator;
import org.apache.druid.server.http.security.ConfigResourceFilter;
import org.apache.druid.server.http.security.StateResourceFilter;
@@ -34,6 +36,7 @@
import javax.ws.rs.QueryParam;
import javax.ws.rs.core.MediaType;
import javax.ws.rs.core.Response;
+import java.util.Collection;
@Path("/druid/coordinator/v1/compaction")
public class CompactionResource
@@ -76,4 +79,25 @@
return Response.ok(ImmutableMap.of("remainingSegmentSize", notCompactedSegmentSizeBytes)).build();
}
}
+
+ @GET
+ @Path("/status")
+ @Produces(MediaType.APPLICATION_JSON)
+ @ResourceFilters(StateResourceFilter.class)
+ public Response getCompactionSnapshotForDataSource(
+ @QueryParam("dataSource") String dataSource
+ )
+ {
+ final Collection<AutoCompactionSnapshot> snapshots;
+ if (dataSource == null || dataSource.isEmpty()) {
+ snapshots = coordinator.getAutoCompactionSnapshot().values();
+ } else {
+ AutoCompactionSnapshot autoCompactionSnapshot = coordinator.getAutoCompactionSnapshotForDataSource(dataSource);
+ if (autoCompactionSnapshot == null) {
+ return Response.status(Response.Status.BAD_REQUEST).entity(ImmutableMap.of("error", "unknown dataSource")).build();
+ }
+ snapshots = ImmutableList.of(autoCompactionSnapshot);
+ }
+ return Response.ok(ImmutableMap.of("latestStatus", snapshots)).build();
+ }
}
diff --git a/server/src/test/java/org/apache/druid/server/coordinator/AutoCompactionSnapshotTest.java b/server/src/test/java/org/apache/druid/server/coordinator/AutoCompactionSnapshotTest.java
new file mode 100644
index 0000000..9415a81
--- /dev/null
+++ b/server/src/test/java/org/apache/druid/server/coordinator/AutoCompactionSnapshotTest.java
@@ -0,0 +1,79 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.druid.server.coordinator;
+
+import org.junit.Assert;
+import org.junit.Test;
+
+public class AutoCompactionSnapshotTest
+{
+ @Test
+ public void testAutoCompactionSnapshotBuilder()
+ {
+ final String expectedDataSource = "data";
+ final AutoCompactionSnapshot.AutoCompactionScheduleStatus expectedStatus = AutoCompactionSnapshot.AutoCompactionScheduleStatus.RUNNING;
+ AutoCompactionSnapshot.Builder builder = new AutoCompactionSnapshot.Builder(expectedDataSource, expectedStatus);
+
+ // Increment every stats twice
+ for (int i = 0; i < 2; i++) {
+ builder.incrementIntervalCountSkipped(13);
+ builder.incrementBytesSkipped(13);
+ builder.incrementSegmentCountSkipped(13);
+
+ builder.incrementIntervalCountCompacted(13);
+ builder.incrementBytesCompacted(13);
+ builder.incrementSegmentCountCompacted(13);
+
+ builder.incrementIntervalCountAwaitingCompaction(13);
+ builder.incrementBytesAwaitingCompaction(13);
+ builder.incrementSegmentCountAwaitingCompaction(13);
+ }
+
+ AutoCompactionSnapshot actual = builder.build();
+
+ Assert.assertNotNull(actual);
+ Assert.assertEquals(26, actual.getSegmentCountSkipped());
+ Assert.assertEquals(26, actual.getIntervalCountSkipped());
+ Assert.assertEquals(26, actual.getBytesSkipped());
+ Assert.assertEquals(26, actual.getBytesCompacted());
+ Assert.assertEquals(26, actual.getIntervalCountCompacted());
+ Assert.assertEquals(26, actual.getSegmentCountCompacted());
+ Assert.assertEquals(26, actual.getBytesAwaitingCompaction());
+ Assert.assertEquals(26, actual.getIntervalCountAwaitingCompaction());
+ Assert.assertEquals(26, actual.getSegmentCountAwaitingCompaction());
+ Assert.assertEquals(AutoCompactionSnapshot.AutoCompactionScheduleStatus.RUNNING, actual.getScheduleStatus());
+ Assert.assertEquals(expectedDataSource, actual.getDataSource());
+
+ AutoCompactionSnapshot expected = new AutoCompactionSnapshot(
+ expectedDataSource,
+ AutoCompactionSnapshot.AutoCompactionScheduleStatus.RUNNING,
+ 26,
+ 26,
+ 26,
+ 26,
+ 26,
+ 26,
+ 26,
+ 26,
+ 26
+ );
+ Assert.assertEquals(expected, actual);
+ }
+}
diff --git a/server/src/test/java/org/apache/druid/server/coordinator/duty/CompactSegmentsTest.java b/server/src/test/java/org/apache/druid/server/coordinator/duty/CompactSegmentsTest.java
index 91aebdc..2051231 100644
--- a/server/src/test/java/org/apache/druid/server/coordinator/duty/CompactSegmentsTest.java
+++ b/server/src/test/java/org/apache/druid/server/coordinator/duty/CompactSegmentsTest.java
@@ -49,6 +49,7 @@
import org.apache.druid.java.util.http.client.Request;
import org.apache.druid.java.util.http.client.response.StringFullResponseHolder;
import org.apache.druid.server.DruidNode;
+import org.apache.druid.server.coordinator.AutoCompactionSnapshot;
import org.apache.druid.server.coordinator.CoordinatorCompactionConfig;
import org.apache.druid.server.coordinator.CoordinatorRuntimeParamsTestHelpers;
import org.apache.druid.server.coordinator.CoordinatorStats;
@@ -98,6 +99,10 @@
{
private static final ObjectMapper JSON_MAPPER = new DefaultObjectMapper();
private static final String DATA_SOURCE_PREFIX = "dataSource_";
+ // Each dataSource starts with 440 byte, 44 segments, and 11 intervals needing compaction
+ private static final int TOTAL_BYTE_PER_DATASOURCE = 440;
+ private static final int TOTAL_SEGMENT_PER_DATASOURCE = 44;
+ private static final int TOTAL_INTERVAL_PER_DATASOURCE = 11;
@Parameterized.Parameters(name = "{0}")
public static Collection<Object[]> constructorFeeder()
@@ -105,7 +110,7 @@
final MutableInt nextRangePartitionBoundary = new MutableInt(0);
return ImmutableList.of(
new Object[]{
- new DynamicPartitionsSpec(300000, null),
+ new DynamicPartitionsSpec(300000, Long.MAX_VALUE),
(BiFunction<Integer, Integer, ShardSpec>) NumberedShardSpec::new
},
new Object[]{
@@ -270,12 +275,363 @@
assertLastSegmentNotCompacted(compactSegments);
}
+ @Test
+ public void testMakeStats()
+ {
+ final TestDruidLeaderClient leaderClient = new TestDruidLeaderClient(JSON_MAPPER);
+ leaderClient.start();
+ final HttpIndexingServiceClient indexingServiceClient = new HttpIndexingServiceClient(JSON_MAPPER, leaderClient);
+ final CompactSegments compactSegments = new CompactSegments(JSON_MAPPER, indexingServiceClient);
+
+ // Before any compaction, we do not have any snapshot of compactions
+ Map<String, AutoCompactionSnapshot> autoCompactionSnapshots = compactSegments.getAutoCompactionSnapshot();
+ Assert.assertEquals(0, autoCompactionSnapshots.size());
+
+ for (int compaction_run_count = 0; compaction_run_count < 11; compaction_run_count++) {
+ assertCompactSegmentStatistics(compactSegments, compaction_run_count);
+ }
+ // Test that stats does not change (and is still correct) when auto compaction runs with everything is fully compacted
+ final CoordinatorStats stats = doCompactSegments(compactSegments);
+ Assert.assertEquals(
+ 0,
+ stats.getGlobalStat(CompactSegments.COMPACTION_TASK_COUNT)
+ );
+ for (int i = 0; i < 3; i++) {
+ verifySnapshot(
+ compactSegments,
+ AutoCompactionSnapshot.AutoCompactionScheduleStatus.RUNNING,
+ DATA_SOURCE_PREFIX + i,
+ 0,
+ TOTAL_BYTE_PER_DATASOURCE,
+ 0,
+ 0,
+ TOTAL_INTERVAL_PER_DATASOURCE,
+ 0,
+ 0,
+ TOTAL_SEGMENT_PER_DATASOURCE / 2,
+ 0
+ );
+ }
+
+ // Run auto compaction without any dataSource in the compaction config
+ // Should still populate the result of everything fully compacted
+ doCompactSegments(compactSegments, new ArrayList<>());
+ Assert.assertEquals(
+ 0,
+ stats.getGlobalStat(CompactSegments.COMPACTION_TASK_COUNT)
+ );
+ for (int i = 0; i < 3; i++) {
+ verifySnapshot(
+ compactSegments,
+ AutoCompactionSnapshot.AutoCompactionScheduleStatus.NOT_ENABLED,
+ DATA_SOURCE_PREFIX + i,
+ 0,
+ TOTAL_BYTE_PER_DATASOURCE,
+ 0,
+ 0,
+ TOTAL_INTERVAL_PER_DATASOURCE,
+ 0,
+ 0,
+ TOTAL_SEGMENT_PER_DATASOURCE / 2,
+ 0
+ );
+ }
+
+ assertLastSegmentNotCompacted(compactSegments);
+ }
+
+ @Test
+ public void testMakeStatsForDataSourceWithCompactedIntervalBetweenNonCompactedIntervals()
+ {
+ // Only test and validate for one datasource for simplicity.
+ // This dataSource has three intervals already compacted (3 intervals, 120 byte, 12 segments already compacted)
+ String dataSourceName = DATA_SOURCE_PREFIX + 1;
+ List<DataSegment> segments = new ArrayList<>();
+ for (int j : new int[]{0, 1, 2, 3, 7, 8}) {
+ for (int k = 0; k < 4; k++) {
+ DataSegment beforeNoon = createSegment(dataSourceName, j, true, k);
+ DataSegment afterNoon = createSegment(dataSourceName, j, false, k);
+ if (j == 3) {
+ // Make two intervals on this day compacted (two compacted intervals back-to-back)
+ beforeNoon = beforeNoon.withLastCompactionState(new CompactionState(partitionsSpec, ImmutableMap.of()));
+ afterNoon = afterNoon.withLastCompactionState(new CompactionState(partitionsSpec, ImmutableMap.of()));
+ }
+ if (j == 1) {
+ // Make one interval on this day compacted
+ afterNoon = afterNoon.withLastCompactionState(new CompactionState(partitionsSpec, ImmutableMap.of()));
+ }
+ segments.add(beforeNoon);
+ segments.add(afterNoon);
+ }
+ }
+
+ dataSources = DataSourcesSnapshot
+ .fromUsedSegments(segments, ImmutableMap.of())
+ .getUsedSegmentsTimelinesPerDataSource();
+
+
+ final TestDruidLeaderClient leaderClient = new TestDruidLeaderClient(JSON_MAPPER);
+ leaderClient.start();
+ final HttpIndexingServiceClient indexingServiceClient = new HttpIndexingServiceClient(JSON_MAPPER, leaderClient);
+ final CompactSegments compactSegments = new CompactSegments(JSON_MAPPER, indexingServiceClient);
+
+ // Before any compaction, we do not have any snapshot of compactions
+ Map<String, AutoCompactionSnapshot> autoCompactionSnapshots = compactSegments.getAutoCompactionSnapshot();
+ Assert.assertEquals(0, autoCompactionSnapshots.size());
+
+ // 3 intervals, 120 byte, 12 segments already compacted before the run
+ for (int compaction_run_count = 0; compaction_run_count < 8; compaction_run_count++) {
+ // Do a cycle of auto compaction which creates one compaction task
+ final CoordinatorStats stats = doCompactSegments(compactSegments);
+ Assert.assertEquals(
+ 1,
+ stats.getGlobalStat(CompactSegments.COMPACTION_TASK_COUNT)
+ );
+
+ verifySnapshot(
+ compactSegments,
+ AutoCompactionSnapshot.AutoCompactionScheduleStatus.RUNNING,
+ dataSourceName,
+ TOTAL_BYTE_PER_DATASOURCE - 120 - 40 * (compaction_run_count + 1),
+ 120 + 40 * (compaction_run_count + 1),
+ 0,
+ TOTAL_INTERVAL_PER_DATASOURCE - 3 - (compaction_run_count + 1),
+ 3 + (compaction_run_count + 1),
+ 0,
+ TOTAL_SEGMENT_PER_DATASOURCE - 12 - 4 * (compaction_run_count + 1),
+ // 12 segments was compressed before any auto compaction
+ // 4 segments was compressed in this run of auto compaction
+ // Each previous auto compaction run resulted in 2 compacted segments (4 segments compacted into 2 segments)
+ 12 + 4 + 2 * (compaction_run_count),
+ 0
+ );
+ }
+
+ // Test that stats does not change (and is still correct) when auto compaction runs with everything is fully compacted
+ final CoordinatorStats stats = doCompactSegments(compactSegments);
+ Assert.assertEquals(
+ 0,
+ stats.getGlobalStat(CompactSegments.COMPACTION_TASK_COUNT)
+ );
+ verifySnapshot(
+ compactSegments,
+ AutoCompactionSnapshot.AutoCompactionScheduleStatus.RUNNING,
+ dataSourceName,
+ 0,
+ TOTAL_BYTE_PER_DATASOURCE,
+ 0,
+ 0,
+ TOTAL_INTERVAL_PER_DATASOURCE,
+ 0,
+ 0,
+ // 12 segments was compressed before any auto compaction
+ // 32 segments needs compaction which is now compacted into 16 segments (4 segments compacted into 2 segments each run)
+ 12 + 16,
+ 0
+ );
+ }
+
+ @Test
+ public void testMakeStatsForDataSourceWithSkipped()
+ {
+ // Only test and validate for one datasource for simplicity.
+ // This dataSource has three intervals skipped (3 intervals, 1200 byte, 12 segments skipped by auto compaction)
+ // Note that these segment used to be 10 bytes each in other tests, we are increasing it to 100 bytes each here
+ // so that they will be skipped by the auto compaction.
+ String dataSourceName = DATA_SOURCE_PREFIX + 1;
+ List<DataSegment> segments = new ArrayList<>();
+ for (int j : new int[]{0, 1, 2, 3, 7, 8}) {
+ for (int k = 0; k < 4; k++) {
+ DataSegment beforeNoon = createSegment(dataSourceName, j, true, k);
+ DataSegment afterNoon = createSegment(dataSourceName, j, false, k);
+ if (j == 3) {
+ // Make two intervals on this day skipped (two skipped intervals back-to-back)
+ beforeNoon = beforeNoon.withSize(100);
+ afterNoon = afterNoon.withSize(100);
+ }
+ if (j == 1) {
+ // Make one interval on this day skipped
+ afterNoon = afterNoon.withSize(100);
+ }
+ segments.add(beforeNoon);
+ segments.add(afterNoon);
+ }
+ }
+
+ dataSources = DataSourcesSnapshot
+ .fromUsedSegments(segments, ImmutableMap.of())
+ .getUsedSegmentsTimelinesPerDataSource();
+
+
+ final TestDruidLeaderClient leaderClient = new TestDruidLeaderClient(JSON_MAPPER);
+ leaderClient.start();
+ final HttpIndexingServiceClient indexingServiceClient = new HttpIndexingServiceClient(JSON_MAPPER, leaderClient);
+ final CompactSegments compactSegments = new CompactSegments(JSON_MAPPER, indexingServiceClient);
+
+ // Before any compaction, we do not have any snapshot of compactions
+ Map<String, AutoCompactionSnapshot> autoCompactionSnapshots = compactSegments.getAutoCompactionSnapshot();
+ Assert.assertEquals(0, autoCompactionSnapshots.size());
+
+ // 3 intervals, 1200 byte (each segment is 100 bytes), 12 segments will be skipped by auto compaction
+ for (int compaction_run_count = 0; compaction_run_count < 8; compaction_run_count++) {
+ // Do a cycle of auto compaction which creates one compaction task
+ final CoordinatorStats stats = doCompactSegments(compactSegments);
+ Assert.assertEquals(
+ 1,
+ stats.getGlobalStat(CompactSegments.COMPACTION_TASK_COUNT)
+ );
+
+ verifySnapshot(
+ compactSegments,
+ AutoCompactionSnapshot.AutoCompactionScheduleStatus.RUNNING,
+ dataSourceName,
+ // Minus 120 bytes accounting for the three skipped segments' original size
+ TOTAL_BYTE_PER_DATASOURCE - 120 - 40 * (compaction_run_count + 1),
+ 40 * (compaction_run_count + 1),
+ 1200,
+ TOTAL_INTERVAL_PER_DATASOURCE - 3 - (compaction_run_count + 1),
+ (compaction_run_count + 1),
+ 3,
+ TOTAL_SEGMENT_PER_DATASOURCE - 12 - 4 * (compaction_run_count + 1),
+ 4 + 2 * (compaction_run_count),
+ 12
+ );
+ }
+
+ // Test that stats does not change (and is still correct) when auto compaction runs with everything is fully compacted
+ final CoordinatorStats stats = doCompactSegments(compactSegments);
+ Assert.assertEquals(
+ 0,
+ stats.getGlobalStat(CompactSegments.COMPACTION_TASK_COUNT)
+ );
+ verifySnapshot(
+ compactSegments,
+ AutoCompactionSnapshot.AutoCompactionScheduleStatus.RUNNING,
+ dataSourceName,
+ 0,
+ // Minus 120 bytes accounting for the three skipped segments' original size
+ TOTAL_BYTE_PER_DATASOURCE - 120,
+ 1200,
+ 0,
+ TOTAL_INTERVAL_PER_DATASOURCE - 3,
+ 3,
+ 0,
+ 16,
+ 12
+ );
+ }
+
+ private void verifySnapshot(
+ CompactSegments compactSegments,
+ AutoCompactionSnapshot.AutoCompactionScheduleStatus scheduleStatus,
+ String dataSourceName,
+ long expectedByteCountAwaitingCompaction,
+ long expectedByteCountCompressed,
+ long expectedByteCountSkipped,
+ long expectedIntervalCountAwaitingCompaction,
+ long expectedIntervalCountCompressed,
+ long expectedIntervalCountSkipped,
+ long expectedSegmentCountAwaitingCompaction,
+ long expectedSegmentCountCompressed,
+ long expectedSegmentCountSkipped
+ )
+ {
+ Map<String, AutoCompactionSnapshot> autoCompactionSnapshots = compactSegments.getAutoCompactionSnapshot();
+ AutoCompactionSnapshot snapshot = autoCompactionSnapshots.get(dataSourceName);
+ Assert.assertEquals(dataSourceName, snapshot.getDataSource());
+ Assert.assertEquals(scheduleStatus, snapshot.getScheduleStatus());
+ Assert.assertEquals(expectedByteCountAwaitingCompaction, snapshot.getBytesAwaitingCompaction());
+ Assert.assertEquals(expectedByteCountCompressed, snapshot.getBytesCompacted());
+ Assert.assertEquals(expectedByteCountSkipped, snapshot.getBytesSkipped());
+ Assert.assertEquals(expectedIntervalCountAwaitingCompaction, snapshot.getIntervalCountAwaitingCompaction());
+ Assert.assertEquals(expectedIntervalCountCompressed, snapshot.getIntervalCountCompacted());
+ Assert.assertEquals(expectedIntervalCountSkipped, snapshot.getIntervalCountSkipped());
+ Assert.assertEquals(expectedSegmentCountAwaitingCompaction, snapshot.getSegmentCountAwaitingCompaction());
+ Assert.assertEquals(expectedSegmentCountCompressed, snapshot.getSegmentCountCompacted());
+ Assert.assertEquals(expectedSegmentCountSkipped, snapshot.getSegmentCountSkipped());
+ }
+
+ private void assertCompactSegmentStatistics(CompactSegments compactSegments, int compaction_run_count)
+ {
+ for (int dataSourceIndex = 0; dataSourceIndex < 3; dataSourceIndex++) {
+ // One compaction task triggered
+ final CoordinatorStats stats = doCompactSegments(compactSegments);
+ Assert.assertEquals(
+ 1,
+ stats.getGlobalStat(CompactSegments.COMPACTION_TASK_COUNT)
+ );
+ Map<String, AutoCompactionSnapshot> autoCompactionSnapshots = compactSegments.getAutoCompactionSnapshot();
+ // Note: Subsequent compaction run after the dataSource was compacted will show different numbers than
+ // on the run it was compacted. For example, in a compaction run, if a dataSource had 4 segments compacted,
+ // on the same compaction run the segment compressed count will be 4 but on subsequent run it might be 2
+ // (assuming the 4 segments was compacted into 2 segments).
+ for (int i = 0; i <= dataSourceIndex; i++) {
+ // dataSource up to dataSourceIndex now compacted. Check that the stats match the expectedAfterCompaction values
+ // This verify that dataSource which got slot to compact has correct statistics
+ if (i != dataSourceIndex) {
+ verifySnapshot(
+ compactSegments,
+ AutoCompactionSnapshot.AutoCompactionScheduleStatus.RUNNING,
+ DATA_SOURCE_PREFIX + i,
+ TOTAL_BYTE_PER_DATASOURCE - 40 * (compaction_run_count + 1),
+ 40 * (compaction_run_count + 1),
+ 0,
+ TOTAL_INTERVAL_PER_DATASOURCE - (compaction_run_count + 1),
+ (compaction_run_count + 1),
+ 0,
+ TOTAL_SEGMENT_PER_DATASOURCE - 4 * (compaction_run_count + 1),
+ 2 * (compaction_run_count + 1),
+ 0
+ );
+ } else {
+ verifySnapshot(
+ compactSegments,
+ AutoCompactionSnapshot.AutoCompactionScheduleStatus.RUNNING,
+ DATA_SOURCE_PREFIX + i,
+ TOTAL_BYTE_PER_DATASOURCE - 40 * (compaction_run_count + 1),
+ 40 * (compaction_run_count + 1),
+ 0,
+ TOTAL_INTERVAL_PER_DATASOURCE - (compaction_run_count + 1),
+ (compaction_run_count + 1),
+ 0,
+ TOTAL_SEGMENT_PER_DATASOURCE - 4 * (compaction_run_count + 1),
+ 2 * compaction_run_count + 4,
+ 0
+ );
+ }
+ }
+ for (int i = dataSourceIndex + 1; i < 3; i++) {
+ // dataSource after dataSourceIndex is not yet compacted. Check that the stats match the expectedBeforeCompaction values
+ // This verify that dataSource that ran out of slot has correct statistics
+ verifySnapshot(
+ compactSegments,
+ AutoCompactionSnapshot.AutoCompactionScheduleStatus.RUNNING,
+ DATA_SOURCE_PREFIX + i,
+ TOTAL_BYTE_PER_DATASOURCE - 40 * compaction_run_count,
+ 40 * compaction_run_count,
+ 0,
+ TOTAL_INTERVAL_PER_DATASOURCE - compaction_run_count,
+ compaction_run_count,
+ 0,
+ TOTAL_SEGMENT_PER_DATASOURCE - 4 * compaction_run_count,
+ 2 * compaction_run_count,
+ 0
+ );
+ }
+ }
+ }
+
private CoordinatorStats doCompactSegments(CompactSegments compactSegments)
{
+ return doCompactSegments(compactSegments, createCompactionConfigs());
+ }
+
+ private CoordinatorStats doCompactSegments(CompactSegments compactSegments, List<DataSourceCompactionConfig> compactionConfigs)
+ {
DruidCoordinatorRuntimeParams params = CoordinatorRuntimeParamsTestHelpers
.newBuilder()
.withUsedSegmentsTimelinesPerDataSourceInTest(dataSources)
- .withCompactionConfig(CoordinatorCompactionConfig.from(createCompactionConfigs()))
+ .withCompactionConfig(CoordinatorCompactionConfig.from(compactionConfigs))
.build();
return compactSegments.run(params).getCoordinatorStats();
}
@@ -300,9 +656,9 @@
// If expectedRemainingSegments is positive, we check how many dataSources have the segments waiting for
// compaction.
long numDataSourceOfExpectedRemainingSegments = stats
- .getDataSources(CompactSegments.TOTAL_SIZE_OF_SEGMENTS_AWAITING_COMPACTION)
+ .getDataSources(CompactSegments.TOTAL_SIZE_OF_SEGMENTS_AWAITING)
.stream()
- .mapToLong(ds -> stats.getDataSourceStat(CompactSegments.TOTAL_SIZE_OF_SEGMENTS_AWAITING_COMPACTION, ds))
+ .mapToLong(ds -> stats.getDataSourceStat(CompactSegments.TOTAL_SIZE_OF_SEGMENTS_AWAITING, ds))
.filter(stat -> stat == expectedRemainingSegments)
.count();
Assert.assertEquals(i + 1, numDataSourceOfExpectedRemainingSegments);
@@ -310,7 +666,7 @@
// Otherwise, we check how many dataSources are in the coordinator stats.
Assert.assertEquals(
2 - i,
- stats.getDataSources(CompactSegments.TOTAL_SIZE_OF_SEGMENTS_AWAITING_COMPACTION).size()
+ stats.getDataSources(CompactSegments.TOTAL_SIZE_OF_SEGMENTS_AWAITING).size()
);
}
}
diff --git a/server/src/test/java/org/apache/druid/server/http/CompactionResourceTest.java b/server/src/test/java/org/apache/druid/server/http/CompactionResourceTest.java
new file mode 100644
index 0000000..9e7cfd6
--- /dev/null
+++ b/server/src/test/java/org/apache/druid/server/http/CompactionResourceTest.java
@@ -0,0 +1,122 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.druid.server.http;
+
+import com.google.common.collect.ImmutableList;
+import com.google.common.collect.ImmutableMap;
+import org.apache.druid.server.coordinator.AutoCompactionSnapshot;
+import org.apache.druid.server.coordinator.DruidCoordinator;
+import org.easymock.EasyMock;
+import org.junit.After;
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.Test;
+
+import javax.ws.rs.core.Response;
+import java.util.Map;
+
+public class CompactionResourceTest
+{
+ private DruidCoordinator mock;
+ private String dataSourceName = "datasource_1";
+ private AutoCompactionSnapshot expectedSnapshot = new AutoCompactionSnapshot(
+ dataSourceName,
+ AutoCompactionSnapshot.AutoCompactionScheduleStatus.RUNNING,
+ 1,
+ 1,
+ 1,
+ 1,
+ 1,
+ 1,
+ 1,
+ 1,
+ 1
+ );
+
+ @Before
+ public void setUp()
+ {
+ mock = EasyMock.createStrictMock(DruidCoordinator.class);
+ }
+
+ @After
+ public void tearDown()
+ {
+ EasyMock.verify(mock);
+ }
+
+ @Test
+ public void testGetCompactionSnapshotForDataSourceWithEmptyQueryParameter()
+ {
+ Map<String, AutoCompactionSnapshot> expected = ImmutableMap.of(
+ dataSourceName,
+ expectedSnapshot
+ );
+
+ EasyMock.expect(mock.getAutoCompactionSnapshot()).andReturn(expected).once();
+ EasyMock.replay(mock);
+
+ final Response response = new CompactionResource(mock).getCompactionSnapshotForDataSource("");
+ Assert.assertEquals(ImmutableMap.of("latestStatus", expected.values()), response.getEntity());
+ Assert.assertEquals(200, response.getStatus());
+ }
+
+ @Test
+ public void testGetCompactionSnapshotForDataSourceWithNullQueryParameter()
+ {
+ String dataSourceName = "datasource_1";
+ Map<String, AutoCompactionSnapshot> expected = ImmutableMap.of(
+ dataSourceName,
+ expectedSnapshot
+ );
+
+ EasyMock.expect(mock.getAutoCompactionSnapshot()).andReturn(expected).once();
+ EasyMock.replay(mock);
+
+ final Response response = new CompactionResource(mock).getCompactionSnapshotForDataSource(null);
+ Assert.assertEquals(ImmutableMap.of("latestStatus", expected.values()), response.getEntity());
+ Assert.assertEquals(200, response.getStatus());
+ }
+
+ @Test
+ public void testGetCompactionSnapshotForDataSourceWithValidQueryParameter()
+ {
+ String dataSourceName = "datasource_1";
+
+ EasyMock.expect(mock.getAutoCompactionSnapshotForDataSource(dataSourceName)).andReturn(expectedSnapshot).once();
+ EasyMock.replay(mock);
+
+ final Response response = new CompactionResource(mock).getCompactionSnapshotForDataSource(dataSourceName);
+ Assert.assertEquals(ImmutableMap.of("latestStatus", ImmutableList.of(expectedSnapshot)), response.getEntity());
+ Assert.assertEquals(200, response.getStatus());
+ }
+
+ @Test
+ public void testGetCompactionSnapshotForDataSourceWithInvalidQueryParameter()
+ {
+ String dataSourceName = "invalid_datasource";
+
+ EasyMock.expect(mock.getAutoCompactionSnapshotForDataSource(dataSourceName)).andReturn(null).once();
+ EasyMock.replay(mock);
+
+ final Response response = new CompactionResource(mock).getCompactionSnapshotForDataSource(dataSourceName);
+ Assert.assertEquals(400, response.getStatus());
+ }
+}