/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.pinot.core.operator.query;

import com.clearspring.analytics.stream.cardinality.HyperLogLog;
import it.unimi.dsi.fastutil.doubles.DoubleOpenHashSet;
import it.unimi.dsi.fastutil.floats.FloatOpenHashSet;
import it.unimi.dsi.fastutil.ints.IntOpenHashSet;
import it.unimi.dsi.fastutil.longs.LongOpenHashSet;
import it.unimi.dsi.fastutil.objects.ObjectOpenHashSet;
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.Objects;
import java.util.Set;
import org.apache.pinot.core.common.ObjectSerDeUtils;
import org.apache.pinot.core.common.Operator;
import org.apache.pinot.core.operator.BaseOperator;
import org.apache.pinot.core.operator.ExecutionStatistics;
import org.apache.pinot.core.operator.blocks.results.AggregationResultsBlock;
import org.apache.pinot.core.query.aggregation.function.AggregationFunction;
import org.apache.pinot.core.query.aggregation.function.DistinctCountHLLAggregationFunction;
import org.apache.pinot.core.query.aggregation.function.DistinctCountRawHLLAggregationFunction;
import org.apache.pinot.core.query.aggregation.function.DistinctCountSmartHLLAggregationFunction;
import org.apache.pinot.segment.local.customobject.MinMaxRangePair;
import org.apache.pinot.segment.spi.datasource.DataSource;
import org.apache.pinot.segment.spi.index.reader.Dictionary;
import org.apache.pinot.spi.data.FieldSpec;
import org.apache.pinot.spi.utils.ByteArray;

/**
 * Aggregation operator that serves aggregation queries from the dictionary or the column metadata to avoid scanning.
 * The plan maker selects this scan-less operator when the query contains only aggregations of type MIN, MAX,
 * MINMAXRANGE, DISTINCTCOUNT, DISTINCTCOUNTHLL, DISTINCTCOUNTRAWHLL, SEGMENTPARTITIONEDDISTINCTCOUNT or
 * DISTINCTCOUNTSMARTHLL, and the column either has a dictionary or has column metadata with the min and max value
 * defined. COUNT(*) is also supported when the query has no filter.
 * This operator is not used when the segment has a star-tree index, because the metric dictionaries would contain
 * pre-aggregated values and the dimension dictionaries would contain the star-node value.
 *
 * For the min value we use the dictionary's min value, falling back to the column metadata min value when there is
 * no dictionary. For the max value we use the dictionary's max value, falling back to the column metadata max value
 * when there is no dictionary.
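 *
 * Illustrative example (table and column names are placeholders, not taken from this class): a filterless query such
 * as {@code SELECT MAX(someMetric), DISTINCTCOUNT(someDim) FROM someTable} can be answered entirely from the two
 * columns' dictionaries, without scanning a single document.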
*/
@SuppressWarnings("rawtypes")
public class NonScanBasedAggregationOperator extends BaseOperator<AggregationResultsBlock> {
private static final String EXPLAIN_NAME = "AGGREGATE_NO_SCAN";

private final AggregationFunction[] _aggregationFunctions;
private final DataSource[] _dataSources;
private final int _numTotalDocs;
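
/**
 * @param aggregationFunctions aggregation functions to compute
 * @param dataSources data source for each aggregation function; the entry is null for COUNT, which needs no column
 * @param numTotalDocs total number of documents in the segment
 */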
public NonScanBasedAggregationOperator(AggregationFunction[] aggregationFunctions, DataSource[] dataSources,
int numTotalDocs) {
_aggregationFunctions = aggregationFunctions;
_dataSources = dataSources;
_numTotalDocs = numTotalDocs;
}

@Override
protected AggregationResultsBlock getNextBlock() {
List<Object> aggregationResults = new ArrayList<>(_aggregationFunctions.length);
for (int i = 0; i < _aggregationFunctions.length; i++) {
AggregationFunction aggregationFunction = _aggregationFunctions[i];
// Note that dataSource will be null for COUNT, so do not interact with it until it's known this isn't a COUNT
DataSource dataSource = _dataSources[i];
Object result;
switch (aggregationFunction.getType()) {
case COUNT:
result = (long) _numTotalDocs;
break;
case MIN:
case MINMV:
result = getMinValue(dataSource);
break;
case MAX:
case MAXMV:
result = getMaxValue(dataSource);
break;
case MINMAXRANGE:
case MINMAXRANGEMV:
result = new MinMaxRangePair(getMinValue(dataSource), getMaxValue(dataSource));
break;
case DISTINCTCOUNT:
case DISTINCTCOUNTMV:
result = getDistinctValueSet(Objects.requireNonNull(dataSource.getDictionary()));
break;
case DISTINCTCOUNTHLL:
case DISTINCTCOUNTHLLMV:
result = getDistinctCountHLLResult(Objects.requireNonNull(dataSource.getDictionary()),
(DistinctCountHLLAggregationFunction) aggregationFunction);
break;
case DISTINCTCOUNTRAWHLL:
case DISTINCTCOUNTRAWHLLMV:
result = getDistinctCountHLLResult(Objects.requireNonNull(dataSource.getDictionary()),
((DistinctCountRawHLLAggregationFunction) aggregationFunction).getDistinctCountHLLAggregationFunction());
break;
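// The dictionary holds each distinct value in this segment exactly once, so its length is the segment-level
// distinct count that SEGMENTPARTITIONEDDISTINCTCOUNT aggregates across segments.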
case SEGMENTPARTITIONEDDISTINCTCOUNT:
result = (long) Objects.requireNonNull(dataSource.getDictionary()).length();
break;
case DISTINCTCOUNTSMARTHLL:
result = getDistinctCountSmartHLLResult(Objects.requireNonNull(dataSource.getDictionary()),
(DistinctCountSmartHLLAggregationFunction) aggregationFunction);
break;
default:
throw new IllegalStateException(
"Non-scan based aggregation operator does not support function type: " + aggregationFunction.getType());
}
aggregationResults.add(result);
}
// Build the results block from the aggregation results computed above.
return new AggregationResultsBlock(_aggregationFunctions, aggregationResults);
}
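
/**
 * Returns the min value of the column as a Double, taken from the dictionary when one exists and from the column
 * metadata otherwise.
 */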
private static Double getMinValue(DataSource dataSource) {
Dictionary dictionary = dataSource.getDictionary();
if (dictionary != null) {
return toDouble(dictionary.getMinVal());
}
return toDouble(dataSource.getDataSourceMetadata().getMinValue());
}
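
/**
 * Returns the max value of the column as a Double, taken from the dictionary when one exists and from the column
 * metadata otherwise.
 */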
private static Double getMaxValue(DataSource dataSource) {
Dictionary dictionary = dataSource.getDictionary();
if (dictionary != null) {
return toDouble(dictionary.getMaxVal());
}
return toDouble(dataSource.getDataSourceMetadata().getMaxValue());
}
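
/**
 * Converts a min/max value into a Double, parsing the string representation for non-numeric types.
 */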
private static Double toDouble(Comparable<?> value) {
if (value instanceof Double) {
return (Double) value;
} else if (value instanceof Number) {
return ((Number) value).doubleValue();
} else {
return Double.parseDouble(value.toString());
}
}
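
/**
 * Materializes all dictionary entries into a type-specific set; the dictionary holds each distinct value of the
 * column once, so the set represents the segment's distinct values.
 */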
private static Set getDistinctValueSet(Dictionary dictionary) {
int dictionarySize = dictionary.length();
switch (dictionary.getValueType()) {
case INT:
IntOpenHashSet intSet = new IntOpenHashSet(dictionarySize);
for (int dictId = 0; dictId < dictionarySize; dictId++) {
intSet.add(dictionary.getIntValue(dictId));
}
return intSet;
case LONG:
LongOpenHashSet longSet = new LongOpenHashSet(dictionarySize);
for (int dictId = 0; dictId < dictionarySize; dictId++) {
longSet.add(dictionary.getLongValue(dictId));
}
return longSet;
case FLOAT:
FloatOpenHashSet floatSet = new FloatOpenHashSet(dictionarySize);
for (int dictId = 0; dictId < dictionarySize; dictId++) {
floatSet.add(dictionary.getFloatValue(dictId));
}
return floatSet;
case DOUBLE:
DoubleOpenHashSet doubleSet = new DoubleOpenHashSet(dictionarySize);
for (int dictId = 0; dictId < dictionarySize; dictId++) {
doubleSet.add(dictionary.getDoubleValue(dictId));
}
return doubleSet;
case STRING:
ObjectOpenHashSet<String> stringSet = new ObjectOpenHashSet<>(dictionarySize);
for (int dictId = 0; dictId < dictionarySize; dictId++) {
stringSet.add(dictionary.getStringValue(dictId));
}
return stringSet;
case BYTES:
ObjectOpenHashSet<ByteArray> bytesSet = new ObjectOpenHashSet<>(dictionarySize);
for (int dictId = 0; dictId < dictionarySize; dictId++) {
bytesSet.add(new ByteArray(dictionary.getBytesValue(dictId)));
}
return bytesSet;
default:
throw new IllegalStateException("Unsupported dictionary value type: " + dictionary.getValueType());
}
}
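
/**
 * Builds a HyperLogLog sketch with the given log2m by offering every dictionary value to it.
 */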
private static HyperLogLog getDistinctValueHLL(Dictionary dictionary, int log2m) {
HyperLogLog hll = new HyperLogLog(log2m);
int length = dictionary.length();
for (int i = 0; i < length; i++) {
hll.offer(dictionary.get(i));
}
return hll;
}
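
/**
 * For a BYTES column, each dictionary entry is treated as a serialized HyperLogLog and all entries are merged into
 * one sketch; for other types, a new HLL is built directly from the dictionary values.
 */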
private static HyperLogLog getDistinctCountHLLResult(Dictionary dictionary,
DistinctCountHLLAggregationFunction function) {
if (dictionary.getValueType() == FieldSpec.DataType.BYTES) {
// Treat BYTES value as serialized HyperLogLog
try {
HyperLogLog hll = ObjectSerDeUtils.HYPER_LOG_LOG_SER_DE.deserialize(dictionary.getBytesValue(0));
int length = dictionary.length();
for (int i = 1; i < length; i++) {
hll.addAll(ObjectSerDeUtils.HYPER_LOG_LOG_SER_DE.deserialize(dictionary.getBytesValue(i)));
}
return hll;
} catch (Exception e) {
throw new RuntimeException("Caught exception while merging HyperLogLogs", e);
}
} else {
return getDistinctValueHLL(dictionary, function.getLog2m());
}
}
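
/**
 * Returns an exact distinct value set when the dictionary is small enough, or an HLL sketch once the dictionary
 * size exceeds the function's conversion threshold.
 */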
private static Object getDistinctCountSmartHLLResult(Dictionary dictionary,
DistinctCountSmartHLLAggregationFunction function) {
if (dictionary.length() > function.getThreshold()) {
// Store values into a HLL when the dictionary size exceeds the conversion threshold
return getDistinctValueHLL(dictionary, function.getLog2m());
} else {
return getDistinctValueSet(dictionary);
}
}

@Override
public String toExplainString() {
return EXPLAIN_NAME;
}

@Override
public List<Operator> getChildOperators() {
return Collections.emptyList();
}

@Override
public ExecutionStatistics getExecutionStatistics() {
// NOTE: Set numDocsScanned to numTotalDocs for backward compatibility.
return new ExecutionStatistics(_numTotalDocs, 0, 0, _numTotalDocs);
}
}