blob: d0c0fb064e031fafcd8d25db09962422f7af1da7 [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.druid.query.topn;
import com.google.common.annotations.VisibleForTesting;
import com.google.common.collect.ImmutableMap;
import org.apache.druid.collections.NonBlockingPool;
import org.apache.druid.collections.ResourceHolder;
import org.apache.druid.java.util.common.Pair;
import org.apache.druid.query.BaseQuery;
import org.apache.druid.query.ColumnSelectorPlus;
import org.apache.druid.query.aggregation.BufferAggregator;
import org.apache.druid.query.aggregation.SimpleDoubleBufferAggregator;
import org.apache.druid.query.monomorphicprocessing.SpecializationService;
import org.apache.druid.query.monomorphicprocessing.SpecializationState;
import org.apache.druid.query.monomorphicprocessing.StringRuntimeShape;
import org.apache.druid.segment.Cursor;
import org.apache.druid.segment.DimensionSelector;
import org.apache.druid.segment.FilteredOffset;
import org.apache.druid.segment.StorageAdapter;
import org.apache.druid.segment.column.ColumnCapabilities;
import org.apache.druid.segment.data.IndexedInts;
import org.apache.druid.segment.data.Offset;
import org.apache.druid.segment.historical.HistoricalColumnSelector;
import org.apache.druid.segment.historical.HistoricalCursor;
import org.apache.druid.segment.historical.HistoricalDimensionSelector;
import org.apache.druid.segment.historical.SingleValueHistoricalDimensionSelector;
import org.apache.druid.utils.CloseableUtils;
import java.nio.ByteBuffer;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
/**
* This {@link TopNAlgorithm} is highly specialized for processing aggregates on string columns that are
* {@link ColumnCapabilities#isDictionaryEncoded()} and {@link ColumnCapabilities#areDictionaryValuesUnique()}. This
* algorithm is built around using a direct {@link ByteBuffer} from the 'processing pool' of intermediary results
* buffers, to aggregate using the dictionary id directly as the key, to defer looking up the value until it is necessary.
*
* At runtime, this implementation is specialized with wizardry to optimize for processing common top-n query shapes,
* see {@link #computeSpecializedScanAndAggregateImplementations},
* {@link Generic1AggPooledTopNScanner} and {@link Generic1AggPooledTopNScannerPrototype},
* {@link Generic2AggPooledTopNScanner} and {@link Generic2AggPooledTopNScannerPrototype},
* {@link org.apache.druid.query.monomorphicprocessing.CalledFromHotLoop},
* {@link org.apache.druid.query.monomorphicprocessing.HotLoopCallee},
* {@link org.apache.druid.query.monomorphicprocessing.RuntimeShapeInspector} for more details.
*/
public class PooledTopNAlgorithm
extends BaseTopNAlgorithm<int[], BufferAggregator[], PooledTopNAlgorithm.PooledTopNParams>
{
// Switches for the individual specialized scan-and-aggregate implementations. Each flag is initialized to the
// negation of Boolean.getBoolean(<property>), i.e. it is true unless the named JVM system property is set to "true".
// The flags are not final because the @VisibleForTesting setters of this class reassign them.

// Controls registration of the generic one-aggregator implementation.
private static boolean SPECIALIZE_GENERIC_ONE_AGG_POOLED_TOPN =
!Boolean.getBoolean("dontSpecializeGeneric1AggPooledTopN");
// Controls registration of the generic two-aggregator implementation.
private static boolean SPECIALIZE_GENERIC_TWO_AGG_POOLED_TOPN =
!Boolean.getBoolean("dontSpecializeGeneric2AggPooledTopN");
// Controls registration of the historical-cursor, one SimpleDoubleBufferAggregator implementation.
private static boolean SPECIALIZE_HISTORICAL_ONE_SIMPLE_DOUBLE_AGG_POOLED_TOPN =
!Boolean.getBoolean("dontSpecializeHistorical1SimpleDoubleAggPooledTopN");
// Controls registration of the variant of the above that requires a SingleValueHistoricalDimensionSelector.
private static boolean SPECIALIZE_HISTORICAL_SINGLE_VALUE_DIM_SELECTOR_ONE_SIMPLE_DOUBLE_AGG_POOLED_TOPN =
!Boolean.getBoolean("dontSpecializeHistoricalSingleValueDimSelector1SimpleDoubleAggPooledTopN");
/**
 * Test hook: enables or disables the generic one-aggregator implementation and immediately rebuilds the list of
 * specialized scan-and-aggregate implementations so that the new value takes effect. See TopNQueryRunnerTest.
 *
 * @param value true to register the implementation, false to leave it out
 */
@VisibleForTesting
static void setSpecializeGeneric1AggPooledTopN(boolean value)
{
  SPECIALIZE_GENERIC_ONE_AGG_POOLED_TOPN = value;
  computeSpecializedScanAndAggregateImplementations();
}
/**
 * Test hook: enables or disables the generic two-aggregator implementation and immediately rebuilds the list of
 * specialized scan-and-aggregate implementations so that the new value takes effect.
 *
 * @param value true to register the implementation, false to leave it out
 */
@VisibleForTesting
static void setSpecializeGeneric2AggPooledTopN(boolean value)
{
  SPECIALIZE_GENERIC_TWO_AGG_POOLED_TOPN = value;
  computeSpecializedScanAndAggregateImplementations();
}
/**
 * Test hook: enables or disables the historical-cursor, one simple-double-aggregator implementation and immediately
 * rebuilds the list of specialized scan-and-aggregate implementations so that the new value takes effect.
 *
 * @param value true to register the implementation, false to leave it out
 */
@VisibleForTesting
static void setSpecializeHistorical1SimpleDoubleAggPooledTopN(boolean value)
{
  SPECIALIZE_HISTORICAL_ONE_SIMPLE_DOUBLE_AGG_POOLED_TOPN = value;
  computeSpecializedScanAndAggregateImplementations();
}
/**
 * Test hook: enables or disables the single-value-dimension-selector flavor of the historical one
 * simple-double-aggregator implementation and immediately rebuilds the list of specialized scan-and-aggregate
 * implementations so that the new value takes effect.
 *
 * @param value true to register the implementation, false to leave it out
 */
@VisibleForTesting
static void setSpecializeHistoricalSingleValueDimSelector1SimpleDoubleAggPooledTopN(boolean value)
{
  SPECIALIZE_HISTORICAL_SINGLE_VALUE_DIM_SELECTOR_ONE_SIMPLE_DOUBLE_AGG_POOLED_TOPN = value;
  computeSpecializedScanAndAggregateImplementations();
}
// Shared prototype scanner instances. Each one is passed as the default argument to
// SpecializationState.getSpecializedOrDefault(): the two generic ones directly by the scanAndAggregateGeneric*Agg
// helpers, the two historical ones via the prototypeScanner parameter of scanAndAggregateHistorical1SimpleDoubleAgg.

// Default scanner for the generic one-aggregator path.
private static final Generic1AggPooledTopNScanner DEFAULT_GENERIC_ONE_AGG_SCANNER =
new Generic1AggPooledTopNScannerPrototype();
// Default scanner for the generic two-aggregator path.
private static final Generic2AggPooledTopNScanner DEFAULT_GENERIC_TWO_AGG_SCANNER =
new Generic2AggPooledTopNScannerPrototype();
// Default scanner for the historical one simple-double-aggregator path.
private static final Historical1AggPooledTopNScanner DEFAULT_HISTORICAL_ONE_SIMPLE_DOUBLE_AGG_SCANNER =
new Historical1SimpleDoubleAggPooledTopNScannerPrototype();
// Default scanner for the historical path that additionally requires a single-value dimension selector.
private static final Historical1AggPooledTopNScanner DEFAULT_HISTORICAL_SINGLE_VALUE_DIM_SELECTOR_ONE_SIMPLE_DOUBLE_AGG_SCANNER =
new HistoricalSingleValueDimSelector1SimpleDoubleAggPooledTopNScannerPrototype();
/**
 * One candidate strategy for scanning the cursor and aggregating into the results buffer. Implementations are
 * supplied as lambdas, so the interface is marked {@link FunctionalInterface} to have the compiler guarantee that it
 * keeps exactly one abstract method.
 */
@FunctionalInterface
private interface ScanAndAggregate
{
  /**
   * If this implementation of ScanAndAggregate is executable with the given parameters, run it and return the number
   * of processed rows. Otherwise return -1 (scanning and aggregation is not done).
   *
   * @param params         parameters of the current topN run
   * @param positions      per-dictionary-id positions array for the current pass
   * @param theAggregators buffer aggregators of the query
   * @return the number of processed rows, or -1 when this implementation does not apply
   */
  long scanAndAggregate(
      PooledTopNParams params,
      int[] positions,
      BufferAggregator[] theAggregators
  );
}
// Ordered list of specialized implementations that scanAndAggregate() tries, first match wins, before it falls back
// to scanAndAggregateDefault(). The list is cleared and refilled in place by
// computeSpecializedScanAndAggregateImplementations().
private static final List<ScanAndAggregate> SPECIALIZED_SCAN_AND_AGGREGATE_IMPLEMENTATIONS = new ArrayList<>();
static {
// Populate the list once at class initialization from the SPECIALIZE_* flags.
computeSpecializedScanAndAggregateImplementations();
}
/**
 * Clears {@link #SPECIALIZED_SCAN_AND_AGGREGATE_IMPLEMENTATIONS} and refills it according to the current values of
 * the SPECIALIZE_* flags. Called from the static initializer and from each of the test-only flag setters.
 */
private static void computeSpecializedScanAndAggregateImplementations()
{
  final List<ScanAndAggregate> implementations = SPECIALIZED_SCAN_AND_AGGREGATE_IMPLEMENTATIONS;
  implementations.clear();

  // Registration order matters: "more specialized" implementations have to be registered first.
  if (SPECIALIZE_HISTORICAL_SINGLE_VALUE_DIM_SELECTOR_ONE_SIMPLE_DOUBLE_AGG_POOLED_TOPN) {
    implementations.add(
        historicalOneSimpleDoubleAggImplementation(
            SingleValueHistoricalDimensionSelector.class,
            DEFAULT_HISTORICAL_SINGLE_VALUE_DIM_SELECTOR_ONE_SIMPLE_DOUBLE_AGG_SCANNER
        )
    );
  }
  if (SPECIALIZE_HISTORICAL_ONE_SIMPLE_DOUBLE_AGG_POOLED_TOPN) {
    implementations.add(
        historicalOneSimpleDoubleAggImplementation(
            HistoricalDimensionSelector.class,
            DEFAULT_HISTORICAL_ONE_SIMPLE_DOUBLE_AGG_SCANNER
        )
    );
  }
  if (SPECIALIZE_GENERIC_ONE_AGG_POOLED_TOPN) {
    implementations.add(
        (params, positions, theAggregators) ->
            theAggregators.length == 1
            ? scanAndAggregateGeneric1Agg(params, positions, theAggregators[0], params.getCursor())
            : -1
    );
  }
  if (SPECIALIZE_GENERIC_TWO_AGG_POOLED_TOPN) {
    implementations.add(
        (params, positions, theAggregators) ->
            theAggregators.length == 2
            ? scanAndAggregateGeneric2Agg(params, positions, theAggregators, params.getCursor())
            : -1
    );
  }
}

/**
 * Builds the historical one simple-double-aggregator implementation. The returned implementation applies only when
 * there is exactly one aggregator, the cursor is a {@link HistoricalCursor} whose offset is not a
 * {@link FilteredOffset}, the aggregator is a {@link SimpleDoubleBufferAggregator} reading from a
 * {@link HistoricalColumnSelector}, and the dimension selector is an instance of {@code requiredDimSelectorType};
 * otherwise it returns -1 without scanning.
 *
 * @param requiredDimSelectorType type the dimension selector of the params must be an instance of
 * @param prototypeScanner        prototype scanner handed to scanAndAggregateHistorical1SimpleDoubleAgg
 */
private static ScanAndAggregate historicalOneSimpleDoubleAggImplementation(
    final Class<?> requiredDimSelectorType,
    final Historical1AggPooledTopNScanner prototypeScanner
)
{
  return (params, positions, theAggregators) -> {
    if (theAggregators.length != 1) {
      return -1;
    }
    final BufferAggregator aggregator = theAggregators[0];
    final Cursor cursor = params.getCursor();
    if (!(cursor instanceof HistoricalCursor)) {
      return -1;
    }
    final HistoricalCursor historicalCursor = (HistoricalCursor) cursor;
    // FilteredOffset.clone() is not supported. This condition should be removed if the historical prototype
    // scanners (Historical1SimpleDoubleAggPooledTopNScannerPrototype and
    // HistoricalSingleValueDimSelector1SimpleDoubleAggPooledTopNScannerPrototype) don't clone offset anymore.
    if (historicalCursor.getOffset() instanceof FilteredOffset) {
      return -1;
    }
    if (!(aggregator instanceof SimpleDoubleBufferAggregator)) {
      return -1;
    }
    if (!requiredDimSelectorType.isInstance(params.getDimSelector())) {
      return -1;
    }
    final SimpleDoubleBufferAggregator doubleAggregator = (SimpleDoubleBufferAggregator) aggregator;
    if (!(doubleAggregator.getSelector() instanceof HistoricalColumnSelector)) {
      return -1;
    }
    return scanAndAggregateHistorical1SimpleDoubleAgg(
        params,
        positions,
        doubleAggregator,
        historicalCursor,
        prototypeScanner
    );
  };
}
// The topN query being executed; read for its aggregator specs, dimension spec and topN metric spec.
private final TopNQuery query;
// Pool from which makeInitParams() takes the results buffer; the taken holder is closed again in cleanup().
private final NonBlockingPool<ByteBuffer> bufferPool;
// Unroll factor of the hand-unrolled loops in scanAndAggregateDefault() and aggregateDimValue().
private static final int AGG_UNROLL_COUNT = 8; // Must be able to fit loop below
/**
 * Creates the algorithm for one query.
 *
 * @param storageAdapter storage adapter, forwarded to the base class
 * @param query          the topN query to execute
 * @param bufferPool     pool the results buffer is taken from in {@link #makeInitParams}
 */
public PooledTopNAlgorithm(
    final StorageAdapter storageAdapter,
    final TopNQuery query,
    final NonBlockingPool<ByteBuffer> bufferPool
)
{
  super(storageAdapter);
  this.bufferPool = bufferPool;
  this.query = query;
}
/**
 * Builds the parameters for a run: a provider of the per-dictionary-id positions array, a results buffer taken from
 * the pool, and the record layout derived from the query's aggregators.
 *
 * The buffer holder is handed over to the returned params (it is released in {@link #cleanup}). If anything fails
 * after the buffer has been taken, the holder is closed here before the failure is rethrown.
 *
 * @param selectorPlus wrapper whose selector must be a {@link DimensionSelector}
 * @param cursor       cursor to scan
 * @return the params for this run
 * @throws UnsupportedOperationException if the dimension selector reports a negative value cardinality
 */
@Override
public PooledTopNParams makeInitParams(ColumnSelectorPlus selectorPlus, Cursor cursor)
{
  final DimensionSelector dimSelector = (DimensionSelector) selectorPlus.getSelector();
  final int cardinality = dimSelector.getValueCardinality();
  if (cardinality < 0) {
    throw new UnsupportedOperationException("Cannot operate on a dimension with no dictionary");
  }

  final TopNMetricSpecBuilder<int[]> arrayProvider = new BaseArrayProvider<int[]>(
      dimSelector,
      query,
      storageAdapter
  )
  {
    // One slot per dictionary id; the same array is refilled and returned by every build() call.
    private final int[] positions = new int[cardinality];

    @Override
    public int[] build()
    {
      // Ids inside [start, end) are marked as "to be initialized", everything outside of it as "skip".
      Pair<Integer, Integer> startEnd = computeStartEnd(cardinality);
      Arrays.fill(positions, 0, startEnd.lhs, SKIP_POSITION_VALUE);
      Arrays.fill(positions, startEnd.lhs, startEnd.rhs, INIT_POSITION_VALUE);
      Arrays.fill(positions, startEnd.rhs, positions.length, SKIP_POSITION_VALUE);
      return positions;
    }
  };

  final ResourceHolder<ByteBuffer> resultsBufHolder = bufferPool.take();
  try {
    final ByteBuffer resultsBuf = resultsBufHolder.get();
    resultsBuf.clear();
    final int numBytesToWorkWith = resultsBuf.remaining();

    // A record is the concatenation of the intermediate states of all aggregators, in query order.
    final int numAggregators = query.getAggregatorSpecs().size();
    final int[] aggregatorSizes = new int[numAggregators];
    int numBytesPerRecord = 0;
    for (int i = 0; i < numAggregators; ++i) {
      aggregatorSizes[i] = query.getAggregatorSpecs().get(i).getMaxIntermediateSizeWithNulls();
      numBytesPerRecord += aggregatorSizes[i];
    }

    // When records occupy no space, every dictionary id fits into a single pass.
    final int numValuesPerPass = numBytesPerRecord > 0 ? numBytesToWorkWith / numBytesPerRecord : cardinality;

    return PooledTopNParams.builder()
                           .withSelectorPlus(selectorPlus)
                           .withCursor(cursor)
                           .withResultsBufHolder(resultsBufHolder)
                           .withResultsBuf(resultsBuf)
                           .withArrayProvider(arrayProvider)
                           .withNumBytesPerRecord(numBytesPerRecord)
                           .withNumValuesPerPass(numValuesPerPass)
                           .withAggregatorSizes(aggregatorSizes)
                           .build();
  }
  catch (Throwable e) {
    // Release the buffer, but never let a failure of close() hide the failure that brought us here.
    try {
      resultsBufHolder.close();
    }
    catch (Throwable closeFailure) {
      e.addSuppressed(closeFailure);
    }
    throw e;
  }
}
/**
 * Produces the positions array for the next pass. When the dimension spec preserves ordering, the array provider is
 * first restricted to the window described by {@code numProcessed} / {@code numToProcess} and passed through the
 * metric spec's optimizer; otherwise the provider's array is built as is.
 */
@Override
protected int[] makeDimValSelector(PooledTopNParams params, int numProcessed, int numToProcess)
{
  final TopNMetricSpecBuilder<int[]> provider = params.getArrayProvider();

  if (query.getDimensionSpec().preservesOrdering()) {
    provider.ignoreFirstN(numProcessed);
    provider.keepOnlyN(numToProcess);
    return query.getTopNMetricSpec().configureOptimizer(provider).build();
  }

  return provider.build();
}
/**
 * Counts how many entries of {@code dimValSelector}, starting at index {@code numProcessed}, have to be covered in
 * order to include {@code numToProcess} entries that are not {@code SKIP_POSITION_VALUE}, stopping early at the end
 * of the array.
 *
 * @return the number of entries covered, skipped ones included
 */
@Override
protected int computeNewLength(int[] dimValSelector, int numProcessed, int numToProcess)
{
  int stillNeeded = numToProcess;
  int index = numProcessed;
  while (index < dimValSelector.length && stillNeeded > 0) {
    if (dimValSelector[index] != SKIP_POSITION_VALUE) {
      stillNeeded--;
    }
    index++;
  }
  return index - numProcessed;
}
/**
 * Returns a copy of {@code dimValSelector} in which everything outside of the window
 * {@code [numProcessed, numProcessed + numToProcess)} is overwritten with {@code SKIP_POSITION_VALUE}. The window is
 * clipped to the array length; the input array is not modified.
 */
@Override
protected int[] updateDimValSelector(int[] dimValSelector, int numProcessed, int numToProcess)
{
  final int[] windowed = dimValSelector.clone();
  final int windowEnd = Math.min(windowed.length, numProcessed + numToProcess);

  // Blank out the part before the window, then the part after it.
  Arrays.fill(windowed, 0, numProcessed, SKIP_POSITION_VALUE);
  Arrays.fill(windowed, windowEnd, windowed.length, SKIP_POSITION_VALUE);
  return windowed;
}
/**
 * Creates the buffer aggregators for the query's aggregator specs on top of the cursor held by {@code params}.
 */
@Override
protected BufferAggregator[] makeDimValAggregateStore(PooledTopNParams params)
{
  final Cursor cursor = params.getCursor();
  return makeBufferAggregators(cursor, query.getAggregatorSpecs());
}
/**
 * Runs the first registered specialized implementation that reports a non-negative row count; if none applies,
 * runs {@link #scanAndAggregateDefault}. In both cases {@code BaseQuery.checkInterrupted()} is called once the scan
 * has returned.
 *
 * @return the number of processed rows
 */
@Override
protected long scanAndAggregate(
    final PooledTopNParams params,
    final int[] positions,
    final BufferAggregator[] theAggregators
)
{
  long processedRows = -1;
  for (ScanAndAggregate candidate : SPECIALIZED_SCAN_AND_AGGREGATE_IMPLEMENTATIONS) {
    processedRows = candidate.scanAndAggregate(params, positions, theAggregators);
    if (processedRows >= 0) {
      break;
    }
  }
  if (processedRows < 0) {
    // None of the specialized implementations was applicable.
    processedRows = scanAndAggregateDefault(params, positions, theAggregators);
  }
  BaseQuery.checkInterrupted();
  return processedRows;
}
/**
 * Scans with a {@link Historical1AggPooledTopNScanner}. The specialization state is looked up by the prototype's
 * class, the runtime shape of the aggregator and the concrete class of the cursor's offset; the scanner returned by
 * {@code getSpecializedOrDefault(prototypeScanner)} does the scan, and the number of processed rows is reported back
 * to the specialization state.
 *
 * @return the number of processed rows
 */
private static long scanAndAggregateHistorical1SimpleDoubleAgg(
    PooledTopNParams params,
    int[] positions,
    SimpleDoubleBufferAggregator aggregator,
    HistoricalCursor cursor,
    Historical1AggPooledTopNScanner prototypeScanner
)
{
  final String shape = StringRuntimeShape.of(aggregator);
  final SpecializationState<Historical1AggPooledTopNScanner> state =
      SpecializationService.getSpecializationState(
          prototypeScanner.getClass(),
          shape,
          ImmutableMap.of(Offset.class, cursor.getOffset().getClass())
      );
  final Historical1AggPooledTopNScanner effectiveScanner = state.getSpecializedOrDefault(prototypeScanner);

  final long rows = effectiveScanner.scanAndAggregate(
      (HistoricalDimensionSelector) params.getDimSelector(),
      aggregator.getSelector(),
      aggregator,
      params.getAggregatorSizes()[0],
      cursor,
      positions,
      params.getResultsBuf()
  );
  state.accountLoopIterations(rows);
  return rows;
}
/**
 * Scans with a {@link Generic1AggPooledTopNScanner}: looks up the specialization state for the runtime shape of the
 * single aggregator, scans with the scanner returned by {@code getSpecializedOrDefault} (default:
 * {@link #DEFAULT_GENERIC_ONE_AGG_SCANNER}) and reports the number of processed rows back to the state.
 *
 * @return the number of processed rows
 */
private static long scanAndAggregateGeneric1Agg(
    PooledTopNParams params,
    int[] positions,
    BufferAggregator aggregator,
    Cursor cursor
)
{
  final String shape = StringRuntimeShape.of(aggregator);
  final Class<? extends Generic1AggPooledTopNScanner> prototype = Generic1AggPooledTopNScannerPrototype.class;
  final SpecializationState<Generic1AggPooledTopNScanner> state =
      SpecializationService.getSpecializationState(prototype, shape);
  final Generic1AggPooledTopNScanner effectiveScanner =
      state.getSpecializedOrDefault(DEFAULT_GENERIC_ONE_AGG_SCANNER);

  final long rows = effectiveScanner.scanAndAggregate(
      params.getDimSelector(),
      aggregator,
      params.getAggregatorSizes()[0],
      cursor,
      positions,
      params.getResultsBuf()
  );
  state.accountLoopIterations(rows);
  return rows;
}
/**
 * Scans with a {@link Generic2AggPooledTopNScanner}: looks up the specialization state for the runtime shape of the
 * two aggregators, scans with the scanner returned by {@code getSpecializedOrDefault} (default:
 * {@link #DEFAULT_GENERIC_TWO_AGG_SCANNER}) and reports the number of processed rows back to the state.
 *
 * @param theAggregators the aggregators; elements 0 and 1 are used
 * @return the number of processed rows
 */
private static long scanAndAggregateGeneric2Agg(
    PooledTopNParams params,
    int[] positions,
    BufferAggregator[] theAggregators,
    Cursor cursor
)
{
  final String shape = StringRuntimeShape.of(theAggregators);
  final Class<? extends Generic2AggPooledTopNScanner> prototype = Generic2AggPooledTopNScannerPrototype.class;
  final SpecializationState<Generic2AggPooledTopNScanner> state =
      SpecializationService.getSpecializationState(prototype, shape);
  final Generic2AggPooledTopNScanner effectiveScanner =
      state.getSpecializedOrDefault(DEFAULT_GENERIC_TWO_AGG_SCANNER);

  final int[] sizes = params.getAggregatorSizes();
  final long rows = effectiveScanner.scanAndAggregate(
      params.getDimSelector(),
      theAggregators[0],
      sizes[0],
      theAggregators[1],
      sizes[1],
      cursor,
      positions,
      params.getResultsBuf()
  );
  state.accountLoopIterations(rows);
  return rows;
}
/**
* Use aggressive loop unrolling to aggregate the data
*
* How this works: The aggregates are evaluated AGG_UNROLL_COUNT at a time. This was chosen to be 8 rather arbitrarily.
* The offsets into the output buffer are precalculated and stored in aggregatorOffsets
*
* For queries whose aggregate count is less than AGG_UNROLL_COUNT, the aggregates are evaluated in a switch statement.
* See http://en.wikipedia.org/wiki/Duff's_device for more information on this kind of approach
*
* This allows out of order execution of the code. In local tests, the JVM inlines all the way to this function.
*
* If there are more than AGG_UNROLL_COUNT aggregates, then the remainder is calculated with the switch, and the
* blocks of AGG_UNROLL_COUNT are calculated in a partially unrolled for-loop.
*
* Putting the switch first allows for optimization for the common case (less than AGG_UNROLL_COUNT aggs) but
* still optimizes the high quantity of aggregate queries which benefit greatly from any speed improvements
* (they simply take longer to start with).
*
* The same switch-then-unrolled-loop scheme is applied in this method to the dimension values of each row.
*
* @param params parameters of the current run (cursor, dimension selector, results buffer, record layout)
* @param positions per-dictionary-id positions array; updated as new record slots get assigned
* @param theAggregators buffer aggregators of the query
* @return the number of cursor rows visited
* @throws UnsupportedOperationException if params reports a negative cardinality
*/
private static long scanAndAggregateDefault(
final PooledTopNParams params,
final int[] positions,
final BufferAggregator[] theAggregators
)
{
if (params.getCardinality() < 0) {
throw new UnsupportedOperationException("Cannot operate on a dimension with unknown cardinality");
}
final ByteBuffer resultsBuf = params.getResultsBuf();
final int numBytesPerRecord = params.getNumBytesPerRecord();
final int[] aggregatorSizes = params.getAggregatorSizes();
final Cursor cursor = params.getCursor();
final DimensionSelector dimSelector = params.getDimSelector();
// Running sum of aggregatorSizes: aggregatorOffsets[j] is the byte offset of aggregator j within one record.
final int[] aggregatorOffsets = new int[aggregatorSizes.length];
for (int j = 0, offset = 0; j < aggregatorSizes.length; ++j) {
aggregatorOffsets[j] = offset;
offset += aggregatorSizes[j];
}
final int aggSize = theAggregators.length;
// Number of aggregators left over once the rest is grouped into blocks of AGG_UNROLL_COUNT.
final int aggExtra = aggSize % AGG_UNROLL_COUNT;
// Index of the next unused record slot; aggregateDimValue() returns it incremented when it assigns a new slot.
int currentPosition = 0;
long processedRows = 0;
while (!cursor.isDoneOrInterrupted()) {
final IndexedInts dimValues = dimSelector.getRow();
final int dimSize = dimValues.size();
final int dimExtra = dimSize % AGG_UNROLL_COUNT;
// Handle the first dimExtra values of the row, highest index first, by falling through the cases.
switch (dimExtra) {
case 7:
currentPosition = aggregateDimValue(
positions,
theAggregators,
resultsBuf,
numBytesPerRecord,
aggregatorOffsets,
aggSize,
aggExtra,
dimValues.get(6),
currentPosition
);
// fall through
case 6:
currentPosition = aggregateDimValue(
positions,
theAggregators,
resultsBuf,
numBytesPerRecord,
aggregatorOffsets,
aggSize,
aggExtra,
dimValues.get(5),
currentPosition
);
// fall through
case 5:
currentPosition = aggregateDimValue(
positions,
theAggregators,
resultsBuf,
numBytesPerRecord,
aggregatorOffsets,
aggSize,
aggExtra,
dimValues.get(4),
currentPosition
);
// fall through
case 4:
currentPosition = aggregateDimValue(
positions,
theAggregators,
resultsBuf,
numBytesPerRecord,
aggregatorOffsets,
aggSize,
aggExtra,
dimValues.get(3),
currentPosition
);
// fall through
case 3:
currentPosition = aggregateDimValue(
positions,
theAggregators,
resultsBuf,
numBytesPerRecord,
aggregatorOffsets,
aggSize,
aggExtra,
dimValues.get(2),
currentPosition
);
// fall through
case 2:
currentPosition = aggregateDimValue(
positions,
theAggregators,
resultsBuf,
numBytesPerRecord,
aggregatorOffsets,
aggSize,
aggExtra,
dimValues.get(1),
currentPosition
);
// fall through
case 1:
currentPosition = aggregateDimValue(
positions,
theAggregators,
resultsBuf,
numBytesPerRecord,
aggregatorOffsets,
aggSize,
aggExtra,
dimValues.get(0),
currentPosition
);
}
// The remaining dimSize - dimExtra values are a multiple of AGG_UNROLL_COUNT, so every iteration handles
// exactly eight of them.
for (int i = dimExtra; i < dimSize; i += AGG_UNROLL_COUNT) {
currentPosition = aggregateDimValue(
positions,
theAggregators,
resultsBuf,
numBytesPerRecord,
aggregatorOffsets,
aggSize,
aggExtra,
dimValues.get(i),
currentPosition
);
currentPosition = aggregateDimValue(
positions,
theAggregators,
resultsBuf,
numBytesPerRecord,
aggregatorOffsets,
aggSize,
aggExtra,
dimValues.get(i + 1),
currentPosition
);
currentPosition = aggregateDimValue(
positions,
theAggregators,
resultsBuf,
numBytesPerRecord,
aggregatorOffsets,
aggSize,
aggExtra,
dimValues.get(i + 2),
currentPosition
);
currentPosition = aggregateDimValue(
positions,
theAggregators,
resultsBuf,
numBytesPerRecord,
aggregatorOffsets,
aggSize,
aggExtra,
dimValues.get(i + 3),
currentPosition
);
currentPosition = aggregateDimValue(
positions,
theAggregators,
resultsBuf,
numBytesPerRecord,
aggregatorOffsets,
aggSize,
aggExtra,
dimValues.get(i + 4),
currentPosition
);
currentPosition = aggregateDimValue(
positions,
theAggregators,
resultsBuf,
numBytesPerRecord,
aggregatorOffsets,
aggSize,
aggExtra,
dimValues.get(i + 5),
currentPosition
);
currentPosition = aggregateDimValue(
positions,
theAggregators,
resultsBuf,
numBytesPerRecord,
aggregatorOffsets,
aggSize,
aggExtra,
dimValues.get(i + 6),
currentPosition
);
currentPosition = aggregateDimValue(
positions,
theAggregators,
resultsBuf,
numBytesPerRecord,
aggregatorOffsets,
aggSize,
aggExtra,
dimValues.get(i + 7),
currentPosition
);
}
cursor.advanceUninterruptibly();
processedRows++;
}
return processedRows;
}
/**
* Aggregates the current row into the record of one dimension value, assigning and initializing a record slot in
* resultsBuf the first time the value is seen.
*
* Returns a new currentPosition, incremented if a new position was initialized, otherwise the same position as passed
* in the last argument.
*
* @param positions per-dictionary-id array holding SKIP_POSITION_VALUE, INIT_POSITION_VALUE or the byte offset of the
* value's record in resultsBuf
* @param theAggregators buffer aggregators of the query
* @param resultsBuf buffer holding the records
* @param numBytesPerRecord size of one record in bytes
* @param aggregatorOffsets byte offset of each aggregator within a record
* @param aggSize number of aggregators
* @param aggExtra aggSize % AGG_UNROLL_COUNT, precomputed by the caller
* @param dimIndex dictionary id of the dimension value
* @param currentPosition index of the next unused record slot
*/
private static int aggregateDimValue(
final int[] positions,
final BufferAggregator[] theAggregators,
final ByteBuffer resultsBuf,
final int numBytesPerRecord,
final int[] aggregatorOffsets,
final int aggSize,
final int aggExtra,
final int dimIndex,
int currentPosition
)
{
// Values marked as skipped are not aggregated at all.
if (SKIP_POSITION_VALUE == positions[dimIndex]) {
return currentPosition;
}
// First occurrence of this value: claim the next record slot and initialize every aggregator in it.
if (INIT_POSITION_VALUE == positions[dimIndex]) {
positions[dimIndex] = currentPosition * numBytesPerRecord;
currentPosition++;
final int pos = positions[dimIndex];
for (int j = 0; j < aggSize; ++j) {
theAggregators[j].init(resultsBuf, pos + aggregatorOffsets[j]);
}
}
final int position = positions[dimIndex];
// Aggregate the first aggExtra aggregators, highest index first, by falling through the cases.
switch (aggExtra) {
case 7:
theAggregators[6].aggregate(resultsBuf, position + aggregatorOffsets[6]);
// fall through
case 6:
theAggregators[5].aggregate(resultsBuf, position + aggregatorOffsets[5]);
// fall through
case 5:
theAggregators[4].aggregate(resultsBuf, position + aggregatorOffsets[4]);
// fall through
case 4:
theAggregators[3].aggregate(resultsBuf, position + aggregatorOffsets[3]);
// fall through
case 3:
theAggregators[2].aggregate(resultsBuf, position + aggregatorOffsets[2]);
// fall through
case 2:
theAggregators[1].aggregate(resultsBuf, position + aggregatorOffsets[1]);
// fall through
case 1:
theAggregators[0].aggregate(resultsBuf, position + aggregatorOffsets[0]);
}
// The remaining aggSize - aggExtra aggregators are a multiple of AGG_UNROLL_COUNT: eight per iteration.
for (int j = aggExtra; j < aggSize; j += AGG_UNROLL_COUNT) {
theAggregators[j].aggregate(resultsBuf, position + aggregatorOffsets[j]);
theAggregators[j + 1].aggregate(resultsBuf, position + aggregatorOffsets[j + 1]);
theAggregators[j + 2].aggregate(resultsBuf, position + aggregatorOffsets[j + 2]);
theAggregators[j + 3].aggregate(resultsBuf, position + aggregatorOffsets[j + 3]);
theAggregators[j + 4].aggregate(resultsBuf, position + aggregatorOffsets[j + 4]);
theAggregators[j + 5].aggregate(resultsBuf, position + aggregatorOffsets[j + 5]);
theAggregators[j + 6].aggregate(resultsBuf, position + aggregatorOffsets[j + 6]);
theAggregators[j + 7].aggregate(resultsBuf, position + aggregatorOffsets[j + 7]);
}
return currentPosition;
}
/**
 * Reads the aggregated values back out of the results buffer and adds one entry to {@code resultBuilder} for every
 * dictionary id whose entry in {@code positions} is non-negative, i.e. holds a record offset.
 *
 * @throws UnsupportedOperationException if params reports a negative cardinality
 */
@Override
protected void updateResults(
    PooledTopNParams params,
    int[] positions,
    BufferAggregator[] theAggregators,
    TopNResultBuilder resultBuilder
)
{
  if (params.getCardinality() < 0) {
    throw new UnsupportedOperationException("Cannot operate on a dimension with unknown cardinality");
  }

  final ByteBuffer resultsBuf = params.getResultsBuf();
  final int[] aggregatorSizes = params.getAggregatorSizes();
  final DimensionSelector dimSelector = params.getDimSelector();

  for (int dimIndex = 0; dimIndex < positions.length; dimIndex++) {
    final int recordStart = positions[dimIndex];
    if (recordStart < 0) {
      // No record was written for this dictionary id.
      continue;
    }

    // Walk through the record, one aggregator state after the other.
    final Object[] vals = new Object[theAggregators.length];
    int offset = recordStart;
    for (int j = 0; j < theAggregators.length; j++) {
      vals[j] = theAggregators[j].get(resultsBuf, offset);
      offset += aggregatorSizes[j];
    }

    // Output type must be STRING in order for PooledTopNAlgorithm to make sense; so no need to convert value.
    resultBuilder.addEntry(dimSelector.lookupName(dimIndex), dimIndex, vals);
  }
}
/**
 * Closes every aggregator of the given array, in array order.
 */
@Override
protected void resetAggregators(BufferAggregator[] bufferAggregators)
{
  for (int i = 0; i < bufferAggregators.length; i++) {
    bufferAggregators[i].close();
  }
}
/**
 * Releases the results buffer held by {@code params}: clears the buffer if a holder is present, then passes the
 * holder (possibly null) to {@code CloseableUtils.closeAndWrapExceptions}. Does nothing when {@code params} is null.
 */
@Override
public void cleanup(PooledTopNParams params)
{
  if (params == null) {
    return;
  }

  final ResourceHolder<ByteBuffer> holder = params.getResultsBufHolder();
  if (holder != null) {
    holder.get().clear();
  }
  CloseableUtils.closeAndWrapExceptions(holder);
}
/**
 * Parameters of one {@link PooledTopNAlgorithm} run: what {@link TopNParams} carries, plus the pooled results
 * buffer (and its holder), the size of each aggregator's state, the total record size and the provider of the
 * positions array. Instances are assembled with {@link Builder}.
 */
public static class PooledTopNParams extends TopNParams
{
  private final ResourceHolder<ByteBuffer> resultsBufHolder;
  private final ByteBuffer resultsBuf;
  private final int[] aggregatorSizes;
  private final int numBytesPerRecord;
  private final TopNMetricSpecBuilder<int[]> arrayProvider;

  public PooledTopNParams(
      ColumnSelectorPlus selectorPlus,
      Cursor cursor,
      ResourceHolder<ByteBuffer> resultsBufHolder,
      ByteBuffer resultsBuf,
      int[] aggregatorSizes,
      int numBytesPerRecord,
      int numValuesPerPass,
      TopNMetricSpecBuilder<int[]> arrayProvider
  )
  {
    super(selectorPlus, cursor, numValuesPerPass);
    this.resultsBufHolder = resultsBufHolder;
    this.resultsBuf = resultsBuf;
    this.aggregatorSizes = aggregatorSizes;
    this.numBytesPerRecord = numBytesPerRecord;
    this.arrayProvider = arrayProvider;
  }

  /**
   * Starts a new, empty {@link Builder}.
   */
  public static Builder builder()
  {
    return new Builder();
  }

  /**
   * Holder of the results buffer.
   */
  public ResourceHolder<ByteBuffer> getResultsBufHolder()
  {
    return this.resultsBufHolder;
  }

  /**
   * Buffer the aggregation records are written to.
   */
  public ByteBuffer getResultsBuf()
  {
    return this.resultsBuf;
  }

  /**
   * Size of each aggregator's state; the array is returned as is, not copied.
   */
  public int[] getAggregatorSizes()
  {
    return this.aggregatorSizes;
  }

  /**
   * Size in bytes of one record.
   */
  public int getNumBytesPerRecord()
  {
    return this.numBytesPerRecord;
  }

  /**
   * Provider of the positions array.
   */
  public TopNMetricSpecBuilder<int[]> getArrayProvider()
  {
    return this.arrayProvider;
  }

  /**
   * Mutable builder for {@link PooledTopNParams}. Nothing is validated; values that are never set keep their Java
   * defaults (null / 0) and are passed to the constructor unchanged.
   */
  public static class Builder
  {
    private ColumnSelectorPlus selectorPlus;
    private Cursor cursor;
    private ResourceHolder<ByteBuffer> resultsBufHolder;
    private ByteBuffer resultsBuf;
    private int[] aggregatorSizes;
    private int numBytesPerRecord;
    private int numValuesPerPass;
    private TopNMetricSpecBuilder<int[]> arrayProvider;

    public Builder withSelectorPlus(ColumnSelectorPlus selectorPlus)
    {
      this.selectorPlus = selectorPlus;
      return this;
    }

    public Builder withCursor(Cursor cursor)
    {
      this.cursor = cursor;
      return this;
    }

    public Builder withResultsBufHolder(ResourceHolder<ByteBuffer> resultsBufHolder)
    {
      this.resultsBufHolder = resultsBufHolder;
      return this;
    }

    public Builder withResultsBuf(ByteBuffer resultsBuf)
    {
      this.resultsBuf = resultsBuf;
      return this;
    }

    public Builder withAggregatorSizes(int[] aggregatorSizes)
    {
      this.aggregatorSizes = aggregatorSizes;
      return this;
    }

    public Builder withNumBytesPerRecord(int numBytesPerRecord)
    {
      this.numBytesPerRecord = numBytesPerRecord;
      return this;
    }

    public Builder withNumValuesPerPass(int numValuesPerPass)
    {
      this.numValuesPerPass = numValuesPerPass;
      return this;
    }

    public Builder withArrayProvider(TopNMetricSpecBuilder<int[]> arrayProvider)
    {
      this.arrayProvider = arrayProvider;
      return this;
    }

    /**
     * Creates the params from the values collected so far.
     */
    public PooledTopNParams build()
    {
      final PooledTopNParams built = new PooledTopNParams(
          this.selectorPlus,
          this.cursor,
          this.resultsBufHolder,
          this.resultsBuf,
          this.aggregatorSizes,
          this.numBytesPerRecord,
          this.numValuesPerPass,
          this.arrayProvider
      );
      return built;
    }
  }
}
}