// blob: a40bd69ee55c6d46fb7237629da7e618912e8f66 [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.druid.query.metadata;
import com.fasterxml.jackson.core.type.TypeReference;
import com.google.common.annotations.VisibleForTesting;
import com.google.common.base.Function;
import com.google.common.base.Functions;
import com.google.common.base.Predicate;
import com.google.common.collect.ImmutableList;
import com.google.common.collect.Iterables;
import com.google.common.collect.Lists;
import com.google.common.collect.Ordering;
import com.google.common.collect.Sets;
import com.google.inject.Inject;
import org.apache.druid.common.guava.CombiningSequence;
import org.apache.druid.data.input.impl.TimestampSpec;
import org.apache.druid.java.util.common.JodaUtils;
import org.apache.druid.java.util.common.granularity.Granularity;
import org.apache.druid.java.util.common.guava.Comparators;
import org.apache.druid.java.util.common.guava.MappedSequence;
import org.apache.druid.java.util.common.guava.Sequence;
import org.apache.druid.query.BySegmentSkippingQueryRunner;
import org.apache.druid.query.CacheStrategy;
import org.apache.druid.query.DefaultGenericQueryMetricsFactory;
import org.apache.druid.query.GenericQueryMetricsFactory;
import org.apache.druid.query.Query;
import org.apache.druid.query.QueryMetrics;
import org.apache.druid.query.QueryPlus;
import org.apache.druid.query.QueryRunner;
import org.apache.druid.query.QueryToolChest;
import org.apache.druid.query.aggregation.AggregatorFactory;
import org.apache.druid.query.aggregation.AggregatorFactoryNotMergeableException;
import org.apache.druid.query.aggregation.MetricManipulationFn;
import org.apache.druid.query.cache.CacheKeyBuilder;
import org.apache.druid.query.context.ResponseContext;
import org.apache.druid.query.metadata.metadata.ColumnAnalysis;
import org.apache.druid.query.metadata.metadata.SegmentAnalysis;
import org.apache.druid.query.metadata.metadata.SegmentMetadataQuery;
import org.apache.druid.timeline.LogicalSegment;
import org.joda.time.DateTime;
import org.joda.time.Interval;
import javax.annotation.Nullable;
import java.nio.ByteBuffer;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Comparator;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.TreeMap;
import java.util.function.BinaryOperator;
/**
 * {@link QueryToolChest} for {@link SegmentMetadataQuery}. It knows how to merge per-segment
 * {@link SegmentAnalysis} results, how to build cache keys for them, and how to restrict the set of
 * segments that are analyzed when the query uses its default interval.
 */
public class SegmentMetadataQueryQueryToolChest extends QueryToolChest<SegmentAnalysis, SegmentMetadataQuery>
{
  private static final TypeReference<SegmentAnalysis> TYPE_REFERENCE = new TypeReference<SegmentAnalysis>()
  {
  };
  private static final byte[] SEGMENT_METADATA_CACHE_PREFIX = new byte[]{0x4};
  private static final byte SEGMENT_METADATA_QUERY = 0x16;

  /** Applied to every merged result before it is handed back to the caller; see {@link #finalizeAnalysis}. */
  private static final Function<SegmentAnalysis, SegmentAnalysis> MERGE_TRANSFORM_FN =
      SegmentMetadataQueryQueryToolChest::finalizeAnalysis;

  private final SegmentMetadataQueryConfig config;
  private final GenericQueryMetricsFactory queryMetricsFactory;

  /**
   * Creates a tool chest that uses {@link DefaultGenericQueryMetricsFactory#instance()} for metrics.
   *
   * @param config segment metadata query configuration
   */
  @VisibleForTesting
  public SegmentMetadataQueryQueryToolChest(SegmentMetadataQueryConfig config)
  {
    this(config, DefaultGenericQueryMetricsFactory.instance());
  }

