Add feature to automatically remove datasource metadata based on retention period (#11227)

* add auto clean up datasource metadata

* add test

* fix checkstyle

* add comments

* fix error

* address comments

* Address comments

* fix test

* fix test

* fix typo

* add comment

* fix test

* fix test
diff --git a/docs/configuration/index.md b/docs/configuration/index.md
index 986a448..fdb9365 100644
--- a/docs/configuration/index.md
+++ b/docs/configuration/index.md
@@ -756,6 +756,9 @@
 |`druid.coordinator.kill.rule.on`| Boolean value for whether to enable automatic deletion of rules. If set to true, Coordinator will periodically remove rules of inactive datasource (datasource with no used and unused segments) from the rule table in metadata storage.| No | False| 
 |`druid.coordinator.kill.rule.period`| How often to do automatic deletion of rules in [ISO 8601](https://en.wikipedia.org/wiki/ISO_8601) duration format. Value must be equal to or greater than  `druid.coordinator.period.metadataStoreManagementPeriod`. Only applies if `druid.coordinator.kill.rule.on` is set to "True".| No| `P1D`|
 |`druid.coordinator.kill.rule.durationToRetain`| Duration of rules to be retained from created time in [ISO 8601](https://en.wikipedia.org/wiki/ISO_8601) duration format. Only applies if `druid.coordinator.kill.rule.on` is set to "True".| Yes if `druid.coordinator.kill.rule.on` is set to "True".| None|
+|`druid.coordinator.kill.datasource.on`| Boolean value for whether to enable automatic deletion of datasource metadata (Note: datasource metadata only exists for datasource created from supervisor). If set to true, Coordinator will periodically remove datasource metadata of terminated supervisor from the datasource table in metadata storage.  | No | False| 
+|`druid.coordinator.kill.datasource.period`| How often to do automatic deletion of datasource metadata in [ISO 8601](https://en.wikipedia.org/wiki/ISO_8601) duration format. Value must be equal to or greater than  `druid.coordinator.period.metadataStoreManagementPeriod`. Only applies if `druid.coordinator.kill.datasource.on` is set to "True".| No| `P1D`|
+|`druid.coordinator.kill.datasource.durationToRetain`| Duration of datasource metadata to be retained from created time in [ISO 8601](https://en.wikipedia.org/wiki/ISO_8601) duration format. Only applies if `druid.coordinator.kill.datasource.on` is set to "True".| Yes if `druid.coordinator.kill.datasource.on` is set to "True".| None|
 
 ##### Segment Management
 |Property|Possible Values|Description|Default|
diff --git a/docs/operations/metrics.md b/docs/operations/metrics.md
index b68dcca..a4aa9ab 100644
--- a/docs/operations/metrics.md
+++ b/docs/operations/metrics.md
@@ -259,6 +259,7 @@
 |`metadata/kill/supervisor/count`|Total number of terminated supervisors that were automatically deleted from metadata store per each Coordinator kill supervisor duty run. This metric can help adjust `druid.coordinator.kill.supervisor.durationToRetain` configuration based on whether more or less terminated supervisors need to be deleted per cycle. Note that this metric is only emitted when `druid.coordinator.kill.supervisor.on` is set to true.| |Varies.|
 |`metadata/kill/audit/count`|Total number of audit logs that were automatically deleted from metadata store per each Coordinator kill audit duty run. This metric can help adjust `druid.coordinator.kill.audit.durationToRetain` configuration based on whether more or less audit logs need to be deleted per cycle. Note that this metric is only emitted when `druid.coordinator.kill.audit.on` is set to true.| |Varies.|
 |`metadata/kill/rule/count`|Total number of rules that were automatically deleted from metadata store per each Coordinator kill rule duty run. This metric can help adjust `druid.coordinator.kill.rule.durationToRetain` configuration based on whether more or less rules need to be deleted per cycle. Note that this metric is only emitted when `druid.coordinator.kill.rule.on` is set to true.| |Varies.|
+|`metadata/kill/datasource/count`|Total number of datasource metadata that were automatically deleted from metadata store per each Coordinator kill datasource duty run (Note: datasource metadata only exists for datasource created from supervisor). This metric can help adjust `druid.coordinator.kill.datasource.durationToRetain` configuration based on whether more or less datasource metadata need to be deleted per cycle. Note that this metric is only emitted when `druid.coordinator.kill.datasource.on` is set to true.| |Varies.|
 
 
 If `emitBalancingStats` is set to `true` in the Coordinator [dynamic configuration](../configuration/index.md#dynamic-configuration), then [log entries](../configuration/logging.md) for class
diff --git a/indexing-service/src/test/java/org/apache/druid/indexing/test/TestIndexerMetadataStorageCoordinator.java b/indexing-service/src/test/java/org/apache/druid/indexing/test/TestIndexerMetadataStorageCoordinator.java
index decaff8..fb274f3 100644
--- a/indexing-service/src/test/java/org/apache/druid/indexing/test/TestIndexerMetadataStorageCoordinator.java
+++ b/indexing-service/src/test/java/org/apache/druid/indexing/test/TestIndexerMetadataStorageCoordinator.java
@@ -34,6 +34,7 @@
 import org.apache.druid.timeline.partition.PartialShardSpec;
 import org.joda.time.Interval;
 
+import javax.annotation.Nullable;
 import java.util.ArrayList;
 import java.util.HashSet;
 import java.util.List;
@@ -140,6 +141,13 @@
   }
 
   @Override
+  public int removeDataSourceMetadataOlderThan(long timestamp, @Nullable Set<String> excludeDatasources)
+  {
+    throw new UnsupportedOperationException("Not implemented, no test uses this currently.");
+  }
+
+
+  @Override
   public SegmentIdWithShardSpec allocatePendingSegment(
       String dataSource,
       String sequenceName,
diff --git a/server/src/main/java/org/apache/druid/indexing/overlord/IndexerMetadataStorageCoordinator.java b/server/src/main/java/org/apache/druid/indexing/overlord/IndexerMetadataStorageCoordinator.java
index 3a1d257..513f5b9 100644
--- a/server/src/main/java/org/apache/druid/indexing/overlord/IndexerMetadataStorageCoordinator.java
+++ b/server/src/main/java/org/apache/druid/indexing/overlord/IndexerMetadataStorageCoordinator.java
@@ -26,6 +26,7 @@
 import org.joda.time.Interval;
 
 import javax.annotation.Nullable;
+import javax.validation.constraints.NotNull;
 import java.io.IOException;
 import java.util.Collection;
 import java.util.Collections;
@@ -268,6 +269,15 @@
   boolean insertDataSourceMetadata(String dataSource, DataSourceMetadata dataSourceMetadata);
 
   /**
+   * Remove datasource metadata created before the given timestamp and not in given excludeDatasources set.
+   *
+   * @param timestamp timestamp in milliseconds
+   * @param excludeDatasources set of datasource names to exclude from removal
+   * @return number of datasource metadata removed
+   */
+  int removeDataSourceMetadataOlderThan(long timestamp, @NotNull Set<String> excludeDatasources);
+
+  /**
    * Similar to {@link #announceHistoricalSegments(Set)}, but meant for streaming ingestion tasks for handling
    * the case where the task ingested no records and created no segments, but still needs to update the metadata
    * with the progress that the task made.
diff --git a/server/src/main/java/org/apache/druid/metadata/IndexerSQLMetadataStorageCoordinator.java b/server/src/main/java/org/apache/druid/metadata/IndexerSQLMetadataStorageCoordinator.java
index 05fcb56..46d6c0e 100644
--- a/server/src/main/java/org/apache/druid/metadata/IndexerSQLMetadataStorageCoordinator.java
+++ b/server/src/main/java/org/apache/druid/metadata/IndexerSQLMetadataStorageCoordinator.java
@@ -57,6 +57,7 @@
 import org.apache.druid.timeline.partition.PartitionChunk;
 import org.apache.druid.timeline.partition.PartitionIds;
 import org.apache.druid.timeline.partition.SingleDimensionShardSpec;
+import org.joda.time.DateTime;
 import org.joda.time.Interval;
 import org.joda.time.chrono.ISOChronology;
 import org.skife.jdbi.v2.Batch;
@@ -73,6 +74,7 @@
 import org.skife.jdbi.v2.util.ByteArrayMapper;
 
 import javax.annotation.Nullable;
+import javax.validation.constraints.NotNull;
 import java.io.IOException;
 import java.sql.ResultSet;
 import java.util.ArrayList;
@@ -1423,4 +1425,39 @@
             .execute()
     );
   }
+
+  @Override
+  public int removeDataSourceMetadataOlderThan(long timestamp, @NotNull Set<String> excludeDatasources)
+  {
+    DateTime dateTime = DateTimes.utc(timestamp);
+    List<String> datasourcesToDelete = connector.getDBI().withHandle(
+        handle -> handle
+            .createQuery(
+                StringUtils.format(
+                    "SELECT dataSource FROM %1$s WHERE created_date < '%2$s'",
+                    dbTables.getDataSourceTable(),
+                    dateTime.toString()
+                )
+            )
+            .mapTo(String.class)
+            .list()
+    );
+    datasourcesToDelete.removeAll(excludeDatasources);
+    return connector.getDBI().withHandle(
+        handle -> {
+          final PreparedBatch batch = handle.prepareBatch(
+              StringUtils.format(
+                  "DELETE FROM %1$s WHERE dataSource = :dataSource AND created_date < '%2$s'",
+                  dbTables.getDataSourceTable(),
+                  dateTime.toString()
+              )
+          );
+          for (String datasource : datasourcesToDelete) {
+            batch.bind("dataSource", datasource).add();
+          }
+          int[] result = batch.execute();
+          return IntStream.of(result).sum();
+        }
+    );
+  }
 }
diff --git a/server/src/main/java/org/apache/druid/metadata/MetadataSupervisorManager.java b/server/src/main/java/org/apache/druid/metadata/MetadataSupervisorManager.java
index 591dee2..9eb254f 100644
--- a/server/src/main/java/org/apache/druid/metadata/MetadataSupervisorManager.java
+++ b/server/src/main/java/org/apache/druid/metadata/MetadataSupervisorManager.java
@@ -33,9 +33,28 @@
 
   Map<String, List<VersionedSupervisorSpec>> getAll();
 
+  /**
+   * Return latest supervisors (both active and terminated)
+   *
+   * @return latest terminated supervisors
+   */
   Map<String, SupervisorSpec> getLatest();
 
   /**
+   * Only return the latest active supervisors
+   *
+   * @return latest active supervisors
+   */
+  Map<String, SupervisorSpec> getLatestActiveOnly();
+
+  /**
+   * Only return the latest terminated supervisors
+   *
+   * @return latest terminated supervisors
+   */
+  Map<String, SupervisorSpec> getLatestTerminatedOnly();
+
+  /**
    * Remove terminated supervisors created before the given timestamp.
    *
    * @param timestamp timestamp in milliseconds
diff --git a/server/src/main/java/org/apache/druid/metadata/SQLMetadataSupervisorManager.java b/server/src/main/java/org/apache/druid/metadata/SQLMetadataSupervisorManager.java
index e995a14..0480c45 100644
--- a/server/src/main/java/org/apache/druid/metadata/SQLMetadataSupervisorManager.java
+++ b/server/src/main/java/org/apache/druid/metadata/SQLMetadataSupervisorManager.java
@@ -254,10 +254,40 @@
   }
 
   @Override
+  public Map<String, SupervisorSpec> getLatestActiveOnly()
+  {
+    Map<String, SupervisorSpec> supervisors = getLatest();
+    Map<String, SupervisorSpec> activeSupervisors = new HashMap<>();
+    for (Map.Entry<String, SupervisorSpec> entry : supervisors.entrySet()) {
+      // Terminated supervisor will have it's latest supervisorSpec as NoopSupervisorSpec
+      // (NoopSupervisorSpec is used as a tombstone marker)
+      if (!(entry.getValue() instanceof NoopSupervisorSpec)) {
+        activeSupervisors.put(entry.getKey(), entry.getValue());
+      }
+    }
+    return ImmutableMap.copyOf(activeSupervisors);
+  }
+
+  @Override
+  public Map<String, SupervisorSpec> getLatestTerminatedOnly()
+  {
+    Map<String, SupervisorSpec> supervisors = getLatest();
+    Map<String, SupervisorSpec> activeSupervisors = new HashMap<>();
+    for (Map.Entry<String, SupervisorSpec> entry : supervisors.entrySet()) {
+      // Terminated supervisor will have it's latest supervisorSpec as NoopSupervisorSpec
+      // (NoopSupervisorSpec is used as a tombstone marker)
+      if (entry.getValue() instanceof NoopSupervisorSpec) {
+        activeSupervisors.put(entry.getKey(), entry.getValue());
+      }
+    }
+    return ImmutableMap.copyOf(activeSupervisors);
+  }
+
+  @Override
   public int removeTerminatedSupervisorsOlderThan(long timestamp)
   {
     DateTime dateTime = DateTimes.utc(timestamp);
-    Map<String, SupervisorSpec> supervisors = getLatest();
+    Map<String, SupervisorSpec> terminatedSupervisors = getLatestTerminatedOnly();
     return dbi.withHandle(
         handle -> {
           final PreparedBatch batch = handle.prepareBatch(
@@ -267,13 +297,8 @@
                   dateTime.toString()
               )
           );
-          for (Map.Entry<String, SupervisorSpec> supervisor : supervisors.entrySet()) {
-            final SupervisorSpec spec = supervisor.getValue();
-            // Terminated supervisor will have it's latest supervisorSpec as NoopSupervisorSpec
-            // (NoopSupervisorSpec is used as a tombstone marker)
-            if (spec instanceof NoopSupervisorSpec) {
-              batch.bind("spec_id", supervisor.getKey()).add();
-            }
+          for (Map.Entry<String, SupervisorSpec> supervisor : terminatedSupervisors.entrySet()) {
+            batch.bind("spec_id", supervisor.getKey()).add();
           }
           int[] result = batch.execute();
           return IntStream.of(result).sum();
diff --git a/server/src/main/java/org/apache/druid/server/coordinator/DruidCoordinatorConfig.java b/server/src/main/java/org/apache/druid/server/coordinator/DruidCoordinatorConfig.java
index 7985b5c..14da3c8 100644
--- a/server/src/main/java/org/apache/druid/server/coordinator/DruidCoordinatorConfig.java
+++ b/server/src/main/java/org/apache/druid/server/coordinator/DruidCoordinatorConfig.java
@@ -79,6 +79,14 @@
   @Default("PT-1s")
   public abstract Duration getCoordinatorRuleKillDurationToRetain();
 
+  @Config("druid.coordinator.kill.datasource.period")
+  @Default("P1D")
+  public abstract Duration getCoordinatorDatasourceKillPeriod();
+
+  @Config("druid.coordinator.kill.datasource.durationToRetain")
+  @Default("PT-1s")
+  public abstract Duration getCoordinatorDatasourceKillDurationToRetain();
+
   @Config("druid.coordinator.load.timeout")
   public Duration getLoadTimeoutDelay()
   {
diff --git a/server/src/main/java/org/apache/druid/server/coordinator/duty/KillAuditLog.java b/server/src/main/java/org/apache/druid/server/coordinator/duty/KillAuditLog.java
index 651cf6b..ac735c6 100644
--- a/server/src/main/java/org/apache/druid/server/coordinator/duty/KillAuditLog.java
+++ b/server/src/main/java/org/apache/druid/server/coordinator/duty/KillAuditLog.java
@@ -51,6 +51,7 @@
     );
     this.retainDuration = config.getCoordinatorAuditKillDurationToRetain().getMillis();
     Preconditions.checkArgument(this.retainDuration >= 0, "coordinator audit kill retainDuration must be >= 0");
+    Preconditions.checkArgument(this.retainDuration < System.currentTimeMillis(), "Coordinator audit kill retainDuration cannot be greater than current time in ms");
     log.debug(
         "Audit Kill Task scheduling enabled with period [%s], retainDuration [%s]",
         this.period,
@@ -62,19 +63,24 @@
   @Override
   public DruidCoordinatorRuntimeParams run(DruidCoordinatorRuntimeParams params)
   {
-    if ((lastKillTime + period) < System.currentTimeMillis()) {
-      lastKillTime = System.currentTimeMillis();
-
-      long timestamp = System.currentTimeMillis() - retainDuration;
-      int auditRemoved = auditManager.removeAuditLogsOlderThan(timestamp);
-      ServiceEmitter emitter = params.getEmitter();
-      emitter.emit(
-          new ServiceMetricEvent.Builder().build(
-              "metadata/kill/audit/count",
-              auditRemoved
-          )
-      );
-      log.info("Finished running KillAuditLog duty. Removed %,d audit logs", auditRemoved);
+    long currentTimeMillis = System.currentTimeMillis();
+    if ((lastKillTime + period) < currentTimeMillis) {
+      lastKillTime = currentTimeMillis;
+      long timestamp = currentTimeMillis - retainDuration;
+      try {
+        int auditRemoved = auditManager.removeAuditLogsOlderThan(timestamp);
+        ServiceEmitter emitter = params.getEmitter();
+        emitter.emit(
+            new ServiceMetricEvent.Builder().build(
+                "metadata/kill/audit/count",
+                auditRemoved
+            )
+        );
+        log.info("Finished running KillAuditLog duty. Removed %,d audit logs", auditRemoved);
+      }
+      catch (Exception e) {
+        log.error(e, "Failed to kill audit log");
+      }
     }
     return params;
   }
diff --git a/server/src/main/java/org/apache/druid/server/coordinator/duty/KillDatasourceMetadata.java b/server/src/main/java/org/apache/druid/server/coordinator/duty/KillDatasourceMetadata.java
new file mode 100644
index 0000000..9c9535b
--- /dev/null
+++ b/server/src/main/java/org/apache/druid/server/coordinator/duty/KillDatasourceMetadata.java
@@ -0,0 +1,121 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.druid.server.coordinator.duty;
+
+import com.google.common.base.Preconditions;
+import com.google.common.base.Strings;
+import com.google.inject.Inject;
+import org.apache.druid.indexing.overlord.IndexerMetadataStorageCoordinator;
+import org.apache.druid.indexing.overlord.supervisor.SupervisorSpec;
+import org.apache.druid.java.util.common.logger.Logger;
+import org.apache.druid.java.util.emitter.service.ServiceEmitter;
+import org.apache.druid.java.util.emitter.service.ServiceMetricEvent;
+import org.apache.druid.metadata.MetadataSupervisorManager;
+import org.apache.druid.server.coordinator.DruidCoordinatorConfig;
+import org.apache.druid.server.coordinator.DruidCoordinatorRuntimeParams;
+
+import java.util.Collection;
+import java.util.Map;
+import java.util.Set;
+import java.util.stream.Collectors;
+
+/**
+ * CoordinatorDuty for automatic deletion of datasource metadata from the datasource table in metadata storage.
+ * (Note: datasource metadata only exists for datasource created from supervisor).
+ * Note that this class relies on the supervisorSpec.getDataSources names to match with the
+ * 'datasource' column of the datasource metadata table.
+ */
+public class KillDatasourceMetadata implements CoordinatorDuty
+{
+  private static final Logger log = new Logger(KillDatasourceMetadata.class);
+
+  private final long period;
+  private final long retainDuration;
+  private long lastKillTime = 0;
+
+  private final IndexerMetadataStorageCoordinator indexerMetadataStorageCoordinator;
+  private final MetadataSupervisorManager metadataSupervisorManager;
+
+  @Inject
+  public KillDatasourceMetadata(
+      DruidCoordinatorConfig config,
+      IndexerMetadataStorageCoordinator indexerMetadataStorageCoordinator,
+      MetadataSupervisorManager metadataSupervisorManager
+  )
+  {
+    this.indexerMetadataStorageCoordinator = indexerMetadataStorageCoordinator;
+    this.metadataSupervisorManager = metadataSupervisorManager;
+    this.period = config.getCoordinatorDatasourceKillPeriod().getMillis();
+    Preconditions.checkArgument(
+        this.period >= config.getCoordinatorMetadataStoreManagementPeriod().getMillis(),
+        "Coordinator datasource metadata kill period must be >= druid.coordinator.period.metadataStoreManagementPeriod"
+    );
+    this.retainDuration = config.getCoordinatorDatasourceKillDurationToRetain().getMillis();
+    Preconditions.checkArgument(this.retainDuration >= 0, "Coordinator datasource metadata kill retainDuration must be >= 0");
+    Preconditions.checkArgument(this.retainDuration < System.currentTimeMillis(), "Coordinator datasource metadata kill retainDuration cannot be greater than current time in ms");
+    log.debug(
+        "Datasource Metadata Kill Task scheduling enabled with period [%s], retainDuration [%s]",
+        this.period,
+        this.retainDuration
+    );
+  }
+
+  @Override
+  public DruidCoordinatorRuntimeParams run(DruidCoordinatorRuntimeParams params)
+  {
+    long currentTimeMillis = System.currentTimeMillis();
+    if ((lastKillTime + period) < currentTimeMillis) {
+      lastKillTime = currentTimeMillis;
+      long timestamp = currentTimeMillis - retainDuration;
+      try {
+        // Datasource metadata only exists for datasource with supervisor
+        // To determine if datasource metadata is still active, we check if the supervisor for that particular datasource
+        // is still active or not
+        Map<String, SupervisorSpec> allActiveSupervisor = metadataSupervisorManager.getLatestActiveOnly();
+        Set<String> allDatasourceWithActiveSupervisor = allActiveSupervisor.values()
+                                                                           .stream()
+                                                                           .map(supervisorSpec -> supervisorSpec.getDataSources())
+                                                                           .flatMap(Collection::stream)
+                                                                           .filter(datasource -> !Strings.isNullOrEmpty(datasource))
+                                                                           .collect(Collectors.toSet());
+        // We exclude removing datasource metadata with active supervisor
+        int datasourceMetadataRemovedCount = indexerMetadataStorageCoordinator.removeDataSourceMetadataOlderThan(
+            timestamp,
+            allDatasourceWithActiveSupervisor
+        );
+        ServiceEmitter emitter = params.getEmitter();
+        emitter.emit(
+            new ServiceMetricEvent.Builder().build(
+                "metadata/kill/datasource/count",
+                datasourceMetadataRemovedCount
+            )
+        );
+        log.info(
+            "Finished running KillDatasourceMetadata duty. Removed %,d datasource metadata",
+            datasourceMetadataRemovedCount
+        );
+      }
+      catch (Exception e) {
+        log.error(e, "Failed to kill datasource metadata");
+      }
+    }
+    return params;
+  }
+}
diff --git a/server/src/main/java/org/apache/druid/server/coordinator/duty/KillRules.java b/server/src/main/java/org/apache/druid/server/coordinator/duty/KillRules.java
index eb4f018..50b1740 100644
--- a/server/src/main/java/org/apache/druid/server/coordinator/duty/KillRules.java
+++ b/server/src/main/java/org/apache/druid/server/coordinator/duty/KillRules.java
@@ -47,6 +47,7 @@
     );
     this.retainDuration = config.getCoordinatorRuleKillDurationToRetain().getMillis();
     Preconditions.checkArgument(this.retainDuration >= 0, "coordinator rule kill retainDuration must be >= 0");
+    Preconditions.checkArgument(this.retainDuration < System.currentTimeMillis(), "Coordinator rule kill retainDuration cannot be greater than current time in ms");
     log.debug(
         "Rule Kill Task scheduling enabled with period [%s], retainDuration [%s]",
         this.period,
@@ -57,19 +58,24 @@
   @Override
   public DruidCoordinatorRuntimeParams run(DruidCoordinatorRuntimeParams params)
   {
-    if ((lastKillTime + period) < System.currentTimeMillis()) {
-      lastKillTime = System.currentTimeMillis();
-
-      long timestamp = System.currentTimeMillis() - retainDuration;
-      int ruleRemoved = params.getDatabaseRuleManager().removeRulesForEmptyDatasourcesOlderThan(timestamp);
-      ServiceEmitter emitter = params.getEmitter();
-      emitter.emit(
-          new ServiceMetricEvent.Builder().build(
-              "metadata/kill/rule/count",
-              ruleRemoved
-          )
-      );
-      log.info("Finished running KillRules duty. Removed %,d rule", ruleRemoved);
+    long currentTimeMillis = System.currentTimeMillis();
+    if ((lastKillTime + period) < currentTimeMillis) {
+      lastKillTime = currentTimeMillis;
+      long timestamp = currentTimeMillis - retainDuration;
+      try {
+        int ruleRemoved = params.getDatabaseRuleManager().removeRulesForEmptyDatasourcesOlderThan(timestamp);
+        ServiceEmitter emitter = params.getEmitter();
+        emitter.emit(
+            new ServiceMetricEvent.Builder().build(
+                "metadata/kill/rule/count",
+                ruleRemoved
+            )
+        );
+        log.info("Finished running KillRules duty. Removed %,d rule", ruleRemoved);
+      }
+      catch (Exception e) {
+        log.error(e, "Failed to kill rules metadata");
+      }
     }
     return params;
   }
diff --git a/server/src/main/java/org/apache/druid/server/coordinator/duty/KillSupervisors.java b/server/src/main/java/org/apache/druid/server/coordinator/duty/KillSupervisors.java
index a1c2a3f..8a0b484 100644
--- a/server/src/main/java/org/apache/druid/server/coordinator/duty/KillSupervisors.java
+++ b/server/src/main/java/org/apache/druid/server/coordinator/duty/KillSupervisors.java
@@ -55,6 +55,7 @@
     );
     this.retainDuration = config.getCoordinatorSupervisorKillDurationToRetain().getMillis();
     Preconditions.checkArgument(this.retainDuration >= 0, "Coordinator supervisor kill retainDuration must be >= 0");
+    Preconditions.checkArgument(this.retainDuration < System.currentTimeMillis(), "Coordinator supervisor kill retainDuration cannot be greater than current time in ms");
     log.debug(
         "Supervisor Kill Task scheduling enabled with period [%s], retainDuration [%s]",
         this.period,
@@ -65,19 +66,24 @@
   @Override
   public DruidCoordinatorRuntimeParams run(DruidCoordinatorRuntimeParams params)
   {
-    if ((lastKillTime + period) < System.currentTimeMillis()) {
-      lastKillTime = System.currentTimeMillis();
-
-      long timestamp = System.currentTimeMillis() - retainDuration;
-      int supervisorRemoved = metadataSupervisorManager.removeTerminatedSupervisorsOlderThan(timestamp);
-      ServiceEmitter emitter = params.getEmitter();
-      emitter.emit(
-          new ServiceMetricEvent.Builder().build(
-              "metadata/kill/supervisor/count",
-              supervisorRemoved
-          )
-      );
-      log.info("Finished running KillSupervisors duty. Removed %,d supervisor", supervisorRemoved);
+    long currentTimeMillis = System.currentTimeMillis();
+    if ((lastKillTime + period) < currentTimeMillis) {
+      lastKillTime = currentTimeMillis;
+      long timestamp = currentTimeMillis - retainDuration;
+      try {
+        int supervisorRemoved = metadataSupervisorManager.removeTerminatedSupervisorsOlderThan(timestamp);
+        ServiceEmitter emitter = params.getEmitter();
+        emitter.emit(
+            new ServiceMetricEvent.Builder().build(
+                "metadata/kill/supervisor/count",
+                supervisorRemoved
+            )
+        );
+        log.info("Finished running KillSupervisors duty. Removed %,d supervisor", supervisorRemoved);
+      }
+      catch (Exception e) {
+        log.error(e, "Failed to kill terminated supervisor metadata");
+      }
     }
     return params;
   }
diff --git a/server/src/test/java/org/apache/druid/metadata/IndexerSQLMetadataStorageCoordinatorTest.java b/server/src/test/java/org/apache/druid/metadata/IndexerSQLMetadataStorageCoordinatorTest.java
index 924ad92..7acd90f 100644
--- a/server/src/test/java/org/apache/druid/metadata/IndexerSQLMetadataStorageCoordinatorTest.java
+++ b/server/src/test/java/org/apache/druid/metadata/IndexerSQLMetadataStorageCoordinatorTest.java
@@ -1471,4 +1471,82 @@
       Assert.assertEquals(IndexerSQLMetadataStorageCoordinator.DataStoreMetadataUpdateResult.TRY_AGAIN, result);
     }
   }
+
+  @Test
+  public void testRemoveDataSourceMetadataOlderThanDatasourceActiveShouldNotBeDeleted() throws Exception
+  {
+    coordinator.announceHistoricalSegments(
+        ImmutableSet.of(defaultSegment),
+        ImmutableSet.of(),
+        new ObjectMetadata(null),
+        new ObjectMetadata(ImmutableMap.of("foo", "bar"))
+    );
+
+    Assert.assertEquals(
+        new ObjectMetadata(ImmutableMap.of("foo", "bar")),
+        coordinator.retrieveDataSourceMetadata("fooDataSource")
+    );
+
+    // Try delete. Datasource should not be deleted as it is in excluded set
+    int deletedCount = coordinator.removeDataSourceMetadataOlderThan(System.currentTimeMillis(), ImmutableSet.of("fooDataSource"));
+
+    // Datasource should not be deleted
+    Assert.assertEquals(
+        new ObjectMetadata(ImmutableMap.of("foo", "bar")),
+        coordinator.retrieveDataSourceMetadata("fooDataSource")
+    );
+    Assert.assertEquals(0, deletedCount);
+  }
+
+  @Test
+  public void testRemoveDataSourceMetadataOlderThanDatasourceNotActiveAndOlderThanTimeShouldBeDeleted() throws Exception
+  {
+    coordinator.announceHistoricalSegments(
+        ImmutableSet.of(defaultSegment),
+        ImmutableSet.of(),
+        new ObjectMetadata(null),
+        new ObjectMetadata(ImmutableMap.of("foo", "bar"))
+    );
+
+    Assert.assertEquals(
+        new ObjectMetadata(ImmutableMap.of("foo", "bar")),
+        coordinator.retrieveDataSourceMetadata("fooDataSource")
+    );
+
+    // Try delete. Datasource should be deleted as it is not in excluded set and created time older than given time
+    int deletedCount = coordinator.removeDataSourceMetadataOlderThan(System.currentTimeMillis(), ImmutableSet.of());
+
+    // Datasource should be deleted
+    Assert.assertNull(
+        coordinator.retrieveDataSourceMetadata("fooDataSource")
+    );
+    Assert.assertEquals(1, deletedCount);
+  }
+
+  @Test
+  public void testRemoveDataSourceMetadataOlderThanDatasourceNotActiveButNotOlderThanTimeShouldNotBeDeleted() throws Exception
+  {
+    coordinator.announceHistoricalSegments(
+        ImmutableSet.of(defaultSegment),
+        ImmutableSet.of(),
+        new ObjectMetadata(null),
+        new ObjectMetadata(ImmutableMap.of("foo", "bar"))
+    );
+
+    Assert.assertEquals(
+        new ObjectMetadata(ImmutableMap.of("foo", "bar")),
+        coordinator.retrieveDataSourceMetadata("fooDataSource")
+    );
+
+    // Do delete. Datasource metadata should not be deleted. Datasource is not active but it was created just now so it's
+    // created timestamp will be later than the timestamp 2012-01-01T00:00:00Z
+    int deletedCount = coordinator.removeDataSourceMetadataOlderThan(DateTimes.of("2012-01-01T00:00:00Z").getMillis(), ImmutableSet.of());
+
+    // Datasource should not be deleted
+    Assert.assertEquals(
+        new ObjectMetadata(ImmutableMap.of("foo", "bar")),
+        coordinator.retrieveDataSourceMetadata("fooDataSource")
+    );
+    Assert.assertEquals(0, deletedCount);
+  }
 }
diff --git a/server/src/test/java/org/apache/druid/metadata/SQLMetadataSupervisorManagerTest.java b/server/src/test/java/org/apache/druid/metadata/SQLMetadataSupervisorManagerTest.java
index 5e4f483..9547387 100644
--- a/server/src/test/java/org/apache/druid/metadata/SQLMetadataSupervisorManagerTest.java
+++ b/server/src/test/java/org/apache/druid/metadata/SQLMetadataSupervisorManagerTest.java
@@ -243,6 +243,46 @@
     Assert.assertNull(specs.get(0).getSpec());
   }
 
+  @Test
+  public void testGetLatestActiveOnly()
+  {
+    final String supervisor1 = "test-supervisor-1";
+    final String datasource1 = "datasource-1";
+    final String supervisor2 = "test-supervisor-2";
+    final Map<String, String> data1rev1 = ImmutableMap.of("key1-1", "value1-1-1", "key1-2", "value1-2-1");
+    Assert.assertTrue(supervisorManager.getAll().isEmpty());
+    supervisorManager.insert(supervisor1, new TestSupervisorSpec(supervisor1, data1rev1));
+    // supervisor1 is terminated
+    supervisorManager.insert(supervisor1, new NoopSupervisorSpec(supervisor1, ImmutableList.of(datasource1)));
+    // supervisor2 is still active
+    supervisorManager.insert(supervisor2, new TestSupervisorSpec(supervisor2, data1rev1));
+    // get latest active should only return supervisor2
+    Map<String, SupervisorSpec> actual = supervisorManager.getLatestActiveOnly();
+    Assert.assertEquals(1, actual.size());
+    Assert.assertTrue(actual.containsKey(supervisor2));
+  }
+
+
+  @Test
+  public void testGetLatestTerminatedOnly()
+  {
+    final String supervisor1 = "test-supervisor-1";
+    final String datasource1 = "datasource-1";
+    final String supervisor2 = "test-supervisor-2";
+    final Map<String, String> data1rev1 = ImmutableMap.of("key1-1", "value1-1-1", "key1-2", "value1-2-1");
+    Assert.assertTrue(supervisorManager.getAll().isEmpty());
+    supervisorManager.insert(supervisor1, new TestSupervisorSpec(supervisor1, data1rev1));
+    // supervisor1 is terminated
+    supervisorManager.insert(supervisor1, new NoopSupervisorSpec(supervisor1, ImmutableList.of(datasource1)));
+    // supervisor2 is still active
+    supervisorManager.insert(supervisor2, new TestSupervisorSpec(supervisor2, data1rev1));
+    // get latest terminated should only return supervisor1
+    Map<String, SupervisorSpec> actual = supervisorManager.getLatestTerminatedOnly();
+    Assert.assertEquals(1, actual.size());
+    Assert.assertTrue(actual.containsKey(supervisor1));
+  }
+
+
   private static class BadSupervisorSpec implements SupervisorSpec
   {
     private final String id;
diff --git a/server/src/test/java/org/apache/druid/server/coordinator/CuratorDruidCoordinatorTest.java b/server/src/test/java/org/apache/druid/server/coordinator/CuratorDruidCoordinatorTest.java
index b8e3103..adad57e 100644
--- a/server/src/test/java/org/apache/druid/server/coordinator/CuratorDruidCoordinatorTest.java
+++ b/server/src/test/java/org/apache/druid/server/coordinator/CuratorDruidCoordinatorTest.java
@@ -178,6 +178,8 @@
         null,
         null,
         null,
+        null,
+        null,
         10,
         new Duration("PT0s")
     );
diff --git a/server/src/test/java/org/apache/druid/server/coordinator/DruidCoordinatorTest.java b/server/src/test/java/org/apache/druid/server/coordinator/DruidCoordinatorTest.java
index 97c220d..2d88d6e 100644
--- a/server/src/test/java/org/apache/druid/server/coordinator/DruidCoordinatorTest.java
+++ b/server/src/test/java/org/apache/druid/server/coordinator/DruidCoordinatorTest.java
@@ -150,6 +150,8 @@
         null,
         null,
         null,
+        null,
+        null,
         10,
         new Duration("PT0s")
     );
diff --git a/server/src/test/java/org/apache/druid/server/coordinator/HttpLoadQueuePeonTest.java b/server/src/test/java/org/apache/druid/server/coordinator/HttpLoadQueuePeonTest.java
index 2a85500..4991f13 100644
--- a/server/src/test/java/org/apache/druid/server/coordinator/HttpLoadQueuePeonTest.java
+++ b/server/src/test/java/org/apache/druid/server/coordinator/HttpLoadQueuePeonTest.java
@@ -87,6 +87,8 @@
       null,
       null,
       null,
+      null,
+      null,
       10,
       Duration.ZERO
   )
diff --git a/server/src/test/java/org/apache/druid/server/coordinator/LoadQueuePeonTest.java b/server/src/test/java/org/apache/druid/server/coordinator/LoadQueuePeonTest.java
index 8b2f486..ea4322a 100644
--- a/server/src/test/java/org/apache/druid/server/coordinator/LoadQueuePeonTest.java
+++ b/server/src/test/java/org/apache/druid/server/coordinator/LoadQueuePeonTest.java
@@ -102,6 +102,8 @@
             null,
             null,
             null,
+            null,
+            null,
             10,
             Duration.millis(0)
         )
@@ -304,6 +306,8 @@
             null,
             null,
             null,
+            null,
+            null,
             10,
             new Duration("PT1s")
         )
