Add last_compaction_state to sys.segments table (#10413)

* Add is_compacted to sys.segments table

* change is_compacted to last_compaction_state

* fix tests

* fix tests

* address comments
diff --git a/core/src/main/java/org/apache/druid/timeline/DataSegment.java b/core/src/main/java/org/apache/druid/timeline/DataSegment.java
index e7be885..7fcbec5 100644
--- a/core/src/main/java/org/apache/druid/timeline/DataSegment.java
+++ b/core/src/main/java/org/apache/druid/timeline/DataSegment.java
@@ -100,7 +100,7 @@
   /**
    * Stores some configurations of the compaction task which created this segment.
    * This field is filled in the metadata store only when "storeCompactionState" is set true in the context of the
-   * compaction task which is false by default.
+   * task, which is true by default. See {@link org.apache.druid.indexing.common.task.Tasks#DEFAULT_STORE_COMPACTION_STATE}.
    * Also, this field can be pruned in many Druid modules when this class is loaded from the metadata store.
    * See {@link PruneLastCompactionState} for details.
    */
diff --git a/core/src/main/java/org/apache/druid/timeline/PruneLastCompactionState.java b/core/src/main/java/org/apache/druid/timeline/PruneLastCompactionState.java
index 9cbc31d..55c83f4 100644
--- a/core/src/main/java/org/apache/druid/timeline/PruneLastCompactionState.java
+++ b/core/src/main/java/org/apache/druid/timeline/PruneLastCompactionState.java
@@ -33,6 +33,7 @@
  *
  * - In auto compaction of the coordinator, "lastCompactionState" is used to determine whether the given
  *   segment needs further compaction or not.
+ * - In the metadata store information API of the Coordinator, "lastCompactionState" is exposed as part of the sys.segments table.
  * - In parallel indexing, "lastCompactionState" should be serialized and deserialized properly when
  *   the sub tasks report the pushed segments to the supervisor task.
  */
diff --git a/docs/querying/sql.md b/docs/querying/sql.md
index 94cfcca..702070d 100644
--- a/docs/querying/sql.md
+++ b/docs/querying/sql.md
@@ -1086,6 +1086,7 @@
 |shardSpec|STRING|The toString of specific `ShardSpec`|
 |dimensions|STRING|The dimensions of the segment|
 |metrics|STRING|The metrics of the segment|
+|last_compaction_state|STRING|The configuration of the compaction task which created this segment. May be null if the segment was not created by a compaction task.|
 
 For example to retrieve all segments for datasource "wikipedia", use the query:
 
@@ -1107,6 +1108,18 @@
 ORDER BY 2 DESC
 ```
 
+If you want to retrieve segments that were compacted (by any compaction):
+
+```sql
+SELECT * FROM sys.segments WHERE last_compaction_state IS NOT NULL
+```
+
+or if you want to retrieve only segments that were compacted by a particular compaction spec (such as that of the auto compaction):
+
+```sql
+SELECT * FROM sys.segments WHERE last_compaction_state = 'CompactionState{partitionsSpec=DynamicPartitionsSpec{maxRowsPerSegment=5000000, maxTotalRows=9223372036854775807}, indexSpec={bitmap={type=roaring, compressRunOnSerialization=true}, dimensionCompression=lz4, metricCompression=lz4, longEncoding=longs, segmentLoader=null}}'
+```
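+
+The stored value reflects the compaction spec that produced the segment, as in the example above. As a rough sketch that relies only on the columns documented here, a query along these lines counts how many segments per datasource currently carry a compaction state:
+
+```sql
+SELECT
+  datasource,
+  SUM(CASE WHEN last_compaction_state IS NOT NULL THEN 1 ELSE 0 END) AS num_compacted_segments,
+  COUNT(*) AS num_segments
+FROM sys.segments
+GROUP BY datasource
+```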
+
 *Caveat:* Note that a segment can be served by more than one stream ingestion tasks or Historical processes, in that case it would have multiple replicas. These replicas are weakly consistent with each other when served by multiple ingestion tasks, until a segment is eventually served by a Historical, at that point the segment is immutable. Broker prefers to query a segment from Historical over an ingestion task. But if a segment has multiple realtime replicas, for e.g.. Kafka index tasks, and one task is slower than other, then the sys.segments query results can vary for the duration of the tasks because only one of the ingestion tasks is queried by the Broker and it is not guaranteed that the same task gets picked every time. The `num_rows` column of segments table can have inconsistent values during this period. There is an open [issue](https://github.com/apache/druid/issues/5915) about this inconsistency with stream ingestion tasks.
 
 #### SERVERS table
diff --git a/indexing-service/src/main/java/org/apache/druid/indexing/common/task/CompactionTask.java b/indexing-service/src/main/java/org/apache/druid/indexing/common/task/CompactionTask.java
index 041f359..62a9f26 100644
--- a/indexing-service/src/main/java/org/apache/druid/indexing/common/task/CompactionTask.java
+++ b/indexing-service/src/main/java/org/apache/druid/indexing/common/task/CompactionTask.java
@@ -126,6 +126,8 @@
 
   private static final String TYPE = "compact";
 
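+  // Compaction tasks store their compaction state in segment metadata by default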
+  private static final boolean STORE_COMPACTION_STATE = true;
+
   static {
     Verify.verify(TYPE.equals(CompactSegments.COMPACTION_TASK_TYPE));
   }
@@ -422,6 +424,7 @@
   {
     final Map<String, Object> newContext = new HashMap<>(getContext());
     newContext.put(CTX_KEY_APPENDERATOR_TRACKING_TASK_ID, getId());
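+    // Store the compaction state by default; putIfAbsent keeps any value set explicitly in the task context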
+    newContext.putIfAbsent(CompactSegments.STORE_COMPACTION_STATE_KEY, STORE_COMPACTION_STATE);
     // Set the priority of the compaction task.
     newContext.put(Tasks.PRIORITY_KEY, getPriority());
     return newContext;
diff --git a/indexing-service/src/main/java/org/apache/druid/indexing/common/task/Tasks.java b/indexing-service/src/main/java/org/apache/druid/indexing/common/task/Tasks.java
index 502eb85..abd5424 100644
--- a/indexing-service/src/main/java/org/apache/druid/indexing/common/task/Tasks.java
+++ b/indexing-service/src/main/java/org/apache/druid/indexing/common/task/Tasks.java
@@ -51,7 +51,7 @@
   public static final String LOCK_TIMEOUT_KEY = "taskLockTimeout";
   public static final String FORCE_TIME_CHUNK_LOCK_KEY = "forceTimeChunkLock";
   /**
-   * This context is used in auto compaction. When it is set in the context, the segments created by the task
+   * This context is used in compaction. When it is set in the context, the segments created by the task
    * will fill 'lastCompactionState' in its metadata. This will be used to track what segments are compacted or not.
    * See {@link org.apache.druid.timeline.DataSegment} and {@link
    * org.apache.druid.server.coordinator.duty.NewestSegmentFirstIterator} for more details.
diff --git a/indexing-service/src/test/java/org/apache/druid/indexing/common/task/CompactionTaskParallelRunTest.java b/indexing-service/src/test/java/org/apache/druid/indexing/common/task/CompactionTaskParallelRunTest.java
index 3cc99c9..616dd24 100644
--- a/indexing-service/src/test/java/org/apache/druid/indexing/common/task/CompactionTaskParallelRunTest.java
+++ b/indexing-service/src/test/java/org/apache/druid/indexing/common/task/CompactionTaskParallelRunTest.java
@@ -140,7 +140,6 @@
     final CompactionTask compactionTask = builder
         .inputSpec(new CompactionIntervalSpec(INTERVAL_TO_INDEX, null))
         .tuningConfig(AbstractParallelIndexSupervisorTaskTest.DEFAULT_TUNING_CONFIG_FOR_PARALLEL_INDEXING)
-        .context(ImmutableMap.of(Tasks.STORE_COMPACTION_STATE_KEY, true))
         .build();
 
     final Set<DataSegment> compactedSegments = runTask(compactionTask);
@@ -153,6 +152,7 @@
           lockGranularity == LockGranularity.TIME_CHUNK ? NumberedShardSpec.class : NumberedOverwriteShardSpec.class,
           segment.getShardSpec().getClass()
       );
+      // Expect the compaction state to be stored, since storeCompactionState is true by default
       Assert.assertEquals(expectedState, segment.getLastCompactionState());
     }
   }
@@ -174,7 +174,6 @@
     final CompactionTask compactionTask = builder
         .inputSpec(new CompactionIntervalSpec(INTERVAL_TO_INDEX, null))
         .tuningConfig(newTuningConfig(new HashedPartitionsSpec(null, 3, null), 2, true))
-        .context(ImmutableMap.of(Tasks.STORE_COMPACTION_STATE_KEY, true))
         .build();
 
     final Set<DataSegment> compactedSegments = runTask(compactionTask);
@@ -183,6 +182,7 @@
         compactionTask.getTuningConfig().getIndexSpec().asMap(getObjectMapper())
     );
     for (DataSegment segment : compactedSegments) {
+      // Expect the compaction state to be stored, since storeCompactionState is true by default
       Assert.assertSame(HashBasedNumberedShardSpec.class, segment.getShardSpec().getClass());
       Assert.assertEquals(expectedState, segment.getLastCompactionState());
     }
@@ -205,7 +205,6 @@
     final CompactionTask compactionTask = builder
         .inputSpec(new CompactionIntervalSpec(INTERVAL_TO_INDEX, null))
         .tuningConfig(newTuningConfig(new SingleDimensionPartitionsSpec(7, null, "dim", false), 2, true))
-        .context(ImmutableMap.of(Tasks.STORE_COMPACTION_STATE_KEY, true))
         .build();
 
     final Set<DataSegment> compactedSegments = runTask(compactionTask);
@@ -214,12 +213,41 @@
         compactionTask.getTuningConfig().getIndexSpec().asMap(getObjectMapper())
     );
     for (DataSegment segment : compactedSegments) {
+      // Expect the compaction state to be stored, since storeCompactionState is true by default
       Assert.assertSame(SingleDimensionShardSpec.class, segment.getShardSpec().getClass());
       Assert.assertEquals(expectedState, segment.getLastCompactionState());
     }
   }
 
   @Test
+  public void testRunCompactionStateNotStoreIfContextSetToFalse()
+  {
+    runIndexTask(null, true);
+
+    final Builder builder = new Builder(
+        DATA_SOURCE,
+        getSegmentLoaderFactory(),
+        RETRY_POLICY_FACTORY
+    );
+    final CompactionTask compactionTask = builder
+        .inputSpec(new CompactionIntervalSpec(INTERVAL_TO_INDEX, null))
+        .tuningConfig(AbstractParallelIndexSupervisorTaskTest.DEFAULT_TUNING_CONFIG_FOR_PARALLEL_INDEXING)
+        .context(ImmutableMap.of(Tasks.STORE_COMPACTION_STATE_KEY, false))
+        .build();
+
+    final Set<DataSegment> compactedSegments = runTask(compactionTask);
+
+    for (DataSegment segment : compactedSegments) {
+      Assert.assertSame(
+          lockGranularity == LockGranularity.TIME_CHUNK ? NumberedShardSpec.class : NumberedOverwriteShardSpec.class,
+          segment.getShardSpec().getClass()
+      );
+      // Expect no compaction state, since storeCompactionState is explicitly set to false in the context
+      Assert.assertNull(segment.getLastCompactionState());
+    }
+  }
+
+  @Test
   public void testCompactHashAndDynamicPartitionedSegments()
   {
     runIndexTask(new HashedPartitionsSpec(null, 2, null), false);
diff --git a/indexing-service/src/test/java/org/apache/druid/indexing/common/task/CompactionTaskRunTest.java b/indexing-service/src/test/java/org/apache/druid/indexing/common/task/CompactionTaskRunTest.java
index 719fba1..08cd765 100644
--- a/indexing-service/src/test/java/org/apache/druid/indexing/common/task/CompactionTaskRunTest.java
+++ b/indexing-service/src/test/java/org/apache/druid/indexing/common/task/CompactionTaskRunTest.java
@@ -130,6 +130,8 @@
       false,
       0
   );
+
+  // Expected compaction state after compaction, since compaction state is stored by default
   private static CompactionState DEFAULT_COMPACTION_STATE;
 
   private static final List<String> TEST_ROWS = ImmutableList.of(
@@ -761,6 +763,9 @@
         null
     );
 
+    // This is a regular index task, so the context must be set explicitly to store the CompactionState
+    indexTask.addToContext(Tasks.STORE_COMPACTION_STATE_KEY, true);
+
     final Pair<TaskStatus, List<DataSegment>> resultPair = runTask(indexTask);
 
     Assert.assertTrue(resultPair.lhs.isSuccess());
@@ -853,7 +858,6 @@
     final TaskToolbox box = createTaskToolbox(objectMapper, task);
 
     task.addToContext(Tasks.FORCE_TIME_CHUNK_LOCK_KEY, lockGranularity == LockGranularity.TIME_CHUNK);
-    task.addToContext(Tasks.STORE_COMPACTION_STATE_KEY, true);
     if (task.isReady(box.getTaskActionClient())) {
       if (readyLatchToCountDown != null) {
         readyLatchToCountDown.countDown();
diff --git a/integration-tests/src/test/resources/results/auth_test_sys_schema_segments.json b/integration-tests/src/test/resources/results/auth_test_sys_schema_segments.json
index d59b14f..b09e9de 100644
--- a/integration-tests/src/test/resources/results/auth_test_sys_schema_segments.json
+++ b/integration-tests/src/test/resources/results/auth_test_sys_schema_segments.json
@@ -15,6 +15,7 @@
     "is_overshadowed": 0,
     "shardSpec": "NoneShardSpec",
     "dimensions": "[anonymous, area_code, city, continent_code, country_name, dma_code, geo, language, namespace, network, newpage, page, postal_code, region_lookup, robot, unpatrolled, user]",
-    "metrics": "[added, count, deleted, delta, delta_hist, unique_users, variation]"
+    "metrics": "[added, count, deleted, delta, delta_hist, unique_users, variation]",
+    "last_compaction_state": null
   }
 ]
diff --git a/services/src/main/java/org/apache/druid/cli/CliBroker.java b/services/src/main/java/org/apache/druid/cli/CliBroker.java
index 88d3ff9..6e89577 100644
--- a/services/src/main/java/org/apache/druid/cli/CliBroker.java
+++ b/services/src/main/java/org/apache/druid/cli/CliBroker.java
@@ -67,7 +67,6 @@
 import org.apache.druid.server.metrics.QueryCountStatsProvider;
 import org.apache.druid.server.router.TieredBrokerConfig;
 import org.apache.druid.sql.guice.SqlModule;
-import org.apache.druid.timeline.PruneLastCompactionState;
 import org.apache.druid.timeline.PruneLoadSpec;
 import org.eclipse.jetty.server.Server;
 
@@ -102,7 +101,6 @@
           binder.bindConstant().annotatedWith(Names.named("servicePort")).to(8082);
           binder.bindConstant().annotatedWith(Names.named("tlsServicePort")).to(8282);
           binder.bindConstant().annotatedWith(PruneLoadSpec.class).to(true);
-          binder.bindConstant().annotatedWith(PruneLastCompactionState.class).to(true);
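+          // lastCompactionState is no longer pruned on the Broker so that sys.segments can expose it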
           binder.bind(ResponseContextConfig.class).toInstance(ResponseContextConfig.newConfig(false));
 
           binder.bind(CachingClusteredClient.class).in(LazySingleton.class);
diff --git a/sql/src/main/java/org/apache/druid/sql/calcite/schema/SystemSchema.java b/sql/src/main/java/org/apache/druid/sql/calcite/schema/SystemSchema.java
index 0237b86..16e5c0a 100644
--- a/sql/src/main/java/org/apache/druid/sql/calcite/schema/SystemSchema.java
+++ b/sql/src/main/java/org/apache/druid/sql/calcite/schema/SystemSchema.java
@@ -145,6 +145,7 @@
       .add("shardSpec", ValueType.STRING)
       .add("dimensions", ValueType.STRING)
       .add("metrics", ValueType.STRING)
+      .add("last_compaction_state", ValueType.STRING)
       .build();
 
   static final RowSignature SERVERS_SIGNATURE = RowSignature
@@ -311,7 +312,8 @@
                 val.isOvershadowed() ? IS_OVERSHADOWED_TRUE : IS_OVERSHADOWED_FALSE,
                 segment.getShardSpec(),
                 segment.getDimensions(),
-                segment.getMetrics()
+                segment.getMetrics(),
+                segment.getLastCompactionState()
             };
           });
 
@@ -343,7 +345,8 @@
                 IS_OVERSHADOWED_FALSE, // there is an assumption here that unpublished segments are never overshadowed
                 val.getValue().getSegment().getShardSpec(),
                 val.getValue().getSegment().getDimensions(),
-                val.getValue().getSegment().getMetrics()
+                val.getValue().getSegment().getMetrics(),
+                null // unpublished segments from realtime tasks will not be compacted yet
             };
           });
 
diff --git a/sql/src/test/java/org/apache/druid/sql/calcite/schema/SystemSchemaTest.java b/sql/src/test/java/org/apache/druid/sql/calcite/schema/SystemSchemaTest.java
index a6db177..f943aaf 100644
--- a/sql/src/test/java/org/apache/druid/sql/calcite/schema/SystemSchemaTest.java
+++ b/sql/src/test/java/org/apache/druid/sql/calcite/schema/SystemSchemaTest.java
@@ -47,6 +47,7 @@
 import org.apache.druid.discovery.DruidNodeDiscovery;
 import org.apache.druid.discovery.DruidNodeDiscoveryProvider;
 import org.apache.druid.discovery.NodeRole;
+import org.apache.druid.indexer.partitions.DynamicPartitionsSpec;
 import org.apache.druid.java.util.common.IAE;
 import org.apache.druid.java.util.common.ISE;
 import org.apache.druid.java.util.common.Intervals;
@@ -89,6 +90,7 @@
 import org.apache.druid.sql.calcite.util.SpecificSegmentsQuerySegmentWalker;
 import org.apache.druid.sql.calcite.util.TestServerInventoryView;
 import org.apache.druid.sql.calcite.view.NoopViewManager;
+import org.apache.druid.timeline.CompactionState;
 import org.apache.druid.timeline.DataSegment;
 import org.apache.druid.timeline.SegmentId;
 import org.apache.druid.timeline.SegmentWithOvershadowedStatus;
@@ -271,8 +273,12 @@
     );
   }
 
+  private final CompactionState expectedCompactionState = new CompactionState(
+      new DynamicPartitionsSpec(null, null),
+      Collections.singletonMap("test", "map")
+  );
 
-  private final DataSegment publishedSegment1 = new DataSegment(
+  private final DataSegment publishedCompactedSegment1 = new DataSegment(
       "wikipedia1",
       Intervals.of("2007/2008"),
       "version1",
@@ -280,10 +286,11 @@
       ImmutableList.of("dim1", "dim2"),
       ImmutableList.of("met1", "met2"),
       null,
+      expectedCompactionState,
       1,
       53000L
   );
-  private final DataSegment publishedSegment2 = new DataSegment(
+  private final DataSegment publishedCompactedSegment2 = new DataSegment(
       "wikipedia2",
       Intervals.of("2008/2009"),
       "version2",
@@ -291,10 +298,11 @@
       ImmutableList.of("dim1", "dim2"),
       ImmutableList.of("met1", "met2"),
       null,
+      expectedCompactionState,
       1,
       83000L
   );
-  private final DataSegment publishedSegment3 = new DataSegment(
+  private final DataSegment publishedUncompactedSegment3 = new DataSegment(
       "wikipedia3",
       Intervals.of("2009/2010"),
       "version3",
@@ -302,6 +310,7 @@
       ImmutableList.of("dim1", "dim2"),
       ImmutableList.of("met1", "met2"),
       null,
+      null,
       1,
       47000L
   );
@@ -495,7 +504,7 @@
     final RelDataType rowType = segmentsTable.getRowType(new JavaTypeFactoryImpl());
     final List<RelDataTypeField> fields = rowType.getFieldList();
 
-    Assert.assertEquals(16, fields.size());
+    Assert.assertEquals(17, fields.size());
 
     final SystemSchema.TasksTable tasksTable = (SystemSchema.TasksTable) schema.getTableMap().get("tasks");
     final RelDataType sysRowType = tasksTable.getRowType(new JavaTypeFactoryImpl());
@@ -518,9 +527,9 @@
   {
     final SegmentsTable segmentsTable = new SegmentsTable(druidSchema, metadataView, authMapper);
     final Set<SegmentWithOvershadowedStatus> publishedSegments = new HashSet<>(Arrays.asList(
-        new SegmentWithOvershadowedStatus(publishedSegment1, true),
-        new SegmentWithOvershadowedStatus(publishedSegment2, false),
-        new SegmentWithOvershadowedStatus(publishedSegment3, false),
+        new SegmentWithOvershadowedStatus(publishedCompactedSegment1, true),
+        new SegmentWithOvershadowedStatus(publishedCompactedSegment2, false),
+        new SegmentWithOvershadowedStatus(publishedUncompactedSegment3, false),
         new SegmentWithOvershadowedStatus(segment1, true),
         new SegmentWithOvershadowedStatus(segment2, false)
     ));
@@ -576,7 +585,8 @@
         1L, //is_published
         1L, //is_available
         0L, //is_realtime
-        1L //is_overshadowed
+        1L, //is_overshadowed
+        null //last_compaction_state
     );
 
     verifyRow(
@@ -589,7 +599,8 @@
         1L, //is_published
         1L, //is_available
         0L, //is_realtime
-        0L //is_overshadowed
+        0L, //is_overshadowed
+        null //last_compaction_state
     );
 
     //segment test3 is unpublished and has a NumberedShardSpec with partitionNum = 2
@@ -603,7 +614,8 @@
         0L, //is_published
         1L, //is_available
         0L, //is_realtime
-        0L //is_overshadowed
+        0L, //is_overshadowed
+        null //last_compaction_state
     );
 
     verifyRow(
@@ -616,7 +628,8 @@
         0L, //is_published
         1L, //is_available
         1L, //is_realtime
-        0L //is_overshadowed
+        0L, //is_overshadowed
+        null //last_compaction_state
     );
 
     verifyRow(
@@ -629,10 +642,12 @@
         0L, //is_published
         1L, //is_available
         1L, //is_realtime
-        0L //is_overshadowed
+        0L, //is_overshadowed
+        null //last_compaction_state
     );
 
     // wikipedia segments are published and unavailable, num_replicas is 0
+    // wikipedia segments 1 and 2 are compacted while segment 3 is not
     verifyRow(
         rows.get(5),
         "wikipedia1_2007-01-01T00:00:00.000Z_2008-01-01T00:00:00.000Z_version1",
@@ -643,7 +658,8 @@
         1L, //is_published
         0L, //is_available
         0L, //is_realtime
-        1L //is_overshadowed
+        1L, //is_overshadowed
+        expectedCompactionState //last_compaction_state
     );
 
     verifyRow(
@@ -656,7 +672,8 @@
         1L, //is_published
         0L, //is_available
         0L, //is_realtime
-        0L //is_overshadowed
+        0L, //is_overshadowed
+        expectedCompactionState //last_compaction_state
     );
 
     verifyRow(
@@ -669,7 +686,8 @@
         1L, //is_published
         0L, //is_available
         0L, //is_realtime
-        0L //is_overshadowed
+        0L, //is_overshadowed
+        null //last_compaction_state
     );
 
     // Verify value types.
@@ -686,7 +704,8 @@
       long isPublished,
       long isAvailable,
       long isRealtime,
-      long isOvershadowed
+      long isOvershadowed,
+      CompactionState compactionState
   )
   {
     Assert.assertEquals(segmentId, row[0].toString());
@@ -703,6 +722,7 @@
     Assert.assertEquals(isAvailable, row[10]);
     Assert.assertEquals(isRealtime, row[11]);
     Assert.assertEquals(isOvershadowed, row[12]);
+    Assert.assertEquals(compactionState, row[16]);
   }
 
   @Test
@@ -1271,6 +1291,8 @@
               expectedClass = SegmentId.class;
             } else if (signature.getColumnName(i).equals("shardSpec")) {
               expectedClass = ShardSpec.class;
+            } else if (signature.getColumnName(i).equals("last_compaction_state")) {
+              expectedClass = CompactionState.class;
             } else if (signature.getColumnName(i).equals("dimensions") || signature.getColumnName(i).equals("metrics")) {
               expectedClass = List.class;
             } else {
diff --git a/website/.spelling b/website/.spelling
index 02333c9..9c3f1a0 100644
--- a/website/.spelling
+++ b/website/.spelling
@@ -1498,6 +1498,7 @@
 is_published
 is_realtime
 java.sql.Types
+last_compaction_state
 max_size
 num_replicas
 num_rows