  /**
   * @param config              segment metadata query configuration
   * @param queryMetricsFactory factory used by {@link #makeMetrics(SegmentMetadataQuery)}
   */
  @Inject
  public SegmentMetadataQueryQueryToolChest(
      SegmentMetadataQueryConfig config,
      GenericQueryMetricsFactory queryMetricsFactory
  )
  {
    this.config = config;
    this.queryMetricsFactory = queryMetricsFactory;
  }

  /**
   * Wraps {@code runner} so that its results are combined with {@link #createMergeFn(Query)} according to
   * {@link #createResultComparator(Query)}, and every combined result is passed through
   * {@link #finalizeAnalysis(SegmentAnalysis)}. The query is first rewritten with
   * {@code withFinalizedAnalysisTypes(config)}.
   */
  @Override
  public QueryRunner<SegmentAnalysis> mergeResults(final QueryRunner<SegmentAnalysis> runner)
  {
    return new BySegmentSkippingQueryRunner<SegmentAnalysis>(runner)
    {
      @Override
      public Sequence<SegmentAnalysis> doRun(
          QueryRunner<SegmentAnalysis> baseRunner,
          QueryPlus<SegmentAnalysis> queryPlus,
          ResponseContext context
      )
      {
        final SegmentMetadataQuery updatedQuery =
            ((SegmentMetadataQuery) queryPlus.getQuery()).withFinalizedAnalysisTypes(config);
        final QueryPlus<SegmentAnalysis> updatedQueryPlus = queryPlus.withQuery(updatedQuery);

        final Sequence<SegmentAnalysis> combined = CombiningSequence.create(
            baseRunner.run(updatedQueryPlus, context),
            // Ordering.from() returns its argument unchanged when it already is an Ordering, and wraps it
            // otherwise, so a plain Comparator from an overriding tool chest no longer fails a cast.
            Ordering.from(SegmentMetadataQueryQueryToolChest.this.createResultComparator(updatedQuery)),
            SegmentMetadataQueryQueryToolChest.this.createMergeFn(updatedQuery)
        );
        return new MappedSequence<>(combined, MERGE_TRANSFORM_FN::apply);
      }
    };
  }

  /**
   * Returns a function that merges two analyses with {@link #mergeAnalyses}, using the
   * {@code lenientAggregatorMerge} setting of {@code query}.
   */
  @Override
  public BinaryOperator<SegmentAnalysis> createMergeFn(Query<SegmentAnalysis> query)
  {
    final boolean lenientAggregatorMerge = ((SegmentMetadataQuery) query).isLenientAggregatorMerge();
    return (left, right) -> mergeAnalyses(left, right, lenientAggregatorMerge);
  }

  /**
   * Returns the comparator that decides which adjacent results get combined: an "always equal" comparator
   * when the query asks for merging, otherwise the query's own result ordering.
   */
  @Override
  public Comparator<SegmentAnalysis> createResultComparator(Query<SegmentAnalysis> query)
  {
    final SegmentMetadataQuery segmentMetadataQuery = (SegmentMetadataQuery) query;
    if (!segmentMetadataQuery.isMerge()) {
      // No two elements should be equal, so it should never merge
      return segmentMetadataQuery.getResultOrdering();
    }
    // Merge everything always
    return Comparators.alwaysEqual();
  }

  @Override
  public QueryMetrics<Query<?>> makeMetrics(SegmentMetadataQuery query)
  {
    return queryMetricsFactory.makeMetrics(query);
  }

  /** Segment analyses carry no metrics to manipulate, so this is always the identity function. */
  @Override
  public Function<SegmentAnalysis, SegmentAnalysis> makePreComputeManipulatorFn(
      SegmentMetadataQuery query,
      MetricManipulationFn fn
  )
  {
    return Functions.identity();
  }

  @Override
  public TypeReference<SegmentAnalysis> getResultTypeReference()
  {
    return TYPE_REFERENCE;
  }

