/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.druid.indexing.common.task.batch.parallel;

import com.fasterxml.jackson.annotation.JacksonInject;
import com.fasterxml.jackson.annotation.JsonCreator;
import com.fasterxml.jackson.annotation.JsonProperty;
import com.fasterxml.jackson.core.JsonProcessingException;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.google.common.base.Preconditions;
import org.apache.datasketches.hll.HllSketch;
import org.apache.druid.data.input.InputFormat;
import org.apache.druid.data.input.InputRow;
import org.apache.druid.data.input.InputSource;
import org.apache.druid.indexer.TaskStatus;
import org.apache.druid.indexer.partitions.HashedPartitionsSpec;
import org.apache.druid.indexing.common.TaskToolbox;
import org.apache.druid.indexing.common.actions.TaskActionClient;
import org.apache.druid.indexing.common.task.AbstractBatchIndexTask;
import org.apache.druid.indexing.common.task.ClientBasedTaskInfoProvider;
import org.apache.druid.indexing.common.task.TaskResource;
import org.apache.druid.java.util.common.granularity.Granularity;
import org.apache.druid.java.util.common.logger.Logger;
import org.apache.druid.java.util.common.parsers.CloseableIterator;
import org.apache.druid.segment.incremental.ParseExceptionHandler;
import org.apache.druid.segment.incremental.RowIngestionMeters;
import org.apache.druid.segment.indexing.DataSchema;
import org.apache.druid.segment.indexing.granularity.GranularitySpec;
import org.apache.druid.timeline.partition.HashBasedNumberedShardSpec;
import org.joda.time.DateTime;
import org.joda.time.Interval;
import javax.annotation.Nullable;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
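
/**
 * Worker task for the "partial dimension cardinality" phase of hash-partitioned parallel batch
 * indexing. The task scans its assigned portion of the input once, builds an {@link HllSketch} of
 * the partition-dimension group keys for every segment interval, and sends the serialized sketches
 * to the supervisor task as a {@link DimensionCardinalityReport} so that the supervisor can
 * estimate the number of shards needed per interval.
 */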
public class PartialDimensionCardinalityTask extends PerfectRollupWorkerTask
{
  public static final String TYPE = "partial_dimension_cardinality";
  private static final Logger LOG = new Logger(PartialDimensionCardinalityTask.class);

  private final int numAttempts;
  private final ParallelIndexIngestionSpec ingestionSchema;
  private final String supervisorTaskId;
  private final ObjectMapper jsonMapper;

  @JsonCreator
  PartialDimensionCardinalityTask(
      // id shouldn't be null except when this task is created by ParallelIndexSupervisorTask
      @JsonProperty("id") @Nullable String id,
      @JsonProperty("groupId") final String groupId,
      @JsonProperty("resource") final TaskResource taskResource,
      @JsonProperty("supervisorTaskId") final String supervisorTaskId,
      @JsonProperty("numAttempts") final int numAttempts, // zero-based counting
      @JsonProperty("spec") final ParallelIndexIngestionSpec ingestionSchema,
      @JsonProperty("context") final Map<String, Object> context,
      @JacksonInject ObjectMapper jsonMapper
  )
  {
    super(
        getOrMakeId(id, TYPE, ingestionSchema.getDataSchema().getDataSource()),
        groupId,
        taskResource,
        ingestionSchema.getDataSchema(),
        ingestionSchema.getTuningConfig(),
        context
    );
    Preconditions.checkArgument(
        ingestionSchema.getTuningConfig().getPartitionsSpec() instanceof HashedPartitionsSpec,
        "%s partitionsSpec required",
        HashedPartitionsSpec.NAME
    );
    this.numAttempts = numAttempts;
    this.ingestionSchema = ingestionSchema;
    this.supervisorTaskId = supervisorTaskId;
    this.jsonMapper = jsonMapper;
  }

  @JsonProperty
  private int getNumAttempts()
  {
    return numAttempts;
  }

  @JsonProperty("spec")
  private ParallelIndexIngestionSpec getIngestionSchema()
  {
    return ingestionSchema;
  }

  @JsonProperty
  private String getSupervisorTaskId()
  {
    return supervisorTaskId;
  }

  @Override
  public String getType()
  {
    return TYPE;
  }

  @Override
  public boolean isReady(TaskActionClient taskActionClient) throws Exception
  {
    return tryTimeChunkLock(
        taskActionClient,
        getIngestionSchema().getDataSchema().getGranularitySpec().inputIntervals()
    );
  }

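  /**
   * Reads the input once, computes the per-interval cardinality of the partition dimensions, and
   * reports the resulting sketches to the supervisor task.
   */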
  @Override
  public TaskStatus runTask(TaskToolbox toolbox) throws Exception
  {
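    // Resolve the hash partitioning spec and the dimensions used to build the hash key. A null list
    // falls back to the shard spec's default partition dimensions.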
    DataSchema dataSchema = ingestionSchema.getDataSchema();
    GranularitySpec granularitySpec = dataSchema.getGranularitySpec();
    ParallelIndexTuningConfig tuningConfig = ingestionSchema.getTuningConfig();
    HashedPartitionsSpec partitionsSpec = (HashedPartitionsSpec) tuningConfig.getPartitionsSpec();
    Preconditions.checkNotNull(partitionsSpec, "partitionsSpec required in tuningConfig");
    List<String> partitionDimensions = partitionsSpec.getPartitionDimensions();
    if (partitionDimensions == null) {
      partitionDimensions = HashBasedNumberedShardSpec.DEFAULT_PARTITION_DIMENSIONS;
    }

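    // Resolve the input source; an input format is only needed for sources that do not define their own.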
    InputSource inputSource = ingestionSchema.getIOConfig().getNonNullInputSource(
        ingestionSchema.getDataSchema().getParser()
    );
    InputFormat inputFormat = inputSource.needsFormat()
                              ? ParallelIndexSupervisorTask.getInputFormat(ingestionSchema)
                              : null;

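    // Row meters and the parse exception handler enforce the tuning config's parse-exception limits
    // while reading the input.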
    final RowIngestionMeters buildSegmentsMeters = toolbox.getRowIngestionMetersFactory().createRowIngestionMeters();
    final ParseExceptionHandler parseExceptionHandler = new ParseExceptionHandler(
        buildSegmentsMeters,
        tuningConfig.isLogParseExceptions(),
        tuningConfig.getMaxParseExceptions(),
        tuningConfig.getMaxSavedParseExceptions()
    );

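    // Scan the input once, compute a cardinality sketch per time chunk, and push the result to the
    // supervisor task before reporting success.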
    try (
        final CloseableIterator<InputRow> inputRowIterator = AbstractBatchIndexTask.inputSourceReader(
            toolbox.getIndexingTmpDir(),
            dataSchema,
            inputSource,
            inputFormat,
            AbstractBatchIndexTask.defaultRowFilter(granularitySpec),
            buildSegmentsMeters,
            parseExceptionHandler
        );
    ) {
      Map<Interval, byte[]> cardinalities = determineCardinalities(
          inputRowIterator,
          granularitySpec,
          partitionDimensions
      );
      sendReport(
          toolbox,
          new DimensionCardinalityReport(getId(), cardinalities)
      );
    }

    return TaskStatus.success(getId());
  }

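  /**
   * Builds one {@link HllSketch} per segment interval. Each row is bucketed by the segment
   * granularity, and the sketch for that interval is updated with the row's group key (the
   * partition dimension values plus the query-granularity bucketed timestamp). The sketches are
   * returned in compact serialized form, keyed by interval.
   */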
  private Map<Interval, byte[]> determineCardinalities(
      CloseableIterator<InputRow> inputRowIterator,
      GranularitySpec granularitySpec,
      List<String> partitionDimensions
  )
  {
    Map<Interval, HllSketch> intervalToCardinalities = new HashMap<>();
    while (inputRowIterator.hasNext()) {
      InputRow inputRow = inputRowIterator.next();
      //noinspection ConstantConditions (null rows are filtered out by FilteringCloseableInputRowIterator)
      DateTime timestamp = inputRow.getTimestamp();
      //noinspection OptionalGetWithoutIsPresent (InputRowIterator returns rows with present intervals)
      Interval interval = granularitySpec.bucketInterval(timestamp).get();
      Granularity queryGranularity = granularitySpec.getQueryGranularity();
      HllSketch hllSketch = intervalToCardinalities.computeIfAbsent(
          interval,
          (intervalKey) -> {
            return DimensionCardinalityReport.createHllSketchForReport();
          }
      );
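      // Combine the partition dimension values with the query-granularity bucketed timestamp and
      // add the serialized key to this interval's sketch.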
      List<Object> groupKey = HashBasedNumberedShardSpec.getGroupKey(
          partitionDimensions,
          queryGranularity.bucketStart(timestamp).getMillis(),
          inputRow
      );
      try {
        hllSketch.update(
            jsonMapper.writeValueAsBytes(groupKey)
        );
      }
      catch (JsonProcessingException jpe) {
        throw new RuntimeException(jpe);
      }
    }

    // Serialize the sketches for sending to the supervisor task
    Map<Interval, byte[]> newMap = new HashMap<>();
    for (Map.Entry<Interval, HllSketch> entry : intervalToCardinalities.entrySet()) {
      newMap.put(entry.getKey(), entry.getValue().toCompactByteArray());
    }
    return newMap;
  }

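  /**
   * Sends the per-interval cardinality report to the supervisor task through the supervisor task client.
   */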
  private void sendReport(TaskToolbox toolbox, DimensionCardinalityReport report)
  {
    final ParallelIndexSupervisorTaskClient taskClient = toolbox.getSupervisorTaskClientFactory().build(
        new ClientBasedTaskInfoProvider(toolbox.getIndexingServiceClient()),
        getId(),
        1, // always use a single http thread
        ingestionSchema.getTuningConfig().getChatHandlerTimeout(),
        ingestionSchema.getTuningConfig().getChatHandlerNumRetries()
    );
    taskClient.report(supervisorTaskId, report);
  }
}