compaction/status API retains status for datasources that no longer existed causing in-memory used to grow unbounded (#11510)

* fix compaction status api

* fix checkstyle

* address comment
diff --git a/server/src/main/java/org/apache/druid/server/coordinator/duty/CompactSegments.java b/server/src/main/java/org/apache/druid/server/coordinator/duty/CompactSegments.java
index a9ba0d5..9bb6f9c 100644
--- a/server/src/main/java/org/apache/druid/server/coordinator/duty/CompactSegments.java
+++ b/server/src/main/java/org/apache/druid/server/coordinator/duty/CompactSegments.java
@@ -48,7 +48,6 @@
 import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
-import java.util.Set;
 import java.util.concurrent.atomic.AtomicReference;
 import java.util.function.Function;
 import java.util.stream.Collectors;
@@ -108,9 +107,7 @@
 
     final CoordinatorCompactionConfig dynamicConfig = params.getCoordinatorCompactionConfig();
     final CoordinatorStats stats = new CoordinatorStats();
-    final Map<String, AutoCompactionSnapshot.Builder> currentRunAutoCompactionSnapshotBuilders = new HashMap<>();
     List<DataSourceCompactionConfig> compactionConfigList = dynamicConfig.getCompactionConfigs();
-    updateAutoCompactionSnapshot(compactionConfigList, currentRunAutoCompactionSnapshotBuilders);
     if (dynamicConfig.getMaxCompactionTaskSlots() > 0) {
       Map<String, VersionedIntervalTimeline<String, DataSegment>> dataSources =
           params.getUsedSegmentsTimelinesPerDataSource();
@@ -197,7 +194,7 @@
         );
         stats.addToGlobalStat(AVAILABLE_COMPACTION_TASK_SLOT, numAvailableCompactionTaskSlots);
         stats.addToGlobalStat(MAX_COMPACTION_TASK_SLOT, compactionTaskCapacity);
-
+        final Map<String, AutoCompactionSnapshot.Builder> currentRunAutoCompactionSnapshotBuilders = new HashMap<>();
         if (numAvailableCompactionTaskSlots > 0) {
           stats.accumulate(
               doRun(
@@ -303,23 +300,6 @@
     return tuningConfig.getPartitionsSpec() instanceof SingleDimensionPartitionsSpec;
   }
 
-  private void updateAutoCompactionSnapshot(
-      List<DataSourceCompactionConfig> compactionConfigList,
-      Map<String, AutoCompactionSnapshot.Builder> currentRunAutoCompactionSnapshotBuilders)
-  {
-
-    Set<String> enabledDatasources = compactionConfigList.stream()
-                                                         .map(dataSourceCompactionConfig -> dataSourceCompactionConfig.getDataSource())
-                                                         .collect(Collectors.toSet());
-    // Create and Update snapshot for dataSource that has auto compaction enabled
-    for (String compactionConfigDataSource : enabledDatasources) {
-      currentRunAutoCompactionSnapshotBuilders.computeIfAbsent(
-          compactionConfigDataSource,
-          k -> new AutoCompactionSnapshot.Builder(k, AutoCompactionSnapshot.AutoCompactionScheduleStatus.RUNNING)
-      );
-    }
-  }
-
   private static List<TaskStatusPlus> filterNonCompactionTasks(List<TaskStatusPlus> taskStatuses)
   {
     return taskStatuses
@@ -351,7 +331,11 @@
       if (!segmentsToCompact.isEmpty()) {
         final String dataSourceName = segmentsToCompact.get(0).getDataSource();
         // As these segments will be compacted, we will aggregates the statistic to the Compacted statistics
-        AutoCompactionSnapshot.Builder snapshotBuilder = currentRunAutoCompactionSnapshotBuilders.get(dataSourceName);
+
+        AutoCompactionSnapshot.Builder snapshotBuilder = currentRunAutoCompactionSnapshotBuilders.computeIfAbsent(
+            dataSourceName,
+            k -> new AutoCompactionSnapshot.Builder(k, AutoCompactionSnapshot.AutoCompactionScheduleStatus.RUNNING)
+        );
         snapshotBuilder.incrementBytesCompacted(segmentsToCompact.stream().mapToLong(DataSegment::getSize).sum());
         snapshotBuilder.incrementIntervalCountCompacted(segmentsToCompact.stream().map(DataSegment::getInterval).distinct().count());
         snapshotBuilder.incrementSegmentCountCompacted(segmentsToCompact.size());
@@ -426,7 +410,10 @@
       final List<DataSegment> segmentsToCompact = iterator.next();
       if (!segmentsToCompact.isEmpty()) {
         final String dataSourceName = segmentsToCompact.get(0).getDataSource();
-        AutoCompactionSnapshot.Builder snapshotBuilder = currentRunAutoCompactionSnapshotBuilders.get(dataSourceName);
+        AutoCompactionSnapshot.Builder snapshotBuilder = currentRunAutoCompactionSnapshotBuilders.computeIfAbsent(
+            dataSourceName,
+            k -> new AutoCompactionSnapshot.Builder(k, AutoCompactionSnapshot.AutoCompactionScheduleStatus.RUNNING)
+        );
         snapshotBuilder.incrementBytesAwaitingCompaction(
             segmentsToCompact.stream()
                              .mapToLong(DataSegment::getSize)
@@ -444,26 +431,35 @@
 
     // Statistics of all segments considered compacted after this run
     Map<String, CompactionStatistics> allCompactedStatistics = iterator.totalCompactedStatistics();
+    for (Map.Entry<String, CompactionStatistics> compactionStatisticsEntry : allCompactedStatistics.entrySet()) {
+      final String dataSource = compactionStatisticsEntry.getKey();
+      final CompactionStatistics dataSourceCompactedStatistics = compactionStatisticsEntry.getValue();
+      AutoCompactionSnapshot.Builder builder = currentRunAutoCompactionSnapshotBuilders.computeIfAbsent(
+          dataSource,
+          k -> new AutoCompactionSnapshot.Builder(k, AutoCompactionSnapshot.AutoCompactionScheduleStatus.RUNNING)
+      );
+      builder.incrementBytesCompacted(dataSourceCompactedStatistics.getByteSum());
+      builder.incrementSegmentCountCompacted(dataSourceCompactedStatistics.getSegmentNumberCountSum());
+      builder.incrementIntervalCountCompacted(dataSourceCompactedStatistics.getSegmentIntervalCountSum());
+    }
+
     // Statistics of all segments considered skipped after this run
     Map<String, CompactionStatistics> allSkippedStatistics = iterator.totalSkippedStatistics();
+    for (Map.Entry<String, CompactionStatistics> compactionStatisticsEntry : allSkippedStatistics.entrySet()) {
+      final String dataSource = compactionStatisticsEntry.getKey();
+      final CompactionStatistics dataSourceSkippedStatistics = compactionStatisticsEntry.getValue();
+      AutoCompactionSnapshot.Builder builder = currentRunAutoCompactionSnapshotBuilders.computeIfAbsent(
+          dataSource,
+          k -> new AutoCompactionSnapshot.Builder(k, AutoCompactionSnapshot.AutoCompactionScheduleStatus.RUNNING)
+      );
+      builder.incrementBytesSkipped(dataSourceSkippedStatistics.getByteSum());
+      builder.incrementSegmentCountSkipped(dataSourceSkippedStatistics.getSegmentNumberCountSum());
+      builder.incrementIntervalCountSkipped(dataSourceSkippedStatistics.getSegmentIntervalCountSum());
+    }
 
     for (Map.Entry<String, AutoCompactionSnapshot.Builder> autoCompactionSnapshotBuilderEntry : currentRunAutoCompactionSnapshotBuilders.entrySet()) {
       final String dataSource = autoCompactionSnapshotBuilderEntry.getKey();
       final AutoCompactionSnapshot.Builder builder = autoCompactionSnapshotBuilderEntry.getValue();
-      CompactionStatistics dataSourceCompactedStatistics = allCompactedStatistics.get(dataSource);
-      CompactionStatistics dataSourceSkippedStatistics = allSkippedStatistics.get(dataSource);
-
-      if (dataSourceCompactedStatistics != null) {
-        builder.incrementBytesCompacted(dataSourceCompactedStatistics.getByteSum());
-        builder.incrementSegmentCountCompacted(dataSourceCompactedStatistics.getSegmentNumberCountSum());
-        builder.incrementIntervalCountCompacted(dataSourceCompactedStatistics.getSegmentIntervalCountSum());
-      }
-
-      if (dataSourceSkippedStatistics != null) {
-        builder.incrementBytesSkipped(dataSourceSkippedStatistics.getByteSum());
-        builder.incrementSegmentCountSkipped(dataSourceSkippedStatistics.getSegmentNumberCountSum());
-        builder.incrementIntervalCountSkipped(dataSourceSkippedStatistics.getSegmentIntervalCountSum());
-      }
 
       // Build the complete snapshot for the datasource
       AutoCompactionSnapshot autoCompactionSnapshot = builder.build();
diff --git a/server/src/test/java/org/apache/druid/server/coordinator/duty/CompactSegmentsTest.java b/server/src/test/java/org/apache/druid/server/coordinator/duty/CompactSegmentsTest.java
index 635b2fa..f4cdd9c 100644
--- a/server/src/test/java/org/apache/druid/server/coordinator/duty/CompactSegmentsTest.java
+++ b/server/src/test/java/org/apache/druid/server/coordinator/duty/CompactSegmentsTest.java
@@ -315,7 +315,7 @@
     Assert.assertEquals(0, autoCompactionSnapshots.size());
 
     for (int compactionRunCount = 0; compactionRunCount < 11; compactionRunCount++) {
-      assertCompactSegmentStatistics(compactSegments, compactionRunCount);
+      doCompactionAndAssertCompactSegmentStatistics(compactSegments, compactionRunCount);
     }
     // Test that stats does not change (and is still correct) when auto compaction runs with everything is fully compacted
     final CoordinatorStats stats = doCompactSegments(compactSegments);
@@ -466,6 +466,73 @@
   }
 
   @Test
+  public void testMakeStatsWithDeactivatedDatasource()
+  {
+    final TestDruidLeaderClient leaderClient = new TestDruidLeaderClient(JSON_MAPPER);
+    leaderClient.start();
+    final HttpIndexingServiceClient indexingServiceClient = new HttpIndexingServiceClient(JSON_MAPPER, leaderClient);
+    final CompactSegments compactSegments = new CompactSegments(COORDINATOR_CONFIG, JSON_MAPPER, indexingServiceClient);
+
+    // Before any compaction, we do not have any snapshot of compactions
+    Map<String, AutoCompactionSnapshot> autoCompactionSnapshots = compactSegments.getAutoCompactionSnapshot();
+    Assert.assertEquals(0, autoCompactionSnapshots.size());
+
+    for (int compactionRunCount = 0; compactionRunCount < 11; compactionRunCount++) {
+      doCompactionAndAssertCompactSegmentStatistics(compactSegments, compactionRunCount);
+    }
+    // Test that stats does not change (and is still correct) when auto compaction runs with everything is fully compacted
+    final CoordinatorStats stats = doCompactSegments(compactSegments);
+    Assert.assertEquals(
+        0,
+        stats.getGlobalStat(CompactSegments.COMPACTION_TASK_COUNT)
+    );
+    for (int i = 0; i < 3; i++) {
+      verifySnapshot(
+          compactSegments,
+          AutoCompactionSnapshot.AutoCompactionScheduleStatus.RUNNING,
+          DATA_SOURCE_PREFIX + i,
+          0,
+          TOTAL_BYTE_PER_DATASOURCE,
+          0,
+          0,
+          TOTAL_INTERVAL_PER_DATASOURCE,
+          0,
+          0,
+          TOTAL_SEGMENT_PER_DATASOURCE / 2,
+          0
+      );
+    }
+
+    // Deactivate one datasource (datasource 0 no longer exist in timeline)
+    dataSources.remove(DATA_SOURCE_PREFIX + 0);
+
+    // Test run auto compaction with one datasource deactivated
+    // Snapshot should not contain deactivated datasource
+    doCompactSegments(compactSegments);
+    for (int i = 1; i < 3; i++) {
+      verifySnapshot(
+          compactSegments,
+          AutoCompactionSnapshot.AutoCompactionScheduleStatus.RUNNING,
+          DATA_SOURCE_PREFIX + i,
+          0,
+          TOTAL_BYTE_PER_DATASOURCE,
+          0,
+          0,
+          TOTAL_INTERVAL_PER_DATASOURCE,
+          0,
+          0,
+          TOTAL_SEGMENT_PER_DATASOURCE / 2,
+          0
+      );
+    }
+
+    Assert.assertEquals(2, compactSegments.getAutoCompactionSnapshot().size());
+    Assert.assertTrue(compactSegments.getAutoCompactionSnapshot().containsKey(DATA_SOURCE_PREFIX + 1));
+    Assert.assertTrue(compactSegments.getAutoCompactionSnapshot().containsKey(DATA_SOURCE_PREFIX + 2));
+    Assert.assertFalse(compactSegments.getAutoCompactionSnapshot().containsKey(DATA_SOURCE_PREFIX + 0));
+  }
+
+  @Test
   public void testMakeStatsForDataSourceWithSkipped()
   {
     // Only test and validate for one datasource for simplicity.
@@ -1025,7 +1092,7 @@
     Assert.assertEquals(expectedSegmentCountSkipped, snapshot.getSegmentCountSkipped());
   }
 
-  private void assertCompactSegmentStatistics(CompactSegments compactSegments, int compactionRunCount)
+  private void doCompactionAndAssertCompactSegmentStatistics(CompactSegments compactSegments, int compactionRunCount)
   {
     for (int dataSourceIndex = 0; dataSourceIndex < 3; dataSourceIndex++) {
       // One compaction task triggered