/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/

package org.apache.druid.indexing.common.task.batch.parallel;

import com.google.common.collect.ImmutableList;
import org.apache.druid.data.input.InputFormat;
import org.apache.druid.data.input.impl.CSVParseSpec;
import org.apache.druid.data.input.impl.CsvInputFormat;
import org.apache.druid.data.input.impl.DimensionsSpec;
import org.apache.druid.data.input.impl.ParseSpec;
import org.apache.druid.data.input.impl.TimestampSpec;
import org.apache.druid.indexer.TaskState;
import org.apache.druid.indexer.partitions.DynamicPartitionsSpec;
import org.apache.druid.indexer.partitions.HashedPartitionsSpec;
import org.apache.druid.indexer.partitions.PartitionsSpec;
import org.apache.druid.indexing.common.LockGranularity;
import org.apache.druid.java.util.common.Intervals;
import org.apache.druid.java.util.common.StringUtils;
import org.apache.druid.query.scan.ScanResultValue;
import org.apache.druid.timeline.DataSegment;
import org.apache.druid.timeline.partition.HashBasedNumberedShardSpec;
import org.apache.druid.timeline.partition.NumberedShardSpec;
import org.joda.time.Interval;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.junit.runners.Parameterized;
import java.io.File;
import java.io.IOException;
import java.io.Writer;
import java.nio.charset.StandardCharsets;
import java.nio.file.Files;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Map.Entry;
import java.util.Set;
import java.util.stream.Collectors;
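
/**
 * Tests multi-phase parallel indexing with {@link HashedPartitionsSpec}: runs the parallel index task over CSV
 * input, verifies the published segments use {@link HashBasedNumberedShardSpec}, and covers appending dynamically
 * partitioned segments to the hash-partitioned datasource. Parameterized by lock granularity, whether the
 * InputFormat API is used, and the maximum number of concurrent sub-tasks.
 */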
@RunWith(Parameterized.class)
public class HashPartitionMultiPhaseParallelIndexingTest extends AbstractMultiPhaseParallelIndexingTest
{
private static final TimestampSpec TIMESTAMP_SPEC = new TimestampSpec("ts", "auto", null);
private static final DimensionsSpec DIMENSIONS_SPEC = new DimensionsSpec(
DimensionsSpec.getDefaultSchemas(Arrays.asList("ts", "dim1", "dim2"))
);
private static final ParseSpec PARSE_SPEC = new CSVParseSpec(
TIMESTAMP_SPEC,
DIMENSIONS_SPEC,
null,
Arrays.asList("ts", "dim1", "dim2", "val"),
false,
0
);
private static final InputFormat INPUT_FORMAT = new CsvInputFormat(
Arrays.asList("ts", "dim1", "dim2", "val"),
null,
false,
false,
0
);
private static final Interval INTERVAL_TO_INDEX = Intervals.of("2017-12/P1M");
@Parameterized.Parameters(name = "{0}, useInputFormatApi={1}, maxNumConcurrentSubTasks={2}")
public static Iterable<Object[]> constructorFeeder()
{
return ImmutableList.of(
new Object[]{LockGranularity.TIME_CHUNK, false, 2},
new Object[]{LockGranularity.TIME_CHUNK, true, 2},
new Object[]{LockGranularity.TIME_CHUNK, true, 1},
new Object[]{LockGranularity.SEGMENT, true, 2}
);
}
private final int maxNumConcurrentSubTasks;
private File inputDir;
public HashPartitionMultiPhaseParallelIndexingTest(
LockGranularity lockGranularity,
boolean useInputFormatApi,
int maxNumConcurrentSubTasks
)
{
super(lockGranularity, useInputFormatApi);
this.maxNumConcurrentSubTasks = maxNumConcurrentSubTasks;
}
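
/**
 * Writes 10 "test_*" input files, each containing rows timestamped across 2017-12-01 to 2017-12-11, plus
 * 5 "filtered_*" files that the tasks under test should ignore because they only read files matching "test_*".
 */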
@Before
public void setup() throws IOException
{
inputDir = temporaryFolder.newFolder("data");
// Generate 10 input files (test_0 .. test_9); each file has 20 rows spanning 2017-12-01 to 2017-12-11.
for (int i = 0; i < 10; i++) {
try (final Writer writer =
Files.newBufferedWriter(new File(inputDir, "test_" + i).toPath(), StandardCharsets.UTF_8)) {
for (int j = 0; j < 10; j++) {
writer.write(StringUtils.format("2017-12-%d,%d,%d th test file\n", j + 1, i + 10, i));
writer.write(StringUtils.format("2017-12-%d,%d,%d th test file\n", j + 2, i + 11, i));
}
}
}
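// These "filtered_*" files do not match the "test_*" filter used by the tasks and must not be ingested.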
for (int i = 0; i < 5; i++) {
try (final Writer writer =
Files.newBufferedWriter(new File(inputDir, "filtered_" + i).toPath(), StandardCharsets.UTF_8)) {
writer.write(StringUtils.format("2017-12-%d,%d,%d th test file\n", i + 1, i + 10, i));
}
}
}
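
/**
 * Ingests the input with two hash-based partitions per time chunk on (dim1, dim2) and verifies that the
 * published segments are properly hash-partitioned.
 */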
@Test
public void testRun() throws Exception
{
final Set<DataSegment> publishedSegments = runTestTask(
new HashedPartitionsSpec(null, 2, ImmutableList.of("dim1", "dim2")),
TaskState.SUCCESS,
false
);
assertHashedPartition(publishedSegments);
}
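
/**
 * First ingests with hash partitioning, then appends twice with {@link DynamicPartitionsSpec}. Verifies that,
 * within each interval, the appended {@link NumberedShardSpec} segments share the version and core partition
 * count of the hash-partitioned segments and are assigned higher partition numbers.
 */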
@Test
public void testAppendLinearlyPartitionedSegmentsToHashPartitionedDatasourceSuccessfullyAppend()
{
final Set<DataSegment> publishedSegments = new HashSet<>();
publishedSegments.addAll(
runTestTask(
new HashedPartitionsSpec(null, 2, ImmutableList.of("dim1", "dim2")),
TaskState.SUCCESS,
false
)
);
// Append
publishedSegments.addAll(
runTestTask(
new DynamicPartitionsSpec(5, null),
TaskState.SUCCESS,
true
)
);
// And append again
publishedSegments.addAll(
runTestTask(
new DynamicPartitionsSpec(10, null),
TaskState.SUCCESS,
true
)
);
final Map<Interval, List<DataSegment>> intervalToSegments = new HashMap<>();
publishedSegments.forEach(
segment -> intervalToSegments.computeIfAbsent(segment.getInterval(), k -> new ArrayList<>()).add(segment)
);
for (Entry<Interval, List<DataSegment>> entry : intervalToSegments.entrySet()) {
final List<DataSegment> segments = entry.getValue();
final List<DataSegment> hashedSegments = segments
.stream()
.filter(segment -> segment.getShardSpec().getClass() == HashBasedNumberedShardSpec.class)
.collect(Collectors.toList());
final List<DataSegment> linearSegments = segments
.stream()
.filter(segment -> segment.getShardSpec().getClass() == NumberedShardSpec.class)
.collect(Collectors.toList());
for (DataSegment hashedSegment : hashedSegments) {
final HashBasedNumberedShardSpec hashShardSpec = (HashBasedNumberedShardSpec) hashedSegment.getShardSpec();
for (DataSegment linearSegment : linearSegments) {
Assert.assertEquals(hashedSegment.getInterval(), linearSegment.getInterval());
Assert.assertEquals(hashedSegment.getVersion(), linearSegment.getVersion());
final NumberedShardSpec numberedShardSpec = (NumberedShardSpec) linearSegment.getShardSpec();
Assert.assertEquals(hashShardSpec.getNumCorePartitions(), numberedShardSpec.getNumCorePartitions());
Assert.assertTrue(hashShardSpec.getPartitionNum() < numberedShardSpec.getPartitionNum());
}
}
}
}
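
/**
 * Runs the parallel index task over the "test_*" files, using either the InputFormat API or the ParseSpec API
 * depending on the test parameters, and returns the published segments.
 */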
private Set<DataSegment> runTestTask(
PartitionsSpec partitionsSpec,
TaskState expectedTaskState,
boolean appendToExisting
)
{
if (isUseInputFormatApi()) {
return runTestTask(
TIMESTAMP_SPEC,
DIMENSIONS_SPEC,
INPUT_FORMAT,
null,
INTERVAL_TO_INDEX,
inputDir,
"test_*",
partitionsSpec,
maxNumConcurrentSubTasks,
expectedTaskState,
appendToExisting
);
} else {
return runTestTask(
null,
null,
null,
PARSE_SPEC,
INTERVAL_TO_INDEX,
inputDir,
"test_*",
partitionsSpec,
maxNumConcurrentSubTasks,
expectedTaskState,
appendToExisting
);
}
}
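
/**
 * Asserts that each interval contains exactly two segments and that all rows scanned from a given segment hash
 * to the same value on (dim1, dim2), i.e. the rows were really grouped by hash partition.
 */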
private void assertHashedPartition(Set<DataSegment> publishedSegments) throws IOException
{
final Map<Interval, List<DataSegment>> intervalToSegments = new HashMap<>();
publishedSegments.forEach(
segment -> intervalToSegments.computeIfAbsent(segment.getInterval(), k -> new ArrayList<>()).add(segment)
);
final File tempSegmentDir = temporaryFolder.newFolder();
for (List<DataSegment> segmentsInInterval : intervalToSegments.values()) {
Assert.assertEquals(2, segmentsInInterval.size());
for (DataSegment segment : segmentsInInterval) {
List<ScanResultValue> results = querySegment(segment, ImmutableList.of("dim1", "dim2"), tempSegmentDir);
final int hash = HashBasedNumberedShardSpec.hash(getObjectMapper(), (List<Object>) results.get(0).getEvents());
for (ScanResultValue value : results) {
Assert.assertEquals(
hash,
HashBasedNumberedShardSpec.hash(getObjectMapper(), (List<Object>) value.getEvents())
);
}
}
}
}
}