Add feature to automatically remove audit logs based on retention period (#11084)

* add docs

* add impl

* fix checkstyle

* fix test

* add test

* fix checkstyle

* fix checkstyle

* fix test

* Address comments

* Address comments

* fix spelling

* fix docs
diff --git a/core/src/main/java/org/apache/druid/audit/AuditManager.java b/core/src/main/java/org/apache/druid/audit/AuditManager.java
index 73804d7..f403423 100644
--- a/core/src/main/java/org/apache/druid/audit/AuditManager.java
+++ b/core/src/main/java/org/apache/druid/audit/AuditManager.java
@@ -91,4 +91,12 @@
    * @return list of AuditEntries satisfying the passed parameters
    */
   List<AuditEntry> fetchAuditHistory(String type, int limit);
+
+  /**
+   * Remove audit logs created older than the given timestamp.
+   *
+   * @param timestamp timestamp in milliseconds
+   * @return number of audit logs removed
+   */
+  int removeAuditLogsOlderThan(long timestamp);
 }
diff --git a/docs/configuration/index.md b/docs/configuration/index.md
index cd26a9e..d49bf75 100644
--- a/docs/configuration/index.md
+++ b/docs/configuration/index.md
@@ -742,6 +742,15 @@
 |`druid.coordinator.asOverlord.enabled`|Boolean value for whether this Coordinator process should act like an Overlord as well. This configuration allows users to simplify a druid cluster by not having to deploy any standalone Overlord processes. If set to true, then Overlord console is available at `http://coordinator-host:port/console.html` and be sure to set `druid.coordinator.asOverlord.overlordService` also. See next.|false|
 |`druid.coordinator.asOverlord.overlordService`| Required, if `druid.coordinator.asOverlord.enabled` is `true`. This must be same value as `druid.service` on standalone Overlord processes and `druid.selectors.indexing.serviceName` on Middle Managers.|NULL|
 
