Auto-compaction snapshot status API (#10371)

* Auto-compaction snapshot API

* fix when not all compacted segments are iterated

* add unit tests

* add some tests to make code cov happy

* address comments

* make code coverage happy
diff --git a/server/src/main/java/org/apache/druid/server/coordinator/AutoCompactionSnapshot.java b/server/src/main/java/org/apache/druid/server/coordinator/AutoCompactionSnapshot.java
new file mode 100644
index 0000000..e39fde9
--- /dev/null
+++ b/server/src/main/java/org/apache/druid/server/coordinator/AutoCompactionSnapshot.java
@@ -0,0 +1,296 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.druid.server.coordinator;
+
+import com.fasterxml.jackson.annotation.JsonCreator;
+import com.fasterxml.jackson.annotation.JsonProperty;
+import org.apache.druid.java.util.common.ISE;
+
+import javax.validation.constraints.NotNull;
+import java.util.Objects;
+
+public class AutoCompactionSnapshot
+{
+  public enum AutoCompactionScheduleStatus
+  {
+    NOT_ENABLED,
+    RUNNING
+  }
+
+  @JsonProperty
+  private String dataSource;
+  @JsonProperty
+  private AutoCompactionScheduleStatus scheduleStatus;
+  @JsonProperty
+  private long bytesAwaitingCompaction;
+  @JsonProperty
+  private long bytesCompacted;
+  @JsonProperty
+  private long bytesSkipped;
+  @JsonProperty
+  private long segmentCountAwaitingCompaction;
+  @JsonProperty
+  private long segmentCountCompacted;
+  @JsonProperty
+  private long segmentCountSkipped;
+  @JsonProperty
+  private long intervalCountAwaitingCompaction;
+  @JsonProperty
+  private long intervalCountCompacted;
+  @JsonProperty
+  private long intervalCountSkipped;
+
+  @JsonCreator
+  public AutoCompactionSnapshot(
+      @JsonProperty @NotNull String dataSource,
+      @JsonProperty @NotNull AutoCompactionScheduleStatus scheduleStatus,
+      @JsonProperty long bytesAwaitingCompaction,
+      @JsonProperty long bytesCompacted,
+      @JsonProperty long bytesSkipped,
+      @JsonProperty long segmentCountAwaitingCompaction,
+      @JsonProperty long segmentCountCompacted,
+      @JsonProperty long segmentCountSkipped,
+      @JsonProperty long intervalCountAwaitingCompaction,
+      @JsonProperty long intervalCountCompacted,
+      @JsonProperty long intervalCountSkipped
+  )
+  {
+    this.dataSource = dataSource;
+    this.scheduleStatus = scheduleStatus;
+    this.bytesAwaitingCompaction = bytesAwaitingCompaction;
+    this.bytesCompacted = bytesCompacted;
+    this.bytesSkipped = bytesSkipped;
+    this.segmentCountAwaitingCompaction = segmentCountAwaitingCompaction;
+    this.segmentCountCompacted = segmentCountCompacted;
+    this.segmentCountSkipped = segmentCountSkipped;
+    this.intervalCountAwaitingCompaction = intervalCountAwaitingCompaction;
+    this.intervalCountCompacted = intervalCountCompacted;
+    this.intervalCountSkipped = intervalCountSkipped;
+  }
+
+  @NotNull
+  public String getDataSource()
+  {
+    return dataSource;
+  }
+
+  @NotNull
+  public AutoCompactionScheduleStatus getScheduleStatus()
+  {
+    return scheduleStatus;
+  }
+
+  public long getBytesAwaitingCompaction()
+  {
+    return bytesAwaitingCompaction;
+  }
+
+  public long getBytesCompacted()
+  {
+    return bytesCompacted;
+  }
+
+  public long getBytesSkipped()
+  {
+    return bytesSkipped;
+  }
+
+  public long getSegmentCountAwaitingCompaction()
+  {
+    return segmentCountAwaitingCompaction;
+  }
+
+  public long getSegmentCountCompacted()
+  {
+    return segmentCountCompacted;
+  }
+
+  public long getSegmentCountSkipped()
+  {
+    return segmentCountSkipped;
+  }
+
+  public long getIntervalCountAwaitingCompaction()
+  {
+    return intervalCountAwaitingCompaction;
+  }
+
+  public long getIntervalCountCompacted()
+  {
+    return intervalCountCompacted;
+  }
+
+  public long getIntervalCountSkipped()
+  {
+    return intervalCountSkipped;
+  }
+
+  @Override
+  public boolean equals(Object o)
+  {
+    if (this == o) {
+      return true;
+    }
+    if (o == null || getClass() != o.getClass()) {
+      return false;
+    }
+    AutoCompactionSnapshot that = (AutoCompactionSnapshot) o;
+    return bytesAwaitingCompaction == that.bytesAwaitingCompaction &&
+           bytesCompacted == that.bytesCompacted &&
+           bytesSkipped == that.bytesSkipped &&
+           segmentCountAwaitingCompaction == that.segmentCountAwaitingCompaction &&
+           segmentCountCompacted == that.segmentCountCompacted &&
+           segmentCountSkipped == that.segmentCountSkipped &&
+           intervalCountAwaitingCompaction == that.intervalCountAwaitingCompaction &&
+           intervalCountCompacted == that.intervalCountCompacted &&
+           intervalCountSkipped == that.intervalCountSkipped &&
+           dataSource.equals(that.dataSource) &&
+           scheduleStatus == that.scheduleStatus;
+  }
+
+  @Override
+  public int hashCode()
+  {
+    return Objects.hash(
+        dataSource,
+        scheduleStatus,
+        bytesAwaitingCompaction,
+        bytesCompacted,
+        bytesSkipped,
+        segmentCountAwaitingCompaction,
+        segmentCountCompacted,
+        segmentCountSkipped,
+        intervalCountAwaitingCompaction,
+        intervalCountCompacted,
+        intervalCountSkipped
+    );
+  }
+
+  public static class Builder
+  {
+    private String dataSource;
+    private AutoCompactionScheduleStatus scheduleStatus;
+    private long bytesAwaitingCompaction;
+    private long bytesCompacted;
+    private long bytesSkipped;
+    private long segmentCountAwaitingCompaction;
+    private long segmentCountCompacted;
+    private long segmentCountSkipped;
+    private long intervalCountAwaitingCompaction;
+    private long intervalCountCompacted;
+    private long intervalCountSkipped;
+
+
+    public Builder(
+        @NotNull String dataSource,
+        @NotNull AutoCompactionScheduleStatus scheduleStatus
+    )
+    {
+      this.dataSource = dataSource;
+      this.scheduleStatus = scheduleStatus;
+      this.bytesAwaitingCompaction = 0;
+      this.bytesCompacted = 0;
+      this.bytesSkipped = 0;
+      this.segmentCountAwaitingCompaction = 0;
+      this.segmentCountCompacted = 0;
+      this.segmentCountSkipped = 0;
+      this.intervalCountAwaitingCompaction = 0;
+      this.intervalCountCompacted = 0;
+      this.intervalCountSkipped = 0;
+    }
+
+    public Builder incrementBytesAwaitingCompaction(long incrementValue)
+    {
+      this.bytesAwaitingCompaction = this.bytesAwaitingCompaction + incrementValue;
+      return this;
+    }
+
+    public Builder incrementBytesCompacted(long incrementValue)
+    {
+      this.bytesCompacted = this.bytesCompacted + incrementValue;
+      return this;
+    }
+
+    public Builder incrementSegmentCountAwaitingCompaction(long incrementValue)
+    {
+      this.segmentCountAwaitingCompaction = this.segmentCountAwaitingCompaction + incrementValue;
+      return this;
+    }
+
+    public Builder incrementSegmentCountCompacted(long incrementValue)
+    {
+      this.segmentCountCompacted = this.segmentCountCompacted + incrementValue;
+      return this;
+    }
+
+    public Builder incrementIntervalCountAwaitingCompaction(long incrementValue)
+    {
+      this.intervalCountAwaitingCompaction = this.intervalCountAwaitingCompaction + incrementValue;
+      return this;
+    }
+
+    public Builder incrementIntervalCountCompacted(long incrementValue)
+    {
+      this.intervalCountCompacted = this.intervalCountCompacted + incrementValue;
+      return this;
+    }
+
+    public Builder incrementBytesSkipped(long incrementValue)
+    {
+      this.bytesSkipped = this.bytesSkipped + incrementValue;
+      return this;
+    }
+
+    public Builder incrementSegmentCountSkipped(long incrementValue)
+    {
+      this.segmentCountSkipped = this.segmentCountSkipped + incrementValue;
+      return this;
+    }
+
+    public Builder incrementIntervalCountSkipped(long incrementValue)
+    {
+      this.intervalCountSkipped = this.intervalCountSkipped + incrementValue;
+      return this;
+    }
+
+    public AutoCompactionSnapshot build()
+    {
+      if (dataSource == null || dataSource.isEmpty()) {
+        throw new ISE("Invalid dataSource name");
+      }
+      if (scheduleStatus == null) {
+        throw new ISE("scheduleStatus cannot be null");
+      }
+      return new AutoCompactionSnapshot(
+          dataSource,
+          scheduleStatus,
+          bytesAwaitingCompaction,
+          bytesCompacted,
+          bytesSkipped,
+          segmentCountAwaitingCompaction,
+          segmentCountCompacted,
+          segmentCountSkipped,
+          intervalCountAwaitingCompaction,
+          intervalCountCompacted,
+          intervalCountSkipped
+      );
+    }
+  }
+}
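
A minimal, illustrative sketch of how a snapshot could be assembled and serialized outside the Coordinator; the datasource name and counts are made up, and using Jackson's ObjectMapper directly is an assumption based on the @JsonProperty annotations above:

    // Hypothetical usage of the Builder added in this patch; not part of the change itself.
    import com.fasterxml.jackson.databind.ObjectMapper;
    import org.apache.druid.server.coordinator.AutoCompactionSnapshot;

    public class AutoCompactionSnapshotExample
    {
      public static void main(String[] args) throws Exception
      {
        AutoCompactionSnapshot snapshot = new AutoCompactionSnapshot.Builder(
            "wikipedia", // hypothetical datasource
            AutoCompactionSnapshot.AutoCompactionScheduleStatus.RUNNING
        )
            .incrementBytesAwaitingCompaction(440)
            .incrementSegmentCountAwaitingCompaction(44)
            .incrementIntervalCountAwaitingCompaction(11)
            .build();

        // The @JsonProperty annotations make the snapshot directly serializable.
        System.out.println(new ObjectMapper().writeValueAsString(snapshot));
      }
    }
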
diff --git a/server/src/main/java/org/apache/druid/server/coordinator/CompactionStatistics.java b/server/src/main/java/org/apache/druid/server/coordinator/CompactionStatistics.java
new file mode 100644
index 0000000..676630e
--- /dev/null
+++ b/server/src/main/java/org/apache/druid/server/coordinator/CompactionStatistics.java
@@ -0,0 +1,73 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.druid.server.coordinator;
+
+public class CompactionStatistics
+{
+  private long byteSum;
+  private long segmentNumberCountSum;
+  private long segmentIntervalCountSum;
+
+  public CompactionStatistics(
+      long byteSum,
+      long segmentNumberCountSum,
+      long segmentIntervalCountSum
+  )
+  {
+    this.byteSum = byteSum;
+    this.segmentNumberCountSum = segmentNumberCountSum;
+    this.segmentIntervalCountSum = segmentIntervalCountSum;
+  }
+
+  public static CompactionStatistics initializeCompactionStatistics()
+  {
+    return new CompactionStatistics(0, 0, 0);
+  }
+
+  public long getByteSum()
+  {
+    return byteSum;
+  }
+
+  public long getSegmentNumberCountSum()
+  {
+    return segmentNumberCountSum;
+  }
+
+  public long getSegmentIntervalCountSum()
+  {
+    return segmentIntervalCountSum;
+  }
+
+  public void incrementCompactedByte(long incrementValue)
+  {
+    byteSum = byteSum + incrementValue;
+  }
+
+  public void incrementCompactedSegments(long incrementValue)
+  {
+    segmentNumberCountSum = segmentNumberCountSum + incrementValue;
+  }
+
+  public void incrementCompactedIntervals(long incrementValue)
+  {
+    segmentIntervalCountSum = segmentIntervalCountSum + incrementValue;
+  }
+}
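
As a quick illustration of how CompactionStatistics is meant to accumulate per datasource (this mirrors collectSegmentStatistics in NewestSegmentFirstIterator further down; the datasource name and numbers are arbitrary):

    import java.util.HashMap;
    import java.util.Map;
    import org.apache.druid.server.coordinator.CompactionStatistics;

    public class CompactionStatisticsExample
    {
      public static void main(String[] args)
      {
        Map<String, CompactionStatistics> skippedSegments = new HashMap<>();
        // Lazily create the per-datasource accumulator, then add one batch of skipped segments.
        CompactionStatistics stats = skippedSegments.computeIfAbsent(
            "wikipedia",
            k -> CompactionStatistics.initializeCompactionStatistics()
        );
        stats.incrementCompactedByte(120);
        stats.incrementCompactedSegments(12);
        stats.incrementCompactedIntervals(3);

        System.out.println(skippedSegments.get("wikipedia").getByteSum()); // prints 120
      }
    }
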
diff --git a/server/src/main/java/org/apache/druid/server/coordinator/DruidCoordinator.java b/server/src/main/java/org/apache/druid/server/coordinator/DruidCoordinator.java
index a406761..3300e59 100644
--- a/server/src/main/java/org/apache/druid/server/coordinator/DruidCoordinator.java
+++ b/server/src/main/java/org/apache/druid/server/coordinator/DruidCoordinator.java
@@ -367,6 +367,17 @@
     return compactSegments.getTotalSizeOfSegmentsAwaitingCompaction(dataSource);
   }
 
+  @Nullable
+  public AutoCompactionSnapshot getAutoCompactionSnapshotForDataSource(String dataSource)
+  {
+    return compactSegments.getAutoCompactionSnapshot(dataSource);
+  }
+
+  public Map<String, AutoCompactionSnapshot> getAutoCompactionSnapshot()
+  {
+    return compactSegments.getAutoCompactionSnapshot();
+  }
+
   public CoordinatorDynamicConfig getDynamicConfigs()
   {
     return CoordinatorDynamicConfig.current(configManager);
@@ -589,7 +600,11 @@
       }
 
       for (final Pair<? extends DutiesRunnable, Duration> dutiesRunnable : dutiesRunnables) {
-        ScheduledExecutors.scheduleWithFixedDelay(
+        // CompactSegmentsDuty can take a non-trivial amount of time to complete.
+        // Hence, we schedule at a fixed rate to make sure the other tasks still run at approximately every
+        // config.getCoordinatorIndexingPeriod() period. Note that caution should be taken
+        // if config.getCoordinatorIndexingPeriod() is set lower than the default value.
+        ScheduledExecutors.scheduleAtFixedRate(
             exec,
             config.getCoordinatorStartDelay(),
             dutiesRunnable.rhs,
@@ -657,8 +672,9 @@
   {
     List<CoordinatorDuty> duties = new ArrayList<>();
     duties.add(new LogUsedSegments());
-    duties.addAll(makeCompactSegmentsDuty());
     duties.addAll(indexingServiceDuties);
+    // CompactSegmentsDuty should be the last duty as it can take a long time to complete
+    duties.addAll(makeCompactSegmentsDuty());
 
     log.debug(
         "Done making indexing service duties %s",
diff --git a/server/src/main/java/org/apache/druid/server/coordinator/duty/CompactSegments.java b/server/src/main/java/org/apache/druid/server/coordinator/duty/CompactSegments.java
index 876f4b6..50b31ca 100644
--- a/server/src/main/java/org/apache/druid/server/coordinator/duty/CompactSegments.java
+++ b/server/src/main/java/org/apache/druid/server/coordinator/duty/CompactSegments.java
@@ -22,7 +22,6 @@
 import com.fasterxml.jackson.databind.ObjectMapper;
 import com.google.common.collect.Maps;
 import com.google.inject.Inject;
-import it.unimi.dsi.fastutil.objects.Object2LongOpenHashMap;
 import org.apache.druid.client.indexing.ClientCompactionTaskQuery;
 import org.apache.druid.client.indexing.ClientCompactionTaskQueryTuningConfig;
 import org.apache.druid.client.indexing.IndexingServiceClient;
@@ -30,6 +29,8 @@
 import org.apache.druid.indexer.TaskStatusPlus;
 import org.apache.druid.java.util.common.ISE;
 import org.apache.druid.java.util.common.logger.Logger;
+import org.apache.druid.server.coordinator.AutoCompactionSnapshot;
+import org.apache.druid.server.coordinator.CompactionStatistics;
 import org.apache.druid.server.coordinator.CoordinatorCompactionConfig;
 import org.apache.druid.server.coordinator.CoordinatorStats;
 import org.apache.druid.server.coordinator.DataSourceCompactionConfig;
@@ -43,13 +44,28 @@
 import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
+import java.util.Set;
+import java.util.concurrent.atomic.AtomicReference;
 import java.util.function.Function;
 import java.util.stream.Collectors;
 
 public class CompactSegments implements CoordinatorDuty
 {
   static final String COMPACTION_TASK_COUNT = "compactTaskCount";
-  static final String TOTAL_SIZE_OF_SEGMENTS_AWAITING_COMPACTION = "segmentSizeWaitCompact";
+  static final String AVAILABLE_COMPACTION_TASK_SLOT = "availableCompactionTaskSlot";
+  static final String MAX_COMPACTION_TASK_SLOT = "maxCompactionTaskSlot";
+
+  static final String TOTAL_SIZE_OF_SEGMENTS_SKIPPED = "segmentSizeSkippedCompact";
+  static final String TOTAL_COUNT_OF_SEGMENTS_SKIPPED = "segmentCountSkippedCompact";
+  static final String TOTAL_INTERVAL_OF_SEGMENTS_SKIPPED = "segmentIntervalSkippedCompact";
+
+  static final String TOTAL_SIZE_OF_SEGMENTS_AWAITING = "segmentSizeWaitCompact";
+  static final String TOTAL_COUNT_OF_SEGMENTS_AWAITING = "segmentCountWaitCompact";
+  static final String TOTAL_INTERVAL_OF_SEGMENTS_AWAITING = "segmentIntervalWaitCompact";
+
+  static final String TOTAL_SIZE_OF_SEGMENTS_COMPACTED = "segmentSizeCompacted";
+  static final String TOTAL_COUNT_OF_SEGMENTS_COMPACTED = "segmentCountCompacted";
+  static final String TOTAL_INTERVAL_OF_SEGMENTS_COMPACTED = "segmentIntervalCompacted";
 
   /** Must be synced with org.apache.druid.indexing.common.task.CompactionTask.TYPE. */
   public static final String COMPACTION_TASK_TYPE = "compact";
@@ -61,7 +77,9 @@
   private final CompactionSegmentSearchPolicy policy;
   private final IndexingServiceClient indexingServiceClient;
 
-  private Object2LongOpenHashMap<String> totalSizesOfSegmentsAwaitingCompactionPerDataSource;
+  // This variable is updated by the Coordinator thread executing duties and
+  // read by HTTP threads processing Coordinator API calls.
+  private final AtomicReference<Map<String, AutoCompactionSnapshot>> autoCompactionSnapshotPerDataSource = new AtomicReference<>();
 
   @Inject
   public CompactSegments(
@@ -71,6 +89,7 @@
   {
     this.policy = new NewestSegmentFirstPolicy(objectMapper);
     this.indexingServiceClient = indexingServiceClient;
+    autoCompactionSnapshotPerDataSource.set(new HashMap<>());
   }
 
   @Override
@@ -80,12 +99,12 @@
 
     final CoordinatorCompactionConfig dynamicConfig = params.getCoordinatorCompactionConfig();
     final CoordinatorStats stats = new CoordinatorStats();
-
+    final Map<String, AutoCompactionSnapshot.Builder> currentRunAutoCompactionSnapshotBuilders = new HashMap<>();
+    List<DataSourceCompactionConfig> compactionConfigList = dynamicConfig.getCompactionConfigs();
+    updateAutoCompactionSnapshot(compactionConfigList, currentRunAutoCompactionSnapshotBuilders);
     if (dynamicConfig.getMaxCompactionTaskSlots() > 0) {
       Map<String, VersionedIntervalTimeline<String, DataSegment>> dataSources =
           params.getUsedSegmentsTimelinesPerDataSource();
-      List<DataSourceCompactionConfig> compactionConfigList = dynamicConfig.getCompactionConfigs();
-
       if (compactionConfigList != null && !compactionConfigList.isEmpty()) {
         Map<String, DataSourceCompactionConfig> compactionConfigs = compactionConfigList
             .stream()
@@ -136,16 +155,23 @@
             numAvailableCompactionTaskSlots,
             compactionTaskCapacity
         );
+        stats.addToGlobalStat(AVAILABLE_COMPACTION_TASK_SLOT, numAvailableCompactionTaskSlots);
+        stats.addToGlobalStat(MAX_COMPACTION_TASK_SLOT, compactionTaskCapacity);
+
         if (numAvailableCompactionTaskSlots > 0) {
-          stats.accumulate(doRun(compactionConfigs, numAvailableCompactionTaskSlots, iterator));
+          stats.accumulate(
+              doRun(compactionConfigs, currentRunAutoCompactionSnapshotBuilders, numAvailableCompactionTaskSlots, iterator)
+          );
         } else {
-          stats.accumulate(makeStats(0, iterator));
+          stats.accumulate(makeStats(currentRunAutoCompactionSnapshotBuilders, 0, iterator));
         }
       } else {
         LOG.info("compactionConfig is empty. Skip.");
+        updateAutoCompactionSnapshotWhenNoCompactTaskScheduled(currentRunAutoCompactionSnapshotBuilders);
       }
     } else {
       LOG.info("maxCompactionTaskSlots was set to 0. Skip compaction");
+      updateAutoCompactionSnapshotWhenNoCompactTaskScheduled(currentRunAutoCompactionSnapshotBuilders);
     }
 
     return params.buildFromExisting()
@@ -172,6 +198,33 @@
     }
   }
 
+  private void updateAutoCompactionSnapshot(
+      List<DataSourceCompactionConfig> compactionConfigList,
+      Map<String, AutoCompactionSnapshot.Builder> currentRunAutoCompactionSnapshotBuilders)
+  {
+
+    Set<String> enabledDatasources = compactionConfigList.stream()
+                                                         .map(DataSourceCompactionConfig::getDataSource)
+                                                         .collect(Collectors.toSet());
+    // Update AutoCompactionScheduleStatus for dataSources that now have auto compaction disabled
+    for (Map.Entry<String, AutoCompactionSnapshot> snapshot : autoCompactionSnapshotPerDataSource.get().entrySet()) {
+      if (!enabledDatasources.contains(snapshot.getKey())) {
+        currentRunAutoCompactionSnapshotBuilders.computeIfAbsent(
+            snapshot.getKey(),
+            k -> new AutoCompactionSnapshot.Builder(k, AutoCompactionSnapshot.AutoCompactionScheduleStatus.NOT_ENABLED)
+        );
+      }
+    }
+
+    // Create and update snapshots for dataSources that have auto compaction enabled
+    for (String compactionConfigDataSource : enabledDatasources) {
+      currentRunAutoCompactionSnapshotBuilders.computeIfAbsent(
+          compactionConfigDataSource,
+          k -> new AutoCompactionSnapshot.Builder(k, AutoCompactionSnapshot.AutoCompactionScheduleStatus.RUNNING)
+      );
+    }
+  }
+
   private static List<TaskStatusPlus> filterNonCompactionTasks(List<TaskStatusPlus> taskStatuses)
   {
     return taskStatuses
@@ -189,6 +242,7 @@
 
   private CoordinatorStats doRun(
       Map<String, DataSourceCompactionConfig> compactionConfigs,
+      Map<String, AutoCompactionSnapshot.Builder> currentRunAutoCompactionSnapshotBuilders,
       int numAvailableCompactionTaskSlots,
       CompactionSegmentIterator iterator
   )
@@ -200,6 +254,12 @@
 
       if (!segmentsToCompact.isEmpty()) {
         final String dataSourceName = segmentsToCompact.get(0).getDataSource();
+        // As these segments will be compacted, we aggregate their statistics into the Compacted statistics
+        AutoCompactionSnapshot.Builder snapshotBuilder = currentRunAutoCompactionSnapshotBuilders.get(dataSourceName);
+        snapshotBuilder.incrementBytesCompacted(segmentsToCompact.stream().mapToLong(DataSegment::getSize).sum());
+        snapshotBuilder.incrementIntervalCountCompacted(segmentsToCompact.stream().map(DataSegment::getInterval).distinct().count());
+        snapshotBuilder.incrementSegmentCountCompacted(segmentsToCompact.size());
+
         final DataSourceCompactionConfig config = compactionConfigs.get(dataSourceName);
         // make tuningConfig
         final String taskId = indexingServiceClient.compactSegments(
@@ -209,6 +269,7 @@
             ClientCompactionTaskQueryTuningConfig.from(config.getTuningConfig(), config.getMaxRowsPerSegment()),
             newAutoCompactionContext(config.getTaskContext())
         );
+
         LOG.info(
             "Submitted a compactionTask[%s] for %s segments",
             taskId,
@@ -222,7 +283,7 @@
       }
     }
 
-    return makeStats(numSubmittedTasks, iterator);
+    return makeStats(currentRunAutoCompactionSnapshotBuilders, numSubmittedTasks, iterator);
   }
 
   private Map<String, Object> newAutoCompactionContext(@Nullable Map<String, Object> configuredContext)
@@ -234,29 +295,174 @@
     return newContext;
   }
 
-  private CoordinatorStats makeStats(int numCompactionTasks, CompactionSegmentIterator iterator)
+  /**
+   * This method can be used to atomically update the snapshots in {@code autoCompactionSnapshotPerDataSource} when
+   * no compaction task is scheduled in this run. Currently, this method does not update compaction statistics
+   * (bytes, interval count, segment count, etc.) since we skip iterating through the segments and cannot get an update
+   * on those statistics. Thus, this method only updates the schedule status and task list (compaction statistics
+   * remain the same as in the previous snapshot).
+   */
+  private void updateAutoCompactionSnapshotWhenNoCompactTaskScheduled(
+      Map<String, AutoCompactionSnapshot.Builder> currentRunAutoCompactionSnapshotBuilders
+  )
   {
+    Map<String, AutoCompactionSnapshot> previousSnapshots = autoCompactionSnapshotPerDataSource.get();
+    for (Map.Entry<String, AutoCompactionSnapshot.Builder> autoCompactionSnapshotBuilderEntry : currentRunAutoCompactionSnapshotBuilders.entrySet()) {
+      final String dataSource = autoCompactionSnapshotBuilderEntry.getKey();
+      AutoCompactionSnapshot previousSnapshot = previousSnapshots.get(dataSource);
+      if (previousSnapshot != null) {
+        autoCompactionSnapshotBuilderEntry.getValue().incrementBytesAwaitingCompaction(previousSnapshot.getBytesAwaitingCompaction());
+        autoCompactionSnapshotBuilderEntry.getValue().incrementBytesCompacted(previousSnapshot.getBytesCompacted());
+        autoCompactionSnapshotBuilderEntry.getValue().incrementBytesSkipped(previousSnapshot.getBytesSkipped());
+        autoCompactionSnapshotBuilderEntry.getValue().incrementSegmentCountAwaitingCompaction(previousSnapshot.getSegmentCountAwaitingCompaction());
+        autoCompactionSnapshotBuilderEntry.getValue().incrementSegmentCountCompacted(previousSnapshot.getSegmentCountCompacted());
+        autoCompactionSnapshotBuilderEntry.getValue().incrementSegmentCountSkipped(previousSnapshot.getSegmentCountSkipped());
+        autoCompactionSnapshotBuilderEntry.getValue().incrementIntervalCountAwaitingCompaction(previousSnapshot.getIntervalCountAwaitingCompaction());
+        autoCompactionSnapshotBuilderEntry.getValue().incrementIntervalCountCompacted(previousSnapshot.getIntervalCountCompacted());
+        autoCompactionSnapshotBuilderEntry.getValue().incrementIntervalCountSkipped(previousSnapshot.getIntervalCountSkipped());
+      }
+    }
+
+    Map<String, AutoCompactionSnapshot> currentAutoCompactionSnapshotPerDataSource = Maps.transformValues(
+        currentRunAutoCompactionSnapshotBuilders,
+        AutoCompactionSnapshot.Builder::build
+    );
+    // Atomic update of autoCompactionSnapshotPerDataSource with the latest from this coordinator run
+    autoCompactionSnapshotPerDataSource.set(currentAutoCompactionSnapshotPerDataSource);
+  }
+
+  private CoordinatorStats makeStats(
+      Map<String, AutoCompactionSnapshot.Builder> currentRunAutoCompactionSnapshotBuilders,
+      int numCompactionTasks,
+      CompactionSegmentIterator iterator
+  )
+  {
+    final Map<String, AutoCompactionSnapshot> currentAutoCompactionSnapshotPerDataSource = new HashMap<>();
     final CoordinatorStats stats = new CoordinatorStats();
     stats.addToGlobalStat(COMPACTION_TASK_COUNT, numCompactionTasks);
-    totalSizesOfSegmentsAwaitingCompactionPerDataSource = iterator.totalRemainingSegmentsSizeBytes();
-    totalSizesOfSegmentsAwaitingCompactionPerDataSource.object2LongEntrySet().fastForEach(
-        entry -> {
-          final String dataSource = entry.getKey();
-          final long totalSizeOfSegmentsAwaitingCompaction = entry.getLongValue();
-          stats.addToDataSourceStat(
-              TOTAL_SIZE_OF_SEGMENTS_AWAITING_COMPACTION,
-              dataSource,
-              totalSizeOfSegmentsAwaitingCompaction
-          );
-        }
-    );
+
+    // Iterate through all the remaining segments in the iterator.
+    // As these segments could be compacted but were not compacted due to a lack of task slots, we aggregate
+    // their statistics into the AwaitingCompaction statistics.
+    while (iterator.hasNext()) {
+      final List<DataSegment> segmentsToCompact = iterator.next();
+      if (!segmentsToCompact.isEmpty()) {
+        final String dataSourceName = segmentsToCompact.get(0).getDataSource();
+        AutoCompactionSnapshot.Builder snapshotBuilder = currentRunAutoCompactionSnapshotBuilders.get(dataSourceName);
+        snapshotBuilder.incrementBytesAwaitingCompaction(
+            segmentsToCompact.stream()
+                             .mapToLong(DataSegment::getSize)
+                             .sum()
+        );
+        snapshotBuilder.incrementIntervalCountAwaitingCompaction(
+            segmentsToCompact.stream()
+                             .map(DataSegment::getInterval)
+                             .distinct()
+                             .count()
+        );
+        snapshotBuilder.incrementSegmentCountAwaitingCompaction(segmentsToCompact.size());
+      }
+    }
+
+    // Statistics of all segments considered compacted after this run
+    Map<String, CompactionStatistics> allCompactedStatistics = iterator.totalCompactedStatistics();
+    // Statistics of all segments considered skipped after this run
+    Map<String, CompactionStatistics> allSkippedStatistics = iterator.totalSkippedStatistics();
+
+    for (Map.Entry<String, AutoCompactionSnapshot.Builder> autoCompactionSnapshotBuilderEntry : currentRunAutoCompactionSnapshotBuilders.entrySet()) {
+      final String dataSource = autoCompactionSnapshotBuilderEntry.getKey();
+      final AutoCompactionSnapshot.Builder builder = autoCompactionSnapshotBuilderEntry.getValue();
+      CompactionStatistics dataSourceCompactedStatistics = allCompactedStatistics.get(dataSource);
+      CompactionStatistics dataSourceSkippedStatistics = allSkippedStatistics.get(dataSource);
+
+      if (dataSourceCompactedStatistics != null) {
+        builder.incrementBytesCompacted(dataSourceCompactedStatistics.getByteSum());
+        builder.incrementSegmentCountCompacted(dataSourceCompactedStatistics.getSegmentNumberCountSum());
+        builder.incrementIntervalCountCompacted(dataSourceCompactedStatistics.getSegmentIntervalCountSum());
+      }
+
+      if (dataSourceSkippedStatistics != null) {
+        builder.incrementBytesSkipped(dataSourceSkippedStatistics.getByteSum());
+        builder.incrementSegmentCountSkipped(dataSourceSkippedStatistics.getSegmentNumberCountSum());
+        builder.incrementIntervalCountSkipped(dataSourceSkippedStatistics.getSegmentIntervalCountSum());
+      }
+
+      // Build the complete snapshot for the datasource
+      AutoCompactionSnapshot autoCompactionSnapshot = builder.build();
+      currentAutoCompactionSnapshotPerDataSource.put(dataSource, autoCompactionSnapshot);
+
+      // Use the complete snapshot to emit metrics
+      stats.addToDataSourceStat(
+          TOTAL_SIZE_OF_SEGMENTS_AWAITING,
+          dataSource,
+          autoCompactionSnapshot.getBytesAwaitingCompaction()
+      );
+      stats.addToDataSourceStat(
+          TOTAL_COUNT_OF_SEGMENTS_AWAITING,
+          dataSource,
+          autoCompactionSnapshot.getSegmentCountAwaitingCompaction()
+      );
+      stats.addToDataSourceStat(
+          TOTAL_INTERVAL_OF_SEGMENTS_AWAITING,
+          dataSource,
+          autoCompactionSnapshot.getIntervalCountAwaitingCompaction()
+      );
+      stats.addToDataSourceStat(
+          TOTAL_SIZE_OF_SEGMENTS_COMPACTED,
+          dataSource,
+          autoCompactionSnapshot.getBytesCompacted()
+      );
+      stats.addToDataSourceStat(
+          TOTAL_COUNT_OF_SEGMENTS_COMPACTED,
+          dataSource,
+          autoCompactionSnapshot.getSegmentCountCompacted()
+      );
+      stats.addToDataSourceStat(
+          TOTAL_INTERVAL_OF_SEGMENTS_COMPACTED,
+          dataSource,
+          autoCompactionSnapshot.getIntervalCountCompacted()
+      );
+      stats.addToDataSourceStat(
+          TOTAL_SIZE_OF_SEGMENTS_SKIPPED,
+          dataSource,
+          autoCompactionSnapshot.getBytesSkipped()
+      );
+      stats.addToDataSourceStat(
+          TOTAL_COUNT_OF_SEGMENTS_SKIPPED,
+          dataSource,
+          autoCompactionSnapshot.getSegmentCountSkipped()
+      );
+      stats.addToDataSourceStat(
+          TOTAL_INTERVAL_OF_SEGMENTS_SKIPPED,
+          dataSource,
+          autoCompactionSnapshot.getIntervalCountSkipped()
+      );
+    }
+
+    // Atomic update of autoCompactionSnapshotPerDataSource with the latest from this coordinator run
+    autoCompactionSnapshotPerDataSource.set(currentAutoCompactionSnapshotPerDataSource);
+
     return stats;
   }
 
-  @SuppressWarnings("deprecation") // Intentionally using boxing get() to return null if dataSource is unknown
   @Nullable
   public Long getTotalSizeOfSegmentsAwaitingCompaction(String dataSource)
   {
-    return totalSizesOfSegmentsAwaitingCompactionPerDataSource.get(dataSource);
+    AutoCompactionSnapshot autoCompactionSnapshot = autoCompactionSnapshotPerDataSource.get().get(dataSource);
+    if (autoCompactionSnapshot == null) {
+      return null;
+    }
+    return autoCompactionSnapshot.getBytesAwaitingCompaction();
+  }
+
+  @Nullable
+  public AutoCompactionSnapshot getAutoCompactionSnapshot(String dataSource)
+  {
+    return autoCompactionSnapshotPerDataSource.get().get(dataSource);
+  }
+
+  public Map<String, AutoCompactionSnapshot> getAutoCompactionSnapshot()
+  {
+    return autoCompactionSnapshotPerDataSource.get();
   }
 }
diff --git a/server/src/main/java/org/apache/druid/server/coordinator/duty/CompactionSegmentIterator.java b/server/src/main/java/org/apache/druid/server/coordinator/duty/CompactionSegmentIterator.java
index 0e692f2..64f5c16 100644
--- a/server/src/main/java/org/apache/druid/server/coordinator/duty/CompactionSegmentIterator.java
+++ b/server/src/main/java/org/apache/druid/server/coordinator/duty/CompactionSegmentIterator.java
@@ -19,11 +19,12 @@
 
 package org.apache.druid.server.coordinator.duty;
 
-import it.unimi.dsi.fastutil.objects.Object2LongOpenHashMap;
+import org.apache.druid.server.coordinator.CompactionStatistics;
 import org.apache.druid.timeline.DataSegment;
 
 import java.util.Iterator;
 import java.util.List;
+import java.util.Map;
 
 /**
  * Segments in the lists which are the elements of this iterator are sorted according to the natural segment order
@@ -31,10 +32,20 @@
  */
 public interface CompactionSegmentIterator extends Iterator<List<DataSegment>>
 {
-  long UNKNOWN_TOTAL_REMAINING_SEGMENTS_SIZE = -1L;
   /**
-   * Return a map of (dataSource, total size of remaining segments) for all dataSources.
-   * This method should consider all segments except the segments returned by {@link #next()}.
+   * Return a map of dataSourceName to CompactionStatistics.
+   * This method returns the aggregated statistics of segments that were already compacted and do not need to be
+   * compacted again, i.e., segments that were not returned by {@link Iterator#next()} because they do not need compaction.
+   * Note that the aggregations returned by this method only cover the portion of the iterator iterated so far.
    */
-  Object2LongOpenHashMap<String> totalRemainingSegmentsSizeBytes();
+  Map<String, CompactionStatistics> totalCompactedStatistics();
+
+  /**
+   * Return a map of dataSourceName to CompactionStatistics.
+   * This method returns the aggregated statistics of segments that were skipped because they cannot be compacted,
+   * i.e., segments that were not returned by {@link Iterator#next()} because they cannot be compacted.
+   * Note that the aggregations returned by this method only cover the portion of the iterator iterated so far.
+   */
+  Map<String, CompactionStatistics> totalSkippedStatistics();
+
 }
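
A rough sketch of how a caller is expected to use this contract (it mirrors CompactSegments.makeStats above: drain the iterator first, then read the per-datasource totals; the iterator is assumed to come from the search policy):

    import java.util.List;
    import java.util.Map;
    import org.apache.druid.server.coordinator.CompactionStatistics;
    import org.apache.druid.server.coordinator.duty.CompactionSegmentIterator;
    import org.apache.druid.timeline.DataSegment;

    public class CompactionIteratorSummary
    {
      // Drains the iterator, then reads the totals accumulated up to that point.
      static void summarize(CompactionSegmentIterator iterator)
      {
        long awaitingBytes = 0;
        while (iterator.hasNext()) {
          List<DataSegment> segmentsToCompact = iterator.next();
          awaitingBytes += segmentsToCompact.stream().mapToLong(DataSegment::getSize).sum();
        }
        System.out.println("awaiting compaction bytes: " + awaitingBytes);

        Map<String, CompactionStatistics> compacted = iterator.totalCompactedStatistics();
        Map<String, CompactionStatistics> skipped = iterator.totalSkippedStatistics();
        compacted.forEach((ds, stats) -> System.out.println(ds + " compacted bytes: " + stats.getByteSum()));
        skipped.forEach((ds, stats) -> System.out.println(ds + " skipped bytes: " + stats.getByteSum()));
      }
    }
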
diff --git a/server/src/main/java/org/apache/druid/server/coordinator/duty/EmitClusterStatsAndMetrics.java b/server/src/main/java/org/apache/druid/server/coordinator/duty/EmitClusterStatsAndMetrics.java
index 658017a..70bfb04 100644
--- a/server/src/main/java/org/apache/druid/server/coordinator/duty/EmitClusterStatsAndMetrics.java
+++ b/server/src/main/java/org/apache/druid/server/coordinator/duty/EmitClusterStatsAndMetrics.java
@@ -301,8 +301,33 @@
         )
     );
 
+    emitter.emit(
+        new ServiceMetricEvent.Builder().build(
+            "compactTask/maxSlot/count",
+            stats.getGlobalStat(CompactSegments.MAX_COMPACTION_TASK_SLOT)
+        )
+    );
+
+    emitter.emit(
+        new ServiceMetricEvent.Builder().build(
+            "compactTask/availableSlot/count",
+            stats.getGlobalStat(CompactSegments.AVAILABLE_COMPACTION_TASK_SLOT)
+        )
+    );
+
     stats.forEachDataSourceStat(
-        "segmentsWaitCompact",
+        CompactSegments.TOTAL_SIZE_OF_SEGMENTS_AWAITING,
+        (final String dataSource, final long count) -> {
+          emitter.emit(
+              new ServiceMetricEvent.Builder()
+                  .setDimension(DruidMetrics.DATASOURCE, dataSource)
+                  .build("segment/waitCompact/bytes", count)
+          );
+        }
+    );
+
+    stats.forEachDataSourceStat(
+        CompactSegments.TOTAL_COUNT_OF_SEGMENTS_AWAITING,
         (final String dataSource, final long count) -> {
           emitter.emit(
               new ServiceMetricEvent.Builder()
@@ -312,6 +337,83 @@
         }
     );
 
+    stats.forEachDataSourceStat(
+        CompactSegments.TOTAL_INTERVAL_OF_SEGMENTS_AWAITING,
+        (final String dataSource, final long count) -> {
+          emitter.emit(
+              new ServiceMetricEvent.Builder()
+                  .setDimension(DruidMetrics.DATASOURCE, dataSource)
+                  .build("interval/waitCompact/count", count)
+          );
+        }
+    );
+
+    stats.forEachDataSourceStat(
+        CompactSegments.TOTAL_SIZE_OF_SEGMENTS_SKIPPED,
+        (final String dataSource, final long count) -> {
+          emitter.emit(
+              new ServiceMetricEvent.Builder()
+                  .setDimension(DruidMetrics.DATASOURCE, dataSource)
+                  .build("segment/skipCompact/bytes", count)
+          );
+        }
+    );
+
+    stats.forEachDataSourceStat(
+        CompactSegments.TOTAL_COUNT_OF_SEGMENTS_SKIPPED,
+        (final String dataSource, final long count) -> {
+          emitter.emit(
+              new ServiceMetricEvent.Builder()
+                  .setDimension(DruidMetrics.DATASOURCE, dataSource)
+                  .build("segment/skipCompact/count", count)
+          );
+        }
+    );
+
+    stats.forEachDataSourceStat(
+        CompactSegments.TOTAL_INTERVAL_OF_SEGMENTS_SKIPPED,
+        (final String dataSource, final long count) -> {
+          emitter.emit(
+              new ServiceMetricEvent.Builder()
+                  .setDimension(DruidMetrics.DATASOURCE, dataSource)
+                  .build("interval/skipCompact/count", count)
+          );
+        }
+    );
+
+    stats.forEachDataSourceStat(
+        CompactSegments.TOTAL_SIZE_OF_SEGMENTS_COMPACTED,
+        (final String dataSource, final long count) -> {
+          emitter.emit(
+              new ServiceMetricEvent.Builder()
+                  .setDimension(DruidMetrics.DATASOURCE, dataSource)
+                  .build("segment/compacted/bytes", count)
+          );
+        }
+    );
+
+    stats.forEachDataSourceStat(
+        CompactSegments.TOTAL_COUNT_OF_SEGMENTS_COMPACTED,
+        (final String dataSource, final long count) -> {
+          emitter.emit(
+              new ServiceMetricEvent.Builder()
+                  .setDimension(DruidMetrics.DATASOURCE, dataSource)
+                  .build("segment/compacted/count", count)
+          );
+        }
+    );
+
+    stats.forEachDataSourceStat(
+        CompactSegments.TOTAL_INTERVAL_OF_SEGMENTS_COMPACTED,
+        (final String dataSource, final long count) -> {
+          emitter.emit(
+              new ServiceMetricEvent.Builder()
+                  .setDimension(DruidMetrics.DATASOURCE, dataSource)
+                  .build("interval/compacted/count", count)
+          );
+        }
+    );
+
     // Emit segment metrics
     params.getUsedSegmentsTimelinesPerDataSource().forEach(
         (String dataSource, VersionedIntervalTimeline<String, DataSegment> dataSourceWithUsedSegments) -> {
diff --git a/server/src/main/java/org/apache/druid/server/coordinator/duty/NewestSegmentFirstIterator.java b/server/src/main/java/org/apache/druid/server/coordinator/duty/NewestSegmentFirstIterator.java
index f335455..c03c4b4 100644
--- a/server/src/main/java/org/apache/druid/server/coordinator/duty/NewestSegmentFirstIterator.java
+++ b/server/src/main/java/org/apache/druid/server/coordinator/duty/NewestSegmentFirstIterator.java
@@ -22,9 +22,7 @@
 import com.fasterxml.jackson.databind.ObjectMapper;
 import com.google.common.annotations.VisibleForTesting;
 import com.google.common.base.Preconditions;
-import com.google.common.collect.FluentIterable;
 import com.google.common.collect.Maps;
-import it.unimi.dsi.fastutil.objects.Object2LongOpenHashMap;
 import org.apache.druid.client.indexing.ClientCompactionTaskQueryTuningConfig;
 import org.apache.druid.indexer.partitions.DynamicPartitionsSpec;
 import org.apache.druid.indexer.partitions.PartitionsSpec;
@@ -35,6 +33,7 @@
 import org.apache.druid.java.util.common.logger.Logger;
 import org.apache.druid.segment.IndexSpec;
 import org.apache.druid.segment.SegmentUtils;
+import org.apache.druid.server.coordinator.CompactionStatistics;
 import org.apache.druid.server.coordinator.DataSourceCompactionConfig;
 import org.apache.druid.timeline.CompactionState;
 import org.apache.druid.timeline.DataSegment;
@@ -51,6 +50,7 @@
 import java.util.ArrayList;
 import java.util.Collections;
 import java.util.Comparator;
+import java.util.HashMap;
 import java.util.Iterator;
 import java.util.List;
 import java.util.Map;
@@ -68,7 +68,8 @@
 
   private final ObjectMapper objectMapper;
   private final Map<String, DataSourceCompactionConfig> compactionConfigs;
-  private final Map<String, VersionedIntervalTimeline<String, DataSegment>> dataSources;
+  private final Map<String, CompactionStatistics> compactedSegments = new HashMap<>();
+  private final Map<String, CompactionStatistics> skippedSegments = new HashMap<>();
 
   // dataSource -> intervalToFind
   // searchIntervals keeps track of the current state of which interval should be considered to search segments to
@@ -88,7 +89,6 @@
   {
     this.objectMapper = objectMapper;
     this.compactionConfigs = compactionConfigs;
-    this.dataSources = dataSources;
     this.timelineIterators = Maps.newHashMapWithExpectedSize(dataSources.size());
 
     dataSources.forEach((String dataSource, VersionedIntervalTimeline<String, DataSegment> timeline) -> {
@@ -112,27 +112,15 @@
   }
 
   @Override
-  public Object2LongOpenHashMap<String> totalRemainingSegmentsSizeBytes()
+  public Map<String, CompactionStatistics> totalCompactedStatistics()
   {
-    final Object2LongOpenHashMap<String> resultMap = new Object2LongOpenHashMap<>();
-    resultMap.defaultReturnValue(UNKNOWN_TOTAL_REMAINING_SEGMENTS_SIZE);
-    for (QueueEntry entry : queue) {
-      final VersionedIntervalTimeline<String, DataSegment> timeline = dataSources.get(entry.getDataSource());
-      final Interval interval = new Interval(timeline.first().getInterval().getStart(), entry.interval.getEnd());
+    return compactedSegments;
+  }
 
-      final List<TimelineObjectHolder<String, DataSegment>> holders = timeline.lookup(interval);
-
-      long size = 0;
-      for (DataSegment segment : FluentIterable
-          .from(holders)
-          .transformAndConcat(TimelineObjectHolder::getObject)
-          .transform(PartitionChunk::getObject)) {
-        size += segment.getSize();
-      }
-
-      resultMap.put(entry.getDataSource(), size);
-    }
-    return resultMap;
+  @Override
+  public Map<String, CompactionStatistics> totalSkippedStatistics()
+  {
+    return skippedSegments;
   }
 
   @Override
@@ -159,6 +147,7 @@
     Preconditions.checkState(!resultSegments.isEmpty(), "Queue entry must not be empty");
 
     final String dataSource = resultSegments.get(0).getDataSource();
+
     updateQueue(dataSource, compactionConfigs.get(dataSource));
 
     return resultSegments;
@@ -181,6 +170,7 @@
     }
 
     final SegmentsToCompact segmentsToCompact = findSegmentsToCompact(
+        dataSourceName,
         compactibleTimelineObjectHolderCursor,
         config
     );
@@ -336,6 +326,7 @@
    * @return segments to compact
    */
   private SegmentsToCompact findSegmentsToCompact(
+      final String dataSourceName,
       final CompactibleTimelineObjectHolderCursor compactibleTimelineObjectHolderCursor,
       final DataSourceCompactionConfig config
   )
@@ -355,7 +346,13 @@
         if (isCompactibleSize && needsCompaction) {
           return candidates;
         } else {
-          if (!isCompactibleSize) {
+          if (!needsCompaction) {
+            // Collect statistics for segments that are already compacted
+            collectSegmentStatistics(compactedSegments, dataSourceName, candidates);
+          } else {
+            // Collect statistics for segments that are skipped
+            // Note that segments that do not need compaction are not double counted here
+            collectSegmentStatistics(skippedSegments, dataSourceName, candidates);
             log.warn(
                 "total segment size[%d] for datasource[%s] and interval[%s] is larger than inputSegmentSize[%d]."
                 + " Continue to the next interval.",
@@ -374,6 +371,20 @@
     return new SegmentsToCompact();
   }
 
+  private void collectSegmentStatistics(
+      Map<String, CompactionStatistics> statisticsMap,
+      String dataSourceName,
+      SegmentsToCompact segments)
+  {
+    CompactionStatistics statistics = statisticsMap.computeIfAbsent(
+        dataSourceName,
+        v -> CompactionStatistics.initializeCompactionStatistics()
+    );
+    statistics.incrementCompactedByte(segments.getTotalSize());
+    statistics.incrementCompactedIntervals(segments.getNumberOfIntervals());
+    statistics.incrementCompactedSegments(segments.getNumberOfSegments());
+  }
+
   /**
    * Returns the initial searchInterval which is {@code (timeline.first().start, timeline.last().end - skipOffset)}.
    *
@@ -563,6 +574,16 @@
       return totalSize;
     }
 
+    private long getNumberOfSegments()
+    {
+      return segments.size();
+    }
+
+    private long getNumberOfIntervals()
+    {
+      return segments.stream().map(DataSegment::getInterval).distinct().count();
+    }
+
     @Override
     public String toString()
     {
diff --git a/server/src/main/java/org/apache/druid/server/http/CompactionResource.java b/server/src/main/java/org/apache/druid/server/http/CompactionResource.java
index fd4cf4f..8c99514 100644
--- a/server/src/main/java/org/apache/druid/server/http/CompactionResource.java
+++ b/server/src/main/java/org/apache/druid/server/http/CompactionResource.java
@@ -20,9 +20,11 @@
 package org.apache.druid.server.http;
 
 import com.google.common.annotations.VisibleForTesting;
+import com.google.common.collect.ImmutableList;
 import com.google.common.collect.ImmutableMap;
 import com.google.inject.Inject;
 import com.sun.jersey.spi.container.ResourceFilters;
+import org.apache.druid.server.coordinator.AutoCompactionSnapshot;
 import org.apache.druid.server.coordinator.DruidCoordinator;
 import org.apache.druid.server.http.security.ConfigResourceFilter;
 import org.apache.druid.server.http.security.StateResourceFilter;
@@ -34,6 +36,7 @@
 import javax.ws.rs.QueryParam;
 import javax.ws.rs.core.MediaType;
 import javax.ws.rs.core.Response;
+import java.util.Collection;
 
 @Path("/druid/coordinator/v1/compaction")
 public class CompactionResource
@@ -76,4 +79,25 @@
       return Response.ok(ImmutableMap.of("remainingSegmentSize", notCompactedSegmentSizeBytes)).build();
     }
   }
+
+  @GET
+  @Path("/status")
+  @Produces(MediaType.APPLICATION_JSON)
+  @ResourceFilters(StateResourceFilter.class)
+  public Response getCompactionSnapshotForDataSource(
+      @QueryParam("dataSource") String dataSource
+  )
+  {
+    final Collection<AutoCompactionSnapshot> snapshots;
+    if (dataSource == null || dataSource.isEmpty()) {
+      snapshots = coordinator.getAutoCompactionSnapshot().values();
+    } else {
+      AutoCompactionSnapshot autoCompactionSnapshot = coordinator.getAutoCompactionSnapshotForDataSource(dataSource);
+      if (autoCompactionSnapshot == null) {
+        return Response.status(Response.Status.BAD_REQUEST).entity(ImmutableMap.of("error", "unknown dataSource")).build();
+      }
+      snapshots = ImmutableList.of(autoCompactionSnapshot);
+    }
+    return Response.ok(ImmutableMap.of("latestStatus", snapshots)).build();
+  }
 }
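
A small example of calling the new status endpoint from a client; the Coordinator host and port are placeholders, and the response body is the {"latestStatus": [...]} wrapper built above:

    import java.net.URI;
    import java.net.http.HttpClient;
    import java.net.http.HttpRequest;
    import java.net.http.HttpResponse;

    public class CompactionStatusClient
    {
      public static void main(String[] args) throws Exception
      {
        // Hypothetical Coordinator address; omit ?dataSource=... to get snapshots for all datasources.
        URI uri = URI.create("http://localhost:8081/druid/coordinator/v1/compaction/status?dataSource=wikipedia");
        HttpResponse<String> response = HttpClient.newHttpClient().send(
            HttpRequest.newBuilder(uri).GET().build(),
            HttpResponse.BodyHandlers.ofString()
        );
        // Expect 200 with {"latestStatus": [...]}, or 400 with {"error": "unknown dataSource"}.
        System.out.println(response.statusCode() + ": " + response.body());
      }
    }
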
diff --git a/server/src/test/java/org/apache/druid/server/coordinator/AutoCompactionSnapshotTest.java b/server/src/test/java/org/apache/druid/server/coordinator/AutoCompactionSnapshotTest.java
new file mode 100644
index 0000000..9415a81
--- /dev/null
+++ b/server/src/test/java/org/apache/druid/server/coordinator/AutoCompactionSnapshotTest.java
@@ -0,0 +1,79 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.druid.server.coordinator;
+
+import org.junit.Assert;
+import org.junit.Test;
+
+public class AutoCompactionSnapshotTest
+{
+  @Test
+  public void testAutoCompactionSnapshotBuilder()
+  {
+    final String expectedDataSource = "data";
+    final AutoCompactionSnapshot.AutoCompactionScheduleStatus expectedStatus = AutoCompactionSnapshot.AutoCompactionScheduleStatus.RUNNING;
+    AutoCompactionSnapshot.Builder builder = new AutoCompactionSnapshot.Builder(expectedDataSource, expectedStatus);
+
+    // Increment every stat twice
+    for (int i = 0; i < 2; i++) {
+      builder.incrementIntervalCountSkipped(13);
+      builder.incrementBytesSkipped(13);
+      builder.incrementSegmentCountSkipped(13);
+
+      builder.incrementIntervalCountCompacted(13);
+      builder.incrementBytesCompacted(13);
+      builder.incrementSegmentCountCompacted(13);
+
+      builder.incrementIntervalCountAwaitingCompaction(13);
+      builder.incrementBytesAwaitingCompaction(13);
+      builder.incrementSegmentCountAwaitingCompaction(13);
+    }
+
+    AutoCompactionSnapshot actual = builder.build();
+
+    Assert.assertNotNull(actual);
+    Assert.assertEquals(26, actual.getSegmentCountSkipped());
+    Assert.assertEquals(26, actual.getIntervalCountSkipped());
+    Assert.assertEquals(26, actual.getBytesSkipped());
+    Assert.assertEquals(26, actual.getBytesCompacted());
+    Assert.assertEquals(26, actual.getIntervalCountCompacted());
+    Assert.assertEquals(26, actual.getSegmentCountCompacted());
+    Assert.assertEquals(26, actual.getBytesAwaitingCompaction());
+    Assert.assertEquals(26, actual.getIntervalCountAwaitingCompaction());
+    Assert.assertEquals(26, actual.getSegmentCountAwaitingCompaction());
+    Assert.assertEquals(AutoCompactionSnapshot.AutoCompactionScheduleStatus.RUNNING, actual.getScheduleStatus());
+    Assert.assertEquals(expectedDataSource, actual.getDataSource());
+
+    AutoCompactionSnapshot expected = new AutoCompactionSnapshot(
+        expectedDataSource,
+        AutoCompactionSnapshot.AutoCompactionScheduleStatus.RUNNING,
+        26,
+        26,
+        26,
+        26,
+        26,
+        26,
+        26,
+        26,
+        26
+    );
+    Assert.assertEquals(expected, actual);
+  }
+}
diff --git a/server/src/test/java/org/apache/druid/server/coordinator/duty/CompactSegmentsTest.java b/server/src/test/java/org/apache/druid/server/coordinator/duty/CompactSegmentsTest.java
index 91aebdc..2051231 100644
--- a/server/src/test/java/org/apache/druid/server/coordinator/duty/CompactSegmentsTest.java
+++ b/server/src/test/java/org/apache/druid/server/coordinator/duty/CompactSegmentsTest.java
@@ -49,6 +49,7 @@
 import org.apache.druid.java.util.http.client.Request;
 import org.apache.druid.java.util.http.client.response.StringFullResponseHolder;
 import org.apache.druid.server.DruidNode;
+import org.apache.druid.server.coordinator.AutoCompactionSnapshot;
 import org.apache.druid.server.coordinator.CoordinatorCompactionConfig;
 import org.apache.druid.server.coordinator.CoordinatorRuntimeParamsTestHelpers;
 import org.apache.druid.server.coordinator.CoordinatorStats;
@@ -98,6 +99,10 @@
 {
   private static final ObjectMapper JSON_MAPPER = new DefaultObjectMapper();
   private static final String DATA_SOURCE_PREFIX = "dataSource_";
+  // Each dataSource starts with 440 bytes, 44 segments, and 11 intervals needing compaction
+  private static final int TOTAL_BYTE_PER_DATASOURCE = 440;
+  private static final int TOTAL_SEGMENT_PER_DATASOURCE = 44;
+  private static final int TOTAL_INTERVAL_PER_DATASOURCE = 11;
 
   @Parameterized.Parameters(name = "{0}")
   public static Collection<Object[]> constructorFeeder()
@@ -105,7 +110,7 @@
     final MutableInt nextRangePartitionBoundary = new MutableInt(0);
     return ImmutableList.of(
         new Object[]{
-            new DynamicPartitionsSpec(300000, null),
+            new DynamicPartitionsSpec(300000, Long.MAX_VALUE),
             (BiFunction<Integer, Integer, ShardSpec>) NumberedShardSpec::new
         },
         new Object[]{
@@ -270,12 +275,363 @@
     assertLastSegmentNotCompacted(compactSegments);
   }
 
+  @Test
+  public void testMakeStats()
+  {
+    final TestDruidLeaderClient leaderClient = new TestDruidLeaderClient(JSON_MAPPER);
+    leaderClient.start();
+    final HttpIndexingServiceClient indexingServiceClient = new HttpIndexingServiceClient(JSON_MAPPER, leaderClient);
+    final CompactSegments compactSegments = new CompactSegments(JSON_MAPPER, indexingServiceClient);
+
+    // Before any compaction, we do not have any snapshot of compactions
+    Map<String, AutoCompactionSnapshot> autoCompactionSnapshots = compactSegments.getAutoCompactionSnapshot();
+    Assert.assertEquals(0, autoCompactionSnapshots.size());
+
+    for (int compaction_run_count = 0; compaction_run_count < 11; compaction_run_count++) {
+      assertCompactSegmentStatistics(compactSegments, compaction_run_count);
+    }
+    // Test that stats do not change (and are still correct) when auto compaction runs with everything fully compacted
+    final CoordinatorStats stats = doCompactSegments(compactSegments);
+    Assert.assertEquals(
+        0,
+        stats.getGlobalStat(CompactSegments.COMPACTION_TASK_COUNT)
+    );
+    for (int i = 0; i < 3; i++) {
+      verifySnapshot(
+          compactSegments,
+          AutoCompactionSnapshot.AutoCompactionScheduleStatus.RUNNING,
+          DATA_SOURCE_PREFIX + i,
+          0,
+          TOTAL_BYTE_PER_DATASOURCE,
+          0,
+          0,
+          TOTAL_INTERVAL_PER_DATASOURCE,
+          0,
+          0,
+          TOTAL_SEGMENT_PER_DATASOURCE / 2,
+          0
+      );
+    }
+
+    // Run auto compaction without any dataSource in the compaction config
+    // Should still report the snapshot of everything fully compacted
+    doCompactSegments(compactSegments, new ArrayList<>());
+    Assert.assertEquals(
+        0,
+        stats.getGlobalStat(CompactSegments.COMPACTION_TASK_COUNT)
+    );
+    for (int i = 0; i < 3; i++) {
+      verifySnapshot(
+          compactSegments,
+          AutoCompactionSnapshot.AutoCompactionScheduleStatus.NOT_ENABLED,
+          DATA_SOURCE_PREFIX + i,
+          0,
+          TOTAL_BYTE_PER_DATASOURCE,
+          0,
+          0,
+          TOTAL_INTERVAL_PER_DATASOURCE,
+          0,
+          0,
+          TOTAL_SEGMENT_PER_DATASOURCE / 2,
+          0
+      );
+    }
+
+    assertLastSegmentNotCompacted(compactSegments);
+  }
+
+  @Test
+  public void testMakeStatsForDataSourceWithCompactedIntervalBetweenNonCompactedIntervals()
+  {
+    // Only test and validate for one datasource for simplicity.
+    // This dataSource has three intervals already compacted (3 intervals, 120 bytes, 12 segments already compacted)
+    String dataSourceName = DATA_SOURCE_PREFIX + 1;
+    List<DataSegment> segments = new ArrayList<>();
+    for (int j : new int[]{0, 1, 2, 3, 7, 8}) {
+      for (int k = 0; k < 4; k++) {
+        DataSegment beforeNoon = createSegment(dataSourceName, j, true, k);
+        DataSegment afterNoon = createSegment(dataSourceName, j, false, k);
+        if (j == 3) {
+          // Make two intervals on this day compacted (two compacted intervals back-to-back)
+          beforeNoon = beforeNoon.withLastCompactionState(new CompactionState(partitionsSpec, ImmutableMap.of()));
+          afterNoon = afterNoon.withLastCompactionState(new CompactionState(partitionsSpec, ImmutableMap.of()));
+        }
+        if (j == 1) {
+          // Make one interval on this day compacted
+          afterNoon = afterNoon.withLastCompactionState(new CompactionState(partitionsSpec, ImmutableMap.of()));
+        }
+        segments.add(beforeNoon);
+        segments.add(afterNoon);
+      }
+    }
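+    // Day 3 contributes two pre-compacted intervals (8 segments) and day 1 one pre-compacted afternoon
+    // interval (4 segments): 3 compacted intervals, 12 segments, 120 bytes in total before the test starts.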
+
+    dataSources = DataSourcesSnapshot
+        .fromUsedSegments(segments, ImmutableMap.of())
+        .getUsedSegmentsTimelinesPerDataSource();
+
+    final TestDruidLeaderClient leaderClient = new TestDruidLeaderClient(JSON_MAPPER);
+    leaderClient.start();
+    final HttpIndexingServiceClient indexingServiceClient = new HttpIndexingServiceClient(JSON_MAPPER, leaderClient);
+    final CompactSegments compactSegments = new CompactSegments(JSON_MAPPER, indexingServiceClient);
+
+    // Before any compaction has run, there should be no compaction snapshots
+    Map<String, AutoCompactionSnapshot> autoCompactionSnapshots = compactSegments.getAutoCompactionSnapshot();
+    Assert.assertEquals(0, autoCompactionSnapshots.size());
+
+    // 3 intervals, 120 bytes, 12 segments already compacted before the run
+    for (int compaction_run_count = 0; compaction_run_count < 8; compaction_run_count++) {
+      // Do a cycle of auto compaction which creates one compaction task
+      final CoordinatorStats stats = doCompactSegments(compactSegments);
+      Assert.assertEquals(
+          1,
+          stats.getGlobalStat(CompactSegments.COMPACTION_TASK_COUNT)
+      );
+
+      verifySnapshot(
+          compactSegments,
+          AutoCompactionSnapshot.AutoCompactionScheduleStatus.RUNNING,
+          dataSourceName,
+          TOTAL_BYTE_PER_DATASOURCE - 120 - 40 * (compaction_run_count + 1),
+          120 + 40 * (compaction_run_count + 1),
+          0,
+          TOTAL_INTERVAL_PER_DATASOURCE - 3 - (compaction_run_count + 1),
+          3 + (compaction_run_count + 1),
+          0,
+          TOTAL_SEGMENT_PER_DATASOURCE - 12 - 4 * (compaction_run_count + 1),
+          // 12 segments were compacted before any auto compaction ran,
+          // 4 segments were compacted in this run of auto compaction, and
+          // each previous auto compaction run resulted in 2 compacted segments (4 segments compacted into 2)
+          12 + 4 + 2 * (compaction_run_count),
+          0
+      );
+    }
+
+    // Test that stats do not change (and are still correct) when auto compaction runs with everything already fully compacted
+    final CoordinatorStats stats = doCompactSegments(compactSegments);
+    Assert.assertEquals(
+        0,
+        stats.getGlobalStat(CompactSegments.COMPACTION_TASK_COUNT)
+    );
+    verifySnapshot(
+        compactSegments,
+        AutoCompactionSnapshot.AutoCompactionScheduleStatus.RUNNING,
+        dataSourceName,
+        0,
+        TOTAL_BYTE_PER_DATASOURCE,
+        0,
+        0,
+        TOTAL_INTERVAL_PER_DATASOURCE,
+        0,
+        0,
+        // 12 segments were compacted before any auto compaction ran.
+        // The remaining 32 segments needed compaction and have now been compacted into 16 segments (4 segments compacted into 2 per run)
+        12 + 16,
+        0
+    );
+  }
+
+  @Test
+  public void testMakeStatsForDataSourceWithSkipped()
+  {
+    // Only test and validate one datasource for simplicity.
+    // This dataSource has three intervals skipped (3 intervals, 1200 bytes, 12 segments skipped by auto compaction).
+    // Note that these segments are 10 bytes each in the other tests; they are increased to 100 bytes each here
+    // so that auto compaction will skip them.
+    String dataSourceName = DATA_SOURCE_PREFIX + 1;
+    List<DataSegment> segments = new ArrayList<>();
+    for (int j : new int[]{0, 1, 2, 3, 7, 8}) {
+      for (int k = 0; k < 4; k++) {
+        DataSegment beforeNoon = createSegment(dataSourceName, j, true, k);
+        DataSegment afterNoon = createSegment(dataSourceName, j, false, k);
+        if (j == 3) {
+          // Make two intervals on this day skipped (two skipped intervals back-to-back)
+          beforeNoon = beforeNoon.withSize(100);
+          afterNoon = afterNoon.withSize(100);
+        }
+        if (j == 1) {
+          // Make one interval on this day skipped
+          afterNoon = afterNoon.withSize(100);
+        }
+        segments.add(beforeNoon);
+        segments.add(afterNoon);
+      }
+    }
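+    // Day 3 contributes two oversized intervals (8 segments) and day 1 one oversized afternoon interval
+    // (4 segments): 3 intervals, 12 segments, 1200 bytes that auto compaction is expected to skip.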
+
+    dataSources = DataSourcesSnapshot
+        .fromUsedSegments(segments, ImmutableMap.of())
+        .getUsedSegmentsTimelinesPerDataSource();
+
+    final TestDruidLeaderClient leaderClient = new TestDruidLeaderClient(JSON_MAPPER);
+    leaderClient.start();
+    final HttpIndexingServiceClient indexingServiceClient = new HttpIndexingServiceClient(JSON_MAPPER, leaderClient);
+    final CompactSegments compactSegments = new CompactSegments(JSON_MAPPER, indexingServiceClient);
+
+    // Before any compaction has run, there should be no compaction snapshots
+    Map<String, AutoCompactionSnapshot> autoCompactionSnapshots = compactSegments.getAutoCompactionSnapshot();
+    Assert.assertEquals(0, autoCompactionSnapshots.size());
+
+    // 3 intervals, 1200 bytes (each segment is 100 bytes), 12 segments will be skipped by auto compaction
+    for (int compaction_run_count = 0; compaction_run_count < 8; compaction_run_count++) {
+      // Do a cycle of auto compaction which creates one compaction task
+      final CoordinatorStats stats = doCompactSegments(compactSegments);
+      Assert.assertEquals(
+          1,
+          stats.getGlobalStat(CompactSegments.COMPACTION_TASK_COUNT)
+      );
+
+      verifySnapshot(
+          compactSegments,
+          AutoCompactionSnapshot.AutoCompactionScheduleStatus.RUNNING,
+          dataSourceName,
+          // Minus 120 bytes: the 12 skipped segments (10 bytes each in TOTAL_BYTE_PER_DATASOURCE) do not count as awaiting compaction
+          TOTAL_BYTE_PER_DATASOURCE - 120 - 40 * (compaction_run_count + 1),
+          40 * (compaction_run_count + 1),
+          1200,
+          TOTAL_INTERVAL_PER_DATASOURCE - 3 - (compaction_run_count + 1),
+          (compaction_run_count + 1),
+          3,
+          TOTAL_SEGMENT_PER_DATASOURCE - 12 - 4 * (compaction_run_count + 1),
+          4 + 2 * (compaction_run_count),
+          12
+      );
+    }
+
+    // Test that stats do not change (and are still correct) when auto compaction runs with everything already fully compacted
+    final CoordinatorStats stats = doCompactSegments(compactSegments);
+    Assert.assertEquals(
+        0,
+        stats.getGlobalStat(CompactSegments.COMPACTION_TASK_COUNT)
+    );
+    verifySnapshot(
+        compactSegments,
+        AutoCompactionSnapshot.AutoCompactionScheduleStatus.RUNNING,
+        dataSourceName,
+        0,
+        // Minus 120 bytes: the 12 skipped segments (10 bytes each in TOTAL_BYTE_PER_DATASOURCE) are never compacted
+        TOTAL_BYTE_PER_DATASOURCE - 120,
+        1200,
+        0,
+        TOTAL_INTERVAL_PER_DATASOURCE - 3,
+        3,
+        0,
+        16,
+        12
+    );
+  }
+
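+  // Asserts every field of the AutoCompactionSnapshot reported for the given dataSource.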
+  private void verifySnapshot(
+      CompactSegments compactSegments,
+      AutoCompactionSnapshot.AutoCompactionScheduleStatus scheduleStatus,
+      String dataSourceName,
+      long expectedByteCountAwaitingCompaction,
+      long expectedByteCountCompressed,
+      long expectedByteCountSkipped,
+      long expectedIntervalCountAwaitingCompaction,
+      long expectedIntervalCountCompressed,
+      long expectedIntervalCountSkipped,
+      long expectedSegmentCountAwaitingCompaction,
+      long expectedSegmentCountCompressed,
+      long expectedSegmentCountSkipped
+  )
+  {
+    Map<String, AutoCompactionSnapshot> autoCompactionSnapshots = compactSegments.getAutoCompactionSnapshot();
+    AutoCompactionSnapshot snapshot = autoCompactionSnapshots.get(dataSourceName);
+    Assert.assertEquals(dataSourceName, snapshot.getDataSource());
+    Assert.assertEquals(scheduleStatus, snapshot.getScheduleStatus());
+    Assert.assertEquals(expectedByteCountAwaitingCompaction, snapshot.getBytesAwaitingCompaction());
+    Assert.assertEquals(expectedByteCountCompressed, snapshot.getBytesCompacted());
+    Assert.assertEquals(expectedByteCountSkipped, snapshot.getBytesSkipped());
+    Assert.assertEquals(expectedIntervalCountAwaitingCompaction, snapshot.getIntervalCountAwaitingCompaction());
+    Assert.assertEquals(expectedIntervalCountCompressed, snapshot.getIntervalCountCompacted());
+    Assert.assertEquals(expectedIntervalCountSkipped, snapshot.getIntervalCountSkipped());
+    Assert.assertEquals(expectedSegmentCountAwaitingCompaction, snapshot.getSegmentCountAwaitingCompaction());
+    Assert.assertEquals(expectedSegmentCountCompressed, snapshot.getSegmentCountCompacted());
+    Assert.assertEquals(expectedSegmentCountSkipped, snapshot.getSegmentCountSkipped());
+  }
+
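+  // Runs one compaction cycle per dataSource and, after each cycle, verifies the snapshots of all dataSources:
+  // those that have already received a task slot in this iteration and those that are still waiting.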
+  private void assertCompactSegmentStatistics(CompactSegments compactSegments, int compaction_run_count)
+  {
+    for (int dataSourceIndex = 0; dataSourceIndex < 3; dataSourceIndex++) {
+      // One compaction task triggered
+      final CoordinatorStats stats = doCompactSegments(compactSegments);
+      Assert.assertEquals(
+          1,
+          stats.getGlobalStat(CompactSegments.COMPACTION_TASK_COUNT)
+      );
+      Map<String, AutoCompactionSnapshot> autoCompactionSnapshots = compactSegments.getAutoCompactionSnapshot();
+      // Note: a subsequent compaction run after the dataSource was compacted will show different numbers than
+      // the run in which it was compacted. For example, if a dataSource had 4 segments compacted in a run,
+      // the compacted segment count is 4 on that run but may be 2 on subsequent runs
+      // (assuming the 4 segments were compacted into 2 segments).
+      for (int i = 0; i <= dataSourceIndex; i++) {
+        // dataSources up to dataSourceIndex are now compacted. Check that the stats match the expected after-compaction values.
+        // This verifies that dataSources which got a slot to compact have correct statistics
+        if (i != dataSourceIndex) {
+          verifySnapshot(
+              compactSegments,
+              AutoCompactionSnapshot.AutoCompactionScheduleStatus.RUNNING,
+              DATA_SOURCE_PREFIX + i,
+              TOTAL_BYTE_PER_DATASOURCE - 40 * (compaction_run_count + 1),
+              40 * (compaction_run_count + 1),
+              0,
+              TOTAL_INTERVAL_PER_DATASOURCE - (compaction_run_count + 1),
+              (compaction_run_count + 1),
+              0,
+              TOTAL_SEGMENT_PER_DATASOURCE - 4 * (compaction_run_count + 1),
+              2 * (compaction_run_count + 1),
+              0
+          );
+        } else {
+          verifySnapshot(
+              compactSegments,
+              AutoCompactionSnapshot.AutoCompactionScheduleStatus.RUNNING,
+              DATA_SOURCE_PREFIX + i,
+              TOTAL_BYTE_PER_DATASOURCE - 40 * (compaction_run_count + 1),
+              40 * (compaction_run_count + 1),
+              0,
+              TOTAL_INTERVAL_PER_DATASOURCE - (compaction_run_count + 1),
+              (compaction_run_count + 1),
+              0,
+              TOTAL_SEGMENT_PER_DATASOURCE - 4 * (compaction_run_count + 1),
+              2 * compaction_run_count + 4,
+              0
+          );
+        }
+      }
+      for (int i = dataSourceIndex + 1; i < 3; i++) {
+        // dataSources after dataSourceIndex are not yet compacted. Check that the stats match the expected before-compaction values.
+        // This verifies that dataSources which ran out of compaction task slots have correct statistics
+        verifySnapshot(
+            compactSegments,
+            AutoCompactionSnapshot.AutoCompactionScheduleStatus.RUNNING,
+            DATA_SOURCE_PREFIX + i,
+            TOTAL_BYTE_PER_DATASOURCE - 40 * compaction_run_count,
+            40 * compaction_run_count,
+            0,
+            TOTAL_INTERVAL_PER_DATASOURCE - compaction_run_count,
+            compaction_run_count,
+            0,
+            TOTAL_SEGMENT_PER_DATASOURCE - 4 * compaction_run_count,
+            2 * compaction_run_count,
+            0
+        );
+      }
+    }
+  }
+
   private CoordinatorStats doCompactSegments(CompactSegments compactSegments)
   {
+    return doCompactSegments(compactSegments, createCompactionConfigs());
+  }
+
+  private CoordinatorStats doCompactSegments(CompactSegments compactSegments, List<DataSourceCompactionConfig> compactionConfigs)
+  {
     DruidCoordinatorRuntimeParams params = CoordinatorRuntimeParamsTestHelpers
         .newBuilder()
         .withUsedSegmentsTimelinesPerDataSourceInTest(dataSources)
-        .withCompactionConfig(CoordinatorCompactionConfig.from(createCompactionConfigs()))
+        .withCompactionConfig(CoordinatorCompactionConfig.from(compactionConfigs))
         .build();
     return compactSegments.run(params).getCoordinatorStats();
   }
@@ -300,9 +656,9 @@
         // If expectedRemainingSegments is positive, we check how many dataSources have the segments waiting for
         // compaction.
         long numDataSourceOfExpectedRemainingSegments = stats
-            .getDataSources(CompactSegments.TOTAL_SIZE_OF_SEGMENTS_AWAITING_COMPACTION)
+            .getDataSources(CompactSegments.TOTAL_SIZE_OF_SEGMENTS_AWAITING)
             .stream()
-            .mapToLong(ds -> stats.getDataSourceStat(CompactSegments.TOTAL_SIZE_OF_SEGMENTS_AWAITING_COMPACTION, ds))
+            .mapToLong(ds -> stats.getDataSourceStat(CompactSegments.TOTAL_SIZE_OF_SEGMENTS_AWAITING, ds))
             .filter(stat -> stat == expectedRemainingSegments)
             .count();
         Assert.assertEquals(i + 1, numDataSourceOfExpectedRemainingSegments);
@@ -310,7 +666,7 @@
         // Otherwise, we check how many dataSources are in the coordinator stats.
         Assert.assertEquals(
             2 - i,
-            stats.getDataSources(CompactSegments.TOTAL_SIZE_OF_SEGMENTS_AWAITING_COMPACTION).size()
+            stats.getDataSources(CompactSegments.TOTAL_SIZE_OF_SEGMENTS_AWAITING).size()
         );
       }
     }
diff --git a/server/src/test/java/org/apache/druid/server/http/CompactionResourceTest.java b/server/src/test/java/org/apache/druid/server/http/CompactionResourceTest.java
new file mode 100644
index 0000000..9e7cfd6
--- /dev/null
+++ b/server/src/test/java/org/apache/druid/server/http/CompactionResourceTest.java
@@ -0,0 +1,122 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.druid.server.http;
+
+import com.google.common.collect.ImmutableList;
+import com.google.common.collect.ImmutableMap;
+import org.apache.druid.server.coordinator.AutoCompactionSnapshot;
+import org.apache.druid.server.coordinator.DruidCoordinator;
+import org.easymock.EasyMock;
+import org.junit.After;
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.Test;
+
+import javax.ws.rs.core.Response;
+import java.util.Map;
+
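+/**
+ * Tests {@link CompactionResource}'s compaction status endpoint against a mocked {@link DruidCoordinator},
+ * covering empty, null, valid, and unknown dataSource query parameters.
+ */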
+public class CompactionResourceTest
+{
+  private DruidCoordinator mock;
+  private String dataSourceName = "datasource_1";
+  private AutoCompactionSnapshot expectedSnapshot = new AutoCompactionSnapshot(
+      dataSourceName,
+      AutoCompactionSnapshot.AutoCompactionScheduleStatus.RUNNING,
+      1,
+      1,
+      1,
+      1,
+      1,
+      1,
+      1,
+      1,
+      1
+  );
+
+  @Before
+  public void setUp()
+  {
+    mock = EasyMock.createStrictMock(DruidCoordinator.class);
+  }
+
+  @After
+  public void tearDown()
+  {
+    EasyMock.verify(mock);
+  }
+
+  @Test
+  public void testGetCompactionSnapshotForDataSourceWithEmptyQueryParameter()
+  {
+    Map<String, AutoCompactionSnapshot> expected = ImmutableMap.of(
+        dataSourceName,
+        expectedSnapshot
+    );
+
+    EasyMock.expect(mock.getAutoCompactionSnapshot()).andReturn(expected).once();
+    EasyMock.replay(mock);
+
+    final Response response = new CompactionResource(mock).getCompactionSnapshotForDataSource("");
+    Assert.assertEquals(ImmutableMap.of("latestStatus", expected.values()), response.getEntity());
+    Assert.assertEquals(200, response.getStatus());
+  }
+
+  @Test
+  public void testGetCompactionSnapshotForDataSourceWithNullQueryParameter()
+  {
+    Map<String, AutoCompactionSnapshot> expected = ImmutableMap.of(
+        dataSourceName,
+        expectedSnapshot
+    );
+
+    EasyMock.expect(mock.getAutoCompactionSnapshot()).andReturn(expected).once();
+    EasyMock.replay(mock);
+
+    final Response response = new CompactionResource(mock).getCompactionSnapshotForDataSource(null);
+    Assert.assertEquals(ImmutableMap.of("latestStatus", expected.values()), response.getEntity());
+    Assert.assertEquals(200, response.getStatus());
+  }
+
+  @Test
+  public void testGetCompactionSnapshotForDataSourceWithValidQueryParameter()
+  {
+
+    EasyMock.expect(mock.getAutoCompactionSnapshotForDataSource(dataSourceName)).andReturn(expectedSnapshot).once();
+    EasyMock.replay(mock);
+
+    final Response response = new CompactionResource(mock).getCompactionSnapshotForDataSource(dataSourceName);
+    Assert.assertEquals(ImmutableMap.of("latestStatus", ImmutableList.of(expectedSnapshot)), response.getEntity());
+    Assert.assertEquals(200, response.getStatus());
+  }
+
+  @Test
+  public void testGetCompactionSnapshotForDataSourceWithInvalidQueryParameter()
+  {
+    String dataSourceName = "invalid_datasource";
+
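+    // An unknown dataSource has no snapshot (the coordinator returns null), so a 400 response is expected.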
+    EasyMock.expect(mock.getAutoCompactionSnapshotForDataSource(dataSourceName)).andReturn(null).once();
+    EasyMock.replay(mock);
+
+    final Response response = new CompactionResource(mock).getCompactionSnapshotForDataSource(dataSourceName);
+    Assert.assertEquals(400, response.getStatus());
+  }
+}