@@ -363,6 +367,8 @@
             null,
             null,
             null,
+            null,
+            null,
             10,
             new Duration("PT1s")
         )
diff --git a/server/src/test/java/org/apache/druid/server/coordinator/LoadQueuePeonTester.java b/server/src/test/java/org/apache/druid/server/coordinator/LoadQueuePeonTester.java
index 57ac65b..326bc76 100644
--- a/server/src/test/java/org/apache/druid/server/coordinator/LoadQueuePeonTester.java
+++ b/server/src/test/java/org/apache/druid/server/coordinator/LoadQueuePeonTester.java
@@ -51,6 +51,8 @@
             null,
             null,
             null,
+            null,
+            null,
             10,
             new Duration("PT1s")
         )
diff --git a/server/src/test/java/org/apache/druid/server/coordinator/TestDruidCoordinatorConfig.java b/server/src/test/java/org/apache/druid/server/coordinator/TestDruidCoordinatorConfig.java
index 2c1b706..e2c0570 100644
--- a/server/src/test/java/org/apache/druid/server/coordinator/TestDruidCoordinatorConfig.java
+++ b/server/src/test/java/org/apache/druid/server/coordinator/TestDruidCoordinatorConfig.java
@@ -37,6 +37,8 @@
   private final Duration coordinatorAuditKillDurationToRetain;
   private final Duration coordinatorRuleKillPeriod;
   private final Duration coordinatorRuleKillDurationToRetain;