+##### Metadata Management
+
+|Property|Description|Required?|Default|
+|--------|-----------|---------|-------|
+|`druid.coordinator.period.metadataStoreManagementPeriod`|How often to run metadata management tasks in [ISO 8601](https://en.wikipedia.org/wiki/ISO_8601) duration format. |No | `PT1H`|
+|`druid.coordinator.kill.audit.on`| Boolean value for whether to enable automatic deletion of audit logs. If set to true, Coordinator will periodically remove audit logs from the audit table entries in metadata storage.| No | False| 
+|`druid.coordinator.kill.audit.period`| How often to do automatic deletion of audit logs in [ISO 8601](https://en.wikipedia.org/wiki/ISO_8601) duration format. Value must be greater than `druid.coordinator.period.metadataStoreManagementPeriod`. Only applies if `druid.coordinator.kill.audit.on` is set to True.| No| `P1D`|
+|`druid.coordinator.kill.audit.durationToRetain`| Duration of audit logs to be retained from created time in [ISO 8601](https://en.wikipedia.org/wiki/ISO_8601) duration format. Only applies if `druid.coordinator.kill.audit.on` is set to True.| Yes if `druid.coordinator.kill.audit.on` is set to True| None|
+
 ##### Segment Management
 |Property|Possible Values|Description|Default|
 |--------|---------------|-----------|-------|
diff --git a/docs/operations/metrics.md b/docs/operations/metrics.md
index 9a1d2c7..5a2d5fb 100644
--- a/docs/operations/metrics.md
+++ b/docs/operations/metrics.md
@@ -256,6 +256,8 @@
 |`interval/skipCompact/count`|Total number of intervals of this datasource that are skipped (not eligible for auto compaction) by the auto compaction.|datasource.|Varies.|
 |`coordinator/time`|Approximate Coordinator duty runtime in milliseconds. The duty dimension is the string alias of the Duty that is being run.|duty.|Varies.|
 |`coordinator/global/time`|Approximate runtime of a full coordination cycle in milliseconds. The `dutyGroup` dimension indicates what type of coordination this run was. i.e. Historical Management vs Indexing|`dutyGroup`|Varies.|
+|`metadata/kill/audit/count`|Total number of audit logs automatically deleted from metadata store audit table per each Coordinator kill audit duty run. This metric can help adjust `druid.coordinator.kill.audit.durationToRetain` configuration based on if more or less audit logs need to be deleted per cycle. Note that this metric is only emitted when `druid.coordinator.kill.audit.on` is set to true.| |Varies.|
+
 
 If `emitBalancingStats` is set to `true` in the Coordinator [dynamic configuration](../configuration/index.md#dynamic-configuration), then [log entries](../configuration/logging.md) for class
 `org.apache.druid.server.coordinator.duty.EmitClusterStatsAndMetrics` will have extra information on balancing
diff --git a/server/src/main/java/org/apache/druid/guice/annotations/CoordinatorMetadataStoreManagementDuty.java b/server/src/main/java/org/apache/druid/guice/annotations/CoordinatorMetadataStoreManagementDuty.java
new file mode 100644
index 0000000..47cde2b
--- /dev/null
+++ b/server/src/main/java/org/apache/druid/guice/annotations/CoordinatorMetadataStoreManagementDuty.java
@@ -0,0 +1,36 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.druid.guice.annotations;
+
+import com.google.inject.BindingAnnotation;
+
+import java.lang.annotation.ElementType;
+import java.lang.annotation.Retention;
+import java.lang.annotation.RetentionPolicy;
+import java.lang.annotation.Target;
+
+/**
+ */
+@BindingAnnotation
+@Target({ElementType.FIELD, ElementType.PARAMETER, ElementType.METHOD})
+@Retention(RetentionPolicy.RUNTIME)
+public @interface CoordinatorMetadataStoreManagementDuty
+{
+}
diff --git a/server/src/main/java/org/apache/druid/server/audit/SQLAuditManager.java b/server/src/main/java/org/apache/druid/server/audit/SQLAuditManager.java
index 1302043..91a5a21 100644
--- a/server/src/main/java/org/apache/druid/server/audit/SQLAuditManager.java
+++ b/server/src/main/java/org/apache/druid/server/audit/SQLAuditManager.java
@@ -41,6 +41,7 @@
 import org.skife.jdbi.v2.Handle;
 import org.skife.jdbi.v2.IDBI;
 import org.skife.jdbi.v2.Query;
+import org.skife.jdbi.v2.Update;
 import org.skife.jdbi.v2.tweak.HandleCallback;
 
 import java.io.IOException;
@@ -229,6 +230,24 @@
     return fetchAuditHistoryLastEntries(null, type, limit);
   }
 
+  @Override
+  public int removeAuditLogsOlderThan(final long timestamp)
+  {
+    DateTime dateTime = DateTimes.utc(timestamp);
+    return dbi.withHandle(
+        handle -> {
+          Update sql = handle.createStatement(
+              StringUtils.format(
+                  "DELETE FROM %s WHERE created_date < :date_time",
+                  getAuditTable()
+              )
+          );
+          return sql.bind("date_time", dateTime.toString())
+                    .execute();
+        }
+    );
+  }
+
   private List<AuditEntry> fetchAuditHistoryLastEntries(final String key, final String type, int limit)
       throws IllegalArgumentException
   {
diff --git a/server/src/main/java/org/apache/druid/server/coordinator/DruidCoordinator.java b/server/src/main/java/org/apache/druid/server/coordinator/DruidCoordinator.java
index 17a3794..7da4fe6 100644
--- a/server/src/main/java/org/apache/druid/server/coordinator/DruidCoordinator.java
+++ b/server/src/main/java/org/apache/druid/server/coordinator/DruidCoordinator.java
@@ -47,6 +47,7 @@
 import org.apache.druid.discovery.DruidLeaderSelector;
 import org.apache.druid.guice.ManageLifecycle;
 import org.apache.druid.guice.annotations.CoordinatorIndexingServiceDuty;
+import org.apache.druid.guice.annotations.CoordinatorMetadataStoreManagementDuty;
 import org.apache.druid.guice.annotations.Self;
 import org.apache.druid.java.util.common.DateTimes;
 import org.apache.druid.java.util.common.IAE;
@@ -150,6 +151,7 @@
   private final ServiceAnnouncer serviceAnnouncer;
   private final DruidNode self;
   private final Set<CoordinatorDuty> indexingServiceDuties;
+  private final Set<CoordinatorDuty> metadataStoreManagementDuties;
   private final BalancerStrategyFactory factory;
   private final LookupCoordinatorManager lookupCoordinatorManager;
   private final DruidLeaderSelector coordLeaderSelector;
@@ -164,6 +166,7 @@
   private ListeningExecutorService balancerExec;
 
   private static final String HISTORICAL_MANAGEMENT_DUTIES_DUTY_GROUP = "HistoricalManagementDuties";
+  private static final String METADATA_STORE_MANAGEMENT_DUTIES_DUTY_GROUP = "MetadataStoreManagementDuties";
   private static final String INDEXING_SERVICE_DUTIES_DUTY_GROUP = "IndexingServiceDuties";
   private static final String COMPACT_SEGMENTS_DUTIES_DUTY_GROUP = "CompactSegmentsDuties";
 
@@ -182,6 +185,7 @@
       LoadQueueTaskMaster taskMaster,
       ServiceAnnouncer serviceAnnouncer,
       @Self DruidNode self,
+      @CoordinatorMetadataStoreManagementDuty Set<CoordinatorDuty> metadataStoreManagementDuties,
       @CoordinatorIndexingServiceDuty Set<CoordinatorDuty> indexingServiceDuties,
       BalancerStrategyFactory factory,
       LookupCoordinatorManager lookupCoordinatorManager,
@@ -206,6 +210,7 @@
         self,
         new ConcurrentHashMap<>(),
         indexingServiceDuties,
+        metadataStoreManagementDuties,
         factory,
         lookupCoordinatorManager,
         coordLeaderSelector,
@@ -230,6 +235,7 @@
       DruidNode self,
       ConcurrentMap<String, LoadQueuePeon> loadQueuePeonMap,
       Set<CoordinatorDuty> indexingServiceDuties,
+      Set<CoordinatorDuty> metadataStoreManagementDuties,
       BalancerStrategyFactory factory,
       LookupCoordinatorManager lookupCoordinatorManager,
       DruidLeaderSelector coordLeaderSelector,
@@ -255,6 +261,7 @@
     this.serviceAnnouncer = serviceAnnouncer;
     this.self = self;
     this.indexingServiceDuties = indexingServiceDuties;
+    this.metadataStoreManagementDuties = metadataStoreManagementDuties;
 
     this.exec = scheduledExecutorFactory.create(1, "Coordinator-Exec--%d");
 
@@ -665,6 +672,12 @@
             )
         );
       }
+      dutiesRunnables.add(
+          Pair.of(
+              new DutiesRunnable(makeMetadataStoreManagementDuties(), startingLeaderCounter, METADATA_STORE_MANAGEMENT_DUTIES_DUTY_GROUP),
+              config.getCoordinatorMetadataStoreManagementPeriod()
+          )
+      );
 
       for (final Pair<? extends DutiesRunnable, Duration> dutiesRunnable : dutiesRunnables) {
         // CompactSegmentsDuty can takes a non trival amount of time to complete.
@@ -750,6 +763,19 @@
     return ImmutableList.copyOf(duties);
   }
 
+  private List<CoordinatorDuty> makeMetadataStoreManagementDuties()
+  {
+    List<CoordinatorDuty> duties = ImmutableList.<CoordinatorDuty>builder()
+                                                .addAll(metadataStoreManagementDuties)
+                                                .build();
+
+    log.debug(
+        "Done making metadata store management duties %s",
+        duties.stream().map(duty -> duty.getClass().getName()).collect(Collectors.toList())
+    );
+    return ImmutableList.copyOf(duties);
+  }
+
   private List<CoordinatorDuty> makeCompactSegmentsDuty()
   {
     return ImmutableList.of(compactSegments);
diff --git a/server/src/main/java/org/apache/druid/server/coordinator/DruidCoordinatorConfig.java b/server/src/main/java/org/apache/druid/server/coordinator/DruidCoordinatorConfig.java
index 130d7e8..933b974 100644
--- a/server/src/main/java/org/apache/druid/server/coordinator/DruidCoordinatorConfig.java
+++ b/server/src/main/java/org/apache/druid/server/coordinator/DruidCoordinatorConfig.java
@@ -39,6 +39,10 @@
   @Default("PT1800s")
   public abstract Duration getCoordinatorIndexingPeriod();
 
+  @Config("druid.coordinator.period.metadataStoreManagementPeriod")
+  @Default("PT1H")
+  public abstract Duration getCoordinatorMetadataStoreManagementPeriod();
+
   @Config("druid.coordinator.kill.period")
   @Default("P1D")
   public abstract Duration getCoordinatorKillPeriod();
@@ -51,6 +55,14 @@
   @Default("0")
   public abstract int getCoordinatorKillMaxSegments();
 
+  @Config("druid.coordinator.kill.audit.period")
+  @Default("P1D")
+  public abstract Duration getCoordinatorAuditKillPeriod();
+
+  @Config("druid.coordinator.kill.audit.durationToRetain")
+  @Default("PT-1s")
+  public abstract Duration getCoordinatorAuditKillDurationToRetain();
+
   @Config("druid.coordinator.load.timeout")
   public Duration getLoadTimeoutDelay()
   {
diff --git a/server/src/main/java/org/apache/druid/server/coordinator/duty/KillAuditLog.java b/server/src/main/java/org/apache/druid/server/coordinator/duty/KillAuditLog.java
new file mode 100644
index 0000000..651cf6b
--- /dev/null
+++ b/server/src/main/java/org/apache/druid/server/coordinator/duty/KillAuditLog.java
@@ -0,0 +1,81 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.druid.server.coordinator.duty;
+
+import com.google.common.base.Preconditions;
+import com.google.inject.Inject;
+import org.apache.druid.audit.AuditManager;
+import org.apache.druid.java.util.common.logger.Logger;
+import org.apache.druid.java.util.emitter.service.ServiceEmitter;
+import org.apache.druid.java.util.emitter.service.ServiceMetricEvent;
+import org.apache.druid.server.coordinator.DruidCoordinatorConfig;
+import org.apache.druid.server.coordinator.DruidCoordinatorRuntimeParams;
+
+public class KillAuditLog implements CoordinatorDuty
+{
+  private static final Logger log = new Logger(KillAuditLog.class);
+
+  private final long period;
+  private final long retainDuration;
+  private long lastKillTime = 0;
+
+  private final AuditManager auditManager;
+
+  @Inject
+  public KillAuditLog(
+      AuditManager auditManager,
+      DruidCoordinatorConfig config
+  )
+  {
+    this.period = config.getCoordinatorAuditKillPeriod().getMillis();
+    Preconditions.checkArgument(
+        this.period >= config.getCoordinatorMetadataStoreManagementPeriod().getMillis(),
+        "coordinator audit kill period must be >= druid.coordinator.period.metadataStoreManagementPeriod"
+    );
+    this.retainDuration = config.getCoordinatorAuditKillDurationToRetain().getMillis();
+    Preconditions.checkArgument(this.retainDuration >= 0, "coordinator audit kill retainDuration must be >= 0");
+    log.debug(
+        "Audit Kill Task scheduling enabled with period [%s], retainDuration [%s]",
+        this.period,
+        this.retainDuration
+    );
+    this.auditManager = auditManager;
+  }
+
+  @Override
+  public DruidCoordinatorRuntimeParams run(DruidCoordinatorRuntimeParams params)
+  {
+    if ((lastKillTime + period) < System.currentTimeMillis()) {
+      lastKillTime = System.currentTimeMillis();
+
+      long timestamp = System.currentTimeMillis() - retainDuration;
+      int auditRemoved = auditManager.removeAuditLogsOlderThan(timestamp);
+      ServiceEmitter emitter = params.getEmitter();
+      emitter.emit(
+          new ServiceMetricEvent.Builder().build(
+              "metadata/kill/audit/count",
+              auditRemoved
+          )
+      );
+      log.info("Finished running KillAuditLog duty. Removed %,d audit logs", auditRemoved);
+    }
+    return params;
+  }
+}
diff --git a/server/src/test/java/org/apache/druid/server/audit/SQLAuditManagerTest.java b/server/src/test/java/org/apache/druid/server/audit/SQLAuditManagerTest.java
index 429402e..d336e84 100644
--- a/server/src/test/java/org/apache/druid/server/audit/SQLAuditManagerTest.java
+++ b/server/src/test/java/org/apache/druid/server/audit/SQLAuditManagerTest.java
@@ -128,15 +128,15 @@
   public void testAuditMetricEventBuilderConfig()
   {
     AuditEntry entry = new AuditEntry(
-            "testKey",
-            "testType",
-            new AuditInfo(
-                    "testAuthor",
-                    "testComment",
-                    "127.0.0.1"
-            ),
-            "testPayload",
-            DateTimes.of("2013-01-01T00:00:00Z")
+        "testKey",
+        "testType",
+        new AuditInfo(
+            "testAuthor",
+            "testComment",
+            "127.0.0.1"
+        ),
+        "testPayload",
+        DateTimes.of("2013-01-01T00:00:00Z")
     );
 
     SQLAuditManager auditManagerWithPayloadAsDimension = new SQLAuditManager(
@@ -257,6 +257,83 @@
   }
 
   @Test(timeout = 60_000L)
+  public void testRemoveAuditLogsOlderThanWithEntryOlderThanTime() throws IOException
+  {
+    String entry1Key = "testKey";
+    String entry1Type = "testType";
+    AuditInfo entry1AuditInfo = new AuditInfo(
+        "testAuthor",
+        "testComment",
+        "127.0.0.1"
+    );
+    String entry1Payload = "testPayload";
+
+    auditManager.doAudit(entry1Key, entry1Type, entry1AuditInfo, entry1Payload, stringConfigSerde);
+    byte[] payload = connector.lookup(
+        derbyConnectorRule.metadataTablesConfigSupplier().get().getAuditTable(),
+        "audit_key",
+        "payload",
+        "testKey"
+    );
+    AuditEntry dbEntry = mapper.readValue(payload, AuditEntry.class);
+    Assert.assertEquals(entry1Key, dbEntry.getKey());
+    Assert.assertEquals(entry1Payload, dbEntry.getPayload());
+    Assert.assertEquals(entry1Type, dbEntry.getType());
+    Assert.assertEquals(entry1AuditInfo, dbEntry.getAuditInfo());
+
+    // Do delete
+    auditManager.removeAuditLogsOlderThan(System.currentTimeMillis());
+    // Verify the delete
+    payload = connector.lookup(
+        derbyConnectorRule.metadataTablesConfigSupplier().get().getAuditTable(),
+        "audit_key",
+        "payload",
+        "testKey"
+    );
+    Assert.assertNull(payload);
+  }
+
+  @Test(timeout = 60_000L)
+  public void testRemoveAuditLogsOlderThanWithEntryNotOlderThanTime() throws IOException
+  {
+    String entry1Key = "testKey";
+    String entry1Type = "testType";
+    AuditInfo entry1AuditInfo = new AuditInfo(
+        "testAuthor",
+        "testComment",
+        "127.0.0.1"
+    );
+    String entry1Payload = "testPayload";
+
+    auditManager.doAudit(entry1Key, entry1Type, entry1AuditInfo, entry1Payload, stringConfigSerde);
+    byte[] payload = connector.lookup(
+        derbyConnectorRule.metadataTablesConfigSupplier().get().getAuditTable(),
+        "audit_key",
+        "payload",
+        "testKey"
+    );
+    AuditEntry dbEntry = mapper.readValue(payload, AuditEntry.class);
+    Assert.assertEquals(entry1Key, dbEntry.getKey());
+    Assert.assertEquals(entry1Payload, dbEntry.getPayload());
+    Assert.assertEquals(entry1Type, dbEntry.getType());
+    Assert.assertEquals(entry1AuditInfo, dbEntry.getAuditInfo());
+    // Do delete
+    auditManager.removeAuditLogsOlderThan(DateTimes.of("2012-01-01T00:00:00Z").getMillis());
+    // Verify that entry was not delete
+    payload = connector.lookup(
+        derbyConnectorRule.metadataTablesConfigSupplier().get().getAuditTable(),
+        "audit_key",
+        "payload",
+        "testKey"
+    );
+    dbEntry = mapper.readValue(payload, AuditEntry.class);
+    Assert.assertEquals(entry1Key, dbEntry.getKey());
+    Assert.assertEquals(entry1Payload, dbEntry.getPayload());
+    Assert.assertEquals(entry1Type, dbEntry.getType());
+    Assert.assertEquals(entry1AuditInfo, dbEntry.getAuditInfo());
+  }
+
+  @Test(timeout = 60_000L)
   public void testFetchAuditHistoryByTypeWithLimit()
   {
     String entry1Key = "testKey";
diff --git a/server/src/test/java/org/apache/druid/server/coordinator/CuratorDruidCoordinatorTest.java b/server/src/test/java/org/apache/druid/server/coordinator/CuratorDruidCoordinatorTest.java
index dd382e3..a8deb66 100644
--- a/server/src/test/java/org/apache/druid/server/coordinator/CuratorDruidCoordinatorTest.java
+++ b/server/src/test/java/org/apache/druid/server/coordinator/CuratorDruidCoordinatorTest.java
@@ -169,8 +169,11 @@
         new Duration(COORDINATOR_PERIOD),
         null,
         null,
+        null,
         new Duration(COORDINATOR_PERIOD),
         null,
+        null,
+        null,
         10,
         new Duration("PT0s")
     );
@@ -247,6 +250,7 @@
         druidNode,
         loadManagementPeons,
         null,
+        null,
         new CostBalancerStrategyFactory(),
         EasyMock.createNiceMock(LookupCoordinatorManager.class),
         new TestDruidLeaderSelector(),
@@ -546,6 +550,7 @@
         druidNode,
         loadManagementPeons,
         null,
+        null,
         new CostBalancerStrategyFactory(),
         EasyMock.createNiceMock(LookupCoordinatorManager.class),
         new TestDruidLeaderSelector(),
diff --git a/server/src/test/java/org/apache/druid/server/coordinator/DruidCoordinatorTest.java b/server/src/test/java/org/apache/druid/server/coordinator/DruidCoordinatorTest.java
index 36f1bf6..3aaef64 100644
--- a/server/src/test/java/org/apache/druid/server/coordinator/DruidCoordinatorTest.java
+++ b/server/src/test/java/org/apache/druid/server/coordinator/DruidCoordinatorTest.java
@@ -141,8 +141,11 @@
         new Duration(COORDINATOR_PERIOD),
         null,
         null,
+        null,
         new Duration(COORDINATOR_PERIOD),
         null,
+        null,
+        null,
         10,
         new Duration("PT0s")
     );
@@ -212,6 +215,7 @@
         druidNode,
         loadManagementPeons,
         null,
+        new HashSet<>(),
         new CostBalancerStrategyFactory(),
         EasyMock.createNiceMock(LookupCoordinatorManager.class),
         new TestDruidLeaderSelector(),
@@ -738,6 +742,7 @@
         null,
         null,
         null,
+        null,
         ZkEnablementConfig.ENABLED
     );
 
diff --git a/server/src/test/java/org/apache/druid/server/coordinator/HttpLoadQueuePeonTest.java b/server/src/test/java/org/apache/druid/server/coordinator/HttpLoadQueuePeonTest.java
index 10d7ba2..7b07d7f 100644
--- a/server/src/test/java/org/apache/druid/server/coordinator/HttpLoadQueuePeonTest.java
+++ b/server/src/test/java/org/apache/druid/server/coordinator/HttpLoadQueuePeonTest.java
@@ -80,6 +80,9 @@
       null,
       null,
       null,
+      null,
+      null,
+      null,
       10,
       Duration.ZERO
   )
diff --git a/server/src/test/java/org/apache/druid/server/coordinator/LoadQueuePeonTest.java b/server/src/test/java/org/apache/druid/server/coordinator/LoadQueuePeonTest.java
index 4a2a25f..24e6806 100644
--- a/server/src/test/java/org/apache/druid/server/coordinator/LoadQueuePeonTest.java
+++ b/server/src/test/java/org/apache/druid/server/coordinator/LoadQueuePeonTest.java
@@ -95,6 +95,9 @@
             null,
             null,
             null,
+            null,
+            null,
+            null,
             10,
             Duration.millis(0)
         )
