/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/

package org.apache.druid.query.timeseries;

import com.fasterxml.jackson.core.type.TypeReference;
import com.google.common.annotations.VisibleForTesting;
import com.google.common.base.Function;
import com.google.common.base.Preconditions;
import com.google.common.collect.ImmutableList;
import com.google.common.collect.ImmutableMap;
import com.google.common.collect.Lists;
import com.google.common.collect.Maps;
import com.google.inject.Inject;
import org.apache.druid.data.input.MapBasedRow;
import org.apache.druid.java.util.common.DateTimes;
import org.apache.druid.java.util.common.granularity.Granularities;
import org.apache.druid.java.util.common.granularity.Granularity;
import org.apache.druid.java.util.common.guava.Sequence;
import org.apache.druid.java.util.common.guava.Sequences;
import org.apache.druid.query.CacheStrategy;
import org.apache.druid.query.Query;
import org.apache.druid.query.QueryPlus;
import org.apache.druid.query.QueryRunner;
import org.apache.druid.query.QueryToolChest;
import org.apache.druid.query.Result;
import org.apache.druid.query.ResultGranularTimestampComparator;
import org.apache.druid.query.ResultMergeQueryRunner;
import org.apache.druid.query.aggregation.Aggregator;
import org.apache.druid.query.aggregation.AggregatorFactory;
import org.apache.druid.query.aggregation.MetricManipulationFn;
import org.apache.druid.query.aggregation.PostAggregator;
import org.apache.druid.query.cache.CacheKeyBuilder;
import org.apache.druid.query.context.ResponseContext;
import org.apache.druid.segment.RowAdapters;
import org.apache.druid.segment.RowBasedColumnSelectorFactory;
import org.apache.druid.segment.column.RowSignature;
import org.joda.time.DateTime;

import java.util.Collections;
import java.util.Comparator;
import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.function.BinaryOperator;

/**
 * {@link QueryToolChest} for {@link TimeseriesQuery}: merges per-segment results (including
 * grand totals and the synthetic empty row for granularity ALL), and provides cache strategies,
 * query metrics, result manipulators, and array-based result conversion.
 */
public class TimeseriesQueryQueryToolChest extends QueryToolChest<Result<TimeseriesResultValue>, TimeseriesQuery>
{
private static final byte TIMESERIES_QUERY = 0x0;
private static final TypeReference<Object> OBJECT_TYPE_REFERENCE =
new TypeReference<Object>()
{
};

private static final TypeReference<Result<TimeseriesResultValue>> TYPE_REFERENCE =
new TypeReference<Result<TimeseriesResultValue>>()
{
};

private final TimeseriesQueryMetricsFactory queryMetricsFactory;

@VisibleForTesting
public TimeseriesQueryQueryToolChest()
{
this(DefaultTimeseriesQueryMetricsFactory.instance());
}

@Inject
public TimeseriesQueryQueryToolChest(TimeseriesQueryMetricsFactory queryMetricsFactory)
{
this.queryMetricsFactory = queryMetricsFactory;
}
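
/**
 * Merges result sequences from the downstream runners. Post-aggregators are removed before the
 * merge (they are applied later, by {@link #makePostComputeManipulatorFn}), the query limit is
 * applied to the merged sequence, a synthetic "empty" row is substituted when a granularity-ALL
 * query yields no results, and a grand-total row is appended when the query requests one.
 */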
@Override
public QueryRunner<Result<TimeseriesResultValue>> mergeResults(
QueryRunner<Result<TimeseriesResultValue>> queryRunner
)
{
final QueryRunner<Result<TimeseriesResultValue>> resultMergeQueryRunner = new ResultMergeQueryRunner<Result<TimeseriesResultValue>>(
queryRunner,
this::createResultComparator,
this::createMergeFn
)
{
@Override
public Sequence<Result<TimeseriesResultValue>> doRun(
QueryRunner<Result<TimeseriesResultValue>> baseRunner,
QueryPlus<Result<TimeseriesResultValue>> queryPlus,
ResponseContext context
)
{
int limit = ((TimeseriesQuery) queryPlus.getQuery()).getLimit();
Sequence<Result<TimeseriesResultValue>> result = super.doRun(
baseRunner,
// Don't do post aggs until makePostComputeManipulatorFn() is called
queryPlus.withQuery(((TimeseriesQuery) queryPlus.getQuery()).withPostAggregatorSpecs(ImmutableList.of())),
context
);
if (limit < Integer.MAX_VALUE) {
return result.limit(limit);
}
return result;
}
};

return (queryPlus, responseContext) -> {
final TimeseriesQuery query = (TimeseriesQuery) queryPlus.getQuery();
final Sequence<Result<TimeseriesResultValue>> baseResults = resultMergeQueryRunner.run(
queryPlus.withQuery(
queryPlus.getQuery()
.withOverriddenContext(
ImmutableMap.of(TimeseriesQuery.CTX_GRAND_TOTAL, false)
)
),
responseContext
);
final Sequence<Result<TimeseriesResultValue>> finalSequence;
if (query.getGranularity().equals(Granularities.ALL) && !query.isSkipEmptyBuckets()) {
// Usually it is NOT okay to materialize results via toList(), but granularity is ALL, so there is at most one record.
final List<Result<TimeseriesResultValue>> val = baseResults.toList();
finalSequence = val.isEmpty()
? Sequences.simple(Collections.singletonList(getNullTimeseriesResultValue(query)))
: Sequences.simple(val);
} else {
finalSequence = baseResults;
}
if (query.isGrandTotal()) {
// Accumulate grand totals while iterating the sequence.
final Object[] grandTotals = new Object[query.getAggregatorSpecs().size()];
final Sequence<Result<TimeseriesResultValue>> mappedSequence = Sequences.map(
finalSequence,
resultValue -> {
for (int i = 0; i < query.getAggregatorSpecs().size(); i++) {
final AggregatorFactory aggregatorFactory = query.getAggregatorSpecs().get(i);
final Object value = resultValue.getValue().getMetric(aggregatorFactory.getName());
if (grandTotals[i] == null) {
grandTotals[i] = value;
} else {
grandTotals[i] = aggregatorFactory.combine(grandTotals[i], value);
}
}
return resultValue;
}
);
return Sequences.concat(
ImmutableList.of(
mappedSequence,
Sequences.simple(
() -> {
final Map<String, Object> totalsMap = new HashMap<>();
for (int i = 0; i < query.getAggregatorSpecs().size(); i++) {
totalsMap.put(query.getAggregatorSpecs().get(i).getName(), grandTotals[i]);
}
final Result<TimeseriesResultValue> result = new Result<>(
null,
new TimeseriesResultValue(totalsMap)
);
return Collections.singletonList(result).iterator();
}
)
)
);
} else {
return finalSequence;
}
};
}
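
/**
 * Returns a binary operator that merges two results sharing a timestamp bucket by combining
 * their aggregator values via {@link TimeseriesBinaryFn}.
 */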
@Override
public BinaryOperator<Result<TimeseriesResultValue>> createMergeFn(
Query<Result<TimeseriesResultValue>> query
)
{
TimeseriesQuery timeseriesQuery = (TimeseriesQuery) query;
return new TimeseriesBinaryFn(timeseriesQuery.getGranularity(), timeseriesQuery.getAggregatorSpecs());
}
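
/**
 * Returns a comparator ordering results by granularity-bucketed timestamp, honoring the query's
 * descending flag.
 */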
@Override
public Comparator<Result<TimeseriesResultValue>> createResultComparator(Query<Result<TimeseriesResultValue>> query)
{
return ResultGranularTimestampComparator.create(query.getGranularity(), query.isDescending());
}
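
/**
 * Builds the single row returned for an empty granularity-ALL query: each aggregator is
 * factorized against an empty column selector, so it reports its initial ("empty") value, and
 * the row is timestamped at the start of the query's first interval (or the epoch if the query
 * has no intervals).
 */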
private Result<TimeseriesResultValue> getNullTimeseriesResultValue(TimeseriesQuery query)
{
List<AggregatorFactory> aggregatorSpecs = query.getAggregatorSpecs();
Aggregator[] aggregators = new Aggregator[aggregatorSpecs.size()];
String[] aggregatorNames = new String[aggregatorSpecs.size()];
for (int i = 0; i < aggregatorSpecs.size(); i++) {
aggregators[i] =
aggregatorSpecs.get(i)
.factorize(
RowBasedColumnSelectorFactory.create(
RowAdapters.standardRow(),
() -> new MapBasedRow(null, null),
RowSignature.empty(),
false
)
);
aggregatorNames[i] = aggregatorSpecs.get(i).getName();
}
final DateTime start = query.getIntervals().isEmpty() ? DateTimes.EPOCH : query.getIntervals().get(0).getStart();
TimeseriesResultBuilder bob = new TimeseriesResultBuilder(start);
for (int i = 0; i < aggregatorSpecs.size(); i++) {
bob.addMetric(aggregatorNames[i], aggregators[i].get());
aggregators[i].close();
}
return bob.build();
}

@Override
public TimeseriesQueryMetrics makeMetrics(TimeseriesQuery query)
{
TimeseriesQueryMetrics queryMetrics = queryMetricsFactory.makeMetrics();
queryMetrics.query(query);
return queryMetrics;
}

@Override
public TypeReference<Result<TimeseriesResultValue>> getResultTypeReference()
{
return TYPE_REFERENCE;
}
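
/**
 * Cache strategy for timeseries results. The per-segment cache key covers every field that can
 * affect per-segment results; the result-level key additionally folds in the post-aggregators
 * and the grandTotal flag, which only influence the final merged result.
 */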
@Override
public CacheStrategy<Result<TimeseriesResultValue>, Object, TimeseriesQuery> getCacheStrategy(final TimeseriesQuery query)
{
return new CacheStrategy<Result<TimeseriesResultValue>, Object, TimeseriesQuery>()
{
private final List<AggregatorFactory> aggs = query.getAggregatorSpecs();

@Override
public boolean isCacheable(TimeseriesQuery query, boolean willMergeRunners)
{
return true;
}

@Override
public byte[] computeCacheKey(TimeseriesQuery query)
{
return new CacheKeyBuilder(TIMESERIES_QUERY)
.appendBoolean(query.isDescending())
.appendBoolean(query.isSkipEmptyBuckets())
.appendCacheable(query.getGranularity())
.appendCacheable(query.getDimensionsFilter())
.appendCacheables(query.getAggregatorSpecs())
.appendCacheable(query.getVirtualColumns())
.appendInt(query.getLimit())
.build();
}

@Override
public byte[] computeResultLevelCacheKey(TimeseriesQuery query)
{
final CacheKeyBuilder builder = new CacheKeyBuilder(TIMESERIES_QUERY)
.appendBoolean(query.isDescending())
.appendBoolean(query.isSkipEmptyBuckets())
.appendCacheable(query.getGranularity())
.appendCacheable(query.getDimensionsFilter())
.appendCacheables(query.getAggregatorSpecs())
.appendCacheable(query.getVirtualColumns())
.appendCacheables(query.getPostAggregatorSpecs())
.appendInt(query.getLimit())
.appendBoolean(query.isGrandTotal());
return builder.build();
}

@Override
public TypeReference<Object> getCacheObjectClazz()
{
return OBJECT_TYPE_REFERENCE;
}

@Override
public Function<Result<TimeseriesResultValue>, Object> prepareForCache(boolean isResultLevelCache)
{
return input -> {
TimeseriesResultValue results = input.getValue();
final List<Object> retVal = Lists.newArrayListWithCapacity(1 + aggs.size());
// Timestamp can be null if grandTotal is true.
if (isResultLevelCache) {
retVal.add(input.getTimestamp() == null ? null : input.getTimestamp().getMillis());
} else {
retVal.add(Preconditions.checkNotNull(input.getTimestamp(), "timestamp of input[%s]", input).getMillis());
}
for (AggregatorFactory agg : aggs) {
retVal.add(results.getMetric(agg.getName()));
}
if (isResultLevelCache) {
for (PostAggregator postAgg : query.getPostAggregatorSpecs()) {
retVal.add(results.getMetric(postAgg.getName()));
}
}
return retVal;
};
}
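
/**
 * Decodes a cached entry in the order it was written: timestamp (possibly null at result
 * level), then one value per aggregator, then, at result level only, one value per
 * post-aggregator.
 */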
@Override
public Function<Object, Result<TimeseriesResultValue>> pullFromCache(boolean isResultLevelCache)
{
return new Function<Object, Result<TimeseriesResultValue>>()
{
private final Granularity granularity = query.getGranularity();

@Override
public Result<TimeseriesResultValue> apply(Object input)
{
List<Object> results = (List<Object>) input;
final Map<String, Object> retVal = Maps.newLinkedHashMap();
Iterator<Object> resultIter = results.iterator();
final Number timestampNumber = (Number) resultIter.next();
final DateTime timestamp;
if (isResultLevelCache) {
timestamp = timestampNumber == null ? null : granularity.toDateTime(timestampNumber.longValue());
} else {
timestamp = granularity.toDateTime(Preconditions.checkNotNull(timestampNumber, "timestamp").longValue());
}
CacheStrategy.fetchAggregatorsFromCache(
aggs,
resultIter,
isResultLevelCache,
(aggName, aggPosition, aggValueObject) -> {
retVal.put(aggName, aggValueObject);
}
);
if (isResultLevelCache) {
Iterator<PostAggregator> postItr = query.getPostAggregatorSpecs().iterator();
while (postItr.hasNext() && resultIter.hasNext()) {
retVal.put(postItr.next().getName(), resultIter.next());
}
}
return new Result<>(
timestamp,
new TimeseriesResultValue(retVal)
);
}
};
}
};
}
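
/**
 * Optimizes the dimension filter, if present, before handing the query to the underlying
 * runner.
 */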
@Override
public QueryRunner<Result<TimeseriesResultValue>> preMergeQueryDecoration(final QueryRunner<Result<TimeseriesResultValue>> runner)
{
return (queryPlus, responseContext) -> {
TimeseriesQuery timeseriesQuery = (TimeseriesQuery) queryPlus.getQuery();
if (timeseriesQuery.getDimensionsFilter() != null) {
timeseriesQuery = timeseriesQuery.withDimFilter(timeseriesQuery.getDimensionsFilter().optimize());
queryPlus = queryPlus.withQuery(timeseriesQuery);
}
return runner.run(queryPlus, responseContext);
};
}
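
/**
 * Manipulator applied to results before they are merged; it transforms each aggregator value
 * with the given function but does not compute post-aggregators.
 */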
@Override
public Function<Result<TimeseriesResultValue>, Result<TimeseriesResultValue>> makePreComputeManipulatorFn(
final TimeseriesQuery query,
final MetricManipulationFn fn
)
{
return makeComputeManipulatorFn(query, fn, false);
}
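
/**
 * Manipulator applied to merged results; unlike the pre-compute variant, it also computes the
 * query's post-aggregators.
 */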
@Override
public Function<Result<TimeseriesResultValue>, Result<TimeseriesResultValue>> makePostComputeManipulatorFn(
TimeseriesQuery query,
MetricManipulationFn fn
)
{
return makeComputeManipulatorFn(query, fn, true);
}
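
/**
 * Row signature of array-based results: __time, then the aggregators, then the
 * post-aggregators, in query order.
 */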
@Override
public RowSignature resultArraySignature(TimeseriesQuery query)
{
return RowSignature.builder()
.addTimeColumn()
.addAggregators(query.getAggregatorSpecs())
.addPostAggregators(query.getPostAggregatorSpecs())
.build();
}
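
/**
 * Converts each result row into an {@code Object[]} laid out according to
 * {@link #resultArraySignature}.
 */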
@Override
public Sequence<Object[]> resultsAsArrays(
final TimeseriesQuery query,
final Sequence<Result<TimeseriesResultValue>> resultSequence
)
{
final List<String> fields = resultArraySignature(query).getColumnNames();
return Sequences.map(
resultSequence,
result -> {
final Object[] retVal = new Object[fields.size()];
// Position 0 is always __time.
retVal[0] = result.getTimestamp().getMillis();
// Add other fields.
final Map<String, Object> resultMap = result.getValue().getBaseObject();
for (int i = 1; i < fields.size(); i++) {
retVal[i] = resultMap.get(fields.get(i));
}
return retVal;
}
);
}
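
/**
 * Shared implementation behind the pre- and post-compute manipulators: optionally computes
 * post-aggregators first (using the non-finalized aggregator values), then applies the
 * manipulation function to every aggregator value.
 */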
private Function<Result<TimeseriesResultValue>, Result<TimeseriesResultValue>> makeComputeManipulatorFn(
final TimeseriesQuery query,
final MetricManipulationFn fn,
final boolean calculatePostAggs
)
{
return result -> {
final TimeseriesResultValue holder = result.getValue();
final Map<String, Object> values = new HashMap<>(holder.getBaseObject());
if (calculatePostAggs && !query.getPostAggregatorSpecs().isEmpty()) {
// Put non-finalized aggregator values back so that dependent post-aggregators can be computed.
for (AggregatorFactory agg : query.getAggregatorSpecs()) {
values.put(agg.getName(), holder.getMetric(agg.getName()));
}
for (PostAggregator postAgg : query.getPostAggregatorSpecs()) {
values.put(postAgg.getName(), postAgg.compute(values));
}
}
for (AggregatorFactory agg : query.getAggregatorSpecs()) {
values.put(agg.getName(), fn.manipulate(agg, holder.getMetric(agg.getName())));
}
return new Result<>(
result.getTimestamp(),
new TimeseriesResultValue(values)
);
};
}
}