compaction/status API retains status for datasources that no longer exist, causing in-memory usage to grow unbounded (#11510)
* fix compaction status api
* fix checkstyle
* address comment
diff --git a/server/src/main/java/org/apache/druid/server/coordinator/duty/CompactSegments.java b/server/src/main/java/org/apache/druid/server/coordinator/duty/CompactSegments.java
index a9ba0d5..9bb6f9c 100644
--- a/server/src/main/java/org/apache/druid/server/coordinator/duty/CompactSegments.java
+++ b/server/src/main/java/org/apache/druid/server/coordinator/duty/CompactSegments.java
@@ -48,7 +48,6 @@
import java.util.HashMap;
import java.util.List;
import java.util.Map;
-import java.util.Set;
import java.util.concurrent.atomic.AtomicReference;
import java.util.function.Function;
import java.util.stream.Collectors;
@@ -108,9 +107,7 @@
final CoordinatorCompactionConfig dynamicConfig = params.getCoordinatorCompactionConfig();
final CoordinatorStats stats = new CoordinatorStats();
- final Map<String, AutoCompactionSnapshot.Builder> currentRunAutoCompactionSnapshotBuilders = new HashMap<>();
List<DataSourceCompactionConfig> compactionConfigList = dynamicConfig.getCompactionConfigs();
- updateAutoCompactionSnapshot(compactionConfigList, currentRunAutoCompactionSnapshotBuilders);
if (dynamicConfig.getMaxCompactionTaskSlots() > 0) {
Map<String, VersionedIntervalTimeline<String, DataSegment>> dataSources =
params.getUsedSegmentsTimelinesPerDataSource();
@@ -197,7 +194,7 @@
);
stats.addToGlobalStat(AVAILABLE_COMPACTION_TASK_SLOT, numAvailableCompactionTaskSlots);
stats.addToGlobalStat(MAX_COMPACTION_TASK_SLOT, compactionTaskCapacity);
-
+ final Map<String, AutoCompactionSnapshot.Builder> currentRunAutoCompactionSnapshotBuilders = new HashMap<>();
if (numAvailableCompactionTaskSlots > 0) {
stats.accumulate(
doRun(
@@ -303,23 +300,6 @@
return tuningConfig.getPartitionsSpec() instanceof SingleDimensionPartitionsSpec;
}
- private void updateAutoCompactionSnapshot(
- List<DataSourceCompactionConfig> compactionConfigList,
- Map<String, AutoCompactionSnapshot.Builder> currentRunAutoCompactionSnapshotBuilders)
- {
-
- Set<String> enabledDatasources = compactionConfigList.stream()
- .map(dataSourceCompactionConfig -> dataSourceCompactionConfig.getDataSource())
- .collect(Collectors.toSet());
- // Create and Update snapshot for dataSource that has auto compaction enabled
- for (String compactionConfigDataSource : enabledDatasources) {
- currentRunAutoCompactionSnapshotBuilders.computeIfAbsent(
- compactionConfigDataSource,
- k -> new AutoCompactionSnapshot.Builder(k, AutoCompactionSnapshot.AutoCompactionScheduleStatus.RUNNING)
- );
- }
- }
-
private static List<TaskStatusPlus> filterNonCompactionTasks(List<TaskStatusPlus> taskStatuses)
{
return taskStatuses
@@ -351,7 +331,11 @@
if (!segmentsToCompact.isEmpty()) {
final String dataSourceName = segmentsToCompact.get(0).getDataSource();
// As these segments will be compacted, we will aggregates the statistic to the Compacted statistics
- AutoCompactionSnapshot.Builder snapshotBuilder = currentRunAutoCompactionSnapshotBuilders.get(dataSourceName);
+
+ AutoCompactionSnapshot.Builder snapshotBuilder = currentRunAutoCompactionSnapshotBuilders.computeIfAbsent(
+ dataSourceName,
+ k -> new AutoCompactionSnapshot.Builder(k, AutoCompactionSnapshot.AutoCompactionScheduleStatus.RUNNING)
+ );
snapshotBuilder.incrementBytesCompacted(segmentsToCompact.stream().mapToLong(DataSegment::getSize).sum());
snapshotBuilder.incrementIntervalCountCompacted(segmentsToCompact.stream().map(DataSegment::getInterval).distinct().count());
snapshotBuilder.incrementSegmentCountCompacted(segmentsToCompact.size());
@@ -426,7 +410,10 @@
final List<DataSegment> segmentsToCompact = iterator.next();
if (!segmentsToCompact.isEmpty()) {
final String dataSourceName = segmentsToCompact.get(0).getDataSource();
- AutoCompactionSnapshot.Builder snapshotBuilder = currentRunAutoCompactionSnapshotBuilders.get(dataSourceName);
+ AutoCompactionSnapshot.Builder snapshotBuilder = currentRunAutoCompactionSnapshotBuilders.computeIfAbsent(
+ dataSourceName,
+ k -> new AutoCompactionSnapshot.Builder(k, AutoCompactionSnapshot.AutoCompactionScheduleStatus.RUNNING)
+ );
snapshotBuilder.incrementBytesAwaitingCompaction(
segmentsToCompact.stream()
.mapToLong(DataSegment::getSize)
@@ -444,26 +431,35 @@
// Statistics of all segments considered compacted after this run
Map<String, CompactionStatistics> allCompactedStatistics = iterator.totalCompactedStatistics();
+ for (Map.Entry<String, CompactionStatistics> compactionStatisticsEntry : allCompactedStatistics.entrySet()) {
+ final String dataSource = compactionStatisticsEntry.getKey();
+ final CompactionStatistics dataSourceCompactedStatistics = compactionStatisticsEntry.getValue();
+ AutoCompactionSnapshot.Builder builder = currentRunAutoCompactionSnapshotBuilders.computeIfAbsent(
+ dataSource,
+ k -> new AutoCompactionSnapshot.Builder(k, AutoCompactionSnapshot.AutoCompactionScheduleStatus.RUNNING)
+ );
+ builder.incrementBytesCompacted(dataSourceCompactedStatistics.getByteSum());
+ builder.incrementSegmentCountCompacted(dataSourceCompactedStatistics.getSegmentNumberCountSum());
+ builder.incrementIntervalCountCompacted(dataSourceCompactedStatistics.getSegmentIntervalCountSum());
+ }
+
// Statistics of all segments considered skipped after this run
Map<String, CompactionStatistics> allSkippedStatistics = iterator.totalSkippedStatistics();
+ for (Map.Entry<String, CompactionStatistics> compactionStatisticsEntry : allSkippedStatistics.entrySet()) {
+ final String dataSource = compactionStatisticsEntry.getKey();
+ final CompactionStatistics dataSourceSkippedStatistics = compactionStatisticsEntry.getValue();
+ AutoCompactionSnapshot.Builder builder = currentRunAutoCompactionSnapshotBuilders.computeIfAbsent(
+ dataSource,
+ k -> new AutoCompactionSnapshot.Builder(k, AutoCompactionSnapshot.AutoCompactionScheduleStatus.RUNNING)
+ );
+ builder.incrementBytesSkipped(dataSourceSkippedStatistics.getByteSum());
+ builder.incrementSegmentCountSkipped(dataSourceSkippedStatistics.getSegmentNumberCountSum());
+ builder.incrementIntervalCountSkipped(dataSourceSkippedStatistics.getSegmentIntervalCountSum());
+ }
for (Map.Entry<String, AutoCompactionSnapshot.Builder> autoCompactionSnapshotBuilderEntry : currentRunAutoCompactionSnapshotBuilders.entrySet()) {
final String dataSource = autoCompactionSnapshotBuilderEntry.getKey();
final AutoCompactionSnapshot.Builder builder = autoCompactionSnapshotBuilderEntry.getValue();
- CompactionStatistics dataSourceCompactedStatistics = allCompactedStatistics.get(dataSource);
- CompactionStatistics dataSourceSkippedStatistics = allSkippedStatistics.get(dataSource);
-
- if (dataSourceCompactedStatistics != null) {
- builder.incrementBytesCompacted(dataSourceCompactedStatistics.getByteSum());
- builder.incrementSegmentCountCompacted(dataSourceCompactedStatistics.getSegmentNumberCountSum());
- builder.incrementIntervalCountCompacted(dataSourceCompactedStatistics.getSegmentIntervalCountSum());
- }
-
- if (dataSourceSkippedStatistics != null) {
- builder.incrementBytesSkipped(dataSourceSkippedStatistics.getByteSum());
- builder.incrementSegmentCountSkipped(dataSourceSkippedStatistics.getSegmentNumberCountSum());
- builder.incrementIntervalCountSkipped(dataSourceSkippedStatistics.getSegmentIntervalCountSum());
- }
// Build the complete snapshot for the datasource
AutoCompactionSnapshot autoCompactionSnapshot = builder.build();
diff --git a/server/src/test/java/org/apache/druid/server/coordinator/duty/CompactSegmentsTest.java b/server/src/test/java/org/apache/druid/server/coordinator/duty/CompactSegmentsTest.java
index 635b2fa..f4cdd9c 100644
--- a/server/src/test/java/org/apache/druid/server/coordinator/duty/CompactSegmentsTest.java
+++ b/server/src/test/java/org/apache/druid/server/coordinator/duty/CompactSegmentsTest.java
@@ -315,7 +315,7 @@
Assert.assertEquals(0, autoCompactionSnapshots.size());
for (int compactionRunCount = 0; compactionRunCount < 11; compactionRunCount++) {
- assertCompactSegmentStatistics(compactSegments, compactionRunCount);
+ doCompactionAndAssertCompactSegmentStatistics(compactSegments, compactionRunCount);
}
// Test that stats does not change (and is still correct) when auto compaction runs with everything is fully compacted
final CoordinatorStats stats = doCompactSegments(compactSegments);
@@ -466,6 +466,73 @@
}
@Test
+ public void testMakeStatsWithDeactivatedDatasource()
+ {
+ final TestDruidLeaderClient leaderClient = new TestDruidLeaderClient(JSON_MAPPER);
+ leaderClient.start();
+ final HttpIndexingServiceClient indexingServiceClient = new HttpIndexingServiceClient(JSON_MAPPER, leaderClient);
+ final CompactSegments compactSegments = new CompactSegments(COORDINATOR_CONFIG, JSON_MAPPER, indexingServiceClient);
+
+ // Before any compaction, we do not have any snapshot of compactions
+ Map<String, AutoCompactionSnapshot> autoCompactionSnapshots = compactSegments.getAutoCompactionSnapshot();
+ Assert.assertEquals(0, autoCompactionSnapshots.size());
+
+ for (int compactionRunCount = 0; compactionRunCount < 11; compactionRunCount++) {
+ doCompactionAndAssertCompactSegmentStatistics(compactSegments, compactionRunCount);
+ }
+ // Test that stats do not change (and are still correct) when auto compaction runs with everything fully compacted
+ final CoordinatorStats stats = doCompactSegments(compactSegments);
+ Assert.assertEquals(
+ 0,
+ stats.getGlobalStat(CompactSegments.COMPACTION_TASK_COUNT)
+ );
+ for (int i = 0; i < 3; i++) {
+ verifySnapshot(
+ compactSegments,
+ AutoCompactionSnapshot.AutoCompactionScheduleStatus.RUNNING,
+ DATA_SOURCE_PREFIX + i,
+ 0,
+ TOTAL_BYTE_PER_DATASOURCE,
+ 0,
+ 0,
+ TOTAL_INTERVAL_PER_DATASOURCE,
+ 0,
+ 0,
+ TOTAL_SEGMENT_PER_DATASOURCE / 2,
+ 0
+ );
+ }
+
+ // Deactivate one datasource (datasource 0 no longer exists in the timeline)
+ dataSources.remove(DATA_SOURCE_PREFIX + 0);
+
+ // Test run auto compaction with one datasource deactivated
+ // Snapshot should not contain deactivated datasource
+ doCompactSegments(compactSegments);
+ for (int i = 1; i < 3; i++) {
+ verifySnapshot(
+ compactSegments,
+ AutoCompactionSnapshot.AutoCompactionScheduleStatus.RUNNING,
+ DATA_SOURCE_PREFIX + i,
+ 0,
+ TOTAL_BYTE_PER_DATASOURCE,
+ 0,
+ 0,
+ TOTAL_INTERVAL_PER_DATASOURCE,
+ 0,
+ 0,
+ TOTAL_SEGMENT_PER_DATASOURCE / 2,
+ 0
+ );
+ }
+
+ Assert.assertEquals(2, compactSegments.getAutoCompactionSnapshot().size());
+ Assert.assertTrue(compactSegments.getAutoCompactionSnapshot().containsKey(DATA_SOURCE_PREFIX + 1));
+ Assert.assertTrue(compactSegments.getAutoCompactionSnapshot().containsKey(DATA_SOURCE_PREFIX + 2));
+ Assert.assertFalse(compactSegments.getAutoCompactionSnapshot().containsKey(DATA_SOURCE_PREFIX + 0));
+ }
+
+ @Test
public void testMakeStatsForDataSourceWithSkipped()
{
// Only test and validate for one datasource for simplicity.
@@ -1025,7 +1092,7 @@
Assert.assertEquals(expectedSegmentCountSkipped, snapshot.getSegmentCountSkipped());
}
- private void assertCompactSegmentStatistics(CompactSegments compactSegments, int compactionRunCount)
+ private void doCompactionAndAssertCompactSegmentStatistics(CompactSegments compactSegments, int compactionRunCount)
{
for (int dataSourceIndex = 0; dataSourceIndex < 3; dataSourceIndex++) {
// One compaction task triggered