Fix ingestion task failure when no input split to process (#11553)

* fix ingestion task failure when no input split to process

* add IT

* fix IT
diff --git a/indexing-service/src/main/java/org/apache/druid/indexing/common/task/batch/parallel/ParallelIndexPhaseRunner.java b/indexing-service/src/main/java/org/apache/druid/indexing/common/task/batch/parallel/ParallelIndexPhaseRunner.java
index b34241a..890cb90 100644
--- a/indexing-service/src/main/java/org/apache/druid/indexing/common/task/batch/parallel/ParallelIndexPhaseRunner.java
+++ b/indexing-service/src/main/java/org/apache/druid/indexing/common/task/batch/parallel/ParallelIndexPhaseRunner.java
@@ -289,13 +289,17 @@
   @Override
   public void collectReport(SubTaskReportType report)
   {
+    // This method is only called when there is a subtask sending its report.
+    // Since TaskMonitor is responsible for spawning subtasks, the taskMonitor cannot be null if we have subtask sending report
+    // This null check is to ensure that the contract mentioned above is not broken
+    assert taskMonitor != null;
     taskMonitor.collectReport(report);
   }
 
   @Override
   public Map<String, SubTaskReportType> getReports()
   {
-    return taskMonitor.getReports();
+    return taskMonitor == null ? Collections.emptyMap() : taskMonitor.getReports();
   }
 
   @Override
diff --git a/indexing-service/src/test/java/org/apache/druid/indexing/common/task/batch/parallel/SinglePhaseParallelIndexingTest.java b/indexing-service/src/test/java/org/apache/druid/indexing/common/task/batch/parallel/SinglePhaseParallelIndexingTest.java
index 7b9ac3d..f04559f 100644
--- a/indexing-service/src/test/java/org/apache/druid/indexing/common/task/batch/parallel/SinglePhaseParallelIndexingTest.java
+++ b/indexing-service/src/test/java/org/apache/druid/indexing/common/task/batch/parallel/SinglePhaseParallelIndexingTest.java
@@ -81,6 +81,7 @@
   }
 
   private static final Interval INTERVAL_TO_INDEX = Intervals.of("2017-12/P1M");
+  private static final String VALID_INPUT_SOURCE_FILTER = "test_*";
 
   private final LockGranularity lockGranularity;
   private final boolean useInputFormatApi;
@@ -368,7 +369,8 @@
             null,
             null,
             null
-        )
+        ),
+        VALID_INPUT_SOURCE_FILTER
     );
     task.addToContext(Tasks.FORCE_TIME_CHUNK_LOCK_KEY, lockGranularity == LockGranularity.TIME_CHUNK);
     Assert.assertEquals(TaskState.SUCCESS, getIndexingServiceClient().runAndWait(task).getStatusCode());
@@ -394,6 +396,24 @@
   }
 
   @Test
+  public void testRunParallelWithNoInputSplitToProcess()
+  {
+    // The input source filter on this task does not match any input
+    // Hence, the this task will has no input split to process
+    final ParallelIndexSupervisorTask task = newTask(
+        Intervals.of("2017-12/P1M"),
+        Granularities.DAY,
+        true,
+        true,
+        AbstractParallelIndexSupervisorTaskTest.DEFAULT_TUNING_CONFIG_FOR_PARALLEL_INDEXING,
+        "non_existing_file_filter"
+    );
+    task.addToContext(Tasks.FORCE_TIME_CHUNK_LOCK_KEY, lockGranularity == LockGranularity.TIME_CHUNK);
+    // Task state should still be SUCCESS even if no input split to process
+    Assert.assertEquals(TaskState.SUCCESS, getIndexingServiceClient().runAndWait(task).getStatusCode());
+  }
+
+  @Test
   public void testOverwriteAndAppend()
   {
     final Interval interval = Intervals.of("2017-12/P1M");
@@ -437,7 +457,8 @@
         segmentGranularity,
         appendToExisting,
         splittableInputSource,
-        AbstractParallelIndexSupervisorTaskTest.DEFAULT_TUNING_CONFIG_FOR_PARALLEL_INDEXING
+        AbstractParallelIndexSupervisorTaskTest.DEFAULT_TUNING_CONFIG_FOR_PARALLEL_INDEXING,
+        VALID_INPUT_SOURCE_FILTER
     );
   }
 
