/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/

package org.apache.druid.indexing.common.task;

import com.google.common.collect.ImmutableList;
import com.google.common.collect.ImmutableMap;
import com.google.common.collect.ImmutableSet;
import com.google.common.collect.Lists;
import com.google.common.io.Files;
import org.apache.druid.data.input.InputSplit;
import org.apache.druid.data.input.SegmentsSplitHintSpec;
import org.apache.druid.data.input.impl.CsvInputFormat;
import org.apache.druid.data.input.impl.DimensionsSpec;
import org.apache.druid.data.input.impl.LocalInputSource;
import org.apache.druid.data.input.impl.TimestampSpec;
import org.apache.druid.indexer.TaskState;
import org.apache.druid.indexer.partitions.DynamicPartitionsSpec;
import org.apache.druid.indexer.partitions.HashedPartitionsSpec;
import org.apache.druid.indexer.partitions.PartitionsSpec;
import org.apache.druid.indexer.partitions.SingleDimensionPartitionsSpec;
import org.apache.druid.indexing.common.LockGranularity;
import org.apache.druid.indexing.common.RetryPolicyConfig;
import org.apache.druid.indexing.common.RetryPolicyFactory;
import org.apache.druid.indexing.common.task.CompactionTask.Builder;
import org.apache.druid.indexing.common.task.batch.parallel.AbstractParallelIndexSupervisorTaskTest;
import org.apache.druid.indexing.common.task.batch.parallel.ParallelIndexIOConfig;
import org.apache.druid.indexing.common.task.batch.parallel.ParallelIndexIngestionSpec;
import org.apache.druid.indexing.common.task.batch.parallel.ParallelIndexSupervisorTask;
import org.apache.druid.indexing.common.task.batch.parallel.ParallelIndexTuningConfig;
import org.apache.druid.indexing.firehose.WindowedSegmentId;
import org.apache.druid.indexing.input.DruidInputSource;
import org.apache.druid.java.util.common.Intervals;
import org.apache.druid.java.util.common.granularity.Granularities;
import org.apache.druid.query.aggregation.AggregatorFactory;
import org.apache.druid.query.aggregation.LongSumAggregatorFactory;
import org.apache.druid.segment.SegmentUtils;
import org.apache.druid.segment.indexing.DataSchema;
import org.apache.druid.segment.indexing.granularity.UniformGranularitySpec;
import org.apache.druid.timeline.CompactionState;
import org.apache.druid.timeline.DataSegment;
import org.apache.druid.timeline.partition.HashBasedNumberedShardSpec;
import org.apache.druid.timeline.partition.NumberedOverwriteShardSpec;
import org.apache.druid.timeline.partition.NumberedShardSpec;
import org.apache.druid.timeline.partition.PartitionIds;
import org.apache.druid.timeline.partition.ShardSpec;
import org.apache.druid.timeline.partition.SingleDimensionShardSpec;
import org.joda.time.Interval;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.junit.runners.Parameterized;
import javax.annotation.Nullable;
import java.io.BufferedWriter;
import java.io.File;
import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Map.Entry;
import java.util.Set;

@RunWith(Parameterized.class)
public class CompactionTaskParallelRunTest extends AbstractParallelIndexSupervisorTaskTest
{
@Parameterized.Parameters(name = "{0}")
public static Iterable<Object[]> constructorFeeder()
{
return ImmutableList.of(
new Object[]{LockGranularity.TIME_CHUNK},
new Object[]{LockGranularity.SEGMENT}
);
}
private static final String DATA_SOURCE = "test";
private static final RetryPolicyFactory RETRY_POLICY_FACTORY = new RetryPolicyFactory(new RetryPolicyConfig());
private static final Interval INTERVAL_TO_INDEX = Intervals.of("2014-01-01/2014-01-02");
private final LockGranularity lockGranularity;
private File inputDir;
public CompactionTaskParallelRunTest(LockGranularity lockGranularity)
{
this.lockGranularity = lockGranularity;
}
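
  // Writes a small CSV dataset (ts, dim, val) spanning three hours of 2014-01-01; runIndexTask()
  // ingests it so the compaction tests have segments to work with.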
@Before
public void setup() throws IOException
{
getObjectMapper().registerSubtypes(ParallelIndexTuningConfig.class, DruidInputSource.class);
inputDir = temporaryFolder.newFolder();
final File tmpFile = File.createTempFile("druid", "index", inputDir);
try (BufferedWriter writer = Files.newWriter(tmpFile, StandardCharsets.UTF_8)) {
writer.write("2014-01-01T00:00:10Z,a,1\n");
writer.write("2014-01-01T00:00:10Z,b,2\n");
writer.write("2014-01-01T00:00:10Z,c,3\n");
writer.write("2014-01-01T01:00:20Z,a,1\n");
writer.write("2014-01-01T01:00:20Z,b,2\n");
writer.write("2014-01-01T01:00:20Z,c,3\n");
writer.write("2014-01-01T02:00:30Z,a,1\n");
writer.write("2014-01-01T02:00:30Z,b,2\n");
writer.write("2014-01-01T02:00:30Z,c,3\n");
}
}
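
  // Compacts dynamically partitioned segments in parallel and verifies that each compacted segment
  // records the expected CompactionState (DynamicPartitionsSpec plus the task's index spec).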
@Test
public void testRunParallelWithDynamicPartitioningMatchCompactionState()
{
runIndexTask(null, true);
final Builder builder = new Builder(
DATA_SOURCE,
getSegmentLoaderFactory(),
RETRY_POLICY_FACTORY
);
final CompactionTask compactionTask = builder
.inputSpec(new CompactionIntervalSpec(INTERVAL_TO_INDEX, null))
.tuningConfig(AbstractParallelIndexSupervisorTaskTest.DEFAULT_TUNING_CONFIG_FOR_PARALLEL_INDEXING)
.build();
final Set<DataSegment> compactedSegments = runTask(compactionTask);
final CompactionState expectedState = new CompactionState(
new DynamicPartitionsSpec(null, Long.MAX_VALUE),
compactionTask.getTuningConfig().getIndexSpec().asMap(getObjectMapper())
);
for (DataSegment segment : compactedSegments) {
Assert.assertSame(
lockGranularity == LockGranularity.TIME_CHUNK ? NumberedShardSpec.class : NumberedOverwriteShardSpec.class,
segment.getShardSpec().getClass()
);
      // Expect the compaction state to be stored since storeCompactionState defaults to true
Assert.assertEquals(expectedState, segment.getLastCompactionState());
}
}
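
  // Compacts with hash partitioning and verifies that each segment uses HashBasedNumberedShardSpec
  // and records the HashedPartitionsSpec in its CompactionState.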
@Test
public void testRunParallelWithHashPartitioningMatchCompactionState()
{
// Hash partitioning is not supported with segment lock yet
if (lockGranularity == LockGranularity.SEGMENT) {
return;
}
runIndexTask(null, true);
final Builder builder = new Builder(
DATA_SOURCE,
getSegmentLoaderFactory(),
RETRY_POLICY_FACTORY
);
final CompactionTask compactionTask = builder
.inputSpec(new CompactionIntervalSpec(INTERVAL_TO_INDEX, null))
.tuningConfig(newTuningConfig(new HashedPartitionsSpec(null, 3, null), 2, true))
.build();
final Set<DataSegment> compactedSegments = runTask(compactionTask);
final CompactionState expectedState = new CompactionState(
new HashedPartitionsSpec(null, 3, null),
compactionTask.getTuningConfig().getIndexSpec().asMap(getObjectMapper())
);
for (DataSegment segment : compactedSegments) {
      // Expect the compaction state to be stored since storeCompactionState defaults to true
Assert.assertSame(HashBasedNumberedShardSpec.class, segment.getShardSpec().getClass());
Assert.assertEquals(expectedState, segment.getLastCompactionState());
}
}
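
  // Compacts with range (single-dimension) partitioning and verifies that each segment uses
  // SingleDimensionShardSpec and records the SingleDimensionPartitionsSpec in its CompactionState.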
@Test
public void testRunParallelWithRangePartitioning()
{
// Range partitioning is not supported with segment lock yet
if (lockGranularity == LockGranularity.SEGMENT) {
return;
}
runIndexTask(null, true);
final Builder builder = new Builder(
DATA_SOURCE,
getSegmentLoaderFactory(),
RETRY_POLICY_FACTORY
);
final CompactionTask compactionTask = builder
.inputSpec(new CompactionIntervalSpec(INTERVAL_TO_INDEX, null))
.tuningConfig(newTuningConfig(new SingleDimensionPartitionsSpec(7, null, "dim", false), 2, true))
.build();
final Set<DataSegment> compactedSegments = runTask(compactionTask);
final CompactionState expectedState = new CompactionState(
new SingleDimensionPartitionsSpec(7, null, "dim", false),
compactionTask.getTuningConfig().getIndexSpec().asMap(getObjectMapper())
);
for (DataSegment segment : compactedSegments) {
      // Expect the compaction state to be stored since storeCompactionState defaults to true
Assert.assertSame(SingleDimensionShardSpec.class, segment.getShardSpec().getClass());
Assert.assertEquals(expectedState, segment.getLastCompactionState());
}
}
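
  // Verifies that compacted segments carry no CompactionState when the storeCompactionState
  // context flag is set to false.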
@Test
public void testRunCompactionStateNotStoreIfContextSetToFalse()
{
runIndexTask(null, true);
final Builder builder = new Builder(
DATA_SOURCE,
getSegmentLoaderFactory(),
RETRY_POLICY_FACTORY
);
final CompactionTask compactionTask = builder
.inputSpec(new CompactionIntervalSpec(INTERVAL_TO_INDEX, null))
.tuningConfig(AbstractParallelIndexSupervisorTaskTest.DEFAULT_TUNING_CONFIG_FOR_PARALLEL_INDEXING)
.context(ImmutableMap.of(Tasks.STORE_COMPACTION_STATE_KEY, false))
.build();
final Set<DataSegment> compactedSegments = runTask(compactionTask);
for (DataSegment segment : compactedSegments) {
Assert.assertSame(
lockGranularity == LockGranularity.TIME_CHUNK ? NumberedShardSpec.class : NumberedOverwriteShardSpec.class,
segment.getShardSpec().getClass()
);
      // Expect no compaction state since storeCompactionState is set to false in the task context
      Assert.assertNull(segment.getLastCompactionState());
}
}
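
  // Compacts a datasource containing both hash-partitioned and dynamically partitioned (appended)
  // segments and verifies that each hourly interval ends up with a single segment whose shard spec
  // matches the lock granularity in use.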
@Test
public void testCompactHashAndDynamicPartitionedSegments()
{
runIndexTask(new HashedPartitionsSpec(null, 2, null), false);
runIndexTask(null, true);
final Builder builder = new Builder(
DATA_SOURCE,
getSegmentLoaderFactory(),
RETRY_POLICY_FACTORY
);
final CompactionTask compactionTask = builder
.inputSpec(new CompactionIntervalSpec(INTERVAL_TO_INDEX, null))
.tuningConfig(AbstractParallelIndexSupervisorTaskTest.DEFAULT_TUNING_CONFIG_FOR_PARALLEL_INDEXING)
.build();
final Map<Interval, List<DataSegment>> intervalToSegments = SegmentUtils.groupSegmentsByInterval(
runTask(compactionTask)
);
Assert.assertEquals(3, intervalToSegments.size());
Assert.assertEquals(
ImmutableSet.of(
Intervals.of("2014-01-01T00/PT1H"),
Intervals.of("2014-01-01T01/PT1H"),
Intervals.of("2014-01-01T02/PT1H")
),
intervalToSegments.keySet()
);
for (Entry<Interval, List<DataSegment>> entry : intervalToSegments.entrySet()) {
final List<DataSegment> segmentsInInterval = entry.getValue();
Assert.assertEquals(1, segmentsInInterval.size());
final ShardSpec shardSpec = segmentsInInterval.get(0).getShardSpec();
if (lockGranularity == LockGranularity.TIME_CHUNK) {
Assert.assertSame(NumberedShardSpec.class, shardSpec.getClass());
final NumberedShardSpec numberedShardSpec = (NumberedShardSpec) shardSpec;
Assert.assertEquals(0, numberedShardSpec.getPartitionNum());
Assert.assertEquals(1, numberedShardSpec.getNumCorePartitions());
} else {
Assert.assertSame(NumberedOverwriteShardSpec.class, shardSpec.getClass());
final NumberedOverwriteShardSpec numberedShardSpec = (NumberedOverwriteShardSpec) shardSpec;
Assert.assertEquals(PartitionIds.NON_ROOT_GEN_START_PARTITION_ID, numberedShardSpec.getPartitionNum());
Assert.assertEquals(1, numberedShardSpec.getAtomicUpdateGroupSize());
}
}
}
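
  // Same as the test above, but the initial segments are range-partitioned instead of hash-partitioned.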
@Test
public void testCompactRangeAndDynamicPartitionedSegments()
{
runIndexTask(new SingleDimensionPartitionsSpec(2, null, "dim", false), false);
runIndexTask(null, true);
final Builder builder = new Builder(
DATA_SOURCE,
getSegmentLoaderFactory(),
RETRY_POLICY_FACTORY
);
final CompactionTask compactionTask = builder
.inputSpec(new CompactionIntervalSpec(INTERVAL_TO_INDEX, null))
.tuningConfig(AbstractParallelIndexSupervisorTaskTest.DEFAULT_TUNING_CONFIG_FOR_PARALLEL_INDEXING)
.build();
final Map<Interval, List<DataSegment>> intervalToSegments = SegmentUtils.groupSegmentsByInterval(
runTask(compactionTask)
);
Assert.assertEquals(3, intervalToSegments.size());
Assert.assertEquals(
ImmutableSet.of(
Intervals.of("2014-01-01T00/PT1H"),
Intervals.of("2014-01-01T01/PT1H"),
Intervals.of("2014-01-01T02/PT1H")
),
intervalToSegments.keySet()
);
for (Entry<Interval, List<DataSegment>> entry : intervalToSegments.entrySet()) {
final List<DataSegment> segmentsInInterval = entry.getValue();
Assert.assertEquals(1, segmentsInInterval.size());
final ShardSpec shardSpec = segmentsInInterval.get(0).getShardSpec();
if (lockGranularity == LockGranularity.TIME_CHUNK) {
Assert.assertSame(NumberedShardSpec.class, shardSpec.getClass());
final NumberedShardSpec numberedShardSpec = (NumberedShardSpec) shardSpec;
Assert.assertEquals(0, numberedShardSpec.getPartitionNum());
Assert.assertEquals(1, numberedShardSpec.getNumCorePartitions());
} else {
Assert.assertSame(NumberedOverwriteShardSpec.class, shardSpec.getClass());
final NumberedOverwriteShardSpec numberedShardSpec = (NumberedOverwriteShardSpec) shardSpec;
Assert.assertEquals(PartitionIds.NON_ROOT_GEN_START_PARTITION_ID, numberedShardSpec.getPartitionNum());
Assert.assertEquals(1, numberedShardSpec.getAtomicUpdateGroupSize());
}
}
}
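
  // Verifies that DruidInputSource.createSplits, limited to one segment per split, yields exactly
  // one split per used segment known to the coordinator.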
@Test
public void testDruidInputSourceCreateSplitsWithIndividualSplits()
{
runIndexTask(null, true);
List<InputSplit<List<WindowedSegmentId>>> splits = Lists.newArrayList(
DruidInputSource.createSplits(
getCoordinatorClient(),
RETRY_POLICY_FACTORY,
DATA_SOURCE,
INTERVAL_TO_INDEX,
new SegmentsSplitHintSpec(null, 1)
)
);
List<DataSegment> segments = new ArrayList<>(
getCoordinatorClient().fetchUsedSegmentsInDataSourceForIntervals(
DATA_SOURCE,
ImmutableList.of(INTERVAL_TO_INDEX)
)
);
Set<String> segmentIdsFromSplits = new HashSet<>();
Set<String> segmentIdsFromCoordinator = new HashSet<>();
Assert.assertEquals(segments.size(), splits.size());
for (int i = 0; i < segments.size(); i++) {
segmentIdsFromCoordinator.add(segments.get(i).getId().toString());
segmentIdsFromSplits.add(splits.get(i).get().get(0).getSegmentId());
}
Assert.assertEquals(segmentIdsFromCoordinator, segmentIdsFromSplits);
}
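
  // Ingests the CSV data in inputDir into DATA_SOURCE with HOUR segment granularity, using the given
  // partitionsSpec (null means dynamic partitioning) and append mode.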
private void runIndexTask(@Nullable PartitionsSpec partitionsSpec, boolean appendToExisting)
{
ParallelIndexIOConfig ioConfig = new ParallelIndexIOConfig(
null,
new LocalInputSource(inputDir, "druid*"),
new CsvInputFormat(
Arrays.asList("ts", "dim", "val"),
"|",
null,
false,
0
),
appendToExisting
);
ParallelIndexTuningConfig tuningConfig = newTuningConfig(partitionsSpec, 2, !appendToExisting);
ParallelIndexSupervisorTask indexTask = new ParallelIndexSupervisorTask(
null,
null,
null,
new ParallelIndexIngestionSpec(
new DataSchema(
DATA_SOURCE,
new TimestampSpec("ts", "auto", null),
new DimensionsSpec(DimensionsSpec.getDefaultSchemas(Arrays.asList("ts", "dim"))),
new AggregatorFactory[]{new LongSumAggregatorFactory("val", "val")},
new UniformGranularitySpec(
Granularities.HOUR,
Granularities.MINUTE,
ImmutableList.of(INTERVAL_TO_INDEX)
),
null
),
ioConfig,
tuningConfig
),
null
);
runTask(indexTask);
}
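
  // Runs the task with the lock type implied by the test's lock granularity, asserts that it
  // succeeds, and returns the segments it published.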
private Set<DataSegment> runTask(Task task)
{
task.addToContext(Tasks.FORCE_TIME_CHUNK_LOCK_KEY, lockGranularity == LockGranularity.TIME_CHUNK);
Assert.assertEquals(TaskState.SUCCESS, getIndexingServiceClient().runAndWait(task).getStatusCode());
return getIndexingServiceClient().getPublishedSegments(task);
}
}