/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.druid.indexing.common.task;
import com.fasterxml.jackson.annotation.JacksonInject;
import com.fasterxml.jackson.annotation.JsonCreator;
import com.fasterxml.jackson.annotation.JsonProperty;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.fasterxml.jackson.databind.exc.ValueInstantiationException;
import com.fasterxml.jackson.databind.introspect.AnnotationIntrospectorPair;
import com.fasterxml.jackson.databind.jsontype.NamedType;
import com.fasterxml.jackson.databind.module.SimpleModule;
import com.google.common.base.Preconditions;
import com.google.common.base.Supplier;
import com.google.common.collect.ImmutableList;
import com.google.common.collect.ImmutableMap;
import com.google.common.collect.ImmutableSet;
import com.google.common.collect.Iterables;
import com.google.common.collect.Lists;
import com.google.common.collect.Maps;
import org.apache.druid.client.coordinator.CoordinatorClient;
import org.apache.druid.client.indexing.IndexingServiceClient;
import org.apache.druid.client.indexing.NoopIndexingServiceClient;
import org.apache.druid.data.input.InputSource;
import org.apache.druid.data.input.impl.DimensionSchema;
import org.apache.druid.data.input.impl.DimensionsSpec;
import org.apache.druid.data.input.impl.DoubleDimensionSchema;
import org.apache.druid.data.input.impl.FloatDimensionSchema;
import org.apache.druid.data.input.impl.LongDimensionSchema;
import org.apache.druid.data.input.impl.StringDimensionSchema;
import org.apache.druid.data.input.impl.TimestampSpec;
import org.apache.druid.guice.GuiceAnnotationIntrospector;
import org.apache.druid.guice.GuiceInjectableValues;
import org.apache.druid.guice.GuiceInjectors;
import org.apache.druid.guice.IndexingServiceTuningConfigModule;
import org.apache.druid.indexer.TaskStatus;
import org.apache.druid.indexer.partitions.HashedPartitionsSpec;
import org.apache.druid.indexing.common.LockGranularity;
import org.apache.druid.indexing.common.RetryPolicyConfig;
import org.apache.druid.indexing.common.RetryPolicyFactory;
import org.apache.druid.indexing.common.SegmentLoaderFactory;
import org.apache.druid.indexing.common.TaskToolbox;
import org.apache.druid.indexing.common.TestUtils;
import org.apache.druid.indexing.common.actions.RetrieveUsedSegmentsAction;
import org.apache.druid.indexing.common.actions.TaskAction;
import org.apache.druid.indexing.common.actions.TaskActionClient;
import org.apache.druid.indexing.common.config.TaskConfig;
import org.apache.druid.indexing.common.task.CompactionTask.Builder;
import org.apache.druid.indexing.common.task.CompactionTask.PartitionConfigurationManager;
import org.apache.druid.indexing.common.task.CompactionTask.SegmentProvider;
import org.apache.druid.indexing.common.task.IndexTask.IndexTuningConfig;
import org.apache.druid.indexing.common.task.batch.parallel.ParallelIndexIOConfig;
import org.apache.druid.indexing.common.task.batch.parallel.ParallelIndexIngestionSpec;
import org.apache.druid.indexing.common.task.batch.parallel.ParallelIndexTuningConfig;
import org.apache.druid.indexing.input.DruidInputSource;
import org.apache.druid.jackson.DefaultObjectMapper;
import org.apache.druid.java.util.common.ISE;
import org.apache.druid.java.util.common.Intervals;
import org.apache.druid.java.util.common.granularity.Granularities;
import org.apache.druid.java.util.common.granularity.Granularity;
import org.apache.druid.java.util.common.granularity.PeriodGranularity;
import org.apache.druid.java.util.common.guava.Comparators;
import org.apache.druid.query.aggregation.AggregatorFactory;
import org.apache.druid.query.aggregation.CountAggregatorFactory;
import org.apache.druid.query.aggregation.DoubleMaxAggregatorFactory;
import org.apache.druid.query.aggregation.FloatMinAggregatorFactory;
import org.apache.druid.query.aggregation.LongMaxAggregatorFactory;
import org.apache.druid.query.aggregation.LongSumAggregatorFactory;
import org.apache.druid.query.aggregation.first.FloatFirstAggregatorFactory;
import org.apache.druid.query.aggregation.last.DoubleLastAggregatorFactory;
import org.apache.druid.segment.IndexIO;
import org.apache.druid.segment.IndexMergerV9;
import org.apache.druid.segment.IndexSpec;
import org.apache.druid.segment.Metadata;
import org.apache.druid.segment.QueryableIndex;
import org.apache.druid.segment.SegmentUtils;
import org.apache.druid.segment.SimpleQueryableIndex;
import org.apache.druid.segment.column.BaseColumn;
import org.apache.druid.segment.column.BitmapIndex;
import org.apache.druid.segment.column.ColumnCapabilities;
import org.apache.druid.segment.column.ColumnCapabilitiesImpl;
import org.apache.druid.segment.column.ColumnHolder;
import org.apache.druid.segment.column.SpatialIndex;
import org.apache.druid.segment.column.ValueType;
import org.apache.druid.segment.data.CompressionFactory.LongEncodingStrategy;
import org.apache.druid.segment.data.CompressionStrategy;
import org.apache.druid.segment.data.ListIndexed;
import org.apache.druid.segment.data.RoaringBitmapSerdeFactory;
import org.apache.druid.segment.incremental.RowIngestionMetersFactory;
import org.apache.druid.segment.indexing.DataSchema;
import org.apache.druid.segment.indexing.RealtimeTuningConfig;
import org.apache.druid.segment.indexing.TuningConfig;
import org.apache.druid.segment.indexing.granularity.UniformGranularitySpec;
import org.apache.druid.segment.join.NoopJoinableFactory;
import org.apache.druid.segment.loading.SegmentLoadingException;
import org.apache.druid.segment.realtime.appenderator.AppenderatorsManager;
import org.apache.druid.segment.realtime.firehose.ChatHandlerProvider;
import org.apache.druid.segment.realtime.firehose.NoopChatHandlerProvider;
import org.apache.druid.segment.selector.settable.SettableColumnValueSelector;
import org.apache.druid.segment.writeout.OffHeapMemorySegmentWriteOutMediumFactory;
import org.apache.druid.server.security.AuthTestUtils;
import org.apache.druid.server.security.AuthorizerMapper;
import org.apache.druid.timeline.DataSegment;
import org.apache.druid.timeline.partition.NumberedShardSpec;
import org.hamcrest.CoreMatchers;
import org.joda.time.Interval;
import org.joda.time.Period;
import org.junit.Assert;
import org.junit.Before;
import org.junit.BeforeClass;
import org.junit.Rule;
import org.junit.Test;
import org.junit.rules.ExpectedException;
import javax.annotation.Nullable;
import java.io.File;
import java.io.IOException;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Map.Entry;
import java.util.Optional;
import java.util.stream.Collectors;
import java.util.stream.IntStream;
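/**
 * Unit tests for {@link CompactionTask}: serde round trips (including backward compatibility with the
 * 0.16.0-style task spec) and generation of {@link ParallelIndexIngestionSpec}s from a set of existing
 * segments with mixed dimension schemas.
 */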
public class CompactionTaskTest
{
private static final long SEGMENT_SIZE_BYTES = 100;
private static final int NUM_ROWS_PER_SEGMENT = 10;
private static final String DATA_SOURCE = "dataSource";
private static final String TIMESTAMP_COLUMN = "timestamp";
private static final String MIXED_TYPE_COLUMN = "string_to_double";
private static final Interval COMPACTION_INTERVAL = Intervals.of("2017-01-01/2017-07-01");
private static final List<Interval> SEGMENT_INTERVALS = ImmutableList.of(
Intervals.of("2017-01-01/2017-02-01"),
Intervals.of("2017-02-01/2017-03-01"),
Intervals.of("2017-03-01/2017-04-01"),
Intervals.of("2017-04-01/2017-05-01"),
Intervals.of("2017-05-01/2017-06-01"),
Intervals.of("2017-06-01/2017-07-01"),
// overlapping intervals
Intervals.of("2017-06-01/2017-06-02"),
Intervals.of("2017-06-15/2017-06-16"),
Intervals.of("2017-06-30/2017-07-01")
);
private static final Map<Interval, DimensionSchema> MIXED_TYPE_COLUMN_MAP = new HashMap<>();
private static final ParallelIndexTuningConfig TUNING_CONFIG = createTuningConfig();
private static final TestUtils TEST_UTILS = new TestUtils();
private static final Map<DataSegment, File> SEGMENT_MAP = new HashMap<>();
private static final CoordinatorClient COORDINATOR_CLIENT = new TestCoordinatorClient(SEGMENT_MAP);
private static final IndexingServiceClient INDEXING_SERVICE_CLIENT = new NoopIndexingServiceClient();
private static final ObjectMapper OBJECT_MAPPER = setupInjectablesInObjectMapper(new DefaultObjectMapper());
private static final RetryPolicyFactory RETRY_POLICY_FACTORY = new RetryPolicyFactory(new RetryPolicyConfig());
private static Map<String, DimensionSchema> DIMENSIONS;
private static List<AggregatorFactory> AGGREGATORS;
private static List<DataSegment> SEGMENTS;
private TaskToolbox toolbox;
private SegmentLoaderFactory segmentLoaderFactory;
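// Builds the shared static fixtures: the per-interval schema of the mixed-type column, the dimension and
// aggregator catalogs, and one DataSegment (backed by a fake segment file) per interval in SEGMENT_INTERVALS.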
@BeforeClass
public static void setupClass()
{
MIXED_TYPE_COLUMN_MAP.put(Intervals.of("2017-01-01/2017-02-01"), new StringDimensionSchema(MIXED_TYPE_COLUMN));
MIXED_TYPE_COLUMN_MAP.put(Intervals.of("2017-02-01/2017-03-01"), new StringDimensionSchema(MIXED_TYPE_COLUMN));
MIXED_TYPE_COLUMN_MAP.put(Intervals.of("2017-03-01/2017-04-01"), new StringDimensionSchema(MIXED_TYPE_COLUMN));
MIXED_TYPE_COLUMN_MAP.put(Intervals.of("2017-04-01/2017-05-01"), new StringDimensionSchema(MIXED_TYPE_COLUMN));
MIXED_TYPE_COLUMN_MAP.put(Intervals.of("2017-05-01/2017-06-01"), new DoubleDimensionSchema(MIXED_TYPE_COLUMN));
MIXED_TYPE_COLUMN_MAP.put(Intervals.of("2017-06-01/2017-07-01"), new DoubleDimensionSchema(MIXED_TYPE_COLUMN));
MIXED_TYPE_COLUMN_MAP.put(Intervals.of("2017-06-01/2017-06-02"), new DoubleDimensionSchema(MIXED_TYPE_COLUMN));
MIXED_TYPE_COLUMN_MAP.put(Intervals.of("2017-06-15/2017-06-16"), new DoubleDimensionSchema(MIXED_TYPE_COLUMN));
MIXED_TYPE_COLUMN_MAP.put(Intervals.of("2017-06-30/2017-07-01"), new DoubleDimensionSchema(MIXED_TYPE_COLUMN));
DIMENSIONS = new HashMap<>();
AGGREGATORS = new ArrayList<>();
DIMENSIONS.put(ColumnHolder.TIME_COLUMN_NAME, new LongDimensionSchema(ColumnHolder.TIME_COLUMN_NAME));
DIMENSIONS.put(TIMESTAMP_COLUMN, new LongDimensionSchema(TIMESTAMP_COLUMN));
int numUmbrellaIntervals = 6;
for (int i = 0; i < numUmbrellaIntervals; i++) {
final StringDimensionSchema schema = new StringDimensionSchema(
"string_dim_" + i,
null,
null
);
DIMENSIONS.put(schema.getName(), schema);
}
for (int i = 0; i < numUmbrellaIntervals; i++) {
final LongDimensionSchema schema = new LongDimensionSchema("long_dim_" + i);
DIMENSIONS.put(schema.getName(), schema);
}
for (int i = 0; i < numUmbrellaIntervals; i++) {
final FloatDimensionSchema schema = new FloatDimensionSchema("float_dim_" + i);
DIMENSIONS.put(schema.getName(), schema);
}
for (int i = 0; i < numUmbrellaIntervals; i++) {
final DoubleDimensionSchema schema = new DoubleDimensionSchema("double_dim_" + i);
DIMENSIONS.put(schema.getName(), schema);
}
AGGREGATORS.add(new CountAggregatorFactory("agg_0"));
AGGREGATORS.add(new LongSumAggregatorFactory("agg_1", "long_dim_1"));
AGGREGATORS.add(new LongMaxAggregatorFactory("agg_2", "long_dim_2"));
AGGREGATORS.add(new FloatFirstAggregatorFactory("agg_3", "float_dim_3"));
AGGREGATORS.add(new DoubleLastAggregatorFactory("agg_4", "double_dim_4"));
for (int i = 0; i < SEGMENT_INTERVALS.size(); i++) {
SEGMENT_MAP.put(
new DataSegment(
DATA_SOURCE,
SEGMENT_INTERVALS.get(i),
"version_" + i,
ImmutableMap.of(),
findDimensions(i, SEGMENT_INTERVALS.get(i)),
AGGREGATORS.stream().map(AggregatorFactory::getName).collect(Collectors.toList()),
new NumberedShardSpec(0, 1),
0,
SEGMENT_SIZE_BYTES
),
new File("file_" + i)
);
}
SEGMENTS = new ArrayList<>(SEGMENT_MAP.keySet());
}
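// Registers Guice-injected dependencies, shard spec subtypes, and the tuning config modules on the given
// ObjectMapper so that CompactionTask serde works in these tests.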
private static ObjectMapper setupInjectablesInObjectMapper(ObjectMapper objectMapper)
{
final GuiceAnnotationIntrospector guiceIntrospector = new GuiceAnnotationIntrospector();
objectMapper.setAnnotationIntrospectors(
new AnnotationIntrospectorPair(
guiceIntrospector,
objectMapper.getSerializationConfig().getAnnotationIntrospector()
),
new AnnotationIntrospectorPair(
guiceIntrospector,
objectMapper.getDeserializationConfig().getAnnotationIntrospector()
)
);
GuiceInjectableValues injectableValues = new GuiceInjectableValues(
GuiceInjectors.makeStartupInjectorWithModules(
ImmutableList.of(
binder -> {
binder.bind(AuthorizerMapper.class).toInstance(AuthTestUtils.TEST_AUTHORIZER_MAPPER);
binder.bind(ChatHandlerProvider.class).toInstance(new NoopChatHandlerProvider());
binder.bind(RowIngestionMetersFactory.class).toInstance(TEST_UTILS.getRowIngestionMetersFactory());
binder.bind(CoordinatorClient.class).toInstance(COORDINATOR_CLIENT);
binder.bind(SegmentLoaderFactory.class).toInstance(new SegmentLoaderFactory(null, objectMapper));
binder.bind(AppenderatorsManager.class).toInstance(new TestAppenderatorsManager());
binder.bind(IndexingServiceClient.class).toInstance(INDEXING_SERVICE_CLIENT);
}
)
)
);
objectMapper.setInjectableValues(injectableValues);
objectMapper.registerModule(
new SimpleModule().registerSubtypes(new NamedType(NumberedShardSpec.class, "NumberedShardSpec"))
);
objectMapper.registerModules(new IndexingServiceTuningConfigModule().getJacksonModules());
return objectMapper;
}
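// Returns the dimension names stored in the segment for the given interval, rotating the start index so
// that different segments list their dimensions in different orders.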
private static List<String> findDimensions(int startIndex, Interval segmentInterval)
{
final List<String> dimensions = new ArrayList<>();
dimensions.add(TIMESTAMP_COLUMN);
for (int i = 0; i < 6; i++) {
int postfix = i + startIndex;
postfix = postfix % 6;
dimensions.add("string_dim_" + postfix);
dimensions.add("long_dim_" + postfix);
dimensions.add("float_dim_" + postfix);
dimensions.add("double_dim_" + postfix);
}
dimensions.add(MIXED_TYPE_COLUMN_MAP.get(segmentInterval).getName());
return dimensions;
}
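// Tuning config shared by most tests; maxRowsPerSegment is left null so it is computed automatically.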
private static ParallelIndexTuningConfig createTuningConfig()
{
return new ParallelIndexTuningConfig(
null,
null, // null to compute maxRowsPerSegment automatically
500000,
1000000L,
null,
null,
null,
null,
new IndexSpec(
new RoaringBitmapSerdeFactory(true),
CompressionStrategy.LZ4,
CompressionStrategy.LZF,
LongEncodingStrategy.LONGS
),
null,
null,
true,
false,
5000L,
null,
null,
null,
null,
null,
null,
null,
null,
null,
null,
null,
null
);
}
@Rule
public ExpectedException expectedException = ExpectedException.none();
@Before
public void setup()
{
final IndexIO testIndexIO = new TestIndexIO(OBJECT_MAPPER, SEGMENT_MAP);
toolbox = new TestTaskToolbox(
new TestTaskActionClient(new ArrayList<>(SEGMENT_MAP.keySet())),
testIndexIO,
SEGMENT_MAP
);
segmentLoaderFactory = new SegmentLoaderFactory(testIndexIO, OBJECT_MAPPER);
}
@Test
public void testSerdeWithInterval() throws IOException
{
final Builder builder = new Builder(
DATA_SOURCE,
segmentLoaderFactory,
RETRY_POLICY_FACTORY
);
final CompactionTask task = builder
.inputSpec(
new CompactionIntervalSpec(COMPACTION_INTERVAL, SegmentUtils.hashIds(SEGMENTS))
)
.tuningConfig(createTuningConfig())
.context(ImmutableMap.of("testKey", "testContext"))
.build();
final byte[] bytes = OBJECT_MAPPER.writeValueAsBytes(task);
final CompactionTask fromJson = OBJECT_MAPPER.readValue(bytes, CompactionTask.class);
assertEquals(task, fromJson);
}
@Test
public void testSerdeWithSegments() throws IOException
{
final Builder builder = new Builder(
DATA_SOURCE,
segmentLoaderFactory,
RETRY_POLICY_FACTORY
);
final CompactionTask task = builder
.segments(SEGMENTS)
.tuningConfig(createTuningConfig())
.context(ImmutableMap.of("testKey", "testContext"))
.build();
final byte[] bytes = OBJECT_MAPPER.writeValueAsBytes(task);
final CompactionTask fromJson = OBJECT_MAPPER.readValue(bytes, CompactionTask.class);
assertEquals(task, fromJson);
}
@Test
public void testSerdeWithDimensions() throws IOException
{
final Builder builder = new Builder(
DATA_SOURCE,
segmentLoaderFactory,
RETRY_POLICY_FACTORY
);
final CompactionTask task = builder
.segments(SEGMENTS)
.dimensionsSpec(
new DimensionsSpec(
ImmutableList.of(
new StringDimensionSchema("dim1"),
new StringDimensionSchema("dim2"),
new StringDimensionSchema("dim3")
)
)
)
.tuningConfig(createTuningConfig())
.context(ImmutableMap.of("testKey", "testVal"))
.build();
final byte[] bytes = OBJECT_MAPPER.writeValueAsBytes(task);
final CompactionTask fromJson = OBJECT_MAPPER.readValue(bytes, CompactionTask.class);
assertEquals(task, fromJson);
}
@Test
public void testSerdeWithOldTuningConfigSuccessfullyDeserializeToNewOne() throws IOException
{
final OldCompactionTaskWithAnyTuningConfigType oldTask = new OldCompactionTaskWithAnyTuningConfigType(
null,
null,
DATA_SOURCE,
null,
SEGMENTS,
null,
null,
null,
null,
null,
new IndexTuningConfig(
null,
null, // null to compute maxRowsPerSegment automatically
500000,
1000000L,
null,
null,
null,
null,
null,
new IndexSpec(
new RoaringBitmapSerdeFactory(true),
CompressionStrategy.LZ4,
CompressionStrategy.LZF,
LongEncodingStrategy.LONGS
),
null,
null,
true,
false,
5000L,
null,
null,
null,
null,
null
),
null,
toolbox.getJsonMapper(),
AuthTestUtils.TEST_AUTHORIZER_MAPPER,
toolbox.getChatHandlerProvider(),
toolbox.getRowIngestionMetersFactory(),
COORDINATOR_CLIENT,
segmentLoaderFactory,
RETRY_POLICY_FACTORY,
toolbox.getAppenderatorsManager()
);
final Builder builder = new Builder(
DATA_SOURCE,
segmentLoaderFactory,
RETRY_POLICY_FACTORY
);
final CompactionTask expectedFromJson = builder
.segments(SEGMENTS)
.tuningConfig(CompactionTask.getTuningConfig(oldTask.getTuningConfig()))
.build();
final ObjectMapper mapper = new DefaultObjectMapper((DefaultObjectMapper) OBJECT_MAPPER);
mapper.registerSubtypes(new NamedType(OldCompactionTaskWithAnyTuningConfigType.class, "compact"));
final byte[] bytes = mapper.writeValueAsBytes(oldTask);
final CompactionTask fromJson = mapper.readValue(bytes, CompactionTask.class);
assertEquals(expectedFromJson, fromJson);
}
@Test
public void testSerdeWithUnknownTuningConfigThrowingError() throws IOException
{
final OldCompactionTaskWithAnyTuningConfigType taskWithUnknownTuningConfig =
new OldCompactionTaskWithAnyTuningConfigType(
null,
null,
DATA_SOURCE,
null,
SEGMENTS,
null,
null,
null,
null,
null,
RealtimeTuningConfig.makeDefaultTuningConfig(null),
null,
OBJECT_MAPPER,
AuthTestUtils.TEST_AUTHORIZER_MAPPER,
null,
toolbox.getRowIngestionMetersFactory(),
COORDINATOR_CLIENT,
segmentLoaderFactory,
RETRY_POLICY_FACTORY,
toolbox.getAppenderatorsManager()
);
final ObjectMapper mapper = new DefaultObjectMapper((DefaultObjectMapper) OBJECT_MAPPER);
mapper.registerSubtypes(
new NamedType(OldCompactionTaskWithAnyTuningConfigType.class, "compact"),
new NamedType(RealtimeTuningConfig.class, "realtime")
);
final byte[] bytes = mapper.writeValueAsBytes(taskWithUnknownTuningConfig);
expectedException.expect(ValueInstantiationException.class);
expectedException.expectCause(CoreMatchers.instanceOf(IllegalStateException.class));
expectedException.expectMessage(
"Unknown tuningConfig type: [org.apache.druid.segment.indexing.RealtimeTuningConfig]"
);
mapper.readValue(bytes, CompactionTask.class);
}
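// Field-by-field comparison of two CompactionTasks, used by the serde tests above.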
private static void assertEquals(CompactionTask expected, CompactionTask actual)
{
Assert.assertEquals(expected.getType(), actual.getType());
Assert.assertEquals(expected.getDataSource(), actual.getDataSource());
Assert.assertEquals(expected.getIoConfig(), actual.getIoConfig());
Assert.assertEquals(expected.getDimensionsSpec(), actual.getDimensionsSpec());
Assert.assertArrayEquals(expected.getMetricsSpec(), actual.getMetricsSpec());
Assert.assertEquals(expected.getTuningConfig(), actual.getTuningConfig());
Assert.assertEquals(expected.getContext(), actual.getContext());
}
@Test
public void testCreateIngestionSchema() throws IOException, SegmentLoadingException
{
final List<ParallelIndexIngestionSpec> ingestionSpecs = CompactionTask.createIngestionSchema(
toolbox,
LockGranularity.TIME_CHUNK,
new SegmentProvider(DATA_SOURCE, new CompactionIntervalSpec(COMPACTION_INTERVAL, null)),
new PartitionConfigurationManager(TUNING_CONFIG),
null,
null,
null,
COORDINATOR_CLIENT,
segmentLoaderFactory,
RETRY_POLICY_FACTORY
);
final List<DimensionsSpec> expectedDimensionsSpec = getExpectedDimensionsSpecForAutoGeneration();
ingestionSpecs.sort(
(s1, s2) -> Comparators.intervalsByStartThenEnd().compare(
s1.getDataSchema().getGranularitySpec().inputIntervals().get(0),
s2.getDataSchema().getGranularitySpec().inputIntervals().get(0)
)
);
Assert.assertEquals(6, ingestionSpecs.size());
assertIngestionSchema(
ingestionSpecs,
expectedDimensionsSpec,
AGGREGATORS,
SEGMENT_INTERVALS,
Granularities.MONTH
);
}
@Test
public void testCreateIngestionSchemaWithTargetPartitionSize() throws IOException, SegmentLoadingException
{
final ParallelIndexTuningConfig tuningConfig = new ParallelIndexTuningConfig(
100000,
null,
500000,
1000000L,
null,
null,
null,
null,
new IndexSpec(
new RoaringBitmapSerdeFactory(true),
CompressionStrategy.LZ4,
CompressionStrategy.LZF,
LongEncodingStrategy.LONGS
),
null,
null,
true,
false,
null,
null,
null,
10,
null,
null,
null,
null,
null,
null,
null,
null,
null
);
final List<ParallelIndexIngestionSpec> ingestionSpecs = CompactionTask.createIngestionSchema(
toolbox,
LockGranularity.TIME_CHUNK,
new SegmentProvider(DATA_SOURCE, new CompactionIntervalSpec(COMPACTION_INTERVAL, null)),
new PartitionConfigurationManager(tuningConfig),
null,
null,
null,
COORDINATOR_CLIENT,
segmentLoaderFactory,
RETRY_POLICY_FACTORY
);
final List<DimensionsSpec> expectedDimensionsSpec = getExpectedDimensionsSpecForAutoGeneration();
ingestionSpecs.sort(
(s1, s2) -> Comparators.intervalsByStartThenEnd().compare(
s1.getDataSchema().getGranularitySpec().inputIntervals().get(0),
s2.getDataSchema().getGranularitySpec().inputIntervals().get(0)
)
);
Assert.assertEquals(6, ingestionSpecs.size());
assertIngestionSchema(
ingestionSpecs,
expectedDimensionsSpec,
AGGREGATORS,
SEGMENT_INTERVALS,
tuningConfig,
Granularities.MONTH
);
}
@Test
public void testCreateIngestionSchemaWithMaxTotalRows() throws IOException, SegmentLoadingException
{
final ParallelIndexTuningConfig tuningConfig = new ParallelIndexTuningConfig(
null,
null,
500000,
1000000L,
1000000L,
null,
null,
null,
new IndexSpec(
new RoaringBitmapSerdeFactory(true),
CompressionStrategy.LZ4,
CompressionStrategy.LZF,
LongEncodingStrategy.LONGS
),
null,
null,
false,
false,
5000L,
null,
null,
null,
null,
null,
null,
null,
null,
null,
null,
null,
null
);
final List<ParallelIndexIngestionSpec> ingestionSpecs = CompactionTask.createIngestionSchema(
toolbox,
LockGranularity.TIME_CHUNK,
new SegmentProvider(DATA_SOURCE, new CompactionIntervalSpec(COMPACTION_INTERVAL, null)),
new PartitionConfigurationManager(tuningConfig),
null,
null,
null,
COORDINATOR_CLIENT,
segmentLoaderFactory,
RETRY_POLICY_FACTORY
);
final List<DimensionsSpec> expectedDimensionsSpec = getExpectedDimensionsSpecForAutoGeneration();
ingestionSpecs.sort(
(s1, s2) -> Comparators.intervalsByStartThenEnd().compare(
s1.getDataSchema().getGranularitySpec().inputIntervals().get(0),
s2.getDataSchema().getGranularitySpec().inputIntervals().get(0)
)
);
Assert.assertEquals(6, ingestionSpecs.size());
assertIngestionSchema(
ingestionSpecs,
expectedDimensionsSpec,
AGGREGATORS,
SEGMENT_INTERVALS,
tuningConfig,
Granularities.MONTH
);
}
@Test
public void testCreateIngestionSchemaWithNumShards() throws IOException, SegmentLoadingException
{
final ParallelIndexTuningConfig tuningConfig = new ParallelIndexTuningConfig(
null,
null,
500000,
1000000L,
null,
null,
null,
new HashedPartitionsSpec(null, 3, null),
new IndexSpec(
new RoaringBitmapSerdeFactory(true),
CompressionStrategy.LZ4,
CompressionStrategy.LZF,
LongEncodingStrategy.LONGS
),
null,
null,
true,
false,
5000L,
null,
null,
10,
null,
null,
null,
null,
null,
null,
null,
null,
null
);
final List<ParallelIndexIngestionSpec> ingestionSpecs = CompactionTask.createIngestionSchema(
toolbox,
LockGranularity.TIME_CHUNK,
new SegmentProvider(DATA_SOURCE, new CompactionIntervalSpec(COMPACTION_INTERVAL, null)),
new PartitionConfigurationManager(tuningConfig),
null,
null,
null,
COORDINATOR_CLIENT,
segmentLoaderFactory,
RETRY_POLICY_FACTORY
);
final List<DimensionsSpec> expectedDimensionsSpec = getExpectedDimensionsSpecForAutoGeneration();
ingestionSpecs.sort(
(s1, s2) -> Comparators.intervalsByStartThenEnd().compare(
s1.getDataSchema().getGranularitySpec().inputIntervals().get(0),
s2.getDataSchema().getGranularitySpec().inputIntervals().get(0)
)
);
Assert.assertEquals(6, ingestionSpecs.size());
assertIngestionSchema(
ingestionSpecs,
expectedDimensionsSpec,
AGGREGATORS,
SEGMENT_INTERVALS,
tuningConfig,
Granularities.MONTH
);
}
@Test
public void testCreateIngestionSchemaWithCustomDimensionsSpec() throws IOException, SegmentLoadingException
{
final DimensionsSpec customSpec = new DimensionsSpec(
Lists.newArrayList(
new LongDimensionSchema("timestamp"),
new StringDimensionSchema("string_dim_0"),
new StringDimensionSchema("string_dim_1"),
new StringDimensionSchema("string_dim_2"),
new StringDimensionSchema("string_dim_3"),
new StringDimensionSchema("string_dim_4"),
new LongDimensionSchema("long_dim_0"),
new LongDimensionSchema("long_dim_1"),
new LongDimensionSchema("long_dim_2"),
new LongDimensionSchema("long_dim_3"),
new LongDimensionSchema("long_dim_4"),
new FloatDimensionSchema("float_dim_0"),
new FloatDimensionSchema("float_dim_1"),
new FloatDimensionSchema("float_dim_2"),
new FloatDimensionSchema("float_dim_3"),
new FloatDimensionSchema("float_dim_4"),
new DoubleDimensionSchema("double_dim_0"),
new DoubleDimensionSchema("double_dim_1"),
new DoubleDimensionSchema("double_dim_2"),
new DoubleDimensionSchema("double_dim_3"),
new DoubleDimensionSchema("double_dim_4"),
new StringDimensionSchema(MIXED_TYPE_COLUMN)
)
);
final List<ParallelIndexIngestionSpec> ingestionSpecs = CompactionTask.createIngestionSchema(
toolbox,
LockGranularity.TIME_CHUNK,
new SegmentProvider(DATA_SOURCE, new CompactionIntervalSpec(COMPACTION_INTERVAL, null)),
new PartitionConfigurationManager(TUNING_CONFIG),
customSpec,
null,
null,
COORDINATOR_CLIENT,
segmentLoaderFactory,
RETRY_POLICY_FACTORY
);
ingestionSpecs.sort(
(s1, s2) -> Comparators.intervalsByStartThenEnd().compare(
s1.getDataSchema().getGranularitySpec().inputIntervals().get(0),
s2.getDataSchema().getGranularitySpec().inputIntervals().get(0)
)
);
Assert.assertEquals(6, ingestionSpecs.size());
final List<DimensionsSpec> dimensionsSpecs = new ArrayList<>(6);
IntStream.range(0, 6).forEach(i -> dimensionsSpecs.add(customSpec));
assertIngestionSchema(
ingestionSpecs,
dimensionsSpecs,
AGGREGATORS,
SEGMENT_INTERVALS,
Granularities.MONTH
);
}
@Test
public void testCreateIngestionSchemaWithCustomMetricsSpec() throws IOException, SegmentLoadingException
{
final AggregatorFactory[] customMetricsSpec = new AggregatorFactory[]{
new CountAggregatorFactory("custom_count"),
new LongSumAggregatorFactory("custom_long_sum", "agg_1"),
new FloatMinAggregatorFactory("custom_float_min", "agg_3"),
new DoubleMaxAggregatorFactory("custom_double_max", "agg_4")
};
final List<ParallelIndexIngestionSpec> ingestionSpecs = CompactionTask.createIngestionSchema(
toolbox,
LockGranularity.TIME_CHUNK,
new SegmentProvider(DATA_SOURCE, new CompactionIntervalSpec(COMPACTION_INTERVAL, null)),
new PartitionConfigurationManager(TUNING_CONFIG),
null,
customMetricsSpec,
null,
COORDINATOR_CLIENT,
segmentLoaderFactory,
RETRY_POLICY_FACTORY
);
final List<DimensionsSpec> expectedDimensionsSpec = getExpectedDimensionsSpecForAutoGeneration();
ingestionSpecs.sort(
(s1, s2) -> Comparators.intervalsByStartThenEnd().compare(
s1.getDataSchema().getGranularitySpec().inputIntervals().get(0),
s2.getDataSchema().getGranularitySpec().inputIntervals().get(0)
)
);
Assert.assertEquals(6, ingestionSpecs.size());
assertIngestionSchema(
ingestionSpecs,
expectedDimensionsSpec,
Arrays.asList(customMetricsSpec),
SEGMENT_INTERVALS,
Granularities.MONTH
);
}
@Test
public void testCreateIngestionSchemaWithCustomSegments() throws IOException, SegmentLoadingException
{
final List<ParallelIndexIngestionSpec> ingestionSpecs = CompactionTask.createIngestionSchema(
toolbox,
LockGranularity.TIME_CHUNK,
new SegmentProvider(DATA_SOURCE, SpecificSegmentsSpec.fromSegments(SEGMENTS)),
new PartitionConfigurationManager(TUNING_CONFIG),
null,
null,
null,
COORDINATOR_CLIENT,
segmentLoaderFactory,
RETRY_POLICY_FACTORY
);
final List<DimensionsSpec> expectedDimensionsSpec = getExpectedDimensionsSpecForAutoGeneration();
ingestionSpecs.sort(
(s1, s2) -> Comparators.intervalsByStartThenEnd().compare(
s1.getDataSchema().getGranularitySpec().inputIntervals().get(0),
s2.getDataSchema().getGranularitySpec().inputIntervals().get(0)
)
);
Assert.assertEquals(6, ingestionSpecs.size());
assertIngestionSchema(
ingestionSpecs,
expectedDimensionsSpec,
AGGREGATORS,
SEGMENT_INTERVALS,
Granularities.MONTH
);
}
@Test
public void testCreateIngestionSchemaWithDifferentSegmentSet() throws IOException, SegmentLoadingException
{
expectedException.expect(CoreMatchers.instanceOf(IllegalStateException.class));
expectedException.expectMessage(CoreMatchers.containsString("are different from the current used segments"));
final List<DataSegment> segments = new ArrayList<>(SEGMENTS);
Collections.sort(segments);
// Remove one segment in the middle
segments.remove(segments.size() / 2);
CompactionTask.createIngestionSchema(
toolbox,
LockGranularity.TIME_CHUNK,
new SegmentProvider(DATA_SOURCE, SpecificSegmentsSpec.fromSegments(segments)),
new PartitionConfigurationManager(TUNING_CONFIG),
null,
null,
null,
COORDINATOR_CLIENT,
segmentLoaderFactory,
RETRY_POLICY_FACTORY
);
}
@Test
public void testMissingMetadata() throws IOException, SegmentLoadingException
{
expectedException.expect(RuntimeException.class);
expectedException.expectMessage(CoreMatchers.startsWith("Index metadata doesn't exist for segment"));
final TestIndexIO indexIO = (TestIndexIO) toolbox.getIndexIO();
indexIO.removeMetadata(Iterables.getFirst(indexIO.getQueryableIndexMap().keySet(), null));
final List<DataSegment> segments = new ArrayList<>(SEGMENTS);
CompactionTask.createIngestionSchema(
toolbox,
LockGranularity.TIME_CHUNK,
new SegmentProvider(DATA_SOURCE, SpecificSegmentsSpec.fromSegments(segments)),
new PartitionConfigurationManager(TUNING_CONFIG),
null,
null,
null,
COORDINATOR_CLIENT,
segmentLoaderFactory,
RETRY_POLICY_FACTORY
);
}
@Test
public void testEmptyInterval()
{
expectedException.expect(IllegalArgumentException.class);
expectedException.expectMessage(CoreMatchers.containsString("must specify a nonempty interval"));
final Builder builder = new Builder(
DATA_SOURCE,
segmentLoaderFactory,
RETRY_POLICY_FACTORY
);
@SuppressWarnings("unused") // the expected IllegalArgumentException is thrown from build()
final CompactionTask task = builder
.interval(Intervals.of("2000-01-01/2000-01-01"))
.build();
}
@Test
public void testSegmentGranularity() throws IOException, SegmentLoadingException
{
final List<ParallelIndexIngestionSpec> ingestionSpecs = CompactionTask.createIngestionSchema(
toolbox,
LockGranularity.TIME_CHUNK,
new SegmentProvider(DATA_SOURCE, new CompactionIntervalSpec(COMPACTION_INTERVAL, null)),
new PartitionConfigurationManager(TUNING_CONFIG),
null,
null,
new PeriodGranularity(Period.months(3), null, null),
COORDINATOR_CLIENT,
segmentLoaderFactory,
RETRY_POLICY_FACTORY
);
final List<DimensionsSpec> expectedDimensionsSpec = ImmutableList.of(
new DimensionsSpec(getDimensionSchema(new DoubleDimensionSchema("string_to_double")))
);
ingestionSpecs.sort(
(s1, s2) -> Comparators.intervalsByStartThenEnd().compare(
s1.getDataSchema().getGranularitySpec().inputIntervals().get(0),
s2.getDataSchema().getGranularitySpec().inputIntervals().get(0)
)
);
Assert.assertEquals(1, ingestionSpecs.size());
assertIngestionSchema(
ingestionSpecs,
expectedDimensionsSpec,
AGGREGATORS,
Collections.singletonList(COMPACTION_INTERVAL),
new PeriodGranularity(Period.months(3), null, null)
);
}
@Test
public void testNullSegmentGranularity() throws IOException, SegmentLoadingException
{
final List<ParallelIndexIngestionSpec> ingestionSpecs = CompactionTask.createIngestionSchema(
toolbox,
LockGranularity.TIME_CHUNK,
new SegmentProvider(DATA_SOURCE, new CompactionIntervalSpec(COMPACTION_INTERVAL, null)),
new PartitionConfigurationManager(TUNING_CONFIG),
null,
null,
null,
COORDINATOR_CLIENT,
segmentLoaderFactory,
RETRY_POLICY_FACTORY
);
final List<DimensionsSpec> expectedDimensionsSpec = getExpectedDimensionsSpecForAutoGeneration();
ingestionSpecs.sort(
(s1, s2) -> Comparators.intervalsByStartThenEnd().compare(
s1.getDataSchema().getGranularitySpec().inputIntervals().get(0),
s2.getDataSchema().getGranularitySpec().inputIntervals().get(0)
)
);
Assert.assertEquals(6, ingestionSpecs.size());
assertIngestionSchema(
ingestionSpecs,
expectedDimensionsSpec,
AGGREGATORS,
SEGMENT_INTERVALS,
Granularities.MONTH
);
}
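// One expected DimensionsSpec per compacted month: the mixed-type column is a string for the first four
// months and a double for the last two, matching MIXED_TYPE_COLUMN_MAP.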
private static List<DimensionsSpec> getExpectedDimensionsSpecForAutoGeneration()
{
return ImmutableList.of(
new DimensionsSpec(getDimensionSchema(new StringDimensionSchema("string_to_double"))),
new DimensionsSpec(getDimensionSchema(new StringDimensionSchema("string_to_double"))),
new DimensionsSpec(getDimensionSchema(new StringDimensionSchema("string_to_double"))),
new DimensionsSpec(getDimensionSchema(new StringDimensionSchema("string_to_double"))),
new DimensionsSpec(getDimensionSchema(new DoubleDimensionSchema("string_to_double"))),
new DimensionsSpec(getDimensionSchema(new DoubleDimensionSchema("string_to_double")))
);
}
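// The full set of dimension schemas expected after compaction; the schema of the mixed-type column is
// supplied by the caller.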
private static List<DimensionSchema> getDimensionSchema(DimensionSchema mixedTypeColumn)
{
return Lists.newArrayList(
new LongDimensionSchema("timestamp"),
new StringDimensionSchema("string_dim_4"),
new LongDimensionSchema("long_dim_4"),
new FloatDimensionSchema("float_dim_4"),
new DoubleDimensionSchema("double_dim_4"),
new StringDimensionSchema("string_dim_0"),
new LongDimensionSchema("long_dim_0"),
new FloatDimensionSchema("float_dim_0"),
new DoubleDimensionSchema("double_dim_0"),
new StringDimensionSchema("string_dim_1"),
new LongDimensionSchema("long_dim_1"),
new FloatDimensionSchema("float_dim_1"),
new DoubleDimensionSchema("double_dim_1"),
new StringDimensionSchema("string_dim_2"),
new LongDimensionSchema("long_dim_2"),
new FloatDimensionSchema("float_dim_2"),
new DoubleDimensionSchema("double_dim_2"),
new StringDimensionSchema("string_dim_3"),
new LongDimensionSchema("long_dim_3"),
new FloatDimensionSchema("float_dim_3"),
new DoubleDimensionSchema("double_dim_3"),
new StringDimensionSchema("string_dim_5"),
new LongDimensionSchema("long_dim_5"),
new FloatDimensionSchema("float_dim_5"),
new DoubleDimensionSchema("double_dim_5"),
mixedTypeColumn
);
}
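// Overload asserting against the tuning config expected when the default TUNING_CONFIG is used and the
// partitions spec is computed automatically.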
private void assertIngestionSchema(
List<ParallelIndexIngestionSpec> ingestionSchemas,
List<DimensionsSpec> expectedDimensionsSpecs,
List<AggregatorFactory> expectedMetricsSpec,
List<Interval> expectedSegmentIntervals,
Granularity expectedSegmentGranularity
)
{
assertIngestionSchema(
ingestionSchemas,
expectedDimensionsSpecs,
expectedMetricsSpec,
expectedSegmentIntervals,
new ParallelIndexTuningConfig(
null,
null,
500000,
1000000L,
Long.MAX_VALUE,
null,
null,
new HashedPartitionsSpec(5000000, null, null), // automatically computed targetPartitionSize
new IndexSpec(
new RoaringBitmapSerdeFactory(true),
CompressionStrategy.LZ4,
CompressionStrategy.LZF,
LongEncodingStrategy.LONGS
),
null,
null,
true,
false,
5000L,
null,
null,
null,
null,
null,
null,
null,
null,
null,
null,
null,
null
),
expectedSegmentGranularity
);
}
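// Verifies each generated ingestion spec: the dataSchema (dimensions, metrics, granularity), the ioConfig
// (a DruidInputSource over the expected interval), and the tuningConfig.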
private void assertIngestionSchema(
List<ParallelIndexIngestionSpec> ingestionSchemas,
List<DimensionsSpec> expectedDimensionsSpecs,
List<AggregatorFactory> expectedMetricsSpec,
List<Interval> expectedSegmentIntervals,
ParallelIndexTuningConfig expectedTuningConfig,
Granularity expectedSegmentGranularity
)
{
Preconditions.checkArgument(
ingestionSchemas.size() == expectedDimensionsSpecs.size(),
"ingesionSchemas.size()[%s] should be same with expectedDimensionsSpecs.size()[%s]",
ingestionSchemas.size(),
expectedDimensionsSpecs.size()
);
for (int i = 0; i < ingestionSchemas.size(); i++) {
final ParallelIndexIngestionSpec ingestionSchema = ingestionSchemas.get(i);
final DimensionsSpec expectedDimensionsSpec = expectedDimensionsSpecs.get(i);
// assert dataSchema
final DataSchema dataSchema = ingestionSchema.getDataSchema();
Assert.assertEquals(DATA_SOURCE, dataSchema.getDataSource());
Assert.assertEquals(new TimestampSpec(null, null, null), dataSchema.getTimestampSpec());
Assert.assertEquals(
new HashSet<>(expectedDimensionsSpec.getDimensions()),
new HashSet<>(dataSchema.getDimensionsSpec().getDimensions())
);
// metrics
final List<AggregatorFactory> expectedAggregators = expectedMetricsSpec
.stream()
.map(AggregatorFactory::getCombiningFactory)
.collect(Collectors.toList());
Assert.assertEquals(expectedAggregators, Arrays.asList(dataSchema.getAggregators()));
Assert.assertEquals(
new UniformGranularitySpec(
expectedSegmentGranularity,
Granularities.NONE,
false,
Collections.singletonList(expectedSegmentIntervals.get(i))
),
dataSchema.getGranularitySpec()
);
// assert ioConfig
final ParallelIndexIOConfig ioConfig = ingestionSchema.getIOConfig();
Assert.assertFalse(ioConfig.isAppendToExisting());
final InputSource inputSource = ioConfig.getInputSource();
Assert.assertTrue(inputSource instanceof DruidInputSource);
final DruidInputSource druidInputSource = (DruidInputSource) inputSource;
Assert.assertEquals(DATA_SOURCE, druidInputSource.getDataSource());
Assert.assertEquals(expectedSegmentIntervals.get(i), druidInputSource.getInterval());
Assert.assertNull(druidInputSource.getDimFilter());
Assert.assertEquals(
new HashSet<>(expectedDimensionsSpec.getDimensionNames()),
new HashSet<>(druidInputSource.getDimensions())
);
// assert tuningConfig
Assert.assertEquals(expectedTuningConfig, ingestionSchema.getTuningConfig());
}
}
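// Coordinator client stub that reports every fixture segment as used, regardless of the requested intervals.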
private static class TestCoordinatorClient extends CoordinatorClient
{
private final Map<DataSegment, File> segmentMap;
TestCoordinatorClient(Map<DataSegment, File> segmentMap)
{
super(null, null);
this.segmentMap = segmentMap;
}
@Override
public Collection<DataSegment> fetchUsedSegmentsInDataSourceForIntervals(
String dataSource,
List<Interval> intervals
)
{
return ImmutableSet.copyOf(segmentMap.keySet());
}
}
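// TaskToolbox stub wired with the test IndexIO and action client; fetchSegments serves files from an
// in-memory map instead of deep storage.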
private static class TestTaskToolbox extends TaskToolbox
{
private final Map<DataSegment, File> segmentFileMap;
TestTaskToolbox(
TaskActionClient taskActionClient,
IndexIO indexIO,
Map<DataSegment, File> segmentFileMap
)
{
super(
null,
null,
taskActionClient,
null,
null,
null,
null,
null,
null,
null,
null,
null,
null,
NoopJoinableFactory.INSTANCE,
null,
null,
null,
null,
indexIO,
null,
null,
null,
new IndexMergerV9(OBJECT_MAPPER, indexIO, OffHeapMemorySegmentWriteOutMediumFactory.instance()),
null,
null,
null,
null,
new NoopTestTaskReportFileWriter(),
null,
AuthTestUtils.TEST_AUTHORIZER_MAPPER,
new NoopChatHandlerProvider(),
TEST_UTILS.getRowIngestionMetersFactory(),
new TestAppenderatorsManager(),
INDEXING_SERVICE_CLIENT,
COORDINATOR_CLIENT,
null,
null
);
this.segmentFileMap = segmentFileMap;
}
@Override
public Map<DataSegment, File> fetchSegments(List<DataSegment> segments)
{
final Map<DataSegment, File> submap = Maps.newHashMapWithExpectedSize(segments.size());
for (DataSegment segment : segments) {
final File file = Preconditions.checkNotNull(segmentFileMap.get(segment));
submap.put(segment, file);
}
return submap;
}
}
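// Task action client stub that answers RetrieveUsedSegmentsAction with the fixture segments and rejects
// any other action.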
private static class TestTaskActionClient implements TaskActionClient
{
private final List<DataSegment> segments;
TestTaskActionClient(List<DataSegment> segments)
{
this.segments = segments;
}
@Override
public <RetType> RetType submit(TaskAction<RetType> taskAction)
{
if (!(taskAction instanceof RetrieveUsedSegmentsAction)) {
throw new ISE("action[%s] is not supported", taskAction);
}
return (RetType) segments;
}
}
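// IndexIO stub: loadIndex returns a fabricated SimpleQueryableIndex per segment file whose column
// capabilities match the fixture dimensions and aggregators, so schemas can be inferred without real
// segment data.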
private static class TestIndexIO extends IndexIO
{
private final Map<File, QueryableIndex> queryableIndexMap;
TestIndexIO(
ObjectMapper mapper,
Map<DataSegment, File> segmentFileMap
)
{
super(mapper, () -> 0);
queryableIndexMap = Maps.newHashMapWithExpectedSize(segmentFileMap.size());
for (Entry<DataSegment, File> entry : segmentFileMap.entrySet()) {
final DataSegment segment = entry.getKey();
final List<String> columnNames = new ArrayList<>(segment.getDimensions().size() + segment.getMetrics().size());
columnNames.add(ColumnHolder.TIME_COLUMN_NAME);
columnNames.addAll(segment.getDimensions());
columnNames.addAll(segment.getMetrics());
final Map<String, Supplier<ColumnHolder>> columnMap = Maps.newHashMapWithExpectedSize(columnNames.size());
final List<AggregatorFactory> aggregatorFactories = new ArrayList<>(segment.getMetrics().size());
for (String columnName : columnNames) {
if (MIXED_TYPE_COLUMN.equals(columnName)) {
ColumnHolder columnHolder = createColumn(MIXED_TYPE_COLUMN_MAP.get(segment.getInterval()));
columnMap.put(columnName, () -> columnHolder);
} else if (DIMENSIONS.containsKey(columnName)) {
ColumnHolder columnHolder = createColumn(DIMENSIONS.get(columnName));
columnMap.put(columnName, () -> columnHolder);
} else {
final Optional<AggregatorFactory> maybeMetric = AGGREGATORS.stream()
.filter(agg -> agg.getName().equals(columnName))
.findAny();
if (maybeMetric.isPresent()) {
ColumnHolder columnHolder = createColumn(maybeMetric.get());
columnMap.put(columnName, () -> columnHolder);
aggregatorFactories.add(maybeMetric.get());
}
}
}
final Metadata metadata = new Metadata(
null,
aggregatorFactories.toArray(new AggregatorFactory[0]),
null,
null,
null
);
queryableIndexMap.put(
entry.getValue(),
new SimpleQueryableIndex(
segment.getInterval(),
new ListIndexed<>(segment.getDimensions()),
null,
columnMap,
null,
metadata,
false
)
);
}
}
@Override
public QueryableIndex loadIndex(File file)
{
return queryableIndexMap.get(file);
}
void removeMetadata(File file)
{
final SimpleQueryableIndex index = (SimpleQueryableIndex) queryableIndexMap.get(file);
if (index != null) {
queryableIndexMap.put(
file,
new SimpleQueryableIndex(
index.getDataInterval(),
index.getColumnNames(),
index.getAvailableDimensions(),
index.getBitmapFactoryForDimensions(),
index.getColumns(),
index.getFileMapper(),
null,
index::getDimensionHandlers
)
);
}
}
Map<File, QueryableIndex> getQueryableIndexMap()
{
return queryableIndexMap;
}
}
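// Builds fake columns whose capabilities reflect the dimension or aggregator value type.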
private static ColumnHolder createColumn(DimensionSchema dimensionSchema)
{
return new TestColumn(dimensionSchema.getValueType());
}
private static ColumnHolder createColumn(AggregatorFactory aggregatorFactory)
{
return new TestColumn(aggregatorFactory.getType());
}
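// Minimal ColumnHolder exposing only type-based capabilities and a fixed row count; everything else is null.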
private static class TestColumn implements ColumnHolder
{
private final ColumnCapabilities columnCapabilities;
TestColumn(ValueType type)
{
columnCapabilities = new ColumnCapabilitiesImpl()
.setType(type)
.setDictionaryEncoded(type == ValueType.STRING) // fake value so that string columns appear dictionary-encoded
.setHasBitmapIndexes(type == ValueType.STRING)
.setHasSpatialIndexes(false)
.setHasMultipleValues(false);
}
@Override
public ColumnCapabilities getCapabilities()
{
return columnCapabilities;
}
@Override
public int getLength()
{
return NUM_ROWS_PER_SEGMENT;
}
@Override
public BaseColumn getColumn()
{
return null;
}
@Override
public SettableColumnValueSelector makeNewSettableColumnValueSelector()
{
return null;
}
@Override
public BitmapIndex getBitmapIndex()
{
return null;
}
@Override
public SpatialIndex getSpatialIndex()
{
return null;
}
}
/**
* The compaction task spec as of 0.16.0, except for the tuningConfig.
* The original spec accepts only {@link IndexTuningConfig}, but this class accepts any type of tuningConfig
* for testing.
*/
private static class OldCompactionTaskWithAnyTuningConfigType extends AbstractTask
{
private final Interval interval;
private final List<DataSegment> segments;
@Nullable
private final DimensionsSpec dimensionsSpec;
@Nullable
private final AggregatorFactory[] metricsSpec;
@Nullable
private final Granularity segmentGranularity;
@Nullable
private final Long targetCompactionSizeBytes;
@Nullable
private final TuningConfig tuningConfig;
@JsonCreator
public OldCompactionTaskWithAnyTuningConfigType(
@JsonProperty("id") final String id,
@JsonProperty("resource") final TaskResource taskResource,
@JsonProperty("dataSource") final String dataSource,
@JsonProperty("interval") @Nullable final Interval interval,
@JsonProperty("segments") @Nullable final List<DataSegment> segments,
@JsonProperty("dimensions") @Nullable final DimensionsSpec dimensions,
@JsonProperty("dimensionsSpec") @Nullable final DimensionsSpec dimensionsSpec,
@JsonProperty("metricsSpec") @Nullable final AggregatorFactory[] metricsSpec,
@JsonProperty("segmentGranularity") @Nullable final Granularity segmentGranularity,
@JsonProperty("targetCompactionSizeBytes") @Nullable final Long targetCompactionSizeBytes,
@JsonProperty("tuningConfig") @Nullable final TuningConfig tuningConfig,
@JsonProperty("context") @Nullable final Map<String, Object> context,
@JacksonInject ObjectMapper jsonMapper,
@JacksonInject AuthorizerMapper authorizerMapper,
@JacksonInject ChatHandlerProvider chatHandlerProvider,
@JacksonInject RowIngestionMetersFactory rowIngestionMetersFactory,
@JacksonInject CoordinatorClient coordinatorClient,
@JacksonInject SegmentLoaderFactory segmentLoaderFactory,
@JacksonInject RetryPolicyFactory retryPolicyFactory,
@JacksonInject AppenderatorsManager appenderatorsManager
)
{
super(getOrMakeId(id, "compact", dataSource), null, taskResource, dataSource, context);
this.interval = interval;
this.segments = segments;
this.dimensionsSpec = dimensionsSpec;
this.metricsSpec = metricsSpec;
this.segmentGranularity = segmentGranularity;
this.targetCompactionSizeBytes = targetCompactionSizeBytes;
this.tuningConfig = tuningConfig;
}
@Override
public String getType()
{
return "compact";
}
@JsonProperty
public Interval getInterval()
{
return interval;
}
@JsonProperty
public List<DataSegment> getSegments()
{
return segments;
}
@JsonProperty
@Nullable
public DimensionsSpec getDimensionsSpec()
{
return dimensionsSpec;
}
@JsonProperty
@Nullable
public AggregatorFactory[] getMetricsSpec()
{
return metricsSpec;
}
@JsonProperty
@Nullable
public Granularity getSegmentGranularity()
{
return segmentGranularity;
}
@Nullable
@JsonProperty
public Long getTargetCompactionSizeBytes()
{
return targetCompactionSizeBytes;
}
@Nullable
@JsonProperty
public TuningConfig getTuningConfig()
{
return tuningConfig;
}
@Override
public boolean isReady(TaskActionClient taskActionClient)
{
throw new UnsupportedOperationException();
}
@Override
public void stopGracefully(TaskConfig taskConfig)
{
throw new UnsupportedOperationException();
}
@Override
public TaskStatus run(TaskToolbox toolbox)
{
throw new UnsupportedOperationException();
}
}
}