+  private final Duration coordinatorDatasourceKillPeriod;
+  private final Duration coordinatorDatasourceKillDurationToRetain;
   private final Duration getLoadQueuePeonRepeatDelay;
   private final int coordinatorKillMaxSegments;
 
@@ -54,6 +56,8 @@
       Duration coordinatorAuditKillDurationToRetain,
       Duration coordinatorRuleKillPeriod,
       Duration coordinatorRuleKillDurationToRetain,
+      Duration coordinatorDatasourceKillPeriod,
+      Duration coordinatorDatasourceKillDurationToRetain,
       int coordinatorKillMaxSegments,
       Duration getLoadQueuePeonRepeatDelay
   )
@@ -71,6 +75,8 @@
     this.coordinatorAuditKillDurationToRetain = coordinatorAuditKillDurationToRetain;
     this.coordinatorRuleKillPeriod = coordinatorRuleKillPeriod;
     this.coordinatorRuleKillDurationToRetain = coordinatorRuleKillDurationToRetain;
+    this.coordinatorDatasourceKillPeriod = coordinatorDatasourceKillPeriod;
+    this.coordinatorDatasourceKillDurationToRetain = coordinatorDatasourceKillDurationToRetain;
     this.coordinatorKillMaxSegments = coordinatorKillMaxSegments;
     this.getLoadQueuePeonRepeatDelay = getLoadQueuePeonRepeatDelay;
   }
