Fix the task id creation in CompactionTask (#10445)

* Fix the task id creation in CompactionTask

* review comments

* Ignore test for range partitioning and segment lock
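
In short: CompactionTask previously decided between per-spec fake task ids and the parent task id based only on maxNumConcurrentSubTasks, which can disagree with how ParallelIndexSupervisorTask actually chooses between parallel and sequential execution (that choice also depends on input-source splittability and on the lower subtask threshold for range partitioning). The fix extracts the supervisor's rule as a static isParallelMode(inputSource, tuningConfig) and reuses it from CompactionTask. A condensed before/after of the condition, paraphrasing the diff below rather than quoting exact source:

    // Before: fake per-spec ids when tuningConfig is absent or maxNumConcurrentSubTasks == 1.
    boolean useFakeIds = tuningConfig == null || tuningConfig.getMaxNumConcurrentSubTasks() == 1;

    // After: fake per-spec ids exactly when the supervisor will run sequentially.
    boolean useFakeIds = !ParallelIndexSupervisorTask.isParallelMode(inputSource, tuningConfig);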
diff --git a/indexing-service/src/main/java/org/apache/druid/indexing/common/task/CompactionTask.java b/indexing-service/src/main/java/org/apache/druid/indexing/common/task/CompactionTask.java
index 62a9f26..ba2502f 100644
--- a/indexing-service/src/main/java/org/apache/druid/indexing/common/task/CompactionTask.java
+++ b/indexing-service/src/main/java/org/apache/druid/indexing/common/task/CompactionTask.java
@@ -32,6 +32,7 @@
 import org.apache.curator.shaded.com.google.common.base.Verify;
 import org.apache.druid.client.coordinator.CoordinatorClient;
 import org.apache.druid.client.indexing.ClientCompactionTaskQuery;
+import org.apache.druid.data.input.InputSource;
 import org.apache.druid.data.input.impl.DimensionSchema;
 import org.apache.druid.data.input.impl.DimensionSchema.MultiValueHandling;
 import org.apache.druid.data.input.impl.DimensionsSpec;
@@ -361,10 +362,14 @@
           // a new Appenderator on its own instead. As a result, they should use different sequence names to allocate
           // new segmentIds properly. See IndexerSQLMetadataStorageCoordinator.allocatePendingSegments() for details.
           // In this case, we use different fake IDs for each created index task.
-          final String subtaskId = tuningConfig == null || tuningConfig.getMaxNumConcurrentSubTasks() == 1
-                                   ? createIndexTaskSpecId(i)
-                                   : getId();
-          return newTask(subtaskId, ingestionSpecs.get(i));
+          ParallelIndexIngestionSpec ingestionSpec = ingestionSpecs.get(i);
+          InputSource inputSource = ingestionSpec.getIOConfig().getNonNullInputSource(
+              ingestionSpec.getDataSchema().getParser()
+          );
+          final String subtaskId = ParallelIndexSupervisorTask.isParallelMode(inputSource, tuningConfig)
+                                   ? getId()
+                                   : createIndexTaskSpecId(i);
+          return newTask(subtaskId, ingestionSpec);
         })
         .collect(Collectors.toList());
 
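The fake ids mentioned in the comment above matter because each sequentially-run index task allocates segments under a sequence name derived from its task id, so the ids must be distinct; in parallel mode the supervisor manages allocation itself and must keep its own id. A minimal sketch of the id scheme, assuming a "parentId_i" suffix format for illustration (the exact format used by CompactionTask.createIndexTaskSpecId() may differ):

    // Hedged sketch: distinct per-spec ids yield distinct sequence names for
    // segment allocation when the specs run as plain sequential index tasks.
    public class SubtaskIdSketch
    {
      // Assumption for illustration: parent id plus the spec index.
      static String createIndexTaskSpecId(String parentId, int i)
      {
        return parentId + "_" + i;
      }

      public static void main(String[] args)
      {
        for (int i = 0; i < 3; i++) {
          System.out.println(createIndexTaskSpecId("compact_datasource_2020", i));
        }
      }
    }
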
diff --git a/indexing-service/src/main/java/org/apache/druid/indexing/common/task/batch/parallel/ParallelIndexSupervisorTask.java b/indexing-service/src/main/java/org/apache/druid/indexing/common/task/batch/parallel/ParallelIndexSupervisorTask.java
index dd0e759..4a218a0 100644
--- a/indexing-service/src/main/java/org/apache/druid/indexing/common/task/batch/parallel/ParallelIndexSupervisorTask.java
+++ b/indexing-service/src/main/java/org/apache/druid/indexing/common/task/batch/parallel/ParallelIndexSupervisorTask.java
@@ -466,18 +466,25 @@
     registerResourceCloserOnAbnormalExit(currentSubTaskHolder);
   }
 
-  private boolean isParallelMode()
+  public static boolean isParallelMode(InputSource inputSource, @Nullable ParallelIndexTuningConfig tuningConfig)
   {
+    if (null == tuningConfig) {
+      return false;
+    }
+    boolean useRangePartitions = useRangePartitions(tuningConfig);
     // Range partitioning is not implemented for runSequential() (but hash partitioning is)
-    int minRequiredNumConcurrentSubTasks = useRangePartitions() ? 1 : 2;
-
-    return baseInputSource.isSplittable()
-           && ingestionSchema.getTuningConfig().getMaxNumConcurrentSubTasks() >= minRequiredNumConcurrentSubTasks;
+    int minRequiredNumConcurrentSubTasks = useRangePartitions ? 1 : 2;
+    return inputSource.isSplittable() && tuningConfig.getMaxNumConcurrentSubTasks() >= minRequiredNumConcurrentSubTasks;
   }
 
-  private boolean useRangePartitions()
+  private static boolean useRangePartitions(ParallelIndexTuningConfig tuningConfig)
   {
-    return ingestionSchema.getTuningConfig().getGivenOrDefaultPartitionsSpec() instanceof SingleDimensionPartitionsSpec;
+    return tuningConfig.getGivenOrDefaultPartitionsSpec() instanceof SingleDimensionPartitionsSpec;
+  }
+
+  private boolean isParallelMode()
+  {
+    return isParallelMode(baseInputSource, ingestionSchema.getTuningConfig());
   }
 
   /**
@@ -512,7 +519,7 @@
    */
   private TaskStatus runMultiPhaseParallel(TaskToolbox toolbox) throws Exception
   {
-    return useRangePartitions()
+    return useRangePartitions(ingestionSchema.getTuningConfig())
            ? runRangePartitionMultiPhaseParallel(toolbox)
            : runHashPartitionMultiPhaseParallel(toolbox);
   }
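
The extracted rule is small enough to restate standalone. Below is a hedged model of isParallelMode with the Druid types replaced by primitives (InputSource becomes a splittable flag, the nullable tuning config becomes a nullable subtask count plus a range-partitioning flag), matching the hunk above:

    public class IsParallelModeSketch
    {
      static boolean isParallelMode(boolean splittable, Integer maxNumConcurrentSubTasks, boolean rangePartitions)
      {
        if (maxNumConcurrentSubTasks == null) { // models a null tuningConfig
          return false;
        }
        // Range partitioning cannot fall back to sequential execution, so a
        // single subtask suffices; hash partitioning can, so parallel mode
        // only kicks in with at least two subtasks.
        int minRequired = rangePartitions ? 1 : 2;
        return splittable && maxNumConcurrentSubTasks >= minRequired;
      }

      public static void main(String[] args)
      {
        System.out.println(isParallelMode(true, 1, true));     // true
        System.out.println(isParallelMode(true, 1, false));    // false
        System.out.println(isParallelMode(false, 10, true));   // false
        System.out.println(isParallelMode(true, null, false)); // false
      }
    }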
diff --git a/indexing-service/src/test/java/org/apache/druid/indexing/common/task/CompactionTaskParallelRunTest.java b/indexing-service/src/test/java/org/apache/druid/indexing/common/task/CompactionTaskParallelRunTest.java
index 616dd24..7b7005b 100644
--- a/indexing-service/src/test/java/org/apache/druid/indexing/common/task/CompactionTaskParallelRunTest.java
+++ b/indexing-service/src/test/java/org/apache/druid/indexing/common/task/CompactionTaskParallelRunTest.java
@@ -63,6 +63,7 @@
 import org.apache.druid.timeline.partition.SingleDimensionShardSpec;
 import org.joda.time.Interval;
 import org.junit.Assert;
+import org.junit.Assume;
 import org.junit.Before;
 import org.junit.Test;
 import org.junit.runner.RunWith;
@@ -152,7 +153,7 @@
           lockGranularity == LockGranularity.TIME_CHUNK ? NumberedShardSpec.class : NumberedOverwriteShardSpec.class,
           segment.getShardSpec().getClass()
       );
-      // Expecte compaction state to exist as store compaction state by default
+      // Expect compaction state to exist, since storeCompactionState defaults to true
       Assert.assertEquals(expectedState, segment.getLastCompactionState());
     }
   }
@@ -161,9 +162,7 @@
   public void testRunParallelWithHashPartitioningMatchCompactionState()
   {
     // Hash partitioning is not supported with segment lock yet
-    if (lockGranularity == LockGranularity.SEGMENT) {
-      return;
-    }
+    Assume.assumeFalse(lockGranularity == LockGranularity.SEGMENT);
     runIndexTask(null, true);
 
     final Builder builder = new Builder(
@@ -182,7 +181,7 @@
         compactionTask.getTuningConfig().getIndexSpec().asMap(getObjectMapper())
     );
     for (DataSegment segment : compactedSegments) {
-      // Expecte compaction state to exist as store compaction state by default
+      // Expect compaction state to exist, since storeCompactionState defaults to true
       Assert.assertSame(HashBasedNumberedShardSpec.class, segment.getShardSpec().getClass());
       Assert.assertEquals(expectedState, segment.getLastCompactionState());
     }
@@ -192,9 +191,7 @@
   public void testRunParallelWithRangePartitioning()
   {
     // Range partitioning is not supported with segment lock yet
-    if (lockGranularity == LockGranularity.SEGMENT) {
-      return;
-    }
+    Assume.assumeFalse(lockGranularity == LockGranularity.SEGMENT);
     runIndexTask(null, true);
 
     final Builder builder = new Builder(
@@ -213,7 +210,36 @@
         compactionTask.getTuningConfig().getIndexSpec().asMap(getObjectMapper())
     );
     for (DataSegment segment : compactedSegments) {
-      // Expecte compaction state to exist as store compaction state by default
+      // Expect compaction state to exist, since storeCompactionState defaults to true
+      Assert.assertSame(SingleDimensionShardSpec.class, segment.getShardSpec().getClass());
+      Assert.assertEquals(expectedState, segment.getLastCompactionState());
+    }
+  }
+
+  @Test
+  public void testRunParallelWithRangePartitioningWithSingleTask()
+  {
+    // Range partitioning is not supported with segment lock yet
+    Assume.assumeFalse(lockGranularity == LockGranularity.SEGMENT);
+    runIndexTask(null, true);
+
+    final Builder builder = new Builder(
+        DATA_SOURCE,
+        getSegmentLoaderFactory(),
+        RETRY_POLICY_FACTORY
+    );
+    final CompactionTask compactionTask = builder
+        .inputSpec(new CompactionIntervalSpec(INTERVAL_TO_INDEX, null))
+        .tuningConfig(newTuningConfig(new SingleDimensionPartitionsSpec(7, null, "dim", false), 1, true))
+        .build();
+
+    final Set<DataSegment> compactedSegments = runTask(compactionTask);
+    final CompactionState expectedState = new CompactionState(
+        new SingleDimensionPartitionsSpec(7, null, "dim", false),
+        compactionTask.getTuningConfig().getIndexSpec().asMap(getObjectMapper())
+    );
+    for (DataSegment segment : compactedSegments) {
+      // Expect compaction state to exist, since storeCompactionState defaults to true
       Assert.assertSame(SingleDimensionShardSpec.class, segment.getShardSpec().getClass());
       Assert.assertEquals(expectedState, segment.getLastCompactionState());
     }
@@ -242,7 +268,7 @@
           lockGranularity == LockGranularity.TIME_CHUNK ? NumberedShardSpec.class : NumberedOverwriteShardSpec.class,
           segment.getShardSpec().getClass()
       );
-      // Expecte compaction state to exist as store compaction state by default
+      // Expect compaction state to be absent, since this test does not store compaction state
       Assert.assertEquals(null, segment.getLastCompactionState());
     }
   }
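
Two notes on the test changes above. First, the new testRunParallelWithRangePartitioningWithSingleTask covers exactly the case the id fix targets: range partitioning with maxNumConcurrentSubTasks = 1 still runs in parallel mode, so the sub-spec must keep the parent task id. Second, replacing the early return with Assume.assumeFalse makes JUnit report the segment-lock variants as skipped rather than silently passed. A minimal sketch of that difference:

    import org.junit.Assume;
    import org.junit.Test;

    public class AssumeSketch
    {
      @Test
      public void reportedAsSkippedNotPassed()
      {
        // A failed assumption marks the test as ignored in JUnit reports;
        // a bare return here would have counted as a green pass.
        Assume.assumeFalse("not supported with segment lock", true);
        throw new AssertionError("never reached when the assumption fails");
      }
    }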
diff --git a/indexing-service/src/test/java/org/apache/druid/indexing/common/task/batch/parallel/ParallelIndexSupervisorTaskTest.java b/indexing-service/src/test/java/org/apache/druid/indexing/common/task/batch/parallel/ParallelIndexSupervisorTaskTest.java
index 60c286a..ef1db8a 100644
--- a/indexing-service/src/test/java/org/apache/druid/indexing/common/task/batch/parallel/ParallelIndexSupervisorTaskTest.java
+++ b/indexing-service/src/test/java/org/apache/druid/indexing/common/task/batch/parallel/ParallelIndexSupervisorTaskTest.java
@@ -21,11 +21,14 @@
 
 import com.fasterxml.jackson.databind.ObjectMapper;
 import com.google.common.collect.Ordering;
+import org.apache.druid.data.input.InputSource;
 import org.apache.druid.data.input.impl.DimensionsSpec;
 import org.apache.druid.data.input.impl.InlineInputSource;
 import org.apache.druid.data.input.impl.JsonInputFormat;
 import org.apache.druid.data.input.impl.TimestampSpec;
 import org.apache.druid.indexer.partitions.HashedPartitionsSpec;
+import org.apache.druid.indexer.partitions.PartitionsSpec;
+import org.apache.druid.indexer.partitions.SingleDimensionPartitionsSpec;
 import org.apache.druid.java.util.common.Intervals;
 import org.apache.druid.java.util.common.Pair;
 import org.apache.druid.segment.IndexSpec;
@@ -36,6 +39,7 @@
 import org.apache.druid.segment.writeout.OffHeapMemorySegmentWriteOutMediumFactory;
 import org.apache.druid.timeline.partition.BuildingHashBasedNumberedShardSpec;
 import org.apache.druid.timeline.partition.HashPartitionFunction;
+import org.easymock.EasyMock;
 import org.hamcrest.Matchers;
 import org.joda.time.Duration;
 import org.joda.time.Interval;
@@ -55,6 +59,9 @@
 import java.util.stream.Collectors;
 import java.util.stream.IntStream;
 
+import static org.easymock.EasyMock.expect;
+import static org.easymock.EasyMock.mock;
+
 @RunWith(Enclosed.class)
 public class ParallelIndexSupervisorTaskTest
 {
@@ -241,4 +248,64 @@
       );
     }
   }
+
+  public static class StaticUtilsTest
+  {
+    @Test
+    public void testIsParallelModeFalse_nullTuningConfig()
+    {
+      InputSource inputSource = mock(InputSource.class);
+      Assert.assertFalse(ParallelIndexSupervisorTask.isParallelMode(inputSource, null));
+    }
+
+    @Test
+    public void testIsParallelMode_rangePartition()
+    {
+      InputSource inputSource = mock(InputSource.class);
+      expect(inputSource.isSplittable()).andReturn(true).anyTimes();
+
+      ParallelIndexTuningConfig tuningConfig = mock(ParallelIndexTuningConfig.class);
+      expect(tuningConfig.getGivenOrDefaultPartitionsSpec()).andReturn(mock(SingleDimensionPartitionsSpec.class))
+                                                            .anyTimes();
+      expect(tuningConfig.getMaxNumConcurrentSubTasks()).andReturn(0).andReturn(1).andReturn(2);
+      EasyMock.replay(inputSource, tuningConfig);
+
+      Assert.assertFalse(ParallelIndexSupervisorTask.isParallelMode(inputSource, tuningConfig));
+      Assert.assertTrue(ParallelIndexSupervisorTask.isParallelMode(inputSource, tuningConfig));
+      Assert.assertTrue(ParallelIndexSupervisorTask.isParallelMode(inputSource, tuningConfig));
+    }
+
+    @Test
+    public void testIsParallelMode_notRangePartition()
+    {
+      InputSource inputSource = mock(InputSource.class);
+      expect(inputSource.isSplittable()).andReturn(true).anyTimes();
+
+      ParallelIndexTuningConfig tuningConfig = mock(ParallelIndexTuningConfig.class);
+      expect(tuningConfig.getGivenOrDefaultPartitionsSpec()).andReturn(mock(PartitionsSpec.class))
+                                                            .anyTimes();
+      expect(tuningConfig.getMaxNumConcurrentSubTasks()).andReturn(1).andReturn(2).andReturn(3);
+      EasyMock.replay(inputSource, tuningConfig);
+
+      Assert.assertFalse(ParallelIndexSupervisorTask.isParallelMode(inputSource, tuningConfig));
+      Assert.assertTrue(ParallelIndexSupervisorTask.isParallelMode(inputSource, tuningConfig));
+      Assert.assertTrue(ParallelIndexSupervisorTask.isParallelMode(inputSource, tuningConfig));
+    }
+
+    @Test
+    public void testIsParallelModeFalse_inputSourceNotSplittable()
+    {
+      InputSource inputSource = mock(InputSource.class);
+      expect(inputSource.isSplittable()).andReturn(false).anyTimes();
+
+      ParallelIndexTuningConfig tuningConfig = mock(ParallelIndexTuningConfig.class);
+      expect(tuningConfig.getGivenOrDefaultPartitionsSpec()).andReturn(mock(SingleDimensionPartitionsSpec.class))
+                                                            .anyTimes();
+      expect(tuningConfig.getMaxNumConcurrentSubTasks()).andReturn(3);
+      EasyMock.replay(inputSource, tuningConfig);
+
+      Assert.assertFalse(ParallelIndexSupervisorTask.isParallelMode(inputSource, tuningConfig));
+    }
+  }
+
 }
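
A closing note on the mock setup: the three assertions per test work because EasyMock consumes chained andReturn values one call at a time. A self-contained sketch of that behavior:

    import static org.easymock.EasyMock.expect;
    import static org.easymock.EasyMock.mock;

    import org.easymock.EasyMock;

    public class ChainedAndReturnSketch
    {
      interface TuningConfig
      {
        int getMaxNumConcurrentSubTasks();
      }

      public static void main(String[] args)
      {
        TuningConfig config = mock(TuningConfig.class);
        // Each chained andReturn(...) answers one successive call.
        expect(config.getMaxNumConcurrentSubTasks()).andReturn(0).andReturn(1).andReturn(2);
        EasyMock.replay(config);
        System.out.println(config.getMaxNumConcurrentSubTasks()); // 0
        System.out.println(config.getMaxNumConcurrentSubTasks()); // 1
        System.out.println(config.getMaxNumConcurrentSubTasks()); // 2
      }
    }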