/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.druid.indexing.common.task.batch.parallel;

import com.fasterxml.jackson.core.JsonProcessingException;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.google.common.collect.ImmutableList;
import com.google.common.collect.ImmutableMap;
import org.apache.druid.client.indexing.IndexingServiceClient;
import org.apache.druid.data.input.InputFormat;
import org.apache.druid.data.input.InputSource;
import org.apache.druid.data.input.impl.DimensionsSpec;
import org.apache.druid.data.input.impl.JsonInputFormat;
import org.apache.druid.data.input.impl.TimestampSpec;
import org.apache.druid.indexer.partitions.HashedPartitionsSpec;
import org.apache.druid.indexer.partitions.PartitionsSpec;
import org.apache.druid.indexer.partitions.SingleDimensionPartitionsSpec;
import org.apache.druid.indexing.common.TestUtils;
import org.apache.druid.indexing.common.task.IndexTaskClientFactory;
import org.apache.druid.indexing.common.task.TaskResource;
import org.apache.druid.java.util.common.Intervals;
import org.apache.druid.java.util.common.granularity.Granularities;
import org.apache.druid.query.aggregation.AggregatorFactory;
import org.apache.druid.segment.IndexIO;
import org.apache.druid.segment.indexing.DataSchema;
import org.apache.druid.segment.indexing.granularity.ArbitraryGranularitySpec;
import org.apache.druid.segment.indexing.granularity.GranularitySpec;
import org.apache.druid.segment.realtime.appenderator.AppenderatorsManager;
import org.apache.druid.segment.transform.TransformSpec;
import org.apache.druid.timeline.partition.BuildingHashBasedNumberedShardSpec;
import org.apache.druid.timeline.partition.HashPartitionFunction;
import org.easymock.EasyMock;
import org.joda.time.Duration;
import org.joda.time.Interval;
import javax.annotation.Nullable;
import java.io.File;
import java.util.Collections;
import java.util.List;
import java.util.Map;

/**
* Helper for creating objects for testing parallel indexing.
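*
* <p>An illustrative sketch of combining these helpers (every name below is a real member of
* this class except {@code inputSource}, a placeholder for any {@link InputSource}):
* <pre>
*   ParallelIndexTuningConfig tuningConfig = new TuningConfigBuilder()
*       .forceGuaranteedRollup(false)
*       .build();
*   DataSchema dataSchema = createDataSchema(INPUT_INTERVALS);
*   ParallelIndexIngestionSpec spec =
*       createIngestionSpec(inputSource, getInputFormat(), tuningConfig, dataSchema);
* </pre>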
*/
class ParallelIndexTestingFactory
{
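// Shared fixture constants. The null values (task ID, task resource) let the code under test
// fall back to its defaults, e.g. an auto-generated task ID.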
static final String AUTOMATIC_ID = null;
static final String ID = "id";
static final String GROUP_ID = "group-id";
static final TaskResource TASK_RESOURCE = null;
static final String SUPERVISOR_TASK_ID = "supervisor-task-id";
static final int NUM_ATTEMPTS = 1;
static final Map<String, Object> CONTEXT = Collections.emptyMap();
static final IndexingServiceClient INDEXING_SERVICE_CLIENT = TestUtils.INDEXING_SERVICE_CLIENT;
static final IndexTaskClientFactory<ParallelIndexSupervisorTaskClient> TASK_CLIENT_FACTORY =
TestUtils.TASK_CLIENT_FACTORY;
static final AppenderatorsManager APPENDERATORS_MANAGER = TestUtils.APPENDERATORS_MANAGER;
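// No-op shuffle client stub: returning null satisfies the interface for tests that never read
// a fetched partition file.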
static final ShuffleClient SHUFFLE_CLIENT = new ShuffleClient()
{
@Override
public <T, P extends PartitionLocation<T>> File fetchSegmentFile(
File partitionDir,
String supervisorTaskId,
P location
)
{
return null;
}
};
static final List<Interval> INPUT_INTERVALS = Collections.singletonList(Intervals.ETERNITY);
static final String TASK_EXECUTOR_HOST = "task-executor-host";
static final int TASK_EXECUTOR_PORT = 1;
static final boolean USE_HTTPS = true;
static final Interval INTERVAL = Intervals.ETERNITY;
static final int NUM_ROWS = 2;
static final long SIZE_BYTES = 3;
static final int PARTITION_ID = 4;
static final String HOST = "host";
static final int PORT = 1;
static final String SUBTASK_ID = "subtask-id";
private static final TestUtils TEST_UTILS = new TestUtils();
private static final ObjectMapper NESTED_OBJECT_MAPPER = TEST_UTILS.getTestObjectMapper();
private static final String SCHEMA_TIME = "time";
private static final String SCHEMA_DIMENSION = "dim";
private static final String DATASOURCE = "datasource";
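// Building-phase hash shard spec over the single "dim" column; the PARTITION_ID-derived
// partition/bucket values are arbitrary test placeholders.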
static final BuildingHashBasedNumberedShardSpec HASH_BASED_NUMBERED_SHARD_SPEC = new BuildingHashBasedNumberedShardSpec(
PARTITION_ID,
PARTITION_ID,
PARTITION_ID + 1,
Collections.singletonList("dim"),
HashPartitionFunction.MURMUR3_32_ABS,
ParallelIndexTestingFactory.NESTED_OBJECT_MAPPER
);
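/**
 * Returns the test {@link ObjectMapper} configured by {@link TestUtils}.
 */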
static ObjectMapper createObjectMapper()
{
return TEST_UTILS.getTestObjectMapper();
}
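/**
 * Returns the test {@link IndexIO} instance configured by {@link TestUtils}.
 */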
static IndexIO getIndexIO()
{
return TEST_UTILS.getTestIndexIO();
}
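/**
 * Fluent builder for a {@link ParallelIndexTuningConfig}; only the four fields below are
 * configurable, and everything else is filled with placeholder values in {@link #build}.
 */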
@SuppressWarnings("SameParameterValue")
static class TuningConfigBuilder
{
private PartitionsSpec partitionsSpec =
new HashedPartitionsSpec(null, 2, null);
private boolean forceGuaranteedRollup = true;
private boolean logParseExceptions = false;
private int maxParseExceptions = Integer.MAX_VALUE;
TuningConfigBuilder partitionsSpec(PartitionsSpec partitionsSpec)
{
this.partitionsSpec = partitionsSpec;
return this;
}
TuningConfigBuilder forceGuaranteedRollup(boolean forceGuaranteedRollup)
{
this.forceGuaranteedRollup = forceGuaranteedRollup;
return this;
}
TuningConfigBuilder logParseExceptions(boolean logParseExceptions)
{
this.logParseExceptions = logParseExceptions;
return this;
}
TuningConfigBuilder maxParseExceptions(int maxParseExceptions)
{
this.maxParseExceptions = maxParseExceptions;
return this;
}
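// Aside from the four builder-controlled fields, the positional arguments below are
// arbitrary placeholder values.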
ParallelIndexTuningConfig build()
{
return new ParallelIndexTuningConfig(
1,
null,
null,
3,
4L,
5L,
6,
null,
partitionsSpec,
null,
null,
10,
forceGuaranteedRollup,
false,
14L,
null,
null,
16,
17,
18L,
Duration.ZERO,
20,
21,
22,
logParseExceptions,
maxParseExceptions,
25
);
}
}
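/**
 * Creates a minimal {@link DataSchema} with a "time" timestamp column, a single "dim" string
 * dimension, no aggregators, and DAY query granularity over the given input intervals.
 */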
static DataSchema createDataSchema(List<Interval> granularitySpecInputIntervals)
{
GranularitySpec granularitySpec = new ArbitraryGranularitySpec(Granularities.DAY, granularitySpecInputIntervals);
TimestampSpec timestampSpec = new TimestampSpec(SCHEMA_TIME, "auto", null);
DimensionsSpec dimensionsSpec = new DimensionsSpec(
DimensionsSpec.getDefaultSchemas(ImmutableList.of(SCHEMA_DIMENSION)),
null,
null
);
return new DataSchema(
DATASOURCE,
timestampSpec,
dimensionsSpec,
new AggregatorFactory[]{},
granularitySpec,
TransformSpec.NONE,
null,
NESTED_OBJECT_MAPPER
);
}
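/**
 * Bundles the given source, format, tuning config, and schema into a
 * {@link ParallelIndexIngestionSpec} with a non-appending {@link ParallelIndexIOConfig}.
 */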
static ParallelIndexIngestionSpec createIngestionSpec(
InputSource inputSource,
InputFormat inputFormat,
ParallelIndexTuningConfig tuningConfig,
DataSchema dataSchema
)
{
ParallelIndexIOConfig ioConfig = new ParallelIndexIOConfig(null, inputSource, inputFormat, false);
return new ParallelIndexIngestionSpec(dataSchema, ioConfig, tuningConfig);
}
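/**
 * Fluent builder for a {@link SingleDimensionPartitionsSpec} partitioning on "dim" by default.
 */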
static class SingleDimensionPartitionsSpecBuilder
{
@Nullable
private String partitionDimension = SCHEMA_DIMENSION;
private boolean assumeGrouped = false;
SingleDimensionPartitionsSpecBuilder partitionDimension(@Nullable String partitionDimension)
{
this.partitionDimension = partitionDimension;
return this;
}
SingleDimensionPartitionsSpecBuilder assumeGrouped(boolean assumeGrouped)
{
this.assumeGrouped = assumeGrouped;
return this;
}
SingleDimensionPartitionsSpec build()
{
return new SingleDimensionPartitionsSpec(
1,
null,
partitionDimension,
assumeGrouped
);
}
}
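/**
 * Returns a factory that hands out EasyMock nice mocks, so every supervisor client call is a
 * benign no-op returning default values.
 */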
static IndexTaskClientFactory<ParallelIndexSupervisorTaskClient> createTaskClientFactory()
{
return (taskInfoProvider, callerId, numThreads, httpTimeout, numRetries) -> createTaskClient();
}
private static ParallelIndexSupervisorTaskClient createTaskClient()
{
ParallelIndexSupervisorTaskClient taskClient = EasyMock.niceMock(ParallelIndexSupervisorTaskClient.class);
EasyMock.replay(taskClient);
return taskClient;
}
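/**
 * Serializes one input row, such as {@code {"time": 1, "dim": "a"}}, to its JSON string form.
 */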
static String createRow(long timestamp, Object dimensionValue)
{
try {
return NESTED_OBJECT_MAPPER.writeValueAsString(ImmutableMap.of(
SCHEMA_TIME, timestamp,
SCHEMA_DIMENSION, dimensionValue
));
}
catch (JsonProcessingException e) {
throw new RuntimeException(e);
}
}
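/** Returns a {@link JsonInputFormat} with all-default settings. */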
static InputFormat getInputFormat()
{
return new JsonInputFormat(null, null, null);
}
}