@@ -287,9 +290,12 @@
             null,
             null,
             null,
+            null,
             new Duration(1),
             null,
             null,
+            null,
+            null,
             10,
             new Duration("PT1s")
         )
@@ -339,9 +345,12 @@
             null,
             null,
             null,
+            null,
             new Duration(1),
             null,
             null,
+            null,
+            null,
             10,
             new Duration("PT1s")
         )
diff --git a/server/src/test/java/org/apache/druid/server/coordinator/LoadQueuePeonTester.java b/server/src/test/java/org/apache/druid/server/coordinator/LoadQueuePeonTester.java
index fb92873..5185452 100644
--- a/server/src/test/java/org/apache/druid/server/coordinator/LoadQueuePeonTester.java
+++ b/server/src/test/java/org/apache/druid/server/coordinator/LoadQueuePeonTester.java
@@ -41,9 +41,12 @@
             null,
             null,
             null,
+            null,
             new Duration(1),
             null,
             null,
+            null,
+            null,
             10,
             new Duration("PT1s")
         )
diff --git a/server/src/test/java/org/apache/druid/server/coordinator/TestDruidCoordinatorConfig.java b/server/src/test/java/org/apache/druid/server/coordinator/TestDruidCoordinatorConfig.java
index fef244a..135f8d0 100644
--- a/server/src/test/java/org/apache/druid/server/coordinator/TestDruidCoordinatorConfig.java
+++ b/server/src/test/java/org/apache/druid/server/coordinator/TestDruidCoordinatorConfig.java
@@ -27,9 +27,12 @@
   private final Duration coordinatorStartDelay;
   private final Duration coordinatorPeriod;
   private final Duration coordinatorIndexingPeriod;
