blob: cb91b5b3c33c60fe69b2d0a44ae67d015139afad [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.druid.query.aggregation;
import com.google.common.base.Supplier;
import com.google.common.base.Suppliers;
import com.google.common.collect.Lists;
import org.apache.druid.guice.annotations.PublicApi;
import org.apache.druid.java.util.common.Pair;
import org.apache.druid.java.util.common.StringUtils;
import org.apache.druid.math.expr.Expr;
import org.apache.druid.math.expr.ExprEval;
import org.apache.druid.query.monomorphicprocessing.RuntimeShapeInspector;
import org.apache.druid.segment.ColumnInspector;
import org.apache.druid.segment.ColumnSelectorFactory;
import org.apache.druid.segment.ColumnValueSelector;
import org.apache.druid.segment.DoubleColumnSelector;
import org.apache.druid.segment.FloatColumnSelector;
import org.apache.druid.segment.LongColumnSelector;
import org.apache.druid.segment.column.ColumnCapabilities;
import org.apache.druid.segment.column.ValueType;
import org.apache.druid.segment.vector.VectorColumnSelectorFactory;
import org.apache.druid.segment.vector.VectorValueSelector;
import org.apache.druid.segment.virtual.ExpressionSelectors;
import org.apache.druid.segment.virtual.ExpressionVectorSelectors;
import javax.annotation.Nullable;
import java.nio.ByteBuffer;
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashSet;
import java.util.List;
import java.util.Optional;
import java.util.Set;
@PublicApi
public class AggregatorUtil
{
public static final byte STRING_SEPARATOR = (byte) 0xFF;
public static final byte COUNT_CACHE_TYPE_ID = 0x0;
public static final byte LONG_SUM_CACHE_TYPE_ID = 0x1;
public static final byte DOUBLE_SUM_CACHE_TYPE_ID = 0x2;
public static final byte DOUBLE_MAX_CACHE_TYPE_ID = 0x3;
public static final byte DOUBLE_MIN_CACHE_TYPE_ID = 0x4;
public static final byte HYPER_UNIQUE_CACHE_TYPE_ID = 0x5;
public static final byte JS_CACHE_TYPE_ID = 0x6;
public static final byte HIST_CACHE_TYPE_ID = 0x7;
public static final byte CARD_CACHE_TYPE_ID = 0x8;
public static final byte FILTERED_AGG_CACHE_TYPE_ID = 0x9;
public static final byte LONG_MAX_CACHE_TYPE_ID = 0xA;
public static final byte LONG_MIN_CACHE_TYPE_ID = 0xB;
public static final byte FLOAT_SUM_CACHE_TYPE_ID = 0xC;
public static final byte FLOAT_MAX_CACHE_TYPE_ID = 0xD;
public static final byte FLOAT_MIN_CACHE_TYPE_ID = 0xE;
public static final byte SKETCH_MERGE_CACHE_TYPE_ID = 0xF;
public static final byte DISTINCT_COUNT_CACHE_KEY = 0x10;
public static final byte FLOAT_LAST_CACHE_TYPE_ID = 0x11;
public static final byte APPROX_HIST_CACHE_TYPE_ID = 0x12;
public static final byte APPROX_HIST_FOLDING_CACHE_TYPE_ID = 0x13;
public static final byte DOUBLE_FIRST_CACHE_TYPE_ID = 0x14;
public static final byte DOUBLE_LAST_CACHE_TYPE_ID = 0x15;
public static final byte FLOAT_FIRST_CACHE_TYPE_ID = 0x16;
public static final byte LONG_FIRST_CACHE_TYPE_ID = 0x17;
public static final byte LONG_LAST_CACHE_TYPE_ID = 0x18;
public static final byte TIMESTAMP_CACHE_TYPE_ID = 0x19;
public static final byte VARIANCE_CACHE_TYPE_ID = 0x1A;
// Quantiles sketch aggregator
public static final byte QUANTILES_DOUBLES_SKETCH_BUILD_CACHE_TYPE_ID = 0x1B;
public static final byte QUANTILES_DOUBLES_SKETCH_MERGE_CACHE_TYPE_ID = 0x1C;
public static final byte QUANTILES_DOUBLES_SKETCH_TO_HISTOGRAM_CACHE_TYPE_ID = 0x1D;
public static final byte QUANTILES_DOUBLES_SKETCH_TO_QUANTILE_CACHE_TYPE_ID = 0x1E;
public static final byte QUANTILES_DOUBLES_SKETCH_TO_QUANTILES_CACHE_TYPE_ID = 0x1F;
public static final byte QUANTILES_DOUBLES_SKETCH_TO_STRING_CACHE_TYPE_ID = 0x20;
// ArrayOfDoublesSketch aggregator
public static final byte ARRAY_OF_DOUBLES_SKETCH_CACHE_TYPE_ID = 0x21;
public static final byte ARRAY_OF_DOUBLES_SKETCH_SET_OP_CACHE_TYPE_ID = 0x22;
public static final byte ARRAY_OF_DOUBLES_SKETCH_TO_ESTIMATE_CACHE_TYPE_ID = 0x23;
public static final byte ARRAY_OF_DOUBLES_SKETCH_TO_ESTIMATE_AND_BOUNDS_CACHE_TYPE_ID = 0x24;
public static final byte ARRAY_OF_DOUBLES_SKETCH_TO_MEANS_CACHE_TYPE_ID = 0x25;
public static final byte ARRAY_OF_DOUBLES_SKETCH_TO_VARIANCES_CACHE_TYPE_ID = 0x26;
public static final byte ARRAY_OF_DOUBLES_SKETCH_TO_NUM_ENTRIES_CACHE_TYPE_ID = 0x27;
public static final byte ARRAY_OF_DOUBLES_SKETCH_TO_QUANTILES_SKETCH_CACHE_TYPE_ID = 0x28;
public static final byte ARRAY_OF_DOUBLES_SKETCH_T_TEST_CACHE_TYPE_ID = 0x29;
public static final byte ARRAY_OF_DOUBLES_SKETCH_TO_STRING_CACHE_TYPE_ID = 0x2A;
// StringFirst, StringLast aggregator
public static final byte STRING_FIRST_CACHE_TYPE_ID = 0x2B;
public static final byte STRING_LAST_CACHE_TYPE_ID = 0x2C;
// Suppressed aggregator
public static final byte SUPPRESSED_AGG_CACHE_TYPE_ID = 0x2D;
// HllSketch module in datasketches extension
public static final byte HLL_SKETCH_BUILD_CACHE_TYPE_ID = 0x2E;
public static final byte HLL_SKETCH_MERGE_CACHE_TYPE_ID = 0x2F;
public static final byte HLL_SKETCH_UNION_CACHE_TYPE_ID = 0x30;
public static final byte HLL_SKETCH_TO_STRING_CACHE_TYPE_ID = 0x31;
public static final byte HLL_SKETCH_TO_ESTIMATE_AND_BOUNDS_CACHE_TYPE_ID = 0x32;
// Fixed buckets histogram aggregator
public static final byte FIXED_BUCKET_HIST_CACHE_TYPE_ID = 0x33;
// bloom filter extension
public static final byte BLOOM_FILTER_CACHE_TYPE_ID = 0x34;
public static final byte BLOOM_FILTER_MERGE_CACHE_TYPE_ID = 0x35;
// Quantiles sketch in momentsketch extension
public static final byte MOMENTS_SKETCH_BUILD_CACHE_TYPE_ID = 0x36;
public static final byte MOMENTS_SKETCH_MERGE_CACHE_TYPE_ID = 0x37;
// TDigest sketch aggregators
public static final byte TDIGEST_BUILD_SKETCH_CACHE_TYPE_ID = 0x38;
public static final byte MEAN_CACHE_TYPE_ID = 0x41;
// ANY aggregator
public static final byte LONG_ANY_CACHE_TYPE_ID = 0x42;
public static final byte DOUBLE_ANY_CACHE_TYPE_ID = 0x43;
public static final byte FLOAT_ANY_CACHE_TYPE_ID = 0x44;
public static final byte STRING_ANY_CACHE_TYPE_ID = 0x45;
// GROUPING aggregator
public static final byte GROUPING_CACHE_TYPE_ID = 0x46;
// expression lambda aggregator
public static final byte EXPRESSION_LAMBDA_CACHE_TYPE_ID = 0x47;
/**
* returns the list of dependent postAggregators that should be calculated in order to calculate given postAgg
*
* @param postAggregatorList List of postAggregator, there is a restriction that the list should be in an order such
* that all the dependencies of any given aggregator should occur before that aggregator.
* See AggregatorUtilTest.testOutOfOrderPruneDependentPostAgg for example.
* @param postAggName name of the postAgg on which dependency is to be calculated
*
* @return the list of dependent postAggregators
*/
public static List<PostAggregator> pruneDependentPostAgg(List<PostAggregator> postAggregatorList, String postAggName)
{
ArrayList<PostAggregator> rv = new ArrayList<>();
Set<String> deps = new HashSet<>();
deps.add(postAggName);
// Iterate backwards to find the last calculated aggregate and add dependent aggregator as we find dependencies
// in reverse order
for (PostAggregator agg : Lists.reverse(postAggregatorList)) {
if (deps.contains(agg.getName())) {
rv.add(agg); // add to the beginning of List
deps.remove(agg.getName());
deps.addAll(agg.getDependentFields());
}
}
Collections.reverse(rv);
return rv;
}
public static Pair<List<AggregatorFactory>, List<PostAggregator>> condensedAggregators(
List<AggregatorFactory> aggList,
List<PostAggregator> postAggList,
String metric
)
{
List<PostAggregator> condensedPostAggs = AggregatorUtil.pruneDependentPostAgg(postAggList, metric);
// calculate dependent aggregators for these postAgg
Set<String> dependencySet = new HashSet<>();
dependencySet.add(metric);
for (PostAggregator postAggregator : condensedPostAggs) {
dependencySet.addAll(postAggregator.getDependentFields());
}
List<AggregatorFactory> condensedAggs = new ArrayList<>();
for (AggregatorFactory aggregatorSpec : aggList) {
if (dependencySet.contains(aggregatorSpec.getName())) {
condensedAggs.add(aggregatorSpec);
}
}
return new Pair<>(condensedAggs, condensedPostAggs);
}
/**
* Only one of fieldName and fieldExpression should be non-null
*/
static ColumnValueSelector makeColumnValueSelectorWithFloatDefault(
final ColumnSelectorFactory metricFactory,
@Nullable final String fieldName,
@Nullable final Expr fieldExpression,
final float nullValue
)
{
if ((fieldName == null) == (fieldExpression == null)) {
throw new IllegalArgumentException("Only one of fieldName or expression should be non-null");
}
if (fieldName != null) {
return metricFactory.makeColumnValueSelector(fieldName);
} else {
final ColumnValueSelector<ExprEval> baseSelector = ExpressionSelectors.makeExprEvalSelector(metricFactory, fieldExpression);
class ExpressionFloatColumnSelector implements FloatColumnSelector
{
@Override
public float getFloat()
{
// Although baseSelector.getObject is nullable
// exprEval returned from Expression selectors is never null.
final ExprEval exprEval = baseSelector.getObject();
return exprEval.isNumericNull() ? nullValue : (float) exprEval.asDouble();
}
@Override
public void inspectRuntimeShape(RuntimeShapeInspector inspector)
{
inspector.visit("baseSelector", baseSelector);
}
@Override
public boolean isNull()
{
final ExprEval exprEval = baseSelector.getObject();
return exprEval == null || exprEval.isNumericNull();
}
}
return new ExpressionFloatColumnSelector();
}
}
/**
* Only one of fieldName and fieldExpression should be non-null
*/
static ColumnValueSelector makeColumnValueSelectorWithLongDefault(
final ColumnSelectorFactory metricFactory,
@Nullable final String fieldName,
@Nullable final Expr fieldExpression,
final long nullValue
)
{
if ((fieldName == null) == (fieldExpression == null)) {
throw new IllegalArgumentException("Only one of fieldName and fieldExpression should be non-null");
}
if (fieldName != null) {
return metricFactory.makeColumnValueSelector(fieldName);
} else {
final ColumnValueSelector<ExprEval> baseSelector = ExpressionSelectors.makeExprEvalSelector(metricFactory, fieldExpression);
class ExpressionLongColumnSelector implements LongColumnSelector
{
@Override
public long getLong()
{
final ExprEval exprEval = baseSelector.getObject();
return exprEval.isNumericNull() ? nullValue : exprEval.asLong();
}
@Override
public void inspectRuntimeShape(RuntimeShapeInspector inspector)
{
inspector.visit("baseSelector", baseSelector);
}
@Override
public boolean isNull()
{
final ExprEval exprEval = baseSelector.getObject();
return exprEval == null || exprEval.isNumericNull();
}
}
return new ExpressionLongColumnSelector();
}
}
/**
* Only one of fieldName and fieldExpression should be non-null
*/
static ColumnValueSelector makeColumnValueSelectorWithDoubleDefault(
final ColumnSelectorFactory metricFactory,
@Nullable final String fieldName,
@Nullable final Expr fieldExpression,
final double nullValue
)
{
if ((fieldName == null) == (fieldExpression == null)) {
throw new IllegalArgumentException("Only one of fieldName and fieldExpression should be non-null");
}
if (fieldName != null) {
return metricFactory.makeColumnValueSelector(fieldName);
} else {
final ColumnValueSelector<ExprEval> baseSelector = ExpressionSelectors.makeExprEvalSelector(metricFactory, fieldExpression);
class ExpressionDoubleColumnSelector implements DoubleColumnSelector
{
@Override
public double getDouble()
{
final ExprEval exprEval = baseSelector.getObject();
return exprEval.isNumericNull() ? nullValue : exprEval.asDouble();
}
@Override
public void inspectRuntimeShape(RuntimeShapeInspector inspector)
{
inspector.visit("baseSelector", baseSelector);
}
@Override
public boolean isNull()
{
final ExprEval exprEval = baseSelector.getObject();
return exprEval == null || exprEval.isNumericNull();
}
}
return new ExpressionDoubleColumnSelector();
}
}
public static boolean canVectorize(
ColumnInspector columnInspector,
@Nullable String fieldName,
@Nullable String expression,
Supplier<Expr> fieldExpression
)
{
if (fieldName != null) {
final ColumnCapabilities capabilities = columnInspector.getColumnCapabilities(fieldName);
return capabilities == null || ValueType.isNumeric(capabilities.getType());
}
if (expression != null) {
return fieldExpression.get().canVectorize(columnInspector);
}
return false;
}
/**
* Make a {@link VectorValueSelector} for primitive numeric or expression virtual column inputs.
*/
public static VectorValueSelector makeVectorValueSelector(
VectorColumnSelectorFactory columnSelectorFactory,
@Nullable String fieldName,
@Nullable String expression,
Supplier<Expr> fieldExpression
)
{
if ((fieldName == null) == (expression == null)) {
throw new IllegalArgumentException("Only one of fieldName or expression should be non-null");
}
if (expression != null) {
return ExpressionVectorSelectors.makeVectorValueSelector(columnSelectorFactory, fieldExpression.get());
}
return columnSelectorFactory.makeValueSelector(fieldName);
}
public static Supplier<byte[]> getSimpleAggregatorCacheKeySupplier(
byte aggregatorType,
String fieldName,
Supplier<Expr> fieldExpression
)
{
return Suppliers.memoize(() -> {
byte[] fieldNameBytes = StringUtils.toUtf8WithNullToEmpty(fieldName);
byte[] expressionBytes = Optional.ofNullable(fieldExpression.get())
.map(Expr::getCacheKey)
.orElse(StringUtils.EMPTY_BYTES);
return ByteBuffer.allocate(2 + fieldNameBytes.length + expressionBytes.length)
.put(aggregatorType)
.put(fieldNameBytes)
.put(AggregatorUtil.STRING_SEPARATOR)
.put(expressionBytes)
.array();
});
}
}