@@ -148,6 +154,18 @@
   }
 
   @Override
+  public Duration getCoordinatorDatasourceKillPeriod()
+  {
+    return coordinatorDatasourceKillPeriod;
+  }
+
+  @Override
+  public Duration getCoordinatorDatasourceKillDurationToRetain()
+  {
+    return coordinatorDatasourceKillDurationToRetain;
+  }
+
+  @Override
   public int getCoordinatorKillMaxSegments()
   {
     return coordinatorKillMaxSegments;
diff --git a/server/src/test/java/org/apache/druid/server/coordinator/duty/KillAuditLogTest.java b/server/src/test/java/org/apache/druid/server/coordinator/duty/KillAuditLogTest.java
index aeb0bd9..4b883f3 100644
--- a/server/src/test/java/org/apache/druid/server/coordinator/duty/KillAuditLogTest.java
+++ b/server/src/test/java/org/apache/druid/server/coordinator/duty/KillAuditLogTest.java
@@ -68,6 +68,8 @@
         new Duration("PT1S"),
         null,
         null,
+        null,
+        null,
         10,
         null
     );
@@ -94,6 +96,8 @@
         new Duration("PT1S"),
         null,
         null,
+        null,
+        null,
         10,
         null
     );
@@ -120,6 +124,8 @@
         new Duration("PT1S"),
         null,
         null,
