blob: 741b8f6dd978311db242d998f853b6a717926f7f [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.druid.query.groupby;
import com.fasterxml.jackson.core.JsonGenerator;
import com.fasterxml.jackson.core.JsonParser;
import com.fasterxml.jackson.core.type.TypeReference;
import com.fasterxml.jackson.databind.DeserializationContext;
import com.fasterxml.jackson.databind.JsonDeserializer;
import com.fasterxml.jackson.databind.JsonSerializer;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.fasterxml.jackson.databind.SerializerProvider;
import com.fasterxml.jackson.databind.module.SimpleModule;
import com.google.common.annotations.VisibleForTesting;
import com.google.common.base.Function;
import com.google.common.base.Functions;
import com.google.common.collect.ImmutableMap;
import com.google.common.collect.Lists;
import com.google.inject.Inject;
import org.apache.druid.data.input.Row;
import org.apache.druid.java.util.common.ISE;
import org.apache.druid.java.util.common.granularity.Granularity;
import org.apache.druid.java.util.common.guava.MappedSequence;
import org.apache.druid.java.util.common.guava.Sequence;
import org.apache.druid.java.util.common.guava.Sequences;
import org.apache.druid.query.CacheStrategy;
import org.apache.druid.query.DataSource;
import org.apache.druid.query.Query;
import org.apache.druid.query.QueryContexts;
import org.apache.druid.query.QueryDataSource;
import org.apache.druid.query.QueryPlus;
import org.apache.druid.query.QueryRunner;
import org.apache.druid.query.QueryToolChest;
import org.apache.druid.query.SubqueryQueryRunner;
import org.apache.druid.query.aggregation.AggregatorFactory;
import org.apache.druid.query.aggregation.MetricManipulationFn;
import org.apache.druid.query.aggregation.MetricManipulatorFns;
import org.apache.druid.query.aggregation.PostAggregator;
import org.apache.druid.query.cache.CacheKeyBuilder;
import org.apache.druid.query.context.ResponseContext;
import org.apache.druid.query.dimension.DefaultDimensionSpec;
import org.apache.druid.query.dimension.DimensionSpec;
import org.apache.druid.query.extraction.ExtractionFn;
import org.apache.druid.query.groupby.resource.GroupByQueryResource;
import org.apache.druid.query.groupby.strategy.GroupByStrategy;
import org.apache.druid.query.groupby.strategy.GroupByStrategySelector;
import org.apache.druid.segment.DimensionHandlerUtils;
import org.apache.druid.segment.column.RowSignature;
import org.joda.time.DateTime;
import java.io.IOException;
import java.util.ArrayList;
import java.util.BitSet;
import java.util.Comparator;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;
import java.util.function.BinaryOperator;
/**
*
*/
/**
 * {@link QueryToolChest} for groupBy queries. Responsible for merging per-segment and per-node
 * results (including nested subqueries, subtotals specs, and forced nested-query push-down),
 * segment- and result-level caching of {@link ResultRow}, metric finalization, and
 * (de)serialization of rows in either array- or map-based form.
 */
public class GroupByQueryQueryToolChest extends QueryToolChest<ResultRow, GroupByQuery>
{
  // Single-byte cache key prefix identifying groupBy queries in the cache.
  private static final byte GROUPBY_QUERY = 0x14;
  private static final TypeReference<Object> OBJECT_TYPE_REFERENCE =
      new TypeReference<Object>()
      {
      };
  private static final TypeReference<ResultRow> TYPE_REFERENCE = new TypeReference<ResultRow>()
  {
  };
  public static final String GROUP_BY_MERGE_KEY = "groupByMerge";

  private final GroupByStrategySelector strategySelector;
  private final GroupByQueryMetricsFactory queryMetricsFactory;

  @VisibleForTesting
  public GroupByQueryQueryToolChest(GroupByStrategySelector strategySelector)
  {
    this(strategySelector, DefaultGroupByQueryMetricsFactory.instance());
  }

  @Inject
  public GroupByQueryQueryToolChest(
      GroupByStrategySelector strategySelector,
      GroupByQueryMetricsFactory queryMetricsFactory
  )
  {
    this.strategySelector = strategySelector;
    this.queryMetricsFactory = queryMetricsFactory;
  }

  @Override
  public QueryRunner<ResultRow> mergeResults(final QueryRunner<ResultRow> runner)
  {
    return (queryPlus, responseContext) -> {
      // bySegment results must be passed through unmerged so callers see per-segment detail.
      if (QueryContexts.isBySegment(queryPlus.getQuery())) {
        return runner.run(queryPlus, responseContext);
      }

      final GroupByQuery groupByQuery = (GroupByQuery) queryPlus.getQuery();
      if (strategySelector.strategize(groupByQuery).doMergeResults(groupByQuery)) {
        return initAndMergeGroupByResults(groupByQuery, runner, responseContext);
      }

      return runner.run(queryPlus, responseContext);
    };
  }

  @Override
  public BinaryOperator<ResultRow> createMergeFn(Query<ResultRow> query)
  {
    return strategySelector.strategize((GroupByQuery) query).createMergeFn(query);
  }

  @Override
  public Comparator<ResultRow> createResultComparator(Query<ResultRow> query)
  {
    return strategySelector.strategize((GroupByQuery) query).createResultComparator(query);
  }

  /**
   * Acquires the strategy's merge resources, runs the merge, and attaches the resource as
   * "baggage" so it is released when the sequence is fully consumed or closed. If sequence
   * creation itself fails, the resource is released eagerly here.
   */
  private Sequence<ResultRow> initAndMergeGroupByResults(
      final GroupByQuery query,
      QueryRunner<ResultRow> runner,
      ResponseContext context
  )
  {
    final GroupByStrategy groupByStrategy = strategySelector.strategize(query);
    final GroupByQueryResource resource = groupByStrategy.prepareResource(query);
    try {
      final Sequence<ResultRow> mergedSequence = mergeGroupByResults(
          groupByStrategy,
          query,
          resource,
          runner,
          context
      );

      return Sequences.withBaggage(mergedSequence, resource);
    }
    catch (Exception e) {
      // Error creating the Sequence; release resources.
      resource.close();
      throw e;
    }
  }

  private Sequence<ResultRow> mergeGroupByResults(
      GroupByStrategy groupByStrategy,
      final GroupByQuery query,
      GroupByQueryResource resource,
      QueryRunner<ResultRow> runner,
      ResponseContext context
  )
  {
    if (isNestedQueryPushDown(query, groupByStrategy)) {
      return mergeResultsWithNestedQueryPushDown(groupByStrategy, query, resource, runner, context);
    }
    return mergeGroupByResultsWithoutPushDown(groupByStrategy, query, resource, runner, context);
  }

  private Sequence<ResultRow> mergeGroupByResultsWithoutPushDown(
      GroupByStrategy groupByStrategy,
      GroupByQuery query,
      GroupByQueryResource resource,
      QueryRunner<ResultRow> runner,
      ResponseContext context
  )
  {
    // If there's a subquery, merge subquery results and then apply the aggregator
    final DataSource dataSource = query.getDataSource();

    if (dataSource instanceof QueryDataSource) {
      final GroupByQuery subquery;
      try {
        // Inject outer query context keys into subquery if they don't already exist in the subquery context.
        // Unlike withOverriddenContext's normal behavior, we want keys present in the subquery to win.
        final Map<String, Object> subqueryContext = new TreeMap<>();
        if (query.getContext() != null) {
          for (Map.Entry<String, Object> entry : query.getContext().entrySet()) {
            if (entry.getValue() != null) {
              subqueryContext.put(entry.getKey(), entry.getValue());
            }
          }
        }
        if (((QueryDataSource) dataSource).getQuery().getContext() != null) {
          subqueryContext.putAll(((QueryDataSource) dataSource).getQuery().getContext());
        }
        subqueryContext.put(GroupByQuery.CTX_KEY_SORT_BY_DIMS_FIRST, false);
        subquery = (GroupByQuery) ((QueryDataSource) dataSource).getQuery().withOverriddenContext(subqueryContext);
      }
      catch (ClassCastException e) {
        throw new UnsupportedOperationException("Subqueries must be of type 'group by'");
      }

      final Sequence<ResultRow> subqueryResult = mergeGroupByResults(
          groupByStrategy,
          subquery.withOverriddenContext(
              ImmutableMap.of(
                  //setting sort to false avoids unnecessary sorting while merging results. we only need to sort
                  //in the end when returning results to user. (note this is only respected by groupBy v1)
                  GroupByQueryHelper.CTX_KEY_SORT_RESULTS,
                  false
              )
          ),
          resource,
          runner,
          context
      );

      final Sequence<ResultRow> finalizingResults = finalizeSubqueryResults(subqueryResult, subquery);

      if (query.getSubtotalsSpec() != null) {
        return groupByStrategy.processSubtotalsSpec(
            query,
            resource,
            groupByStrategy.processSubqueryResult(subquery, query, resource, finalizingResults, false)
        );
      } else {
        return groupByStrategy.applyPostProcessing(groupByStrategy.processSubqueryResult(
            subquery,
            query,
            resource,
            finalizingResults,
            false
        ), query);
      }
    } else {
      if (query.getSubtotalsSpec() != null) {
        return groupByStrategy.processSubtotalsSpec(
            query,
            resource,
            groupByStrategy.mergeResults(runner, query.withSubtotalsSpec(null), context)
        );
      } else {
        return groupByStrategy.applyPostProcessing(groupByStrategy.mergeResults(runner, query, context), query);
      }
    }
  }

  private Sequence<ResultRow> mergeResultsWithNestedQueryPushDown(
      GroupByStrategy groupByStrategy,
      GroupByQuery query,
      GroupByQueryResource resource,
      QueryRunner<ResultRow> runner,
      ResponseContext context
  )
  {
    Sequence<ResultRow> pushDownQueryResults = groupByStrategy.mergeResults(runner, query, context);
    final Sequence<ResultRow> finalizedResults = finalizeSubqueryResults(pushDownQueryResults, query);
    GroupByQuery rewrittenQuery = rewriteNestedQueryForPushDown(query);
    return groupByStrategy.applyPostProcessing(groupByStrategy.processSubqueryResult(
        query,
        rewrittenQuery,
        resource,
        finalizedResults,
        true
    ), query);
  }

  /**
   * Rewrite the aggregator and dimension specs since the push down nested query will return
   * results with dimension and aggregation specs of the original nested query.
   */
  @VisibleForTesting
  GroupByQuery rewriteNestedQueryForPushDown(GroupByQuery query)
  {
    return query.withAggregatorSpecs(Lists.transform(query.getAggregatorSpecs(), (agg) -> agg.getCombiningFactory()))
                .withDimensionSpecs(Lists.transform(
                    query.getDimensions(),
                    (dim) -> new DefaultDimensionSpec(
                        dim.getOutputName(),
                        dim.getOutputName(),
                        dim.getOutputType()
                    )
                ));
  }

  /**
   * Applies finalizing metric manipulation to subquery results, but only when the subquery's
   * context explicitly asks for finalization (default is to pass rows through untouched).
   */
  private Sequence<ResultRow> finalizeSubqueryResults(Sequence<ResultRow> subqueryResult, GroupByQuery subquery)
  {
    final Sequence<ResultRow> finalizingResults;
    if (QueryContexts.isFinalize(subquery, false)) {
      finalizingResults = new MappedSequence<>(
          subqueryResult,
          makePreComputeManipulatorFn(
              subquery,
              MetricManipulatorFns.finalizing()
          )::apply
      );
    } else {
      finalizingResults = subqueryResult;
    }
    return finalizingResults;
  }

  public static boolean isNestedQueryPushDown(GroupByQuery q, GroupByStrategy strategy)
  {
    return q.getDataSource() instanceof QueryDataSource
           && q.getContextBoolean(GroupByQueryConfig.CTX_KEY_FORCE_PUSH_DOWN_NESTED_QUERY, false)
           && q.getSubtotalsSpec() == null
           && strategy.supportsNestedQueryPushDown();
  }

  @Override
  public GroupByQueryMetrics makeMetrics(GroupByQuery query)
  {
    GroupByQueryMetrics queryMetrics = queryMetricsFactory.makeMetrics();
    queryMetrics.query(query);
    return queryMetrics;
  }

  @Override
  public Function<ResultRow, ResultRow> makePreComputeManipulatorFn(
      final GroupByQuery query,
      final MetricManipulationFn fn
  )
  {
    if (MetricManipulatorFns.identity().equals(fn)) {
      return Functions.identity();
    }

    return row -> {
      // Manipulate only the aggregator slots; dimensions (and the optional timestamp) stay as-is.
      final ResultRow newRow = row.copy();
      final List<AggregatorFactory> aggregatorSpecs = query.getAggregatorSpecs();
      final int aggregatorStart = query.getResultRowAggregatorStart();

      for (int i = 0; i < aggregatorSpecs.size(); i++) {
        AggregatorFactory agg = aggregatorSpecs.get(i);
        newRow.set(aggregatorStart + i, fn.manipulate(agg, row.get(aggregatorStart + i)));
      }

      return newRow;
    };
  }

  @Override
  public Function<ResultRow, ResultRow> makePostComputeManipulatorFn(
      final GroupByQuery query,
      final MetricManipulationFn fn
  )
  {
    final BitSet optimizedDims = extractionsToRewrite(query);
    final Function<ResultRow, ResultRow> preCompute = makePreComputeManipulatorFn(query, fn);

    if (optimizedDims.isEmpty()) {
      return preCompute;
    }

    // If we have optimizations that can be done at this level, we apply them here

    final List<DimensionSpec> dimensions = query.getDimensions();
    final List<ExtractionFn> extractionFns = new ArrayList<>(dimensions.size());
    for (int i = 0; i < dimensions.size(); i++) {
      final DimensionSpec dimensionSpec = dimensions.get(i);
      final ExtractionFn extractionFnToAdd;

      if (optimizedDims.get(i)) {
        extractionFnToAdd = dimensionSpec.getExtractionFn();
      } else {
        extractionFnToAdd = null;
      }

      extractionFns.add(extractionFnToAdd);
    }

    final int dimensionStart = query.getResultRowDimensionStart();
    return row -> {
      // preCompute.apply(row) will either return the original row, or create a copy.
      ResultRow newRow = preCompute.apply(row);

      //noinspection ObjectEquality (if preCompute made a copy, no need to make another copy)
      if (newRow == row) {
        newRow = row.copy();
      }

      // Apply the deferred one-to-one extraction functions to the affected dimension slots.
      for (int i = optimizedDims.nextSetBit(0); i >= 0; i = optimizedDims.nextSetBit(i + 1)) {
        newRow.set(
            dimensionStart + i,
            extractionFns.get(i).apply(newRow.get(dimensionStart + i))
        );
      }

      return newRow;
    };
  }

  @Override
  public TypeReference<ResultRow> getResultTypeReference()
  {
    return TYPE_REFERENCE;
  }

  @Override
  public ObjectMapper decorateObjectMapper(final ObjectMapper objectMapper, final GroupByQuery query)
  {
    final boolean resultAsArray = query.getContextBoolean(GroupByQueryConfig.CTX_KEY_ARRAY_RESULT_ROWS, false);

    // Serializer that writes array- or map-based rows as appropriate, based on the "resultAsArray" setting.
    final JsonSerializer<ResultRow> serializer = new JsonSerializer<ResultRow>()
    {
      @Override
      public void serialize(
          final ResultRow resultRow,
          final JsonGenerator jg,
          final SerializerProvider serializers
      ) throws IOException
      {
        if (resultAsArray) {
          jg.writeObject(resultRow.getArray());
        } else {
          jg.writeObject(resultRow.toMapBasedRow(query));
        }
      }
    };

    // Deserializer that can deserialize either array- or map-based rows.
    final JsonDeserializer<ResultRow> deserializer = new JsonDeserializer<ResultRow>()
    {
      @Override
      public ResultRow deserialize(final JsonParser jp, final DeserializationContext ctxt) throws IOException
      {
        // A JSON object means a legacy map-based row; anything else is expected to be an array.
        if (jp.isExpectedStartObjectToken()) {
          final Row row = jp.readValueAs(Row.class);
          return ResultRow.fromLegacyRow(row, query);
        } else {
          return ResultRow.of(jp.readValueAs(Object[].class));
        }
      }
    };

    class GroupByResultRowModule extends SimpleModule
    {
      private GroupByResultRowModule()
      {
        addSerializer(ResultRow.class, serializer);
        addDeserializer(ResultRow.class, deserializer);
      }
    }

    // Copy the mapper so the per-query (de)serializers don't leak into the shared instance.
    final ObjectMapper newObjectMapper = objectMapper.copy();
    newObjectMapper.registerModule(new GroupByResultRowModule());
    return newObjectMapper;
  }

  @Override
  public QueryRunner<ResultRow> preMergeQueryDecoration(final QueryRunner<ResultRow> runner)
  {
    return new SubqueryQueryRunner<>(
        new QueryRunner<ResultRow>()
        {
          @Override
          public Sequence<ResultRow> run(QueryPlus<ResultRow> queryPlus, ResponseContext responseContext)
          {
            GroupByQuery groupByQuery = (GroupByQuery) queryPlus.getQuery();
            if (groupByQuery.getDimFilter() != null) {
              groupByQuery = groupByQuery.withDimFilter(groupByQuery.getDimFilter().optimize());
            }

            final GroupByQuery delegateGroupByQuery = groupByQuery;
            final List<DimensionSpec> dimensionSpecs = new ArrayList<>();
            final BitSet optimizedDimensions = extractionsToRewrite(delegateGroupByQuery);
            final List<DimensionSpec> dimensions = delegateGroupByQuery.getDimensions();
            for (int i = 0; i < dimensions.size(); i++) {
              final DimensionSpec dimensionSpec = dimensions.get(i);
              // Strip one-to-one extraction functions here; makePostComputeManipulatorFn
              // re-applies them once, after merging, instead of per-event.
              if (optimizedDimensions.get(i)) {
                dimensionSpecs.add(
                    new DefaultDimensionSpec(dimensionSpec.getDimension(), dimensionSpec.getOutputName())
                );
              } else {
                dimensionSpecs.add(dimensionSpec);
              }
            }

            return runner.run(
                queryPlus.withQuery(delegateGroupByQuery.withDimensionSpecs(dimensionSpecs)),
                responseContext
            );
          }
        }
    );
  }

  @Override
  public CacheStrategy<ResultRow, Object, GroupByQuery> getCacheStrategy(final GroupByQuery query)
  {
    return new CacheStrategy<ResultRow, Object, GroupByQuery>()
    {
      private static final byte CACHE_STRATEGY_VERSION = 0x1;
      private final List<AggregatorFactory> aggs = query.getAggregatorSpecs();
      private final List<DimensionSpec> dims = query.getDimensions();

      @Override
      public boolean isCacheable(GroupByQuery query, boolean willMergeRunners)
      {
        return strategySelector.strategize(query).isCacheable(willMergeRunners);
      }

      @Override
      public byte[] computeCacheKey(GroupByQuery query)
      {
        return new CacheKeyBuilder(GROUPBY_QUERY)
            .appendByte(CACHE_STRATEGY_VERSION)
            .appendCacheable(query.getGranularity())
            .appendCacheable(query.getDimFilter())
            .appendCacheables(query.getAggregatorSpecs())
            .appendCacheables(query.getDimensions())
            .appendCacheable(query.getVirtualColumns())
            .build();
      }

      @Override
      public byte[] computeResultLevelCacheKey(GroupByQuery query)
      {
        // Result-level key extends the segment-level key with post-merge components
        // (having, limit, post-aggregators, subtotals) that affect final results.
        final CacheKeyBuilder builder = new CacheKeyBuilder(GROUPBY_QUERY)
            .appendByte(CACHE_STRATEGY_VERSION)
            .appendCacheable(query.getGranularity())
            .appendCacheable(query.getDimFilter())
            .appendCacheables(query.getAggregatorSpecs())
            .appendCacheables(query.getDimensions())
            .appendCacheable(query.getVirtualColumns())
            .appendCacheable(query.getHavingSpec())
            .appendCacheable(query.getLimitSpec())
            .appendCacheables(query.getPostAggregatorSpecs());

        if (query.getSubtotalsSpec() != null && !query.getSubtotalsSpec().isEmpty()) {
          for (List<String> subTotalSpec : query.getSubtotalsSpec()) {
            builder.appendStrings(subTotalSpec);
          }
        }
        return builder.build();
      }

      @Override
      public TypeReference<Object> getCacheObjectClazz()
      {
        return OBJECT_TYPE_REFERENCE;
      }

      @Override
      public Function<ResultRow, Object> prepareForCache(boolean isResultLevelCache)
      {
        final boolean resultRowHasTimestamp = query.getResultRowHasTimestamp();

        return new Function<ResultRow, Object>()
        {
          @Override
          public Object apply(ResultRow resultRow)
          {
            // Cached layout: [timestamp, dims..., aggs..., (postAggs... if result-level)].
            final List<Object> retVal = new ArrayList<>(1 + dims.size() + aggs.size());
            int inPos = 0;

            if (resultRowHasTimestamp) {
              retVal.add(resultRow.getLong(inPos++));
            } else {
              retVal.add(query.getUniversalTimestamp().getMillis());
            }

            for (int i = 0; i < dims.size(); i++) {
              retVal.add(resultRow.get(inPos++));
            }

            for (int i = 0; i < aggs.size(); i++) {
              retVal.add(resultRow.get(inPos++));
            }

            if (isResultLevelCache) {
              for (int i = 0; i < query.getPostAggregatorSpecs().size(); i++) {
                retVal.add(resultRow.get(inPos++));
              }
            }

            return retVal;
          }
        };
      }

      @Override
      public Function<Object, ResultRow> pullFromCache(boolean isResultLevelCache)
      {
        final boolean resultRowHasTimestamp = query.getResultRowHasTimestamp();
        final int dimensionStart = query.getResultRowDimensionStart();
        final int aggregatorStart = query.getResultRowAggregatorStart();
        final int postAggregatorStart = query.getResultRowPostAggregatorStart();

        return new Function<Object, ResultRow>()
        {
          private final Granularity granularity = query.getGranularity();

          @Override
          public ResultRow apply(Object input)
          {
            Iterator<Object> results = ((List<Object>) input).iterator();

            DateTime timestamp = granularity.toDateTime(((Number) results.next()).longValue());

            final int size = isResultLevelCache
                             ? query.getResultRowSizeWithPostAggregators()
                             : query.getResultRowSizeWithoutPostAggregators();

            final ResultRow resultRow = ResultRow.create(size);

            if (resultRowHasTimestamp) {
              resultRow.set(0, timestamp.getMillis());
            }

            final Iterator<DimensionSpec> dimsIter = dims.iterator();
            int dimPos = 0;
            while (dimsIter.hasNext() && results.hasNext()) {
              final DimensionSpec dimensionSpec = dimsIter.next();

              // Must convert generic Jackson-deserialized type into the proper type.
              resultRow.set(
                  dimensionStart + dimPos,
                  DimensionHandlerUtils.convertObjectToType(results.next(), dimensionSpec.getOutputType())
              );

              dimPos++;
            }

            CacheStrategy.fetchAggregatorsFromCache(
                aggs,
                results,
                isResultLevelCache,
                (aggName, aggPosition, aggValueObject) -> {
                  resultRow.set(aggregatorStart + aggPosition, aggValueObject);
                }
            );

            if (isResultLevelCache) {
              Iterator<PostAggregator> postItr = query.getPostAggregatorSpecs().iterator();
              int postPos = 0;
              while (postItr.hasNext() && results.hasNext()) {
                // Advance both the position and the spec iterator each iteration. The previous
                // code advanced neither, so every post-aggregator value was written into the
                // same slot (postAggregatorStart + 0) and postItr was never consumed.
                postItr.next();
                resultRow.set(postAggregatorStart + postPos, results.next());
                postPos++;
              }
            }
            if (dimsIter.hasNext() || results.hasNext()) {
              throw new ISE(
                  "Found left over objects while reading from cache!! dimsIter[%s] results[%s]",
                  dimsIter.hasNext(),
                  results.hasNext()
              );
            }

            return resultRow;
          }
        };
      }
    };
  }

  @Override
  public boolean canPerformSubquery(Query<?> subquery)
  {
    // Walk down the datasource chain; every level must itself be a groupBy.
    Query<?> current = subquery;

    while (current != null) {
      if (!(current instanceof GroupByQuery)) {
        return false;
      }

      if (current.getDataSource() instanceof QueryDataSource) {
        current = ((QueryDataSource) current.getDataSource()).getQuery();
      } else {
        current = null;
      }
    }

    return true;
  }

  @Override
  public RowSignature resultArraySignature(GroupByQuery query)
  {
    return query.getResultRowSignature();
  }

  @Override
  public Sequence<Object[]> resultsAsArrays(final GroupByQuery query, final Sequence<ResultRow> resultSequence)
  {
    return resultSequence.map(ResultRow::getArray);
  }

  /**
   * This function checks the query for dimensions which can be optimized by applying the dimension extraction
   * as the final step of the query instead of on every event.
   *
   * @param query The query to check for optimizations
   *
   * @return The set of dimensions (as offsets into {@code query.getDimensions()}) which can be extracted at the last
   * second upon query completion.
   */
  private static BitSet extractionsToRewrite(GroupByQuery query)
  {
    final BitSet retVal = new BitSet();

    final List<DimensionSpec> dimensions = query.getDimensions();
    for (int i = 0; i < dimensions.size(); i++) {
      final DimensionSpec dimensionSpec = dimensions.get(i);
      // Only one-to-one extraction functions can safely be deferred to post-merge time.
      if (dimensionSpec.getExtractionFn() != null
          && ExtractionFn.ExtractionType.ONE_TO_ONE.equals(dimensionSpec.getExtractionFn().getExtractionType())) {
        retVal.set(i);
      }
    }

    return retVal;
  }
}