  /**
   * Returns a cache strategy that stores {@link SegmentAnalysis} objects as-is (no conversion in either
   * direction) and always reports queries as cacheable.
   */
  @Override
  public CacheStrategy<SegmentAnalysis, SegmentAnalysis, SegmentMetadataQuery> getCacheStrategy(final SegmentMetadataQuery query)
  {
    return new CacheStrategy<SegmentAnalysis, SegmentAnalysis, SegmentMetadataQuery>()
    {
      @Override
      public boolean isCacheable(SegmentMetadataQuery query, boolean willMergeRunners)
      {
        return true;
      }

      /**
       * Key layout: cache prefix, then the column includer key, then the analysis types key (the latter two
       * taken from the query after {@code withFinalizedAnalysisTypes(config)}).
       */
      @Override
      public byte[] computeCacheKey(SegmentMetadataQuery query)
      {
        final SegmentMetadataQuery updatedQuery = query.withFinalizedAnalysisTypes(config);
        final byte[] includerBytes = updatedQuery.getToInclude().getCacheKey();
        final byte[] analysisTypesBytes = updatedQuery.getAnalysisTypesCacheKey();
        // Size the buffer from the prefix itself so the two cannot drift apart.
        final int keyLength =
            SEGMENT_METADATA_CACHE_PREFIX.length + includerBytes.length + analysisTypesBytes.length;
        return ByteBuffer.allocate(keyLength)
                         .put(SEGMENT_METADATA_CACHE_PREFIX)
                         .put(includerBytes)
                         .put(analysisTypesBytes)
                         .array();
      }

      @Override
      public byte[] computeResultLevelCacheKey(SegmentMetadataQuery query)
      {
        // need to include query "merge" and "lenientAggregatorMerge" for result level cache key
        return new CacheKeyBuilder(SEGMENT_METADATA_QUERY).appendByteArray(computeCacheKey(query))
                                                          .appendBoolean(query.isMerge())
                                                          .appendBoolean(query.isLenientAggregatorMerge())
                                                          .build();
      }

      @Override
      public TypeReference<SegmentAnalysis> getCacheObjectClazz()
      {
        return getResultTypeReference();
      }

      @Override
      public Function<SegmentAnalysis, SegmentAnalysis> prepareForCache(boolean isResultLevelCache)
      {
        return Functions.identity();
      }

      @Override
      public Function<SegmentAnalysis, SegmentAnalysis> pullFromCache(boolean isResultLevelCache)
      {
        return Functions.identity();
      }
    };
  }

  /**
   * Limits the segments to analyze when the query uses its default interval.
   *
   * <p>Queries with an explicit interval, and lists of at most one segment, are returned unchanged.
   * Otherwise only segments overlapping the interval built from {@code config.getDefaultHistory()} and the
   * end of the last segment in {@code segments} are kept.
   *
   * <p>NOTE(review): the last list element is used as the most recent segment, so {@code segments} is
   * presumably ordered by time; confirm against callers.
   *
   * @param query    the query being planned
   * @param segments candidate segments
   * @return {@code segments} itself, or a new list holding the retained segments
   */
  @Override
  public <T extends LogicalSegment> List<T> filterSegments(SegmentMetadataQuery query, List<T> segments)
  {
    if (!query.isUsingDefaultInterval() || segments.size() <= 1) {
      return segments;
    }

    final T max = segments.get(segments.size() - 1);
    final DateTime targetEnd = max.getInterval().getEnd();
    final Interval targetInterval = new Interval(config.getDefaultHistory(), targetEnd);
    final Predicate<T> overlapsTarget = segment -> segment.getInterval().overlaps(targetInterval);

    return Lists.newArrayList(Iterables.filter(segments, overlapsTarget));
  }

