Supervisor metadata auto cleanup failing due to missing Guice injection (#11424)

* Fix Supervisor metadata auto cleanup failing due to missing Guice injection

* fix IT

* Update services/src/main/java/org/apache/druid/cli/CliCoordinator.java

Co-authored-by: Clint Wylie <cjwylie@gmail.com>

* fix

Co-authored-by: Clint Wylie <cjwylie@gmail.com>
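
The integration-test environment below enables the supervisor auto cleanup (KillSupervisors) coordinator duty with aggressively short periods so the cleanup can be observed within a single test run. As a rough guide only (assuming the usual integration-test convention that the docker environment variables map underscores to dots), the equivalent coordinator runtime.properties would look like:

    # Enable auto cleanup of terminated supervisor metadata on the coordinator.
    # PT10S / PT0M are test-only values; production retention would normally be much longer.
    druid.coordinator.kill.supervisor.on=true
    druid.coordinator.kill.supervisor.period=PT10S
    druid.coordinator.kill.supervisor.durationToRetain=PT0M
    druid.coordinator.period.metadataStoreManagementPeriod=PT10S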
diff --git a/integration-tests/docker/environment-configs/common b/integration-tests/docker/environment-configs/common
index 815596f..2ded396 100644
--- a/integration-tests/docker/environment-configs/common
+++ b/integration-tests/docker/environment-configs/common
@@ -68,6 +68,10 @@
 druid_sql_enable=true
 druid_extensions_hadoopDependenciesDir=/shared/hadoop-dependencies
 druid_request_logging_type=slf4j
+druid_coordinator_kill_supervisor_on=true
+druid_coordinator_kill_supervisor_period=PT10S
+druid_coordinator_kill_supervisor_durationToRetain=PT0M
+druid_coordinator_period_metadataStoreManagementPeriod=PT10S
 
 # Testing the legacy config from https://github.com/apache/druid/pull/10267
 # Can remove this when the flag is no longer needed
diff --git a/integration-tests/src/main/java/org/apache/druid/testing/clients/OverlordResourceTestClient.java b/integration-tests/src/main/java/org/apache/druid/testing/clients/OverlordResourceTestClient.java
index 184f514..dc25039 100644
--- a/integration-tests/src/main/java/org/apache/druid/testing/clients/OverlordResourceTestClient.java
+++ b/integration-tests/src/main/java/org/apache/druid/testing/clients/OverlordResourceTestClient.java
@@ -507,6 +507,41 @@
     }
   }
 
+  public List<Object> getSupervisorHistory(String id)
+  {
+    try {
+      StatusResponseHolder response = httpClient.go(
+          new Request(
+              HttpMethod.GET,
+              new URL(StringUtils.format(
+                  "%ssupervisor/%s/history",
+                  getIndexerURL(),
+                  StringUtils.urlEncode(id)
+              ))
+          ),
+          StatusResponseHandler.getInstance()
+      ).get();
+      if (response.getStatus().equals(HttpResponseStatus.NOT_FOUND)) {
+        return null;
+      } else if (!response.getStatus().equals(HttpResponseStatus.OK)) {
+        throw new ISE(
+            "Error while getting supervisor status, response [%s %s]",
+            response.getStatus(),
+            response.getContent()
+        );
+      }
+      List<Object> responseData = jsonMapper.readValue(
+          response.getContent(), new TypeReference<List<Object>>()
+          {
+          }
+      );
+      return responseData;
+    }
+    catch (Exception e) {
+      throw new RuntimeException(e);
+    }
+  }
+
   private StatusResponseHolder makeRequest(HttpMethod method, String url)
   {
     try {
diff --git a/integration-tests/src/test/java/org/apache/druid/tests/indexer/AbstractStreamIndexingTest.java b/integration-tests/src/test/java/org/apache/druid/tests/indexer/AbstractStreamIndexingTest.java
index 72fbef2..ed1811a 100644
--- a/integration-tests/src/test/java/org/apache/druid/tests/indexer/AbstractStreamIndexingTest.java
+++ b/integration-tests/src/test/java/org/apache/druid/tests/indexer/AbstractStreamIndexingTest.java
@@ -360,7 +360,100 @@
     }
   }
 
+  protected void doTestTerminatedSupervisorAutoCleanup(@Nullable Boolean transactionEnabled) throws Exception
+  {
+    final GeneratedTestConfig generatedTestConfig1 = new GeneratedTestConfig(
+        INPUT_FORMAT,
+        getResourceAsString(JSON_INPUT_FORMAT_PATH)
+    );
+    final GeneratedTestConfig generatedTestConfig2 = new GeneratedTestConfig(
+        INPUT_FORMAT,
+        getResourceAsString(JSON_INPUT_FORMAT_PATH)
+    );
+    try (
+        final Closeable closer1 = createResourceCloser(generatedTestConfig1);
+        final Closeable closer2 = createResourceCloser(generatedTestConfig2);
+        final StreamEventWriter streamEventWriter = createStreamEventWriter(config, transactionEnabled)
+    ) {
+      final String taskSpec1 = generatedTestConfig1.getStreamIngestionPropsTransform()
+                                                 .apply(getResourceAsString(SUPERVISOR_SPEC_TEMPLATE_PATH));
+      LOG.info("supervisorSpec1: [%s]\n", taskSpec1);
+      final String taskSpec2 = generatedTestConfig2.getStreamIngestionPropsTransform()
+                                                   .apply(getResourceAsString(SUPERVISOR_SPEC_TEMPLATE_PATH));
+      LOG.info("supervisorSpec2: [%s]\n", taskSpec2);
+      // Start both supervisors
+      generatedTestConfig1.setSupervisorId(indexer.submitSupervisor(taskSpec1));
+      generatedTestConfig2.setSupervisorId(indexer.submitSupervisor(taskSpec2));
+      LOG.info("Submitted supervisors");
+      // Start generating the data
+      final StreamGenerator streamGenerator1 = new WikipediaStreamEventStreamGenerator(
+          new JsonEventSerializer(jsonMapper),
+          EVENTS_PER_SECOND,
+          CYCLE_PADDING_MS
+      );
+      streamGenerator1.run(
+          generatedTestConfig1.getStreamName(),
+          streamEventWriter,
+          TOTAL_NUMBER_OF_SECOND,
+          FIRST_EVENT_TIME
+      );
+      final StreamGenerator streamGenerator2 = new WikipediaStreamEventStreamGenerator(
+          new JsonEventSerializer(jsonMapper),
+          EVENTS_PER_SECOND,
+          CYCLE_PADDING_MS
+      );
+      streamGenerator2.run(
+          generatedTestConfig2.getStreamName(),
+          streamEventWriter,
+          TOTAL_NUMBER_OF_SECOND,
+          FIRST_EVENT_TIME
+      );
+      // Verify supervisors are healthy before termination
+      ITRetryUtil.retryUntil(
+          () -> SupervisorStateManager.BasicState.RUNNING.equals(indexer.getSupervisorStatus(generatedTestConfig1.getSupervisorId())),
+          true,
+          10000,
+          30,
+          "Waiting for supervisor1 to be healthy"
+      );
+      ITRetryUtil.retryUntil(
+          () -> SupervisorStateManager.BasicState.RUNNING.equals(indexer.getSupervisorStatus(generatedTestConfig2.getSupervisorId())),
+          true,
+          10000,
+          30,
+          "Waiting for supervisor2 to be healthy"
+      );
 
+      // Sleep for 10 secs to make sure that at least one cycle of the supervisor auto cleanup duty has run
+      Thread.sleep(10000);
+
+      // Verify that supervisor specs exist
+      List<Object> specs1 = indexer.getSupervisorHistory(generatedTestConfig1.getSupervisorId());
+      Assert.assertNotNull(specs1);
+      Assert.assertFalse(specs1.isEmpty());
+
+      List<Object> specs2 = indexer.getSupervisorHistory(generatedTestConfig2.getSupervisorId());
+      Assert.assertNotNull(specs2);
+      Assert.assertFalse(specs2.isEmpty());
+
+      // Terminate supervisor 2; supervisor 1 should remain active
+      LOG.info("Terminating supervisor 2");
+      indexer.terminateSupervisor(generatedTestConfig2.getSupervisorId());
+
+      // Verify that auto cleanup eventually removes supervisor spec after termination
+      ITRetryUtil.retryUntil(
+          () -> indexer.getSupervisorHistory(generatedTestConfig2.getSupervisorId()) == null,
+          true,
+          10000,
+          30,
+          "Waiting for supervisor spec 2 to be auto clean"
+      );
+      // Verify that supervisor 1 spec was not removed
+      specs1 = indexer.getSupervisorHistory(generatedTestConfig1.getSupervisorId());
+      Assert.assertNotNull(specs1);
+      Assert.assertFalse(specs1.isEmpty());
+    }
+  }
 
   protected void doTestIndexDataWithStreamReshardSplit(@Nullable Boolean transactionEnabled) throws Exception
   {
diff --git a/integration-tests/src/test/java/org/apache/druid/tests/parallelized/ITKafkaIndexingServiceNonTransactionalParallelizedTest.java b/integration-tests/src/test/java/org/apache/druid/tests/parallelized/ITKafkaIndexingServiceNonTransactionalParallelizedTest.java
index 967ff52..936e7a1 100644
--- a/integration-tests/src/test/java/org/apache/druid/tests/parallelized/ITKafkaIndexingServiceNonTransactionalParallelizedTest.java
+++ b/integration-tests/src/test/java/org/apache/druid/tests/parallelized/ITKafkaIndexingServiceNonTransactionalParallelizedTest.java
@@ -71,4 +71,14 @@
   {
     doTestIndexDataWithStreamReshardSplit(false);
   }
+
+  /**
+   * This test can be run concurrently with other tests, as it creates, modifies, and tears down a unique
+   * datasource and supervisor that are scoped to this test only.
+   */
+  @Test
+  public void testKafkaTerminatedSupervisorAutoCleanup() throws Exception
+  {
+    doTestTerminatedSupervisorAutoCleanup(false);
+  }
 }
diff --git a/integration-tests/src/test/java/org/apache/druid/tests/parallelized/ITKinesisIndexingServiceParallelizedTest.java b/integration-tests/src/test/java/org/apache/druid/tests/parallelized/ITKinesisIndexingServiceParallelizedTest.java
new file mode 100644
index 0000000..a676c18
--- /dev/null
+++ b/integration-tests/src/test/java/org/apache/druid/tests/parallelized/ITKinesisIndexingServiceParallelizedTest.java
@@ -0,0 +1,54 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.druid.tests.parallelized;
+
+import org.apache.druid.testing.guice.DruidTestModuleFactory;
+import org.apache.druid.tests.TestNGGroup;
+import org.apache.druid.tests.indexer.AbstractKinesisIndexingServiceTest;
+import org.testng.annotations.BeforeClass;
+import org.testng.annotations.Guice;
+import org.testng.annotations.Test;
+
+@Test(groups = TestNGGroup.KINESIS_INDEX)
+@Guice(moduleFactory = DruidTestModuleFactory.class)
+public class ITKinesisIndexingServiceParallelizedTest extends AbstractKinesisIndexingServiceTest
+{
+  @Override
+  public String getTestNamePrefix()
+  {
+    return "kinesis_parallelized";
+  }
+
+  @BeforeClass
+  public void beforeClass() throws Exception
+  {
+    doBeforeClass();
+  }
+
+  /**
+   * This test can be run concurrently with other tests, as it creates, modifies, and tears down a unique
+   * datasource and supervisor that are scoped to this test only.
+   */
+  @Test
+  public void testKinesisTerminatedSupervisorAutoCleanup() throws Exception
+  {
+    doTestTerminatedSupervisorAutoCleanup(false);
+  }
+}
diff --git a/server/src/main/java/org/apache/druid/server/coordinator/duty/KillSupervisors.java b/server/src/main/java/org/apache/druid/server/coordinator/duty/KillSupervisors.java
index 8a0b484..3f87f5d 100644
--- a/server/src/main/java/org/apache/druid/server/coordinator/duty/KillSupervisors.java
+++ b/server/src/main/java/org/apache/druid/server/coordinator/duty/KillSupervisors.java
@@ -79,7 +79,7 @@
                 supervisorRemoved
             )
         );
-        log.info("Finished running KillSupervisors duty. Removed %,d supervisor", supervisorRemoved);
+        log.info("Finished running KillSupervisors duty. Removed %,d supervisor specs", supervisorRemoved);
       }
       catch (Exception e) {
         log.error(e, "Failed to kill terminated supervisor metadata");
diff --git a/services/src/main/java/org/apache/druid/cli/CliCoordinator.java b/services/src/main/java/org/apache/druid/cli/CliCoordinator.java
index e888e6b..58323dc 100644
--- a/services/src/main/java/org/apache/druid/cli/CliCoordinator.java
+++ b/services/src/main/java/org/apache/druid/cli/CliCoordinator.java
@@ -28,6 +28,7 @@
 import com.google.inject.Provider;
 import com.google.inject.Provides;
 import com.google.inject.name.Names;
+import com.google.inject.util.Providers;
 import io.airlift.airline.Command;
 import org.apache.curator.framework.CuratorFramework;
 import org.apache.druid.audit.AuditManager;
@@ -49,6 +50,8 @@
 import org.apache.druid.guice.annotations.CoordinatorMetadataStoreManagementDuty;
 import org.apache.druid.guice.annotations.EscalatedGlobal;
 import org.apache.druid.guice.http.JettyHttpClientModule;
+import org.apache.druid.indexing.overlord.TaskMaster;
+import org.apache.druid.indexing.overlord.TaskStorage;
 import org.apache.druid.java.util.common.concurrent.Execs;
 import org.apache.druid.java.util.common.concurrent.ExecutorServices;
 import org.apache.druid.java.util.common.concurrent.ScheduledExecutorFactory;
@@ -64,6 +67,7 @@
 import org.apache.druid.metadata.SegmentsMetadataManagerConfig;
 import org.apache.druid.metadata.SegmentsMetadataManagerProvider;
 import org.apache.druid.query.lookup.LookupSerdeModule;
+import org.apache.druid.segment.incremental.RowIngestionMetersFactory;
 import org.apache.druid.server.audit.AuditManagerProvider;
 import org.apache.druid.server.coordinator.BalancerStrategyFactory;
 import org.apache.druid.server.coordinator.CachingCostBalancerStrategyConfig;
@@ -289,6 +293,13 @@
 
             Jerseys.addResource(binder, SelfDiscoveryResource.class);
             LifecycleModule.registerKey(binder, Key.get(SelfDiscoveryResource.class));
+
+            if (!beOverlord) {
+              // These are needed to deserialize SupervisorSpec for Supervisor Auto Cleanup
+              binder.bind(TaskStorage.class).toProvider(Providers.of(null));
+              binder.bind(TaskMaster.class).toProvider(Providers.of(null));
+              binder.bind(RowIngestionMetersFactory.class).toProvider(Providers.of(null));
+            }
           }
 
           @Provides