@@ -446,7 +467,8 @@
       Granularity segmentGranularity,
       boolean appendToExisting,
       boolean splittableInputSource,
-      ParallelIndexTuningConfig tuningConfig
+      ParallelIndexTuningConfig tuningConfig,
+      String inputSourceFilter
   )
   {
     // set up ingestion spec
@@ -469,7 +491,7 @@
           ),
           new ParallelIndexIOConfig(
               null,
-              new SettableSplittableLocalInputSource(inputDir, "test_*", splittableInputSource),
+              new SettableSplittableLocalInputSource(inputDir, inputSourceFilter, splittableInputSource),
               DEFAULT_INPUT_FORMAT,
               appendToExisting,
               null
@@ -499,7 +521,7 @@
               getObjectMapper()
           ),
           new ParallelIndexIOConfig(
-              new LocalFirehoseFactory(inputDir, "test_*", null),
+              new LocalFirehoseFactory(inputDir, inputSourceFilter, null),
               appendToExisting
           ),
           tuningConfig
diff --git a/integration-tests/src/test/java/org/apache/druid/tests/indexer/AbstractITBatchIndexTest.java b/integration-tests/src/test/java/org/apache/druid/tests/indexer/AbstractITBatchIndexTest.java
index acab50d..6dca729 100644
--- a/integration-tests/src/test/java/org/apache/druid/tests/indexer/AbstractITBatchIndexTest.java
+++ b/integration-tests/src/test/java/org/apache/druid/tests/indexer/AbstractITBatchIndexTest.java
@@ -302,7 +302,7 @@
     }
   }
 
-  private void submitTaskAndWait(
+  protected void submitTaskAndWait(
       String taskSpec,
       String dataSourceName,
       boolean waitForNewVersion,
diff --git a/integration-tests/src/test/java/org/apache/druid/tests/indexer/ITIndexerTest.java b/integration-tests/src/test/java/org/apache/druid/tests/indexer/ITIndexerTest.java
index ec9c753..833a0ac 100644
--- a/integration-tests/src/test/java/org/apache/druid/tests/indexer/ITIndexerTest.java
+++ b/integration-tests/src/test/java/org/apache/druid/tests/indexer/ITIndexerTest.java
@@ -170,6 +170,35 @@
   }
 
   @Test
+  public void testReIndexWithNonExistingDatasource() throws Exception
+  {
+    Pair<Boolean, Boolean> dummyPair = new Pair<>(false, false);
+    final String fullBaseDatasourceName = "nonExistingDatasource2904";
+    final String fullReindexDatasourceName = "newDatasource123";
+
+    String taskSpec = StringUtils.replace(
+        getResourceAsString(REINDEX_TASK_WITH_DRUID_INPUT_SOURCE),
+        "%%DATASOURCE%%",
+        fullBaseDatasourceName
+    );
+    taskSpec = StringUtils.replace(
+        taskSpec,
+        "%%REINDEX_DATASOURCE%%",
+        fullReindexDatasourceName
+    );
+
+    // This method will also verify task is successful after task finish running
+    // We expect task to be successful even if the datasource to reindex does not exist
+    submitTaskAndWait(
+        taskSpec,
+        fullReindexDatasourceName,
+        false,
+        false,
+        dummyPair
+    );
+  }
+
+  @Test
   public void testMERGEIndexData() throws Exception
   {
     final String reindexDatasource = MERGE_REINDEX_DATASOURCE + "-testMergeIndexData";