  /**
   * Merges two segment analyses into one.
   *
   * <ul>
   * <li>If either argument is null the other one is returned as-is.</li>
   * <li>Intervals of both sides are concatenated (null only if both sides have none).</li>
   * <li>Columns present on both sides are combined with {@link ColumnAnalysis#fold}; the rest are copied.</li>
   * <li>Size and row count are summed.</li>
   * <li>The id is kept when both sides share the same non-null id, otherwise it becomes {@code "merged"}.</li>
   * <li>Rollup is kept when both sides agree on a non-null value, otherwise it becomes null.</li>
   * <li>Aggregators are merged per name when {@code lenientAggregatorMerge} is set (a name whose aggregators
   * cannot be merged maps to null), or all together via {@link AggregatorFactory#mergeAggregators}
   * otherwise. An empty aggregator map is reported as null.</li>
   * </ul>
   *
   * @param arg1                   left analysis, may be null
   * @param arg2                   right analysis, may be null
   * @param lenientAggregatorMerge whether aggregators are merged individually per name
   * @return the merged analysis, or the non-null argument when the other one is null
   */
  @VisibleForTesting
  public static SegmentAnalysis mergeAnalyses(
      @Nullable final SegmentAnalysis arg1,
      @Nullable final SegmentAnalysis arg2,
      boolean lenientAggregatorMerge
  )
  {
    if (arg1 == null) {
      return arg2;
    }
    if (arg2 == null) {
      return arg1;
    }

    final List<Interval> newIntervals = mergeIntervals(arg1, arg2);
    final Map<String, ColumnAnalysis> columns = mergeColumns(arg1.getColumns(), arg2.getColumns());
    final Map<String, AggregatorFactory> aggregators = lenientAggregatorMerge
                                                       ? mergeAggregatorsLeniently(arg1, arg2)
                                                       : mergeAggregatorsStrictly(arg1, arg2);

    final TimestampSpec timestampSpec = TimestampSpec.mergeTimestampSpec(
        Lists.newArrayList(arg1.getTimestampSpec(), arg2.getTimestampSpec())
    );
    final Granularity queryGranularity = Granularity.mergeGranularities(
        Lists.newArrayList(arg1.getQueryGranularity(), arg2.getQueryGranularity())
    );

    final boolean sameId = arg1.getId() != null && arg2.getId() != null && arg1.getId().equals(arg2.getId());
    final String mergedId = sameId ? arg1.getId() : "merged";

    final boolean sameRollup =
        arg1.isRollup() != null && arg2.isRollup() != null && arg1.isRollup().equals(arg2.isRollup());
    final Boolean rollup = sameRollup ? arg1.isRollup() : null;

    return new SegmentAnalysis(
        mergedId,
        newIntervals,
        columns,
        arg1.getSize() + arg2.getSize(),
        arg1.getNumRows() + arg2.getNumRows(),
        aggregators.isEmpty() ? null : aggregators,
        timestampSpec,
        queryGranularity,
        rollup
    );
  }

  /** Concatenates the intervals of both analyses; returns null when neither side has intervals. */
  @Nullable
  private static List<Interval> mergeIntervals(SegmentAnalysis arg1, SegmentAnalysis arg2)
  {
    List<Interval> merged = null;
    if (arg1.getIntervals() != null) {
      merged = new ArrayList<>(arg1.getIntervals());
    }
    if (arg2.getIntervals() != null) {
      if (merged == null) {
        merged = new ArrayList<>();
      }
      merged.addAll(arg2.getIntervals());
    }
    return merged;
  }

  /** Folds columns that exist on the left with their right counterpart and copies right-only columns. */
  private static Map<String, ColumnAnalysis> mergeColumns(
      Map<String, ColumnAnalysis> leftColumns,
      Map<String, ColumnAnalysis> rightColumns
  )
  {
    final Map<String, ColumnAnalysis> columns = new TreeMap<>();
    for (Map.Entry<String, ColumnAnalysis> entry : leftColumns.entrySet()) {
      final String columnName = entry.getKey();
      columns.put(columnName, entry.getValue().fold(rightColumns.get(columnName)));
    }
    final Set<String> rightOnlyColumnNames = Sets.difference(rightColumns.keySet(), leftColumns.keySet());
    for (String columnName : rightOnlyColumnNames) {
      columns.put(columnName, rightColumns.get(columnName));
    }
    return columns;
  }

