/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.druid.indexing.common.task.batch.parallel;
import com.fasterxml.jackson.annotation.JsonCreator;
import com.fasterxml.jackson.annotation.JsonProperty;
import com.google.common.collect.ImmutableList;
import com.google.common.collect.ImmutableMap;
import org.apache.druid.data.input.impl.DimensionSchema;
import org.apache.druid.data.input.impl.DimensionsSpec;
import org.apache.druid.data.input.impl.JsonInputFormat;
import org.apache.druid.data.input.impl.LocalInputSource;
import org.apache.druid.indexer.TaskState;
import org.apache.druid.indexer.TaskStatus;
import org.apache.druid.indexer.report.IngestionStatsAndErrors;
import org.apache.druid.indexer.report.IngestionStatsAndErrorsTaskReport;
import org.apache.druid.indexer.report.TaskReport;
import org.apache.druid.indexing.common.LockGranularity;
import org.apache.druid.indexing.common.TaskToolbox;
import org.apache.druid.indexing.common.actions.TaskActionClient;
import org.apache.druid.indexing.common.task.Tasks;
import org.apache.druid.indexing.overlord.Segments;
import org.apache.druid.java.util.common.Intervals;
import org.apache.druid.java.util.common.StringUtils;
import org.apache.druid.java.util.common.granularity.Granularities;
import org.apache.druid.java.util.common.granularity.Granularity;
import org.apache.druid.java.util.common.parsers.JSONPathSpec;
import org.apache.druid.query.aggregation.AggregatorFactory;
import org.apache.druid.query.aggregation.CountAggregatorFactory;
import org.apache.druid.query.aggregation.LongSumAggregatorFactory;
import org.apache.druid.segment.DataSegmentsWithSchemas;
import org.apache.druid.segment.SegmentUtils;
import org.apache.druid.segment.incremental.ParseExceptionReport;
import org.apache.druid.segment.incremental.RowIngestionMetersTotals;
import org.apache.druid.segment.indexing.DataSchema;
import org.apache.druid.segment.indexing.granularity.UniformGranularitySpec;
import org.apache.druid.server.security.Action;
import org.apache.druid.server.security.Resource;
import org.apache.druid.server.security.ResourceAction;
import org.apache.druid.server.security.ResourceType;
import org.apache.druid.timeline.DataSegment;
import org.apache.druid.timeline.Partitions;
import org.apache.druid.timeline.SegmentTimeline;
import org.apache.druid.timeline.partition.NumberedOverwriteShardSpec;
import org.apache.druid.timeline.partition.NumberedShardSpec;
import org.joda.time.Interval;
import org.junit.After;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Rule;
import org.junit.Test;
import org.junit.rules.ExpectedException;
import org.junit.runner.RunWith;
import org.junit.runners.Parameterized;
import javax.annotation.Nullable;
import java.io.File;
import java.io.IOException;
import java.io.Writer;
import java.nio.charset.StandardCharsets;
import java.nio.file.Files;
import java.util.Arrays;
import java.util.Collection;
import java.util.Collections;
import java.util.HashSet;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.TimeUnit;
@RunWith(Parameterized.class)
public class SinglePhaseParallelIndexingTest extends AbstractParallelIndexSupervisorTaskTest
{
@Rule
public ExpectedException expectedException = ExpectedException.none();
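// Each run is parameterized by lock granularity and by whether the InputFormat-based ingestion API is used.
// Note that the SEGMENT lock granularity is only exercised together with the InputFormat API.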
@Parameterized.Parameters(name = "{0}, useInputFormatApi={1}")
public static Iterable<Object[]> constructorFeeder()
{
return ImmutableList.of(
new Object[]{LockGranularity.TIME_CHUNK, false},
new Object[]{LockGranularity.TIME_CHUNK, true},
new Object[]{LockGranularity.SEGMENT, true}
);
}
private static final Interval INTERVAL_TO_INDEX = Intervals.of("2017-12/P1M");
private static final String VALID_INPUT_SOURCE_FILTER = "test_*";
private final LockGranularity lockGranularity;
private final boolean useInputFormatApi;
private File inputDir;
public SinglePhaseParallelIndexingTest(LockGranularity lockGranularity, boolean useInputFormatApi)
{
super(DEFAULT_TRANSIENT_TASK_FAILURE_RATE, DEFAULT_TRANSIENT_API_FAILURE_RATE);
this.lockGranularity = lockGranularity;
this.useInputFormatApi = useInputFormatApi;
}
@Before
public void setup() throws IOException
{
inputDir = temporaryFolder.newFolder("data");
// set up data
for (int i = 0; i < 5; i++) {
try (final Writer writer =
Files.newBufferedWriter(new File(inputDir, "test_" + i).toPath(), StandardCharsets.UTF_8)) {
writer.write(StringUtils.format("2017-12-%d,%d th test file\n", 24 + i, i));
writer.write(StringUtils.format("2017-12-%d,%d th test file\n", 25 + i, i));
if (i == 0) {
// thrown away due to timestamp outside interval
writer.write(StringUtils.format("2012-12-%d,%d th test file\n", 25 + i, i));
// unparseable metric value
writer.write(StringUtils.format("2017-12-%d,%d th test file,badval\n", 25 + i, i));
// unparseable row
writer.write(StringUtils.format("2017unparseable\n"));
}
}
}
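// The "filtered_*" files do not match VALID_INPUT_SOURCE_FILTER ("test_*"), so tasks that use that filter
// should skip them.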
for (int i = 0; i < 5; i++) {
try (final Writer writer =
Files.newBufferedWriter(new File(inputDir, "filtered_" + i).toPath(), StandardCharsets.UTF_8)) {
writer.write(StringUtils.format("2017-12-%d,%d th test file\n", 25 + i, i));
}
}
getObjectMapper().registerSubtypes(SettableSplittableLocalInputSource.class);
}
@After
public void teardown()
{
temporaryFolder.delete();
}
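// Builds a sub-task from each sub-task spec produced by the single-phase runner and verifies that the
// supervisor task and every sub-task are ready to acquire locks, and that each sub-task declares READ
// access on the external LocalInputSource resource.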
@Test
public void testIsReady() throws Exception
{
final ParallelIndexSupervisorTask task = newTask(INTERVAL_TO_INDEX, false, true);
final TaskActionClient actionClient = createActionClient(task);
final TaskToolbox toolbox = createTaskToolbox(task, actionClient);
prepareTaskForLocking(task);
Assert.assertTrue(task.isReady(actionClient));
final SinglePhaseParallelIndexTaskRunner runner = task.createSinglePhaseTaskRunner(toolbox);
final Iterator<SubTaskSpec<SinglePhaseSubTask>> subTaskSpecIterator = runner.subTaskSpecIterator();
while (subTaskSpecIterator.hasNext()) {
final SinglePhaseSubTaskSpec spec = (SinglePhaseSubTaskSpec) subTaskSpecIterator.next();
final SinglePhaseSubTask subTask = new SinglePhaseSubTask(
null,
spec.getGroupId(),
null,
spec.getSupervisorTaskId(),
spec.getId(),
0,
spec.getIngestionSpec(),
spec.getContext()
);
final TaskActionClient subTaskActionClient = createActionClient(subTask);
prepareTaskForLocking(subTask);
Assert.assertTrue(subTask.isReady(subTaskActionClient));
Assert.assertEquals(
Collections.singleton(
new ResourceAction(new Resource(
LocalInputSource.TYPE_KEY,
ResourceType.EXTERNAL
), Action.READ)),
subTask.getInputSourceResources()
);
}
}
private ParallelIndexSupervisorTask runTestTask(
@Nullable Interval interval,
Granularity segmentGranularity,
boolean appendToExisting,
Collection<DataSegment> originalSegmentsIfAppend
)
{
// Even for an initial write, the task may behave differently depending on whether appendToExisting is true or false
final ParallelIndexSupervisorTask task = newTask(interval, segmentGranularity, appendToExisting, true);
task.addToContext(Tasks.FORCE_TIME_CHUNK_LOCK_KEY, lockGranularity == LockGranularity.TIME_CHUNK);
Assert.assertEquals(TaskState.SUCCESS, getIndexingServiceClient().runAndWait(task).getStatusCode());
assertShardSpec(
task,
interval == null ? LockGranularity.TIME_CHUNK : lockGranularity,
appendToExisting,
originalSegmentsIfAppend
);
TaskContainer taskContainer = getIndexingServiceClient().getTaskContainer(task.getId());
return (ParallelIndexSupervisorTask) taskContainer.getTask();
}
private ParallelIndexSupervisorTask runOverwriteTask(
@Nullable Interval interval,
Granularity segmentGranularity,
LockGranularity actualLockGranularity
)
{
final ParallelIndexSupervisorTask task = newTask(interval, segmentGranularity, false, true);
task.addToContext(Tasks.FORCE_TIME_CHUNK_LOCK_KEY, lockGranularity == LockGranularity.TIME_CHUNK);
Assert.assertEquals(TaskState.SUCCESS, getIndexingServiceClient().runAndWait(task).getStatusCode());
assertShardSpecAfterOverwrite(task, actualLockGranularity);
TaskContainer taskContainer = getIndexingServiceClient().getTaskContainer(task.getId());
return (ParallelIndexSupervisorTask) taskContainer.getTask();
}
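// Ingests the data once, re-ingests the same data with the given segment granularity, and verifies that the
// newly published segments completely overshadow the original segments in the timeline.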
private void testRunAndOverwrite(@Nullable Interval inputInterval, Granularity secondSegmentGranularity)
{
// Ingest all data.
runTestTask(inputInterval, Granularities.DAY, false, Collections.emptyList());
final Collection<DataSegment> allSegments = new HashSet<>(
inputInterval == null
? getStorageCoordinator().retrieveAllUsedSegments("dataSource", Segments.ONLY_VISIBLE)
: getStorageCoordinator().retrieveUsedSegmentsForInterval("dataSource", inputInterval, Segments.ONLY_VISIBLE)
);
// Reingest the same data. Each segment should get replaced by a segment with a newer version.
final LockGranularity actualLockGranularity;
if (inputInterval == null) {
actualLockGranularity = LockGranularity.TIME_CHUNK;
} else {
actualLockGranularity = secondSegmentGranularity.equals(Granularities.DAY)
? lockGranularity
: LockGranularity.TIME_CHUNK;
}
runOverwriteTask(inputInterval, secondSegmentGranularity, actualLockGranularity);
// Verify that the segments have been replaced.
final Collection<DataSegment> newSegments =
inputInterval == null
? getStorageCoordinator().retrieveAllUsedSegments("dataSource", Segments.ONLY_VISIBLE)
: getStorageCoordinator().retrieveUsedSegmentsForInterval("dataSource", inputInterval, Segments.ONLY_VISIBLE);
Assert.assertFalse(newSegments.isEmpty());
allSegments.addAll(newSegments);
final SegmentTimeline timeline = SegmentTimeline.forSegments(allSegments);
final Interval timelineInterval = inputInterval == null ? Intervals.ETERNITY : inputInterval;
final Set<DataSegment> visibles = timeline.findNonOvershadowedObjectsInInterval(
timelineInterval,
Partitions.ONLY_COMPLETE
);
Assert.assertEquals(new HashSet<>(newSegments), visibles);
}
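// For an initial write with time-chunk locking, every segment should use a NumberedShardSpec whose core
// partition set size equals the number of segments in its interval. For appends (or an initial write with
// segment locking), the core partition count should match that of the original segments in the same
// interval, or 0 if no original segments exist.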
private void assertShardSpec(
ParallelIndexSupervisorTask task,
LockGranularity actualLockGranularity,
boolean appendToExisting,
Collection<DataSegment> originalSegmentsIfAppend
)
{
final DataSegmentsWithSchemas dataSegmentsWithSchemas = getIndexingServiceClient().getSegmentAndSchemas(task);
verifySchema(dataSegmentsWithSchemas);
final Collection<DataSegment> segments = dataSegmentsWithSchemas.getSegments();
if (!appendToExisting && actualLockGranularity == LockGranularity.TIME_CHUNK) {
// Initial write
final Map<Interval, List<DataSegment>> intervalToSegments = SegmentUtils.groupSegmentsByInterval(segments);
for (List<DataSegment> segmentsPerInterval : intervalToSegments.values()) {
for (DataSegment segment : segmentsPerInterval) {
Assert.assertSame(NumberedShardSpec.class, segment.getShardSpec().getClass());
final NumberedShardSpec shardSpec = (NumberedShardSpec) segment.getShardSpec();
Assert.assertEquals(segmentsPerInterval.size(), shardSpec.getNumCorePartitions());
}
}
} else {
// Append or initial write with segment lock
final Map<Interval, List<DataSegment>> intervalToOriginalSegments = SegmentUtils.groupSegmentsByInterval(
originalSegmentsIfAppend
);
for (DataSegment segment : segments) {
Assert.assertSame(NumberedShardSpec.class, segment.getShardSpec().getClass());
final NumberedShardSpec shardSpec = (NumberedShardSpec) segment.getShardSpec();
final List<DataSegment> originalSegmentsInInterval = intervalToOriginalSegments.get(segment.getInterval());
final int expectedNumCorePartitions =
originalSegmentsInInterval == null || originalSegmentsInInterval.isEmpty()
? 0
: originalSegmentsInInterval.get(0).getShardSpec().getNumCorePartitions();
Assert.assertEquals(expectedNumCorePartitions, shardSpec.getNumCorePartitions());
}
}
}
private void assertShardSpecAfterOverwrite(ParallelIndexSupervisorTask task, LockGranularity actualLockGranularity)
{
DataSegmentsWithSchemas dataSegmentsWithSchemas = getIndexingServiceClient().getSegmentAndSchemas(task);
verifySchema(dataSegmentsWithSchemas);
final Collection<DataSegment> segments = dataSegmentsWithSchemas.getSegments();
final Map<Interval, List<DataSegment>> intervalToSegments = SegmentUtils.groupSegmentsByInterval(segments);
if (actualLockGranularity != LockGranularity.SEGMENT) {
// Check the core partition set in the shardSpec
for (List<DataSegment> segmentsPerInterval : intervalToSegments.values()) {
for (DataSegment segment : segmentsPerInterval) {
Assert.assertSame(NumberedShardSpec.class, segment.getShardSpec().getClass());
final NumberedShardSpec shardSpec = (NumberedShardSpec) segment.getShardSpec();
Assert.assertEquals(segmentsPerInterval.size(), shardSpec.getNumCorePartitions());
}
}
} else {
for (List<DataSegment> segmentsPerInterval : intervalToSegments.values()) {
for (DataSegment segment : segmentsPerInterval) {
Assert.assertSame(NumberedOverwriteShardSpec.class, segment.getShardSpec().getClass());
final NumberedOverwriteShardSpec shardSpec = (NumberedOverwriteShardSpec) segment.getShardSpec();
Assert.assertEquals(segmentsPerInterval.size(), shardSpec.getAtomicUpdateGroupSize());
}
}
}
}
@Test
public void testWithoutInterval()
{
testRunAndOverwrite(null, Granularities.DAY);
}
@Test
public void testRunInParallel()
{
// Ingest all data.
testRunAndOverwrite(Intervals.of("2017-12/P1M"), Granularities.DAY);
}
@Test
public void testGetRunningTaskReports() throws Exception
{
final ParallelIndexSupervisorTask task = newTask(
Intervals.of("2017-12/P1M"),
Granularities.DAY,
false,
true
);
task.addToContext(Tasks.FORCE_TIME_CHUNK_LOCK_KEY, lockGranularity == LockGranularity.TIME_CHUNK);
task.addToContext(DISABLE_TASK_INJECT_CONTEXT_KEY, true);
// Keep tasks running until finish is triggered
getIndexingServiceClient().keepTasksRunning();
getIndexingServiceClient().runTask(task.getId(), task);
// Allow enough time for the sub-tasks to reach the running state
Thread.sleep(2000);
// Fetch and verify live reports
TaskReport.ReportMap reportMap = task.doGetLiveReports(true);
IngestionStatsAndErrors statsAndErrors = ((IngestionStatsAndErrorsTaskReport)
reportMap.get("ingestionStatsAndErrors")).getPayload();
Map<String, Object> rowStats = statsAndErrors.getRowStats();
Assert.assertTrue(rowStats.containsKey("totals"));
getIndexingServiceClient().allowTasksToFinish();
TaskStatus taskStatus = getIndexingServiceClient().waitToFinish(task, 2, TimeUnit.MINUTES);
Assert.assertEquals(TaskState.SUCCESS, taskStatus.getStatusCode());
}
@Test
public void testRunInParallelIngestNullColumn()
{
if (!useInputFormatApi) {
return;
}
// Ingest all data.
final List<DimensionSchema> dimensionSchemas = DimensionsSpec.getDefaultSchemas(
Arrays.asList("ts", "unknownDim", "dim")
);
ParallelIndexSupervisorTask task = new ParallelIndexSupervisorTask(
null,
null,
null,
new ParallelIndexIngestionSpec(
new DataSchema(
"dataSource",
DEFAULT_TIMESTAMP_SPEC,
DEFAULT_DIMENSIONS_SPEC.withDimensions(dimensionSchemas),
new AggregatorFactory[]{
new LongSumAggregatorFactory("val", "val")
},
new UniformGranularitySpec(
Granularities.DAY,
Granularities.MINUTE,
Collections.singletonList(Intervals.of("2017-12/P1M"))
),
null
),
new ParallelIndexIOConfig(
null,
new SettableSplittableLocalInputSource(inputDir, VALID_INPUT_SOURCE_FILTER, true),
DEFAULT_INPUT_FORMAT,
false,
null
),
DEFAULT_TUNING_CONFIG_FOR_PARALLEL_INDEXING
),
null
);
task.addToContext(Tasks.FORCE_TIME_CHUNK_LOCK_KEY, lockGranularity == LockGranularity.TIME_CHUNK);
Assert.assertEquals(TaskState.SUCCESS, getIndexingServiceClient().runAndWait(task).getStatusCode());
DataSegmentsWithSchemas dataSegmentsWithSchemas = getIndexingServiceClient().getSegmentAndSchemas(task);
verifySchema(dataSegmentsWithSchemas);
Set<DataSegment> segments = dataSegmentsWithSchemas.getSegments();
for (DataSegment segment : segments) {
for (int i = 0; i < dimensionSchemas.size(); i++) {
Assert.assertEquals(dimensionSchemas.get(i).getName(), segment.getDimensions().get(i));
}
}
}
@Test
public void testRunInParallelIngestNullColumn_storeEmptyColumnsOff_shouldNotStoreEmptyColumns()
{
if (!useInputFormatApi) {
return;
}
// Ingest all data.
final List<DimensionSchema> dimensionSchemas = DimensionsSpec.getDefaultSchemas(
Arrays.asList("ts", "unknownDim", "dim")
);
ParallelIndexSupervisorTask task = new ParallelIndexSupervisorTask(
null,
null,
null,
new ParallelIndexIngestionSpec(
new DataSchema(
"dataSource",
DEFAULT_TIMESTAMP_SPEC,
DEFAULT_DIMENSIONS_SPEC.withDimensions(dimensionSchemas),
new AggregatorFactory[]{
new LongSumAggregatorFactory("val", "val")
},
new UniformGranularitySpec(
Granularities.DAY,
Granularities.MINUTE,
Collections.singletonList(Intervals.of("2017-12/P1M"))
),
null
),
new ParallelIndexIOConfig(
null,
new SettableSplittableLocalInputSource(inputDir, VALID_INPUT_SOURCE_FILTER, true),
DEFAULT_INPUT_FORMAT,
false,
null
),
DEFAULT_TUNING_CONFIG_FOR_PARALLEL_INDEXING
),
null
);
task.addToContext(Tasks.STORE_EMPTY_COLUMNS_KEY, false);
task.addToContext(Tasks.FORCE_TIME_CHUNK_LOCK_KEY, lockGranularity == LockGranularity.TIME_CHUNK);
Assert.assertEquals(TaskState.SUCCESS, getIndexingServiceClient().runAndWait(task).getStatusCode());
DataSegmentsWithSchemas dataSegmentsWithSchemas = getIndexingServiceClient().getSegmentAndSchemas(task);
verifySchema(dataSegmentsWithSchemas);
Set<DataSegment> segments = dataSegmentsWithSchemas.getSegments();
for (DataSegment segment : segments) {
Assert.assertFalse(segment.getDimensions().contains("unknownDim"));
}
}
@Test
public void testRunInParallelTaskReports()
{
ParallelIndexSupervisorTask task = runTestTask(
Intervals.of("2017-12/P1M"),
Granularities.DAY,
false,
Collections.emptyList()
);
TaskReport.ReportMap actualReports = task.doGetLiveReports(true);
TaskReport.ReportMap expectedReports = buildExpectedTaskReportParallel(
task.getId(),
ImmutableList.of(
new ParseExceptionReport(
"{ts=2017unparseable}",
"unparseable",
ImmutableList.of(getErrorMessageForUnparseableTimestamp()),
1L
),
new ParseExceptionReport(
"{ts=2017-12-25, dim=0 th test file, val=badval}",
"processedWithError",
ImmutableList.of("Unable to parse value[badval] for field[val]"),
1L
)
),
new RowIngestionMetersTotals(10, 335, 1, 1, 1)
);
compareTaskReports(expectedReports, actualReports);
}
@Test
public void testWithoutIntervalWithDifferentSegmentGranularity()
{
// Ingest all data.
testRunAndOverwrite(null, Granularities.MONTH);
}
@Test
public void testRunInParallelWithDifferentSegmentGranularity()
{
// Ingest all data.
testRunAndOverwrite(Intervals.of("2017-12/P1M"), Granularities.MONTH);
}
@Test
public void testRunInSequential()
{
final Interval interval = Intervals.of("2017-12/P1M");
final boolean appendToExisting = false;
final ParallelIndexSupervisorTask task = newTask(interval, appendToExisting, false);
task.addToContext(Tasks.FORCE_TIME_CHUNK_LOCK_KEY, lockGranularity == LockGranularity.TIME_CHUNK);
Assert.assertEquals(TaskState.SUCCESS, getIndexingServiceClient().runAndWait(task).getStatusCode());
assertShardSpec(task, lockGranularity, appendToExisting, Collections.emptyList());
TaskContainer taskContainer = getIndexingServiceClient().getTaskContainer(task.getId());
final ParallelIndexSupervisorTask executedTask = (ParallelIndexSupervisorTask) taskContainer.getTask();
TaskReport.ReportMap actualReports = executedTask.doGetLiveReports(true);
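// The expected totals correspond to the data written in setup(): 10 well-formed rows across the five
// "test_*" files, plus one row processed with an error (bad metric value), one thrown away (timestamp
// outside the interval), and one unparseable row; 335 appears to be the processed-bytes total.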
final RowIngestionMetersTotals expectedTotals = new RowIngestionMetersTotals(10, 335, 1, 1, 1);
List<ParseExceptionReport> expectedUnparseableEvents = ImmutableList.of(
new ParseExceptionReport(
"{ts=2017unparseable}",
"unparseable",
ImmutableList.of(getErrorMessageForUnparseableTimestamp()),
1L
),
new ParseExceptionReport(
"{ts=2017-12-25, dim=0 th test file, val=badval}",
"processedWithError",
ImmutableList.of("Unable to parse value[badval] for field[val]"),
1L
)
);
TaskReport.ReportMap expectedReports;
if (useInputFormatApi) {
expectedReports = buildExpectedTaskReportSequential(
task.getId(),
expectedUnparseableEvents,
new RowIngestionMetersTotals(0, 0, 0, 0, 0),
expectedTotals
);
} else {
// When useInputFormatApi is false, maxNumConcurrentSubTasks is 2, so the task uses the single-phase
// parallel runner instead of the sequential runner.
expectedReports = buildExpectedTaskReportParallel(
task.getId(),
expectedUnparseableEvents,
expectedTotals
);
}
compareTaskReports(expectedReports, actualReports);
}
@Test
public void testPublishEmptySegments()
{
final ParallelIndexSupervisorTask task = newTask(Intervals.of("2020-12/P1M"), false, true);
task.addToContext(Tasks.FORCE_TIME_CHUNK_LOCK_KEY, lockGranularity == LockGranularity.TIME_CHUNK);
Assert.assertEquals(TaskState.SUCCESS, getIndexingServiceClient().runAndWait(task).getStatusCode());
}
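// With maxNumConcurrentSubTasks set to 1 in the tuning config, the supervisor performs the ingestion itself
// in sequential mode, so no parallel runner is created.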
@Test
public void testWith1MaxNumConcurrentSubTasks()
{
final Interval interval = Intervals.of("2017-12/P1M");
final boolean appendToExisting = false;
final ParallelIndexSupervisorTask task = newTask(
interval,
Granularities.DAY,
appendToExisting,
true,
new ParallelIndexTuningConfig(
null,
null,
null,
null,
null,
null,
null,
null,
null,
null,
null,
null,
null,
null,
null,
null,
null,
null,
1,
null,
null,
null,
null,
null,
null,
null,
null,
null,
null,
null,
null,
null
),
VALID_INPUT_SOURCE_FILTER
);
task.addToContext(Tasks.FORCE_TIME_CHUNK_LOCK_KEY, lockGranularity == LockGranularity.TIME_CHUNK);
Assert.assertEquals(TaskState.SUCCESS, getIndexingServiceClient().runAndWait(task).getStatusCode());
Assert.assertNull("Runner must be null if the task was in the sequential mode", task.getCurrentRunner());
assertShardSpec(task, lockGranularity, appendToExisting, Collections.emptyList());
}
@Test
public void testAppendToExisting()
{
final Interval interval = Intervals.of("2017-12/P1M");
runTestTask(interval, Granularities.DAY, true, Collections.emptyList());
final Collection<DataSegment> oldSegments =
getStorageCoordinator().retrieveUsedSegmentsForInterval("dataSource", interval, Segments.ONLY_VISIBLE);
runTestTask(interval, Granularities.DAY, true, oldSegments);
final Collection<DataSegment> newSegments =
getStorageCoordinator().retrieveUsedSegmentsForInterval("dataSource", interval, Segments.ONLY_VISIBLE);
Assert.assertTrue(newSegments.containsAll(oldSegments));
final SegmentTimeline timeline = SegmentTimeline.forSegments(newSegments);
final Set<DataSegment> visibles = timeline.findNonOvershadowedObjectsInInterval(interval, Partitions.ONLY_COMPLETE);
Assert.assertEquals(new HashSet<>(newSegments), visibles);
}
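// Runs two append tasks concurrently using shared time-chunk locks and expects both to succeed.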
@Test
public void testMultipleAppends()
{
final Interval interval = null;
final ParallelIndexSupervisorTask task = newTask(interval, Granularities.DAY, true, true);
final ParallelIndexSupervisorTask task2 = newTask(interval, Granularities.DAY, true, true);
task.addToContext(Tasks.FORCE_TIME_CHUNK_LOCK_KEY, true);
task.addToContext(Tasks.USE_SHARED_LOCK, true);
task2.addToContext(Tasks.FORCE_TIME_CHUNK_LOCK_KEY, true);
task2.addToContext(Tasks.USE_SHARED_LOCK, true);
getIndexingServiceClient().runTask(task.getId(), task);
getIndexingServiceClient().runTask(task2.getId(), task2);
Assert.assertEquals(TaskState.SUCCESS, getIndexingServiceClient().waitToFinish(task, 1, TimeUnit.DAYS).getStatusCode());
Assert.assertEquals(TaskState.SUCCESS, getIndexingServiceClient().waitToFinish(task2, 1, TimeUnit.DAYS).getStatusCode());
}
@Test
public void testRunParallelWithNoInputSplitToProcess()
{
// The input source filter on this task does not match any input file.
// Hence, this task has no input splits to process.
final ParallelIndexSupervisorTask task = newTask(
Intervals.of("2017-12/P1M"),
Granularities.DAY,
true,
true,
AbstractParallelIndexSupervisorTaskTest.DEFAULT_TUNING_CONFIG_FOR_PARALLEL_INDEXING,
"non_existing_file_filter"
);
task.addToContext(Tasks.FORCE_TIME_CHUNK_LOCK_KEY, lockGranularity == LockGranularity.TIME_CHUNK);
// The task state should still be SUCCESS even when there are no input splits to process
Assert.assertEquals(TaskState.SUCCESS, getIndexingServiceClient().runAndWait(task).getStatusCode());
}
@Test
public void testOverwriteAndAppend()
{
final Interval interval = Intervals.of("2017-12/P1M");
testRunAndOverwrite(interval, Granularities.DAY);
final Collection<DataSegment> beforeAppendSegments =
getStorageCoordinator().retrieveUsedSegmentsForInterval("dataSource", interval, Segments.ONLY_VISIBLE);
runTestTask(
interval,
Granularities.DAY,
true,
beforeAppendSegments
);
final Collection<DataSegment> afterAppendSegments =
getStorageCoordinator().retrieveUsedSegmentsForInterval("dataSource", interval, Segments.ONLY_VISIBLE);
Assert.assertTrue(afterAppendSegments.containsAll(beforeAppendSegments));
final SegmentTimeline timeline = SegmentTimeline.forSegments(afterAppendSegments);
final Set<DataSegment> visibles = timeline.findNonOvershadowedObjectsInInterval(interval, Partitions.ONLY_COMPLETE);
Assert.assertEquals(new HashSet<>(afterAppendSegments), visibles);
}
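// The tuning config sets maxAllowedLockCount to 0. With time-chunk locking the task is expected to fail
// with "Number of locks exceeded maxAllowedLockCount [0]", while with segment locking it is expected to succeed.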
@Test
public void testMaxLocksWith1MaxNumConcurrentSubTasks()
{
final Interval interval = Intervals.of("2017-12/P1M");
final boolean appendToExisting = false;
final ParallelIndexSupervisorTask task = newTask(
interval,
Granularities.DAY,
appendToExisting,
true,
new ParallelIndexTuningConfig(
null,
null,
null,
null,
null,
null,
null,
null,
null,
null,
null,
null,
null,
null,
null,
null,
null,
null,
1,
null,
null,
null,
null,
null,
null,
null,
null,
null,
null,
null,
0,
null
),
VALID_INPUT_SOURCE_FILTER
);
task.addToContext(Tasks.FORCE_TIME_CHUNK_LOCK_KEY, lockGranularity == LockGranularity.TIME_CHUNK);
if (lockGranularity.equals(LockGranularity.TIME_CHUNK)) {
expectedException.expect(RuntimeException.class);
expectedException.expectMessage(
"Number of locks exceeded maxAllowedLockCount [0]"
);
getIndexingServiceClient().runAndWait(task);
} else {
Assert.assertEquals(TaskState.SUCCESS, getIndexingServiceClient().runAndWait(task).getStatusCode());
Assert.assertNull("Runner must be null if the task was in the sequential mode", task.getCurrentRunner());
assertShardSpec(task, lockGranularity, appendToExisting, Collections.emptyList());
}
}
@Test
public void testMaxLocksWith2MaxNumConcurrentSubTasks()
{
final Interval interval = Intervals.of("2017-12/P1M");
final boolean appendToExisting = false;
final ParallelIndexSupervisorTask task = newTask(
interval,
Granularities.DAY,
appendToExisting,
true,
new ParallelIndexTuningConfig(
null,
null,
null,
null,
null,
null,
null,
null,
null,
null,
null,
null,
null,
null,
null,
null,
null,
null,
2,
null,
null,
null,
null,
null,
null,
null,
null,
null,
null,
null,
0,
null
),
VALID_INPUT_SOURCE_FILTER
);
task.addToContext(Tasks.FORCE_TIME_CHUNK_LOCK_KEY, lockGranularity == LockGranularity.TIME_CHUNK);
if (lockGranularity.equals(LockGranularity.TIME_CHUNK)) {
expectedException.expect(RuntimeException.class);
expectedException.expectMessage(
"Number of locks exceeded maxAllowedLockCount [0]"
);
getIndexingServiceClient().runAndWait(task);
} else {
Assert.assertEquals(TaskState.SUCCESS, getIndexingServiceClient().runAndWait(task).getStatusCode());
Assert.assertNull("Runner must be null if the task was in the sequential mode", task.getCurrentRunner());
assertShardSpec(task, lockGranularity, appendToExisting, Collections.emptyList());
}
}
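// Ingests JSON rows containing both a dimension declared in the dimensionsSpec ("explicitDim") and an
// undeclared one ("implicitDim"). With includeAllDimensions enabled, both should appear in the published
// segments, in the order ts, explicitDim, implicitDim.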
@Test
public void testIngestBothExplicitAndImplicitDims() throws IOException
{
final Interval interval = Intervals.of("2017-12/P1M");
for (int i = 0; i < 5; i++) {
try (final Writer writer =
Files.newBufferedWriter(new File(inputDir, "test_" + i + ".json").toPath(), StandardCharsets.UTF_8)) {
writer.write(
getObjectMapper().writeValueAsString(
ImmutableMap.of(
"ts",
StringUtils.format("2017-12-%d", 24 + i),
"implicitDim",
"implicit_" + i,
"explicitDim",
"explicit_" + i
)
)
);
writer.write(
getObjectMapper().writeValueAsString(
ImmutableMap.of(
"ts",
StringUtils.format("2017-12-%d", 25 + i),
"implicitDim",
"implicit_" + i,
"explicitDim",
"explicit_" + i
)
)
);
}
}
final ParallelIndexSupervisorTask task = new ParallelIndexSupervisorTask(
null,
null,
null,
new ParallelIndexIngestionSpec(
new DataSchema(
"dataSource",
DEFAULT_TIMESTAMP_SPEC,
DimensionsSpec.builder()
.setDefaultSchemaDimensions(ImmutableList.of("ts", "explicitDim"))
.setIncludeAllDimensions(true)
.build(),
new AggregatorFactory[]{new CountAggregatorFactory("cnt")},
new UniformGranularitySpec(
Granularities.DAY,
Granularities.MINUTE,
Collections.singletonList(interval)
),
null
),
new ParallelIndexIOConfig(
null,
new SettableSplittableLocalInputSource(inputDir, "*.json", true),
new JsonInputFormat(
new JSONPathSpec(true, null),
null,
null,
null,
null
),
false,
null
),
AbstractParallelIndexSupervisorTaskTest.DEFAULT_TUNING_CONFIG_FOR_PARALLEL_INDEXING
),
null
);
task.addToContext(Tasks.FORCE_TIME_CHUNK_LOCK_KEY, lockGranularity == LockGranularity.TIME_CHUNK);
Assert.assertEquals(TaskState.SUCCESS, getIndexingServiceClient().runAndWait(task).getStatusCode());
DataSegmentsWithSchemas dataSegmentsWithSchemas = getIndexingServiceClient().getSegmentAndSchemas(task);
verifySchema(dataSegmentsWithSchemas);
Set<DataSegment> segments = dataSegmentsWithSchemas.getSegments();
for (DataSegment segment : segments) {
Assert.assertEquals(ImmutableList.of("ts", "explicitDim", "implicitDim"), segment.getDimensions());
}
}
@Test
public void testIngestBothExplicitAndImplicitDimsSchemaDiscovery() throws IOException
{
final Interval interval = Intervals.of("2017-12/P1M");
for (int i = 0; i < 5; i++) {
try (final Writer writer =
Files.newBufferedWriter(new File(inputDir, "test_" + i + ".json").toPath(), StandardCharsets.UTF_8)) {
writer.write(
getObjectMapper().writeValueAsString(
ImmutableMap.of(
"ts",
StringUtils.format("2017-12-%d", 24 + i),
"implicitDim",
"implicit_" + i,
"explicitDim",
"explicit_" + i
)
)
);
writer.write(
getObjectMapper().writeValueAsString(
ImmutableMap.of(
"ts",
StringUtils.format("2017-12-%d", 25 + i),
"implicitDim",
"implicit_" + i,
"explicitDim",
"explicit_" + i
)
)
);
}
}
final ParallelIndexSupervisorTask task = new ParallelIndexSupervisorTask(
null,
null,
null,
new ParallelIndexIngestionSpec(
new DataSchema(
"dataSource",
DEFAULT_TIMESTAMP_SPEC,
DimensionsSpec.builder()
.setDefaultSchemaDimensions(ImmutableList.of("ts", "explicitDim"))
.useSchemaDiscovery(true)
.build(),
new AggregatorFactory[]{new CountAggregatorFactory("cnt")},
new UniformGranularitySpec(
Granularities.DAY,
Granularities.MINUTE,
Collections.singletonList(interval)
),
null
),
new ParallelIndexIOConfig(
null,
new SettableSplittableLocalInputSource(inputDir, "*.json", true),
new JsonInputFormat(
new JSONPathSpec(true, null),
null,
null,
null,
null
),
false,
null
),
AbstractParallelIndexSupervisorTaskTest.DEFAULT_TUNING_CONFIG_FOR_PARALLEL_INDEXING
),
null
);
task.addToContext(Tasks.FORCE_TIME_CHUNK_LOCK_KEY, lockGranularity == LockGranularity.TIME_CHUNK);
Assert.assertEquals(TaskState.SUCCESS, getIndexingServiceClient().runAndWait(task).getStatusCode());
DataSegmentsWithSchemas dataSegmentsWithSchemas = getIndexingServiceClient().getSegmentAndSchemas(task);
verifySchema(dataSegmentsWithSchemas);
Set<DataSegment> segments = dataSegmentsWithSchemas.getSegments();
for (DataSegment segment : segments) {
Assert.assertEquals(ImmutableList.of("ts", "explicitDim", "implicitDim"), segment.getDimensions());
}
}
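// Helper overloads for building a ParallelIndexSupervisorTask. When useInputFormatApi is true, the spec uses
// the InputFormat-based ioConfig with the settable-splittable input source; otherwise it uses a plain
// LocalInputSource with an input format derived from DEFAULT_PARSE_SPEC.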
private ParallelIndexSupervisorTask newTask(
@Nullable Interval interval,
boolean appendToExisting,
boolean splittableInputSource
)
{
return newTask(interval, Granularities.DAY, appendToExisting, splittableInputSource);
}
private ParallelIndexSupervisorTask newTask(
@Nullable Interval interval,
Granularity segmentGranularity,
boolean appendToExisting,
boolean splittableInputSource
)
{
return newTask(
interval,
segmentGranularity,
appendToExisting,
splittableInputSource,
AbstractParallelIndexSupervisorTaskTest.DEFAULT_TUNING_CONFIG_FOR_PARALLEL_INDEXING,
VALID_INPUT_SOURCE_FILTER
);
}
private ParallelIndexSupervisorTask newTask(
@Nullable Interval interval,
Granularity segmentGranularity,
boolean appendToExisting,
boolean splittableInputSource,
ParallelIndexTuningConfig tuningConfig,
String inputSourceFilter
)
{
// set up ingestion spec
final ParallelIndexIngestionSpec ingestionSpec;
if (useInputFormatApi) {
ingestionSpec = new ParallelIndexIngestionSpec(
new DataSchema(
"dataSource",
DEFAULT_TIMESTAMP_SPEC,
DEFAULT_DIMENSIONS_SPEC,
new AggregatorFactory[]{
new LongSumAggregatorFactory("val", "val")
},
new UniformGranularitySpec(
segmentGranularity,
Granularities.MINUTE,
interval == null ? null : Collections.singletonList(interval)
),
null
),
new ParallelIndexIOConfig(
null,
new SettableSplittableLocalInputSource(inputDir, inputSourceFilter, splittableInputSource),
DEFAULT_INPUT_FORMAT,
appendToExisting,
null
),
tuningConfig
);
} else {
ingestionSpec = new ParallelIndexIngestionSpec(
new DataSchema(
"dataSource",
DEFAULT_TIMESTAMP_SPEC,
DEFAULT_DIMENSIONS_SPEC,
DEFAULT_METRICS_SPEC,
new UniformGranularitySpec(
segmentGranularity,
Granularities.MINUTE,
interval == null ? null : Collections.singletonList(interval)
),
null
),
new ParallelIndexIOConfig(
null,
new LocalInputSource(inputDir, inputSourceFilter),
createInputFormatFromParseSpec(DEFAULT_PARSE_SPEC),
appendToExisting,
null
),
tuningConfig
);
}
// build the supervisor task
return new ParallelIndexSupervisorTask(
null,
null,
null,
ingestionSpec,
Collections.emptyMap()
);
}
private String getErrorMessageForUnparseableTimestamp()
{
return StringUtils.format(
"Timestamp[2017unparseable] is unparseable! Event: {ts=2017unparseable} (Path: %s, Record: 5, Line: 5)",
new File(inputDir, "test_0").toURI()
);
}
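// A LocalInputSource whose splittability can be toggled by the test, allowing the same data to be ingested
// either with or without input splits. It is registered as a Jackson subtype in setup().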
private static class SettableSplittableLocalInputSource extends LocalInputSource
{
private final boolean splittableInputSource;
@JsonCreator
private SettableSplittableLocalInputSource(
@JsonProperty("baseDir") File baseDir,
@JsonProperty("filter") String filter,
@JsonProperty("splittableInputSource") boolean splittableInputSource
)
{
super(baseDir, filter);
this.splittableInputSource = splittableInputSource;
}
@JsonProperty
public boolean isSplittableInputSource()
{
return splittableInputSource;
}
@Override
public boolean isSplittable()
{
return splittableInputSource;
}
}
}