/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/

package org.apache.druid.indexing.common.task;

import com.google.common.collect.ImmutableList;
import com.google.common.collect.ImmutableMap;
import org.apache.druid.data.input.InputRow;
import org.apache.druid.indexer.partitions.SingleDimensionPartitionsSpec;
import org.apache.druid.indexing.common.TaskLock;
import org.apache.druid.indexing.common.TaskToolbox;
import org.apache.druid.indexing.common.actions.LockListAction;
import org.apache.druid.indexing.common.actions.TaskActionClient;
import org.apache.druid.indexing.common.task.batch.partition.RangePartitionAnalysis;
import org.apache.druid.java.util.common.DateTimes;
import org.apache.druid.java.util.common.Intervals;
import org.apache.druid.java.util.common.StringUtils;
import org.apache.druid.java.util.common.granularity.Granularities;
import org.apache.druid.segment.indexing.granularity.UniformGranularitySpec;
import org.apache.druid.segment.realtime.appenderator.SegmentAllocator;
import org.apache.druid.segment.realtime.appenderator.SegmentIdWithShardSpec;
import org.apache.druid.timeline.SegmentId;
import org.apache.druid.timeline.partition.PartitionBoundaries;
import org.apache.druid.timeline.partition.RangeBucketShardSpec;
import org.easymock.EasyMock;
import org.joda.time.Interval;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Rule;
import org.junit.Test;
import org.junit.rules.ExpectedException;
import javax.annotation.Nullable;
import java.io.IOException;
import java.io.UncheckedIOException;
import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.stream.Collectors;
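
/**
* Tests the {@link SegmentAllocator} built by
* {@link SegmentAllocators#forNonLinearPartitioning} for range-partitioned
* ingestion: allocated segment ids and {@link RangeBucketShardSpec}s must
* reflect the partition boundaries registered via {@link RangePartitionAnalysis}.
*/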
public class RangePartitionCachingLocalSegmentAllocatorTest
{
private static final String DATASOURCE = "datasource";
private static final String TASKID = "taskid";
private static final String SUPERVISOR_TASKID = "supervisor-taskid";
private static final String PARTITION_DIMENSION = "dimension";
private static final Interval INTERVAL_EMPTY = Intervals.utc(0, 1000);
private static final Interval INTERVAL_SINGLETON = Intervals.utc(1000, 2000);
private static final Interval INTERVAL_NORMAL = Intervals.utc(2000, 3000);
private static final Map<Interval, String> INTERVAL_TO_VERSION = ImmutableMap.of(
INTERVAL_EMPTY, "version-empty",
INTERVAL_SINGLETON, "version-singleton",
INTERVAL_NORMAL, "version-normal"
);
private static final String PARTITION0 = "0";
private static final String PARTITION5 = "5";
private static final String PARTITION9 = "9";
private static final PartitionBoundaries EMPTY_PARTITIONS = new PartitionBoundaries();
private static final PartitionBoundaries SINGLETON_PARTITIONS = new PartitionBoundaries(PARTITION0, PARTITION0);
private static final PartitionBoundaries NORMAL_PARTITIONS = new PartitionBoundaries(
PARTITION0,
PARTITION5,
PARTITION9
);
private static final Map<Interval, PartitionBoundaries> INTERVAL_TO_PARTITIONS = ImmutableMap.of(
INTERVAL_EMPTY, EMPTY_PARTITIONS,
INTERVAL_SINGLETON, SINGLETON_PARTITIONS,
INTERVAL_NORMAL, NORMAL_PARTITIONS
);

private SegmentAllocator target;
private SequenceNameFunction sequenceNameFunction;

@Rule
public ExpectedException exception = ExpectedException.none();
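
// Builds the allocator under test from a stubbed TaskToolbox holding one
// TaskLock per interval, plus the range partition boundaries declared above.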
@Before
public void setup() throws IOException
{
TaskToolbox toolbox = createToolbox(
INTERVAL_TO_VERSION.keySet()
.stream()
.map(RangePartitionCachingLocalSegmentAllocatorTest::createTaskLock)
.collect(Collectors.toList())
);
final RangePartitionAnalysis partitionAnalysis = new RangePartitionAnalysis(
new SingleDimensionPartitionsSpec(null, 1, PARTITION_DIMENSION, false)
);
INTERVAL_TO_PARTITIONS.forEach(partitionAnalysis::updateBucket);
target = SegmentAllocators.forNonLinearPartitioning(
toolbox,
DATASOURCE,
TASKID,
new UniformGranularitySpec(Granularities.HOUR, Granularities.NONE, ImmutableList.of()),
new SupervisorTaskAccessWithNullClient(SUPERVISOR_TASKID),
partitionAnalysis
);
sequenceNameFunction = ((CachingLocalSegmentAllocator) target).getSequenceNameFunction();
}
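
// An interval with no partition boundaries has no buckets, so allocation
// cannot resolve a shard spec.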
@Test
public void failsIfAllocateFromEmptyInterval()
{
Interval interval = INTERVAL_EMPTY;
InputRow row = createInputRow(interval, PARTITION9);
exception.expect(IllegalStateException.class);
exception.expectMessage("Failed to get shardSpec");
String sequenceName = sequenceNameFunction.getSequenceName(interval, row);
allocate(row, sequenceName);
}
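
// Boundaries [p0, p0] collapse to a single bucket 0 that is unbounded on
// both ends (start and end are both null).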
@Test
public void allocatesCorrectShardSpecsForSingletonPartitions()
{
Interval interval = INTERVAL_SINGLETON;
InputRow row = createInputRow(interval, PARTITION9);
testAllocate(row, interval, 0, null);
}
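
// A row at the first boundary lands in bucket 0, whose start is unbounded (null).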
@Test
public void allocatesCorrectShardSpecsForFirstPartition()
{
Interval interval = INTERVAL_NORMAL;
InputRow row = createInputRow(interval, PARTITION0);
testAllocate(row, interval, 0);
}
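
// A row at the last boundary lands in the final bucket, whose end is
// unbounded (null); with N boundary entries there are N - 1 buckets.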
@Test
public void allocatesCorrectShardSpecsForLastPartition()
{
Interval interval = INTERVAL_NORMAL;
InputRow row = createInputRow(interval, PARTITION9);
int partitionNum = INTERVAL_TO_PARTITIONS.get(interval).size() - 2;
testAllocate(row, interval, partitionNum, null);
}

@Test
public void getSequenceName()
{
// Sequence name should be "<taskId>_<ISO interval>_<bucketId>"; PARTITION9 maps to bucket 1 of the normal interval.
Interval interval = INTERVAL_NORMAL;
InputRow row = createInputRow(interval, PARTITION9);
String sequenceName = sequenceNameFunction.getSequenceName(interval, row);
String expectedSequenceName = StringUtils.format("%s_%s_%d", TASKID, interval, 1);
Assert.assertEquals(expectedSequenceName, sequenceName);
}
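
// Overload that derives the expected partition end from the bucket id.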
@SuppressWarnings("SameParameterValue")
private void testAllocate(InputRow row, Interval interval, int bucketId)
{
String partitionEnd = getPartitionEnd(interval, bucketId);
testAllocate(row, interval, bucketId, partitionEnd);
}
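
// The final bucket's end is unbounded (null); otherwise the end is the next boundary.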
@Nullable
private static String getPartitionEnd(Interval interval, int bucketId)
{
PartitionBoundaries partitions = INTERVAL_TO_PARTITIONS.get(interval);
boolean isLastPartition = (bucketId + 1) == partitions.size();
return isLastPartition ? null : partitions.get(bucketId + 1);
}
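
// Overload that derives the expected partition start from the bucket id.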
private void testAllocate(InputRow row, Interval interval, int bucketId, @Nullable String partitionEnd)
{
String partitionStart = getPartitionStart(interval, bucketId);
testAllocate(row, interval, bucketId, partitionStart, partitionEnd);
}
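
// The first bucket's start is unbounded (null); otherwise the start is the
// boundary at bucketId.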
@Nullable
private static String getPartitionStart(Interval interval, int bucketId)
{
boolean isFirstPartition = bucketId == 0;
return isFirstPartition ? null : INTERVAL_TO_PARTITIONS.get(interval).get(bucketId);
}
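
// Allocates a segment for the row and verifies both the resulting segment id
// and the fields of its RangeBucketShardSpec.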
private void testAllocate(
InputRow row,
Interval interval,
int bucketId,
@Nullable String partitionStart,
@Nullable String partitionEnd
)
{
String sequenceName = sequenceNameFunction.getSequenceName(interval, row);
SegmentIdWithShardSpec segmentIdWithShardSpec = allocate(row, sequenceName);
Assert.assertEquals(
SegmentId.of(DATASOURCE, interval, INTERVAL_TO_VERSION.get(interval), bucketId),
segmentIdWithShardSpec.asSegmentId()
);
RangeBucketShardSpec shardSpec = (RangeBucketShardSpec) segmentIdWithShardSpec.getShardSpec();
Assert.assertEquals(PARTITION_DIMENSION, shardSpec.getDimension());
Assert.assertEquals(bucketId, shardSpec.getBucketId());
Assert.assertEquals(partitionStart, shardSpec.getStart());
Assert.assertEquals(partitionEnd, shardSpec.getEnd());
}
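
// Wraps the checked IOException so tests can call allocate() inline.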
private SegmentIdWithShardSpec allocate(InputRow row, String sequenceName)
{
try {
return target.allocate(row, sequenceName, null, false);
}
catch (IOException e) {
throw new RuntimeException(e);
}
}
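
// Stub toolbox whose TaskActionClient returns the given locks.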
private static TaskToolbox createToolbox(List<TaskLock> taskLocks)
{
TaskToolbox toolbox = EasyMock.mock(TaskToolbox.class);
EasyMock.expect(toolbox.getTaskActionClient()).andStubReturn(createTaskActionClient(taskLocks));
EasyMock.replay(toolbox);
return toolbox;
}
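
// Stub client that answers any LockListAction with the given locks.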
private static TaskActionClient createTaskActionClient(List<TaskLock> taskLocks)
{
try {
TaskActionClient taskActionClient = EasyMock.mock(TaskActionClient.class);
EasyMock.expect(taskActionClient.submit(EasyMock.anyObject(LockListAction.class))).andStubReturn(taskLocks);
EasyMock.replay(taskActionClient);
return taskActionClient;
}
catch (IOException e) {
throw new UncheckedIOException(e);
}
}
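
// Stub lock reporting the interval and its mapped version.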
private static TaskLock createTaskLock(Interval interval)
{
TaskLock taskLock = EasyMock.mock(TaskLock.class);
EasyMock.expect(taskLock.getInterval()).andStubReturn(interval);
EasyMock.expect(taskLock.getVersion()).andStubReturn(INTERVAL_TO_VERSION.get(interval));
EasyMock.replay(taskLock);
return taskLock;
}
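
// Stub row timestamped at the interval start, with the given value for the
// partition dimension.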
private static InputRow createInputRow(Interval interval, String dimensionValue)
{
long timestamp = interval.getStartMillis();
InputRow inputRow = EasyMock.mock(InputRow.class);
EasyMock.expect(inputRow.getTimestamp()).andStubReturn(DateTimes.utc(timestamp));
EasyMock.expect(inputRow.getTimestampFromEpoch()).andStubReturn(timestamp);
EasyMock.expect(inputRow.getDimension(PARTITION_DIMENSION))
.andStubReturn(Collections.singletonList(dimensionValue));
EasyMock.replay(inputRow);
return inputRow;
}
}