+        null,
+        null,
         10,
         null
     );
@@ -145,6 +151,8 @@
         new Duration("PT-1S"),
         null,
         null,
+        null,
+        null,
         10,
         null
     );
diff --git a/server/src/test/java/org/apache/druid/server/coordinator/duty/KillDatasourceMetadataTest.java b/server/src/test/java/org/apache/druid/server/coordinator/duty/KillDatasourceMetadataTest.java
new file mode 100644
index 0000000..ff29136
--- /dev/null
+++ b/server/src/test/java/org/apache/druid/server/coordinator/duty/KillDatasourceMetadataTest.java
@@ -0,0 +1,204 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.druid.server.coordinator.duty;
+
+import com.google.common.collect.ImmutableSet;
+import org.apache.druid.indexing.overlord.IndexerMetadataStorageCoordinator;
+import org.apache.druid.java.util.emitter.service.ServiceEmitter;
+import org.apache.druid.java.util.emitter.service.ServiceEventBuilder;
+import org.apache.druid.metadata.MetadataSupervisorManager;
+import org.apache.druid.metadata.TestSupervisorSpec;
+import org.apache.druid.server.coordinator.DruidCoordinatorRuntimeParams;
+import org.apache.druid.server.coordinator.TestDruidCoordinatorConfig;
+import org.joda.time.Duration;
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.rules.ExpectedException;
+import org.junit.runner.RunWith;
+import org.mockito.ArgumentMatchers;
+import org.mockito.Mock;
+import org.mockito.Mockito;
+import org.mockito.junit.MockitoJUnitRunner;
+
+@RunWith(MockitoJUnitRunner.class)
+public class KillDatasourceMetadataTest
+{
+  @Mock
+  private IndexerMetadataStorageCoordinator mockIndexerMetadataStorageCoordinator;
+
+  @Mock
+  private MetadataSupervisorManager mockMetadataSupervisorManager;
+
+  @Mock
+  private DruidCoordinatorRuntimeParams mockDruidCoordinatorRuntimeParams;
+
+  @Mock
+  private TestSupervisorSpec mockKinesisSupervisorSpec;
+
+  @Mock
+  private ServiceEmitter mockServiceEmitter;
+
+  @Rule
+  public ExpectedException exception = ExpectedException.none();
+
+  private KillDatasourceMetadata killDatasourceMetadata;
+
+  @Test
+  public void testRunSkipIfLastRunLessThanPeriod()
+  {
+    TestDruidCoordinatorConfig druidCoordinatorConfig = new TestDruidCoordinatorConfig(
+        null,
+        null,
+        null,
+        new Duration("PT5S"),
+        null,
+        null,
+        null,
+        null,
+        null,
+        null,
+        null,
+        null,
+        null,
+        new Duration(Long.MAX_VALUE),
+        new Duration("PT1S"),
+        10,
+        null
+    );
+    killDatasourceMetadata = new KillDatasourceMetadata(druidCoordinatorConfig, mockIndexerMetadataStorageCoordinator, mockMetadataSupervisorManager);
+    killDatasourceMetadata.run(mockDruidCoordinatorRuntimeParams);
+    Mockito.verifyZeroInteractions(mockIndexerMetadataStorageCoordinator);
+    Mockito.verifyZeroInteractions(mockMetadataSupervisorManager);
+  }
+
+  @Test
+  public void testRunNotSkipIfLastRunMoreThanPeriod()
+  {
+    Mockito.when(mockDruidCoordinatorRuntimeParams.getEmitter()).thenReturn(mockServiceEmitter);
+
+    TestDruidCoordinatorConfig druidCoordinatorConfig = new TestDruidCoordinatorConfig(
+        null,
+        null,
+        null,
+        new Duration("PT5S"),
+        null,
+        null,
+        null,
+        null,
+        null,
+        null,
+        null,
+        null,
+        null,
+        new Duration("PT6S"),
+        new Duration("PT1S"),
+        10,
+        null
+    );
+    killDatasourceMetadata = new KillDatasourceMetadata(druidCoordinatorConfig, mockIndexerMetadataStorageCoordinator, mockMetadataSupervisorManager);
+    killDatasourceMetadata.run(mockDruidCoordinatorRuntimeParams);
+    Mockito.verify(mockIndexerMetadataStorageCoordinator).removeDataSourceMetadataOlderThan(ArgumentMatchers.anyLong(), ArgumentMatchers.anySet());
+    Mockito.verify(mockServiceEmitter).emit(ArgumentMatchers.any(ServiceEventBuilder.class));
+  }
+
+  @Test
+  public void testConstructorFailIfInvalidPeriod()
+  {
+    TestDruidCoordinatorConfig druidCoordinatorConfig = new TestDruidCoordinatorConfig(
+        null,
+        null,
+        null,
+        new Duration("PT5S"),
+        null,
+        null,
+        null,
+        null,
+        null,
+        null,
+        null,
+        null,
+        null,
+        new Duration("PT3S"),
+        new Duration("PT1S"),
+        10,
+        null
+    );
+    exception.expect(IllegalArgumentException.class);
+    exception.expectMessage("Coordinator datasource metadata kill period must be >= druid.coordinator.period.metadataStoreManagementPeriod");
+    killDatasourceMetadata = new KillDatasourceMetadata(druidCoordinatorConfig, mockIndexerMetadataStorageCoordinator, mockMetadataSupervisorManager);
+  }
+
+  @Test
+  public void testConstructorFailIfInvalidRetainDuration()
+  {
+    TestDruidCoordinatorConfig druidCoordinatorConfig = new TestDruidCoordinatorConfig(
+        null,
+        null,
+        null,
+        new Duration("PT5S"),
+        null,
+        null,
+        null,
+        null,
+        null,
+        null,
+        null,
+        null,
+        null,
+        new Duration("PT6S"),
+        new Duration("PT-1S"),
+        10,
+        null
+    );
+    exception.expect(IllegalArgumentException.class);
+    exception.expectMessage("Coordinator datasource metadata kill retainDuration must be >= 0");
+    killDatasourceMetadata = new KillDatasourceMetadata(druidCoordinatorConfig, mockIndexerMetadataStorageCoordinator, mockMetadataSupervisorManager);
+  }
+
+  @Test
+  public void testRunWithEmptyFilterExcludedDatasource()
+  {
+    Mockito.when(mockDruidCoordinatorRuntimeParams.getEmitter()).thenReturn(mockServiceEmitter);
+
+    TestDruidCoordinatorConfig druidCoordinatorConfig = new TestDruidCoordinatorConfig(
+        null,
+        null,
+        null,
+        new Duration("PT5S"),
+        null,
+        null,
+        null,
+        null,
+        null,
+        null,
+        null,
+        null,
+        null,
+        new Duration("PT6S"),
+        new Duration("PT1S"),
+        10,
+        null
+    );
+    killDatasourceMetadata = new KillDatasourceMetadata(druidCoordinatorConfig, mockIndexerMetadataStorageCoordinator, mockMetadataSupervisorManager);
+    killDatasourceMetadata.run(mockDruidCoordinatorRuntimeParams);
+    Mockito.verify(mockIndexerMetadataStorageCoordinator).removeDataSourceMetadataOlderThan(ArgumentMatchers.anyLong(), ArgumentMatchers.eq(ImmutableSet.of()));
+    Mockito.verify(mockServiceEmitter).emit(ArgumentMatchers.any(ServiceEventBuilder.class));
+  }
+}
diff --git a/server/src/test/java/org/apache/druid/server/coordinator/duty/KillRulesTest.java b/server/src/test/java/org/apache/druid/server/coordinator/duty/KillRulesTest.java
index d3268f2..8d63e91 100644
--- a/server/src/test/java/org/apache/druid/server/coordinator/duty/KillRulesTest.java
+++ b/server/src/test/java/org/apache/druid/server/coordinator/duty/KillRulesTest.java
@@ -75,6 +75,8 @@
         null,
         new Duration(Long.MAX_VALUE),
         new Duration("PT1S"),