+  private final Duration metadataStoreManagementPeriod;
   private final Duration loadTimeoutDelay;
   private final Duration coordinatorKillPeriod;
   private final Duration coordinatorKillDurationToRetain;
+  private final Duration coordinatorAuditKillPeriod;
+  private final Duration coordinatorAuditKillDurationToRetain;
   private final Duration getLoadQueuePeonRepeatDelay;
   private final int coordinatorKillMaxSegments;
 
@@ -37,9 +40,12 @@
       Duration coordinatorStartDelay,
       Duration coordinatorPeriod,
       Duration coordinatorIndexingPeriod,
+      Duration metadataStoreManagementPeriod,
       Duration loadTimeoutDelay,
       Duration coordinatorKillPeriod,
       Duration coordinatorKillDurationToRetain,
+      Duration coordinatorAuditKillPeriod,
+      Duration coordinatorAuditKillDurationToRetain,
       int coordinatorKillMaxSegments,
       Duration getLoadQueuePeonRepeatDelay
   )
@@ -47,9 +53,12 @@
     this.coordinatorStartDelay = coordinatorStartDelay;
     this.coordinatorPeriod = coordinatorPeriod;
     this.coordinatorIndexingPeriod = coordinatorIndexingPeriod;
+    this.metadataStoreManagementPeriod = metadataStoreManagementPeriod;
     this.loadTimeoutDelay = loadTimeoutDelay;
     this.coordinatorKillPeriod = coordinatorKillPeriod;
     this.coordinatorKillDurationToRetain = coordinatorKillDurationToRetain;
