blob: b7f107d07255d2a6266eb774285d6b5e7e4aaa8b [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.druid.indexing.common.task.batch.parallel;
import com.fasterxml.jackson.annotation.JsonCreator;
import com.fasterxml.jackson.annotation.JsonProperty;
import com.google.common.collect.ImmutableList;
import org.apache.druid.data.input.impl.LocalInputSource;
import org.apache.druid.data.input.impl.StringInputRowParser;
import org.apache.druid.indexer.TaskState;
import org.apache.druid.indexing.common.LockGranularity;
import org.apache.druid.indexing.common.TaskToolbox;
import org.apache.druid.indexing.common.actions.TaskActionClient;
import org.apache.druid.indexing.common.task.Tasks;
import org.apache.druid.indexing.overlord.Segments;
import org.apache.druid.java.util.common.Intervals;
import org.apache.druid.java.util.common.StringUtils;
import org.apache.druid.java.util.common.granularity.Granularities;
import org.apache.druid.java.util.common.granularity.Granularity;
import org.apache.druid.query.aggregation.AggregatorFactory;
import org.apache.druid.query.aggregation.LongSumAggregatorFactory;
import org.apache.druid.segment.SegmentUtils;
import org.apache.druid.segment.indexing.DataSchema;
import org.apache.druid.segment.indexing.granularity.UniformGranularitySpec;
import org.apache.druid.segment.realtime.firehose.LocalFirehoseFactory;
import org.apache.druid.timeline.DataSegment;
import org.apache.druid.timeline.Partitions;
import org.apache.druid.timeline.VersionedIntervalTimeline;
import org.apache.druid.timeline.partition.NumberedOverwriteShardSpec;
import org.apache.druid.timeline.partition.NumberedShardSpec;
import org.joda.time.Interval;
import org.junit.After;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.junit.runners.Parameterized;
import javax.annotation.Nullable;
import java.io.File;
import java.io.IOException;
import java.io.Writer;
import java.nio.charset.StandardCharsets;
import java.nio.file.Files;
import java.util.Collection;
import java.util.Collections;
import java.util.HashSet;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Set;
@RunWith(Parameterized.class)
public class SinglePhaseParallelIndexingTest extends AbstractParallelIndexSupervisorTaskTest
{
  /**
   * Runs every test under three combinations of (lock granularity, input API):
   * TIME_CHUNK lock with the legacy firehose/parser API, TIME_CHUNK lock with the
   * newer InputSource/InputFormat API, and SEGMENT lock with the InputSource API.
   * Note there is deliberately no SEGMENT + firehose combination.
   */
  @Parameterized.Parameters(name = "{0}, useInputFormatApi={1}")
  public static Iterable<Object[]> constructorFeeder()
  {
    return ImmutableList.of(
        new Object[]{LockGranularity.TIME_CHUNK, false},
        new Object[]{LockGranularity.TIME_CHUNK, true},
        new Object[]{LockGranularity.SEGMENT, true}
    );
  }

  // Interval covering all timestamps written by setup() (Dec 24 2017 .. Dec 29 2017).
  private static final Interval INTERVAL_TO_INDEX = Intervals.of("2017-12/P1M");

  // Lock granularity forced on tasks via Tasks.FORCE_TIME_CHUNK_LOCK_KEY.
  private final LockGranularity lockGranularity;
  // Whether tasks are built with the InputSource/InputFormat API (true) or the
  // legacy firehose/parser API (false).
  private final boolean useInputFormatApi;
  // Directory holding the generated CSV input files; recreated per test by setup().
  private File inputDir;

  public SinglePhaseParallelIndexingTest(LockGranularity lockGranularity, boolean useInputFormatApi)
  {
    this.lockGranularity = lockGranularity;
    this.useInputFormatApi = useInputFormatApi;
  }

  /**
   * Creates the input directory and writes two groups of CSV files:
   * 5 "test_i" files with two rows each (timestamps 2017-12-(24+i) and
   * 2017-12-(25+i)), and 5 "filtered_i" files with one row each. Only the
   * "test_*" files match the file filter used by {@link #newTask}, so the
   * "filtered_*" files exercise input filtering by being ignored.
   */
  @Before
  public void setup() throws IOException
  {
    inputDir = temporaryFolder.newFolder("data");
    // set up data
    for (int i = 0; i < 5; i++) {
      try (final Writer writer =
               Files.newBufferedWriter(new File(inputDir, "test_" + i).toPath(), StandardCharsets.UTF_8)) {
        writer.write(StringUtils.format("2017-12-%d,%d th test file\n", 24 + i, i));
        writer.write(StringUtils.format("2017-12-%d,%d th test file\n", 25 + i, i));
      }
    }
    // Files that do NOT match the "test_*" filter; they must not be ingested.
    for (int i = 0; i < 5; i++) {
      try (final Writer writer =
               Files.newBufferedWriter(new File(inputDir, "filtered_" + i).toPath(), StandardCharsets.UTF_8)) {
        writer.write(StringUtils.format("2017-12-%d,%d th test file\n", 25 + i, i));
      }
    }
    // Register the test-only input source subtype so it can round-trip through JSON.
    getObjectMapper().registerSubtypes(SettableSplittableLocalInputSource.class);
  }

  @After
  public void teardown()
  {
    // Remove the generated input files.
    temporaryFolder.delete();
  }

  /**
   * Verifies that the supervisor task reports ready, and that every sub-task
   * built from the supervisor's sub-task specs also reports ready.
   */
  @Test
  public void testIsReady() throws Exception
  {
    final ParallelIndexSupervisorTask task = newTask(INTERVAL_TO_INDEX, false, true);
    final TaskActionClient actionClient = createActionClient(task);
    final TaskToolbox toolbox = createTaskToolbox(task, actionClient);
    prepareTaskForLocking(task);
    Assert.assertTrue(task.isReady(actionClient));
    final SinglePhaseParallelIndexTaskRunner runner = task.createSinglePhaseTaskRunner(toolbox);
    final Iterator<SubTaskSpec<SinglePhaseSubTask>> subTaskSpecIterator = runner.subTaskSpecIterator();
    while (subTaskSpecIterator.hasNext()) {
      final SinglePhaseSubTaskSpec spec = (SinglePhaseSubTaskSpec) subTaskSpecIterator.next();
      // Build a concrete sub-task from the spec (id/taskResource null, attempt 0)
      // and check its readiness independently of the supervisor.
      final SinglePhaseSubTask subTask = new SinglePhaseSubTask(
          null,
          spec.getGroupId(),
          null,
          spec.getSupervisorTaskId(),
          0,
          spec.getIngestionSpec(),
          spec.getContext()
      );
      final TaskActionClient subTaskActionClient = createActionClient(subTask);
      prepareTaskForLocking(subTask);
      Assert.assertTrue(subTask.isReady(subTaskActionClient));
    }
  }

  /**
   * Runs a task to completion, asserts it succeeded, and checks the shard specs
   * of the published segments.
   *
   * @param interval                 interval to index, or null for no explicit interval
   * @param segmentGranularity       segment granularity of the ingestion spec
   * @param appendToExisting         whether the task appends instead of overwriting
   * @param originalSegmentsIfAppend pre-existing segments, used to derive the
   *                                 expected core partition counts when appending
   */
  private void runTestTask(
      @Nullable Interval interval,
      Granularity segmentGranularity,
      boolean appendToExisting,
      Collection<DataSegment> originalSegmentsIfAppend
  )
  {
    // The task could run differently between when appendToExisting is false and true even when this is an initial write
    final ParallelIndexSupervisorTask task = newTask(interval, segmentGranularity, appendToExisting, true);
    task.addToContext(Tasks.FORCE_TIME_CHUNK_LOCK_KEY, lockGranularity == LockGranularity.TIME_CHUNK);
    Assert.assertEquals(TaskState.SUCCESS, getIndexingServiceClient().runAndWait(task).getStatusCode());
    assertShardSpec(
        task,
        // With no explicit interval, the task effectively takes time-chunk locks.
        interval == null ? LockGranularity.TIME_CHUNK : lockGranularity,
        appendToExisting,
        originalSegmentsIfAppend
    );
  }

  /**
   * Runs an overwriting (non-append) task to completion and checks the shard
   * specs of the published segments against the lock granularity that was
   * actually used.
   */
  private void runOverwriteTask(
      @Nullable Interval interval,
      Granularity segmentGranularity,
      LockGranularity actualLockGranularity
  )
  {
    final ParallelIndexSupervisorTask task = newTask(interval, segmentGranularity, false, true);
    task.addToContext(Tasks.FORCE_TIME_CHUNK_LOCK_KEY, lockGranularity == LockGranularity.TIME_CHUNK);
    Assert.assertEquals(TaskState.SUCCESS, getIndexingServiceClient().runAndWait(task).getStatusCode());
    assertShardSpecAfterOverwrite(task, actualLockGranularity);
  }

  /**
   * Ingests all data with DAY granularity, re-ingests the same data with
   * {@code secondSegmentGranularity}, and verifies the second run's segments
   * fully overshadow (replace) the first run's in the timeline.
   */
  private void testRunAndOverwrite(@Nullable Interval inputInterval, Granularity secondSegmentGranularity)
  {
    // Ingest all data.
    runTestTask(inputInterval, Granularities.DAY, false, Collections.emptyList());
    final Collection<DataSegment> allSegments = new HashSet<>(
        inputInterval == null
        ? getStorageCoordinator().retrieveAllUsedSegments("dataSource", Segments.ONLY_VISIBLE)
        : getStorageCoordinator().retrieveUsedSegmentsForInterval("dataSource", inputInterval, Segments.ONLY_VISIBLE)
    );
    // Reingest the same data. Each segment should get replaced by a segment with a newer version.
    final LockGranularity actualLockGranularity;
    if (inputInterval == null) {
      actualLockGranularity = LockGranularity.TIME_CHUNK;
    } else {
      // A segment-granularity change forces TIME_CHUNK locking even when the
      // test parameter requested SEGMENT locks.
      actualLockGranularity = secondSegmentGranularity.equals(Granularities.DAY)
                              ? lockGranularity
                              : LockGranularity.TIME_CHUNK;
    }
    runOverwriteTask(inputInterval, secondSegmentGranularity, actualLockGranularity);
    // Verify that the segment has been replaced.
    final Collection<DataSegment> newSegments =
        inputInterval == null
        ? getStorageCoordinator().retrieveAllUsedSegments("dataSource", Segments.ONLY_VISIBLE)
        : getStorageCoordinator().retrieveUsedSegmentsForInterval("dataSource", inputInterval, Segments.ONLY_VISIBLE);
    Assert.assertFalse(newSegments.isEmpty());
    allSegments.addAll(newSegments);
    // Build a timeline over old + new segments: only the new segments should be
    // visible (non-overshadowed) afterwards.
    final VersionedIntervalTimeline<String, DataSegment> timeline = VersionedIntervalTimeline.forSegments(allSegments);
    final Interval timelineInterval = inputInterval == null ? Intervals.ETERNITY : inputInterval;
    final Set<DataSegment> visibles = timeline.findNonOvershadowedObjectsInInterval(
        timelineInterval,
        Partitions.ONLY_COMPLETE
    );
    Assert.assertEquals(new HashSet<>(newSegments), visibles);
  }

  /**
   * Asserts the shard specs of the segments published by {@code task}.
   * Initial time-chunk writes must produce {@link NumberedShardSpec}s whose core
   * partition count equals the number of segments in the interval; appends (and
   * initial writes under segment lock) must keep the original segments' core
   * partition counts (0 when there were no original segments).
   */
  private void assertShardSpec(
      ParallelIndexSupervisorTask task,
      LockGranularity actualLockGranularity,
      boolean appendToExisting,
      Collection<DataSegment> originalSegmentsIfAppend
  )
  {
    final Collection<DataSegment> segments = getIndexingServiceClient().getPublishedSegments(task);
    if (!appendToExisting && actualLockGranularity == LockGranularity.TIME_CHUNK) {
      // Initial write
      final Map<Interval, List<DataSegment>> intervalToSegments = SegmentUtils.groupSegmentsByInterval(segments);
      for (List<DataSegment> segmentsPerInterval : intervalToSegments.values()) {
        for (DataSegment segment : segmentsPerInterval) {
          Assert.assertSame(NumberedShardSpec.class, segment.getShardSpec().getClass());
          final NumberedShardSpec shardSpec = (NumberedShardSpec) segment.getShardSpec();
          Assert.assertEquals(segmentsPerInterval.size(), shardSpec.getNumCorePartitions());
        }
      }
    } else {
      // Append or initial write with segment lock
      final Map<Interval, List<DataSegment>> intervalToOriginalSegments = SegmentUtils.groupSegmentsByInterval(
          originalSegmentsIfAppend
      );
      for (DataSegment segment : segments) {
        Assert.assertSame(NumberedShardSpec.class, segment.getShardSpec().getClass());
        final NumberedShardSpec shardSpec = (NumberedShardSpec) segment.getShardSpec();
        // Appended segments inherit the core partition count from the segments
        // already present in their interval.
        final List<DataSegment> originalSegmentsInInterval = intervalToOriginalSegments.get(segment.getInterval());
        final int expectedNumCorePartitions =
            originalSegmentsInInterval == null || originalSegmentsInInterval.isEmpty()
            ? 0
            : originalSegmentsInInterval.get(0).getShardSpec().getNumCorePartitions();
        Assert.assertEquals(expectedNumCorePartitions, shardSpec.getNumCorePartitions());
      }
    }
  }

  /**
   * Asserts the shard specs of segments published by an overwriting task:
   * time-chunk locking yields {@link NumberedShardSpec}s (core partition count
   * per interval), while segment locking yields
   * {@link NumberedOverwriteShardSpec}s whose atomic update group size equals
   * the number of segments in the interval.
   */
  private void assertShardSpecAfterOverwrite(ParallelIndexSupervisorTask task, LockGranularity actualLockGranularity)
  {
    final Collection<DataSegment> segments = getIndexingServiceClient().getPublishedSegments(task);
    final Map<Interval, List<DataSegment>> intervalToSegments = SegmentUtils.groupSegmentsByInterval(segments);
    if (actualLockGranularity != LockGranularity.SEGMENT) {
      // Check the core partition set in the shardSpec
      for (List<DataSegment> segmentsPerInterval : intervalToSegments.values()) {
        for (DataSegment segment : segmentsPerInterval) {
          Assert.assertSame(NumberedShardSpec.class, segment.getShardSpec().getClass());
          final NumberedShardSpec shardSpec = (NumberedShardSpec) segment.getShardSpec();
          Assert.assertEquals(segmentsPerInterval.size(), shardSpec.getNumCorePartitions());
        }
      }
    } else {
      for (List<DataSegment> segmentsPerInterval : intervalToSegments.values()) {
        for (DataSegment segment : segmentsPerInterval) {
          Assert.assertSame(NumberedOverwriteShardSpec.class, segment.getShardSpec().getClass());
          final NumberedOverwriteShardSpec shardSpec = (NumberedOverwriteShardSpec) segment.getShardSpec();
          Assert.assertEquals(segmentsPerInterval.size(), shardSpec.getAtomicUpdateGroupSize());
        }
      }
    }
  }

  // Ingest-then-overwrite with no explicit interval and unchanged (DAY) granularity.
  @Test
  public void testWithoutInterval()
  {
    testRunAndOverwrite(null, Granularities.DAY);
  }

  // Ingest-then-overwrite over an explicit interval with unchanged (DAY) granularity.
  @Test()
  public void testRunInParallel()
  {
    // Ingest all data.
    testRunAndOverwrite(Intervals.of("2017-12/P1M"), Granularities.DAY);
  }

  // Overwrite with a coarser (MONTH) granularity and no explicit interval.
  @Test
  public void testWithoutIntervalWithDifferentSegmentGranularity()
  {
    testRunAndOverwrite(null, Granularities.MONTH);
  }

  // Overwrite with a coarser (MONTH) granularity over an explicit interval.
  @Test()
  public void testRunInParallelWithDifferentSegmentGranularity()
  {
    // Ingest all data.
    testRunAndOverwrite(Intervals.of("2017-12/P1M"), Granularities.MONTH);
  }

  /**
   * Runs the task with a non-splittable input source, which forces the
   * sequential (single sub-task) code path, and checks the resulting shard specs.
   */
  @Test
  public void testRunInSequential()
  {
    final Interval interval = Intervals.of("2017-12/P1M");
    final boolean appendToExisting = false;
    final ParallelIndexSupervisorTask task = newTask(interval, appendToExisting, false);
    task.addToContext(Tasks.FORCE_TIME_CHUNK_LOCK_KEY, lockGranularity == LockGranularity.TIME_CHUNK);
    Assert.assertEquals(TaskState.SUCCESS, getIndexingServiceClient().runAndWait(task).getStatusCode());
    assertShardSpec(task, lockGranularity, appendToExisting, Collections.emptyList());
  }

  /**
   * Uses an interval (Dec 2020) containing none of the generated rows; the task
   * must still succeed even though it publishes no data.
   */
  @Test
  public void testPublishEmptySegments()
  {
    final ParallelIndexSupervisorTask task = newTask(Intervals.of("2020-12/P1M"), false, true);
    task.addToContext(Tasks.FORCE_TIME_CHUNK_LOCK_KEY, lockGranularity == LockGranularity.TIME_CHUNK);
    Assert.assertEquals(TaskState.SUCCESS, getIndexingServiceClient().runAndWait(task).getStatusCode());
  }

  /**
   * With the concurrency-limiting tuning value set to 1, the supervisor must
   * fall back to sequential mode (no parallel runner is created).
   */
  @Test
  public void testWith1MaxNumConcurrentSubTasks()
  {
    final Interval interval = Intervals.of("2017-12/P1M");
    final boolean appendToExisting = false;
    final ParallelIndexSupervisorTask task = newTask(
        interval,
        Granularities.DAY,
        appendToExisting,
        true,
        // All-default tuning config except for the single "1" below, which
        // presumably is maxNumConcurrentSubTasks — confirm against the
        // ParallelIndexTuningConfig constructor parameter order.
        new ParallelIndexTuningConfig(
            null,
            null,
            null,
            null,
            null,
            null,
            null,
            null,
            null,
            null,
            null,
            null,
            null,
            null,
            null,
            null,
            1,
            null,
            null,
            null,
            null,
            null,
            null,
            null,
            null,
            null
        )
    );
    task.addToContext(Tasks.FORCE_TIME_CHUNK_LOCK_KEY, lockGranularity == LockGranularity.TIME_CHUNK);
    Assert.assertEquals(TaskState.SUCCESS, getIndexingServiceClient().runAndWait(task).getStatusCode());
    Assert.assertNull("Runner must be null if the task was in the sequential mode", task.getCurrentRunner());
    assertShardSpec(task, lockGranularity, appendToExisting, Collections.emptyList());
  }

  /**
   * Appends the same data twice and verifies the old segments remain used and
   * all segments (old + new) stay visible in the timeline — appends never
   * overshadow existing segments.
   */
  @Test
  public void testAppendToExisting()
  {
    final Interval interval = Intervals.of("2017-12/P1M");
    runTestTask(interval, Granularities.DAY, true, Collections.emptyList());
    final Collection<DataSegment> oldSegments =
        getStorageCoordinator().retrieveUsedSegmentsForInterval("dataSource", interval, Segments.ONLY_VISIBLE);
    runTestTask(interval, Granularities.DAY, true, oldSegments);
    final Collection<DataSegment> newSegments =
        getStorageCoordinator().retrieveUsedSegmentsForInterval("dataSource", interval, Segments.ONLY_VISIBLE);
    Assert.assertTrue(newSegments.containsAll(oldSegments));
    final VersionedIntervalTimeline<String, DataSegment> timeline = VersionedIntervalTimeline.forSegments(newSegments);
    final Set<DataSegment> visibles = timeline.findNonOvershadowedObjectsInInterval(interval, Partitions.ONLY_COMPLETE);
    Assert.assertEquals(new HashSet<>(newSegments), visibles);
  }

  /**
   * Overwrites once, then appends; the appended segments must coexist with (not
   * overshadow) the pre-append segments in the timeline.
   */
  @Test
  public void testOverwriteAndAppend()
  {
    final Interval interval = Intervals.of("2017-12/P1M");
    testRunAndOverwrite(interval, Granularities.DAY);
    final Collection<DataSegment> beforeAppendSegments =
        getStorageCoordinator().retrieveUsedSegmentsForInterval("dataSource", interval, Segments.ONLY_VISIBLE);
    runTestTask(
        interval,
        Granularities.DAY,
        true,
        beforeAppendSegments
    );
    final Collection<DataSegment> afterAppendSegments =
        getStorageCoordinator().retrieveUsedSegmentsForInterval("dataSource", interval, Segments.ONLY_VISIBLE);
    Assert.assertTrue(afterAppendSegments.containsAll(beforeAppendSegments));
    final VersionedIntervalTimeline<String, DataSegment> timeline = VersionedIntervalTimeline
        .forSegments(afterAppendSegments);
    final Set<DataSegment> visibles = timeline.findNonOvershadowedObjectsInInterval(interval, Partitions.ONLY_COMPLETE);
    Assert.assertEquals(new HashSet<>(afterAppendSegments), visibles);
  }

  // Convenience overload defaulting to DAY segment granularity.
  private ParallelIndexSupervisorTask newTask(
      @Nullable Interval interval,
      boolean appendToExisting,
      boolean splittableInputSource
  )
  {
    return newTask(interval, Granularities.DAY, appendToExisting, splittableInputSource);
  }

  // Convenience overload defaulting to the shared parallel-indexing tuning config.
  private ParallelIndexSupervisorTask newTask(
      @Nullable Interval interval,
      Granularity segmentGranularity,
      boolean appendToExisting,
      boolean splittableInputSource
  )
  {
    return newTask(
        interval,
        segmentGranularity,
        appendToExisting,
        splittableInputSource,
        AbstractParallelIndexSupervisorTaskTest.DEFAULT_TUNING_CONFIG_FOR_PARALLEL_INDEXING
    );
  }

  /**
   * Builds a supervisor task over datasource "dataSource", ingesting files
   * matching "test_*" from {@link #inputDir}. Depending on
   * {@link #useInputFormatApi}, the ingestion spec is built either with the
   * InputSource/InputFormat API (using {@link SettableSplittableLocalInputSource}
   * so splittability can be forced on or off) or with the legacy
   * firehose/parser API. Both variants use MINUTE query granularity and a
   * longSum aggregator on "val".
   */
  private ParallelIndexSupervisorTask newTask(
      @Nullable Interval interval,
      Granularity segmentGranularity,
      boolean appendToExisting,
      boolean splittableInputSource,
      ParallelIndexTuningConfig tuningConfig
  )
  {
    // set up ingestion spec
    final ParallelIndexIngestionSpec ingestionSpec;
    if (useInputFormatApi) {
      ingestionSpec = new ParallelIndexIngestionSpec(
          new DataSchema(
              "dataSource",
              DEFAULT_TIMESTAMP_SPEC,
              DEFAULT_DIMENSIONS_SPEC,
              new AggregatorFactory[]{
                  new LongSumAggregatorFactory("val", "val")
              },
              new UniformGranularitySpec(
                  segmentGranularity,
                  Granularities.MINUTE,
                  interval == null ? null : Collections.singletonList(interval)
              ),
              null
          ),
          new ParallelIndexIOConfig(
              null,
              new SettableSplittableLocalInputSource(inputDir, "test_*", splittableInputSource),
              DEFAULT_INPUT_FORMAT,
              appendToExisting
          ),
          tuningConfig
      );
    } else {
      // Legacy path: parser is passed as a Map (its JSON form) and input comes
      // from a LocalFirehoseFactory with the same "test_*" filter.
      ingestionSpec = new ParallelIndexIngestionSpec(
          new DataSchema(
              "dataSource",
              getObjectMapper().convertValue(
                  new StringInputRowParser(
                      DEFAULT_PARSE_SPEC,
                      null
                  ),
                  Map.class
              ),
              new AggregatorFactory[]{
                  new LongSumAggregatorFactory("val", "val")
              },
              new UniformGranularitySpec(
                  segmentGranularity,
                  Granularities.MINUTE,
                  interval == null ? null : Collections.singletonList(interval)
              ),
              null,
              getObjectMapper()
          ),
          new ParallelIndexIOConfig(
              new LocalFirehoseFactory(inputDir, "test_*", null),
              appendToExisting
          ),
          tuningConfig
      );
    }
    // set up test tools
    return new ParallelIndexSupervisorTask(
        null,
        null,
        null,
        ingestionSpec,
        Collections.emptyMap()
    );
  }

  /**
   * A {@link LocalInputSource} whose {@link #isSplittable()} result is
   * controlled by a constructor flag, letting tests force the sequential
   * (non-splittable) execution path regardless of the actual input files.
   * JSON-serializable so it survives task spec round-trips.
   */
  private static class SettableSplittableLocalInputSource extends LocalInputSource
  {
    private final boolean splittableInputSource;

    @JsonCreator
    private SettableSplittableLocalInputSource(
        @JsonProperty("baseDir") File baseDir,
        @JsonProperty("filter") String filter,
        @JsonProperty("splittableInputSource") boolean splittableInputSource
    )
    {
      super(baseDir, filter);
      this.splittableInputSource = splittableInputSource;
    }

    @JsonProperty
    public boolean isSplittableInputSource()
    {
      return splittableInputSource;
    }

    @Override
    public boolean isSplittable()
    {
      return splittableInputSource;
    }
  }
}