+        null,
+        null,
         10,
         null
     );
@@ -101,6 +103,8 @@
         null,
         new Duration("PT6S"),
         new Duration("PT1S"),
+        null,
+        null,
         10,
         null
     );
@@ -127,6 +131,8 @@
         null,
         new Duration("PT3S"),
         new Duration("PT1S"),
+        null,
+        null,
         10,
         null
     );
@@ -152,6 +158,8 @@
         null,
         new Duration("PT6S"),
         new Duration("PT-1S"),
+        null,
+        null,
         10,
         null
     );
diff --git a/server/src/test/java/org/apache/druid/server/coordinator/duty/KillSupervisorsTest.java b/server/src/test/java/org/apache/druid/server/coordinator/duty/KillSupervisorsTest.java
index 9aeb992..9ab3fb6 100644
--- a/server/src/test/java/org/apache/druid/server/coordinator/duty/KillSupervisorsTest.java
+++ b/server/src/test/java/org/apache/druid/server/coordinator/duty/KillSupervisorsTest.java
@@ -68,6 +68,8 @@
         null,
         null,
         null,
+        null,
+        null,
         10,
         null
     );
@@ -94,6 +96,8 @@
         null,
         null,
         null,
+        null,
+        null,
         10,
         null
     );
@@ -120,6 +124,8 @@
         null,
         null,
         null,