+    this.coordinatorAuditKillPeriod = coordinatorAuditKillPeriod;
+    this.coordinatorAuditKillDurationToRetain = coordinatorAuditKillDurationToRetain;
     this.coordinatorKillMaxSegments = coordinatorKillMaxSegments;
     this.getLoadQueuePeonRepeatDelay = getLoadQueuePeonRepeatDelay;
   }
@@ -73,6 +82,12 @@
   }
 
   @Override
+  public Duration getCoordinatorMetadataStoreManagementPeriod()
+  {
+    return metadataStoreManagementPeriod;
+  }
+
+  @Override
   public Duration getCoordinatorKillPeriod()
   {
     return coordinatorKillPeriod;
@@ -85,6 +100,18 @@
   }
 
   @Override
+  public Duration getCoordinatorAuditKillPeriod()
+  {
+    return coordinatorAuditKillPeriod;
+  }
+
+  @Override
+  public Duration getCoordinatorAuditKillDurationToRetain()
+  {
+    return coordinatorAuditKillDurationToRetain;
+  }
+
+  @Override
   public int getCoordinatorKillMaxSegments()
   {
     return coordinatorKillMaxSegments;
diff --git a/server/src/test/java/org/apache/druid/server/coordinator/duty/KillAuditLogTest.java b/server/src/test/java/org/apache/druid/server/coordinator/duty/KillAuditLogTest.java
new file mode 100644
index 0000000..b0f273c
--- /dev/null
+++ b/server/src/test/java/org/apache/druid/server/coordinator/duty/KillAuditLogTest.java
@@ -0,0 +1,139 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.druid.server.coordinator.duty;
+
+import org.apache.druid.audit.AuditManager;
+import org.apache.druid.java.util.emitter.service.ServiceEmitter;
+import org.apache.druid.java.util.emitter.service.ServiceEventBuilder;
+import org.apache.druid.server.coordinator.DruidCoordinatorRuntimeParams;
+import org.apache.druid.server.coordinator.TestDruidCoordinatorConfig;
+import org.joda.time.Duration;
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.rules.ExpectedException;
+import org.junit.runner.RunWith;
+import org.mockito.ArgumentMatchers;
+import org.mockito.Mock;
+import org.mockito.Mockito;
+import org.mockito.junit.MockitoJUnitRunner;
+
+@RunWith(MockitoJUnitRunner.class)
+public class KillAuditLogTest
+{
+  @Mock
+  private AuditManager mockAuditManager;
+
+  @Mock
+  private DruidCoordinatorRuntimeParams mockDruidCoordinatorRuntimeParams;
+
+  @Mock
+  private ServiceEmitter mockServiceEmitter;
+
+  @Rule
+  public ExpectedException exception = ExpectedException.none();
+
+  private KillAuditLog killAuditLog;
+
+  @Test
+  public void testRunSkipIfLastRunLessThanPeriod()
+  {
+    TestDruidCoordinatorConfig druidCoordinatorConfig = new TestDruidCoordinatorConfig(
+        null,
+        null,
+        null,
+        new Duration("PT5S"),
+        null,
+        null,
+        null,
+        new Duration(Long.MAX_VALUE),
+        new Duration("PT1S"),
+        10,
+        null
+    );
+    killAuditLog = new KillAuditLog(mockAuditManager, druidCoordinatorConfig);
+    killAuditLog.run(mockDruidCoordinatorRuntimeParams);
+    Mockito.verifyZeroInteractions(mockAuditManager);
+  }
+
+  @Test
+  public void testRunNotSkipIfLastRunMoreThanPeriod()
+  {
+    Mockito.when(mockDruidCoordinatorRuntimeParams.getEmitter()).thenReturn(mockServiceEmitter);
+    TestDruidCoordinatorConfig druidCoordinatorConfig = new TestDruidCoordinatorConfig(
+        null,
+        null,
+        null,
+        new Duration("PT5S"),
+        null,
+        null,
+        null,
+        new Duration("PT6S"),
+        new Duration("PT1S"),
+        10,
+        null
+    );
+    killAuditLog = new KillAuditLog(mockAuditManager, druidCoordinatorConfig);
+    killAuditLog.run(mockDruidCoordinatorRuntimeParams);
+    Mockito.verify(mockAuditManager).removeAuditLogsOlderThan(ArgumentMatchers.anyLong());
+    Mockito.verify(mockServiceEmitter).emit(ArgumentMatchers.any(ServiceEventBuilder.class));
+  }
+
+  @Test
+  public void testConstructorFailIfInvalidPeriod()
+  {
+    TestDruidCoordinatorConfig druidCoordinatorConfig = new TestDruidCoordinatorConfig(
+        null,
+        null,
+        null,
+        new Duration("PT5S"),
+        null,
+        null,
+        null,
+        new Duration("PT3S"),
+        new Duration("PT1S"),
+        10,
+        null
+    );
+    exception.expect(IllegalArgumentException.class);
+    exception.expectMessage("coordinator audit kill period must be >= druid.coordinator.period.metadataStoreManagementPeriod");
+    killAuditLog = new KillAuditLog(mockAuditManager, druidCoordinatorConfig);
+  }
+
+  @Test
+  public void testConstructorFailIfInvalidRetainDuration()
+  {
+    TestDruidCoordinatorConfig druidCoordinatorConfig = new TestDruidCoordinatorConfig(
+        null,
+        null,
+        null,
+        new Duration("PT5S"),
+        null,
+        null,
+        null,
+        new Duration("PT6S"),
+        new Duration("PT-1S"),
+        10,
+        null
+    );
+    exception.expect(IllegalArgumentException.class);
+    exception.expectMessage("coordinator audit kill retainDuration must be >= 0");
+    killAuditLog = new KillAuditLog(mockAuditManager, druidCoordinatorConfig);
+  }
+}
diff --git a/server/src/test/java/org/apache/druid/server/coordinator/duty/KillUnusedSegmentsTest.java b/server/src/test/java/org/apache/druid/server/coordinator/duty/KillUnusedSegmentsTest.java
index 38aa78e..cd76666 100644
--- a/server/src/test/java/org/apache/druid/server/coordinator/duty/KillUnusedSegmentsTest.java
+++ b/server/src/test/java/org/apache/druid/server/coordinator/duty/KillUnusedSegmentsTest.java
@@ -105,9 +105,12 @@
             null,
             null,
             Duration.parse("PT76400S"),
+            null,
             new Duration(1),
             Duration.parse("PT86400S"),
             Duration.parse("PT86400S"),
+            null,
+            null,
             1000,
             Duration.ZERO
         )
