blob: 672fa4846bf59b88176832c79e7425c03a4b93b8 [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.druid.indexing.common.actions;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.google.common.base.Predicate;
import com.google.common.collect.FluentIterable;
import com.google.common.collect.ImmutableList;
import com.google.common.collect.ImmutableSet;
import com.google.common.collect.Iterables;
import org.apache.druid.indexing.common.LockGranularity;
import org.apache.druid.indexing.common.SegmentLock;
import org.apache.druid.indexing.common.TaskLock;
import org.apache.druid.indexing.common.task.NoopTask;
import org.apache.druid.indexing.common.task.Task;
import org.apache.druid.jackson.DefaultObjectMapper;
import org.apache.druid.java.util.common.DateTimes;
import org.apache.druid.java.util.common.granularity.Granularities;
import org.apache.druid.java.util.common.granularity.Granularity;
import org.apache.druid.java.util.common.granularity.PeriodGranularity;
import org.apache.druid.java.util.emitter.EmittingLogger;
import org.apache.druid.java.util.emitter.service.ServiceEmitter;
import org.apache.druid.segment.realtime.appenderator.SegmentIdWithShardSpec;
import org.apache.druid.timeline.DataSegment;
import org.apache.druid.timeline.partition.HashBasedNumberedPartialShardSpec;
import org.apache.druid.timeline.partition.HashBasedNumberedShardSpec;
import org.apache.druid.timeline.partition.LinearPartialShardSpec;
import org.apache.druid.timeline.partition.LinearShardSpec;
import org.apache.druid.timeline.partition.NumberedPartialShardSpec;
import org.apache.druid.timeline.partition.NumberedShardSpec;
import org.apache.druid.timeline.partition.PartialShardSpec;
import org.apache.druid.timeline.partition.ShardSpec;
import org.easymock.EasyMock;
import org.joda.time.DateTime;
import org.joda.time.Period;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Rule;
import org.junit.Test;
import org.junit.rules.ExpectedException;
import org.junit.runner.RunWith;
import org.junit.runners.Parameterized;
import java.io.IOException;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Map.Entry;
import java.util.stream.Collectors;
@RunWith(Parameterized.class)
public class SegmentAllocateActionTest
{
// JUnit rule for declaring expected exceptions; initialised to expect none.
@Rule
public ExpectedException thrown = ExpectedException.none();
// Fixture rule from which the tests obtain the task lockbox, the metadata storage coordinator and the action toolbox.
@Rule
public TaskActionTestKit taskActionTestKit = new TaskActionTestKit();
// Data source name used for every segment built and every allocation requested in this test class.
private static final String DATA_SOURCE = "none";
// Two reference timestamps; tests allocate segments in the buckets that contain them.
private static final DateTime PARTY_TIME = DateTimes.of("1999");
private static final DateTime THE_DISTANT_FUTURE = DateTimes.of("3000");
// Lock granularity under test; set by the constructor from the parameters returned by constructorFeeder().
private final LockGranularity lockGranularity;
/**
 * Supplies the constructor arguments for the parameterized runs: one run with
 * {@link LockGranularity#SEGMENT} followed by one with {@link LockGranularity#TIME_CHUNK}.
 *
 * @return one single-element argument array per run
 */
@Parameterized.Parameters(name = "{0}")
public static Iterable<Object[]> constructorFeeder()
{
  final Object[] segmentLockRun = {LockGranularity.SEGMENT};
  final Object[] timeChunkLockRun = {LockGranularity.TIME_CHUNK};
  return ImmutableList.of(segmentLockRun, timeChunkLockRun);
}
/**
 * Creates a test instance for one of the parameter sets returned by {@link #constructorFeeder()}.
 *
 * @param lockGranularity lock granularity passed to every {@code SegmentAllocateAction} this instance creates
 */
public SegmentAllocateActionTest(LockGranularity lockGranularity)
{
this.lockGranularity = lockGranularity;
}
/**
 * Registers a mock {@link ServiceEmitter} with {@link EmittingLogger} before each test.
 */
@Before
public void setUp()
{
  final ServiceEmitter mockEmitter = EasyMock.createMock(ServiceEmitter.class);
  EmittingLogger.registerEmitter(mockEmitter);
  // The mock is switched to replay mode without any recorded expectations.
  EasyMock.replay(mockEmitter);
}
/**
 * Checks that {@code Granularity.granularitiesFinerThan(DAY)} returns DAY itself followed by every
 * finer granularity down to SECOND, in that order.
 */
@Test
public void testGranularitiesFinerThanDay()
{
  final List<Granularity> expected = ImmutableList.of(
      Granularities.DAY,
      Granularities.SIX_HOUR,
      Granularities.HOUR,
      Granularities.THIRTY_MINUTE,
      Granularities.FIFTEEN_MINUTE,
      Granularities.TEN_MINUTE,
      Granularities.FIVE_MINUTE,
      Granularities.MINUTE,
      Granularities.SECOND
  );

  Assert.assertEquals(expected, Granularity.granularitiesFinerThan(Granularities.DAY));
}
/**
 * Checks that {@code Granularity.granularitiesFinerThan(HOUR)} returns HOUR itself followed by every
 * finer granularity down to SECOND, in that order.
 */
@Test
public void testGranularitiesFinerThanHour()
{
  final List<Granularity> expected = ImmutableList.of(
      Granularities.HOUR,
      Granularities.THIRTY_MINUTE,
      Granularities.FIFTEEN_MINUTE,
      Granularities.TEN_MINUTE,
      Granularities.FIVE_MINUTE,
      Granularities.MINUTE,
      Granularities.SECOND
  );

  Assert.assertEquals(expected, Granularity.granularitiesFinerThan(Granularities.HOUR));
}
/**
 * Allocates three segments for sequence "s1" in the HOUR bucket containing {@code PARTY_TIME}, each request
 * naming the previously returned id as its predecessor, and expects partition numbers 0, 1 and 2.
 *
 * With TIME_CHUNK locking the task must hold exactly one lock covering {@code PARTY_TIME}, and that lock's
 * version is the expected segment version. With SEGMENT locking three such locks are expected, and the version
 * of the first allocated id is used as the expected version for all three ids.
 */
@Test
public void testManySegmentsSameInterval()
{
  final Task task = NoopTask.create();
  taskActionTestKit.getTaskLockbox().add(task);

  final SegmentIdWithShardSpec id1 = allocate(
      task,
      PARTY_TIME,
      Granularities.NONE,
      Granularities.HOUR,
      "s1",
      null
  );
  final SegmentIdWithShardSpec id2 = allocate(
      task,
      PARTY_TIME,
      Granularities.NONE,
      Granularities.HOUR,
      "s1",
      id1.toString()
  );
  final SegmentIdWithShardSpec id3 = allocate(
      task,
      PARTY_TIME,
      Granularities.NONE,
      Granularities.HOUR,
      "s1",
      id2.toString()
  );

  final List<TaskLock> partyTimeLocks = taskActionTestKit
      .getTaskLockbox()
      .findLocksForTask(task)
      .stream()
      .filter(lock -> lock.getInterval().contains(PARTY_TIME))
      .collect(Collectors.toList());

  final String expectedVersion;
  if (lockGranularity == LockGranularity.TIME_CHUNK) {
    // getOnlyElement throws unless exactly one lock covers PARTY_TIME.
    expectedVersion = Iterables.getOnlyElement(partyTimeLocks).getVersion();
  } else {
    Assert.assertEquals(3, partyTimeLocks.size());
    expectedVersion = id1.getVersion();
  }

  // The expected id is always passed first so that failure messages label expected and actual correctly.
  final SegmentIdWithShardSpec[] allocatedIds = {id1, id2, id3};
  for (int partitionNum = 0; partitionNum < allocatedIds.length; partitionNum++) {
    assertSameIdentifier(
        new SegmentIdWithShardSpec(
            DATA_SOURCE,
            Granularities.HOUR.bucket(PARTY_TIME),
            expectedVersion,
            new NumberedShardSpec(partitionNum, 0)
        ),
        allocatedIds[partitionNum]
    );
  }
}
/**
 * Exercises resuming sequence "s1" by re-sending requests with an already used previous id.
 *
 * id1 (PARTY_TIME), id2 (THE_DISTANT_FUTURE, previous id1) and id3 (PARTY_TIME, previous id2) form the initial
 * chain. The remaining requests all reuse id1 as the previous id:
 * <ul>
 * <li>id4: PARTY_TIME with HOUR granularity, expected to be null</li>
 * <li>id5: THE_DISTANT_FUTURE with HOUR granularity, expected to equal id2</li>
 * <li>id6: THE_DISTANT_FUTURE with MINUTE granularity, expected to be null</li>
 * <li>id7: THE_DISTANT_FUTURE with DAY granularity, expected to equal id2</li>
 * </ul>
 * The lock-dependent part then checks the ids of the initial chain against the locks held by the task.
 */
@Test
public void testResumeSequence()
{
  final Task task = NoopTask.create();
  taskActionTestKit.getTaskLockbox().add(task);

  // Allocated ids keyed by partition number; only consulted in the SEGMENT lock branch below.
  final Map<Integer, SegmentIdWithShardSpec> allocatedPartyTimeIds = new HashMap<>();
  final Map<Integer, SegmentIdWithShardSpec> allocatedFutureIds = new HashMap<>();

  final SegmentIdWithShardSpec id1 = allocate(
      task,
      PARTY_TIME,
      Granularities.NONE,
      Granularities.HOUR,
      "s1",
      null
  );
  Assert.assertNotNull(id1);
  allocatedPartyTimeIds.put(id1.getShardSpec().getPartitionNum(), id1);

  final SegmentIdWithShardSpec id2 = allocate(
      task,
      THE_DISTANT_FUTURE,
      Granularities.NONE,
      Granularities.HOUR,
      "s1",
      id1.toString()
  );
  Assert.assertNotNull(id2);
  allocatedFutureIds.put(id2.getShardSpec().getPartitionNum(), id2);

  final SegmentIdWithShardSpec id3 = allocate(
      task,
      PARTY_TIME,
      Granularities.NONE,
      Granularities.HOUR,
      "s1",
      id2.toString()
  );
  Assert.assertNotNull(id3);
  allocatedPartyTimeIds.put(id3.getShardSpec().getPartitionNum(), id3);

  final SegmentIdWithShardSpec id4 = allocate(
      task,
      PARTY_TIME,
      Granularities.NONE,
      Granularities.HOUR,
      "s1",
      id1.toString()
  );
  Assert.assertNull(id4);

  final SegmentIdWithShardSpec id5 = allocate(
      task,
      THE_DISTANT_FUTURE,
      Granularities.NONE,
      Granularities.HOUR,
      "s1",
      id1.toString()
  );
  Assert.assertNotNull(id5);
  allocatedFutureIds.put(id5.getShardSpec().getPartitionNum(), id5);

  final SegmentIdWithShardSpec id6 = allocate(
      task,
      THE_DISTANT_FUTURE,
      Granularities.NONE,
      Granularities.MINUTE,
      "s1",
      id1.toString()
  );
  Assert.assertNull(id6);

  final SegmentIdWithShardSpec id7 = allocate(
      task,
      THE_DISTANT_FUTURE,
      Granularities.NONE,
      Granularities.DAY,
      "s1",
      id1.toString()
  );
  Assert.assertNotNull(id7);
  allocatedFutureIds.put(id7.getShardSpec().getPartitionNum(), id7);

  if (lockGranularity == LockGranularity.TIME_CHUNK) {
    // Exactly one lock is expected per interval; getOnlyElement throws otherwise.
    final TaskLock partyLock = Iterables.getOnlyElement(
        FluentIterable.from(taskActionTestKit.getTaskLockbox().findLocksForTask(task))
                      .filter(input -> input.getInterval().contains(PARTY_TIME))
    );
    final TaskLock futureLock = Iterables.getOnlyElement(
        FluentIterable.from(taskActionTestKit.getTaskLockbox().findLocksForTask(task))
                      .filter(input -> input.getInterval().contains(THE_DISTANT_FUTURE))
    );

    // The expected id is passed first so that failure messages label expected and actual correctly.
    assertSameIdentifier(
        new SegmentIdWithShardSpec(
            DATA_SOURCE,
            Granularities.HOUR.bucket(PARTY_TIME),
            partyLock.getVersion(),
            new NumberedShardSpec(0, 0)
        ),
        id1
    );
    assertSameIdentifier(
        new SegmentIdWithShardSpec(
            DATA_SOURCE,
            Granularities.HOUR.bucket(THE_DISTANT_FUTURE),
            futureLock.getVersion(),
            new NumberedShardSpec(0, 0)
        ),
        id2
    );
    assertSameIdentifier(
        new SegmentIdWithShardSpec(
            DATA_SOURCE,
            Granularities.HOUR.bucket(PARTY_TIME),
            partyLock.getVersion(),
            new NumberedShardSpec(1, 0)
        ),
        id3
    );
  } else {
    final List<TaskLock> partyLocks = taskActionTestKit
        .getTaskLockbox()
        .findLocksForTask(task)
        .stream()
        .filter(input -> input.getInterval().contains(PARTY_TIME))
        .collect(Collectors.toList());
    Assert.assertEquals(2, partyLocks.size());

    final Map<Integer, SegmentLock> partitionIdToLock = new HashMap<>();
    partyLocks.forEach(lock -> {
      Assert.assertEquals(LockGranularity.SEGMENT, lock.getGranularity());
      final SegmentLock segmentLock = (SegmentLock) lock;
      partitionIdToLock.put(segmentLock.getPartitionId(), segmentLock);
    });

    // Each segment lock must correspond to the id allocated for the same partition number.
    for (Entry<Integer, SegmentLock> entry : partitionIdToLock.entrySet()) {
      assertSameIdentifier(
          new SegmentIdWithShardSpec(
              DATA_SOURCE,
              Granularities.HOUR.bucket(PARTY_TIME),
              allocatedPartyTimeIds.get(entry.getKey()).getVersion(),
              new NumberedShardSpec(entry.getValue().getPartitionId(), 0)
          ),
          allocatedPartyTimeIds.get(entry.getKey())
      );
    }

    final List<TaskLock> futureLocks = taskActionTestKit
        .getTaskLockbox()
        .findLocksForTask(task)
        .stream()
        .filter(input -> input.getInterval().contains(THE_DISTANT_FUTURE))
        .collect(Collectors.toList());
    Assert.assertEquals(1, futureLocks.size());

    partitionIdToLock.clear();
    futureLocks.forEach(lock -> {
      Assert.assertEquals(LockGranularity.SEGMENT, lock.getGranularity());
      final SegmentLock segmentLock = (SegmentLock) lock;
      partitionIdToLock.put(segmentLock.getPartitionId(), segmentLock);
    });

    for (Entry<Integer, SegmentLock> entry : partitionIdToLock.entrySet()) {
      assertSameIdentifier(
          new SegmentIdWithShardSpec(
              DATA_SOURCE,
              Granularities.HOUR.bucket(THE_DISTANT_FUTURE),
              allocatedFutureIds.get(entry.getKey()).getVersion(),
              new NumberedShardSpec(entry.getValue().getPartitionId(), 0)
          ),
          allocatedFutureIds.get(entry.getKey())
      );
    }
  }

  // id4 and id6 were already asserted to be null right after their allocation above.
  assertSameIdentifier(id2, id5);
  assertSameIdentifier(id2, id7);
}
/**
 * Interleaves allocations for two sequences, "s1" and "s2", over the HOUR buckets of {@code PARTY_TIME} and
 * {@code THE_DISTANT_FUTURE}.
 *
 * Expected partition numbers: id1 = 0, id2 = 1 and id3 = 2 in the PARTY_TIME bucket; id4 = 0 and id5 = 1 in the
 * THE_DISTANT_FUTURE bucket. id6 repeats the very first request ("s1", no previous id) and is expected to be
 * identical to id1.
 */
@Test
public void testMultipleSequences()
{
  final Task task = NoopTask.create();
  taskActionTestKit.getTaskLockbox().add(task);

  final SegmentIdWithShardSpec id1 = allocate(task, PARTY_TIME, Granularities.NONE, Granularities.HOUR, "s1", null);
  final SegmentIdWithShardSpec id2 = allocate(task, PARTY_TIME, Granularities.NONE, Granularities.HOUR, "s2", null);
  final SegmentIdWithShardSpec id3 = allocate(
      task,
      PARTY_TIME,
      Granularities.NONE,
      Granularities.HOUR,
      "s1",
      id1.toString()
  );
  final SegmentIdWithShardSpec id4 = allocate(
      task,
      THE_DISTANT_FUTURE,
      Granularities.NONE,
      Granularities.HOUR,
      "s1",
      id3.toString()
  );
  final SegmentIdWithShardSpec id5 = allocate(
      task,
      THE_DISTANT_FUTURE,
      Granularities.NONE,
      Granularities.HOUR,
      "s2",
      id2.toString()
  );
  final SegmentIdWithShardSpec id6 = allocate(
      task,
      PARTY_TIME,
      Granularities.NONE,
      Granularities.HOUR,
      "s1",
      null
  );

  if (lockGranularity == LockGranularity.TIME_CHUNK) {
    // Exactly one lock is expected per interval; getOnlyElement throws otherwise.
    final TaskLock partyLock = Iterables.getOnlyElement(
        FluentIterable.from(taskActionTestKit.getTaskLockbox().findLocksForTask(task))
                      .filter(input -> input.getInterval().contains(PARTY_TIME))
    );
    final TaskLock futureLock = Iterables.getOnlyElement(
        FluentIterable.from(taskActionTestKit.getTaskLockbox().findLocksForTask(task))
                      .filter(input -> input.getInterval().contains(THE_DISTANT_FUTURE))
    );

    // The expected id is passed first so that failure messages label expected and actual correctly.
    assertSameIdentifier(
        new SegmentIdWithShardSpec(
            DATA_SOURCE,
            Granularities.HOUR.bucket(PARTY_TIME),
            partyLock.getVersion(),
            new NumberedShardSpec(0, 0)
        ),
        id1
    );
    assertSameIdentifier(
        new SegmentIdWithShardSpec(
            DATA_SOURCE,
            Granularities.HOUR.bucket(PARTY_TIME),
            partyLock.getVersion(),
            new NumberedShardSpec(1, 0)
        ),
        id2
    );
    assertSameIdentifier(
        new SegmentIdWithShardSpec(
            DATA_SOURCE,
            Granularities.HOUR.bucket(PARTY_TIME),
            partyLock.getVersion(),
            new NumberedShardSpec(2, 0)
        ),
        id3
    );
    assertSameIdentifier(
        new SegmentIdWithShardSpec(
            DATA_SOURCE,
            Granularities.HOUR.bucket(THE_DISTANT_FUTURE),
            futureLock.getVersion(),
            new NumberedShardSpec(0, 0)
        ),
        id4
    );
    assertSameIdentifier(
        new SegmentIdWithShardSpec(
            DATA_SOURCE,
            Granularities.HOUR.bucket(THE_DISTANT_FUTURE),
            futureLock.getVersion(),
            new NumberedShardSpec(1, 0)
        ),
        id5
    );
  } else {
    final List<TaskLock> partyLocks = taskActionTestKit
        .getTaskLockbox()
        .findLocksForTask(task)
        .stream()
        .filter(input -> input.getInterval().contains(PARTY_TIME))
        .collect(Collectors.toList());
    Assert.assertEquals(3, partyLocks.size());

    // The lock at position i supplies the expected version for partition i.
    assertSameIdentifier(
        new SegmentIdWithShardSpec(
            DATA_SOURCE,
            Granularities.HOUR.bucket(PARTY_TIME),
            partyLocks.get(0).getVersion(),
            new NumberedShardSpec(0, 0)
        ),
        id1
    );
    assertSameIdentifier(
        new SegmentIdWithShardSpec(
            DATA_SOURCE,
            Granularities.HOUR.bucket(PARTY_TIME),
            partyLocks.get(1).getVersion(),
            new NumberedShardSpec(1, 0)
        ),
        id2
    );
    assertSameIdentifier(
        new SegmentIdWithShardSpec(
            DATA_SOURCE,
            Granularities.HOUR.bucket(PARTY_TIME),
            partyLocks.get(2).getVersion(),
            new NumberedShardSpec(2, 0)
        ),
        id3
    );

    final List<TaskLock> futureLocks = taskActionTestKit
        .getTaskLockbox()
        .findLocksForTask(task)
        .stream()
        .filter(input -> input.getInterval().contains(THE_DISTANT_FUTURE))
        .collect(Collectors.toList());
    Assert.assertEquals(2, futureLocks.size());

    assertSameIdentifier(
        new SegmentIdWithShardSpec(
            DATA_SOURCE,
            Granularities.HOUR.bucket(THE_DISTANT_FUTURE),
            futureLocks.get(0).getVersion(),
            new NumberedShardSpec(0, 0)
        ),
        id4
    );
    assertSameIdentifier(
        new SegmentIdWithShardSpec(
            DATA_SOURCE,
            Granularities.HOUR.bucket(THE_DISTANT_FUTURE),
            futureLocks.get(1).getVersion(),
            new NumberedShardSpec(1, 0)
        ),
        id5
    );
  }

  assertSameIdentifier(id1, id6);
}
/**
 * Announces two segments with {@code LinearShardSpec(0)} and {@code LinearShardSpec(1)} for the HOUR bucket of
 * {@code PARTY_TIME}, then allocates twice with a {@code LinearPartialShardSpec} at the same granularity. The
 * new ids are expected to carry the announced version and {@code LinearShardSpec(2)} and {@code (3)}.
 */
@Test
public void testAddToExistingLinearShardSpecsSameGranularity() throws Exception
{
  final Task task = NoopTask.create();

  final DataSegment announcedLinear0 = DataSegment
      .builder()
      .dataSource(DATA_SOURCE)
      .interval(Granularities.HOUR.bucket(PARTY_TIME))
      .version(PARTY_TIME.toString())
      .shardSpec(new LinearShardSpec(0))
      .size(0)
      .build();
  final DataSegment announcedLinear1 = DataSegment
      .builder()
      .dataSource(DATA_SOURCE)
      .interval(Granularities.HOUR.bucket(PARTY_TIME))
      .version(PARTY_TIME.toString())
      .shardSpec(new LinearShardSpec(1))
      .size(0)
      .build();
  taskActionTestKit
      .getMetadataStorageCoordinator()
      .announceHistoricalSegments(ImmutableSet.of(announcedLinear0, announcedLinear1));

  taskActionTestKit.getTaskLockbox().add(task);

  final SegmentIdWithShardSpec firstAllocated = allocate(
      task,
      PARTY_TIME,
      Granularities.NONE,
      Granularities.HOUR,
      "s1",
      null,
      LinearPartialShardSpec.instance()
  );
  final SegmentIdWithShardSpec secondAllocated = allocate(
      task,
      PARTY_TIME,
      Granularities.NONE,
      Granularities.HOUR,
      "s1",
      firstAllocated.toString(),
      LinearPartialShardSpec.instance()
  );

  final SegmentIdWithShardSpec expectedFirst = new SegmentIdWithShardSpec(
      DATA_SOURCE,
      Granularities.HOUR.bucket(PARTY_TIME),
      PARTY_TIME.toString(),
      new LinearShardSpec(2)
  );
  assertSameIdentifier(expectedFirst, firstAllocated);

  final SegmentIdWithShardSpec expectedSecond = new SegmentIdWithShardSpec(
      DATA_SOURCE,
      Granularities.HOUR.bucket(PARTY_TIME),
      PARTY_TIME.toString(),
      new LinearShardSpec(3)
  );
  assertSameIdentifier(expectedSecond, secondAllocated);
}
/**
 * Announces two segments with {@code NumberedShardSpec(0, 2)} and {@code NumberedShardSpec(1, 2)} for the HOUR
 * bucket of {@code PARTY_TIME}, then allocates twice at HOUR granularity. The new ids are expected to carry the
 * announced version and {@code NumberedShardSpec(2, 2)} and {@code NumberedShardSpec(3, 2)}.
 */
@Test
public void testAddToExistingNumberedShardSpecsSameGranularity() throws Exception
{
  final Task task = NoopTask.create();

  final DataSegment announcedPartition0 = DataSegment
      .builder()
      .dataSource(DATA_SOURCE)
      .interval(Granularities.HOUR.bucket(PARTY_TIME))
      .version(PARTY_TIME.toString())
      .shardSpec(new NumberedShardSpec(0, 2))
      .size(0)
      .build();
  final DataSegment announcedPartition1 = DataSegment
      .builder()
      .dataSource(DATA_SOURCE)
      .interval(Granularities.HOUR.bucket(PARTY_TIME))
      .version(PARTY_TIME.toString())
      .shardSpec(new NumberedShardSpec(1, 2))
      .size(0)
      .build();
  taskActionTestKit
      .getMetadataStorageCoordinator()
      .announceHistoricalSegments(ImmutableSet.of(announcedPartition0, announcedPartition1));

  taskActionTestKit.getTaskLockbox().add(task);

  final SegmentIdWithShardSpec firstAllocated =
      allocate(task, PARTY_TIME, Granularities.NONE, Granularities.HOUR, "s1", null);
  final SegmentIdWithShardSpec secondAllocated =
      allocate(task, PARTY_TIME, Granularities.NONE, Granularities.HOUR, "s1", firstAllocated.toString());

  final SegmentIdWithShardSpec expectedFirst = new SegmentIdWithShardSpec(
      DATA_SOURCE,
      Granularities.HOUR.bucket(PARTY_TIME),
      PARTY_TIME.toString(),
      new NumberedShardSpec(2, 2)
  );
  assertSameIdentifier(expectedFirst, firstAllocated);

  final SegmentIdWithShardSpec expectedSecond = new SegmentIdWithShardSpec(
      DATA_SOURCE,
      Granularities.HOUR.bucket(PARTY_TIME),
      PARTY_TIME.toString(),
      new NumberedShardSpec(3, 2)
  );
  assertSameIdentifier(expectedSecond, secondAllocated);
}
/**
 * Announces two HOUR segments ({@code NumberedShardSpec(0, 2)} and {@code (1, 2)}) around {@code PARTY_TIME}
 * and then allocates with a preferred segment granularity of DAY. The allocation is still expected to land in
 * the existing HOUR bucket with the announced version and {@code NumberedShardSpec(2, 2)}.
 */
@Test
public void testAddToExistingNumberedShardSpecsCoarserPreferredGranularity() throws Exception
{
  final Task task = NoopTask.create();

  final DataSegment announcedPartition0 = DataSegment
      .builder()
      .dataSource(DATA_SOURCE)
      .interval(Granularities.HOUR.bucket(PARTY_TIME))
      .version(PARTY_TIME.toString())
      .shardSpec(new NumberedShardSpec(0, 2))
      .size(0)
      .build();
  final DataSegment announcedPartition1 = DataSegment
      .builder()
      .dataSource(DATA_SOURCE)
      .interval(Granularities.HOUR.bucket(PARTY_TIME))
      .version(PARTY_TIME.toString())
      .shardSpec(new NumberedShardSpec(1, 2))
      .size(0)
      .build();
  taskActionTestKit
      .getMetadataStorageCoordinator()
      .announceHistoricalSegments(ImmutableSet.of(announcedPartition0, announcedPartition1));

  taskActionTestKit.getTaskLockbox().add(task);

  final SegmentIdWithShardSpec allocated = allocate(
      task,
      PARTY_TIME,
      Granularities.NONE,
      Granularities.DAY,
      "s1",
      null
  );

  final SegmentIdWithShardSpec expected = new SegmentIdWithShardSpec(
      DATA_SOURCE,
      Granularities.HOUR.bucket(PARTY_TIME),
      PARTY_TIME.toString(),
      new NumberedShardSpec(2, 2)
  );
  assertSameIdentifier(expected, allocated);
}
/**
 * Announces two HOUR segments ({@code NumberedShardSpec(0, 2)} and {@code (1, 2)}) around {@code PARTY_TIME}
 * and then allocates with a preferred segment granularity of MINUTE. The allocation is still expected to land
 * in the existing HOUR bucket with the announced version and {@code NumberedShardSpec(2, 2)}.
 */
@Test
public void testAddToExistingNumberedShardSpecsFinerPreferredGranularity() throws Exception
{
  final Task task = NoopTask.create();

  final DataSegment announcedPartition0 = DataSegment
      .builder()
      .dataSource(DATA_SOURCE)
      .interval(Granularities.HOUR.bucket(PARTY_TIME))
      .version(PARTY_TIME.toString())
      .shardSpec(new NumberedShardSpec(0, 2))
      .size(0)
      .build();
  final DataSegment announcedPartition1 = DataSegment
      .builder()
      .dataSource(DATA_SOURCE)
      .interval(Granularities.HOUR.bucket(PARTY_TIME))
      .version(PARTY_TIME.toString())
      .shardSpec(new NumberedShardSpec(1, 2))
      .size(0)
      .build();
  taskActionTestKit
      .getMetadataStorageCoordinator()
      .announceHistoricalSegments(ImmutableSet.of(announcedPartition0, announcedPartition1));

  taskActionTestKit.getTaskLockbox().add(task);

  final SegmentIdWithShardSpec allocated = allocate(
      task,
      PARTY_TIME,
      Granularities.NONE,
      Granularities.MINUTE,
      "s1",
      null
  );

  final SegmentIdWithShardSpec expected = new SegmentIdWithShardSpec(
      DATA_SOURCE,
      Granularities.HOUR.bucket(PARTY_TIME),
      PARTY_TIME.toString(),
      new NumberedShardSpec(2, 2)
  );
  assertSameIdentifier(expected, allocated);
}
/**
 * Announces two HOUR segments around {@code PARTY_TIME} and then requests an allocation with both the query
 * granularity and the preferred segment granularity set to DAY. The allocation is expected to return null.
 */
@Test
public void testCannotAddToExistingNumberedShardSpecsWithCoarserQueryGranularity() throws Exception
{
  final Task task = NoopTask.create();

  final DataSegment announcedPartition0 = DataSegment
      .builder()
      .dataSource(DATA_SOURCE)
      .interval(Granularities.HOUR.bucket(PARTY_TIME))
      .version(PARTY_TIME.toString())
      .shardSpec(new NumberedShardSpec(0, 2))
      .size(0)
      .build();
  final DataSegment announcedPartition1 = DataSegment
      .builder()
      .dataSource(DATA_SOURCE)
      .interval(Granularities.HOUR.bucket(PARTY_TIME))
      .version(PARTY_TIME.toString())
      .shardSpec(new NumberedShardSpec(1, 2))
      .size(0)
      .build();
  taskActionTestKit
      .getMetadataStorageCoordinator()
      .announceHistoricalSegments(ImmutableSet.of(announcedPartition0, announcedPartition1));

  taskActionTestKit.getTaskLockbox().add(task);

  final SegmentIdWithShardSpec allocated = allocate(
      task,
      PARTY_TIME,
      Granularities.DAY,
      Granularities.DAY,
      "s1",
      null
  );
  Assert.assertNull(allocated);
}
/**
 * Requests an allocation whose query granularity (DAY) is coarser than the preferred segment granularity
 * (HOUR), with no segments announced beforehand. The allocation is expected to return null.
 */
@Test
public void testCannotDoAnythingWithSillyQueryGranularity()
{
  final Task task = NoopTask.create();
  taskActionTestKit.getTaskLockbox().add(task);

  final SegmentIdWithShardSpec allocated = allocate(
      task,
      PARTY_TIME,
      Granularities.DAY,
      Granularities.HOUR,
      "s1",
      null
  );
  Assert.assertNull(allocated);
}
/**
 * Announces two {@code HashBasedNumberedShardSpec} segments (partitions 0 and 1, partitioned on "dim1") for the
 * HOUR bucket of {@code PARTY_TIME} and then performs a {@code SegmentAllocateAction} directly with a
 * {@code HashBasedNumberedPartialShardSpec}. The returned id is expected to have partition number 2, a
 * {@code HashBasedNumberedShardSpec} with 2 core partitions, and "dim1" as its only partition dimension.
 */
@Test
public void testWithPartialShardSpecAndOvershadowingSegments() throws IOException
{
  final Task task = NoopTask.create();
  taskActionTestKit.getTaskLockbox().add(task);

  final ObjectMapper objectMapper = new DefaultObjectMapper();

  final DataSegment announcedHash0 = DataSegment
      .builder()
      .dataSource(DATA_SOURCE)
      .interval(Granularities.HOUR.bucket(PARTY_TIME))
      .version(PARTY_TIME.toString())
      .shardSpec(new HashBasedNumberedShardSpec(0, 2, 0, 2, ImmutableList.of("dim1"), null, objectMapper))
      .size(0)
      .build();
  final DataSegment announcedHash1 = DataSegment
      .builder()
      .dataSource(DATA_SOURCE)
      .interval(Granularities.HOUR.bucket(PARTY_TIME))
      .version(PARTY_TIME.toString())
      .shardSpec(new HashBasedNumberedShardSpec(1, 2, 1, 2, ImmutableList.of("dim1"), null, objectMapper))
      .size(0)
      .build();
  taskActionTestKit
      .getMetadataStorageCoordinator()
      .announceHistoricalSegments(ImmutableSet.of(announcedHash0, announcedHash1));

  // Unlike the allocate() helpers of this class, this action passes true for the boolean flag.
  final SegmentAllocateAction allocateAction = new SegmentAllocateAction(
      DATA_SOURCE,
      PARTY_TIME,
      Granularities.MINUTE,
      Granularities.HOUR,
      "seq",
      null,
      true,
      new HashBasedNumberedPartialShardSpec(ImmutableList.of("dim1"), 1, 2, null),
      lockGranularity
  );
  final SegmentIdWithShardSpec allocated = allocateAction.perform(task, taskActionTestKit.getTaskActionToolbox());
  Assert.assertNotNull(allocated);

  final ShardSpec allocatedSpec = allocated.getShardSpec();
  Assert.assertEquals(2, allocatedSpec.getPartitionNum());
  Assert.assertTrue(allocatedSpec instanceof HashBasedNumberedShardSpec);

  final HashBasedNumberedShardSpec hashSpec = (HashBasedNumberedShardSpec) allocatedSpec;
  Assert.assertEquals(2, hashSpec.getNumCorePartitions());
  Assert.assertEquals(ImmutableList.of("dim1"), hashSpec.getPartitionDimensions());
}
/**
 * Allocates for two different sequences, "s1" and "s2", at {@code PARTY_TIME} using a one-hour
 * {@code PeriodGranularity} in the "Asia/Shanghai" time zone, and expects both allocations to be non-null.
 */
@Test
public void testSameIntervalWithSegmentGranularity()
{
  final Task task = NoopTask.create();
  taskActionTestKit.getTaskLockbox().add(task);

  final Granularity hourlyInShanghai = new PeriodGranularity(
      Period.hours(1),
      null,
      DateTimes.inferTzFromString("Asia/Shanghai")
  );

  final SegmentIdWithShardSpec firstSequenceId =
      allocate(task, PARTY_TIME, Granularities.MINUTE, hourlyInShanghai, "s1", null);
  final SegmentIdWithShardSpec secondSequenceId =
      allocate(task, PARTY_TIME, Granularities.MINUTE, hourlyInShanghai, "s2", null);

  Assert.assertNotNull(firstSequenceId);
  Assert.assertNotNull(secondSequenceId);
}
/**
 * Convenience overload that allocates with {@code NumberedPartialShardSpec.instance()} as the partial shard
 * spec and otherwise delegates to the seven-argument {@code allocate}.
 *
 * @return whatever the delegated allocation returns; several tests assert that this can be null
 */
private SegmentIdWithShardSpec allocate(
    final Task task,
    final DateTime timestamp,
    final Granularity queryGranularity,
    final Granularity preferredSegmentGranularity,
    final String sequenceName,
    final String sequencePreviousId
)
{
  final PartialShardSpec numberedPartialShardSpec = NumberedPartialShardSpec.instance();
  return allocate(
      task,
      timestamp,
      queryGranularity,
      preferredSegmentGranularity,
      sequenceName,
      sequencePreviousId,
      numberedPartialShardSpec
  );
}
/**
 * Builds a {@link SegmentAllocateAction} for {@code DATA_SOURCE} with the given arguments and this instance's
 * lock granularity, then performs it for {@code task} using the test kit's action toolbox.
 *
 * @return the result of performing the action; several tests assert that this can be null
 */
private SegmentIdWithShardSpec allocate(
    final Task task,
    final DateTime timestamp,
    final Granularity queryGranularity,
    final Granularity preferredSegmentGranularity,
    final String sequenceName,
    final String sequencePreviousId,
    final PartialShardSpec partialShardSpec
)
{
  // NOTE(review): the boolean flag is always false here; confirm its meaning against SegmentAllocateAction.
  final SegmentAllocateAction allocateAction = new SegmentAllocateAction(
      DATA_SOURCE,
      timestamp,
      queryGranularity,
      preferredSegmentGranularity,
      sequenceName,
      sequencePreviousId,
      false,
      partialShardSpec,
      lockGranularity
  );

  return allocateAction.perform(task, taskActionTestKit.getTaskActionToolbox());
}
/**
 * Asserts that two segment ids are equal and, in addition, that their shard specs have the same partition
 * number and the same concrete class. When that class is exactly {@link NumberedShardSpec}, the number of core
 * partitions must match too; no extra check is made for other shard spec classes.
 *
 * @param expected the id the test expects
 * @param actual   the id produced by the code under test
 */
private void assertSameIdentifier(final SegmentIdWithShardSpec expected, final SegmentIdWithShardSpec actual)
{
  Assert.assertEquals(expected, actual);

  final ShardSpec expectedSpec = expected.getShardSpec();
  final ShardSpec actualSpec = actual.getShardSpec();
  Assert.assertEquals(expectedSpec.getPartitionNum(), actualSpec.getPartitionNum());
  Assert.assertEquals(expectedSpec.getClass(), actualSpec.getClass());

  // Both classes are known to be equal at this point, so checking the expected side is sufficient.
  if (expectedSpec.getClass() == NumberedShardSpec.class) {
    Assert.assertEquals(expectedSpec.getNumCorePartitions(), actualSpec.getNumCorePartitions());
  }
}
}