/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.druid.server.coordinator.duty;
import com.fasterxml.jackson.core.JsonProcessingException;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.google.common.base.Preconditions;
import com.google.common.collect.ImmutableList;
import com.google.common.collect.ImmutableMap;
import com.google.common.collect.Lists;
import org.apache.commons.lang3.mutable.MutableInt;
import org.apache.druid.client.DataSourcesSnapshot;
import org.apache.druid.client.indexing.ClientCompactionTaskQuery;
import org.apache.druid.client.indexing.ClientCompactionTaskQueryTuningConfig;
import org.apache.druid.client.indexing.ClientTaskQuery;
import org.apache.druid.client.indexing.HttpIndexingServiceClient;
import org.apache.druid.client.indexing.IndexingWorker;
import org.apache.druid.client.indexing.IndexingWorkerInfo;
import org.apache.druid.discovery.DruidLeaderClient;
import org.apache.druid.discovery.DruidNodeDiscovery;
import org.apache.druid.discovery.DruidNodeDiscoveryProvider;
import org.apache.druid.discovery.NodeRole;
import org.apache.druid.indexer.partitions.DynamicPartitionsSpec;
import org.apache.druid.indexer.partitions.HashedPartitionsSpec;
import org.apache.druid.indexer.partitions.PartitionsSpec;
import org.apache.druid.indexer.partitions.SingleDimensionPartitionsSpec;
import org.apache.druid.jackson.DefaultObjectMapper;
import org.apache.druid.java.util.common.DateTimes;
import org.apache.druid.java.util.common.IAE;
import org.apache.druid.java.util.common.Intervals;
import org.apache.druid.java.util.common.StringUtils;
import org.apache.druid.java.util.http.client.Request;
import org.apache.druid.java.util.http.client.response.StringFullResponseHolder;
import org.apache.druid.server.DruidNode;
import org.apache.druid.server.coordinator.AutoCompactionSnapshot;
import org.apache.druid.server.coordinator.CoordinatorCompactionConfig;
import org.apache.druid.server.coordinator.CoordinatorRuntimeParamsTestHelpers;
import org.apache.druid.server.coordinator.CoordinatorStats;
import org.apache.druid.server.coordinator.DataSourceCompactionConfig;
import org.apache.druid.server.coordinator.DruidCoordinatorRuntimeParams;
import org.apache.druid.server.coordinator.UserCompactionTaskQueryTuningConfig;
import org.apache.druid.timeline.CompactionState;
import org.apache.druid.timeline.DataSegment;
import org.apache.druid.timeline.TimelineObjectHolder;
import org.apache.druid.timeline.VersionedIntervalTimeline;
import org.apache.druid.timeline.partition.HashBasedNumberedShardSpec;
import org.apache.druid.timeline.partition.NumberedShardSpec;
import org.apache.druid.timeline.partition.PartitionChunk;
import org.apache.druid.timeline.partition.ShardSpec;
import org.apache.druid.timeline.partition.SingleDimensionShardSpec;
import org.apache.druid.utils.Streams;
import org.easymock.EasyMock;
import org.jboss.netty.handler.codec.http.DefaultHttpResponse;
import org.jboss.netty.handler.codec.http.HttpMethod;
import org.jboss.netty.handler.codec.http.HttpResponse;
import org.jboss.netty.handler.codec.http.HttpResponseStatus;
import org.jboss.netty.handler.codec.http.HttpVersion;
import org.joda.time.DateTime;
import org.joda.time.Interval;
import org.joda.time.Period;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.junit.runners.Parameterized;
import java.io.IOException;
import java.net.URL;
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.function.BiFunction;
import java.util.function.BooleanSupplier;
import java.util.function.Supplier;
import java.util.stream.Collectors;
@RunWith(Parameterized.class)
public class CompactSegmentsTest
{
private static final ObjectMapper JSON_MAPPER = new DefaultObjectMapper();
private static final String DATA_SOURCE_PREFIX = "dataSource_";
// Each dataSource starts with 440 bytes, 44 segments, and 11 intervals needing compaction
private static final int TOTAL_BYTE_PER_DATASOURCE = 440;
private static final int TOTAL_SEGMENT_PER_DATASOURCE = 44;
private static final int TOTAL_INTERVAL_PER_DATASOURCE = 11;
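// Each parameterization pairs a PartitionsSpec for the compaction configs with a shard-spec factory that is
// used to build both the initial segments and the segments produced by the simulated compaction tasks below.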
@Parameterized.Parameters(name = "{0}")
public static Collection<Object[]> constructorFeeder()
{
final MutableInt nextRangePartitionBoundary = new MutableInt(0);
return ImmutableList.of(
new Object[]{
new DynamicPartitionsSpec(300000, Long.MAX_VALUE),
(BiFunction<Integer, Integer, ShardSpec>) NumberedShardSpec::new
},
new Object[]{
new HashedPartitionsSpec(null, 2, ImmutableList.of("dim")),
(BiFunction<Integer, Integer, ShardSpec>) (bucketId, numBuckets) -> new HashBasedNumberedShardSpec(
bucketId,
numBuckets,
bucketId,
numBuckets,
ImmutableList.of("dim"),
null,
JSON_MAPPER
)
},
new Object[]{
new SingleDimensionPartitionsSpec(300000, null, "dim", false),
(BiFunction<Integer, Integer, ShardSpec>) (bucketId, numBuckets) -> new SingleDimensionShardSpec(
"dim",
bucketId == 0 ? null : String.valueOf(nextRangePartitionBoundary.getAndIncrement()),
bucketId.equals(numBuckets) ? null : String.valueOf(nextRangePartitionBoundary.getAndIncrement()),
bucketId,
numBuckets
)
}
);
}
private final PartitionsSpec partitionsSpec;
private final BiFunction<Integer, Integer, ShardSpec> shardSpecFactory;
private Map<String, VersionedIntervalTimeline<String, DataSegment>> dataSources;
public CompactSegmentsTest(PartitionsSpec partitionsSpec, BiFunction<Integer, Integer, ShardSpec> shardSpecFactory)
{
this.partitionsSpec = partitionsSpec;
this.shardSpecFactory = shardSpecFactory;
}
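// Builds the initial timelines: for each of the 3 dataSources, 6 days (2017-01-01 to 2017-01-04, 2017-01-08,
// and 2017-01-09), each split into a before-noon and an after-noon interval with 4 partitions of 10 bytes each.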
@Before
public void setup()
{
List<DataSegment> segments = new ArrayList<>();
for (int i = 0; i < 3; i++) {
final String dataSource = DATA_SOURCE_PREFIX + i;
for (int j : new int[]{0, 1, 2, 3, 7, 8}) {
for (int k = 0; k < 4; k++) {
segments.add(createSegment(dataSource, j, true, k));
segments.add(createSegment(dataSource, j, false, k));
}
}
}
dataSources = DataSourcesSnapshot
.fromUsedSegments(segments, ImmutableMap.of())
.getUsedSegmentsTimelinesPerDataSource();
}
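// Creates a 10-byte segment covering either the 00:00-12:00 or the 12:00-24:00 half of the given day.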
private DataSegment createSegment(String dataSource, int startDay, boolean beforeNoon, int partition)
{
final ShardSpec shardSpec = shardSpecFactory.apply(partition, 2);
final Interval interval = beforeNoon ?
Intervals.of(
StringUtils.format(
"2017-01-%02dT00:00:00/2017-01-%02dT12:00:00",
startDay + 1,
startDay + 1
)
) :
Intervals.of(
StringUtils.format(
"2017-01-%02dT12:00:00/2017-01-%02dT00:00:00",
startDay + 1,
startDay + 2
)
);
return new DataSegment(
dataSource,
interval,
"version",
null,
ImmutableList.of(),
ImmutableList.of(),
shardSpec,
0,
10L
);
}
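// Runs the CompactSegments duty repeatedly and checks that each run submits one compaction task, working
// backwards through the half-day intervals of every dataSource, and that the most recent interval is never
// compacted (see assertLastSegmentNotCompacted).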
@Test
public void testRun()
{
final TestDruidLeaderClient leaderClient = new TestDruidLeaderClient(JSON_MAPPER);
leaderClient.start();
final HttpIndexingServiceClient indexingServiceClient = new HttpIndexingServiceClient(JSON_MAPPER, leaderClient);
final CompactSegments compactSegments = new CompactSegments(JSON_MAPPER, indexingServiceClient);
final Supplier<String> expectedVersionSupplier = new Supplier<String>()
{
private int i = 0;
@Override
public String get()
{
return "newVersion_" + i++;
}
};
int expectedCompactTaskCount = 1;
int expectedRemainingSegments = 400;
// compact for 2017-01-08T12:00:00.000Z/2017-01-09T12:00:00.000Z
assertCompactSegments(
compactSegments,
Intervals.of("2017-01-%02dT00:00:00/2017-01-%02dT12:00:00", 9, 9),
expectedRemainingSegments,
expectedCompactTaskCount,
expectedVersionSupplier
);
expectedRemainingSegments -= 40;
assertCompactSegments(
compactSegments,
Intervals.of("2017-01-%02dT12:00:00/2017-01-%02dT00:00:00", 8, 9),
expectedRemainingSegments,
expectedCompactTaskCount,
expectedVersionSupplier
);
// compact for 2017-01-07T12:00:00.000Z/2017-01-08T12:00:00.000Z
expectedRemainingSegments -= 40;
assertCompactSegments(
compactSegments,
Intervals.of("2017-01-%02dT00:00:00/2017-01-%02dT12:00:00", 8, 8),
expectedRemainingSegments,
expectedCompactTaskCount,
expectedVersionSupplier
);
expectedRemainingSegments -= 40;
assertCompactSegments(
compactSegments,
Intervals.of("2017-01-%02dT12:00:00/2017-01-%02dT00:00:00", 4, 5),
expectedRemainingSegments,
expectedCompactTaskCount,
expectedVersionSupplier
);
for (int endDay = 4; endDay > 1; endDay -= 1) {
expectedRemainingSegments -= 40;
assertCompactSegments(
compactSegments,
Intervals.of("2017-01-%02dT00:00:00/2017-01-%02dT12:00:00", endDay, endDay),
expectedRemainingSegments,
expectedCompactTaskCount,
expectedVersionSupplier
);
expectedRemainingSegments -= 40;
assertCompactSegments(
compactSegments,
Intervals.of("2017-01-%02dT12:00:00/2017-01-%02dT00:00:00", endDay - 1, endDay),
expectedRemainingSegments,
expectedCompactTaskCount,
expectedVersionSupplier
);
}
assertLastSegmentNotCompacted(compactSegments);
}
@Test
public void testMakeStats()
{
final TestDruidLeaderClient leaderClient = new TestDruidLeaderClient(JSON_MAPPER);
leaderClient.start();
final HttpIndexingServiceClient indexingServiceClient = new HttpIndexingServiceClient(JSON_MAPPER, leaderClient);
final CompactSegments compactSegments = new CompactSegments(JSON_MAPPER, indexingServiceClient);
// Before any compaction, we do not have any snapshot of compactions
Map<String, AutoCompactionSnapshot> autoCompactionSnapshots = compactSegments.getAutoCompactionSnapshot();
Assert.assertEquals(0, autoCompactionSnapshots.size());
for (int compactionRunCount = 0; compactionRunCount < 11; compactionRunCount++) {
assertCompactSegmentStatistics(compactSegments, compactionRunCount);
}
// Test that the stats do not change (and are still correct) when auto compaction runs after everything has been fully compacted
final CoordinatorStats stats = doCompactSegments(compactSegments);
Assert.assertEquals(
0,
stats.getGlobalStat(CompactSegments.COMPACTION_TASK_COUNT)
);
for (int i = 0; i < 3; i++) {
verifySnapshot(
compactSegments,
AutoCompactionSnapshot.AutoCompactionScheduleStatus.RUNNING,
DATA_SOURCE_PREFIX + i,
0,
TOTAL_BYTE_PER_DATASOURCE,
0,
0,
TOTAL_INTERVAL_PER_DATASOURCE,
0,
0,
TOTAL_SEGMENT_PER_DATASOURCE / 2,
0
);
}
// Run auto compaction without any dataSource in the compaction config.
// The snapshots should still be populated and show everything as fully compacted.
doCompactSegments(compactSegments, new ArrayList<>());
Assert.assertEquals(
0,
stats.getGlobalStat(CompactSegments.COMPACTION_TASK_COUNT)
);
for (int i = 0; i < 3; i++) {
verifySnapshot(
compactSegments,
AutoCompactionSnapshot.AutoCompactionScheduleStatus.NOT_ENABLED,
DATA_SOURCE_PREFIX + i,
0,
TOTAL_BYTE_PER_DATASOURCE,
0,
0,
TOTAL_INTERVAL_PER_DATASOURCE,
0,
0,
TOTAL_SEGMENT_PER_DATASOURCE / 2,
0
);
}
assertLastSegmentNotCompacted(compactSegments);
}
@Test
public void testMakeStatsForDataSourceWithCompactedIntervalBetweenNonCompactedIntervals()
{
// Only test and validate for one datasource for simplicity.
// This dataSource has three intervals already compacted (3 intervals, 120 bytes, 12 segments already compacted)
String dataSourceName = DATA_SOURCE_PREFIX + 1;
List<DataSegment> segments = new ArrayList<>();
for (int j : new int[]{0, 1, 2, 3, 7, 8}) {
for (int k = 0; k < 4; k++) {
DataSegment beforeNoon = createSegment(dataSourceName, j, true, k);
DataSegment afterNoon = createSegment(dataSourceName, j, false, k);
if (j == 3) {
// Make two intervals on this day compacted (two compacted intervals back-to-back)
beforeNoon = beforeNoon.withLastCompactionState(new CompactionState(partitionsSpec, ImmutableMap.of()));
afterNoon = afterNoon.withLastCompactionState(new CompactionState(partitionsSpec, ImmutableMap.of()));
}
if (j == 1) {
// Make one interval on this day compacted
afterNoon = afterNoon.withLastCompactionState(new CompactionState(partitionsSpec, ImmutableMap.of()));
}
segments.add(beforeNoon);
segments.add(afterNoon);
}
}
dataSources = DataSourcesSnapshot
.fromUsedSegments(segments, ImmutableMap.of())
.getUsedSegmentsTimelinesPerDataSource();
final TestDruidLeaderClient leaderClient = new TestDruidLeaderClient(JSON_MAPPER);
leaderClient.start();
final HttpIndexingServiceClient indexingServiceClient = new HttpIndexingServiceClient(JSON_MAPPER, leaderClient);
final CompactSegments compactSegments = new CompactSegments(JSON_MAPPER, indexingServiceClient);
// Before any compaction, we do not have any snapshot of compactions
Map<String, AutoCompactionSnapshot> autoCompactionSnapshots = compactSegments.getAutoCompactionSnapshot();
Assert.assertEquals(0, autoCompactionSnapshots.size());
// 3 intervals, 120 bytes, 12 segments already compacted before the run
for (int compactionRunCount = 0; compactionRunCount < 8; compactionRunCount++) {
// Do a cycle of auto compaction which creates one compaction task
final CoordinatorStats stats = doCompactSegments(compactSegments);
Assert.assertEquals(
1,
stats.getGlobalStat(CompactSegments.COMPACTION_TASK_COUNT)
);
verifySnapshot(
compactSegments,
AutoCompactionSnapshot.AutoCompactionScheduleStatus.RUNNING,
dataSourceName,
TOTAL_BYTE_PER_DATASOURCE - 120 - 40 * (compactionRunCount + 1),
120 + 40 * (compactionRunCount + 1),
0,
TOTAL_INTERVAL_PER_DATASOURCE - 3 - (compactionRunCount + 1),
3 + (compactionRunCount + 1),
0,
TOTAL_SEGMENT_PER_DATASOURCE - 12 - 4 * (compactionRunCount + 1),
// 12 segments were compacted before any auto compaction ran,
// 4 segments were compacted in this run of auto compaction,
// and each previous auto compaction run produced 2 compacted segments (4 segments compacted into 2)
12 + 4 + 2 * (compactionRunCount),
0
);
}
// Test that the stats do not change (and are still correct) when auto compaction runs after everything has been fully compacted
final CoordinatorStats stats = doCompactSegments(compactSegments);
Assert.assertEquals(
0,
stats.getGlobalStat(CompactSegments.COMPACTION_TASK_COUNT)
);
verifySnapshot(
compactSegments,
AutoCompactionSnapshot.AutoCompactionScheduleStatus.RUNNING,
dataSourceName,
0,
TOTAL_BYTE_PER_DATASOURCE,
0,
0,
TOTAL_INTERVAL_PER_DATASOURCE,
0,
0,
// 12 segments were compacted before any auto compaction ran;
// the 32 segments needing compaction have now been compacted into 16 segments (4 segments compacted into 2 each run)
12 + 16,
0
);
}
@Test
public void testMakeStatsForDataSourceWithSkipped()
{
// Only test and validate for one datasource for simplicity.
// This dataSource has three intervals skipped (3 intervals, 1200 bytes, 12 segments skipped by auto compaction).
// Note that these segments are 10 bytes each in the other tests; here they are increased to 100 bytes each
// so that they will be skipped by the auto compaction.
String dataSourceName = DATA_SOURCE_PREFIX + 1;
List<DataSegment> segments = new ArrayList<>();
for (int j : new int[]{0, 1, 2, 3, 7, 8}) {
for (int k = 0; k < 4; k++) {
DataSegment beforeNoon = createSegment(dataSourceName, j, true, k);
DataSegment afterNoon = createSegment(dataSourceName, j, false, k);
if (j == 3) {
// Make two intervals on this day skipped (two skipped intervals back-to-back)
beforeNoon = beforeNoon.withSize(100);
afterNoon = afterNoon.withSize(100);
}
if (j == 1) {
// Make one interval on this day skipped
afterNoon = afterNoon.withSize(100);
}
segments.add(beforeNoon);
segments.add(afterNoon);
}
}
dataSources = DataSourcesSnapshot
.fromUsedSegments(segments, ImmutableMap.of())
.getUsedSegmentsTimelinesPerDataSource();
final TestDruidLeaderClient leaderClient = new TestDruidLeaderClient(JSON_MAPPER);
leaderClient.start();
final HttpIndexingServiceClient indexingServiceClient = new HttpIndexingServiceClient(JSON_MAPPER, leaderClient);
final CompactSegments compactSegments = new CompactSegments(JSON_MAPPER, indexingServiceClient);
// Before any compaction, we do not have any snapshot of compactions
Map<String, AutoCompactionSnapshot> autoCompactionSnapshots = compactSegments.getAutoCompactionSnapshot();
Assert.assertEquals(0, autoCompactionSnapshots.size());
// 3 intervals, 1200 bytes (each segment is 100 bytes), 12 segments will be skipped by auto compaction
for (int compactionRunCount = 0; compactionRunCount < 8; compactionRunCount++) {
// Do a cycle of auto compaction which creates one compaction task
final CoordinatorStats stats = doCompactSegments(compactSegments);
Assert.assertEquals(
1,
stats.getGlobalStat(CompactSegments.COMPACTION_TASK_COUNT)
);
verifySnapshot(
compactSegments,
AutoCompactionSnapshot.AutoCompactionScheduleStatus.RUNNING,
dataSourceName,
// Minus 120 bytes to account for the 12 segments in the three skipped intervals at their original 10-byte size
TOTAL_BYTE_PER_DATASOURCE - 120 - 40 * (compactionRunCount + 1),
40 * (compactionRunCount + 1),
1200,
TOTAL_INTERVAL_PER_DATASOURCE - 3 - (compactionRunCount + 1),
(compactionRunCount + 1),
3,
TOTAL_SEGMENT_PER_DATASOURCE - 12 - 4 * (compactionRunCount + 1),
4 + 2 * (compactionRunCount),
12
);
}
// Test that the stats do not change (and are still correct) when auto compaction runs after everything has been fully compacted
final CoordinatorStats stats = doCompactSegments(compactSegments);
Assert.assertEquals(
0,
stats.getGlobalStat(CompactSegments.COMPACTION_TASK_COUNT)
);
verifySnapshot(
compactSegments,
AutoCompactionSnapshot.AutoCompactionScheduleStatus.RUNNING,
dataSourceName,
0,
// Minus 120 bytes to account for the 12 segments in the three skipped intervals at their original 10-byte size
TOTAL_BYTE_PER_DATASOURCE - 120,
1200,
0,
TOTAL_INTERVAL_PER_DATASOURCE - 3,
3,
0,
16,
12
);
}
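// Asserts every field of the AutoCompactionSnapshot reported by CompactSegments for the given dataSource.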
private void verifySnapshot(
CompactSegments compactSegments,
AutoCompactionSnapshot.AutoCompactionScheduleStatus scheduleStatus,
String dataSourceName,
long expectedByteCountAwaitingCompaction,
long expectedByteCountCompressed,
long expectedByteCountSkipped,
long expectedIntervalCountAwaitingCompaction,
long expectedIntervalCountCompressed,
long expectedIntervalCountSkipped,
long expectedSegmentCountAwaitingCompaction,
long expectedSegmentCountCompressed,
long expectedSegmentCountSkipped
)
{
Map<String, AutoCompactionSnapshot> autoCompactionSnapshots = compactSegments.getAutoCompactionSnapshot();
AutoCompactionSnapshot snapshot = autoCompactionSnapshots.get(dataSourceName);
Assert.assertEquals(dataSourceName, snapshot.getDataSource());
Assert.assertEquals(scheduleStatus, snapshot.getScheduleStatus());
Assert.assertEquals(expectedByteCountAwaitingCompaction, snapshot.getBytesAwaitingCompaction());
Assert.assertEquals(expectedByteCountCompressed, snapshot.getBytesCompacted());
Assert.assertEquals(expectedByteCountSkipped, snapshot.getBytesSkipped());
Assert.assertEquals(expectedIntervalCountAwaitingCompaction, snapshot.getIntervalCountAwaitingCompaction());
Assert.assertEquals(expectedIntervalCountCompressed, snapshot.getIntervalCountCompacted());
Assert.assertEquals(expectedIntervalCountSkipped, snapshot.getIntervalCountSkipped());
Assert.assertEquals(expectedSegmentCountAwaitingCompaction, snapshot.getSegmentCountAwaitingCompaction());
Assert.assertEquals(expectedSegmentCountCompressed, snapshot.getSegmentCountCompacted());
Assert.assertEquals(expectedSegmentCountSkipped, snapshot.getSegmentCountSkipped());
}
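// Runs one coordinator cycle per dataSource and checks the per-dataSource snapshots: dataSources that have
// already received a compaction task in this round report post-compaction numbers, while the rest still
// report the numbers from the previous round.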
private void assertCompactSegmentStatistics(CompactSegments compactSegments, int compactionRunCount)
{
for (int dataSourceIndex = 0; dataSourceIndex < 3; dataSourceIndex++) {
// One compaction task triggered
final CoordinatorStats stats = doCompactSegments(compactSegments);
Assert.assertEquals(
1,
stats.getGlobalStat(CompactSegments.COMPACTION_TASK_COUNT)
);
Map<String, AutoCompactionSnapshot> autoCompactionSnapshots = compactSegments.getAutoCompactionSnapshot();
// Note: a compaction run after a dataSource was compacted will show different numbers than the run in
// which it was compacted. For example, if a dataSource has 4 segments compacted in a run, the compacted
// segment count is 4 on that run but may be 2 on subsequent runs
// (assuming the 4 segments were compacted into 2 segments).
for (int i = 0; i <= dataSourceIndex; i++) {
// dataSources up to dataSourceIndex have been compacted in this run; check that the stats match the expected post-compaction values.
// This verifies that a dataSource which got a slot to compact has correct statistics.
if (i != dataSourceIndex) {
verifySnapshot(
compactSegments,
AutoCompactionSnapshot.AutoCompactionScheduleStatus.RUNNING,
DATA_SOURCE_PREFIX + i,
TOTAL_BYTE_PER_DATASOURCE - 40 * (compactionRunCount + 1),
40 * (compactionRunCount + 1),
0,
TOTAL_INTERVAL_PER_DATASOURCE - (compactionRunCount + 1),
(compactionRunCount + 1),
0,
TOTAL_SEGMENT_PER_DATASOURCE - 4 * (compactionRunCount + 1),
2 * (compactionRunCount + 1),
0
);
} else {
verifySnapshot(
compactSegments,
AutoCompactionSnapshot.AutoCompactionScheduleStatus.RUNNING,
DATA_SOURCE_PREFIX + i,
TOTAL_BYTE_PER_DATASOURCE - 40 * (compactionRunCount + 1),
40 * (compactionRunCount + 1),
0,
TOTAL_INTERVAL_PER_DATASOURCE - (compactionRunCount + 1),
(compactionRunCount + 1),
0,
TOTAL_SEGMENT_PER_DATASOURCE - 4 * (compactionRunCount + 1),
2 * compactionRunCount + 4,
0
);
}
}
for (int i = dataSourceIndex + 1; i < 3; i++) {
// dataSources after dataSourceIndex have not been compacted in this run; check that the stats match the expected pre-compaction values.
// This verifies that a dataSource which ran out of task slots has correct statistics.
verifySnapshot(
compactSegments,
AutoCompactionSnapshot.AutoCompactionScheduleStatus.RUNNING,
DATA_SOURCE_PREFIX + i,
TOTAL_BYTE_PER_DATASOURCE - 40 * compactionRunCount,
40 * compactionRunCount,
0,
TOTAL_INTERVAL_PER_DATASOURCE - compactionRunCount,
compactionRunCount,
0,
TOTAL_SEGMENT_PER_DATASOURCE - 4 * compactionRunCount,
2 * compactionRunCount,
0
);
}
}
}
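// Runs a single CompactSegments duty cycle against the in-memory timelines using the default compaction configs.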
private CoordinatorStats doCompactSegments(CompactSegments compactSegments)
{
return doCompactSegments(compactSegments, createCompactionConfigs());
}
private CoordinatorStats doCompactSegments(CompactSegments compactSegments, List<DataSourceCompactionConfig> compactionConfigs)
{
DruidCoordinatorRuntimeParams params = CoordinatorRuntimeParamsTestHelpers
.newBuilder()
.withUsedSegmentsTimelinesPerDataSourceInTest(dataSources)
.withCompactionConfig(CoordinatorCompactionConfig.from(compactionConfigs))
.build();
return compactSegments.run(params).getCoordinatorStats();
}
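// Runs the duty three times (once per dataSource) and verifies that the expected interval of every dataSource
// has been compacted into 2 segments carrying the expected new version.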
private void assertCompactSegments(
CompactSegments compactSegments,
Interval expectedInterval,
int expectedRemainingSegments,
int expectedCompactTaskCount,
Supplier<String> expectedVersionSupplier
)
{
for (int i = 0; i < 3; i++) {
final CoordinatorStats stats = doCompactSegments(compactSegments);
Assert.assertEquals(
expectedCompactTaskCount,
stats.getGlobalStat(CompactSegments.COMPACTION_TASK_COUNT)
);
// One of the dataSources is compacted on each run
if (expectedRemainingSegments > 0) {
// If expectedRemainingSegments is positive, count how many dataSources report exactly that many bytes of
// segments still awaiting compaction.
long numDataSourceOfExpectedRemainingSegments = stats
.getDataSources(CompactSegments.TOTAL_SIZE_OF_SEGMENTS_AWAITING)
.stream()
.mapToLong(ds -> stats.getDataSourceStat(CompactSegments.TOTAL_SIZE_OF_SEGMENTS_AWAITING, ds))
.filter(stat -> stat == expectedRemainingSegments)
.count();
Assert.assertEquals(i + 1, numDataSourceOfExpectedRemainingSegments);
} else {
// Otherwise, we check how many dataSources are in the coordinator stats.
Assert.assertEquals(
2 - i,
stats.getDataSources(CompactSegments.TOTAL_SIZE_OF_SEGMENTS_AWAITING).size()
);
}
}
for (int i = 0; i < 3; i++) {
final String dataSource = DATA_SOURCE_PREFIX + i;
List<TimelineObjectHolder<String, DataSegment>> holders = dataSources.get(dataSource).lookup(expectedInterval);
Assert.assertEquals(1, holders.size());
List<PartitionChunk<DataSegment>> chunks = Lists.newArrayList(holders.get(0).getObject());
Assert.assertEquals(2, chunks.size());
final String expectedVersion = expectedVersionSupplier.get();
for (PartitionChunk<DataSegment> chunk : chunks) {
Assert.assertEquals(expectedInterval, chunk.getObject().getInterval());
Assert.assertEquals(expectedVersion, chunk.getObject().getVersion());
}
}
}
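// Checks that the segments of the most recent interval keep their original version (i.e. were never compacted),
// then emulates a realtime dataSource by appending newer segments and verifies that auto compaction keeps
// submitting tasks for the intervals that become eligible.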
private void assertLastSegmentNotCompacted(CompactSegments compactSegments)
{
// Segments of the latest interval should not be compacted
for (int i = 0; i < 3; i++) {
final String dataSource = DATA_SOURCE_PREFIX + i;
final Interval interval = Intervals.of(StringUtils.format("2017-01-09T12:00:00/2017-01-10"));
List<TimelineObjectHolder<String, DataSegment>> holders = dataSources.get(dataSource).lookup(interval);
Assert.assertEquals(1, holders.size());
for (TimelineObjectHolder<String, DataSegment> holder : holders) {
List<PartitionChunk<DataSegment>> chunks = Lists.newArrayList(holder.getObject());
Assert.assertEquals(4, chunks.size());
for (PartitionChunk<DataSegment> chunk : chunks) {
DataSegment segment = chunk.getObject();
Assert.assertEquals(interval, segment.getInterval());
Assert.assertEquals("version", segment.getVersion());
}
}
}
// Emulate a realtime dataSource by appending fresh segments for later days
final String dataSource = DATA_SOURCE_PREFIX + 0;
addMoreData(dataSource, 9);
CoordinatorStats stats = doCompactSegments(compactSegments);
Assert.assertEquals(
1,
stats.getGlobalStat(CompactSegments.COMPACTION_TASK_COUNT)
);
addMoreData(dataSource, 10);
stats = doCompactSegments(compactSegments);
Assert.assertEquals(
1,
stats.getGlobalStat(CompactSegments.COMPACTION_TASK_COUNT)
);
}
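// Adds four new 10-byte segments (two per half-day interval) for the given day to the dataSource's timeline.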
private void addMoreData(String dataSource, int day)
{
for (int i = 0; i < 2; i++) {
DataSegment newSegment = createSegment(dataSource, day, true, i);
dataSources.get(dataSource).add(
newSegment.getInterval(),
newSegment.getVersion(),
newSegment.getShardSpec().createChunk(newSegment)
);
newSegment = createSegment(dataSource, day, false, i);
dataSources.get(dataSource).add(
newSegment.getInterval(),
newSegment.getVersion(),
newSegment.getShardSpec().createChunk(newSegment)
);
}
}
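// Creates one compaction config per dataSource; the 50-byte input segment size limit is what causes the
// 100-byte segments in testMakeStatsForDataSourceWithSkipped to be skipped.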
private List<DataSourceCompactionConfig> createCompactionConfigs()
{
final List<DataSourceCompactionConfig> compactionConfigs = new ArrayList<>();
for (int i = 0; i < 3; i++) {
final String dataSource = DATA_SOURCE_PREFIX + i;
compactionConfigs.add(
new DataSourceCompactionConfig(
dataSource,
0,
50L,
null,
new Period("PT1H"), // smaller than segment interval
new UserCompactionTaskQueryTuningConfig(
null,
null,
null,
null,
partitionsSpec,
null,
null,
null,
null,
null,
null,
null,
null,
null,
null,
null,
null
),
null
)
);
}
return compactionConfigs;
}
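// Stub of the Overlord-facing leader client: it answers the task submission, worker listing, and task listing
// endpoints used by HttpIndexingServiceClient, and "runs" submitted compaction tasks immediately by rewriting
// the in-memory timelines.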
private class TestDruidLeaderClient extends DruidLeaderClient
{
private final ObjectMapper jsonMapper;
private int compactVersionSuffix = 0;
private TestDruidLeaderClient(ObjectMapper jsonMapper)
{
super(null, new TestNodeDiscoveryProvider(), null, null, null);
this.jsonMapper = jsonMapper;
}
@Override
public Request makeRequest(HttpMethod httpMethod, String urlPath) throws IOException
{
return new Request(httpMethod, new URL("http", "host", 8090, urlPath));
}
@Override
public StringFullResponseHolder go(Request request) throws IOException
{
final String urlString = request.getUrl().toString();
if (urlString.contains("/druid/indexer/v1/task")) {
return handleTask(request);
} else if (urlString.contains("/druid/indexer/v1/workers")) {
return handleWorkers();
} else if (urlString.contains("/druid/indexer/v1/waitingTasks")
|| urlString.contains("/druid/indexer/v1/pendingTasks")
|| urlString.contains("/druid/indexer/v1/runningTasks")) {
return createStringFullResponseHolder(jsonMapper.writeValueAsString(Collections.emptyList()));
} else {
throw new IAE("Cannot handle request for url[%s]", request.getUrl());
}
}
private StringFullResponseHolder createStringFullResponseHolder(String content)
{
final HttpResponse httpResponse = new DefaultHttpResponse(HttpVersion.HTTP_1_1, HttpResponseStatus.OK);
final StringFullResponseHolder holder = new StringFullResponseHolder(
HttpResponseStatus.OK,
httpResponse,
StandardCharsets.UTF_8
);
holder.addChunk(content);
return holder;
}
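// Reports a fleet of 10 idle workers so that the coordinator always sees available compaction task slots.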
private StringFullResponseHolder handleWorkers() throws JsonProcessingException
{
final List<IndexingWorkerInfo> workerInfos = new ArrayList<>();
// There are 10 workers available in this test
for (int i = 0; i < 10; i++) {
workerInfos.add(
new IndexingWorkerInfo(
new IndexingWorker("http", "host", "8091", 1, "version"),
0,
Collections.emptySet(),
Collections.emptyList(),
DateTimes.EPOCH,
null
)
);
}
return createStringFullResponseHolder(jsonMapper.writeValueAsString(workerInfos));
}
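// Accepts a submitted compaction task, immediately "compacts" the requested interval in the in-memory
// timeline, and returns the task id as the Overlord would.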
private StringFullResponseHolder handleTask(Request request) throws IOException
{
final ClientTaskQuery taskQuery = jsonMapper.readValue(request.getContent().array(), ClientTaskQuery.class);
if (!(taskQuery instanceof ClientCompactionTaskQuery)) {
throw new IAE("Cannot run non-compaction task");
}
final ClientCompactionTaskQuery compactionTaskQuery = (ClientCompactionTaskQuery) taskQuery;
final Interval intervalToCompact = compactionTaskQuery.getIoConfig().getInputSpec().getInterval();
final VersionedIntervalTimeline<String, DataSegment> timeline = dataSources.get(
compactionTaskQuery.getDataSource()
);
final List<DataSegment> segments = timeline.lookup(intervalToCompact)
.stream()
.flatMap(holder -> Streams.sequentialStreamFrom(holder.getObject()))
.map(PartitionChunk::getObject)
.collect(Collectors.toList());
compactSegments(
timeline,
segments,
compactionTaskQuery.getTuningConfig()
);
return createStringFullResponseHolder(jsonMapper.writeValueAsString(ImmutableMap.of("task", taskQuery.getId())));
}
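// Replaces the given segments with two compacted segments spanning their combined interval, carrying a new
// version, and recording the compaction state (partitionsSpec and index spec settings) used to produce them.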
private void compactSegments(
VersionedIntervalTimeline<String, DataSegment> timeline,
List<DataSegment> segments,
ClientCompactionTaskQueryTuningConfig tuningConfig
)
{
Preconditions.checkArgument(segments.size() > 1);
DateTime minStart = DateTimes.MAX, maxEnd = DateTimes.MIN;
for (DataSegment segment : segments) {
if (segment.getInterval().getStart().compareTo(minStart) < 0) {
minStart = segment.getInterval().getStart();
}
if (segment.getInterval().getEnd().compareTo(maxEnd) > 0) {
maxEnd = segment.getInterval().getEnd();
}
}
Interval compactInterval = new Interval(minStart, maxEnd);
segments.forEach(
segment -> timeline.remove(
segment.getInterval(),
segment.getVersion(),
segment.getShardSpec().createChunk(segment)
)
);
final String version = "newVersion_" + compactVersionSuffix++;
final long segmentSize = segments.stream().mapToLong(DataSegment::getSize).sum() / 2;
final PartitionsSpec compactionPartitionsSpec;
if (tuningConfig.getPartitionsSpec() instanceof DynamicPartitionsSpec) {
compactionPartitionsSpec = new DynamicPartitionsSpec(
tuningConfig.getPartitionsSpec().getMaxRowsPerSegment(),
((DynamicPartitionsSpec) tuningConfig.getPartitionsSpec()).getMaxTotalRowsOr(Long.MAX_VALUE)
);
} else {
compactionPartitionsSpec = tuningConfig.getPartitionsSpec();
}
for (int i = 0; i < 2; i++) {
DataSegment compactSegment = new DataSegment(
segments.get(0).getDataSource(),
compactInterval,
version,
null,
segments.get(0).getDimensions(),
segments.get(0).getMetrics(),
shardSpecFactory.apply(i, 2),
new CompactionState(
compactionPartitionsSpec,
ImmutableMap.of(
"bitmap",
ImmutableMap.of("type", "roaring", "compressRunOnSerialization", true),
"dimensionCompression",
"lz4",
"metricCompression",
"lz4",
"longEncoding",
"longs"
)
),
1,
segmentSize
);
timeline.add(
compactInterval,
compactSegment.getVersion(),
compactSegment.getShardSpec().createChunk(compactSegment)
);
}
}
}
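// Discovery provider stub that hands out a no-op DruidNodeDiscovery so DruidLeaderClient can be constructed
// without a real cluster.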
private static class TestNodeDiscoveryProvider extends DruidNodeDiscoveryProvider
{
@Override
public BooleanSupplier getForNode(DruidNode node, NodeRole nodeRole)
{
throw new UnsupportedOperationException();
}
@Override
public DruidNodeDiscovery getForNodeRole(NodeRole nodeRole)
{
return EasyMock.niceMock(DruidNodeDiscovery.class);
}
}
}