  /**
   * Merges aggregators one name at a time; analyses without aggregator information are skipped.
   *
   * <p>A name maps to null once its aggregators are known to conflict. That null is sticky: merging a null
   * (from an earlier conflict carried by either input) with anything yields null again. Presence in the
   * result map is therefore tested with {@code containsKey}, since a plain null check on {@code get} cannot
   * tell "not seen yet" apart from "already in conflict".
   */
  private static Map<String, AggregatorFactory> mergeAggregatorsLeniently(
      SegmentAnalysis arg1,
      SegmentAnalysis arg2
  )
  {
    final Map<String, AggregatorFactory> aggregators = new HashMap<>();
    for (SegmentAnalysis analysis : ImmutableList.of(arg1, arg2)) {
      if (analysis.getAggregators() == null) {
        continue;
      }
      for (Map.Entry<String, AggregatorFactory> entry : analysis.getAggregators().entrySet()) {
        final String aggregatorName = entry.getKey();
        final AggregatorFactory aggregator = entry.getValue();

        final AggregatorFactory merged;
        if (!aggregators.containsKey(aggregatorName)) {
          // First time this name is seen: take the value as-is, including a null conflict marker.
          merged = aggregator;
        } else {
          final AggregatorFactory existing = aggregators.get(aggregatorName);
          merged = (existing == null || aggregator == null) ? null : mergeOrNull(existing, aggregator);
        }
        aggregators.put(aggregatorName, merged);
      }
    }
    return aggregators;
  }

  /** Returns the merging factory of the two aggregators, or null when they are not mergeable. */
  @Nullable
  private static AggregatorFactory mergeOrNull(AggregatorFactory existing, AggregatorFactory incoming)
  {
    try {
      return existing.getMergingFactory(incoming);
    }
    catch (AggregatorFactoryNotMergeableException e) {
      // Lenient mode records the conflict as a null aggregator instead of failing the whole merge.
      return null;
    }
  }

  /**
   * Merges the aggregators of both analyses in one step through {@link AggregatorFactory#mergeAggregators};
   * the result map is empty when that call returns null.
   */
  private static Map<String, AggregatorFactory> mergeAggregatorsStrictly(
      SegmentAnalysis arg1,
      SegmentAnalysis arg2
  )
  {
    final Map<String, AggregatorFactory> aggregators = new HashMap<>();
    final AggregatorFactory[] aggs1 = toAggregatorArray(arg1);
    final AggregatorFactory[] aggs2 = toAggregatorArray(arg2);
    final AggregatorFactory[] merged = AggregatorFactory.mergeAggregators(Arrays.asList(aggs1, aggs2));
    if (merged != null) {
      for (AggregatorFactory aggregator : merged) {
        aggregators.put(aggregator.getName(), aggregator);
      }
    }
    return aggregators;
  }

  /** Returns the aggregators of {@code analysis} as an array, or null when it has no aggregator map. */
  @Nullable
  private static AggregatorFactory[] toAggregatorArray(SegmentAnalysis analysis)
  {
    final Map<String, AggregatorFactory> aggregators = analysis.getAggregators();
    return aggregators == null ? null : aggregators.values().toArray(new AggregatorFactory[0]);
  }

  /**
   * Returns a copy of {@code analysis} whose intervals have been passed through
   * {@link JodaUtils#condenseIntervals}; a null interval list stays null and every other field is carried
   * over unchanged.
   */
  @VisibleForTesting
  public static SegmentAnalysis finalizeAnalysis(SegmentAnalysis analysis)
  {
    final List<Interval> condensedIntervals = analysis.getIntervals() != null
                                              ? JodaUtils.condenseIntervals(analysis.getIntervals())
                                              : null;
    return new SegmentAnalysis(
        analysis.getId(),
        condensedIntervals,
        analysis.getColumns(),
        analysis.getSize(),
        analysis.getNumRows(),
        analysis.getAggregators(),
        analysis.getTimestampSpec(),
        analysis.getQueryGranularity(),
        analysis.isRollup()
    );
  }

  /** Returns the configuration this tool chest was created with. */
  public SegmentMetadataQueryConfig getConfig()
  {
    return this.config;
  }
}