/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.druid.server.coordinator.duty;
import com.fasterxml.jackson.core.JsonProcessingException;
import com.fasterxml.jackson.core.type.TypeReference;
import com.fasterxml.jackson.databind.InjectableValues;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.google.common.base.Preconditions;
import com.google.common.collect.ImmutableList;
import com.google.common.collect.ImmutableMap;
import com.google.common.collect.Lists;
import com.google.common.util.concurrent.Futures;
import com.google.common.util.concurrent.ListenableFuture;
import org.apache.commons.lang3.mutable.MutableInt;
import org.apache.druid.client.DataSourcesSnapshot;
import org.apache.druid.client.indexing.ClientCompactionIOConfig;
import org.apache.druid.client.indexing.ClientCompactionIntervalSpec;
import org.apache.druid.client.indexing.ClientCompactionTaskGranularitySpec;
import org.apache.druid.client.indexing.ClientCompactionTaskQuery;
import org.apache.druid.client.indexing.ClientCompactionTaskQueryTuningConfig;
import org.apache.druid.client.indexing.ClientTaskQuery;
import org.apache.druid.client.indexing.IndexingTotalWorkerCapacityInfo;
import org.apache.druid.client.indexing.NoopOverlordClient;
import org.apache.druid.client.indexing.TaskPayloadResponse;
import org.apache.druid.common.config.NullHandling;
import org.apache.druid.data.input.impl.DimensionsSpec;
import org.apache.druid.indexer.RunnerTaskState;
import org.apache.druid.indexer.TaskLocation;
import org.apache.druid.indexer.TaskState;
import org.apache.druid.indexer.TaskStatusPlus;
import org.apache.druid.indexer.partitions.DynamicPartitionsSpec;
import org.apache.druid.indexer.partitions.HashedPartitionsSpec;
import org.apache.druid.indexer.partitions.PartitionsSpec;
import org.apache.druid.indexer.partitions.SingleDimensionPartitionsSpec;
import org.apache.druid.jackson.DefaultObjectMapper;
import org.apache.druid.java.util.common.CloseableIterators;
import org.apache.druid.java.util.common.DateTimes;
import org.apache.druid.java.util.common.IAE;
import org.apache.druid.java.util.common.Intervals;
import org.apache.druid.java.util.common.StringUtils;
import org.apache.druid.java.util.common.granularity.Granularities;
import org.apache.druid.java.util.common.parsers.CloseableIterator;
import org.apache.druid.metadata.LockFilterPolicy;
import org.apache.druid.query.aggregation.AggregatorFactory;
import org.apache.druid.query.aggregation.CountAggregatorFactory;
import org.apache.druid.query.filter.SelectorDimFilter;
import org.apache.druid.rpc.indexing.OverlordClient;
import org.apache.druid.segment.incremental.OnheapIncrementalIndex;
import org.apache.druid.segment.indexing.BatchIOConfig;
import org.apache.druid.segment.transform.TransformSpec;
import org.apache.druid.server.coordinator.AutoCompactionSnapshot;
import org.apache.druid.server.coordinator.CoordinatorCompactionConfig;
import org.apache.druid.server.coordinator.DataSourceCompactionConfig;
import org.apache.druid.server.coordinator.DruidCoordinatorRuntimeParams;
import org.apache.druid.server.coordinator.UserCompactionTaskDimensionsConfig;
import org.apache.druid.server.coordinator.UserCompactionTaskGranularityConfig;
import org.apache.druid.server.coordinator.UserCompactionTaskIOConfig;
import org.apache.druid.server.coordinator.UserCompactionTaskQueryTuningConfig;
import org.apache.druid.server.coordinator.UserCompactionTaskTransformConfig;
import org.apache.druid.server.coordinator.compact.CompactionSegmentSearchPolicy;
import org.apache.druid.server.coordinator.compact.NewestSegmentFirstPolicy;
import org.apache.druid.server.coordinator.config.DruidCoordinatorConfig;
import org.apache.druid.server.coordinator.stats.CoordinatorRunStats;
import org.apache.druid.server.coordinator.stats.Stats;
import org.apache.druid.timeline.CompactionState;
import org.apache.druid.timeline.DataSegment;
import org.apache.druid.timeline.SegmentTimeline;
import org.apache.druid.timeline.TimelineObjectHolder;
import org.apache.druid.timeline.VersionedIntervalTimeline;
import org.apache.druid.timeline.partition.HashBasedNumberedShardSpec;
import org.apache.druid.timeline.partition.NumberedShardSpec;
import org.apache.druid.timeline.partition.PartitionChunk;
import org.apache.druid.timeline.partition.ShardSpec;
import org.apache.druid.timeline.partition.SingleDimensionShardSpec;
import org.apache.druid.utils.Streams;
import org.joda.time.DateTime;
import org.joda.time.Interval;
import org.joda.time.Period;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.junit.runners.Parameterized;
import org.mockito.ArgumentCaptor;
import org.mockito.ArgumentMatchers;
import org.mockito.Mockito;
import javax.annotation.Nullable;
import java.net.URI;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.BiFunction;
import java.util.function.Supplier;
import java.util.stream.Collectors;
@RunWith(Parameterized.class)
public class CompactSegmentsTest
{
private static final ObjectMapper JSON_MAPPER = new DefaultObjectMapper();
private static final DruidCoordinatorConfig COORDINATOR_CONFIG = Mockito.mock(DruidCoordinatorConfig.class);
private static final String DATA_SOURCE_PREFIX = "dataSource_";
private static final int PARTITION_PER_TIME_INTERVAL = 4;
// Each dataSource starts with 440 bytes, 44 segments, and 11 intervals needing compaction
private static final int TOTAL_BYTE_PER_DATASOURCE = 440;
private static final int TOTAL_SEGMENT_PER_DATASOURCE = 44;
private static final int TOTAL_INTERVAL_PER_DATASOURCE = 11;
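// These totals follow from setup(): 6 days x 2 half-day intervals x 4 partitions = 48 segments of
// 10 bytes each per dataSource. The latest half-day interval (4 segments, 40 bytes) is apparently
// excluded by the skipOffsetFromLatest in createCompactionConfigs() (it shows up as "skipped" in
// the snapshot assertions), leaving 440 bytes, 44 segments, and 11 intervals eligible.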
private static final int MAXIMUM_CAPACITY_WITH_AUTO_SCALE = 10;
private static final NewestSegmentFirstPolicy SEARCH_POLICY = new NewestSegmentFirstPolicy(JSON_MAPPER);
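// Each test run is parameterized with a PartitionsSpec and a matching ShardSpec factory, covering
// dynamic (numbered), hashed, and single-dimension range partitioning.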
@Parameterized.Parameters(name = "{0}")
public static Collection<Object[]> constructorFeeder()
{
final MutableInt nextRangePartitionBoundary = new MutableInt(0);
return ImmutableList.of(
new Object[]{
new DynamicPartitionsSpec(300000, Long.MAX_VALUE),
(BiFunction<Integer, Integer, ShardSpec>) NumberedShardSpec::new
},
new Object[]{
new HashedPartitionsSpec(null, 2, ImmutableList.of("dim")),
(BiFunction<Integer, Integer, ShardSpec>) (bucketId, numBuckets) -> new HashBasedNumberedShardSpec(
bucketId,
numBuckets,
bucketId,
numBuckets,
ImmutableList.of("dim"),
null,
JSON_MAPPER
)
},
new Object[]{
new SingleDimensionPartitionsSpec(300000, null, "dim", false),
(BiFunction<Integer, Integer, ShardSpec>) (bucketId, numBuckets) -> new SingleDimensionShardSpec(
"dim",
bucketId == 0 ? null : String.valueOf(nextRangePartitionBoundary.getAndIncrement()),
bucketId.equals(numBuckets) ? null : String.valueOf(nextRangePartitionBoundary.getAndIncrement()),
bucketId,
numBuckets
)
}
);
}
private final PartitionsSpec partitionsSpec;
private final BiFunction<Integer, Integer, ShardSpec> shardSpecFactory;
private DataSourcesSnapshot dataSources;
Map<String, List<DataSegment>> datasourceToSegments = new HashMap<>();
public CompactSegmentsTest(PartitionsSpec partitionsSpec, BiFunction<Integer, Integer, ShardSpec> shardSpecFactory)
{
this.partitionsSpec = partitionsSpec;
this.shardSpecFactory = shardSpecFactory;
}
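// Creates three dataSources, each with segments on days 1-4, 8, and 9 of January 2017:
// two half-day intervals per day and four partitions per interval.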
@Before
public void setup()
{
List<DataSegment> allSegments = new ArrayList<>();
for (int i = 0; i < 3; i++) {
final String dataSource = DATA_SOURCE_PREFIX + i;
for (int j : new int[]{0, 1, 2, 3, 7, 8}) {
for (int k = 0; k < PARTITION_PER_TIME_INTERVAL; k++) {
List<DataSegment> segmentForDatasource = datasourceToSegments.computeIfAbsent(dataSource, key -> new ArrayList<>());
DataSegment dataSegment = createSegment(dataSource, j, true, k);
allSegments.add(dataSegment);
segmentForDatasource.add(dataSegment);
dataSegment = createSegment(dataSource, j, false, k);
allSegments.add(dataSegment);
segmentForDatasource.add(dataSegment);
}
}
}
dataSources = DataSourcesSnapshot.fromUsedSegments(allSegments, ImmutableMap.of());
}
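// Creates a 10-byte segment covering either the first or the second half of the given day,
// using the parameterized shard spec factory for the given partition number.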
private DataSegment createSegment(String dataSource, int startDay, boolean beforeNoon, int partition)
{
final ShardSpec shardSpec = shardSpecFactory.apply(partition, 2);
final Interval interval = beforeNoon ?
Intervals.of(
StringUtils.format(
"2017-01-%02dT00:00:00/2017-01-%02dT12:00:00",
startDay + 1,
startDay + 1
)
) :
Intervals.of(
StringUtils.format(
"2017-01-%02dT12:00:00/2017-01-%02dT00:00:00",
startDay + 1,
startDay + 2
)
);
return new DataSegment(
dataSource,
interval,
"version",
null,
Collections.emptyList(),
Collections.emptyList(),
shardSpec,
0,
10L
);
}
@Test
public void testSerde() throws Exception
{
final TestOverlordClient overlordClient = new TestOverlordClient(JSON_MAPPER);
JSON_MAPPER.setInjectableValues(
new InjectableValues.Std()
.addValue(DruidCoordinatorConfig.class, COORDINATOR_CONFIG)
.addValue(OverlordClient.class, overlordClient)
.addValue(CompactionSegmentSearchPolicy.class, SEARCH_POLICY)
);
final CompactSegments compactSegments = new CompactSegments(SEARCH_POLICY, overlordClient);
String compactSegmentString = JSON_MAPPER.writeValueAsString(compactSegments);
CompactSegments serdeCompactSegments = JSON_MAPPER.readValue(compactSegmentString, CompactSegments.class);
Assert.assertNotNull(serdeCompactSegments);
Assert.assertSame(overlordClient, serdeCompactSegments.getOverlordClient());
}
@Test
public void testRun()
{
final TestOverlordClient overlordClient = new TestOverlordClient(JSON_MAPPER);
final CompactSegments compactSegments = new CompactSegments(SEARCH_POLICY, overlordClient);
final Supplier<String> expectedVersionSupplier = new Supplier<String>()
{
private int i = 0;
@Override
public String get()
{
return "newVersion_" + i++;
}
};
int expectedCompactTaskCount = 1;
int expectedRemainingSegments = 400;
// compact for 2017-01-08T12:00:00.000Z/2017-01-09T12:00:00.000Z
assertCompactSegments(
compactSegments,
Intervals.of("2017-01-%02dT00:00:00/2017-01-%02dT12:00:00", 9, 9),
expectedRemainingSegments,
expectedCompactTaskCount,
expectedVersionSupplier
);
expectedRemainingSegments -= 40;
assertCompactSegments(
compactSegments,
Intervals.of("2017-01-%02dT12:00:00/2017-01-%02dT00:00:00", 8, 9),
expectedRemainingSegments,
expectedCompactTaskCount,
expectedVersionSupplier
);
// compact for 2017-01-07T12:00:00.000Z/2017-01-08T12:00:00.000Z
expectedRemainingSegments -= 40;
assertCompactSegments(
compactSegments,
Intervals.of("2017-01-%02dT00:00:00/2017-01-%02dT12:00:00", 8, 8),
expectedRemainingSegments,
expectedCompactTaskCount,
expectedVersionSupplier
);
expectedRemainingSegments -= 40;
assertCompactSegments(
compactSegments,
Intervals.of("2017-01-%02dT12:00:00/2017-01-%02dT00:00:00", 4, 5),
expectedRemainingSegments,
expectedCompactTaskCount,
expectedVersionSupplier
);
for (int endDay = 4; endDay > 1; endDay -= 1) {
expectedRemainingSegments -= 40;
assertCompactSegments(
compactSegments,
Intervals.of("2017-01-%02dT00:00:00/2017-01-%02dT12:00:00", endDay, endDay),
expectedRemainingSegments,
expectedCompactTaskCount,
expectedVersionSupplier
);
expectedRemainingSegments -= 40;
assertCompactSegments(
compactSegments,
Intervals.of("2017-01-%02dT12:00:00/2017-01-%02dT00:00:00", endDay - 1, endDay),
expectedRemainingSegments,
expectedCompactTaskCount,
expectedVersionSupplier
);
}
assertLastSegmentNotCompacted(compactSegments);
}
@Test
public void testMakeStats()
{
final TestOverlordClient overlordClient = new TestOverlordClient(JSON_MAPPER);
final CompactSegments compactSegments = new CompactSegments(SEARCH_POLICY, overlordClient);
// Before any compaction, we do not have any snapshot of compactions
Map<String, AutoCompactionSnapshot> autoCompactionSnapshots = compactSegments.getAutoCompactionSnapshot();
Assert.assertEquals(0, autoCompactionSnapshots.size());
for (int compactionRunCount = 0; compactionRunCount < 11; compactionRunCount++) {
doCompactionAndAssertCompactSegmentStatistics(compactSegments, compactionRunCount);
}
// Test that stats do not change (and are still correct) when auto compaction runs after everything is already fully compacted
final CoordinatorRunStats stats = doCompactSegments(compactSegments);
Assert.assertEquals(
0,
stats.get(Stats.Compaction.SUBMITTED_TASKS)
);
for (int i = 0; i < 3; i++) {
verifySnapshot(
compactSegments,
AutoCompactionSnapshot.AutoCompactionScheduleStatus.RUNNING,
DATA_SOURCE_PREFIX + i,
0,
TOTAL_BYTE_PER_DATASOURCE,
40,
0,
TOTAL_INTERVAL_PER_DATASOURCE,
1,
0,
TOTAL_SEGMENT_PER_DATASOURCE / 2,
4
);
}
// Run auto compaction with auto compaction disabled for one datasource
// The snapshot should not contain the datasource whose auto compaction is disabled
List<DataSourceCompactionConfig> removedOneConfig = createCompactionConfigs();
removedOneConfig.remove(0);
doCompactSegments(compactSegments, removedOneConfig);
for (int i = 1; i < 3; i++) {
verifySnapshot(
compactSegments,
AutoCompactionSnapshot.AutoCompactionScheduleStatus.RUNNING,
DATA_SOURCE_PREFIX + i,
0,
TOTAL_BYTE_PER_DATASOURCE,
40,
0,
TOTAL_INTERVAL_PER_DATASOURCE,
1,
0,
TOTAL_SEGMENT_PER_DATASOURCE / 2,
4
);
}
// Run auto compaction without any dataSource in the compaction config
// Snapshot should be empty
doCompactSegments(compactSegments, new ArrayList<>());
Assert.assertEquals(
0,
stats.get(Stats.Compaction.SUBMITTED_TASKS)
);
Assert.assertTrue(compactSegments.getAutoCompactionSnapshot().isEmpty());
assertLastSegmentNotCompacted(compactSegments);
}
@Test
public void testMakeStatsForDataSourceWithCompactedIntervalBetweenNonCompactedIntervals()
{
// Only test and validate one datasource for simplicity.
// This dataSource has three intervals already compacted (3 intervals, 120 bytes, 12 segments already compacted)
String dataSourceName = DATA_SOURCE_PREFIX + 1;
List<DataSegment> segments = new ArrayList<>();
for (int j : new int[]{0, 1, 2, 3, 7, 8}) {
for (int k = 0; k < PARTITION_PER_TIME_INTERVAL; k++) {
DataSegment beforeNoon = createSegment(dataSourceName, j, true, k);
DataSegment afterNoon = createSegment(dataSourceName, j, false, k);
if (j == 3) {
// Make two intervals on this day compacted (two compacted intervals back-to-back)
beforeNoon = beforeNoon.withLastCompactionState(new CompactionState(partitionsSpec, null, null, null, ImmutableMap.of(), ImmutableMap.of()));
afterNoon = afterNoon.withLastCompactionState(new CompactionState(partitionsSpec, null, null, null, ImmutableMap.of(), ImmutableMap.of()));
}
if (j == 1) {
// Make one interval on this day compacted
afterNoon = afterNoon.withLastCompactionState(new CompactionState(partitionsSpec, null, null, null, ImmutableMap.of(), ImmutableMap.of()));
}
segments.add(beforeNoon);
segments.add(afterNoon);
}
}
dataSources = DataSourcesSnapshot.fromUsedSegments(segments, ImmutableMap.of());
final TestOverlordClient overlordClient = new TestOverlordClient(JSON_MAPPER);
final CompactSegments compactSegments = new CompactSegments(SEARCH_POLICY, overlordClient);
// Before any compaction, we do not have any snapshot of compactions
Map<String, AutoCompactionSnapshot> autoCompactionSnapshots = compactSegments.getAutoCompactionSnapshot();
Assert.assertEquals(0, autoCompactionSnapshots.size());
// 3 intervals, 120 bytes, 12 segments already compacted before the run
for (int compactionRunCount = 0; compactionRunCount < 8; compactionRunCount++) {
// Do a cycle of auto compaction which creates one compaction task
final CoordinatorRunStats stats = doCompactSegments(compactSegments);
Assert.assertEquals(
1,
stats.get(Stats.Compaction.SUBMITTED_TASKS)
);
verifySnapshot(
compactSegments,
AutoCompactionSnapshot.AutoCompactionScheduleStatus.RUNNING,
dataSourceName,
TOTAL_BYTE_PER_DATASOURCE - 120 - 40 * (compactionRunCount + 1),
120 + 40 * (compactionRunCount + 1),
40,
TOTAL_INTERVAL_PER_DATASOURCE - 3 - (compactionRunCount + 1),
3 + (compactionRunCount + 1),
1,
TOTAL_SEGMENT_PER_DATASOURCE - 12 - 4 * (compactionRunCount + 1),
// 12 segments were compacted before any auto compaction run
// 4 segments were compacted in this run of auto compaction
// Each previous auto compaction run resulted in 2 compacted segments (4 segments compacted into 2)
12 + 4 + 2 * (compactionRunCount),
4
);
}
// Test that stats do not change (and are still correct) when auto compaction runs after everything is already fully compacted
final CoordinatorRunStats stats = doCompactSegments(compactSegments);
Assert.assertEquals(
0,
stats.get(Stats.Compaction.SUBMITTED_TASKS)
);
verifySnapshot(
compactSegments,
AutoCompactionSnapshot.AutoCompactionScheduleStatus.RUNNING,
dataSourceName,
0,
TOTAL_BYTE_PER_DATASOURCE,
40,
0,
TOTAL_INTERVAL_PER_DATASOURCE,
1,
0,
// 12 segments were compacted before any auto compaction run
// 32 segments needed compaction and are now compacted into 16 segments (4 segments compacted into 2 each run)
12 + 16,
4
);
}
@Test
public void testMakeStatsWithDeactivatedDatasource()
{
final TestOverlordClient overlordClient = new TestOverlordClient(JSON_MAPPER);
final CompactSegments compactSegments = new CompactSegments(SEARCH_POLICY, overlordClient);
// Before any compaction, we do not have any snapshot of compactions
Map<String, AutoCompactionSnapshot> autoCompactionSnapshots = compactSegments.getAutoCompactionSnapshot();
Assert.assertEquals(0, autoCompactionSnapshots.size());
for (int compactionRunCount = 0; compactionRunCount < 11; compactionRunCount++) {
doCompactionAndAssertCompactSegmentStatistics(compactSegments, compactionRunCount);
}
// Test that stats do not change (and are still correct) when auto compaction runs after everything is already fully compacted
final CoordinatorRunStats stats = doCompactSegments(compactSegments);
Assert.assertEquals(
0,
stats.get(Stats.Compaction.SUBMITTED_TASKS)
);
for (int i = 0; i < 3; i++) {
verifySnapshot(
compactSegments,
AutoCompactionSnapshot.AutoCompactionScheduleStatus.RUNNING,
DATA_SOURCE_PREFIX + i,
0,
TOTAL_BYTE_PER_DATASOURCE,
40,
0,
TOTAL_INTERVAL_PER_DATASOURCE,
1,
0,
TOTAL_SEGMENT_PER_DATASOURCE / 2,
4
);
}
// Deactivate one datasource (datasource 0 no longer exists in the timeline)
dataSources.getUsedSegmentsTimelinesPerDataSource()
.remove(DATA_SOURCE_PREFIX + 0);
// Run auto compaction with one datasource deactivated
// The snapshot should not contain the deactivated datasource
doCompactSegments(compactSegments);
for (int i = 1; i < 3; i++) {
verifySnapshot(
compactSegments,
AutoCompactionSnapshot.AutoCompactionScheduleStatus.RUNNING,
DATA_SOURCE_PREFIX + i,
0,
TOTAL_BYTE_PER_DATASOURCE,
40,
0,
TOTAL_INTERVAL_PER_DATASOURCE,
1,
0,
TOTAL_SEGMENT_PER_DATASOURCE / 2,
4
);
}
Assert.assertEquals(2, compactSegments.getAutoCompactionSnapshot().size());
Assert.assertTrue(compactSegments.getAutoCompactionSnapshot().containsKey(DATA_SOURCE_PREFIX + 1));
Assert.assertTrue(compactSegments.getAutoCompactionSnapshot().containsKey(DATA_SOURCE_PREFIX + 2));
Assert.assertFalse(compactSegments.getAutoCompactionSnapshot().containsKey(DATA_SOURCE_PREFIX + 0));
}
@Test
public void testMakeStatsForDataSourceWithSkipped()
{
// Only test and validate one datasource for simplicity.
// This dataSource has three intervals skipped (3 intervals, 1200 bytes, 12 segments skipped by auto compaction)
// Note that these segments are 10 bytes each in the other tests; we increase them to 100 bytes each here
// so that they exceed the 50-byte inputSegmentSizeBytes and will be skipped by the auto compaction.
String dataSourceName = DATA_SOURCE_PREFIX + 1;
List<DataSegment> segments = new ArrayList<>();
for (int j : new int[]{0, 1, 2, 3, 7, 8}) {
for (int k = 0; k < 4; k++) {
DataSegment beforeNoon = createSegment(dataSourceName, j, true, k);
DataSegment afterNoon = createSegment(dataSourceName, j, false, k);
if (j == 3) {
// Make two intervals on this day skipped (two skipped intervals back-to-back)
beforeNoon = beforeNoon.withSize(100);
afterNoon = afterNoon.withSize(100);
}
if (j == 1) {
// Make one interval on this day skipped
afterNoon = afterNoon.withSize(100);
}
segments.add(beforeNoon);
segments.add(afterNoon);
}
}
dataSources = DataSourcesSnapshot.fromUsedSegments(segments, ImmutableMap.of());
final TestOverlordClient overlordClient = new TestOverlordClient(JSON_MAPPER);
final CompactSegments compactSegments = new CompactSegments(SEARCH_POLICY, overlordClient);
// Before any compaction, we do not have any snapshot of compactions
Map<String, AutoCompactionSnapshot> autoCompactionSnapshots = compactSegments.getAutoCompactionSnapshot();
Assert.assertEquals(0, autoCompactionSnapshots.size());
// 3 intervals, 1200 bytes (each segment is 100 bytes), 12 segments will be skipped by auto compaction
for (int compactionRunCount = 0; compactionRunCount < 8; compactionRunCount++) {
// Do a cycle of auto compaction which creates one compaction task
final CoordinatorRunStats stats = doCompactSegments(compactSegments);
Assert.assertEquals(
1,
stats.get(Stats.Compaction.SUBMITTED_TASKS)
);
verifySnapshot(
compactSegments,
AutoCompactionSnapshot.AutoCompactionScheduleStatus.RUNNING,
dataSourceName,
// Minus 120 bytes to account for the 12 skipped segments' original 10-byte size
TOTAL_BYTE_PER_DATASOURCE - 120 - 40 * (compactionRunCount + 1),
40 * (compactionRunCount + 1),
1240,
TOTAL_INTERVAL_PER_DATASOURCE - 3 - (compactionRunCount + 1),
(compactionRunCount + 1),
4,
TOTAL_SEGMENT_PER_DATASOURCE - 12 - 4 * (compactionRunCount + 1),
4 + 2 * (compactionRunCount),
16
);
}
// Test that stats do not change (and are still correct) when auto compaction runs after everything is already fully compacted
final CoordinatorRunStats stats = doCompactSegments(compactSegments);
Assert.assertEquals(
0,
stats.get(Stats.Compaction.SUBMITTED_TASKS)
);
verifySnapshot(
compactSegments,
AutoCompactionSnapshot.AutoCompactionScheduleStatus.RUNNING,
dataSourceName,
0,
// Minus 120 bytes to account for the 12 skipped segments' original 10-byte size
TOTAL_BYTE_PER_DATASOURCE - 120,
1240,
0,
TOTAL_INTERVAL_PER_DATASOURCE - 3,
4,
0,
16,
16
);
}
@Test
public void testRunMultipleCompactionTaskSlots()
{
final TestOverlordClient overlordClient = new TestOverlordClient(JSON_MAPPER);
final CompactSegments compactSegments = new CompactSegments(SEARCH_POLICY, overlordClient);
final CoordinatorRunStats stats = doCompactSegments(compactSegments, 3);
Assert.assertEquals(3, stats.get(Stats.Compaction.AVAILABLE_SLOTS));
Assert.assertEquals(3, stats.get(Stats.Compaction.MAX_SLOTS));
Assert.assertEquals(3, stats.get(Stats.Compaction.SUBMITTED_TASKS));
}
@Test
public void testRunMultipleCompactionTaskSlotsWithUseAutoScaleSlotsOverMaxSlot()
{
int maxCompactionSlot = 3;
Assert.assertTrue(maxCompactionSlot < MAXIMUM_CAPACITY_WITH_AUTO_SCALE);
final TestOverlordClient overlordClient = new TestOverlordClient(JSON_MAPPER);
final CompactSegments compactSegments = new CompactSegments(SEARCH_POLICY, overlordClient);
final CoordinatorRunStats stats =
doCompactSegments(compactSegments, createCompactionConfigs(), maxCompactionSlot, true);
Assert.assertEquals(maxCompactionSlot, stats.get(Stats.Compaction.AVAILABLE_SLOTS));
Assert.assertEquals(maxCompactionSlot, stats.get(Stats.Compaction.MAX_SLOTS));
Assert.assertEquals(maxCompactionSlot, stats.get(Stats.Compaction.SUBMITTED_TASKS));
}
@Test
public void testRunMultipleCompactionTaskSlotsWithUseAutoScaleSlotsUnderMaxSlot()
{
int maxCompactionSlot = 100;
Assert.assertFalse(maxCompactionSlot < MAXIMUM_CAPACITY_WITH_AUTO_SCALE);
final TestOverlordClient overlordClient = new TestOverlordClient(JSON_MAPPER);
final CompactSegments compactSegments = new CompactSegments(SEARCH_POLICY, overlordClient);
final CoordinatorRunStats stats =
doCompactSegments(compactSegments, createCompactionConfigs(), maxCompactionSlot, true);
Assert.assertEquals(MAXIMUM_CAPACITY_WITH_AUTO_SCALE, stats.get(Stats.Compaction.AVAILABLE_SLOTS));
Assert.assertEquals(MAXIMUM_CAPACITY_WITH_AUTO_SCALE, stats.get(Stats.Compaction.MAX_SLOTS));
Assert.assertEquals(MAXIMUM_CAPACITY_WITH_AUTO_SCALE, stats.get(Stats.Compaction.SUBMITTED_TASKS));
}
@Test
public void testCompactWithoutGranularitySpec()
{
final OverlordClient mockClient = Mockito.mock(OverlordClient.class);
final ArgumentCaptor<Object> payloadCaptor = setUpMockClient(mockClient);
final CompactSegments compactSegments = new CompactSegments(SEARCH_POLICY, mockClient);
final List<DataSourceCompactionConfig> compactionConfigs = new ArrayList<>();
final String dataSource = DATA_SOURCE_PREFIX + 0;
compactionConfigs.add(
new DataSourceCompactionConfig(
dataSource,
0,
500L,
null,
new Period("PT0H"), // smaller than segment interval
new UserCompactionTaskQueryTuningConfig(
null,
null,
null,
null,
null,
partitionsSpec,
null,
null,
null,
null,
null,
3,
null,
null,
null,
null,
null,
null,
null
),
null,
null,
null,
null,
null,
null
)
);
doCompactSegments(compactSegments, compactionConfigs);
ClientCompactionTaskQuery taskPayload = (ClientCompactionTaskQuery) payloadCaptor.getValue();
Assert.assertEquals(
Intervals.of("2017-01-09T12:00:00.000Z/2017-01-10T00:00:00.000Z"),
taskPayload.getIoConfig().getInputSpec().getInterval()
);
Assert.assertNull(taskPayload.getGranularitySpec().getSegmentGranularity());
Assert.assertNull(taskPayload.getGranularitySpec().getQueryGranularity());
Assert.assertNull(taskPayload.getGranularitySpec().isRollup());
}
@Test
public void testCompactWithNotNullIOConfig()
{
final OverlordClient mockClient = Mockito.mock(OverlordClient.class);
final ArgumentCaptor<Object> payloadCaptor = setUpMockClient(mockClient);
final CompactSegments compactSegments = new CompactSegments(SEARCH_POLICY, mockClient);
final List<DataSourceCompactionConfig> compactionConfigs = new ArrayList<>();
final String dataSource = DATA_SOURCE_PREFIX + 0;
compactionConfigs.add(
new DataSourceCompactionConfig(
dataSource,
0,
500L,
null,
new Period("PT0H"), // smaller than segment interval
new UserCompactionTaskQueryTuningConfig(
null,
null,
null,
null,
null,
partitionsSpec,
null,
null,
null,
null,
null,
3,
null,
null,
null,
null,
null,
null,
null
),
null,
null,
null,
null,
new UserCompactionTaskIOConfig(true),
null
)
);
doCompactSegments(compactSegments, compactionConfigs);
ClientCompactionTaskQuery taskPayload = (ClientCompactionTaskQuery) payloadCaptor.getValue();
Assert.assertTrue(taskPayload.getIoConfig().isDropExisting());
}
@Test
public void testCompactWithNullIOConfig()
{
final OverlordClient mockClient = Mockito.mock(OverlordClient.class);
final ArgumentCaptor<Object> payloadCaptor = setUpMockClient(mockClient);
final CompactSegments compactSegments = new CompactSegments(SEARCH_POLICY, mockClient);
final List<DataSourceCompactionConfig> compactionConfigs = new ArrayList<>();
final String dataSource = DATA_SOURCE_PREFIX + 0;
compactionConfigs.add(
new DataSourceCompactionConfig(
dataSource,
0,
500L,
null,
new Period("PT0H"), // smaller than segment interval
new UserCompactionTaskQueryTuningConfig(
null,
null,
null,
null,
null,
partitionsSpec,
null,
null,
null,
null,
null,
3,
null,
null,
null,
null,
null,
null,
null
),
null,
null,
null,
null,
null,
null
)
);
doCompactSegments(compactSegments, compactionConfigs);
ClientCompactionTaskQuery taskPayload = (ClientCompactionTaskQuery) payloadCaptor.getValue();
Assert.assertEquals(BatchIOConfig.DEFAULT_DROP_EXISTING, taskPayload.getIoConfig().isDropExisting());
}
@Test
public void testCompactWithGranularitySpec()
{
final OverlordClient mockClient = Mockito.mock(OverlordClient.class);
final ArgumentCaptor<Object> payloadCaptor = setUpMockClient(mockClient);
final CompactSegments compactSegments = new CompactSegments(SEARCH_POLICY, mockClient);
final List<DataSourceCompactionConfig> compactionConfigs = new ArrayList<>();
final String dataSource = DATA_SOURCE_PREFIX + 0;
compactionConfigs.add(
new DataSourceCompactionConfig(
dataSource,
0,
500L,
null,
new Period("PT0H"), // smaller than segment interval
new UserCompactionTaskQueryTuningConfig(
null,
null,
null,
null,
null,
partitionsSpec,
null,
null,
null,
null,
null,
3,
null,
null,
null,
null,
null,
null,
null
),
new UserCompactionTaskGranularityConfig(Granularities.YEAR, null, null),
null,
null,
null,
null,
null
)
);
doCompactSegments(compactSegments, compactionConfigs);
ClientCompactionTaskQuery taskPayload = (ClientCompactionTaskQuery) payloadCaptor.getValue();
// All segments are compacted at the same time since we changed the segment granularity to YEAR and all segments
// are within the same year
Assert.assertEquals(
ClientCompactionIntervalSpec.fromSegments(datasourceToSegments.get(dataSource), Granularities.YEAR),
taskPayload.getIoConfig().getInputSpec()
);
ClientCompactionTaskGranularitySpec expectedGranularitySpec =
new ClientCompactionTaskGranularitySpec(Granularities.YEAR, null, null);
Assert.assertEquals(expectedGranularitySpec, taskPayload.getGranularitySpec());
}
@Test
public void testCompactWithDimensionSpec()
{
final OverlordClient mockClient = Mockito.mock(OverlordClient.class);
final ArgumentCaptor<Object> payloadCaptor = setUpMockClient(mockClient);
final CompactSegments compactSegments = new CompactSegments(SEARCH_POLICY, mockClient);
final List<DataSourceCompactionConfig> compactionConfigs = new ArrayList<>();
final String dataSource = DATA_SOURCE_PREFIX + 0;
compactionConfigs.add(
new DataSourceCompactionConfig(
dataSource,
0,
500L,
null,
new Period("PT0H"), // smaller than segment interval
new UserCompactionTaskQueryTuningConfig(
null,
null,
null,
null,
null,
partitionsSpec,
null,
null,
null,
null,
null,
3,
null,
null,
null,
null,
null,
null,
null
),
null,
new UserCompactionTaskDimensionsConfig(DimensionsSpec.getDefaultSchemas(ImmutableList.of("bar", "foo"))),
null,
null,
null,
null
)
);
doCompactSegments(compactSegments, compactionConfigs);
ClientCompactionTaskQuery taskPayload = (ClientCompactionTaskQuery) payloadCaptor.getValue();
Assert.assertEquals(
DimensionsSpec.getDefaultSchemas(ImmutableList.of("bar", "foo")),
taskPayload.getDimensionsSpec().getDimensions()
);
}
@Test
public void testCompactWithoutDimensionSpec()
{
final OverlordClient mockClient = Mockito.mock(OverlordClient.class);
final ArgumentCaptor<Object> payloadCaptor = setUpMockClient(mockClient);
final CompactSegments compactSegments = new CompactSegments(SEARCH_POLICY, mockClient);
final List<DataSourceCompactionConfig> compactionConfigs = new ArrayList<>();
final String dataSource = DATA_SOURCE_PREFIX + 0;
compactionConfigs.add(
new DataSourceCompactionConfig(
dataSource,
0,
500L,
null,
new Period("PT0H"), // smaller than segment interval
new UserCompactionTaskQueryTuningConfig(
null,
null,
null,
null,
null,
partitionsSpec,
null,
null,
null,
null,
null,
3,
null,
null,
null,
null,
null,
null,
null
),
null,
null,
null,
null,
null,
null
)
);
doCompactSegments(compactSegments, compactionConfigs);
ClientCompactionTaskQuery taskPayload = (ClientCompactionTaskQuery) payloadCaptor.getValue();
Assert.assertNull(taskPayload.getDimensionsSpec());
}
@Test
public void testCompactWithRollupInGranularitySpec()
{
final OverlordClient mockClient = Mockito.mock(OverlordClient.class);
final ArgumentCaptor<Object> payloadCaptor = setUpMockClient(mockClient);
final CompactSegments compactSegments = new CompactSegments(SEARCH_POLICY, mockClient);
final List<DataSourceCompactionConfig> compactionConfigs = new ArrayList<>();
final String dataSource = DATA_SOURCE_PREFIX + 0;
compactionConfigs.add(
new DataSourceCompactionConfig(
dataSource,
0,
500L,
null,
new Period("PT0H"), // smaller than segment interval
new UserCompactionTaskQueryTuningConfig(
null,
null,
null,
null,
null,
partitionsSpec,
null,
null,
null,
null,
null,
3,
null,
null,
null,
null,
null,
null,
null
),
new UserCompactionTaskGranularityConfig(Granularities.YEAR, null, true),
null,
null,
null,
null,
null
)
);
doCompactSegments(compactSegments, compactionConfigs);
ClientCompactionTaskQuery taskPayload = (ClientCompactionTaskQuery) payloadCaptor.getValue();
// All segments are compacted at the same time since we changed the segment granularity to YEAR and all segments
// are within the same year
Assert.assertEquals(
ClientCompactionIntervalSpec.fromSegments(datasourceToSegments.get(dataSource), Granularities.YEAR),
taskPayload.getIoConfig().getInputSpec()
);
ClientCompactionTaskGranularitySpec expectedGranularitySpec =
new ClientCompactionTaskGranularitySpec(Granularities.YEAR, null, true);
Assert.assertEquals(expectedGranularitySpec, taskPayload.getGranularitySpec());
}
@Test
public void testCompactWithGranularitySpecConflictWithActiveCompactionTask()
{
final String dataSource = DATA_SOURCE_PREFIX + 0;
final String conflictTaskId = "taskIdDummy";
final TaskStatusPlus runningConflictCompactionTask = new TaskStatusPlus(
conflictTaskId,
"groupId",
"compact",
DateTimes.EPOCH,
DateTimes.EPOCH,
TaskState.RUNNING,
RunnerTaskState.RUNNING,
-1L,
TaskLocation.unknown(),
dataSource,
null
);
final TaskPayloadResponse runningConflictCompactionTaskPayload = new TaskPayloadResponse(
conflictTaskId,
new ClientCompactionTaskQuery(
conflictTaskId,
dataSource,
new ClientCompactionIOConfig(
new ClientCompactionIntervalSpec(
Intervals.of("2000/2099"),
"testSha256OfSortedSegmentIds"
),
null
),
null,
new ClientCompactionTaskGranularitySpec(Granularities.DAY, null, null),
null,
null,
null,
null
)
);
final OverlordClient mockClient = Mockito.mock(OverlordClient.class);
final ArgumentCaptor<Object> payloadCaptor = ArgumentCaptor.forClass(Object.class);
Mockito.when(mockClient.runTask(ArgumentMatchers.anyString(), payloadCaptor.capture()))
.thenReturn(Futures.immediateFuture(null));
Mockito.when(mockClient.taskStatuses(null, null, 0))
.thenReturn(
Futures.immediateFuture(
CloseableIterators.withEmptyBaggage(ImmutableList.of(runningConflictCompactionTask).iterator())));
Mockito.when(mockClient.findLockedIntervals(ArgumentMatchers.any()))
.thenReturn(Futures.immediateFuture(Collections.emptyMap()));
Mockito.when(mockClient.cancelTask(conflictTaskId))
.thenReturn(Futures.immediateFuture(null));
Mockito.when(mockClient.getTotalWorkerCapacity())
.thenReturn(Futures.immediateFuture(new IndexingTotalWorkerCapacityInfo(0, 0)));
Mockito.when(mockClient.taskPayload(ArgumentMatchers.eq(conflictTaskId)))
.thenReturn(Futures.immediateFuture(runningConflictCompactionTaskPayload));
final CompactSegments compactSegments = new CompactSegments(SEARCH_POLICY, mockClient);
final List<DataSourceCompactionConfig> compactionConfigs = new ArrayList<>();
compactionConfigs.add(
new DataSourceCompactionConfig(
dataSource,
0,
500L,
null,
new Period("PT0H"), // smaller than segment interval
new UserCompactionTaskQueryTuningConfig(
null,
null,
null,
null,
null,
partitionsSpec,
null,
null,
null,
null,
null,
3,
null,
null,
null,
null,
null,
null,
null
),
new UserCompactionTaskGranularityConfig(Granularities.YEAR, null, null),
null,
null,
null,
null,
null
)
);
doCompactSegments(compactSegments, compactionConfigs);
// Verify that the conflicting task was canceled
Mockito.verify(mockClient).cancelTask(conflictTaskId);
// The active conflicting task has an interval of 2000/2099.
// Make sure that we do not skip the interval of the conflicting task,
// since we cancel the task and will have to compact those intervals with the new segmentGranularity.
ClientCompactionTaskQuery taskPayload = (ClientCompactionTaskQuery) payloadCaptor.getValue();
// All segments are compacted at the same time since we changed the segment granularity to YEAR and all segments
// are within the same year
Assert.assertEquals(
ClientCompactionIntervalSpec.fromSegments(datasourceToSegments.get(dataSource), Granularities.YEAR),
taskPayload.getIoConfig().getInputSpec()
);
ClientCompactionTaskGranularitySpec expectedGranularitySpec =
new ClientCompactionTaskGranularitySpec(Granularities.YEAR, null, null);
Assert.assertEquals(expectedGranularitySpec, taskPayload.getGranularitySpec());
}
@Test
public void testRunParallelCompactionMultipleCompactionTaskSlots()
{
final TestOverlordClient overlordClient = new TestOverlordClient(JSON_MAPPER);
final CompactSegments compactSegments = new CompactSegments(SEARCH_POLICY, overlordClient);
final CoordinatorRunStats stats = doCompactSegments(compactSegments, createCompactionConfigs(2), 4);
Assert.assertEquals(4, stats.get(Stats.Compaction.AVAILABLE_SLOTS));
Assert.assertEquals(4, stats.get(Stats.Compaction.MAX_SLOTS));
Assert.assertEquals(2, stats.get(Stats.Compaction.SUBMITTED_TASKS));
}
@Test
public void testRunWithLockedIntervals()
{
final TestOverlordClient overlordClient = new TestOverlordClient(JSON_MAPPER);
// Lock all intervals for dataSource_1 and dataSource_2
final String datasource1 = DATA_SOURCE_PREFIX + 1;
overlordClient.lockedIntervals
.computeIfAbsent(datasource1, k -> new ArrayList<>())
.add(Intervals.of("2017/2018"));
final String datasource2 = DATA_SOURCE_PREFIX + 2;
overlordClient.lockedIntervals
.computeIfAbsent(datasource2, k -> new ArrayList<>())
.add(Intervals.of("2017/2018"));
// Lock all intervals but one for dataSource_0
final String datasource0 = DATA_SOURCE_PREFIX + 0;
overlordClient.lockedIntervals
.computeIfAbsent(datasource0, k -> new ArrayList<>())
.add(Intervals.of("2017-01-01T13:00:00Z/2017-02-01"));
// Verify that locked intervals are skipped and only one compaction task
// is submitted for dataSource_0
CompactSegments compactSegments = new CompactSegments(SEARCH_POLICY, overlordClient);
final CoordinatorRunStats stats =
doCompactSegments(compactSegments, createCompactionConfigs(2), 4);
Assert.assertEquals(1, stats.get(Stats.Compaction.SUBMITTED_TASKS));
Assert.assertEquals(1, overlordClient.submittedCompactionTasks.size());
final ClientCompactionTaskQuery compactionTask = overlordClient.submittedCompactionTasks.get(0);
Assert.assertEquals(datasource0, compactionTask.getDataSource());
Assert.assertEquals(
Intervals.of("2017-01-01T00:00:00/2017-01-01T12:00:00"),
compactionTask.getIoConfig().getInputSpec().getInterval()
);
}
@Test
public void testCompactWithTransformSpec()
{
NullHandling.initializeForTests();
final OverlordClient mockClient = Mockito.mock(OverlordClient.class);
final ArgumentCaptor<Object> payloadCaptor = setUpMockClient(mockClient);
final CompactSegments compactSegments = new CompactSegments(SEARCH_POLICY, mockClient);
final List<DataSourceCompactionConfig> compactionConfigs = new ArrayList<>();
final String dataSource = DATA_SOURCE_PREFIX + 0;
compactionConfigs.add(
new DataSourceCompactionConfig(
dataSource,
0,
500L,
null,
new Period("PT0H"), // smaller than segment interval
new UserCompactionTaskQueryTuningConfig(
null,
null,
null,
null,
null,
partitionsSpec,
null,
null,
null,
null,
null,
3,
null,
null,
null,
null,
null,
null,
null
),
null,
null,
null,
new UserCompactionTaskTransformConfig(new SelectorDimFilter("dim1", "foo", null)),
null,
null
)
);
doCompactSegments(compactSegments, compactionConfigs);
ClientCompactionTaskQuery taskPayload = (ClientCompactionTaskQuery) payloadCaptor.getValue();
Assert.assertNotNull(taskPayload.getTransformSpec());
Assert.assertEquals(new SelectorDimFilter("dim1", "foo", null), taskPayload.getTransformSpec().getFilter());
}
@Test
public void testCompactWithoutCustomSpecs()
{
final OverlordClient mockClient = Mockito.mock(OverlordClient.class);
final ArgumentCaptor<Object> payloadCaptor = setUpMockClient(mockClient);
final CompactSegments compactSegments = new CompactSegments(SEARCH_POLICY, mockClient);
final List<DataSourceCompactionConfig> compactionConfigs = new ArrayList<>();
final String dataSource = DATA_SOURCE_PREFIX + 0;
compactionConfigs.add(
new DataSourceCompactionConfig(
dataSource,
0,
500L,
null,
new Period("PT0H"), // smaller than segment interval
new UserCompactionTaskQueryTuningConfig(
null,
null,
null,
null,
null,
partitionsSpec,
null,
null,
null,
null,
null,
3,
null,
null,
null,
null,
null,
null,
null
),
null,
null,
null,
null,
null,
null
)
);
doCompactSegments(compactSegments, compactionConfigs);
ClientCompactionTaskQuery taskPayload = (ClientCompactionTaskQuery) payloadCaptor.getValue();
Assert.assertNull(taskPayload.getTransformSpec());
Assert.assertNull(taskPayload.getMetricsSpec());
}
@Test
public void testCompactWithMetricsSpec()
{
NullHandling.initializeForTests();
AggregatorFactory[] aggregatorFactories = new AggregatorFactory[] {new CountAggregatorFactory("cnt")};
final OverlordClient mockClient = Mockito.mock(OverlordClient.class);
final ArgumentCaptor<Object> payloadCaptor = setUpMockClient(mockClient);
final CompactSegments compactSegments = new CompactSegments(SEARCH_POLICY, mockClient);
final List<DataSourceCompactionConfig> compactionConfigs = new ArrayList<>();
final String dataSource = DATA_SOURCE_PREFIX + 0;
compactionConfigs.add(
new DataSourceCompactionConfig(
dataSource,
0,
500L,
null,
new Period("PT0H"), // smaller than segment interval
new UserCompactionTaskQueryTuningConfig(
null,
null,
null,
null,
null,
partitionsSpec,
null,
null,
null,
null,
null,
3,
null,
null,
null,
null,
null,
null,
null
),
null,
null,
aggregatorFactories,
null,
null,
null
)
);
doCompactSegments(compactSegments, compactionConfigs);
ClientCompactionTaskQuery taskPayload = (ClientCompactionTaskQuery) payloadCaptor.getValue();
AggregatorFactory[] actual = taskPayload.getMetricsSpec();
Assert.assertNotNull(actual);
Assert.assertArrayEquals(aggregatorFactories, actual);
}
@Test
public void testDetermineSegmentGranularityFromSegmentsToCompact()
{
String dataSourceName = DATA_SOURCE_PREFIX + 1;
List<DataSegment> segments = new ArrayList<>();
segments.add(
new DataSegment(
dataSourceName,
Intervals.of("2017-01-01T00:00:00/2017-01-02T00:00:00"),
"1",
null,
ImmutableList.of(),
ImmutableList.of(),
shardSpecFactory.apply(0, 2),
0,
10L
)
);
segments.add(
new DataSegment(
dataSourceName,
Intervals.of("2017-01-01T00:00:00/2017-01-02T00:00:00"),
"1",
null,
ImmutableList.of(),
ImmutableList.of(),
shardSpecFactory.apply(1, 2),
0,
10L
)
);
dataSources = DataSourcesSnapshot.fromUsedSegments(segments, ImmutableMap.of());
final OverlordClient mockClient = Mockito.mock(OverlordClient.class);
final ArgumentCaptor<Object> payloadCaptor = setUpMockClient(mockClient);
final CompactSegments compactSegments = new CompactSegments(SEARCH_POLICY, mockClient);
final List<DataSourceCompactionConfig> compactionConfigs = new ArrayList<>();
compactionConfigs.add(
new DataSourceCompactionConfig(
dataSourceName,
0,
500L,
null,
new Period("PT0H"), // smaller than segment interval
new UserCompactionTaskQueryTuningConfig(
null,
null,
null,
null,
null,
partitionsSpec,
null,
null,
null,
null,
null,
3,
null,
null,
null,
null,
null,
null,
null
),
null,
null,
null,
null,
null,
null
)
);
doCompactSegments(compactSegments, compactionConfigs);
ClientCompactionTaskQuery taskPayload = (ClientCompactionTaskQuery) payloadCaptor.getValue();
Assert.assertEquals(
ClientCompactionIntervalSpec.fromSegments(segments, Granularities.DAY),
taskPayload.getIoConfig().getInputSpec()
);
ClientCompactionTaskGranularitySpec expectedGranularitySpec =
new ClientCompactionTaskGranularitySpec(Granularities.DAY, null, null);
Assert.assertEquals(expectedGranularitySpec, taskPayload.getGranularitySpec());
}
@Test
public void testDetermineSegmentGranularityFromSegmentGranularityInCompactionConfig()
{
String dataSourceName = DATA_SOURCE_PREFIX + 1;
List<DataSegment> segments = new ArrayList<>();
segments.add(
new DataSegment(
dataSourceName,
Intervals.of("2017-01-01T00:00:00/2017-01-02T00:00:00"),
"1",
null,
ImmutableList.of(),
ImmutableList.of(),
shardSpecFactory.apply(0, 2),
0,
10L
)
);
segments.add(
new DataSegment(
dataSourceName,
Intervals.of("2017-01-01T00:00:00/2017-01-02T00:00:00"),
"1",
null,
ImmutableList.of(),
ImmutableList.of(),
shardSpecFactory.apply(1, 2),
0,
10L
)
);
dataSources = DataSourcesSnapshot.fromUsedSegments(segments, ImmutableMap.of());
final OverlordClient mockClient = Mockito.mock(OverlordClient.class);
final ArgumentCaptor<Object> payloadCaptor = setUpMockClient(mockClient);
final CompactSegments compactSegments = new CompactSegments(SEARCH_POLICY, mockClient);
final List<DataSourceCompactionConfig> compactionConfigs = new ArrayList<>();
compactionConfigs.add(
new DataSourceCompactionConfig(
dataSourceName,
0,
500L,
null,
new Period("PT0H"), // smaller than segment interval
new UserCompactionTaskQueryTuningConfig(
null,
null,
null,
null,
null,
partitionsSpec,
null,
null,
null,
null,
null,
3,
null,
null,
null,
null,
null,
null,
null
),
new UserCompactionTaskGranularityConfig(Granularities.YEAR, null, null),
null,
null,
null,
null,
null
)
);
doCompactSegments(compactSegments, compactionConfigs);
ClientCompactionTaskQuery taskPayload = (ClientCompactionTaskQuery) payloadCaptor.getValue();
Assert.assertEquals(
ClientCompactionIntervalSpec.fromSegments(segments, Granularities.YEAR),
taskPayload.getIoConfig().getInputSpec()
);
ClientCompactionTaskGranularitySpec expectedGranularitySpec =
new ClientCompactionTaskGranularitySpec(Granularities.YEAR, null, null);
Assert.assertEquals(expectedGranularitySpec, taskPayload.getGranularitySpec());
}
@Test
public void testCompactWithMetricsSpecShouldSetPreserveExistingMetricsTrue()
{
final OverlordClient mockClient = Mockito.mock(OverlordClient.class);
final ArgumentCaptor<Object> payloadCaptor = setUpMockClient(mockClient);
final CompactSegments compactSegments = new CompactSegments(SEARCH_POLICY, mockClient);
final List<DataSourceCompactionConfig> compactionConfigs = new ArrayList<>();
final String dataSource = DATA_SOURCE_PREFIX + 0;
compactionConfigs.add(
new DataSourceCompactionConfig(
dataSource,
0,
500L,
null,
new Period("PT0H"), // smaller than segment interval
new UserCompactionTaskQueryTuningConfig(
null,
null,
null,
null,
null,
partitionsSpec,
null,
null,
null,
null,
null,
3,
null,
null,
null,
null,
null,
null,
null
),
null,
null,
new AggregatorFactory[] {new CountAggregatorFactory("cnt")},
null,
null,
null
)
);
doCompactSegments(compactSegments, compactionConfigs);
ClientCompactionTaskQuery taskPayload = (ClientCompactionTaskQuery) payloadCaptor.getValue();
Assert.assertNotNull(taskPayload.getTuningConfig());
Assert.assertNotNull(taskPayload.getTuningConfig().getAppendableIndexSpec());
Assert.assertTrue(((OnheapIncrementalIndex.Spec) taskPayload.getTuningConfig()
.getAppendableIndexSpec()).isPreserveExistingMetrics());
}
@Test
public void testCompactWithoutMetricsSpecShouldSetPreserveExistingMetricsFalse()
{
final OverlordClient mockClient = Mockito.mock(OverlordClient.class);
final ArgumentCaptor<Object> payloadCaptor = setUpMockClient(mockClient);
final CompactSegments compactSegments = new CompactSegments(SEARCH_POLICY, mockClient);
final List<DataSourceCompactionConfig> compactionConfigs = new ArrayList<>();
final String dataSource = DATA_SOURCE_PREFIX + 0;
compactionConfigs.add(
new DataSourceCompactionConfig(
dataSource,
0,
500L,
null,
new Period("PT0H"), // smaller than segment interval
new UserCompactionTaskQueryTuningConfig(
null,
null,
null,
null,
null,
partitionsSpec,
null,
null,
null,
null,
null,
3,
null,
null,
null,
null,
null,
null,
null
),
null,
null,
null,
null,
null,
null
)
);
doCompactSegments(compactSegments, compactionConfigs);
ClientCompactionTaskQuery taskPayload = (ClientCompactionTaskQuery) payloadCaptor.getValue();
Assert.assertNotNull(taskPayload.getTuningConfig());
Assert.assertNotNull(taskPayload.getTuningConfig().getAppendableIndexSpec());
Assert.assertFalse(((OnheapIncrementalIndex.Spec) taskPayload.getTuningConfig()
.getAppendableIndexSpec()).isPreserveExistingMetrics());
}
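// Asserts that the auto compaction snapshot of the given dataSource reports the expected byte,
// interval, and segment counts, each broken down into awaiting / compacted / skipped.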
private void verifySnapshot(
CompactSegments compactSegments,
AutoCompactionSnapshot.AutoCompactionScheduleStatus scheduleStatus,
String dataSourceName,
long expectedByteCountAwaitingCompaction,
long expectedByteCountCompressed,
long expectedByteCountSkipped,
long expectedIntervalCountAwaitingCompaction,
long expectedIntervalCountCompressed,
long expectedIntervalCountSkipped,
long expectedSegmentCountAwaitingCompaction,
long expectedSegmentCountCompressed,
long expectedSegmentCountSkipped
)
{
Map<String, AutoCompactionSnapshot> autoCompactionSnapshots = compactSegments.getAutoCompactionSnapshot();
AutoCompactionSnapshot snapshot = autoCompactionSnapshots.get(dataSourceName);
Assert.assertEquals(dataSourceName, snapshot.getDataSource());
Assert.assertEquals(scheduleStatus, snapshot.getScheduleStatus());
Assert.assertEquals(expectedByteCountAwaitingCompaction, snapshot.getBytesAwaitingCompaction());
Assert.assertEquals(expectedByteCountCompressed, snapshot.getBytesCompacted());
Assert.assertEquals(expectedByteCountSkipped, snapshot.getBytesSkipped());
Assert.assertEquals(expectedIntervalCountAwaitingCompaction, snapshot.getIntervalCountAwaitingCompaction());
Assert.assertEquals(expectedIntervalCountCompressed, snapshot.getIntervalCountCompacted());
Assert.assertEquals(expectedIntervalCountSkipped, snapshot.getIntervalCountSkipped());
Assert.assertEquals(expectedSegmentCountAwaitingCompaction, snapshot.getSegmentCountAwaitingCompaction());
Assert.assertEquals(expectedSegmentCountCompressed, snapshot.getSegmentCountCompacted());
Assert.assertEquals(expectedSegmentCountSkipped, snapshot.getSegmentCountSkipped());
}
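// Runs one compaction cycle per dataSource (each cycle submits a single task) and verifies that
// every dataSource's snapshot reflects whether it has already been compacted in this pass.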
private void doCompactionAndAssertCompactSegmentStatistics(CompactSegments compactSegments, int compactionRunCount)
{
for (int dataSourceIndex = 0; dataSourceIndex < 3; dataSourceIndex++) {
// One compaction task triggered
final CoordinatorRunStats stats = doCompactSegments(compactSegments);
Assert.assertEquals(
1,
stats.get(Stats.Compaction.SUBMITTED_TASKS)
);
// Note: A subsequent compaction run after a dataSource was compacted will show different numbers than
// the run in which it was compacted. For example, if a dataSource had 4 segments compacted in a run,
// the segment compacted count will be 4 on that run but might be 2 on a subsequent run
// (assuming the 4 segments were compacted into 2 segments).
for (int i = 0; i <= dataSourceIndex; i++) {
// dataSources up to dataSourceIndex have now been compacted. Check that the stats match the expected after-compaction values.
// This verifies that a dataSource which got a slot to compact has correct statistics.
if (i != dataSourceIndex) {
verifySnapshot(
compactSegments,
AutoCompactionSnapshot.AutoCompactionScheduleStatus.RUNNING,
DATA_SOURCE_PREFIX + i,
TOTAL_BYTE_PER_DATASOURCE - 40L * (compactionRunCount + 1),
40L * (compactionRunCount + 1),
40,
TOTAL_INTERVAL_PER_DATASOURCE - (compactionRunCount + 1),
(compactionRunCount + 1),
1,
TOTAL_SEGMENT_PER_DATASOURCE - 4L * (compactionRunCount + 1),
2L * (compactionRunCount + 1),
4
);
} else {
verifySnapshot(
compactSegments,
AutoCompactionSnapshot.AutoCompactionScheduleStatus.RUNNING,
DATA_SOURCE_PREFIX + i,
TOTAL_BYTE_PER_DATASOURCE - 40L * (compactionRunCount + 1),
40L * (compactionRunCount + 1),
40,
TOTAL_INTERVAL_PER_DATASOURCE - (compactionRunCount + 1),
(compactionRunCount + 1),
1,
TOTAL_SEGMENT_PER_DATASOURCE - 4L * (compactionRunCount + 1),
2L * compactionRunCount + 4,
4
);
}
}
for (int i = dataSourceIndex + 1; i < 3; i++) {
// dataSources after dataSourceIndex are not yet compacted. Check that the stats match the expected before-compaction values.
// This verifies that a dataSource that ran out of task slots has correct statistics.
verifySnapshot(
compactSegments,
AutoCompactionSnapshot.AutoCompactionScheduleStatus.RUNNING,
DATA_SOURCE_PREFIX + i,
TOTAL_BYTE_PER_DATASOURCE - 40L * compactionRunCount,
40L * compactionRunCount,
40,
TOTAL_INTERVAL_PER_DATASOURCE - compactionRunCount,
compactionRunCount,
1,
TOTAL_SEGMENT_PER_DATASOURCE - 4L * compactionRunCount,
2L * compactionRunCount,
4
);
}
}
}
private CoordinatorRunStats doCompactSegments(CompactSegments compactSegments)
{
return doCompactSegments(compactSegments, (Integer) null);
}
private CoordinatorRunStats doCompactSegments(CompactSegments compactSegments, @Nullable Integer numCompactionTaskSlots)
{
return doCompactSegments(compactSegments, createCompactionConfigs(), numCompactionTaskSlots);
}
private void doCompactSegments(
CompactSegments compactSegments,
List<DataSourceCompactionConfig> compactionConfigs
)
{
doCompactSegments(compactSegments, compactionConfigs, null);
}
private CoordinatorRunStats doCompactSegments(
CompactSegments compactSegments,
List<DataSourceCompactionConfig> compactionConfigs,
@Nullable Integer numCompactionTaskSlots
)
{
return doCompactSegments(compactSegments, compactionConfigs, numCompactionTaskSlots, false);
}
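// When numCompactionTaskSlots is non-null, the compaction task slot ratio is set to 1.0 (100%)
// so that the explicit value effectively caps the available task slots.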
private CoordinatorRunStats doCompactSegments(
CompactSegments compactSegments,
List<DataSourceCompactionConfig> compactionConfigs,
@Nullable Integer numCompactionTaskSlots,
boolean useAutoScaleSlots
)
{
DruidCoordinatorRuntimeParams params = DruidCoordinatorRuntimeParams
.newBuilder(DateTimes.nowUtc())
.withDataSourcesSnapshot(dataSources)
.withCompactionConfig(
new CoordinatorCompactionConfig(
compactionConfigs,
numCompactionTaskSlots == null ? null : 1.0, // 100% when numCompactionTaskSlots is not null
numCompactionTaskSlots,
useAutoScaleSlots
)
)
.build();
return compactSegments.run(params).getCoordinatorStats();
}
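// Runs compaction three times (one task is submitted per run, so one dataSource is compacted per
// run) and then verifies that, for every dataSource, the expected interval now holds two compacted
// segments carrying a fresh version from the supplier.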
private void assertCompactSegments(
CompactSegments compactSegments,
Interval expectedInterval,
int expectedRemainingSegments,
int expectedCompactTaskCount,
Supplier<String> expectedVersionSupplier
)
{
for (int i = 0; i < 3; i++) {
final CoordinatorRunStats stats = doCompactSegments(compactSegments);
Assert.assertEquals(
expectedCompactTaskCount,
stats.get(Stats.Compaction.SUBMITTED_TASKS)
);
// If expectedRemainingSegments is positive, we count the number of datasources
// which have that many segments waiting for compaction. Otherwise, we count
// all the datasources in the coordinator stats
final AtomicInteger numDatasources = new AtomicInteger();
stats.forEachStat(
(stat, rowKey, value) -> {
if (stat.equals(Stats.Compaction.PENDING_BYTES)
&& (expectedRemainingSegments <= 0 || value == expectedRemainingSegments)) {
numDatasources.incrementAndGet();
}
}
);
if (expectedRemainingSegments > 0) {
Assert.assertEquals(i + 1, numDatasources.get());
} else {
Assert.assertEquals(2 - i, numDatasources.get());
}
}
final Map<String, SegmentTimeline> dataSourceToTimeline
= dataSources.getUsedSegmentsTimelinesPerDataSource();
for (int i = 0; i < 3; i++) {
final String dataSource = DATA_SOURCE_PREFIX + i;
List<TimelineObjectHolder<String, DataSegment>> holders = dataSourceToTimeline.get(dataSource).lookup(expectedInterval);
Assert.assertEquals(1, holders.size());
List<PartitionChunk<DataSegment>> chunks = Lists.newArrayList(holders.get(0).getObject());
Assert.assertEquals(2, chunks.size());
final String expectedVersion = expectedVersionSupplier.get();
for (PartitionChunk<DataSegment> chunk : chunks) {
Assert.assertEquals(expectedInterval, chunk.getObject().getInterval());
Assert.assertEquals(expectedVersion, chunk.getObject().getVersion());
}
}
}
private void assertLastSegmentNotCompacted(CompactSegments compactSegments)
{
// Segments of the latest interval should not be compacted
final Map<String, SegmentTimeline> dataSourceToTimeline
= dataSources.getUsedSegmentsTimelinesPerDataSource();
for (int i = 0; i < 3; i++) {
final String dataSource = DATA_SOURCE_PREFIX + i;
final Interval interval = Intervals.of("2017-01-09T12:00:00/2017-01-10");
List<TimelineObjectHolder<String, DataSegment>> holders = dataSourceToTimeline.get(dataSource).lookup(interval);
Assert.assertEquals(1, holders.size());
for (TimelineObjectHolder<String, DataSegment> holder : holders) {
List<PartitionChunk<DataSegment>> chunks = Lists.newArrayList(holder.getObject());
Assert.assertEquals(4, chunks.size());
for (PartitionChunk<DataSegment> chunk : chunks) {
DataSegment segment = chunk.getObject();
Assert.assertEquals(interval, segment.getInterval());
Assert.assertEquals("version", segment.getVersion());
}
}
}
// Emulating realtime dataSource
final String dataSource = DATA_SOURCE_PREFIX + 0;
addMoreData(dataSource, 9);
CoordinatorRunStats stats = doCompactSegments(compactSegments);
Assert.assertEquals(
1,
stats.get(Stats.Compaction.SUBMITTED_TASKS)
);
addMoreData(dataSource, 10);
stats = doCompactSegments(compactSegments);
Assert.assertEquals(
1,
stats.get(Stats.Compaction.SUBMITTED_TASKS)
);
}
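// Adds four new 10-byte segments for the given day (two partitions per half-day interval),
// emulating a realtime dataSource that keeps receiving data.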
private void addMoreData(String dataSource, int day)
{
final SegmentTimeline timeline
= dataSources.getUsedSegmentsTimelinesPerDataSource().get(dataSource);
for (int i = 0; i < 2; i++) {
DataSegment newSegment = createSegment(dataSource, day, true, i);
timeline.add(
newSegment.getInterval(),
newSegment.getVersion(),
newSegment.getShardSpec().createChunk(newSegment)
);
newSegment = createSegment(dataSource, day, false, i);
timeline.add(
newSegment.getInterval(),
newSegment.getVersion(),
newSegment.getShardSpec().createChunk(newSegment)
);
}
}
private List<DataSourceCompactionConfig> createCompactionConfigs()
{
return createCompactionConfigs(null);
}
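
/**
 * Creates one compaction config per datasource, with an inputSegmentSizeBytes
 * of 50, a skipOffsetFromLatest of PT1H (smaller than the segment interval),
 * and the given maxNumConcurrentSubTasks in the tuning config.
 */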
private List<DataSourceCompactionConfig> createCompactionConfigs(@Nullable Integer maxNumConcurrentSubTasks)
{
final List<DataSourceCompactionConfig> compactionConfigs = new ArrayList<>();
for (int i = 0; i < 3; i++) {
final String dataSource = DATA_SOURCE_PREFIX + i;
compactionConfigs.add(
new DataSourceCompactionConfig(
dataSource,
0,
50L,
null,
new Period("PT1H"), // smaller than segment interval
new UserCompactionTaskQueryTuningConfig(
null,
null,
null,
null,
null,
partitionsSpec,
null,
null,
null,
null,
null,
maxNumConcurrentSubTasks,
null,
null,
null,
null,
null,
null,
null
),
null,
null,
null,
null,
null,
null
)
);
}
return compactionConfigs;
}
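
/**
 * An Overlord client that emulates compaction: any compaction task submitted
 * via {@link #runTask} is "completed" immediately by rewriting the segments
 * in the in-memory timeline, so these tests never need a real Overlord.
 */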
private class TestOverlordClient extends NoopOverlordClient
{
private final ObjectMapper jsonMapper;
// Map from Task Id to the intervals locked by that task
private final Map<String, List<Interval>> lockedIntervals = new HashMap<>();
// List of submitted compaction tasks for verification in the tests
private final List<ClientCompactionTaskQuery> submittedCompactionTasks = new ArrayList<>();
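// Monotonically increasing suffix used to make each compacted version unique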
private int compactVersionSuffix = 0;
private TestOverlordClient(ObjectMapper jsonMapper)
{
this.jsonMapper = jsonMapper;
}
@Override
public ListenableFuture<URI> findCurrentLeader()
{
throw new UnsupportedOperationException();
}
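
// Emulates task submission: validates that the payload is a compaction task,
// records it, and immediately "compacts" the matching segments in the timeline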
@Override
public ListenableFuture<Void> runTask(String taskId, Object taskObject)
{
final ClientTaskQuery taskQuery = jsonMapper.convertValue(taskObject, ClientTaskQuery.class);
if (!(taskQuery instanceof ClientCompactionTaskQuery)) {
throw new IAE("Cannot run non-compaction task");
}
final ClientCompactionTaskQuery compactionTaskQuery = (ClientCompactionTaskQuery) taskQuery;
submittedCompactionTasks.add(compactionTaskQuery);
final Interval intervalToCompact = compactionTaskQuery.getIoConfig().getInputSpec().getInterval();
final SegmentTimeline timeline = dataSources.getUsedSegmentsTimelinesPerDataSource()
.get(compactionTaskQuery.getDataSource());
final List<DataSegment> segments = timeline.lookup(intervalToCompact)
.stream()
.flatMap(holder -> Streams.sequentialStreamFrom(holder.getObject()))
.map(PartitionChunk::getObject)
.collect(Collectors.toList());
compactSegments(timeline, segments, compactionTaskQuery);
return Futures.immediateFuture(null);
}
@Override
public ListenableFuture<Map<String, List<Interval>>> findLockedIntervals(
List<LockFilterPolicy> lockFilterPolicies
)
{
return Futures.immediateFuture(lockedIntervals);
}
@Override
public ListenableFuture<CloseableIterator<TaskStatusPlus>> taskStatuses(
@Nullable String state,
@Nullable String dataSource,
@Nullable Integer maxCompletedTasks
)
{
return Futures.immediateFuture(CloseableIterators.withEmptyBaggage(Collections.emptyIterator()));
}
@Override
public ListenableFuture<IndexingTotalWorkerCapacityInfo> getTotalWorkerCapacity()
{
// Report a current capacity of 5 task slots and a maximum of 10 with auto-scaling
return Futures.immediateFuture(new IndexingTotalWorkerCapacityInfo(5, 10));
}
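
/**
 * Emulates a compaction run: removes the input segments from the timeline and
 * replaces them with two segments that cover the combined interval, carry a
 * new unique version, preserve the total size, and record the expected
 * {@link CompactionState}.
 */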
private void compactSegments(
VersionedIntervalTimeline<String, DataSegment> timeline,
List<DataSegment> segments,
ClientCompactionTaskQuery clientCompactionTaskQuery
)
{
Preconditions.checkArgument(segments.size() > 1, "Compaction requires at least 2 segments");
DateTime minStart = DateTimes.MAX;
DateTime maxEnd = DateTimes.MIN;
for (DataSegment segment : segments) {
if (segment.getInterval().getStart().compareTo(minStart) < 0) {
minStart = segment.getInterval().getStart();
}
if (segment.getInterval().getEnd().compareTo(maxEnd) > 0) {
maxEnd = segment.getInterval().getEnd();
}
}
Interval compactInterval = new Interval(minStart, maxEnd);
segments.forEach(
segment -> timeline.remove(
segment.getInterval(),
segment.getVersion(),
segment.getShardSpec().createChunk(segment)
)
);
final String version = "newVersion_" + compactVersionSuffix++;
final long segmentSize = segments.stream().mapToLong(DataSegment::getSize).sum() / 2;
final PartitionsSpec requestedPartitionsSpec = clientCompactionTaskQuery.getTuningConfig().getPartitionsSpec();
final PartitionsSpec compactionPartitionsSpec;
if (requestedPartitionsSpec instanceof DynamicPartitionsSpec) {
compactionPartitionsSpec = new DynamicPartitionsSpec(
requestedPartitionsSpec.getMaxRowsPerSegment(),
((DynamicPartitionsSpec) requestedPartitionsSpec).getMaxTotalRowsOr(Long.MAX_VALUE)
);
} else {
compactionPartitionsSpec = requestedPartitionsSpec;
}
Map<String, Object> transformSpec = null;
try {
if (clientCompactionTaskQuery.getTransformSpec() != null) {
transformSpec = jsonMapper.readValue(
jsonMapper.writeValueAsString(
new TransformSpec(clientCompactionTaskQuery.getTransformSpec().getFilter(), null)
),
new TypeReference<Map<String, Object>>() {}
);
}
}
catch (JsonProcessingException e) {
throw new IAE(e, "Invalid JSON payload");
}
List<Object> metricsSpec = null;
if (clientCompactionTaskQuery.getMetricsSpec() != null) {
metricsSpec = jsonMapper.convertValue(clientCompactionTaskQuery.getMetricsSpec(), new TypeReference<List<Object>>() {});
}
for (int i = 0; i < 2; i++) {
DataSegment compactSegment = new DataSegment(
segments.get(0).getDataSource(),
compactInterval,
version,
null, // loadSpec is irrelevant for these assertions
segments.get(0).getDimensions(),
segments.get(0).getMetrics(),
shardSpecFactory.apply(i, 2),
new CompactionState(
compactionPartitionsSpec,
clientCompactionTaskQuery.getDimensionsSpec() == null ? null : new DimensionsSpec(
clientCompactionTaskQuery.getDimensionsSpec().getDimensions()
),
metricsSpec,
transformSpec,
ImmutableMap.of(
"bitmap",
ImmutableMap.of("type", "roaring"),
"dimensionCompression",
"lz4",
"metricCompression",
"lz4",
"longEncoding",
"longs"
),
ImmutableMap.of()
),
1, // binaryVersion
segmentSize // each compacted segment gets half the original total size
);
timeline.add(
compactInterval,
compactSegment.getVersion(),
compactSegment.getShardSpec().createChunk(compactSegment)
);
}
}
}
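
/**
 * Unit tests for the static helpers {@link CompactSegments#isParallelMode}
 * and {@link CompactSegments#findMaxNumTaskSlotsUsedByOneCompactionTask}.
 */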
public static class StaticUtilsTest
{
@Test
public void testIsParallelModeNullTuningConfigReturnFalse()
{
Assert.assertFalse(CompactSegments.isParallelMode(null));
}
@Test
public void testIsParallelModeNullPartitionsSpecReturnFalse()
{
ClientCompactionTaskQueryTuningConfig tuningConfig = Mockito.mock(ClientCompactionTaskQueryTuningConfig.class);
Mockito.when(tuningConfig.getPartitionsSpec()).thenReturn(null);
Assert.assertFalse(CompactSegments.isParallelMode(tuningConfig));
}
@Test
public void testIsParallelModeNonRangePartitionVaryingMaxNumConcurrentSubTasks()
{
ClientCompactionTaskQueryTuningConfig tuningConfig = Mockito.mock(ClientCompactionTaskQueryTuningConfig.class);
Mockito.when(tuningConfig.getPartitionsSpec()).thenReturn(Mockito.mock(PartitionsSpec.class));
Mockito.when(tuningConfig.getMaxNumConcurrentSubTasks()).thenReturn(null);
Assert.assertFalse(CompactSegments.isParallelMode(tuningConfig));
Mockito.when(tuningConfig.getMaxNumConcurrentSubTasks()).thenReturn(1);
Assert.assertFalse(CompactSegments.isParallelMode(tuningConfig));
Mockito.when(tuningConfig.getMaxNumConcurrentSubTasks()).thenReturn(2);
Assert.assertTrue(CompactSegments.isParallelMode(tuningConfig));
}
@Test
public void testIsParallelModeRangePartitionVaryingMaxNumConcurrentSubTasks()
{
ClientCompactionTaskQueryTuningConfig tuningConfig = Mockito.mock(ClientCompactionTaskQueryTuningConfig.class);
Mockito.when(tuningConfig.getPartitionsSpec()).thenReturn(Mockito.mock(SingleDimensionPartitionsSpec.class));
Mockito.when(tuningConfig.getMaxNumConcurrentSubTasks()).thenReturn(null);
Assert.assertFalse(CompactSegments.isParallelMode(tuningConfig));
Mockito.when(tuningConfig.getMaxNumConcurrentSubTasks()).thenReturn(1);
Assert.assertTrue(CompactSegments.isParallelMode(tuningConfig));
Mockito.when(tuningConfig.getMaxNumConcurrentSubTasks()).thenReturn(2);
Assert.assertTrue(CompactSegments.isParallelMode(tuningConfig));
}
@Test
public void testFindMaxNumTaskSlotsUsedByOneCompactionTaskWhenIsParallelMode()
{
ClientCompactionTaskQueryTuningConfig tuningConfig = Mockito.mock(ClientCompactionTaskQueryTuningConfig.class);
Mockito.when(tuningConfig.getPartitionsSpec()).thenReturn(Mockito.mock(PartitionsSpec.class));
Mockito.when(tuningConfig.getMaxNumConcurrentSubTasks()).thenReturn(2);
Assert.assertEquals(3, CompactSegments.findMaxNumTaskSlotsUsedByOneCompactionTask(tuningConfig));
}
@Test
public void testFindMaxNumTaskSlotsUsedByOneCompactionTaskWhenIsSequentialMode()
{
ClientCompactionTaskQueryTuningConfig tuningConfig = Mockito.mock(ClientCompactionTaskQueryTuningConfig.class);
Mockito.when(tuningConfig.getPartitionsSpec()).thenReturn(Mockito.mock(PartitionsSpec.class));
Mockito.when(tuningConfig.getMaxNumConcurrentSubTasks()).thenReturn(1);
Assert.assertEquals(1, CompactSegments.findMaxNumTaskSlotsUsedByOneCompactionTask(tuningConfig));
}
}
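
/**
 * Stubs the mocked OverlordClient with empty task statuses, no locked
 * intervals, and zero worker capacity, and returns a captor that records the
 * payload of every submitted task.
 */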
private static ArgumentCaptor<Object> setUpMockClient(final OverlordClient mockClient)
{
final ArgumentCaptor<Object> payloadCaptor = ArgumentCaptor.forClass(Object.class);
Mockito.when(mockClient.taskStatuses(null, null, 0))
.thenReturn(Futures.immediateFuture(CloseableIterators.withEmptyBaggage(Collections.emptyIterator())));
Mockito.when(mockClient.findLockedIntervals(ArgumentMatchers.any()))
.thenReturn(Futures.immediateFuture(Collections.emptyMap()));
Mockito.when(mockClient.getTotalWorkerCapacity())
.thenReturn(Futures.immediateFuture(new IndexingTotalWorkerCapacityInfo(0, 0))); // no worker capacity at all
Mockito.when(mockClient.runTask(ArgumentMatchers.anyString(), payloadCaptor.capture()))
.thenReturn(Futures.immediateFuture(null));
return payloadCaptor;
}
}