Fix ingestion task failure when no input split to process (#11553)
* fix ingestion task failure when no input split to process
* add IT
* fix IT
diff --git a/indexing-service/src/main/java/org/apache/druid/indexing/common/task/batch/parallel/ParallelIndexPhaseRunner.java b/indexing-service/src/main/java/org/apache/druid/indexing/common/task/batch/parallel/ParallelIndexPhaseRunner.java
index b34241a..890cb90 100644
--- a/indexing-service/src/main/java/org/apache/druid/indexing/common/task/batch/parallel/ParallelIndexPhaseRunner.java
+++ b/indexing-service/src/main/java/org/apache/druid/indexing/common/task/batch/parallel/ParallelIndexPhaseRunner.java
@@ -289,13 +289,17 @@
@Override
public void collectReport(SubTaskReportType report)
{
+ // This method is only called when there is a subtask sending its report.
+      // Since TaskMonitor is responsible for spawning subtasks, the taskMonitor cannot be null if a subtask is sending a report.
+      // This assertion ensures that the contract mentioned above is not broken.
+ assert taskMonitor != null;
taskMonitor.collectReport(report);
}
@Override
public Map<String, SubTaskReportType> getReports()
{
- return taskMonitor.getReports();
+ return taskMonitor == null ? Collections.emptyMap() : taskMonitor.getReports();
}
@Override
diff --git a/indexing-service/src/test/java/org/apache/druid/indexing/common/task/batch/parallel/SinglePhaseParallelIndexingTest.java b/indexing-service/src/test/java/org/apache/druid/indexing/common/task/batch/parallel/SinglePhaseParallelIndexingTest.java
index 7b9ac3d..f04559f 100644
--- a/indexing-service/src/test/java/org/apache/druid/indexing/common/task/batch/parallel/SinglePhaseParallelIndexingTest.java
+++ b/indexing-service/src/test/java/org/apache/druid/indexing/common/task/batch/parallel/SinglePhaseParallelIndexingTest.java
@@ -81,6 +81,7 @@
}
private static final Interval INTERVAL_TO_INDEX = Intervals.of("2017-12/P1M");
+ private static final String VALID_INPUT_SOURCE_FILTER = "test_*";
private final LockGranularity lockGranularity;
private final boolean useInputFormatApi;
@@ -368,7 +369,8 @@
null,
null,
null
- )
+ ),
+ VALID_INPUT_SOURCE_FILTER
);
task.addToContext(Tasks.FORCE_TIME_CHUNK_LOCK_KEY, lockGranularity == LockGranularity.TIME_CHUNK);
Assert.assertEquals(TaskState.SUCCESS, getIndexingServiceClient().runAndWait(task).getStatusCode());
@@ -394,6 +396,24 @@
}
@Test
+ public void testRunParallelWithNoInputSplitToProcess()
+ {
+ // The input source filter on this task does not match any input
+    // Hence, this task will have no input split to process.
+ final ParallelIndexSupervisorTask task = newTask(
+ Intervals.of("2017-12/P1M"),
+ Granularities.DAY,
+ true,
+ true,
+ AbstractParallelIndexSupervisorTaskTest.DEFAULT_TUNING_CONFIG_FOR_PARALLEL_INDEXING,
+ "non_existing_file_filter"
+ );
+ task.addToContext(Tasks.FORCE_TIME_CHUNK_LOCK_KEY, lockGranularity == LockGranularity.TIME_CHUNK);
+    // Task state should still be SUCCESS even if there is no input split to process.
+ Assert.assertEquals(TaskState.SUCCESS, getIndexingServiceClient().runAndWait(task).getStatusCode());
+ }
+
+ @Test
public void testOverwriteAndAppend()
{
final Interval interval = Intervals.of("2017-12/P1M");
@@ -437,7 +457,8 @@
segmentGranularity,
appendToExisting,
splittableInputSource,
- AbstractParallelIndexSupervisorTaskTest.DEFAULT_TUNING_CONFIG_FOR_PARALLEL_INDEXING
+ AbstractParallelIndexSupervisorTaskTest.DEFAULT_TUNING_CONFIG_FOR_PARALLEL_INDEXING,
+ VALID_INPUT_SOURCE_FILTER
);
}
@@ -446,7 +467,8 @@
Granularity segmentGranularity,
boolean appendToExisting,
boolean splittableInputSource,
- ParallelIndexTuningConfig tuningConfig
+ ParallelIndexTuningConfig tuningConfig,
+ String inputSourceFilter
)
{
// set up ingestion spec
@@ -469,7 +491,7 @@
),
new ParallelIndexIOConfig(
null,
- new SettableSplittableLocalInputSource(inputDir, "test_*", splittableInputSource),
+ new SettableSplittableLocalInputSource(inputDir, inputSourceFilter, splittableInputSource),
DEFAULT_INPUT_FORMAT,
appendToExisting,
null
@@ -499,7 +521,7 @@
getObjectMapper()
),
new ParallelIndexIOConfig(
- new LocalFirehoseFactory(inputDir, "test_*", null),
+ new LocalFirehoseFactory(inputDir, inputSourceFilter, null),
appendToExisting
),
tuningConfig
diff --git a/integration-tests/src/test/java/org/apache/druid/tests/indexer/AbstractITBatchIndexTest.java b/integration-tests/src/test/java/org/apache/druid/tests/indexer/AbstractITBatchIndexTest.java
index acab50d..6dca729 100644
--- a/integration-tests/src/test/java/org/apache/druid/tests/indexer/AbstractITBatchIndexTest.java
+++ b/integration-tests/src/test/java/org/apache/druid/tests/indexer/AbstractITBatchIndexTest.java
@@ -302,7 +302,7 @@
}
}
- private void submitTaskAndWait(
+ protected void submitTaskAndWait(
String taskSpec,
String dataSourceName,
boolean waitForNewVersion,
diff --git a/integration-tests/src/test/java/org/apache/druid/tests/indexer/ITIndexerTest.java b/integration-tests/src/test/java/org/apache/druid/tests/indexer/ITIndexerTest.java
index ec9c753..833a0ac 100644
--- a/integration-tests/src/test/java/org/apache/druid/tests/indexer/ITIndexerTest.java
+++ b/integration-tests/src/test/java/org/apache/druid/tests/indexer/ITIndexerTest.java
@@ -170,6 +170,35 @@
}
@Test
+ public void testReIndexWithNonExistingDatasource() throws Exception
+ {
+ Pair<Boolean, Boolean> dummyPair = new Pair<>(false, false);
+ final String fullBaseDatasourceName = "nonExistingDatasource2904";
+ final String fullReindexDatasourceName = "newDatasource123";
+
+ String taskSpec = StringUtils.replace(
+ getResourceAsString(REINDEX_TASK_WITH_DRUID_INPUT_SOURCE),
+ "%%DATASOURCE%%",
+ fullBaseDatasourceName
+ );
+ taskSpec = StringUtils.replace(
+ taskSpec,
+ "%%REINDEX_DATASOURCE%%",
+ fullReindexDatasourceName
+ );
+
+    // This method will also verify that the task is successful after it finishes running.
+    // We expect the task to be successful even if the datasource to reindex does not exist.
+ submitTaskAndWait(
+ taskSpec,
+ fullReindexDatasourceName,
+ false,
+ false,
+ dummyPair
+ );
+ }
+
+ @Test
public void testMERGEIndexData() throws Exception
{
final String reindexDatasource = MERGE_REINDEX_DATASOURCE + "-testMergeIndexData";