diff --git a/services/src/main/java/org/apache/druid/cli/CliCoordinator.java b/services/src/main/java/org/apache/druid/cli/CliCoordinator.java
index 1876a28..9e3c65b 100644
--- a/services/src/main/java/org/apache/druid/cli/CliCoordinator.java
+++ b/services/src/main/java/org/apache/druid/cli/CliCoordinator.java
@@ -46,6 +46,7 @@
 import org.apache.druid.guice.LifecycleModule;
 import org.apache.druid.guice.ManageLifecycle;
 import org.apache.druid.guice.annotations.CoordinatorIndexingServiceDuty;
+import org.apache.druid.guice.annotations.CoordinatorMetadataStoreManagementDuty;
 import org.apache.druid.guice.annotations.EscalatedGlobal;
 import org.apache.druid.guice.http.JettyHttpClientModule;
 import org.apache.druid.java.util.common.concurrent.Execs;
@@ -71,6 +72,7 @@
 import org.apache.druid.server.coordinator.KillStalePendingSegments;
 import org.apache.druid.server.coordinator.LoadQueueTaskMaster;
 import org.apache.druid.server.coordinator.duty.CoordinatorDuty;
+import org.apache.druid.server.coordinator.duty.KillAuditLog;
 import org.apache.druid.server.coordinator.duty.KillUnusedSegments;
 import org.apache.druid.server.http.ClusterResource;
 import org.apache.druid.server.http.CompactionResource;