+        null,
+        null,
         10,
         null
     );
@@ -145,6 +151,8 @@
         null,
         null,
         null,
+        null,
+        null,
         10,
         null
     );
diff --git a/server/src/test/java/org/apache/druid/server/coordinator/duty/KillUnusedSegmentsTest.java b/server/src/test/java/org/apache/druid/server/coordinator/duty/KillUnusedSegmentsTest.java
index 34d020f..8a6eef3 100644
--- a/server/src/test/java/org/apache/druid/server/coordinator/duty/KillUnusedSegmentsTest.java
+++ b/server/src/test/java/org/apache/druid/server/coordinator/duty/KillUnusedSegmentsTest.java
@@ -115,6 +115,8 @@
             null,
             null,
             null,
+            null,
+            null,
             1000,
             Duration.ZERO
         )
diff --git a/services/src/main/java/org/apache/druid/cli/CliCoordinator.java b/services/src/main/java/org/apache/druid/cli/CliCoordinator.java
index 584023c..c864592 100644
--- a/services/src/main/java/org/apache/druid/cli/CliCoordinator.java
+++ b/services/src/main/java/org/apache/druid/cli/CliCoordinator.java
@@ -73,6 +73,7 @@
 import org.apache.druid.server.coordinator.LoadQueueTaskMaster;
 import org.apache.druid.server.coordinator.duty.CoordinatorDuty;
 import org.apache.druid.server.coordinator.duty.KillAuditLog;
+import org.apache.druid.server.coordinator.duty.KillDatasourceMetadata;
 import org.apache.druid.server.coordinator.duty.KillRules;
 import org.apache.druid.server.coordinator.duty.KillSupervisors;
 import org.apache.druid.server.coordinator.duty.KillUnusedSegments;
@@ -254,7 +255,7 @@
                 CoordinatorMetadataStoreManagementDuty.class
             );
             conditionalMetadataStoreManagementDutyMultibind.addConditionBinding(
-                "druid.coordinator.kill.rule.on",
+                "druid.coordinator.kill.supervisor.on",
                 Predicates.equalTo("true"),
                 KillSupervisors.class
             );
@@ -268,6 +269,11 @@
                 Predicates.equalTo("true"),
                 KillRules.class
             );
+            conditionalMetadataStoreManagementDutyMultibind.addConditionBinding(
+                "druid.coordinator.kill.datasource.on",
+                Predicates.equalTo("true"),
+                KillDatasourceMetadata.class
+            );
 
             bindNodeRoleAndAnnouncer(
                 binder,