blob: 843d248221e7105a7f7db40625fdf442d3060812 [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.druid.query.topn;
import com.google.common.annotations.VisibleForTesting;
import org.apache.druid.java.util.common.IAE;
import org.apache.druid.java.util.common.Pair;
import org.apache.druid.query.aggregation.Aggregator;
import org.apache.druid.query.aggregation.AggregatorFactory;
import org.apache.druid.query.aggregation.BufferAggregator;
import org.apache.druid.segment.Cursor;
import org.apache.druid.segment.DimensionSelector;
import org.apache.druid.segment.IdLookup;
import org.apache.druid.segment.StorageAdapter;
import org.apache.druid.segment.column.ColumnCapabilities;
import javax.annotation.Nullable;
import java.util.Arrays;
import java.util.Comparator;
import java.util.List;
/**
*/
public abstract class BaseTopNAlgorithm<DimValSelector, DimValAggregateStore, Parameters extends TopNParams>
implements TopNAlgorithm<DimValSelector, Parameters>
{
public static Aggregator[] makeAggregators(Cursor cursor, List<AggregatorFactory> aggregatorSpecs)
{
Aggregator[] aggregators = new Aggregator[aggregatorSpecs.size()];
int aggregatorIndex = 0;
for (AggregatorFactory spec : aggregatorSpecs) {
aggregators[aggregatorIndex] = spec.factorize(cursor.getColumnSelectorFactory());
++aggregatorIndex;
}
return aggregators;
}
protected static BufferAggregator[] makeBufferAggregators(Cursor cursor, List<AggregatorFactory> aggregatorSpecs)
{
BufferAggregator[] aggregators = new BufferAggregator[aggregatorSpecs.size()];
int aggregatorIndex = 0;
for (AggregatorFactory spec : aggregatorSpecs) {
aggregators[aggregatorIndex] = spec.factorizeBuffered(cursor.getColumnSelectorFactory());
++aggregatorIndex;
}
return aggregators;
}
protected final StorageAdapter storageAdapter;
protected BaseTopNAlgorithm(StorageAdapter storageAdapter)
{
this.storageAdapter = storageAdapter;
}
@Override
public void run(
Parameters params,
TopNResultBuilder resultBuilder,
DimValSelector dimValSelector,
@Nullable TopNQueryMetrics queryMetrics
)
{
if (params.getCardinality() != TopNParams.CARDINALITY_UNKNOWN) {
runWithCardinalityKnown(params, resultBuilder, dimValSelector, queryMetrics);
} else {
runWithCardinalityUnknown(params, resultBuilder, queryMetrics);
}
}
private void runWithCardinalityKnown(
Parameters params,
TopNResultBuilder resultBuilder,
DimValSelector dimValSelector,
@Nullable TopNQueryMetrics queryMetrics
)
{
if (queryMetrics != null) {
queryMetrics.startRecordingScanTime();
}
boolean hasDimValSelector = (dimValSelector != null);
int cardinality = params.getCardinality();
int numProcessed = 0;
long processedRows = 0;
while (numProcessed < cardinality) {
final int numToProcess;
int maxNumToProcess = Math.min(params.getNumValuesPerPass(), cardinality - numProcessed);
DimValSelector theDimValSelector;
if (!hasDimValSelector) {
numToProcess = maxNumToProcess;
theDimValSelector = makeDimValSelector(params, numProcessed, numToProcess);
} else {
//skip invalid, calculate length to have enough valid value to process or hit the end.
numToProcess = computeNewLength(dimValSelector, numProcessed, maxNumToProcess);
theDimValSelector = updateDimValSelector(dimValSelector, numProcessed, numToProcess);
}
DimValAggregateStore aggregatesStore = makeDimValAggregateStore(params);
processedRows = scanAndAggregate(params, theDimValSelector, aggregatesStore);
updateResults(params, theDimValSelector, aggregatesStore, resultBuilder);
closeAggregators(aggregatesStore);
numProcessed += numToProcess;
params.getCursor().reset();
}
if (queryMetrics != null) {
queryMetrics.addProcessedRows(processedRows);
queryMetrics.stopRecordingScanTime();
}
}
/**
* This function currently handles TopNs on long and float columns, which do not provide cardinality or an ID lookup.
* When cardinality is unknown, process everything in one pass.
* Existing implementations of makeDimValSelector() require cardinality as well, so the DimValSelector is not used.
*
* @param params TopN parameters from run()
* @param resultBuilder Result builder from run()
*/
private void runWithCardinalityUnknown(
Parameters params,
TopNResultBuilder resultBuilder,
@Nullable TopNQueryMetrics queryMetrics
)
{
DimValAggregateStore aggregatesStore = makeDimValAggregateStore(params);
if (queryMetrics != null) {
queryMetrics.startRecordingScanTime();
}
long processedRows = scanAndAggregate(params, null, aggregatesStore);
updateResults(params, null, aggregatesStore, resultBuilder);
closeAggregators(aggregatesStore);
params.getCursor().reset();
if (queryMetrics != null) {
queryMetrics.addProcessedRows(processedRows);
queryMetrics.stopRecordingScanTime();
}
}
protected abstract DimValSelector makeDimValSelector(Parameters params, int numProcessed, int numToProcess);
/**
* Skip invalid value, calculate length to have enough valid value to process or hit the end.
*
* @param dimValSelector the dim value selector which record value is valid or invalid.
* @param numProcessed the start position to process
* @param numToProcess the number of valid value to process
*
* @return the length between which have enough valid value to process or hit the end.
*/
protected int computeNewLength(DimValSelector dimValSelector, int numProcessed, int numToProcess)
{
return numToProcess;
}
protected abstract DimValSelector updateDimValSelector(
DimValSelector dimValSelector,
int numProcessed,
int numToProcess
);
protected abstract DimValAggregateStore makeDimValAggregateStore(Parameters params);
/**
* Returns the number of processed rows (i. e. after postFilters are applied inside the cursor being processed).
*/
protected abstract long scanAndAggregate(
Parameters params,
DimValSelector dimValSelector,
DimValAggregateStore dimValAggregateStore
);
protected abstract void updateResults(
Parameters params,
DimValSelector dimValSelector,
DimValAggregateStore dimValAggregateStore,
TopNResultBuilder resultBuilder
);
protected abstract void closeAggregators(
DimValAggregateStore dimValAggregateStore
);
public static class AggregatorArrayProvider extends BaseArrayProvider<Aggregator[][]>
{
Aggregator[][] expansionAggs;
int cardinality;
public AggregatorArrayProvider(
DimensionSelector dimSelector,
TopNQuery query,
int cardinality,
StorageAdapter storageAdapter
)
{
super(dimSelector, query, storageAdapter);
this.expansionAggs = new Aggregator[cardinality][];
this.cardinality = cardinality;
}
@Override
public Aggregator[][] build()
{
Pair<Integer, Integer> startEnd = computeStartEnd(cardinality);
Arrays.fill(expansionAggs, 0, startEnd.lhs, EMPTY_ARRAY);
Arrays.fill(expansionAggs, startEnd.lhs, startEnd.rhs, null);
Arrays.fill(expansionAggs, startEnd.rhs, expansionAggs.length, EMPTY_ARRAY);
return expansionAggs;
}
}
protected abstract static class BaseArrayProvider<T> implements TopNMetricSpecBuilder<T>
{
private volatile String previousStop;
private volatile boolean ignoreAfterThreshold;
private volatile int ignoreFirstN;
private volatile int keepOnlyN;
private final IdLookup idLookup;
private final TopNQuery query;
private final StorageAdapter storageAdapter;
public BaseArrayProvider(
DimensionSelector dimSelector,
TopNQuery query,
StorageAdapter storageAdapter
)
{
this.idLookup = dimSelector.idLookup();
this.query = query;
this.storageAdapter = storageAdapter;
previousStop = null;
ignoreAfterThreshold = false;
ignoreFirstN = 0;
keepOnlyN = dimSelector.getValueCardinality();
if (keepOnlyN < 0) {
throw new IAE("Cannot operate on a dimension with no dictionary");
}
}
@Override
public void skipTo(String previousStop)
{
ColumnCapabilities capabilities = storageAdapter.getColumnCapabilities(query.getDimensionSpec().getDimension());
if (capabilities != null && capabilities.areDictionaryValuesSorted().isTrue()) {
this.previousStop = previousStop;
}
}
@Override
public void ignoreAfterThreshold()
{
ignoreAfterThreshold = true;
}
@Override
public void ignoreFirstN(int n)
{
ignoreFirstN = n;
}
@Override
public void keepOnlyN(int n)
{
keepOnlyN = n;
}
@VisibleForTesting
public Pair<Integer, Integer> computeStartEnd(int cardinality)
{
int startIndex = ignoreFirstN;
if (previousStop != null) {
if (idLookup == null) {
throw new UnsupportedOperationException("Only DimensionSelectors which support idLookup() are supported yet");
}
int lookupId = idLookup.lookupId(previousStop) + 1;
if (lookupId < 0) {
lookupId *= -1;
}
if (lookupId > ignoreFirstN + keepOnlyN) {
startIndex = ignoreFirstN + keepOnlyN;
} else {
startIndex = Math.max(lookupId, startIndex);
}
}
int endIndex = Math.min(ignoreFirstN + keepOnlyN, cardinality);
if (ignoreAfterThreshold &&
query.getDimensionsFilter() == null &&
!storageAdapter.hasBuiltInFilters() &&
query.getIntervals().stream().anyMatch(interval -> interval.contains(storageAdapter.getInterval()))) {
endIndex = Math.min(endIndex, startIndex + query.getThreshold());
}
return Pair.of(startIndex, endIndex);
}
}
public static TopNResultBuilder makeResultBuilder(TopNParams params, TopNQuery query)
{
final Comparator comparator = query.getTopNMetricSpec()
.getComparator(query.getAggregatorSpecs(), query.getPostAggregatorSpecs());
return query.getTopNMetricSpec().getResultBuilder(
params.getCursor().getTime(),
query.getDimensionSpec(),
query.getThreshold(),
comparator,
query.getAggregatorSpecs(),
query.getPostAggregatorSpecs()
);
}
}