@@ -214,14 +216,14 @@
             LifecycleModule.register(binder, Server.class);
             LifecycleModule.register(binder, DataSourcesResource.class);
 
-            final ConditionalMultibind<CoordinatorDuty> conditionalMultibind = ConditionalMultibind.create(
+            // Binding for Set of indexing service coordinator Ddty
+            final ConditionalMultibind<CoordinatorDuty> conditionalIndexingServiceDutyMultibind = ConditionalMultibind.create(
                 properties,
                 binder,
                 CoordinatorDuty.class,
                 CoordinatorIndexingServiceDuty.class
             );
-
-            if (conditionalMultibind.matchCondition("druid.coordinator.merge.on", Predicates.equalTo("true"))) {
+            if (conditionalIndexingServiceDutyMultibind.matchCondition("druid.coordinator.merge.on", Predicates.equalTo("true"))) {
               throw new UnsupportedOperationException(
                   "'druid.coordinator.merge.on' is not supported anymore. "
                   + "Please consider using Coordinator's automatic compaction instead. "
@@ -230,19 +232,31 @@
                   + "for more details about compaction."
               );
             }
-
-            conditionalMultibind.addConditionBinding(
+            conditionalIndexingServiceDutyMultibind.addConditionBinding(
                 "druid.coordinator.kill.on",
                 Predicates.equalTo("true"),
                 KillUnusedSegments.class
             );
-            conditionalMultibind.addConditionBinding(
+            conditionalIndexingServiceDutyMultibind.addConditionBinding(
                 "druid.coordinator.kill.pendingSegments.on",
                 "true",
                 Predicates.equalTo("true"),
                 KillStalePendingSegments.class
             );
 
+            // Binding for Set of metadata store management coordinator Ddty
+            final ConditionalMultibind<CoordinatorDuty> conditionalMetadataStoreManagementDutyMultibind = ConditionalMultibind.create(
+                properties,
+                binder,
+                CoordinatorDuty.class,
+                CoordinatorMetadataStoreManagementDuty.class
+            );
+            conditionalMetadataStoreManagementDutyMultibind.addConditionBinding(
+                "druid.coordinator.kill.audit.on",
+                Predicates.equalTo("true"),
+                KillAuditLog.class
+            );
+
             bindNodeRoleAndAnnouncer(
                 binder,
                 Coordinator.class,
diff --git a/website/.spelling b/website/.spelling
index bd2ecfa..315e5ff 100644
--- a/website/.spelling
+++ b/website/.spelling
@@ -1683,6 +1683,7 @@
 PT24H
 PT300S
 PT30S
+PT3600S
 PT5M
 PT5S
 PT60S