Fix the task id creation in CompactionTask (#10445)
* Fix the task id creation in CompactionTask
* Address review comments
* Ignore test for range partitioning and segment lock
diff --git a/indexing-service/src/main/java/org/apache/druid/indexing/common/task/CompactionTask.java b/indexing-service/src/main/java/org/apache/druid/indexing/common/task/CompactionTask.java
index 62a9f26..ba2502f 100644
--- a/indexing-service/src/main/java/org/apache/druid/indexing/common/task/CompactionTask.java
+++ b/indexing-service/src/main/java/org/apache/druid/indexing/common/task/CompactionTask.java
@@ -32,6 +32,7 @@
import org.apache.curator.shaded.com.google.common.base.Verify;
import org.apache.druid.client.coordinator.CoordinatorClient;
import org.apache.druid.client.indexing.ClientCompactionTaskQuery;
+import org.apache.druid.data.input.InputSource;
import org.apache.druid.data.input.impl.DimensionSchema;
import org.apache.druid.data.input.impl.DimensionSchema.MultiValueHandling;
import org.apache.druid.data.input.impl.DimensionsSpec;
@@ -361,10 +362,14 @@
// a new Appenderator on its own instead. As a result, they should use different sequence names to allocate
// new segmentIds properly. See IndexerSQLMetadataStorageCoordinator.allocatePendingSegments() for details.
// In this case, we use different fake IDs for each created index task.
- final String subtaskId = tuningConfig == null || tuningConfig.getMaxNumConcurrentSubTasks() == 1
- ? createIndexTaskSpecId(i)
- : getId();
- return newTask(subtaskId, ingestionSpecs.get(i));
+ ParallelIndexIngestionSpec ingestionSpec = ingestionSpecs.get(i);
+ InputSource inputSource = ingestionSpec.getIOConfig().getNonNullInputSource(
+ ingestionSpec.getDataSchema().getParser()
+ );
+ final String subtaskId = ParallelIndexSupervisorTask.isParallelMode(inputSource, tuningConfig)
+ ? getId()
+ : createIndexTaskSpecId(i);
+ return newTask(subtaskId, ingestionSpec);
})
.collect(Collectors.toList());
diff --git a/indexing-service/src/main/java/org/apache/druid/indexing/common/task/batch/parallel/ParallelIndexSupervisorTask.java b/indexing-service/src/main/java/org/apache/druid/indexing/common/task/batch/parallel/ParallelIndexSupervisorTask.java
index dd0e759..4a218a0 100644
--- a/indexing-service/src/main/java/org/apache/druid/indexing/common/task/batch/parallel/ParallelIndexSupervisorTask.java
+++ b/indexing-service/src/main/java/org/apache/druid/indexing/common/task/batch/parallel/ParallelIndexSupervisorTask.java
@@ -466,18 +466,25 @@
registerResourceCloserOnAbnormalExit(currentSubTaskHolder);
}
- private boolean isParallelMode()
+ public static boolean isParallelMode(InputSource inputSource, @Nullable ParallelIndexTuningConfig tuningConfig)
{
+ if (null == tuningConfig) {
+ return false; // a missing tuning config is always reported as "not parallel"
+ }
+ boolean useRangePartitions = useRangePartitions(tuningConfig); // true for a SingleDimensionPartitionsSpec
// Range partitioning is not implemented for runSequential() (but hash partitioning is)
- int minRequiredNumConcurrentSubTasks = useRangePartitions() ? 1 : 2;
-
- return baseInputSource.isSplittable()
- && ingestionSchema.getTuningConfig().getMaxNumConcurrentSubTasks() >= minRequiredNumConcurrentSubTasks;
+ int minRequiredNumConcurrentSubTasks = useRangePartitions ? 1 : 2; // threshold: 1 with range partitioning, else 2
+ return inputSource.isSplittable() && tuningConfig.getMaxNumConcurrentSubTasks() >= minRequiredNumConcurrentSubTasks;
}
- private boolean useRangePartitions()
+ private static boolean useRangePartitions(ParallelIndexTuningConfig tuningConfig)
{
- return ingestionSchema.getTuningConfig().getGivenOrDefaultPartitionsSpec() instanceof SingleDimensionPartitionsSpec;
+ return tuningConfig.getGivenOrDefaultPartitionsSpec() instanceof SingleDimensionPartitionsSpec; // type check only
+ }
+ // Instance variant: delegates to the static overload with baseInputSource and ingestionSchema's tuning config.
+ private boolean isParallelMode()
+ {
+ return isParallelMode(baseInputSource, ingestionSchema.getTuningConfig());
}
/**
@@ -512,7 +519,7 @@
*/
private TaskStatus runMultiPhaseParallel(TaskToolbox toolbox) throws Exception
{
- return useRangePartitions()
+ return useRangePartitions(ingestionSchema.getTuningConfig()) // choose the range or the hash multi-phase runner
? runRangePartitionMultiPhaseParallel(toolbox)
: runHashPartitionMultiPhaseParallel(toolbox);
}
diff --git a/indexing-service/src/test/java/org/apache/druid/indexing/common/task/CompactionTaskParallelRunTest.java b/indexing-service/src/test/java/org/apache/druid/indexing/common/task/CompactionTaskParallelRunTest.java
index 616dd24..7b7005b 100644
--- a/indexing-service/src/test/java/org/apache/druid/indexing/common/task/CompactionTaskParallelRunTest.java
+++ b/indexing-service/src/test/java/org/apache/druid/indexing/common/task/CompactionTaskParallelRunTest.java
@@ -63,6 +63,7 @@
import org.apache.druid.timeline.partition.SingleDimensionShardSpec;
import org.joda.time.Interval;
import org.junit.Assert;
+import org.junit.Assume;
import org.junit.Before;
import org.junit.Test;
import org.junit.runner.RunWith;
@@ -152,7 +153,7 @@
lockGranularity == LockGranularity.TIME_CHUNK ? NumberedShardSpec.class : NumberedOverwriteShardSpec.class,
segment.getShardSpec().getClass()
);
- // Expecte compaction state to exist as store compaction state by default
+ // Expect compaction state to exist as store compaction state by default
Assert.assertEquals(expectedState, segment.getLastCompactionState());
}
}
@@ -161,9 +162,7 @@
public void testRunParallelWithHashPartitioningMatchCompactionState()
{
// Hash partitioning is not supported with segment lock yet
- if (lockGranularity == LockGranularity.SEGMENT) {
- return;
- }
+ Assume.assumeFalse(lockGranularity == LockGranularity.SEGMENT);
runIndexTask(null, true);
final Builder builder = new Builder(
@@ -182,7 +181,7 @@
compactionTask.getTuningConfig().getIndexSpec().asMap(getObjectMapper())
);
for (DataSegment segment : compactedSegments) {
- // Expecte compaction state to exist as store compaction state by default
+ // Expect compaction state to exist as store compaction state by default
Assert.assertSame(HashBasedNumberedShardSpec.class, segment.getShardSpec().getClass());
Assert.assertEquals(expectedState, segment.getLastCompactionState());
}
@@ -192,9 +191,7 @@
public void testRunParallelWithRangePartitioning()
{
// Range partitioning is not supported with segment lock yet
- if (lockGranularity == LockGranularity.SEGMENT) {
- return;
- }
+ Assume.assumeFalse(lockGranularity == LockGranularity.SEGMENT);
runIndexTask(null, true);
final Builder builder = new Builder(
@@ -213,7 +210,36 @@
compactionTask.getTuningConfig().getIndexSpec().asMap(getObjectMapper())
);
for (DataSegment segment : compactedSegments) {
- // Expecte compaction state to exist as store compaction state by default
+ // Expect compaction state to exist as store compaction state by default
+ Assert.assertSame(SingleDimensionShardSpec.class, segment.getShardSpec().getClass());
+ Assert.assertEquals(expectedState, segment.getLastCompactionState());
+ }
+ }
+
+ @Test
+ public void testRunParallelWithRangePartitioningWithSingleTask()
+ {
+ // Range partitioning is not supported with segment lock yet
+ Assume.assumeFalse(lockGranularity == LockGranularity.SEGMENT);
+ runIndexTask(null, true);
+
+ final Builder builder = new Builder(
+ DATA_SOURCE,
+ getSegmentLoaderFactory(),
+ RETRY_POLICY_FACTORY
+ );
+ final CompactionTask compactionTask = builder
+ .inputSpec(new CompactionIntervalSpec(INTERVAL_TO_INDEX, null))
+ .tuningConfig(newTuningConfig(new SingleDimensionPartitionsSpec(7, null, "dim", false), 1, true))
+ .build();
+
+ final Set<DataSegment> compactedSegments = runTask(compactionTask);
+ final CompactionState expectedState = new CompactionState(
+ new SingleDimensionPartitionsSpec(7, null, "dim", false),
+ compactionTask.getTuningConfig().getIndexSpec().asMap(getObjectMapper())
+ );
+ for (DataSegment segment : compactedSegments) {
+ // Expect compaction state to exist as store compaction state by default
Assert.assertSame(SingleDimensionShardSpec.class, segment.getShardSpec().getClass());
Assert.assertEquals(expectedState, segment.getLastCompactionState());
}
@@ -242,7 +268,7 @@
lockGranularity == LockGranularity.TIME_CHUNK ? NumberedShardSpec.class : NumberedOverwriteShardSpec.class,
segment.getShardSpec().getClass()
);
- // Expecte compaction state to exist as store compaction state by default
+ // Expect compaction state to be absent (null) here, as the assertion below checks
Assert.assertEquals(null, segment.getLastCompactionState());
}
}
diff --git a/indexing-service/src/test/java/org/apache/druid/indexing/common/task/batch/parallel/ParallelIndexSupervisorTaskTest.java b/indexing-service/src/test/java/org/apache/druid/indexing/common/task/batch/parallel/ParallelIndexSupervisorTaskTest.java
index 60c286a..ef1db8a 100644
--- a/indexing-service/src/test/java/org/apache/druid/indexing/common/task/batch/parallel/ParallelIndexSupervisorTaskTest.java
+++ b/indexing-service/src/test/java/org/apache/druid/indexing/common/task/batch/parallel/ParallelIndexSupervisorTaskTest.java
@@ -21,11 +21,14 @@
import com.fasterxml.jackson.databind.ObjectMapper;
import com.google.common.collect.Ordering;
+import org.apache.druid.data.input.InputSource;
import org.apache.druid.data.input.impl.DimensionsSpec;
import org.apache.druid.data.input.impl.InlineInputSource;
import org.apache.druid.data.input.impl.JsonInputFormat;
import org.apache.druid.data.input.impl.TimestampSpec;
import org.apache.druid.indexer.partitions.HashedPartitionsSpec;
+import org.apache.druid.indexer.partitions.PartitionsSpec;
+import org.apache.druid.indexer.partitions.SingleDimensionPartitionsSpec;
import org.apache.druid.java.util.common.Intervals;
import org.apache.druid.java.util.common.Pair;
import org.apache.druid.segment.IndexSpec;
@@ -36,6 +39,7 @@
import org.apache.druid.segment.writeout.OffHeapMemorySegmentWriteOutMediumFactory;
import org.apache.druid.timeline.partition.BuildingHashBasedNumberedShardSpec;
import org.apache.druid.timeline.partition.HashPartitionFunction;
+import org.easymock.EasyMock;
import org.hamcrest.Matchers;
import org.joda.time.Duration;
import org.joda.time.Interval;
@@ -55,6 +59,9 @@
import java.util.stream.Collectors;
import java.util.stream.IntStream;
+import static org.easymock.EasyMock.expect;
+import static org.easymock.EasyMock.mock;
+
@RunWith(Enclosed.class)
public class ParallelIndexSupervisorTaskTest
{
@@ -241,4 +248,64 @@
);
}
}
+ // Unit tests for the static ParallelIndexSupervisorTask.isParallelMode(InputSource, ParallelIndexTuningConfig).
+ public static class staticUtilsTest
+ {
+ @Test
+ public void testIsParallelModeFalse_nullTuningConfig()
+ {
+ InputSource inputSource = mock(InputSource.class); // no expectations are recorded on this mock
+ Assert.assertFalse(ParallelIndexSupervisorTask.isParallelMode(inputSource, null));
+ }
+
+ @Test
+ public void testIsParallelModeFalse_rangePartition()
+ {
+ InputSource inputSource = mock(InputSource.class);
+ expect(inputSource.isSplittable()).andReturn(true).anyTimes();
+
+ ParallelIndexTuningConfig tuningConfig = mock(ParallelIndexTuningConfig.class);
+ expect(tuningConfig.getGivenOrDefaultPartitionsSpec()).andReturn(mock(SingleDimensionPartitionsSpec.class))
+ .anyTimes();
+ expect(tuningConfig.getMaxNumConcurrentSubTasks()).andReturn(0).andReturn(1).andReturn(2);
+ EasyMock.replay(inputSource, tuningConfig);
+ // Stubbed subtask counts 0, 1, 2 -> expected false, true, true. NOTE(review): name says "False" but two calls expect true.
+ Assert.assertFalse(ParallelIndexSupervisorTask.isParallelMode(inputSource, tuningConfig));
+ Assert.assertTrue(ParallelIndexSupervisorTask.isParallelMode(inputSource, tuningConfig));
+ Assert.assertTrue(ParallelIndexSupervisorTask.isParallelMode(inputSource, tuningConfig));
+ }
+
+ @Test
+ public void testIsParallelModeFalse_notRangePartition()
+ {
+ InputSource inputSource = mock(InputSource.class);
+ expect(inputSource.isSplittable()).andReturn(true).anyTimes();
+
+ ParallelIndexTuningConfig tuningConfig = mock(ParallelIndexTuningConfig.class);
+ expect(tuningConfig.getGivenOrDefaultPartitionsSpec()).andReturn(mock(PartitionsSpec.class))
+ .anyTimes();
+ expect(tuningConfig.getMaxNumConcurrentSubTasks()).andReturn(1).andReturn(2).andReturn(3);
+ EasyMock.replay(inputSource, tuningConfig);
+ // Stubbed subtask counts 1, 2, 3 -> expected false, true, true. NOTE(review): name says "False" but two calls expect true.
+ Assert.assertFalse(ParallelIndexSupervisorTask.isParallelMode(inputSource, tuningConfig));
+ Assert.assertTrue(ParallelIndexSupervisorTask.isParallelMode(inputSource, tuningConfig));
+ Assert.assertTrue(ParallelIndexSupervisorTask.isParallelMode(inputSource, tuningConfig));
+ }
+
+ @Test
+ public void testIsParallelModeFalse_inputSourceNotSplittable()
+ {
+ InputSource inputSource = mock(InputSource.class);
+ expect(inputSource.isSplittable()).andReturn(false).anyTimes();
+
+ ParallelIndexTuningConfig tuningConfig = mock(ParallelIndexTuningConfig.class);
+ expect(tuningConfig.getGivenOrDefaultPartitionsSpec()).andReturn(mock(SingleDimensionPartitionsSpec.class))
+ .anyTimes();
+ expect(tuningConfig.getMaxNumConcurrentSubTasks()).andReturn(3);
+ EasyMock.replay(inputSource, tuningConfig);
+ // isSplittable() is stubbed to false, so false is expected even with a stubbed subtask count of 3.
+ Assert.assertFalse(ParallelIndexSupervisorTask.isParallelMode(inputSource, tuningConfig));
+ }
+ }
+
}