Supervisor metadata auto cleanup failing as missing Guice injection (#11424)
* Fix Supervisor metadata auto cleanup failing as missing Guice injection
* Fix Supervisor metadata auto cleanup failing as missing Guice injection
* fix IT
* fix IT
* Update services/src/main/java/org/apache/druid/cli/CliCoordinator.java
Co-authored-by: Clint Wylie <cjwylie@gmail.com>
* fix
* fix
* fix
* fix
* fix
* fix
* fix
Co-authored-by: Clint Wylie <cjwylie@gmail.com>
diff --git a/integration-tests/docker/environment-configs/common b/integration-tests/docker/environment-configs/common
index 815596f..2ded396 100644
--- a/integration-tests/docker/environment-configs/common
+++ b/integration-tests/docker/environment-configs/common
@@ -68,6 +68,10 @@
druid_sql_enable=true
druid_extensions_hadoopDependenciesDir=/shared/hadoop-dependencies
druid_request_logging_type=slf4j
+druid_coordinator_kill_supervisor_on=true
+druid_coordinator_kill_supervisor_period=PT10S
+druid_coordinator_kill_supervisor_durationToRetain=PT0M
+druid_coordinator_period_metadataStoreManagementPeriod=PT10S
# Testing the legacy config from https://github.com/apache/druid/pull/10267
# Can remove this when the flag is no longer needed
diff --git a/integration-tests/src/main/java/org/apache/druid/testing/clients/OverlordResourceTestClient.java b/integration-tests/src/main/java/org/apache/druid/testing/clients/OverlordResourceTestClient.java
index 184f514..dc25039 100644
--- a/integration-tests/src/main/java/org/apache/druid/testing/clients/OverlordResourceTestClient.java
+++ b/integration-tests/src/main/java/org/apache/druid/testing/clients/OverlordResourceTestClient.java
@@ -507,6 +507,41 @@
}
}
+ public List<Object> getSupervisorHistory(String id)
+ {
+ try {
+ StatusResponseHolder response = httpClient.go(
+ new Request(
+ HttpMethod.GET,
+ new URL(StringUtils.format(
+ "%ssupervisor/%s/history",
+ getIndexerURL(),
+ StringUtils.urlEncode(id)
+ ))
+ ),
+ StatusResponseHandler.getInstance()
+ ).get();
+ if (response.getStatus().equals(HttpResponseStatus.NOT_FOUND)) {
+ return null;
+ } else if (!response.getStatus().equals(HttpResponseStatus.OK)) {
+ throw new ISE(
+ "Error while getting supervisor status, response [%s %s]",
+ response.getStatus(),
+ response.getContent()
+ );
+ }
+ List<Object> responseData = jsonMapper.readValue(
+ response.getContent(), new TypeReference<List<Object>>()
+ {
+ }
+ );
+ return responseData;
+ }
+ catch (Exception e) {
+ throw new RuntimeException(e);
+ }
+ }
+
private StatusResponseHolder makeRequest(HttpMethod method, String url)
{
try {
diff --git a/integration-tests/src/test/java/org/apache/druid/tests/indexer/AbstractStreamIndexingTest.java b/integration-tests/src/test/java/org/apache/druid/tests/indexer/AbstractStreamIndexingTest.java
index 72fbef2..ed1811a 100644
--- a/integration-tests/src/test/java/org/apache/druid/tests/indexer/AbstractStreamIndexingTest.java
+++ b/integration-tests/src/test/java/org/apache/druid/tests/indexer/AbstractStreamIndexingTest.java
@@ -360,7 +360,100 @@
}
}
+ protected void doTestTerminatedSupervisorAutoCleanup(@Nullable Boolean transactionEnabled) throws Exception
+ {
+ final GeneratedTestConfig generatedTestConfig1 = new GeneratedTestConfig(
+ INPUT_FORMAT,
+ getResourceAsString(JSON_INPUT_FORMAT_PATH)
+ );
+ final GeneratedTestConfig generatedTestConfig2 = new GeneratedTestConfig(
+ INPUT_FORMAT,
+ getResourceAsString(JSON_INPUT_FORMAT_PATH)
+ );
+ try (
+ final Closeable closer1 = createResourceCloser(generatedTestConfig1);
+ final Closeable closer2 = createResourceCloser(generatedTestConfig2);
+ final StreamEventWriter streamEventWriter = createStreamEventWriter(config, transactionEnabled)
+ ) {
+ final String taskSpec1 = generatedTestConfig1.getStreamIngestionPropsTransform()
+ .apply(getResourceAsString(SUPERVISOR_SPEC_TEMPLATE_PATH));
+ LOG.info("supervisorSpec1: [%s]\n", taskSpec1);
+ final String taskSpec2 = generatedTestConfig2.getStreamIngestionPropsTransform()
+ .apply(getResourceAsString(SUPERVISOR_SPEC_TEMPLATE_PATH));
+ LOG.info("supervisorSpec2: [%s]\n", taskSpec2);
+ // Start both supervisors
+ generatedTestConfig1.setSupervisorId(indexer.submitSupervisor(taskSpec1));
+ generatedTestConfig2.setSupervisorId(indexer.submitSupervisor(taskSpec2));
+ LOG.info("Submitted supervisors");
+ // Start generating the data
+ final StreamGenerator streamGenerator1 = new WikipediaStreamEventStreamGenerator(
+ new JsonEventSerializer(jsonMapper),
+ EVENTS_PER_SECOND,
+ CYCLE_PADDING_MS
+ );
+ streamGenerator1.run(
+ generatedTestConfig1.getStreamName(),
+ streamEventWriter,
+ TOTAL_NUMBER_OF_SECOND,
+ FIRST_EVENT_TIME
+ );
+ final StreamGenerator streamGenerator2 = new WikipediaStreamEventStreamGenerator(
+ new JsonEventSerializer(jsonMapper),
+ EVENTS_PER_SECOND,
+ CYCLE_PADDING_MS
+ );
+ streamGenerator2.run(
+ generatedTestConfig2.getStreamName(),
+ streamEventWriter,
+ TOTAL_NUMBER_OF_SECOND,
+ FIRST_EVENT_TIME
+ );
+ // Verify supervisors are healthy before termination
+ ITRetryUtil.retryUntil(
+ () -> SupervisorStateManager.BasicState.RUNNING.equals(indexer.getSupervisorStatus(generatedTestConfig1.getSupervisorId())),
+ true,
+ 10000,
+ 30,
+ "Waiting for supervisor1 to be healthy"
+ );
+ ITRetryUtil.retryUntil(
+ () -> SupervisorStateManager.BasicState.RUNNING.equals(indexer.getSupervisorStatus(generatedTestConfig2.getSupervisorId())),
+ true,
+ 10000,
+ 30,
+ "Waiting for supervisor2 to be healthy"
+ );
+ // Sleep for 10 secs to make sure that at least one cycle of supervisor auto cleanup duty ran
+ Thread.sleep(10000);
+
+ // Verify that supervisor specs exist
+ List<Object> specs1 = indexer.getSupervisorHistory(generatedTestConfig1.getSupervisorId());
+ Assert.assertNotNull(specs1);
+ Assert.assertFalse(specs1.isEmpty());
+
+ List<Object> specs2 = indexer.getSupervisorHistory(generatedTestConfig2.getSupervisorId());
+ Assert.assertNotNull(specs2);
+ Assert.assertFalse(specs2.isEmpty());
+
+ // Supervisor 1 should still be active while supervisor 2 is now terminated
+ LOG.info("Terminating supervisor 2");
+ indexer.terminateSupervisor(generatedTestConfig2.getSupervisorId());
+
+ // Verify that auto cleanup eventually removes supervisor spec after termination
+ ITRetryUtil.retryUntil(
+ () -> indexer.getSupervisorHistory(generatedTestConfig2.getSupervisorId()) == null,
+ true,
+ 10000,
+ 30,
+ "Waiting for supervisor spec 2 to be auto clean"
+ );
+ // Verify that supervisor 1 spec was not remove
+ specs1 = indexer.getSupervisorHistory(generatedTestConfig1.getSupervisorId());
+ Assert.assertNotNull(specs1);
+ Assert.assertFalse(specs1.isEmpty());
+ }
+ }
protected void doTestIndexDataWithStreamReshardSplit(@Nullable Boolean transactionEnabled) throws Exception
{
diff --git a/integration-tests/src/test/java/org/apache/druid/tests/parallelized/ITKafkaIndexingServiceNonTransactionalParallelizedTest.java b/integration-tests/src/test/java/org/apache/druid/tests/parallelized/ITKafkaIndexingServiceNonTransactionalParallelizedTest.java
index 967ff52..936e7a1 100644
--- a/integration-tests/src/test/java/org/apache/druid/tests/parallelized/ITKafkaIndexingServiceNonTransactionalParallelizedTest.java
+++ b/integration-tests/src/test/java/org/apache/druid/tests/parallelized/ITKafkaIndexingServiceNonTransactionalParallelizedTest.java
@@ -71,4 +71,14 @@
{
doTestIndexDataWithStreamReshardSplit(false);
}
+
+ /**
+ * This test can be run concurrently with other tests as it creates/modifies/teardowns a unique datasource
+ * and supervisor maintained and scoped within this test only
+ */
+ @Test
+ public void testKafkaTerminatedSupervisorAutoCleanup() throws Exception
+ {
+ doTestTerminatedSupervisorAutoCleanup(false);
+ }
}
diff --git a/integration-tests/src/test/java/org/apache/druid/tests/parallelized/ITKinesisIndexingServiceParallelizedTest.java b/integration-tests/src/test/java/org/apache/druid/tests/parallelized/ITKinesisIndexingServiceParallelizedTest.java
new file mode 100644
index 0000000..a676c18
--- /dev/null
+++ b/integration-tests/src/test/java/org/apache/druid/tests/parallelized/ITKinesisIndexingServiceParallelizedTest.java
@@ -0,0 +1,54 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.druid.tests.parallelized;
+
+import org.apache.druid.testing.guice.DruidTestModuleFactory;
+import org.apache.druid.tests.TestNGGroup;
+import org.apache.druid.tests.indexer.AbstractKinesisIndexingServiceTest;
+import org.testng.annotations.BeforeClass;
+import org.testng.annotations.Guice;
+import org.testng.annotations.Test;
+
+@Test(groups = TestNGGroup.KINESIS_INDEX)
+@Guice(moduleFactory = DruidTestModuleFactory.class)
+public class ITKinesisIndexingServiceParallelizedTest extends AbstractKinesisIndexingServiceTest
+{
+ @Override
+ public String getTestNamePrefix()
+ {
+ return "kinesis_parallelized";
+ }
+
+ @BeforeClass
+ public void beforeClass() throws Exception
+ {
+ doBeforeClass();
+ }
+
+ /**
+ * This test can be run concurrently with other tests as it creates/modifies/teardowns a unique datasource
+ * and supervisor maintained and scoped within this test only
+ */
+ @Test
+ public void testKinesisTerminatedSupervisorAutoCleanup() throws Exception
+ {
+ doTestTerminatedSupervisorAutoCleanup(false);
+ }
+}
diff --git a/server/src/main/java/org/apache/druid/server/coordinator/duty/KillSupervisors.java b/server/src/main/java/org/apache/druid/server/coordinator/duty/KillSupervisors.java
index 8a0b484..3f87f5d 100644
--- a/server/src/main/java/org/apache/druid/server/coordinator/duty/KillSupervisors.java
+++ b/server/src/main/java/org/apache/druid/server/coordinator/duty/KillSupervisors.java
@@ -79,7 +79,7 @@
supervisorRemoved
)
);
- log.info("Finished running KillSupervisors duty. Removed %,d supervisor", supervisorRemoved);
+ log.info("Finished running KillSupervisors duty. Removed %,d supervisor specs", supervisorRemoved);
}
catch (Exception e) {
log.error(e, "Failed to kill terminated supervisor metadata");
diff --git a/services/src/main/java/org/apache/druid/cli/CliCoordinator.java b/services/src/main/java/org/apache/druid/cli/CliCoordinator.java
index e888e6b..58323dc 100644
--- a/services/src/main/java/org/apache/druid/cli/CliCoordinator.java
+++ b/services/src/main/java/org/apache/druid/cli/CliCoordinator.java
@@ -28,6 +28,7 @@
import com.google.inject.Provider;
import com.google.inject.Provides;
import com.google.inject.name.Names;
+import com.google.inject.util.Providers;
import io.airlift.airline.Command;
import org.apache.curator.framework.CuratorFramework;
import org.apache.druid.audit.AuditManager;
@@ -49,6 +50,8 @@
import org.apache.druid.guice.annotations.CoordinatorMetadataStoreManagementDuty;
import org.apache.druid.guice.annotations.EscalatedGlobal;
import org.apache.druid.guice.http.JettyHttpClientModule;
+import org.apache.druid.indexing.overlord.TaskMaster;
+import org.apache.druid.indexing.overlord.TaskStorage;
import org.apache.druid.java.util.common.concurrent.Execs;
import org.apache.druid.java.util.common.concurrent.ExecutorServices;
import org.apache.druid.java.util.common.concurrent.ScheduledExecutorFactory;
@@ -64,6 +67,7 @@
import org.apache.druid.metadata.SegmentsMetadataManagerConfig;
import org.apache.druid.metadata.SegmentsMetadataManagerProvider;
import org.apache.druid.query.lookup.LookupSerdeModule;
+import org.apache.druid.segment.incremental.RowIngestionMetersFactory;
import org.apache.druid.server.audit.AuditManagerProvider;
import org.apache.druid.server.coordinator.BalancerStrategyFactory;
import org.apache.druid.server.coordinator.CachingCostBalancerStrategyConfig;
@@ -289,6 +293,13 @@
Jerseys.addResource(binder, SelfDiscoveryResource.class);
LifecycleModule.registerKey(binder, Key.get(SelfDiscoveryResource.class));
+
+ if (!beOverlord) {
+ // These are needed to deserialize SupervisorSpec for Supervisor Auto Cleanup
+ binder.bind(TaskStorage.class).toProvider(Providers.of(null));
+ binder.bind(TaskMaster.class).toProvider(Providers.of(null));
+ binder.bind(RowIngestionMetersFactory.class).toProvider(Providers.of(null));
+ }
}
@Provides