/*
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements. See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership. The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License. You may obtain a copy of the License at
 *
 *   http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing,
 * software distributed under the License is distributed on an
 * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
 * KIND, either express or implied. See the License for the
 * specific language governing permissions and limitations
 * under the License.
 */

package org.apache.druid.indexing.common.task.batch.parallel;

import com.google.common.base.Preconditions;
import org.apache.druid.data.input.InputFormat;
import org.apache.druid.data.input.MaxSizeSplitHintSpec;
import org.apache.druid.data.input.impl.DimensionsSpec;
import org.apache.druid.data.input.impl.LocalInputSource;
import org.apache.druid.data.input.impl.ParseSpec;
import org.apache.druid.data.input.impl.StringInputRowParser;
import org.apache.druid.data.input.impl.TimestampSpec;
import org.apache.druid.indexer.TaskState;
import org.apache.druid.indexer.TaskStatus;
import org.apache.druid.indexer.partitions.DimensionBasedPartitionsSpec;
import org.apache.druid.indexing.common.LockGranularity;
import org.apache.druid.indexing.common.SegmentLoaderFactory;
import org.apache.druid.indexing.common.task.Tasks;
import org.apache.druid.java.util.common.granularity.Granularities;
import org.apache.druid.query.DefaultGenericQueryMetricsFactory;
import org.apache.druid.query.QueryPlus;
import org.apache.druid.query.QueryRunner;
import org.apache.druid.query.SegmentDescriptor;
import org.apache.druid.query.TableDataSource;
import org.apache.druid.query.aggregation.AggregatorFactory;
import org.apache.druid.query.aggregation.LongSumAggregatorFactory;
import org.apache.druid.query.scan.ScanQuery;
import org.apache.druid.query.scan.ScanQueryConfig;
import org.apache.druid.query.scan.ScanQueryEngine;
import org.apache.druid.query.scan.ScanQueryQueryToolChest;
import org.apache.druid.query.scan.ScanQueryRunnerFactory;
import org.apache.druid.query.scan.ScanResultValue;
import org.apache.druid.query.spec.SpecificSegmentSpec;
import org.apache.druid.segment.Segment;
import org.apache.druid.segment.indexing.DataSchema;
import org.apache.druid.segment.indexing.granularity.GranularitySpec;
import org.apache.druid.segment.indexing.granularity.UniformGranularitySpec;
import org.apache.druid.segment.loading.SegmentLoader;
import org.apache.druid.segment.loading.SegmentLoadingException;
import org.apache.druid.segment.realtime.firehose.LocalFirehoseFactory;
import org.apache.druid.timeline.DataSegment;
import org.joda.time.Interval;
import org.junit.Assert;
import javax.annotation.Nullable;
import java.io.File;
import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.Set;
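
/**
 * Base class for tests of multi-phase parallel indexing, i.e., secondary
 * partitioning driven by a {@link DimensionBasedPartitionsSpec}. Subclasses run
 * a {@link ParallelIndexSupervisorTask} over local input files and verify the
 * published segments. Tests are parameterized by lock granularity and by
 * whether the task is defined with the InputFormat API or the legacy
 * firehose/parser API.
 */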
@SuppressWarnings("SameParameterValue")
abstract class AbstractMultiPhaseParallelIndexingTest extends AbstractParallelIndexSupervisorTaskTest
{
  // Shared scan-query stack used by querySegment() to read rows back out of segments.
  private static final ScanQueryRunnerFactory SCAN_QUERY_RUNNER_FACTORY = new ScanQueryRunnerFactory(
      new ScanQueryQueryToolChest(
          new ScanQueryConfig().setLegacy(false),
          DefaultGenericQueryMetricsFactory.instance()
      ),
      new ScanQueryEngine(),
      new ScanQueryConfig()
  );

  private final LockGranularity lockGranularity;
  private final boolean useInputFormatApi;

  AbstractMultiPhaseParallelIndexingTest(LockGranularity lockGranularity, boolean useInputFormatApi)
  {
    this.lockGranularity = lockGranularity;
    this.useInputFormatApi = useInputFormatApi;
  }

  boolean isUseInputFormatApi()
  {
    return useInputFormatApi;
  }
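
  /**
   * Runs a {@link ParallelIndexSupervisorTask} over the files in {@code inputDir},
   * asserts that the task finishes with {@code expectedTaskStatus}, and returns
   * the segments it published. Exactly one of {@code inputFormat} and
   * {@code parseSpec} should be non-null, matching the API the test was
   * constructed with.
   */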
  Set<DataSegment> runTestTask(
      @Nullable TimestampSpec timestampSpec,
      @Nullable DimensionsSpec dimensionsSpec,
      @Nullable InputFormat inputFormat,
      @Nullable ParseSpec parseSpec,
      Interval interval,
      File inputDir,
      String filter,
      DimensionBasedPartitionsSpec partitionsSpec,
      int maxNumConcurrentSubTasks,
      TaskState expectedTaskStatus
  )
  {
    final ParallelIndexSupervisorTask task = newTask(
        timestampSpec,
        dimensionsSpec,
        inputFormat,
        parseSpec,
        interval,
        inputDir,
        filter,
        partitionsSpec,
        maxNumConcurrentSubTasks
    );
    task.addToContext(Tasks.FORCE_TIME_CHUNK_LOCK_KEY, lockGranularity == LockGranularity.TIME_CHUNK);

    TaskStatus taskStatus = getIndexingServiceClient().runAndWait(task);
    Assert.assertEquals(expectedTaskStatus, taskStatus.getStatusCode());
    return getIndexingServiceClient().getPublishedSegments(task);
  }
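
  /**
   * Builds a supervisor task whose ingestion spec uses either the InputFormat
   * API or the legacy firehose/parser API, depending on how the test was
   * constructed. The split hint is sized so that each input file becomes its
   * own split.
   */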
  private ParallelIndexSupervisorTask newTask(
      @Nullable TimestampSpec timestampSpec,
      @Nullable DimensionsSpec dimensionsSpec,
      @Nullable InputFormat inputFormat,
      @Nullable ParseSpec parseSpec,
      Interval interval,
      File inputDir,
      String filter,
      DimensionBasedPartitionsSpec partitionsSpec,
      int maxNumConcurrentSubTasks
  )
  {
    GranularitySpec granularitySpec = new UniformGranularitySpec(
        Granularities.DAY,
        Granularities.MINUTE,
        interval == null ? null : Collections.singletonList(interval)
    );

    // All tuning options other than the ones below are left at their defaults.
    ParallelIndexTuningConfig tuningConfig = new ParallelIndexTuningConfig(
        null,
        null,
        null,
        null,
        null,
        null,
        new MaxSizeSplitHintSpec(1L), // set maxSplitSize to 1 so that each split has only one file
        partitionsSpec,
        null,
        null,
        null,
        true, // forceGuaranteedRollup; hash/range partitioning requires guaranteed rollup
        null,
        null,
        null,
        null,
        maxNumConcurrentSubTasks,
        null,
        null,
        null,
        null,
        null,
        null,
        null,
        null,
        null
    );

    final ParallelIndexIngestionSpec ingestionSpec;
    if (useInputFormatApi) {
      Preconditions.checkArgument(parseSpec == null);
      ParallelIndexIOConfig ioConfig = new ParallelIndexIOConfig(
          null,
          new LocalInputSource(inputDir, filter),
          inputFormat,
          false
      );
      ingestionSpec = new ParallelIndexIngestionSpec(
          new DataSchema(
              "dataSource",
              timestampSpec,
              dimensionsSpec,
              new AggregatorFactory[]{
                  new LongSumAggregatorFactory("val", "val")
              },
              granularitySpec,
              null
          ),
          ioConfig,
          tuningConfig
      );
    } else {
      Preconditions.checkArgument(inputFormat == null);
      ParallelIndexIOConfig ioConfig = new ParallelIndexIOConfig(
          new LocalFirehoseFactory(inputDir, filter, null),
          false
      );
      //noinspection unchecked
      ingestionSpec = new ParallelIndexIngestionSpec(
          new DataSchema(
              "dataSource",
              getObjectMapper().convertValue(
                  new StringInputRowParser(parseSpec, null),
                  Map.class
              ),
              new AggregatorFactory[]{
                  new LongSumAggregatorFactory("val", "val")
              },
              granularitySpec,
              null,
              getObjectMapper()
          ),
          ioConfig,
          tuningConfig
      );
    }

    // The remaining null arguments (task id, resource, clients, etc.) are
    // replaced with test tools by the test base class when the task runs.
    return new ParallelIndexSupervisorTask(
        null,
        null,
        null,
        ingestionSpec,
        Collections.emptyMap(),
        null,
        null,
        null,
        null,
        null
    );
  }
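
  /**
   * Loads {@code dataSegment} via {@link #loadSegment} and runs a scan query
   * over it, returning the scanned rows for the given {@code columns}.
   */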
  List<ScanResultValue> querySegment(DataSegment dataSegment, List<String> columns, File tempSegmentDir)
  {
    Segment segment = loadSegment(dataSegment, tempSegmentDir);
    final QueryRunner<ScanResultValue> runner = SCAN_QUERY_RUNNER_FACTORY.createRunner(segment);
    return runner.run(
        QueryPlus.wrap(
            new ScanQuery(
                new TableDataSource("dataSource"),
                new SpecificSegmentSpec(
                    new SegmentDescriptor(
                        dataSegment.getInterval(),
                        dataSegment.getVersion(),
                        dataSegment.getShardSpec().getPartitionNum()
                    )
                ),
                null,
                null,
                0,
                0,
                null,
                null,
                columns,
                false,
                null
            )
        )
    ).toList();
  }
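
  /**
   * Fetches a published segment into a {@link SegmentLoader} cache rooted at
   * {@code tempSegmentDir}, rethrowing any {@link SegmentLoadingException} as
   * an unchecked exception.
   */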
  private Segment loadSegment(DataSegment dataSegment, File tempSegmentDir)
  {
    final SegmentLoader loader = new SegmentLoaderFactory(getIndexIO(), getObjectMapper())
        .manufacturate(tempSegmentDir);
    try {
      return loader.getSegment(dataSegment, false);
    }
    catch (SegmentLoadingException e) {
      throw new RuntimeException(e);
    }
  }
}