/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/

package org.apache.druid.indexing.common.task.batch.parallel;

import com.fasterxml.jackson.annotation.JsonCreator;
import com.fasterxml.jackson.annotation.JsonProperty;
import com.google.common.collect.ImmutableList;
import org.apache.druid.data.input.impl.LocalInputSource;
import org.apache.druid.data.input.impl.StringInputRowParser;
import org.apache.druid.indexer.TaskState;
import org.apache.druid.indexing.common.LockGranularity;
import org.apache.druid.indexing.common.TaskToolbox;
import org.apache.druid.indexing.common.actions.TaskActionClient;
import org.apache.druid.indexing.common.task.Tasks;
import org.apache.druid.indexing.overlord.Segments;
import org.apache.druid.java.util.common.Intervals;
import org.apache.druid.java.util.common.StringUtils;
import org.apache.druid.java.util.common.granularity.Granularities;
import org.apache.druid.java.util.common.granularity.Granularity;
import org.apache.druid.query.aggregation.AggregatorFactory;
import org.apache.druid.query.aggregation.LongSumAggregatorFactory;
import org.apache.druid.segment.SegmentUtils;
import org.apache.druid.segment.indexing.DataSchema;
import org.apache.druid.segment.indexing.granularity.UniformGranularitySpec;
import org.apache.druid.segment.realtime.firehose.LocalFirehoseFactory;
import org.apache.druid.timeline.DataSegment;
import org.apache.druid.timeline.Partitions;
import org.apache.druid.timeline.VersionedIntervalTimeline;
import org.apache.druid.timeline.partition.NumberedOverwriteShardSpec;
import org.apache.druid.timeline.partition.NumberedShardSpec;
import org.joda.time.Interval;
import org.junit.After;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.junit.runners.Parameterized;
import javax.annotation.Nullable;
import java.io.File;
import java.io.IOException;
import java.io.Writer;
import java.nio.charset.StandardCharsets;
import java.nio.file.Files;
import java.util.Collection;
import java.util.Collections;
import java.util.HashSet;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Set;
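
/**
 * Tests for single-phase parallel batch indexing with {@link ParallelIndexSupervisorTask}.
 * Each case is parameterized by lock granularity (time chunk or segment) and by whether the
 * InputSource/InputFormat API or the legacy firehose/parser API is used.
 */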
@RunWith(Parameterized.class)
public class SinglePhaseParallelIndexingTest extends AbstractParallelIndexSupervisorTaskTest
{
@Parameterized.Parameters(name = "{0}, useInputFormatApi={1}")
public static Iterable<Object[]> constructorFeeder()
{
return ImmutableList.of(
new Object[]{LockGranularity.TIME_CHUNK, false},
new Object[]{LockGranularity.TIME_CHUNK, true},
new Object[]{LockGranularity.SEGMENT, true}
);
}
private static final Interval INTERVAL_TO_INDEX = Intervals.of("2017-12/P1M");
private static final String VALID_INPUT_SOURCE_FILTER = "test_*";
private final LockGranularity lockGranularity;
private final boolean useInputFormatApi;
private File inputDir;
public SinglePhaseParallelIndexingTest(LockGranularity lockGranularity, boolean useInputFormatApi)
{
super(DEFAULT_TRANSIENT_TASK_FAILURE_RATE, DEFAULT_TRANSIENT_API_FAILURE_RATE);
this.lockGranularity = lockGranularity;
this.useInputFormatApi = useInputFormatApi;
}
@Before
public void setup() throws IOException
{
inputDir = temporaryFolder.newFolder("data");
// set up data
for (int i = 0; i < 5; i++) {
try (final Writer writer =
Files.newBufferedWriter(new File(inputDir, "test_" + i).toPath(), StandardCharsets.UTF_8)) {
writer.write(StringUtils.format("2017-12-%d,%d th test file\n", 24 + i, i));
writer.write(StringUtils.format("2017-12-%d,%d th test file\n", 25 + i, i));
}
}
for (int i = 0; i < 5; i++) {
try (final Writer writer =
Files.newBufferedWriter(new File(inputDir, "filtered_" + i).toPath(), StandardCharsets.UTF_8)) {
writer.write(StringUtils.format("2017-12-%d,%d th test file\n", 25 + i, i));
}
}
getObjectMapper().registerSubtypes(SettableSplittableLocalInputSource.class);
}
@After
public void teardown()
{
temporaryFolder.delete();
}
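
// Verifies that the supervisor task and every sub-task generated from its sub-task specs
// report isReady() == true once their locks have been prepared.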
@Test
public void testIsReady() throws Exception
{
final ParallelIndexSupervisorTask task = newTask(INTERVAL_TO_INDEX, false, true);
final TaskActionClient actionClient = createActionClient(task);
final TaskToolbox toolbox = createTaskToolbox(task, actionClient);
prepareTaskForLocking(task);
Assert.assertTrue(task.isReady(actionClient));
final SinglePhaseParallelIndexTaskRunner runner = task.createSinglePhaseTaskRunner(toolbox);
final Iterator<SubTaskSpec<SinglePhaseSubTask>> subTaskSpecIterator = runner.subTaskSpecIterator();
while (subTaskSpecIterator.hasNext()) {
final SinglePhaseSubTaskSpec spec = (SinglePhaseSubTaskSpec) subTaskSpecIterator.next();
final SinglePhaseSubTask subTask = new SinglePhaseSubTask(
null,
spec.getGroupId(),
null,
spec.getSupervisorTaskId(),
spec.getId(),
0,
spec.getIngestionSpec(),
spec.getContext()
);
final TaskActionClient subTaskActionClient = createActionClient(subTask);
prepareTaskForLocking(subTask);
Assert.assertTrue(subTask.isReady(subTaskActionClient));
}
}
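
// Runs an indexing task over the test files and asserts that the published segments have the
// expected shard specs for the given append mode and lock granularity.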
private void runTestTask(
@Nullable Interval interval,
Granularity segmentGranularity,
boolean appendToExisting,
Collection<DataSegment> originalSegmentsIfAppend
)
{
// The task may behave differently depending on whether appendToExisting is true or false, even when this is an initial write
final ParallelIndexSupervisorTask task = newTask(interval, segmentGranularity, appendToExisting, true);
task.addToContext(Tasks.FORCE_TIME_CHUNK_LOCK_KEY, lockGranularity == LockGranularity.TIME_CHUNK);
Assert.assertEquals(TaskState.SUCCESS, getIndexingServiceClient().runAndWait(task).getStatusCode());
assertShardSpec(
task,
interval == null ? LockGranularity.TIME_CHUNK : lockGranularity,
appendToExisting,
originalSegmentsIfAppend
);
}
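
// Runs an overwriting (non-append) task and asserts the resulting shard specs for the lock
// granularity that is actually used.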
private void runOverwriteTask(
@Nullable Interval interval,
Granularity segmentGranularity,
LockGranularity actualLockGranularity
)
{
final ParallelIndexSupervisorTask task = newTask(interval, segmentGranularity, false, true);
task.addToContext(Tasks.FORCE_TIME_CHUNK_LOCK_KEY, lockGranularity == LockGranularity.TIME_CHUNK);
Assert.assertEquals(TaskState.SUCCESS, getIndexingServiceClient().runAndWait(task).getStatusCode());
assertShardSpecAfterOverwrite(task, actualLockGranularity);
}
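
// Ingests the test data, then reingests it with the given segment granularity and verifies that
// the newly published segments completely overshadow the originals in the timeline.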
private void testRunAndOverwrite(@Nullable Interval inputInterval, Granularity secondSegmentGranularity)
{
// Ingest all data.
runTestTask(inputInterval, Granularities.DAY, false, Collections.emptyList());
final Collection<DataSegment> allSegments = new HashSet<>(
inputInterval == null
? getStorageCoordinator().retrieveAllUsedSegments("dataSource", Segments.ONLY_VISIBLE)
: getStorageCoordinator().retrieveUsedSegmentsForInterval("dataSource", inputInterval, Segments.ONLY_VISIBLE)
);
// Reingest the same data. Each segment should get replaced by a segment with a newer version.
final LockGranularity actualLockGranularity;
if (inputInterval == null) {
actualLockGranularity = LockGranularity.TIME_CHUNK;
} else {
actualLockGranularity = secondSegmentGranularity.equals(Granularities.DAY)
? lockGranularity
: LockGranularity.TIME_CHUNK;
}
runOverwriteTask(inputInterval, secondSegmentGranularity, actualLockGranularity);
// Verify that the segments have been replaced.
final Collection<DataSegment> newSegments =
inputInterval == null
? getStorageCoordinator().retrieveAllUsedSegments("dataSource", Segments.ONLY_VISIBLE)
: getStorageCoordinator().retrieveUsedSegmentsForInterval("dataSource", inputInterval, Segments.ONLY_VISIBLE);
Assert.assertFalse(newSegments.isEmpty());
allSegments.addAll(newSegments);
final VersionedIntervalTimeline<String, DataSegment> timeline = VersionedIntervalTimeline.forSegments(allSegments);
final Interval timelineInterval = inputInterval == null ? Intervals.ETERNITY : inputInterval;
final Set<DataSegment> visibles = timeline.findNonOvershadowedObjectsInInterval(
timelineInterval,
Partitions.ONLY_COMPLETE
);
Assert.assertEquals(new HashSet<>(newSegments), visibles);
}
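
// For an initial write with time-chunk locking, every segment should use a NumberedShardSpec whose
// core partition count equals the number of segments in its interval. For appends or segment-locked
// writes, new segments should inherit the core partition count of the existing segments, or 0 if none exist.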
private void assertShardSpec(
ParallelIndexSupervisorTask task,
LockGranularity actualLockGranularity,
boolean appendToExisting,
Collection<DataSegment> originalSegmentsIfAppend
)
{
final Collection<DataSegment> segments = getIndexingServiceClient().getPublishedSegments(task);
if (!appendToExisting && actualLockGranularity == LockGranularity.TIME_CHUNK) {
// Initial write
final Map<Interval, List<DataSegment>> intervalToSegments = SegmentUtils.groupSegmentsByInterval(segments);
for (List<DataSegment> segmentsPerInterval : intervalToSegments.values()) {
for (DataSegment segment : segmentsPerInterval) {
Assert.assertSame(NumberedShardSpec.class, segment.getShardSpec().getClass());
final NumberedShardSpec shardSpec = (NumberedShardSpec) segment.getShardSpec();
Assert.assertEquals(segmentsPerInterval.size(), shardSpec.getNumCorePartitions());
}
}
} else {
// Append or initial write with segment lock
final Map<Interval, List<DataSegment>> intervalToOriginalSegments = SegmentUtils.groupSegmentsByInterval(
originalSegmentsIfAppend
);
for (DataSegment segment : segments) {
Assert.assertSame(NumberedShardSpec.class, segment.getShardSpec().getClass());
final NumberedShardSpec shardSpec = (NumberedShardSpec) segment.getShardSpec();
final List<DataSegment> originalSegmentsInInterval = intervalToOriginalSegments.get(segment.getInterval());
final int expectedNumCorePartitions =
originalSegmentsInInterval == null || originalSegmentsInInterval.isEmpty()
? 0
: originalSegmentsInInterval.get(0).getShardSpec().getNumCorePartitions();
Assert.assertEquals(expectedNumCorePartitions, shardSpec.getNumCorePartitions());
}
}
}
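
// After an overwrite, time-chunk locking should produce NumberedShardSpecs with a full core partition
// set, while segment locking should produce NumberedOverwriteShardSpecs forming one atomic update group.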
private void assertShardSpecAfterOverwrite(ParallelIndexSupervisorTask task, LockGranularity actualLockGranularity)
{
final Collection<DataSegment> segments = getIndexingServiceClient().getPublishedSegments(task);
final Map<Interval, List<DataSegment>> intervalToSegments = SegmentUtils.groupSegmentsByInterval(segments);
if (actualLockGranularity != LockGranularity.SEGMENT) {
// Check the core partition set in the shardSpec
for (List<DataSegment> segmentsPerInterval : intervalToSegments.values()) {
for (DataSegment segment : segmentsPerInterval) {
Assert.assertSame(NumberedShardSpec.class, segment.getShardSpec().getClass());
final NumberedShardSpec shardSpec = (NumberedShardSpec) segment.getShardSpec();
Assert.assertEquals(segmentsPerInterval.size(), shardSpec.getNumCorePartitions());
}
}
} else {
for (List<DataSegment> segmentsPerInterval : intervalToSegments.values()) {
for (DataSegment segment : segmentsPerInterval) {
Assert.assertSame(NumberedOverwriteShardSpec.class, segment.getShardSpec().getClass());
final NumberedOverwriteShardSpec shardSpec = (NumberedOverwriteShardSpec) segment.getShardSpec();
Assert.assertEquals(segmentsPerInterval.size(), shardSpec.getAtomicUpdateGroupSize());
}
}
}
}
@Test
public void testWithoutInterval()
{
testRunAndOverwrite(null, Granularities.DAY);
}
@Test
public void testRunInParallel()
{
// Ingest all data.
testRunAndOverwrite(Intervals.of("2017-12/P1M"), Granularities.DAY);
}
@Test
public void testWithoutIntervalWithDifferentSegmentGranularity()
{
testRunAndOverwrite(null, Granularities.MONTH);
}
@Test
public void testRunInParallelWithDifferentSegmentGranularity()
{
// Ingest all data.
testRunAndOverwrite(Intervals.of("2017-12/P1M"), Granularities.MONTH);
}
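
// A non-splittable input source forces the supervisor task to run the ingestion in sequential mode.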
@Test
public void testRunInSequential()
{
final Interval interval = Intervals.of("2017-12/P1M");
final boolean appendToExisting = false;
final ParallelIndexSupervisorTask task = newTask(interval, appendToExisting, false);
task.addToContext(Tasks.FORCE_TIME_CHUNK_LOCK_KEY, lockGranularity == LockGranularity.TIME_CHUNK);
Assert.assertEquals(TaskState.SUCCESS, getIndexingServiceClient().runAndWait(task).getStatusCode());
assertShardSpec(task, lockGranularity, appendToExisting, Collections.emptyList());
}
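
// All test rows fall in 2017-12, outside the 2020-12 ingestion interval, so the task has nothing
// to publish for the interval but should still succeed.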
@Test
public void testPublishEmptySegments()
{
final ParallelIndexSupervisorTask task = newTask(Intervals.of("2020-12/P1M"), false, true);
task.addToContext(Tasks.FORCE_TIME_CHUNK_LOCK_KEY, lockGranularity == LockGranularity.TIME_CHUNK);
Assert.assertEquals(TaskState.SUCCESS, getIndexingServiceClient().runAndWait(task).getStatusCode());
}
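
// With maxNumConcurrentSubTasks = 1, the supervisor falls back to sequential mode and runs the
// ingestion itself, so no parallel runner is created.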
@Test
public void testWith1MaxNumConcurrentSubTasks()
{
final Interval interval = Intervals.of("2017-12/P1M");
final boolean appendToExisting = false;
final ParallelIndexSupervisorTask task = newTask(
interval,
Granularities.DAY,
appendToExisting,
true,
new ParallelIndexTuningConfig(
null,
null,
null,
null,
null,
null,
null,
null,
null,
null,
null,
null,
null,
null,
null,
null,
null,
null,
1,
null,
null,
null,
null,
null,
null,
null,
null,
null,
null,
null
),
VALID_INPUT_SOURCE_FILTER
);
task.addToContext(Tasks.FORCE_TIME_CHUNK_LOCK_KEY, lockGranularity == LockGranularity.TIME_CHUNK);
Assert.assertEquals(TaskState.SUCCESS, getIndexingServiceClient().runAndWait(task).getStatusCode());
Assert.assertNull("Runner must be null if the task ran in sequential mode", task.getCurrentRunner());
assertShardSpec(task, lockGranularity, appendToExisting, Collections.emptyList());
}
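
// Appends the same data twice and verifies that the second run's segments coexist with
// (rather than overshadow) the first run's segments.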
@Test
public void testAppendToExisting()
{
final Interval interval = Intervals.of("2017-12/P1M");
runTestTask(interval, Granularities.DAY, true, Collections.emptyList());
final Collection<DataSegment> oldSegments =
getStorageCoordinator().retrieveUsedSegmentsForInterval("dataSource", interval, Segments.ONLY_VISIBLE);
runTestTask(interval, Granularities.DAY, true, oldSegments);
final Collection<DataSegment> newSegments =
getStorageCoordinator().retrieveUsedSegmentsForInterval("dataSource", interval, Segments.ONLY_VISIBLE);
Assert.assertTrue(newSegments.containsAll(oldSegments));
final VersionedIntervalTimeline<String, DataSegment> timeline = VersionedIntervalTimeline.forSegments(newSegments);
final Set<DataSegment> visibles = timeline.findNonOvershadowedObjectsInInterval(interval, Partitions.ONLY_COMPLETE);
Assert.assertEquals(new HashSet<>(newSegments), visibles);
}
@Test
public void testRunParallelWithNoInputSplitToProcess()
{
// The input source filter on this task does not match any input files.
// Hence, this task will have no input splits to process.
final ParallelIndexSupervisorTask task = newTask(
Intervals.of("2017-12/P1M"),
Granularities.DAY,
true,
true,
AbstractParallelIndexSupervisorTaskTest.DEFAULT_TUNING_CONFIG_FOR_PARALLEL_INDEXING,
"non_existing_file_filter"
);
task.addToContext(Tasks.FORCE_TIME_CHUNK_LOCK_KEY, lockGranularity == LockGranularity.TIME_CHUNK);
// The task state should still be SUCCESS even if there are no input splits to process
Assert.assertEquals(TaskState.SUCCESS, getIndexingServiceClient().runAndWait(task).getStatusCode());
}
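
// Overwrites the initial data and then appends to it, verifying that the appended segments coexist
// with the overwriting segments in the timeline.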
@Test
public void testOverwriteAndAppend()
{
final Interval interval = Intervals.of("2017-12/P1M");
testRunAndOverwrite(interval, Granularities.DAY);
final Collection<DataSegment> beforeAppendSegments =
getStorageCoordinator().retrieveUsedSegmentsForInterval("dataSource", interval, Segments.ONLY_VISIBLE);
runTestTask(
interval,
Granularities.DAY,
true,
beforeAppendSegments
);
final Collection<DataSegment> afterAppendSegments =
getStorageCoordinator().retrieveUsedSegmentsForInterval("dataSource", interval, Segments.ONLY_VISIBLE);
Assert.assertTrue(afterAppendSegments.containsAll(beforeAppendSegments));
final VersionedIntervalTimeline<String, DataSegment> timeline = VersionedIntervalTimeline
.forSegments(afterAppendSegments);
final Set<DataSegment> visibles = timeline.findNonOvershadowedObjectsInInterval(interval, Partitions.ONLY_COMPLETE);
Assert.assertEquals(new HashSet<>(afterAppendSegments), visibles);
}
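
// Convenience overloads for building a ParallelIndexSupervisorTask over the test input directory.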
private ParallelIndexSupervisorTask newTask(
@Nullable Interval interval,
boolean appendToExisting,
boolean splittableInputSource
)
{
return newTask(interval, Granularities.DAY, appendToExisting, splittableInputSource);
}
private ParallelIndexSupervisorTask newTask(
@Nullable Interval interval,
Granularity segmentGranularity,
boolean appendToExisting,
boolean splittableInputSource
)
{
return newTask(
interval,
segmentGranularity,
appendToExisting,
splittableInputSource,
AbstractParallelIndexSupervisorTaskTest.DEFAULT_TUNING_CONFIG_FOR_PARALLEL_INDEXING,
VALID_INPUT_SOURCE_FILTER
);
}
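
// Builds the supervisor task, using either the InputSource/InputFormat API or the legacy
// firehose/parser API depending on the test parameterization.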
private ParallelIndexSupervisorTask newTask(
@Nullable Interval interval,
Granularity segmentGranularity,
boolean appendToExisting,
boolean splittableInputSource,
ParallelIndexTuningConfig tuningConfig,
String inputSourceFilter
)
{
// set up ingestion spec
final ParallelIndexIngestionSpec ingestionSpec;
if (useInputFormatApi) {
ingestionSpec = new ParallelIndexIngestionSpec(
new DataSchema(
"dataSource",
DEFAULT_TIMESTAMP_SPEC,
DEFAULT_DIMENSIONS_SPEC,
new AggregatorFactory[]{
new LongSumAggregatorFactory("val", "val")
},
new UniformGranularitySpec(
segmentGranularity,
Granularities.MINUTE,
interval == null ? null : Collections.singletonList(interval)
),
null
),
new ParallelIndexIOConfig(
null,
new SettableSplittableLocalInputSource(inputDir, inputSourceFilter, splittableInputSource),
DEFAULT_INPUT_FORMAT,
appendToExisting,
null
),
tuningConfig
);
} else {
ingestionSpec = new ParallelIndexIngestionSpec(
new DataSchema(
"dataSource",
getObjectMapper().convertValue(
new StringInputRowParser(
DEFAULT_PARSE_SPEC,
null
),
Map.class
),
new AggregatorFactory[]{
new LongSumAggregatorFactory("val", "val")
},
new UniformGranularitySpec(
segmentGranularity,
Granularities.MINUTE,
interval == null ? null : Collections.singletonList(interval)
),
null,
getObjectMapper()
),
new ParallelIndexIOConfig(
new LocalFirehoseFactory(inputDir, inputSourceFilter, null),
appendToExisting
),
tuningConfig
);
}
// create the supervisor task
return new ParallelIndexSupervisorTask(
null,
null,
null,
ingestionSpec,
Collections.emptyMap()
);
}
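
// A LocalInputSource whose splittability can be toggled, letting tests exercise both the parallel
// and sequential code paths of the supervisor task.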
private static class SettableSplittableLocalInputSource extends LocalInputSource
{
private final boolean splittableInputSource;
@JsonCreator
private SettableSplittableLocalInputSource(
@JsonProperty("baseDir") File baseDir,
@JsonProperty("filter") String filter,
@JsonProperty("splittableInputSource") boolean splittableInputSource
)
{
super(baseDir, filter);
this.splittableInputSource = splittableInputSource;
}
@JsonProperty
public boolean isSplittableInputSource()
{
return splittableInputSource;
}
@Override
public boolean isSplittable()
{
return splittableInputSource;
}
}
}