/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/

package org.apache.druid.indexing.common.task.batch.parallel;

import com.fasterxml.jackson.databind.ObjectMapper;
import com.google.common.collect.Ordering;
import org.apache.druid.data.input.InputSource;
import org.apache.druid.data.input.impl.DimensionsSpec;
import org.apache.druid.data.input.impl.InlineInputSource;
import org.apache.druid.data.input.impl.JsonInputFormat;
import org.apache.druid.data.input.impl.TimestampSpec;
import org.apache.druid.indexer.partitions.HashedPartitionsSpec;
import org.apache.druid.indexer.partitions.PartitionsSpec;
import org.apache.druid.indexer.partitions.SingleDimensionPartitionsSpec;
import org.apache.druid.java.util.common.Intervals;
import org.apache.druid.java.util.common.Pair;
import org.apache.druid.segment.IndexSpec;
import org.apache.druid.segment.data.CompressionFactory.LongEncodingStrategy;
import org.apache.druid.segment.data.CompressionStrategy;
import org.apache.druid.segment.data.RoaringBitmapSerdeFactory;
import org.apache.druid.segment.indexing.DataSchema;
import org.apache.druid.segment.writeout.OffHeapMemorySegmentWriteOutMediumFactory;
import org.apache.druid.timeline.partition.BuildingHashBasedNumberedShardSpec;
import org.apache.druid.timeline.partition.HashPartitionFunction;
import org.easymock.EasyMock;
import org.hamcrest.MatcherAssert;
import org.hamcrest.Matchers;
import org.joda.time.Duration;
import org.joda.time.Interval;
import org.junit.Assert;
import org.junit.Rule;
import org.junit.Test;
import org.junit.experimental.runners.Enclosed;
import org.junit.rules.ExpectedException;
import org.junit.runner.RunWith;
import org.junit.runners.Parameterized;
import java.util.Arrays;
import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.function.Function;
import java.util.stream.Collectors;
import java.util.stream.IntStream;

import static org.easymock.EasyMock.expect;
import static org.easymock.EasyMock.mock;

@RunWith(Enclosed.class)
public class ParallelIndexSupervisorTaskTest
{

@RunWith(Parameterized.class)
public static class CreateMergeIoConfigsTest
{
private static final int TOTAL_NUM_MERGE_TASKS = 10;
private static final Function<List<GenericPartitionLocation>, PartialGenericSegmentMergeIOConfig>
CREATE_PARTIAL_SEGMENT_MERGE_IO_CONFIG = PartialGenericSegmentMergeIOConfig::new;

@Parameterized.Parameters(name = "count = {0}")
public static Iterable<Integer> data()
{
// Different scenarios for the last merge task (index = 10 - 1 = 9) if partitions were assigned
// in naive chunks of round(count / TOTAL_NUM_MERGE_TASKS):
return Arrays.asList(
20, // divides evenly: round(20 / 10) * (10 - 1) = 2 * 9 = 18 < 20, leaving 2 for the last task
24, // rounds down: round(24 / 10) * (10 - 1) = 2 * 9 = 18 < 24, leaving 6 for the last task
25, // rounds up past count: round(25 / 10) * (10 - 1) = 3 * 9 = 27 > 25 (naive chunking would go out of bounds)
27 // rounds up to exactly count: round(27 / 10) * (10 - 1) = 3 * 9 = 27 == 27 (naive chunking would leave the last task empty)
);
}
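
// A minimal sketch (an assumption for illustration, not the production algorithm) of one
// assignment the assertions below permit: spread the remainder over the first tasks so that
// every partition is used and task sizes differ by at most one.
//   int base = count / TOTAL_NUM_MERGE_TASKS;
//   int extra = count % TOTAL_NUM_MERGE_TASKS;
//   int sizeOfTask(int i) { return base + (i < extra ? 1 : 0); }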

@Parameterized.Parameter
public int count;

@Test
public void handlesLastPartitionCorrectly()
{
List<PartialGenericSegmentMergeIOConfig> assignedPartitionLocation = createMergeIOConfigs();
assertNoMissingPartitions(count, assignedPartitionLocation);
}

@Test
public void sizesPartitionsEvenly()
{
List<PartialGenericSegmentMergeIOConfig> assignedPartitionLocation = createMergeIOConfigs();
List<Integer> actualPartitionSizes = assignedPartitionLocation.stream()
.map(i -> i.getPartitionLocations().size())
.collect(Collectors.toList());
List<Integer> sortedPartitionSizes = Ordering.natural().sortedCopy(actualPartitionSizes);
int minPartitionSize = sortedPartitionSizes.get(0);
int maxPartitionSize = sortedPartitionSizes.get(sortedPartitionSizes.size() - 1);
int partitionSizeRange = maxPartitionSize - minPartitionSize;
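// An even assignment means the largest and smallest merge tasks differ by at most one partition.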
MatcherAssert.assertThat(
"partition sizes = " + actualPartitionSizes,
partitionSizeRange,
Matchers.is(Matchers.both(Matchers.greaterThanOrEqualTo(0)).and(Matchers.lessThanOrEqualTo(1)))
);
}

private List<PartialGenericSegmentMergeIOConfig> createMergeIOConfigs()
{
return ParallelIndexSupervisorTask.createMergeIOConfigs(
TOTAL_NUM_MERGE_TASKS,
createPartitionToLocations(count),
CREATE_PARTIAL_SEGMENT_MERGE_IO_CONFIG
);
}
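
// Maps each bucket id to a singleton location list, keyed by (interval, bucketId); every id gets
// its own one-millisecond interval, so keys never collide.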
private static Map<Pair<Interval, Integer>, List<GenericPartitionLocation>> createPartitionToLocations(int count)
{
return IntStream.range(0, count).boxed().collect(
Collectors.toMap(
i -> Pair.of(createInterval(i), i),
i -> Collections.singletonList(createPartitionLocation(i))
)
);
}

private static GenericPartitionLocation createPartitionLocation(int id)
{
return new GenericPartitionLocation(
"host",
0,
false,
"subTaskId",
createInterval(id),
new BuildingHashBasedNumberedShardSpec(
id, // partitionId
id, // bucketId
id + 1, // numBuckets
null,
HashPartitionFunction.MURMUR3_32_ABS,
new ObjectMapper()
)
);
}

private static Interval createInterval(int id)
{
return Intervals.utc(id, id + 1);
}

private static void assertNoMissingPartitions(
int count,
List<PartialGenericSegmentMergeIOConfig> assignedPartitionLocation
)
{
List<Integer> expectedIds = IntStream.range(0, count).boxed().collect(Collectors.toList());
List<Integer> actualIds = assignedPartitionLocation.stream()
.flatMap(
i -> i.getPartitionLocations()
.stream()
.map(GenericPartitionLocation::getBucketId)
)
.sorted()
.collect(Collectors.toList());
Assert.assertEquals(expectedIds, actualIds);
}
}

public static class ConstructorTest
{
@Rule
public ExpectedException expectedException = ExpectedException.none();
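
// Perfect rollup (implied by forceGuaranteedRollup) cannot be provided when appending to
// existing segments, so constructing the supervisor task with both flags set must fail.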
@Test
public void testFailToConstructWhenBothAppendToExistingAndForceGuaranteedRollupAreSet()
{
final boolean appendToExisting = true;
final boolean forceGuaranteedRollup = true;
final ParallelIndexIOConfig ioConfig = new ParallelIndexIOConfig(
null,
new InlineInputSource("test"),
new JsonInputFormat(null, null, null),
appendToExisting
);
final ParallelIndexTuningConfig tuningConfig = new ParallelIndexTuningConfig(
null,
null,
10,
1000L,
null,
null,
null,
new HashedPartitionsSpec(null, 10, null),
new IndexSpec(
new RoaringBitmapSerdeFactory(true), // bitmap serde (compressRunOnSerialization = true)
CompressionStrategy.UNCOMPRESSED, // dimension compression
CompressionStrategy.LZF, // metric compression
LongEncodingStrategy.LONGS // long column encoding
),
new IndexSpec(),
1,
forceGuaranteedRollup,
true,
10000L,
OffHeapMemorySegmentWriteOutMediumFactory.instance(),
null,
10,
100,
20L,
new Duration(3600),
128,
null,
null,
false,
null,
null
);
final ParallelIndexIngestionSpec indexIngestionSpec = new ParallelIndexIngestionSpec(
new DataSchema(
"datasource",
new TimestampSpec(null, null, null),
DimensionsSpec.EMPTY,
null,
null,
null
),
ioConfig,
tuningConfig
);
expectedException.expect(IllegalArgumentException.class);
expectedException.expectMessage("Perfect rollup cannot be guaranteed when appending to existing dataSources");
new ParallelIndexSupervisorTask(
null,
null,
null,
indexIngestionSpec,
null
);
}
}

public static class StaticUtilsTest
{
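// The tests below pin down isParallelMode's decision table. A minimal sketch of the rule they
// imply (inferred from the assertions, not necessarily how the production method is written;
// "isRangePartitioning" stands for "the partitions spec is a SingleDimensionPartitionsSpec"):
//   if (tuningConfig == null) return false;
//   int minSubTasks = isRangePartitioning ? 1 : 2;
//   return inputSource.isSplittable() && tuningConfig.getMaxNumConcurrentSubTasks() >= minSubTasks;
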
@Test
public void testIsParallelModeFalse_nullTuningConfig()
{
InputSource inputSource = mock(InputSource.class);
Assert.assertFalse(ParallelIndexSupervisorTask.isParallelMode(inputSource, null));
}

@Test
public void testIsParallelMode_rangePartition()
{
InputSource inputSource = mock(InputSource.class);
expect(inputSource.isSplittable()).andReturn(true).anyTimes();
ParallelIndexTuningConfig tuningConfig = mock(ParallelIndexTuningConfig.class);
expect(tuningConfig.getGivenOrDefaultPartitionsSpec()).andReturn(mock(SingleDimensionPartitionsSpec.class))
.anyTimes();
expect(tuningConfig.getMaxNumConcurrentSubTasks()).andReturn(0).andReturn(1).andReturn(2);
EasyMock.replay(inputSource, tuningConfig);
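// EasyMock serves the stubbed returns in order: with range (single-dimension) partitioning,
// 0 concurrent subtasks means sequential mode, while 1 and 2 mean parallel mode.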
Assert.assertFalse(ParallelIndexSupervisorTask.isParallelMode(inputSource, tuningConfig));
Assert.assertTrue(ParallelIndexSupervisorTask.isParallelMode(inputSource, tuningConfig));
Assert.assertTrue(ParallelIndexSupervisorTask.isParallelMode(inputSource, tuningConfig));
}

@Test
public void testIsParallelMode_notRangePartition()
{
InputSource inputSource = mock(InputSource.class);
expect(inputSource.isSplittable()).andReturn(true).anyTimes();
ParallelIndexTuningConfig tuningConfig = mock(ParallelIndexTuningConfig.class);
expect(tuningConfig.getGivenOrDefaultPartitionsSpec()).andReturn(mock(PartitionsSpec.class))
.anyTimes();
expect(tuningConfig.getMaxNumConcurrentSubTasks()).andReturn(1).andReturn(2).andReturn(3);
EasyMock.replay(inputSource, tuningConfig);
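// With non-range partitioning, parallel mode needs more than one concurrent subtask:
// 1 means sequential mode, while 2 and 3 mean parallel mode.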
Assert.assertFalse(ParallelIndexSupervisorTask.isParallelMode(inputSource, tuningConfig));
Assert.assertTrue(ParallelIndexSupervisorTask.isParallelMode(inputSource, tuningConfig));
Assert.assertTrue(ParallelIndexSupervisorTask.isParallelMode(inputSource, tuningConfig));
}

@Test
public void testIsParallelModeFalse_inputSourceNotSplittable()
{
InputSource inputSource = mock(InputSource.class);
expect(inputSource.isSplittable()).andReturn(false).anyTimes();
ParallelIndexTuningConfig tuningConfig = mock(ParallelIndexTuningConfig.class);
expect(tuningConfig.getGivenOrDefaultPartitionsSpec()).andReturn(mock(SingleDimensionPartitionsSpec.class))
.anyTimes();
expect(tuningConfig.getMaxNumConcurrentSubTasks()).andReturn(3);
EasyMock.replay(inputSource, tuningConfig);
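// A non-splittable input source never runs in parallel, even with multiple subtasks configured.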
Assert.assertFalse(ParallelIndexSupervisorTask.isParallelMode(inputSource, tuningConfig));
}
}
}